import os
from enum import Enum
from dataclasses import dataclass
from collections import UserString
from ast import literal_eval
import ipaddress
from fnmatch import fnmatch
import jinja2
import jinja2.meta
from types import SimpleNamespace
from io import StringIO
from typing import Optional as O, Union as U, Any, Iterable, Iterator, TypeVar, Generic
def stripsplit(s: str, sep: str, maxsplit: int = -1) -> list[str]:
    """Split *s* on *sep* (at most *maxsplit* times) and strip each piece.

    Behaves like ``str.split`` followed by ``str.strip`` on every element.
    """
    pieces = s.split(sep, maxsplit=maxsplit)
    return list(map(str.strip, pieces))
def quotesplit(s: str, sep: str, maxsplit: int = -1) -> list[str]:
    """Split *s* on *sep*, honouring backslash-escaped separators.

    A separator immediately preceded by a backslash does not split; the
    backslash is dropped and the separator is kept as part of the current
    piece. This is the inverse of :func:`quotejoin`.

    Args:
        s: The string to split.
        sep: The separator; may be longer than one character.
        maxsplit: Maximum number of splits; negative means no limit. Once the
            limit is reached the remainder is returned verbatim (escapes in
            it are left untouched), mirroring ``str.split``.

    Returns:
        The list of pieces; always at least one element.

    Raises:
        ValueError: If *sep* is empty.
    """
    if not sep:
        raise ValueError('empty separator')
    if maxsplit < 0:
        # There can never be more separators than characters.
        maxsplit = len(s)

    parts: list[str] = []
    current: list[str] = []  # fragments of the piece being assembled
    start = 0                # beginning of the not-yet-consumed text
    while len(parts) < maxsplit:
        pos = s.find(sep, start)
        if pos < 0:
            break
        if pos > 0 and s[pos - 1] == '\\':
            # Escaped separator: keep it literally, without the backslash.
            current.append(s[start:pos - 1])
            current.append(sep)
        else:
            current.append(s[start:pos])
            parts.append(''.join(current))
            current = []
        start = pos + len(sep)

    current.append(s[start:])
    parts.append(''.join(current))
    return parts
def quotejoin(sep: str, a: Iterable[str]) -> str:
    """Join the strings in *a* with *sep*, backslash-escaping embedded separators.

    Any occurrence of *sep* inside an element is prefixed with a backslash so
    the element boundary stays recoverable. *a* may be any iterable of strings
    (callers in this file pass generator expressions).
    """
    escaped_sep = '\\' + sep
    return sep.join(x.replace(sep, escaped_sep) for x in a)
def indent(s: str, amount: int = 0, skip: int = 0) -> str:
    """Indent the lines of *s* by *amount* spaces, leaving the first *skip* lines alone.

    Lines are obtained with ``str.splitlines`` and re-joined with ``'\\n'``,
    so a trailing newline in *s* is not preserved.

    Args:
        s: Text to indent.
        amount: Number of spaces to prepend.
        skip: Number of leading lines that are left unindented.

    Returns:
        The re-indented text.
    """
    spacing = ' ' * amount
    return '\n'.join(
        (spacing if i >= skip else '') + line
        for i, line in enumerate(s.splitlines())
    )
class Instance:
    """Locations of one installation: a configuration directory and a data directory."""

    def __init__(self, conf_dir: str, data_dir: O[str] = None) -> None:
        """Store both directories; a falsy *data_dir* falls back to *conf_dir*."""
        self.conf_dir = conf_dir
        self.data_dir = data_dir if data_dir else conf_dir
class Int(int):
    """Integer literal value."""

    @classmethod
    def parse(cls, value: str) -> 'Int':
        """Build an ``Int`` from its textual form.

        Raises:
            ValueError: If *value* is not a valid integer literal for ``int()``.
        """
        return cls(value)
class Str(UserString):
    """String literal value."""

    @classmethod
    def parse(cls, value: str) -> 'Str':
        """Build a ``Str`` from a quoted Python string literal.

        The text is evaluated with ``ast.literal_eval``; anything that does
        not evaluate to a ``str`` is rejected.

        Raises:
            TypeError: If the literal is valid but is not a string.
            ValueError, SyntaxError: Propagated from ``literal_eval`` when
                *value* is not a valid literal at all.
        """
        val = literal_eval(value)
        if not isinstance(val, str):
            raise TypeError(value)
        return cls(val)
