weegee/weegee/dazy.py

902 lines
29 KiB
Python

from __future__ import annotations
from logging import getLogger
import os
import re
from copy import deepcopy
from enum import Enum
from dataclasses import dataclass, field
from ast import literal_eval
import ipaddress
from fnmatch import fnmatch
from types import SimpleNamespace
from itertools import chain
from typing import (
Optional as O, Union as U, Type as TypeOf, Any, TypeVar, Generic,
Iterator, Iterable, Tuple, List, Dict, Set,
)
import jinja2
import jinja2.meta
# Module-level logger, named after this module.
logger = getLogger(__name__)

# Unions covering the IPv4 and IPv6 flavour of each ``ipaddress`` object kind.
IPAddressType = U[ipaddress.IPv4Address, ipaddress.IPv6Address]
IPNetworkType = U[ipaddress.IPv4Network, ipaddress.IPv6Network]
IPInterfaceType = U[ipaddress.IPv4Interface, ipaddress.IPv6Interface]

# Generic parameter for the parsed representation of a value.
T = TypeVar('T')
def stripsplit(s: str, sep: str, maxsplit: int = -1) -> List[str]:
    """Split ``s`` on ``sep`` (at most ``maxsplit`` times) and strip every piece."""
    pieces = s.split(sep, maxsplit=maxsplit)
    return list(map(str.strip, pieces))
def quotesplit(s: str, sep: str, maxsplit: int = -1) -> List[str]:
    """Split ``s`` on ``sep``, honouring backslash-escaped separators.

    A separator directly preceded by a backslash is not a split point; the
    backslash is removed and the separator stays in the current piece.
    Escapes are only processed while splits are still being performed, so
    text left in the final piece once ``maxsplit`` is reached is untouched.

    :param s: text to split.
    :param sep: separator; the scan advances one character per split, so a
        single-character separator is expected.
    :param maxsplit: maximum number of splits; negative means no limit.
    :return: the pieces, always at least one element.
    """
    parts: List[str] = []
    if maxsplit < 0:
        # There can never be more split points than characters.
        maxsplit = len(s)
    start = 0
    while len(parts) < maxsplit:
        pos = s.find(sep, start)
        if pos < 0:
            break
        if pos > 0 and s[pos - 1] == '\\':
            # Drop the backslash; the separator now sits at ``pos - 1``, so
            # the next character to examine is at ``pos``.  (Resuming at
            # ``pos + 1`` would skip a separator directly following it.)
            s = s[:pos - 1] + s[pos:]
            start = pos
            continue
        parts.append(s[:pos])
        s = s[pos + 1:]
        start = 0
    parts.append(s)
    return parts
def quotestripsplit(s: str, sep: str, maxsplit: int = -1) -> List[str]:
    """Like ``quotesplit``, but with surrounding whitespace stripped from each piece."""
    pieces = quotesplit(s, sep, maxsplit=maxsplit)
    return [piece.strip() for piece in pieces]
def indent(s, amount=0, skip=0) -> str:
    """Prefix the lines of ``s`` with ``amount`` spaces, leaving the first ``skip`` lines alone.

    Lines are obtained with ``str.splitlines`` and re-joined with ``'\\n'``.
    """
    pad = ' ' * amount
    rendered = []
    for lineno, text in enumerate(s.splitlines()):
        rendered.append(text if lineno < skip else pad + text)
    return '\n'.join(rendered)
class Instance:
    """Pair of base directories: one for configuration, one for data."""

    def __init__(self, conf_dir: str, data_dir: O[str] = None) -> None:
        """Store both directories; a falsy ``data_dir`` falls back to ``conf_dir``."""
        self.conf_dir = conf_dir
        self.data_dir = data_dir if data_dir else conf_dir
class ParseError(ValueError):
    """Raised when a type specification or value does not have the expected shape."""
class LitParser(Generic[T]):
    """Interface for literal codecs: text to value, value check, value to text.

    The methods here are empty placeholders; the concrete codecs override them.
    """

    @classmethod
    def parse(cls, value: str) -> T:
        """Convert the textual form into a value."""
        ...

    @classmethod
    def match(cls, value: Any) -> bool:
        """Tell whether ``value`` is acceptable for this literal kind."""
        ...

    @classmethod
    def dump(cls, value: T) -> str:
        """Convert a value back into its textual form."""
        ...
class Bool(LitParser[bool]):
    """Codec for booleans written as ``true``/``false`` or ``1``/``0``."""

    # Lookup tables; a missing key raises ``KeyError``.
    _FROM_TEXT = {'true': True, '1': True, 'false': False, '0': False}
    _TO_TEXT = {True: 'true', False: 'false'}

    @classmethod
    def parse(cls, value: str) -> bool:
        """Map an accepted spelling to its boolean; ``KeyError`` for anything else."""
        return cls._FROM_TEXT[value]

    @classmethod
    def match(cls, value: Any) -> bool:
        """Accept only real ``bool`` instances."""
        return isinstance(value, bool)

    @classmethod
    def dump(cls, value: bool) -> str:
        """Return ``'true'`` or ``'false'``."""
        return cls._TO_TEXT[value]
