Merge pull request #824 from rkingsbury/fsperf

FileStore performance enhancements
rkingsbury authored Jul 21, 2023
2 parents ebd69c2 + c5cc5a9 commit e0ed61b
Showing 3 changed files with 45 additions and 34 deletions.
73 changes: 43 additions & 30 deletions src/maggma/stores/file_store.py
@@ -5,6 +5,9 @@
 """
 
 import hashlib
+import os
+import fnmatch
+import re
 
 import warnings
 from pathlib import Path
@@ -91,7 +94,10 @@ def __init__(
         self.path = Path(path) if isinstance(path, str) else path
 
         self.json_name = json_name
-        self.file_filters = file_filters if file_filters else ["*"]
+        file_filters = file_filters if file_filters else ["*"]
+        self.file_filters = re.compile(
+            "|".join(fnmatch.translate(p) for p in file_filters)
+        )
         self.collection_name = "file_store"
         self.key = "file_id"
         self.include_orphans = include_orphans
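As a reference for the change above, here is a minimal standalone sketch of the filter-compilation idea: the glob-style filters are run through fnmatch.translate and joined into a single compiled regex, so each filename later needs only one match() call instead of one rglob() pass per pattern. The filters and filenames below are made-up examples, not part of the diff.

import fnmatch
import re

# Hypothetical filters; FileStore falls back to ["*"] when none are given.
file_filters = ["*.txt", "*.csv"]
combined = re.compile("|".join(fnmatch.translate(p) for p in file_filters))

print(bool(combined.match("data.csv")))  # True
print(bool(combined.match("notes.md")))  # False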
@@ -189,21 +195,21 @@ def read(self) -> List[Dict]:
         """
         file_list = []
         # generate a list of files in subdirectories
-        for pattern in self.file_filters:
-            # list every file that matches the pattern
-            for f in self.path.rglob(pattern):
-                if f.is_file():
-                    # ignore the .json file created by the Store
-                    if f.name == self.json_name:
-                        continue
+        for root, dirs, files in os.walk(self.path):
+            # for pattern in self.file_filters:
+            for match in filter(self.file_filters.match, files):
+                # for match in fnmatch.filter(files, pattern):
+                path = Path(os.path.join(root, match))
+                # ignore the .json file created by the Store
+                if path.is_file() and path.name != self.json_name:
                     # filter based on depth
-                    depth = len(f.relative_to(self.path).parts) - 1
+                    depth = len(path.relative_to(self.path).parts) - 1
                     if self.max_depth is None or depth <= self.max_depth:
-                        file_list.append(self._create_record_from_file(f))
+                        file_list.append(self._create_record_from_file(path))
 
         return file_list
 
-    def _create_record_from_file(self, f: Union[str, Path]) -> Dict:
+    def _create_record_from_file(self, f: Path) -> Dict:
         """
         Given the path to a file, return a Dict that constitutes a record of
         basic information about that file. The keys in the returned dict
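The rewritten read() above walks the directory tree once with os.walk and tests every filename against the pre-compiled pattern, instead of re-walking the tree with rglob for each filter. A self-contained sketch of that single-pass traversal, using a hypothetical helper name and omitting FileStore's depth and JSON-file bookkeeping:

import fnmatch
import os
import re
from pathlib import Path
from typing import List

def walk_matching(base: Path, pattern: re.Pattern) -> List[Path]:
    # One pass over the tree; each filename is tested against a single
    # compiled regex rather than re-walking the tree per glob pattern.
    found = []
    for root, _dirs, files in os.walk(base):
        for name in filter(pattern.match, files):
            found.append(Path(root) / name)
    return found

# Example call (hypothetical directory and filter):
# pattern = re.compile("|".join(fnmatch.translate(p) for p in ["*.txt"]))
# walk_matching(Path("/tmp/data"), pattern)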
@@ -222,10 +228,6 @@ def _create_record_from_file(self, f: Union[str, Path]) -> Dict:
             hash: str = Hash of the file contents
             orphan: bool = Whether this record is an orphan
         """
-        # make sure f is a Path object
-        if not isinstance(f, Path):
-            f = Path(f)
-
         # compute the file_id from the relative path
         relative_path = f.relative_to(self.path)
         digest = hashlib.md5()
@@ -234,27 +236,34 @@ def _create_record_from_file(self, f: Union[str, Path]) -> Dict:
 
         # hash the file contents
         digest2 = hashlib.md5()
-        block_size = 128 * digest2.block_size
+        b = bytearray(128 * 2056)
+        mv = memoryview(b)
         digest2.update(self.name.encode())
-        with open(f.as_posix(), "rb") as file:
-            buf = file.read(block_size)
-            digest2.update(buf)
+        with open(f.as_posix(), "rb", buffering=0) as file:
+            # this block copied from the file_digest method in python 3.11+
+            # see https://github.com/python/cpython/blob/0ba07b2108d4763273f3fb85544dde34c5acd40a/Lib/hashlib.py#L213
+            if hasattr(file, "getbuffer"):
+                # io.BytesIO object, use zero-copy buffer
+                digest2.update(file.getbuffer())
+            else:
+                for n in iter(lambda: file.readinto(mv), 0):
+                    digest2.update(mv[:n])
+
         content_hash = str(digest2.hexdigest())
+        stats = f.stat()
 
-        d = {
+        return {
             "name": f.name,
             "path": f,
             "path_relative": relative_path,
             "parent": f.parent.name,
-            "size": f.stat().st_size,
-            "last_updated": datetime.fromtimestamp(f.stat().st_mtime, tz=timezone.utc),
+            "size": stats.st_size,
+            "last_updated": datetime.fromtimestamp(stats.st_mtime, tz=timezone.utc),
             "orphan": False,
             "hash": content_hash,
             self.key: file_id,
         }
-
-        return d
 
     def connect(self, force_reset: bool = False):
         """
         Connect to the source data
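The hashing change above fills a reusable buffer with readinto() and feeds it to the digest chunk by chunk (the idiom hashlib.file_digest uses in Python 3.11+), instead of a single read() whose size was tied to the digest block size; it also calls f.stat() once and reuses the result. A self-contained sketch of the same chunked-hashing idiom, assuming a 128 KiB buffer:

import hashlib

def md5_of_file(path: str) -> str:
    # Chunked hashing with a reusable buffer; readinto() returns 0 at EOF,
    # which ends the iter() loop.
    digest = hashlib.md5()
    buf = bytearray(128 * 1024)
    view = memoryview(buf)
    with open(path, "rb", buffering=0) as fh:
        for n in iter(lambda: fh.readinto(view), 0):
            digest.update(view[:n])
    return digest.hexdigest()

On Python 3.11 or newer the same result can be had directly from hashlib.file_digest(fh, "md5").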
@@ -375,7 +384,7 @@ def query( # type: ignore
         hint: Optional[Dict[str, Union[Sort, int]]] = None,
         skip: int = 0,
         limit: int = 0,
-        contents_size_limit: Optional[int] = None,
+        contents_size_limit: Optional[int] = 0,
     ) -> Iterator[Dict]:
         """
         Queries the Store for a set of documents
@@ -392,6 +401,9 @@ def query( # type: ignore
             contents_size_limit: Maximum file size in bytes for which to return contents.
                 The FileStore will attempt to read the file and populate the 'contents' key
                 with its content at query time, unless the file size is larger than this value.
+                By default, reading content is disabled. Note that enabling content reading
+                can substantially slow down the query operation, especially when there
+                are large numbers of files.
         """
         return_contents = False
         criteria = criteria if criteria else {}
@@ -407,10 +419,11 @@ def query( # type: ignore
 
         orig_properties = properties.copy() if properties else None
 
-        if properties is None or properties.get("contents"):
+        if properties is None:
+            # None means return all fields, including contents
             return_contents = True
-
-        if properties is not None and return_contents:
+        elif properties.get("contents"):
+            return_contents = True
             # remove contents b/c it isn't stored in the MemoryStore
             properties.pop("contents")
             # add size and path to query so that file can be read
@@ -426,7 +439,7 @@ def query( # type: ignore
             limit=limit,
         ):
             # add file contents to the returned documents, if appropriate
-            if return_contents:
+            if return_contents and not d.get("orphan"):
                 if contents_size_limit is None or d["size"] <= contents_size_limit:
                     # attempt to read the file contents and inject into the document
                     # TODO - could add more logic for detecting different file types
@@ -438,7 +451,7 @@ def query( # type: ignore
                         data = f"Unable to read: {e}"
 
                 elif d["size"] > contents_size_limit:
-                    data = "Unable to read: file too large"
+                    data = f"File exceeds size limit of {contents_size_limit} bytes"
                 else:
                     data = "Unable to read: Unknown error"
4 changes: 1 addition & 3 deletions src/maggma/stores/mongolike.py
@@ -482,9 +482,7 @@ def update(self, docs: Union[List[Dict], Dict], key: Union[List, str, None] = No
         if not isinstance(docs, list):
             docs = [docs]
 
-        for d in docs:
-            d = jsanitize(d, allow_bson=True)
-
+        for d in map(lambda x: jsanitize(x, allow_bson=True), docs):
             # document-level validation is optional
             validates = True
             if self.validator:
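The update() tweak above applies jsanitize lazily through map() as the documents are iterated, instead of rebinding the loop variable inside the loop. A small sketch of that pattern (the example document is made up):

from datetime import datetime, timezone
from monty.json import jsanitize

docs = [{"task_id": "mp-1", "last_updated": datetime.now(timezone.utc)}]

# Each document is sanitized as it is drawn from the iterator; no in-loop
# reassignment is needed.
for d in map(lambda x: jsanitize(x, allow_bson=True), docs):
    print(d)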
2 changes: 1 addition & 1 deletion tests/stores/test_file_store.py
@@ -267,7 +267,7 @@ def test_query(test_dir):
         properties=["name", "contents"],
         contents_size_limit=50,
     )
-    assert d["contents"] == "Unable to read: file too large"
+    assert d["contents"] == "File exceeds size limit of 50 bytes"
     assert d.get("name")

