Compare commits

...

31 Commits

Author SHA1 Message Date
Shiz d09fa3146f do not overwrite builtin Python types in __all__ 2022-05-10 13:33:30 +02:00
Shiz 04fe78c611 transformers: rename Transform to Proc, add Preproc 2022-05-10 13:33:30 +02:00
Shiz 398fe27e77 data: rename Static to Implied, add compatibility wrapper 2022-05-10 13:33:30 +02:00
Shiz b91a01957c data: add Pad type 2022-05-10 13:33:30 +02:00
Shiz e96837d429 core/str: expose terminator_required= argument in CStr 2022-05-10 13:33:30 +02:00
Shiz fb78843d6b core/util: fix Mapping moving around in stdlib `collections` 2022-05-10 13:33:30 +02:00
Shiz c3a4715141 util: lol 2022-05-10 13:33:30 +02:00
Shiz 5afa169647 util: cut off long data by default 2022-05-10 13:33:30 +02:00
Shiz 23bc195d6a sx: fx some misc issues 2022-05-10 13:33:30 +02:00
Shiz 4f76bea4b2 core/expr: do not accept lambas for _sx_get__ 2022-05-10 13:33:30 +02:00
Shiz 3042be6e02 expose to_stream 2022-05-10 13:33:30 +02:00
Shiz 75a4e879a5 data: communicate length back to parser 2022-05-10 13:33:29 +02:00
Shiz 9a613c9070 expr: error on bad operator usage, add bad operator alternative functions, implement item slicing correctly 2022-05-10 13:33:29 +02:00
Shiz f886d9a769 control: add more friendly error message to Switch 2022-05-10 13:33:29 +02:00
Shiz 1ce5fac5fc io: fix Lazy __str__/__repr__ mixup 2022-05-10 13:33:29 +02:00
Shiz f9d9e3497b io: fix some Lazy stream issues 2022-05-10 13:33:29 +02:00
Shiz ff4830c25b io: add Lazy type 2022-05-10 13:33:29 +02:00
Shiz 7a3d57db51 build: add setup.py 2022-05-10 13:33:29 +02:00
Shiz b4ce3eb788 data: fix `sizeof()` for Data 2022-05-10 13:33:29 +02:00
Shiz 05dd3a31e5 io: introduce `hard` parameter for Sized 2022-05-10 13:33:29 +02:00
Shiz eb53863305 struct: fix issue where generics instantiations would have a different module 2022-05-10 13:33:29 +02:00
Shiz f671b70854 union: fix dumping semantics 2022-05-10 13:33:29 +02:00
Shiz c602ad8120 struct: fix up and expose union type 2022-05-10 13:33:29 +02:00
Shiz 71732b9743 base: do not reuse Context for stream offset size calculation 2022-05-10 13:33:29 +02:00
Shiz 81f338a13f types: fix various missing to_type() calls and misc errors 2022-05-10 13:33:29 +02:00
Shiz 0a45c440c9 core: properly pop from path if type fails 2022-05-10 13:33:29 +02:00
Shiz d1406959eb io: add rudimentary Terminated type and rewrite str 2022-05-10 13:33:29 +02:00
Shiz 21641e0b1f examples: add simple RIFF parse example 2022-05-10 13:33:29 +02:00
Shiz 8bf9b77505 core: allow debug tracing of parse for visualisations 2021-07-04 23:40:46 +02:00
Shiz ea61dbf1e3 core: fully keep track of parse path 2021-07-04 23:35:17 +02:00
Shiz e71cacb8f7 num: add variable-width greedy int types 2021-07-04 23:34:04 +02:00
16 changed files with 883 additions and 213 deletions

76
examples/riff.py Normal file
View File

@ -0,0 +1,76 @@
from __future__ import annotations
import enum
from sx import Generic, Struct, parse
_Chunk = Generic('Chunk')
@enum.unique
class FormatCompression(enum.Enum):
    """ Compression codes stored in the `compression` field of FormatChunk. """
    Unknown = 0
    PCM = 1
    MSADPCM = 2
    ALaw = 6
    MuLaw = 7
    IMAADPCM = 17
    G723ADPCM = 20
    GSM610 = 49
    G721ADPCM = 64
    MPEG = 80
    Experimental = 0xFFFF
# NOTE(review): partial=True presumably lets the struct end before all fields
# are read (short `fmt ` chunks without the extra_* tail) -- confirm in StructType.
class FormatChunk(Struct, partial=True):
    """ Body of a `fmt ` chunk: the audio format header. """
    compression: Enum(FormatCompression, uint16le)
    channel_count: uint16le
    sample_rate: uint32le
    bytes_per_sec: uint32le
    alignment: uint16le
    bits_per_sample: uint16le
    # Length prefix for the format-specific bytes that follow.
    extra_length: uint16le
    extra: Data(self.extra_length)
class FactChunk(Struct):
    """ Body of a `fact` chunk: a single little-endian sample count. """
    sample_count: uint32le
class SampleLoop(Struct):
    """ One loop record, repeated `sample_loop_count` times inside SampleChunk. """
    id: uint32le
    type: uint32le
    start: uint32le
    end: uint32le
    fraction: uint32le
    count: uint32le
class SampleChunk(Struct):
    """ Body of a `smpl` chunk: sampler settings followed by the loop records. """
    manufacturer: uint32le
    product: uint32le
    period: uint32le
    unity_note: uint32le
    pitch_fraction: uint32le
    # Spelled `smpte_` to match `smpte_offset` below (was misspelled `smtpe_format`).
    smpte_format: uint32le
    smpte_offset: uint32le
    sample_loop_count: uint32le
    padding_length: uint32le
    # The number of loop records is taken from the count field read above.
    sample_loops: Arr(SampleLoop, count=self.sample_loop_count)
class DataChunk(Struct):
    """ Opaque chunk body; also the fallback for chunk tags without a dedicated struct. """
    # `data` is the unsized raw-bytes type.
    # NOTE(review): presumably consumes the rest of the enclosing Sized region -- confirm.
    data: data
class RIFFChunk(Struct):
    """ Body of a `RIFF` chunk: the fixed `WAVE` form type followed by nested chunks. """
    type: Fixed(b'WAVE')
    # _Chunk is the generic placeholder declared before Chunk exists;
    # it is bound to Chunk later via _Chunk.push(Chunk).
    children: Arr(_Chunk)
class Chunk(Struct):
    """ A single RIFF chunk: four-byte tag, little-endian length, then the tagged contents. """
    type: Data(4)
    length: uint32le
    # Contents are wrapped in Sized(..., self.length) and dispatched on the tag
    # read above; tags without an entry fall back to DataChunk.
    contents: Sized(Switch({
        b'fmt ': FormatChunk,
        b'fact': FactChunk,
        b'smpl': SampleChunk,
        b'data': DataChunk,
        b'RIFF': RIFFChunk,
    }, fallback=DataChunk, selector=self.type), self.length)


# Bind the forward reference used by RIFFChunk.children now that Chunk exists.
_Chunk.push(Chunk)
if __name__ == '__main__':
    import sys

    # Parse the file named by the first command-line argument and print the result.
    # The file is closed deterministically; printing happens while it is still
    # open in case the parsed value reads from the stream on demand.
    with open(sys.argv[1], 'rb') as f:
        print(parse(Chunk, f))

12
setup.py Normal file
View File

@ -0,0 +1,12 @@
from setuptools import setup
# Package metadata for the `sx` distribution.
setup(
    name='sx',
    version='0.1.0',
    # Name and e-mail are separate metadata fields.
    author='Shiz',
    author_email='hi@shiz.me',
    url='https://weeaboo.software/Shiz/sx',
    description='declarative binary data parsing and dumping library',
    packages=['sx', 'sx.core', 'sx.types'],
    zip_safe=True,
)

View File

@ -1,26 +1,35 @@
from .core import parse, dump, sizeof, offsetof, default, context
from .core.base import Params, Context, Type, to_type, Error
from .core.base import Params, Context, Type, to_type, Error
from .core.io import Stream, Segment, to_stream, Endian, BitAlignment
from .core.expr import BaseExpr, Expr, const, infer
from .core.expr import BaseExpr, Expr, const, infer, bool_, not_, and_, or_, in_, len_, int_, float_
from .core.meta import Wrapper, Generic
del core
from .types.data import Nothing, Static, Ignored, Data, data, Bits, bit, nibble
from .types.data import Nothing, Implied, Ignored, Pad, Data, data, Bits, bit, nibble
from .types.num import (
Bool, Int, Float, bool, int8, uint8, byte,
Bool, Int, Float, bool8, int8, uint8, byte,
int16, uint16, int16be, int16le, uint16be, uint16le, word,
int32, uint32, int32be, int32le, uint32be, uint32le, dword,
int64, uint64, int64be, int64le, uint64be, uint64le, qword,
intbe, intle, uintbe, uintle,
float16, float16le, float16be, binary16, binary16le, binary16be, half,
float32, float32le, float32be, binary32, binary32le, binary32be, float_,
float32, float32le, float32be, binary32, binary32le, binary32be,
float64, float64le, float64be, binary64, binary64le, binary64be, double,
)
from .types.str import Str, StrType
from .types.str import Str, CStr, cstr, wcstr, utf8cstr, utf16cstr
from .types.seq import Arr, Tuple
from .types.struct import StructType, Struct
from .types.transforms import Default, Transform, Mapped, Enum, Check, Fixed
from .types.struct import StructType, Struct, Union
from .types.transforms import Default, Preproc, Proc, Mapped, Enum, Check, Fixed
from .types.control import Switch, If
from .types.io import Sized, Ref, AlignTo, AlignedTo
from .types.io import Sized, Terminated, Ref, AlignTo, AlignedTo, Lazy
del types
# Compatibility
Static = Calc = Implied
Transform = Proc
__all__ = [k for k in globals() if not k.startswith('_')]
# Extra types that shouldn't pollute the namespace they're mass-imported into
bool = bool8
float = float32

