226 lines
8.5 KiB
Python
226 lines
8.5 KiB
Python
import enum
|
|
import io
|
|
from typing import Optional as O, Generic as G, Union as U, TypeVar, Callable, Sequence, Mapping, Type as Ty
|
|
from ..core.base import Type, Context, PathElement
|
|
from ..core.io import Stream, Pos, PosInfo, to_stream
|
|
from ..core.meta import Wrapper
|
|
from ..core import to_type, dump
|
|
|
|
from .data import Data
|
|
|
|
|
|
# Generic placeholders shared by the wrapper types in this module.
T = TypeVar('T')  # value type of the wrapped child type
V = TypeVar('V')  # value type produced by a value transformation (see Proc)
|
|
|
|
class Default(G[T], Wrapper[T]):
    """Wrapper that answers ``default()`` with a fixed, pre-supplied value."""

    def __init__(self, child: Type[T], default: T) -> None:
        """Wrap *child* and remember *default* for later ``default()`` calls."""
        super().__init__(child)
        self._default = default

    def default(self, context: Context) -> T:
        """Return the stored default value; *context* is not consulted."""
        return self._default
|
|
|
|
|
|
# Byte-level hook used by Preproc: called as ``f(raw)``, or as
# ``f(raw, context)`` when the owning Preproc is context-aware.
Preprocessor = U[
    Callable[[bytes], bytes],
    Callable[[bytes, Context], bytes],
]
|
|
|
|
class Preproc(G[T], Wrapper[T]):
    """Wrapper that pushes raw data through a callback around its child.

    On parse, the raw data read from the stream is handed to the ``parse``
    callback and the child then parses the callback's output. On dump, the
    child's serialised output is handed to the ``dump`` callback before it is
    written to the stream.
    """

    def __init__(
        self,
        child: Type[T],
        parse: Preprocessor,
        dump: O[Preprocessor] = None,
        sizeof: O[U[Callable[[O[T]], Pos], Callable[[O[T], Context], Pos]]] = None,
        context: bool = False,
        bits: bool = False,
        str: O[str] = None,
        repr: O[str] = None,
    ) -> None:
        """Configure the preprocessing callbacks.

        :param child: type that parses/dumps the processed data.
        :param parse: callback applied to the raw data when parsing.
        :param dump: callback applied to the child's output when dumping;
            falls back to *parse* when omitted.
        :param sizeof: optional callback that reports the size; when it is
            absent or returns ``None`` the wrapped child is asked instead.
        :param context: when truthy, callbacks also receive the context.
        :param bits: when truthy, the size is multiplied by 8 and the stream
            is read/written in bit mode.
        :param str: fixed ``__str__`` result (falls back to *repr*).
        :param repr: fixed ``__repr__`` result (falls back to *str*).
        """
        super().__init__(child)
        self.on_parse = parse
        self.on_dump = dump or self.on_parse
        self.on_sizeof = sizeof
        self.context = context
        self.bits = bits
        self.on_str = str or repr
        self.on_repr = repr or str

    def parse(self, context: Context, stream: Stream) -> T:
        """Read the raw data, preprocess it and let the child parse the result.

        :raises ValueError: if no size can be determined for the data to read.
        """
        # sizeof() takes (context, start, value); the value is unknown while
        # parsing. NOTE(review): no start position is available at this point,
        # so None is passed for it -- confirm that is acceptable for PosInfo.
        size = self.sizeof(context, None, None)
        if size is None:
            raise ValueError(f'{self.__class__.__name__} needs a sized child')

        bits = context.get(self.bits)
        if bits:
            raw = stream.read(int(size * 8), bits=True)
        else:
            raw = stream.read(size)

        ctx = context.get(self.context)
        proc = self.on_parse(raw, context) if ctx else self.on_parse(raw)

        return super().parse(context, to_stream(proc))

    def dump(self, context: Context, stream: Stream, value: T) -> None:
        """Serialise *value* with the child, postprocess it and write it out.

        :raises ValueError: in bit mode, if no size can be determined.
        """
        raw_stream = io.BytesIO()
        # Delegate to the parent's dump(); ``on_dump`` is the user callback
        # stored on the instance, not a method of the parent class.
        super().dump(context, raw_stream, value)

        raw = raw_stream.getvalue()
        ctx = context.get(self.context)
        proc = self.on_dump(raw, context) if ctx else self.on_dump(raw)

        # NOTE(review): as in parse(), no start position is available here.
        size = self.sizeof(context, None, value)
        bits = context.get(self.bits)
        if bits:
            if size is None:
                # Mirror parse(): a bit-level write needs a known size.
                raise ValueError(f'{self.__class__.__name__} needs a sized child')
            stream.write(proc, bits=int(size * 8))
        else:
            stream.write(proc)

    def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[Pos]:
        """Return the size from the ``sizeof`` callback, else from the child.

        When *value* is given it is first serialised with the child type, and
        the resulting bytes are what the callback receives.
        """
        if value is not None:
            raw_stream = io.BytesIO()
            # ``dump`` here is the module-level helper, not the method above.
            dump(context.peek(self.child), value, raw_stream)
            value = raw_stream.getvalue()

        ctx = context.peek(self.context)
        # Start from "unknown" so the fallback below also works when no
        # callback was configured.
        size = None
        if self.on_sizeof:
            size = self.on_sizeof(value, context) if ctx else self.on_sizeof(value)
        if size is None:
            # NOTE(review): ``value`` holds the serialised bytes at this point
            # (unchanged from the previous behaviour) -- confirm the child's
            # sizeof() should receive those rather than the typed value.
            size = super().sizeof(context, start, value)
        return size

    def __str__(self) -> str:
        """Return the configured display string, or ``λ(<child>)``."""
        if self.on_str is not None:
            return self.on_str
        return f'λ({self.child})'

    def __repr__(self) -> str:
        """Return the configured repr string, or a constructor-style repr."""
        if self.on_repr is not None:
            return self.on_repr
        return f'{__name__}.Preproc({self.child!r}, parse={self.on_parse!r}, dump={self.on_dump!r}, sizeof={self.on_sizeof!r}, context={self.context!r}, bits={self.bits!r}, str={self.on_str!r}, repr={self.on_repr!r})'
