Compare commits
31 Commits
1b83257ee3
...
main
Author | SHA1 | Date | |
---|---|---|---|
|
eb95b31089 | ||
|
514318311e | ||
|
1bb789dedf | ||
|
a79b0e99bc | ||
|
4255c43cf6 | ||
|
b9b050fec1 | ||
|
544b090b58 | ||
|
462c835232 | ||
|
b8f36cc59f | ||
|
96f6572a4b | ||
|
209e5e2292 | ||
|
658c84d5df | ||
|
72995c0d3a | ||
|
5bf607fed9 | ||
|
462d06e84a | ||
|
2a95c597cc | ||
|
7576e847ca | ||
|
5a5b0afaaa | ||
|
5de2148123 | ||
|
1fcffb7b00 | ||
|
636fd12b5f | ||
|
d5eca0a039 | ||
|
b0867df965 | ||
|
ca3eac69b8 | ||
|
092cbb824d | ||
|
d83b55799e | ||
|
ec016a2186 | ||
|
86f35f4996 | ||
|
d0a08a202b | ||
|
b6a53eafef | ||
|
71a1443301 |
55
buffer.py
Normal file
55
buffer.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
from stream import ByteSource
|
||||||
|
from queue import Queue
|
||||||
|
from threading import Lock
|
||||||
|
from time import sleep
|
||||||
|
from stream_listener import StreamListener
|
||||||
|
|
||||||
|
# a simple FIFO buffer for ensuring stream stability
|
||||||
|
class Buffer(ByteSource):
|
||||||
|
|
||||||
|
def __init__(self, upstream, size=200000):
|
||||||
|
self.fifo = Queue()
|
||||||
|
self.targetSize = size
|
||||||
|
self.size = 0
|
||||||
|
self.quit = False
|
||||||
|
self.mutex = Lock()
|
||||||
|
self.listener = StreamListener(upstream, self)
|
||||||
|
self.listener.start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.quit = True
|
||||||
|
self.listener.stop()
|
||||||
|
|
||||||
|
def peek(self):
|
||||||
|
self.mutex.acquire()
|
||||||
|
try:
|
||||||
|
if self.size > 0 and not self.quit:
|
||||||
|
return self.fifo.queue[0]
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
finally:
|
||||||
|
self.mutex.release()
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
self.mutex.acquire()
|
||||||
|
try:
|
||||||
|
if self.size > 0 and not self.quit:
|
||||||
|
out = self.fifo.get_nowait()
|
||||||
|
self.size -= len(out)
|
||||||
|
return out
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
finally:
|
||||||
|
self.mutex.release()
|
||||||
|
|
||||||
|
def write(self, chunk):
|
||||||
|
while not self.quit:
|
||||||
|
try:
|
||||||
|
self.mutex.acquire()
|
||||||
|
if self.size < self.targetSize:
|
||||||
|
self.size += len(chunk)
|
||||||
|
self.fifo.put(chunk)
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
self.mutex.release()
|
||||||
|
sleep(0.1)
|
@ -4,5 +4,5 @@ pkgs.python3Packages.buildPythonApplication {
|
|||||||
pname = "radio";
|
pname = "radio";
|
||||||
src = self;
|
src = self;
|
||||||
version = "0.1";
|
version = "0.1";
|
||||||
propagatedBuildInputs = with pkgs.python3Packages; [ pip ffmpeg-python flask requests pkgs.ffmpeg ];
|
propagatedBuildInputs = with pkgs.python3Packages; [ pip ffmpeg-python flask requests pkgs.ffmpeg libtorrent-rasterbar ];
|
||||||
}
|
}
|
||||||
|
133
downloader.py
133
downloader.py
@ -1,60 +1,100 @@
|
|||||||
#
|
#
|
||||||
# Downloads the video/audio as a stream from a provided link using youtube-dl
|
# Downloads the video/audio as a stream from a provided link using yt-dlp
|
||||||
# does not save the file, only the most recent fragment is held. Thus, this is
|
# does not save the file, only the most recent fragment is held. Thus, this is
|
||||||
# ideal for devices with little memory
|
# ideal for devices with little memory
|
||||||
# TODO gather video metadata before download
|
|
||||||
#
|
#
|
||||||
|
|
||||||
|
import ffmpeg
|
||||||
import tempfile
|
import tempfile
|
||||||
import sys
|
import sys
|
||||||
import subprocess
|
import subprocess
|
||||||
import os
|
import os
|
||||||
import pip
|
import pip
|
||||||
import signal
|
import signal
|
||||||
|
import json
|
||||||
|
import shutil
|
||||||
|
import glob
|
||||||
from logger import logger
|
from logger import logger
|
||||||
from threading import Thread, main_thread
|
from threading import Thread, main_thread
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
from stream import StreamSource
|
||||||
|
|
||||||
def updateYoutubeDL():
|
def updateYtdlp():
|
||||||
pip.main(['install', '--target=' + dirpath, '--upgrade', 'youtube_dl'])
|
pip.main(['install', '--target=' + dirpath, '--upgrade', 'yt-dlp'])
|
||||||
|
|
||||||
def importYoutubeDL():
|
def importYoutubeDL():
|
||||||
return __import__('youtube_dl')
|
return __import__('yt-dlp')
|
||||||
|
|
||||||
dirpath = tempfile.mkdtemp()
|
dirpath = tempfile.mkdtemp()
|
||||||
sys.path.append(dirpath)
|
sys.path.append(dirpath)
|
||||||
updateYoutubeDL()
|
updateYtdlp()
|
||||||
|
|
||||||
class Downloader(Thread):
|
baseOpts = [
|
||||||
|
"-f", "bestaudio/best", # select for best audio
|
||||||
def __init__(self, url, cb):
|
|
||||||
Thread.__init__(self)
|
|
||||||
|
|
||||||
# update youtube-dl
|
|
||||||
# TODO: do this only every once in a while
|
|
||||||
# updateYoutubeDL()
|
|
||||||
|
|
||||||
self.cb = cb
|
|
||||||
self.exit = False
|
|
||||||
|
|
||||||
env = dict(os.environ)
|
|
||||||
env["PYTHONPATH"] = dirpath
|
|
||||||
cmd = [
|
|
||||||
sys.executable,
|
|
||||||
dirpath + "/bin/youtube-dl",
|
|
||||||
"-o", "-",
|
|
||||||
"-f", "bestaudio/best",
|
|
||||||
# "--audio-format", "mp3", "-x", # cannot do because it requires a tmp file to re-encode
|
# "--audio-format", "mp3", "-x", # cannot do because it requires a tmp file to re-encode
|
||||||
"--prefer-ffmpeg",
|
"--prefer-ffmpeg",
|
||||||
"--no-mark-watched",
|
"--no-mark-watched",
|
||||||
"--geo-bypass",
|
"--geo-bypass",
|
||||||
"--no-playlist",
|
"--no-playlist",
|
||||||
"--retries", "100",
|
"--retries", "100",
|
||||||
|
"--extractor-retries", "100",
|
||||||
|
# "--throttled-rate", "100K", # get around youtube throttling; probably not needed anymore
|
||||||
"--no-call-home",
|
"--no-call-home",
|
||||||
|
"--sponsorblock-remove", "sponsor,intro,selfpromo,interaction,preview,music_offtopic",
|
||||||
|
]
|
||||||
|
|
||||||
|
def runYtdlp(opts):
|
||||||
|
# update yt-dlp
|
||||||
|
# TODO: do this only every once in a while
|
||||||
|
# updateYtdlp()
|
||||||
|
|
||||||
|
env = dict(os.environ)
|
||||||
|
env["PYTHONPATH"] = dirpath
|
||||||
|
cmd = [
|
||||||
|
sys.executable,
|
||||||
|
dirpath + "/bin/yt-dlp"
|
||||||
|
] + baseOpts + opts
|
||||||
|
popen = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid)
|
||||||
|
logger.add(popen.stderr, "yt-dlp.log")
|
||||||
|
return popen
|
||||||
|
|
||||||
|
def getVideoInfo(url):
|
||||||
|
popen = runYtdlp([
|
||||||
|
"-j", # dump all metadata as json
|
||||||
|
"--playlist-items", "1", # don't go through every single item in playlist
|
||||||
url
|
url
|
||||||
]
|
])
|
||||||
self.popen = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid)
|
|
||||||
logger.add(self.popen.stderr, "youtube-dl.log")
|
j = popen.communicate()[0]
|
||||||
|
|
||||||
|
# make sure the ytdlp instance is closed
|
||||||
|
if popen.poll() is None: # is alive?
|
||||||
|
os.killpg(os.getpgid(popen.pid), signal.SIGTERM) # kill
|
||||||
|
popen.stdout.close()
|
||||||
|
|
||||||
|
# keys of interest (but not always present)
|
||||||
|
# title, direct, extractor, upload_date
|
||||||
|
# format_id (machine readable)
|
||||||
|
# view_count, uploader, duration, view_count, average_rating
|
||||||
|
# age_limit, like_count, dislike_count, thumbnail,
|
||||||
|
|
||||||
|
try:
|
||||||
|
return json.loads(j)
|
||||||
|
except:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Downloads using yt-dlp
|
||||||
|
class YtdlpDownloader(Thread, StreamSource):
|
||||||
|
def __init__(self, url, cb):
|
||||||
|
Thread.__init__(self)
|
||||||
|
|
||||||
|
self.cb = cb
|
||||||
|
self.exit = False
|
||||||
|
|
||||||
|
self.popen = runYtdlp([
|
||||||
|
"-o", "-", # output stream to stdout
|
||||||
|
url
|
||||||
|
])
|
||||||
self.start()
|
self.start()
|
||||||
|
|
||||||
def isAlive(self):
|
def isAlive(self):
|
||||||
@ -64,7 +104,6 @@ class Downloader(Thread):
|
|||||||
if self.isAlive():
|
if self.isAlive():
|
||||||
os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
|
os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
|
||||||
self.popen.stdout.close()
|
self.popen.stdout.close()
|
||||||
# self.popen.wait()
|
|
||||||
self.exit = True
|
self.exit = True
|
||||||
|
|
||||||
# checks to see if the current download has finished
|
# checks to see if the current download has finished
|
||||||
@ -77,3 +116,39 @@ class Downloader(Thread):
|
|||||||
|
|
||||||
def getStream(self):
|
def getStream(self):
|
||||||
return self.popen.stdout
|
return self.popen.stdout
|
||||||
|
|
||||||
|
# Downloads the file and plays it (must be a direct URL, ex: cannot be a yt link)
|
||||||
|
class DirectDownloader(Thread, StreamSource):
|
||||||
|
def __init__(self, url, cb):
|
||||||
|
Thread.__init__(self)
|
||||||
|
|
||||||
|
self.cb = cb
|
||||||
|
self.exit = False
|
||||||
|
|
||||||
|
self.process = ( ffmpeg
|
||||||
|
.input(url, re=None)
|
||||||
|
.output('pipe:', format='mp3')
|
||||||
|
.run_async(pipe_stdout=True, pipe_stderr=True)
|
||||||
|
)
|
||||||
|
logger.add(self.process.stderr, "direct-downloader.log")
|
||||||
|
|
||||||
|
self.start()
|
||||||
|
|
||||||
|
def isAlive(self):
|
||||||
|
return self.process.poll() is None
|
||||||
|
|
||||||
|
# checks to see if the current download has finished
|
||||||
|
# if yes, cleans up and fires callback
|
||||||
|
def run(self):
|
||||||
|
while main_thread().is_alive() and not self.exit:
|
||||||
|
if not self.isAlive():
|
||||||
|
self.cb()
|
||||||
|
sleep(0.1)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.process.stdout.close()
|
||||||
|
self.process.stderr.close()
|
||||||
|
self.exit = True
|
||||||
|
|
||||||
|
def getStream(self):
|
||||||
|
return self.process.stdout
|
@ -37,9 +37,9 @@ class Logger(Thread):
|
|||||||
output = non_block_read(stream)
|
output = non_block_read(stream)
|
||||||
if output == None or output == b'':
|
if output == None or output == b'':
|
||||||
break
|
break
|
||||||
print(output.decode('ascii'))
|
|
||||||
f.write(output)
|
f.write(output)
|
||||||
f.flush()
|
f.flush()
|
||||||
|
print(output.decode('utf-8'))
|
||||||
finally:
|
finally:
|
||||||
self.mutex.release()
|
self.mutex.release()
|
||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
|
@ -1,9 +1,10 @@
|
|||||||
import ffmpeg
|
import ffmpeg
|
||||||
from logger import logger
|
from logger import logger
|
||||||
|
from stream import StreamSource
|
||||||
|
|
||||||
# A null audio source
|
# A null audio source
|
||||||
|
|
||||||
class NullSrc(object):
|
class NullSrc(StreamSource):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.process = ( ffmpeg
|
self.process = ( ffmpeg
|
||||||
@ -16,7 +17,6 @@ class NullSrc(object):
|
|||||||
def stop(self):
|
def stop(self):
|
||||||
self.process.stdout.close()
|
self.process.stdout.close()
|
||||||
self.process.stderr.close()
|
self.process.stderr.close()
|
||||||
# self.process.wait()
|
|
||||||
|
|
||||||
def getStream(self):
|
def getStream(self):
|
||||||
return self.process.stdout
|
return self.process.stdout
|
41
radio.py
41
radio.py
@ -1,21 +1,25 @@
|
|||||||
import downloader
|
import downloader
|
||||||
import uploader
|
import uploader
|
||||||
import transcoder
|
import transcoder
|
||||||
|
import torrent
|
||||||
|
import buffer
|
||||||
from time import sleep
|
from time import sleep
|
||||||
from flask import Flask, request
|
from flask import Flask, request
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
import json
|
import json
|
||||||
import requests
|
import requests
|
||||||
|
import sys
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
|
|
||||||
host = "localhost:8000"
|
host = sys.argv[1]
|
||||||
stream = "stream.mp3"
|
stream = sys.argv[2]
|
||||||
|
|
||||||
class Radio(object):
|
class Radio(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.downloader = None
|
self.downloader = None
|
||||||
self.transcoder = None
|
self.transcoder = None
|
||||||
|
self.buffer = None
|
||||||
self.uploader = uploader.Uploader(host, stream)
|
self.uploader = uploader.Uploader(host, stream)
|
||||||
self.playingUrl = None
|
self.playingUrl = None
|
||||||
self.queue = Queue()
|
self.queue = Queue()
|
||||||
@ -23,9 +27,34 @@ class Radio(object):
|
|||||||
# plays the next song in the queue
|
# plays the next song in the queue
|
||||||
def play(self):
|
def play(self):
|
||||||
self.playingUrl = self.queue.get()
|
self.playingUrl = self.queue.get()
|
||||||
self.downloader = downloader.Downloader(self.playingUrl, self.downloadFinished)
|
|
||||||
|
# determine what downloader needs to be used and create the downloader
|
||||||
|
if self.playingUrl.startswith("magnet:?"):
|
||||||
|
# it's a torrent
|
||||||
|
torrent.mountTorrent(self.playingUrl)
|
||||||
|
self.playingUrl = torrent.getTorrentMedia()
|
||||||
|
if self.playingUrl is None:
|
||||||
|
# this torrent is unplayable, skip it
|
||||||
|
torrent.umountTorrent()
|
||||||
|
return self.play()
|
||||||
|
else:
|
||||||
|
self.downloader = downloader.DirectDownloader(self.playingUrl, self.downloadFinished)
|
||||||
|
else:
|
||||||
|
# assume http/https
|
||||||
|
info = downloader.getVideoInfo(self.playingUrl)
|
||||||
|
if info is None:
|
||||||
|
# this url is unplayable, skip it
|
||||||
|
return self.play()
|
||||||
|
elif ("direct" in info and info["direct"] == True) or ("format_id" in info and info["format_id"] == "rtmp"): # stdout for rtmp in ytdl is broken
|
||||||
|
# direct source
|
||||||
|
self.downloader = downloader.DirectDownloader(self.playingUrl, self.downloadFinished)
|
||||||
|
else:
|
||||||
|
# requires youtube-dl
|
||||||
|
self.downloader = downloader.YtdlpDownloader(self.playingUrl, self.downloadFinished)
|
||||||
|
|
||||||
self.transcoder = transcoder.Transcoder(self.downloader)
|
self.transcoder = transcoder.Transcoder(self.downloader)
|
||||||
self.uploader.setUpstream(self.transcoder)
|
self.buffer = buffer.Buffer(self.transcoder)
|
||||||
|
self.uploader.setUpstream(self.buffer)
|
||||||
|
|
||||||
def isPlaying(self):
|
def isPlaying(self):
|
||||||
return not self.playingUrl is None
|
return not self.playingUrl is None
|
||||||
@ -48,7 +77,9 @@ class Radio(object):
|
|||||||
def stopPlaying(self):
|
def stopPlaying(self):
|
||||||
self.downloader.stop()
|
self.downloader.stop()
|
||||||
self.transcoder.stop()
|
self.transcoder.stop()
|
||||||
|
self.buffer.stop()
|
||||||
self.playingUrl = None
|
self.playingUrl = None
|
||||||
|
torrent.umountTorrent() # make sure torrent is unmounted
|
||||||
|
|
||||||
# downloader callback function, called when the downloader is finished
|
# downloader callback function, called when the downloader is finished
|
||||||
# but may still have bytes left that need to be read and played
|
# but may still have bytes left that need to be read and played
|
||||||
@ -89,7 +120,7 @@ def listeners():
|
|||||||
return str(r.listenerCount())
|
return str(r.listenerCount())
|
||||||
|
|
||||||
def run():
|
def run():
|
||||||
app.run(host="0.0.0.0")
|
app.run(host="0.0.0.0", port=int(sys.argv[3]))
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
run()
|
run()
|
5
setup.py
5
setup.py
@ -13,7 +13,10 @@ setup(
|
|||||||
'util',
|
'util',
|
||||||
'stream_listener',
|
'stream_listener',
|
||||||
'transcoder',
|
'transcoder',
|
||||||
'nullsrc'
|
'nullsrc',
|
||||||
|
'buffer',
|
||||||
|
'stream',
|
||||||
|
'torrent'
|
||||||
],
|
],
|
||||||
entry_points={
|
entry_points={
|
||||||
'console_scripts': ['radio = radio:run']
|
'console_scripts': ['radio = radio:run']
|
||||||
|
46
stream.py
Normal file
46
stream.py
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
from util import non_block_read, non_block_peek
|
||||||
|
|
||||||
|
class StreamSource(object):
|
||||||
|
def getStream(self):
|
||||||
|
""" Gets the python stream for reading from """
|
||||||
|
pass
|
||||||
|
|
||||||
|
class ByteSource(object):
|
||||||
|
def peek(self):
|
||||||
|
""" Gets the bytes at the head of the queue (if any) without incrementing
|
||||||
|
Non-blocking!
|
||||||
|
Returns None if no bytes available
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
def read(self):
|
||||||
|
""" Gets the bytes at the head of the queue (if any)
|
||||||
|
Non-blocking!
|
||||||
|
Returns None if no bytes available
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
class StreamByteSource(ByteSource):
|
||||||
|
def __init__(self, stream):
|
||||||
|
self.stream = stream
|
||||||
|
|
||||||
|
def peek(self):
|
||||||
|
output = non_block_peek(self.stream)
|
||||||
|
if output == None or output == b'':
|
||||||
|
return None
|
||||||
|
return output
|
||||||
|
|
||||||
|
def read(self):
|
||||||
|
output = non_block_read(self.stream)
|
||||||
|
if output == None or output == b'':
|
||||||
|
return None
|
||||||
|
return output
|
||||||
|
|
||||||
|
def toByteSource(src):
|
||||||
|
if isinstance(src, ByteSource):
|
||||||
|
return src
|
||||||
|
elif isinstance(src, StreamSource):
|
||||||
|
return StreamByteSource(src.getStream())
|
||||||
|
elif src is None:
|
||||||
|
return None
|
||||||
|
else:
|
||||||
|
raise Exception("not implemented")
|
@ -1,25 +1,22 @@
|
|||||||
from threading import Thread, main_thread
|
from threading import Thread, main_thread
|
||||||
from util import non_block_read, non_block_peek
|
from util import non_block_read, non_block_peek
|
||||||
from time import sleep, time
|
from time import sleep, time
|
||||||
|
from stream import toByteSource
|
||||||
|
|
||||||
# time (ms) to wait before switching over to backup source
|
# time (ms) to wait before switching over to backup source
|
||||||
FALLBACK_TIME = 500
|
FALLBACK_TIME = 100
|
||||||
|
|
||||||
def current_milli_time():
|
def current_milli_time():
|
||||||
return round(time() * 1000)
|
return round(time() * 1000)
|
||||||
|
|
||||||
def empty(output):
|
|
||||||
return output == None or output == b''
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# In a new thread, listens for new data on the stream
|
# In a new thread, listens for new data on the stream
|
||||||
# and passes it to the listener when available
|
# and passes it to the listener when available
|
||||||
#
|
#
|
||||||
# Upstreams implement getStream()
|
# Upstreams implement class StreamSource or ByteSource
|
||||||
# Downstreams implement write() for bytes to be given to
|
# Downstreams implement write() for bytes to be given to
|
||||||
#
|
#
|
||||||
class StreamListener(Thread):
|
class StreamListener(Thread):
|
||||||
|
|
||||||
def __init__(self, upstream, downstream, backupUpstream=None):
|
def __init__(self, upstream, downstream, backupUpstream=None):
|
||||||
Thread.__init__(self)
|
Thread.__init__(self)
|
||||||
self.setUpstream(upstream)
|
self.setUpstream(upstream)
|
||||||
@ -29,26 +26,20 @@ class StreamListener(Thread):
|
|||||||
self.lastData = 0
|
self.lastData = 0
|
||||||
|
|
||||||
def setUpstream(self, upstream):
|
def setUpstream(self, upstream):
|
||||||
if not upstream is None:
|
self.src = toByteSource(upstream)
|
||||||
self.stream = upstream.getStream()
|
|
||||||
else:
|
|
||||||
self.stream = None
|
|
||||||
|
|
||||||
def setBackupUpstream(self, upstream):
|
def setBackupUpstream(self, upstream):
|
||||||
if not upstream is None:
|
self.backupSrc = toByteSource(upstream)
|
||||||
self.backupStream = upstream.getStream()
|
|
||||||
else:
|
|
||||||
self.backupStream = None
|
|
||||||
|
|
||||||
def setDownstream(self, downstream):
|
def setDownstream(self, downstream):
|
||||||
self.listener = downstream
|
self.listener = downstream
|
||||||
|
|
||||||
# blocks until there is nothing left to read from upstream
|
# blocks until there is nothing left to read from upstream
|
||||||
def blockUntilEmpty(self):
|
def blockUntilEmpty(self):
|
||||||
if not self.stream is None:
|
if not self.src is None:
|
||||||
while True:
|
while True:
|
||||||
output = non_block_peek(self.stream)
|
output = self.src.peek()
|
||||||
if output == None or output == b'':
|
if output == None:
|
||||||
break
|
break
|
||||||
sleep(0.1)
|
sleep(0.1)
|
||||||
|
|
||||||
@ -59,13 +50,13 @@ class StreamListener(Thread):
|
|||||||
while main_thread().is_alive() and not self.quit:
|
while main_thread().is_alive() and not self.quit:
|
||||||
while True:
|
while True:
|
||||||
output = None
|
output = None
|
||||||
if not self.stream is None:
|
if not self.src is None:
|
||||||
output = non_block_read(self.stream)
|
output = self.src.read()
|
||||||
if empty(output) and not self.backupStream is None and current_milli_time() - self.lastData > FALLBACK_TIME:
|
if output == None and not self.backupSrc is None and current_milli_time() - self.lastData > FALLBACK_TIME:
|
||||||
output = non_block_read(self.backupStream)
|
output = self.backupSrc.read()
|
||||||
else:
|
else:
|
||||||
self.lastData = current_milli_time()
|
self.lastData = current_milli_time()
|
||||||
if empty(output):
|
if output == None:
|
||||||
break
|
break
|
||||||
self.listener.write(output)
|
self.listener.write(output)
|
||||||
sleep(0.1)
|
sleep(0.1)
|
325
torrent.py
Normal file
325
torrent.py
Normal file
@ -0,0 +1,325 @@
|
|||||||
|
from urllib.parse import quote
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
import mimetypes
|
||||||
|
from random import randint
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
import libtorrent as lt
|
||||||
|
|
||||||
|
mimetypes.init()
|
||||||
|
STATUSES = [
|
||||||
|
'queued', 'checking', 'downloading_metadata', 'downloading', 'finished',
|
||||||
|
'seeding', 'allocating', 'checking_fastresume'
|
||||||
|
]
|
||||||
|
|
||||||
|
TRACKERS = (
|
||||||
|
"udp://tracker.openbittorrent.com:80/announce",
|
||||||
|
"udp://tracker.publicbt.com:80/announce"
|
||||||
|
)
|
||||||
|
|
||||||
|
DHT = (
|
||||||
|
("router.utorrent.com", 6881),
|
||||||
|
("router.bittorrent.com", 6881),
|
||||||
|
("dht.transmissionbt.com", 6881),
|
||||||
|
("router.bitcomet.com", 6881),
|
||||||
|
("dht.aelitis.com", 6881)
|
||||||
|
)
|
||||||
|
|
||||||
|
EXTENSIONS = ('ut_pex', 'ut_metadata', 'smart_ban', 'metadata_transfoer')
|
||||||
|
|
||||||
|
PORTS = (randint(1024, 2000), randint(1024, 2000))
|
||||||
|
|
||||||
|
def get_indexed(func):
|
||||||
|
"""Return currently indedex torrent"""
|
||||||
|
def inner(*args, **kwargs):
|
||||||
|
"""Executes a method, and returns result[class_instance.index]"""
|
||||||
|
return list(func(*args, **kwargs)())[args[0].index]
|
||||||
|
return inner
|
||||||
|
|
||||||
|
class TorrentSession:
|
||||||
|
"""Represent a torrent session. May handle multiple torrents"""
|
||||||
|
def __init__(self, ports=PORTS, extensions=EXTENSIONS, dht_routers=DHT):
|
||||||
|
self.session = lt.session()
|
||||||
|
# self.session.set_severity_level(lt.alert.severity_levels.critical)
|
||||||
|
self.session.listen_on(*ports)
|
||||||
|
for extension in extensions:
|
||||||
|
self.session.add_extension(extension)
|
||||||
|
self.session.start_dht()
|
||||||
|
self.session.start_lsd()
|
||||||
|
self.session.start_upnp()
|
||||||
|
self.session.start_natpmp()
|
||||||
|
for router in dht_routers:
|
||||||
|
self.session.add_dht_router(*router)
|
||||||
|
self.torrents = []
|
||||||
|
|
||||||
|
def remove_torrent(self, *args, **kwargs):
|
||||||
|
"""Remove torrent from session."""
|
||||||
|
self.session.remove_torrent(*args, **kwargs)
|
||||||
|
|
||||||
|
def add_torrent(self, **kwargs):
|
||||||
|
"""Add a torrent to this session
|
||||||
|
|
||||||
|
For accepted parameters reference, see over `Torrent` definition.
|
||||||
|
"""
|
||||||
|
torrent = Torrent(session=self, **kwargs)
|
||||||
|
self.torrents.append(torrent)
|
||||||
|
return torrent
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
"""Iterating trough a session will give you all the currently-downloading torrents"""
|
||||||
|
return iter(self.torrents)
|
||||||
|
|
||||||
|
class Torrent:
|
||||||
|
"""Wrapper over libtorrent"""
|
||||||
|
def __init__(self, magnet_link: str, session: TorrentSession, trackers: tuple = TRACKERS,
|
||||||
|
remove_after: bool = False, **params):
|
||||||
|
"""Set default parameters to a magnet link, and add ourselves to a session
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
magnet_link: Magnet link. Currently torrent files are not supported
|
||||||
|
session: TorrentSession instance
|
||||||
|
trackers: Tracker list to add to magnet link. Defaults to TRACKERS
|
||||||
|
constant
|
||||||
|
remove_after: Delete download dir upon __exit__. Only if params.save_path has not been specified
|
||||||
|
save_path: Path to save the torrent into. A temporary directory
|
||||||
|
will be created if not specified
|
||||||
|
storage_mode: Property of lt.storage_mode_t
|
||||||
|
"""
|
||||||
|
self.session = session
|
||||||
|
self.temp_dir = None
|
||||||
|
self.remove_after = remove_after
|
||||||
|
self.params = {
|
||||||
|
'save_path': None,
|
||||||
|
'storage_mode': lt.storage_mode_t.storage_mode_sparse,
|
||||||
|
**params
|
||||||
|
}
|
||||||
|
|
||||||
|
#: Force trackers into magnet link. Not the best coding practice.
|
||||||
|
trackers = (quote(t, safe='') for t in trackers)
|
||||||
|
self.magnet_link = f'{magnet_link}&tr={"&tr=".join(trackers)}'
|
||||||
|
self.handle = None
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
if not self.params.get('save_path'):
|
||||||
|
self.temp_dir = tempfile.TemporaryDirectory()
|
||||||
|
self.params['save_path'] = self.temp_dir.name
|
||||||
|
self.handle = lt.add_magnet_uri(self.session.session, self.magnet_link, self.params)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *args, **kwargs):
|
||||||
|
if self.temp_dir and self.remove_after:
|
||||||
|
self.temp_dir.cleanup()
|
||||||
|
self.session.remove_torrent(self.handle)
|
||||||
|
|
||||||
|
def sequential(self, value: bool):
|
||||||
|
"""Set sequential download"""
|
||||||
|
self.handle.set_sequential_download(value)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def queue(self):
|
||||||
|
""" Download queue """
|
||||||
|
return self.handle.get_download_queue()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def queue_status(self):
|
||||||
|
""" Returns a represented queue status """
|
||||||
|
state_char = [' ', '-', '=', '#']
|
||||||
|
|
||||||
|
def repr_piece(piece):
|
||||||
|
""" Represents a piece """
|
||||||
|
return {
|
||||||
|
piece['piece_index']:
|
||||||
|
[state_char[block['state']] for block in piece['blocks']]
|
||||||
|
}
|
||||||
|
|
||||||
|
return [repr_piece(piece) for piece in self.queue]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
""" Torrent name """
|
||||||
|
if not self.handle.has_metadata():
|
||||||
|
return "N/A"
|
||||||
|
return self.torrent_info.name()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def status(self):
|
||||||
|
"""
|
||||||
|
Return a status dict.
|
||||||
|
"""
|
||||||
|
status = self.handle.status()
|
||||||
|
result = {
|
||||||
|
'name': self.name,
|
||||||
|
'download': status.download_rate,
|
||||||
|
'total_download': status.total_download,
|
||||||
|
'upload': status.upload_rate,
|
||||||
|
'total_upload': status.total_upload
|
||||||
|
}
|
||||||
|
|
||||||
|
if not self.finished:
|
||||||
|
result.update({
|
||||||
|
'state': STATUSES[status.state],
|
||||||
|
'total_downloaded': status.total_done,
|
||||||
|
'peers': status.num_peers,
|
||||||
|
'seeds': status.num_seeds,
|
||||||
|
'progress': '%5.4f%%' % (status.progress * 100),
|
||||||
|
})
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
@property
|
||||||
|
def finished(self):
|
||||||
|
"""Checks if torrent is finished."""
|
||||||
|
return self.handle.is_finished()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def started(self):
|
||||||
|
""" Checks if handle has metadata"""
|
||||||
|
return self.handle.has_metadata()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def torrent_info(self):
|
||||||
|
"""Return handle.torrent_info"""
|
||||||
|
return self.handle.get_torrent_info()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def files(self):
|
||||||
|
"""Returns a `TorrentFile` object for each file"""
|
||||||
|
fnum = range(len(self.torrent_info.files()))
|
||||||
|
return [TorrentFile(self, i) for i in fnum]
|
||||||
|
|
||||||
|
def update_priorities(self):
|
||||||
|
"""Update file priorities with self.files."""
|
||||||
|
self.handle.prioritize_files([a.priority for a in self.files])
|
||||||
|
|
||||||
|
def download_only(self, file):
|
||||||
|
""" Filter out priorities for every file except this one"""
|
||||||
|
if file not in self.files:
|
||||||
|
return None
|
||||||
|
for file_ in self.files:
|
||||||
|
file.priority = 7 if file == file_ else 0
|
||||||
|
return file
|
||||||
|
|
||||||
|
def wait_for(self, status):
|
||||||
|
"""Wait for a specific status
|
||||||
|
|
||||||
|
Example:
|
||||||
|
>>> # This will wait for a torrent to start, and return the torrent
|
||||||
|
>>> torrent = await Torrent("magnet:...").wait_for('started')
|
||||||
|
|
||||||
|
>>> # This will wait for a torrent to finish, and return the torrent
|
||||||
|
>>> torrent = await Torrent("magnet:...").wait_for('finished')
|
||||||
|
"""
|
||||||
|
while not getattr(self, status):
|
||||||
|
sleep(0.5)
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
"""Iterating trough a Torrent instance will return each TorrentFile"""
|
||||||
|
return iter(self.files)
|
||||||
|
|
||||||
|
class TorrentFile:
|
||||||
|
""" Wrapper over libtorrent.file """
|
||||||
|
def __init__(self, parent: Torrent, index: int):
|
||||||
|
self.root = parent.params.get('save_path')
|
||||||
|
self.index = index
|
||||||
|
self.handle = parent.handle
|
||||||
|
self.torrent = parent
|
||||||
|
|
||||||
|
def wait_for_completion(self, percent):
|
||||||
|
while self.completed_percent < percent:
|
||||||
|
sleep(1)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def full_path(self):
|
||||||
|
return f'{self.root}/{self.path}'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def mime_type(self):
|
||||||
|
"""Return file mimetype"""
|
||||||
|
return mimetypes.guess_type(self.path)[0] or ''
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_media(self):
|
||||||
|
"""Return true if file is a media type"""
|
||||||
|
return any(self.mime_type.startswith(f) for f in ('audio', 'video'))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def path(self):
|
||||||
|
"""Return torrent path on filesystem"""
|
||||||
|
return self.hfile.path
|
||||||
|
|
||||||
|
@property
|
||||||
|
def file(self):
|
||||||
|
"""Return a file object with this file's path open in rb mode """
|
||||||
|
return open(self.path, 'rb')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def filehash(self):
|
||||||
|
"""File hash"""
|
||||||
|
return self.hfile.filehash
|
||||||
|
|
||||||
|
@property
|
||||||
|
def size(self):
|
||||||
|
"""File size"""
|
||||||
|
return self.hfile.size
|
||||||
|
|
||||||
|
@property
|
||||||
|
@get_indexed
|
||||||
|
def hfile(self):
|
||||||
|
""" Return file from libtorrent """
|
||||||
|
return self.handle.get_torrent_info().files
|
||||||
|
|
||||||
|
@property
|
||||||
|
@get_indexed
|
||||||
|
def priority(self):
|
||||||
|
""" Readonly file priority from libtorrent """
|
||||||
|
return self.handle.file_priorities
|
||||||
|
|
||||||
|
@priority.setter
|
||||||
|
def priority(self, value):
|
||||||
|
self._priority = value
|
||||||
|
self.parent.update_priorities()
|
||||||
|
|
||||||
|
# @property
|
||||||
|
# @get_indexed
|
||||||
|
# def file_progress(self):
|
||||||
|
# """ Returns file progress """
|
||||||
|
# return self.handle.file_progress
|
||||||
|
|
||||||
|
@property
|
||||||
|
def completed_percent(self):
|
||||||
|
""" Returns this file completed percentage """
|
||||||
|
# return (self.file_progress / self.size) * 100
|
||||||
|
return self.handle.status().progress * 100
|
||||||
|
|
||||||
|
session = TorrentSession()
|
||||||
|
torrent = None
|
||||||
|
|
||||||
|
def mountTorrent(magnet):
|
||||||
|
global session
|
||||||
|
global torrent
|
||||||
|
print("mounting torrent...")
|
||||||
|
torrent = session.add_torrent(magnet_link=magnet, remove_after=True)
|
||||||
|
torrent.__enter__()
|
||||||
|
torrent.sequential(True)
|
||||||
|
# Wait for torrent to be started
|
||||||
|
torrent.wait_for('started')
|
||||||
|
|
||||||
|
def umountTorrent():
|
||||||
|
global session
|
||||||
|
global torrent
|
||||||
|
print("unmounting torrent...")
|
||||||
|
torrent.__exit__()
|
||||||
|
|
||||||
|
def getTorrentMedia():
|
||||||
|
global session
|
||||||
|
global torrent
|
||||||
|
print("Getting torrent media...")
|
||||||
|
# Get first match of a media file
|
||||||
|
try:
|
||||||
|
media = next(a for a in torrent if a.is_media and not 'sample' in a.path.lower())
|
||||||
|
except StopIteration:
|
||||||
|
return None # no playable media
|
||||||
|
# Wait for 5% download
|
||||||
|
media.wait_for_completion(5)
|
||||||
|
return media.full_path
|
@ -1,9 +1,13 @@
|
|||||||
import ffmpeg
|
import ffmpeg
|
||||||
from logger import logger
|
from logger import logger
|
||||||
from stream_listener import StreamListener
|
from stream_listener import StreamListener
|
||||||
|
from stream import StreamSource
|
||||||
|
import os
|
||||||
|
|
||||||
# converts the stream to mp3 before sending to the long-lasting mp3 connection
|
# converts the stream to mp3 before sending to the long-lasting mp3 connection
|
||||||
class Transcoder(object):
|
# write to a fifo file instead of directly via stdin to fool ffmpeg into thinking it has seekable input
|
||||||
|
# https://lists.libav.org/pipermail/ffmpeg-user/2007-May/008917.html
|
||||||
|
class Transcoder(StreamSource):
|
||||||
|
|
||||||
def __init__(self, upstream):
|
def __init__(self, upstream):
|
||||||
self.process = ( ffmpeg
|
self.process = ( ffmpeg
|
||||||
@ -20,7 +24,6 @@ class Transcoder(object):
|
|||||||
self.process.stdin.close()
|
self.process.stdin.close()
|
||||||
self.process.stdout.close()
|
self.process.stdout.close()
|
||||||
self.process.stderr.close()
|
self.process.stderr.close()
|
||||||
# self.process.wait()
|
|
||||||
|
|
||||||
def getStream(self):
|
def getStream(self):
|
||||||
return self.process.stdout
|
return self.process.stdout
|
||||||
|
@ -31,9 +31,10 @@ class Uploader(object):
|
|||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.listener.stop()
|
self.listener.stop()
|
||||||
|
if not self.process.stdin.closed:
|
||||||
self.process.stdin.close()
|
self.process.stdin.close()
|
||||||
|
if not self.process.stderr.closed:
|
||||||
self.process.stderr.close()
|
self.process.stderr.close()
|
||||||
self.process.wait()
|
|
||||||
|
|
||||||
def write(self, chunk):
|
def write(self, chunk):
|
||||||
while True:
|
while True:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user