Compare commits

...

31 Commits

Author SHA1 Message Date
zuckerberg
eb95b31089 download only in real time 2021-10-03 15:11:25 -04:00
zuckerberg
514318311e decode utf-8 2021-10-03 15:05:36 -04:00
zuckerberg
1bb789dedf Download torrent directly using libtorrent 2021-10-03 14:37:38 -04:00
zuckerberg
a79b0e99bc remove fuse 2021-10-02 18:02:52 -04:00
zuckerberg
4255c43cf6 Direct path to wrapped fusermount 2021-10-02 17:38:04 -04:00
zuckerberg
b9b050fec1 Direct path to wrapped fusermount 2021-10-02 17:23:42 -04:00
zuckerberg
544b090b58 allow other users for torrent 2021-09-25 21:37:35 -04:00
zuckerberg
462c835232 try to sleep 2021-09-25 21:19:11 -04:00
zuckerberg
b8f36cc59f debug 2021-09-24 18:20:57 -04:00
zuckerberg
96f6572a4b debug 2021-09-24 17:52:26 -04:00
zuckerberg
209e5e2292 fix storage location 2021-09-24 16:43:33 -04:00
zuckerberg
658c84d5df handle missing directory 2021-09-24 16:37:27 -04:00
zuckerberg
72995c0d3a bittorrent media source 2021-09-24 12:45:07 -04:00
zuckerberg
5bf607fed9 fix playing playlists 2021-09-13 22:13:09 -04:00
zuckerberg
462d06e84a support other types of streams such as rtmp 2021-09-08 21:21:46 -04:00
zuckerberg
2a95c597cc support other types of streams such as rtmp 2021-09-08 21:09:31 -04:00
zuckerberg
7576e847ca support other types of streams such as rtmp 2021-09-08 20:58:54 -04:00
zuckerberg
5a5b0afaaa support direct mp4 files 2021-09-08 19:32:46 -04:00
zuckerberg
5de2148123 open file in correct order 2021-09-06 17:30:39 -04:00
zuckerberg
1fcffb7b00 open file correctly 2021-09-06 17:20:42 -04:00
zuckerberg
636fd12b5f open file correctly 2021-09-06 17:12:09 -04:00
zuckerberg
d5eca0a039 use fifo file to fool ffmpeg 2021-09-06 17:01:21 -04:00
zuckerberg
b0867df965 set bitrate to normal 2021-09-06 14:50:03 -04:00
zuckerberg
ca3eac69b8 undo opus experiment 2021-09-06 14:26:58 -04:00
zuckerberg
092cbb824d opus test 2021-09-06 14:18:40 -04:00
zuckerberg
d83b55799e opus test 2021-09-06 14:01:36 -04:00
zuckerberg
ec016a2186 remove option that breaks downloads... 2021-09-03 18:24:35 -04:00
zuckerberg
86f35f4996 move to yt-dlp 2021-09-03 18:09:52 -04:00
zuckerberg
d0a08a202b specify ports and locations via cmd line 2021-08-28 09:56:34 -04:00
zuckerberg
b6a53eafef update radio port 2021-08-27 20:53:35 -04:00
zuckerberg
71a1443301 internel buffer for input song 2021-06-17 21:07:46 -04:00
12 changed files with 597 additions and 67 deletions

55
buffer.py Normal file
View File

@ -0,0 +1,55 @@
from stream import ByteSource
from queue import Queue
from threading import Lock
from time import sleep
from stream_listener import StreamListener
# a simple FIFO buffer for ensuring stream stability
class Buffer(ByteSource):
def __init__(self, upstream, size=200000):
self.fifo = Queue()
self.targetSize = size
self.size = 0
self.quit = False
self.mutex = Lock()
self.listener = StreamListener(upstream, self)
self.listener.start()
def stop(self):
self.quit = True
self.listener.stop()
def peek(self):
self.mutex.acquire()
try:
if self.size > 0 and not self.quit:
return self.fifo.queue[0]
else:
return None
finally:
self.mutex.release()
def read(self):
self.mutex.acquire()
try:
if self.size > 0 and not self.quit:
out = self.fifo.get_nowait()
self.size -= len(out)
return out
else:
return None
finally:
self.mutex.release()
def write(self, chunk):
while not self.quit:
try:
self.mutex.acquire()
if self.size < self.targetSize:
self.size += len(chunk)
self.fifo.put(chunk)
break
finally:
self.mutex.release()
sleep(0.1)

