Skip to content

Commit

Permalink
Merge branch 'master' into add-cli-container
Browse files Browse the repository at this point in the history
  • Loading branch information
seallard authored Oct 23, 2023
2 parents aea281f + 0b8ae43 commit 2b6350f
Show file tree
Hide file tree
Showing 29 changed files with 973 additions and 507 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 51.6.14
current_version = 51.7.2
commit = True
tag = True
tag_name = v{new_version}
Expand Down
2 changes: 1 addition & 1 deletion cg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import pkg_resources

__title__ = "cg"
__version__ = "51.6.14"
__version__ = "51.7.2"
63 changes: 58 additions & 5 deletions cg/cli/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
from cg.apps.slurm.slurm_api import SlurmAPI
from cg.constants.constants import DRY_RUN, FlowCellStatus
from cg.constants.housekeeper_tags import SequencingFileTag
from cg.exc import FlowCellEncryptionError, FlowCellError
from cg.exc import (
DsmcAlreadyRunningError,
FlowCellAlreadyBackedUpError,
FlowCellEncryptionError,
FlowCellError,
PdcError,
)
from cg.meta.backup.backup import BackupAPI, SpringBackupAPI
from cg.meta.backup.pdc import PdcAPI
from cg.meta.encryption.encryption import (
Expand All @@ -20,7 +26,10 @@
)
from cg.meta.tar.tar import TarAPI
from cg.models.cg_config import CGConfig
from cg.models.flow_cell.flow_cell import get_flow_cells_from_path
from cg.models.flow_cell.flow_cell import (
FlowCellDirectoryData,
get_flow_cells_from_path,
)
from cg.store import Store
from cg.store.models import Flowcell, Sample

Expand All @@ -34,13 +43,56 @@ def backup(context: CGConfig):
pass


@backup.command("flow-cells")
@DRY_RUN
@click.pass_obj
def backup_flow_cells(context: CGConfig, dry_run: bool):
    """Back-up flow cells."""
    pdc_api = context.pdc_api
    pdc_api.dry_run = dry_run
    status_db: Store = context.status_db
    # Walk every flow cell directory on disk and try to back each one up.
    for flow_cell in get_flow_cells_from_path(flow_cells_dir=Path(context.flow_cells_dir)):
        stored_flow_cell: Optional[Flowcell] = status_db.get_flow_cell_by_name(
            flow_cell_name=flow_cell.id
        )
        encryption_api = FlowCellEncryptionAPI(
            binary_path=context.encryption.binary_path,
            dry_run=dry_run,
            encryption_dir=Path(context.backup.encryption_directories.current),
            flow_cell=flow_cell,
            pigz_binary_path=context.pigz.binary_path,
            slurm_api=SlurmAPI(),
            sbatch_parameter=context.backup.slurm_flow_cell_encryption.dict(),
            tar_api=TarAPI(binary_path=context.tar.binary_path, dry_run=dry_run),
        )
        # A failure for one flow cell must not stop the remaining ones:
        # log the known error types and continue with the next flow cell.
        try:
            pdc_api.start_flow_cell_backup(
                db_flow_cell=stored_flow_cell,
                flow_cell_encryption_api=encryption_api,
                status_db=status_db,
            )
        except (
            DsmcAlreadyRunningError,
            FlowCellAlreadyBackedUpError,
            FlowCellEncryptionError,
            PdcError,
        ) as error:
            logging.error(f"{error}")


@backup.command("encrypt-flow-cells")
@DRY_RUN
@click.pass_obj
def encrypt_flow_cells(context: CGConfig, dry_run: bool):
"""Encrypt flow cells."""
status_db: Store = context.status_db
for flow_cell in get_flow_cells_from_path(flow_cells_dir=Path(context.flow_cells_dir)):
flow_cells: list[FlowCellDirectoryData] = get_flow_cells_from_path(
flow_cells_dir=Path(context.flow_cells_dir)
)
for flow_cell in flow_cells:
db_flow_cell: Optional[Flowcell] = status_db.get_flow_cell_by_name(
flow_cell_name=flow_cell.id
)
Expand All @@ -60,7 +112,7 @@ def encrypt_flow_cells(context: CGConfig, dry_run: bool):
try:
flow_cell_encryption_api.start_encryption()
except (FlowCellError, FlowCellEncryptionError) as error:
logging.debug(f"{error}")
logging.error(f"{error}")


@backup.command("fetch-flow-cell")
Expand All @@ -70,7 +122,8 @@ def encrypt_flow_cells(context: CGConfig, dry_run: bool):
def fetch_flow_cell(context: CGConfig, dry_run: bool, flow_cell_id: Optional[str] = None):
"""Fetch the first flow cell in the requested queue from backup"""