class Int(LitParser[int]):
    """Codec for integers."""

    @classmethod
    def parse(cls, value: str) -> int:
        """Convert with the built-in ``int`` constructor."""
        return int(value)

    @classmethod
    def match(cls, value: Any) -> bool:
        """Accept ``int`` instances, but not ``bool`` (a subclass of ``int``)."""
        if isinstance(value, bool):
            return False
        return isinstance(value, int)

    @classmethod
    def dump(cls, value: int) -> str:
        """Return ``repr(value)``."""
        return repr(value)
class Str(LitParser[str]):
    """Codec for strings written as Python string literals."""

    @classmethod
    def parse(cls, value: str) -> str:
        """Evaluate ``value`` as a literal; ``TypeError`` if it is not a string."""
        evaluated = literal_eval(value)
        if isinstance(evaluated, str):
            return evaluated
        raise TypeError(value)

    @classmethod
    def match(cls, value: Any) -> bool:
        """Accept ``str`` instances."""
        return isinstance(value, str)

    @classmethod
    def dump(cls, value: str) -> str:
        """Return the quoted ``repr`` form, which ``parse`` can read back."""
        return repr(value)
class IPAddress(LitParser[IPAddressType]):
    """Codec for IPv4/IPv6 host addresses."""

    @classmethod
    def parse(cls, value: str) -> IPAddressType:
        """Delegate to ``ipaddress.ip_address``."""
        return ipaddress.ip_address(value)

    @classmethod
    def match(cls, value: Any) -> bool:
        """Accept ``IPv4Address`` and ``IPv6Address`` instances."""
        accepted = (ipaddress.IPv4Address, ipaddress.IPv6Address)
        return isinstance(value, accepted)

    @classmethod
    def dump(cls, value: IPAddressType) -> str:
        """Return ``str(value)``."""
        return str(value)
class IPNetwork(LitParser[IPNetworkType]):
    """Codec for IPv4/IPv6 networks."""

    @classmethod
    def parse(cls, value: str) -> IPNetworkType:
        """Delegate to ``ipaddress.ip_network``."""
        return ipaddress.ip_network(value)

    @classmethod
    def match(cls, value: Any) -> bool:
        """Accept ``IPv4Network`` and ``IPv6Network`` instances."""
        accepted = (ipaddress.IPv4Network, ipaddress.IPv6Network)
        return isinstance(value, accepted)

    @classmethod
    def dump(cls, value: IPNetworkType) -> str:
        """Return ``str(value)``."""
        return str(value)
class IPInterface(LitParser[IPInterfaceType]):
    """Codec for IPv4/IPv6 interfaces."""

    @classmethod
    def parse(cls, value: str) -> IPInterfaceType:
        """Delegate to ``ipaddress.ip_interface``."""
        return ipaddress.ip_interface(value)

    @classmethod
    def match(cls, value: Any) -> bool:
        """Accept ``IPv4Interface`` and ``IPv6Interface`` instances."""
        accepted = (ipaddress.IPv4Interface, ipaddress.IPv6Interface)
        return isinstance(value, accepted)

    @classmethod
    def dump(cls, value: IPInterfaceType) -> str:
        """Return ``str(value)``."""
        return str(value)
# Generic parameter for the representation a value takes when handed to a template.
TemplateT = TypeVar('TemplateT')


class Type(Generic[T, TemplateT]):
    """Base class for the type expressions attached to config entries.

    ``T`` is the parsed value representation and ``TemplateT`` the one
    produced by ``template_value``.  Apart from the ``parse`` dispatcher,
    the methods here are fallbacks.
    """

    @classmethod
    def parse(cls, input: str) -> 'Type':
        """Build a type from its textual form, dispatching on the leading character.

        Text that starts with none of the known sigils is handed to ``LitType``.
        """
        sigils = (
            ('?', OptionalType),
            ('[', ArrType),
            ('{', MapType),
            ('@', RefType),
            ('*', GlobType),
        )
        for sigil, handler in sigils:
            if input.startswith(sigil):
                return handler.parse(input)
        return LitType.parse(input)

    def dump(self) -> str:
        """Return the textual form of this type (not implemented on the base)."""
        raise NotImplementedError

    def match_value(self, input: Any, instance: Instance) -> bool:
        """Tell whether ``input`` is a valid value of this type; the base matches nothing."""
        return False

    def parse_value(self, input: str, instance: Instance) -> T:
        """Parse the textual form of a value; the base raises ``TypeError``."""
        raise TypeError

    def dump_value(self, input: T) -> str:
        """Return the textual form of a value; the base uses ``repr``."""
        return repr(input)

    def template_value(self, input: T) -> TemplateT:
        """Convert a value for template use (not implemented on the base)."""
        raise NotImplementedError
@dataclass
class OptionalType(Generic[T, TemplateT], Type[O[T], O[TemplateT]]):
    """``?<subtype>``: a value of ``subtype`` that may also be absent (``None``)."""

    subtype: Type[T, TemplateT]

    @classmethod
    def parse(cls, input: str) -> 'OptionalType[T, TemplateT]':
        """Parse ``?<subtype>``; ``ParseError`` if the leading ``?`` is missing."""
        if input[0] == '?':
            return cls(Type.parse(input[1:]))
        raise ParseError()

    def dump(self) -> str:
        """Return ``?`` followed by the subtype's textual form."""
        return f'?{self.subtype.dump()}'

    def match_value(self, input: Any, instance: Instance) -> bool:
        """Accept ``None`` or anything the subtype accepts."""
        if input is None:
            return True
        return self.subtype.match_value(input, instance)

    def parse_value(self, input: str, instance: Instance) -> O[T]:
        """Empty text means ``None``; otherwise defer to the subtype."""
        return self.subtype.parse_value(input, instance) if input else None

    def dump_value(self, input: O[T]) -> str:
        """``None`` is written as empty text; otherwise defer to the subtype."""
        return '' if input is None else self.subtype.dump_value(input)

    def template_value(self, input: O[T]) -> O[TemplateT]:
        """Pass ``None`` through; otherwise defer to the subtype."""
        return None if input is None else self.subtype.template_value(input)
@dataclass
class RefType(Type['Item', SimpleNamespace]):
    """``@<meta>``: a reference to a stored item, checked against the meta ``<meta>``."""

    name: str

    @classmethod
    def parse(cls, input: str) -> 'RefType':
        """Parse ``@<meta>``; ``ParseError`` if the leading ``@`` is missing."""
        if input[0] == '@':
            return cls(input[1:])
        raise ParseError()

    def dump(self) -> str:
        """Return ``@`` followed by the meta name."""
        return f'@{self.name}'

    def match_value(self, input: Any, instance: Instance) -> bool:
        """Accept an ``Item`` that is complete with respect to the referenced meta."""
        # The meta is loaded before the type check, as in every other path.
        meta = Meta.load(instance, self.name)
        if not isinstance(input, Item):
            return False
        return input.is_complete(meta)

    def parse_value(self, input: str, instance: Instance) -> 'Item':
        """Load the item named ``input`` and resolve it against the referenced meta."""
        meta = Meta.load(instance, self.name)
        item = Item.load(instance, input)
        return item.resolve(meta)

    def dump_value(self, input: 'Item') -> str:
        """An item reference is written as the item's name."""
        return input.name

    def template_value(self, input: 'Item') -> SimpleNamespace:
        """Expose the item through its own ``template()`` namespace."""
        return input.template()
@dataclass
class GlobType(Type[Dict[str, 'Item'], Dict[str, SimpleNamespace]]):
    """``*<meta>``: every stored item matching a glob that is complete for ``<meta>``.

    Values of this type are dicts mapping item name to ``Item``.
    """

    name: str

    @classmethod
    def parse(cls, input: str) -> 'GlobType':
        """Parse ``*<meta>``; ``ParseError`` if the leading ``*`` is missing."""
        if input[0] != '*':
            raise ParseError()
        return cls(input[1:])

    def dump(self) -> str:
        """Return ``*`` followed by the meta name."""
        return f'*{self.name}'

    def match_value(self, input: Any, instance: Instance) -> bool:
        """Accept a dict whose values are all items complete for the referenced meta.

        This mirrors the shape produced by ``parse_value`` and consumed by
        ``template_value``; a list can never be a valid value of this type.
        """
        meta = Meta.load(instance, self.name)
        if not isinstance(input, dict):
            return False
        return all(isinstance(x, Item) and x.is_complete(meta) for x in input.values())

    def parse_value(self, input: str, instance: Instance) -> Dict[str, 'Item']:
        """Treat ``input`` as a glob over item names and collect the complete matches.

        Items that are not complete for the referenced meta are skipped
        silently; the rest are resolved against it.
        """
        meta = Meta.load(instance, self.name)
        out = {}
        for x in Item.filter(instance, input):
            item = Item.load(instance, x)
            if not item.is_complete(meta):
                continue
            out[x] = item.resolve(meta)
        return out

    def dump_value(self, input: Dict[str, 'Item']) -> str:
        """Placeholder: the original glob pattern is not kept, so it cannot be written back."""
        # NOTE(review): a config holding a resolved glob value dumps this
        # marker text instead of something ``parse_value`` could read.
        return '<unimplemented>'

    def template_value(self, input: Dict[str, 'Item']) -> Dict[str, SimpleNamespace]:
        """Map each item's name to its ``template()`` namespace."""
        return {x.name: x.template() for x in input.values()}
@dataclass
class LitType(Generic[T], Type[T, T]):
    """A named literal type backed by one of the ``LitParser`` codecs."""

    # Type name as written in a spec, mapped to the codec implementing it.
    SUBTYPES = {
        'bool': Bool,
        'int': Int,
        'str': Str,
        'ipaddr': IPAddress,
        'ipnet': IPNetwork,
        'ipintf': IPInterface,
    }

    name: str
    subtype: TypeOf[LitParser[T]]

    @classmethod
    def parse(cls, input: str) -> 'LitType[T]':
        """Look ``input`` up in ``SUBTYPES``; ``ParseError`` for an unknown name."""
        if input in cls.SUBTYPES:
            return cls(name=input, subtype=cls.SUBTYPES[input])
        raise ParseError()

    def dump(self) -> str:
        """Return the type name."""
        return self.name

    def match_value(self, input: Any, instance: Instance) -> bool:
        """Defer to the codec's ``match``."""
        return self.subtype.match(input)

    def parse_value(self, input: str, instance: Instance) -> T:
        """Defer to the codec's ``parse``."""
        return self.subtype.parse(input)

    def template_value(self, input: T) -> T:
        """Literals are handed to templates unchanged."""
        return input

    def dump_value(self, input: T) -> str:
        """Defer to the codec's ``dump``."""
        return self.subtype.dump(input)
