From 735af7b96054d803db90f7dde6dae63ba3874c47 Mon Sep 17 00:00:00 2001
From: Nayib Gloria <55710092+nayib-jose-gloria@users.noreply.github.com>
Date: Fri, 26 Jul 2024 09:35:43 -0400
Subject: [PATCH] fix: for visium datasets, copy spatial-deepzoom-bucket assets when running dataset_metadata_update job (#7306)

---
 .happy/terraform/modules/batch/main.tf      |  4 ++
 .happy/terraform/modules/batch/variables.tf |  5 ++
 .happy/terraform/modules/ecs-stack/main.tf  |  2 +
 .../processing/dataset_metadata_update.py   | 44 +++++++++----
 backend/layers/thirdparty/s3_provider.py    | 35 +++++++++++
 .../test_dataset_metadata_update.py         | 63 +++++++++++++++++--
 6 files changed, 138 insertions(+), 15 deletions(-)

diff --git a/.happy/terraform/modules/batch/main.tf b/.happy/terraform/modules/batch/main.tf
index c5fd5968df530..85f06d64494df 100644
--- a/.happy/terraform/modules/batch/main.tf
+++ b/.happy/terraform/modules/batch/main.tf
@@ -78,6 +78,10 @@ resource aws_batch_job_definition dataset_metadata_update {
         "name": "DATASETS_BUCKET",
         "value": "${var.datasets_bucket}"
       },
+      {
+        "name": "SPATIAL_DEEP_ZOOM_BUCKET",
+        "value": "${var.spatial_deep_zoom_bucket}"
+      },
       {
         "name": "DEPLOYMENT_STAGE",
         "value": "${var.deployment_stage}"
diff --git a/.happy/terraform/modules/batch/variables.tf b/.happy/terraform/modules/batch/variables.tf
index 1622e9baa74cb..c7eec11174adf 100644
--- a/.happy/terraform/modules/batch/variables.tf
+++ b/.happy/terraform/modules/batch/variables.tf
@@ -13,6 +13,11 @@ variable datasets_bucket {
   description = "Datasets public-access bucket name"
 }
 
+variable spatial_deep_zoom_bucket {
+  type        = string
+  description = "Bucket for Visium Dataset spatial deep zoom images"
+}
+
 variable image {
   type        = string
   description = "Image name"
diff --git a/.happy/terraform/modules/ecs-stack/main.tf b/.happy/terraform/modules/ecs-stack/main.tf
index adcb1277a5331..6fe174f35949f 100644
--- a/.happy/terraform/modules/ecs-stack/main.tf
+++ b/.happy/terraform/modules/ecs-stack/main.tf
@@ -82,6 +82,7 @@ locals {
   artifact_bucket = try(local.secret["s3_buckets"]["artifact"]["name"], "")
   cellxgene_bucket = try(local.secret["s3_buckets"]["cellxgene"]["name"], "")
   datasets_bucket = try(local.secret["s3_buckets"]["datasets"]["name"], "")
+  spatial_deep_zoom_bucket = try(local.secret["s3_buckets"]["spatial_deep_zoom"]["name"], "")
   dataset_submissions_bucket = try(local.secret["s3_buckets"]["dataset_submissions"]["name"], "")
   wmg_bucket = try(local.secret["s3_buckets"]["wmg"]["name"], "")
 
@@ -316,6 +317,7 @@ module upload_batch {
   artifact_bucket = local.artifact_bucket
   cellxgene_bucket = local.cellxgene_bucket
   datasets_bucket = local.datasets_bucket
+  spatial_deep_zoom_bucket = local.spatial_deep_zoom_bucket
   frontend_url = local.frontend_url
   batch_container_memory_limit = local.batch_container_memory_limit
 }
diff --git a/backend/layers/processing/dataset_metadata_update.py b/backend/layers/processing/dataset_metadata_update.py
index f0bd6049ef334..586aec2ee5a46 100644
--- a/backend/layers/processing/dataset_metadata_update.py
+++ b/backend/layers/processing/dataset_metadata_update.py
@@ -44,7 +44,7 @@ class DatasetMetadataUpdaterWorker(ProcessDownload):
-    def __init__(self, artifact_bucket: str, datasets_bucket: str) -> None:
+    def __init__(self, artifact_bucket: str, datasets_bucket: str, spatial_deep_zoom_dir: str = None) -> None:
         # init each worker with business logic backed by non-shared DB connection
         self.business_logic = BusinessLogic(
             DatabaseProvider(),
@@ -57,6 +57,7 @@ def __init__(self, artifact_bucket: str, datasets_bucket: str) -> None:
         super().__init__(self.business_logic, self.business_logic.uri_provider, self.business_logic.s3_provider)
         self.artifact_bucket = artifact_bucket
         self.datasets_bucket = datasets_bucket
+        self.spatial_deep_zoom_dir = spatial_deep_zoom_dir
 
     def update_raw_h5ad(
         self,
@@ -181,10 +182,19 @@ def update_cxg(
         self,
         cxg_uri: str,
         new_cxg_dir: str,
-        dataset_version_id: DatasetVersionId,
+        current_dataset_version_id: DatasetVersionId,
+        new_dataset_version_id: DatasetVersionId,
         metadata_update: DatasetArtifactMetadataUpdate,
     ):
         self.s3_provider.upload_directory(cxg_uri, new_cxg_dir)
+
+        current_spatial_deep_zoom_dir = f"s3://{self.spatial_deep_zoom_dir}/{current_dataset_version_id.id}"
+        spatial_dzi_uri = f"{current_spatial_deep_zoom_dir}/spatial.dzi"
+        # Copy spatial deep zoom directory if it exists (only exists for Visium datasets)
+        if self.s3_provider.uri_exists(spatial_dzi_uri):
+            new_spatial_deep_zoom_dir = f"s3://{self.spatial_deep_zoom_dir}/{new_dataset_version_id.id}"
+            self.s3_provider.upload_directory(current_spatial_deep_zoom_dir, new_spatial_deep_zoom_dir)
+
         ctx = tiledb.Ctx(H5ADDataFile.tile_db_ctx_config)
         array_name = f"{new_cxg_dir}/cxg_group_metadata"
         with tiledb.open(array_name, mode="r", ctx=ctx) as metadata_array:
@@ -194,18 +204,24 @@ def update_cxg(
         with tiledb.open(array_name, mode="w", ctx=ctx) as metadata_array:
             metadata_array.meta["corpora"] = json.dumps(cxg_metadata_dict)
 
-        self.business_logic.add_dataset_artifact(dataset_version_id, DatasetArtifactType.CXG, new_cxg_dir)
-        self.update_processing_status(dataset_version_id, DatasetStatusKey.CXG, DatasetConversionStatus.CONVERTED)
+        self.business_logic.add_dataset_artifact(new_dataset_version_id, DatasetArtifactType.CXG, new_cxg_dir)
+        self.update_processing_status(new_dataset_version_id, DatasetStatusKey.CXG, DatasetConversionStatus.CONVERTED)
 
 
 class DatasetMetadataUpdater(ProcessDownload):
     def __init__(
-        self, business_logic: BusinessLogic, artifact_bucket: str, cellxgene_bucket: str, datasets_bucket: str
+        self,
+        business_logic: BusinessLogic,
+        artifact_bucket: str,
+        cellxgene_bucket: str,
+        datasets_bucket: str,
+        spatial_deep_zoom_dir: str,
     ) -> None:
         super().__init__(business_logic, business_logic.uri_provider, business_logic.s3_provider)
         self.artifact_bucket = artifact_bucket
         self.cellxgene_bucket = cellxgene_bucket
         self.datasets_bucket = datasets_bucket
+        self.spatial_deep_zoom_dir = spatial_deep_zoom_dir
 
     @staticmethod
     def update_raw_h5ad(
@@ -258,13 +274,15 @@ def update_rds(
     def update_cxg(
         artifact_bucket: str,
         datasets_bucket: str,
+        spatial_deep_zoom_dir: str,
         cxg_uri: str,
         new_cxg_dir: str,
-        dataset_version_id: DatasetVersionId,
+        current_dataset_version_id: DatasetVersionId,
+        new_dataset_version_id: DatasetVersionId,
         metadata_update: DatasetArtifactMetadataUpdate,
     ):
-        DatasetMetadataUpdaterWorker(artifact_bucket, datasets_bucket).update_cxg(
-            cxg_uri, new_cxg_dir, dataset_version_id, metadata_update
+        DatasetMetadataUpdaterWorker(artifact_bucket, datasets_bucket, spatial_deep_zoom_dir).update_cxg(
+            cxg_uri, new_cxg_dir, current_dataset_version_id, new_dataset_version_id, metadata_update
         )
 
     def update_metadata(
@@ -365,8 +383,10 @@ def update_metadata(
                 args=(
                     self.artifact_bucket,
                     self.datasets_bucket,
+                    self.spatial_deep_zoom_dir,
                     artifact_uris[DatasetArtifactType.CXG],
                     f"s3://{self.cellxgene_bucket}/{new_artifact_key_prefix}.cxg",
+                    current_dataset_version_id,
                     new_dataset_version_id,
                     metadata_update,
                 ),
@@ -416,9 +436,11 @@ def has_valid_artifact_statuses(self, dataset_version_id: DatasetVersionId) -> b
     artifact_bucket = os.environ.get("ARTIFACT_BUCKET", "test-bucket")
     cellxgene_bucket = os.environ.get("CELLXGENE_BUCKET", "test-cellxgene-bucket")
     datasets_bucket = os.environ.get("DATASETS_BUCKET", "test-datasets-bucket")
+    spatial_deep_zoom_bucket = os.environ.get("SPATIAL_DEEP_ZOOM_BUCKET", "test-spatial-deep-zoom-bucket")
+    spatial_deep_zoom_dir = f"{spatial_deep_zoom_bucket}/spatial-deep-zoom"
     current_dataset_version_id = DatasetVersionId(os.environ["CURRENT_DATASET_VERSION_ID"])
     new_dataset_version_id = DatasetVersionId(os.environ["NEW_DATASET_VERSION_ID"])
     metadata_update = DatasetArtifactMetadataUpdate(**json.loads(os.environ["METADATA_UPDATE_JSON"]))
-    DatasetMetadataUpdater(business_logic, artifact_bucket, cellxgene_bucket, datasets_bucket).update_metadata(
-        current_dataset_version_id, new_dataset_version_id, metadata_update
-    )
+    DatasetMetadataUpdater(
+        business_logic, artifact_bucket, cellxgene_bucket, datasets_bucket, spatial_deep_zoom_dir
+    ).update_metadata(current_dataset_version_id, new_dataset_version_id, metadata_update)
diff --git a/backend/layers/thirdparty/s3_provider.py b/backend/layers/thirdparty/s3_provider.py
index 662dc3f9af0ac..efcf60e9bbd8b 100644
--- a/backend/layers/thirdparty/s3_provider.py
+++ b/backend/layers/thirdparty/s3_provider.py
@@ -25,6 +25,33 @@ def parse_s3_uri(s3_uri: str) -> Tuple[str, str]:
         parsed_url = urlparse(s3_uri)
         return parsed_url.netloc, parsed_url.path[1:]
 
+    def uri_exists(self, s3_uri: str) -> bool:
+        bucket, key = self.parse_s3_uri(s3_uri)
+        try:
+            logger.info(
+                {
+                    "message": "Running HEAD request",
+                    "s3_uri": s3_uri,
+                    "bucket": bucket,
+                    "key": key,
+                }
+            )
+            self.client.head_object(Bucket=bucket, Key=key)
+            logger.info(
+                {
+                    "message": "HEAD request succeeded!",
+                }
+            )
+            return True
+        except Exception as e:
+            logger.info(
+                {
+                    "message": "HEAD request failed",
+                    "error": str(e),
+                }
+            )
+            return False
+
     def get_file_size(self, path: str) -> int:
         """
         Returns the file size of an S3 object located at `path`
@@ -120,6 +147,14 @@ def upload_directory(self, src_dir: str, s3_uri: str):
         if os.getenv("BOTO_ENDPOINT_URL"):
             command.append(f"--endpoint-url={os.getenv('BOTO_ENDPOINT_URL')}")
 
+        logger.info(
+            {
+                "message": "Copying directory",
+                "src_dir": src_dir,
+                "dst_dir": s3_uri,
+            }
+        )
+
         command.extend(
             [
                 "s3",
diff --git a/tests/unit/processing/test_dataset_metadata_update.py b/tests/unit/processing/test_dataset_metadata_update.py
index 24b29ad86d705..cf791205c5323 100644
--- a/tests/unit/processing/test_dataset_metadata_update.py
+++ b/tests/unit/processing/test_dataset_metadata_update.py
@@ -48,7 +48,7 @@ def setUp(self):
         super().setUp()
         self.business_logic.s3_provider = MockS3Provider()
         self.updater = DatasetMetadataUpdater(
-            self.business_logic, "artifact_bucket", "cellxgene_bucket", "datasets_bucket"
+            self.business_logic, "artifact_bucket", "cellxgene_bucket", "datasets_bucket", "spatial_deep_zoom_dir"
         )
 
         def mock_download(source_uri, local_path):
@@ -387,7 +387,7 @@ class TestDatasetMetadataUpdaterWorker(BaseProcessingTest):
     @patch("backend.layers.processing.dataset_metadata_update.S3Provider", Mock(side_effect=MockS3Provider))
     def setUp(self):
         super().setUp()
-        self.updater = DatasetMetadataUpdaterWorker("artifact_bucket", "datasets_bucket")
+        self.updater = DatasetMetadataUpdaterWorker("artifact_bucket", "datasets_bucket", "spatial_deep_zoom_dir")
         self.updater.business_logic = self.business_logic
 
         def mock_download(source_uri, local_path):
@@ -515,10 +515,20 @@ def test_update_cxg(self):
             metadata_update = DatasetArtifactMetadataUpdate(
                 citation="Publication www.doi.org/567.8", title="New Dataset Title"
             )
-            self.updater.update_cxg(None, testing_cxg_temp_directory, new_dataset_version_id, metadata_update)
+            self.updater.update_cxg(
+                None,
+                testing_cxg_temp_directory,
+                current_dataset_version.version_id,
+                new_dataset_version_id,
+                metadata_update,
+            )
 
             # check new cxg directory exists
             assert self.updater.s3_provider.uri_exists(testing_cxg_temp_directory)
+            # check spatial zoom directory was not created for non-spatial dataset
+            assert not self.updater.s3_provider.uri_exists(
+                f"s3://{self.updater.spatial_deep_zoom_dir}/{new_dataset_version_id.id}"
+            )
 
             # check DB artifacts + status are updated
             new_dataset_version = self.business_logic.get_dataset_version(new_dataset_version_id)
@@ -544,6 +554,51 @@ def test_update_cxg(self):
 
             assert expected_metadata == actual_stored_metadata
 
+    def test_update_cxg__with_spatial_deepzoom_assets(self):
+        with tempfile.TemporaryDirectory() as tempdir:
+            testing_cxg_temp_directory = tempdir
+
+            cxg_metadata = {
+                "cxg_version": "1.0.0",
+                "corpora": json.dumps(
+                    {"schema_version": "3.0.0", "citation": "Publication www.doi.org/123.4", "title": "Dataset Title 1"}
+                ),
+            }
+            convert_dictionary_to_cxg_group(
+                testing_cxg_temp_directory, cxg_metadata, group_metadata_name="cxg_group_metadata"
+            )
+
+            collection_version = self.generate_unpublished_collection(add_datasets=1)
+            current_dataset_version = collection_version.datasets[0]
+            new_dataset_version_id, _ = self.business_logic.ingest_dataset(
+                collection_version_id=collection_version.version_id,
+                url=None,
+                file_size=0,
+                current_dataset_version_id=current_dataset_version.version_id,
+                start_step_function=False,
+            )
+
+            self.updater.s3_provider.upload_directory(
+                tempdir,
+                f"s3://{self.updater.spatial_deep_zoom_dir}/{current_dataset_version.version_id.id}/spatial.dzi",
+            )
+
+            metadata_update = DatasetArtifactMetadataUpdate(
+                citation="Publication www.doi.org/567.8", title="New Dataset Title"
+            )
+            self.updater.update_cxg(
+                None,
+                testing_cxg_temp_directory,
+                current_dataset_version.version_id,
+                new_dataset_version_id,
+                metadata_update,
+            )
+
+            # check new spatial deepzoom directory exists
+            assert self.updater.s3_provider.uri_exists(
+                f"s3://{self.updater.spatial_deep_zoom_dir}/{new_dataset_version_id.id}"
+            )
+
     @patch("backend.layers.processing.dataset_metadata_update.os.remove")
     def test_update_rds(self, *args):
         with tempfile.TemporaryDirectory() as tempdir:
@@ -587,7 +642,7 @@ class TestValidArtifactStatuses(BaseProcessingTest):
     def setUp(self):
         super().setUp()
         self.updater = DatasetMetadataUpdater(
-            self.business_logic, "artifact_bucket", "cellxgene_bucket", "datasets_bucket"
+            self.business_logic, "artifact_bucket", "cellxgene_bucket", "datasets_bucket", "spatial_deep_zoom_dir"
         )
 
     @parameterized.expand([DatasetConversionStatus.CONVERTED, DatasetConversionStatus.SKIPPED])