From cccc3f4dc564929f0e164f838d2228e5d2bcf7b6 Mon Sep 17 00:00:00 2001 From: zuckerberg <5-zuckerberg@users.noreply.git.neet.dev> Date: Mon, 31 May 2021 16:54:39 -0400 Subject: [PATCH] song transitioning PoC --- downloader.py | 85 +++++++++++++++++++++++++++------------------- logger.py | 46 ++++++++++++------------- radio.py | 45 ++++++++++++++---------- setup.py | 2 +- stream_listener.py | 49 ++++++++++++++++++++++++++ transcoder.py | 29 ++++++++++++++++ uploader.py | 42 +++++++++++++++++++++++ util.py | 24 +++++++++++++ 8 files changed, 246 insertions(+), 76 deletions(-) create mode 100644 stream_listener.py create mode 100644 transcoder.py create mode 100644 uploader.py create mode 100644 util.py diff --git a/downloader.py b/downloader.py index 1326f5e..27eb26a 100644 --- a/downloader.py +++ b/downloader.py @@ -12,6 +12,8 @@ import os import pip import signal from logger import logger +from threading import Thread, main_thread +from time import sleep def updateYoutubeDL(): pip.main(['install', '--target=' + dirpath, '--upgrade', 'youtube_dl']) @@ -23,40 +25,55 @@ dirpath = tempfile.mkdtemp() sys.path.append(dirpath) updateYoutubeDL() -def executeYoutubeDL(url, cb): - env = dict(os.environ) - env["PYTHONPATH"] = dirpath - cmd = [ - sys.executable, - dirpath + "/bin/youtube-dl", - "-o", "-", - "-f", "bestaudio/best", - # "--audio-format", "mp3", "-x", # cannot do because it requires a tmp file to re-encode - "--prefer-ffmpeg", - "--no-mark-watched", - "--geo-bypass", - "--no-playlist", - "--retries", "100", - "--no-call-home", - url - ] - popen = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid) +class Downloader(Thread): - # monitor the stdout and send to callback, if result from callback function is true, - # then kill the download process - BUFFER_SIZE = 8096 - logger.add(popen.stderr, "youtube-dl.log") - for chunk in iter(lambda: popen.stdout.read(BUFFER_SIZE), b''): - if cb(chunk): - os.killpg(os.getpgid(popen.pid), signal.SIGTERM) - break - popen.stdout.close() - popen.wait() + def __init__(self, url, cb): + Thread.__init__(self) -def download(url, cb): - # update youtube-dl - # TODO: do this only every once in a while - # updateYoutubeDL() + # update youtube-dl + # TODO: do this only every once in a while + # updateYoutubeDL() - # start downloader so that it's stdout (with fragments) may be captured - executeYoutubeDL(url, cb) + self.cb = cb + self.exit = False + + env = dict(os.environ) + env["PYTHONPATH"] = dirpath + cmd = [ + sys.executable, + dirpath + "/bin/youtube-dl", + "-o", "-", + "-f", "bestaudio/best", + # "--audio-format", "mp3", "-x", # cannot do because it requires a tmp file to re-encode + "--prefer-ffmpeg", + "--no-mark-watched", + "--geo-bypass", + "--no-playlist", + "--retries", "100", + "--no-call-home", + url + ] + self.popen = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid) + logger.add(self.popen.stderr, "youtube-dl.log") + self.start() + + def isAlive(self): + return self.popen.poll() is None + + def stop(self): + if self.isAlive(): + os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM) + self.popen.stdout.close() + self.popen.wait() + self.exit = True + + # checks to see if the current download has finished + # if yes, cleans up and fires callback + def run(self): + while main_thread().is_alive() and not self.exit: + if not self.isAlive(): + self.cb() + sleep(0.1) + + def getStream(self): + return self.popen.stdout \ No newline at end of file diff --git a/logger.py b/logger.py index 5055743..97fffcb 100644 --- a/logger.py +++ b/logger.py @@ -1,47 +1,47 @@ import sys -import fcntl -import os from subprocess import PIPE, Popen -from threading import Thread, main_thread +from threading import Thread, main_thread, Lock from queue import Queue, Empty from time import sleep from pathlib import Path +from util import non_block_read READ_SIZE = 8096 LOG_DIR = "logs/" -def non_block_read(output): - fd = output.fileno() - fl = fcntl.fcntl(fd, fcntl.F_GETFL) - fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) - try: - return output.read() - except: - return b'' - class Logger(Thread): def __init__(self): Thread.__init__(self) self.streams = dict() self.files = dict() + self.mutex = Lock() Path(LOG_DIR).mkdir(exist_ok=True) def add(self, stream, filename): - self.files[filename] = open(LOG_DIR + filename, "wb") - self.streams[filename] = stream + self.mutex.acquire() + try: + if not filename in self.files.keys(): + self.files[filename] = open(LOG_DIR + filename, "ab") + self.streams[filename] = stream + finally: + self.mutex.release() def run(self): while main_thread().is_alive(): - for filename, stream in self.streams.items(): - f = self.files[filename] - while True: - output = non_block_read(stream) - if output == None or output == b'': - break - print(output.decode('ascii')) - f.write(output) - f.flush() + self.mutex.acquire() + try: + for filename, stream in self.streams.items(): + f = self.files[filename] + while True: + output = non_block_read(stream) + if output == None or output == b'': + break + print(output.decode('ascii')) + f.write(output) + f.flush() + finally: + self.mutex.release() sleep(0.1) diff --git a/radio.py b/radio.py index b61c23b..aa8384c 100644 --- a/radio.py +++ b/radio.py @@ -1,26 +1,35 @@ import downloader -import ffmpeg -import sys -from logger import logger +import uploader +import transcoder +from time import sleep -def run(): - process = ( ffmpeg - .input('pipe:', re=None) - .output("icecast://source:hackme@localhost:8000/stream.mp3", format='mp3', content_type="audio/mpeg") - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) - ) +def cb(): + global d + global t + print("----------------FINISHED-----------") + t.listener.blockUntilEmpty() + u.listener.blockUntilEmpty() + d.stop() + t.stop() + d = downloader.Downloader('https://www.youtube.com/watch?v=BQQ3qZ9FC70', cb) + t = transcoder.Transcoder(d) + u.setUpstream(t) - logger.add(process.stdout, "ffmpeg.out.log") - logger.add(process.stderr, "ffmpeg.err.log") +u = uploader.Uploader() +d = downloader.Downloader('https://www.youtube.com/watch?v=kgBcg4uBd9Q', cb) +t = transcoder.Transcoder(d) +u.setUpstream(t) - def cb(chunk): - process.stdin.write(chunk) - return False - - downloader.download('https://www.youtube.com/watch?v=BaW_jenozKc', cb) - downloader.download('https://www.youtube.com/watch?v=EbnH3VHzhu8', cb) +#def run(): + # up = uploader.Uploader() + # t = transcoder.Transcoder(up) + # downloader.download('https://www.youtube.com/watch?v=BaW_jenozKc', t.callback) + # t = transcoder.Transcoder(up) + # downloader.download('https://www.youtube.com/watch?v=EbnH3VHzhu8', t.callback) # downloader.download('https://www.youtube.com/watch?v=kgBcg4uBd9Q', cb) # downloader.download('https://www.youtube.com/watch?v=EbnH3VHzhu8', cb) if __name__ == "__main__": - run() \ No newline at end of file + #run() + while True: + sleep(1) \ No newline at end of file diff --git a/setup.py b/setup.py index 7e77da3..dc96392 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ requires = ["pip","ffmpeg-python"] setup( name='radio', version='0.1', - py_modules=['radio','downloader','logger'], + py_modules=['radio','downloader','uploader','logger','util'], entry_points={ 'console_scripts': ['radio = radio:run'] }, diff --git a/stream_listener.py b/stream_listener.py new file mode 100644 index 0000000..0da6d5e --- /dev/null +++ b/stream_listener.py @@ -0,0 +1,49 @@ +from threading import Thread, main_thread +from util import non_block_read, non_block_peek +from time import sleep + +# +# In a new thread, listens for new data on the stream +# and passes it to the listener when available +# +# Upstreams implement getStream() +# Downstreams implement write() for bytes to be given to +# +class StreamListener(Thread): + + def __init__(self, upstream, downstream): + Thread.__init__(self) + self.setUpstream(upstream) + self.setDownstream(downstream) + self.quit = False + + def setUpstream(self, upstream): + if not upstream is None: + self.stream = upstream.getStream() + else: + self.stream = None + + def setDownstream(self, downstream): + self.listener = downstream + + # blocks until there is nothing left to read from upstream + def blockUntilEmpty(self): + if not self.stream is None: + while True: + output = non_block_peek(self.stream) + if output == None or output == b'': + break + sleep(0.1) + + def stop(self): + self.quit = True + + def run(self): + while main_thread().is_alive() and not self.quit: + while True: + if not self.stream is None: + output = non_block_read(self.stream) + if output == None or output == b'': + break + self.listener.write(output) + sleep(0.1) \ No newline at end of file diff --git a/transcoder.py b/transcoder.py new file mode 100644 index 0000000..e82938d --- /dev/null +++ b/transcoder.py @@ -0,0 +1,29 @@ +import ffmpeg +from logger import logger +from stream_listener import StreamListener + +# converts the stream to mp3 before sending to the long-lasting mp3 connection +class Transcoder(object): + + def __init__(self, upstream): + self.process = ( ffmpeg + .input('pipe:') + .output('pipe:', format='mp3') + .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + ) + logger.add(self.process.stderr, "transcoder.log") + self.listener = StreamListener(upstream, self) + self.listener.start() + + def stop(self): + self.listener.stop() + self.process.stdin.close() + self.process.stdout.close() + self.process.stderr.close() + self.process.wait() + + def getStream(self): + return self.process.stdout + + def write(self, chunk): + self.process.stdin.write(chunk) \ No newline at end of file diff --git a/uploader.py b/uploader.py new file mode 100644 index 0000000..b0740cd --- /dev/null +++ b/uploader.py @@ -0,0 +1,42 @@ +import ffmpeg +from logger import logger +import sys +from stream_listener import StreamListener + +class Uploader(object): + + def __init__(self): + self.connect() + + def connect(self): + self.process = ( ffmpeg + .input('pipe:', re=None) + .output("icecast://source:hackme@localhost:8000/stream.mp3", format='mp3', content_type="audio/mpeg") + .run_async(pipe_stdin=True, pipe_stderr=True) + ) + logger.add(self.process.stderr, "uploader.log") + self.listener = StreamListener(None, self) + self.listener.start() + + def reconnect(self): + self.process.stdin.close() + self.process.wait() + self.connect() + + def setUpstream(self, upstream): + self.listener.setUpstream(upstream) + + def stop(self): + self.listener.stop() + self.process.stdin.close() + self.process.stderr.close() + self.process.wait() + + def write(self, chunk): + while True: + try: + self.process.stdin.write(chunk) + break + except: + print("Unexpected error:", sys.exc_info()) + self.reconnect() \ No newline at end of file diff --git a/util.py b/util.py new file mode 100644 index 0000000..c12b420 --- /dev/null +++ b/util.py @@ -0,0 +1,24 @@ +import fcntl +import os + +def non_block_read(output): + if output.closed: + return b'' + fd = output.fileno() + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + try: + return output.read() + except: + return b'' + +def non_block_peek(output): + if output.closed: + return b'' + fd = output.fileno() + fl = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) + try: + return output.peek() + except: + return b'' \ No newline at end of file