internel buffer for input song

This commit is contained in:
zuckerberg 2021-06-17 21:07:46 -04:00
parent 1b83257ee3
commit 71a1443301
9 changed files with 132 additions and 33 deletions

55
buffer.py Normal file
View File

@ -0,0 +1,55 @@
from stream import ByteSource
from queue import Queue
from threading import Lock
from time import sleep
from stream_listener import StreamListener
# a simple FIFO buffer for ensuring stream stability
class Buffer(ByteSource):
def __init__(self, upstream, size=200000):
self.fifo = Queue()
self.targetSize = size
self.size = 0
self.quit = False
self.mutex = Lock()
self.listener = StreamListener(upstream, self)
self.listener.start()
def stop(self):
self.quit = True
self.listener.stop()
def peek(self):
self.mutex.acquire()
try:
if self.size > 0 and not self.quit:
return self.fifo.queue[0]
else:
return None
finally:
self.mutex.release()
def read(self):
self.mutex.acquire()
try:
if self.size > 0 and not self.quit:
out = self.fifo.get_nowait()
self.size -= len(out)
return out
else:
return None
finally:
self.mutex.release()
def write(self, chunk):
while not self.quit:
try:
self.mutex.acquire()
if self.size < self.targetSize:
self.size += len(chunk)
self.fifo.put(chunk)
break
finally:
self.mutex.release()
sleep(0.1)

View File

@ -14,6 +14,7 @@ import signal
from logger import logger from logger import logger
from threading import Thread, main_thread from threading import Thread, main_thread
from time import sleep from time import sleep
from stream import StreamSource
def updateYoutubeDL(): def updateYoutubeDL():
pip.main(['install', '--target=' + dirpath, '--upgrade', 'youtube_dl']) pip.main(['install', '--target=' + dirpath, '--upgrade', 'youtube_dl'])
@ -25,7 +26,7 @@ dirpath = tempfile.mkdtemp()
sys.path.append(dirpath) sys.path.append(dirpath)
updateYoutubeDL() updateYoutubeDL()
class Downloader(Thread): class Downloader(Thread, StreamSource):
def __init__(self, url, cb): def __init__(self, url, cb):
Thread.__init__(self) Thread.__init__(self)
@ -64,7 +65,6 @@ class Downloader(Thread):
if self.isAlive(): if self.isAlive():
os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM) os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
self.popen.stdout.close() self.popen.stdout.close()
# self.popen.wait()
self.exit = True self.exit = True
# checks to see if the current download has finished # checks to see if the current download has finished

View File