View File

@ -7,7 +7,7 @@ from .io import Stream, PossibleStream, Segment, ceil_sizes, to_stream
from .expr import ProxyExpr
context = ProxyExpr('<context>')
context = ProxyExpr('context')
@contextmanager
def resolving(expr: ProxyExpr, value: Any) -> Generator[ProxyExpr, None, None]:

View File

@ -11,27 +11,53 @@ from .util import seeking
from .io import Segment, Stream, Pos
class Params:
__slots__ = ('segments', 'default_segment', 'user')
T = TypeVar('T')
def __init__(self, segments: Sequence[Segment] = None, user: Mapping[str, Any] = {}):
class DebugTreeNode(Generic[T]):
    """ Node of the debug parse tree: the type being parsed, its start/end positions, its value and its child nodes. """
    __slots__ = ('type', 'pos', 'end', 'value', 'children')

    def __init__(self, type: 'Type[T]', pos: O[Pos] = None) -> None:
        self.type = type
        self.pos = pos
        # Filled in by finalize() once the parse of this node has ended.
        self.value = None
        self.end = None
        # List of (identifier, DebugTreeNode) pairs, in insertion order.
        self.children = []

    def finalize(self, value: T, end: Pos) -> None:
        """ Record the resulting value and the end position of this node. """
        self.value = value
        self.end = end

    def add_child(self, ident, node: 'DebugTreeNode') -> None:
        """ Append `node` as a child, labelled with `ident`. """
        self.children.append((ident, node))
class Params:
    """
    Parameters shared by a parse/dump run: the known segments, user-supplied
    values and, when `debug` is set, the debug tree being built.
    """
    __slots__ = ('segments', 'default_segment', 'user', 'debug_path', 'debug_root')

    def __init__(self, segments: Sequence[Segment] = None, user: O[Mapping[str, Any]] = None, debug: bool = False):
        # The first given segment is the default one; without any, a 'default'
        # segment plus a 'refs' segment built from it are created.
        default = segments[0] if segments else Segment('default')
        self.segments = {s.name: s for s in (segments or [default, Segment('refs', [default])])}
        self.default_segment = default
        # `user` defaults to None rather than a shared mutable {}.
        self.user = SimpleNamespace(**(user or {}))
        # debug_path is None when debugging is off, otherwise the stack of open nodes.
        self.debug_path: O[List[DebugTreeNode]] = [] if debug else None
        self.debug_root: O[DebugTreeNode] = None

    def reset(self):
        """ Drop any collected debug tree (keeping debugging on or off) and reset every segment. """
        self.debug_root = None
        self.debug_path = [] if self.debug_path is not None else None
        for s in self.segments.values():
            s.reset()
PathElement = U[str, int]
PathElement = U[None, str, int]
PathEntry = Tuple[PathElement, 'Type']
def format_path(path: Iterable[PathElement]) -> str:
s = ''
first = True
for p in path:
if p is None:
continue
sep = '.'
if isinstance(p, int):
p = '[' + str(p) + ']'
@ -43,10 +69,8 @@ def format_path(path: Iterable[PathElement]) -> str:
return s
T = TypeVar('T')
PT = TypeVar('PT')
class PossibleDynamic(Generic[T]):
pass
@ -60,6 +84,8 @@ class Context:
self.path: List[PathEntry] = []
self.segment_path: List[Segment] = []
self.params.reset()
def copy(self) -> 'Context':
c = self.__class__(root=self.root, value=self.value, params=self.params)
c.path = self.path.copy()
@ -89,13 +115,17 @@ class Context:
raise Error(self, ValueError('could not enter segment {}: could not calculate offset'.format(segment)))
with seeking(stream.root, pos, reference) as s, stream.wrapped(s) as f:
self.segment_path.append(segment)
yield f
self.segment_path.pop()
segment.pos = f.tell()
try:
yield f
segment.pos = f.tell()
finally:
self.segment_path.pop()
else:
self.segment_path.append(segment)
yield stream
self.segment_path.pop()
try:
yield stream
finally:
self.segment_path.pop()
def segment_offset(self, segment: Segment) -> O[Pos]:
size: Pos = 0
@ -110,7 +140,7 @@ class Context:
return size
def segment_size(self, segment: Segment) -> O[Pos]:
sizes = self.sizeof(self.root, self.value)
sizes = Context(self.root, self.value, params=self.params, reset=False).sizeof(self.root, self.value)
return sizes.get(segment, None)
def format_path(self) -> str:
@ -125,24 +155,38 @@ class Context:
def get(self, value: U[T, PossibleDynamic[T]]) -> T:
from .expr import Expr, get
if isinstance(value, Expr):
if isinstance(value, (Expr, FunctionType, tuple)):
value = get(value)
return cast(T, value)
def peek(self, value: U[T, PossibleDynamic[T]]) -> O[T]:
from .expr import Expr, peek
if isinstance(value, Expr):
if isinstance(value, (Expr, FunctionType, tuple)):
value = peek(value)
return cast(T, value)
def put(self, value: U[T, PossibleDynamic[T]], new: T) -> None:
from .expr import Expr, put
if isinstance(value, Expr):
if isinstance(value, (Expr, FunctionType, tuple)):
put(value, new)
def parse(self, type: 'Type[PT]', stream: Stream) -> PT:
    """
    Parse `type` from `stream` and return its value.

    When debugging is enabled (params.debug_path is not None), a DebugTreeNode
    is opened for this parse, attached to the currently open node (or stored as
    the root), and finalized with the value and end position even if parsing
    raises; in that case the recorded value is None.
    """
    tracing = self.params.debug_path is not None
    if tracing:
        node = DebugTreeNode(type, stream.root.tell())
        if self.params.debug_path:
            # Label the child with the innermost path element, if there is one.
            ident = self.path[-1][0] if self.path else None
            self.params.debug_path[-1].add_child(ident, node)
        else:
            self.params.debug_root = node
        self.params.debug_path.append(node)

    value = None
    try:
        value = type.parse(self, stream)
        return value
    finally:
        if tracing:
            node.finalize(value, stream.root.tell())
            self.params.debug_path.pop()
def dump(self, type: 'Type[PT]', stream: Stream, value: PT) -> None:
return type.dump(self, stream, value)

View File

@ -1,6 +1,7 @@
import math
import operator
import functools
import types
from typing import Any, Optional as O, Union as U, Sequence, Mapping, Callable, Generic as G, TypeVar, List
@ -51,6 +52,8 @@ reverse = {
}
T = TypeVar('T')
V = TypeVar('V')
X = TypeVar('X')
class BaseExpr(G[T]):
@ -66,6 +69,14 @@ class BaseExpr(G[T]):
def _sx_is_const_(self) -> bool:
raise NotImplementedError
def raise_type_error(self, *y, op=None, alternatives=()):
    """
    Raise a TypeError explaining that operator `__<op>__` cannot be used on an
    expression, and suggest replacement functions.

    `y` holds the other operand, if any; only the first one is shown.
    `alternatives` lists the function names to suggest; when empty, `<op>_` is
    suggested.
    """
    if not alternatives:
        alternatives = [op + '_']
    # Render each suggestion as a call on the operands involved.
    call = f'({self}, {y[0]})' if y else f'({self})'
    alternatives = [x + call for x in alternatives]
    msg = f'bad operator __{op}__ called on Expr, try {" / ".join(alternatives)}; or use lambdas for calling standard functions!'
    raise TypeError(msg)
