Add search function to FileBytes.
This commit is contained in:
parent
dd00c20b25
commit
9e53b4af0e
|
@ -90,6 +90,15 @@ Truncates the internal representation of the file to the number of bytes specifi
|
|||
This discards any data or changes applied after the truncation. When calling `write_changes()`
|
||||
the file will be resized accordingly to truncate it down.
|
||||
|
||||
### search() method
|
||||
|
||||
Takes a single bytes or FileBytes object and searches the current instance for those
|
||||
bytes. Returns the index of the first found occurrence of those bytes if they are present
|
||||
or None if they are not. Note that much like `append()`, searching from another FileBytes
|
||||
will cause the entire file to be read before it is used as the search term. Optionally
|
||||
a start keyword argument can be supplied to specify an offset to start searching at.
|
||||
Optionally an end keyword argument can be supplied to specify an offset to stop searching at.
|
||||
|
||||
### write_changes() method
|
||||
|
||||
Applies all append, truncate and update operations that were performed to the instance
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import BinaryIO, Dict, List, Set, Tuple, Union, overload
|
||||
from typing import BinaryIO, Dict, List, Optional, Set, Tuple, Union, overload
|
||||
|
||||
|
||||
class FileBytes:
|
||||
|
@ -17,6 +17,84 @@ class FileBytes:
|
|||
def handle(self) -> BinaryIO:
|
||||
return self.__handle
|
||||
|
||||
def search(self, search: Union[bytes, "FileBytes"], *, start: Optional[int] = None, end: Optional[int] = None) -> Optional[int]:
    """
    Locate the first occurrence of `search` inside this instance.

    The data is read through ordinary slicing of this instance, a bounded
    window at a time, so the whole file is never held in memory at once.

    Parameters:
        search - The bytes to look for. A FileBytes instance is accepted as
                 well, in which case it is fully read into memory first.
        start - Optional offset at which to begin looking. Defaults to the
                beginning of the data. A negative offset, or one too close to
                the end for a match to fit, yields None.
        end - Optional offset at which to stop looking. A match must lie
              entirely before this offset. Defaults to the end of the data.

    Returns:
        The offset of the first match, or None when there is no match in the
        requested range.
    """

    needlelen = len(search)
    total = self.__patchlength
    if needlelen > total:
        # The needle is longer than everything we hold, so it cannot match.
        return None
    if isinstance(search, FileBytes):
        # Materialize the other instance once, so every comparison below is
        # a plain bytes comparison.
        search = search[:]

    first = 0 if start is None else start
    if first < 0 or first > (total - (needlelen - 1)):
        # No match could possibly begin here.
        return None

    # One past the last offset at which a match could begin while still
    # finishing before the requested end.
    limit = (total if end is None else end) - (needlelen - 1)
    if limit <= first:
        # The range leaves no room for a match.
        return None

    # A chunk is always at least twice the needle length, so a single refill
    # is enough to cover any candidate that runs past the loaded window.
    chunk = max(needlelen * 2, 1024)
    window: bytes = self[first:(first + (chunk * 3))]
    windowstart = first
    windowend = first + len(window)

    for position in range(first, limit):
        stop = position + needlelen

        if stop > windowend:
            # The candidate runs past what is loaded, so pull in another chunk.
            extra = self[windowend:(windowend + chunk)]
            if not extra:
                # Nothing left to load, so there is nothing left to match.
                return None

            window = window + extra
            windowend += len(extra)

            # Drop the oldest chunk once the window has grown large enough,
            # keeping memory use bounded regardless of file size.
            if len(window) >= (3 * chunk):
                window = window[chunk:]
                windowstart += chunk

        # Translate absolute offsets into offsets within the loaded window.
        if window[(position - windowstart):(stop - windowstart)] == search:
            return position

    # Walked the whole range without a match.
    return None
|
||||
|
||||
def __len__(self) -> int:
|
||||
if self.__unsafe:
|
||||
raise Exception("Another FileBytes instance representing the same file was written back!")
|
||||
|
@ -229,11 +307,10 @@ class FileBytes:
|
|||
|
||||
# Do we have any modifications to the file in this area?
|
||||
modifications = any(index in self.__patches for index in range(start, stop, step))
|
||||
outofrange = any(index >= self.__filelength for index in range(start, stop, step))
|
||||
|
||||
# Now see if we can do any fast loading
|
||||
if start < stop and step == 1:
|
||||
if not modifications and not outofrange:
|
||||
if not modifications:
|
||||
# This is just a contiguous read
|
||||
self.__handle.seek(start)
|
||||
return self.__handle.read(stop - start)
|
||||
|
@ -253,7 +330,7 @@ class FileBytes:
|
|||
|
||||
return bytes(data)
|
||||
elif start > stop and step == -1:
|
||||
if not modifications and not outofrange:
|
||||
if not modifications:
|
||||
# This is just a contiguous read, reversed
|
||||
self.__handle.seek(stop + 1)
|
||||
return self.__handle.read(start - stop)[::-1]
|
||||
|
|
2
setup.py
2
setup.py
|
@ -8,7 +8,7 @@ with open(os.path.join("arcadeutils", "README.md"), "r", encoding="utf-8") as fh
|
|||
|
||||
setup(
|
||||
name='arcadeutils',
|
||||
version='0.1.4',
|
||||
version='0.1.5',
|
||||
description='Collection of utilities written in Python for working with various arcade binaries.',
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/markdown",
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import io
|
||||
import random
|
||||
import unittest
|
||||
|
||||
from arcadeutils import FileBytes
|
||||
|
@ -712,3 +713,73 @@ class TestFileBytes(unittest.TestCase):
|
|||
clone[:],
|
||||
b"0123456",
|
||||
)
|
||||
|
||||
def test_search_basic(self) -> None:
    # A ten byte marker buried between two large runs of zero bytes, so the
    # search has to walk a long way through the data before reaching it.
    padding = b"\0" * 54321
    fb = FileBytes(io.BytesIO(padding + b"0123456789" + padding))

    # Each entry pairs a needle with the offset search() should report.
    expectations = [
        (b"0123456789", 54321),
        (b"4567", 54325),
        (b"abcde", None),
    ]
    for needle, expected in expectations:
        self.assertEqual(fb.search(needle), expected)
|
||||
|
||||
def test_search_bounds(self) -> None:
    # The marker occupies offsets 5 through 14 of a twenty byte file.
    padding = b"\0" * 5
    fb = FileBytes(io.BytesIO(padding + b"0123456789" + padding))

    # Keyword arguments handed to search(), paired with the expected result.
    # Moving start past the marker, or end before its last byte, must make
    # the search come back empty.
    expectations = [
        ({"start": 5}, 5),
        ({"start": 6}, None),
        ({"end": 15}, 5),
        ({"end": 14}, None),
        ({"start": 3, "end": 18}, 5),
        ({"start": 5, "end": 15}, 5),
    ]
    for bounds, expected in expectations:
        self.assertEqual(fb.search(b"0123456789", **bounds), expected)
|
||||
|
||||
def test_search_edges(self) -> None:
    marker = b"0123456789"

    # File contents paired with the offset the marker should be found at:
    # marker at the very end, marker as the whole file, marker at the start.
    layouts = [
        ((b"\0" * 5) + marker, 5),
        (marker, 0),
        (marker + (b"\0" * 5), 0),
    ]
    for contents, expected in layouts:
        fb = FileBytes(io.BytesIO(contents))
        self.assertEqual(fb.search(b"0123456789"), expected)
|
||||
|
||||
def test_search_random(self) -> None:
    # Place a five byte marker at a random offset, followed by a random
    # amount of padding, and make sure search() reports that offset.
    for _ in range(25):
        location = random.randint(1, 2000)
        trailing = random.randint(1, 2000)

        # Record both random sizes with the sample so that a failure can be
        # reproduced, and so that one bad sample does not hide the rest.
        with self.subTest(location=location, trailing=trailing):
            fb = FileBytes(io.BytesIO((b"\0" * location) + (b"12345") + (b"\0" * trailing)))
            self.assertEqual(
                fb.search(b"12345"),
                location,
            )
|
||||
|
|
Loading…
Reference in New Issue