@ -1,9 +1,10 @@
import ffmpeg import ffmpeg
from logger import logger from logger import logger
from stream import StreamSource
# A null audio source # A null audio source
class NullSrc(object): class NullSrc(StreamSource):
def __init__(self): def __init__(self):
self.process = ( ffmpeg self.process = ( ffmpeg
@ -16,7 +17,6 @@ class NullSrc(object):
def stop(self): def stop(self):
self.process.stdout.close() self.process.stdout.close()
self.process.stderr.close() self.process.stderr.close()
# self.process.wait()
def getStream(self): def getStream(self):
return self.process.stdout return self.process.stdout

View File

@ -1,6 +1,7 @@
import downloader import downloader
import uploader import uploader
import transcoder import transcoder
import buffer
from time import sleep from time import sleep
from flask import Flask, request from flask import Flask, request
from queue import Queue from queue import Queue
@ -16,6 +17,7 @@ class Radio(object):
def __init__(self): def __init__(self):
self.downloader = None self.downloader = None
self.transcoder = None self.transcoder = None
self.buffer = None
self.uploader = uploader.Uploader(host, stream) self.uploader = uploader.Uploader(host, stream)
self.playingUrl = None self.playingUrl = None
self.queue = Queue() self.queue = Queue()
@ -25,7 +27,8 @@ class Radio(object):
self.playingUrl = self.queue.get() self.playingUrl = self.queue.get()
self.downloader = downloader.Downloader(self.playingUrl, self.downloadFinished) self.downloader = downloader.Downloader(self.playingUrl, self.downloadFinished)
self.transcoder = transcoder.Transcoder(self.downloader) self.transcoder = transcoder.Transcoder(self.downloader)
self.uploader.setUpstream(self.transcoder) self.buffer = buffer.Buffer(self.transcoder)
self.uploader.setUpstream(self.buffer)
def isPlaying(self): def isPlaying(self):
return not self.playingUrl is None return not self.playingUrl is None
@ -48,6 +51,7 @@ class Radio(object):
def stopPlaying(self): def stopPlaying(self):
self.downloader.stop() self.downloader.stop()
self.transcoder.stop() self.transcoder.stop()
self.buffer.stop()
self.playingUrl = None self.playingUrl = None
# downloader callback function, called when the downloader is finished # downloader callback function, called when the downloader is finished

View File

@ -13,7 +13,9 @@ setup(
'util', 'util',
'stream_listener', 'stream_listener',
'transcoder', 'transcoder',
'nullsrc' 'nullsrc',
'buffer',
'stream'
], ],
entry_points={ entry_points={
'console_scripts': ['radio = radio:run'] 'console_scripts': ['radio = radio:run']

46
stream.py Normal file
View File

@ -0,0 +1,46 @@
from util import non_block_read, non_block_peek
class StreamSource(object):
def getStream(self):
""" Gets the python stream for reading from """
pass
class ByteSource(object):
def peek(self):
""" Gets the bytes at the head of the queue (if any) without incrementing
Non-blocking!
Returns None if no bytes available
"""
pass
def read(self):
""" Gets the bytes at the head of the queue (if any)
Non-blocking!
Returns None if no bytes available
"""
pass
class StreamByteSource(ByteSource):
def __init__(self, stream):
self.stream = stream
def peek(self):
output = non_block_peek(self.stream)
if output == None or output == b'':
return None
return output
def read(self):
output = non_block_read(self.stream)
if output == None or output == b'':
return None
return output
def toByteSource(src):
if isinstance(src, ByteSource):
return src
elif isinstance(src, StreamSource):
return StreamByteSource(src.getStream())
elif src is None:
return None
else:
raise Exception("not implemented")

View File

@ -1,25 +1,22 @@
from threading import Thread, main_thread from threading import Thread, main_thread
from util import non_block_read, non_block_peek from util import non_block_read, non_block_peek
from time import sleep, time from time import sleep, time
from stream import toByteSource
# time (ms) to wait before switching over to backup source # time (ms) to wait before switching over to backup source
FALLBACK_TIME = 500 FALLBACK_TIME = 100
def current_milli_time(): def current_milli_time():
return round(time() * 1000) return round(time() * 1000)
def empty(output):
return output == None or output == b''
# #
# In a new thread, listens for new data on the stream # In a new thread, listens for new data on the stream
# and passes it to the listener when available # and passes it to the listener when available
# #
# Upstreams implement getStream() # Upstreams implement class StreamSource or ByteSource
# Downstreams implement write() for bytes to be given to # Downstreams implement write() for bytes to be given to
# #
class StreamListener(Thread): class StreamListener(Thread):
def __init__(self, upstream, downstream, backupUpstream=None): def __init__(self, upstream, downstream, backupUpstream=None):
Thread.__init__(self) Thread.__init__(self)
self.setUpstream(upstream) self.setUpstream(upstream)
@ -29,26 +26,20 @@ class StreamListener(Thread):
self.lastData = 0 self.lastData = 0
def setUpstream(self, upstream): def setUpstream(self, upstream):
if not upstream is None: self.src = toByteSource(upstream)
self.stream = upstream.getStream()
else:
self.stream = None
def setBackupUpstream(self, upstream): def setBackupUpstream(self, upstream):
if not upstream is None: self.backupSrc = toByteSource(upstream)
self.backupStream = upstream.getStream()
else:
self.backupStream = None
def setDownstream(self, downstream): def setDownstream(self, downstream):
self.listener = downstream self.listener = downstream
# blocks until there is nothing left to read from upstream # blocks until there is nothing left to read from upstream
def blockUntilEmpty(self): def blockUntilEmpty(self):
if not self.stream is None: if not self.src is None:
while True: while True:
output = non_block_peek(self.stream) output = self.src.peek()
if output == None or output == b'': if output == None:
break break
sleep(0.1) sleep(0.1)
@ -59,13 +50,13 @@ class StreamListener(Thread):
while main_thread().is_alive() and not self.quit: while main_thread().is_alive() and not self.quit:
while True: while True:
output = None output = None
if not self.stream is None: if not self.src is None:
output = non_block_read(self.stream) output = self.src.read()
if empty(output) and not self.backupStream is None and current_milli_time() - self.lastData > FALLBACK_TIME: if output == None and not self.backupSrc is None and current_milli_time() - self.lastData > FALLBACK_TIME:
output = non_block_read(self.backupStream) output = self.backupSrc.read()
else: else:
self.lastData = current_milli_time() self.lastData = current_milli_time()
if empty(output): if output == None:
break break
self.listener.write(output) self.listener.write(output)
sleep(0.1) sleep(0.1)

View File

@ -1,9 +1,10 @@
import ffmpeg import ffmpeg
from logger import logger from logger import logger
from stream_listener import StreamListener from stream_listener import StreamListener
from stream import StreamSource
# converts the stream to mp3 before sending to the long-lasting mp3 connection # converts the stream to mp3 before sending to the long-lasting mp3 connection
class Transcoder(object): class Transcoder(StreamSource):
def __init__(self, upstream): def __init__(self, upstream):
self.process = ( ffmpeg self.process = ( ffmpeg
@ -20,7 +21,6 @@ class Transcoder(object):
self.process.stdin.close() self.process.stdin.close()
self.process.stdout.close() self.process.stdout.close()
self.process.stderr.close() self.process.stderr.close()
# self.process.wait()
def getStream(self): def getStream(self):
return self.process.stdout return self.process.stdout

View File

@ -31,9 +31,10 @@ class Uploader(object):
def stop(self): def stop(self):
self.listener.stop() self.listener.stop()
self.process.stdin.close() if not self.process.stdin.closed:
self.process.stderr.close() self.process.stdin.close()
self.process.wait() if not self.process.stderr.closed:
self.process.stderr.close()
def write(self, chunk): def write(self, chunk):
while True: while True: