Skip to content

Commit

Permalink
feat(new delivery flow) (#3598) (major)
Browse files Browse the repository at this point in the history
# Description

Replaces the delivery api with a new delivery service.
  • Loading branch information
ChrOertlin authored Sep 10, 2024
1 parent 8901c8d commit 6a221f1
Show file tree
Hide file tree
Showing 88 changed files with 2,613 additions and 1,906 deletions.
247 changes: 107 additions & 140 deletions cg/cli/deliver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,33 @@
import click

from cg.apps.tb import TrailblazerAPI
from cg.cli.deliver.utils import deliver_raw_data_for_analyses
from cg.cli.utils import CLICK_CONTEXT_SETTINGS
from cg.constants import Workflow
from cg.constants.cli_options import DRY_RUN
from cg.constants.delivery import PIPELINE_ANALYSIS_OPTIONS, PIPELINE_ANALYSIS_TAG_MAP
from cg.meta.deliver import DeliverAPI, DeliverTicketAPI
from cg.meta.rsync.rsync_api import RsyncAPI
from cg.constants.delivery import FileDeliveryOption
from cg.services.deliver_files.delivery_rsync_service.delivery_rsync_service import (
DeliveryRsyncService,
)
from cg.models.cg_config import CGConfig
from cg.services.fastq_concatenation_service.fastq_concatenation_service import (
FastqConcatenationService,
from cg.services.deliver_files.deliver_files_service.deliver_files_service import (
DeliverFilesService,
)
from cg.services.deliver_files.deliver_files_service.deliver_files_service_factory import (
DeliveryServiceFactory,
)
from cg.store.models import Case
from cg.store.store import Store
from cg.store.models import Case, Analysis

LOG = logging.getLogger(__name__)

DELIVERY_TYPE = click.option(
"-d",
"--delivery-type",
multiple=True,
type=click.Choice(PIPELINE_ANALYSIS_OPTIONS),
required=True,
)
FORCE_ALL = click.option(
"--force-all",
help=(
"Force delivery of all sample files "
"- disregarding of amount of reads or previous deliveries"
),
is_flag=True,
)
TICKET_ID_ARG = click.argument("ticket", type=str, required=True)

IGNORE_MISSING_BUNDLES = click.option(
"-i",
"--ignore-missing-bundles",
help="Ignore errors due to missing case bundles",
is_flag=True,
default=False,
multiple=False,
type=click.Choice(choices=[option for option in FileDeliveryOption]),
required=False,
)
TICKET_ID_ARG = click.option("-t", "--ticket", type=str, required=True)


@click.group(context_settings=CLICK_CONTEXT_SETTINGS)
Expand All @@ -52,141 +41,119 @@ def deliver():
LOG.info("Running CG deliver")


@deliver.command(name="analysis")
@DRY_RUN
@DELIVERY_TYPE
@click.option("-c", "--case-id", help="Deliver the files for a specific case")
@click.option(
    "-t", "--ticket", type=str, help="Deliver the files for ALL cases connected to a ticket"
)
@FORCE_ALL
@IGNORE_MISSING_BUNDLES
@click.pass_obj
def deliver_analysis(
    context: CGConfig,
    case_id: str | None,
    ticket: str | None,
    delivery_type: list[str],
    dry_run: bool,
    force_all: bool,
    ignore_missing_bundles: bool,
):
    """Deliver analysis files to customer inbox.

    Files can be delivered either on case level or for all cases connected to a ticket.
    Any of those needs to be specified.
    """
    # Exactly one addressing mode is required: a single case or a whole ticket.
    if not (case_id or ticket):
        LOG.info("Please provide a case-id or ticket-id")
        return

    # Root of the customer inbox on disk; configured via CGConfig.delivery_path.
    inbox: str = context.delivery_path
    if not inbox:
        LOG.info("Please specify the root path for where files should be delivered")
        return

    status_db: Store = context.status_db
    # One DeliverAPI per requested delivery type (--delivery-type is multiple=True),
    # each configured with that type's case/sample housekeeper tags.
    for delivery in delivery_type:
        deliver_api = DeliverAPI(
            store=status_db,
            hk_api=context.housekeeper_api,
            case_tags=PIPELINE_ANALYSIS_TAG_MAP[delivery]["case_tags"],
            sample_tags=PIPELINE_ANALYSIS_TAG_MAP[delivery]["sample_tags"],
            project_base_path=Path(inbox),
            delivery_type=delivery,
            force_all=force_all,
            ignore_missing_bundles=ignore_missing_bundles,
            fastq_file_service=FastqConcatenationService(),
        )
        deliver_api.set_dry_run(dry_run)
        # Resolve the target cases: either the one named case or all cases on the ticket.
        cases: list[Case] = []
        if case_id:
            case_obj: Case = status_db.get_case_by_internal_id(internal_id=case_id)
            if not case_obj:
                # Aborts the remaining delivery types too, not just this one.
                LOG.warning(f"Could not find case {case_id}")
                return
            cases.append(case_obj)
        else:
            cases: list[Case] = status_db.get_cases_by_ticket_id(ticket_id=ticket)
            if not cases:
                LOG.warning(f"Could not find cases for ticket {ticket}")
                return

        for case_obj in cases:
            deliver_api.deliver_files(case_obj=case_obj)


@deliver.command(name="rsync")
# NOTE(review): this span is a diff artifact — removed (-) and added (+) lines are
# both present, so DRY_RUN and TICKET_ID_ARG each appear twice below. Only one set
# of decorators exists in either real version of the file.
@DRY_RUN
@TICKET_ID_ARG
@click.pass_obj
@TICKET_ID_ARG
@DRY_RUN
def rsync(context: CGConfig, ticket: str, dry_run: bool):
    """The folder generated using the "cg deliver analysis" command will be
    rsynced with this function to the customers inbox on the delivery server
    """
    tb_api: TrailblazerAPI = context.trailblazer_api
    # Old implementation (removed in this commit): RsyncAPI built from config.
    rsync_api: RsyncAPI = RsyncAPI(config=context)
    slurm_id = rsync_api.run_rsync_on_slurm(ticket=ticket, dry_run=dry_run)
    # New implementation (added in this commit): rsync service taken from the context.
    rsync_api: DeliveryRsyncService = context.delivery_rsync_service
    slurm_id = rsync_api.run_rsync_for_ticket(ticket=ticket, dry_run=dry_run)
    LOG.info(f"Rsync to the delivery server running as job {slurm_id}")
    # Register the SLURM job with Trailblazer so the transfer can be tracked.
    rsync_api.add_to_trailblazer_api(
        tb_api=tb_api, slurm_job_id=slurm_id, ticket=ticket, dry_run=dry_run
    )


# NOTE(review): diff artifact — the removed `concatenate` command and the added
# `case` command are interleaved in this span; their docstrings are spliced into
# each other and the span is not valid Python as shown. See the annotations below
# for which lines belong to which version.
@deliver.command(name="concatenate")
@deliver.command(
    name="case",
    help="Deliver all case files based on delivery type to the customer inbox on the HPC "
    "and start an Rsync job to clinical-delivery. "
    "NOTE: the dry-run flag will copy files to the customer inbox on Hasta, "
    "but will not perform the Rsync job.",
)
@click.pass_obj
@click.option(
    "-c",
    "--case-id",
    required=True,
    help="Deliver the files for a specific case.",
)
@DELIVERY_TYPE
@DRY_RUN
@TICKET_ID_ARG
@click.pass_context
# Removed signature (old `concatenate` command):
def concatenate(context: click.Context, ticket: str, dry_run: bool):
    """The fastq files in the folder generated using "cg deliver analysis"
    will be concatenated into one forward and one reverse fastq file
# Added signature (new `case` command):
def deliver_case(
    context: CGConfig,
    case_id: str,
    delivery_type: FileDeliveryOption,
    dry_run: bool,
):
    """
    cg_context: CGConfig = context.obj
    deliver_ticket_api = DeliverTicketAPI(config=cg_context)
    deliver_ticket_api.concatenate_fastq_files(ticket=ticket, dry_run=dry_run)
    Deliver all case files based on delivery type to the customer inbox on the HPC
    """
    # New body: resolve the case, build the matching delivery service, deliver.
    inbox: str = context.delivery_path
    service_builder: DeliveryServiceFactory = context.delivery_service_factory
    case: Case = context.status_db.get_case_by_internal_id(internal_id=case_id)
    if not case:
        LOG.error(f"Could not find case with id {case_id}")
        return
    # Explicit --delivery-type wins; otherwise fall back to the case's stored type.
    delivery_service: DeliverFilesService = service_builder.build_delivery_service(
        delivery_type=delivery_type if delivery_type else case.data_delivery,
        workflow=case.data_analysis,
    )
    delivery_service.deliver_files_for_case(
        case=case, delivery_base_path=Path(inbox), dry_run=dry_run
    )


# NOTE(review): diff artifact — removed (-) and added (+) lines of the `ticket`
# command are interleaved in this span (the parameter list repeats `context`,
# `delivery_type` and `dry_run`, and two docstrings are spliced together), so it
# is not valid Python as shown. Annotations below mark old vs new lines.
@deliver.command(name="ticket")
@deliver.command(
    name="ticket",
    help="Deliver all case files for cases in a ticket based on delivery type to the customer inbox on the HPC "
    "and start an Rsync job to clinical-delivery. "
    "NOTE: the dry-run flag will copy files to the customer inbox on Hasta, "
    "but will not perform the Rsync job.",
)
@click.pass_obj
@TICKET_ID_ARG
@DELIVERY_TYPE
@DRY_RUN
@FORCE_ALL
@IGNORE_MISSING_BUNDLES
@click.option(
    "-t",
    "--ticket",
    type=str,
    help="Deliver and rsync the files for ALL cases connected to a ticket",
    required=True,
)
@click.pass_context
def deliver_ticket(
    # Removed parameters (old version):
    context: click.Context,
    delivery_type: list[str],
    dry_run: bool,
    force_all: bool,
    # Added parameters (new version):
    context: CGConfig,
    ticket: str,
    ignore_missing_bundles: bool,
    delivery_type: FileDeliveryOption,
    dry_run: bool,
):
    """Will first collect hard links in the customer inbox then
    concatenate fastq files if needed and finally send the folder
    from customer inbox hasta to the customer inbox on the delivery server
    """
    # Old body (removed): delegated to DeliverTicketAPI and re-invoked other commands.
    cg_context: CGConfig = context.obj
    deliver_ticket_api = DeliverTicketAPI(config=cg_context)
    is_upload_needed = deliver_ticket_api.check_if_upload_is_needed(ticket=ticket)
    if is_upload_needed or force_all:
        LOG.info("Delivering files to customer inbox on the HPC")
        context.invoke(
            deliver_analysis,
            delivery_type=delivery_type,
            dry_run=dry_run,
            force_all=force_all,
            ticket=ticket,
            ignore_missing_bundles=ignore_missing_bundles,
        )
    else:
        LOG.info("Files already delivered to customer inbox on the HPC")
    Deliver all case files based on delivery type to the customer inbox on the HPC for cases connected to a ticket.
    """
    # New body (added): one delivery service handles the whole ticket.
    inbox: str = context.delivery_path
    service_builder: DeliveryServiceFactory = context.delivery_service_factory
    cases: list[Case] = context.status_db.get_cases_by_ticket_id(ticket_id=ticket)
    if not cases:
        LOG.error(f"Could not find case connected to ticket {ticket}")
        return
    # Delivery type/workflow are taken from the first case on the ticket.
    delivery_service: DeliverFilesService = service_builder.build_delivery_service(
        delivery_type=delivery_type if delivery_type else cases[0].data_delivery,
        workflow=cases[0].data_analysis,
    )
    delivery_service.deliver_files_for_ticket(
        ticket_id=ticket, delivery_base_path=Path(inbox), dry_run=dry_run
    )

    # Trailing old lines (removed in this commit):
    deliver_ticket_api.report_missing_samples(ticket=ticket, dry_run=dry_run)
    context.invoke(rsync, ticket=ticket, dry_run=dry_run)

@deliver.command(name="auto-raw-data")
@click.pass_obj
@DRY_RUN
def deliver_auto_raw_data(context: CGConfig, dry_run: bool):
    """Start upload of all not previously uploaded fastq-workflow cases to clinical-delivery.

    Steps:
    1. get all analyses for the fastq (raw data) workflow that need to be uploaded
    2. deliver each case's files to the customer inbox on the HPC and start Rsync
    3. record the upload-started timestamp per analysis (cleared again on failure)
    """
    # Defect fixed: the original stacked two string literals (old + new docstring);
    # the second was a dead no-op expression. They are merged into one docstring above.
    service_builder: DeliveryServiceFactory = context.delivery_service_factory
    analyses: list[Analysis] = context.analysis_service.get_analyses_to_upload_for_workflow(
        workflow=Workflow.FASTQ
    )
    deliver_raw_data_for_analyses(
        analyses=analyses,
        status_db=context.status_db,
        delivery_path=Path(context.delivery_path),
        service_builder=service_builder,
        dry_run=dry_run,
    )
46 changes: 46 additions & 0 deletions cg/cli/deliver/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import logging
from datetime import datetime
from pathlib import Path

from cg.constants import Workflow
from cg.services.deliver_files.deliver_files_service.deliver_files_service import (
DeliverFilesService,
)
from cg.services.deliver_files.deliver_files_service.deliver_files_service_factory import (
DeliveryServiceFactory,
)
from cg.store.models import Case, Analysis
from cg.store.store import Store


LOG = logging.getLogger(__name__)


def deliver_raw_data_for_analyses(
    analyses: list[Analysis],
    status_db: Store,
    delivery_path: Path,
    service_builder: DeliveryServiceFactory,
    dry_run: bool,
):
    """Deliver raw data for a list of analyses.

    Best-effort batch: a failure for one analysis is logged, its upload-started
    timestamp is cleared so the analysis can be retried on a later run, and
    processing continues with the next analysis.

    Args:
        analyses: analyses whose case files should be delivered.
        status_db: store used to record per-analysis upload progress.
        delivery_path: base path of the customer inbox on the HPC.
        service_builder: factory building the delivery service for each case.
        dry_run: forwarded to the delivery service.
    """
    for analysis in analyses:
        try:
            case: Case = analysis.case
            delivery_service: DeliverFilesService = service_builder.build_delivery_service(
                delivery_type=case.data_delivery,
                workflow=Workflow.FASTQ,
            )
            delivery_service.deliver_files_for_case(
                case=case, delivery_base_path=delivery_path, dry_run=dry_run
            )
            # NOTE(review): the timestamp is set only after delivery succeeded,
            # despite the "started_at" name — confirm this is intended.
            status_db.update_analysis_upload_started_at(
                analysis_id=analysis.id, upload_started_at=datetime.now()
            )
        # Broad catch is deliberate: one bad analysis must not stop the batch.
        except Exception as error:
            # Clear the marker so this analysis is picked up again next run.
            status_db.update_analysis_upload_started_at(
                analysis_id=analysis.id, upload_started_at=None
            )
            LOG.error(f"Could not deliver files for analysis {analysis.id}: {error}")
3 changes: 0 additions & 3 deletions cg/cli/upload/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import click

from cg.cli.upload.clinical_delivery import auto_fastq, upload_clinical_delivery
from cg.cli.upload.coverage import upload_coverage
from cg.cli.upload.delivery_report import upload_delivery_report_to_scout
from cg.cli.upload.fohm import fohm
Expand Down Expand Up @@ -132,15 +131,13 @@ def upload_all_completed_analyses(context: click.Context, workflow: Workflow = N
sys.exit(exit_code)


upload.add_command(auto_fastq)
upload.add_command(create_scout_load_config)
upload.add_command(fohm)
upload.add_command(nipt)
upload.add_command(process_solved)
upload.add_command(processed_solved)
upload.add_command(upload_available_observations_to_loqusdb)
upload.add_command(upload_case_to_scout)
upload.add_command(upload_clinical_delivery)
upload.add_command(upload_coverage)
upload.add_command(upload_delivery_report_to_scout)
upload.add_command(upload_genotypes)
Expand Down
Loading

0 comments on commit 6a221f1

Please sign in to comment.