class Expr(G[T], BaseExpr[T]):
def __getattr__(self, name: str) -> 'AttrExpr':
return AttrExpr(self, name)
@ -76,30 +87,31 @@ class Expr(G[T], BaseExpr[T]):
def __call__(self, *args: Any, **kwargs: Any) -> 'CallExpr':
return CallExpr(self, args, kwargs)
for x, y in {'bool': ['bool_', 'not_', 'and_', 'or_', 'in_'], 'contains': ['in_'], 'int': [], 'float': [], 'len': []}.items():
locals()['__' + x + '__'] = functools.partialmethod(raise_type_error, op=x, alternatives=y)
for x in ('lt', 'le', 'eq', 'ne', 'ge', 'gt'):
locals()['__' + x.strip('_') + '__'] = functools.partialmethod(lambda self, x, other: CompExpr(getattr(operator, x), self, other), x)
for x in ('not_', 'truth', 'abs', 'index', 'inv', 'neg', 'pos'):
for x in ('abs', 'invert', 'neg', 'pos'):
locals()['__' + x.strip('_') + '__'] = functools.partialmethod(lambda self, x: UnaryExpr(getattr(operator, x), self), x)
for x in (
'add', 'and_', 'floordiv', 'lshift', 'mod', 'mul', 'matmul', 'or_', 'pow', 'rshift', 'sub', 'truediv', 'xor',
'concat', 'contains',
):
locals()[ '__' + x.strip('_') + '__'] = functools.partialmethod(lambda self, x, other: BinExpr(getattr(operator, x), self, other), x)
locals()['__r' + x.strip('_') + '__'] = functools.partialmethod(lambda self, x, other: BinExpr(getattr(operator, x), other, self), x)
del x
del x, y
class AttrExpr(G[T], Expr[T]):
def __init__(self, parent: BaseExpr, attr: str) -> None:
class AttrExpr(G[T, V], Expr[V]):
def __init__(self, parent: BaseExpr[T], attr: str) -> None:
self.__parent = parent
self.__attr = attr
def _sx_get_(self, pop: bool = True) -> T:
def _sx_get_(self, pop: bool = True) -> V:
return getattr(get(self.__parent, pop=pop), get(self.__attr, pop=pop))
def _sx_peek_(self, pop: bool = True) -> T:
def _sx_peek_(self, pop: bool = True) -> V:
return getattr(peek(self.__parent, pop=pop), peek(self.__attr, pop=pop))
def _sx_put_(self, value: T, pop: bool = True) -> None:
def _sx_put_(self, value: V, pop: bool = True) -> None:
parent = get(self.__parent, pop=False)
setattr(parent, get(self.__attr, pop=pop), value)
put(self.__parent, parent, pop=pop)
@ -113,44 +125,60 @@ class AttrExpr(G[T], Expr[T]):
def __repr__(self) -> str:
return f'{self.__parent!r}.{self.__attr}'
class ItemExpr(G[T], Expr[T]):
def __init__(self, parent: BaseExpr, item: Any) -> None:
class ItemExpr(G[T, V], Expr[V]):
def __init__(self, parent: BaseExpr[T], item: Any) -> None:
self.__parent = parent
self.__item = item
def _sx_get_(self, pop: bool = True) -> T:
return get(self.__parent, pop=pop)[get(self.__item, pop=pop)]
def _sx_get_(self, pop: bool = True) -> V:
if isinstance(self.__item, slice):
item = slice(get(self.__item.start, pop=pop), get(self.__item.stop, pop=pop), get(self.__item.step, pop=pop))
else:
item = get(self.__item, pop=pop)
return get(self.__parent, pop=pop)[item]
def _sx_peek_(self, pop: bool = True) -> T:
return peek(self.__parent, pop=pop)[peek(self.__item, pop=pop)]
def _sx_peek_(self, pop: bool = True) -> V:
if isinstance(self.__item, slice):
item = slice(peek(self.__item.start, pop=pop), peek(self.__item.stop, pop=pop), peek(self.__item.step, pop=pop))
else:
item = peek(self.__item, pop=pop)
return peek(self.__parent, pop=pop)[item]
def _sx_put_(self, value: T, pop: bool = True) -> None:
def _sx_put_(self, value: V, pop: bool = True) -> None:
parent = get(self.__parent, pop=False)
parent[get(self.__item, pop=pop)] = value
if isinstance(self.__item, slice):
item = slice(get(self.__item.start, pop=pop), get(self.__item.stop, pop=pop), get(self.__item.step, pop=pop))
else:
item = get(self.__item, pop=pop)
parent[item] = value
put(self.__parent, parent, pop=pop)
def _sx_is_const_(self) -> bool:
return is_const(self.__parent) and is_const(self.__item)
if isinstance(self.__item, slice):
item_const = is_const(self.__item.start) and is_const(self.__item.stop) and is_const(self.__item.step)
else:
item_const = is_const(self.__item)
return is_const(self.__parent) and item_const
def __repr__(self) -> str:
def __str__(self) -> str:
return f'{self.__parent}[{self.__item}]'
def __repr__(self) -> str:
return f'{self.__parent!r}[{self.__item!r}]'
class CallExpr(G[T], Expr[T]):
def __init__(self, parent: BaseExpr, args: Sequence[Any], kwargs: Mapping[str, Any]) -> None:
class CallExpr(G[T, V], Expr[V]):
def __init__(self, parent: BaseExpr[T], args: Sequence[Any], kwargs: Mapping[str, Any]) -> None:
self.__parent = parent
self.__args = args
self.__kwargs = kwargs
def _sx_get_(self, pop: bool = True) -> T:
def _sx_get_(self, pop: bool = True) -> V:
return get(self.__parent, pop=pop)(*(get(a, pop=pop) for a in self.__args), **{k: get(v, pop=pop) for k, v in self.__kwargs.items()})
def _sx_peek_(self, pop: bool = True) -> T:
def _sx_peek_(self, pop: bool = True) -> V:
return peek(self.__parent, pop=pop)(*(peek(a, pop=pop) for a in self.__args), **{k: peek(v, pop=pop) for k, v in self.__kwargs.items()})
def _sx_put_(self, value: T, pop: bool = True) -> None:
def _sx_put_(self, value: V, pop: bool = True) -> None:
raise NotImplementedError(f'{self.__class__.__name__} is not invertible')
def _sx_is_const_(self) -> bool:
@ -169,18 +197,18 @@ class CallExpr(G[T], Expr[T]):
return f'{self.__parent!r}({a})'
class UnaryExpr(G[T], Expr[T]):
def __init__(self, op: Callable[[Any], T], value: BaseExpr) -> None:
class UnaryExpr(G[T, V], Expr[V]):
def __init__(self, op: Callable[[T], V], value: BaseExpr[T]) -> None:
self.__op = op
self.__value = value
def _sx_get_(self, pop: bool = True) -> T:
def _sx_get_(self, pop: bool = True) -> V:
return self.__op(get(self.__value, pop=pop))
def _sx_peek_(self, pop: bool = True) -> T:
def _sx_peek_(self, pop: bool = True) -> V:
return self.__op(peek(self.__value, pop=pop))
def _sx_put_(self, value: T, pop: bool = True) -> None:
def _sx_put_(self, value: V, pop: bool = True) -> None:
if self.__op not in reverse:
raise NotImplementedError(f'{self.__class__.__name__} {symbols[self.__op]!r} is not invertible')
put(self.__value, reverse[self.__op](value), pop=pop)
@ -194,19 +222,19 @@ class UnaryExpr(G[T], Expr[T]):
def __repr__(self) -> str:
return f'({symbols[self.__op]}{self.__value!r})'
class BinExpr(G[T], Expr[T]):
def __init__(self, op: Callable[[Any, Any], T], left: BaseExpr, right: BaseExpr) -> None:
class BinExpr(G[T, V, X], Expr[X]):
def __init__(self, op: Callable[[T, V], X], left: BaseExpr[T], right: BaseExpr[V]) -> None:
self.__op = op
self.__left = left
self.__right = right
def _sx_get_(self, pop: bool = True) -> T:
def _sx_get_(self, pop: bool = True) -> X:
return self.__op(get(self.__left, pop=pop), get(self.__right, pop=pop))
def _sx_peek_(self, pop: bool = True) -> T:
def _sx_peek_(self, pop: bool = True) -> X:
return self.__op(peek(self.__left, pop=pop), peek(self.__right, pop=pop))
def _sx_put_(self, value: T, pop: bool = True) -> None:
def _sx_put_(self, value: X, pop: bool = True) -> None:
if is_const(self.__left):
operand = self.__left
target = self.__right
@ -233,8 +261,8 @@ class BinExpr(G[T], Expr[T]):
def __repr__(self) -> str:
return f'({self.__left!r} {symbols[self.__op]} {self.__right!r})'
class CompExpr(Expr[bool]):
def __init__(self, op: Callable[[Any, Any], bool], left: BaseExpr, right: BaseExpr) -> None:
class CompExpr(G[T, V], Expr[bool]):
def __init__(self, op: Callable[[T, V], bool], left: BaseExpr[T], right: BaseExpr[V]) -> None:
self.__op = op
self.__left = left
self.__right = right
@ -274,8 +302,9 @@ class CompExpr(Expr[bool]):
return f'({self.__left!r} {symbols[self.__op]} {self.__right!r})'
class ProxyExpr(G[T], Expr[T]):
def __init__(self, name: str) -> None:
def __init__(self, name: str, parent: O[str] = None) -> None:
self.__name = name
self.__parent = parent + '.' if parent else ''
self.__stack: List[BaseExpr[T]] = []
def _sx_push_(self, value: BaseExpr[T]) -> None:
@ -297,10 +326,10 @@ class ProxyExpr(G[T], Expr[T]):
return is_const(self.__stack[-1])
def __str__(self) -> str:
return f'${self.__name}'
return f'{self.__parent}{self.__name}'
def __repr__(self) -> str:
return f'${self.__name}(=> {self.__stack!r})'
return f'{self.__parent}{self.__name}(=> {self.__stack!r})'
class ConstChangeExpr(G[T], Expr[T]):
def __init__(self, child: BaseExpr[T], const: bool = True) -> None:
@ -326,23 +355,68 @@ class ConstChangeExpr(G[T], Expr[T]):
return f'infer({self.__child})'
def bool_(x: BaseExpr[T]) -> UnaryExpr[T, bool]:
    """ Deferred `bool(x)`. """
    return UnaryExpr(bool, x)


def not_(x: BaseExpr[T]) -> UnaryExpr[T, bool]:
    """ Deferred `not x`. """
    return UnaryExpr(lambda a: not a, x)


def and_(x: BaseExpr[T], y: BaseExpr[V]) -> BinExpr[T, V, bool]:
    """ Deferred `x and y`; like the operator, this yields one of the operands. """
    return BinExpr(lambda a, b: a and b, x, y)


def or_(x: BaseExpr[T], y: BaseExpr[V]) -> BinExpr[T, V, bool]:
    """ Deferred `x or y`; like the operator, this yields one of the operands. """
    return BinExpr(lambda a, b: a or b, x, y)


