diff --git a/buffer.py b/buffer.py new file mode 100644 index 0000000..076c3de --- /dev/null +++ b/buffer.py @@ -0,0 +1,55 @@ +from stream import ByteSource +from queue import Queue +from threading import Lock +from time import sleep +from stream_listener import StreamListener + +# a simple FIFO buffer for ensuring stream stability +class Buffer(ByteSource): + + def __init__(self, upstream, size=200000): + self.fifo = Queue() + self.targetSize = size + self.size = 0 + self.quit = False + self.mutex = Lock() + self.listener = StreamListener(upstream, self) + self.listener.start() + + def stop(self): + self.quit = True + self.listener.stop() + + def peek(self): + self.mutex.acquire() + try: + if self.size > 0 and not self.quit: + return self.fifo.queue[0] + else: + return None + finally: + self.mutex.release() + + def read(self): + self.mutex.acquire() + try: + if self.size > 0 and not self.quit: + out = self.fifo.get_nowait() + self.size -= len(out) + return out + else: + return None + finally: + self.mutex.release() + + def write(self, chunk): + while not self.quit: + try: + self.mutex.acquire() + if self.size < self.targetSize: + self.size += len(chunk) + self.fifo.put(chunk) + break + finally: + self.mutex.release() + sleep(0.1) \ No newline at end of file diff --git a/downloader.py b/downloader.py index 20123a4..4ff0e15 100644 --- a/downloader.py +++ b/downloader.py @@ -14,6 +14,7 @@ import signal from logger import logger from threading import Thread, main_thread from time import sleep +from stream import StreamSource def updateYoutubeDL(): pip.main(['install', '--target=' + dirpath, '--upgrade', 'youtube_dl']) @@ -25,7 +26,7 @@ dirpath = tempfile.mkdtemp() sys.path.append(dirpath) updateYoutubeDL() -class Downloader(Thread): +class Downloader(Thread, StreamSource): def __init__(self, url, cb): Thread.__init__(self) @@ -64,7 +65,6 @@ class Downloader(Thread): if self.isAlive(): os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM) self.popen.stdout.close() - # self.popen.wait() self.exit = True # checks to see if the current download has finished diff --git a/nullsrc.py b/nullsrc.py index d0b1741..b5e97b5 100644 --- a/nullsrc.py +++ b/nullsrc.py @@ -1,9 +1,10 @@ import ffmpeg from logger import logger +from stream import StreamSource # A null audio source -class NullSrc(object): +class NullSrc(StreamSource): def __init__(self): self.process = ( ffmpeg @@ -16,7 +17,6 @@ class NullSrc(object): def stop(self): self.process.stdout.close() self.process.stderr.close() - # self.process.wait() def getStream(self): return self.process.stdout \ No newline at end of file diff --git a/radio.py b/radio.py index 908f03d..d0a962f 100644 --- a/radio.py +++ b/radio.py @@ -1,6 +1,7 @@ import downloader import uploader import transcoder +import buffer from time import sleep from flask import Flask, request from queue import Queue @@ -16,6 +17,7 @@ class Radio(object): def __init__(self): self.downloader = None self.transcoder = None + self.buffer = None self.uploader = uploader.Uploader(host, stream) self.playingUrl = None self.queue = Queue() @@ -25,7 +27,8 @@ class Radio(object): self.playingUrl = self.queue.get() self.downloader = downloader.Downloader(self.playingUrl, self.downloadFinished) self.transcoder = transcoder.Transcoder(self.downloader) - self.uploader.setUpstream(self.transcoder) + self.buffer = buffer.Buffer(self.transcoder) + self.uploader.setUpstream(self.buffer) def isPlaying(self): return not self.playingUrl is None @@ -48,6 +51,7 @@ class Radio(object): def stopPlaying(self): self.downloader.stop() self.transcoder.stop() + self.buffer.stop() self.playingUrl = None # downloader callback function, called when the downloader is finished diff --git a/setup.py b/setup.py index 117bd7e..3849a77 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,9 @@ setup( 'util', 'stream_listener', 'transcoder', - 'nullsrc' + 'nullsrc', + 'buffer', + 'stream' ], entry_points={ 'console_scripts': ['radio = radio:run'] diff --git a/stream.py b/stream.py new file mode 100644 index 0000000..a65c891 --- /dev/null +++ b/stream.py @@ -0,0 +1,46 @@ +from util import non_block_read, non_block_peek + +class StreamSource(object): + def getStream(self): + """ Gets the python stream for reading from """ + pass + +class ByteSource(object): + def peek(self): + """ Gets the bytes at the head of the queue (if any) without incrementing + Non-blocking! + Returns None if no bytes available + """ + pass + def read(self): + """ Gets the bytes at the head of the queue (if any) + Non-blocking! + Returns None if no bytes available + """ + pass + +class StreamByteSource(ByteSource): + def __init__(self, stream): + self.stream = stream + + def peek(self): + output = non_block_peek(self.stream) + if output == None or output == b'': + return None + return output + + def read(self): + output = non_block_read(self.stream) + if output == None or output == b'': + return None + return output + +def toByteSource(src): + if isinstance(src, ByteSource): + return src + elif isinstance(src, StreamSource): + return StreamByteSource(src.getStream()) + elif src is None: + return None + else: + raise Exception("not implemented") \ No newline at end of file diff --git a/stream_listener.py b/stream_listener.py index 5aa7638..35b9333 100644 --- a/stream_listener.py +++ b/stream_listener.py @@ -1,25 +1,22 @@ from threading import Thread, main_thread from util import non_block_read, non_block_peek from time import sleep, time +from stream import toByteSource # time (ms) to wait before switching over to backup source -FALLBACK_TIME = 500 +FALLBACK_TIME = 100 def current_milli_time(): return round(time() * 1000) -def empty(output): - return output == None or output == b'' - # # In a new thread, listens for new data on the stream # and passes it to the listener when available # -# Upstreams implement getStream() +# Upstreams implement class StreamSource or ByteSource # Downstreams implement write() for bytes to be given to # class StreamListener(Thread): - def __init__(self, upstream, downstream, backupUpstream=None): Thread.__init__(self) self.setUpstream(upstream) @@ -29,26 +26,20 @@ class StreamListener(Thread): self.lastData = 0 def setUpstream(self, upstream): - if not upstream is None: - self.stream = upstream.getStream() - else: - self.stream = None + self.src = toByteSource(upstream) def setBackupUpstream(self, upstream): - if not upstream is None: - self.backupStream = upstream.getStream() - else: - self.backupStream = None + self.backupSrc = toByteSource(upstream) def setDownstream(self, downstream): self.listener = downstream # blocks until there is nothing left to read from upstream def blockUntilEmpty(self): - if not self.stream is None: + if not self.src is None: while True: - output = non_block_peek(self.stream) - if output == None or output == b'': + output = self.src.peek() + if output == None: break sleep(0.1) @@ -59,13 +50,13 @@ class StreamListener(Thread): while main_thread().is_alive() and not self.quit: while True: output = None - if not self.stream is None: - output = non_block_read(self.stream) - if empty(output) and not self.backupStream is None and current_milli_time() - self.lastData > FALLBACK_TIME: - output = non_block_read(self.backupStream) + if not self.src is None: + output = self.src.read() + if output == None and not self.backupSrc is None and current_milli_time() - self.lastData > FALLBACK_TIME: + output = self.backupSrc.read() else: self.lastData = current_milli_time() - if empty(output): + if output == None: break self.listener.write(output) sleep(0.1) \ No newline at end of file diff --git a/transcoder.py b/transcoder.py index baba0c1..7f3108d 100644 --- a/transcoder.py +++ b/transcoder.py @@ -1,9 +1,10 @@ import ffmpeg from logger import logger from stream_listener import StreamListener +from stream import StreamSource # converts the stream to mp3 before sending to the long-lasting mp3 connection -class Transcoder(object): +class Transcoder(StreamSource): def __init__(self, upstream): self.process = ( ffmpeg @@ -20,7 +21,6 @@ class Transcoder(object): self.process.stdin.close() self.process.stdout.close() self.process.stderr.close() - # self.process.wait() def getStream(self): return self.process.stdout diff --git a/uploader.py b/uploader.py index 48e5108..b350d49 100644 --- a/uploader.py +++ b/uploader.py @@ -31,9 +31,10 @@ class Uploader(object): def stop(self): self.listener.stop() - self.process.stdin.close() - self.process.stderr.close() - self.process.wait() + if not self.process.stdin.closed: + self.process.stdin.close() + if not self.process.stderr.closed: + self.process.stderr.close() def write(self, chunk): while True: