This commit is contained in:
Shiz 2020-05-17 04:29:43 +02:00
commit a16d211be7
16 changed files with 752 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
*.pyc
__pycache__

12
README.md Normal file
View File

@ -0,0 +1,12 @@
# hemisphere
Multi-DJ live streaming toolkit.
# Requirements
* `liquidsoap` (git master)
* Python 3.6
- `trio_cdp`
- `pyzmq`
- `twitchio`
* `ffmpeg` (with ZMQ support)

2
src/muxer/config.liq Normal file
View File

@ -0,0 +1,2 @@
djs = ["shiz", "cyan"]
visualists = ["flopine", "merri"]

20
src/muxer/main.liq Normal file
View File

@ -0,0 +1,20 @@
%include "config.liq"
def create_sources(names, prefix, port)
list.mapi(fun (i, name) ->
input.srt(id=prefix ^ "-" ^ name, port=port + i, clock_safe=false),
names)
end
dj = (mix(id="dj", create_sources(djs, "dj", 9000)):source(2,1,0))
viz = (mix(id="viz", create_sources(visualists, "viz", 9100)):source(2,1,0))
master_audio = drop_video(dj)
master_video = drop_audio(fallback([dj, viz]))
master = mksafe(mux_video(master_audio, video=master_video))
%include "rtmp.liq"
output.rtmp.multi(master, id="out-rtmp", urls=["rtmp://localhost/app/live"])

52
src/muxer/rtmp.liq Normal file
View File

@ -0,0 +1,52 @@
def output.rtmp(~id="",
~video_bitrate=2000,
~video_opts="byte-stream=false key-int-max=60 bframes=0 aud=true tune=zerolatency",
~video_encoder="x264enc",
~video_type="video/x-h264,profile=main",
~audio_bitrate=192000,
~audio_opts="",
~audio_encoder="faac",
~audio_type="audio/mpeg,mpegversion=4",
~url,
source) =
output.gstreamer.audio_video(source,
video_pipeline="videoconvert ! \
#{video_encoder} bitrate=#{video_bitrate} #{video_opts} ! \
#{video_type} ! queue ! mux.",
audio_pipeline = "audioconvert ! \
#{audio_encoder} bitrate=#{audio_bitrate} #{audio_opts} ! \
#{audio_type} ! queue ! mux.",
pipeline="flvmux streamable=true name=mux ! rtmpsink location=\"#{url}\""
)
end
def list.join(sep, l)
s = list.fold(fun (x, e) -> x ^ e ^ sep, "", l)
string.sub(s, start=0, length=string.length(s) - string.length(sep))
end
def output.rtmp.multi(~id="",
~video_bitrate=2000,
~video_opts="byte-stream=false key-int-max=60 bframes=0 aud=true tune=zerolatency",
~video_encoder="x264enc",
~video_type="video/x-h264,profile=main",
~audio_bitrate=192000,
~audio_opts="",
~audio_encoder="faac",
~audio_type="audio/mpeg,mpegversion=4",
~urls,
source) =
rtmp_pipelines = list.map(fun (url) ->
"rtmp. ! queue ! rtmpsink location=\"#{url}\"",
urls
)
output.gstreamer.audio_video(source,
video_pipeline="videoconvert ! \
#{video_encoder} bitrate=#{video_bitrate} #{video_opts} ! \
#{video_type} ! queue ! mux.",
audio_pipeline = "audioconvert ! \
#{audio_encoder} bitrate=#{audio_bitrate} #{audio_opts} ! \
#{audio_type} ! queue ! mux.",
pipeline="flvmux streamable=true name=mux ! tee name=rtmp " ^ list.join(" ", rtmp_pipelines)
)
end

0
src/overlay/__init__.py Normal file
View File

44
src/overlay/__main__.py Normal file
View File