@dataclass(frozen=True)
class IPAddress:
    """Wrapper around an ``ipaddress`` IPv4/IPv6 address object.

    Unknown attributes are forwarded to the wrapped object. The dataclass is
    frozen so instances are hashable and usable as dictionary keys.
    """

    address: U[ipaddress.IPv4Address, ipaddress.IPv6Address]

    @classmethod
    def parse(cls, s: str) -> 'IPAddress':
        """Parse *s* with ``ipaddress.ip_address``; raises ``ValueError`` if invalid."""
        return cls(ipaddress.ip_address(s))

    def __getattr__(self, name: str) -> Any:
        # __getattr__ only runs when normal lookup fails. If the wrapped field
        # itself is missing (e.g. on a half-constructed instance), delegating
        # would recurse forever, so fail fast instead.
        if name == 'address':
            raise AttributeError(name)
        return getattr(self.address, name)

    def __repr__(self) -> str:
        return str(self.address)
@dataclass(frozen=True)
class IPNetwork:
    """Wrapper around an ``ipaddress`` IPv4/IPv6 network object.

    Unknown attributes are forwarded to the wrapped object. The dataclass is
    frozen so instances are hashable and usable as dictionary keys.
    """

    network: U[ipaddress.IPv4Network, ipaddress.IPv6Network]

    @classmethod
    def parse(cls, s: str) -> 'IPNetwork':
        """Parse *s* with ``ipaddress.ip_network``; raises ``ValueError`` if invalid."""
        return cls(ipaddress.ip_network(s))

    def __getattr__(self, name: str) -> Any:
        # Guard against infinite recursion when the wrapped field is missing.
        if name == 'network':
            raise AttributeError(name)
        return getattr(self.network, name)

    def __repr__(self) -> str:
        return str(self.network)
@dataclass(frozen=True)
class IPInterface:
    """Wrapper around an ``ipaddress`` IPv4/IPv6 interface object.

    Unknown attributes are forwarded to the wrapped object. The dataclass is
    frozen so instances are hashable and usable as dictionary keys.
    """

    interface: U[ipaddress.IPv4Interface, ipaddress.IPv6Interface]

    @classmethod
    def parse(cls, s: str) -> 'IPInterface':
        """Parse *s* with ``ipaddress.ip_interface``; raises ``ValueError`` if invalid."""
        return cls(ipaddress.ip_interface(s))

    def __getattr__(self, name: str) -> Any:
        # Guard against infinite recursion when the wrapped field is missing.
        if name == 'interface':
            raise AttributeError(name)
        return getattr(self.interface, name)

    def __repr__(self) -> str:
        return str(self.interface)
T = TypeVar('T')


class ParseError(ValueError):
    """Raised when a type specification or a value string is malformed."""


class Type(Generic[T]):
    """Base class of the type descriptors used for configuration variables.

    A descriptor can be parsed from / dumped to its textual specification and
    can parse, dump, validate and template values of the type it describes.
    """

    @classmethod
    def parse(cls, input: str) -> 'Type':
        """Parse a type specification, dispatching on its first character.

        ``?`` optional, ``[`` array, ``{`` map, ``@`` reference, ``*`` glob;
        anything else is handed to ``LitType``.
        """
        if input.startswith('?'):
            return OptionalType.parse(input)
        if input.startswith('['):
            return ArrType.parse(input)
        if input.startswith('{'):
            return MapType.parse(input)
        if input.startswith('@'):
            return RefType.parse(input)
        if input.startswith('*'):
            return GlobType.parse(input)
        return LitType.parse(input)

    def dump(self) -> str:
        """Return the textual specification of this type (subclasses override)."""
        raise NotImplementedError

    def match_value(self, input: Any, instance: 'Instance') -> bool:
        """Tell whether *input* is a valid value of this type; the base type matches nothing."""
        return False

    def parse_value(self, input: str, instance: 'Instance') -> T:
        """Parse a value from text; the base type cannot parse anything."""
        raise TypeError

    def dump_value(self, input: T) -> str:
        """Return the textual form of *input* (its ``repr`` by default)."""
        return repr(input)

    def template_value(self, input: T) -> T:
        """Return the form of *input* exposed to templates (unchanged by default)."""
        return input