def in_(x: BaseExpr[T], y: BaseExpr[V]) -> BinExpr[T, V, bool]:
    """ Deferred membership test `y in x`: `x` is the container, `y` the item. """
    return BinExpr(lambda a, b: b in a, x, y)


def len_(x: BaseExpr[T]) -> UnaryExpr[T, int]:
    """ Deferred `len(x)`. """
    return UnaryExpr(len, x)


def int_(x: BaseExpr[T]) -> UnaryExpr[T, int]:
    """ Deferred `int(x)`. """
    return UnaryExpr(int, x)


def float_(x: BaseExpr[T]) -> UnaryExpr[T, float]:
    """ Deferred `float(x)`. """
    return UnaryExpr(float, x)
def get(expr: U[T, BaseExpr[T]], pop: bool = True) -> T:
    """
    Resolve `expr` to a value.

    Expressions are asked for their value; a tuple of at least two items is
    treated as (getter, putter, ...) and its getter is called with `pop`; any
    other value is returned unchanged.
    """
    if isinstance(expr, BaseExpr):
        return expr._sx_get_(pop=pop)
    # Bare functions are not called here (unlike in peek()); they are returned as-is.
    if isinstance(expr, tuple) and len(expr) >= 2:
        getter = expr[0]
        return getter(pop)
    return expr
def peek(expr: U[T, BaseExpr[T]], pop: bool = True) -> T:
    """
    Resolve `expr` to a value through the peek path.

    Expressions are asked to peek; a bare function is called with `pop`; a
    tuple of at least two items supplies the callable to use; any other value
    is returned unchanged.
    """
    if isinstance(expr, BaseExpr):
        return expr._sx_peek_(pop=pop)
    if isinstance(expr, types.FunctionType):
        return expr(pop)
    if isinstance(expr, tuple) and len(expr) >= 2:
        # (get, put) pairs reuse the getter; longer tuples carry a dedicated
        # peeker in third position.
        peeker = expr[0] if len(expr) == 2 else expr[2]
        return peeker(pop)
    return expr
def put(expr: U[T, BaseExpr[T]], value: T, pop: bool = True) -> None:
    """
    Store `value` into `expr`.

    Expressions receive the value directly; a tuple of at least two items is
    treated as (getter, putter, ...) and its putter is called with
    (value, pop). Anything else is silently ignored.
    """
    if isinstance(expr, BaseExpr):
        expr._sx_put_(value, pop=pop)
    elif isinstance(expr, tuple) and len(expr) >= 2:
        # Index the putter directly: unpacking three names from expr[:2]
        # always raised ValueError.
        putter = expr[1]
        putter(value, pop)
def is_const(expr: U[T, BaseExpr[T]]) -> bool:
    """
    Return whether `expr` is constant.

    Expressions decide for themselves; bare functions and tuples whose first
    item is a function are never constant; every other value is.
    """
    if isinstance(expr, BaseExpr):
        return expr._sx_is_const_()
    if isinstance(expr, types.FunctionType):
        return False
    # `expr and` keeps an empty tuple from raising IndexError on expr[0].
    if isinstance(expr, tuple) and expr and isinstance(expr[0], types.FunctionType):
        return False
    return True
def const(expr: U[T, BaseExpr[T]]) -> ConstChangeExpr[T]:

View File

@ -13,19 +13,29 @@ class Wrapper(G[T], Type[T]):
self.child = child
def parse(self, context: Context, stream: Stream) -> T:
return context.parse(to_type(self.child), stream)
child = to_type(self.child)
with context.enter(None, child):
return context.parse(child, stream)
def dump(self, context: Context, stream: Stream, value: O[T]) -> None:
context.dump(to_type(self.child), stream, value)
child = to_type(self.child)
with context.enter(None, child):
context.dump(child, stream, value)
def sizeof(self, context: Context, value: O[T]) -> O[int]:
return context.sizeof(to_type(self.child), value)
child = to_type(self.child)
with context.enter(None, child):
return context.sizeof(child, value)
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[int]:
return context.offsetof(to_type(self.child), path, value)
child = to_type(self.child)
with context.enter(None, child):
return context.offsetof(child, path, value)
def default(self, context: Context) -> T:
return context.default(to_type(self.child))
child = to_type(self.child)
with context.enter(None, child):
return context.default(child)
def __str__(self) -> str:
return str(to_type(self.child))
@ -64,27 +74,37 @@ class Generic(G[T], Type[T], BaseExpr[T]):
def parse(self, context: Context, stream: Stream) -> T:
if not self.stack:
raise Error(context, 'unresolved generic')
return context.parse(to_type(self.stack[-1]), stream)
child = to_type(self.stack[-1])
with context.enter(None, child):
return context.parse(child, stream)
def dump(self, context: Context, stream: Stream, value: T) -> None:
if not self.stack:
raise Error(context, 'unresolved generic')
context.dump(to_type(self.stack[-1]), stream, value)
child = to_type(self.stack[-1])
with context.enter(None, child):
context.dump(child, stream, value)
def sizeof(self, context: Context, value: O[T]) -> O[int]:
if not self.stack:
return None
return context.sizeof(to_type(self.stack[-1]), value)
child = to_type(self.stack[-1])
with context.enter(None, child):
return context.sizeof(child, value)
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[int]:
if not self.stack:
return None
return context.offsetof(to_type(self.stack[-1]), path, value)
child = to_type(self.stack[-1])
with context.enter(None, child):
return context.offsetof(child, path, value)
def default(self, context: Context) -> T:
if not self.stack:
raise Error(context, 'unresolved generic')
return context.default(to_type(self.stack[-1]))
child = to_type(self.stack[-1])
with context.enter(None, child):
return context.default(child)
def __str__(self) -> str:
if self.stack:

View File

@ -1,8 +1,8 @@
import os
import inspect
import ast
import collections
from typing import BinaryIO, Generator, Callable, Union as U, Tuple, Mapping, Any, cast
import collections.abc
from typing import BinaryIO, Generator, Callable, Optional as O, Union as U, Tuple, Mapping, Any, cast
from contextlib import contextmanager
@ -38,12 +38,17 @@ def indent(s: str, count: int, start: bool = False) -> str:
lines[i] = ' ' * count + lines[i]
return '\n'.join(lines)
def format_bytes(bs: bytes, cutoff: O[int] = 256) -> str:
    """
    Format bytes as space-separated two-digit hex values inside brackets.

    At most `cutoff` bytes are shown; longer input is truncated and marked
    with a trailing ' ...'. Pass cutoff=None to always show everything.
    """
    trailer = ''
    if cutoff is not None and len(bs) > cutoff:
        trailer = ' ...'
        bs = bs[:cutoff]
    return '[' + ' '.join(f'{b:02x}' for b in bs) + trailer + ']'
