902 lines
29 KiB
Python
902 lines
29 KiB
Python
from __future__ import annotations
|
|
from logging import getLogger
|
|
|
|
import os
|
|
import re
|
|
from copy import deepcopy
|
|
from enum import Enum
|
|
from dataclasses import dataclass, field
|
|
from ast import literal_eval
|
|
import ipaddress
|
|
from fnmatch import fnmatch
|
|
from types import SimpleNamespace
|
|
from itertools import chain
|
|
from typing import (
|
|
Optional as O, Union as U, Type as TypeOf, Any, TypeVar, Generic,
|
|
Iterator, Iterable, Tuple, List, Dict, Set,
|
|
)
|
|
|
|
import jinja2
|
|
import jinja2.meta
|
|
|
|
logger = getLogger(__name__)
|
|
|
|
IPAddressType = U[ipaddress.IPv4Address, ipaddress.IPv6Address]
|
|
IPNetworkType = U[ipaddress.IPv4Network, ipaddress.IPv6Network]
|
|
IPInterfaceType = U[ipaddress.IPv4Interface, ipaddress.IPv6Interface]
|
|
T = TypeVar('T')
|
|
|
|
|
|
def stripsplit(s: str, sep: str, maxsplit: int = -1) -> List[str]:
|
|
return [x.strip() for x in s.split(sep, maxsplit=maxsplit)]
|
|
|
|
def quotesplit(s: str, sep: str, maxsplit: int = - 1) -> List[str]:
|
|
parts: List[str] = []
|
|
pos = -1
|
|
if maxsplit < 0:
|
|
maxsplit = len(s)
|
|
|
|
while len(parts) < maxsplit:
|
|
npos = s.find(sep, pos + 1)
|
|
if npos < 0:
|
|
break
|
|
pos = npos
|
|
if pos > 0 and s[pos - 1] == '\\':
|
|
s = s[:pos - 1] + s[pos:]
|
|
continue
|
|
parts.append(s[:pos])
|
|
s = s[pos + 1:]
|
|
pos = -1
|
|
|
|
parts.append(s)
|
|
return parts
|
|
|
|
def quotestripsplit(s: str, sep: str, maxsplit: int = -1) -> List[str]:
|
|
return [x.strip() for x in quotesplit(s, sep, maxsplit=maxsplit)]
|
|
|
|
def indent(s, amount=0, skip=0) -> str:
|
|
spacing = ' ' * amount
|
|
return '\n'.join((spacing if i >= skip else '') + l for i, l in enumerate(s.splitlines()))
|
|
|
|
|
|
class Instance:
|
|
def __init__(self, conf_dir: str, data_dir: O[str] = None) -> None:
|
|
self.conf_dir = conf_dir
|
|
self.data_dir = data_dir or conf_dir
|
|
|
|
|
|
class ParseError(ValueError):
|
|
pass
|
|
|
|
|
|
class LitParser(Generic[T]):
|
|
@classmethod
|
|
def parse(cls, value: str) -> T:
|
|
...
|
|
|
|
@classmethod
|
|
def match(cls, value: Any) -> bool:
|
|
...
|
|
|
|
@classmethod
|
|
def dump(cls, value: T) -> str:
|
|
...
|
|
|
|
class Bool(LitParser[bool]):
|
|
@classmethod
|
|
def parse(cls, value: str) -> bool:
|
|
return {
|
|
'true': True,
|
|
'1': True,
|
|
'false': False,
|
|
'0': False,
|
|
}[value]
|
|
|
|
@classmethod
|
|
def match(cls, value: Any) -> bool:
|
|
return isinstance(value, bool)
|
|
|
|
@classmethod
|
|
def dump(cls, value: bool) -> str:
|
|
return {
|
|
True: 'true',
|
|
False: 'false',
|
|
}[value]
|
|
|
|
class Int(LitParser[int]):
|
|
@classmethod
|
|
def parse(cls, value: str) -> int:
|
|
return int(value)
|
|
|
|
@classmethod
|
|
def match(cls, value: Any) -> bool:
|
|
return isinstance(value, int) and not isinstance(value, bool)
|
|
|
|
@classmethod
|
|
def dump(cls, value: int) -> str:
|
|
return repr(value)
|
|
|
|
class Str(LitParser[str]):
|
|
@classmethod
|
|
def parse(cls, value: str) -> str:
|
|
val = literal_eval(value)
|
|
if not isinstance(val, str):
|
|
raise TypeError(value)
|
|
return val
|
|
|
|
@classmethod
|
|
def match(cls, value: Any) -> bool:
|
|
return isinstance(value, str)
|
|
|
|
@classmethod
|
|
def dump(cls, value: str) -> str:
|
|
return repr(value)
|
|
|
|
class IPAddress(LitParser[IPAddressType]):
|
|
@classmethod
|
|
def parse(cls, value: str) -> IPAddressType:
|
|
return ipaddress.ip_address(value)
|
|
|
|
@classmethod
|
|
def match(cls, value: Any) -> bool:
|
|
return isinstance(value, (ipaddress.IPv4Address, ipaddress.IPv6Address))
|
|
|
|
@classmethod
|
|
def dump(cls, value: IPAddressType) -> str:
|
|
return str(value)
|
|
|
|
class IPNetwork(LitParser[IPNetworkType]):
|
|
@classmethod
|
|
def parse(cls, value: str) -> IPNetworkType:
|
|
return ipaddress.ip_network(value)
|
|
|
|
@classmethod
|
|
def match(cls, value: Any) -> bool:
|
|
return isinstance(value, (ipaddress.IPv4Network, ipaddress.IPv6Network))
|
|
|
|
@classmethod
|
|
def dump(cls, value: IPNetworkType) -> str:
|
|
return str(value)
|
|
|
|
class IPInterface(LitParser[IPInterfaceType]):
|
|
@classmethod
|
|
def parse(cls, value: str) -> IPInterfaceType:
|
|
return ipaddress.ip_interface(value)
|
|
|
|
@classmethod
|
|
def match(cls, value: Any) -> bool:
|
|
return isinstance(value, (ipaddress.IPv4Interface, ipaddress.IPv6Interface))
|
|
|
|
@classmethod
|
|
def dump(cls, value: IPInterfaceType) -> str:
|
|
return str(value)
|
|
|
|
|
|
TemplateT = TypeVar('TemplateT')
|
|
|
|
class Type(Generic[T, TemplateT]):
|
|
@classmethod
|
|
def parse(cls, input: str) -> 'Type':
|
|
if input.startswith('?'):
|
|
return OptionalType.parse(input)
|
|
if input.startswith('['):
|
|
return ArrType.parse(input)
|
|
if input.startswith('{'):
|
|
return MapType.parse(input)
|
|
if input.startswith('@'):
|
|
return RefType.parse(input)
|
|
if input.startswith('*'):
|
|
return GlobType.parse(input)
|
|
return LitType.parse(input)
|
|
|
|
def dump(self) -> str:
|
|
raise NotImplementedError
|
|
|
|
def match_value(self, input: Any, instance: Instance) -> bool:
|
|
return False
|
|
|
|
def parse_value(self, input: str, instance: Instance) -> T:
|
|
raise TypeError
|
|
|
|
def dump_value(self, input: T) -> str:
|
|
return repr(input)
|
|
|
|
def template_value(self, input: T) -> TemplateT:
|
|
raise NotImplementedError
|
|
|
|
@dataclass
|
|
class OptionalType(Generic[T, TemplateT], Type[O[T], O[TemplateT]]):
|
|
subtype: Type[T, TemplateT]
|
|
|
|
@classmethod
|
|
def parse(cls, input: str) -> 'OptionalType[T, TemplateT]':
|
|
if input[0] != '?':
|
|
raise ParseError()
|
|
return cls(Type.parse(input[1:]))
|
|
|
|
def dump(self) -> str:
|
|
return f'?{self.subtype.dump()}'
|
|
|
|
def match_value(self, input: Any, instance: Instance) -> bool:
|
|
return input is None or self.subtype.match_value(input, instance)
|
|
|
|
def parse_value(self, input: str, instance: Instance) -> O[T]:
|
|
if not input:
|
|
return None
|
|
return self.subtype.parse_value(input, instance)
|
|
|
|
def dump_value(self, input: O[T]) -> str:
|
|
if input is None:
|
|
return ''
|
|
return self.subtype.dump_value(input)
|
|
|
|
def template_value(self, input: O[T]) -> O[TemplateT]:
|
|
if input is None:
|
|
return None
|
|
return self.subtype.template_value(input)
|
|
|
|
@dataclass
|
|
class RefType(Type['Item', SimpleNamespace]):
|
|
name: str
|
|
|
|
@classmethod
|
|
def parse(cls, input: str) -> 'RefType':
|
|
if input[0] != '@':
|
|
raise ParseError()
|
|
return cls(input[1:])
|
|
|
|
def dump(self) -> str:
|
|
return f'@{self.name}'
|
|
|
|
def match_value(self, input: Any, instance: Instance) -> bool:
|
|
meta = Meta.load(instance, self.name)
|
|
return isinstance(input, Item) and input.is_complete(meta)
|
|
|
|
def parse_value(self, input: str, instance: Instance) -> 'Item':
|
|
meta = Meta.load(instance, self.name)
|
|
return Item.load(instance, input).resolve(meta)
|
|
|
|
def dump_value(self, input: 'Item') -> str:
|
|
return input.name
|
|
|
|
def template_value(self, input: 'Item') -> SimpleNamespace:
|
|
return input.template()
|
|
|
|
@dataclass
|
|
class GlobType(Type[Dict[str, 'Item'], Dict[str, SimpleNamespace]]):
|
|
name: str
|
|
|
|
@classmethod
|
|
def parse(cls, input: str) -> 'GlobType':
|
|
if input[0] != '*':
|
|
raise ParseError()
|
|
return cls(input[1:])
|
|
|
|
def dump(self) -> str:
|
|
s = f'*{self.name}'
|
|
return s
|
|
|
|
def match_value(self, input: Any, instance: Instance) -> bool:
|
|
meta = Meta.load(instance, self.name)
|
|
return isinstance(input, list) and all(isinstance(x, Item) and x.is_complete(meta) for x in input)
|
|
|
|
def parse_value(self, input: str, instance: Instance) -> Dict[str, 'Item']:
|
|
meta = Meta.load(instance, self.name)
|
|
out = {}
|
|
for x in Item.filter(instance, input):
|
|
item = Item.load(instance, x)
|
|
if not item.is_complete(meta):
|
|
continue
|
|
out[x] = item.resolve(meta)
|
|
return out
|
|
|
|
def dump_value(self, input: Dict[str, 'Item']) -> str:
|
|
return '<unimplemented>'
|
|
|
|
def template_value(self, input: Dict[str, 'Item']) -> Dict[str, SimpleNamespace]:
|
|
return {x.name: x.template() for x in input.values()}
|
|
|
|
@dataclass
|
|
class LitType(Generic[T], Type[T, T]):
|
|
SUBTYPES = {
|
|
'bool': Bool,
|
|
'int': Int,
|
|
'str': Str,
|
|
'ipaddr': IPAddress,
|
|
'ipnet': IPNetwork,
|
|
'ipintf': IPInterface,
|
|
}
|
|
name: str
|
|
subtype: TypeOf[LitParser[T]]
|
|
|
|
@classmethod
|
|
def parse(cls, input: str) -> 'LitType[T]':
|
|
if input not in cls.SUBTYPES:
|
|
raise ParseError()
|
|
return cls(name=input, subtype=cls.SUBTYPES[input])
|
|
|
|
def dump(self) -> str:
|
|
return self.name
|
|
|
|
def match_value(self, input: Any, instance: Instance) -> bool:
|
|
return self.subtype.match(input)
|
|
|
|
def parse_value(self, input: str, instance: Instance) -> T:
|
|
return self.subtype.parse(input)
|
|
|
|
def template_value(self, input: T) -> T:
|
|
return input
|
|
|
|
def dump_value(self, input: T) -> str:
|
|
return self.subtype.dump(input)
|
|
|
|
@dataclass
|
|
class ArrType(Generic[T, TemplateT], Type[List[T], List[TemplateT]]):
|
|
subtype: Type[T, TemplateT]
|
|
|
|
@classmethod
|
|
def parse(cls, input: str) -> 'ArrType[T, TemplateT]':
|
|
if input[0] != '[' or input[-1] != ']':
|
|
raise ParseError()
|
|
return cls(subtype=Type.parse(input[1:-1]))
|
|
|
|
def dump(self) -> str:
|
|
return f'[{self.subtype.dump()}]'
|
|
|
|
def match_value(self, input: Any, instance: Instance) -> bool:
|
|
return isinstance(input, list) and all(self.subtype.match_value(x, instance) for x in input)
|
|
|
|
def parse_value(self, input: str, instance: Instance) -> List[T]:
|
|
if input[0] != '[' or input[-1] != ']':
|
|
raise ParseError()
|
|
input = input[1:-1].strip()
|
|
if not input:
|
|
return []
|
|
return [self.subtype.parse_value(x.strip(), instance) for x in quotesplit(input, ',')]
|
|
|
|
def dump_value(self, input: List[T]) -> str:
|
|
return '[' + ', '.join(self.subtype.dump_value(x).replace(',', '\\,') for x in input) + ']'
|
|
|
|
def template_value(self, input: List[T]) -> List[TemplateT]:
|
|
return [self.subtype.template_value(x) for x in input]
|
|
|
|
V = TypeVar('V')
|
|
TemplateV = TypeVar('TemplateV')
|
|
|
|
@dataclass
|
|
class MapType(Generic[T, TemplateT, V, TemplateV], Type[Dict[T, V], Dict[TemplateT, TemplateV]]):
|
|
keytype: Type[T, TemplateT]
|
|
valtype: Type[V, TemplateV]
|
|
|
|
@classmethod
|
|
def parse(cls, input: str) -> 'MapType[T, TemplateT, V, TemplateV]':
|
|
if input[0] != '{' or input[-1] != '}':
|
|
raise ParseError()
|
|
kt, vt = quotesplit(input[1:-1], ':', maxsplit=1)
|
|
return cls(keytype=Type.parse(kt.strip()), valtype=Type.parse(vt.strip()))
|
|
|
|
def dump(self) -> str:
|
|
kt = self.keytype.dump().replace(':', '\\:')
|
|
vt = self.valtype.dump()
|
|
return f'{{{kt}: {vt}}}'
|
|
|
|
def match_value(self, input: Any, instance: Instance) -> bool:
|
|
return isinstance(input, dict) and all(self.keytype.match_value(k, instance) and self.valtype.match_value(v, instance) for (k, v) in input.items())
|
|
|
|
def parse_value(self, input: str, instance: Instance) -> Dict[T, V]:
|
|
if input[0] != '{' or input[-1] != '}':
|
|
raise ParseError()
|
|
out = {}
|
|
input[1:-1].strip()
|
|
if input:
|
|
items = quotesplit(input, ',')
|
|
for item in items:
|
|
k, v = quotesplit(item, ':', maxsplit=1)
|
|
out[self.keytype.parse_value(k.strip(), instance)] = self.valtype.parse_value(v.strip(), instance)
|
|
return out
|
|
|
|
def dump_value(self, input: Dict[T, V]) -> str:
|
|
return '{' + ', '.join((self.keytype.dump_value(k).replace(':', '\\:') + ': ' + self.valtype.dump_value(v)).replace(',', '\\,') for (k, v) in input.items()) + '}'
|
|
|
|
def template_value(self, input: Dict[T, V]) -> Dict[TemplateT, TemplateV]:
|
|
return {self.keytype.template_value(k): self.valtype.template_value(v) for (k, v) in input.items()}
|
|
|
|
|
|
C = TypeVar('C', bound='Config')
|
|
|
|
@dataclass
|
|
class Config:
|
|
name: str
|
|
types: Dict[str, Type]
|
|
values: Dict[str, Any]
|
|
unresolved: Dict[str, str]
|
|
instance: 'Instance'
|
|
kind: O[str] = None
|
|
|
|
@classmethod
|
|
def parse(cls: TypeOf[C], instance: Instance, name: str, lines: Iterable[str]) -> C:
|
|
c = cls(name=name, types={}, values={}, unresolved={}, instance=instance)
|
|
|
|
for line in lines:
|
|
line, *_ = quotestripsplit(line, '#', maxsplit=1)
|
|
if not line:
|
|
continue
|
|
line, *parts = quotestripsplit(line, '=', maxsplit=1)
|
|
valstr = parts[0] if parts else None
|
|
vname, *parts = quotestripsplit(line, ':', maxsplit=1)
|
|
typestr = parts[0] if parts else None
|
|
if valstr is not None:
|
|
c.unresolved[vname] = valstr
|
|
if typestr is not None:
|
|
c.resolve_type(vname, Type.parse(typestr))
|
|
|
|
return c
|
|
|
|
def dump(self) -> List[str]:
|
|
keys = {}
|
|
for k in self.types:
|
|
keys[k] = True
|
|
for k in self.values:
|
|
keys[k] = True
|
|
for k in self.unresolved:
|
|
keys[k] = True
|
|
|
|
out = []
|
|
for name in keys:
|
|
line = name.replace(':', '\\:').replace('=', '\\=')
|
|
if name in self.types:
|
|
type = self.types[name].dump().replace('=', '\\=')
|
|
line += f': {type}'
|
|
value = None
|
|
if name in self.values:
|
|
if name not in self.types:
|
|
raise ValueError()
|
|
value = self.types[name].dump_value(self.values[name])
|
|
elif name in self.unresolved:
|
|
value = self.unresolved[name]
|
|
if value is not None:
|
|
line += f' = {value}'
|
|
out.append(line.replace('#', '\\#'))
|
|
|
|
return out
|
|
|
|
@classmethod
|
|
def define(cls: TypeOf[C], _instance: Instance, _name: str, _kind: O[str] = None, **args: Type) -> C:
|
|
return cls(name=_name, types=args, values={}, unresolved={}, instance=_instance, kind=_kind)
|
|
|
|
@classmethod
|
|
def make(cls: TypeOf[C], _instance: Instance, _name: str, _kind: O[str] = None, **args: Dict[str, Any]) -> C:
|
|
return cls(name=_name, types={}, values=args, unresolved={}, instance=_instance, kind=_kind)
|
|
|
|
def copy(self: C) -> C:
|
|
return self.__class__(self.name, instance=self.instance, kind=self.kind,
|
|
types=self.types.copy(), values=self.values.copy(), unresolved=self.unresolved.copy(),
|
|
)
|
|
|
|
|
|
def template(self) -> SimpleNamespace:
|
|
resolved = {}
|
|
for name, value in self.values.items():
|
|
if name in self.types:
|
|
resolved[name] = self.types[name].template_value(value)
|
|
return SimpleNamespace(**resolved)
|
|
|
|
|
|
def merge(self: C, other: 'Config') -> C:
|
|
c = self.__class__(name=self.name, types={}, values={}, unresolved={}, instance=self.instance, kind=self.kind)
|
|
for name, value in chain(other.unresolved.items(), self.unresolved.items()):
|
|
if name not in self:
|
|
c.unresolved[name] = value
|
|
for name, type in chain(other.types.items(), self.types.items()):
|
|
c.resolve_type(name, type)
|
|
for name, value in chain(self.values.items(), other.values.items()):
|
|
if name not in c:
|
|
c.values[name] = deepcopy(value)
|
|
return c
|
|
|
|
def split(self: C, other: 'Config') -> C:
|
|
c = self.copy()
|
|
for name, value in other.values.items():
|
|
if name in c and c[name] == value:
|
|
del c[name]
|
|
for name, type in other.types.items():
|
|
if c.is_type_resolved(name) and c.types[name] == type:
|
|
c.unresolve_type(name)
|
|
for name, value in other.unresolved.items():
|
|
if name in self.unresolved and self.unresolved[name] == value:
|
|
del c.unresolved[name]
|
|
return c
|
|
|
|
def purify(self: C) -> C:
|
|
c = self.copy()
|
|
c.unresolved = {}
|
|
return c
|
|
|
|
|
|
def is_type_resolved(self, name: str) -> bool:
|
|
return name in self.types
|
|
|
|
def is_value_resolved(self, name: str) -> bool:
|
|
return name in self.values
|
|
|
|
def is_unresolved(self, name: str) -> bool:
|
|
return name in self.unresolved
|
|
|
|
def resolve_type(self, name: str, type: Type) -> None:
|
|
self.types[name] = type
|
|
if name in self.unresolved:
|
|
self.values[name] = self.resolve_value(name, self.unresolved[name])
|
|
del self.unresolved[name]
|
|
|
|
def unresolve_type(self, name: str) -> None:
|
|
if name in self.values:
|
|
self.unresolved[name] = self.unresolve_value(name, self.values[name])
|
|
del self.values[name]
|
|
del self.types[name]
|
|
|
|
def resolve_value(self, name: str, value: str) -> Any:
|
|
return self.types[name].parse_value(value, self.instance)
|
|
|
|
def unresolve_value(self, name: str, value: Any) -> str:
|
|
return self.types[name].dump_value(value)
|
|
|
|
|
|
def is_type_complete(self, other: O['Config'] = None) -> bool:
|
|
if not other:
|
|
other = self
|
|
return all(self.is_type_resolved(x) for x in other.values)
|
|
|
|
def is_value_complete(self, other: O['Config'] = None) -> bool:
|
|
if not other:
|
|
other = self
|
|
return all(self.is_value_resolved(x) for x in other.types)
|
|
|
|
|
|
def __getitem__(self, name: str) -> Any:
|
|
return self.values[name]
|
|
|
|
def __setitem__(self, name: str, value: Any) -> None:
|
|
vtype = self.types[name]
|
|
if not vtype.match_value(value, self.instance):
|
|
raise TypeError(f'{name}: {value} must match {vtype.dump()}')
|
|
self.values[name] = value
|
|
|
|
def __delitem__(self, name: str) -> None:
|
|
del self.values[name]
|
|
|
|
def __contains__(self, name: str) -> bool:
|
|
return name in self.values
|
|
|
|
def __iter__(self) -> Iterator[str]:
|
|
return iter(self.values)
|
|
|
|
def __repr__(self) -> str:
|
|
keys = {}
|
|
for k in self.types:
|
|
keys[k] = True
|
|
for k in self.values:
|
|
keys[k] = True
|
|
for k in self.unresolved:
|
|
keys[k] = True
|
|
|
|
value = self.name
|
|
if self.kind:
|
|
value += f': {self.kind}'
|
|
value += ' {\n'
|
|
|
|
for name in keys:
|
|
value += f' {name}'
|
|
if name in self.types:
|
|
value += f': {self.types[name].dump()}'
|
|
if name in self.values:
|
|
value += f' = {indent(str(self.values[name]), amount=2, skip=1)}'
|
|
elif name in self.unresolved:
|
|
value += f' = ?{indent(str(self.unresolved[name]), amount=2, skip=1)}'
|
|
value += '\n'
|
|
|
|
value += '}'
|
|
return value
|
|
|
|
|
|
@dataclass(repr=False)
|
|
class Meta(Config):
|
|
@classmethod
|
|
def load(cls, instance: 'Instance', name: str) -> 'Meta':
|
|
with open(f'{instance.conf_dir}/configs/{name}.conf', 'r') as f:
|
|
return cls.parse(instance, name, f.readlines())
|
|
|
|
def save(self) -> None:
|
|
path = f'{self.instance.conf_dir}/configs/{self.name}.conf'
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
with open(path, 'w') as f:
|
|
f.write('\n'.join(self.dump()) + '\n')
|
|
|
|
@property
|
|
def is_complete(self) -> bool:
|
|
return self.is_type_complete()
|
|
|
|
def check_complete(self) -> None:
|
|
if not self.is_complete:
|
|
raise ValueError(f'Incomplete config for meta: {self}')
|
|
|
|
def __eq__(self, other: Any) -> bool:
|
|
return isinstance(other, self.__class__) and self.name == other.name
|
|
|
|
@dataclass(repr=False)
|
|
class Item(Config):
|
|
@classmethod
|
|
def get_path(cls, instance: Instance, name: str) -> str:
|
|
return f'{instance.data_dir}/items/{name}.conf'
|
|
|
|
@classmethod
|
|
def get_trashed_path(cls, instance: Instance, name: str) -> str:
|
|
return f'{instance.data_dir}/trash/{name}.conf'
|
|
|
|
@classmethod
|
|
def exists(cls, instance: Instance, name: str) -> bool:
|
|
return os.path.isfile(cls.get_path(instance, name))
|
|
|
|
@classmethod
|
|
def restorable(cls, instance: Instance, name: str) -> bool:
|
|
return os.path.isfile(cls.get_trashed_path(instance, name))
|
|
|
|
@classmethod
|
|
def filter(cls, instance: Instance, pattern: str) -> List[str]:
|
|
basedir = f'{instance.data_dir}/items/'
|
|
allfiles = []
|
|
for root, _, files in os.walk(basedir):
|
|
for f in files:
|
|
if not f.endswith('.conf'):
|
|
continue
|
|
base = os.path.join(root[len(basedir):], f[:-len('.conf')])
|
|
if not fnmatch(base, pattern):
|
|
continue
|
|
allfiles.append(base)
|
|
return allfiles
|
|
|
|
@classmethod
|
|
def load(cls, instance: Instance, name: str) -> 'Item':
|
|
return cls.loadfile(instance, name, cls.get_path(instance, name))
|
|
|
|
@classmethod
|
|
def loadfile(cls, instance: Instance, name: str, path: str, ) -> 'Item':
|
|
with open(path, 'r') as f:
|
|
return cls.parse(instance, name, f)
|
|
|
|
def save(self) -> None:
|
|
return self.savefile(self.get_path(self.instance, self.name))
|
|
|
|
def savefile(self, path: str) -> None:
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
with open(path, 'w') as f:
|
|
f.write('\n'.join(self.dump()) + '\n')
|
|
|
|
def trash(self) -> None:
|
|
trash_path = self.get_trashed_path(self.instance, self.name)
|
|
os.makedirs(os.path.dirname(trash_path), exist_ok=True)
|
|
os.rename(self.get_path(self.instance, self.name), trash_path)
|
|
|
|
@classmethod
|
|
def restore(cls, instance: Instance, name: str) -> 'Item':
|
|
path = cls.get_path(instance, name)
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
os.rename(cls.get_trashed_path(instance, name), path)
|
|
return cls.load(instance, name)
|
|
|
|
def delete(self) -> None:
|
|
if self.exists(self.instance, self.name):
|
|
os.remove(self.get_path(self.instance, self.name))
|
|
|
|
def resolve(self, config: Config) -> 'Item':
|
|
c = self.merge(config)
|
|
c.kind = config.name
|
|
return c
|
|
|
|
def unresolve(self, config: Config) -> 'Item':
|
|
c = self.split(config)
|
|
c.kind = None
|
|
return c
|
|
|
|
def is_complete(self, config: Config) -> bool:
|
|
return self.resolve(config).is_value_complete(config)
|
|
|
|
def check_complete(self, config: Config) -> None:
|
|
if not self.is_complete(config):
|
|
raise ValueError(f'incomplete config {self.name} for {config.name}: {self}')
|
|
|
|
def __eq__(self, other: Any) -> bool:
|
|
return isinstance(other, self.__class__) and self.name == other.name
|
|
|
|
|
|
TO_IDENT_PATTERN = re.compile(r'[^a-zA_Z0-9]')
|
|
|
|
@dataclass
|
|
class Template:
|
|
name: str
|
|
source: str
|
|
instance: Instance
|
|
env: jinja2.Environment = field(init=False)
|
|
template: jinja2.Template = field(init=False)
|
|
|
|
def __post_init__(self) -> None:
|
|
self.env = jinja2.Environment()
|
|
self.env.filters['to_ident'] = self.filter_to_ident
|
|
self.template = self.env.from_string(self.source)
|
|
|
|
@staticmethod
|
|
def filter_to_ident(value: str, sep='_') -> str:
|
|
value = TO_IDENT_PATTERN.sub(sep, value)
|
|
dsep = sep + sep
|
|
while dsep in value:
|
|
value = value.replace(dsep, sep)
|
|
if value[0].isdigit():
|
|
value = sep + value
|
|
return value
|
|
|
|
@classmethod
|
|
def load(cls, instance: Instance, name: str) -> 'Template':
|
|
with open(f'{instance.conf_dir}/templates/{name}.tmpl', 'r') as f:
|
|
source = f.read()
|
|
return cls(name=name, source=source, instance=instance)
|
|
|
|
def save(self) -> None:
|
|
path = f'{self.instance.conf_dir}/templates/{self.name}.tmpl'
|
|
os.makedirs(os.path.dirname(path), exist_ok=True)
|
|
with open(path, 'w') as f:
|
|
f.write(self.source)
|
|
|
|
def get_variables(self) -> Set[str]:
|
|
return jinja2.meta.find_undeclared_variables(jinja2.Environment().parse(self.source))
|
|
|
|
def render(self, config: Config) -> str:
|
|
templated = config.template()
|
|
return self.template.render(**templated.__dict__)
|
|
|
|
|
|
EXPORT_ADD_MARKER = '#+++ '
|
|
EXPORT_DEL_MARKER = '#--- '
|
|
|
|
def export_items(objects: List[Item]) -> str:
|
|
lines = []
|
|
for o in objects:
|
|
if not o.exists(o.instance, o.name):
|
|
lines.append(EXPORT_DEL_MARKER + o.name)
|
|
else:
|
|
lines.append(EXPORT_ADD_MARKER + o.name)
|
|
lines.extend(o.dump())
|
|
return '\n'.join(lines)
|
|
|
|
def import_items(instance: Instance, spec: str, duplicates: bool = True, replacements: bool = True, deletions: bool = True) -> O[Tuple[Set[str], Set[str]]]:
|
|
current = None
|
|
|
|
operations: List[Tuple[str, O[List[str]]]]
|
|
for line in spec.splitlines():
|
|
line = line.rstrip('\n')
|
|
if line.startswith(EXPORT_DEL_MARKER):
|
|
current = line[len(EXPORT_DEL_MARKER):].strip()
|
|
if not deletions:
|
|
logger.warn(f'import: {current}: deletion found, but not allowed')
|
|
continue
|
|
if current in deleted:
|
|
logger.warn(f'import: {current}: duplicate found')
|
|
if not duplicates:
|
|
return None
|
|
operations.append((current, None))
|
|
current = None
|
|
continue
|
|
if line.startswith(EXPORT_ADD_MARKER):
|
|
current = line[len(EXPORT_ADD_MARKER):].strip()
|
|
if Item.exists(instance, current) and not replacements:
|
|
logger.warn(f'import: {current}: already exists')
|
|
return None
|
|
if current in added:
|
|
logger.warn(f'import: {current}: duplicate found')
|
|
if not duplicates:
|
|
return None
|
|
operations.append((current, []))
|
|
continue
|
|
if not current:
|
|
logger.warn(f'import: ignoring leading line: {line}')
|
|
continue
|
|
item_spec = operations[-1][1]
|
|
if item_spec is not None:
|
|
item_spec.append(line)
|
|
|
|
added: Dict[str, Item] = {}
|
|
deleted: Dict[str, Item] = {}
|
|
for name, contents in operations:
|
|
if name not in deleted and name not in added and Item.exists(instance, name):
|
|
deleted[name] = Item.load(instance, name)
|
|
try:
|
|
if contents is None:
|
|
deleted[name].delete()
|
|
else:
|
|
item = Item.parse(instance, name, spec)
|
|
item.save()
|
|
added[name] = item
|
|
except:
|
|
logger.error(f'import: {name}: failed, rolling back')
|
|
break
|
|
else:
|
|
return set(added), set(deleted)
|
|
|
|
for name, item in added.items():
|
|
item.delete()
|
|
for name, item in deleted.items():
|
|
item.save()
|
|
|
|
return None
|
|
|
|
|
|
if __name__ == '__main__':
|
|
import sys
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.set_defaults(func=None)
|
|
parser.add_argument('-d', '--base-dir', default='.', help='Base directory for configuration')
|
|
|
|
commands = parser.add_subparsers(title='commands')
|
|
|
|
def split_meta(name: str, instance: Instance) -> Tuple[str, O[Meta]]:
|
|
meta = None
|
|
if ':' in name:
|
|
name, metaname = name.split(':', maxsplit=1)
|
|
meta = Meta.load(instance, metaname)
|
|
return name, meta
|
|
|
|
|
|
def do_new_meta(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> None:
|
|
meta = Meta.parse(instance, args.name, args.spec)
|
|
meta.save()
|
|
|
|
new_meta = commands.add_parser('new-meta')
|
|
new_meta.add_argument('name')
|
|
new_meta.add_argument('spec', nargs='*')
|
|
new_meta.set_defaults(func=do_new_meta)
|
|
|
|
def do_new_item(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> None:
|
|
name, meta = split_meta(args.name, instance)
|
|
item = Item.parse(instance, name, args.spec)
|
|
if meta:
|
|
meta.check_complete()
|
|
item.check_complete(meta)
|
|
item.save()
|
|
|
|
new_item = commands.add_parser('new')
|
|
new_item.add_argument('name')
|
|
new_item.add_argument('spec', nargs='*')
|
|
new_item.set_defaults(func=do_new_item)
|
|
|
|
def do_check_item(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> int:
|
|
name, meta = split_meta(args.name, instance)
|
|
item = Item.load(instance, name)
|
|
if meta:
|
|
meta.check_complete()
|
|
return 0 if item.is_complete(meta) else 1
|
|
else:
|
|
return 0
|
|
|
|
check_item = commands.add_parser('check')
|
|
check_item.add_argument('name')
|
|
check_item.set_defaults(func=do_check_item)
|
|
|
|
def do_template(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> None:
|
|
template = Template.load(instance, args.name)
|
|
config = Config.parse(instance, '<vars>', args.spec)
|
|
missing = template.get_variables() - set(config)
|
|
if missing:
|
|
parser.error(f'missing variables: {", ".join(missing)}')
|
|
print(template.render(config))
|
|
|
|
template_item = commands.add_parser('template')
|
|
template_item.add_argument('name')
|
|
template_item.add_argument('spec', nargs='*')
|
|
template_item.set_defaults(func=do_template)
|
|
|
|
args = parser.parse_args()
|
|
if not args.func:
|
|
parser.error('a subcommand must be provided')
|
|
instance = Instance(args.base_dir)
|
|
sys.exit(args.func(parser, args, instance))
|