|
|
|
|
class Proc(G[T, V], Type[V]):
    """Type that converts between a child's values and another representation.

    ``parse`` (child value -> exposed value) is applied after the child has
    parsed; ``dump`` (exposed value -> child value) is applied before the
    child dumps, sizes or locates a value.
    """

    def __init__(
        self,
        child: Type[T],
        parse: U[Callable[[T], V], Callable[[T, Context], V]],
        dump: U[Callable[[V], T], Callable[[V, Context], T]],
        context: bool = False,
        str: O[str] = None,
        repr: O[str] = None,
    ) -> None:
        """Store the child type and conversion callbacks.

        :param context: when truthy, callbacks also receive the context.
        :param str: fixed ``__str__`` result (falls back to *repr*).
        :param repr: fixed ``__repr__`` result (falls back to *str*).
        """
        self.child = child
        self.on_parse, self.on_dump = parse, dump
        self.context = context
        self.on_str, self.on_repr = str or repr, repr or str

    def parse(self, context: Context, stream: Stream) -> V:
        """Parse with the child, then convert the result with ``on_parse``."""
        inner = to_type(self.child)
        with context.enter(None, inner):
            parsed = context.parse(inner, stream)
        # NOTE(review): the callback is run after leaving the child context,
        # mirroring dump() where it runs before entering -- confirm.
        if self.context:
            return self.on_parse(parsed, context)
        return self.on_parse(parsed)

    def dump(self, context: Context, stream: Stream, value: V) -> None:
        """Convert *value* with ``on_dump``, then dump it with the child."""
        if self.context:
            converted = self.on_dump(value, context)
        else:
            converted = self.on_dump(value)
        inner = to_type(self.child)
        with context.enter(None, inner):
            context.dump(inner, stream, converted)

    def sizeof(self, context: Context, start: PosInfo, value: O[V]) -> O[int]:
        """Return the child's size for the converted *value* (``None`` is passed through)."""
        converted = value
        if converted is not None:
            if self.context:
                converted = self.on_dump(converted, context)
            else:
                converted = self.on_dump(converted)
        inner = to_type(self.child)
        with context.enter(None, inner):
            return context.sizeof(inner, start, converted)

    def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[V]) -> O[int]:
        """Return the child's offset of *path* for the converted *value*."""
        converted = value
        if converted is not None:
            if self.context:
                converted = self.on_dump(converted, context)
            else:
                converted = self.on_dump(converted)
        inner = to_type(self.child)
        with context.enter(None, inner):
            return context.offsetof(inner, start, path, converted)

    def default(self, context: Context) -> V:
        """Return the child's default value converted with ``on_parse``."""
        inner = to_type(self.child)
        with context.enter(None, inner):
            fallback = context.default(inner)
        # NOTE(review): conversion assumed to run outside the child context,
        # as in parse() -- confirm.
        if self.context:
            return self.on_parse(fallback, context)
        return self.on_parse(fallback)

    def __str__(self) -> str:
        """Return the configured display string, or ``λ(<child>)``."""
        return self.on_str if self.on_str is not None else f'λ({self.child})'

    def __repr__(self) -> str:
        """Return the configured repr string, or a constructor-style repr."""
        if self.on_repr is None:
            return f'{__name__}.Proc({self.child!r}, parse={self.on_parse!r}, dump={self.on_dump!r}, context={self.context!r}, str={self.on_str!r}, repr={self.on_repr!r})'
        return self.on_repr
