Merging main branch into staging branch
actions-user committed Jul 26, 2024
2 parents 97f192e + 735af7b commit 3be2bac
Showing 6 changed files with 138 additions and 15 deletions.
4 changes: 4 additions & 0 deletions .happy/terraform/modules/batch/main.tf
@@ -78,6 +78,10 @@ resource aws_batch_job_definition dataset_metadata_update {
       "name": "DATASETS_BUCKET",
       "value": "${var.datasets_bucket}"
     },
+    {
+      "name": "SPATIAL_DEEP_ZOOM_BUCKET",
+      "value": "${var.spatial_deep_zoom_bucket}"
+    },
     {
       "name": "DEPLOYMENT_STAGE",
       "value": "${var.deployment_stage}"
5 changes: 5 additions & 0 deletions .happy/terraform/modules/batch/variables.tf
@@ -13,6 +13,11 @@ variable datasets_bucket {
   description = "Datasets public-access bucket name"
 }

+variable spatial_deep_zoom_bucket {
+  type        = string
+  description = "Bucket for Visium Dataset spatial deep zoom images"
+}
+
 variable image {
   type        = string
   description = "Image name"
2 changes: 2 additions & 0 deletions .happy/terraform/modules/ecs-stack/main.tf
@@ -82,6 +82,7 @@ locals {
   artifact_bucket            = try(local.secret["s3_buckets"]["artifact"]["name"], "")
   cellxgene_bucket           = try(local.secret["s3_buckets"]["cellxgene"]["name"], "")
   datasets_bucket            = try(local.secret["s3_buckets"]["datasets"]["name"], "")
+  spatial_deep_zoom_bucket   = try(local.secret["s3_buckets"]["spatial_deep_zoom"]["name"], "")
   dataset_submissions_bucket = try(local.secret["s3_buckets"]["dataset_submissions"]["name"], "")
   wmg_bucket                 = try(local.secret["s3_buckets"]["wmg"]["name"], "")

@@ -316,6 +317,7 @@ module upload_batch {
   artifact_bucket              = local.artifact_bucket
   cellxgene_bucket             = local.cellxgene_bucket
   datasets_bucket              = local.datasets_bucket
+  spatial_deep_zoom_bucket     = local.spatial_deep_zoom_bucket
   frontend_url                 = local.frontend_url
   batch_container_memory_limit = local.batch_container_memory_limit
 }
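Taken together, the Terraform changes thread the new bucket name from the s3_buckets secret through the ECS stack into the batch job's environment. As a minimal sketch, this is how the Python entrypoint (shown later in this diff) reads the value back; the default is only a local-test fallback:

    import os

    # SPATIAL_DEEP_ZOOM_BUCKET is injected via the batch job definition above.
    spatial_deep_zoom_bucket = os.environ.get("SPATIAL_DEEP_ZOOM_BUCKET", "test-spatial-deep-zoom-bucket")
    # The updater addresses assets under a bucket-plus-prefix "directory", not the bare bucket.
    spatial_deep_zoom_dir = f"{spatial_deep_zoom_bucket}/spatial-deep-zoom"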
44 changes: 33 additions & 11 deletions backend/layers/processing/dataset_metadata_update.py
@@ -44,7 +44,7 @@


 class DatasetMetadataUpdaterWorker(ProcessDownload):
-    def __init__(self, artifact_bucket: str, datasets_bucket: str) -> None:
+    def __init__(self, artifact_bucket: str, datasets_bucket: str, spatial_deep_zoom_dir: str = None) -> None:
         # init each worker with business logic backed by non-shared DB connection
         self.business_logic = BusinessLogic(
             DatabaseProvider(),
@@ -57,6 +57,7 @@ def __init__(self, artifact_bucket: str, datasets_bucket: str) -> None:
         super().__init__(self.business_logic, self.business_logic.uri_provider, self.business_logic.s3_provider)
         self.artifact_bucket = artifact_bucket
         self.datasets_bucket = datasets_bucket
+        self.spatial_deep_zoom_dir = spatial_deep_zoom_dir

     def update_raw_h5ad(
         self,
@@ -181,10 +182,19 @@ def update_cxg(
         self,
         cxg_uri: str,
         new_cxg_dir: str,
-        dataset_version_id: DatasetVersionId,
+        current_dataset_version_id: DatasetVersionId,
+        new_dataset_version_id: DatasetVersionId,
         metadata_update: DatasetArtifactMetadataUpdate,
     ):
         self.s3_provider.upload_directory(cxg_uri, new_cxg_dir)

+        current_spatial_deep_zoom_dir = f"s3://{self.spatial_deep_zoom_dir}/{current_dataset_version_id.id}"
+        spatial_dzi_uri = f"{current_spatial_deep_zoom_dir}/spatial.dzi"
+        # Copy spatial deep zoom directory if it exists (only exists for Visium datasets)
+        if self.s3_provider.uri_exists(spatial_dzi_uri):
+            new_spatial_deep_zoom_dir = f"s3://{self.spatial_deep_zoom_dir}/{new_dataset_version_id.id}"
+            self.s3_provider.upload_directory(current_spatial_deep_zoom_dir, new_spatial_deep_zoom_dir)
+
         ctx = tiledb.Ctx(H5ADDataFile.tile_db_ctx_config)
         array_name = f"{new_cxg_dir}/cxg_group_metadata"
         with tiledb.open(array_name, mode="r", ctx=ctx) as metadata_array:
@@ -194,18 +204,24 @@ def update_cxg(
         with tiledb.open(array_name, mode="w", ctx=ctx) as metadata_array:
             metadata_array.meta["corpora"] = json.dumps(cxg_metadata_dict)

-        self.business_logic.add_dataset_artifact(dataset_version_id, DatasetArtifactType.CXG, new_cxg_dir)
-        self.update_processing_status(dataset_version_id, DatasetStatusKey.CXG, DatasetConversionStatus.CONVERTED)
+        self.business_logic.add_dataset_artifact(new_dataset_version_id, DatasetArtifactType.CXG, new_cxg_dir)
+        self.update_processing_status(new_dataset_version_id, DatasetStatusKey.CXG, DatasetConversionStatus.CONVERTED)


 class DatasetMetadataUpdater(ProcessDownload):
     def __init__(
-        self, business_logic: BusinessLogic, artifact_bucket: str, cellxgene_bucket: str, datasets_bucket: str
+        self,
+        business_logic: BusinessLogic,
+        artifact_bucket: str,
+        cellxgene_bucket: str,
+        datasets_bucket: str,
+        spatial_deep_zoom_dir: str,
     ) -> None:
         super().__init__(business_logic, business_logic.uri_provider, business_logic.s3_provider)
         self.artifact_bucket = artifact_bucket
         self.cellxgene_bucket = cellxgene_bucket
         self.datasets_bucket = datasets_bucket
+        self.spatial_deep_zoom_dir = spatial_deep_zoom_dir

     @staticmethod
     def update_raw_h5ad(
@@ -258,13 +274,15 @@ def update_rds(
     def update_cxg(
         artifact_bucket: str,
         datasets_bucket: str,
+        spatial_deep_zoom_dir: str,
         cxg_uri: str,
         new_cxg_dir: str,
-        dataset_version_id: DatasetVersionId,
+        current_dataset_version_id: DatasetVersionId,
+        new_dataset_version_id: DatasetVersionId,
         metadata_update: DatasetArtifactMetadataUpdate,
     ):
-        DatasetMetadataUpdaterWorker(artifact_bucket, datasets_bucket).update_cxg(
-            cxg_uri, new_cxg_dir, dataset_version_id, metadata_update
+        DatasetMetadataUpdaterWorker(artifact_bucket, datasets_bucket, spatial_deep_zoom_dir).update_cxg(
+            cxg_uri, new_cxg_dir, current_dataset_version_id, new_dataset_version_id, metadata_update
         )

     def update_metadata(
@@ -365,8 +383,10 @@ def update_metadata(
                 args=(
                     self.artifact_bucket,
                     self.datasets_bucket,
+                    self.spatial_deep_zoom_dir,
                     artifact_uris[DatasetArtifactType.CXG],
                     f"s3://{self.cellxgene_bucket}/{new_artifact_key_prefix}.cxg",
+                    current_dataset_version_id,
                     new_dataset_version_id,
                     metadata_update,
                 ),
@@ -416,9 +436,11 @@ def has_valid_artifact_statuses(self, dataset_version_id: DatasetVersionId) -> b
     artifact_bucket = os.environ.get("ARTIFACT_BUCKET", "test-bucket")
     cellxgene_bucket = os.environ.get("CELLXGENE_BUCKET", "test-cellxgene-bucket")
     datasets_bucket = os.environ.get("DATASETS_BUCKET", "test-datasets-bucket")
+    spatial_deep_zoom_bucket = os.environ.get("SPATIAL_DEEP_ZOOM_BUCKET", "test-spatial-deep-zoom-bucket")
+    spatial_deep_zoom_dir = f"{spatial_deep_zoom_bucket}/spatial-deep-zoom"
     current_dataset_version_id = DatasetVersionId(os.environ["CURRENT_DATASET_VERSION_ID"])
     new_dataset_version_id = DatasetVersionId(os.environ["NEW_DATASET_VERSION_ID"])
     metadata_update = DatasetArtifactMetadataUpdate(**json.loads(os.environ["METADATA_UPDATE_JSON"]))
-    DatasetMetadataUpdater(business_logic, artifact_bucket, cellxgene_bucket, datasets_bucket).update_metadata(
-        current_dataset_version_id, new_dataset_version_id, metadata_update
-    )
+    DatasetMetadataUpdater(
+        business_logic, artifact_bucket, cellxgene_bucket, datasets_bucket, spatial_deep_zoom_dir
+    ).update_metadata(current_dataset_version_id, new_dataset_version_id, metadata_update)
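The core behavioral change in update_cxg, restated as a standalone sketch for readability: uri_exists and upload_directory are the S3Provider wrappers added in this diff, while copy_spatial_assets and its parameter names are hypothetical.

    # Sketch: copy one dataset version's spatial deep zoom assets to another version.
    # Only Visium datasets ship a spatial.dzi, so a HEAD check gates the copy.
    def copy_spatial_assets(s3_provider, spatial_deep_zoom_dir: str, current_id: str, new_id: str) -> None:
        current_dir = f"s3://{spatial_deep_zoom_dir}/{current_id}"
        if s3_provider.uri_exists(f"{current_dir}/spatial.dzi"):
            s3_provider.upload_directory(current_dir, f"s3://{spatial_deep_zoom_dir}/{new_id}")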
35 changes: 35 additions & 0 deletions backend/layers/thirdparty/s3_provider.py
@@ -25,6 +25,33 @@ def parse_s3_uri(s3_uri: str) -> Tuple[str, str]:
         parsed_url = urlparse(s3_uri)
         return parsed_url.netloc, parsed_url.path[1:]

+    def uri_exists(self, s3_uri: str) -> bool:
+        bucket, key = self.parse_s3_uri(s3_uri)
+        try:
+            logger.info(
+                {
+                    "message": "Running HEAD request",
+                    "s3_uri": s3_uri,
+                    "bucket": bucket,
+                    "key": key,
+                }
+            )
+            self.client.head_object(Bucket=bucket, Key=key)
+            logger.info(
+                {
+                    "message": "HEAD request succeeded!",
+                }
+            )
+            return True
+        except Exception as e:
+            logger.info(
+                {
+                    "message": "HEAD request failed",
+                    "error": str(e),
+                }
+            )
+            return False
+
     def get_file_size(self, path: str) -> int:
         """
         Returns the file size of an S3 object located at `path`
@@ -120,6 +147,14 @@ def upload_directory(self, src_dir: str, s3_uri: str):
         if os.getenv("BOTO_ENDPOINT_URL"):
             command.append(f"--endpoint-url={os.getenv('BOTO_ENDPOINT_URL')}")

+        logger.info(
+            {
+                "message": "Copying directory",
+                "src_dir": src_dir,
+                "dst_dir": s3_uri,
+            }
+        )
+
         command.extend(
             [
                 "s3",
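A design note on the new uri_exists: it maps any exception from head_object to False, which also swallows permission and throttling errors. A stricter variant (a sketch, not what this commit does) would treat only HTTP 404 as "not found" and re-raise everything else:

    from botocore.exceptions import ClientError

    def uri_exists_strict(client, bucket: str, key: str) -> bool:
        try:
            client.head_object(Bucket=bucket, Key=key)
            return True
        except ClientError as e:
            # HEAD requests surface a missing key as an HTTP 404 error code.
            if e.response["Error"]["Code"] == "404":
                return False
            raise  # access denied, throttling, etc. should not look like "absent"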
63 changes: 59 additions & 4 deletions tests/unit/processing/test_dataset_metadata_update.py
@@ -48,7 +48,7 @@ def setUp(self):
         super().setUp()
         self.business_logic.s3_provider = MockS3Provider()
         self.updater = DatasetMetadataUpdater(
-            self.business_logic, "artifact_bucket", "cellxgene_bucket", "datasets_bucket"
+            self.business_logic, "artifact_bucket", "cellxgene_bucket", "datasets_bucket", "spatial_deep_zoom_dir"
         )

     def mock_download(source_uri, local_path):
@@ -387,7 +387,7 @@ class TestDatasetMetadataUpdaterWorker(BaseProcessingTest):
     @patch("backend.layers.processing.dataset_metadata_update.S3Provider", Mock(side_effect=MockS3Provider))
     def setUp(self):
         super().setUp()
-        self.updater = DatasetMetadataUpdaterWorker("artifact_bucket", "datasets_bucket")
+        self.updater = DatasetMetadataUpdaterWorker("artifact_bucket", "datasets_bucket", "spatial_deep_zoom_dir")
         self.updater.business_logic = self.business_logic

     def mock_download(source_uri, local_path):
@@ -515,10 +515,20 @@ def test_update_cxg(self):
         metadata_update = DatasetArtifactMetadataUpdate(
             citation="Publication www.doi.org/567.8", title="New Dataset Title"
         )
-        self.updater.update_cxg(None, testing_cxg_temp_directory, new_dataset_version_id, metadata_update)
+        self.updater.update_cxg(
+            None,
+            testing_cxg_temp_directory,
+            current_dataset_version.version_id,
+            new_dataset_version_id,
+            metadata_update,
+        )

         # check new cxg directory exists
         assert self.updater.s3_provider.uri_exists(testing_cxg_temp_directory)
+        # check spatial zoom directory was not created for non-spatial dataset
+        assert not self.updater.s3_provider.uri_exists(
+            f"s3://{self.updater.spatial_deep_zoom_dir}/{new_dataset_version_id.id}"
+        )

         # check DB artifacts + status are updated
         new_dataset_version = self.business_logic.get_dataset_version(new_dataset_version_id)
@@ -544,6 +554,51 @@ def test_update_cxg(self):

         assert expected_metadata == actual_stored_metadata

+    def test_update_cxg__with_spatial_deepzoom_assets(self):
+        with tempfile.TemporaryDirectory() as tempdir:
+            testing_cxg_temp_directory = tempdir
+
+            cxg_metadata = {
+                "cxg_version": "1.0.0",
+                "corpora": json.dumps(
+                    {"schema_version": "3.0.0", "citation": "Publication www.doi.org/123.4", "title": "Dataset Title 1"}
+                ),
+            }
+            convert_dictionary_to_cxg_group(
+                testing_cxg_temp_directory, cxg_metadata, group_metadata_name="cxg_group_metadata"
+            )
+
+            collection_version = self.generate_unpublished_collection(add_datasets=1)
+            current_dataset_version = collection_version.datasets[0]
+            new_dataset_version_id, _ = self.business_logic.ingest_dataset(
+                collection_version_id=collection_version.version_id,
+                url=None,
+                file_size=0,
+                current_dataset_version_id=current_dataset_version.version_id,
+                start_step_function=False,
+            )
+
+            self.updater.s3_provider.upload_directory(
+                tempdir,
+                f"s3://{self.updater.spatial_deep_zoom_dir}/{current_dataset_version.version_id.id}/spatial.dzi",
+            )
+
+            metadata_update = DatasetArtifactMetadataUpdate(
+                citation="Publication www.doi.org/567.8", title="New Dataset Title"
+            )
+            self.updater.update_cxg(
+                None,
+                testing_cxg_temp_directory,
+                current_dataset_version.version_id,
+                new_dataset_version_id,
+                metadata_update,
+            )
+
+            # check new spatial deepzoom directory exists
+            assert self.updater.s3_provider.uri_exists(
+                f"s3://{self.updater.spatial_deep_zoom_dir}/{new_dataset_version_id.id}"
+            )
+
     @patch("backend.layers.processing.dataset_metadata_update.os.remove")
     def test_update_rds(self, *args):
         with tempfile.TemporaryDirectory() as tempdir:
@@ -587,7 +642,7 @@ class TestValidArtifactStatuses(BaseProcessingTest):
     def setUp(self):
         super().setUp()
         self.updater = DatasetMetadataUpdater(
-            self.business_logic, "artifact_bucket", "cellxgene_bucket", "datasets_bucket"
+            self.business_logic, "artifact_bucket", "cellxgene_bucket", "datasets_bucket", "spatial_deep_zoom_dir"
        )

     @parameterized.expand([DatasetConversionStatus.CONVERTED, DatasetConversionStatus.SKIPPED])
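These tests route S3 calls through MockS3Provider, which must now support uri_exists alongside upload_directory for the assertions above to pass. A hypothetical in-memory equivalent consistent with those assertions (the real mock lives in the repo's test utilities and may differ):

    class InMemoryS3Provider:
        # Hypothetical stand-in for the repo's MockS3Provider: records uploaded
        # destination URIs so existence checks can be answered locally.
        def __init__(self):
            self.uris = set()

        def upload_directory(self, src_dir: str, s3_uri: str) -> None:
            self.uris.add(s3_uri)

        def uri_exists(self, s3_uri: str) -> bool:
            # True for exact matches or anything uploaded beneath the URI.
            return any(u == s3_uri or u.startswith(s3_uri + "/") for u in self.uris)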