View File

@ -4,5 +4,5 @@ pkgs.python3Packages.buildPythonApplication {
pname = "radio";
src = self;
version = "0.1";
propagatedBuildInputs = with pkgs.python3Packages; [ pip ffmpeg-python flask requests pkgs.ffmpeg ];
propagatedBuildInputs = with pkgs.python3Packages; [ pip ffmpeg-python flask requests pkgs.ffmpeg libtorrent-rasterbar ];
}

View File

@ -1,60 +1,100 @@
#
# Downloads the video/audio as a stream from a provided link using youtube-dl
# Downloads the video/audio as a stream from a provided link using yt-dlp
# does not save the file, only the most recent fragment is held. Thus, this is
# ideal for devices with little memory
# TODO gather video metadata before download
#
import ffmpeg
import tempfile
import sys
import subprocess
import os
import pip
import signal
import json
import shutil
import glob
from logger import logger
from threading import Thread, main_thread
from time import sleep
from stream import StreamSource
def updateYoutubeDL():
pip.main(['install', '--target=' + dirpath, '--upgrade', 'youtube_dl'])
def updateYtdlp():
pip.main(['install', '--target=' + dirpath, '--upgrade', 'yt-dlp'])
def importYoutubeDL():
return __import__('youtube_dl')
return __import__('yt-dlp')
dirpath = tempfile.mkdtemp()
sys.path.append(dirpath)
updateYoutubeDL()
updateYtdlp()
class Downloader(Thread):
def __init__(self, url, cb):
Thread.__init__(self)
# update youtube-dl
# TODO: do this only every once in a while
# updateYoutubeDL()
self.cb = cb
self.exit = False
env = dict(os.environ)
env["PYTHONPATH"] = dirpath
cmd = [
sys.executable,
dirpath + "/bin/youtube-dl",
"-o", "-",
"-f", "bestaudio/best",
baseOpts = [
"-f", "bestaudio/best", # select for best audio
# "--audio-format", "mp3", "-x", # cannot do because it requires a tmp file to re-encode
"--prefer-ffmpeg",
"--no-mark-watched",
"--geo-bypass",
"--no-playlist",
"--retries", "100",
"--extractor-retries", "100",
# "--throttled-rate", "100K", # get around youtube throttling; probably not needed anymore
"--no-call-home",
url
"--sponsorblock-remove", "sponsor,intro,selfpromo,interaction,preview,music_offtopic",
]
self.popen = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid)
logger.add(self.popen.stderr, "youtube-dl.log")
def runYtdlp(opts):
# update yt-dlp
# TODO: do this only every once in a while
# updateYtdlp()
env = dict(os.environ)
env["PYTHONPATH"] = dirpath
cmd = [
sys.executable,
dirpath + "/bin/yt-dlp"
] + baseOpts + opts
popen = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid)
logger.add(popen.stderr, "yt-dlp.log")
return popen
def getVideoInfo(url):
popen = runYtdlp([
"-j", # dump all metadata as json
"--playlist-items", "1", # don't go through every single item in playlist
url
])
j = popen.communicate()[0]
# make sure the ytdlp instance is closed
if popen.poll() is None: # is alive?
os.killpg(os.getpgid(popen.pid), signal.SIGTERM) # kill
popen.stdout.close()
# keys of interest (but not always present)
# title, direct, extractor, upload_date
# format_id (machine readable)
# view_count, uploader, duration, view_count, average_rating
# age_limit, like_count, dislike_count, thumbnail,
try:
return json.loads(j)
except:
return None
# Downloads using yt-dlp
class YtdlpDownloader(Thread, StreamSource):
def __init__(self, url, cb):
Thread.__init__(self)
self.cb = cb
self.exit = False
self.popen = runYtdlp([
"-o", "-", # output stream to stdout
url
])
self.start()
def isAlive(self):
@ -64,7 +104,6 @@ class Downloader(Thread):
if self.isAlive():
os.killpg(os.getpgid(self.popen.pid), signal.SIGTERM)
self.popen.stdout.close()
# self.popen.wait()
self.exit = True
# checks to see if the current download has finished
@ -77,3 +116,39 @@ class Downloader(Thread):
def getStream(self):
return self.popen.stdout
# Downloads the file and plays it (must be a direct URL, ex: cannot be a yt link)
class DirectDownloader(Thread, StreamSource):
def __init__(self, url, cb):
Thread.__init__(self)
self.cb = cb
self.exit = False
self.process = ( ffmpeg
.input(url, re=None)
.output('pipe:', format='mp3')
.run_async(pipe_stdout=True, pipe_stderr=True)
)
logger.add(self.process.stderr, "direct-downloader.log")
self.start()
def isAlive(self):
return self.process.poll() is None
# checks to see if the current download has finished
# if yes, cleans up and fires callback
def run(self):
while main_thread().is_alive() and not self.exit:
if not self.isAlive():
self.cb()
sleep(0.1)
def stop(self):
self.process.stdout.close()
self.process.stderr.close()
self.exit = True
def getStream(self):
return self.process.stdout

