Add a FileBytes class that allows diffing and patching to be done against files that are not fully loaded in RAM.

This commit is contained in:
Jennifer Taylor 2021-10-20 19:22:32 +00:00
parent 80c301ff85
commit 7ef541ca0c
6 changed files with 1532 additions and 13 deletions

View File

@ -1,7 +1,9 @@
from .binary import BinaryDiffException, BinaryDiff, ByteUtil
from .filebytes import FileBytes
__all__ = [
"BinaryDiffException",
"BinaryDiff",
"ByteUtil",
"FileBytes",
]

View File

@ -1,6 +1,8 @@
from typing import List, Optional, Tuple, cast
from typing import List, Optional, Tuple, Union, cast, overload
from typing_extensions import Final
from .filebytes import FileBytes
class BinaryDiffException(Exception):
    """Error raised by BinaryDiff, e.g. when asked to diff different-sized blobs."""
    pass
@ -19,7 +21,7 @@ class BinaryDiff:
return out
@staticmethod
def diff(bin1: bytes, bin2: bytes) -> List[str]:
def diff(bin1: Union[bytes, FileBytes], bin2: Union[bytes, FileBytes]) -> List[str]:
binlength = len(bin1)
if binlength != len(bin2):
raise BinaryDiffException("Cannot diff different-sized binary blobs!")
@ -164,6 +166,7 @@ class BinaryDiff:
# Finally, return it
return differences
@overload
@staticmethod
def patch(
binary: bytes,
@ -172,6 +175,31 @@ class BinaryDiff:
reverse: bool = False,
ignore_size_differences: bool = False,
) -> bytes:
...
@overload
@staticmethod
def patch(
binary: FileBytes,
patchlines: List[str],
*,
reverse: bool = False,
ignore_size_differences: bool = False,
) -> FileBytes:
...
@staticmethod
def patch(
binary: Union[bytes, FileBytes],
patchlines: List[str],
*,
reverse: bool = False,
ignore_size_differences: bool = False,
) -> Union[bytes, FileBytes]:
# If we were given filebytes, get a clone of it so we don't modify the input.
if isinstance(binary, FileBytes):
binary = binary.clone()
# First, grab the differences
if not ignore_size_differences:
file_size = BinaryDiff.size(patchlines)
@ -203,18 +231,31 @@ class BinaryDiff:
f"but found {BinaryDiff._hex(binary[offset])}!"
)
if isinstance(binary, bytes):
if last_patch_end < offset:
chunks.append(binary[last_patch_end:offset])
chunks.append(new)
last_patch_end = offset + 1
elif isinstance(binary, FileBytes):
binary[offset:(offset + len(new))] = new
else:
# This should never happen?
raise NotImplementedError("Not implemented!")
if isinstance(binary, bytes):
# Return the new data!
chunks.append(binary[last_patch_end:])
return b"".join(chunks)
elif isinstance(binary, FileBytes):
# We modified the filebytes object in place.
return binary
else:
# This should never happen?
raise NotImplementedError("Not implemented!")
@staticmethod
def can_patch(
binary: bytes,
binary: Union[bytes, FileBytes],
patchlines: List[str],
*,
reverse: bool = False,

347
arcadeutils/filebytes.py Normal file
View File

@ -0,0 +1,347 @@
from typing import BinaryIO, Dict, List, Set, Tuple, Union, overload
class FileBytes:
    """
    A bytes-like, patchable view over an open binary file.

    Data is read from the underlying handle on demand, so the whole file never
    has to be resident in memory. Modifications (item/slice assignment, append,
    truncate) are kept in an in-memory overlay and only reach the file when
    write_changes() is called.

    Clones made with clone() share the same handle. Because each instance only
    remembers its own overlay, writing one instance back invalidates every
    related clone: they raise on further use, but can still be clone()d to get
    a fresh view of the updated file.
    """

    def __init__(self, handle: BinaryIO) -> None:
        """
        Wrap an already-open, seekable binary handle.

        The handle is seeked to its end to measure the current file size.
        """
        self.__handle: BinaryIO = handle
        # Sparse overlay of pending edits: absolute offset -> byte value.
        self.__patches: Dict[int, int] = {}
        # Directly related clones (links are stored in both directions).
        self.__copies: List["FileBytes"] = []
        # Set once a related instance has been written back to the file.
        self.__unsafe: bool = False

        handle.seek(0, 2)  # whence=2: seek relative to the end of the file
        # Number of leading file bytes that are still valid to read.
        self.__filelength: int = handle.tell()
        # Size of the file on disk as of the last measurement/write-back.
        self.__origfilelength: int = self.__filelength
        # Logical length of this object, including appended/truncated data.
        self.__patchlength: int = self.__filelength

    @property
    def handle(self) -> BinaryIO:
        """The underlying file handle this instance reads from and writes to."""
        return self.__handle

    def __check_safe(self) -> None:
        """Raise if a related instance has already been written to the file."""
        if self.__unsafe:
            raise Exception("Another FileBytes instance representing the same file was written back!")

    def __len__(self) -> int:
        """Return the logical length, including any pending appends/truncation."""
        self.__check_safe()
        return self.__patchlength

    def __add__(self, other: object) -> "FileBytes":
        """
        Return a clone of this instance with ``other`` appended.

        ``other`` may be ``bytes`` or another FileBytes (which is read fully
        into memory). Any other type raises NotImplementedError.
        """
        self.__check_safe()

        if isinstance(other, FileBytes):
            extra = other[:]
        elif isinstance(other, bytes):
            extra = other
        else:
            raise NotImplementedError("Not implemented!")

        clone = self.clone()
        clone.append(extra)
        return clone

    def clone(self) -> "FileBytes":
        """
        Return an independent copy sharing the same handle.

        The copy starts with the same pending edits but can be modified
        without affecting this instance. Cloning an invalidated instance is
        allowed and yields a usable view of the file as it is now.
        """
        # Make a safe copy so that in-memory patches can be changed.
        myclone = FileBytes(self.__handle)
        myclone.__patches = dict(self.__patches)
        myclone.__filelength = self.__filelength
        myclone.__patchlength = self.__patchlength
        myclone.__origfilelength = self.__origfilelength

        # Make sure we can invalidate copies if we write back the data.
        myclone.__copies.append(self)
        self.__copies.append(myclone)

        return myclone

    def append(self, data: bytes) -> None:
        """Extend the logical contents with ``data`` (kept in memory until written)."""
        self.__check_safe()

        base = self.__patchlength
        self.__patches.update((base + off, change) for off, change in enumerate(data))
        self.__patchlength = base + len(data)

    def truncate(self, size: int) -> None:
        """
        Shorten the logical contents to ``size`` bytes.

        A size at or beyond the current length is a no-op. Negative sizes
        raise NotImplementedError.
        """
        self.__check_safe()

        if size < 0:
            raise NotImplementedError("Not implemented!")
        if size >= self.__patchlength:
            # We are already this short.
            return

        # Never read file bytes at or past the cut point again.
        if size < self.__filelength:
            self.__filelength = size

        # Drop pending edits in the removed range. Walk the overlay rather
        # than the byte range so the cost tracks the number of edits, not
        # the size of the file.
        for off in [off for off in self.__patches if off >= size]:
            del self.__patches[off]

        self.__patchlength = size

    def __gather(self, already: Set["FileBytes"], need: "FileBytes") -> None:
        """Add every instance reachable from ``need`` via clone links to ``already``."""
        # Iterative walk so long clone chains cannot exhaust the recursion limit.
        pending = [need]
        while pending:
            current = pending.pop()
            for inst in current.__copies:
                if inst not in already:
                    already.add(inst)
                    pending.append(inst)

    def write_changes(self) -> None:
        """
        Write all pending edits to the underlying file and flush it.

        Afterwards this instance has no pending edits, and every related
        clone is marked unusable (but remains clone()-able).
        """
        self.__check_safe()

        if self.__filelength > self.__origfilelength:
            raise Exception("Logic error, somehow resized file bigger than it started?")

        # First off, see if we need to truncate the file.
        if self.__filelength < self.__origfilelength:
            self.__handle.truncate(self.__filelength)

        # Write each run of consecutive patched offsets with a single seek/write.
        locations = sorted(self.__patches)
        total = len(locations)
        run_start = 0
        while run_start < total:
            run_end = run_start + 1
            while run_end < total and locations[run_end] == locations[run_end - 1] + 1:
                run_end += 1

            chunk = bytes(self.__patches[loc] for loc in locations[run_start:run_end])
            self.__handle.seek(locations[run_start])
            self.__handle.write(chunk)
            run_start = run_end

        # Now that we've serialized out the data, clean up our own representation.
        self.__handle.flush()
        self.__patches.clear()
        self.__filelength = self.__patchlength
        # The file on disk now has exactly this size. Without this a later
        # write-back after an append would trip the "resized bigger" check.
        self.__origfilelength = self.__patchlength

        # Finally, find all other clones of this class and notify them that
        # they're unsafe: they only hold patches relative to the old file
        # contents, which have just changed underneath them.
        notify: Set[FileBytes] = {self}
        self.__gather(notify, self)
        for inst in notify:
            if inst is self:
                continue

            # Mark this clone as unsafe for read/write operations.
            inst.__unsafe = True

            # Set up the clone so that if it is cloned itself, the clone will
            # work since it can read directly from the updated file.
            inst.__filelength = self.__filelength
            inst.__patchlength = self.__patchlength
            inst.__origfilelength = self.__origfilelength
            inst.__patches.clear()

    def __slice(self, key: slice) -> Tuple[int, int, int]:
        """
        Resolve ``key`` against the logical length into ``(start, stop, step)``.

        The result follows normal sequence semantics (bounds are clamped and
        usable directly with ``range``). A zero step raises ValueError.
        """
        return key.indices(self.__patchlength)

    def __read_byte(self, offset: int) -> int:
        """Return the byte at an in-range, non-negative ``offset``."""
        patched = self.__patches.get(offset)
        if patched is not None:
            return patched
        if offset >= self.__filelength:
            raise Exception("Logic error, should never fall through to loading file bytes in area enlarged by patches!")
        self.__handle.seek(offset)
        return self.__handle.read(1)[0]

    def __read_range(self, start: int, stop: int) -> bytes:
        """
        Return the contiguous bytes ``[start, stop)`` with pending edits applied.

        Expects ``0 <= start <= stop <= len(self)``. Positions beyond the
        readable part of the file that have no pending edit come back as zero.
        """
        length = stop - start
        if length <= 0:
            return b""

        # Only read what is still backed by the file; the rest comes from the overlay.
        file_stop = min(stop, self.__filelength)
        if file_stop > start:
            self.__handle.seek(start)
            raw = self.__handle.read(file_stop - start)
        else:
            raw = b""

        # Collect the edits that land in this window, scanning whichever of
        # the overlay or the window is smaller.
        window = range(start, stop)
        if len(self.__patches) < length:
            overlay = [(off, value) for off, value in self.__patches.items() if off in window]
        else:
            overlay = [(off, self.__patches[off]) for off in window if off in self.__patches]

        if not overlay and len(raw) == length:
            # Plain contiguous read, nothing to merge.
            return raw

        data = bytearray(length)
        data[:len(raw)] = raw
        for off, value in overlay:
            data[off - start] = value
        return bytes(data)

    @overload
    def __getitem__(self, key: int) -> int:
        ...

    @overload
    def __getitem__(self, key: slice) -> bytes:
        ...

    def __getitem__(self, key: Union[int, slice]) -> Union[int, bytes]:
        """
        Read a single byte (int key) or a run of bytes (slice key).

        Raises IndexError for an out-of-range int key, ValueError for a slice
        with a zero step and NotImplementedError for any other key type.
        """
        self.__check_safe()

        if isinstance(key, int):
            # Support negative indexing.
            if key < 0:
                key += self.__patchlength
            if key < 0 or key >= self.__patchlength:
                raise IndexError("FileBytes index out of range")
            return self.__read_byte(key)
        elif isinstance(key, slice):
            start, stop, step = self.__slice(key)
            span = range(start, stop, step)
            if not span:
                return b""

            if step == 1:
                return self.__read_range(start, stop)
            if step == -1:
                # Covers offsets stop+1 .. start inclusive, read forward then flipped.
                return self.__read_range(stop + 1, start + 1)[::-1]

            # Strided access: fetch byte by byte so we never load the skipped data.
            return bytes(self.__read_byte(off) for off in span)
        else:
            raise NotImplementedError("Not implemented!")

    @overload
    def __setitem__(self, key: int, val: int) -> None:
        ...

    @overload
    def __setitem__(self, key: slice, val: bytes) -> None:
        ...

    def __setitem__(self, key: Union[int, slice], val: Union[int, bytes]) -> None:
        """
        Overwrite a single byte (int key) or a run of bytes (slice key).

        Slice assignment must not change the length: the value must contain
        exactly as many bytes as the slice selects, otherwise
        NotImplementedError is raised and nothing is modified.
        """
        self.__check_safe()

        if isinstance(key, int):
            if not isinstance(val, int):
                raise NotImplementedError("Not implemented!")

            # Support negative indexing.
            if key < 0:
                key += self.__patchlength
            if key < 0 or key >= self.__patchlength:
                raise IndexError("FileBytes index out of range")
            # Reject values that could never be serialized as a single byte.
            if not 0 <= val <= 255:
                raise ValueError("byte must be in range(0, 256)")

            self.__patches[key] = val
        elif isinstance(key, slice):
            if not isinstance(val, bytes):
                raise NotImplementedError("Not implemented!")

            start, stop, step = self.__slice(key)
            span = range(start, stop, step)

            # Validate before touching anything so a caller that catches the
            # error never sees a partially applied assignment.
            if len(span) != len(val):
                raise NotImplementedError("Cannot resize FileBuffer!")

            self.__patches.update(zip(span, val))
        else:
            raise NotImplementedError("Not implemented!")

View File

@ -1,8 +1,10 @@
from arcadeutils import BinaryDiff, BinaryDiffException
import io
import unittest
from arcadeutils import BinaryDiff, BinaryDiffException, FileBytes
class TestBinaryDiff(unittest.TestCase):
class TestBinaryDiffBytes(unittest.TestCase):
def test_diff_no_differences(self) -> None:
self.assertEqual(
@ -406,3 +408,415 @@ class TestBinaryDiff(unittest.TestCase):
reverse=True,
)
self.assertEqual(str(context.exception), 'Patch offset 06 specifies a wildcard and cannot be reversed!')
class TestBinaryDiffFileBytes(unittest.TestCase):
    """Exercise BinaryDiff with FileBytes inputs wrapping in-memory buffers."""

    def __make_filebytes(self, data: bytes) -> FileBytes:
        """Wrap raw bytes in a FileBytes backed by an io.BytesIO."""
        buffer = io.BytesIO(data)
        return FileBytes(buffer)

    def __make_bytes(self, filebytes: FileBytes) -> bytes:
        """Read the full contents of a FileBytes back out as bytes."""
        return filebytes[:]

    def test_diff_no_differences(self) -> None:
        """Identical inputs, including empty ones, produce an empty diff."""
        for blob in (b"abcd", b""):
            self.assertEqual(
                BinaryDiff.diff(self.__make_filebytes(blob), self.__make_filebytes(blob)),
                [],
            )

    def test_diff_different_sizes(self) -> None:
        """Diffing inputs of different length is rejected in either order."""
        for first, second in ((b"1234", b"123"), (b"123", b"1234")):
            with self.assertRaises(BinaryDiffException):
                BinaryDiff.diff(self.__make_filebytes(first), self.__make_filebytes(second))

    def test_diff_simple(self) -> None:
        """Diffs report the file size followed by each changed run."""
        scenarios = [
            (b"bbcd1234", ['# File size: 8', '00: 61 -> 62']),
            (b"abcd1235", ['# File size: 8', '07: 34 -> 35']),
            (b"abdc1224", ['# File size: 8', '02: 63 64 -> 64 63', '06: 33 -> 32']),
            (b"4321bcda", ['# File size: 8', '00: 61 62 63 64 31 32 33 34 -> 34 33 32 31 62 63 64 61']),
        ]
        for changed, expected in scenarios:
            actual = BinaryDiff.diff(self.__make_filebytes(b"abcd1234"), self.__make_filebytes(changed))
            self.assertEqual(actual, expected)

    def test_size(self) -> None:
        """Only a well-formed file size header yields a size."""
        scenarios = [
            ([], None),
            (['# Comment'], None),
            (['00: 01 -> 02'], None),
            (['# File Size: 1024'], 1024),
            (['# File Size: invalid'], None),
        ]
        for patchlines, expected in scenarios:
            self.assertEqual(BinaryDiff.size(patchlines), expected)

    def test_description(self) -> None:
        """Only a description header yields a description."""
        scenarios = [
            ([], None),
            (['# Comment'], None),
            (['00: 01 -> 02'], None),
            (['# Description: sample text'], "sample text"),
        ]
        for patchlines, expected in scenarios:
            self.assertEqual(BinaryDiff.description(patchlines), expected)

    def test_needed_amount(self) -> None:
        """The needed amount reaches the end of the furthest patched run."""
        scenarios = [
            ([], 0),
            (['# File size: 8', '00: 61 -> 62'], 1),
            (['# File size: 8', '07: 34 -> 35'], 8),
            (['# File size: 8', '02: 63 64 -> 64 63', '06: 33 -> 32'], 7),
            (['# File size: 8', '00: 61 62 63 64 31 32 33 34 -> 34 33 32 31 62 63 64 61'], 8),
        ]
        for patchlines, expected in scenarios:
            self.assertEqual(BinaryDiff.needed_amount(patchlines), expected)

    def test_can_patch_normal(self) -> None:
        """Forward applicability checks, including size and content mismatches."""
        swap_body = ['02: 63 64 -> 64 63', '06: 33 -> 32']
        size_error = 'Patch is for binary of size 12 but binary is 8 bytes long!'
        scenarios = [
            (b"abcd1234", ['# File size: 8'] + swap_body, {}, (True, '')),
            (b"abcd1234", ['# File size: 12'] + swap_body, {}, (False, size_error)),
            (b"abcd1234", ['# File size: 12'] + swap_body, {'ignore_size_differences': True}, (True, '')),
            (b"abcd", list(swap_body), {}, (False, 'Patch offset 06 is beyond the end of the binary!')),
            (b"4321bcda", ['# File size: 8'] + swap_body, {}, (False, 'Patch offset 02 expecting 63 but found 32!')),
            (b"abcd1234", ['# File size: 8', '06: * -> 32'], {}, (True, '')),
        ]
        for original, patchlines, options, expected in scenarios:
            verdict = BinaryDiff.can_patch(self.__make_filebytes(original), patchlines, **options)
            self.assertEqual(verdict, expected)

    def test_can_patch_reverse(self) -> None:
        """Reverse applicability checks, including the wildcard restriction."""
        swap_body = ['02: 63 64 -> 64 63', '06: 33 -> 32']
        size_error = 'Patch is for binary of size 12 but binary is 8 bytes long!'
        wildcard_error = 'Patch offset 06 specifies a wildcard and cannot be reversed!'
        scenarios = [
            (b"abdc1224", ['# File size: 8'] + swap_body, {}, (True, '')),
            (b"abdc1224", ['# File size: 12'] + swap_body, {}, (False, size_error)),
            (b"abdc1224", ['# File size: 12'] + swap_body, {'ignore_size_differences': True}, (True, '')),
            (b"abdc", list(swap_body), {}, (False, 'Patch offset 06 is beyond the end of the binary!')),
            (b"4321bcda", ['# File size: 8'] + swap_body, {}, (False, 'Patch offset 02 expecting 64 but found 32!')),
            (b"abcd1234", ['# File size: 8', '06: * -> 32'], {}, (False, wildcard_error)),
        ]
        for original, patchlines, options, expected in scenarios:
            verdict = BinaryDiff.can_patch(
                self.__make_filebytes(original),
                patchlines,
                reverse=True,
                **options,
            )
            self.assertEqual(verdict, expected)

    def test_patch_normal(self) -> None:
        """Forward patching yields the patched data or raises a descriptive error."""
        swap_body = ['02: 63 64 -> 64 63', '06: 33 -> 32']
        sized_8 = ['# File size: 8'] + swap_body
        sized_12 = ['# File size: 12'] + swap_body
        wildcard = ['# File size: 8', '06: * -> 32']

        patched = BinaryDiff.patch(self.__make_filebytes(b"abcd1234"), sized_8)
        self.assertEqual(self.__make_bytes(patched), b'abdc1224')

        with self.assertRaises(BinaryDiffException) as failure:
            BinaryDiff.patch(self.__make_filebytes(b"abcd1234"), sized_12)
        self.assertEqual(str(failure.exception), 'Patch is for binary of size 12 but binary is 8 bytes long!')

        patched = BinaryDiff.patch(
            self.__make_filebytes(b"abcd1234"),
            sized_12,
            ignore_size_differences=True,
        )
        self.assertEqual(self.__make_bytes(patched), b'abdc1224')

        with self.assertRaises(BinaryDiffException) as failure:
            BinaryDiff.patch(self.__make_filebytes(b"abcd"), swap_body)
        self.assertEqual(str(failure.exception), 'Patch offset 06 is beyond the end of the binary!')

        with self.assertRaises(BinaryDiffException) as failure:
            BinaryDiff.patch(self.__make_filebytes(b"4321bcda"), sized_8)
        self.assertEqual(str(failure.exception), 'Patch offset 02 expecting 63 but found 32!')

        patched = BinaryDiff.patch(self.__make_filebytes(b"abcd1234"), wildcard)
        self.assertEqual(self.__make_bytes(patched), b'abcd1224')

    def test_patch_reverse(self) -> None:
        """Reverse patching restores the original data or raises a descriptive error."""
        swap_body = ['02: 63 64 -> 64 63', '06: 33 -> 32']
        sized_8 = ['# File size: 8'] + swap_body
        sized_12 = ['# File size: 12'] + swap_body
        wildcard = ['# File size: 8', '06: * -> 32']

        restored = BinaryDiff.patch(self.__make_filebytes(b"abdc1224"), sized_8, reverse=True)
        self.assertEqual(self.__make_bytes(restored), b'abcd1234')

        with self.assertRaises(BinaryDiffException) as failure:
            BinaryDiff.patch(self.__make_filebytes(b"abdc1224"), sized_12, reverse=True)
        self.assertEqual(str(failure.exception), 'Patch is for binary of size 12 but binary is 8 bytes long!')

        restored = BinaryDiff.patch(
            self.__make_filebytes(b"abdc1224"),
            sized_12,
            reverse=True,
            ignore_size_differences=True,
        )
        self.assertEqual(self.__make_bytes(restored), b'abcd1234')

        with self.assertRaises(BinaryDiffException) as failure:
            BinaryDiff.patch(self.__make_filebytes(b"abdc"), swap_body, reverse=True)
        self.assertEqual(str(failure.exception), 'Patch offset 06 is beyond the end of the binary!')

        with self.assertRaises(BinaryDiffException) as failure:
            BinaryDiff.patch(self.__make_filebytes(b"4321bcda"), sized_8, reverse=True)
        self.assertEqual(str(failure.exception), 'Patch offset 02 expecting 64 but found 32!')

        with self.assertRaises(BinaryDiffException) as failure:
            BinaryDiff.patch(self.__make_filebytes(b"abcd1234"), wildcard, reverse=True)
        self.assertEqual(str(failure.exception), 'Patch offset 06 specifies a wildcard and cannot be reversed!')

View File

@ -1,6 +1,7 @@
from arcadeutils import ByteUtil
import unittest
from arcadeutils import ByteUtil
class TestBinaryDiff(unittest.TestCase):

714
tests/test_FileBytes.py Normal file
View File

@ -0,0 +1,714 @@
import io
import unittest
from arcadeutils import FileBytes
class TestFileBytes(unittest.TestCase):
def test_read_only_operations(self) -> None:
    """
    Reading an unmodified FileBytes must behave exactly like reading the
    equivalent bytes object, and must never alter the underlying file.
    """
    b = b"0123456789"
    fb = FileBytes(io.BytesIO(b))

    # Length check.
    self.assertEqual(len(fb), len(b))

    # Basic and negative single-index lookups.
    self.assertEqual(fb[5], b[5])
    self.assertEqual(fb[-2], b[-2])

    # Indexing outside of the length as an individual lookup
    # should cause an IndexError.
    with self.assertRaises(IndexError):
        fb[10]

    # Every slice below is compared against the same slice of the reference
    # bytes. (The 5:3 case used to compare fb against itself, which could
    # never fail.)
    windows = [
        # Basic start:end lookups, with start or end left out.
        slice(3, 7), slice(3, None), slice(None, 5), slice(-2, None), slice(None, -8),
        # Mixed positive and negative indexes.
        slice(3, -2), slice(-8, 5),
        # Resulting in no data.
        slice(3, 3), slice(5, 3),
        # Out of range.
        slice(5, 15),
        # Copy.
        slice(None),
        # Lookups with a step.
        slice(3, 7, 2), slice(7, 3, -2),
        # Reverse copy.
        slice(None, None, -1),
        # Default step given explicitly, and negative single step.
        slice(3, 7, 1), slice(7, 3, -1),
        # Step direction disagrees with the bounds: no data.
        slice(3, 7, -1), slice(7, 3, 1),
    ]
    for window in windows:
        self.assertEqual(fb[window], b[window], msg=repr(window))

    # Indexing with a zero step should raise a ValueError.
    with self.assertRaises(ValueError):
        fb[3:5:0]

    # Make sure that a clone of this object doesn't get any file changes
    # and that it is identical.
    self.assertEqual(fb.clone()[:], b)

    # Attempt to serialize out the data and make sure it did not change
    # before calling write.
    handle = fb.handle
    if not isinstance(handle, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(handle.getvalue(), b)

    # Make sure that the data is identical after calling write as well.
    fb.write_changes()
    handle = fb.handle
    if not isinstance(handle, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(handle.getvalue(), b)
def test_read_after_modify(self) -> None:
    """
    After in-memory modifications, reads must behave exactly like reads of
    the equivalent modified bytes, and writing back must produce those bytes.
    """
    b = b"012a456bc9"
    fb = FileBytes(io.BytesIO(b"0123456789"))

    # Do some simple modifications.
    fb[3] = 97
    fb[7:9] = b"bc"

    # Length check.
    self.assertEqual(len(fb), len(b))

    # Basic and negative single-index lookups.
    self.assertEqual(fb[5], b[5])
    self.assertEqual(fb[-2], b[-2])

    # Indexing outside of the length as an individual lookup
    # should cause an IndexError.
    with self.assertRaises(IndexError):
        fb[10]

    # Every slice below is compared against the same slice of the reference
    # bytes. (The 5:3 case used to compare fb against itself, which could
    # never fail.)
    windows = [
        # Basic start:end lookups, with start or end left out.
        slice(3, 7), slice(3, None), slice(None, 5), slice(-2, None), slice(None, -8),
        # Mixed positive and negative indexes.
        slice(3, -2), slice(-8, 5),
        # Resulting in no data.
        slice(3, 3), slice(5, 3),
        # Out of range.
        slice(5, 15),
        # Copy.
        slice(None),
        # Lookups with a step.
        slice(3, 7, 2), slice(7, 3, -2),
        # Reverse copy.
        slice(None, None, -1),
        # Default step given explicitly, and negative single step.
        slice(3, 7, 1), slice(7, 3, -1),
        # Step direction disagrees with the bounds: no data.
        slice(3, 7, -1), slice(7, 3, 1),
    ]
    for window in windows:
        self.assertEqual(fb[window], b[window], msg=repr(window))

    # Indexing with a zero step should raise a ValueError.
    with self.assertRaises(ValueError):
        fb[3:5:0]

    # Verify that it gets serialized correctly.
    fb.write_changes()
    handle = fb.handle
    if not isinstance(handle, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(handle.getvalue(), b)
def test_modify_variants(self) -> None:
    """Index, slice, strided and negative-index writes all land where expected."""
    buf = FileBytes(io.BytesIO(b"0123456789"))

    # Single byte by positive index.
    buf[3] = 97
    self.assertEqual(buf[:], b"012a456789")

    # Contiguous slice.
    buf[7:9] = b"bc"
    self.assertEqual(buf[:], b"012a456bc9")

    # Forward strided slice.
    buf[4:8:2] = b"de"
    self.assertEqual(buf[:], b"012ad5ebc9")

    # Single byte by negative index.
    buf[-1] = 102
    self.assertEqual(buf[:], b"012ad5ebcf")

    # The pending edits must reach the backing buffer on write.
    buf.write_changes()
    backing = buf.handle
    if not isinstance(backing, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(backing.getvalue(), b"012ad5ebcf")

    # Backward strided slice, applied on top of the written data.
    buf[7:3:-2] = b"gh"
    self.assertEqual(buf[:], b"012adhegcf")

    # A second write-back must serialize the new edit as well.
    buf.write_changes()
    backing = buf.handle
    if not isinstance(backing, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(backing.getvalue(), b"012adhegcf")
def test_resize_fail(self) -> None:
    """Slice assignment that would grow or shrink the data is rejected."""
    buf = FileBytes(io.BytesIO(b"0123456789"))

    # First a value longer than the slice, then one shorter than the slice.
    for window, payload in ((slice(3, 4), b"long"), (slice(3, 7), b"")):
        with self.assertRaises(NotImplementedError):
            buf[window] = payload
def test_append_modify(self) -> None:
    """Appending to a clone grows only the clone, and the result serializes."""
    original = FileBytes(io.BytesIO(b"0123456789"))
    self.assertEqual(len(original), 10)

    # Work on a clone so we can check the original is left untouched.
    copy = original.clone()
    copy.append(b"abc")
    self.assertEqual(len(original), 10)
    self.assertEqual(len(copy), 13)

    # A second append continues from the new end.
    copy.append(b"def")
    self.assertEqual(len(original), 10)
    self.assertEqual(len(copy), 16)

    # Contents: original unchanged, clone extended.
    self.assertEqual(original[:], b"0123456789")
    self.assertEqual(copy[:], b"0123456789abcdef")

    # Writing the clone back extends the backing buffer.
    copy.write_changes()
    backing = copy.handle
    if not isinstance(backing, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(backing.getvalue(), b"0123456789abcdef")
def test_modify_writeback_clones_unsafe(self) -> None:
    """Writing a clone back invalidates the original but not the clone itself."""
    original = FileBytes(io.BytesIO(b"0123456789"))
    copy = original.clone()
    copy.append(b"abcdef")

    # Before write-back both views are usable and independent.
    self.assertEqual(original[:], b"0123456789")
    self.assertEqual(copy[:], b"0123456789abcdef")

    # Serialize the clone into the shared backing buffer.
    copy.write_changes()
    backing = copy.handle
    if not isinstance(backing, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(backing.getvalue(), b"0123456789abcdef")

    # The instance that was written back keeps working for reads and writes.
    self.assertEqual(copy[:], b"0123456789abcdef")
    copy[0:1] = b"z"
    self.assertEqual(copy[:], b"z123456789abcdef")

    # The original now refers to stale data, so reading or modifying it
    # must raise.
    stale_message = "Another FileBytes instance representing the same file was written back!"
    with self.assertRaisesRegex(Exception, stale_message):
        original[:]
    with self.assertRaisesRegex(Exception, stale_message):
        original[5] = 2

    # Cloning the invalidated original gives a fresh view of the file.
    fresh = original.clone()
    self.assertEqual(fresh[:], b"0123456789abcdef")
def test_add(self) -> None:
    """The + operator returns an extended copy and leaves the operand alone."""
    original = FileBytes(io.BytesIO(b"0123456789"))
    self.assertEqual(len(original), 10)

    # Adding bytes produces a new, longer instance.
    combined = original + b"abc"
    self.assertEqual(len(original), 10)
    self.assertEqual(len(combined), 13)

    # Adding another FileBytes works the same way.
    combined = combined + FileBytes(io.BytesIO(b"def"))
    self.assertEqual(len(original), 10)
    self.assertEqual(len(combined), 16)

    # Contents: original unchanged, result extended.
    self.assertEqual(original[:], b"0123456789")
    self.assertEqual(combined[:], b"0123456789abcdef")

    # Writing the result back extends the backing buffer.
    combined.write_changes()
    backing = combined.handle
    if not isinstance(backing, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(backing.getvalue(), b"0123456789abcdef")
def test_truncate_noop(self) -> None:
    """Truncating to a size beyond the current length changes nothing."""
    original = FileBytes(io.BytesIO(b"0123456789"))
    self.assertEqual(len(original), 10)

    # Truncate a clone to more than its length.
    copy = original.clone()
    copy.truncate(15)

    # Neither length changed.
    self.assertEqual(len(original), 10)
    self.assertEqual(len(copy), 10)

    # Neither content changed.
    self.assertEqual(original[:], b"0123456789")
    self.assertEqual(copy[:], b"0123456789")
def test_truncate_simple(self) -> None:
    """Truncating a clone shortens it, and write-back shortens the file."""
    original = FileBytes(io.BytesIO(b"0123456789"))
    self.assertEqual(len(original), 10)

    # Cut a clone down to half the file.
    copy = original.clone()
    copy.truncate(5)

    # Only the clone got shorter.
    self.assertEqual(len(original), 10)
    self.assertEqual(len(copy), 5)
    self.assertEqual(original[:], b"0123456789")
    self.assertEqual(copy[:], b"01234")

    # Writing back truncates the backing buffer.
    copy.write_changes()
    backing = copy.handle
    if not isinstance(backing, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(backing.getvalue(), b"01234")

    # The written-back instance still reports the truncated data.
    self.assertEqual(len(copy), 5)
    self.assertEqual(copy[:], b"01234")
def test_truncate_only_patches(self) -> None:
    """Truncation that only removes appended data keeps the file portion intact."""
    original = FileBytes(io.BytesIO(b"0123456789"))
    self.assertEqual(len(original), 10)

    # Grow a clone, then cut back into the appended region only.
    copy = original.clone()
    copy.append(b"abcdef")
    copy.truncate(13)

    # Only the clone changed.
    self.assertEqual(len(original), 10)
    self.assertEqual(len(copy), 13)
    self.assertEqual(original[:], b"0123456789")
    self.assertEqual(copy[:], b"0123456789abc")

    # Writing back stores the file data plus the surviving appended bytes.
    copy.write_changes()
    backing = copy.handle
    if not isinstance(backing, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(backing.getvalue(), b"0123456789abc")

    # The written-back instance is still consistent.
    self.assertEqual(len(copy), 13)
    self.assertEqual(copy[:], b"0123456789abc")
def test_truncate_overlap(self) -> None:
    """Truncation past the appended data and into the file drops both."""
    original = FileBytes(io.BytesIO(b"0123456789"))
    self.assertEqual(len(original), 10)

    # Grow a clone, then cut back to before the end of the original file.
    copy = original.clone()
    copy.append(b"abcdef")
    copy.truncate(7)

    # Only the clone changed.
    self.assertEqual(len(original), 10)
    self.assertEqual(len(copy), 7)
    self.assertEqual(original[:], b"0123456789")
    self.assertEqual(copy[:], b"0123456")

    # Writing back shortens the backing buffer.
    copy.write_changes()
    backing = copy.handle
    if not isinstance(backing, io.BytesIO):
        raise Exception("File handle changed type somehow!")
    self.assertEqual(backing.getvalue(), b"0123456")

    # The written-back instance is still consistent.
    self.assertEqual(len(copy), 7)
    self.assertEqual(copy[:], b"0123456")