@ -0,0 +1,44 @@
import argparse
import subprocess
import shlex
import threading
from .obs import OBSScene
from .twitch_chat import make_twitch_chat
parser = argparse.ArgumentParser()
parser.add_argument('infile', type=argparse.FileType('r'), help='scene description file')
parser.add_argument('--nick-font', default='/Users/partynorge/down/Montserrat/Montserrat-Bold.ttf')
parser.add_argument('--chat-font', default='/Users/partynorge/down/Montserrat/Montserrat-Light.ttf')
parser.add_argument('-n', '--nickname', help='Twitch chat nickname', default='shizacular')
parser.add_argument('-t', '--token', help='Twitch OAuth token', required=True)
parser.add_argument('channel', help='Twitch channel')
args = parser.parse_args()
# Load scene
scene = OBSScene()
scene.load(args.infile)
# Convert scene graph
runners, graph = scene.to_ffmpeg()
chat_runner, chat_chains = make_twitch_chat('test', args.nickname, args.token, args.channel,
nick_args={'fontfile': args.nick_font},
chat_args={'fontfile': args.chat_font}
)
runners.extend(chat_runner)
graph.extend(chat_chains, link=True)
graph.fixup()
# Launch background runners
threads = {}
for r in runners:
t = threading.Thread(target=r, daemon=True)
t.start()
threads[r] = t
# Launch main process
chain = str(graph)
subprocess.run('ffmpeg -re -f lavfi -i ' + shlex.quote(chain) + ' -f nut -c:v libx264 -preset veryfast pipe:1 | ffplay -', shell=True)

194
src/overlay/chromium.py Normal file
View File

@ -0,0 +1,194 @@
import sys
import os
import re
import tempfile
import shutil
import subprocess
import trio
from trio_cdp import open_cdp, target
if sys.platform in ('win32', 'cygwin'):
CHROMIUM_BINARIES = [
r'{}\Google\Chrome\Application\chrome.exe'.format(os.environ['LocalAppData']),
]
elif sys.platform == 'darwin':
CHROMIUM_BINARIES = [
'/Applications/Chromium.app/Contents/MacOS/Chromium',
'/Applications/Google Chrome.app/Contents/MacOS/Google Chrome',
]
else:
CHROMIUM_BINARIES = ['chromium-browser', 'chromium', 'google-chrome', 'chrome']
CHROMIUM_ARGS = {
'hide-scrollbars',
'no-first-run',
'use-mock-keychain',
'password-store=basic',
'incognito',
}
CHROMIUM_ENABLES = {
'automation',
'features=NetworkService,NetworkServiceInProcess',
}
CHROMIUM_DISABLES = {
'background-networking',
'background-timer-throttling',
'backgrounding-occluded-windows',
'breakpad',
'client-side-phishing-detection',
'crash-reporter',
'component-extensions-with-background-pages',
'default-apps',
'dev-shm-usage',
'extensions',
'gaia-services',
'hang-monitor',
'ipc-flooding-protection',
'login-animations',
'login-screen-apps',
'notifications',
'popup-blocking',
'prompt-on-repost',
'renderer-backgrounding',
'search-geolocation-disclosure',
'sync',
'features=TranslateUI',
}
CHROMIUM_HEADLESS_ARGS = {
'headless'
}
# From: https://stackoverflow.com/questions/53575979/how-can-i-read-one-line-at-a-time-from-a-trio-receivestream
class LineReader:
def __init__(self, stream, max_line_length=16384):
self.stream = stream
self._line_generator = self.generate_lines(max_line_length)
@staticmethod
def generate_lines(max_line_length):
buf = bytearray()
find_start = 0
while True:
newline_idx = buf.find(b'\n', find_start)
if newline_idx < 0:
# no b'\n' found in buf
if len(buf) > max_line_length:
raise ValueError("line too long")
# next time, start the search where this one left off
find_start = len(buf)
more_data = yield
else:
# b'\n' found in buf so return the line and move up buf
line = buf[:newline_idx+1]
# Update the buffer in place, to take advantage of bytearray's
# optimized delete-from-beginning feature.
del buf[:newline_idx+1]
# next time, start the search from the beginning
find_start = 0
more_data = yield line
if more_data is not None:
buf += bytes(more_data)
async def readline(self):
line = next(self._line_generator)
while line is None:
more_data = await self.stream.receive_some()
if not more_data:
return None
line = self._line_generator.send(more_data)
return line
async def readlines(self):
while True:
line = await self.readline()
if line is None:
break
yield line
class Chromium:
DEVTOOLS_PATTERN = re.compile('^DevTools listening on (\S+)$')
def __init__(self, binary=None, headless=True, args=[]):
self.binary = binary
self.headless = headless
self.args = args
self.profile_dir = None
self.process = None
async def launch(self):
if self.process:
await self.close()
self.profile_dir = tempfile.TemporaryDirectory()
base_args = (
['--disable-{}'.format(x) for x in CHROMIUM_DISABLES] +
['--enable-{}'.format(x) for x in CHROMIUM_ENABLES] +
['--{}'.format(x) for x in CHROMIUM_ARGS] +
[
'--user-data-dir={}'.format(self.profile_dir.name),
'--remote-debugging-port=0'
]
)
if self.headless:
base_args += ['--{}'.format(x) for x in CHROMIUM_HEADLESS_ARGS]
if all(x.startswith('-') for x in self.args):
self.args.append('about:blank')
if not self.binary:
for path in CHROMIUM_BINARIES:
if '/' in path or '\\' in path:
if os.path.isfile(path):
binary = path
break
else:
if shutil.which(path):
binary = path
break
else:
raise ValueError('could not find Chromium executable!')
else:
binary = self.binary
self.process = await trio.open_process([binary] + base_args + self.args, stderr=subprocess.PIPE)
return self.process
async def close(self):
if not self.process:
return
self.process.terminate()
self.process = None
self.profile_dir.__exit__()
self.profile_dir = None
async def connect(self):
process = await self.launch()
reader = LineReader(process.stderr)
async for line in reader.readlines():
match = self.DEVTOOLS_PATTERN.match(line.decode('utf-8'))
if match:
cdp_url = match.group(1)
break
else:
raise ValueError('could not connect to Chromium')
return open_cdp(cdp_url)
async def select(self, conn):
targets = await target.get_targets()
for t in targets:
if t.attached:
continue
if t.type_ != 'page':
continue
if t.url.startswith('devtools://'):
continue
target_id = t.target_id
break
else:
raise ValueError('could not find target')
# Create a new session with the chosen target.
return conn.open_session(target_id)