@dataclass
class ArrType(Generic[T, TemplateT], Type[List[T], List[TemplateT]]):
    """``[<subtype>]``: a list whose elements all have type ``subtype``."""

    subtype: Type[T, TemplateT]

    @classmethod
    def parse(cls, input: str) -> 'ArrType[T, TemplateT]':
        """Parse ``[<subtype>]``; ``ParseError`` unless wrapped in square brackets."""
        bracketed = input[0] == '[' and input[-1] == ']'
        if not bracketed:
            raise ParseError()
        return cls(subtype=Type.parse(input[1:-1]))

    def dump(self) -> str:
        """Return the subtype's textual form wrapped in square brackets."""
        return f'[{self.subtype.dump()}]'

    def match_value(self, input: Any, instance: Instance) -> bool:
        """Accept a ``list`` whose every element the subtype accepts."""
        if not isinstance(input, list):
            return False
        return all(self.subtype.match_value(element, instance) for element in input)

    def parse_value(self, input: str, instance: Instance) -> List[T]:
        """Parse ``[a, b, ...]``; commas inside an element must be backslash-escaped."""
        bracketed = input[0] == '[' and input[-1] == ']'
        if not bracketed:
            raise ParseError()
        inner = input[1:-1].strip()
        if not inner:
            return []
        parsed = []
        for chunk in quotesplit(inner, ','):
            parsed.append(self.subtype.parse_value(chunk.strip(), instance))
        return parsed

    def dump_value(self, input: List[T]) -> str:
        """Write the elements comma-separated, escaping commas inside each element."""
        rendered = [self.subtype.dump_value(element).replace(',', '\\,') for element in input]
        return '[' + ', '.join(rendered) + ']'

    def template_value(self, input: List[T]) -> List[TemplateT]:
        """Convert each element with the subtype's ``template_value``."""
        return [self.subtype.template_value(element) for element in input]
# Generic parameters for map values: parsed form and template form.
V = TypeVar('V')
TemplateV = TypeVar('TemplateV')


@dataclass
class MapType(Generic[T, TemplateT, V, TemplateV], Type[Dict[T, V], Dict[TemplateT, TemplateV]]):
    """``{<keytype>: <valtype>}``: a dict with uniformly typed keys and values."""

    keytype: Type[T, TemplateT]
    valtype: Type[V, TemplateV]

    @classmethod
    def parse(cls, input: str) -> 'MapType[T, TemplateT, V, TemplateV]':
        """Parse ``{<keytype>: <valtype>}``; ``ParseError`` unless wrapped in braces.

        The split happens at the first unescaped ``:``.
        """
        if input[0] != '{' or input[-1] != '}':
            raise ParseError()
        kt, vt = quotesplit(input[1:-1], ':', maxsplit=1)
        return cls(keytype=Type.parse(kt.strip()), valtype=Type.parse(vt.strip()))

    def dump(self) -> str:
        """Return ``{key: value}`` with colons in the key type escaped."""
        kt = self.keytype.dump().replace(':', '\\:')
        vt = self.valtype.dump()
        return f'{{{kt}: {vt}}}'

    def match_value(self, input: Any, instance: Instance) -> bool:
        """Accept a ``dict`` whose keys and values match the respective types."""
        return isinstance(input, dict) and all(
            self.keytype.match_value(k, instance) and self.valtype.match_value(v, instance)
            for (k, v) in input.items()
        )

    def parse_value(self, input: str, instance: Instance) -> Dict[T, V]:
        """Parse ``{k: v, ...}`` into a dict.

        Entries are separated by unescaped commas and each entry is split at
        its first unescaped colon.  ``{}`` (optionally with inner whitespace)
        yields an empty dict.  Raises ``ParseError`` if the braces are missing.
        """
        if input[0] != '{' or input[-1] != '}':
            raise ParseError()
        out = {}
        # Strip the braces before splitting; the stripped text has to be
        # rebound, otherwise the braces leak into the first key and last value.
        input = input[1:-1].strip()
        if input:
            for item in quotesplit(input, ','):
                k, v = quotesplit(item, ':', maxsplit=1)
                out[self.keytype.parse_value(k.strip(), instance)] = self.valtype.parse_value(v.strip(), instance)
        return out

    def dump_value(self, input: Dict[T, V]) -> str:
        """Write ``{k: v, ...}``, escaping colons in keys and commas in whole entries."""
        return '{' + ', '.join(
            (self.keytype.dump_value(k).replace(':', '\\:') + ': ' + self.valtype.dump_value(v)).replace(',', '\\,')
            for (k, v) in input.items()
        ) + '}'

    def template_value(self, input: Dict[T, V]) -> Dict[TemplateT, TemplateV]:
        """Convert keys and values with their types' ``template_value``."""
        return {self.keytype.template_value(k): self.valtype.template_value(v) for (k, v) in input.items()}
# Lets the alternate constructors and copy-like methods return the caller's own subclass.
C = TypeVar('C', bound='Config')