View File

@ -37,9 +37,9 @@ class Logger(Thread):
output = non_block_read(stream)
if output == None or output == b'':
break
print(output.decode('ascii'))
f.write(output)
f.flush()
print(output.decode('utf-8'))
finally:
self.mutex.release()
sleep(0.1)

View File

@ -1,9 +1,10 @@
import ffmpeg
from logger import logger
from stream import StreamSource
# A null audio source
class NullSrc(object):
class NullSrc(StreamSource):
def __init__(self):
self.process = ( ffmpeg
@ -16,7 +17,6 @@ class NullSrc(object):
def stop(self):
self.process.stdout.close()
self.process.stderr.close()
# self.process.wait()
def getStream(self):
return self.process.stdout

View File

@ -1,21 +1,25 @@
import downloader
import uploader
import transcoder
import torrent
import buffer
from time import sleep
from flask import Flask, request
from queue import Queue
import json
import requests
import sys
app = Flask(__name__)
host = "localhost:8000"
stream = "stream.mp3"
host = sys.argv[1]
stream = sys.argv[2]
class Radio(object):
def __init__(self):
self.downloader = None
self.transcoder = None
self.buffer = None
self.uploader = uploader.Uploader(host, stream)
self.playingUrl = None
self.queue = Queue()
@ -23,9 +27,34 @@ class Radio(object):
# plays the next song in the queue
def play(self):
self.playingUrl = self.queue.get()
self.downloader = downloader.Downloader(self.playingUrl, self.downloadFinished)
# determine what downloader needs to be used and create the downloader
if self.playingUrl.startswith("magnet:?"):
# it's a torrent
torrent.mountTorrent(self.playingUrl)
self.playingUrl = torrent.getTorrentMedia()
if self.playingUrl is None:
# this torrent is unplayable, skip it
torrent.umountTorrent()
return self.play()
else:
self.downloader = downloader.DirectDownloader(self.playingUrl, self.downloadFinished)
else:
# assume http/https
info = downloader.getVideoInfo(self.playingUrl)
if info is None:
# this url is unplayable, skip it
return self.play()
elif ("direct" in info and info["direct"] == True) or ("format_id" in info and info["format_id"] == "rtmp"): # stdout for rtmp in ytdl is broken
# direct source
self.downloader = downloader.DirectDownloader(self.playingUrl, self.downloadFinished)
else:
# requires youtube-dl
self.downloader = downloader.YtdlpDownloader(self.playingUrl, self.downloadFinished)
self.transcoder = transcoder.Transcoder(self.downloader)
self.uploader.setUpstream(self.transcoder)
self.buffer = buffer.Buffer(self.transcoder)
self.uploader.setUpstream(self.buffer)
def isPlaying(self):
return not self.playingUrl is None
@ -48,7 +77,9 @@ class Radio(object):
def stopPlaying(self):
self.downloader.stop()
self.transcoder.stop()
self.buffer.stop()
self.playingUrl = None
torrent.umountTorrent() # make sure torrent is unmounted
# downloader callback function, called when the downloader is finished
# but may still have bytes left that need to be read and played
@ -89,7 +120,7 @@ def listeners():
return str(r.listenerCount())
def run():
app.run(host="0.0.0.0")
app.run(host="0.0.0.0", port=int(sys.argv[3]))
if __name__ == "__main__":
run()

View File

@ -13,7 +13,10 @@ setup(
'util',
'stream_listener',
'transcoder',
'nullsrc'
'nullsrc',
'buffer',
'stream',
'torrent'
],
entry_points={
'console_scripts': ['radio = radio:run']

46
stream.py Normal file
View File

@ -0,0 +1,46 @@
from util import non_block_read, non_block_peek
class StreamSource(object):
def getStream(self):
""" Gets the python stream for reading from """
pass
class ByteSource(object):
def peek(self):
""" Gets the bytes at the head of the queue (if any) without incrementing
Non-blocking!
Returns None if no bytes available
"""
pass
def read(self):
""" Gets the bytes at the head of the queue (if any)
Non-blocking!
Returns None if no bytes available
"""
pass
class StreamByteSource(ByteSource):
def __init__(self, stream):
self.stream = stream
def peek(self):
output = non_block_peek(self.stream)
if output == None or output == b'':
return None
return output
def read(self):
output = non_block_read(self.stream)
if output == None or output == b'':
return None
return output
def toByteSource(src):
if isinstance(src, ByteSource):
return src
elif isinstance(src, StreamSource):
return StreamByteSource(src.getStream())
elif src is None:
return None
else:
raise Exception("not implemented")

View File

@ -1,25 +1,22 @@
from threading import Thread, main_thread
from util import non_block_read, non_block_peek
from time import sleep, time
from stream import toByteSource
# time (ms) to wait before switching over to backup source
FALLBACK_TIME = 500
FALLBACK_TIME = 100
def current_milli_time():
return round(time() * 1000)
def empty(output):
return output == None or output == b''
#
# In a new thread, listens for new data on the stream
# and passes it to the listener when available
#
# Upstreams implement getStream()
# Upstreams implement class StreamSource or ByteSource
# Downstreams implement write() for bytes to be given to
#
class StreamListener(Thread):
def __init__(self, upstream, downstream, backupUpstream=None):
Thread.__init__(self)
self.setUpstream(upstream)
@ -29,26 +26,20 @@ class StreamListener(Thread):
self.lastData = 0
def setUpstream(self, upstream):
if not upstream is None:
self.stream = upstream.getStream()
else:
self.stream = None
self.src = toByteSource(upstream)
def setBackupUpstream(self, upstream):
if not upstream is None:
self.backupStream = upstream.getStream()
else:
self.backupStream = None
self.backupSrc = toByteSource(upstream)
def setDownstream(self, downstream):
self.listener = downstream
# blocks until there is nothing left to read from upstream
def blockUntilEmpty(self):
if not self.stream is None:
if not self.src is None:
while True:
output = non_block_peek(self.stream)
if output == None or output == b'':
output = self.src.peek()
if output == None:
break
sleep(0.1)
@ -59,13 +50,13 @@ class StreamListener(Thread):
while main_thread().is_alive() and not self.quit:
while True:
output = None
if not self.stream is None:
output = non_block_read(self.stream)
if empty(output) and not self.backupStream is None and current_milli_time() - self.lastData > FALLBACK_TIME:
output = non_block_read(self.backupStream)
if not self.src is None:
output = self.src.read()
if output == None and not self.backupSrc is None and current_milli_time() - self.lastData > FALLBACK_TIME:
output = self.backupSrc.read()
else:
self.lastData = current_milli_time()
if empty(output):
if output == None:
break
self.listener.write(output)
sleep(0.1)

325
torrent.py Normal file
View File

@ -0,0 +1,325 @@
from urllib.parse import quote
import tempfile
import os
import mimetypes
from random import randint
from time import sleep
import libtorrent as lt
mimetypes.init()
STATUSES = [
'queued', 'checking', 'downloading_metadata', 'downloading', 'finished',
'seeding', 'allocating', 'checking_fastresume'
]
TRACKERS = (
"udp://tracker.openbittorrent.com:80/announce",
"udp://tracker.publicbt.com:80/announce"
)
DHT = (
("router.utorrent.com", 6881),
("router.bittorrent.com", 6881),
("dht.transmissionbt.com", 6881),
("router.bitcomet.com", 6881),
("dht.aelitis.com", 6881)
)
EXTENSIONS = ('ut_pex', 'ut_metadata', 'smart_ban', 'metadata_transfoer')
PORTS = (randint(1024, 2000), randint(1024, 2000))
def get_indexed(func):
"""Return currently indedex torrent"""
def inner(*args, **kwargs):
"""Executes a method, and returns result[class_instance.index]"""
return list(func(*args, **kwargs)())[args[0].index]
return inner
class TorrentSession:
"""Represent a torrent session. May handle multiple torrents"""
def __init__(self, ports=PORTS, extensions=EXTENSIONS, dht_routers=DHT):
self.session = lt.session()
# self.session.set_severity_level(lt.alert.severity_levels.critical)
self.session.listen_on(*ports)
for extension in extensions:
self.session.add_extension(extension)
self.session.start_dht()
self.session.start_lsd()
self.session.start_upnp()
self.session.start_natpmp()
for router in dht_routers:
self.session.add_dht_router(*router)
self.torrents = []
def remove_torrent(self, *args, **kwargs):
"""Remove torrent from session."""
self.session.remove_torrent(*args, **kwargs)
def add_torrent(self, **kwargs):
"""Add a torrent to this session
For accepted parameters reference, see over `Torrent` definition.
"""
torrent = Torrent(session=self, **kwargs)
self.torrents.append(torrent)
return torrent
def __iter__(self):
"""Iterating trough a session will give you all the currently-downloading torrents"""
return iter(self.torrents)
class Torrent:
"""Wrapper over libtorrent"""
def __init__(self, magnet_link: str, session: TorrentSession, trackers: tuple = TRACKERS,
remove_after: bool = False, **params):
"""Set default parameters to a magnet link, and add ourselves to a session
Arguments:
magnet_link: Magnet link. Currently torrent files are not supported
session: TorrentSession instance
trackers: Tracker list to add to magnet link. Defaults to TRACKERS
constant
remove_after: Delete download dir upon __exit__. Only if params.save_path has not been specified
save_path: Path to save the torrent into. A temporary directory
will be created if not specified
storage_mode: Property of lt.storage_mode_t
"""
self.session = session
self.temp_dir = None
self.remove_after = remove_after
self.params = {
'save_path': None,
'storage_mode': lt.storage_mode_t.storage_mode_sparse,
**params
}
#: Force trackers into magnet link. Not the best coding practice.
trackers = (quote(t, safe='') for t in trackers)
self.magnet_link = f'{magnet_link}&tr={"&tr=".join(trackers)}'
self.handle = None
def __enter__(self):
if not self.params.get('save_path'):
self.temp_dir = tempfile.TemporaryDirectory()
self.params['save_path'] = self.temp_dir.name
self.handle = lt.add_magnet_uri(self.session.session, self.magnet_link, self.params)
return self
def __exit__(self, *args, **kwargs):
if self.temp_dir and self.remove_after:
self.temp_dir.cleanup()
self.session.remove_torrent(self.handle)
def sequential(self, value: bool):
"""Set sequential download"""
self.handle.set_sequential_download(value)
@property
def queue(self):
""" Download queue """
return self.handle.get_download_queue()
@property
def queue_status(self):
""" Returns a represented queue status """
state_char = [' ', '-', '=', '#']
def repr_piece(piece):
""" Represents a piece """
return {
piece['piece_index']:
[state_char[block['state']] for block in piece['blocks']]
}
return [repr_piece(piece) for piece in self.queue]
@property
def name(self):
""" Torrent name """
if not self.handle.has_metadata():
return "N/A"
return self.torrent_info.name()
@property
def status(self):
"""
Return a status dict.
"""
status = self.handle.status()
result = {
'name': self.name,
'download': status.download_rate,
'total_download': status.total_download,
'upload': status.upload_rate,
'total_upload': status.total_upload
}
if not self.finished:
result.update({
'state': STATUSES[status.state],
'total_downloaded': status.total_done,
'peers': status.num_peers,
'seeds': status.num_seeds,
'progress': '%5.4f%%' % (status.progress * 100),
})
return result
@property
def finished(self):
"""Checks if torrent is finished."""
return self.handle.is_finished()
@property
def started(self):
""" Checks if handle has metadata"""
return self.handle.has_metadata()
@property
def torrent_info(self):
"""Return handle.torrent_info"""
return self.handle.get_torrent_info()
@property
def files(self):
"""Returns a `TorrentFile` object for each file"""
fnum = range(len(self.torrent_info.files()))
return [TorrentFile(self, i) for i in fnum]
def update_priorities(self):
"""Update file priorities with self.files."""
self.handle.prioritize_files([a.priority for a in self.files])
def download_only(self, file):
""" Filter out priorities for every file except this one"""
if file not in self.files:
return None
for file_ in self.files:
file.priority = 7 if file == file_ else 0
return file
def wait_for(self, status):
"""Wait for a specific status
Example:
>>> # This will wait for a torrent to start, and return the torrent
>>> torrent = await Torrent("magnet:...").wait_for('started')
>>> # This will wait for a torrent to finish, and return the torrent
>>> torrent = await Torrent("magnet:...").wait_for('finished')
"""
while not getattr(self, status):
sleep(0.5)
def __iter__(self):
"""Iterating trough a Torrent instance will return each TorrentFile"""
return iter(self.files)
class TorrentFile:
""" Wrapper over libtorrent.file """
def __init__(self, parent: Torrent, index: int):
self.root = parent.params.get('save_path')
self.index = index
self.handle = parent.handle
self.torrent = parent
def wait_for_completion(self, percent):
while self.completed_percent < percent:
sleep(1)
@property
def full_path(self):
return f'{self.root}/{self.path}'
@property
def mime_type(self):
"""Return file mimetype"""
return mimetypes.guess_type(self.path)[0] or ''
@property
def is_media(self):
"""Return true if file is a media type"""
return any(self.mime_type.startswith(f) for f in ('audio', 'video'))
@property
def path(self):
"""Return torrent path on filesystem"""
return self.hfile.path
@property
def file(self):
"""Return a file object with this file's path open in rb mode """
return open(self.path, 'rb')
@property
def filehash(self):
"""File hash"""
return self.hfile.filehash
@property
def size(self):
"""File size"""
return self.hfile.size
@property
@get_indexed
def hfile(self):
""" Return file from libtorrent """
return self.handle.get_torrent_info().files
@property
@get_indexed
def priority(self):
""" Readonly file priority from libtorrent """
return self.handle.file_priorities
@priority.setter
def priority(self, value):
self._priority = value
self.parent.update_priorities()
# @property
# @get_indexed
# def file_progress(self):
# """ Returns file progress """
# return self.handle.file_progress
@property
def completed_percent(self):
""" Returns this file completed percentage """
# return (self.file_progress / self.size) * 100
return self.handle.status().progress * 100
session = TorrentSession()
torrent = None
def mountTorrent(magnet):
global session
global torrent
print("mounting torrent...")
torrent = session.add_torrent(magnet_link=magnet, remove_after=True)
torrent.__enter__()
torrent.sequential(True)
# Wait for torrent to be started
torrent.wait_for('started')
def umountTorrent():
global session
global torrent
print("unmounting torrent...")
torrent.__exit__()
def getTorrentMedia():
global session
global torrent
print("Getting torrent media...")
# Get first match of a media file
try:
media = next(a for a in torrent if a.is_media and not 'sample' in a.path.lower())
except StopIteration:
return None # no playable media
# Wait for 5% download
media.wait_for_completion(5)
return media.full_path

View File

@ -1,9 +1,13 @@
import ffmpeg
from logger import logger
from stream_listener import StreamListener
from stream import StreamSource
import os
# converts the stream to mp3 before sending to the long-lasting mp3 connection
class Transcoder(object):
# write to a fifo file instead of directly via stdin to fool ffmpeg into thinking it has seekable input
# https://lists.libav.org/pipermail/ffmpeg-user/2007-May/008917.html
class Transcoder(StreamSource):
def __init__(self, upstream):
self.process = ( ffmpeg
@ -20,7 +24,6 @@ class Transcoder(object):
self.process.stdin.close()
self.process.stdout.close()
self.process.stderr.close()
# self.process.wait()
def getStream(self):
return self.process.stdout

View File

@ -31,9 +31,10 @@ class Uploader(object):
def stop(self):
self.listener.stop()
if not self.process.stdin.closed:
self.process.stdin.close()
if not self.process.stderr.closed:
self.process.stderr.close()
self.process.wait()
def write(self, chunk):
while True: