notded/dotnet/streams.py

94 lines
2.6 KiB
Python

import enum
import uuid

from destruct import Type, Struct, Arr, Data, UInt, Enum

from .common import CLRStreamType, CLRTableType, CLRHeapFlags
from .tables import TABLE_PARSERS
# Registry of stream parser classes, keyed by the name handed to
# ``stream_parser``.
STREAM_PARSERS = {}


def stream_parser(name):
    """Build a class decorator that registers its target under *name*.

    The decorated class is stored in ``STREAM_PARSERS[name]`` and returned
    unchanged, so several ``@stream_parser(...)`` decorators can be stacked
    to register one class under more than one name.
    """
    def register(parser_cls):
        STREAM_PARSERS[name] = parser_cls
        return parser_cls

    return register
class BitVector(Type):
    """Fixed-width bit field exposed as the list of its set bit positions.

    ``size`` is the width in bits; ``size // 8`` bytes are read or written.
    ``order`` selects the byte order (``'be'`` or ``'le'``).  When ``child``
    is given it is called with each set bit index and its result is stored
    in place of the bare index.
    """

    # Maps the short byte-order names accepted by the constructor to the
    # names expected by int.from_bytes / int.to_bytes.
    ORDER_MAP = {
        'be': 'big',
        'le': 'little',
    }

    def __init__(self, size, child=None, order='be'):
        self.size = size
        self.order = order
        self.child = child

    def parse(self, input, context):
        """Read the field from *input* and return its set bits, lowest first."""
        data = int.from_bytes(input.read(self.size // 8), self.ORDER_MAP[self.order])
        return [
            self.child(bit) if self.child else bit
            for bit in range(self.size)
            if data & (1 << bit)
        ]

    def emit(self, value, output, context):
        """Write the bits listed in *value* to *output*.

        *value* is an iterable of bit indices; ``enum.Enum`` members (as
        produced by ``parse`` when ``child`` is an enum class) are unwrapped
        to their underlying value first.  The parameter order matches
        ``GUID.emit`` in this module.

        Raises OverflowError (from ``int.to_bytes``) if a bit does not fit
        in ``size // 8`` bytes.
        """
        data = 0
        for item in value:
            bit = item.value if isinstance(item, enum.Enum) else item
            data |= 1 << bit
        output.write(data.to_bytes(self.size // 8, self.ORDER_MAP[self.order]))

    def sizeof(self, value, context):
        """Return the encoded size in bytes, i.e. what parse/emit transfer."""
        return self.size // 8

    def __repr__(self):
        return '<BitVector[{}]>'.format(self.size)
@stream_parser(CLRStreamType.Metadata)
class CLRMetadataStream(Struct):
    """Layout of the stream registered for ``CLRStreamType.Metadata``.

    The ``on_<field>`` methods adjust the spec and rewrite parsed values as
    described on each hook.
    NOTE(review): they are presumably invoked by destruct right after the
    matching field has been parsed -- confirm against the destruct docs.
    """

    _gap0 = Data(4)
    version_major = UInt(8)
    version_minor = UInt(8)
    heap_flags = Enum(CLRHeapFlags, UInt(8))
    _gap7 = Data(1)
    # Two 64-bit little-endian bit vectors; each set bit is mapped through
    # CLRTableType.
    present = BitVector(64, CLRTableType, order='le')
    sorted = BitVector(64, CLRTableType, order='le')
    # Element counts for both arrays are filled in by on_present; the element
    # specs of ``tables`` are filled in by on_row_counts.
    row_counts = Arr(UInt(32))
    tables = Arr([])

    def on_present(self, spec, context):
        """Size ``row_counts`` and ``tables`` to one entry per present table."""
        table_total = len(self.present)
        spec.row_counts.count = table_total
        spec.tables.count = table_total

    def on_row_counts(self, spec, context):
        """Build one table array spec per present table and key the counts.

        Appends ``Arr(TABLE_PARSERS[table], count=rows)`` to the ``tables``
        spec for every present table, replaces ``row_counts`` with a dict
        keyed by table, and stores this struct on ``context.user.metadata``.
        """
        pairs = list(zip(self.present, self.row_counts))
        spec.tables.child.extend(
            Arr(TABLE_PARSERS[table], count=rows) for table, rows in pairs
        )
        self.row_counts = dict(pairs)
        # NOTE(review): presumably read back by the table parsers -- confirm.
        context.user.metadata = self

    def on_tables(self, spec, context):
        """Replace the parsed ``tables`` list with a dict keyed by table."""
        self.tables = dict(zip(self.present, self.tables))
class GUID(Type):
    """A 16-byte value exposed as a ``uuid.UUID``."""

    def parse(self, input, context):
        """Read 16 bytes from *input* and wrap them in a ``uuid.UUID``."""
        # NOTE(review): UUID(bytes=...) treats the data as big-endian fields;
        # .NET serializes the first three GUID fields little-endian, so
        # ``bytes_le`` may be what is wanted here -- confirm.
        return uuid.UUID(bytes=input.read(16))

    def emit(self, value, output, context):
        """Write the 16 raw bytes of the UUID *value* to *output*."""
        raw = value.bytes
        output.write(raw)

    def sizeof(self, value, context):
        """Return the fixed encoded size: 16 bytes."""
        return 16
@stream_parser(CLRStreamType.GUID)
class CLRGUIDStream(Struct):
    """Stream registered for ``CLRStreamType.GUID``: an array of GUIDs."""

    # No count is given here.
    # NOTE(review): presumably the array then runs to the end of the input
    # -- confirm against destruct's Arr.
    guids = Arr(GUID)
# One class is registered for three stream types: String, UserString and Blob.
@stream_parser(CLRStreamType.String)
@stream_parser(CLRStreamType.UserString)
@stream_parser(CLRStreamType.Blob)
class CLRDataStream(Struct):
    """Stream kept as a single ``Data`` field, with no further structure."""

    # NOTE(review): Data(None) presumably means "no fixed length, take what
    # is there" -- confirm against destruct's Data.
    data = Data(None)