io: add Lazy type

This commit is contained in:
Shiz 2021-08-12 03:02:19 +02:00
parent a8ea56fedb
commit 456b1e84f7
2 changed files with 74 additions and 2 deletions

View File

@ -21,7 +21,7 @@ from .types.seq import Arr, Tuple
from .types.struct import StructType, Struct, Union
from .types.transforms import Default, Transform, Mapped, Enum, Check, Fixed
from .types.control import Switch, If
from .types.io import Sized, Terminated, Ref, AlignTo, AlignedTo
from .types.io import Sized, Terminated, Ref, AlignTo, AlignedTo, Lazy
del types
__all__ = [k for k in globals() if not k.startswith('_')]

View File

@ -1,7 +1,7 @@
from typing import Any, Generic as G, TypeVar, Union as U, Optional as O, Sequence
import os
import errno
from ..core.base import Type, Context, PossibleDynamic, PathElement
from ..core.base import Type, Context, PossibleDynamic, PathElement, to_type
from ..core.io import Stream, Segment, Pos, add_sizes
from ..core.meta import Wrapper
from ..core.util import stretch, seeking, find_overlap
@ -272,6 +272,78 @@ class Ref(G[T], Wrapper[T]):
return f'{__name__}.Ref({super().__repr__()}, pos={self.pos!r}, whence={self.whence!r}, segment={self.segment!r})'
LazyUnresolved = object()
class LazyEntry(G[T]):
__slots__ = ('type', 'context', 'stream', 'positions', 'value')
def __init__(self, type: Type[T], context: Context, stream: Stream, pos: dict[Segment, Pos], value=LazyUnresolved) -> None:
self.type = type
self.context = context.copy()
self.stream = stream
self.positions = pos
self.value = value
def __call__(self) -> T:
if self.value is LazyUnresolved:
old_pos = {}
for segment, pos in self.positions.items():
old_pos[segment] = segment.pos
segment.pos = pos
try:
with self.context.enter_segment(self.context.segment, self.stream, self.positions[self.context.segment]) as stream:
self.value = self.context.parse(to_type(self.type), stream)
finally:
for segment, pos in old_pos.items():
segment.pos = pos
return self.value
def __str__(self) -> str:
return f'~{self.type}'
def __repr__(self) -> str:
return f'{__name__}.{self.__class__.__name__}({self.type!r})'
class Lazy(G[T], Type[LazyEntry[T]]):
__slots__ = ('type',)
def __init__(self, type: Type[T]) -> None:
self.type = type
def parse(self, context: Context, stream: Stream) -> LazyEntry[T]:
sizes = context.to_size(self.sizeof(context, None))
pos = {}
for segment, size in sizes.items():
if size is None:
raise ValueError(f'lazy type size in segment {segment.name} must be known at parse time')
with context.enter_segment(segment, stream) as ss:
pos[segment] = ss.tell()
ss.seek(size, os.SEEK_CUR)
if context.segment.pos != stream.tell():
stream.seek(context.segment.pos, os.SEEK_SET)
return LazyEntry(to_type(self.type), context, stream, pos)
def dump(self, context: Context, stream: Stream, value: LazyEntry[T]) -> None:
return context.dump(to_type(self.type), stream, value())
def sizeof(self, context: Context, value: O[LazyEntry[T]]) -> O[Pos]:
return context.sizeof(to_type(self.type), value() if value is not None else value)
def offsetof(self, context: Context, path: Sequence[PathElement], value: O[LazyEntry[T]]) -> O[Pos]:
return context.offsetof(to_type(self.type), path, value() if value is not None else value)
def default(self, context: Context) -> LazyEntry[T]:
return LazyEntry(self.type, context, None, {}, value=context.default(to_type(self.type)))
def __str__(self) -> str:
return f'~{self.type}'
def __repr__(self) -> str:
return f'{__name__}.{self.__class__.__name__}({self.type!r})'
class AlignTo(G[T], Wrapper[T]):
__slots__ = ('alignment', 'value')