From 7caadb52585c46facead3b90403eb1e0eb644070 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Isak=20Ohlsson=20=C3=85ngnell?= <40887124+islean@users.noreply.github.com>
Date: Fri, 1 Dec 2023 15:11:41 +0100
Subject: [PATCH] (Archiving) CLI functionality (#2345) (minor)

### Added
- CLI command archive-spring-files, which invokes archive_spring_files_and_add_archives_to_housekeeper in the SpringArchiveAPI.
- CLI command update-job-statuses, which queries any ongoing archivals/retrievals to see if they have finished.
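Usage sketch (illustrative invocations of the new commands; flag names as
defined in cg/cli/archive.py below):

    cg archive archive-spring-files --limit 100
    cg archive archive-spring-files --archive-all
    cg archive update-job-statuses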
---
 cg/apps/housekeeper/hk.py              |  35 ++++-
 cg/cli/archive.py                      |  66 +++++++++
 cg/cli/base.py                         |   2 +
 cg/constants/archiving.py              |   2 +
 cg/exc.py                              |   4 +
 cg/io/api.py                           |  20 +--
 cg/io/controller.py                    |   4 +-
 cg/meta/archive/archive.py             | 105 +++++++------
 cg/meta/archive/ddn_dataflow.py        |  62 ++++++--
 tests/meta/archive/__init__.py         |   0
 tests/meta/archive/conftest.py         |  79 +++++++++-
 tests/meta/archive/test_archive_api.py |  29 ++--
 tests/meta/archive/test_archive_cli.py | 197 +++++++++++++++++++++++++
 tests/meta/archive/test_archiving.py   |   6 +-
 14 files changed, 524 insertions(+), 87 deletions(-)
 create mode 100644 cg/cli/archive.py
 create mode 100644 tests/meta/archive/__init__.py
 create mode 100644 tests/meta/archive/test_archive_cli.py

diff --git a/cg/apps/housekeeper/hk.py b/cg/apps/housekeeper/hk.py
index 4483f04fa2..d63cb57124 100644
--- a/cg/apps/housekeeper/hk.py
+++ b/cg/apps/housekeeper/hk.py
@@ -7,7 +7,11 @@
 from housekeeper.include import checksum as hk_checksum
 from housekeeper.include import include_version
 from housekeeper.store import Store, models
-from housekeeper.store.database import create_all_tables, drop_all_tables, initialize_database
+from housekeeper.store.database import (
+    create_all_tables,
+    drop_all_tables,
+    initialize_database,
+)
 from housekeeper.store.models import Archive, Bundle, File, Version
 from sqlalchemy.orm import Query
 
@@ -422,14 +426,15 @@ def get_archived_files(self, bundle_name: str, tags: list | None = None) -> list
         """Returns all archived_files from a given bundle, tagged with the given tags"""
         return self._store.get_archived_files(bundle_name=bundle_name, tags=tags or [])
 
-    def add_archives(self, files: list[Path], archive_task_id: int) -> None:
+    def add_archives(self, files: list[File], archive_task_id: int) -> None:
         """Creates an archive object for the given files, and adds the archive task id to them."""
         for file in files:
-            archived_file: File | None = self._store.get_files(file_path=file.as_posix()).first()
-            if not archived_file:
-                raise HousekeeperFileMissingError(f"No file in housekeeper with the path {file}")
+            file_id: int = file.id
+            LOG.info(
+                f"Adding an archive to file {file_id} with archiving task id {archive_task_id}"
+            )
             archive: Archive = self._store.create_archive(
-                archived_file.id, archiving_task_id=archive_task_id
+                file_id=file_id, archiving_task_id=archive_task_id
             )
             self._store.session.add(archive)
             self.commit()
@@ -618,3 +623,21 @@ def get_ongoing_archivals(self) -> list[Archive]:
 
     def get_ongoing_retrievals(self) -> list[Archive]:
         return self._store.get_ongoing_retrievals()
+
+    def delete_archives(self, archival_task_id: int) -> None:
+        archives_to_delete: list[Archive] = self.get_archive_entries(
+            archival_task_id=archival_task_id
+        )
+        for archive in archives_to_delete:
+            self._store.session.delete(archive)
+        self._store.session.commit()
+
+    def update_archive_retrieved_at(
+        self, old_retrieval_job_id: int, new_retrieval_job_id: int | None
+    ) -> None:
+        archives_to_update: list[Archive] = self.get_archive_entries(
+            retrieval_task_id=old_retrieval_job_id
+        )
+        for archive in archives_to_update:
+            archive.retrieval_task_id = new_retrieval_job_id
+        self._store.session.commit()
diff --git a/cg/cli/archive.py b/cg/cli/archive.py
new file mode 100644
index 0000000000..e69cae4e27
--- /dev/null
+++ b/cg/cli/archive.py
@@ -0,0 +1,66 @@
+import click
+from click.core import ParameterSource
+
+from cg.constants.archiving import DEFAULT_SPRING_ARCHIVE_COUNT
+from cg.meta.archive.archive import SpringArchiveAPI
+from cg.models.cg_config import CGConfig
+
+
+@click.group()
+def archive():
+    """Archive utilities."""
+    pass
+
+
+@archive.command("archive-spring-files")
+@click.option(
+    "-l",
+    "--limit",
+    help="Limit the number of files to archive.",
+    default=DEFAULT_SPRING_ARCHIVE_COUNT,
+    show_default=True,
+)
+@click.option(
+    "--archive-all",
+    help="Archive all non-archived files.",
+    is_flag=True,
+    default=False,
+    show_default=True,
+)
+@click.pass_obj
+def archive_spring_files(context: CGConfig, limit: int | None, archive_all: bool):
+    """Archives non-archived Spring files.
+    Raises:
+        click.Abort if both a limit to the number of Spring files to archive and --archive-all are specified.
+    """
+
+    if (
+        click.get_current_context().get_parameter_source("limit") == ParameterSource.COMMANDLINE
+        and limit
+        and archive_all
+    ):
+        click.echo(
+            "Incorrect input parameters - please do not provide both a limit and set --archive-all."
+        )
+        raise click.Abort
+
+    spring_archive_api = SpringArchiveAPI(
+        status_db=context.status_db,
+        housekeeper_api=context.housekeeper_api,
+        data_flow_config=context.data_flow_config,
+    )
+    spring_archive_api.archive_spring_files_and_add_archives_to_housekeeper(
+        spring_file_count_limit=None if archive_all else limit
+    )
+
+
+@archive.command("update-job-statuses")
+@click.pass_obj
+def update_job_statuses(context: CGConfig):
+    """Queries ongoing jobs and updates Housekeeper."""
+    spring_archive_api = SpringArchiveAPI(
+        status_db=context.status_db,
+        housekeeper_api=context.housekeeper_api,
+        data_flow_config=context.data_flow_config,
+    )
+    spring_archive_api.update_statuses_for_ongoing_tasks()
diff --git a/cg/cli/base.py b/cg/cli/base.py
index e38df62c6f..2242797231 100644
--- a/cg/cli/base.py
+++ b/cg/cli/base.py
@@ -9,6 +9,7 @@
 import cg
 from cg.cli.add import add as add_cmd
+from cg.cli.archive import archive
 from cg.cli.backup import backup
 from cg.cli.clean import clean
 from cg.cli.compress.base import compress, decompress
@@ -97,6 +98,7 @@ def init(context: CGConfig, reset: bool, force: bool):
 
 base.add_command(add_cmd)
+base.add_command(archive)
 base.add_command(backup)
 base.add_command(clean)
 base.add_command(compress)
diff --git a/cg/constants/archiving.py b/cg/constants/archiving.py
index 83d963d467..c5a87a59d1 100644
--- a/cg/constants/archiving.py
+++ b/cg/constants/archiving.py
@@ -1,5 +1,7 @@
 from enum import StrEnum
 
+DEFAULT_SPRING_ARCHIVE_COUNT = 200
+
 
 class ArchiveLocations(StrEnum):
     """Archive locations for the different customers' Spring files."""
diff --git a/cg/exc.py b/cg/exc.py
index dcb8b16920..921fefef3f 100644
--- a/cg/exc.py
+++ b/cg/exc.py
@@ -230,5 +230,9 @@ class MissingMetrics(CgError):
     """Exception raised when mandatory metrics are missing."""
 
 
+class ArchiveJobFailedError(CgError):
+    """Exception raised when an archival or retrieval job has failed."""
+
+
 class XMLError(CgError):
     """Exception raised when something is wrong with the content of an XML file."""
diff --git a/cg/io/api.py b/cg/io/api.py
index ba3cdac6a9..6be357dd37 100644
--- a/cg/io/api.py
+++ b/cg/io/api.py
@@ -3,26 +3,26 @@
 from requests import Response
 
 
-def put(url: str, headers: dict, json: dict) -> Response:
+def put(url: str, headers: dict, json: dict, verify: bool = True) -> Response:
     """Create PUT request."""
-    return requests.put(url=url, headers=headers, json=json)
+    return requests.put(url=url, headers=headers, json=json, verify=verify)
 
 
-def post(url: str, headers: dict, json: dict) -> Response:
+def post(url: str, headers: dict, json: dict, verify: bool = True) -> Response:
     """Create POST request."""
-    return requests.post(url=url, headers=headers, json=json)
+    return requests.post(url=url, headers=headers, json=json, verify=verify)
 
 
-def delete(url: str, headers: dict, json: dict) -> Response:
+def delete(url: str, headers: dict, json: dict, verify: bool = True) -> Response:
     """Create DELETE request."""
-    return requests.delete(url=url, headers=headers, json=json)
+    return requests.delete(url=url, headers=headers, json=json, verify=verify)
 
 
-def get(url: str, headers: dict, json: dict) -> Response:
+def get(url: str, headers: dict, json: dict, verify: bool = True) -> Response:
     """Create GET request."""
-    return requests.get(url=url, headers=headers, json=json)
+    return requests.get(url=url, headers=headers, json=json, verify=verify)
 
 
-def patch(url: str, headers: dict, json: dict) -> Response:
+def patch(url: str, headers: dict, json: dict, verify: bool = True) -> Response:
     """Create PATCH request."""
-    return requests.patch(url=url, headers=headers, json=json)
+    return requests.patch(url=url, headers=headers, json=json, verify=verify)
diff --git a/cg/io/controller.py b/cg/io/controller.py
index 3a8618fb47..e44c29642d 100644
--- a/cg/io/controller.py
+++ b/cg/io/controller.py
@@ -88,6 +88,6 @@ class APIRequest:
 
     @classmethod
     def api_request_from_content(
-        cls, api_method: str, url: str, headers: dict, json: dict
+        cls, api_method: str, url: str, headers: dict, json: dict, verify: bool = False
     ) -> Response:
-        return cls.api_request[api_method](url=url, headers=headers, json=json)
+        return cls.api_request[api_method](url=url, headers=headers, json=json, verify=verify)
diff --git a/cg/meta/archive/archive.py b/cg/meta/archive/archive.py
index c1c6ab9b51..48a93e4dfe 100644
--- a/cg/meta/archive/archive.py
+++ b/cg/meta/archive/archive.py
@@ -1,5 +1,4 @@
 import logging
-from pathlib import Path
 from typing import Callable, Type
 
 from housekeeper.store.models import Archive, File
@@ -8,6 +7,7 @@
 from cg.apps.housekeeper.hk import HousekeeperAPI
 from cg.constants import SequencingFileTag
 from cg.constants.archiving import ArchiveLocations
+from cg.exc import ArchiveJobFailedError
 from cg.meta.archive.ddn_dataflow import DDNDataFlowClient
 from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination
 from cg.models.cg_config import DataFlowConfig
@@ -15,7 +15,6 @@
 from cg.store.models import Sample
 
 LOG = logging.getLogger(__name__)
-DEFAULT_SPRING_ARCHIVE_COUNT = 200
 ARCHIVE_HANDLERS: dict[str, Type[ArchiveHandler]] = {
     ArchiveLocations.KAROLINSKA_BUCKET: DDNDataFlowClient
 }
@@ -67,41 +66,45 @@ def __init__(
         self.status_db: Store = status_db
         self.data_flow_config: DataFlowConfig = data_flow_config
 
-    def archive_files(self, files: list[FileAndSample], archive_location: ArchiveLocations) -> int:
-        archive_handler: ArchiveHandler = ARCHIVE_HANDLERS[archive_location](self.data_flow_config)
-        return archive_handler.archive_files(files_and_samples=files)
+    def get_non_archived_and_non_pdc_spring_files(self) -> list[File]:
+        files: list[File] = self.housekeeper_api.get_all_non_archived_spring_files()
+        return [file for file in files if self.get_archive_location_from_file(file) != "PDC"]
 
-    def archive_to_location(
+    def archive_files_to_location(
         self, files_and_samples: list[FileAndSample], archive_location: ArchiveLocations
-    ) -> None:
-        """Filters out the files matching the archive_location,
-        archives them and adds corresponding entries in Housekeeper."""
-        selected_files: [list[FileAndSample]] = filter_files_on_archive_location(
-            files_and_samples=files_and_samples, archive_location=archive_location
-        )
-        archive_task_id: int = self.archive_files(
-            files=selected_files, archive_location=archive_location
-        )
-        self.housekeeper_api.add_archives(
-            files=[Path(file_and_sample.file.path) for file_and_sample in selected_files],
-            archive_task_id=archive_task_id,
-        )
+    ) -> int:
+        archive_handler: ArchiveHandler = ARCHIVE_HANDLERS[archive_location](self.data_flow_config)
+        return archive_handler.archive_files(files_and_samples=files_and_samples)
 
-    def archive_all_non_archived_spring_files(
-        self, spring_file_count_limit: int = DEFAULT_SPRING_ARCHIVE_COUNT
+    def archive_spring_files_and_add_archives_to_housekeeper(
+        self, spring_file_count_limit: int | None
     ) -> None:
-        """Archives all non archived spring files."""
+        """Archives all non-archived Spring files. If a limit is provided, the number of files
+        archived is limited to that amount."""
 
-        files_to_archive: list[File] = self.housekeeper_api.get_all_non_archived_spring_files()[
+        files_to_archive: list[File] = self.get_non_archived_and_non_pdc_spring_files()
+        files_and_samples: list[FileAndSample] = self.add_samples_to_files(files_to_archive)[
             :spring_file_count_limit
         ]
-        files_and_samples: list[FileAndSample] = self.add_samples_to_files(files_to_archive)
 
         for archive_location in ArchiveLocations:
-            self.archive_to_location(
-                files_and_samples=files_and_samples,
-                archive_location=archive_location,
+            files_and_samples_for_location: list[FileAndSample] = filter_files_on_archive_location(
+                files_and_samples=files_and_samples, archive_location=archive_location
             )
+            if files_and_samples_for_location:
+                job_id = self.archive_files_to_location(
+                    files_and_samples=files_and_samples_for_location,
+                    archive_location=archive_location,
+                )
+                LOG.info(f"Files submitted to {archive_location} with archival task id {job_id}.")
+                self.housekeeper_api.add_archives(
+                    files=[
+                        file_and_sample.file for file_and_sample in files_and_samples_for_location
+                    ],
+                    archive_task_id=job_id,
+                )
+            else:
+                LOG.info(f"No files to archive for location {archive_location}.")
 
     def retrieve_samples(self, sample_internal_ids: list[str]) -> None:
         """Retrieves the archived spring files for a list of samples."""
@@ -179,16 +182,16 @@ def get_sample(self, file: File) -> Sample | None:
         )
         return sample
 
-    def add_samples_to_files(self, files_to_archive: list[File]) -> list[FileAndSample]:
+    def add_samples_to_files(self, files: list[File]) -> list[FileAndSample]:
         """Fetches the Sample corresponding to each File, instantiates a FileAndSample object and adds it to the
         list which is returned."""
         files_and_samples: list[FileAndSample] = []
-        for file in files_to_archive:
+        for file in files:
             if sample := self.get_sample(file):
                 files_and_samples.append(FileAndSample(file=file, sample=sample))
         return files_and_samples
 
-    def update_status_for_ongoing_tasks(self) -> None:
+    def update_statuses_for_ongoing_tasks(self) -> None:
         """Updates any completed jobs with a finished timestamp."""
         self.update_ongoing_archivals()
         self.update_ongoing_retrievals()
@@ -236,15 +239,27 @@ def update_ongoing_task(
     ) -> None:
         """Fetches info on an ongoing job and updates the Archive entry in Housekeeper."""
         archive_handler: ArchiveHandler = ARCHIVE_HANDLERS[archive_location](self.data_flow_config)
-        is_job_done: bool = archive_handler.is_job_done(task_id)
-        if is_job_done:
-            LOG.info(f"Job with id {task_id} has finished, updating Archive entries.")
+        try:
+            LOG.info(f"Fetching status for job with id {task_id} from {archive_location}")
+            is_job_done: bool = archive_handler.is_job_done(task_id)
+            if is_job_done:
+                LOG.info(f"Job with id {task_id} has finished, updating Archive entries.")
+                if is_archival:
+                    self.housekeeper_api.set_archived_at(task_id)
+                else:
+                    self.housekeeper_api.set_retrieved_at(task_id)
+            else:
+                LOG.info(f"Job with id {task_id} has not yet finished.")
+        except ArchiveJobFailedError as error:
+            LOG.error(error)
             if is_archival:
-                self.housekeeper_api.set_archived_at(task_id)
+                LOG.warning(f"Will remove archive entries with archival task id {task_id}")
+                self.housekeeper_api.delete_archives(task_id)
             else:
-                self.housekeeper_api.set_retrieved_at(task_id)
-        else:
-            LOG.info(f"Job with id {task_id} has not yet finished.")
+                LOG.warning("Will set retrieval task id to null.")
+                self.housekeeper_api.update_archive_retrieved_at(
+                    old_retrieval_job_id=task_id, new_retrieval_job_id=None
+                )
 
     def sort_archival_ids_on_archive_location(
         self, archive_entries: list[Archive]
@@ -270,7 +285,10 @@ def get_unique_archival_ids_and_their_archive_location(
     ) -> set[tuple[int, ArchiveLocations]]:
         return set(
             [
-                (archive.archiving_task_id, self.get_archive_location_from_file(archive.file))
+                (
+                    archive.archiving_task_id,
+                    ArchiveLocations(self.get_archive_location_from_file(archive.file)),
+                )
                 for archive in archive_entries
             ]
         )
@@ -297,12 +315,13 @@ def get_unique_retrieval_ids_and_their_archive_location(
     ) -> set[tuple[int, ArchiveLocations]]:
         return set(
             [
-                (archive.retrieval_task_id, self.get_archive_location_from_file(archive.file))
+                (
+                    archive.retrieval_task_id,
+                    ArchiveLocations(self.get_archive_location_from_file(archive.file)),
+                )
                 for archive in archive_entries
             ]
         )
 
-    def get_archive_location_from_file(self, file: File) -> ArchiveLocations:
-        return ArchiveLocations(
-            self.status_db.get_sample_by_internal_id(file.version.bundle.name).archive_location
-        )
+    def get_archive_location_from_file(self, file: File) -> str:
+        return self.status_db.get_sample_by_internal_id(file.version.bundle.name).archive_location
diff --git a/cg/meta/archive/ddn_dataflow.py b/cg/meta/archive/ddn_dataflow.py
index a9257a6838..2e23f669b4 100644
--- a/cg/meta/archive/ddn_dataflow.py
+++ b/cg/meta/archive/ddn_dataflow.py
@@ -10,7 +10,7 @@
 from requests.models import Response
 
 from cg.constants.constants import APIMethods
-from cg.exc import DdnDataflowAuthenticationError
+from cg.exc import ArchiveJobFailedError, DdnDataflowAuthenticationError
 from cg.io.controller import APIRequest
 from cg.meta.archive.models import (
     ArchiveHandler,
@@ -57,6 +57,22 @@ class JobStatus(StrEnum):
     TERMINATED_ON_WARNING = "Terminated on warning"
 
 
+FAILED_JOB_STATUSES: list[str] = [
+    JobStatus.CANCELED,
+    JobStatus.DENIED,
+    JobStatus.INVALID_LICENSE,
+    JobStatus.REFUSED,
+    JobStatus.TERMINATED_ON_ERROR,
+    JobStatus.TERMINATED_ON_WARNING,
+]
+ONGOING_JOB_STATUSES: list[str] = [
+    JobStatus.CREATION_IN_PROGRESS,
+    JobStatus.IN_QUEUE,
+    JobStatus.ON_VALIDATION,
+    JobStatus.RUNNING,
+]
+
+
 class MiriaObject(FileTransferData):
     """Model for representing a singular object transfer."""
singular object transfer.""" @@ -103,7 +119,7 @@ class TransferPayload(BaseModel): files_to_transfer: list[MiriaObject] osType: str = OSTYPE - createFolder: bool = False + createFolder: bool = True settings: list[dict] = [] def trim_paths(self, attribute_to_trim: str): @@ -139,11 +155,19 @@ def post_request(self, url: str, headers: dict) -> "TransferJob": The job ID of the launched transfer task. """ + LOG.info( + "Sending request with headers: \n" + + f"{headers} \n" + + "and body: \n" + + f"{self.model_dump()}" + ) + response: Response = APIRequest.api_request_from_content( api_method=APIMethods.POST, url=url, headers=headers, json=self.model_dump(), + verify=False, ) response.raise_for_status() return TransferJob.model_validate(response.json()) @@ -183,7 +207,7 @@ class GetJobStatusResponse(BaseModel): """Model representing the response fields from a get_job_status post.""" job_id: int = Field(alias="id") - status: str + status: JobStatus class GetJobStatusPayload(BaseModel): @@ -195,9 +219,22 @@ def get_job_status(self, url: str, headers: dict) -> GetJobStatusResponse: """Sends a get request to the given URL with the given headers. Returns the parsed status response of the task specified in the URL. Raises: - HTTPError if the response code is not successful.""" + HTTPError if the response code is not ok. + """ + + LOG.info( + "Sending request with headers: \n" + + f"{headers} \n" + + "and body: \n" + + f"{self.model_dump()}" + ) + response: Response = APIRequest.api_request_from_content( - api_method=APIMethods.GET, url=url, headers=headers, json={} + api_method=APIMethods.GET, + url=url, + headers=headers, + json=self.model_dump(), + verify=False, ) response.raise_for_status() return GetJobStatusResponse.model_validate(response.json()) @@ -233,6 +270,7 @@ def _set_auth_tokens(self) -> None: name=self.user, password=self.password, ).model_dump(), + verify=False, ) if not response.ok: raise DdnDataflowAuthenticationError(message=response.text) @@ -248,6 +286,7 @@ def _refresh_auth_token(self) -> None: url=urljoin(base=self.url, url=DataflowEndpoints.REFRESH_AUTH_TOKEN), headers=self.headers, json=RefreshPayload(refresh=self.refresh_token).model_dump(), + verify=False, ) response_content: AuthToken = AuthToken.model_validate(response.json()) self.auth_token: str = response_content.access @@ -328,15 +367,18 @@ def convert_into_transfer_data( ] def is_job_done(self, job_id: int) -> bool: + """Returns True if the specified job is completed, and false if it is still ongoing. + Raises: + ArchiveJobFailedError if the specified job has a failed status.""" get_job_status_payload = GetJobStatusPayload(id=job_id) get_job_status_response: GetJobStatusResponse = get_job_status_payload.get_job_status( url=urljoin(self.url, DataflowEndpoints.GET_JOB_STATUS + str(job_id)), headers=dict(self.headers, **self.auth_header), ) - if get_job_status_response.status == JobStatus.COMPLETED: + job_status: JobStatus = get_job_status_response.status + LOG.info(f"Miria returned status {job_status} for job {job_id}") + if job_status == JobStatus.COMPLETED: return True - LOG.info( - f"Job with id {job_id} has not been completed. 
" - f"Current job description is {get_job_status_response.status}" - ) + if job_status in FAILED_JOB_STATUSES: + raise ArchiveJobFailedError(f"Job with id {job_id} failed with status {job_status}") return False diff --git a/tests/meta/archive/__init__.py b/tests/meta/archive/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/meta/archive/conftest.py b/tests/meta/archive/conftest.py index 5874eb9fe4..7d00fccd45 100644 --- a/tests/meta/archive/conftest.py +++ b/tests/meta/archive/conftest.py @@ -4,7 +4,8 @@ from unittest import mock import pytest -from housekeeper.store.models import File +from click.testing import CliRunner +from housekeeper.store.models import Bundle, File from requests import Response from cg.apps.housekeeper.hk import HousekeeperAPI @@ -22,7 +23,7 @@ TransferPayload, ) from cg.meta.archive.models import FileAndSample -from cg.models.cg_config import DataFlowConfig +from cg.models.cg_config import CGConfig, DataFlowConfig from cg.store import Store from cg.store.models import Customer, Sample from tests.store_helpers import StoreHelpers @@ -61,7 +62,7 @@ def archive_request_json( ) -> dict: return { "osType": "Unix/MacOS", - "createFolder": False, + "createFolder": True, "pathInfo": [ { "destination": f"{remote_storage_repository}ADM1", @@ -80,7 +81,7 @@ def retrieve_request_json( """Returns the body for a retrieval http post towards the DDN Miria API.""" return { "osType": "Unix/MacOS", - "createFolder": False, + "createFolder": True, "pathInfo": [ { "destination": local_storage_repository @@ -298,3 +299,73 @@ def spring_archive_api( status_db=archive_store, data_flow_config=ddn_dataflow_config, ) + + +@pytest.fixture +def cli_runner() -> CliRunner: + """Create a CliRunner""" + return CliRunner() + + +@pytest.fixture +def base_context( + base_store: Store, housekeeper_api: HousekeeperAPI, cg_config_object: CGConfig +) -> CGConfig: + """context to use in CLI.""" + cg_config_object.status_db_ = base_store + cg_config_object.housekeeper_api_ = housekeeper_api + return cg_config_object + + +@pytest.fixture +def archive_context( + base_context: CGConfig, + real_housekeeper_api: HousekeeperAPI, + path_to_spring_file_to_archive: str, + path_to_spring_file_with_ongoing_archival: str, + archival_job_id: int, + helpers: StoreHelpers, + ddn_dataflow_config: DataFlowConfig, +) -> CGConfig: + base_context.housekeeper_api_ = real_housekeeper_api + base_context.data_flow_config = ddn_dataflow_config + + customer = helpers.ensure_customer( + store=base_context.status_db, customer_id="miria_customer", customer_name="Miriam" + ) + customer.data_archive_location = ArchiveLocations.KAROLINSKA_BUCKET + + base_context.status_db.add_sample( + name="sample_with_spring_files", + sex="male", + internal_id="sample_with_spring_files", + **{"customer": "MiriaCustomer"}, + ) + helpers.add_sample( + store=base_context.status_db, customer_id="miria_customer", internal_id="miria_sample" + ) + bundle: Bundle = real_housekeeper_api.create_new_bundle_and_version(name="miria_sample") + real_housekeeper_api.add_file( + path=path_to_spring_file_to_archive, + version_obj=bundle.versions[0], + tags=[SequencingFileTag.SPRING], + ) + file: File = real_housekeeper_api.add_file( + path=path_to_spring_file_with_ongoing_archival, + version_obj=bundle.versions[0], + tags=[SequencingFileTag.SPRING], + ) + file.id = 1234 + real_housekeeper_api.add_archives(files=[file], archive_task_id=archival_job_id) + + return base_context + + +@pytest.fixture +def path_to_spring_file_to_archive() -> 
diff --git a/tests/meta/archive/test_archive_api.py b/tests/meta/archive/test_archive_api.py
index 8b4c60176a..60aa5aa291 100644
--- a/tests/meta/archive/test_archive_api.py
+++ b/tests/meta/archive/test_archive_api.py
@@ -1,9 +1,9 @@
-from pathlib import Path
 from unittest import mock
 
 import pytest
 from housekeeper.store.models import File
 
+from cg.apps.housekeeper.hk import HousekeeperAPI
 from cg.constants.archiving import ArchiveLocations
 from cg.constants.constants import APIMethods
 from cg.constants.housekeeper_tags import SequencingFileTag
@@ -168,8 +168,8 @@ def test_call_corresponding_archiving_method(spring_archive_api: SpringArchiveAP
         return_value=123,
     ) as mock_request_submitter:
         # WHEN calling the corresponding archive method
-        spring_archive_api.archive_files(
-            files=[file_and_sample], archive_location=ArchiveLocations.KAROLINSKA_BUCKET
+        spring_archive_api.archive_files_to_location(
+            files_and_samples=[file_and_sample], archive_location=ArchiveLocations.KAROLINSKA_BUCKET
         )
 
     # THEN the correct archive function should have been called once
@@ -183,6 +183,7 @@ def test_archive_all_non_archived_spring_files(
     archive_request_json,
     header_with_test_auth_token,
     test_auth_token: AuthToken,
+    sample_id: str,
 ):
     """Test archiving all non-archived SPRING files for Miria customers."""
     # GIVEN a populated status_db database with two customers, one DDN and one non-DDN,
@@ -193,12 +194,16 @@
         AuthToken,
         "model_validate",
         return_value=test_auth_token,
+    ), mock.patch.object(
+        HousekeeperAPI,
+        "get_all_non_archived_spring_files",
+        return_value=[spring_archive_api.housekeeper_api.get_files(bundle=sample_id).first()],
     ), mock.patch.object(
         APIRequest,
         "api_request_from_content",
         return_value=ok_miria_response,
     ) as mock_request_submitter:
-        spring_archive_api.archive_all_non_archived_spring_files()
+        spring_archive_api.archive_spring_files_and_add_archives_to_housekeeper(200)
 
     # THEN the DDN archiving function should have been called with the correct destination and source.
     mock_request_submitter.assert_called_with(
         api_method=APIMethods.POST,
         url="some/api/files/archive",
         headers=header_with_test_auth_token,
         json=archive_request_json,
+        verify=False,
     )
 
-    # THEN all spring files for Karolinska should have an entry in the Archive table in HouseKeeper while no other
+    # THEN all spring files for Karolinska should have an entry in the Archive table in Housekeeper while no other
     # files should have an entry
     files: list[File] = spring_archive_api.housekeeper_api.files()
     for file in files:
@@ -237,7 +243,7 @@ def test_get_archival_status(
 ):
     # GIVEN a file with an ongoing archival
     file: File = spring_archive_api.housekeeper_api.files().first()
-    spring_archive_api.housekeeper_api.add_archives(files=[Path(file.path)], archive_task_id=123)
+    spring_archive_api.housekeeper_api.add_archives(files=[file], archive_task_id=archival_job_id)
 
     # WHEN querying the task id and getting a "COMPLETED" response
     with mock.patch.object(
@@ -273,6 +279,7 @@ def test_get_retrieval_status(
     ok_miria_job_status_response,
     archive_request_json,
     header_with_test_auth_token,
+    archival_job_id: int,
     retrieval_job_id: int,
     test_auth_token,
     job_status,
 ):
     # GIVEN a file with an ongoing archival
     file: File = spring_archive_api.housekeeper_api.files().first()
-    spring_archive_api.housekeeper_api.add_archives(files=[Path(file.path)], archive_task_id=123)
+    spring_archive_api.housekeeper_api.add_archives(files=[file], archive_task_id=archival_job_id)
     spring_archive_api.housekeeper_api.set_archive_retrieval_task_id(
-        file_id=file.id, retrieval_task_id=124
+        file_id=file.id, retrieval_task_id=retrieval_job_id
     )
 
     # WHEN querying the task id and getting a "COMPLETED" response
@@ -318,6 +325,7 @@ def test_retrieve_samples(
     retrieve_request_json,
     header_with_test_auth_token,
     test_auth_token,
+    archival_job_id: int,
     sample_with_spring_file: str,
 ):
     """Test retrieving all archived SPRING files tied to a sample for a Miria customer."""
     # GIVEN a populated status_db database with two customers, one DDN and one non-DDN,
     # with the DDN customer having two samples, and the non-DDN having one sample.
     files: list[File] = spring_archive_api.housekeeper_api.get_files(
         bundle=sample_with_spring_file, tags=[SequencingFileTag.SPRING]
-    )
+    ).all()
     for file in files:
         spring_archive_api.housekeeper_api.add_archives(
-            files=[Path(file.full_path)], archive_task_id=123
+            files=[file], archive_task_id=archival_job_id
         )
         assert not file.archive.retrieval_task_id
         assert file.archive
@@ -357,6 +365,7 @@
         url="some/api/files/retrieve",
         headers=header_with_test_auth_token,
         json=retrieve_sample_request_json,
+        verify=False,
     )
 
     # THEN the Archive entry should have a retrieval task id set
diff --git a/tests/meta/archive/test_archive_cli.py b/tests/meta/archive/test_archive_cli.py
new file mode 100644
index 0000000000..1b652398af
--- /dev/null
+++ b/tests/meta/archive/test_archive_cli.py
@@ -0,0 +1,197 @@
+import datetime
+from unittest import mock
+
+import pytest
+from click.testing import CliRunner
+from housekeeper.store.models import Archive, File
+from requests import Response
+
+from cg.cli.archive import archive_spring_files, update_job_statuses
+from cg.constants import EXIT_SUCCESS
+from cg.io.controller import APIRequest
+from cg.meta.archive.ddn_dataflow import (
+    FAILED_JOB_STATUSES,
+    ONGOING_JOB_STATUSES,
+    AuthToken,
+    GetJobStatusPayload,
+    GetJobStatusResponse,
+    JobStatus,
+    TransferJob,
+    TransferPayload,
+)
+from cg.models.cg_config import CGConfig
+
+
+def test_limit_and_archive_all_fails(cli_runner: CliRunner, cg_context: CGConfig):
+    """Tests that when invoking archive-spring-files in the Archive CLI module, the command is aborted
+    if both a limit and the --archive-all flag are provided."""
+
+    # GIVEN a CLI runner and a context
+
+    # WHEN invoking archive_spring_files with both a given limit and specifying archive_all
+    result = cli_runner.invoke(
+        archive_spring_files,
+        ["--limit", 100, "--archive-all"],
+        obj=cg_context,
+    )
+
+    # THEN the command should have exited with an exit_code 1
+    assert result.exit_code == 1
+    assert (
+        "Incorrect input parameters - please do not provide both a limit and set --archive-all."
+        in result.stdout
+    )
+
+
+def test_archive_spring_files_success(
+    cli_runner: CliRunner,
+    archive_context: CGConfig,
+    archival_job_id: int,
+    test_auth_token: AuthToken,
+    ok_miria_response: Response,
+):
+    """Tests that the CLI command 'cg archive archive-spring-files' adds an archive with the correct job_id when
+    an archiving job is launched via Miria."""
+
+    # GIVEN a CLI runner and a context
+
+    # GIVEN a spring file belonging to a customer with archive location 'karolinska_bucket'
+    all_non_archived_spring_files: list[
+        File
+    ] = archive_context.housekeeper_api.get_all_non_archived_spring_files()
+    assert len(all_non_archived_spring_files) == 1
+    spring_file: File = all_non_archived_spring_files[0]
+
+    # WHEN running 'cg archive archive-spring-files'
+    with mock.patch.object(
+        AuthToken,
+        "model_validate",
+        return_value=test_auth_token,
+    ), mock.patch.object(
+        APIRequest,
+        "api_request_from_content",
+        return_value=ok_miria_response,
+    ), mock.patch.object(
+        TransferPayload, "post_request", return_value=TransferJob(jobId=archival_job_id)
+    ):
+        result = cli_runner.invoke(
+            archive_spring_files,
+            obj=archive_context,
+        )
+
+    # THEN the command should have executed successfully and added an entry in the archive with the archiving task id
+    # returned by Miria
+    assert result.exit_code == EXIT_SUCCESS
+    assert spring_file.archive.archiving_task_id == archival_job_id
+
+
+@pytest.mark.parametrize(
+    "job_status", [JobStatus.COMPLETED, FAILED_JOB_STATUSES[0], ONGOING_JOB_STATUSES[0]]
+)
+def test_get_archival_job_status(
+    cli_runner: CliRunner,
+    archive_context: CGConfig,
+    archival_job_id: int,
+    test_auth_token: AuthToken,
+    ok_miria_response: Response,
+    job_status: JobStatus,
+):
+    """Tests that when invoking update_job_statuses in the Archive CLI module with an ongoing archival job,
+    the database is updated according to whether the job has completed, failed or is still ongoing.
+ """ + + # GIVEN a CLI runner and a context + + # GIVEN an archive entry with an ongoing archival + assert len(archive_context.housekeeper_api.get_archive_entries()) == 1 + + # WHEN invoking update_job_statuses + with mock.patch.object( + AuthToken, + "model_validate", + return_value=test_auth_token, + ), mock.patch.object( + APIRequest, + "api_request_from_content", + return_value=ok_miria_response, + ), mock.patch.object( + GetJobStatusPayload, + "get_job_status", + return_value=GetJobStatusResponse(id=archival_job_id, status=job_status), + ): + result = cli_runner.invoke( + update_job_statuses, + obj=archive_context, + ) + + # THEN the command should have exited successfully and updated the archive record + assert result.exit_code == 0 + if job_status == JobStatus.COMPLETED: + assert archive_context.housekeeper_api.get_archive_entries( + archival_task_id=archival_job_id + )[0].archived_at + elif job_status in FAILED_JOB_STATUSES: + assert not archive_context.housekeeper_api.get_archive_entries( + archival_task_id=archival_job_id + ) + elif job_status in ONGOING_JOB_STATUSES: + assert not archive_context.housekeeper_api.get_archive_entries( + archival_task_id=archival_job_id + )[0].archived_at + + +@pytest.mark.parametrize( + "job_status", [JobStatus.COMPLETED, FAILED_JOB_STATUSES[0], ONGOING_JOB_STATUSES[0]] +) +def test_get_retrieval_job_status( + cli_runner: CliRunner, + archive_context: CGConfig, + retrieval_job_id: int, + test_auth_token: AuthToken, + ok_miria_response: Response, + job_status: JobStatus, +): + """Tests that when invoking update_job_statuses in the Archive CLI module with an ongoing retrieval job, + the database is updated according to whether the job has completed, failed or is still ongoing. + """ + + # GIVEN a CLI runner and a context + + # GIVEN an archive entry with an ongoing retrieval + retrieving_archive: Archive = archive_context.housekeeper_api.get_archive_entries()[0] + retrieving_archive.archived_at = datetime.datetime.now() + retrieving_archive.retrieval_task_id = retrieval_job_id + + # WHEN invoking update_job_statuses + with mock.patch.object( + AuthToken, + "model_validate", + return_value=test_auth_token, + ), mock.patch.object( + APIRequest, + "api_request_from_content", + return_value=ok_miria_response, + ), mock.patch.object( + GetJobStatusPayload, + "get_job_status", + return_value=GetJobStatusResponse(id=retrieval_job_id, status=job_status), + ): + result = cli_runner.invoke( + update_job_statuses, + obj=archive_context, + ) + + # THEN the command should have exited successfully and updated the archive record + assert result.exit_code == 0 + if job_status == JobStatus.COMPLETED: + assert archive_context.housekeeper_api.get_archive_entries( + retrieval_task_id=retrieval_job_id + )[0].retrieved_at + elif job_status in FAILED_JOB_STATUSES: + assert not archive_context.housekeeper_api.get_archive_entries( + retrieval_task_id=retrieval_job_id + ) + elif job_status in ONGOING_JOB_STATUSES: + assert not archive_context.housekeeper_api.get_archive_entries( + retrieval_task_id=retrieval_job_id + )[0].retrieved_at diff --git a/tests/meta/archive/test_archiving.py b/tests/meta/archive/test_archiving.py index dfeac3b2f8..634339b9cf 100644 --- a/tests/meta/archive/test_archiving.py +++ b/tests/meta/archive/test_archiving.py @@ -301,10 +301,11 @@ def test_archive_folders( } ], "osType": OSTYPE, - "createFolder": False, + "createFolder": True, "metadataList": [], "settings": [], }, + verify=False, ) @@ -348,10 +349,11 @@ def test_retrieve_samples( } ], 
"osType": OSTYPE, - "createFolder": False, + "createFolder": True, "metadataList": [], "settings": [], }, + verify=False, )