79
src/overlay/filters.py Normal file
View File

@ -0,0 +1,79 @@
def filter_escape(s):
return (str(s)
.replace(':', '\\:')
.replace(',', '\\,')
.replace(';', '\\;')
)
class FFmpegFilter:
def __init__(self, name, *, ins=[], outs=[], **args):
self.name = name
self.args = args
self.ins = ins
self.outs = outs
def __repr__(self):
return '{}({}, {}, ins={!r}, outs={!r})'.format(
self.__class__.__name__,
self.name,
', '.join('{}={}'.format(k, v) for k, v in self.args.items()),
self.ins,
self.outs
)
def __str__(self):
return '{}{}{}{}{}'.format(
''.join('[{}] '.format(i) for i in self.ins),
filter_escape(self.name),
'=' if self.args else '',
':'.join('{}={}'.format(k, filter_escape(v)) for k, v in self.args.items()),
''.join(' [{}]'.format(o) for o in self.outs),
)
class FFmpegChain:
def __init__(self, *filters):
self.filters = list(filters)
def append(self, f):
self.filters.append(f)
def extend(self, l):
self.filters.extend(l)
def __getitem__(self, i):
return self.filters[i]
def __add__(self, o):
if not isinstance(o, FFmpegChain):
raise TypeError('incompatible')
return FFmpegChain(*(self.filters + o.filters))
def __repr__(self):
return '{}({})'.format(self.__class__.__name__, ', '.join(repr(f) for f in self.filters))
def __str__(self):
return ', '.join(str(f) for f in self.filters)
class FFmpegGraph:
def __init__(self, *chains):
self.chains = list(chains)
def append(self, f):
self.chains.append(f)
def extend(self, l, link=False):
if self.chains and link:
l[0][0].ins = self.chains[-1][-1].outs
self.chains.extend(l)
def fixup(self):
self.chains[-1][-1].outs = []
def __getitem__(self, i):
return self.chains[i]
def __repr__(self):
return '{}({})'.format(self.__class__.__name__, ', '.join(repr(c) for c in self.chains))
def __str__(self):
return '; '.join(str(c) for c in self.chains)

View File

@ -0,0 +1,2 @@
from .base import OBSSource, OBSScene
from . import browser, scene, text, video

60
src/overlay/obs/base.py Normal file
View File

@ -0,0 +1,60 @@
import json
from ..filters import FFmpegGraph
class OBSSource:
TYPES = {}
def __init__(self, name='Untitled'):
self.name = name
def load(self, data):
raise NotImplementedError
def to_ffmpeg(self, scene):
raise NotImplementedError
@classmethod
def register(cls, type, c):
cls.TYPES[type] = c
@classmethod
def type(cls, type):
def inner(c):
cls.register(type, c)
return c
return inner
@classmethod
def find_and_load(cls, data):
name = data['name']
type = data['id']
if type not in cls.TYPES:
raise NotImplementedType('unknown type: {}'.format(type))
s = cls.TYPES[type](name)
s.load(data)
return s
class OBSScene:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.sources = {}
self.current_scene = None
self.dimensions = (1280, 720)
def load(self, infile):
data = json.load(infile)
self.current_scene = data['current_scene']
for src in data['sources']:
self.sources[src['name']] = OBSSource.find_and_load(src)
def to_ffmpeg(self, scene=None):
scene = scene or self.current_scene
runners, chains = self.sources[scene].to_ffmpeg(self)
r = FFmpegGraph()
r.extend(chains)
return runners, r