|
|
|
|
|
|
class Mapped(G[T, V], Proc[T, V]):
    """Proc that translates child values through a lookup table.

    Parsing looks the child's value up in *mapping*; dumping looks the
    exposed value up in the inverted mapping. Unknown keys raise whatever
    the underlying ``__getitem__`` raises.
    """

    def __init__(self, child: Type[T], mapping: Mapping[T, V], str: O[str] = None, repr: O[str] = None) -> None:
        """Build the inverse table and register both lookups as callbacks."""
        # Inverted table for dumping; if several keys share a value, the key
        # iterated last wins.
        inverse = dict((mapped, raw) for raw, mapped in mapping.items())
        label = str or f'{mapping}[{child}]'
        description = repr or f'{__name__}.Mapped({child!r}, {mapping!r})'
        super().__init__(
            child,
            parse=mapping.__getitem__,
            dump=inverse.__getitem__,
            context=False,
            str=label,
            repr=description,
        )
|
|
|
|
|
|
# Placeholder for a concrete ``enum.Enum`` subclass.
E = TypeVar('E', bound=enum.Enum)
|
|
|
|
class Enum(G[T, E], Proc[T, E]):
    """Proc that exposes the child's values as members of an enum class.

    Parsing calls the enum class with the child's value; dumping hands the
    member's ``.value`` to the child.
    """

    def __init__(self, enum: Ty[E], child: Type[T]) -> None:
        """Register *enum* as the parse callback and ``.value`` as the dump one."""
        label = f'{enum.__name__}({child})'
        description = f'{__name__}.Enum({enum!r}, {child!r})'
        super().__init__(
            child,
            parse=enum,
            dump=lambda member: member.value,
            context=False,
            str=label,
            repr=description,
        )
|
|
|
|
|
|
class Check(G[T], Wrapper[T]):
    """Wrapper that validates values with a predicate on parse and dump."""

    def __init__(self, check: Callable[[T], bool], child: Type[T]) -> None:
        """Wrap *child* and store the predicate *check*."""
        super().__init__(child)
        self.check = check

    def parse(self, context: Context, stream: Stream) -> T:
        """Parse with the child and return the value if the predicate accepts it.

        :raises ValueError: if the predicate rejects the parsed value.
        """
        parsed = super().parse(context, stream)
        predicate = context.get(self.check)
        if predicate(parsed):
            return parsed
        raise ValueError(f'{parsed} failed check {predicate}')

    def dump(self, context: Context, stream: Stream, value: T) -> None:
        """Validate *value* with the predicate, then dump it with the child.

        :raises ValueError: if the predicate rejects *value*; nothing is
            dumped in that case.
        """
        predicate = context.get(self.check)
        accepted = predicate(value)
        if not accepted:
            raise ValueError(f'{value} failed check {predicate}')
        super().dump(context, stream, value)

    def __repr__(self) -> str:
        """Return a constructor-style repr wrapping the parent's repr."""
        parent_repr = super().__repr__()
        return f'{__name__}.Check({self.check!r}, {parent_repr})'
|
|
|
|
class Fixed(G[T], Wrapper[T]):
    """Wrapper that only accepts one expected value when parsing and dumping."""

    __slots__ = ('value',)

    def __init__(self, value: T, child: O[Type[T]] = None):
        """Store the expected *value* and the child type that encodes it.

        When *child* is omitted, *value* must be ``bytes`` and a ``Data``
        child of matching length is created.

        :raises TypeError: if *child* is omitted and *value* is not ``bytes``.
        """
        # Test for omission explicitly so a child object that happens to be
        # falsy is not mistaken for "no child given".
        if child is None:
            if not isinstance(value, bytes):
                raise TypeError('no child specified')
            child = Data(len(value))
        super().__init__(child)
        self.value = value

    def parse(self, context: Context, stream: Stream) -> T:
        """Parse with the child and return the value if it equals the expected one.

        :raises ValueError: if the parsed value differs from the expected one.
        """
        expected = context.get(self.value)
        value = super().parse(context, stream)
        if value != expected:
            raise ValueError(f'{value!r} != expected {expected!r}')
        return value

    def dump(self, context: Context, stream: Stream, value: O[T]) -> None:
        """Dump *value* with the child after checking it equals the expected one.

        :raises ValueError: if *value* differs from the expected value.
        """
        expected = context.get(self.value)
        if value != expected:
            raise ValueError(f'value {value!r} != expected {expected!r}')
        super().dump(context, stream, value)

    def default(self, context: Context) -> T:
        """Return the stored expected value."""
        return self.value

    def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[Pos]:
        """Return the child's size of *value*, or of the expected value if ``None``."""
        # Only substitute when no value was given; falsy values such as 0 or
        # b'' are legitimate and must be passed through unchanged.
        if value is None:
            value = context.peek(self.value)
        return super().sizeof(context, start, value)

    def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[T]) -> O[Pos]:
        """Return the child's offset of *path*, using the expected value if *value* is ``None``."""
        # Same substitution rule as sizeof(): only replace a missing value.
        if value is None:
            value = context.peek(self.value)
        return super().offsetof(context, start, path, value)

    def __str__(self) -> str:
        """Return ``str()`` of the expected value."""
        return str(self.value)

    def __repr__(self) -> str:
        """Return a constructor-style repr wrapping the parent's repr."""
        return f'{__name__}.Fixed({self.value!r}, {super().__repr__()})'
|