Merge pull request #893 from kbuma/enhancement/open-data-store
Enhancement/open data store
munrojm authored Dec 11, 2023
2 parents 0443064 + 85e5a5d commit 777bb9f
Showing 3 changed files with 714 additions and 15 deletions.
src/maggma/stores/aws.py: 72 changes (57 additions & 15 deletions)
@@ -8,7 +8,7 @@
from hashlib import sha1
from io import BytesIO
from json import dumps
- from typing import Any, Dict, Iterator, List, Optional, Tuple, Union
+ from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union

import msgpack # type: ignore
from monty.msgpack import default as monty_default
@@ -184,7 +184,7 @@ def query(
else:
try:
# TODO: This is ugly and unsafe, do some real checking before pulling data
- data = self.s3_bucket.Object(self.sub_dir + str(doc[self.key])).get()["Body"].read()
+ data = self.s3_bucket.Object(self._get_full_key_path(doc[self.key])).get()["Body"].read()
except botocore.exceptions.ClientError as e:
# If a client error is thrown, then check that it was a NoSuchKey or NoSuchBucket error.
# If it was a NoSuchKey error, then the object does not exist.
@@ -199,13 +199,27 @@
raise e

if self.unpack_data:
- data = self._unpack(data=data, compressed=doc.get("compression", "") == "zlib")
+ data = self._read_data(data=data, compress_header=doc.get("compression", ""))

if self.last_updated_field in doc:
data[self.last_updated_field] = doc[self.last_updated_field]

yield data

+ def _read_data(self, data: bytes, compress_header: str) -> Dict:
+ """Reads the data and transforms it into a dictionary.
+ Allows for subclasses to apply custom schemes for transforming
+ the data retrieved from S3.
+ Args:
+ data (bytes): The raw byte representation of the data.
+ compress_header (str): String representing the type of compression used on the data.
+ Returns:
+ Dict: Dictionary representation of the data.
+ """
+ return self._unpack(data=data, compressed=compress_header == "zlib")
+
@staticmethod
def _unpack(data: bytes, compressed: bool):
if compressed:
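
The new _read_data hook is the override point for stores whose objects are not zlib-compressed msgpack. As a sketch (not code from this commit; the class name and the gzip/JSON format are invented for illustration), a subclass reading objects that some external process stored as gzip-compressed JSON might look like:

import gzip
import json
from typing import Dict

from maggma.stores.aws import S3Store


class GzipJsonReadStore(S3Store):
    """Hypothetical subclass for objects written externally as gzip-compressed JSON."""

    def _read_data(self, data: bytes, compress_header: str) -> Dict:
        # the index records whatever compression tag the writer used; here we expect "gzip"
        if compress_header == "gzip":
            data = gzip.decompress(data)
        return json.loads(data)

Because only the read path changes, query() keeps working unchanged; it already routes every fetched object through _read_data.
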
@@ -307,12 +321,22 @@ def update(
else:
additional_metadata = list(additional_metadata)

+ self._write_to_s3_and_index(docs, key + additional_metadata + self.searchable_fields)
+
+ def _write_to_s3_and_index(self, docs: List[Dict], search_keys: List[str]):
+ """Implements updating of the provided documents in S3 and the index.
+ Allows for subclasses to apply custom approaches to parallelizing the writing.
+ Args:
+ docs (List[Dict]): The documents to update
+ search_keys (List[str]): The keys of the information to be updated in the index
+ """
with ThreadPoolExecutor(max_workers=self.s3_workers) as pool:
fs = {
pool.submit(
self.write_doc_to_s3,
doc=itr_doc,
- search_keys=key + additional_metadata + self.searchable_fields,
+ search_keys=search_keys,
)
for itr_doc in docs
}
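
Splitting update() so that _write_to_s3_and_index owns the parallelism lets a subclass swap the threading strategy without touching serialization. A minimal sketch of a serial writer, with a hypothetical class name, assuming (per write_doc_to_s3's docstring) that the returned metadata is what belongs in the index:

from typing import Dict, List

from maggma.stores.aws import S3Store


class SerialWriteStore(S3Store):
    """Hypothetical subclass that skips the thread pool and uploads one document at a time."""

    def _write_to_s3_and_index(self, docs: List[Dict], search_keys: List[str]):
        for doc in docs:
            # upload the object and get back the metadata destined for the index store
            search_doc = self.write_doc_to_s3(doc, search_keys=search_keys)
            self.index.update(search_doc)
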
@@ -366,14 +390,36 @@ def _get_resource_and_bucket(self):

return resource, bucket

- def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
+ def _get_full_key_path(self, id: str) -> str:
+ """Produces the full key path for S3 items
+ Args:
+ id (str): The value of the key identifier.
+ Returns:
+ str: The full key path
+ """
+ return self.sub_dir + str(id)
+
+ def _get_compression_function(self) -> Callable:
+ """Returns the function to use for compressing data."""
+ return zlib.compress
+
+ def _get_decompression_function(self) -> Callable:
+ """Returns the function to use for decompressing data."""
+ return zlib.decompress
+
+ def write_doc_to_s3(self, doc: Dict, search_keys: List[str]) -> Dict:
"""
Write the data to s3 and return the metadata to be inserted into the index db.
Args:
doc: the document.
search_keys: list of keys to pull from the docs and be inserted into the
index db.
+ Returns:
+ Dict: The metadata to be inserted into the index db
"""
s3_bucket = self._get_bucket()

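The _get_full_key_path, _get_compression_function and _get_decompression_function hooks let a subclass control object naming and the compression codec without reimplementing write_doc_to_s3. A rough sketch with invented names and a .gz key suffix; note that write_doc_to_s3 in the hunk below still tags compressed documents with "zlib", so a subclass changing the codec would likely override _read_data as well:

import gzip
from typing import Callable

from maggma.stores.aws import S3Store


class GzipKeyedStore(S3Store):
    """Hypothetical subclass that gzips objects and adds a suffix to every key."""

    def _get_full_key_path(self, id: str) -> str:
        # keep the sub_dir prefix from the base class, but make objects self-describing
        return f"{self.sub_dir}{id}.gz"

    def _get_compression_function(self) -> Callable:
        return gzip.compress

    def _get_decompression_function(self) -> Callable:
        return gzip.decompress
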
@@ -393,11 +439,7 @@ def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
if self.compress:
# Compress with zlib if chosen
search_doc["compression"] = "zlib"
- data = zlib.compress(data)
-
- if self.last_updated_field in doc:
- # need this conversion for aws metadata insert
- search_doc[self.last_updated_field] = str(to_isoformat_ceil_ms(doc[self.last_updated_field]))
+ data = self._get_compression_function()(data)

# keep a record of original keys, in case these are important for the individual researcher
# it is not expected that this information will be used except in disaster recovery
@@ -407,7 +449,7 @@ def write_doc_to_s3(self, doc: Dict, search_keys: List[str]):
search_doc["s3-to-mongo-keys"] = dumps(s3_to_mongo_keys)
s3_bucket.upload_fileobj(
Fileobj=BytesIO(data),
- Key=self.sub_dir + str(doc[self.key]),
+ Key=self._get_full_key_path(str(doc[self.key])),
ExtraArgs={"Metadata": {s3_to_mongo_keys[k]: str(v) for k, v in search_doc.items()}},
)

@@ -452,7 +494,7 @@ def remove_docs(self, criteria: Dict, remove_s3_object: bool = False):
# Can remove up to 1000 items at a time via boto
to_remove_chunks = list(grouper(to_remove, n=1000))
for chunk_to_remove in to_remove_chunks:
objlist = [{"Key": f"{self.sub_dir}{obj}"} for obj in chunk_to_remove]
objlist = [{"Key": self._get_full_key_path(obj)} for obj in chunk_to_remove]
self.s3_bucket.delete_objects(Delete={"Objects": objlist})

@property
@@ -486,11 +528,11 @@ def rebuild_index_from_s3_data(self, **kwargs):
bucket = self.s3_bucket
objects = bucket.objects.filter(Prefix=self.sub_dir)
for obj in objects:
- key_ = self.sub_dir + obj.key
+ key_ = self._get_full_key_path(obj.key)
data = self.s3_bucket.Object(key_).get()["Body"].read()

if self.compress:
- data = zlib.decompress(data)
+ data = self._get_decompression_function()(data)
unpacked_data = msgpack.unpackb(data, raw=False)
self.update(unpacked_data, **kwargs)

@@ -504,7 +546,7 @@ def rebuild_metadata_from_index(self, index_query: Optional[dict] = None):
"""
qq = {} if index_query is None else index_query
for index_doc in self.index.query(qq):
- key_ = self.sub_dir + index_doc[self.key]
+ key_ = self._get_full_key_path(index_doc[self.key])
s3_object = self.s3_bucket.Object(key_)
new_meta = {self._sanitize_key(k): v for k, v in s3_object.metadata.items()}
for k, v in index_doc.items():
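
A rough end-to-end usage sketch showing where the new hooks sit in the existing call chain; the bucket name, key field and document are made up, and connect() still needs real AWS credentials and an existing bucket:

from maggma.stores import MemoryStore
from maggma.stores.aws import S3Store

index = MemoryStore("s3_index", key="task_id")
store = S3Store(index=index, bucket="example-bucket", sub_dir="tasks/", key="task_id", compress=True)
store.connect()

# update() -> _write_to_s3_and_index() -> write_doc_to_s3() -> _get_full_key_path() / _get_compression_function()
store.update([{"task_id": "mp-1", "energy": -1.23}])

# query() -> _get_full_key_path() -> _read_data() -> _unpack()
doc = next(store.query({"task_id": "mp-1"}))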