# weegee/weegee/__main__.py
#
# 453 lines
# 22 KiB
# Python

from __future__ import annotations
import sys
import os.path
import argparse
import logging
import ipaddress
from datetime import datetime
from typing import Optional as O
# Configure the root logger at DEBUG level. This runs before the package
# imports below; NOTE(review): presumably so import-time log output from the
# package is captured — confirm before moving it.
logging.basicConfig(level=logging.DEBUG)
from .wireguard import WireguardHostType
from . import (
WeegeeContext, WeegeeConfig,
WeegeeHook, WeegeeHost, WeegeeInterface, WeegeePeer, WeegeeConnection,
WeegeeServer, WeegeeClient,
find_interface_other_peers,
sync_all_interfaces,
setup,
)
def find_order(n: int, orders: list[tuple[int, str]]) -> tuple[float, str]:
    """Scale *n* by the last threshold in *orders* that it reaches.

    *orders* is a list of ``(threshold, label)`` pairs. It is scanned from the
    last entry to the first, so pass it sorted by ascending threshold to get
    the largest matching unit.

    Returns ``(n / threshold, label)`` for the first match found, or
    ``(float(n), '')`` when *n* is below every threshold.
    """
    for threshold, label in reversed(orders):
        if n >= threshold:
            return n / threshold, label
    return float(n), ''
def siify(n: int) -> str:
    """Format a byte count with a 1024-based unit prefix, e.g. ``1.5 kiB``.

    The value is scaled through ``find_order`` using powers of 1024 and
    rounded to two decimals; counts below 1024 are printed in plain ``B``.
    """
    prefixes = ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y']
    thresholds = [(1 << (10 * power), prefix) for power, prefix in enumerate(prefixes)]
    scaled, prefix = find_order(n, thresholds)
    # Only a non-empty prefix gets the binary "i" marker (kiB, MiB, ...).
    binary_prefix = f'{prefix}i' if prefix else ''
    return f'{round(scaled, 2)} {binary_prefix}B'
def timestampify(dt: datetime) -> str:
    """Describe *dt* relative to the current time, e.g. ``3 hours ago``.

    Differences of a day or more are expressed in days/weeks/months/years,
    shorter ones in seconds/minutes/hours. Timestamps in the future are
    suffixed with ``from now`` instead of ``ago``.
    """
    # Take "now" in dt's own timezone so aware datetimes can be subtracted;
    # for a naive dt, tzinfo is None and this is plain datetime.now().
    delta = datetime.now(dt.tzinfo) - dt
    # timedelta normalises negatives to days=-1 with seconds >= 0, so the sign
    # must be read from the whole delta, not from .days or .seconds.
    future = delta.total_seconds() < 0
    delta = abs(delta)
    if delta.days:
        orders = [(1, 'day'), (7, 'week'), (30, 'month'), (365, 'year')]
        n, unit = find_order(delta.days, orders)
    else:
        orders = [(1, 'second'), (60, 'minute'), (3600, 'hour')]
        n, unit = find_order(delta.seconds, orders)
    if not unit:
        # Sub-second difference: find_order matched no threshold.
        unit = 'second'
    count = round(n)
    plural = '' if count == 1 else 's'
    return f'{count} {unit}{plural} {"from now" if future else "ago"}'