@dataclass
class OptionalType(Generic[T], Type[O[T]]):
    """Optional wrapper around another type, written ``?<subtype>``.

    ``None`` is a valid value; it is parsed from and dumped to the empty string.
    """

    subtype: Type[T]

    @classmethod
    def parse(cls, input: str) -> 'OptionalType[T]':
        """Parse ``?<subtype>``; raises ``ParseError`` if the ``?`` prefix is missing."""
        # startswith() also rejects the empty string instead of raising IndexError.
        if not input.startswith('?'):
            raise ParseError()
        return cls(Type.parse(input[1:]))

    def dump(self) -> str:
        return f'?{self.subtype.dump()}'

    def match_value(self, input: Any, instance: Instance) -> bool:
        return input is None or self.subtype.match_value(input, instance)

    def parse_value(self, input: str, instance: Instance) -> O[T]:
        """Parse a value; an empty string yields ``None``."""
        if not input:
            return None
        return self.subtype.parse_value(input, instance)

    def dump_value(self, input: O[T]) -> str:
        if input is None:
            return ''
        return self.subtype.dump_value(input)

    def template_value(self, input: O[T]) -> O[T]:
        if input is None:
            return None
        return self.subtype.template_value(input)
@dataclass
class RefType(Type['Item']):
    """Reference to a single item checked against a named meta, written ``@<meta>``."""

    name: str

    @classmethod
    def parse(cls, input: str) -> 'RefType':
        """Parse ``@<meta>``; raises ``ParseError`` if the ``@`` prefix is missing."""
        # startswith() also rejects the empty string instead of raising IndexError.
        if not input.startswith('@'):
            raise ParseError()
        return cls(input[1:])

    def dump(self) -> str:
        return f'@{self.name}'

    def match_value(self, input: Any, instance: Instance) -> bool:
        """Accept an ``Item`` that is complete with respect to the referenced meta."""
        meta = Meta.load(instance, self.name)
        return isinstance(input, Item) and input.is_complete(meta)

    def parse_value(self, input: str, instance: Instance) -> 'Item':
        """Load the item named *input*.

        The referenced meta is loaded as well, so a missing meta file is
        reported here; its contents are not otherwise used at this point.
        """
        Meta.load(instance, self.name)
        return Item.load(instance, input)

    def dump_value(self, input: 'Item') -> str:
        return input.name

    def template_value(self, input: 'Item') -> SimpleNamespace:
        """Expose the item's configuration to templates via ``Config.template``."""
        return input.config.template()
@dataclass
class GlobType(Type[dict[str, 'Item']]):
    """Set of items selected by a glob pattern and checked against a named meta, written ``*<meta>``.

    Values are dictionaries mapping item names to items, as produced by
    :meth:`parse_value`. For backward compatibility a plain list of items is
    accepted by :meth:`match_value` and :meth:`template_value` as well.
    """

    name: str

    @classmethod
    def parse(cls, input: str) -> 'GlobType':
        """Parse ``*<meta>``; raises ``ParseError`` if the ``*`` prefix is missing."""
        # startswith() also rejects the empty string instead of raising IndexError.
        if not input.startswith('*'):
            raise ParseError()
        return cls(input[1:])

    def dump(self) -> str:
        return f'*{self.name}'

    @staticmethod
    def _members(value: Any) -> list:
        """Return the items held by *value*, whether it is a dict or a list."""
        if isinstance(value, dict):
            return list(value.values())
        return list(value)

    def match_value(self, input: Any, instance: Instance) -> bool:
        """Accept a dict (or list) whose members are all complete ``Item`` objects.

        The glob pattern is not part of the value, so names cannot be
        re-checked against it here.
        """
        meta = Meta.load(instance, self.name)
        if not isinstance(input, (dict, list)):
            return False
        return all(isinstance(x, Item) and x.is_complete(meta) for x in self._members(input))

    def parse_value(self, input: str, instance: Instance) -> dict[str, 'Item']:
        """Load every item whose name matches the pattern *input*.

        Items that are not complete with respect to the meta are skipped.
        """
        meta = Meta.load(instance, self.name)
        out = {}
        for item_name in Item.filter(instance, input):
            item = Item.load(instance, item_name)
            if not item.is_complete(meta):
                continue
            out[item_name] = item
        return out

    def dump_value(self, input: dict[str, 'Item']) -> str:
        # The originating pattern is not stored with the value, so it cannot
        # be dumped back yet.
        return '<unimplemented>'

    def template_value(self, input: dict[str, 'Item']) -> dict[str, SimpleNamespace]:
        """Map each item name to the templated form of its configuration."""
        return {x.name: x.config.template() for x in self._members(input)}
