From df7a5e5a6201659d8a742073a63d44399524e57d Mon Sep 17 00:00:00 2001 From: Shiz Date: Tue, 23 Aug 2022 02:24:58 +0200 Subject: [PATCH] pass PosInfo to sizeof/offsetof, implement Align(ed)To.sizeof/offsetof() --- sx/core/__init__.py | 4 +- sx/core/base.py | 16 +++---- sx/core/io.py | 18 +++---- sx/core/meta.py | 26 +++++------ sx/types/control.py | 18 +++---- sx/types/data.py | 12 ++--- sx/types/io.py | 103 ++++++++++++++++++++++++++++------------- sx/types/num.py | 6 +-- sx/types/seq.py | 37 +++++++++------ sx/types/str.py | 4 +- sx/types/struct.py | 20 ++++---- sx/types/transforms.py | 22 ++++----- 12 files changed, 168 insertions(+), 118 deletions(-) diff --git a/sx/core/__init__.py b/sx/core/__init__.py index 1245350..cfd17bc 100644 --- a/sx/core/__init__.py +++ b/sx/core/__init__.py @@ -48,7 +48,7 @@ def sizeof(type: PossibleType, value: O[Any] = None, params: O[Params] = None, s ctx = Context(type, value, params=params) try: with resolving(context, ctx.params.user): - sizes = ceil_sizes(ctx.sizeof(type, value)) + sizes = ceil_sizes(ctx.sizeof(type, {}, value)) except Error: raise except Exception as e: @@ -69,7 +69,7 @@ def offsetof(type: PossibleType, path: Sequence[PathElement], value: O[Any] = No ctx = Context(type, value, params=params) try: with resolving(context, ctx.params.user): - offsets = ctx.offsetof(type, path, value) + offsets = ctx.offsetof(type, {}, path, value) except Error: raise except Exception as e: diff --git a/sx/core/base.py b/sx/core/base.py index 86360a4..71e87d8 100644 --- a/sx/core/base.py +++ b/sx/core/base.py @@ -8,7 +8,7 @@ from typing import ( ) from .util import seeking -from .io import Segment, Stream, Pos +from .io import Segment, Stream, Pos, PosInfo T = TypeVar('T') @@ -141,7 +141,7 @@ class Context: return size def segment_size(self, segment: Segment) -> O[Pos]: - sizes = Context(self.root, self.value, params=self.params, reset=False).sizeof(self.root, self.value) + sizes = Context(self.root, self.value, params=self.params, reset=False).sizeof(self.root, {}, self.value) return sizes.get(segment, None) def format_path(self) -> str: @@ -192,11 +192,11 @@ class Context: def dump(self, type: 'Type[PT]', stream: Stream, value: PT) -> None: return type.dump(self, stream, value) - def sizeof(self, type: 'Type[PT]', value: O[PT] = None) -> Dict[Segment, Pos]: - return self.to_size(type.sizeof(self, value)) + def sizeof(self, type: 'Type[PT]', start: PosInfo, value: O[PT] = None) -> PosInfo: + return self.to_size(type.sizeof(self, start, value)) - def offsetof(self, type: 'Type[PT]', path: Sequence[PathElement], value: O[PT] = None) -> Dict[Segment, Pos]: - return self.to_size(type.offsetof(self, path, value)) + def offsetof(self, type: 'Type[PT]', start: PosInfo, path: Sequence[PathElement], value: O[PT] = None) -> PosInfo: + return self.to_size(type.offsetof(self, start, path, value)) def default(self, type: 'Type[PT]') -> PT: return type.default(self) @@ -211,10 +211,10 @@ class Type(G[PT]): def dump(self, context: Context, stream: Stream, value: PT) -> None: raise NotImplementedError - def sizeof(self, context: Context, value: O[PT]) -> U[Mapping[str, int], O[int]]: + def sizeof(self, context: Context, start: PosInfo, value: O[PT]) -> U[Mapping[str, int], O[int]]: return None - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[PT]) -> O[int]: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[PT]) -> O[int]: if path: return None else: diff --git a/sx/core/io.py b/sx/core/io.py index 161337c..4e8f3cf 100644 --- a/sx/core/io.py +++ b/sx/core/io.py @@ -74,7 +74,7 @@ class Stream: except IndexError: raise EOFError self.bit_pos = 0 - + nb = min(8 - self.bit_pos, n) if self.bit_endian == Endian.Big: val = bits(self.bit_val, self.bit_pos, nb) @@ -87,7 +87,7 @@ class Stream: self.bit_pos = self.bit_val = None return val, n - nb - + def read(self, n: int = -1, bits=False) -> bytes: if bits: val, nl = self.read_bits(n) @@ -215,7 +215,9 @@ class Segment: return f'<{__name__}.{self.__class__.__name__}: {self.name}>' -def process_sizes(s: Sequence[Mapping[Segment, Pos]], cb: Callable[[Pos, Pos], Pos]) -> Dict[Segment, O[Pos]]: +PosInfo = Mapping[Segment, O[Pos]] + +def process_sizes(s: Sequence[PosInfo], cb: Callable[[Pos, Pos], Pos]) -> PosInfo: sizes: Dict[Segment, O[Pos]] = {} for prev in s: for k, n in prev.items(): @@ -226,17 +228,17 @@ def process_sizes(s: Sequence[Mapping[Segment, Pos]], cb: Callable[[Pos, Pos], P sizes[k] = cb(p, n) return sizes -def min_sizes(*s: Mapping[Segment, Pos]) -> Dict[Segment, O[Pos]]: +def min_sizes(*s: PosInfo) -> PosInfo: return process_sizes(s, min) -def max_sizes(*s: Mapping[Segment, Pos]) -> Dict[Segment, O[Pos]]: +def max_sizes(*s: PosInfo) -> PosInfo: return process_sizes(s, max) -def add_sizes(*s: Mapping[Segment, Pos]) -> Dict[Segment, O[Pos]]: +def add_sizes(*s: PosInfo) -> PosInfo: return process_sizes(s, lambda a, b: a + b) -def ceil_sizes(s: Mapping[Segment, O[Pos]]) -> Dict[Segment, O[int]]: - d: Dict[Segment, O[int]] = {} +def ceil_sizes(s: PosInfo) -> PosInfo: + d: PosInfo = {} for k, v in s.items(): if v is not None: d[k] = math.ceil(v) diff --git a/sx/core/meta.py b/sx/core/meta.py index 145b6c1..e8c99c1 100644 --- a/sx/core/meta.py +++ b/sx/core/meta.py @@ -2,7 +2,7 @@ from typing import List, Optional as O, Generic as G, Sequence, Tuple, TypeVar, import os from .base import Type, Context, PathElement, Error, to_type -from .io import Stream, Segment, Pos +from .io import Stream, Segment, Pos, PosInfo from .expr import BaseExpr @@ -22,15 +22,15 @@ class Wrapper(G[T], Type[T]): with context.enter(None, child): context.dump(child, stream, value) - def sizeof(self, context: Context, value: O[T]) -> O[int]: + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[PosInfo]: child = to_type(self.child) with context.enter(None, child): - return context.sizeof(child, value) + return context.sizeof(child, start, value) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[int]: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[T]) -> O[PosInfo]: child = to_type(self.child) with context.enter(None, child): - return context.offsetof(child, path, value) + return context.offsetof(child, start, path, value) def default(self, context: Context) -> T: child = to_type(self.child) @@ -85,19 +85,19 @@ class Generic(G[T], Type[T], BaseExpr[T]): with context.enter(None, child): context.dump(child, stream, value) - def sizeof(self, context: Context, value: O[T]) -> O[int]: + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[PosInfo]: if not self.stack: return None child = to_type(self.stack[-1]) with context.enter(None, child): - return context.sizeof(child, value) + return context.sizeof(child, start, value) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[int]: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[T]) -> O[PosInfo]: if not self.stack: return None child = to_type(self.stack[-1]) with context.enter(None, child): - return context.offsetof(child, path, value) + return context.offsetof(child, start, path, value) def default(self, context: Context) -> T: if not self.stack: @@ -137,15 +137,15 @@ class TypeSource(G[T], Wrapper[T], BaseExpr[T]): self.stack.append((context, context.segment, stream, pos, value)) super().dump(context, stream, value) - def sizeof(self, context: Context, value: O[T]) -> None: + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> PosInfo: for _ in range(self.count): self.pstack.append(value) - return super().sizeof(context, value) + return super().sizeof(context, start, value) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> None: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[T]) -> PosInfo: for _ in range(self.count): self.pstack.append(value) - return super().offsetof(context, path, value) + return super().offsetof(context, start, path, value) def default(self, context: Context) -> T: value = super().default(context) diff --git a/sx/types/control.py b/sx/types/control.py index f016ecf..7a1582b 100644 --- a/sx/types/control.py +++ b/sx/types/control.py @@ -1,6 +1,6 @@ from typing import Union as U, Generic as G, Optional as O, TypeVar, Mapping, Sequence from ..core.base import Type, Context, PathElement -from ..core.io import Stream, Pos +from ..core.io import Stream, Pos, PosInfo from ..core.util import format_value from ..core.expr import Expr from ..core import to_type @@ -41,15 +41,15 @@ class Switch(G[T, V], Type[T]): with context.enter(None, child): context.dump(child, stream, value) - def sizeof(self, context: Context, value: O[T]) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[PosInfo]: child = self.get_value(context, peek=True) with context.enter(None, child): - return context.sizeof(child, value) + return context.sizeof(child, start, value) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[Pos]: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[T]) -> O[PosInfo]: child = self.get_value(context, peek=True) with context.enter(None, child): - return context.offsetof(child, path, value) + return context.offsetof(child, start, value) def default(self, context: Context) -> T: child = self.get_value(context, peek=True) @@ -84,21 +84,21 @@ class If(G[T,V], Type[U[T, V]]): with context.enter(None, child): return context.dump(child, stream, value) - def sizeof(self, context: Context, value: O[U[T, V]]) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo, value: O[U[T, V]]) -> O[PosInfo]: if context.peek(self.cond): child = to_type(self.true) else: child = to_type(self.false) with context.enter(None, child): - return context.sizeof(child, value) + return context.sizeof(child, start, value) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[U[T, V]]) -> O[Pos]: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[U[T, V]]) -> O[PosInfo]: if context.peek(self.cond): child = to_type(self.true) else: child = to_type(self.false) with context.enter(None, child): - return context.offsetof(child, path, value) + return context.offsetof(child, start, path, value) def default(self, context: Context) -> U[T, V]: if context.peek(self.cond): diff --git a/sx/types/data.py b/sx/types/data.py index 33ed3af..8c14767 100644 --- a/sx/types/data.py +++ b/sx/types/data.py @@ -1,6 +1,6 @@ from typing import Optional as O, Union as U, Any, Generic as G, TypeVar from ..core.base import Type, Context, PossibleDynamic as D -from ..core.io import Stream, Pos +from ..core.io import Stream, Pos, PosInfo from ..core.meta import Wrapper from ..core.expr import BaseExpr @@ -17,7 +17,7 @@ class Nothing(Type[None]): def dump(self, context: Context, stream: Stream, value: None) -> None: pass - def sizeof(self, context: Context, value: None) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo, value: None) -> O[PosInfo]: return 0 def default(self, context: Context) -> None: @@ -42,7 +42,7 @@ class Implied(G[T], Type[T]): def dump(self, context: Context, stream: Stream, value: T) -> None: context.put(self.value, value) - def sizeof(self, context: Context, value: O[T]) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[PosInfo]: return 0 def default(self, context: Context) -> T: @@ -89,7 +89,7 @@ class Pad(Type[None]): value = stretch(context.get(self.value), context.get(self.amount)) stream.write(value) - def sizeof(self, context: Context) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo) -> O[PosInfo]: return context.peek(self.amount) def default(self, context: Context) -> None: @@ -125,7 +125,7 @@ class Data(Type[bytes]): size = 0 return bytes(size) - def sizeof(self, context: Context, value: O[bytes]) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo, value: O[bytes]) -> O[Pos]: if value is not None: return len(value) return context.peek(self.size) @@ -158,7 +158,7 @@ class Bits(Type[int]): def default(self, context: Context) -> int: return 0 - def sizeof(self, context: Context, value: O[int]) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo, value: O[int]) -> O[Pos]: return context.peek(self.amount) // 8 def __str__(self) -> str: diff --git a/sx/types/io.py b/sx/types/io.py index af27625..8d239c0 100644 --- a/sx/types/io.py +++ b/sx/types/io.py @@ -2,7 +2,7 @@ from typing import Any, Generic as G, TypeVar, Union as U, Optional as O, Sequen import os import errno from ..core.base import Type, Context, PossibleDynamic, PathElement, to_type -from ..core.io import Stream, Segment, Pos, add_sizes +from ..core.io import Stream, Segment, Pos, PosInfo, add_sizes from ..core.meta import Wrapper from ..core.util import stretch, seeking, find_overlap @@ -86,12 +86,12 @@ class Sized(G[T], Wrapper[T]): size = stream.tell() - start context.put(self.limit, size) - def sizeof(self, context: Context, value: O[T]) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[Pos]: hard = context.peek(self.hard) if hard: return context.peek(self.limit) else: - return super().sizeof(context, value) + return super().sizeof(context, start, value) class TerminatedStream: @@ -219,13 +219,13 @@ class Terminated(G[T], Wrapper[T]): if required and not included: stream.write(terminator) - def sizeof(self, context: Context, value: O[T]) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[Pos]: terminator = context.peek(self.terminator) required = context.peek(self.required) included = context.peek(self.included) if not required: return None - size = super().sizeof(context, value) + size = super().sizeof(context, start, value) if size is None: return None if not included: @@ -263,15 +263,15 @@ class Ref(G[T], Wrapper[T]): context.put(self.pos, pos) - def sizeof(self, context: Context, value: O[T]) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[PosInfo]: segment = context.peek(self.segment) or context.params.segments['refs'] with context.enter_segment(segment): - return super().sizeof(context, value) + return super().sizeof(context, start, value) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[Pos]: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[T]) -> O[PosInfo]: segment = context.peek(self.segment) or context.params.segments['refs'] with context.enter_segment(segment): - return super().offsetof(context, path, value) + return super().offsetof(context, start, path, value) def __str__(self) -> str: indicator = {os.SEEK_SET: '', os.SEEK_CUR: '+', os.SEEK_END: '-'}.get(self.whence, self.whence) @@ -325,7 +325,7 @@ class Lazy(G[T], Type[LazyEntry[T]]): self.type = type def parse(self, context: Context, stream: Stream) -> LazyEntry[T]: - sizes = context.to_size(self.sizeof(context, None)) + sizes = context.to_size(self.sizeof(context, context.pos_info(), None)) pos = {} base_size = sizes.pop(context.segment) @@ -346,11 +346,11 @@ class Lazy(G[T], Type[LazyEntry[T]]): def dump(self, context: Context, stream: Stream, value: LazyEntry[T]) -> None: return context.dump(to_type(self.type), stream, value()) - def sizeof(self, context: Context, value: O[LazyEntry[T]]) -> O[Pos]: - return context.sizeof(to_type(self.type), value() if value is not None else value) + def sizeof(self, context: Context, start: PosInfo, value: O[LazyEntry[T]]) -> O[Pos]: + return context.sizeof(to_type(self.type), start, value() if value is not None else value) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[LazyEntry[T]]) -> O[Pos]: - return context.offsetof(to_type(self.type), path, value() if value is not None else value) + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[LazyEntry[T]]) -> O[Pos]: + return context.offsetof(to_type(self.type), start, path, value() if value is not None else value) def default(self, context: Context) -> LazyEntry[T]: return LazyEntry(self.type, context, None, {}, value=context.default(to_type(self.type))) @@ -363,34 +363,58 @@ class Lazy(G[T], Type[LazyEntry[T]]): class AlignTo(G[T], Wrapper[T]): - __slots__ = ('alignment', 'value') + __slots__ = ('alignment', 'value', 'offset') - def __init__(self, child: Type[T], alignment: U[PossibleDynamic, Pos], value: U[PossibleDynamic, bytes] = b'\x00') -> None: + def __init__(self, child: Type[T], alignment: U[PossibleDynamic, Pos], value: U[PossibleDynamic, bytes] = b'\x00', offset: U[PossibleDynamic, Pos, None] = 0) -> None: super().__init__(child) self.alignment = alignment self.value = value + self.offset = offset def parse(self, context: Context, stream: Stream) -> T: + start = stream.tell() value = super().parse(context, stream) align = context.get(self.alignment) - adjustment = stream.tell() % align + pos = stream.tell() + offset = context.get(self.offset) + if offset is None: + pos -= start + else: + pos -= offset + + adjustment = pos % align if adjustment: stream.seek(align - adjustment, os.SEEK_CUR) return value def dump(self, context: Context, stream: Stream, value: T) -> None: + start = stream.tell() super().dump(context, stream, value) align = context.get(self.alignment) - adjustment = stream.tell() % align + pos = stream.tell() + offset = context.get(self.offset) + if offset is None: + pos -= start + else: + pos -= offset + + adjustment = pos % align if adjustment: padding = stretch(context.get(self.value), align - adjustment) stream.write(padding) - def sizeof(self, context: Context, value: O[T]) -> O[Pos]: - # TODO - return None + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[Pos]: + align = context.peek(self.alignment) + offset = context.peek(self.offset) + + size = super().sizeof(context, start, value) + pos = add_sizes(start, size).get(context.segment, 0) + adjustment = (pos - offset) % align + if adjustment: + size = add_sizes(adjustment, size) + return size def __str__(self) -> str: return f'{super().__str__()}%{self.align}' @@ -400,35 +424,50 @@ class AlignTo(G[T], Wrapper[T]): class AlignedTo(G[T], Wrapper[T]): - __slots__ = ('alignment', 'value') + __slots__ = ('alignment', 'value', 'offset') - def __init__(self, child: Type[T], alignment: U[PossibleDynamic, Pos], value: U[PossibleDynamic, bytes] = b'\x00') -> None: + def __init__(self, child: Type[T], alignment: U[PossibleDynamic, Pos], value: U[PossibleDynamic, bytes] = b'\x00', offset: U[PossibleDynamic, Pos] = 0) -> None: super().__init__(child) self.alignment = alignment self.value = value + self.offset = offset def parse(self, context: Context, stream: Stream) -> T: align = context.get(self.alignment) - adjustment = stream.tell() % align + adjustment = (stream.tell() - context.get(self.offset)) % align if adjustment: stream.seek(align - adjustment, os.SEEK_CUR) return super().parse(context, stream) def dump(self, context: Context, stream: Stream, value: T) -> None: align = context.get(self.alignment) - adjustment = stream.tell() % align + adjustment = (stream.tell() - context.get(self.offset)) % align if adjustment: padding = stretch(context.get(self.value), align - adjustment) stream.write(padding) super().dump(context, stream, value) - def sizeof(self, context: Context, value: O[T]) -> O[Pos]: - # TODO - return None + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[Pos]: + currstart = start.get(context.segment, 0) + align = context.peek(self.alignment) + adjustment = (currstart - context.peek(self.offset)) % align + if adjustment: + size = context.to_size(align - adjustment) + start = add_sizes(start, size) + else: + size = context.to_size(0) + return add_sizes(size, super().sizeof(context, start, value)) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[Pos]: - # TODO - return None + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[T]) -> O[Pos]: + currstart = start.get(context.segment, 0) + align = context.peek(self.alignment) + adjustment = (currstart - context.peek(self.offset)) % align + if adjustment: + size = context.to_size(align - adjustment) + start = add_sizes(start, size) + else: + size = context.to_size(0) + return add_sizes(size, super().offsetof(context, start, path, value)) def __str__(self) -> str: return f'{super().__str__()}%{self.align}' @@ -447,7 +486,7 @@ class Tell(Type[Pos]): def dump(self, context: Context, stream: Stream, value: Pos) -> None: pass - def sizeof(self, context: Context, value: O[Pos]) -> Pos: + def sizeof(self, context: Context, start: PosInfo, value: O[Pos]) -> Pos: return 0 def __str__(self) -> str: diff --git a/sx/types/num.py b/sx/types/num.py index 08009c8..c8d3827 100644 --- a/sx/types/num.py +++ b/sx/types/num.py @@ -1,7 +1,7 @@ import struct from typing import Optional as O, Union as U, Generic as G, TypeVar from ..core.base import Type, Context, PossibleDynamic -from ..core.io import Stream, Endian +from ..core.io import Stream, Endian, PosInfo from .transforms import Mapped @@ -31,7 +31,7 @@ class Int(Type[int]): def default(self, context: Context) -> int: return 0 - def sizeof(self, context: Context, value: O[int]) -> O[int]: + def sizeof(self, context: Context, start: PosInfo, value: O[int]) -> O[int]: size = context.peek(self.bits) if size is not None: size //= 8 @@ -114,7 +114,7 @@ class Float(Type[float]): def default(self, context: Context) -> float: return 0.0 - def sizeof(self, context: Context, value: O[float]) -> O[int]: + def sizeof(self, context: Context, start: PosInfo, value: O[float]) -> O[int]: size = context.peek(self.bits) if size is not None: size //= 8 diff --git a/sx/types/seq.py b/sx/types/seq.py index 8fd8c9d..a7fe23c 100644 --- a/sx/types/seq.py +++ b/sx/types/seq.py @@ -2,7 +2,7 @@ from typing import Optional as O, Union as U, Callable, Any, List, Sequence, Map from types import FunctionType from ..core.base import PossibleDynamic as D, Type, Context, PathElement -from ..core.io import Stream, add_sizes +from ..core.io import Stream, PosInfo, add_sizes from ..core import to_type @@ -63,7 +63,7 @@ class Arr(G[T], Type[List[T]]): context.put(self.count, len(value)) - def get_sizes(self, context: Context, value: O[List[T]], n: int) -> Mapping[str, int]: + def get_sizes(self, context: Context, start: PosInfo, value: O[List[T]], n: int) -> Tu[O[PosInfo], O[PosInfo]]: child = context.peek(self.child) stop = context.peek(self.stop) @@ -75,25 +75,28 @@ class Arr(G[T], Type[List[T]]): else: elem = None with context.enter(i, c): - size = context.sizeof(c, elem) + size = context.sizeof(c, start, elem) sizes.append(size) + start = add_sizes(start, size) if stop is not None and not isinstance(stop, FunctionType): - sizes.append(context.sizeof(child, stop)) + size = context.sizeof(child, start, stop) + sizes.append(size) + start = add_sizes(start, size) - return sizes + return start, sizes - def sizeof(self, context: Context, value: O[List[T]]) -> O[Mapping[str, int]]: + def sizeof(self, context: Context, start: PosInfo, value: O[List[T]]) -> O[PosInfo]: if value is not None: count = len(value) else: count = context.peek(self.count) if count is None: return None - sizes = self.get_sizes(context, value, count) + _, sizes = self.get_sizes(context, start, value, count) return add_sizes(*sizes) if sizes else 0 - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[List[T]]) -> O[int]: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[List[T]]) -> O[PosInfo]: if not path: return 0 @@ -103,11 +106,11 @@ class Arr(G[T], Type[List[T]]): raise ValueError('path element for array must be integer') child = context.peek(self.child) - sizes = self.get_sizes(context, value, i) + start, sizes = self.get_sizes(context, start, value, i) if path: c = to_type(child) with context.enter(i, c): - sizes.append(context.offsetof(c, path, value[i] if value is not None else None)) + sizes.append(context.offsetof(c, start, path, value[i] if value is not None else None)) return add_sizes(*sizes) if sizes else 0 def default(self, context: Context) -> List[T]: @@ -151,7 +154,7 @@ class Tuple(Type): with context.enter(i, c): context.dump(c, stream, elem) - def sizeof(self, context: Context, value: O[Tu]) -> O[int]: + def sizeof(self, context: Context, start: PosInfo, value: O[Tu]) -> PosInfo: sizes = [] if value is None: @@ -159,11 +162,13 @@ class Tuple(Type): for i, (child, elem) in enumerate(zip(self.children, value)): c = to_type(child) with context.enter(i, c): - sizes.append(context.sizeof(c, elem)) + size = context.sizeof(c, start, elem) + start = add_sizes(start, size) + sizes.append(size) return add_sizes(*sizes) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[Tu]) -> O[int]: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[Tu]) -> O[int]: if not path: return 0 @@ -181,12 +186,14 @@ class Tuple(Type): break c = to_type(child) with context.enter(i, c): - sizes.append(context.sizeof(c, elem)) + size = context.sizeof(c, start, elem) + start = add_sizes(start, size) + sizes.append(size) if path: c = to_type(child) with context.enter(n, c): - sizes.append(context.offsetof(c, path, elem)) + sizes.append(context.offsetof(c, start, path, elem)) return add_sizes(*sizes) diff --git a/sx/types/str.py b/sx/types/str.py index 7f84daf..0b012e4 100644 --- a/sx/types/str.py +++ b/sx/types/str.py @@ -1,7 +1,7 @@ import enum from typing import Optional as O, Union as U from ..core.base import PossibleDynamic as D, Type, Context -from ..core.io import Stream +from ..core.io import Stream, PosInfo from .io import Terminated @@ -36,7 +36,7 @@ class Str(Type[str]): def default(self, context: Context) -> str: return '' - def sizeof(self, context: Context, value: O[str]) -> O[int]: + def sizeof(self, context: Context, start: PosInfo, value: O[str]) -> O[PosInfo]: if value is not None: return len(value.encode(context.peek(self.encoding))) return None diff --git a/sx/types/struct.py b/sx/types/struct.py index 2a77f35..fdccb5b 100644 --- a/sx/types/struct.py +++ b/sx/types/struct.py @@ -11,7 +11,7 @@ from contextlib import contextmanager import sx from ..core import to_type from ..core.base import Context, Type, PathElement -from ..core.io import Stream, Pos, add_sizes, max_sizes +from ..core.io import Stream, Pos, PosInfo, add_sizes, max_sizes from ..core.util import indent, format_value, get_annot_locations from ..core.meta import Generic, TypeSource from ..core.expr import ProxyExpr @@ -142,7 +142,7 @@ class StructType(G[T], Type[T]): def default(self, context: Context) -> T: return self.cls(_sx_context_=context) - def get_sizes(self, context: Context, value: O[Any], n: str) -> List[Mapping[str, int]]: + def get_sizes(self, context: Context, start: PosInfo, value: O[Any], n: str) -> Tuple[PosInfo, List[PosInfo]]: sizes = [] for field, child in self.fields.items(): if field == n: @@ -153,13 +153,15 @@ class StructType(G[T], Type[T]): elem = None c = to_type(child, field) with context.enter(field, c): - size = context.sizeof(c, elem) + size = context.sizeof(c, start, elem) + if not self.union: + start = add_sizes(start, size) sizes.append(size) - return sizes + return start, sizes - def sizeof(self, context: Context, value: O[T]) -> O[Mapping[str, int]]: + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> PosInfo: with self.enter(): - sizes = self.get_sizes(context, value, None) + _, sizes = self.get_sizes(context, start, value, None) if sizes: if self.union: return max_sizes(*sizes) @@ -168,7 +170,7 @@ class StructType(G[T], Type[T]): else: return 0 - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[int]: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[T]) -> O[int]: if not path: return 0 @@ -184,10 +186,10 @@ class StructType(G[T], Type[T]): if self.union: sizes = [] else: - sizes = self.get_sizes(context, value, field) + start, sizes = self.get_sizes(context, start, value, field) if path: with context.enter(field, child): - sizes.append(context.offsetof(child, path, getattr(value, field) if value is not None else None)) + sizes.append(context.offsetof(child, start, path, getattr(value, field) if value is not None else None)) return add_sizes(*sizes) if sizes else 0 def __str__(self) -> str: diff --git a/sx/types/transforms.py b/sx/types/transforms.py index c6f173f..e2b38c6 100644 --- a/sx/types/transforms.py +++ b/sx/types/transforms.py @@ -2,7 +2,7 @@ import enum import io from typing import Optional as O, Generic as G, Union as U, TypeVar, Callable, Sequence, Mapping, Type as Ty from ..core.base import Type, Context, PathElement -from ..core.io import Stream, Pos, to_stream +from ..core.io import Stream, Pos, PosInfo, to_stream from ..core.meta import Wrapper from ..core import to_type, dump @@ -65,7 +65,7 @@ class Preproc(G[T], Wrapper[T]): else: stream.write(proc) - def sizeof(self, context: Context, value: O[T]) -> O[Pos]: + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[Pos]: if value is not None: raw_stream = io.BytesIO() dump(context.peek(self.child), value, raw_stream) @@ -74,7 +74,7 @@ class Preproc(G[T], Wrapper[T]): if self.on_sizeof: size = self.on_sizeof(value, context) if ctx else self.on_sizeof(value) if size is None: - size = super().sizeof(context, value) + size = super().sizeof(context, start, value) return size def __str__(self) -> str: @@ -108,19 +108,19 @@ class Proc(G[T, V], Type[V]): with context.enter(None, child): context.dump(child, stream, value) - def sizeof(self, context: Context, value: O[V]) -> O[int]: + def sizeof(self, context: Context, start: PosInfo, value: O[V]) -> O[int]: if value is not None: value = self.on_dump(value, context) if self.context else self.on_dump(value) child = to_type(self.child) with context.enter(None, child): - return context.sizeof(child, value) + return context.sizeof(child, start, value) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[V]) -> O[int]: + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[V]) -> O[int]: if value is not None: value = self.on_dump(value, context) if self.context else self.on_dump(value) child = to_type(self.child) with context.enter(None, child): - return context.offsetof(child, path, value) + return context.offsetof(child, start, path, value) def default(self, context: Context) -> V: child = to_type(self.child) @@ -212,11 +212,11 @@ class Fixed(G[T], Wrapper[T]): def default(self, context: Context) -> T: return self.value - def sizeof(self, context: Context, value: O[T]) -> O[Pos]: - return super().sizeof(context, value or context.peek(self.value)) + def sizeof(self, context: Context, start: PosInfo, value: O[T]) -> O[Pos]: + return super().sizeof(context, start, value or context.peek(self.value)) - def offsetof(self, context: Context, path: Sequence[PathElement], value: O[T]) -> O[Pos]: - return super().offsetof(context, path, value or context.peek(self.value)) + def offsetof(self, context: Context, start: PosInfo, path: Sequence[PathElement], value: O[T]) -> O[Pos]: + return super().offsetof(context, start, path, value or context.peek(self.value)) def __str__(self) -> str: return str(self.value)