song transitioning PoC

This commit is contained in:
zuckerberg 2021-05-31 16:54:39 -04:00
parent 52622b22e7
commit cccc3f4dc5
8 changed files with 246 additions and 76 deletions

View File

@ -12,6 +12,8 @@ import os
import pip import pip
import signal import signal
from logger import logger from logger import logger
from threading import Thread, main_thread
from time import sleep
def updateYoutubeDL(): def updateYoutubeDL():
pip.main(['install', '--target=' + dirpath, '--upgrade', 'youtube_dl']) pip.main(['install', '--target=' + dirpath, '--upgrade', 'youtube_dl'])
@ -23,7 +25,18 @@ dirpath = tempfile.mkdtemp()
sys.path.append(dirpath) sys.path.append(dirpath)
updateYoutubeDL() updateYoutubeDL()
def executeYoutubeDL(url, cb): class Downloader(Thread):
def __init__(self, url, cb):
Thread.__init__(self)
# update youtube-dl
# TODO: do this only every once in a while
# updateYoutubeDL()
self.cb = cb
self.exit = False
env = dict(os.environ) env = dict(os.environ)
env["PYTHONPATH"] = dirpath env["PYTHONPATH"] = dirpath
cmd = [ cmd = [
@ -40,23 +53,27 @@ def executeYoutubeDL(url, cb):
"--no-call-home", "--no-call-home",
url url
] ]
popen = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid) self.popen = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid)
logger.add(self.popen.stderr, "youtube-dl.log")
self.start()
# monitor the stdout and send to callback, if result from callback function is true, def isAlive(self):
# then kill the download process return self.popen.poll() is None
BUFFER_SIZE = 8096
logger.add(popen.stderr, "youtube-dl.log")
for chunk in iter(lambda: popen.stdout.read(BUFFER_SIZE), b''):
if cb(chunk):
os.killpg(os.getpgid(popen.pid), signal.SIGTERM)
break
popen.stdout.close()
popen.wait()
def download(url, cb): def stop(self):
# update youtube-dl if self.isAlive():
# TODO: do this only every once in a while os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
# updateYoutubeDL() self.popen.stdout.close()
self.popen.wait()
self.exit = True
# start downloader so that it's stdout (with fragments) may be captured # checks to see if the current download has finished
executeYoutubeDL(url, cb) # if yes, cleans up and fires callback
def run(self):
while main_thread().is_alive() and not self.exit:
if not self.isAlive():
self.cb()
sleep(0.1)
def getStream(self):
return self.popen.stdout

View File

@ -1,38 +1,36 @@
import sys import sys
import fcntl
import os
from subprocess import PIPE, Popen from subprocess import PIPE, Popen
from threading import Thread, main_thread from threading import Thread, main_thread, Lock
from queue import Queue, Empty from queue import Queue, Empty
from time import sleep from time import sleep
from pathlib import Path from pathlib import Path
from util import non_block_read
READ_SIZE = 8096 READ_SIZE = 8096
LOG_DIR = "logs/" LOG_DIR = "logs/"
def non_block_read(output):
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return b''
class Logger(Thread): class Logger(Thread):
def __init__(self): def __init__(self):
Thread.__init__(self) Thread.__init__(self)
self.streams = dict() self.streams = dict()
self.files = dict() self.files = dict()
self.mutex = Lock()
Path(LOG_DIR).mkdir(exist_ok=True) Path(LOG_DIR).mkdir(exist_ok=True)
def add(self, stream, filename): def add(self, stream, filename):
self.files[filename] = open(LOG_DIR + filename, "wb") self.mutex.acquire()
try:
if not filename in self.files.keys():
self.files[filename] = open(LOG_DIR + filename, "ab")
self.streams[filename] = stream self.streams[filename] = stream
finally:
self.mutex.release()
def run(self): def run(self):
while main_thread().is_alive(): while main_thread().is_alive():
self.mutex.acquire()
try:
for filename, stream in self.streams.items(): for filename, stream in self.streams.items():
f = self.files[filename] f = self.files[filename]
while True: while True:
@ -42,6 +40,8 @@ class Logger(Thread):
print(output.decode('ascii')) print(output.decode('ascii'))
f.write(output) f.write(output)
f.flush() f.flush()
finally:
self.mutex.release()
sleep(0.1) sleep(0.1)

View File

