import os
from types import SimpleNamespace, FunctionType
from contextlib import contextmanager
from typing import (
    Any,
    Callable,
    Generator,
    Iterable,
    List,
    Mapping,
    Dict,
    Sequence,
    Tuple,
    TypeVar,
    Generic,
    Union as U,
    Optional as O,
    Generic as G,
    cast,
)

from .util import seeking
from .io import Segment, Stream, Pos


T = TypeVar('T')


class DebugTreeNode(Generic[T]):
    """One node of the debug tree recorded by ``Context.parse``.

    A node stores the type being parsed, the position at which parsing
    started (``pos``) and, once finalized, the parsed ``value`` and the
    position at which parsing stopped (``end``).  ``children`` is a list of
    ``(ident, node)`` tuples in the order they were added.
    """
    __slots__ = ('type', 'pos', 'end', 'value', 'children')

    def __init__(self, type: 'Type[T]', pos: O[Pos] = None) -> None:
        self.type = type
        self.pos = pos
        self.value = None
        self.end = None
        self.children = []

    def finalize(self, value: T, end: Pos) -> None:
        """Record the parse result and the end position for this node."""
        self.value = value
        self.end = end

    def add_child(self, ident, node: 'DebugTreeNode') -> None:
        """Append ``node`` as a child, labelled with ``ident``."""
        self.children.append((ident, node))


class Params:
    """State shared by every ``Context`` created for one operation.

    Attributes:
        segments: mapping of segment name to ``Segment``.
        default_segment: segment used when no segment has been entered.
        user: ``SimpleNamespace`` built from the ``user`` mapping.
        debug_path: stack of debug nodes currently being parsed, or ``None``
            when debugging is disabled.
        debug_root: root of the recorded debug tree, if any.
    """
    __slots__ = ('segments', 'default_segment', 'user', 'debug_path', 'debug_root')

    def __init__(self, segments: O[Sequence[Segment]] = None, user: O[Mapping[str, Any]] = None, debug: bool = False):
        """Set up segments, user parameters and optional debug tracking.

        When ``segments`` is empty or omitted, a ``'default'`` segment and a
        ``'refs'`` segment (constructed with the default segment as its
        second argument) are created.  ``user`` defaults to no entries.
        """
        default = segments[0] if segments else Segment('default')
        self.segments = {s.name: s for s in (segments or [default, Segment('refs', [default])])}
        self.default_segment = default
        # `user` defaults to None rather than a shared mutable `{}`.
        self.user = SimpleNamespace(**(user or {}))
        self.debug_path: O[List[DebugTreeNode]] = [] if debug else None
        self.debug_root: O[DebugTreeNode] = None

    def reset(self):
        """Clear the debug tree and call ``reset()`` on every segment.

        Debugging stays enabled (fresh empty ``debug_path``) if it was
        enabled before, and stays disabled otherwise.
        """
        self.debug_root = None
        self.debug_path = [] if self.debug_path is not None else None
        for s in self.segments.values():
            s.reset()


PathElement = U[None, str, int]
PathEntry = Tuple[PathElement, 'Type']


def format_path(path: Iterable[PathElement]) -> str:
    """Render path elements as a dotted / indexed string.

    ``None`` elements are skipped, integers are rendered as ``[n]`` with no
    separator before them, and every other element is preceded by ``.``
    unless it is the first element rendered.  For example
    ``['a', None, 0, 'b']`` becomes ``'a[0].b'``.
    """
    parts: List[str] = []
    for p in path:
        if p is None:
            continue
        if isinstance(p, int):
            parts.append('[' + str(p) + ']')
        else:
            # `parts` is non-empty exactly when an element was already rendered.
            if parts:
                parts.append('.')
            parts.append(p)
    return ''.join(parts)


PT = TypeVar('PT')


class PossibleDynamic(Generic[T]):
    """Typing marker for values that may be supplied as a dynamic expression."""
    pass