@dataclass
class LitType(Generic[T], Type[T]):
    """Literal (scalar) type identified by name, e.g. ``int`` or ``ipaddr``."""

    # Maps a type name to the class implementing it. Deliberately left
    # unannotated so the dataclass machinery does not treat it as a field.
    SUBTYPES = {
        'int': Int,
        'str': Str,
        'ipaddr': IPAddress,
        'ipnet': IPNetwork,
        'ipintf': IPInterface,
    }

    name: str
    subtype: type  # the implementing class; provides a ``parse`` classmethod

    @classmethod
    def parse(cls, input: str) -> 'LitType[T]':
        """Look up *input* in ``SUBTYPES``; raises ``ParseError`` for unknown names."""
        if input not in cls.SUBTYPES:
            raise ParseError()
        return cls(name=input, subtype=cls.SUBTYPES[input])

    def dump(self) -> str:
        return self.name

    def match_value(self, input: Any, instance: Instance) -> bool:
        return isinstance(input, self.subtype)

    def parse_value(self, input: str, instance: Instance) -> T:
        """Parse *input* with the implementing class's ``parse``."""
        return self.subtype.parse(input)

    def dump_value(self, input: T) -> str:
        return repr(input)
@dataclass
class ArrType(Generic[T], Type[list[T]]):
    """Homogeneous list type, written ``[<subtype>]``."""

    subtype: Type[T]

    @staticmethod
    def _bracketed(text: str) -> bool:
        """Tell whether *text* is enclosed in ``[`` ... ``]`` (false for the empty string)."""
        return text.startswith('[') and text.endswith(']')

    @classmethod
    def parse(cls, input: str) -> 'ArrType':
        """Parse ``[<subtype>]``; raises ``ParseError`` if the brackets are missing."""
        if not cls._bracketed(input):
            raise ParseError()
        return cls(subtype=Type.parse(input[1:-1]))

    def dump(self) -> str:
        return f'[{self.subtype.dump()}]'

    def match_value(self, input: Any, instance: Instance) -> bool:
        return isinstance(input, list) and all(self.subtype.match_value(x, instance) for x in input)

    def parse_value(self, input: str, instance: Instance) -> list[T]:
        """Parse ``[a, b, ...]`` into a list of subtype values.

        Elements are separated by commas; a comma inside an element must be
        backslash-escaped. ``[]`` (optionally with inner whitespace) yields an
        empty list.

        Raises:
            ParseError: If the enclosing brackets are missing.
        """
        if not self._bracketed(input):
            raise ParseError()
        input = input[1:-1].strip()
        if not input:
            return []
        return [self.subtype.parse_value(x.strip(), instance) for x in quotesplit(input, ',')]

    def dump_value(self, input: list[T]) -> str:
        return '[' + quotejoin(', ', (self.subtype.dump_value(x) for x in input)) + ']'

    def template_value(self, input: list[T]) -> list[T]:
        return [self.subtype.template_value(x) for x in input]
V = TypeVar('V')


