Compare commits
31 Commits
4400209545
...
d09fa3146f
Author | SHA1 | Date |
---|---|---|
Shiz | d09fa3146f | |
Shiz | 04fe78c611 | |
Shiz | 398fe27e77 | |
Shiz | b91a01957c | |
Shiz | e96837d429 | |
Shiz | fb78843d6b | |
Shiz | c3a4715141 | |
Shiz | 5afa169647 | |
Shiz | 23bc195d6a | |
Shiz | 4f76bea4b2 | |
Shiz | 3042be6e02 | |
Shiz | 75a4e879a5 | |
Shiz | 9a613c9070 | |
Shiz | f886d9a769 | |
Shiz | 1ce5fac5fc | |
Shiz | f9d9e3497b | |
Shiz | ff4830c25b | |
Shiz | 7a3d57db51 | |
Shiz | b4ce3eb788 | |
Shiz | 05dd3a31e5 | |
Shiz | eb53863305 | |
Shiz | f671b70854 | |
Shiz | c602ad8120 | |
Shiz | 71732b9743 | |
Shiz | 81f338a13f | |
Shiz | 0a45c440c9 | |
Shiz | d1406959eb | |
Shiz | 21641e0b1f | |
Shiz | 8bf9b77505 | |
Shiz | ea61dbf1e3 | |
Shiz | e71cacb8f7 |
|
@ -0,0 +1,76 @@
|
|||
from __future__ import annotations
|
||||
import enum
|
||||
from sx import Generic, Struct, parse
|
||||
|
||||
|
||||
_Chunk = Generic('Chunk')
|
||||
|
||||
class FormatCompression(enum.Enum):
|
||||
Unknown = 0
|
||||
PCM = 1
|
||||
MSADPCM = 2
|
||||
ALaw = 6
|
||||
MuLaw = 7
|
||||
IMAADPCM = 17
|
||||
G723ADPCM = 20
|
||||
GSM610 = 49
|
||||
G721ADPCM = 64
|
||||
MPEG = 80
|
||||
Experimental = 0xFFFF
|
||||
|
||||
class FormatChunk(Struct, partial=True):
|
||||
compression: Enum(FormatCompression, uint16le)
|
||||
channel_count: uint16le
|
||||
sample_rate: uint32le
|
||||
bytes_per_sec: uint32le
|
||||
alignment: uint16le
|
||||
bits_per_sample: uint16le
|
||||
extra_length: uint16le
|
||||
extra: Data(self.extra_length)
|
||||
|
||||
class FactChunk(Struct):
|
||||
sample_count: uint32le
|
||||
|
||||
class SampleLoop(Struct):
|
||||
id: uint32le
|
||||
type: uint32le
|
||||
start: uint32le
|
||||
end: uint32le
|
||||
fraction: uint32le
|
||||
count: uint32le
|
||||
|
||||
class SampleChunk(Struct):
|
||||
manufacturer: uint32le
|
||||
product: uint32le
|
||||
period: uint32le
|
||||
unity_note: uint32le
|
||||
pitch_fraction: uint32le
|
||||
smtpe_format: uint32le
|
||||
smpte_offset: uint32le
|
||||
sample_loop_count: uint32le
|
||||
padding_length: uint32le
|
||||
sample_loops: Arr(SampleLoop, count=self.sample_loop_count)
|
||||
|
||||
class DataChunk(Struct):
|
||||
data: data
|
||||
|
||||
class RIFFChunk(Struct):
|
||||
type: Fixed(b'WAVE')
|
||||
children: Arr(_Chunk)
|
||||
|
||||
class Chunk(Struct):
|
||||
type: Data(4)
|
||||
length: uint32le
|
||||
contents: Sized(Switch({
|
||||
b'fmt ': FormatChunk,
|
||||
b'fact': FactChunk,
|
||||
b'smpl': SampleChunk,
|
||||
b'data': DataChunk,
|
||||
b'RIFF': RIFFChunk,
|
||||
}, fallback=DataChunk, selector=self.type), self.length)
|
||||
|
||||
_Chunk.push(Chunk)
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
print(parse(Chunk, open(sys.argv[1], 'rb')))
|
|
@ -0,0 +1,12 @@
|
|||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name='sx',
|
||||
version='0.1.0',
|
||||
author='Shiz <hi@shiz.me>',
|
||||
url='https://weeaboo.software/Shiz/sx',
|
||||
description='declarative binary data parsing and dumping library',
|
||||
|
||||
packages=['sx', 'sx.core', 'sx.types'],
|
||||
zip_safe=True,
|
||||
)
|
|
@ -1,26 +1,35 @@
|
|||
from .core import parse, dump, sizeof, offsetof, default, context
|
||||
from .core.base import Params, Context, Type, to_type, Error
|
||||
from .core.base import Params, Context, Type, to_type, Error
|
||||
from .core.io import Stream, Segment, to_stream, Endian, BitAlignment
|
||||
from .core.expr import BaseExpr, Expr, const, infer
|
||||
from .core.expr import BaseExpr, Expr, const, infer, bool_, not_, and_, or_, in_, len_, int_, float_
|
||||
from .core.meta import Wrapper, Generic
|
||||
del core
|
||||
|
||||
from .types.data import Nothing, Static, Ignored, Data, data, Bits, bit, nibble
|
||||
from .types.data import Nothing, Implied, Ignored, Pad, Data, data, Bits, bit, nibble
|
||||
from .types.num import (
|
||||
Bool, Int, Float, bool, int8, uint8, byte,
|
||||
Bool, Int, Float, bool8, int8, uint8, byte,
|
||||
int16, uint16, int16be, int16le, uint16be, uint16le, word,
|
||||
int32, uint32, int32be, int32le, uint32be, uint32le, dword,
|
||||
int64, uint64, int64be, int64le, uint64be, uint64le, qword,
|
||||
intbe, intle, uintbe, uintle,
|
||||
float16, float16le, float16be, binary16, binary16le, binary16be, half,
|
||||
float32, float32le, float32be, binary32, binary32le, binary32be, float_,
|
||||
float32, float32le, float32be, binary32, binary32le, binary32be,
|
||||
float64, float64le, float64be, binary64, binary64le, binary64be, double,
|
||||
)
|
||||
from .types.str import Str, StrType
|
||||
from .types.str import Str, CStr, cstr, wcstr, utf8cstr, utf16cstr
|
||||
from .types.seq import Arr, Tuple
|
||||
from .types.struct import StructType, Struct
|
||||
from .types.transforms import Default, Transform, Mapped, Enum, Check, Fixed
|
||||
from .types.struct import StructType, Struct, Union
|
||||
from .types.transforms import Default, Preproc, Proc, Mapped, Enum, Check, Fixed
|
||||
from .types.control import Switch, If
|
||||
from .types.io import Sized, Ref, AlignTo, AlignedTo
|
||||
from .types.io import Sized, Terminated, Ref, AlignTo, AlignedTo, Lazy
|
||||
del types
|
||||
|
||||
# Compatibility
|
||||
Static = Calc = Implied
|
||||
Transform = Proc
|
||||
|
||||
__all__ = [k for k in globals() if not k.startswith('_')]
|
||||
|
||||
# Extra types that shouldn't pollute the namespace they're mass-imported into
|
||||
bool = bool8
|
||||
float = float32
|
||||
|
|
|
@ -7,7 +7,7 @@ from .io import Stream, PossibleStream, Segment, ceil_sizes, to_stream
|
|||
from .expr import ProxyExpr
|
||||
|
||||
|
||||
context = ProxyExpr('<context>')
|
||||
context = ProxyExpr('context')
|
||||
|
||||
@contextmanager
|
||||
def resolving(expr: ProxyExpr, value: Any) -> Generator[ProxyExpr, None, None]:
|
||||
|
|
|
@ -11,27 +11,53 @@ from .util import seeking
|
|||
from .io import Segment, Stream, Pos
|
||||
|
||||
|
||||
class Params:
|
||||
__slots__ = ('segments', 'default_segment', 'user')
|
||||
T = TypeVar('T')
|
||||
|
||||
def __init__(self, segments: Sequence[Segment] = None, user: Mapping[str, Any] = {}):
|
||||
class DebugTreeNode(Generic[T]):
|
||||
__slots__ = ('type', 'pos', 'end', 'value', 'children')
|
||||
|
||||
def __init__(self, type: 'Type[T]', pos: Pos = None) -> None:
|
||||
self.type = type
|
||||
self.pos = pos
|
||||
self.value = None
|
||||
self.end = None
|
||||
self.children = []
|
||||
|
||||
def finalize(self, value: T, end: Pos) -> None:
|
||||
self.value = value
|
||||
self.end = end
|
||||
|
||||
def add_child(self, ident, node: 'DebugTreeNode') -> None:
|
||||
self.children.append((ident, node))
|
||||
|
||||
|
||||
class Params:
|
||||
__slots__ = ('segments', 'default_segment', 'user', 'debug_path', 'debug_root')
|
||||
|
||||
def __init__(self, segments: Sequence[Segment] = None, user: Mapping[str, Any] = {}, debug: bool = False):
|
||||
default = segments[0] if segments else Segment('default')
|
||||
self.segments = {s.name: s for s in (segments or [default, Segment('refs', [default])])}
|
||||
self.default_segment = default
|
||||
self.user = SimpleNamespace(**user)
|
||||
self.debug_path: O[List[DebugTreeNode]] = [] if debug else None
|
||||
self.debug_root: O[DebugTreeNode] = None
|
||||
|
||||
def reset(self):
|
||||
self.debug_root = None
|
||||
self.debug_path = [] if self.debug_path is not None else None
|
||||
for s in self.segments.values():
|
||||
s.reset()
|
||||
|
||||
|
||||
PathElement = U[str, int]
|
||||
PathElement = U[None, str, int]
|
||||
PathEntry = Tuple[PathElement, 'Type']
|
||||
|
||||
def format_path(path: Iterable[PathElement]) -> str:
|
||||
s = ''
|
||||
first = True
|
||||
for p in path:
|
||||
if p is None:
|
||||
continue
|
||||
sep = '.'
|
||||
if isinstance(p, int):
|
||||
p = '[' + str(p) + ']'
|
||||
|
@ -43,10 +69,8 @@ def format_path(path: Iterable[PathElement]) -> str:
|
|||
return s
|
||||
|
||||
|
||||
T = TypeVar('T')
|
||||
PT = TypeVar('PT')
|
||||
|
||||
|
||||
class PossibleDynamic(Generic[T]):
|
||||
pass
|
||||
|
||||
|
@ -60,6 +84,8 @@ class Context:
|
|||
self.path: List[PathEntry] = []
|
||||
self.segment_path: List[Segment] = []
|
||||
|
||||
self.params.reset()
|
||||
|
||||
def copy(self) -> 'Context':
|
||||
c = self.__class__(root=self.root, value=self.value, params=self.params)
|
||||
c.path = self.path.copy()
|
||||
|
@ -89,13 +115,17 @@ class Context:
|
|||
raise Error(self, ValueError('could not enter segment {}: could not calculate offset'.format(segment)))
|
||||
with seeking(stream.root, pos, reference) as s, stream.wrapped(s) as f:
|
||||
self.segment_path.append(segment)
|
||||
yield f
|
||||
self.segment_path.pop()
|
||||
segment.pos = f.tell()
|
||||
try:
|
||||
yield f
|
||||
segment.pos = f.tell()
|
||||
finally:
|
||||
self.segment_path.pop()
|
||||
else:
|
||||
self.segment_path.append(segment)
|
||||
yield stream
|
||||
self.segment_path.pop()
|
||||
try:
|
||||
yield stream
|
||||
finally:
|
||||
self.segment_path.pop()
|
||||
|
||||
def segment_offset(self, segment: Segment) -> O[Pos]:
|
||||
size: Pos = 0
|
||||
|
@ -110,7 +140,7 @@ class Context:
|
|||
return size
|
||||
|
||||
def segment_size(self, segment: Segment) -> O[Pos]:
|
||||
sizes = self.sizeof(self.root, self.value)
|
||||
sizes = Context(self.root, self.value, params=self.params, reset=False).sizeof(self.root, self.value)
|
||||
return sizes.get(segment, None)
|
||||
|
||||
def format_path(self) -> str:
|
||||
|
@ -125,24 +155,38 @@ class Context:
|
|||
|
||||
def get(self, value: U[T, PossibleDynamic[T]]) -> T:
|
||||
from .expr import Expr, get
|
||||
if isinstance(value, Expr):
|
||||
if isinstance(value, (Expr, FunctionType, tuple)):
|
||||
value = get(value)
|
||||
return cast(T, value)
|
||||
|
||||
def peek(self, value: U[T, PossibleDynamic[T]]) -> O[T]:
|
||||
from .expr import Expr, peek
|
||||
if isinstance(value, Expr):
|
||||
if isinstance(value, (Expr, FunctionType, tuple)):
|
||||
value = peek(value)
|
||||
return cast(T, value)
|
||||
|
||||
def put(self, value: U[T, PossibleDynamic[T]], new: T) -> None:
|
||||
from .expr import Expr, put
|
||||
if isinstance(value, Expr):
|
||||
if isinstance(value, (Expr, FunctionType, tuple)):
|
||||
put(value, new)
|
||||
|
||||
|
||||
def parse(self, type: 'Type[PT]', stream: Stream) -> PT:
|
||||
return type.parse(self, stream)
|
||||
if self.params.debug_path is not None:
|
||||
node = DebugTreeNode(type, stream.root.tell())
|
||||
if self.params.debug_path:
|
||||
self.params.debug_path[-1].add_child(self.path[-1][0], node)
|
||||
else:
|
||||
self.params.debug_root = node
|
||||
self.params.debug_path.append(node)
|
||||
value = None
|
||||
try:
|
||||
value = type.parse(self, stream)
|
||||
return value
|
||||
finally:
|
||||
if self.params.debug_path is not None:
|
||||
node.finalize(value, stream.root.tell())
|
||||
self.params.debug_path.pop()
|
||||
|
||||
def dump(self, type: 'Type[PT]', stream: Stream, value: PT) -> None:
|
||||
return type.dump(self, stream, value)
|
||||
|
|
150
sx/core/expr.py
150
sx/core/expr.py
|
@ -1,6 +1,7 @@
|
|||
import math
|
||||
import operator
|
||||
import functools
|
||||
import types
|
||||
from typing import Any, Optional as O, Union as U, Sequence, Mapping, Callable, Generic as G, TypeVar, List
|
||||
|
||||
|
||||
|
@ -51,6 +52,8 @@ reverse = {
|
|||
}
|
||||
|
||||
T = TypeVar('T')
|
||||
V = TypeVar('V')
|
||||
X = TypeVar('X')
|
||||
|
||||
|
||||
class BaseExpr(G[T]):
|
||||
|
@ -66,6 +69,14 @@ class BaseExpr(G[T]):
|
|||
def _sx_is_const_(self) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
def raise_type_error(self, *y, op=None, alternatives=[]):
|
||||
if not alternatives:
|
||||
alternatives = [op + '_']
|
||||
alternatives = [x + (f'({self}, {y[0]})' if y else f'({self})') for x in alternatives]
|
||||
msg = f'bad operator __{op}__ called on Expr, try {" / ".join(alternatives)}; or use lambdas for calling standard functions!'
|
||||
raise TypeError(msg)
|
||||
|
||||
class Expr(G[T], BaseExpr[T]):
|
||||
def __getattr__(self, name: str) -> 'AttrExpr':
|
||||
return AttrExpr(self, name)
|
||||
|
@ -76,30 +87,31 @@ class Expr(G[T], BaseExpr[T]):
|
|||
def __call__(self, *args: Any, **kwargs: Any) -> 'CallExpr':
|
||||
return CallExpr(self, args, kwargs)
|
||||
|
||||
for x, y in {'bool': ['bool_', 'not_', 'and_', 'or_', 'in_'], 'contains': ['in_'], 'int': [], 'float': [], 'len': []}.items():
|
||||
locals()['__' + x + '__'] = functools.partialmethod(raise_type_error, op=x, alternatives=y)
|
||||
for x in ('lt', 'le', 'eq', 'ne', 'ge', 'gt'):
|
||||
locals()['__' + x.strip('_') + '__'] = functools.partialmethod(lambda self, x, other: CompExpr(getattr(operator, x), self, other), x)
|
||||
for x in ('not_', 'truth', 'abs', 'index', 'inv', 'neg', 'pos'):
|
||||
for x in ('abs', 'invert', 'neg', 'pos'):
|
||||
locals()['__' + x.strip('_') + '__'] = functools.partialmethod(lambda self, x: UnaryExpr(getattr(operator, x), self), x)
|
||||
for x in (
|
||||
'add', 'and_', 'floordiv', 'lshift', 'mod', 'mul', 'matmul', 'or_', 'pow', 'rshift', 'sub', 'truediv', 'xor',
|
||||
'concat', 'contains',
|
||||
):
|
||||
locals()[ '__' + x.strip('_') + '__'] = functools.partialmethod(lambda self, x, other: BinExpr(getattr(operator, x), self, other), x)
|
||||
locals()['__r' + x.strip('_') + '__'] = functools.partialmethod(lambda self, x, other: BinExpr(getattr(operator, x), other, self), x)
|
||||
del x
|
||||
del x, y
|
||||
|
||||
class AttrExpr(G[T], Expr[T]):
|
||||
def __init__(self, parent: BaseExpr, attr: str) -> None:
|
||||
class AttrExpr(G[T, V], Expr[V]):
|
||||
def __init__(self, parent: BaseExpr[T], attr: str) -> None:
|
||||
self.__parent = parent
|
||||
self.__attr = attr
|
||||
|
||||
def _sx_get_(self, pop: bool = True) -> T:
|
||||
def _sx_get_(self, pop: bool = True) -> V:
|
||||
return getattr(get(self.__parent, pop=pop), get(self.__attr, pop=pop))
|
||||
|
||||
def _sx_peek_(self, pop: bool = True) -> T:
|
||||
def _sx_peek_(self, pop: bool = True) -> V:
|
||||
return getattr(peek(self.__parent, pop=pop), peek(self.__attr, pop=pop))
|
||||
|
||||
def _sx_put_(self, value: T, pop: bool = True) -> None:
|
||||
def _sx_put_(self, value: V, pop: bool = True) -> None:
|
||||
parent = get(self.__parent, pop=False)
|
||||
setattr(parent, get(self.__attr, pop=pop), value)
|
||||
put(self.__parent, parent, pop=pop)
|
||||
|
@ -113,44 +125,60 @@ class AttrExpr(G[T], Expr[T]):
|
|||
def __repr__(self) -> str:
|
||||
return f'{self.__parent!r}.{self.__attr}'
|
||||
|
||||
class ItemExpr(G[T], Expr[T]):
|
||||
def __init__(self, parent: BaseExpr, item: Any) -> None:
|
||||
class ItemExpr(G[T, V], Expr[V]):
|
||||
def __init__(self, parent: BaseExpr[T], item: Any) -> None:
|
||||
self.__parent = parent
|
||||
self.__item = item
|
||||
|
||||
def _sx_get_(self, pop: bool = True) -> T:
|
||||
return get(self.__parent, pop=pop)[get(self.__item, pop=pop)]
|
||||
def _sx_get_(self, pop: bool = True) -> V:
|
||||
if isinstance(self.__item, slice):
|
||||
item = slice(get(self.__item.start, pop=pop), get(self.__item.stop, pop=pop), get(self.__item.step, pop=pop))
|
||||
else:
|
||||
item = get(self.__item, pop=pop)
|
||||
return get(self.__parent, pop=pop)[item]
|
||||
|
||||
def _sx_peek_(self, pop: bool = True) -> T:
|
||||
return peek(self.__parent, pop=pop)[peek(self.__item, pop=pop)]
|
||||
def _sx_peek_(self, pop: bool = True) -> V:
|
||||
if isinstance(self.__item, slice):
|
||||
item = slice(peek(self.__item.start, pop=pop), peek(self.__item.stop, pop=pop), peek(self.__item.step, pop=pop))
|
||||
else:
|
||||
item = peek(self.__item, pop=pop)
|
||||
return peek(self.__parent, pop=pop)[item]
|
||||
|
||||
def _sx_put_(self, value: T, pop: bool = True) -> None:
|
||||
def _sx_put_(self, value: V, pop: bool = True) -> None:
|
||||
parent = get(self.__parent, pop=False)
|
||||
parent[get(self.__item, pop=pop)] = value
|
||||
if isinstance(self.__item, slice):
|
||||
item = slice(get(self.__item.start, pop=pop), get(self.__item.stop, pop=pop), get(self.__item.step, pop=pop))
|
||||
else:
|
||||
item = get(self.__item, pop=pop)
|
||||
parent[item] = value
|
||||
put(self.__parent, parent, pop=pop)
|
||||
|
||||
def _sx_is_const_(self) -> bool:
|
||||
return is_const(self.__parent) and is_const(self.__item)
|
||||
if isinstance(self.__item, slice):
|
||||
item_const = is_const(self.__item.start) and is_const(self.__item.stop) and is_const(self.__item.step)
|
||||
else:
|
||||
item_const = is_const(self.__item)
|
||||
return is_const(self.__parent) and item_const
|
||||
|
||||
def __repr__(self) -> str:
|
||||
def __str__(self) -> str:
|
||||
return f'{self.__parent}[{self.__item}]'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{self.__parent!r}[{self.__item!r}]'
|
||||
|
||||
class CallExpr(G[T], Expr[T]):
|
||||
def __init__(self, parent: BaseExpr, args: Sequence[Any], kwargs: Mapping[str, Any]) -> None:
|
||||
class CallExpr(G[T, V], Expr[V]):
|
||||
def __init__(self, parent: BaseExpr[T], args: Sequence[Any], kwargs: Mapping[str, Any]) -> None:
|
||||
self.__parent = parent
|
||||
self.__args = args
|
||||
self.__kwargs = kwargs
|
||||
|
||||
def _sx_get_(self, pop: bool = True) -> T:
|
||||
def _sx_get_(self, pop: bool = True) -> V:
|
||||
return get(self.__parent, pop=pop)(*(get(a, pop=pop) for a in self.__args), **{k: get(v, pop=pop) for k, v in self.__kwargs.items()})
|
||||
|
||||
def _sx_peek_(self, pop: bool = True) -> T:
|
||||
def _sx_peek_(self, pop: bool = True) -> V:
|
||||
return peek(self.__parent, pop=pop)(*(peek(a, pop=pop) for a in self.__args), **{k: peek(v, pop=pop) for k, v in self.__kwargs.items()})
|
||||
|
||||
def _sx_put_(self, value: T, pop: bool = True) -> None:
|
||||
def _sx_put_(self, value: V, pop: bool = True) -> None:
|
||||
raise NotImplementedError(f'{self.__class__.__name__} is not invertible')
|
||||
|
||||
def _sx_is_const_(self) -> bool:
|
||||
|
@ -169,18 +197,18 @@ class CallExpr(G[T], Expr[T]):
|
|||
return f'{self.__parent!r}({a})'
|
||||
|
||||
|
||||
class UnaryExpr(G[T], Expr[T]):
|
||||
def __init__(self, op: Callable[[Any], T], value: BaseExpr) -> None:
|
||||
class UnaryExpr(G[T, V], Expr[V]):
|
||||
def __init__(self, op: Callable[[T], V], value: BaseExpr[T]) -> None:
|
||||
self.__op = op
|
||||
self.__value = value
|
||||
|
||||
def _sx_get_(self, pop: bool = True) -> T:
|
||||
def _sx_get_(self, pop: bool = True) -> V:
|
||||
return self.__op(get(self.__value, pop=pop))
|
||||
|
||||
def _sx_peek_(self, pop: bool = True) -> T:
|
||||
def _sx_peek_(self, pop: bool = True) -> V:
|
||||
return self.__op(peek(self.__value, pop=pop))
|
||||
|
||||
def _sx_put_(self, value: T, pop: bool = True) -> None:
|
||||
def _sx_put_(self, value: V, pop: bool = True) -> None:
|
||||
if self.__op not in reverse:
|
||||
raise NotImplementedError(f'{self.__class__.__name__} {symbols[self.__op]!r} is not invertible')
|
||||
put(self.__value, reverse[self.__op](value), pop=pop)
|
||||
|
@ -194,19 +222,19 @@ class UnaryExpr(G[T], Expr[T]):
|
|||
def __repr__(self) -> str:
|
||||
return f'({symbols[self.__op]}{self.__value!r})'
|
||||
|
||||
class BinExpr(G[T], Expr[T]):
|
||||
def __init__(self, op: Callable[[Any, Any], T], left: BaseExpr, right: BaseExpr) -> None:
|
||||
class BinExpr(G[T, V, X], Expr[X]):
|
||||
def __init__(self, op: Callable[[T, V], X], left: BaseExpr[T], right: BaseExpr[V]) -> None:
|
||||
self.__op = op
|
||||
self.__left = left
|
||||
self.__right = right
|
||||
|
||||
def _sx_get_(self, pop: bool = True) -> T:
|
||||
def _sx_get_(self, pop: bool = True) -> X:
|
||||
return self.__op(get(self.__left, pop=pop), get(self.__right, pop=pop))
|
||||
|
||||
def _sx_peek_(self, pop: bool = True) -> T:
|
||||
def _sx_peek_(self, pop: bool = True) -> X:
|
||||
return self.__op(peek(self.__left, pop=pop), peek(self.__right, pop=pop))
|
||||
|
||||
def _sx_put_(self, value: T, pop: bool = True) -> None:
|
||||
def _sx_put_(self, value: X, pop: bool = True) -> None:
|
||||
if is_const(self.__left):
|
||||
operand = self.__left
|
||||
target = self.__right
|
||||
|
@ -233,8 +261,8 @@ class BinExpr(G[T], Expr[T]):
|
|||
def __repr__(self) -> str:
|
||||
return f'({self.__left!r} {symbols[self.__op]} {self.__right!r})'
|
||||
|
||||
class CompExpr(Expr[bool]):
|
||||
def __init__(self, op: Callable[[Any, Any], bool], left: BaseExpr, right: BaseExpr) -> None:
|
||||
class CompExpr(G[T, V], Expr[bool]):
|
||||
def __init__(self, op: Callable[[T, V], bool], left: BaseExpr[T], right: BaseExpr[V]) -> None:
|
||||
self.__op = op
|
||||
self.__left = left
|
||||
self.__right = right
|
||||
|
@ -274,8 +302,9 @@ class CompExpr(Expr[bool]):
|
|||
return f'({self.__left!r} {symbols[self.__op]} {self.__right!r})'
|
||||
|
||||
class ProxyExpr(G[T], Expr[T]):
|
||||
def __init__(self, name: str) -> None:
|
||||
def __init__(self, name: str, parent: O[str] = None) -> None:
|
||||
self.__name = name
|
||||
self.__parent = parent + '.' if parent else ''
|
||||
self.__stack: List[BaseExpr[T]] = []
|
||||
|
||||
def _sx_push_(self, value: BaseExpr[T]) -> None:
|
||||
|
@ -297,10 +326,10 @@ class ProxyExpr(G[T], Expr[T]):
|
|||
return is_const(self.__stack[-1])
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'${self.__name}'
|
||||
return f'{self.__parent}{self.__name}'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'${self.__name}(=> {self.__stack!r})'
|
||||
return f'{self.__parent}{self.__name}(=> {self.__stack!r})'
|
||||
|
||||
class ConstChangeExpr(G[T], Expr[T]):
|
||||
def __init__(self, child: BaseExpr[T], const: bool = True) -> None:
|
||||
|
@ -326,23 +355,68 @@ class ConstChangeExpr(G[T], Expr[T]):
|
|||
return f'infer({self.__child})'
|
||||
|
||||
|
||||
def bool_(x: BaseExpr[T]) -> UnaryExpr[T, bool]:
|
||||
return UnaryExpr(bool, x)
|
||||
|
||||
def not_(x: BaseExpr[T]) -> UnaryExpr[T, bool]:
|
||||
return UnaryExpr(lambda a: not a, x)
|
||||
|
||||
def and_(x: BaseExpr[T], y: BaseExpr[V]) -> BinExpr[T, V, bool]:
|
||||
return BinExpr(lambda a, b: a and b, x, y)
|
||||
|
||||
def or_(x: BaseExpr[T], y: BaseExpr[V]) -> BinExpr[T, V, bool]:
|
||||
return BinExpr(lambda a, b: a or b, x, y)
|
||||
|
||||
def in_(x: BaseExpr[T], y: BaseExpr[V]) -> BinExpr[T, V, bool]:
|
||||
return BinExpr(lambda a, b: b in a, x, y)
|
||||
|
||||
def len_(x: BaseExpr[T]) -> UnaryExpr[T, int]:
|
||||
return UnaryExpr(len, x)
|
||||
|
||||
def int_(x: BaseExpr[T]) -> UnaryExpr[T, int]:
|
||||
return UnaryExpr(int, x)
|
||||
|
||||
def float_(x: BaseExpr[T]) -> UnaryExpr[T, float]:
|
||||
return UnaryExpr(float, x)
|
||||
|
||||
|
||||
def get(expr: U[T, BaseExpr[T]], pop: bool = True) -> T:
|
||||
if isinstance(expr, BaseExpr):
|
||||
return expr._sx_get_(pop=pop)
|
||||
#if isinstance(expr, types.FunctionType):
|
||||
# return expr(pop)
|
||||
if isinstance(expr, tuple) and len(expr) >= 2:
|
||||
_get, _put = expr[:2]
|
||||
return _get(pop)
|
||||
return expr
|
||||
|
||||
def peek(expr: U[T, BaseExpr[T]], pop: bool = True) -> T:
|
||||
if isinstance(expr, BaseExpr):
|
||||
return expr._sx_peek_(pop=pop)
|
||||
if isinstance(expr, types.FunctionType):
|
||||
return expr(pop)
|
||||
if isinstance(expr, tuple) and len(expr) >= 2:
|
||||
if len(expr) == 2:
|
||||
_peek = expr[0]
|
||||
else:
|
||||
_get, _put, _peek = expr[:3]
|
||||
return _peek(pop)
|
||||
return expr
|
||||
|
||||
def put(expr: U[T, BaseExpr[T]], value: T, pop: bool = True) -> None:
|
||||
if isinstance(expr, BaseExpr):
|
||||
expr._sx_put_(value, pop=pop)
|
||||
if isinstance(expr, tuple) and len(expr) >= 2:
|
||||
_get, _put, _peek = expr[:2]
|
||||
return _put(value, pop)
|
||||
|
||||
def is_const(expr: U[T, BaseExpr[T]]) -> bool:
|
||||
if isinstance(expr, BaseExpr):
|
||||
return expr._sx_is_const_()
|
||||
if isinstance(expr, types.FunctionType):
|
||||
return False
|
||||
if isinstance(expr, tuple) and isinstance(expr[0], types.FunctionType):
|
||||
return False
|
||||
return True
|
||||
|
||||
def const(expr: U[T, BaseExpr[T]]) -> ConstChangeExpr[T]:
|
||||
|
|
|
@ -13,19 +13,29 @@ class Wrapper(G[T], Type[T]):
|
|||
self.child = child
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> T:
|
||||
return context.parse(to_type(self.child), stream)
|
||||
child = to_type(self.child)
|
||||
with context.enter(None, child):
|
||||
return context.parse(child, stream)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: O[T]) -> None:
|
||||
context.dump(to_type(self.child), stream, value)
|
||||
child = to_type(self.child)
|
||||
with context.enter(None, child):
|
||||
context.dump(child, stream, value)
|
||||
|
||||
def sizeof(self, context: Context, value: O[T]) -> O[int]:
|
||||
return context.sizeof(to_type(self.child), value)
|
||||
child = to_type(self.child)
|
||||
with context.enter(None, child):
|
||||
return context.sizeof(child, value)
|
||||
|
||||
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[int]:
|
||||
return context.offsetof(to_type(self.child), path, value)
|
||||
child = to_type(self.child)
|
||||
with context.enter(None, child):
|
||||
return context.offsetof(child, path, value)
|
||||
|
||||
def default(self, context: Context) -> T:
|
||||
return context.default(to_type(self.child))
|
||||
child = to_type(self.child)
|
||||
with context.enter(None, child):
|
||||
return context.default(child)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return str(to_type(self.child))
|
||||
|
@ -64,27 +74,37 @@ class Generic(G[T], Type[T], BaseExpr[T]):
|
|||
def parse(self, context: Context, stream: Stream) -> T:
|
||||
if not self.stack:
|
||||
raise Error(context, 'unresolved generic')
|
||||
return context.parse(to_type(self.stack[-1]), stream)
|
||||
child = to_type(self.stack[-1])
|
||||
with context.enter(None, child):
|
||||
return context.parse(child, stream)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: T) -> None:
|
||||
if not self.stack:
|
||||
raise Error(context, 'unresolved generic')
|
||||
context.dump(to_type(self.stack[-1]), stream, value)
|
||||
child = to_type(self.stack[-1])
|
||||
with context.enter(None, child):
|
||||
context.dump(child, stream, value)
|
||||
|
||||
def sizeof(self, context: Context, value: O[T]) -> O[int]:
|
||||
if not self.stack:
|
||||
return None
|
||||
return context.sizeof(to_type(self.stack[-1]), value)
|
||||
child = to_type(self.stack[-1])
|
||||
with context.enter(None, child):
|
||||
return context.sizeof(child, value)
|
||||
|
||||
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[int]:
|
||||
if not self.stack:
|
||||
return None
|
||||
return context.offsetof(to_type(self.stack[-1]), path, value)
|
||||
child = to_type(self.stack[-1])
|
||||
with context.enter(None, child):
|
||||
return context.offsetof(child, path, value)
|
||||
|
||||
def default(self, context: Context) -> T:
|
||||
if not self.stack:
|
||||
raise Error(context, 'unresolved generic')
|
||||
return context.default(to_type(self.stack[-1]))
|
||||
child = to_type(self.stack[-1])
|
||||
with context.enter(None, child):
|
||||
return context.default(child)
|
||||
|
||||
def __str__(self) -> str:
|
||||
if self.stack:
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import os
|
||||
import inspect
|
||||
import ast
|
||||
import collections
|
||||
from typing import BinaryIO, Generator, Callable, Union as U, Tuple, Mapping, Any, cast
|
||||
import collections.abc
|
||||
from typing import BinaryIO, Generator, Callable, Optional as O, Union as U, Tuple, Mapping, Any, cast
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
|
@ -38,12 +38,17 @@ def indent(s: str, count: int, start: bool = False) -> str:
|
|||
lines[i] = ' ' * count + lines[i]
|
||||
return '\n'.join(lines)
|
||||
|
||||
def format_bytes(bs: bytes) -> str:
|
||||
return '[' + ' '.join(hex(b)[2:].zfill(2) for b in bs) + ']'
|
||||
def format_bytes(bs: bytes, cutoff=256) -> str:
|
||||
if len(bs) > cutoff:
|
||||
trailer = ' ...'
|
||||
bs = bs[:cutoff]
|
||||
else:
|
||||
trailer = ''
|
||||
return '[' + ' '.join(hex(b)[2:].zfill(2) for b in bs) + trailer + ']'
|
||||
|
||||
def format_value(value: Any, formatter: Callable[[Any], str], indentation: int = 0) -> str:
|
||||
""" Format containers to use the given formatter function instead of always repr(). """
|
||||
if isinstance(value, (dict, collections.Mapping)):
|
||||
if isinstance(value, (dict, collections.abc.Mapping)):
|
||||
if value:
|
||||
fmt = '{{\n{}\n}}'
|
||||
values = [indent(',\n'.join('{}: {}'.format(
|
||||
|
@ -93,3 +98,15 @@ def get_annot_locations(cls: type) -> Tuple[str, Mapping[str, int]]:
|
|||
lines[t.id] = start + b.lineno - 2
|
||||
|
||||
return fn, lines
|
||||
|
||||
|
||||
def find_overlap(haystack: bytes, needle: bytes, start: int = 0) -> O[int]:
|
||||
""" Find occurrence of `needle` in `haystack` or start of `needle` at the end of `haystack` """
|
||||
if needle in haystack[start:]:
|
||||
return haystack.index(needle, start)
|
||||
n = len(needle) - 1
|
||||
while n > 0:
|
||||
if haystack[-n:] == needle[:n]:
|
||||
return len(haystack) - n
|
||||
n -= 1
|
||||
return None
|
||||
|
|
|
@ -25,27 +25,39 @@ class Switch(G[T, V], Type[T]):
|
|||
selector = context.peek(self.default_key) if peek else context.get(self.default_key)
|
||||
else:
|
||||
return to_type(default)
|
||||
if selector not in options and default:
|
||||
return to_type(default)
|
||||
if selector not in options:
|
||||
if default:
|
||||
return to_type(default)
|
||||
raise ValueError(f'selector {selector} not any in options {", ".join(str(o) for o in options)}')
|
||||
return to_type(options[selector])
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> T:
|
||||
return context.parse(self.get_value(context), stream)
|
||||
child = self.get_value(context)
|
||||
with context.enter(None, child):
|
||||
return context.parse(child, stream)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: T) -> None:
|
||||
context.dump(self.get_value(context), stream, value)
|
||||
child = self.get_value(context)
|
||||
with context.enter(None, child):
|
||||
context.dump(child, stream, value)
|
||||
|
||||
def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
|
||||
return context.sizeof(self.get_value(context, peek=True), value)
|
||||
child = self.get_value(context, peek=True)
|
||||
with context.enter(None, child):
|
||||
return context.sizeof(child, value)
|
||||
|
||||
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[Pos]:
|
||||
return context.offsetof(self.get_value(context, peek=True), path, value)
|
||||
child = self.get_value(context, peek=True)
|
||||
with context.enter(None, child):
|
||||
return context.offsetof(child, path, value)
|
||||
|
||||
def default(self, context: Context) -> T:
|
||||
return context.default(self.get_value(context, peek=True))
|
||||
child = self.get_value(context, peek=True)
|
||||
with context.enter(None, child):
|
||||
return context.default(ChildProcessError)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'{format_value(self.options)}[{self.selector}]'
|
||||
return f'{format_value(self.options, str)}[{self.selector}]'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{__name__}.Switch({self.options!r}, selector={self.selector!r}, default={self.default_key!r}, fallback={self.default_val!r})'
|
||||
|
@ -58,33 +70,43 @@ class If(G[T,V], Type[U[T, V]]):
|
|||
|
||||
def parse(self, context: Context, stream: Stream) -> U[T, V]:
|
||||
if context.get(self.cond):
|
||||
return context.parse(to_type(self.true), stream)
|
||||
child = to_type(self.true)
|
||||
else:
|
||||
return context.parse(to_type(self.false), stream)
|
||||
child = to_type(self.false)
|
||||
with context.enter(None, child):
|
||||
return context.parse(child, stream)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: U[T, V]) -> None:
|
||||
if context.get(self.cond):
|
||||
context.dump(to_type(self.true), stream, value)
|
||||
child = to_type(self.true)
|
||||
else:
|
||||
context.dump(to_type(self.false), stream, value)
|
||||
child = to_type(self.false)
|
||||
with context.enter(None, child):
|
||||
return context.dump(child, stream, value)
|
||||
|
||||
def sizeof(self, context: Context, value: O[U[T, V]]) -> O[Pos]:
|
||||
if context.peek(self.cond):
|
||||
return context.sizeof(to_type(self.true), value)
|
||||
child = to_type(self.true)
|
||||
else:
|
||||
return context.sizeof(to_type(self.false), value)
|
||||
child = to_type(self.false)
|
||||
with context.enter(None, child):
|
||||
return context.sizeof(child, value)
|
||||
|
||||
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[U[T, V]]) -> O[Pos]:
|
||||
if context.peek(self.cond):
|
||||
return context.offsetof(to_type(self.true), path, value)
|
||||
child = to_type(self.true)
|
||||
else:
|
||||
return context.offsetof(to_type(self.false), path, value)
|
||||
child = to_type(self.false)
|
||||
with context.enter(None, child):
|
||||
return context.offsetof(child, path, value)
|
||||
|
||||
def default(self, context: Context) -> U[T, V]:
|
||||
if context.peek(self.cond):
|
||||
return context.default(to_type(self.true))
|
||||
child = to_type(self.true)
|
||||
else:
|
||||
return context.default(to_type(self.false))
|
||||
child = to_type(self.false)
|
||||
with context.enter(None, child):
|
||||
return context.default(self.child)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'({self.cond} ? {self.true} : {self.false})'
|
||||
|
|
125
sx/types/data.py
125
sx/types/data.py
|
@ -5,6 +5,8 @@ from ..core.meta import Wrapper
|
|||
from ..core.expr import BaseExpr
|
||||
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
class Nothing(Type[None]):
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
@ -27,7 +29,81 @@ class Nothing(Type[None]):
|
|||
def __repr__(self) -> str:
|
||||
return '{__name__}.Nothing()'
|
||||
|
||||
class Implied(G[T], Type[T]):
|
||||
""" Parse/dump nothing, yield value. """
|
||||
__slots__ = ('value',)
|
||||
|
||||
def __init__(self, value: U[BaseExpr[T], T]) -> None:
|
||||
self.value = value
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> T:
|
||||
return context.get(self.value)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: T) -> None:
|
||||
context.put(self.value, value)
|
||||
|
||||
def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
|
||||
return 0
|
||||
|
||||
def default(self, context: Context) -> T:
|
||||
return context.peek(self.value)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'={self.value}'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{__name__}.Static({self.value!r})'
|
||||
|
||||
class Ignored(G[T], Wrapper[T]):
|
||||
""" Parse/dump something, yield nothing. """
|
||||
def __init__(self, child: Type[T]) -> None:
|
||||
super().__init__(child)
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> None:
|
||||
super().parse(context, stream)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: None) -> None:
|
||||
super().dump(context, stream, super().default(context))
|
||||
|
||||
def default(self, context: Context) -> None:
|
||||
return None
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'(void){super().__str__()}'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{__name__}.Ignored({super().__repr__()}'
|
||||
|
||||
class Pad(Type[None]):
|
||||
""" Seek something, yield nothing. """
|
||||
__slots__ = ('amount', 'value')
|
||||
|
||||
def __init__(self, amount=0, value=b'\x00'):
|
||||
self.amount = amount
|
||||
self.value = value
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> None:
|
||||
stream.seek(context.get(self.amount), os.SEEK_CUR)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: None) -> None:
|
||||
value = stretch(context.get(self.value), context.get(self.amount))
|
||||
stream.write(value)
|
||||
|
||||
def sizeof(self, context: Context) -> O[Pos]:
|
||||
return context.peek(self.amount)
|
||||
|
||||
def default(self, context: Context) -> None:
|
||||
return None
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'[padding: {self.amount}]'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{__name__}.Pad({self.amount!r}, value={self.value!r})'
|
||||
|
||||
|
||||
class Data(Type[bytes]):
|
||||
""" Parse/dump and yield bytes. """
|
||||
__slots__ = ('size',)
|
||||
|
||||
def __init__(self, size: U[D, O[int]] = None) -> None:
|
||||
|
@ -41,6 +117,7 @@ class Data(Type[bytes]):
|
|||
|
||||
def dump(self, context: Context, stream: Stream, value: bytes) -> None:
|
||||
stream.write(value)
|
||||
context.put(self.size, len(value))
|
||||
|
||||
def default(self, context: Context) -> bytes:
|
||||
size = context.peek(self.size)
|
||||
|
@ -49,6 +126,8 @@ class Data(Type[bytes]):
|
|||
return bytes(size)
|
||||
|
||||
def sizeof(self, context: Context, value: O[bytes]) -> O[Pos]:
|
||||
if value is not None:
|
||||
return len(value)
|
||||
return context.peek(self.size)
|
||||
|
||||
def __str__(self) -> str:
|
||||
|
@ -59,8 +138,8 @@ class Data(Type[bytes]):
|
|||
|
||||
data = Data()
|
||||
|
||||
|
||||
class Bits(Type[int]):
|
||||
""" Parse/dump and yield bits. """
|
||||
__slots__ = ('amount',)
|
||||
|
||||
def __init__(self, amount: U[D, int] = 0) -> None:
|
||||
|
@ -90,47 +169,3 @@ class Bits(Type[int]):
|
|||
|
||||
bit = Bits(1)
|
||||
nibble = Bits(4)
|
||||
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
class Static(G[T], Type[T]):
|
||||
def __init__(self, value: U[BaseExpr[T], T]) -> None:
|
||||
self.value = value
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> T:
|
||||
return context.get(self.value)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: T) -> None:
|
||||
context.put(self.value, value)
|
||||
|
||||
def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
|
||||
return 0
|
||||
|
||||
def default(self, context: Context) -> T:
|
||||
return context.peek(self.value)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'={self.value}'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{__name__}.Static({self.value!r})'
|
||||
|
||||
class Ignored(G[T], Wrapper[T]):
|
||||
def __init__(self, child: Type[T]) -> None:
|
||||
super().__init__(child)
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> None:
|
||||
super().parse(context, stream)
|
||||
|
||||
def emit(self, context: Context, stream: Stream, value: None) -> None:
|
||||
super().emit(context, stream, super().default(context))
|
||||
|
||||
def default(self, context: Context) -> None:
|
||||
return None
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'(void){super().__str__()}'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{__name__}.Ignored({super().__repr__()}'
|
||||
|
|
243
sx/types/io.py
243
sx/types/io.py
|
@ -1,10 +1,10 @@
|
|||
from typing import Any, Generic as G, TypeVar, Union as U, Optional as O, Sequence
|
||||
import os
|
||||
import errno
|
||||
from ..core.base import Type, Context, PossibleDynamic, PathElement
|
||||
from ..core.io import Stream, Segment, Pos
|
||||
from ..core.base import Type, Context, PossibleDynamic, PathElement, to_type
|
||||
from ..core.io import Stream, Segment, Pos, add_sizes
|
||||
from ..core.meta import Wrapper
|
||||
from ..core.util import stretch, seeking
|
||||
from ..core.util import stretch, seeking, find_overlap
|
||||
|
||||
|
||||
class SizedStream:
|
||||
|
@ -61,9 +61,10 @@ class SizedStream:
|
|||
T = TypeVar('T')
|
||||
|
||||
class Sized(G[T], Wrapper[T]):
|
||||
def __init__(self, child: Type[T], limit: U[Pos, PossibleDynamic]):
|
||||
def __init__(self, child: Type[T], limit: U[Pos, PossibleDynamic], hard=False) -> None:
|
||||
super().__init__(child)
|
||||
self.limit = limit
|
||||
self.hard = hard
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> T:
|
||||
limit = max(0, context.get(self.limit))
|
||||
|
@ -72,14 +73,154 @@ class Sized(G[T], Wrapper[T]):
|
|||
stream.seek(start + limit, os.SEEK_SET)
|
||||
return value
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: O[T]) -> None:
|
||||
limit = max(0, context.get(self.limit))
|
||||
start = stream.tell()
|
||||
super().dump(context, SizedStream(stream, limit), value)
|
||||
stream.seek(start + limit, os.SEEK_SET)
|
||||
def dump(self, context: Context, stream: Stream, value: T) -> None:
|
||||
hard = context.get(self.hard)
|
||||
if hard:
|
||||
limit = max(0, context.get(self.limit))
|
||||
start = stream.tell()
|
||||
super().dump(context, SizedStream(stream, limit), value)
|
||||
stream.seek(start + limit, os.SEEK_SET)
|
||||
else:
|
||||
start = stream.tell()
|
||||
super().dump(context, stream, value)
|
||||
size = stream.tell() - start
|
||||
context.put(self.limit, size)
|
||||
|
||||
def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
|
||||
return context.peek(self.limit)
|
||||
hard = context.peek(self.hard)
|
||||
if hard:
|
||||
return context.peek(self.limit)
|
||||
else:
|
||||
return super().sizeof(context, value)
|
||||
|
||||
|
||||
class TerminatedStream:
|
||||
def __init__(self, stream: Stream, terminator: bytes, included: bool, blocksize: int = 8192) -> None:
|
||||
self._stream = stream
|
||||
self._terminator = terminator
|
||||
self._included = included
|
||||
self._end_pos = None
|
||||
self._blocksize = blocksize
|
||||
|
||||
def read(self, n: int = -1, *, bits: bool = False) -> U[bytes, int]:
|
||||
if bits:
|
||||
raise ValueError('terminated streams can not use bit-level reads')
|
||||
if self._end_pos is not None:
|
||||
if self._stream.tell() >= self._end_pos:
|
||||
raise EOFError
|
||||
remaining = self._end_pos - self._stream.tell()
|
||||
if n < 0:
|
||||
n = remaining
|
||||
else:
|
||||
n = min(n, remaining)
|
||||
|
||||
value = b''
|
||||
while n < 0 or len(value) < n:
|
||||
try:
|
||||
p = self._stream.tell()
|
||||
v = self._stream.read(n if n >= 0 else self._blocksize)
|
||||
except EOFError:
|
||||
if n < 0:
|
||||
self._stream.seek(p, os.SEEK_SET)
|
||||
v = self._stream.read(-1)
|
||||
else:
|
||||
raise
|
||||
if not v:
|
||||
break
|
||||
value += v
|
||||
|
||||
# find full terminator or start of terminator at the end
|
||||
if self._end_pos is None:
|
||||
termpos = find_overlap(value, self._terminator, len(value) - len(v))
|
||||
if termpos is not None:
|
||||
# need to read more data?
|
||||
termrem = len(self._terminator) - (len(value) - termpos)
|
||||
|
||||
if termrem > 0:
|
||||
p = self._stream.tell()
|
||||
try:
|
||||
value += self._stream.read(termrem)
|
||||
except EOFError:
|
||||
pass
|
||||
if value.endswith(self._terminator):
|
||||
termrem = 0
|
||||
else:
|
||||
self._stream.seek(p, os.SEEK_SET)
|
||||
|
||||
if termrem <= 0:
|
||||
# terminator found, reset overread data
|
||||
self._stream.seek(-(len(value) - (termpos + len(self._terminator))), os.SEEK_CUR)
|
||||
self._end_pos = self._stream.tell() - len(self._terminator)
|
||||
if self._included:
|
||||
termpos += len(self._terminator)
|
||||
self._end_pos += len(self._terminator)
|
||||
value = value[:termpos]
|
||||
break
|
||||
|
||||
if n > 0 and len(value) != n:
|
||||
raise EOFError
|
||||
|
||||
return value
|
||||
|
||||
def seek(self, pos: U[int, float], whence: int = os.SEEK_SET) -> None:
|
||||
if self._end_pos is not None:
|
||||
if whence == os.SEEK_SET:
|
||||
pos = min(pos, self._end_pos)
|
||||
elif whence == os.SEEK_CUR:
|
||||
pos = min(pos + self._stream.tell(), self._end_pos)
|
||||
elif whence == os.SEEK_END:
|
||||
pos += self._end_pos
|
||||
whence = os.SEEK_SET
|
||||
return self._stream.seek(pos, whence)
|
||||
|
||||
def tell(self) -> U[int, float]:
|
||||
pos = self._stream.tell()
|
||||
if self._end_pos is not None:
|
||||
pos = min(pos, self._end_pos)
|
||||
return pos
|
||||
|
||||
def __getattr__(self, attr: str) -> Any:
|
||||
return getattr(self._stream, attr)
|
||||
|
||||
class Terminated(G[T], Wrapper[T]):
|
||||
def __init__(self, child: Type[T], terminator: U[bytes, PossibleDynamic], required: U[bool, PossibleDynamic] = True, included: U[bool, PossibleDynamic] = False, blocksize: int = 8192) -> None:
|
||||
super().__init__(child)
|
||||
self.terminator = terminator
|
||||
self.required = required
|
||||
self.included = included
|
||||
self.blocksize = blocksize
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> T:
|
||||
terminator = context.get(self.terminator)
|
||||
required = context.get(self.required)
|
||||
included = context.get(self.included)
|
||||
tstream = TerminatedStream(stream, terminator, included, blocksize=self.blocksize)
|
||||
value = super().parse(context, tstream)
|
||||
if required and tstream._end_pos is None:
|
||||
raise EOFError(f'terminator {terminator} not found in stream')
|
||||
return value
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: T) -> None:
|
||||
terminator = context.get(self.terminator)
|
||||
required = context.get(self.required)
|
||||
included = context.get(self.included)
|
||||
tstream = TerminatedStream(stream, terminator, included, blocksize=self.blocksize)
|
||||
super().dump(context, tstream, value)
|
||||
if required and not included:
|
||||
stream.write(terminator)
|
||||
|
||||
def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
|
||||
terminator = context.peek(self.terminator)
|
||||
required = context.peek(self.required)
|
||||
included = context.peek(self.included)
|
||||
if not required:
|
||||
return None
|
||||
size = super().sizeof(context, value)
|
||||
if size is None:
|
||||
return None
|
||||
if not included:
|
||||
size = add_sizes(size, context.to_size(len(terminator)))
|
||||
return size
|
||||
|
||||
|
||||
class Ref(G[T], Wrapper[T]):
|
||||
|
@ -120,7 +261,7 @@ class Ref(G[T], Wrapper[T]):
|
|||
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[Pos]:
|
||||
segment = context.peek(self.segment) or context.params.segments['refs']
|
||||
with context.enter_segment(segment):
|
||||
return super().contextof(context, path, value)
|
||||
return super().offsetof(context, path, value)
|
||||
|
||||
def __str__(self) -> str:
|
||||
indicator = {os.SEEK_SET: '', os.SEEK_CUR: '+', os.SEEK_END: '-'}.get(self.whence, self.whence)
|
||||
|
@ -131,6 +272,86 @@ class Ref(G[T], Wrapper[T]):
|
|||
return f'{__name__}.Ref({super().__repr__()}, pos={self.pos!r}, whence={self.whence!r}, segment={self.segment!r})'
|
||||
|
||||
|
||||
LazyUnresolved = object()
|
||||
|
||||
class LazyEntry(G[T]):
|
||||
__slots__ = ('type', 'context', 'stream', 'positions', 'value')
|
||||
|
||||
def __init__(self, type: Type[T], context: Context, stream: Stream, pos: dict[Segment, Pos], value=LazyUnresolved) -> None:
|
||||
self.type = type
|
||||
self.context = context.copy()
|
||||
self.stream = stream
|
||||
self.positions = pos
|
||||
self.value = value
|
||||
|
||||
def __call__(self) -> T:
|
||||
if self.value is LazyUnresolved:
|
||||
old_pos = {}
|
||||
for segment, pos in self.positions.items():
|
||||
old_pos[segment] = segment.pos
|
||||
segment.pos = pos
|
||||
try:
|
||||
with self.context.enter_segment(self.context.segment, self.stream, self.positions[self.context.segment]) as stream:
|
||||
self.value = self.context.parse(self.type, stream)
|
||||
finally:
|
||||
for segment, pos in old_pos.items():
|
||||
segment.pos = pos
|
||||
return self.value
|
||||
|
||||
def __str__(self) -> str:
|
||||
if self.value is LazyUnresolved:
|
||||
return f'~{self.type}'
|
||||
return f'~{self.value}'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if self.value is LazyUnresolved:
|
||||
return f'{__name__}.{self.__class__.__name__}({self.type!r})'
|
||||
return f'{__name__}.{self.__class__.__name__}(value={self.valu!r})'
|
||||
|
||||
class Lazy(G[T], Type[LazyEntry[T]]):
|
||||
__slots__ = ('type',)
|
||||
|
||||
def __init__(self, type: Type[T]) -> None:
|
||||
self.type = type
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> LazyEntry[T]:
|
||||
sizes = context.to_size(self.sizeof(context, None))
|
||||
pos = {}
|
||||
|
||||
base_size = sizes.pop(context.segment)
|
||||
if base_size is None:
|
||||
raise ValueError(f'lazy type size in current segment must be known at parse time')
|
||||
pos[context.segment] = stream.tell()
|
||||
stream.seek(base_size, os.SEEK_CUR)
|
||||
|
||||
for segment, size in sizes.items():
|
||||
if size is None:
|
||||
raise ValueError(f'lazy type size in segment {segment.name} must be known at parse time')
|
||||
with context.enter_segment(segment, stream) as ss:
|
||||
pos[segment] = ss.tell()
|
||||
ss.seek(size, os.SEEK_CUR)
|
||||
|
||||
return LazyEntry(to_type(self.type), context, stream, pos)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: LazyEntry[T]) -> None:
|
||||
return context.dump(to_type(self.type), stream, value())
|
||||
|
||||
def sizeof(self, context: Context, value: O[LazyEntry[T]]) -> O[Pos]:
|
||||
return context.sizeof(to_type(self.type), value() if value is not None else value)
|
||||
|
||||
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[LazyEntry[T]]) -> O[Pos]:
|
||||
return context.offsetof(to_type(self.type), path, value() if value is not None else value)
|
||||
|
||||
def default(self, context: Context) -> LazyEntry[T]:
|
||||
return LazyEntry(self.type, context, None, {}, value=context.default(to_type(self.type)))
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'~{self.type}'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{__name__}.{self.__class__.__name__}({self.type!r})'
|
||||
|
||||
|
||||
class AlignTo(G[T], Wrapper[T]):
|
||||
__slots__ = ('alignment', 'value')
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ class Int(Type[int]):
|
|||
|
||||
def parse(self, context: Context, stream: Stream) -> int:
|
||||
n = context.get(self.bits)
|
||||
bs = stream.read(n // 8)
|
||||
bs = stream.read(n // 8 if n is not None else -1)
|
||||
return int.from_bytes(bs, byteorder=context.get(self.endian).to_python(), signed=context.get(self.signed))
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: U[int, float]) -> None:
|
||||
|
@ -40,7 +40,8 @@ class Int(Type[int]):
|
|||
def __str__(self) -> str:
|
||||
endian = {Endian.Big: 'be', Endian.Little: 'le'}.get(self.endian, self.endian) if self.bits != 8 else ''
|
||||
sign = {True: '', False: 'u'}.get(self.signed, self.signed)
|
||||
return f'{sign}int{self.bits}{endian}'
|
||||
bits = self.bits if self.bits is not None else ''
|
||||
return f'{sign}int{bits}{endian}'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{__name__}.Int({self.bits!r}, endian={self.endian!r}, signed={self.signed!r})'
|
||||
|
@ -73,6 +74,11 @@ uint64 = \
|
|||
uint64le = Int(64, endian=Endian.Little, signed=False)
|
||||
uint64be = Int(64, endian=Endian.Big, signed=False)
|
||||
|
||||
intle = Int(None, endian=Endian.Little, signed=True)
|
||||
intbe = Int(None, endian=Endian.Big, signed=True)
|
||||
uintle = Int(None, endian=Endian.Little, signed=False)
|
||||
uintbe = Int(None, endian=Endian.Big, signed=False)
|
||||
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
|
@ -83,7 +89,7 @@ class Bool(G[T], Mapped[T, bool]):
|
|||
repr=f'<{__name__}.Bool({child!r}, true: {true_value!r}, false: {false_value!r})>',
|
||||
)
|
||||
|
||||
bool = Bool(uint8)
|
||||
bool8 = Bool(uint8)
|
||||
|
||||
|
||||
class Float(Type[float]):
|
||||
|
@ -128,7 +134,6 @@ float16le = \
|
|||
binary16le = Float(16, endian=Endian.Little)
|
||||
float16be = \
|
||||
binary16be = Float(16, endian=Endian.Big)
|
||||
float_ = \
|
||||
float32 = \
|
||||
binary32 = \
|
||||
float32le = \
|
||||
|
|
|
@ -59,7 +59,7 @@ class Arr(G[T], Type[List[T]]):
|
|||
for i, elem in enumerate(value):
|
||||
c = to_type(child)
|
||||
with context.enter(i, c):
|
||||
context.dump(child, stream, elem)
|
||||
context.dump(c, stream, elem)
|
||||
|
||||
context.put(self.count, len(value))
|
||||
|
||||
|
|
|
@ -2,57 +2,80 @@ import enum
|
|||
from typing import Optional as O, Union as U
|
||||
from ..core.base import PossibleDynamic as D, Type, Context
|
||||
from ..core.io import Stream
|
||||
from ..core.util import stretch
|
||||
from .num import uint8
|
||||
from .io import Terminated
|
||||
|
||||
|
||||
class StrType(enum.Enum):
|
||||
Raw = enum.auto()
|
||||
ZeroTerminated = C = enum.auto()
|
||||
LengthPrefixed = Pascal = enum.auto()
|
||||
|
||||
class Str(Type[str]):
|
||||
def __init__(self, type: U[D, StrType], length: U[D, O[int]] = None, encoding: U[D, str] = 'utf-8', char_size: U[D, int] = 1, length_type: U[D, Type] = uint8, terminator: U[D, bytes] = b'\x00') -> None:
|
||||
self.type = type
|
||||
def __init__(self, length: U[D, O[int]] = None, encoding: U[D, str] = 'utf-8', char_size: U[D, int] = 1) -> None:
|
||||
self.length = length
|
||||
self.encoding = encoding
|
||||
self.char_size = char_size
|
||||
self.length_type = length_type
|
||||
self.terminator = terminator
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> str:
|
||||
type = context.get(self.type)
|
||||
length = context.get(self.length)
|
||||
encoding = context.get(self.encoding)
|
||||
char_size = context.get(self.char_size)
|
||||
|
||||
if type == StrType.Raw:
|
||||
if length is None:
|
||||
data = stream.read()
|
||||
else:
|
||||
data = stream.read(length * char_size)
|
||||
elif type == StrType.C:
|
||||
terminator = context.get(self.terminator)
|
||||
if len(terminator) != char_size:
|
||||
terminator = stretch(terminator, char_size)
|
||||
data = b''
|
||||
while True:
|
||||
d = stream.read(char_size)
|
||||
if d == terminator:
|
||||
break
|
||||
data += d
|
||||
if length is not None and len(data) >= length * char_size:
|
||||
break
|
||||
elif type == StrType.Pascal:
|
||||
length_type = context.get(self.length_type)
|
||||
plength = context.parse(length_type, stream)
|
||||
if length is None:
|
||||
data = stream.read()
|
||||
else:
|
||||
data = stream.read(length * char_size)
|
||||
|
||||
return data.decode(encoding)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: str) -> None:
|
||||
pass
|
||||
encoding = context.get(self.encoding)
|
||||
char_size = context.get(self.char_size)
|
||||
|
||||
bs = value.encode(encoding)
|
||||
length = len(bs) // char_size
|
||||
stream.write(bs)
|
||||
|
||||
context.put(self.length, length)
|
||||
|
||||
def default(self, context: Context) -> str:
|
||||
return ''
|
||||
|
||||
def sizeof(self, context: Context, value: O[str]) -> O[int]:
|
||||
return 0
|
||||
if value is not None:
|
||||
return len(value.encode(context.peek(self.encoding)))
|
||||
return None
|
||||
|
||||
def __str__(self) -> str:
|
||||
if self.length is not None:
|
||||
length = f'({self.length})'
|
||||
else:
|
||||
length = ''
|
||||
if self.encoding != 'utf-8':
|
||||
encoding = f'.{self.encoding}'
|
||||
else:
|
||||
encoding = ''
|
||||
return f'str{self.encoding}{length}'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'{__name__}.Str(length={self.length!r}, encoding={self.encoding!r}, char_size={self.char_size!r})'
|
||||
|
||||
|
||||
class CStr(Terminated[str]):
|
||||
def __init__(self, *args, terminator_required=True, **kwargs) -> None:
|
||||
terminator = '\x00'.encode(kwargs.get('encoding', 'utf-8'))
|
||||
super().__init__(Str(*args, **kwargs), terminator, required=terminator_required, blocksize=16)
|
||||
|
||||
def __str__(self) -> str:
|
||||
if self.child.encoding == 'utf-8':
|
||||
return 'cstr'
|
||||
if self.child.encoding == 'utf-16le':
|
||||
return 'wcstr'
|
||||
if self.child.encoding == 'sjis':
|
||||
return 'jcstr'
|
||||
return str(self.child)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'<{__name__}.CStr({self.child!r})>'
|
||||
|
||||
cstr = \
|
||||
utf8cstr = CStr()
|
||||
wcstr = \
|
||||
utf16cstr = CStr(encoding='utf-16le', char_size=2)
|
||||
jcstr = \
|
||||
sjiscstr = CStr(encoding='sjis')
|
||||
|
|
|
@ -11,7 +11,7 @@ from contextlib import contextmanager
|
|||
import sx
|
||||
from ..core import to_type
|
||||
from ..core.base import Context, Type, PathElement
|
||||
from ..core.io import Stream, Pos, add_sizes
|
||||
from ..core.io import Stream, Pos, add_sizes, max_sizes
|
||||
from ..core.util import indent, format_value, get_annot_locations
|
||||
from ..core.meta import Generic, TypeSource
|
||||
from ..core.expr import ProxyExpr
|
||||
|
@ -24,7 +24,7 @@ class ProxyStruct:
|
|||
self._sx_fields_ = {}
|
||||
|
||||
def __getattr__(self, name: str) -> ProxyExpr:
|
||||
e = ProxyExpr(name)
|
||||
e = ProxyExpr(name, 'self')
|
||||
try:
|
||||
self._sx_fields_[name].append(e)
|
||||
except KeyError:
|
||||
|
@ -110,7 +110,16 @@ class StructType(G[T], Type[T]):
|
|||
pos = stream.tell()
|
||||
|
||||
with self.enter():
|
||||
for name, type in self.fields.items():
|
||||
if self.union:
|
||||
if value._sx_lastset_:
|
||||
fields = [value._sx_lastset_]
|
||||
else:
|
||||
fields = next(self.fields)
|
||||
else:
|
||||
fields = list(self.fields)
|
||||
|
||||
for name in fields:
|
||||
type = self.fields[name]
|
||||
with context.enter(name, type):
|
||||
if self.union:
|
||||
stream.seek(pos, os.SEEK_SET)
|
||||
|
@ -142,15 +151,22 @@ class StructType(G[T], Type[T]):
|
|||
elem = getattr(value, field)
|
||||
else:
|
||||
elem = None
|
||||
with context.enter(field, child):
|
||||
size = context.sizeof(child, elem)
|
||||
c = to_type(child, field)
|
||||
with context.enter(field, c):
|
||||
size = context.sizeof(c, elem)
|
||||
sizes.append(size)
|
||||
return sizes
|
||||
|
||||
def sizeof(self, context: Context, value: O[T]) -> O[Mapping[str, int]]:
|
||||
with self.enter():
|
||||
sizes = self.get_sizes(context, value, None)
|
||||
return add_sizes(*sizes) if sizes else 0
|
||||
if sizes:
|
||||
if self.union:
|
||||
return max_sizes(*sizes)
|
||||
else:
|
||||
return add_sizes(*sizes)
|
||||
else:
|
||||
return 0
|
||||
|
||||
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[int]:
|
||||
if not path:
|
||||
|
@ -165,7 +181,10 @@ class StructType(G[T], Type[T]):
|
|||
|
||||
child = self.fields[field]
|
||||
with self.enter():
|
||||
sizes = self.get_sizes(context, value, field)
|
||||
if self.union:
|
||||
sizes = []
|
||||
else:
|
||||
sizes = self.get_sizes(context, value, field)
|
||||
if path:
|
||||
with context.enter(field, child):
|
||||
sizes.append(context.offsetof(child, path, getattr(value, field) if value is not None else None))
|
||||
|
@ -222,6 +241,7 @@ class Struct:
|
|||
if parent:
|
||||
generics = parent.generics + generics
|
||||
bound = parent.bound + bound
|
||||
kwargs.setdefault('union', parent.union)
|
||||
|
||||
# Get all annotations
|
||||
annots = {}
|
||||
|
@ -269,10 +289,11 @@ class Struct:
|
|||
item = (item,)
|
||||
subtype = cls._sx_type_[item]
|
||||
new_name = '{}[{}]'.format(cls.__name__, ', '.join(str(g) for g in subtype.bound))
|
||||
new = type(new_name, (cls,), {})
|
||||
new = type(new_name, (cls,), {
|
||||
'__module__': cls.__module__,
|
||||
'__slots__': cls.__slots__,
|
||||
})
|
||||
new._sx_type_ = subtype
|
||||
new.__slots__ = cls.__slots__
|
||||
new.__module__ = cls.__module__
|
||||
subtype.cls = new
|
||||
return new
|
||||
|
||||
|
@ -314,3 +335,16 @@ class Struct:
|
|||
|
||||
def __repr__(self) -> str:
|
||||
return self._format_(repr)
|
||||
|
||||
|
||||
class Union(Struct, union=True, inject=False):
|
||||
_sx_lastset_ = ''
|
||||
|
||||
def __init__(self, **kwargs) -> None:
|
||||
super().__init__(**kwargs)
|
||||
if kwargs:
|
||||
super().__setattr__('_sx_lastset_', list(kwargs)[-1])
|
||||
|
||||
def __setattr__(self, name: str, value: Any) -> None:
|
||||
super().__setattr__(name, value)
|
||||
super().__setattr__('_sx_lastset_', name)
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
import enum
|
||||
import io
|
||||
from typing import Optional as O, Generic as G, Union as U, TypeVar, Callable, Sequence, Mapping, Type as Ty
|
||||
from ..core.base import Type, Context, PathElement
|
||||
from ..core.io import Stream, Pos
|
||||
from ..core.io import Stream, Pos, to_stream
|
||||
from ..core.meta import Wrapper
|
||||
from ..core import to_type
|
||||
from ..core import to_type, dump
|
||||
|
||||
from .data import Data
|
||||
|
||||
|
@ -19,7 +20,74 @@ class Default(G[T], Wrapper[T]):
|
|||
def default(self, context: Context) -> T:
|
||||
return self._default
|
||||
|
||||
class Transform(G[T, V], Type[V]):
|
||||
|
||||
Preprocessor = U[Callable[[bytes], bytes], Callable[[bytes, Context], bytes]]
|
||||
|
||||
class Preproc(G[T], Wrapper[T]):
|
||||
def __init__(self, child: Type[T], parse: Preprocessor, dump: O[Preprocessor] = None, sizeof: O[U[Callable[[O[T]], Pos], Callable[[O[T], Context], Pos]]] = None, context: bool = False, bits: bool = False, str: O[str] = None, repr: O[str] = None) -> None:
|
||||
super().__init__(child)
|
||||
self.on_parse = parse
|
||||
self.on_dump = dump or self.on_parse
|
||||
self.on_sizeof = sizeof
|
||||
self.context = context
|
||||
self.bits = bits
|
||||
self.on_str = str or repr
|
||||
self.on_repr = repr or str
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> T:
|
||||
size = self.sizeof(context, None)
|
||||
if size is None:
|
||||
raise ValueError(f'{self.__class__.__name__} needs a sized child')
|
||||
|
||||
bits = context.get(self.bits)
|
||||
if bits:
|
||||
raw = stream.read(int(size * 8), bits=True)
|
||||
else:
|
||||
raw = stream.read(size)
|
||||
|
||||
ctx = context.get(self.context)
|
||||
proc = self.on_parse(raw, context) if ctx else self.on_parse(raw)
|
||||
|
||||
return super().parse(context, to_stream(proc))
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: T) -> None:
|
||||
raw_stream = io.BytesIO()
|
||||
super().on_dump(context, raw_stream, value)
|
||||
|
||||
raw = raw_stream.getvalue()
|
||||
ctx = context.get(self.context)
|
||||
proc = self.on_dump(raw, context) if ctx else self.on_dump(raw)
|
||||
|
||||
size = self.sizeof(context, value)
|
||||
bits = context.get(self.bits)
|
||||
if bits:
|
||||
stream.write(proc, bits=int(size * 8))
|
||||
else:
|
||||
stream.write(proc)
|
||||
|
||||
def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
|
||||
if value is not None:
|
||||
raw_stream = io.BytesIO()
|
||||
dump(context.peek(self.child), value, raw_stream)
|
||||
value = raw_stream.getvalue()
|
||||
ctx = context.peek(self.context)
|
||||
if self.on_sizeof:
|
||||
size = self.on_sizeof(value, context) if ctx else self.on_sizeof(value)
|
||||
if size is None:
|
||||
size = super().sizeof(context, value)
|
||||
return size
|
||||
|
||||
def __str__(self) -> str:
|
||||
if self.on_str is not None:
|
||||
return self.on_str
|
||||
return f'λ({self.child})'
|
||||
|
||||
def __repr__(self) -> str:
|
||||
if self.on_repr is not None:
|
||||
return self.on_repr
|
||||
return f'{__name__}.Preproc({self.child!r}, parse={self.on_parse!r}, dump={self.on_dump!r}, sizeof={self.on_sizeof!r}, context={self.context!r}, bits={self.bits!r}, str={self.on_str!r}, repr={self.on_repr!r})'
|
||||
|
||||
class Proc(G[T, V], Type[V]):
|
||||
def __init__(self, child: Type[T], parse: U[Callable[[T], V], Callable[[T, Context], V]], dump: U[Callable[[V], T], Callable[[V, Context], T]], context: bool = False, str: O[str] = None, repr: O[str] = None) -> None:
|
||||
self.child = child
|
||||
self.on_parse = parse
|
||||
|
@ -29,25 +97,35 @@ class Transform(G[T, V], Type[V]):
|
|||
self.on_repr = repr or str
|
||||
|
||||
def parse(self, context: Context, stream: Stream) -> V:
|
||||
value = context.parse(to_type(self.child), stream)
|
||||
child = to_type(self.child)
|
||||
with context.enter(None, child):
|
||||
value = context.parse(child, stream)
|
||||
return self.on_parse(value, context) if self.context else self.on_parse(value)
|
||||
|
||||
def dump(self, context: Context, stream: Stream, value: V) -> None:
|
||||
value = self.on_dump(value, context) if self.context else self.on_dump(value)
|
||||
context.dump(to_type(self.child), stream, value)
|
||||
child = to_type(self.child)
|
||||
with context.enter(None, child):
|
||||
context.dump(child, stream, value)
|
||||
|
||||
def sizeof(self, context: Context, value: O[V]) -> O[int]:
|
||||
if value is not None:
|
||||
value = self.on_dump(value, context) if self.context else self.on_dump(value)
|
||||
return context.sizeof(to_type(self.child), value)
|
||||
child = to_type(self.child)
|
||||
with context.enter(None, child):
|
||||
return context.sizeof(child, value)
|
||||
|
||||
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[V]) -> O[int]:
|
||||
if value is not None:
|
||||
value = self.on_dump(value, context) if self.context else self.on_dump(value)
|
||||
return context.offsetof(to_type(self.child), path, value)
|
||||
child = to_type(self.child)
|
||||
with context.enter(None, child):
|
||||
return context.offsetof(child, path, value)
|
||||
|
||||
def default(self, context: Context) -> V:
|
||||
value = context.default(to_type(self.child))
|
||||
child = to_type(self.child)
|
||||
with context.enter(None, child):
|
||||
value = context.default(child)
|
||||
return self.on_parse(value, context) if self.context else self.on_parse(value)
|
||||
|
||||
def __str__(self) -> str:
|
||||
|
@ -58,10 +136,10 @@ class Transform(G[T, V], Type[V]):
|
|||
def __repr__(self) -> str:
|
||||
if self.on_repr is not None:
|
||||
return self.on_repr
|
||||
return f'{__name__}.Transform({self.child!r}, parse={self.on_parse!r}, dump={self.on_dump!r}, context={self.context!r}, str={self.on_str!r}, repr={self.on_repr!r})'
|
||||
return f'{__name__}.Proc({self.child!r}, parse={self.on_parse!r}, dump={self.on_dump!r}, context={self.context!r}, str={self.on_str!r}, repr={self.on_repr!r})'
|
||||
|
||||
|
||||
class Mapped(G[T, V], Transform[T, V]):
|
||||
class Mapped(G[T, V], Proc[T, V]):
|
||||
def __init__(self, child: Type[T], mapping: Mapping[T, V], str: O[str] = None, repr: O[str] = None) -> None:
|
||||
reverse = {v: k for k, v in mapping.items()}
|
||||
super().__init__(child,
|
||||
|
@ -75,7 +153,7 @@ class Mapped(G[T, V], Transform[T, V]):
|
|||
|
||||
E = TypeVar('E', bound=enum.Enum)
|
||||
|
||||
class Enum(G[T, E], Transform[T, E]):
|
||||
class Enum(G[T, E], Proc[T, E]):
|
||||
def __init__(self, enum: Ty[E], child: Type[T]) -> None:
|
||||
super().__init__(child,
|
||||
parse=enum,
|
||||
|
|
Loading…
Reference in New Issue