class Context:
    """Tracks the state of a single parse / dump / sizeof operation.

    Attributes:
        root: the type the operation was started with.
        value: the value associated with the operation, if any.
        params: the shared ``Params`` instance.
        path: stack of ``(element, type)`` entries describing where in the
            structure the operation currently is.
        segment_path: stack of segments entered via ``enter_segment``.
    """
    __slots__ = ('root', 'value', 'params', 'path', 'segment_path')

    def __init__(self, root: 'Type', value: O[Any] = None, params: O[Params] = None) -> None:
        """Create a fresh context; note that this resets ``params``."""
        self.root = root
        self.value = value
        self.params = params or Params()
        self.path: List[PathEntry] = []
        self.segment_path: List[Segment] = []
        self.params.reset()

    def copy(self) -> 'Context':
        """Return a snapshot sharing ``params`` but with its own path stacks.

        ``__init__`` is deliberately bypassed: it calls ``params.reset()``,
        and ``params`` is shared with the live context.  Going through it
        would wipe the debug tree and reset every segment each time a copy
        is taken, which includes every time an ``Error`` is constructed.
        """
        c = self.__class__.__new__(self.__class__)
        c.root = self.root
        c.value = self.value
        c.params = self.params
        c.path = self.path.copy()
        c.segment_path = self.segment_path.copy()
        return c

    @property
    def segment(self) -> Segment:
        """The innermost entered segment, or the default segment."""
        return self.segment_path[-1] if self.segment_path else self.params.default_segment

    @contextmanager
    def enter(self, entry: PathElement, type: 'Type') -> Generator:
        """Push ``(entry, type)`` onto the path for the duration of the block.

        NOTE(review): the entry is not popped when the body raises.
        Presumably this lets an outer handler still report the failing path
        when wrapping the exception in ``Error`` -- confirm before turning
        this into a try/finally.
        """
        self.path.append((entry, type))
        yield
        self.path.pop()

    @contextmanager
    def enter_segment(self, segment: Segment, stream: O[Stream] = None, pos: O[Pos] = None, reference = os.SEEK_SET) -> Generator[O[Stream], None, None]:
        """Make ``segment`` current for the duration of the block.

        With a stream, the stream's root is repositioned via ``seeking`` to
        ``pos`` (or, when ``pos`` is omitted, to the segment's tracked
        position, computing the segment offset first if it is unknown), a
        wrapped stream is yielded, and the segment's position is updated
        from that stream afterwards.  Without a stream, only the segment
        stack is updated and ``stream`` is yielded unchanged.

        Raises:
            Error: if no position was given and none could be calculated.
        """
        if stream:
            if pos is None:
                if segment.offset is None:
                    segment.offset = self.segment_offset(segment)
                    segment.pos = segment.offset
                pos = segment.pos
            if pos is None:
                raise Error(self, ValueError('could not enter segment {}: could not calculate offset'.format(segment)))

            with seeking(stream.root, pos, reference) as s, stream.wrapped(s) as f:
                self.segment_path.append(segment)
                yield f
                self.segment_path.pop()
                # Remember where this segment left off for the next entry.
                segment.pos = f.tell()
        else:
            self.segment_path.append(segment)
            yield stream
            self.segment_path.pop()

    def segment_offset(self, segment: Segment) -> O[Pos]:
        """Sum ``offset + size`` over ``segment.dependents``.

        Returns ``None`` as soon as the size or offset of any dependent
        cannot be determined; returns 0 when there are no dependents.
        """
        size: Pos = 0
        for s in segment.dependents:
            sz = self.segment_size(s)
            if sz is None:
                return None
            off = self.segment_offset(s)
            if off is None:
                return None
            size += off + sz
        return size

    def segment_size(self, segment: Segment) -> O[Pos]:
        """Size of ``segment`` per ``sizeof(root, value)``, or ``None`` if absent."""
        sizes = self.sizeof(self.root, self.value)
        return sizes.get(segment, None)

    def format_path(self) -> str:
        """Format the element names of the current path as a string."""
        return format_path(name for name, _ in self.path)

    def to_size(self, value: Any) -> Dict[Segment, Pos]:
        """Normalize a size to a per-segment dict.

        A dict is returned unchanged; any other value is attributed to the
        current segment.
        """
        if not isinstance(value, dict):
            value = {self.segment: value}
        return value

    def get(self, value: U[T, PossibleDynamic[T]]) -> T:
        """Resolve ``value`` through ``expr.get`` if it is an ``Expr``."""
        from .expr import Expr, get
        if isinstance(value, Expr):
            value = get(value)
        return cast(T, value)

    def peek(self, value: U[T, PossibleDynamic[T]]) -> O[T]:
        """Resolve ``value`` through ``expr.peek`` if it is an ``Expr``."""
        from .expr import Expr, peek
        if isinstance(value, Expr):
            value = peek(value)
        return cast(T, value)

    def put(self, value: U[T, PossibleDynamic[T]], new: T) -> None:
        """Pass ``new`` to ``expr.put`` if ``value`` is an ``Expr``; else do nothing."""
        from .expr import Expr, put
        if isinstance(value, Expr):
            put(value, new)

    def parse(self, type: 'Type[PT]', stream: Stream) -> PT:
        """Parse ``type`` from ``stream``, recording a debug node if enabled.

        When debugging is enabled, a ``DebugTreeNode`` is created at the
        stream root's current position, attached to the node currently being
        parsed (or installed as the debug root), and finalized with the
        result and end position even if parsing raises.
        """
        # Keep a reference to the exact list we push onto, so that the pop in
        # `finally` always pairs with our own append.
        debug_path = self.params.debug_path
        if debug_path is not None:
            node = DebugTreeNode(type, stream.root.tell())
            if debug_path:
                # The path may be empty if parse() is called outside enter().
                ident = self.path[-1][0] if self.path else None
                debug_path[-1].add_child(ident, node)
            else:
                self.params.debug_root = node
            debug_path.append(node)

        value = None
        try:
            value = type.parse(self, stream)
            return value
        finally:
            if debug_path is not None:
                node.finalize(value, stream.root.tell())
                debug_path.pop()

    def dump(self, type: 'Type[PT]', stream: Stream, value: PT) -> None:
        """Delegate to ``type.dump`` with this context."""
        return type.dump(self, stream, value)

    def sizeof(self, type: 'Type[PT]', value: O[PT] = None) -> Dict[Segment, Pos]:
        """Return ``type.sizeof`` normalized to a per-segment dict."""
        return self.to_size(type.sizeof(self, value))

    def offsetof(self, type: 'Type[PT]', path: Sequence[PathElement], value: O[PT] = None) -> Dict[Segment, Pos]:
        """Return ``type.offsetof`` normalized to a per-segment dict."""
        return self.to_size(type.offsetof(self, path, value))

    def default(self, type: 'Type[PT]') -> PT:
        """Delegate to ``type.default`` with this context."""
        return type.default(self)