@dataclass
class MapType(Generic[T, V], Type[dict[T, V]]):
    """Dictionary type, written ``{<keytype>: <valtype>}``."""

    keytype: Type[T]
    valtype: Type[V]

    @staticmethod
    def _braced(text: str) -> bool:
        """Tell whether *text* is enclosed in ``{`` ... ``}`` (false for the empty string)."""
        return text.startswith('{') and text.endswith('}')

    @staticmethod
    def _split_pair(text: str) -> list[str]:
        """Split ``key: value`` at the first unescaped colon; ``ParseError`` if there is none."""
        pair = quotesplit(text, ':', maxsplit=1)
        if len(pair) != 2:
            raise ParseError()
        return pair

    @classmethod
    def parse(cls, input: str) -> 'MapType[T, V]':
        """Parse ``{<keytype>: <valtype>}``.

        Raises:
            ParseError: If the braces or the ``:`` separator are missing.
        """
        if not cls._braced(input):
            raise ParseError()
        kt, vt = cls._split_pair(input[1:-1])
        return cls(keytype=Type.parse(kt.strip()), valtype=Type.parse(vt.strip()))

    def dump(self) -> str:
        # A colon inside the key type is escaped so parse() splits at the right place.
        kt = self.keytype.dump().replace(':', '\\:')
        vt = self.valtype.dump()
        return f'{{{kt}: {vt}}}'

    def match_value(self, input: Any, instance: Instance) -> bool:
        return isinstance(input, dict) and all(
            self.keytype.match_value(k, instance) and self.valtype.match_value(v, instance)
            for (k, v) in input.items()
        )

    def parse_value(self, input: str, instance: Instance) -> dict[T, V]:
        """Parse ``{k: v, ...}`` into a dictionary.

        Entries are separated by commas and each entry is split at its first
        unescaped colon. ``{}`` (optionally with inner whitespace) yields an
        empty dictionary.

        Raises:
            ParseError: If the braces are missing or an entry has no ``:``.
        """
        if not self._braced(input):
            raise ParseError()
        # Drop the enclosing braces before splitting into entries.
        input = input[1:-1].strip()
        out = {}
        if input:
            for item in quotesplit(input, ','):
                k, v = self._split_pair(item)
                out[self.keytype.parse_value(k.strip(), instance)] = self.valtype.parse_value(v.strip(), instance)
        return out

    def dump_value(self, input: dict[T, V]) -> str:
        entries = (
            self.keytype.dump_value(k).replace(':', '\\:') + ': ' + self.valtype.dump_value(v)
            for (k, v) in input.items()
        )
        return '{' + quotejoin(', ', entries) + '}'

    def template_value(self, input: dict[T, V]) -> dict[T, V]:
        return {self.keytype.template_value(k): self.valtype.template_value(v) for (k, v) in input.items()}
# Sentinel for "variable declared but no value given" (None is a legal value).
NoValue = object()