@dataclass
class Config:
    """A named set of entries, each with an optional type and an optional value.

    An entry's value lives in ``values`` once its type is known, and in
    ``unresolved`` (as raw text) while it is not.
    """

    name: str
    types: Dict[str, Type]
    values: Dict[str, Any]
    unresolved: Dict[str, str]
    instance: 'Instance'
    kind: O[str] = None

    @classmethod
    def parse(cls: TypeOf[C], instance: Instance, name: str, lines: Iterable[str]) -> C:
        """Build a config from ``name[: type][ = value]`` lines.

        Text after an unescaped ``#`` is dropped and empty lines are skipped.
        """
        config = cls(name=name, types={}, values={}, unresolved={}, instance=instance)
        for raw in lines:
            content = quotestripsplit(raw, '#', maxsplit=1)[0]
            if not content:
                continue
            head, *value_part = quotestripsplit(content, '=', maxsplit=1)
            vname, *type_part = quotestripsplit(head, ':', maxsplit=1)
            # Store the raw value first so that resolving the type parses it.
            if value_part:
                config.unresolved[vname] = value_part[0]
            if type_part:
                config.resolve_type(vname, Type.parse(type_part[0]))
        return config

    def dump(self) -> List[str]:
        """Serialise to lines in the format read by ``parse``.

        Raises ``ValueError`` for a resolved value that has no type.
        """
        ordered = dict.fromkeys(chain(self.types, self.values, self.unresolved))
        lines = []
        for name in ordered:
            text = name.replace(':', '\\:').replace('=', '\\=')
            if name in self.types:
                typestr = self.types[name].dump().replace('=', '\\=')
                text += f': {typestr}'
            if name in self.values:
                if name not in self.types:
                    raise ValueError()
                rendered = self.types[name].dump_value(self.values[name])
            elif name in self.unresolved:
                rendered = self.unresolved[name]
            else:
                rendered = None
            if rendered is not None:
                text += f' = {rendered}'
            lines.append(text.replace('#', '\\#'))
        return lines

    @classmethod
    def define(cls: TypeOf[C], _instance: Instance, _name: str, _kind: O[str] = None, **args: Type) -> C:
        """Create a config holding only types, given as keyword arguments."""
        return cls(name=_name, types=args, values={}, unresolved={}, instance=_instance, kind=_kind)

    @classmethod
    def make(cls: TypeOf[C], _instance: Instance, _name: str, _kind: O[str] = None, **args: Dict[str, Any]) -> C:
        """Create a config holding only values, given as keyword arguments."""
        return cls(name=_name, types={}, values=args, unresolved={}, instance=_instance, kind=_kind)

    def copy(self: C) -> C:
        """Return a copy whose three dicts are shallow copies of this config's."""
        return self.__class__(
            name=self.name,
            types=self.types.copy(),
            values=self.values.copy(),
            unresolved=self.unresolved.copy(),
            instance=self.instance,
            kind=self.kind,
        )

    def template(self) -> SimpleNamespace:
        """Expose every typed value, converted by its type, as namespace attributes."""
        converted = {
            name: self.types[name].template_value(value)
            for name, value in self.values.items()
            if name in self.types
        }
        return SimpleNamespace(**converted)

    def merge(self: C, other: 'Config') -> C:
        """Combine this config with ``other`` into a new one; this config wins conflicts.

        The three passes must run in this order: raw values first, then types
        (which parse the raw values), then already-resolved values.
        """
        merged = self.__class__(name=self.name, types={}, values={}, unresolved={}, instance=self.instance, kind=self.kind)
        # Raw values: ours overwrite theirs; names we already hold a value for are skipped.
        for source in (other, self):
            for name, raw in source.unresolved.items():
                if name not in self:
                    merged.unresolved[name] = raw
        # Types: theirs are applied first, ours afterwards.
        for source in (other, self):
            for name, vtype in source.types.items():
                merged.resolve_type(name, vtype)
        # Resolved values: the first one seen for a name is kept.
        for source in (self, other):
            for name, value in source.values.items():
                if name not in merged:
                    merged.values[name] = deepcopy(value)
        return merged

    def split(self: C, other: 'Config') -> C:
        """Return a copy with everything identical to ``other`` removed."""
        result = self.copy()
        for name, value in other.values.items():
            if name in result and result[name] == value:
                del result[name]
        for name, vtype in other.types.items():
            if result.is_type_resolved(name) and result.types[name] == vtype:
                result.unresolve_type(name)
        for name, raw in other.unresolved.items():
            if name in self.unresolved and self.unresolved[name] == raw:
                del result.unresolved[name]
        return result

    def purify(self: C) -> C:
        """Return a copy with all unresolved raw values dropped."""
        result = self.copy()
        result.unresolved = {}
        return result

    def is_type_resolved(self, name: str) -> bool:
        """Tell whether ``name`` has a type."""
        return name in self.types

    def is_value_resolved(self, name: str) -> bool:
        """Tell whether ``name`` has a parsed value."""
        return name in self.values

    def is_unresolved(self, name: str) -> bool:
        """Tell whether ``name`` has a raw, not yet parsed value."""
        return name in self.unresolved

    def resolve_type(self, name: str, type: Type) -> None:
        """Set the type of ``name`` and parse its pending raw value, if any."""
        self.types[name] = type
        if name not in self.unresolved:
            return
        self.values[name] = self.resolve_value(name, self.unresolved[name])
        del self.unresolved[name]

    def unresolve_type(self, name: str) -> None:
        """Drop the type of ``name``, turning its value back into raw text."""
        if name in self.values:
            self.unresolved[name] = self.unresolve_value(name, self.values[name])
            del self.values[name]
        del self.types[name]

    def resolve_value(self, name: str, value: str) -> Any:
        """Parse raw text with the type registered for ``name``."""
        return self.types[name].parse_value(value, self.instance)

    def unresolve_value(self, name: str, value: Any) -> str:
        """Render a value as text with the type registered for ``name``."""
        return self.types[name].dump_value(value)

    def is_type_complete(self, other: O['Config'] = None) -> bool:
        """Tell whether every value name of ``other`` (default: self) has a type here."""
        source = other if other else self
        return all(self.is_type_resolved(name) for name in source.values)

    def is_value_complete(self, other: O['Config'] = None) -> bool:
        """Tell whether every type name of ``other`` (default: self) has a value here."""
        source = other if other else self
        return all(self.is_value_resolved(name) for name in source.types)

    def __getitem__(self, name: str) -> Any:
        return self.values[name]

    def __setitem__(self, name: str, value: Any) -> None:
        """Assign a value after checking it against the entry's type (``TypeError`` on mismatch)."""
        vtype = self.types[name]
        if vtype.match_value(value, self.instance):
            self.values[name] = value
            return
        raise TypeError(f'{name}: {value} must match {vtype.dump()}')

    def __delitem__(self, name: str) -> None:
        del self.values[name]

    def __contains__(self, name: str) -> bool:
        return name in self.values

    def __iter__(self) -> Iterator[str]:
        return iter(self.values)

    def __repr__(self) -> str:
        """Multi-line listing of all entries; raw values are prefixed with ``?``."""
        ordered = dict.fromkeys(chain(self.types, self.values, self.unresolved))
        header = self.name
        if self.kind:
            header += f': {self.kind}'
        chunks = [header, ' {\n']
        for name in ordered:
            chunks.append(f' {name}')
            if name in self.types:
                chunks.append(f': {self.types[name].dump()}')
            if name in self.values:
                chunks.append(f' = {indent(str(self.values[name]), amount=2, skip=1)}')
            elif name in self.unresolved:
                chunks.append(f' = ?{indent(str(self.unresolved[name]), amount=2, skip=1)}')
            chunks.append('\n')
        chunks.append('}')
        return ''.join(chunks)