class Type(G[PT]):
    """Base class for types; subclasses override the operations they support."""
    __slots__ = ()

    def parse(self, context: Context, stream: Stream) -> PT:
        """Parse a value from ``stream``; not implemented in the base class."""
        raise NotImplementedError

    def dump(self, context: Context, stream: Stream, value: PT) -> None:
        """Write ``value`` to ``stream``; not implemented in the base class."""
        raise NotImplementedError

    def sizeof(self, context: Context, value: O[PT]) -> U[Mapping[str, int], O[int]]:
        """Return the size of ``value``; the base class returns ``None``."""
        return None

    def offsetof(self, context: Context, path: Sequence[PathElement], value: O[PT]) -> O[int]:
        """Return the offset of ``path`` within this type.

        The base class returns 0 for an empty path and ``None`` otherwise.
        """
        if path:
            return None
        else:
            return 0

    def default(self, context: Context) -> PT:
        """Return a default value; not implemented in the base class."""
        raise NotImplementedError


class Error(Exception):
    """Exception carrying the wrapped exception and a context snapshot.

    The message has the form ``'[path] ExceptionClass: message'``; the
    bracketed path prefix is omitted when the formatted path is empty.

    Attributes:
        exception: the wrapped exception.
        context: a copy of the context taken when the error was created.
    """
    __slots__ = ('context', 'exception')

    def __init__(self, context: Context, exception: Exception) -> None:
        path = context.format_path()
        if path:
            path = '[' + path + '] '

        # Anything that is not already an exception is wrapped in ValueError.
        if not isinstance(exception, Exception):
            exception = ValueError(exception)

        super().__init__('{}{}: {}'.format(
            path, exception.__class__.__name__, str(exception),
        ))
        self.exception = exception
        self.context = context.copy()


PossibleType = U[Type, list, tuple, Callable[[O[Any]], Type]]


def to_type(type: PossibleType, ident: O[Any] = None) -> Type:
    """Resolve ``type`` to a ``Type`` instance.

    Tried in order: a ``Type`` instance is returned as-is; a truthy
    ``_sx_type_`` attribute is returned; a truthy ``_get_sx_type_``
    attribute is called with ``ident``; a plain function is called with
    ``ident``.

    Raises:
        ValueError: if none of the above applies.
    """
    if isinstance(type, Type):
        return type

    t = getattr(type, '_sx_type_', None)
    if t:
        return t

    getter = getattr(type, '_get_sx_type_', None)
    if getter:
        return getter(ident)

    if isinstance(type, FunctionType):
        return type(ident)

    raise ValueError('Could not figure out specification from argument {}.'.format(type))