158 lines
5.0 KiB
Python
158 lines
5.0 KiB
Python
import pefile
|
|
import destruct
|
|
from destruct import Type, Struct, Arr
|
|
|
|
from .common import (
|
|
CLISectionReference, CLIToken,
|
|
CLIStreamType, CLIStreamIndex, CLITableType, CLITableIndex, CLITableRange
|
|
)
|
|
from .streams import STREAM_PARSERS
|
|
from .tables import CLITypeClassSemantics
|
|
from .wrappers import wrap
|
|
|
|
|
|
class CLIHeader(Struct, nocopy=True):
|
|
size = UInt(32)
|
|
version_major = UInt(16)
|
|
version_minor = UInt(16)
|
|
metadata_info = CLISectionReference
|
|
flags = UInt(32)
|
|
entrypoint = CLIToken
|
|
unk_info = CLISectionReference
|
|
namesig_info = CLISectionReference
|
|
_gap0x28 = Data(32)
|
|
|
|
class CLIStreamMetadata(Struct, nocopy=True):
|
|
offset = UInt(32)
|
|
size = UInt(32)
|
|
name = AlignTo(Str(kind='c'), 4)
|
|
|
|
class CLIMetadataHeader(Struct, nocopy=True):
|
|
magic = Sig(b'BSJB')
|
|
version_major = UInt(16)
|
|
version_minor = UInt(16)
|
|
_gap0x8 = Data(4)
|
|
version = Str(kind='pascal', length_type=UInt(32))
|
|
_gap0x12 = Data(2)
|
|
stream_count = UInt(16)
|
|
streams = Arr(CLIStreamMetadata)
|
|
|
|
def on_stream_count(self, spec, context):
|
|
spec.streams.count = self.stream_count
|
|
|
|
|
|
class CLIFile:
|
|
def __init__(self, fn):
|
|
self.name = fn
|
|
self.pe = pefile.PE(fn)
|
|
self.handle = open(fn, 'rb')
|
|
self.header = None
|
|
self.metadata = None
|
|
self.streams = {}
|
|
self.parse()
|
|
|
|
def __repr__(self):
|
|
return '<{}: "{}">'.format(self.__class__.__name__, self.name)
|
|
|
|
def read_at(self, offset, size):
|
|
return self.pe.get_data(rva=offset, length=size)
|
|
|
|
def parse_at(self, c, offset, size=None):
|
|
if size:
|
|
c = destruct.Capped(c, size)
|
|
pos = self.pe.get_offset_from_rva(offset)
|
|
with destruct.seeking(self.handle, pos) as f:
|
|
return destruct.parse(c, f)
|
|
|
|
def parse(self):
|
|
net_data_entry = pefile.DIRECTORY_ENTRY['IMAGE_DIRECTORY_ENTRY_COM_DESCRIPTOR']
|
|
if net_data_entry < len(self.pe.OPTIONAL_HEADER.DATA_DIRECTORY):
|
|
net_data_dir = self.pe.OPTIONAL_HEADER.DATA_DIRECTORY[net_data_entry]
|
|
net_offset = net_data_dir.VirtualAddress
|
|
net_size = net_data_dir.Size
|
|
else:
|
|
for section in self.pe.sections:
|
|
if section.Name.rstrip(b'\x00') != b'.text':
|
|
continue
|
|
net_offset = section.PointerToRawData + 8
|
|
net_size = 72
|
|
|
|
self.header = self.parse_at(CLIHeader, net_offset, net_size)
|
|
self.metadata = self.parse_at(CLIMetadataHeader, self.header.metadata_info.rva, self.header.metadata_info.size)
|
|
self.streams = {}
|
|
for s in self.metadata.streams:
|
|
type = CLIStreamType(s.name)
|
|
if type in STREAM_PARSERS:
|
|
stream = self.parse_at(STREAM_PARSERS[type], self.header.metadata_info.rva + s.offset, s.size)
|
|
else:
|
|
stream = None
|
|
self.streams[type] = stream
|
|
|
|
def get_table(self, t):
|
|
return (wrap(self, t, i + 1, x()) for i, x in enumerate(self.streams[CLIStreamType.Metadata].tables[t]))
|
|
|
|
def get_table_size(self, t):
|
|
return len(self.streams[CLIStreamType.Metadata].tables[t])
|
|
|
|
def get_table_entry(self, t, i):
|
|
if not i:
|
|
return None
|
|
return wrap(self, t, i, self.streams[CLIStreamType.Metadata].tables[t][i - 1]())
|
|
|
|
def get_string_at(self, i):
|
|
buf = bytearray()
|
|
while True:
|
|
c = self.streams[CLIStreamType.String].data[i]
|
|
if not c:
|
|
break
|
|
buf.append(c)
|
|
i += 1
|
|
return buf.decode('utf-8')
|
|
|
|
def _get_length_prefixed_value(self, stream, i):
|
|
length = self.streams[stream].data[i]
|
|
if (length >> 5) == 0b110:
|
|
length = length & 0b11111
|
|
nbytes = 4
|
|
elif (length >> 6) == 0b10:
|
|
length = length & 0b111111
|
|
nbytes = 2
|
|
else:
|
|
nbytes = 1
|
|
for off in range(nbytes - 1):
|
|
length = (length << 8) | self.streams[stream].data[i + 1 + off]
|
|
return self.streams[stream].data[i + nbytes:i + nbytes + length]
|
|
|
|
def get_blob_at(self, i):
|
|
return self._get_length_prefixed_value(CLIStreamType.Blob, i)
|
|
|
|
def get_user_string_at(self, i):
|
|
return self._get_length_prefixed_value(CLIStreamType.UserString, i)[:-1].decode('utf-16le')
|
|
|
|
def get_guid_at(self, i):
|
|
if i == 0:
|
|
return None
|
|
return self.streams[CLIStreamType.GUID].guids[i - 1]
|
|
|
|
def get_by_token(self, t):
|
|
return self.get_table_entry(t.table, t.row)
|
|
|
|
|
|
@property
|
|
def entrypoint(self):
|
|
return self.get_by_token(self.header.entrypoint)
|
|
|
|
@property
|
|
def classes(self):
|
|
return (
|
|
t for t in self.get_table(CLITableType.TypeDef)
|
|
if t.flags.semantics == CLITypeClassSemantics.Class
|
|
)
|
|
|
|
@property
|
|
def interfaces(self):
|
|
return (
|
|
t for t in self.get_table(CLITableType.TypeDef)
|
|
if t.flags.semantics == CLITypeClassSemantics.Interface
|
|
)
|