@dataclass
class Config:
    """A named set of configuration variables.

    ``resolved`` maps a variable name to ``(type, value)``; the value is
    ``NoValue`` when only the type was declared. ``unresolved`` maps a name to
    its raw value string (or ``None``) for variables given without a type.
    """

    name: str
    resolved: 'dict[str, tuple[Type, O[Any]]]'
    unresolved: 'dict[str, O[str]]'
    instance: 'Instance'
    kind: O[str] = None

    @classmethod
    def parse(cls, instance: 'Instance', name: str, lines: Iterable[str]) -> 'Config':
        """Build a configuration from ``name[: type][ = value]`` lines.

        Everything after ``#`` is discarded and blank lines are skipped. Lines
        with a type go to ``resolved`` (the value is parsed with that type);
        lines without one go to ``unresolved`` with their raw value string.
        """
        resolved = {}
        unresolved = {}
        for raw in lines:
            line, *_ = stripsplit(raw, '#', maxsplit=1)
            if not line:
                # Blank or comment-only line: nothing to declare.
                continue
            line, *parts = stripsplit(line, '=', maxsplit=1)
            valstr = parts[0] if parts else None
            vname, *parts = stripsplit(line, ':', maxsplit=1)
            typestr = parts[0] if parts else None
            if typestr is None:
                unresolved[vname] = valstr
                continue
            vtype = Type.parse(typestr)
            vval = NoValue if valstr is None else vtype.parse_value(valstr, instance)
            resolved[vname] = (vtype, vval)
        return cls(name=name, resolved=resolved, unresolved=unresolved, instance=instance)

    @classmethod
    def make(cls, _instance: 'Instance', _name: str, **args: 'tuple[Type, O[Any]]') -> 'Config':
        """Build a fully typed configuration from ``name=(type, value)`` keyword arguments."""
        return cls(name=_name, resolved=args, unresolved={}, instance=_instance)

    def dump(self) -> list[str]:
        """Return the configuration as lines in the format accepted by :meth:`parse`."""
        out = []
        for name, (vtype, vval) in self.resolved.items():
            line = f'{name}: {vtype.dump()}'
            if vval is not NoValue:
                line += f' = {vtype.dump_value(vval)}'
            out.append(line)
        for name, vval in self.unresolved.items():
            # A bare name (no value) must round-trip as a bare name, not "name = None".
            out.append(name if vval is None else f'{name} = {vval}')
        return out

    def template(self) -> SimpleNamespace:
        """Return the resolved variables as a namespace for template rendering.

        Variables without a value are passed through as ``NoValue``.
        """
        resolved = {}
        for name, (vtype, vval) in self.resolved.items():
            resolved[name] = vtype.template_value(vval) if vval is not NoValue else vval
        return SimpleNamespace(**resolved)

    def matches(self, other: 'Config') -> bool:
        """Tell whether every variable declared by *other* gets a value.

        For each variable in ``other.resolved`` the value comes from this
        configuration (an unresolved string is parsed with *other*'s type, a
        resolved value is used directly), falling back to *other*'s own value.
        Returns ``False`` as soon as a variable ends up without one. Parsing
        errors raised by the type propagate.
        """
        for vname, (vtype, vval) in other.resolved.items():
            raw = self.unresolved.get(vname)
            if raw is not None:
                vval = vtype.parse_value(raw, self.instance)
            elif vname in self:
                # __contains__ guarantees a value, so __getitem__ cannot raise.
                vval = self[vname]
            if vval is NoValue:
                return False
        return True

    @property
    def type_complete(self) -> bool:
        """True when every variable has a type (nothing is unresolved)."""
        return not self.unresolved

    @property
    def value_complete(self) -> bool:
        """True when every variable has both a type and a value."""
        return self.type_complete and all(vval is not NoValue for _, vval in self.resolved.values())

    def __getitem__(self, name: str) -> Any:
        """Return the value of resolved variable *name*; ``KeyError`` if unknown or unset."""
        _, value = self.resolved[name]
        if value is NoValue:
            raise KeyError(name)
        return value

    def __setitem__(self, name: str, value: Any) -> None:
        """Set the value of resolved variable *name* after checking it against its type.

        Raises:
            KeyError: If *name* is not a resolved variable.
            TypeError: If *value* does not match the variable's type.
        """
        vtype, _ = self.resolved[name]
        if not vtype.match_value(value, self.instance):
            raise TypeError(f'{name}: {value} must match {vtype.dump()}')
        self.resolved[name] = (vtype, value)

    def __contains__(self, name: str) -> bool:
        """True when *name* is a resolved variable that has a value."""
        if name not in self.resolved:
            return False
        _, value = self.resolved[name]
        return value is not NoValue

    def __iter__(self) -> Iterator[str]:
        """Iterate over the names of the resolved variables (with or without value)."""
        return iter(self.resolved)

    def __repr__(self) -> str:
        value = self.name
        if self.kind:
            value += f': {self.kind}'
        value += ' {\n'
        for name, (vtype, vval) in self.resolved.items():
            valstr = f' {name}: {vtype.dump()}'
            if vval is not NoValue:
                valstr += f' = {indent(str(vval), amount=2, skip=1)}'
            value += valstr + '\n'
        for name, vval in self.unresolved.items():
            value += f' {name}: ?{indent(str(vval), amount=2, skip=1)}\n'
        value += '}'
        return value