def main():
    """Parse the command line, build the Weegee context and run the chosen subcommand.

    Each subcommand handler is a nested function taking ``(parser, args, ctx)``
    and returning an optional exit status. It is attached to its sub-parser via
    ``set_defaults(func=...)``; the handler's return value is passed to
    ``sys.exit``.
    """
    parser = argparse.ArgumentParser()
    parser.set_defaults(func=None)
    parser.add_argument('-c', '--config', help='path to configuration file')
    parser.add_argument('--yes-i-want-to-destroy-this', action='store_true', default=False)
    commands = parser.add_subparsers(title='commands')

    def do_status(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Print peer status per interface host; return 1 if expected and live peers differ."""
        success = True
        for interface in WeegeeInterface.find_all(ctx):
            if interface.hosts:
                print(f'{interface.name}:')
            for host in interface.hosts:
                print(f' @ {host.name}:')
                # Live peers keyed by name; the lookups below use the configured
                # peer's public key (presumably the live peer name is its key).
                wg_peers = {p.name: p for p in host.get_peers(interface.interface_name)}
                for peer in find_interface_other_peers(interface):
                    if peer.interface.public_key not in wg_peers:
                        print(f' {peer.name}: expected but not found')
                        success = False
                        continue
                    wg_peer = wg_peers.pop(peer.interface.public_key)
                    tx, rx = wg_peer.get_total_transfer()
                    last_handshake = wg_peer.last_handshake()
                    handshake_text = timestampify(last_handshake) if last_handshake else 'never'
                    endpoint = wg_peer.last_endpoint()
                    endpoint_text = f' from {endpoint[0]}:{endpoint[1]}' if endpoint else ''
                    print(
                        f' {peer.name} ({wg_peer.name}): last handshake {handshake_text}{endpoint_text}, '
                        f'{siify(tx)} sent, {siify(rx)} received'
                    )
                # Whatever was not popped above is live but not configured.
                # Iterate the values: the keys are plain strings without .name.
                for wg_peer in wg_peers.values():
                    print(f' ({wg_peer.name}): found but not expected')
                    success = False
        return 0 if success else 1

    status = commands.add_parser('status')
    status.set_defaults(func=do_status)

    def do_sync(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Synchronize all interfaces."""
        sync_all_interfaces(ctx)

    sync = commands.add_parser('sync')
    sync.set_defaults(func=do_sync)

    # System commands
    system = commands.add_parser('system')
    system_commands = system.add_subparsers(title='system commands')

    def do_setup(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Run the package's setup routine."""
        setup(ctx)

    sys_setup = system_commands.add_parser('setup')
    sys_setup.set_defaults(func=do_setup)

    def do_configure(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Update configuration values from the command line and save them to ``args.file``."""
        config = ctx.get_config()
        if args.config_dir:
            config.meta_dir = args.config_dir
        # --data-dir falls back to --config-dir; when neither is given the
        # stored data directory is left untouched.
        if args.data_dir or args.config_dir:
            config.data_dir = args.data_dir or args.config_dir
        if args.log_level:
            config.log_level = args.log_level
        # Same reset/remove/add handling for both default-host lists,
        # mirroring how the options are registered below.
        for kind in ('client', 'server'):
            attr = f'default_{kind}_hosts'
            if getattr(args, f'reset_default_{kind}_hosts'):
                setattr(config, attr, [])
            defaults = getattr(config, attr)
            for name in getattr(args, f'del_default_{kind}_host'):
                # Use the handler's ctx rather than main()'s closure variable.
                host_item = WeegeeHost.load(ctx, name).name
                if host_item in defaults:
                    defaults.remove(host_item)
            for name in getattr(args, f'add_default_{kind}_host'):
                host_item = WeegeeHost.load(ctx, name).name
                if host_item not in defaults:
                    defaults.append(host_item)
        config.save(args.file)

    sys_configure = system_commands.add_parser('configure')
    sys_configure.add_argument('-s', '--system-file', dest='file', action='store_const', const=WeegeeConfig.GLOBAL_PATH, help=f'store to system configuration file ({WeegeeConfig.GLOBAL_PATH})')
    sys_configure.add_argument('-u', '--user-file', dest='file', action='store_const', const=WeegeeConfig.USER_PATH, help=f'store to user configuration file ({WeegeeConfig.USER_PATH})')
    sys_configure.add_argument('-l', '--local-file', dest='file', action='store_const', const=WeegeeConfig.LOCAL_PATH, help=f'store to local configuration file ({WeegeeConfig.LOCAL_PATH})')
    sys_configure.add_argument('-f', '--file', help='store to given configuration file')
    sys_configure.add_argument('-d', '--config-dir', metavar='PATH', help='config base directory (optional)')
    sys_configure.add_argument('-D', '--data-dir', metavar='PATH', help='data base directory (optional, defaults to config base directory)')
    sys_configure.add_argument('-L', '--log-level', metavar='LEVEL', help='log level', choices=('debug', 'info', 'warning', 'error', 'critical'))
    for x in ('client', 'server'):
        sys_configure.add_argument(f'--reset-default-{x}-hosts', action='store_true')
        sys_configure.add_argument(f'--add-default-{x}-host', metavar='HOST', action='append', default=[], help=f'add default host for new {x}s')
        sys_configure.add_argument(f'--del-default-{x}-host', metavar='HOST', action='append', default=[], help=f'remove default host for new {x}s')
    sys_configure.set_defaults(func=do_configure)

    def do_migrate(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Placeholder: currently does nothing."""
        pass

    sys_migrate = system_commands.add_parser('migrate')
    sys_migrate.set_defaults(func=do_migrate)

    def do_clean(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Delete peers without connections, connections with at most one peer and interfaces without peers."""
        # Removing one object can orphan another, so repeat the sweep until a
        # full pass deletes nothing.
        changed = True
        while changed:
            changed = False
            for peer in WeegeePeer.find_all(ctx):
                if not WeegeeConnection.find_for_peers(ctx, {peer}):
                    print(f'destroying peer: {peer.name}')
                    peer.delete()
                    changed = True
            for connection in WeegeeConnection.find_all(ctx):
                if len(connection.peers) <= 1:
                    print(f'destroying connection: {connection.name}')
                    connection.delete()
                    changed = True
            for interface in WeegeeInterface.find_all(ctx):
                if not WeegeePeer.find_for_interfaces(ctx, {interface}):
                    print(f'destroying interface: {interface.name}')
                    interface.delete()
                    changed = True

    sys_clean = system_commands.add_parser('clean')
    sys_clean.set_defaults(func=do_clean)

    # Host commands
    host = commands.add_parser('host')
    host_commands = host.add_subparsers(title='host commands')

    def do_add_host(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Create and save a new host."""
        if args.auto_manage and not args.type:
            parser.error('have to specify -t/--type when --auto-manage is enabled')
        host = WeegeeHost.create(
            ctx, args.name,
            type=args.type, host=args.host, user=args.user, elevate_user=args.elevate_user,
            autosync=args.auto_sync, automanage=args.auto_manage,
        )
        host.save()

    add_host = host_commands.add_parser('create')
    add_host.add_argument('-t', '--type', type=WireguardHostType, help='host type for auto-manage operations')
    add_host.add_argument('-H', '--host', help='remote host')
    add_host.add_argument('-u', '--user', help='username')
    # --elevate_user is kept as an alias so existing invocations keep working.
    add_host.add_argument('-U', '--elevate-user', '--elevate_user', dest='elevate_user', metavar='USER', help='username to elevate privileges to')
    add_host.add_argument('-a', '--auto-sync', action='store_true', default=False, help='whether to auto-synchronize config')
    add_host.add_argument('-A', '--auto-manage', action='store_true', default=False, help='whether to auto-synchronize interfaces')
    add_host.add_argument('name', help='host name')
    add_host.set_defaults(func=do_add_host)

    def do_set_host(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Update the given attributes and hooks of an existing host and save it."""
        host = WeegeeHost.load(ctx, args.name)
        # Options left at None were not given on the command line.
        if args.type is not None:
            host.type = args.type
        if args.host is not None:
            host.host = args.host
        if args.user is not None:
            host.user = args.user
        if args.elevate_user is not None:
            host.elevate_user = args.elevate_user
        if args.auto_sync is not None:
            host.autosync = args.auto_sync
        if args.auto_manage is not None:
            host.automanage = args.auto_manage
        for hook_name in host.list_hooks():
            got_pre = getattr(args, f'hook_pre_{hook_name}')
            got_post = getattr(args, f'hook_post_{hook_name}')
            hook_conf_name = f'{args.name}/{hook_name.replace("_", "-")}'
            hook_attr_name = f'{hook_name}_hooks'
            exists = WeegeeHook.exists(ctx, hook_conf_name)
            if exists:
                hook = WeegeeHook.load(ctx, hook_conf_name)
            if got_pre or got_post:
                if not exists:
                    hook = WeegeeHook.create(ctx, hook_conf_name, pre=got_pre, post=got_post)
                else:
                    hook.pre = got_pre
                    hook.post = got_post
                hook.save()
                if hook not in host.get_hooks(hook_name).hooks:
                    getattr(host, hook_attr_name).append(hook.item)
            elif exists:
                # NOTE(review): an existing hook is removed whenever no
                # --hook-pre/--hook-post option is passed for it, even if only
                # unrelated attributes are being changed — confirm this is intended.
                if hook in host.get_hooks(hook_name).hooks:
                    getattr(host, hook_attr_name).remove(hook.item)
                hook.delete()
        host.save()

    set_host = host_commands.add_parser('set')
    set_host.add_argument('-t', '--type', type=WireguardHostType, help='host type for auto-manage operations')
    set_host.add_argument('-H', '--host', help='remote host')
    set_host.add_argument('-u', '--user', help='username')
    set_host.add_argument('-U', '--elevate-user', '--elevate_user', dest='elevate_user', metavar='USER', help='username to elevate privileges to')
    set_host.add_argument('-a', '--auto-sync', action='store_true', default=None, help='auto-synchronize config')
    set_host.add_argument('--no-auto-sync', dest='auto_sync', action='store_false', help='do not auto-synchronize config')
    set_host.add_argument('-A', '--auto-manage', action='store_true', default=None, help='auto-synchronize interfaces')
    set_host.add_argument('--no-auto-manage', dest='auto_manage', action='store_false', help='do not auto-synchronize interface')
    for hook in WeegeeHost.list_hooks():
        hook_arg = hook.replace('_', '-')
        hook_desc = hook.replace('_', ' ')
        set_host.add_argument(f'--hook-pre-{hook_arg}', metavar='CMD', action='append', default=[], help=f'pre-{hook_desc} hook command')
        set_host.add_argument(f'--hook-post-{hook_arg}', metavar='CMD', action='append', default=[], help=f'post-{hook_desc} hook command')
    set_host.add_argument('name', help='host name')
    set_host.set_defaults(func=do_set_host)

    def do_del_host(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Delete a host; requires the global confirmation flag."""
        if not args.yes_i_want_to_destroy_this:
            parser.error('please pass --yes-i-want-to-destroy-this if you really want to destroy this host')
        host = WeegeeHost.load(ctx, args.name)
        host.delete()

    del_host = host_commands.add_parser('destroy')
    del_host.add_argument('name', help='host name')
    del_host.set_defaults(func=do_del_host)

    # Interface commands
    interface = commands.add_parser('interface')
    interface_commands = interface.add_subparsers(title='interface commands')

    def do_add_interface(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Create and save a new interface."""
        interface_name = args.interface or 'wg0'
        # The option is -H/--host (dest ``host``); there is no ``args.hosts``.
        # Hosts are loaded by name as the server/client creators do —
        # NOTE(review): confirm WeegeeInterface.create expects host objects here.
        hosts = [WeegeeHost.load(ctx, name) for name in args.host]
        interface = WeegeeInterface.create(
            ctx, args.name, interface_name,
            private_key=args.private_key, public_key=args.public_key,
            addresses=args.address, port=args.port, hosts=hosts,
        )
        interface.save()

    add_interface = interface_commands.add_parser('create')
    add_interface.add_argument('-H', '--host', action='append', default=[], help='interface host(s)')
    add_interface.add_argument('-a', '--address', type=ipaddress.ip_interface, action='append', default=[], help='interface address(es)')
    add_interface.add_argument('-k', '--public-key', metavar='KEY', help='public key (optional)')
    add_interface.add_argument('-K', '--private-key', metavar='KEY', help='private key (optional)')
    add_interface.add_argument('-i', '--interface', metavar='NAME', help='interface name (optional)')
    add_interface.add_argument('-p', '--port', type=int, help='listen port')
    add_interface.add_argument('name', help='interface name')
    add_interface.set_defaults(func=do_add_interface)

    def do_del_interface(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Delete an interface; requires the global confirmation flag."""
        if not args.yes_i_want_to_destroy_this:
            parser.error('please pass --yes-i-want-to-destroy-this if you really want to destroy this interface')
        interface = WeegeeInterface.load(ctx, args.name)
        interface.delete()

    del_interface = interface_commands.add_parser('destroy')
    del_interface.add_argument('name', help='interface name')
    del_interface.set_defaults(func=do_del_interface)

    def do_connect_interface(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Create one peer per side and a connection joining the two interfaces."""
        left_interface = WeegeeInterface.load(ctx, args.left)
        left_peer = WeegeePeer.create(
            ctx, f'{args.left}-{args.right}', left_interface,
            routes=args.left_route, host=args.left_host, port=args.left_port,
        )
        left_peer.save()
        right_interface = WeegeeInterface.load(ctx, args.right)
        right_peer = WeegeePeer.create(
            ctx, f'{args.right}-{args.left}', right_interface,
            routes=args.right_route, host=args.right_host, port=args.right_port,
        )
        right_peer.save()
        connection = WeegeeConnection.create(
            ctx, f'{args.left}-{args.right}', [left_peer, right_peer],
            preshared_key=args.preshared_key,
        )
        connection.save()

    connect_interface = interface_commands.add_parser('connect')
    # Lower-case short options configure the first interface, upper-case the second.
    for i, (argname, helpname) in enumerate([('left', 'first'), ('right', 'second')]):
        connect_interface.add_argument('-R' if i else '-r', f'--{argname}-route', metavar='ROUTE', type=ipaddress.ip_network, action='append', default=[], help=f'{helpname} interface route(s)')
        connect_interface.add_argument('-E' if i else '-e', f'--{argname}-host', metavar='HOST', help=f'{helpname} interface endpoint (optional)')
        connect_interface.add_argument('-P' if i else '-p', f'--{argname}-port', metavar='PORT', type=int, help=f'{helpname} interface port (optional)')
    connect_interface.add_argument('-k', '--preshared-key', metavar='KEY', help='preshared key (optional)')
    connect_interface.add_argument('left', help='first interface')
    connect_interface.add_argument('right', help='second interface')
    connect_interface.set_defaults(func=do_connect_interface)

    def do_disconnect_interface(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Delete the connection named ``<left>-<right>``."""
        connection = WeegeeConnection.load(ctx, f'{args.left}-{args.right}')
        connection.delete()

    disconnect_interface = interface_commands.add_parser('disconnect')
    disconnect_interface.add_argument('left', help='first interface')
    disconnect_interface.add_argument('right', help='second interface')
    disconnect_interface.set_defaults(func=do_disconnect_interface)

    # Server commands
    server = commands.add_parser('server')
    server_commands = server.add_subparsers(title='server commands')

    def do_add_server(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Create an interface on the configured default server hosts and a server bound to it."""
        interface_name = args.interface or f'wg-{args.name}'
        default_hosts = [WeegeeHost.load(ctx, name) for name in ctx.get_config().default_server_hosts]
        interface = WeegeeInterface.create(
            ctx, args.name, interface_name,
            private_key=args.private_key, public_key=args.public_key,
            addresses=args.address, port=args.port, hosts=default_hosts,
        )
        interface.save()
        server = WeegeeServer.create(ctx, args.name, interface, routes=args.routes, host=args.host)
        server.save()

    add_server = server_commands.add_parser('create')
    add_server.add_argument('-a', '--address', type=ipaddress.ip_interface, action='append', help='interface address(es)')
    add_server.add_argument('-r', '--routes', type=ipaddress.ip_network, action='append', help='interface route(s) for clients')
    add_server.add_argument('-k', '--public-key', help='public key (optional)')
    add_server.add_argument('-K', '--private-key', help='private key (optional)')
    add_server.add_argument('-i', '--interface', help='interface name (optional)')
    add_server.add_argument('name', help='server name')
    add_server.add_argument('host', help='endpoint host')
    add_server.add_argument('port', type=int, help='listen port')
    add_server.set_defaults(func=do_add_server)

    def do_del_server(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Delete a server; requires the global confirmation flag."""
        if not args.yes_i_want_to_destroy_this:
            parser.error('please pass --yes-i-want-to-destroy-this if you really want to destroy this server')
        server = WeegeeServer.load(ctx, args.name)
        server.delete()

    del_server = server_commands.add_parser('destroy')
    del_server.add_argument('name', help='server name')
    del_server.set_defaults(func=do_del_server)

    def do_conf_server(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Print the generated configuration of a server."""
        server = WeegeeServer.load(ctx, args.name)
        print(server.gen_config())

    conf_server = server_commands.add_parser('config')
    conf_server.add_argument('name', help='server name')
    conf_server.set_defaults(func=do_conf_server)

    # Client commands
    client = commands.add_parser('client')
    client_commands = client.add_subparsers(title='client commands')

    def do_add_client(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Create an interface on the configured default client hosts and a client of the given server."""
        server = WeegeeServer.load(ctx, args.server)
        full_name = server.get_client_name(args.name)
        interface_name = args.interface or 'wg0'
        default_hosts = [WeegeeHost.load(ctx, name) for name in ctx.get_config().default_client_hosts]
        interface = WeegeeInterface.create(
            ctx, full_name, interface_name,
            private_key=args.private_key, public_key=args.public_key,
            addresses=args.address, hosts=default_hosts,
        )
        interface.save()
        client = WeegeeClient.create(ctx, args.name, server, preshared_key=args.preshared_key, interface=interface, routes=[])
        client.save()

    add_client = client_commands.add_parser('create')
    add_client.add_argument('-a', '--address', type=ipaddress.ip_interface, action='append', help='interface address(es)')
    add_client.add_argument('-k', '--public-key', help='public key (optional)')
    add_client.add_argument('-K', '--private-key', help='private key (optional)')
    add_client.add_argument('-p', '--preshared-key', help='preshared key (optional)')
    add_client.add_argument('-i', '--interface', help='interface name (optional)')
    add_client.add_argument('name', help='client name')
    add_client.add_argument('server', help='server name')
    add_client.set_defaults(func=do_add_client)

    def do_del_client(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Delete a client of the given server; requires the global confirmation flag."""
        if not args.yes_i_want_to_destroy_this:
            parser.error('please pass --yes-i-want-to-destroy-this if you really want to destroy this client')
        server = WeegeeServer.load(ctx, args.server)
        client = server.get_client(args.name)
        client.delete()

    del_client = client_commands.add_parser('destroy')
    del_client.add_argument('name', help='name')
    del_client.add_argument('server', help='server name')
    del_client.set_defaults(func=do_del_client)

    def do_conf_client(parser: argparse.ArgumentParser, args: argparse.Namespace, ctx: WeegeeContext) -> O[int]:
        """Print the generated configuration of a client of the given server."""
        server = WeegeeServer.load(ctx, args.server)
        client = server.get_client(args.name)
        print(client.gen_config())

    conf_client = client_commands.add_parser('config')
    conf_client.add_argument('name', help='name')
    conf_client.add_argument('server', help='server name')
    conf_client.set_defaults(func=do_conf_client)

    # The main buffet
    args = parser.parse_args()
    # func stays None when no (sub)command with a handler was selected.
    if not args.func:
        parser.error('a subcommand must be provided')
    config = WeegeeConfig.find()
    if args.config:
        config.load(args.config)
    context = WeegeeContext(config)
    sys.exit(args.func(parser, args, context))
# Not quite a tautology, even in a __main__.py: __name__ is '__main__' only
# when the module is executed (e.g. `python -m weegee`). When it is imported
# as `weegee.__main__`, the guard keeps main() from running.
if __name__ == '__main__':
    main()