pdc_api = PdcAPI(binary_path=context.pdc.binary_path, dry_run=dry_run)
pdc_api = context.pdc_api
pdc_api.dry_run = dry_run
encryption_api = EncryptionAPI(binary_path=context.encryption.binary_path, dry_run=dry_run)
tar_api = TarAPI(binary_path=context.tar.binary_path, dry_run=dry_run)
context.meta_apis["backup_api"] = BackupAPI(
Expand Down
14 changes: 13 additions & 1 deletion cg/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ class CleanFlowCellFailedError(CgError):
"""


class DsmcAlreadyRunningError(CgError):
    """Raised when a Dsmc process is already running on the system."""


class DecompressionNeededError(CgError):
    """Raised when decompression is still needed before an analysis can start."""

Expand All @@ -92,6 +96,10 @@ class FlowCellEncryptionError(CgError):
"""Raised when there is a problem with encrypting a flow cell."""


class FlowCellAlreadyBackedUpError(CgError):
    """Raised when a flow cell has already been backed up."""


class HousekeeperFileMissingError(CgError):
"""
Exception raised when a file is missing in Housekeeper.
Expand Down Expand Up @@ -188,7 +196,11 @@ class LoqusdbDuplicateRecordError(LoqusdbError):
"""Exception related to duplicate records in Loqusdb."""


class PdcNoFilesMatchingSearchError(CgError):
class PdcError(CgError):
    """Raised when an interaction with the PDC API fails."""


class PdcNoFilesMatchingSearchError(PdcError):
    """Raised when a PDC query returns no files matching the search criteria."""


Expand Down
20 changes: 9 additions & 11 deletions cg/meta/backup/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,14 @@ def fetch_flow_cell(self, flow_cell: Optional[Flowcell] = None) -> Optional[floa
LOG.info(f"{flow_cell.name}: retrieving from PDC")

try:
dcms_output: list[str] = self.query_pdc_for_flow_cell(flow_cell.name)
dsmc_output: list[str] = self.query_pdc_for_flow_cell(flow_cell.name)

except PdcNoFilesMatchingSearchError as error:
LOG.error(f"PDC query failed: {error}")
raise error

archived_key: Path = self.get_archived_encryption_key_path(dcms_output=dcms_output)
archived_flow_cell: Path = self.get_archived_flow_cell_path(dcms_output=dcms_output)
archived_key: Path = self.get_archived_encryption_key_path(dsmc_output=dsmc_output)
archived_flow_cell: Path = self.get_archived_flow_cell_path(dsmc_output=dsmc_output)

if not self.dry_run:
return self._process_flow_cell(
Expand Down Expand Up @@ -279,11 +279,11 @@ def retrieve_archived_file(self, archived_file: Path, run_dir: Path) -> None:
)

@classmethod
def get_archived_flow_cell_path(cls, dcms_output: list[str]) -> Optional[Path]:
def get_archived_flow_cell_path(cls, dsmc_output: list[str]) -> Optional[Path]:
"""Get the path of the archived flow cell from a PDC query."""
flow_cell_line: str = [
row
for row in dcms_output
for row in dsmc_output
if FileExtensions.TAR in row
and FileExtensions.GZIP in row
and FileExtensions.GPG in row
Expand All @@ -295,11 +295,11 @@ def get_archived_flow_cell_path(cls, dcms_output: list[str]) -> Optional[Path]:
return archived_flow_cell

@classmethod
def get_archived_encryption_key_path(cls, dcms_output: list[str]) -> Optional[Path]:
def get_archived_encryption_key_path(cls, dsmc_output: list[str]) -> Optional[Path]:
"""Get the encryption key for the archived flow cell from a PDC query."""
encryption_key_line: str = [
row
for row in dcms_output
for row in dsmc_output
if FileExtensions.KEY in row
and FileExtensions.GPG in row
and FileExtensions.GZIP not in row
Expand Down Expand Up @@ -347,12 +347,10 @@ def encrypt_and_archive_spring_file(self, spring_file_path: Path) -> None:
self.encryption_api.key_asymmetric_encryption(spring_file_path)
self.encryption_api.compare_spring_file_checksums(spring_file_path)
self.pdc.archive_file_to_pdc(
file_path=str(self.encryption_api.encrypted_spring_file_path(spring_file_path)),
dry_run=self.dry_run,
file_path=str(self.encryption_api.encrypted_spring_file_path(spring_file_path))
)
self.pdc.archive_file_to_pdc(
file_path=str(self.encryption_api.encrypted_key_path(spring_file_path)),
dry_run=self.dry_run,
file_path=str(self.encryption_api.encrypted_key_path(spring_file_path))
)
self.mark_file_as_archived(spring_file_path)
self.encryption_api.cleanup(spring_file_path)
Expand Down
106 changes: 97 additions & 9 deletions cg/meta/backup/pdc.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
""" Module to group PDC related commands """

import logging
from pathlib import Path

import psutil

from cg.constants.pdc import DSMCParameters
from cg.exc import (
DsmcAlreadyRunningError,
FlowCellAlreadyBackedUpError,
FlowCellEncryptionError,
PdcError,
)
from cg.meta.encryption.encryption import FlowCellEncryptionAPI
from cg.store import Store
from cg.store.models import Flowcell
from cg.utils import Process

LOG = logging.getLogger(__name__)
Expand All @@ -13,16 +25,31 @@
class PdcAPI:
"""Group PDC related commands"""

def __init__(self, binary_path: str, dry_run: bool = False):
    """Set up the PDC API.

    Args:
        binary_path: path to the dsmc binary used for all PDC commands.
        dry_run: when True, commands are logged but not executed.
    """
    # NOTE: the diff scrape left both the old (binary_path: str = None) and the
    # new (required binary_path) signatures here; this keeps only the new one —
    # a binary path is mandatory for the Process wrapper to be meaningful.
    self.process: Process = Process(binary=binary_path)
    self.dry_run: bool = dry_run

def archive_file_to_pdc(self, file_path: str, dry_run: bool = False) -> None:
"""Archive a file by storing it on PDC"""
command: list = DSMCParameters.ARCHIVE_COMMAND.copy()
command.append(file_path)
if not dry_run:
self.run_dsmc_command(command=command)
@classmethod
def validate_is_dsmc_running(cls) -> bool:
    """Return True if a Dsmc process is currently running on the system.

    Process inspection is best effort: any error while iterating processes
    is logged at debug level and otherwise ignored.
    """
    found_dsmc: bool = False
    try:
        for running_process in psutil.process_iter():
            if "dsmc" in running_process.name():
                found_dsmc = True
    except Exception as error:
        LOG.debug(f"{error}")
    if found_dsmc:
        LOG.debug("A Dsmc process is already running")
    return found_dsmc

def archive_file_to_pdc(self, file_path: str) -> None:
    """Archive a file by storing it on PDC."""
    # Guard clause: in dry-run mode nothing is sent to PDC.
    if self.dry_run:
        return
    archive_command: list = DSMCParameters.ARCHIVE_COMMAND + [file_path]
    self.run_dsmc_command(command=archive_command)

def query_pdc(self, search_pattern: str) -> None:
"""Query PDC based on a given search pattern."""
Expand All @@ -41,7 +68,68 @@ def retrieve_file_from_pdc(self, file_path: str, target_path: str = None) -> Non
self.run_dsmc_command(command=command)

def run_dsmc_command(self, command: list) -> None:
    """Run a DSMC command.

    Args:
        command: parameters passed to the configured dsmc binary.
    Raises:
        PdcError: when the underlying process fails for any reason.
    """
    # NOTE: the diff scrape left both the old docstring and an extra,
    # unguarded run_command call here; only the try/except-wrapped call is
    # kept — otherwise the command would run twice and the first invocation
    # would bypass the PdcError wrapping.
    LOG.debug("Starting DSMC command:")
    LOG.debug(f"{self.process.binary} {' '.join(command)}")
    try:
        self.process.run_command(parameters=command, dry_run=self.dry_run)
    except Exception as error:
        # Normalize every failure into the domain exception callers catch.
        raise PdcError(f"{error}") from error

def validate_is_flow_cell_backup_possible(
    self, db_flow_cell: Flowcell, flow_cell_encryption_api: FlowCellEncryptionAPI
) -> None:
    """Check if back-up of flow cell is possible.

    Returns nothing on success (the original `-> bool` annotation was wrong:
    no code path returns a value).

    Raises:
        DsmcAlreadyRunningError: if there is already a Dsmc process ongoing.
        FlowCellAlreadyBackedUpError: if the flow cell is already backed up.
        FlowCellEncryptionError: if encryption is not complete.
    """
    if self.validate_is_dsmc_running():
        raise DsmcAlreadyRunningError("A Dsmc process is already running")
    if db_flow_cell and db_flow_cell.has_backup:
        raise FlowCellAlreadyBackedUpError(
            f"Flow cell: {db_flow_cell.name} is already backed-up"
        )
    # Encryption writes a marker file on completion; its absence means the
    # encryption pipeline has not finished for this flow cell.
    if not flow_cell_encryption_api.complete_file_path.exists():
        raise FlowCellEncryptionError(
            f"Flow cell: {flow_cell_encryption_api.flow_cell.id} encryption process is not complete"
        )
    LOG.debug("Flow cell can be backed up")

def backup_flow_cell(
    self, files_to_archive: list[Path], store: Store, db_flow_cell: Flowcell
) -> None:
    """Back-up flow cell files."""
    archived: int = 0
    for file_to_archive in files_to_archive:
        # Archive each file independently; a failure on one file is logged
        # and does not prevent the remaining files from being attempted.
        try:
            self.archive_file_to_pdc(file_path=file_to_archive.as_posix())
            archived += 1
        except PdcError:
            LOG.warning(f"{file_to_archive.as_posix()} cannot be archived")
    # Only mark the flow cell as backed up when every file made it to PDC
    # and this was a real run.
    if not self.dry_run and archived == len(files_to_archive):
        store.update_flow_cell_has_backup(flow_cell=db_flow_cell, has_backup=True)
        LOG.info(f"Flow cell: {db_flow_cell.name} has been backed up")

def start_flow_cell_backup(
    self,
    db_flow_cell: Flowcell,
    flow_cell_encryption_api: FlowCellEncryptionAPI,
    status_db: Store,
) -> None:
    """Check if back-up of flow cell is possible and if so starts it."""
    self.validate_is_flow_cell_backup_possible(
        db_flow_cell=db_flow_cell, flow_cell_encryption_api=flow_cell_encryption_api
    )
    # Both the passphrase and the encrypted archive must reach PDC for the
    # backup to be usable.
    encrypted_files: list[Path] = [
        flow_cell_encryption_api.final_passphrase_file_path,
        flow_cell_encryption_api.encrypted_gpg_file_path,
    ]
    self.backup_flow_cell(
        files_to_archive=encrypted_files,
        store=status_db,
        db_flow_cell=db_flow_cell,
    )
28 changes: 14 additions & 14 deletions cg/meta/report/field_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,34 +37,34 @@ def get_empty_fields(report_data: dict) -> list:
def get_empty_report_data(report_data: ReportModel) -> dict:
"""Retrieve empty fields from a report data model."""
empty_fields = {
"report": get_empty_fields(report_data=report_data.dict()),
"customer": get_empty_fields(report_data=report_data.customer.dict()),
"case": get_empty_fields(report_data=report_data.case.dict()),
"report": get_empty_fields(report_data=report_data.model_dump()),
"customer": get_empty_fields(report_data=report_data.customer.model_dump()),
"case": get_empty_fields(report_data=report_data.case.model_dump()),
"applications": {
app.tag: get_empty_fields(report_data=app.dict())
app.tag: get_empty_fields(report_data=app.model_dump())
for app in report_data.case.applications
if get_empty_fields(report_data=app.dict())
if get_empty_fields(report_data=app.model_dump())
},
"data_analysis": get_empty_fields(report_data=report_data.case.data_analysis.dict()),
"data_analysis": get_empty_fields(report_data=report_data.case.data_analysis.model_dump()),
"samples": {
sample.id: get_empty_fields(report_data=sample.dict())
sample.id: get_empty_fields(report_data=sample.model_dump())
for sample in report_data.case.samples
if get_empty_fields(report_data=sample.dict())
if get_empty_fields(report_data=sample.model_dump())
},
"methods": {
sample.id: get_empty_fields(report_data=sample.methods.dict())
sample.id: get_empty_fields(report_data=sample.methods.model_dump())
for sample in report_data.case.samples
if get_empty_fields(report_data=sample.methods.dict())
if get_empty_fields(report_data=sample.methods.model_dump())
},
"timestamps": {
sample.id: get_empty_fields(report_data=sample.timestamps.dict())
sample.id: get_empty_fields(report_data=sample.timestamps.model_dump())
for sample in report_data.case.samples
if get_empty_fields(report_data=sample.timestamps.dict())
if get_empty_fields(report_data=sample.timestamps.model_dump())
},
"metadata": {
sample.id: get_empty_fields(report_data=sample.metadata.dict())
sample.id: get_empty_fields(report_data=sample.metadata.model_dump())
for sample in report_data.case.samples
if get_empty_fields(report_data=sample.metadata.dict())
if get_empty_fields(report_data=sample.metadata.model_dump())
},
}
# Clear empty values
Expand Down
Loading

0 comments on commit 2b6350f

Please sign in to comment.