Add ability to serialize out to a new file handle.

Jennifer Taylor 2021-10-21 00:20:54 +00:00
parent 43dfaaf220
commit 9f07dbb55b
3 changed files with 105 additions and 53 deletions
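In practice, the new optional argument lets a caller serialize the patched view of a FileBytes out to a fresh handle while leaving the original backing file untouched. A minimal usage sketch (assuming FileBytes is importable from the arcadeutils package root; the values mirror the new test below):

    import io

    from arcadeutils import FileBytes

    # Patch one byte of an in-memory file.
    fb = FileBytes(io.BytesIO(b"0123456789"))
    fb[3] = 97  # ASCII 'a'

    # Serialize the patched view to a brand-new handle; the original
    # handle keeps its old contents.
    out = io.BytesIO()
    fb.write_changes(out)
    assert out.getvalue() == b"012a456789"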


@@ -45,7 +45,7 @@ class FileBytes:
             # Never going to find it anyway.
             return None
 
-        chunksize = max(searchlen * 2, 1024)
+        chunksize = max(searchlen * 2, 0x8000)
         startoffset = searchstart
         data: bytes = self[searchstart:(searchstart + (chunksize * 3))]
         endoffset = searchstart + len(data)
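Note on the hunk above: search now scans in 32 KiB (0x8000) windows instead of 1 KiB ones, so far fewer read passes are needed on large files, while the loop still keeps a three-chunk read-ahead (chunksize * 3). For reference, a standalone sketch of the overlapping-window search pattern this code relies on (windowed_find is a hypothetical helper, not the library's actual loop):

    def windowed_find(data: bytes, needle: bytes, window: int = 0x8000) -> int:
        # Advance by window - (len(needle) - 1) so a match straddling a
        # window boundary is still caught by the next window.
        step = max(1, window - (len(needle) - 1))
        for start in range(0, len(data), step):
            hit = data.find(needle, start, start + window)
            if hit != -1:
                return hit
        return -1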
@@ -167,71 +167,77 @@ class FileBytes:
                 already.add(inst)
                 self.__gather(already, inst)
 
-    def write_changes(self) -> None:
+    def write_changes(self, new_file: Optional[BinaryIO] = None) -> None:
         if self.__unsafe:
             raise Exception("Another FileBytes instance representing the same file was written back!")
 
-        locations = sorted(self.__patches.keys())
-        keys: Set[int] = set(locations)
-        handled: Set[int] = set()
+        if new_file is not None:
+            # We want to serialize this out to a new file altogether.
+            for offset in range(0, self.__patchlength, 0x8000):
+                new_file.write(self[offset:(offset + 0x8000)])
+        else:
+            # We want to update the underlying file to contain this data.
+            locations = sorted(self.__patches.keys())
+            keys: Set[int] = set(locations)
+            handled: Set[int] = set()
 
-        # First off, see if we need to truncate the file.
-        if self.__filelength < self.__origfilelength:
-            self.__handle.truncate(self.__filelength)
-            self.__origfilelength = self.__filelength
-        if self.__filelength > self.__origfilelength:
-            raise Exception("Logic error, somehow resized file bigger than it started?")
+            # First off, see if we need to truncate the file.
+            if self.__filelength < self.__origfilelength:
+                self.__handle.truncate(self.__filelength)
+                self.__origfilelength = self.__filelength
+            if self.__filelength > self.__origfilelength:
+                raise Exception("Logic error, somehow resized file bigger than it started?")
 
-        # Now, gather up any changes to the file and write them back.
-        for location in locations:
-            if location in handled:
-                # Already wrote this in a chunk.
-                continue
+            # Now, gather up any changes to the file and write them back.
+            for location in locations:
+                if location in handled:
+                    # Already wrote this in a chunk.
+                    continue
 
-            # Figure out the maximum range for this chunk.
-            start = location
-            end = location + 1
-            while end in keys:
-                end += 1
+                # Figure out the maximum range for this chunk.
+                start = location
+                end = location + 1
+                while end in keys:
+                    end += 1
 
-            # Sum it up
-            data = bytes(self.__patches[loc] for loc in range(start, end))
+                # Sum it up
+                data = bytes(self.__patches[loc] for loc in range(start, end))
 
-            # Write it
-            self.__handle.seek(start)
-            self.__handle.write(data)
+                # Write it
+                self.__handle.seek(start)
+                self.__handle.write(data)
 
-            # Mark it complete
-            handled.update(range(start, end))
+                # Mark it complete
+                handled.update(range(start, end))
 
-        if keys != handled:
-            raise Exception("Logic error, failed to write some data!")
+            if keys != handled:
+                raise Exception("Logic error, failed to write some data!")
 
-        # Now that we've serialized out the data, clean up our own representation.
-        self.__handle.flush()
-        self.__patches.clear()
-        self.__filelength = self.__patchlength
+            # Now that we've serialized out the data, clean up our own representation.
+            self.__handle.flush()
+            self.__patches.clear()
+            self.__filelength = self.__patchlength
 
-        # Finally, find all other clones of this class and notify them that they're
-        # unsafe, so that there isn't any surprise behavior if somebody clones a
-        # FileBytes and then writes back to the underlying file on that clone. This
-        # is because the only thing we have in memory is the patches we've made, so
-        # if the underlying file is changed suddenly its all wrong.
-        notify: Set[FileBytes] = {self}
-        self.__gather(notify, self)
-        for inst in notify:
-            if inst is self:
-                continue
+            # Finally, find all other clones of this class and notify them that they're
+            # unsafe, so that there isn't any surprise behavior if somebody clones a
+            # FileBytes and then writes back to the underlying file on that clone. This
+            # is because the only thing we have in memory is the patches we've made, so
+            # if the underlying file is changed suddenly its all wrong.
+            notify: Set[FileBytes] = {self}
+            self.__gather(notify, self)
+            for inst in notify:
+                if inst is self:
+                    continue
 
-            # Mark this clone as unsafe for read/write operations.
-            inst.__unsafe = True
+                # Mark this clone as unsafe for read/write operations.
+                inst.__unsafe = True
 
-            # Set up the clone so that if it is cloned itself, the clone will
-            # work since it can read directly from the updated file.
-            inst.__filelength = self.__filelength
-            inst.__patchlength = self.__patchlength
-            inst.__origfilelength = self.__origfilelength
-            inst.__patches.clear()
+                # Set up the clone so that if it is cloned itself, the clone will
+                # work since it can read directly from the updated file.
+                inst.__filelength = self.__filelength
+                inst.__patchlength = self.__patchlength
+                inst.__origfilelength = self.__origfilelength
+                inst.__patches.clear()
 
     def __slice(self, key: slice) -> Tuple[int, int, int]:
         # Determine step of slice
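The in-place branch above coalesces contiguous patched offsets into runs so that each run costs a single seek-and-write. A worked example of that coalescing step with invented offsets (standalone sketch, not code from this commit):

    # Patched offsets 3, 4, 5 and 9 collapse into two runs.
    patches = {3: 0x61, 4: 0x62, 5: 0x63, 9: 0x7a}
    runs = []
    for loc in sorted(patches):
        if runs and loc == runs[-1][1]:
            runs[-1] = (runs[-1][0], loc + 1)  # extend the current run
        else:
            runs.append((loc, loc + 1))        # start a new run
    assert runs == [(3, 6), (9, 10)]           # two writes instead of four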


@@ -8,7 +8,7 @@ with open(os.path.join("arcadeutils", "README.md"), "r", encoding="utf-8") as fh
 
 setup(
     name='arcadeutils',
-    version='0.1.5',
+    version='0.1.6',
     description='Collection of utilities written in Python for working with various arcade binaries.',
     long_description=long_description,
     long_description_content_type="text/markdown",


@@ -814,3 +814,49 @@ class TestFileBytes(unittest.TestCase):
             fb.search(b"12345"),
             location,
         )
+
+    def test_write_new_file(self) -> None:
+        fb = FileBytes(io.BytesIO(b"0123456789"))
+
+        fb[3] = 97
+        self.assertEqual(
+            fb[:],
+            b"012a456789",
+        )
+
+        fb[7:9] = b"bc"
+        self.assertEqual(
+            fb[:],
+            b"012a456bc9",
+        )
+
+        fb[4:8:2] = b"de"
+        self.assertEqual(
+            fb[:],
+            b"012ad5ebc9",
+        )
+
+        fb[-1] = 102
+        self.assertEqual(
+            fb[:],
+            b"012ad5ebcf",
+        )
+
+        # Verify that it gets serialized correctly to a new file.
+        new_file = io.BytesIO(b"")
+        fb.write_changes(new_file)
+
+        handle = fb.handle
+        if not isinstance(handle, io.BytesIO):
+            raise Exception("File handle changed type somehow!")
+
+        # Make sure original file didn't get modified.
+        self.assertEqual(
+            handle.getvalue(),
+            b"0123456789",
+        )
+        # Make sure new file did get modified.
+        self.assertEqual(
+            new_file.getvalue(),
+            b"012ad5ebcf",
+        )
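One subtlety worth calling out in the new test: fb[4:8:2] = b"de" is an extended slice assignment, so it writes every second offset in the range rather than a contiguous span. The same index mapping on a plain bytearray:

    data = bytearray(b"012a456bc9")
    data[4:8:2] = b"de"  # offsets 4 and 6 receive b'd' and b'e'
    assert bytes(data) == b"012ad5ebc9"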