sx/sx/core/base.py

216 lines
6.6 KiB
Python

import os
from types import SimpleNamespace, FunctionType
from contextlib import contextmanager
from typing import (
Any, Callable, Generator, Iterable, List, Mapping, Dict, Sequence, Tuple,
TypeVar, Generic, Union as U, Optional as O, Generic as G, cast,
)
from .util import seeking
from .io import Segment, Stream, Pos
class Params:
__slots__ = ('segments', 'default_segment', 'user')
def __init__(self, segments: Sequence[Segment] = None, user: Mapping[str, Any] = {}):
default = segments[0] if segments else Segment('default')
self.segments = {s.name: s for s in (segments or [default, Segment('refs', [default])])}
self.default_segment = default
self.user = SimpleNamespace(**user)
def reset(self):
for s in self.segments.values():
s.reset()
PathElement = U[None, str, int]
PathEntry = Tuple[PathElement, 'Type']
def format_path(path: Iterable[PathElement]) -> str:
s = ''
first = True
for p in path:
if p is None:
continue
sep = '.'
if isinstance(p, int):
p = '[' + str(p) + ']'
sep = ''
if sep and not first:
s += sep
s += p
first = False
return s
T = TypeVar('T')
PT = TypeVar('PT')
class PossibleDynamic(Generic[T]):
pass
class Context:
__slots__ = ('root', 'value', 'params', 'path', 'segment_path')
def __init__(self, root: 'Type', value: O[Any] = None, params: O[Params] = None) -> None:
self.root = root
self.value = value
self.params = params or Params()
self.path: List[PathEntry] = []
self.segment_path: List[Segment] = []
def copy(self) -> 'Context':
c = self.__class__(root=self.root, value=self.value, params=self.params)
c.path = self.path.copy()
c.segment_path = self.segment_path.copy()
return c
@property
def segment(self) -> Segment:
return self.segment_path[-1] if self.segment_path else self.params.default_segment
@contextmanager
def enter(self, entry: PathElement, type: 'Type') -> Generator:
self.path.append((entry, type))
yield
self.path.pop()
@contextmanager
def enter_segment(self, segment: Segment, stream: O[Stream] = None, pos: O[Pos] = None, reference = os.SEEK_SET) -> Generator[O[Stream], None, None]:
if stream:
if pos is None:
if segment.offset is None:
segment.offset = self.segment_offset(segment)
segment.pos = segment.offset
pos = segment.pos
if pos is None:
raise Error(self, ValueError('could not enter segment {}: could not calculate offset'.format(segment)))
with seeking(stream.root, pos, reference) as s, stream.wrapped(s) as f:
self.segment_path.append(segment)
yield f
self.segment_path.pop()
segment.pos = f.tell()
else:
self.segment_path.append(segment)
yield stream
self.segment_path.pop()
def segment_offset(self, segment: Segment) -> O[Pos]:
size: Pos = 0
for s in segment.dependents:
sz = self.segment_size(s)
if sz is None:
return None
off = self.segment_offset(s)
if off is None:
return None
size += off + sz
return size
def segment_size(self, segment: Segment) -> O[Pos]:
sizes = self.sizeof(self.root, self.value)
return sizes.get(segment, None)
def format_path(self) -> str:
return format_path(name for name, _ in self.path)
def to_size(self, value: Any) -> Dict[Segment, Pos]:
if not isinstance(value, dict):
stream = self.segment_path[-1] if self.segment_path else self.params.default_segment
value = {stream: value}
return value
def get(self, value: U[T, PossibleDynamic[T]]) -> T:
from .expr import Expr, get
if isinstance(value, Expr):
value = get(value)
return cast(T, value)
def peek(self, value: U[T, PossibleDynamic[T]]) -> O[T]:
from .expr import Expr, peek
if isinstance(value, Expr):
value = peek(value)
return cast(T, value)
def put(self, value: U[T, PossibleDynamic[T]], new: T) -> None:
from .expr import Expr, put
if isinstance(value, Expr):
put(value, new)
def parse(self, type: 'Type[PT]', stream: Stream) -> PT:
return type.parse(self, stream)
def dump(self, type: 'Type[PT]', stream: Stream, value: PT) -> None:
return type.dump(self, stream, value)
def sizeof(self, type: 'Type[PT]', value: O[PT] = None) -> Dict[Segment, Pos]:
return self.to_size(type.sizeof(self, value))
def offsetof(self, type: 'Type[PT]', path: Sequence[PathElement], value: O[PT] = None) -> Dict[Segment, Pos]:
return self.to_size(type.offsetof(self, path, value))
def default(self, type: 'Type[PT]') -> PT:
return type.default(self)
class Type(G[PT]):
__slots__ = ()
def parse(self, context: Context, stream: Stream) -> PT:
raise NotImplementedError
def dump(self, context: Context, stream: Stream, value: PT) -> None:
raise NotImplementedError
def sizeof(self, context: Context, value: O[PT]) -> U[Mapping[str, int], O[int]]:
return None
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[PT]) -> O[int]:
if path:
return None
else:
return 0
def default(self, context: Context) -> PT:
raise NotImplementedError
class Error(Exception):
__slots__ = ('context',)
def __init__(self, context: Context, exception: Exception) -> None:
path = context.format_path()
if path:
path = '[' + path + '] '
if not isinstance(exception, Exception):
exception = ValueError(exception)
super().__init__('{}{}: {}'.format(
path, exception.__class__.__name__, str(exception),
))
self.exception = exception
self.context = context.copy()
PossibleType = U[Type, list, tuple, Callable[[O[Any]], Type]]
def to_type(type: PossibleType, ident: O[Any] = None) -> Type:
if isinstance(type, Type):
return type
t = getattr(type, '_sx_type_', None)
if t:
return t
getter = getattr(type, '_get_sx_type_', None)
if getter:
return getter(ident)
if isinstance(type, FunctionType):
return type(ident)
raise ValueError('Could not figure out specification from argument {}.'.format(type))