Skip to content

Commit

Permalink
(Archiving) CLI functionality (#2345) (minor)
Browse files Browse the repository at this point in the history
### Added

- CLI command archive-spring-files which invokes archive_spring_files_and_add_archives_to_housekeeper in the SpringArchiveAPI (use --archive-all to archive all non-archived Spring files).
- CLI command update-job-statuses which queries any ongoing archivals/retrievals to see if they have finished.
  • Loading branch information
islean authored Dec 1, 2023
1 parent f5d839b commit 7caadb5
Show file tree
Hide file tree
Showing 14 changed files with 524 additions and 87 deletions.
35 changes: 29 additions & 6 deletions cg/apps/housekeeper/hk.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from housekeeper.include import checksum as hk_checksum
from housekeeper.include import include_version
from housekeeper.store import Store, models
from housekeeper.store.database import create_all_tables, drop_all_tables, initialize_database
from housekeeper.store.database import (
create_all_tables,
drop_all_tables,
initialize_database,
)
from housekeeper.store.models import Archive, Bundle, File, Version
from sqlalchemy.orm import Query

Expand Down Expand Up @@ -422,14 +426,15 @@ def get_archived_files(self, bundle_name: str, tags: list | None = None) -> list
"""Returns all archived_files from a given bundle, tagged with the given tags"""
return self._store.get_archived_files(bundle_name=bundle_name, tags=tags or [])

def add_archives(self, files: list[File], archive_task_id: int) -> None:
    """Create an Archive entry for each given file, tagged with the archiving task id.

    Args:
        files: Housekeeper File records that were submitted for archiving.
        archive_task_id: id of the archiving job the files belong to.
    """
    for file in files:
        file_id: int = file.id
        LOG.info(
            f"Adding an archive to file {file_id} with archiving task id {archive_task_id}"
        )
        archive: Archive = self._store.create_archive(
            file_id=file_id, archiving_task_id=archive_task_id
        )
        self._store.session.add(archive)
    # NOTE(review): source formatting was lost in extraction — confirm the single
    # commit after the loop (atomic for all files) matches the original intent.
    self.commit()
Expand Down Expand Up @@ -618,3 +623,21 @@ def get_ongoing_archivals(self) -> list[Archive]:

def get_ongoing_retrievals(self) -> list[Archive]:
return self._store.get_ongoing_retrievals()

def delete_archives(self, archival_task_id: int) -> None:
    """Delete every Archive entry whose archival task id matches the given id."""
    archives_to_delete: list[Archive] = self.get_archive_entries(
        archival_task_id=archival_task_id
    )
    for archive in archives_to_delete:
        self._store.session.delete(archive)
    # NOTE(review): indentation was lost in extraction — commit placed after the
    # loop (one commit for all deletions); confirm against the original file.
    self._store.session.commit()

def update_archive_retrieved_at(
    self, old_retrieval_job_id: int, new_retrieval_job_id: int | None
) -> None:
    """Reassign the retrieval task id on all Archive entries carrying the old id.

    Passing None clears the retrieval task id, i.e. marks the retrieval as no
    longer ongoing. NOTE(review): despite the name, this updates
    retrieval_task_id — not a retrieved_at timestamp.
    """
    archives_to_update: list[Archive] = self.get_archive_entries(
        retrieval_task_id=old_retrieval_job_id
    )
    for archive in archives_to_update:
        archive.retrieval_task_id = new_retrieval_job_id
    self._store.session.commit()
66 changes: 66 additions & 0 deletions cg/cli/archive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import click
from click.core import ParameterSource

from cg.constants.archiving import DEFAULT_SPRING_ARCHIVE_COUNT
from cg.meta.archive.archive import SpringArchiveAPI
from cg.models.cg_config import CGConfig


@click.group()
def archive():
    """Archive utilities."""


@archive.command("archive-spring-files")
@click.option(
    "-l",
    "--limit",
    help="Give a limit to the amount of files to archive.",
    default=DEFAULT_SPRING_ARCHIVE_COUNT,
    show_default=True,
)
@click.option(
    "--archive-all",
    help="Use in order to archive all non-archived-files",
    is_flag=True,
    default=False,
    show_default=True,
)
@click.pass_obj
def archive_spring_files(context: CGConfig, limit: int | None, archive_all: bool):
    """Archives non-archived spring files.

    Raises:
        click.Abort: if both an explicit limit and --archive-all are specified.
    """
    limit_given_explicitly: bool = (
        click.get_current_context().get_parameter_source("limit")
        == ParameterSource.COMMANDLINE
    )
    # Decide the conflict on the parameter *source*, not the value: an explicit
    # "--limit 0" combined with --archive-all is also contradictory and must
    # abort instead of being silently ignored (the old "and limit" check let it through).
    if archive_all and limit_given_explicitly:
        click.echo(
            "Incorrect input parameters - please do not provide both a limit and set --archive-all."
        )
        raise click.Abort()

    spring_archive_api = SpringArchiveAPI(
        status_db=context.status_db,
        housekeeper_api=context.housekeeper_api,
        data_flow_config=context.data_flow_config,
    )
    # --archive-all translates to "no limit" for the API call.
    spring_archive_api.archive_spring_files_and_add_archives_to_housekeeper(
        spring_file_count_limit=None if archive_all else limit
    )


@archive.command("update-job-statuses")
@click.pass_obj
def update_job_statuses(context: CGConfig):
    """Queries ongoing jobs and updates Housekeeper."""
    # Assemble the archive API from the CLI context and delegate the status sweep.
    api = SpringArchiveAPI(
        status_db=context.status_db,
        housekeeper_api=context.housekeeper_api,
        data_flow_config=context.data_flow_config,
    )
    api.update_statuses_for_ongoing_tasks()
2 changes: 2 additions & 0 deletions cg/cli/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import cg
from cg.cli.add import add as add_cmd
from cg.cli.archive import archive
from cg.cli.backup import backup
from cg.cli.clean import clean
from cg.cli.compress.base import compress, decompress
Expand Down Expand Up @@ -97,6 +98,7 @@ def init(context: CGConfig, reset: bool, force: bool):


base.add_command(add_cmd)
base.add_command(archive)
base.add_command(backup)
base.add_command(clean)
base.add_command(compress)
Expand Down
2 changes: 2 additions & 0 deletions cg/constants/archiving.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from enum import StrEnum

DEFAULT_SPRING_ARCHIVE_COUNT = 200


class ArchiveLocations(StrEnum):
"""Archive locations for the different customers' Spring files."""
Expand Down
4 changes: 4 additions & 0 deletions cg/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,5 +230,9 @@ class MissingMetrics(CgError):
"""Exception raised when mandatory metrics are missing."""


class ArchiveJobFailedError(CgError):
    """Exception raised when an archival or retrieval job has failed.

    Caught when polling an ongoing job's status so the failed task's Archive
    entries can be cleaned up instead of staying marked as ongoing.
    """


class XMLError(CgError):
"""Exception raised when something is wrong with the content of an XML file."""
20 changes: 10 additions & 10 deletions cg/io/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,26 @@
from requests import Response


def put(url: str, headers: dict, json: dict, verify: bool = True) -> Response:
    """Create PUT request. Set verify=False to skip TLS certificate verification."""
    return requests.put(url=url, headers=headers, json=json, verify=verify)


def post(url: str, headers: dict, json: dict, verify: bool = True) -> Response:
    """Create POST request. Set verify=False to skip TLS certificate verification."""
    return requests.post(url=url, headers=headers, json=json, verify=verify)


def delete(url: str, headers: dict, json: dict, verify: bool = True) -> Response:
    """Create DELETE request. Set verify=False to skip TLS certificate verification."""
    return requests.delete(url=url, headers=headers, json=json, verify=verify)


def get(url: str, headers: dict, json: dict, verify: bool = True) -> Response:
    """Create GET request. Set verify=False to skip TLS certificate verification."""
    return requests.get(url=url, headers=headers, json=json, verify=verify)


def patch(url: str, headers: dict, json: dict, verify: bool = True) -> Response:
    """Create PATCH request. Set verify=False to skip TLS certificate verification."""
    return requests.patch(url=url, headers=headers, json=json, verify=verify)
4 changes: 2 additions & 2 deletions cg/io/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,6 @@ class APIRequest:

@classmethod
def api_request_from_content(
    cls, api_method: str, url: str, headers: dict, json: dict, verify: bool = False
) -> Response:
    """Dispatch an HTTP request by method name via the cls.api_request mapping.

    NOTE(review): verify defaults to False here, while the helpers in cg.io.api
    default to True — confirm the insecure-by-default choice is intentional.
    """
    return cls.api_request[api_method](url=url, headers=headers, json=json, verify=verify)
105 changes: 62 additions & 43 deletions cg/meta/archive/archive.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
from pathlib import Path
from typing import Callable, Type

from housekeeper.store.models import Archive, File
Expand All @@ -8,14 +7,14 @@
from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.constants import SequencingFileTag
from cg.constants.archiving import ArchiveLocations
from cg.exc import ArchiveJobFailedError
from cg.meta.archive.ddn_dataflow import DDNDataFlowClient
from cg.meta.archive.models import ArchiveHandler, FileAndSample, SampleAndDestination
from cg.models.cg_config import DataFlowConfig
from cg.store import Store
from cg.store.models import Sample

LOG = logging.getLogger(__name__)
DEFAULT_SPRING_ARCHIVE_COUNT = 200
ARCHIVE_HANDLERS: dict[str, Type[ArchiveHandler]] = {
ArchiveLocations.KAROLINSKA_BUCKET: DDNDataFlowClient
}
Expand Down Expand Up @@ -67,41 +66,45 @@ def __init__(
self.status_db: Store = status_db
self.data_flow_config: DataFlowConfig = data_flow_config

def archive_files(self, files: list[FileAndSample], archive_location: ArchiveLocations) -> int:
archive_handler: ArchiveHandler = ARCHIVE_HANDLERS[archive_location](self.data_flow_config)
return archive_handler.archive_files(files_and_samples=files)
def get_non_archived_and_non_pdc_spring_files(self) -> list[File]:
    """Return non-archived Spring files whose sample's archive location is not "PDC"."""
    files: list[File] = self.housekeeper_api.get_all_non_archived_spring_files()
    return [file for file in files if self.get_archive_location_from_file(file) != "PDC"]

def archive_files_to_location(
    self, files_and_samples: list[FileAndSample], archive_location: ArchiveLocations
) -> int:
    """Submit the given files for archival at the given location.

    Returns:
        The archival task id reported by the archive handler.
    """
    archive_handler: ArchiveHandler = ARCHIVE_HANDLERS[archive_location](self.data_flow_config)
    return archive_handler.archive_files(files_and_samples=files_and_samples)

def archive_spring_files_and_add_archives_to_housekeeper(
    self, spring_file_count_limit: int | None
) -> None:
    """Archive all non-archived Spring files, one archive location at a time.

    If a limit is provided, at most that many files are archived. For each
    archive location with matching files, the files are submitted for archival
    and corresponding Archive entries are added in Housekeeper.
    """
    files_to_archive: list[File] = self.get_non_archived_and_non_pdc_spring_files()
    # The limit is applied after pairing files with samples, so files without a
    # matching sample do not consume slots from the limit.
    files_and_samples: list[FileAndSample] = self.add_samples_to_files(files_to_archive)[
        :spring_file_count_limit
    ]
    for archive_location in ArchiveLocations:
        files_and_samples_for_location: list[FileAndSample] = filter_files_on_archive_location(
            files_and_samples=files_and_samples, archive_location=archive_location
        )
        if files_and_samples_for_location:
            job_id: int = self.archive_files_to_location(
                files_and_samples=files_and_samples_for_location,
                archive_location=archive_location,
            )
            LOG.info(f"Files submitted to {archive_location} with archival task id {job_id}.")
            self.housekeeper_api.add_archives(
                files=[
                    file_and_sample.file for file_and_sample in files_and_samples_for_location
                ],
                archive_task_id=job_id,
            )
        else:
            LOG.info(f"No files to archive for location {archive_location}.")

def retrieve_samples(self, sample_internal_ids: list[str]) -> None:
"""Retrieves the archived spring files for a list of samples."""
Expand Down Expand Up @@ -179,16 +182,16 @@ def get_sample(self, file: File) -> Sample | None:
)
return sample

def add_samples_to_files(self, files: list[File]) -> list[FileAndSample]:
    """Pair each File with its Sample as a FileAndSample object.

    Files without a matching sample are skipped (get_sample returning None).
    """
    files_and_samples: list[FileAndSample] = []
    for file in files:
        if sample := self.get_sample(file):
            files_and_samples.append(FileAndSample(file=file, sample=sample))
    return files_and_samples

def update_status_for_ongoing_tasks(self) -> None:
def update_statuses_for_ongoing_tasks(self) -> None:
"""Updates any completed jobs with a finished timestamp."""
self.update_ongoing_archivals()
self.update_ongoing_retrievals()
Expand Down Expand Up @@ -236,15 +239,27 @@ def update_ongoing_task(
) -> None:
"""Fetches info on an ongoing job and updates the Archive entry in Housekeeper."""
archive_handler: ArchiveHandler = ARCHIVE_HANDLERS[archive_location](self.data_flow_config)
is_job_done: bool = archive_handler.is_job_done(task_id)
if is_job_done:
LOG.info(f"Job with id {task_id} has finished, updating Archive entries.")
try:
LOG.info(f"Fetching status for job with id {task_id} from {archive_location}")
is_job_done: bool = archive_handler.is_job_done(task_id)
if is_job_done:
LOG.info(f"Job with id {task_id} has finished, updating Archive entries.")
if is_archival:
self.housekeeper_api.set_archived_at(task_id)
else:
self.housekeeper_api.set_retrieved_at(task_id)
else:
LOG.info(f"Job with id {task_id} has not yet finished.")
except ArchiveJobFailedError as error:
LOG.error(error)
if is_archival:
self.housekeeper_api.set_archived_at(task_id)
LOG.warning(f"Will remove archive entries with archival task ids {task_id}")
self.housekeeper_api.delete_archives(task_id)
else:
self.housekeeper_api.set_retrieved_at(task_id)
else:
LOG.info(f"Job with id {task_id} has not yet finished.")
LOG.warning("Will set retrieval task id to null.")
self.housekeeper_api.update_archive_retrieved_at(
old_retrieval_job_id=task_id, new_retrieval_job_id=None
)

def sort_archival_ids_on_archive_location(
self, archive_entries: list[Archive]
Expand All @@ -270,7 +285,10 @@ def get_unique_archival_ids_and_their_archive_location(
) -> set[tuple[int, ArchiveLocations]]:
return set(
[
(archive.archiving_task_id, self.get_archive_location_from_file(archive.file))
(
archive.archiving_task_id,
ArchiveLocations(self.get_archive_location_from_file(archive.file)),
)
for archive in archive_entries
]
)
Expand All @@ -297,12 +315,13 @@ def get_unique_retrieval_ids_and_their_archive_location(
) -> set[tuple[int, ArchiveLocations]]:
return set(
[
(archive.retrieval_task_id, self.get_archive_location_from_file(archive.file))
(
archive.retrieval_task_id,
ArchiveLocations(self.get_archive_location_from_file(archive.file)),
)
for archive in archive_entries
]
)

def get_archive_location_from_file(self, file: File) -> str:
    """Return the archive location of the sample owning the file's bundle."""
    return self.status_db.get_sample_by_internal_id(file.version.bundle.name).archive_location
Loading

0 comments on commit 7caadb5

Please sign in to comment.