@ -1,26 +1,35 @@
import downloader import downloader
import ffmpeg import uploader
import sys import transcoder
from logger import logger from time import sleep
def run(): def cb():
process = ( ffmpeg global d
.input('pipe:', re=None) global t
.output("icecast://source:hackme@localhost:8000/stream.mp3", format='mp3', content_type="audio/mpeg") print("----------------FINISHED-----------")
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) t.listener.blockUntilEmpty()
) u.listener.blockUntilEmpty()
d.stop()
t.stop()
d = downloader.Downloader('https://www.youtube.com/watch?v=BQQ3qZ9FC70', cb)
t = transcoder.Transcoder(d)
u.setUpstream(t)
logger.add(process.stdout, "ffmpeg.out.log") u = uploader.Uploader()
logger.add(process.stderr, "ffmpeg.err.log") d = downloader.Downloader('https://www.youtube.com/watch?v=kgBcg4uBd9Q', cb)
t = transcoder.Transcoder(d)
u.setUpstream(t)
def cb(chunk): #def run():
process.stdin.write(chunk) # up = uploader.Uploader()
return False # t = transcoder.Transcoder(up)
# downloader.download('https://www.youtube.com/watch?v=BaW_jenozKc', t.callback)
downloader.download('https://www.youtube.com/watch?v=BaW_jenozKc', cb) # t = transcoder.Transcoder(up)
downloader.download('https://www.youtube.com/watch?v=EbnH3VHzhu8', cb) # downloader.download('https://www.youtube.com/watch?v=EbnH3VHzhu8', t.callback)
# downloader.download('https://www.youtube.com/watch?v=kgBcg4uBd9Q', cb) # downloader.download('https://www.youtube.com/watch?v=kgBcg4uBd9Q', cb)
# downloader.download('https://www.youtube.com/watch?v=EbnH3VHzhu8', cb) # downloader.download('https://www.youtube.com/watch?v=EbnH3VHzhu8', cb)
if __name__ == "__main__": if __name__ == "__main__":
run() #run()
while True:
sleep(1)

View File

@ -5,7 +5,7 @@ requires = ["pip","ffmpeg-python"]
setup( setup(
name='radio', name='radio',
version='0.1', version='0.1',
py_modules=['radio','downloader','logger'], py_modules=['radio','downloader','uploader','logger','util'],
entry_points={ entry_points={
'console_scripts': ['radio = radio:run'] 'console_scripts': ['radio = radio:run']
}, },

49
stream_listener.py Normal file
View File

@ -0,0 +1,49 @@
from threading import Thread, main_thread
from util import non_block_read, non_block_peek
from time import sleep
#
# In a new thread, listens for new data on the stream
# and passes it to the listener when available
#
# Upstreams implement getStream()
# Downstreams implement write() for bytes to be given to
#
class StreamListener(Thread):
def __init__(self, upstream, downstream):
Thread.__init__(self)
self.setUpstream(upstream)
self.setDownstream(downstream)
self.quit = False
def setUpstream(self, upstream):
if not upstream is None:
self.stream = upstream.getStream()
else:
self.stream = None
def setDownstream(self, downstream):
self.listener = downstream
# blocks until there is nothing left to read from upstream
def blockUntilEmpty(self):
if not self.stream is None:
while True:
output = non_block_peek(self.stream)
if output == None or output == b'':
break
sleep(0.1)
def stop(self):
self.quit = True
def run(self):
while main_thread().is_alive() and not self.quit:
while True:
if not self.stream is None:
output = non_block_read(self.stream)
if output == None or output == b'':
break
self.listener.write(output)
sleep(0.1)

29
transcoder.py Normal file
View File

@ -0,0 +1,29 @@
import ffmpeg
from logger import logger
from stream_listener import StreamListener
# converts the stream to mp3 before sending to the long-lasting mp3 connection
class Transcoder(object):
def __init__(self, upstream):
self.process = ( ffmpeg
.input('pipe:')
.output('pipe:', format='mp3')
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
)
logger.add(self.process.stderr, "transcoder.log")
self.listener = StreamListener(upstream, self)
self.listener.start()
def stop(self):
self.listener.stop()
self.process.stdin.close()
self.process.stdout.close()
self.process.stderr.close()
self.process.wait()
def getStream(self):
return self.process.stdout
def write(self, chunk):
self.process.stdin.write(chunk)

42
uploader.py Normal file
View File

@ -0,0 +1,42 @@
import ffmpeg
from logger import logger
import sys
from stream_listener import StreamListener
class Uploader(object):
def __init__(self):
self.connect()
def connect(self):
self.process = ( ffmpeg
.input('pipe:', re=None)
.output("icecast://source:hackme@localhost:8000/stream.mp3", format='mp3', content_type="audio/mpeg")
.run_async(pipe_stdin=True, pipe_stderr=True)
)
logger.add(self.process.stderr, "uploader.log")
self.listener = StreamListener(None, self)
self.listener.start()
def reconnect(self):
self.process.stdin.close()
self.process.wait()
self.connect()
def setUpstream(self, upstream):
self.listener.setUpstream(upstream)
def stop(self):
self.listener.stop()
self.process.stdin.close()
self.process.stderr.close()
self.process.wait()
def write(self, chunk):
while True:
try:
self.process.stdin.write(chunk)
break
except:
print("Unexpected error:", sys.exc_info())
self.reconnect()

24
util.py Normal file
View File

@ -0,0 +1,24 @@
import fcntl
import os
def non_block_read(output):
if output.closed:
return b''
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.read()
except:
return b''
def non_block_peek(output):
if output.closed:
return b''
fd = output.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
try:
return output.peek()
except:
return b''