@dataclass
class Meta:
    """A configuration schema stored under ``<conf_dir>/configs/<name>.conf``."""

    config: 'Config'

    @classmethod
    def load(cls, instance: 'Instance', name: str) -> 'Meta':
        """Read and parse the meta called *name* from the instance's configuration directory.

        Raises:
            OSError: If the file cannot be opened (e.g. it does not exist).
        """
        with open(f'{instance.conf_dir}/configs/{name}.conf', 'r') as f:
            config = Config.parse(instance, name, f.readlines())
        return cls(config=config)

    def save(self) -> None:
        """Write the meta back to its file, creating parent directories as needed."""
        path = f'{self.config.instance.conf_dir}/configs/{self.name}.conf'
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w') as f:
            f.write('\n'.join(self.config.dump()) + '\n')

    @property
    def name(self) -> str:
        """Name of the meta (that of its configuration)."""
        return self.config.name

    @property
    def is_complete(self) -> bool:
        """True when every variable of the meta has a type."""
        return self.config.type_complete

    def check_complete(self) -> None:
        """Raise ``ValueError`` unless the meta is complete."""
        if not self.is_complete:
            raise ValueError(f'Incomplete config for meta: {self.config}')
@dataclass
class Item:
    """A configuration instance stored under ``<data_dir>/items/<name>.conf``."""

    config: 'Config'

    @classmethod
    def filter(cls, instance: 'Instance', pattern: str) -> list[str]:
        """Return the names of all stored items matching the glob *pattern*.

        The items directory is walked recursively; an item's name is its path
        relative to that directory without the ``.conf`` suffix. Names are
        returned sorted so the result does not depend on directory order.
        """
        basedir = f'{instance.data_dir}/items/'
        matches = []
        for root, _, files in os.walk(basedir):
            rel_root = os.path.relpath(root, basedir)
            for fname in files:
                if not fname.endswith('.conf'):
                    continue
                stem = fname[:-len('.conf')]
                # relpath() reports the top directory as '.', which must not
                # become part of the item name.
                base = stem if rel_root == os.curdir else os.path.join(rel_root, stem)
                if fnmatch(base, pattern):
                    matches.append(base)
        return sorted(matches)

    @classmethod
    def load(cls, instance: 'Instance', name: str) -> 'Item':
        """Read and parse the item called *name* from the instance's data directory.

        Raises:
            OSError: If the file cannot be opened (e.g. it does not exist).
        """
        with open(f'{instance.data_dir}/items/{name}.conf', 'r') as f:
            config = Config.parse(instance, name, f)
        return cls(config=config)

    def save(self) -> None:
        """Write the item back to its file, creating parent directories as needed."""
        path = f'{self.config.instance.data_dir}/items/{self.name}.conf'
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w') as f:
            f.write('\n'.join(self.config.dump()) + '\n')

    @property
    def name(self) -> str:
        """Name of the item (that of its configuration)."""
        return self.config.name

    def resolve(self, meta: 'Meta') -> O['Item']:
        """Return a new item whose variables are typed according to *meta*.

        For each variable declared by the meta, the value is taken from this
        item (a resolved value directly, an unresolved string parsed with the
        meta's type), falling back to the meta's own value. Variables that end
        up without a value stay ``NoValue``.

        Raises:
            TypeError: If a value does not match the type required by the meta.
        """
        source = self.config
        c = Config(name=self.name, resolved={}, unresolved={}, instance=source.instance, kind=meta.name)
        for name, (vtype, vval) in meta.config.resolved.items():
            c.resolved[name] = (vtype, NoValue)
            if name in source:
                # Membership implies a value is set, so indexing cannot raise KeyError.
                vval = source[name]
            elif source.unresolved.get(name) is not None:
                vval = vtype.parse_value(source.unresolved[name], source.instance)
            if vval is not NoValue:
                c[name] = vval
        return self.__class__(c)

    def is_complete(self, meta: 'Meta') -> bool:
        """True when resolving against *meta* gives every variable a value."""
        return self.resolve(meta).config.value_complete

    def check_complete(self, meta: 'Meta') -> None:
        """Raise ``ValueError`` unless the item is complete with respect to *meta*."""
        if not self.is_complete(meta):
            raise ValueError(f'incomplete config {self.config.name} for {meta.name}: {self.config}')

    def __str__(self) -> str:
        return str(self.config)