@dataclass(repr=False)
class Meta(Config):
    """A config stored under ``<conf_dir>/configs/<name>.conf``."""

    @classmethod
    def load(cls, instance: 'Instance', name: str) -> 'Meta':
        """Read and parse the meta file called ``name``."""
        path = f'{instance.conf_dir}/configs/{name}.conf'
        with open(path, 'r') as f:
            content = f.readlines()
            return cls.parse(instance, name, content)

    def save(self) -> None:
        """Write the dumped lines to the meta file, creating directories as needed."""
        path = f'{self.instance.conf_dir}/configs/{self.name}.conf'
        os.makedirs(os.path.dirname(path), exist_ok=True)
        text = '\n'.join(self.dump()) + '\n'
        with open(path, 'w') as f:
            f.write(text)

    @property
    def is_complete(self) -> bool:
        """Whether every value in this meta has a type."""
        return self.is_type_complete()

    def check_complete(self) -> None:
        """Raise ``ValueError`` unless ``is_complete`` holds."""
        if self.is_complete:
            return
        raise ValueError(f'Incomplete config for meta: {self}')

    def __eq__(self, other: Any) -> bool:
        """Metas compare equal by class and name only."""
        if not isinstance(other, self.__class__):
            return False
        return self.name == other.name
@dataclass(repr=False)
class Item(Config):
    """A config stored under ``<data_dir>/items/<name>.conf``, with a parallel trash directory."""

    @classmethod
    def get_path(cls, instance: Instance, name: str) -> str:
        """Return the path of the live file for ``name``."""
        return f'{instance.data_dir}/items/{name}.conf'

    @classmethod
    def get_trashed_path(cls, instance: Instance, name: str) -> str:
        """Return the path ``name`` has while in the trash."""
        return f'{instance.data_dir}/trash/{name}.conf'

    @classmethod
    def exists(cls, instance: Instance, name: str) -> bool:
        """Tell whether the live file for ``name`` exists."""
        path = cls.get_path(instance, name)
        return os.path.isfile(path)

    @classmethod
    def restorable(cls, instance: Instance, name: str) -> bool:
        """Tell whether a trashed file for ``name`` exists."""
        path = cls.get_trashed_path(instance, name)
        return os.path.isfile(path)

    @classmethod
    def filter(cls, instance: Instance, pattern: str) -> List[str]:
        """List the names of stored items (relative to the items directory) matching ``pattern``."""
        basedir = f'{instance.data_dir}/items/'
        suffix = '.conf'
        matches = []
        for root, _, files in os.walk(basedir):
            relroot = root[len(basedir):]
            for fname in files:
                if fname.endswith(suffix):
                    candidate = os.path.join(relroot, fname[:-len(suffix)])
                    if fnmatch(candidate, pattern):
                        matches.append(candidate)
        return matches

    @classmethod
    def load(cls, instance: Instance, name: str) -> 'Item':
        """Parse the live file for ``name``."""
        path = cls.get_path(instance, name)
        return cls.loadfile(instance, name, path)

    @classmethod
    def loadfile(cls, instance: Instance, name: str, path: str, ) -> 'Item':
        """Parse the file at ``path`` as the item ``name``."""
        with open(path, 'r') as f:
            return cls.parse(instance, name, f)

    def save(self) -> None:
        """Write this item to its live file."""
        target = self.get_path(self.instance, self.name)
        return self.savefile(target)

    def savefile(self, path: str) -> None:
        """Write the dumped lines to ``path``, creating directories as needed."""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        text = '\n'.join(self.dump()) + '\n'
        with open(path, 'w') as f:
            f.write(text)

    def trash(self) -> None:
        """Move the live file into the trash directory."""
        trash_path = self.get_trashed_path(self.instance, self.name)
        os.makedirs(os.path.dirname(trash_path), exist_ok=True)
        live_path = self.get_path(self.instance, self.name)
        os.rename(live_path, trash_path)

    @classmethod
    def restore(cls, instance: Instance, name: str) -> 'Item':
        """Move a trashed file back into place and load it."""
        path = cls.get_path(instance, name)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        trashed = cls.get_trashed_path(instance, name)
        os.rename(trashed, path)
        return cls.load(instance, name)

    def delete(self) -> None:
        """Remove the live file if it exists."""
        if not self.exists(self.instance, self.name):
            return
        os.remove(self.get_path(self.instance, self.name))

    def resolve(self, config: Config) -> 'Item':
        """Merge with ``config`` and record its name as this item's kind."""
        resolved = self.merge(config)
        resolved.kind = config.name
        return resolved

    def unresolve(self, config: Config) -> 'Item':
        """Split off what equals ``config`` and clear the kind."""
        stripped = self.split(config)
        stripped.kind = None
        return stripped

    def is_complete(self, config: Config) -> bool:
        """Tell whether, once resolved, every type of ``config`` has a value."""
        resolved = self.resolve(config)
        return resolved.is_value_complete(config)

    def check_complete(self, config: Config) -> None:
        """Raise ``ValueError`` unless ``is_complete(config)`` holds."""
        if self.is_complete(config):
            return
        raise ValueError(f'incomplete config {self.name} for {config.name}: {self}')

    def __eq__(self, other: Any) -> bool:
        """Items compare equal by class and name only."""
        if not isinstance(other, self.__class__):
            return False
        return self.name == other.name