def format_value(value: Any, formatter: Callable[[Any], str], indentation: int = 0) -> str:
""" Format containers to use the given formatter function instead of always repr(). """
if isinstance(value, (dict, collections.Mapping)):
if isinstance(value, (dict, collections.abc.Mapping)):
if value:
fmt = '{{\n{}\n}}'
values = [indent(',\n'.join('{}: {}'.format(
@ -93,3 +98,15 @@ def get_annot_locations(cls: type) -> Tuple[str, Mapping[str, int]]:
lines[t.id] = start + b.lineno - 2
return fn, lines
def find_overlap(haystack: bytes, needle: bytes, start: int = 0) -> O[int]:
    """
    Find occurrence of `needle` in `haystack` or start of `needle` at the end of `haystack`.

    Returns the index of the first full occurrence at or after `start`. If
    there is none, returns the index where the longest proper prefix of
    `needle` begins at the very end of `haystack`, or None if there is no such
    prefix either.
    """
    # A single search, with no sliced copy and no second scan.
    pos = haystack.find(needle, start)
    if pos >= 0:
        return pos

    # NOTE(review): as before, the partial-match scan does not take `start`
    # into account -- confirm callers never need that.
    # A prefix longer than the haystack can never match, so start at most there.
    for n in range(min(len(needle) - 1, len(haystack)), 0, -1):
        if haystack.endswith(needle[:n]):
            return len(haystack) - n
    return None

View File

@ -25,27 +25,39 @@ class Switch(G[T, V], Type[T]):
selector = context.peek(self.default_key) if peek else context.get(self.default_key)
else:
return to_type(default)
if selector not in options and default:
return to_type(default)
if selector not in options:
if default:
return to_type(default)
raise ValueError(f'selector {selector} not any in options {", ".join(str(o) for o in options)}')
return to_type(options[selector])
def parse(self, context: Context, stream: Stream) -> T:
return context.parse(self.get_value(context), stream)
child = self.get_value(context)
with context.enter(None, child):
return context.parse(child, stream)
def dump(self, context: Context, stream: Stream, value: T) -> None:
context.dump(self.get_value(context), stream, value)
child = self.get_value(context)
with context.enter(None, child):
context.dump(child, stream, value)
def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
return context.sizeof(self.get_value(context, peek=True), value)
child = self.get_value(context, peek=True)
with context.enter(None, child):
return context.sizeof(child, value)
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[Pos]:
return context.offsetof(self.get_value(context, peek=True), path, value)
child = self.get_value(context, peek=True)
with context.enter(None, child):
return context.offsetof(child, path, value)
def default(self, context: Context) -> T:
    """ Return the default value of the option currently selected (the selector is peeked, not consumed). """
    child = self.get_value(context, peek=True)
    with context.enter(None, child):
        # Ask for the default of the selected child type
        # (previously the unrelated builtin ChildProcessError was passed).
        return context.default(child)
def __str__(self) -> str:
return f'{format_value(self.options)}[{self.selector}]'
return f'{format_value(self.options, str)}[{self.selector}]'
def __repr__(self) -> str:
return f'{__name__}.Switch({self.options!r}, selector={self.selector!r}, default={self.default_key!r}, fallback={self.default_val!r})'
@ -58,33 +70,43 @@ class If(G[T,V], Type[U[T, V]]):
def parse(self, context: Context, stream: Stream) -> U[T, V]:
if context.get(self.cond):
return context.parse(to_type(self.true), stream)
child = to_type(self.true)
else:
return context.parse(to_type(self.false), stream)
child = to_type(self.false)
with context.enter(None, child):
return context.parse(child, stream)
def dump(self, context: Context, stream: Stream, value: U[T, V]) -> None:
if context.get(self.cond):
context.dump(to_type(self.true), stream, value)
child = to_type(self.true)
else:
context.dump(to_type(self.false), stream, value)
child = to_type(self.false)
with context.enter(None, child):
return context.dump(child, stream, value)
def sizeof(self, context: Context, value: O[U[T, V]]) -> O[Pos]:
if context.peek(self.cond):
return context.sizeof(to_type(self.true), value)
child = to_type(self.true)
else:
return context.sizeof(to_type(self.false), value)
child = to_type(self.false)
with context.enter(None, child):
return context.sizeof(child, value)
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[U[T, V]]) -> O[Pos]:
if context.peek(self.cond):
return context.offsetof(to_type(self.true), path, value)
child = to_type(self.true)
else:
return context.offsetof(to_type(self.false), path, value)
child = to_type(self.false)
with context.enter(None, child):
return context.offsetof(child, path, value)
def default(self, context: Context) -> U[T, V]:
    """ Return the default value of the branch chosen by peeking the condition. """
    if context.peek(self.cond):
        child = to_type(self.true)
    else:
        child = to_type(self.false)
    with context.enter(None, child):
        # Use the local `child`; `self.child` is not an attribute set in this method.
        return context.default(child)
def __str__(self) -> str:
return f'({self.cond} ? {self.true} : {self.false})'

View File

@ -5,6 +5,8 @@ from ..core.meta import Wrapper
from ..core.expr import BaseExpr
T = TypeVar('T')
class Nothing(Type[None]):
def __init__(self) -> None:
pass
@ -27,7 +29,81 @@ class Nothing(Type[None]):
def __repr__(self) -> str:
return '{__name__}.Nothing()'
class Implied(G[T], Type[T]):
    """ Parse/dump nothing, yield value. """
    __slots__ = ('value',)

    def __init__(self, value: U[BaseExpr[T], T]) -> None:
        self.value = value

    def parse(self, context: Context, stream: Stream) -> T:
        # Nothing is read from the stream; the value comes from the expression.
        return context.get(self.value)

    def dump(self, context: Context, stream: Stream, value: T) -> None:
        # Nothing is written; the dumped value is stored back into the expression.
        context.put(self.value, value)

    def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
        return 0

    def default(self, context: Context) -> T:
        return context.peek(self.value)

    def __str__(self) -> str:
        return f'={self.value}'

    def __repr__(self) -> str:
        # Report the current class name rather than the former name `Static`.
        return f'{__name__}.Implied({self.value!r})'
class Ignored(G[T], Wrapper[T]):
    """ Parse/dump something, yield nothing. """

    def __init__(self, child: Type[T]) -> None:
        super().__init__(child)

    def parse(self, context: Context, stream: Stream) -> None:
        # The child is parsed but its value is discarded.
        super().parse(context, stream)

    def dump(self, context: Context, stream: Stream, value: None) -> None:
        # The given value is ignored; the child's default is dumped instead.
        super().dump(context, stream, super().default(context))

    def default(self, context: Context) -> None:
        return None

    def __str__(self) -> str:
        return f'(void){super().__str__()}'

    def __repr__(self) -> str:
        # Closing parenthesis added; it was missing from the output.
        return f'{__name__}.Ignored({super().__repr__()})'
# NOTE(review): this class uses `os` and `stretch`; neither import is visible
# in this part of the file -- confirm both are in scope.
class Pad(Type[None]):
    """ Seek something, yield nothing. """
    __slots__ = ('amount', 'value')

    def __init__(self, amount=0, value=b'\x00'):
        self.amount = amount
        self.value = value

    def parse(self, context: Context, stream: Stream) -> None:
        # Skip `amount` bytes relative to the current position without reading them.
        stream.seek(context.get(self.amount), os.SEEK_CUR)

    def dump(self, context: Context, stream: Stream, value: None) -> None:
        # The given value is ignored; the fill pattern is expanded to `amount` via stretch().
        value = stretch(context.get(self.value), context.get(self.amount))
        stream.write(value)

    def sizeof(self, context: Context, value: None = None) -> O[Pos]:
        # Accepts (and ignores) `value` like every other sizeof(); it defaults
        # to None so a call without it keeps working.
        return context.peek(self.amount)

    def default(self, context: Context) -> None:
        return None

    def __str__(self) -> str:
        return f'[padding: {self.amount}]'

    def __repr__(self) -> str:
        return f'{__name__}.Pad({self.amount!r}, value={self.value!r})'
class Data(Type[bytes]):
""" Parse/dump and yield bytes. """
__slots__ = ('size',)
def __init__(self, size: U[D, O[int]] = None) -> None:
@ -41,6 +117,7 @@ class Data(Type[bytes]):
def dump(self, context: Context, stream: Stream, value: bytes) -> None:
stream.write(value)
context.put(self.size, len(value))
def default(self, context: Context) -> bytes:
size = context.peek(self.size)
@ -49,6 +126,8 @@ class Data(Type[bytes]):
return bytes(size)
def sizeof(self, context: Context, value: O[bytes]) -> O[Pos]:
if value is not None:
return len(value)
return context.peek(self.size)
def __str__(self) -> str:
@ -59,8 +138,8 @@ class Data(Type[bytes]):
data = Data()
class Bits(Type[int]):
""" Parse/dump and yield bits. """
__slots__ = ('amount',)
def __init__(self, amount: U[D, int] = 0) -> None:
@ -90,47 +169,3 @@ class Bits(Type[int]):
bit = Bits(1)
nibble = Bits(4)
T = TypeVar('T')
class Static(G[T], Type[T]):
    """ Yield a stored value without reading from or writing to the stream. """

    def __init__(self, value: U[BaseExpr[T], T]) -> None:
        self.value = value

    def parse(self, context: Context, stream: Stream) -> T:
        # The stream is not consulted; the value comes from the context.
        return context.get(self.value)

    def dump(self, context: Context, stream: Stream, value: T) -> None:
        # Nothing is written to the stream; the value is handed to the context.
        context.put(self.value, value)

    def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
        # parse/dump never touch the stream, so the size is zero.
        return 0

    def default(self, context: Context) -> T:
        return context.peek(self.value)

    def __str__(self) -> str:
        return f'={self.value}'

    def __repr__(self) -> str:
        return f'{__name__}.Static({self.value!r})'
class Ignored(G[T], Wrapper[T]):
    """ Parse/emit the wrapped child, yield nothing. """

    def __init__(self, child: Type[T]) -> None:
        super().__init__(child)

    def parse(self, context: Context, stream: Stream) -> None:
        """ Parse the wrapped child and discard the result. """
        super().parse(context, stream)

    def emit(self, context: Context, stream: Stream, value: None) -> None:
        """ Emit the wrapped child's default value; the passed `value` is not used. """
        super().emit(context, stream, super().default(context))

    def default(self, context: Context) -> None:
        return None

    def __str__(self) -> str:
        return f'(void){super().__str__()}'

    def __repr__(self) -> str:
        # The closing parenthesis was missing, producing an unbalanced repr.
        return f'{__name__}.Ignored({super().__repr__()})'

View File

@ -1,10 +1,10 @@
from typing import Any, Generic as G, TypeVar, Union as U, Optional as O, Sequence
import os
import errno
from ..core.base import Type, Context, PossibleDynamic, PathElement
from ..core.io import Stream, Segment, Pos
from ..core.base import Type, Context, PossibleDynamic, PathElement, to_type
from ..core.io import Stream, Segment, Pos, add_sizes
from ..core.meta import Wrapper
from ..core.util import stretch, seeking
from ..core.util import stretch, seeking, find_overlap
class SizedStream:
@ -61,9 +61,10 @@ class SizedStream:
T = TypeVar('T')
class Sized(G[T], Wrapper[T]):
def __init__(self, child: Type[T], limit: U[Pos, PossibleDynamic]):
def __init__(self, child: Type[T], limit: U[Pos, PossibleDynamic], hard=False) -> None:
super().__init__(child)
self.limit = limit
self.hard = hard
def parse(self, context: Context, stream: Stream) -> T:
limit = max(0, context.get(self.limit))
@ -72,14 +73,154 @@ class Sized(G[T], Wrapper[T]):
stream.seek(start + limit, os.SEEK_SET)
return value
def dump(self, context: Context, stream: Stream, value: O[T]) -> None:
limit = max(0, context.get(self.limit))
start = stream.tell()
super().dump(context, SizedStream(stream, limit), value)
stream.seek(start + limit, os.SEEK_SET)
def dump(self, context: Context, stream: Stream, value: T) -> None:
hard = context.get(self.hard)
if hard:
limit = max(0, context.get(self.limit))
start = stream.tell()
super().dump(context, SizedStream(stream, limit), value)
stream.seek(start + limit, os.SEEK_SET)
else:
start = stream.tell()
super().dump(context, stream, value)
size = stream.tell() - start
context.put(self.limit, size)
def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
return context.peek(self.limit)
hard = context.peek(self.hard)
if hard:
return context.peek(self.limit)
else:
return super().sizeof(context, value)
class TerminatedStream:
    """ Stream wrapper whose readable data ends at the first occurrence of a terminator.

    Until the terminator has been seen (`_end_pos is None`) reads scan the data
    for it; once found, `_end_pos` records the end position and subsequent
    reads, seeks and tells are clamped to it. Any attribute not defined here
    is forwarded to the wrapped stream.
    """

    def __init__(self, stream: Stream, terminator: bytes, included: bool, blocksize: int = 8192) -> None:
        self._stream = stream
        self._terminator = terminator
        # Whether the terminator bytes are part of the data returned by read().
        self._included = included
        # Position in the wrapped stream where the data ends; None until the terminator is found.
        self._end_pos = None
        # Chunk size used for unbounded (n < 0) reads.
        self._blocksize = blocksize

    def read(self, n: int = -1, *, bits: bool = False) -> U[bytes, int]:
        """ Read up to the terminator.

        `n < 0` reads until the terminator (or until the wrapped stream runs
        out); `n >= 0` requests exactly `n` bytes.

        Raises ValueError for bit-level reads, and EOFError when the known end
        has been reached or fewer than the requested `n` bytes are available.
        """
        if bits:
            raise ValueError('terminated streams can not use bit-level reads')

        if self._end_pos is not None:
            # End already known: clamp the request to what is left before it.
            if self._stream.tell() >= self._end_pos:
                raise EOFError
            remaining = self._end_pos - self._stream.tell()
            if n < 0:
                n = remaining
            else:
                n = min(n, remaining)

        value = b''
        while n < 0 or len(value) < n:
            try:
                p = self._stream.tell()
                v = self._stream.read(n if n >= 0 else self._blocksize)
            except EOFError:
                if n < 0:
                    # Unbounded read: rewind and take whatever is left instead of a full block.
                    self._stream.seek(p, os.SEEK_SET)
                    v = self._stream.read(-1)
                else:
                    raise
            if not v:
                break
            value += v

            # find full terminator or start of terminator at the end
            if self._end_pos is None:
                # Only the newly read chunk is searched (start offset = previous length).
                termpos = find_overlap(value, self._terminator, len(value) - len(v))
                if termpos is not None:
                    # need to read more data?
                    termrem = len(self._terminator) - (len(value) - termpos)
                    if termrem > 0:
                        p = self._stream.tell()
                        try:
                            value += self._stream.read(termrem)
                        except EOFError:
                            pass
                        if value.endswith(self._terminator):
                            termrem = 0
                        else:
                            # NOTE(review): the stream is rewound here but `value` keeps any
                            # bytes appended by the read above -- confirm this cannot lead to
                            # the same bytes being appended again on the next iteration.
                            self._stream.seek(p, os.SEEK_SET)
                    if termrem <= 0:
                        # terminator found, reset overread data
                        self._stream.seek(-(len(value) - (termpos + len(self._terminator))), os.SEEK_CUR)
                        self._end_pos = self._stream.tell() - len(self._terminator)
                        if self._included:
                            termpos += len(self._terminator)
                            self._end_pos += len(self._terminator)
                        value = value[:termpos]
                        break

        # An explicit positive byte count that could not be satisfied counts as EOF.
        if n > 0 and len(value) != n:
            raise EOFError
        return value

    def seek(self, pos: U[int, float], whence: int = os.SEEK_SET) -> None:
        """ Seek in the wrapped stream; once the end is known, targets are converted
        to absolute positions (capped at the end for SEEK_SET/SEEK_CUR, and
        relative to the end for SEEK_END). """
        if self._end_pos is not None:
            if whence == os.SEEK_SET:
                pos = min(pos, self._end_pos)
            elif whence == os.SEEK_CUR:
                pos = min(pos + self._stream.tell(), self._end_pos)
            elif whence == os.SEEK_END:
                pos += self._end_pos
            # All branches above produced an absolute position.
            whence = os.SEEK_SET
        return self._stream.seek(pos, whence)

    def tell(self) -> U[int, float]:
        """ Return the wrapped stream's position, capped at the end once it is known. """
        pos = self._stream.tell()
        if self._end_pos is not None:
            pos = min(pos, self._end_pos)
        return pos

    def __getattr__(self, attr: str) -> Any:
        # Anything not defined on this wrapper is delegated to the wrapped stream.
        return getattr(self._stream, attr)
class Terminated(G[T], Wrapper[T]):
    """ Wrap a child type whose data is delimited by a terminator byte sequence. """

    def __init__(
        self,
        child: Type[T],
        terminator: U[bytes, PossibleDynamic],
        required: U[bool, PossibleDynamic] = True,
        included: U[bool, PossibleDynamic] = False,
        blocksize: int = 8192,
    ) -> None:
        super().__init__(child)
        self.terminator = terminator
        self.required = required
        self.included = included
        self.blocksize = blocksize

    def parse(self, context: Context, stream: Stream) -> T:
        """ Parse the child from a TerminatedStream over `stream`.

        Raises EOFError when the terminator is required but was never found.
        """
        term = context.get(self.terminator)
        must_have = context.get(self.required)
        keep_term = context.get(self.included)

        bounded = TerminatedStream(stream, term, keep_term, blocksize=self.blocksize)
        parsed = super().parse(context, bounded)
        if must_have:
            if bounded._end_pos is None:
                raise EOFError(f'terminator {term} not found in stream')
        return parsed

    def dump(self, context: Context, stream: Stream, value: T) -> None:
        """ Dump the child, then write the terminator when it is required and not
        already part of the child's data. """
        term = context.get(self.terminator)
        must_have = context.get(self.required)
        keep_term = context.get(self.included)

        bounded = TerminatedStream(stream, term, keep_term, blocksize=self.blocksize)
        super().dump(context, bounded, value)
        if not must_have or keep_term:
            return
        stream.write(term)

    def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
        """ Child size plus the terminator length (unless included); None when the
        terminator is optional or the child size is unknown. """
        term = context.peek(self.terminator)
        must_have = context.peek(self.required)
        keep_term = context.peek(self.included)

        if must_have:
            inner = super().sizeof(context, value)
            if inner is not None:
                if keep_term:
                    return inner
                return add_sizes(inner, context.to_size(len(term)))
        return None
class Ref(G[T], Wrapper[T]):
@ -120,7 +261,7 @@ class Ref(G[T], Wrapper[T]):
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[Pos]:
segment = context.peek(self.segment) or context.params.segments['refs']
with context.enter_segment(segment):
return super().contextof(context, path, value)
return super().offsetof(context, path, value)
def __str__(self) -> str:
indicator = {os.SEEK_SET: '', os.SEEK_CUR: '+', os.SEEK_END: '-'}.get(self.whence, self.whence)
@ -131,6 +272,86 @@ class Ref(G[T], Wrapper[T]):
return f'{__name__}.Ref({super().__repr__()}, pos={self.pos!r}, whence={self.whence!r}, segment={self.segment!r})'
LazyUnresolved = object()
class LazyEntry(G[T]):
    """ Deferred parse result: calling the entry parses the value on first use
    and returns the cached value afterwards. """
    __slots__ = ('type', 'context', 'stream', 'positions', 'value')

    def __init__(self, type: Type[T], context: Context, stream: Stream, pos: dict[Segment, Pos], value=LazyUnresolved) -> None:
        self.type = type
        # A copy is kept so later changes to the caller's context do not affect the deferred parse.
        self.context = context.copy()
        self.stream = stream
        # Per-segment positions to restore before parsing.
        self.positions = pos
        self.value = value

    def __call__(self) -> T:
        """ Return the value, parsing it first if it is still unresolved. """
        if self.value is LazyUnresolved:
            # Temporarily move every recorded segment to its saved position...
            old_pos = {}
            for segment, pos in self.positions.items():
                old_pos[segment] = segment.pos
                segment.pos = pos
            try:
                with self.context.enter_segment(self.context.segment, self.stream, self.positions[self.context.segment]) as stream:
                    self.value = self.context.parse(self.type, stream)
            finally:
                # ...and put the segments back, even if parsing failed.
                for segment, pos in old_pos.items():
                    segment.pos = pos
        return self.value

    def __str__(self) -> str:
        if self.value is LazyUnresolved:
            return f'~{self.type}'
        return f'~{self.value}'

    def __repr__(self) -> str:
        if self.value is LazyUnresolved:
            return f'{__name__}.{self.__class__.__name__}({self.type!r})'
        # Was `self.valu`, which raised AttributeError for every resolved entry.
        return f'{__name__}.{self.__class__.__name__}(value={self.value!r})'
class Lazy(G[T], Type[LazyEntry[T]]):
    """ Type that skips over its child while parsing and yields a LazyEntry
    that parses the child on demand. """
    __slots__ = ('type',)

    def __init__(self, type: Type[T]) -> None:
        self.type = type

    def parse(self, context: Context, stream: Stream) -> LazyEntry[T]:
        """ Record the current position of every involved segment, seek past the
        child's data, and return an unresolved LazyEntry.

        Raises ValueError when the child's size in any segment is unknown.
        """
        per_segment = context.to_size(self.sizeof(context, None))
        positions = {}

        own_size = per_segment.pop(context.segment)
        if own_size is None:
            raise ValueError(f'lazy type size in current segment must be known at parse time')
        positions[context.segment] = stream.tell()
        stream.seek(own_size, os.SEEK_CUR)

        for other, other_size in per_segment.items():
            if other_size is None:
                raise ValueError(f'lazy type size in segment {other.name} must be known at parse time')
            with context.enter_segment(other, stream) as sub:
                positions[other] = sub.tell()
                sub.seek(other_size, os.SEEK_CUR)

        return LazyEntry(to_type(self.type), context, stream, positions)

    def dump(self, context: Context, stream: Stream, value: LazyEntry[T]) -> None:
        """ Resolve the entry and dump the resulting value with the child type. """
        resolved = value()
        return context.dump(to_type(self.type), stream, resolved)

    def sizeof(self, context: Context, value: O[LazyEntry[T]]) -> O[Pos]:
        """ Size of the child type, resolving the entry first when one is given. """
        child = to_type(self.type)
        inner = None if value is None else value()
        return context.sizeof(child, inner)

    def offsetof(self, context: Context, path: Sequence[PathElement], value: O[LazyEntry[T]]) -> O[Pos]:
        """ Offset of `path` within the child type, resolving the entry first when one is given. """
        child = to_type(self.type)
        inner = None if value is None else value()
        return context.offsetof(child, path, inner)

    def default(self, context: Context) -> LazyEntry[T]:
        """ Return an already-resolved entry holding the child's default value. """
        return LazyEntry(self.type, context, None, {}, value=context.default(to_type(self.type)))

    def __str__(self) -> str:
        return f'~{self.type}'

    def __repr__(self) -> str:
        return f'{__name__}.{self.__class__.__name__}({self.type!r})'
class AlignTo(G[T], Wrapper[T]):
__slots__ = ('alignment', 'value')

View File

@ -15,7 +15,7 @@ class Int(Type[int]):
def parse(self, context: Context, stream: Stream) -> int:
n = context.get(self.bits)
bs = stream.read(n // 8)
bs = stream.read(n // 8 if n is not None else -1)
return int.from_bytes(bs, byteorder=context.get(self.endian).to_python(), signed=context.get(self.signed))
def dump(self, context: Context, stream: Stream, value: U[int, float]) -> None:
@ -40,7 +40,8 @@ class Int(Type[int]):
def __str__(self) -> str:
endian = {Endian.Big: 'be', Endian.Little: 'le'}.get(self.endian, self.endian) if self.bits != 8 else ''
sign = {True: '', False: 'u'}.get(self.signed, self.signed)
return f'{sign}int{self.bits}{endian}'
bits = self.bits if self.bits is not None else ''
return f'{sign}int{bits}{endian}'
def __repr__(self) -> str:
return f'{__name__}.Int({self.bits!r}, endian={self.endian!r}, signed={self.signed!r})'
@ -73,6 +74,11 @@ uint64 = \
uint64le = Int(64, endian=Endian.Little, signed=False)
uint64be = Int(64, endian=Endian.Big, signed=False)
intle = Int(None, endian=Endian.Little, signed=True)
intbe = Int(None, endian=Endian.Big, signed=True)
uintle = Int(None, endian=Endian.Little, signed=False)
uintbe = Int(None, endian=Endian.Big, signed=False)
T = TypeVar('T')
@ -83,7 +89,7 @@ class Bool(G[T], Mapped[T, bool]):
repr=f'<{__name__}.Bool({child!r}, true: {true_value!r}, false: {false_value!r})>',
)
bool = Bool(uint8)
bool8 = Bool(uint8)
class Float(Type[float]):
@ -128,7 +134,6 @@ float16le = \
binary16le = Float(16, endian=Endian.Little)
float16be = \
binary16be = Float(16, endian=Endian.Big)
float_ = \
float32 = \
binary32 = \
float32le = \

View File

@ -59,7 +59,7 @@ class Arr(G[T], Type[List[T]]):
for i, elem in enumerate(value):
c = to_type(child)
with context.enter(i, c):
context.dump(child, stream, elem)
context.dump(c, stream, elem)
context.put(self.count, len(value))

View File

@ -2,57 +2,80 @@ import enum
from typing import Optional as O, Union as U
from ..core.base import PossibleDynamic as D, Type, Context
from ..core.io import Stream
from ..core.util import stretch
from .num import uint8
from .io import Terminated
class StrType(enum.Enum):
    """ Kinds of string layout; `C` and `Pascal` are aliases of the longer names. """
    Raw = enum.auto()
    ZeroTerminated = C = enum.auto()
    LengthPrefixed = Pascal = enum.auto()
class Str(Type[str]):
def __init__(self, type: U[D, StrType], length: U[D, O[int]] = None, encoding: U[D, str] = 'utf-8', char_size: U[D, int] = 1, length_type: U[D, Type] = uint8, terminator: U[D, bytes] = b'\x00') -> None:
self.type = type
def __init__(self, length: U[D, O[int]] = None, encoding: U[D, str] = 'utf-8', char_size: U[D, int] = 1) -> None:
self.length = length
self.encoding = encoding
self.char_size = char_size
self.length_type = length_type
self.terminator = terminator
def parse(self, context: Context, stream: Stream) -> str:
type = context.get(self.type)
length = context.get(self.length)
encoding = context.get(self.encoding)
char_size = context.get(self.char_size)
if type == StrType.Raw:
if length is None:
data = stream.read()
else:
data = stream.read(length * char_size)
elif type == StrType.C:
terminator = context.get(self.terminator)
if len(terminator) != char_size:
terminator = stretch(terminator, char_size)
data = b''
while True:
d = stream.read(char_size)
if d == terminator:
break
data += d
if length is not None and len(data) >= length * char_size:
break
elif type == StrType.Pascal:
length_type = context.get(self.length_type)
plength = context.parse(length_type, stream)
if length is None:
data = stream.read()
else:
data = stream.read(length * char_size)
return data.decode(encoding)
def dump(self, context: Context, stream: Stream, value: str) -> None:
pass
encoding = context.get(self.encoding)
char_size = context.get(self.char_size)
bs = value.encode(encoding)
length = len(bs) // char_size
stream.write(bs)
context.put(self.length, length)
def default(self, context: Context) -> str:
return ''
def sizeof(self, context: Context, value: O[str]) -> O[int]:
return 0
if value is not None:
return len(value.encode(context.peek(self.encoding)))
return None
def __str__(self) -> str:
    """ Short description: `str`, plus `.<encoding>` when the encoding is not
    utf-8, plus `(<length>)` when a length is set. """
    if self.length is not None:
        length = f'({self.length})'
    else:
        length = ''
    if self.encoding != 'utf-8':
        encoding = f'.{self.encoding}'
    else:
        encoding = ''
    # Use the computed suffix; interpolating self.encoding directly produced
    # 'strutf-8' for the default and dropped the '.' separator otherwise.
    return f'str{encoding}{length}'
def __repr__(self) -> str:
return f'{__name__}.Str(length={self.length!r}, encoding={self.encoding!r}, char_size={self.char_size!r})'
class CStr(Terminated[str]):
    """ NUL-terminated string: a `Str` child wrapped in `Terminated`, with the
    terminator being the encoded NUL character of the chosen encoding. """

    def __init__(self, *args, terminator_required=True, **kwargs) -> None:
        """ Positional and keyword arguments are forwarded to `Str`;
        `terminator_required` is passed on as `Terminated`'s `required`. """
        # Str takes (length, encoding, char_size), so the encoding may also
        # arrive positionally; looking only at kwargs picked the utf-8
        # terminator for e.g. CStr(None, 'utf-16le').
        if len(args) > 1:
            encoding = args[1]
        else:
            encoding = kwargs.get('encoding', 'utf-8')
        terminator = '\x00'.encode(encoding)
        super().__init__(Str(*args, **kwargs), terminator, required=terminator_required, blocksize=16)

    def __str__(self) -> str:
        # Short names for the common encodings; otherwise defer to the child.
        if self.child.encoding == 'utf-8':
            return 'cstr'
        if self.child.encoding == 'utf-16le':
            return 'wcstr'
        if self.child.encoding == 'sjis':
            return 'jcstr'
        return str(self.child)

    def __repr__(self) -> str:
        return f'<{__name__}.CStr({self.child!r})>'
cstr = \
utf8cstr = CStr()
wcstr = \
utf16cstr = CStr(encoding='utf-16le', char_size=2)
jcstr = \
sjiscstr = CStr(encoding='sjis')

View File

@ -11,7 +11,7 @@ from contextlib import contextmanager
import sx
from ..core import to_type
from ..core.base import Context, Type, PathElement
from ..core.io import Stream, Pos, add_sizes
from ..core.io import Stream, Pos, add_sizes, max_sizes
from ..core.util import indent, format_value, get_annot_locations
from ..core.meta import Generic, TypeSource
from ..core.expr import ProxyExpr
@ -24,7 +24,7 @@ class ProxyStruct:
self._sx_fields_ = {}
def __getattr__(self, name: str) -> ProxyExpr:
e = ProxyExpr(name)
e = ProxyExpr(name, 'self')
try:
self._sx_fields_[name].append(e)
except KeyError:
@ -110,7 +110,16 @@ class StructType(G[T], Type[T]):
pos = stream.tell()
with self.enter():
for name, type in self.fields.items():
if self.union:
if value._sx_lastset_:
fields = [value._sx_lastset_]
else:
fields = next(self.fields)
else:
fields = list(self.fields)
for name in fields:
type = self.fields[name]
with context.enter(name, type):
if self.union:
stream.seek(pos, os.SEEK_SET)
@ -142,15 +151,22 @@ class StructType(G[T], Type[T]):
elem = getattr(value, field)
else:
elem = None
with context.enter(field, child):
size = context.sizeof(child, elem)
c = to_type(child, field)
with context.enter(field, c):
size = context.sizeof(c, elem)
sizes.append(size)
return sizes
def sizeof(self, context: Context, value: O[T]) -> O[Mapping[str, int]]:
with self.enter():
sizes = self.get_sizes(context, value, None)
return add_sizes(*sizes) if sizes else 0
if sizes:
if self.union:
return max_sizes(*sizes)
else:
return add_sizes(*sizes)
else:
return 0
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[int]:
if not path:
@ -165,7 +181,10 @@ class StructType(G[T], Type[T]):
child = self.fields[field]
with self.enter():
sizes = self.get_sizes(context, value, field)
if self.union:
sizes = []
else:
sizes = self.get_sizes(context, value, field)
if path:
with context.enter(field, child):
sizes.append(context.offsetof(child, path, getattr(value, field) if value is not None else None))
@ -222,6 +241,7 @@ class Struct:
if parent:
generics = parent.generics + generics
bound = parent.bound + bound
kwargs.setdefault('union', parent.union)
# Get all annotations
annots = {}
@ -269,10 +289,11 @@ class Struct:
item = (item,)
subtype = cls._sx_type_[item]
new_name = '{}[{}]'.format(cls.__name__, ', '.join(str(g) for g in subtype.bound))
new = type(new_name, (cls,), {})
new = type(new_name, (cls,), {
'__module__': cls.__module__,
'__slots__': cls.__slots__,
})
new._sx_type_ = subtype
new.__slots__ = cls.__slots__
new.__module__ = cls.__module__
subtype.cls = new
return new
@ -314,3 +335,16 @@ class Struct:
def __repr__(self) -> str:
return self._format_(repr)
class Union(Struct, union=True, inject=False):
    """ Struct declared with `union=True` that remembers, in `_sx_lastset_`,
    the name of the field that was assigned most recently. """
    _sx_lastset_ = ''

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        if not kwargs:
            return
        # The last keyword passed counts as the most recently set field.
        *_, newest = kwargs
        super().__setattr__('_sx_lastset_', newest)

    def __setattr__(self, name: str, value: Any) -> None:
        parent = super()
        parent.__setattr__(name, value)
        parent.__setattr__('_sx_lastset_', name)

View File

@ -1,9 +1,10 @@
import enum
import io
from typing import Optional as O, Generic as G, Union as U, TypeVar, Callable, Sequence, Mapping, Type as Ty
from ..core.base import Type, Context, PathElement
from ..core.io import Stream, Pos
from ..core.io import Stream, Pos, to_stream
from ..core.meta import Wrapper
from ..core import to_type
from ..core import to_type, dump
from .data import Data
@ -19,7 +20,74 @@ class Default(G[T], Wrapper[T]):
def default(self, context: Context) -> T:
return self._default
class Transform(G[T, V], Type[V]):
Preprocessor = U[Callable[[bytes], bytes], Callable[[bytes, Context], bytes]]
class Preproc(G[T], Wrapper[T]):
    """ Run raw bytes through a callback before the child parses them, and the
    child's dumped bytes through a callback before they are written.

    `parse`/`dump` are the byte callbacks (`dump` defaults to `parse`);
    `sizeof` is an optional size callback; `context` selects whether the
    callbacks also receive the Context; `bits` selects bit-level stream I/O;
    `str`/`repr` override the textual representations.
    """

    def __init__(self, child: Type[T], parse: Preprocessor, dump: O[Preprocessor] = None, sizeof: O[U[Callable[[O[T]], Pos], Callable[[O[T], Context], Pos]]] = None, context: bool = False, bits: bool = False, str: O[str] = None, repr: O[str] = None) -> None:
        super().__init__(child)
        self.on_parse = parse
        self.on_dump = dump or self.on_parse
        self.on_sizeof = sizeof
        self.context = context
        self.bits = bits
        self.on_str = str or repr
        self.on_repr = repr or str

    def parse(self, context: Context, stream: Stream) -> T:
        """ Read `sizeof` bytes, preprocess them and parse the child from the result.

        Raises ValueError when the size cannot be determined up front.
        """
        size = self.sizeof(context, None)
        if size is None:
            raise ValueError(f'{self.__class__.__name__} needs a sized child')
        bits = context.get(self.bits)
        if bits:
            raw = stream.read(int(size * 8), bits=True)
        else:
            raw = stream.read(size)

        ctx = context.get(self.context)
        proc = self.on_parse(raw, context) if ctx else self.on_parse(raw)
        return super().parse(context, to_stream(proc))

    def dump(self, context: Context, stream: Stream, value: T) -> None:
        """ Dump the child into a buffer, preprocess the bytes and write them out. """
        raw_stream = io.BytesIO()
        # The child must be dumped through the wrapper's own dump();
        # `super().on_dump` does not exist and on_dump only transforms bytes.
        super().dump(context, raw_stream, value)
        raw = raw_stream.getvalue()

        ctx = context.get(self.context)
        proc = self.on_dump(raw, context) if ctx else self.on_dump(raw)
        size = self.sizeof(context, value)
        bits = context.get(self.bits)
        if bits:
            stream.write(proc, bits=int(size * 8))
        else:
            stream.write(proc)

    def sizeof(self, context: Context, value: O[T]) -> O[Pos]:
        """ Ask the size callback first (given the child's dumped bytes, or None);
        fall back to the child's own size when there is no callback or it
        returns None. """
        raw = None
        if value is not None:
            raw_stream = io.BytesIO()
            dump(context.peek(self.child), value, raw_stream)
            raw = raw_stream.getvalue()

        ctx = context.peek(self.context)
        # Start from None so the fallback below also works without a callback
        # (previously `size` was unbound in that case).
        size = None
        if self.on_sizeof:
            size = self.on_sizeof(raw, context) if ctx else self.on_sizeof(raw)
        if size is None:
            # The child expects the original value, not its dumped bytes.
            size = super().sizeof(context, value)
        return size

    def __str__(self) -> str:
        if self.on_str is not None:
            return self.on_str
        return f'λ({self.child})'

    def __repr__(self) -> str:
        if self.on_repr is not None:
            return self.on_repr
        return f'{__name__}.Preproc({self.child!r}, parse={self.on_parse!r}, dump={self.on_dump!r}, sizeof={self.on_sizeof!r}, context={self.context!r}, bits={self.bits!r}, str={self.on_str!r}, repr={self.on_repr!r})'
class Proc(G[T, V], Type[V]):
def __init__(self, child: Type[T], parse: U[Callable[[T], V], Callable[[T, Context], V]], dump: U[Callable[[V], T], Callable[[V, Context], T]], context: bool = False, str: O[str] = None, repr: O[str] = None) -> None:
self.child = child
self.on_parse = parse
@ -29,25 +97,35 @@ class Transform(G[T, V], Type[V]):
self.on_repr = repr or str
def parse(self, context: Context, stream: Stream) -> V:
value = context.parse(to_type(self.child), stream)
child = to_type(self.child)
with context.enter(None, child):
value = context.parse(child, stream)
return self.on_parse(value, context) if self.context else self.on_parse(value)
def dump(self, context: Context, stream: Stream, value: V) -> None:
value = self.on_dump(value, context) if self.context else self.on_dump(value)
context.dump(to_type(self.child), stream, value)
child = to_type(self.child)
with context.enter(None, child):
context.dump(child, stream, value)
def sizeof(self, context: Context, value: O[V]) -> O[int]:
if value is not None:
value = self.on_dump(value, context) if self.context else self.on_dump(value)
return context.sizeof(to_type(self.child), value)
child = to_type(self.child)
with context.enter(None, child):
return context.sizeof(child, value)
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[V]) -> O[int]:
if value is not None:
value = self.on_dump(value, context) if self.context else self.on_dump(value)
return context.offsetof(to_type(self.child), path, value)
child = to_type(self.child)
with context.enter(None, child):
return context.offsetof(child, path, value)
def default(self, context: Context) -> V:
value = context.default(to_type(self.child))
child = to_type(self.child)
with context.enter(None, child):
value = context.default(child)
return self.on_parse(value, context) if self.context else self.on_parse(value)
def __str__(self) -> str:
@ -58,10 +136,10 @@ class Transform(G[T, V], Type[V]):
def __repr__(self) -> str:
if self.on_repr is not None:
return self.on_repr
return f'{__name__}.Transform({self.child!r}, parse={self.on_parse!r}, dump={self.on_dump!r}, context={self.context!r}, str={self.on_str!r}, repr={self.on_repr!r})'
return f'{__name__}.Proc({self.child!r}, parse={self.on_parse!r}, dump={self.on_dump!r}, context={self.context!r}, str={self.on_str!r}, repr={self.on_repr!r})'
class Mapped(G[T, V], Transform[T, V]):
class Mapped(G[T, V], Proc[T, V]):
def __init__(self, child: Type[T], mapping: Mapping[T, V], str: O[str] = None, repr: O[str] = None) -> None:
reverse = {v: k for k, v in mapping.items()}
super().__init__(child,
@ -75,7 +153,7 @@ class Mapped(G[T, V], Transform[T, V]):
E = TypeVar('E', bound=enum.Enum)
class Enum(G[T, E], Transform[T, E]):
class Enum(G[T, E], Proc[T, E]):
def __init__(self, enum: Ty[E], child: Type[T]) -> None:
super().__init__(child,
parse=enum,