View File

@ -0,0 +1,70 @@
import sys
import os
import tempfile
import shutil
import base64
import trio
from trio_cdp import page, emulation, dom
from ..chromium import Chromium
from ..filters import FFmpegChain, FFmpegFilter
from .base import OBSSource
async def do_screencast(url, outfile, dims):
c = Chromium()
async with await c.connect() as conn, await c.select(conn) as sess, sess.page_enable():
# Set viewport size and transparency.
await emulation.set_device_metrics_override(
width=dims[0], height=dims[1],
device_scale_factor=1.0, mobile=False
)
await emulation.set_default_background_color_override(dom.RGBA(0, 0, 0, 0))
# Navigate to the website.
async with sess.wait_for(page.LoadEventFired):
await page.navigate(url)
# Start capture.
await page.start_screencast(format_='jpeg', quality=60)
async for frame in sess.listen(page.ScreencastFrame):
outfile.write(base64.b64decode(frame.data))
await page.screencast_frame_ack(frame.session_id)
def capture_page(url, outfile, dims=(1280, 720)):
trio.run(do_screencast, url, outfile, dims, restrict_keyboard_interrupt_to_checkpoints=True)
@OBSSource.type('browser')
class OBSBrowserSource(OBSSource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.local = False
self.url = None
self.width = 1280
self.height = 720
self.audio = False
self.tempdir = tempfile.mkdtemp()
def __del__(self):
shutil.rmtree(self.tempdir, ignore_errors=True)
def load(self, data):
self.name = data['name']
self.local = data['settings']['is_local_file']
if self.local:
self.url = data['settings']['local_file']
else:
self.url = data['settings']['url']
self.width = data['settings']['width']
self.height = data['settings']['height']
self.audio = data['settings']['reroute_audio']
def to_ffmpeg(self, scene):
fifo_path = os.path.join(self.tempdir, 'frame.bin')
fifo = os.mkfifo(fifo_path)
runner = functools.partial(capture_page, url=self.url, outfile=fifo_path, dims=(self.width, self.height))
return [runner], [FFmpegChain(
FFmpegFilter('movie', filename=fifo_path, loop=0)
)]

72
src/overlay/obs/scene.py Normal file
View File

@ -0,0 +1,72 @@
import math
import collections
import string
from ..filters import FFmpegGraph, FFmpegChain, FFmpegFilter
from .base import OBSSource
IDENTIFY_MAPPING = collections.defaultdict(lambda: '_', {ord(k): k for k in string.ascii_letters + string.digits})
def identifiery(n):
return n.translate(IDENTIFY_MAPPING)
def deg2rad(n):
return n * (math.pi / 180)
@OBSSource.type('scene')
class OBSSceneSource(OBSSource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = None
self.items = []
def load(self, data):
self.name = data['name']
self.items = data['settings']['items']
def to_ffmpeg(self, scene):
prefix = identifiery(self.name)
i = 1
graph = [FFmpegChain(
FFmpegFilter('color', c='black', s='{}x{}'.format(*scene.dimensions), outs=[prefix + str(i)])
)]
runners = []
for item in self.items:
if not item['visible']:
continue
rs, chains = scene.sources[item['name']].to_ffmpeg(scene)
runners.extend(rs)
c = chains[-1]
if item['scale']['x'] != 1.0 or item['scale']['y'] != 1.0:
c.append(FFmpegFilter('scale',
w='{} * in_w'.format(item['scale']['x']),
h='{} * in_h'.format(item['scale']['y'])
))
if item['crop_top'] != 0 or item['crop_bottom'] != 0 or item['crop_left'] or item['crop_right'] != 0:
c.append(FFmpegFilter('crop',
w='in_w - {} - {}'.format(item['crop_left'], item['crop_right']),
h='in_h - {} - {}'.format(item['crop_top'], item['crop_bottom']),
x=item['crop_left'],
y=item['crop_top']
))
if item['rot'] != 0.0:
c.append(FFmpegFilter('rotate',
angle=deg2rad(item['rot'])
))
c.append(FFmpegFilter('overlay',
eval='init',
x=item['pos']['x'],
y=item['pos']['y'],
ins=[prefix + str(i)],
outs=[prefix + str(i + 1)])
)
i += 1
graph.extend(chains)
return runners, graph

40
src/overlay/obs/text.py Normal file
View File

@ -0,0 +1,40 @@
from ..filters import FFmpegChain, FFmpegFilter
from .base import OBSSource
class TextSource(OBSSource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.font = None
self.size = 32
self.color = '#ffffff'
self.contents = ''
self.shadow = 0
self.shadow_color = '#000000'
def to_ffmpeg(self, scene):
return [], [FFmpegChain(
FFmpegFilter('drawtext',
expansion='none',
fontcolor=self.color,
font=self.font,
text=self.contents,
shadowcolor=self.shadow_color,
shadowx=self.shadow,
shadowy=self.shadow,
)
)]
@OBSSource.type('text_gdiplus')
class GDIPlusSource(TextSource):
def load(self, data):
self.font = data['settings']['font']['face']
self.size = data['settings']['font']['size']
self.color = data['settings']['color']
@OBSSource.type('text_ft2_source')
class FreeType2Source(TextSource):
def load(self, data):
self.font = data['settings']['font']['face']
self.size = data['settings']['font']['size']
self.color = data['settings']['color0']

34
src/overlay/obs/video.py Normal file
View File

@ -0,0 +1,34 @@
from ..filters import FFmpegChain, FFmpegFilter
from .base import OBSSource
@OBSSource.type('image_source')
class ImageSource(OBSSource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.infile = None
def load(self, data):
self.infile = data['settings']['file']
def to_ffmpeg(self, scene):
return [], [FFmpegChain(
FFmpegFilter('movie', filename=self.infile)
)]
@OBSSource.type('ffmpeg_source')
class FFmpegSource(OBSSource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.infile = None
self.loop = False
def load(self, data):
self.infile = data['settings']['local_file']
self.loop = data['settings']['looping']
def to_ffmpeg(self, scene):
return [], [FFmpegChain(
FFmpegFilter('movie', filename=self.infile, loop=0 if self.loop else 1),
FFmpegFilter('realtime')
)]

View File

@ -0,0 +1,69 @@
import twitchio.ext.commands
import collections
import zmq
from .filters import FFmpegChain, FFmpegFilter, filter_escape
def make_twitch_chat(name, nickname, token, channel, x=200, y=25, addr='tcp://localhost:5555', nick_width=160, nick_args={}, chat_width=300, chat_args={}):
nick_id = 'drawtext@twitch-{}-nicks'.format(name)
chat_id = 'drawtext@twitch-{}-messages'.format(name)
runner = TwitchChatUpdater(nickname=nickname, token=token, channel=channel, nick_target=nick_id, chat_target=chat_id)
return [runner.run], [FFmpegChain(
FFmpegFilter(nick_id,
text='Loading',
expansion='none',
x='{} + max(0, {} - text_w)'.format(x, nick_width),
y=y,
**nick_args
),
FFmpegFilter(chat_id,
text='chat...',
expansion='none',
x='{} + {}'.format(x, nick_width),
y=y,
**chat_args
),
FFmpegFilter('zmq')
)]
def zmq_escape(v):
return (filter_escape(v)
.replace('\\', '\\\\')
.replace('\n', '\\\n')
.replace(' ', '\\ ')
)
class TwitchChatUpdater(twitchio.ext.commands.Bot):
def __init__(self, nickname, token, channel, nick_target, chat_target, addr='tcp://localhost:5555', maxlen=15):
super().__init__(irc_token=token, nick=nickname, initial_channels=[channel], prefix='kwdakwdwajd')
self.zmq_messages = collections.deque(maxlen=maxlen)
self.zmq = zmq.Context()
self.zmq_sock = self.zmq.socket(zmq.REQ)
self.zmq_sock.connect(addr)
self.zmq_nick_target = nick_target
self.zmq_chat_target = chat_target
def zmq_send_command(self, target, command, **args):
cmd = '{} {} {}'.format(
target, command,
':'.join(
'{}={}'.format(k, zmq_escape(v)) for k, v in args.items()
)
).encode('utf-8')
self.zmq_sock.send(cmd)
resp = self.zmq_sock.recv()
def zmq_update(self):
nick_text = '\n'.join('<{}>'.format(m.author.name) for m in self.zmq_messages if m)
self.zmq_send_command(self.zmq_nick_target, 'reinit', text=nick_text)
chat_text = '\n'.join(m.content for m in self.zmq_messages if m)
self.zmq_send_command(self.zmq_chat_target, 'reinit', text=chat_text)
async def event_ready(self):
self.zmq_update()
async def event_message(self, message):
print(message.tags)
self.zmq_messages.append(message)
self.zmq_update()