# Matches every character that is not an ASCII letter or digit.
TO_IDENT_PATTERN = re.compile(r'[^a-zA-Z0-9]')


@dataclass
class Template:
    """A Jinja2 template stored under ``<conf_dir>/templates/<name>.tmpl``."""

    name: str
    source: str
    instance: Instance
    env: jinja2.Environment = field(init=False)
    template: jinja2.Template = field(init=False)

    def __post_init__(self) -> None:
        """Create the environment, register the ``to_ident`` filter and compile ``source``."""
        self.env = jinja2.Environment()
        self.env.filters['to_ident'] = self.filter_to_ident
        self.template = self.env.from_string(self.source)

    @staticmethod
    def filter_to_ident(value: str, sep='_') -> str:
        """Turn ``value`` into an identifier-like string.

        Every run of characters other than ASCII letters and digits becomes
        a single ``sep``, and a result starting with a digit is prefixed
        with ``sep``.  An empty result is returned as is.
        """
        value = TO_IDENT_PATTERN.sub(sep, value)
        if sep:
            # Collapse runs of the separator (skipped for an empty ``sep``,
            # which every string "contains" and would loop forever).
            dsep = sep + sep
            while dsep in value:
                value = value.replace(dsep, sep)
        if value and value[0].isdigit():
            value = sep + value
        return value

    @classmethod
    def load(cls, instance: Instance, name: str) -> 'Template':
        """Read the template file called ``name`` and build a ``Template`` from it."""
        with open(f'{instance.conf_dir}/templates/{name}.tmpl', 'r') as f:
            source = f.read()
        return cls(name=name, source=source, instance=instance)

    def save(self) -> None:
        """Write ``source`` to the template file, creating directories as needed."""
        path = f'{self.instance.conf_dir}/templates/{self.name}.tmpl'
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w') as f:
            f.write(self.source)

    def get_variables(self) -> Set[str]:
        """Return the names the template source uses without declaring them."""
        return jinja2.meta.find_undeclared_variables(jinja2.Environment().parse(self.source))

    def render(self, config: Config) -> str:
        """Render with the attributes of ``config.template()`` as template variables."""
        templated = config.template()
        return self.template.render(**templated.__dict__)
# Line prefixes introducing an item section in the export format.
EXPORT_ADD_MARKER = '#+++ '
EXPORT_DEL_MARKER = '#--- '


def export_items(objects: List[Item]) -> str:
    """Serialise items into the text format read by ``import_items``.

    An item whose file exists contributes an add marker followed by its
    dumped lines; one whose file does not exist contributes a delete marker.
    """
    lines = []
    for item in objects:
        if item.exists(item.instance, item.name):
            lines.append(EXPORT_ADD_MARKER + item.name)
            lines.extend(item.dump())
        else:
            lines.append(EXPORT_DEL_MARKER + item.name)
    return '\n'.join(lines)