@dataclass
class Template:
    """A Jinja2 template stored under ``<conf_dir>/templates/<name>.tmpl``."""

    name: str
    source: str
    instance: Instance
    template: O[jinja2.Template] = None

    def __post_init__(self) -> None:
        # Compile the source unless a ready-made template object was supplied.
        if not self.template:
            self.template = jinja2.Template(self.source)

    @classmethod
    def load(cls, instance: Instance, name: str) -> 'Template':
        """Read the template called *name* from the instance's configuration directory.

        Raises:
            OSError: If the file cannot be opened (e.g. it does not exist).
        """
        with open(f'{instance.conf_dir}/templates/{name}.tmpl', 'r') as f:
            source = f.read()
        return cls(name=name, source=source, instance=instance)

    def save(self) -> None:
        """Write the template source to its file, creating parent directories as needed."""
        path = f'{self.instance.conf_dir}/templates/{self.name}.tmpl'
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w') as f:
            f.write(self.source)

    def get_variables(self) -> set[str]:
        """Return the names the template source uses without declaring them."""
        return jinja2.meta.find_undeclared_variables(jinja2.Environment().parse(self.source))

    def render(self, config: Config) -> str:
        """Render the template with the variables of *config* (see ``Config.template``)."""
        templated = config.template()
        return self.template.render(**vars(templated))
if __name__ == '__main__':
    # Command-line entry point: each subcommand registers its handler through
    # set_defaults(func=...); the handler's return value is the exit status.
    import sys
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--base-dir', default='.', help='Base directory for configuration')
    commands = parser.add_subparsers(title='commands')

    def split_meta(name: str, instance: Instance) -> tuple[str, O[Meta]]:
        """Split ``item:meta`` into the item name and the loaded meta (``None`` if absent)."""
        if ':' in name:
            name, metaname = name.split(':', maxsplit=1)
            meta = Meta.load(instance, metaname)
        else:
            meta = None
        return name, meta

    def do_new_meta(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> O[int]:
        """Create a meta from the given variable specifications and store it."""
        meta = Meta(config=Config.parse(instance, args.name, args.spec))
        # NOTE(review): validating before saving is assumed to be the intent
        # (Meta.check_complete has no other caller in this file) -- confirm.
        meta.check_complete()
        meta.save()

    new_meta = commands.add_parser('new-meta')
    new_meta.set_defaults(func=do_new_meta)
    new_meta.add_argument('name')
    new_meta.add_argument('spec', nargs='*')

    def do_new_item(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> O[int]:
        """Create an item, checking it against a meta when given as ``name:meta``."""
        name, meta = split_meta(args.name, instance)
        item = Item(config=Config.parse(instance, name, args.spec))
        if meta:
            item.check_complete(meta)
        item.save()

    new_item = commands.add_parser('new')
    new_item.set_defaults(func=do_new_item)
    new_item.add_argument('name')
    new_item.add_argument('spec', nargs='*')

    def do_check_item(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> O[int]:
        """Return 0 when the stored item is complete for the given meta (or no meta was given), else 1."""
        name, meta = split_meta(args.name, instance)
        item = Item.load(instance, name)
        if meta:
            return 0 if item.is_complete(meta) else 1
        return 0

    check_item = commands.add_parser('check')
    check_item.set_defaults(func=do_check_item)
    check_item.add_argument('name')

    def do_template(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> O[int]:
        """Render a stored template with the variables given on the command line."""
        template = Template.load(instance, args.name)
        config = Config.parse(instance, '<vars>', args.spec)
        missing = template.get_variables() - set(config)
        if missing:
            parser.error(f'missing variables: {", ".join(sorted(missing))}')
        # NOTE(review): writing the rendered text to stdout is assumed -- confirm.
        print(template.render(config))

    template_item = commands.add_parser('template')
    template_item.set_defaults(func=do_template)
    template_item.add_argument('name')
    template_item.add_argument('spec', nargs='*')

    args = parser.parse_args()
    # 'func' only exists when a subcommand was selected.
    if not getattr(args, 'func', None):
        parser.error('a subcommand must be provided')
    instance = Instance(args.base_dir)
    sys.exit(args.func(parser, args, instance))