
Merging staging branch into prod branch
nayib-jose-gloria committed Jul 29, 2024
2 parents 49c7618 + be530db commit 4da68a2
Showing 33 changed files with 358 additions and 71 deletions.
4 changes: 4 additions & 0 deletions .happy/terraform/modules/batch/main.tf
@@ -78,6 +78,10 @@ resource aws_batch_job_definition dataset_metadata_update {
"name": "DATASETS_BUCKET",
"value": "${var.datasets_bucket}"
},
{
"name": "SPATIAL_DEEP_ZOOM_BUCKET",
"value": "${var.spatial_deep_zoom_bucket}"
},
{
"name": "DEPLOYMENT_STAGE",
"value": "${var.deployment_stage}"
5 changes: 5 additions & 0 deletions .happy/terraform/modules/batch/variables.tf
@@ -13,6 +13,11 @@ variable datasets_bucket {
description = "Datasets public-access bucket name"
}

variable spatial_deep_zoom_bucket {
type = string
description = "Bucket for Visium Dataset spatial deep zoom images"
}

variable image {
type = string
description = "Image name"
2 changes: 2 additions & 0 deletions .happy/terraform/modules/ecs-stack/main.tf
@@ -82,6 +82,7 @@ locals {
artifact_bucket = try(local.secret["s3_buckets"]["artifact"]["name"], "")
cellxgene_bucket = try(local.secret["s3_buckets"]["cellxgene"]["name"], "")
datasets_bucket = try(local.secret["s3_buckets"]["datasets"]["name"], "")
spatial_deep_zoom_bucket = try(local.secret["s3_buckets"]["spatial_deep_zoom"]["name"], "")
dataset_submissions_bucket = try(local.secret["s3_buckets"]["dataset_submissions"]["name"], "")
wmg_bucket = try(local.secret["s3_buckets"]["wmg"]["name"], "")

@@ -316,6 +317,7 @@ module upload_batch {
artifact_bucket = local.artifact_bucket
cellxgene_bucket = local.cellxgene_bucket
datasets_bucket = local.datasets_bucket
spatial_deep_zoom_bucket = local.spatial_deep_zoom_bucket
frontend_url = local.frontend_url
batch_container_memory_limit = local.batch_container_memory_limit
}
1 change: 1 addition & 0 deletions backend/curation/api/curation-api.yml
@@ -853,6 +853,7 @@ components:
| Human Tumor Atlas Network (HTAN) |
| Kidney Precision Medicine Project (KPMP) |
| LungMAP |
| Pediatric Center of Excellence in Nephrology (PCEN) |
| SEA-AD |
| Wellcome HCA Strategic Science Support |
contact_email:
16 changes: 14 additions & 2 deletions backend/layers/business/business.py
@@ -161,6 +161,11 @@ def trigger_dataset_artifact_update(
new_dataset_version_id = self.create_empty_dataset_version_for_current_dataset(
collection_version.version_id, current_dataset_version_id
).version_id
# Update citation for new dataset version
doi = next((link.uri for link in collection_version.metadata.links if link.type == "DOI"), None)
metadata_update.citation = self.generate_dataset_citation(
collection_version.collection_id, new_dataset_version_id, doi
)

self.batch_job_provider.start_metadata_update_batch_job(
current_dataset_version_id, new_dataset_version_id, metadata_update
@@ -943,6 +948,14 @@ def get_unpublished_dataset_versions(self, dataset_id: DatasetId) -> List[Datase
)
)

def delete_dataset_versions(self, dataset_versions: List[DatasetVersion]) -> None:
"""
Deletes a list of dataset versions and associated dataset artifact rows from the database, as well
as kicking off deletion of their corresponding assets from S3
"""
self.delete_dataset_version_assets(dataset_versions)
self.database_provider.delete_dataset_versions(dataset_versions)

def delete_dataset_version_assets(self, dataset_versions: List[DatasetVersion]) -> None:
self.delete_dataset_versions_from_public_bucket([dv.version_id.id for dv in dataset_versions])
self.delete_artifacts(reduce(lambda artifacts, dv: artifacts + dv.artifacts, dataset_versions, []))
@@ -1065,8 +1078,7 @@ def publish_collection_version(
version.collection_id, from_date=date_of_last_publish
)
versions_to_delete = list(filter(lambda dv: dv.version_id.id not in versions_to_keep, dataset_versions))
self.delete_dataset_version_assets(versions_to_delete)
self.database_provider.delete_dataset_versions(versions_to_delete)
self.delete_dataset_versions(versions_to_delete)

def get_dataset_version(self, dataset_version_id: DatasetVersionId) -> Optional[DatasetVersion]:
"""
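For context, a minimal standalone sketch of what the new delete_dataset_versions helper encapsulates: asset deletion runs first, then the dataset version (and artifact) rows are dropped from the database. The stub classes below are illustrative stand-ins, not the providers in this commit.

from dataclasses import dataclass, field
from typing import List


@dataclass
class DatasetVersionStub:
    version_id: str
    artifacts: List[str] = field(default_factory=list)


class AssetStoreStub:
    """Stands in for the S3-backed cleanup done by delete_dataset_version_assets."""

    def delete_assets(self, versions: List[DatasetVersionStub]) -> None:
        for version in versions:
            print(f"deleting assets for {version.version_id}: {version.artifacts}")


class DatabaseStub:
    """Stands in for database_provider.delete_dataset_versions."""

    def delete_dataset_versions(self, versions: List[DatasetVersionStub]) -> None:
        print(f"deleting {len(versions)} dataset version rows")


def delete_dataset_versions(assets: AssetStoreStub, db: DatabaseStub, versions: List[DatasetVersionStub]) -> None:
    # Same ordering as the new helper: remove S3 assets first, then the DB rows.
    assets.delete_assets(versions)
    db.delete_dataset_versions(versions)


delete_dataset_versions(
    AssetStoreStub(),
    DatabaseStub(),
    [DatasetVersionStub("dv-1", ["example.h5ad", "example.cxg"])],
)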
1 change: 1 addition & 0 deletions backend/layers/common/validation.py
@@ -24,6 +24,7 @@
"Human Tumor Atlas Network (HTAN)",
"Kidney Precision Medicine Project (KPMP)",
"LungMAP",
"Pediatric Center of Excellence in Nephrology (PCEN)",
"SEA-AD",
"Wellcome HCA Strategic Science Support",
}
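A small sketch of how a submitted consortia list could be checked against this allow-list; the set below contains only the entries visible in the hunk, and validate_consortia is a hypothetical helper, not a function from this commit.

VALID_CONSORTIA = {
    "Human Tumor Atlas Network (HTAN)",
    "Kidney Precision Medicine Project (KPMP)",
    "LungMAP",
    "Pediatric Center of Excellence in Nephrology (PCEN)",  # added in this commit
    "SEA-AD",
    "Wellcome HCA Strategic Science Support",
}


def validate_consortia(consortia: list) -> list:
    """Return the entries that are not in the allow-list (hypothetical helper)."""
    return [entry for entry in consortia if entry not in VALID_CONSORTIA]


print(validate_consortia(["LungMAP", "Pediatric Center of Excellence in Nephrology (PCEN)", "Unknown Consortium"]))
# -> ['Unknown Consortium']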
2 changes: 1 addition & 1 deletion backend/layers/persistence/persistence.py
@@ -460,7 +460,7 @@ def get_all_collections_versions(self, get_tombstoned: bool = False) -> Iterable
if str(v.collection_id) in all_canonical_map:
for dataset_version_id in v.datasets:
dataset_version_id_str = str(dataset_version_id)
dataset_id = all_dataset_version_mappings[dataset_version_id_str]
dataset_id = all_dataset_version_mappings.get(dataset_version_id_str)
if dataset_id:
if not get_tombstoned and dataset_id in all_dataset_tombstones:
continue
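The one-line change above swaps a direct key lookup for dict.get, so a dataset version id missing from the mapping yields None and is skipped by the `if dataset_id:` guard instead of raising KeyError. A minimal illustration:

all_dataset_version_mappings = {"dv-known": "dataset-1"}

# Old behavior: all_dataset_version_mappings["dv-unknown"] raises KeyError.
# New behavior: .get returns None and the guard below skips the version.
for dataset_version_id_str in ["dv-known", "dv-unknown"]:
    dataset_id = all_dataset_version_mappings.get(dataset_version_id_str)
    if dataset_id:
        print("processing", dataset_version_id_str, "->", dataset_id)
    else:
        print("skipping unmapped dataset version", dataset_version_id_str)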
18 changes: 17 additions & 1 deletion backend/layers/persistence/persistence_mock.py
@@ -52,12 +52,16 @@ class DatabaseProviderMock(DatabaseProviderInterface):
# All the dataset versions
datasets_versions: Dict[str, DatasetVersion]

# Dataset artifacts
dataset_artifacts: Dict[str, DatasetArtifact]

def __init__(self) -> None:
super().__init__()
self.collections = {} # rename to: active_collections
self.collections_versions = {}
self.datasets = {} # rename to: active_datasets
self.datasets_versions = {}
self.dataset_artifacts = {}

# TODO: add publisher_metadata here?
def create_canonical_collection(
@@ -250,6 +254,8 @@ def _delete_dataset_version_and_artifact_rows(
self, dataset_version_rows: List[DatasetVersion], session: Any
) -> None:
for d_v_row in dataset_version_rows:
for artifact_id in [a.id.id for a in d_v_row.artifacts]:
del self.dataset_artifacts[artifact_id]
del self.datasets_versions[d_v_row.version_id.id] # Artifacts live on DatasetVersion; they get deleted

def get_collection_version(self, version_id: CollectionVersionId) -> CollectionVersion:
@@ -458,6 +464,13 @@ def get_dataset_artifacts_by_version_id(self, dataset_version_id: DatasetId) ->
dataset = self.datasets_versions[dataset_version_id.id]
return copy.deepcopy(dataset.artifacts)

def get_dataset_artifacts(self, dataset_artifact_id_list: List[DatasetArtifactId]) -> List[DatasetArtifact]:
return [
self.dataset_artifacts[artifact_id.id]
for artifact_id in dataset_artifact_id_list
if artifact_id.id in self.dataset_artifacts
]

def create_canonical_dataset(self, collection_version_id: CollectionVersionId) -> DatasetVersion:
# Creates a dataset and initializes it with one version
dataset_id = DatasetId()
@@ -488,11 +501,14 @@ def add_dataset_artifact(
) -> DatasetArtifactId:
version = self.datasets_versions[version_id.id]
artifact_id = DatasetArtifactId()
version.artifacts.append(DatasetArtifact(artifact_id, artifact_type, artifact_uri))
dataset_artifact = DatasetArtifact(artifact_id, artifact_type, artifact_uri)
version.artifacts.append(dataset_artifact)
self.dataset_artifacts[artifact_id.id] = dataset_artifact
return artifact_id

def update_dataset_artifact(self, artifact_id: DatasetArtifactId, artifact_uri: str) -> None:
found_artifact = False
self.dataset_artifacts[artifact_id.id].uri = artifact_uri
for version in self.datasets_versions.values():
if found_artifact:
break
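A simplified stand-in for the in-memory artifact registry the mock gains here: artifacts are keyed by id in a dict when added, so get_dataset_artifacts and update_dataset_artifact can resolve them directly instead of scanning every dataset version. This sketch is illustrative only, not the real DatabaseProviderMock.

import uuid
from dataclasses import dataclass


@dataclass
class ArtifactStub:
    id: str
    type: str
    uri: str


class ArtifactRegistry:
    def __init__(self) -> None:
        self.dataset_artifacts = {}  # artifact id -> ArtifactStub

    def add_dataset_artifact(self, artifact_type: str, uri: str) -> str:
        artifact_id = str(uuid.uuid4())
        self.dataset_artifacts[artifact_id] = ArtifactStub(artifact_id, artifact_type, uri)
        return artifact_id

    def get_dataset_artifacts(self, artifact_ids) -> list:
        # Mirrors the mock: unknown ids are silently dropped.
        return [self.dataset_artifacts[i] for i in artifact_ids if i in self.dataset_artifacts]

    def update_dataset_artifact(self, artifact_id: str, uri: str) -> None:
        self.dataset_artifacts[artifact_id].uri = uri


registry = ArtifactRegistry()
artifact_id = registry.add_dataset_artifact("cxg", "s3://bucket/old.cxg")
registry.update_dataset_artifact(artifact_id, "s3://bucket/new.cxg")
print(registry.get_dataset_artifacts([artifact_id, "missing-id"]))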
44 changes: 33 additions & 11 deletions backend/layers/processing/dataset_metadata_update.py
@@ -44,7 +44,7 @@


class DatasetMetadataUpdaterWorker(ProcessDownload):
def __init__(self, artifact_bucket: str, datasets_bucket: str) -> None:
def __init__(self, artifact_bucket: str, datasets_bucket: str, spatial_deep_zoom_dir: str = None) -> None:
# init each worker with business logic backed by non-shared DB connection
self.business_logic = BusinessLogic(
DatabaseProvider(),
Expand All @@ -57,6 +57,7 @@ def __init__(self, artifact_bucket: str, datasets_bucket: str) -> None:
super().__init__(self.business_logic, self.business_logic.uri_provider, self.business_logic.s3_provider)
self.artifact_bucket = artifact_bucket
self.datasets_bucket = datasets_bucket
self.spatial_deep_zoom_dir = spatial_deep_zoom_dir

def update_raw_h5ad(
self,
@@ -181,10 +182,19 @@ def update_cxg(
self,
cxg_uri: str,
new_cxg_dir: str,
dataset_version_id: DatasetVersionId,
current_dataset_version_id: DatasetVersionId,
new_dataset_version_id: DatasetVersionId,
metadata_update: DatasetArtifactMetadataUpdate,
):
self.s3_provider.upload_directory(cxg_uri, new_cxg_dir)

current_spatial_deep_zoom_dir = f"s3://{self.spatial_deep_zoom_dir}/{current_dataset_version_id.id}"
spatial_dzi_uri = f"{current_spatial_deep_zoom_dir}/spatial.dzi"
# Copy spatial deep zoom directory if it exists (only exists for Visium datasets)
if self.s3_provider.uri_exists(spatial_dzi_uri):
new_spatial_deep_zoom_dir = f"s3://{self.spatial_deep_zoom_dir}/{new_dataset_version_id.id}"
self.s3_provider.upload_directory(current_spatial_deep_zoom_dir, new_spatial_deep_zoom_dir)

ctx = tiledb.Ctx(H5ADDataFile.tile_db_ctx_config)
array_name = f"{new_cxg_dir}/cxg_group_metadata"
with tiledb.open(array_name, mode="r", ctx=ctx) as metadata_array:
@@ -194,18 +204,24 @@ def update_cxg(
with tiledb.open(array_name, mode="w", ctx=ctx) as metadata_array:
metadata_array.meta["corpora"] = json.dumps(cxg_metadata_dict)

self.business_logic.add_dataset_artifact(dataset_version_id, DatasetArtifactType.CXG, new_cxg_dir)
self.update_processing_status(dataset_version_id, DatasetStatusKey.CXG, DatasetConversionStatus.CONVERTED)
self.business_logic.add_dataset_artifact(new_dataset_version_id, DatasetArtifactType.CXG, new_cxg_dir)
self.update_processing_status(new_dataset_version_id, DatasetStatusKey.CXG, DatasetConversionStatus.CONVERTED)


class DatasetMetadataUpdater(ProcessDownload):
def __init__(
self, business_logic: BusinessLogic, artifact_bucket: str, cellxgene_bucket: str, datasets_bucket: str
self,
business_logic: BusinessLogic,
artifact_bucket: str,
cellxgene_bucket: str,
datasets_bucket: str,
spatial_deep_zoom_dir: str,
) -> None:
super().__init__(business_logic, business_logic.uri_provider, business_logic.s3_provider)
self.artifact_bucket = artifact_bucket
self.cellxgene_bucket = cellxgene_bucket
self.datasets_bucket = datasets_bucket
self.spatial_deep_zoom_dir = spatial_deep_zoom_dir

@staticmethod
def update_raw_h5ad(
@@ -258,13 +274,15 @@ def update_rds(
def update_cxg(
artifact_bucket: str,
datasets_bucket: str,
spatial_deep_zoom_dir: str,
cxg_uri: str,
new_cxg_dir: str,
dataset_version_id: DatasetVersionId,
current_dataset_version_id: DatasetVersionId,
new_dataset_version_id: DatasetVersionId,
metadata_update: DatasetArtifactMetadataUpdate,
):
DatasetMetadataUpdaterWorker(artifact_bucket, datasets_bucket).update_cxg(
cxg_uri, new_cxg_dir, dataset_version_id, metadata_update
DatasetMetadataUpdaterWorker(artifact_bucket, datasets_bucket, spatial_deep_zoom_dir).update_cxg(
cxg_uri, new_cxg_dir, current_dataset_version_id, new_dataset_version_id, metadata_update
)

def update_metadata(
@@ -365,8 +383,10 @@ def update_metadata(
args=(
self.artifact_bucket,
self.datasets_bucket,
self.spatial_deep_zoom_dir,
artifact_uris[DatasetArtifactType.CXG],
f"s3://{self.cellxgene_bucket}/{new_artifact_key_prefix}.cxg",
current_dataset_version_id,
new_dataset_version_id,
metadata_update,
),
@@ -416,9 +436,11 @@ def has_valid_artifact_statuses(self, dataset_version_id: DatasetVersionId) -> b
artifact_bucket = os.environ.get("ARTIFACT_BUCKET", "test-bucket")
cellxgene_bucket = os.environ.get("CELLXGENE_BUCKET", "test-cellxgene-bucket")
datasets_bucket = os.environ.get("DATASETS_BUCKET", "test-datasets-bucket")
spatial_deep_zoom_bucket = os.environ.get("SPATIAL_DEEP_ZOOM_BUCKET", "test-spatial-deep-zoom-bucket")
spatial_deep_zoom_dir = f"{spatial_deep_zoom_bucket}/spatial-deep-zoom"
current_dataset_version_id = DatasetVersionId(os.environ["CURRENT_DATASET_VERSION_ID"])
new_dataset_version_id = DatasetVersionId(os.environ["NEW_DATASET_VERSION_ID"])
metadata_update = DatasetArtifactMetadataUpdate(**json.loads(os.environ["METADATA_UPDATE_JSON"]))
DatasetMetadataUpdater(business_logic, artifact_bucket, cellxgene_bucket, datasets_bucket).update_metadata(
current_dataset_version_id, new_dataset_version_id, metadata_update
)
DatasetMetadataUpdater(
business_logic, artifact_bucket, cellxgene_bucket, datasets_bucket, spatial_deep_zoom_dir
).update_metadata(current_dataset_version_id, new_dataset_version_id, metadata_update)
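A condensed sketch of the conditional spatial deep-zoom copy that update_cxg now performs, using a stubbed S3 provider; only Visium datasets have a spatial.dzi, so the directory is copied only when that probe succeeds. The real code uses self.s3_provider.uri_exists and upload_directory as shown in the diff; the stub and bucket names below are illustrative.

class S3ProviderStub:
    """Illustrative stand-in exposing the two calls the diff relies on."""

    def __init__(self, existing_uris) -> None:
        self.existing_uris = set(existing_uris)

    def uri_exists(self, uri: str) -> bool:
        return uri in self.existing_uris

    def upload_directory(self, src: str, dst: str) -> None:
        print(f"copying {src} -> {dst}")


def copy_spatial_deep_zoom_if_present(s3, spatial_deep_zoom_dir, current_version_id, new_version_id):
    current_dir = f"s3://{spatial_deep_zoom_dir}/{current_version_id}"
    # Only Visium datasets produce a deep zoom directory, so probe for spatial.dzi first.
    if s3.uri_exists(f"{current_dir}/spatial.dzi"):
        s3.upload_directory(current_dir, f"s3://{spatial_deep_zoom_dir}/{new_version_id}")


s3 = S3ProviderStub({"s3://my-bucket/spatial-deep-zoom/dv-old/spatial.dzi"})
copy_spatial_deep_zoom_if_present(s3, "my-bucket/spatial-deep-zoom", "dv-old", "dv-new")    # copies
copy_spatial_deep_zoom_if_present(s3, "my-bucket/spatial-deep-zoom", "dv-other", "dv-new")  # no-op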
8 changes: 4 additions & 4 deletions backend/layers/processing/h5ad_data_file.py
@@ -88,7 +88,7 @@ def to_cxg(
self.write_anndata_embeddings_to_cxg(output_cxg_directory, ctx)
logging.info("\t...dataset embeddings saved")

self.write_anndata_x_matrices_to_cxg(output_cxg_directory, ctx, sparse_threshold)
self.write_anndata_x_matrices_to_cxg(output_cxg_directory, ctx, sparse_threshold) # big memory usage
logging.info("\t...dataset X matrix saved")

logging.info("Completed writing to CXG.")
@@ -97,10 +97,10 @@ def write_anndata_x_matrices_to_cxg(self, output_cxg_directory, ctx, sparse_thre
matrix_container = f"{output_cxg_directory}/X"

x_matrix_data = self.anndata.X
is_sparse = is_matrix_sparse(x_matrix_data, sparse_threshold)
is_sparse = is_matrix_sparse(x_matrix_data, sparse_threshold) # big memory usage
logging.info(f"is_sparse: {is_sparse}")

convert_matrices_to_cxg_arrays(matrix_container, x_matrix_data, is_sparse, ctx)
convert_matrices_to_cxg_arrays(matrix_container, x_matrix_data, is_sparse, ctx) # big memory usage

suffixes = ["r", "c"] if is_sparse else [""]
logging.info("start consolidating")
@@ -183,7 +183,7 @@ def validate_anndata(self):

def extract_anndata_elements_from_file(self):
logging.info(f"Reading in AnnData dataset: {path.basename(self.input_filename)}")
self.anndata = anndata.read_h5ad(self.input_filename)
self.anndata = anndata.read_h5ad(self.input_filename, backed="r")
logging.info("Completed reading in AnnData dataset!")

self.obs = self.transform_dataframe_index_into_column(self.anndata.obs, "obs", self.obs_index_column_name)
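For reference, the effect of the backed read used above: passing backed="r" keeps the X matrix on disk and loads it lazily, which is what the "big memory usage" comments are working around. A short sketch, assuming a recent anndata release and a local example.h5ad (hypothetical path):

import anndata

# Fully in-memory read: X is materialized in RAM up front.
adata_full = anndata.read_h5ad("example.h5ad")

# Backed read (as in the diff): obs/var load normally, X stays on disk.
adata_backed = anndata.read_h5ad("example.h5ad", backed="r")
print(adata_backed.isbacked)   # True
print(adata_backed.obs.shape)  # metadata is available without loading X

# Slicing pulls only the requested rows into memory.
subset = adata_backed[:100].to_memory()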
2 changes: 1 addition & 1 deletion backend/layers/processing/process_cxg.py
@@ -67,7 +67,7 @@ def process(
self.process_cxg(labeled_h5ad_filename, dataset_version_id, cellxgene_bucket, current_artifacts)

@logit
def make_cxg(self, local_filename, dataset_version_id):
def make_cxg(self, local_filename, dataset_version_id: DatasetVersionId):
"""
Convert the uploaded H5AD file to the CXG format servicing the cellxgene Explorer.
"""
38 changes: 24 additions & 14 deletions backend/layers/processing/schema_migration.py
@@ -202,6 +202,7 @@ def collection_migrate(

def log_errors_and_cleanup(self, collection_version_id: str) -> list:
errors = []
rolled_back_datasets = []
collection_version = self.business_logic.get_collection_version(CollectionVersionId(collection_version_id))
object_keys_to_delete = []

@@ -226,32 +227,37 @@ def log_errors_and_cleanup(self, collection_version_id: str) -> list:
self.logger.info("checking dataset", extra=_log_extras)
key_prefix = self.get_key_prefix(previous_dataset_version_id)
object_keys_to_delete.append(f"{key_prefix}/migrated.h5ad")
if not self.check_dataset_is_latest_schema_version(dataset):
error_message = "Did Not Migrate"
dataset_status = "n/a"
if dataset.status is not None:
error_message = dataset.status.validation_message
dataset_status = dataset.status.to_dict()
if dataset.status.processing_status != DatasetProcessingStatus.SUCCESS:
error = {
"message": error_message,
"dataset_status": dataset_status,
"message": dataset.status.validation_message,
"dataset_status": dataset.status.to_dict(),
"collection_id": collection_version.collection_id.id,
"collection_version_id": collection_version_id,
"dataset_version_id": dataset_version_id,
"dataset_id": dataset_id,
"rollback": False,
"rollback": True,
}
# If the dataset is not successfully processed, rollback to the version from before migration
self.business_logic.restore_previous_dataset_version(
CollectionVersionId(collection_version_id), dataset.dataset_id
)
rolled_back_datasets.append(dataset)
self.logger.error(error)
errors.append(error)
elif dataset.status.processing_status != DatasetProcessingStatus.SUCCESS:
elif not self.check_dataset_is_latest_schema_version(dataset):
error_message = "Did Not Migrate"
dataset_status = "n/a"
if dataset.status is not None:
error_message = dataset.status.validation_message
dataset_status = dataset.status.to_dict()
error = {
"message": dataset.status.validation_message,
"dataset_status": dataset.status.to_dict(),
"message": error_message,
"dataset_status": dataset_status,
"collection_id": collection_version.collection_id.id,
"collection_version_id": collection_version_id,
"dataset_version_id": dataset_version_id,
"dataset_id": dataset_id,
"rollback": True,
"rollback": False,
}
self.logger.error(error)
errors.append(error)
Expand All @@ -263,6 +269,10 @@ def log_errors_and_cleanup(self, collection_version_id: str) -> list:
self.s3_provider.delete_files(self.artifact_bucket, object_keys_to_delete)
if errors:
self._store_sfn_response("report/errors", collection_version_id, errors)
# clean up artifacts for any now-orphaned, rolled back datasets
if rolled_back_datasets:
# TODO: replace with async job to delete orphaned dataset version DB rows + artifacts
self.business_logic.delete_dataset_versions(rolled_back_datasets)
return errors

def _store_sfn_response(self, directory: str, file_name: str, response: Dict[str, str]):
@@ -315,7 +325,7 @@ def wrapper(*args, **kwargs):

return wrapper

def report(self, artifact_bucket=None, execution_id=None, dry_run=True) -> dict:
def report(self, artifact_bucket=None, execution_id=None, dry_run=False) -> dict:
"""
Generate a report of the schema migration process. This function will download all the error and migration
:param artifact_bucket: The bucket where the schema migration artifacts are stored.
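A condensed sketch of the reordered checks in log_errors_and_cleanup: a dataset whose processing status is not SUCCESS is now rolled back to its pre-migration version (with the orphaned new version cleaned up afterwards), while a dataset that processed successfully but is still on an old schema is only reported. The types below are simplified stand-ins for illustration.

from enum import Enum


class ProcessingStatus(Enum):
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"


def classify_dataset(processing_status: ProcessingStatus, is_latest_schema: bool):
    """Return (error message or None, rollback flag) in the order the diff checks them."""
    if processing_status is not ProcessingStatus.SUCCESS:
        # Failed processing: restore the previous dataset version and clean up later.
        return "processing failed", True
    if not is_latest_schema:
        # Processed fine but never migrated: report only, nothing to roll back.
        return "Did Not Migrate", False
    return None, False


print(classify_dataset(ProcessingStatus.FAILURE, is_latest_schema=True))   # ('processing failed', True)
print(classify_dataset(ProcessingStatus.SUCCESS, is_latest_schema=False))  # ('Did Not Migrate', False)
print(classify_dataset(ProcessingStatus.SUCCESS, is_latest_schema=True))   # (None, False)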