def import_items(instance: Instance, spec: str, duplicates: bool = True, replacements: bool = True, deletions: bool = True) -> O[Tuple[Set[str], Set[str]]]:
    """Apply an export produced by ``export_items`` to the item store.

    The text is first parsed into an ordered list of operations, which are
    then applied one by one.  If any operation fails, items written so far
    are deleted again and items that existed beforehand are saved back.

    :param instance: instance whose items are modified.
    :param spec: export text made of add/delete marker lines and item bodies.
    :param duplicates: when false, a name that is added twice or deleted
        twice aborts the import before anything is changed.
    :param replacements: when false, adding a name that already exists
        aborts the import before anything is changed.
    :param deletions: when false, delete markers are skipped with a warning.
    :return: ``(added, deleted)`` name sets on success, where ``deleted``
        holds every pre-existing item that was removed or replaced;
        ``None`` if the import was refused or rolled back.
    """
    # Phase 1: turn the text into (name, body-lines | None-for-deletion) pairs.
    current: O[str] = None
    operations: List[Tuple[str, O[List[str]]]] = []
    seen_added: Set[str] = set()
    seen_deleted: Set[str] = set()
    for line in spec.splitlines():
        if line.startswith(EXPORT_DEL_MARKER):
            name = line[len(EXPORT_DEL_MARKER):].strip()
            # A deletion has no body, so it always closes the open section.
            current = None
            if not deletions:
                logger.warning(f'import: {name}: deletion found, but not allowed')
                continue
            if name in seen_deleted:
                logger.warning(f'import: {name}: duplicate found')
                if not duplicates:
                    return None
            seen_deleted.add(name)
            operations.append((name, None))
            continue
        if line.startswith(EXPORT_ADD_MARKER):
            current = line[len(EXPORT_ADD_MARKER):].strip()
            if Item.exists(instance, current) and not replacements:
                logger.warning(f'import: {current}: already exists')
                return None
            if current in seen_added:
                logger.warning(f'import: {current}: duplicate found')
                if not duplicates:
                    return None
            seen_added.add(current)
            operations.append((current, []))
            continue
        if not current:
            logger.warning(f'import: ignoring leading line: {line}')
            continue
        # ``current`` is only set by an add marker, so the last operation is
        # the section this line belongs to.
        item_spec = operations[-1][1]
        if item_spec is not None:
            item_spec.append(line)

    # Phase 2: apply, keeping what is needed to undo on failure.
    added: Dict[str, Item] = {}
    deleted: Dict[str, Item] = {}
    for name, contents in operations:
        # Back up an existing item the first time its name is touched.
        if name not in deleted and name not in added and Item.exists(instance, name):
            deleted[name] = Item.load(instance, name)
        try:
            if contents is None:
                # A name without a backup did not exist before the import;
                # the resulting KeyError is treated as a failed operation.
                deleted[name].delete()
            else:
                item = Item.parse(instance, name, contents)
                item.save()
                added[name] = item
        except Exception:
            logger.exception(f'import: {name}: failed, rolling back')
            break
    else:
        return set(added), set(deleted)

    # Rollback: remove what was written, then restore the backups.
    for name, item in added.items():
        item.delete()
    for name, item in deleted.items():
        item.save()
    return None
if __name__ == '__main__':
    # Command-line front end: new-meta, new, check, template.
    import sys
    import argparse

    def split_meta(name: str, instance: Instance) -> Tuple[str, O[Meta]]:
        """Split ``item:meta`` into the item name and the loaded meta (``None`` without a colon)."""
        if ':' not in name:
            return name, None
        name, metaname = name.split(':', maxsplit=1)
        return name, Meta.load(instance, metaname)

    def do_new_meta(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> None:
        """Parse the spec arguments as a meta and save it."""
        Meta.parse(instance, args.name, args.spec).save()

    def do_new_item(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> None:
        """Parse the spec arguments as an item, validate against the meta if given, and save."""
        name, meta = split_meta(args.name, instance)
        item = Item.parse(instance, name, args.spec)
        if meta:
            meta.check_complete()
            item.check_complete(meta)
        item.save()

    def do_check_item(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> int:
        """Return 0 if the stored item is complete for the given meta (or none was given), else 1."""
        name, meta = split_meta(args.name, instance)
        item = Item.load(instance, name)
        if not meta:
            return 0
        meta.check_complete()
        if item.is_complete(meta):
            return 0
        return 1

    def do_template(parser: argparse.ArgumentParser, args: argparse.Namespace, instance: Instance) -> None:
        """Render a stored template with variables taken from the spec arguments."""
        template = Template.load(instance, args.name)
        config = Config.parse(instance, '<vars>', args.spec)
        missing = template.get_variables() - set(config)
        if missing:
            parser.error(f'missing variables: {", ".join(missing)}')
        print(template.render(config))

    parser = argparse.ArgumentParser()
    parser.set_defaults(func=None)
    parser.add_argument('-d', '--base-dir', default='.', help='Base directory for configuration')
    commands = parser.add_subparsers(title='commands')

    new_meta = commands.add_parser('new-meta')
    new_meta.add_argument('name')
    new_meta.add_argument('spec', nargs='*')
    new_meta.set_defaults(func=do_new_meta)

    new_item = commands.add_parser('new')
    new_item.add_argument('name')
    new_item.add_argument('spec', nargs='*')
    new_item.set_defaults(func=do_new_item)

    check_item = commands.add_parser('check')
    check_item.add_argument('name')
    check_item.set_defaults(func=do_check_item)

    template_item = commands.add_parser('template')
    template_item.add_argument('name')
    template_item.add_argument('spec', nargs='*')
    template_item.set_defaults(func=do_template)

    args = parser.parse_args()
    if not args.func:
        parser.error('a subcommand must be provided')
    instance = Instance(args.base_dir)
    # The handler's return value (``None`` or an int) becomes the exit status.
    sys.exit(args.func(parser, args, instance))