Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new delivery flow) #3598

Merged
merged 85 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from 76 commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
0a96f67
initial commit
ChrOertlin Aug 19, 2024
f45df6e
Merge branch 'master' into dev-new-delivery-service
ChrOertlin Aug 21, 2024
5857eab
add(fetch delivery files service) (#3601)
ChrOertlin Aug 21, 2024
44bbe28
fix imports
ChrOertlin Aug 21, 2024
4dd94c8
initial commit
ChrOertlin Aug 21, 2024
c09db2f
fixes
ChrOertlin Aug 21, 2024
f449da4
fix
ChrOertlin Aug 21, 2024
33456ef
add test
ChrOertlin Aug 21, 2024
f175540
fix test
ChrOertlin Aug 22, 2024
511ed7f
add exists assertion
ChrOertlin Aug 22, 2024
9063fc8
add func call
ChrOertlin Aug 22, 2024
8a64d3b
initial commit
ChrOertlin Aug 22, 2024
83d6c9f
Update tests/fixture_plugins/delivery_fixtures/delivery_files_models_…
ChrOertlin Aug 22, 2024
bacaa11
Update tests/fixture_plugins/delivery_fixtures/delivery_files_models_…
ChrOertlin Aug 22, 2024
0533071
Apply suggestions from code review
ChrOertlin Aug 22, 2024
3fbc9ef
review
ChrOertlin Aug 22, 2024
4eca28e
Merge branch 'add-file-mover' into add-file-reformatters
ChrOertlin Aug 22, 2024
086c74d
feat(delivery file mover service) (#3622)
ChrOertlin Aug 22, 2024
f83f694
conflicsts
ChrOertlin Aug 22, 2024
652938e
refactor logic
ChrOertlin Aug 26, 2024
1aad85b
fix formatter
ChrOertlin Aug 26, 2024
6cabd68
fix concatenation
ChrOertlin Aug 26, 2024
56d7ec1
tests
ChrOertlin Aug 26, 2024
2845d05
add tests for formatters
ChrOertlin Aug 27, 2024
481c518
add test
ChrOertlin Aug 27, 2024
293ee09
review comments
ChrOertlin Aug 27, 2024
273f19f
setup factory
ChrOertlin Aug 27, 2024
981395f
add supporting functions
ChrOertlin Aug 27, 2024
db99c05
make super call
ChrOertlin Aug 27, 2024
e187522
Merge branch 'add-file-reformatters' into add-delivery-services
ChrOertlin Aug 27, 2024
6ab6d5e
changes
ChrOertlin Aug 27, 2024
93056cd
simplifiy formatter
ChrOertlin Aug 27, 2024
c13ce34
add docstring
ChrOertlin Aug 27, 2024
5d1ddb9
solve imports
ChrOertlin Aug 27, 2024
ebcf010
Merge branch 'add-file-reformatters' into add-delivery-services
ChrOertlin Aug 27, 2024
25e8e15
add factory
ChrOertlin Aug 27, 2024
e20ccb0
add builder test
ChrOertlin Aug 27, 2024
79d5ca3
add(delivery file formatter) (#3626)
ChrOertlin Aug 29, 2024
b1e1e2b
merge conflicts
ChrOertlin Aug 29, 2024
e490438
cleanup
ChrOertlin Aug 29, 2024
c751d50
fix doc
ChrOertlin Aug 29, 2024
2bda145
cleanup
ChrOertlin Aug 29, 2024
da1416d
Apply suggestions from code review
ChrOertlin Aug 29, 2024
befb44b
typehints
ChrOertlin Aug 29, 2024
a63e099
merge
ChrOertlin Aug 29, 2024
6f8c117
rename class
ChrOertlin Aug 29, 2024
40f7ac3
missing typehints
ChrOertlin Aug 29, 2024
f04e95f
add(delivery service factory) (#3647)
ChrOertlin Aug 29, 2024
01a9187
Merge branch 'dev-new-delivery-service' of https://github.com/Clinica…
ChrOertlin Sep 2, 2024
df06b43
fix(new delivery service calls in CLI) (#3673)
ChrOertlin Sep 3, 2024
091b238
Merge branch 'master' into dev-new-delivery-service
ChrOertlin Sep 3, 2024
e4e5f52
cases have no workflows
ChrOertlin Sep 3, 2024
c5946a4
Merge branch 'dev-new-delivery-service' of https://github.com/Clinica…
ChrOertlin Sep 3, 2024
cd66e45
add delivery type validation in factory
ChrOertlin Sep 3, 2024
9f9af6c
Merge branch 'master' into dev-new-delivery-service
ChrOertlin Sep 3, 2024
3d9fa82
Merge branch 'master' into dev-new-delivery-service
ChrOertlin Sep 4, 2024
3cbc21d
add error for analysis bundles
ChrOertlin Sep 4, 2024
7441760
hanlde error better rysnc
ChrOertlin Sep 4, 2024
5d5dc0a
remove concatenation from mutant
ChrOertlin Sep 4, 2024
e6055b2
add delete ticket folder when already exists
ChrOertlin Sep 4, 2024
3290091
fix mutant factory test
ChrOertlin Sep 4, 2024
e089a7f
fix(clinical delivery call) (#3688)
ChrOertlin Sep 5, 2024
c762c03
refactor(rsync api) (#3694)
ChrOertlin Sep 6, 2024
c2efaa3
Merge branch 'master' into dev-new-delivery-service
ChrOertlin Sep 9, 2024
6ceb0ae
add(CLI automatic delivery fastq workflow) (#3692)
ChrOertlin Sep 9, 2024
d7553d0
fix
ChrOertlin Sep 9, 2024
29053bc
remove unused var
ChrOertlin Sep 9, 2024
0c98d9e
fix bug
ChrOertlin Sep 9, 2024
57319e6
simplify rsync
ChrOertlin Sep 9, 2024
48d01c4
revert rsync changes
ChrOertlin Sep 9, 2024
dde6123
handle houskeeper error
ChrOertlin Sep 9, 2024
eda8de2
homogenise naming
ChrOertlin Sep 9, 2024
9433507
clarification docstring
ChrOertlin Sep 9, 2024
534534a
clear imports
ChrOertlin Sep 9, 2024
f01ae43
function hidden
ChrOertlin Sep 9, 2024
276f0b1
add cli help
ChrOertlin Sep 9, 2024
c8c0fa3
Apply suggestions from code review
ChrOertlin Sep 10, 2024
00d47ac
named args
ChrOertlin Sep 10, 2024
f4108f3
Merge branch 'master' into dev-new-delivery-service
ChrOertlin Sep 10, 2024
e2e4ee5
Apply suggestions from code review
ChrOertlin Sep 10, 2024
786a647
add delivery type sanitasion
ChrOertlin Sep 10, 2024
5484dd7
typehints
ChrOertlin Sep 10, 2024
900e3f9
Apply suggestions from code review
ChrOertlin Sep 10, 2024
da8d117
review
ChrOertlin Sep 10, 2024
ec4b654
Update tests/services/file_delivery/rsync_service/conftest.py
ChrOertlin Sep 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 107 additions & 140 deletions cg/cli/deliver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,44 +6,33 @@
import click

from cg.apps.tb import TrailblazerAPI
from cg.cli.deliver.utils import deliver_raw_data_for_analyses
from cg.cli.utils import CLICK_CONTEXT_SETTINGS
from cg.constants import Workflow
from cg.constants.cli_options import DRY_RUN
from cg.constants.delivery import PIPELINE_ANALYSIS_OPTIONS, PIPELINE_ANALYSIS_TAG_MAP
from cg.meta.deliver import DeliverAPI, DeliverTicketAPI
from cg.meta.rsync.rsync_api import RsyncAPI
from cg.constants.delivery import FileDeliveryOption
from cg.services.deliver_files.delivery_rsync_service.delivery_rsync_service import (
DeliveryRsyncService,
)
from cg.models.cg_config import CGConfig
from cg.services.fastq_concatenation_service.fastq_concatenation_service import (
FastqConcatenationService,
from cg.services.deliver_files.deliver_files_service.deliver_files_service import (
DeliverFilesService,
)
from cg.services.deliver_files.deliver_files_service.deliver_files_service_factory import (
DeliveryServiceFactory,
)
from cg.store.models import Case
from cg.store.store import Store
from cg.store.models import Case, Analysis

LOG = logging.getLogger(__name__)

DELIVERY_TYPE = click.option(
"-d",
"--delivery-type",
multiple=True,
type=click.Choice(PIPELINE_ANALYSIS_OPTIONS),
required=True,
)
FORCE_ALL = click.option(
"--force-all",
help=(
"Force delivery of all sample files "
"- disregarding of amount of reads or previous deliveries"
),
is_flag=True,
)
TICKET_ID_ARG = click.argument("ticket", type=str, required=True)

IGNORE_MISSING_BUNDLES = click.option(
"-i",
"--ignore-missing-bundles",
help="Ignore errors due to missing case bundles",
is_flag=True,
default=False,
multiple=False,
type=click.Choice(choices=[option for option in FileDeliveryOption]),
required=False,
)
TICKET_ID_ARG = click.option("-t", "--ticket", type=str, required=True)


@click.group(context_settings=CLICK_CONTEXT_SETTINGS)
Expand All @@ -52,141 +41,119 @@ def deliver():
LOG.info("Running CG deliver")


@deliver.command(name="analysis")
@DRY_RUN
@DELIVERY_TYPE
@click.option("-c", "--case-id", help="Deliver the files for a specific case")
@click.option(
"-t", "--ticket", type=str, help="Deliver the files for ALL cases connected to a ticket"
)
@FORCE_ALL
@IGNORE_MISSING_BUNDLES
@click.pass_obj
def deliver_analysis(
context: CGConfig,
case_id: str | None,
ticket: str | None,
delivery_type: list[str],
dry_run: bool,
force_all: bool,
ignore_missing_bundles: bool,
):
"""Deliver analysis files to customer inbox

Files can be delivered either on case level or for all cases connected to a ticket.
Any of those needs to be specified.
"""
if not (case_id or ticket):
LOG.info("Please provide a case-id or ticket-id")
return

inbox: str = context.delivery_path
if not inbox:
LOG.info("Please specify the root path for where files should be delivered")
return

status_db: Store = context.status_db
for delivery in delivery_type:
deliver_api = DeliverAPI(
store=status_db,
hk_api=context.housekeeper_api,
case_tags=PIPELINE_ANALYSIS_TAG_MAP[delivery]["case_tags"],
sample_tags=PIPELINE_ANALYSIS_TAG_MAP[delivery]["sample_tags"],
project_base_path=Path(inbox),
delivery_type=delivery,
force_all=force_all,
ignore_missing_bundles=ignore_missing_bundles,
fastq_file_service=FastqConcatenationService(),
)
deliver_api.set_dry_run(dry_run)
cases: list[Case] = []
if case_id:
case_obj: Case = status_db.get_case_by_internal_id(internal_id=case_id)
if not case_obj:
LOG.warning(f"Could not find case {case_id}")
return
cases.append(case_obj)
else:
cases: list[Case] = status_db.get_cases_by_ticket_id(ticket_id=ticket)
if not cases:
LOG.warning(f"Could not find cases for ticket {ticket}")
return

for case_obj in cases:
deliver_api.deliver_files(case_obj=case_obj)


@deliver.command(name="rsync")
@DRY_RUN
@TICKET_ID_ARG
@click.pass_obj
@TICKET_ID_ARG
@DRY_RUN
def rsync(context: CGConfig, ticket: str, dry_run: bool):
"""The folder generated using the "cg deliver analysis" command will be
rsynced with this function to the customers inbox on the delivery server
"""
tb_api: TrailblazerAPI = context.trailblazer_api
rsync_api: RsyncAPI = RsyncAPI(config=context)
slurm_id = rsync_api.run_rsync_on_slurm(ticket=ticket, dry_run=dry_run)
rsync_api: DeliveryRsyncService = context.delivery_rsync_service
slurm_id = rsync_api.run_rsync_for_ticket(ticket=ticket, dry_run=dry_run)
LOG.info(f"Rsync to the delivery server running as job {slurm_id}")
rsync_api.add_to_trailblazer_api(
tb_api=tb_api, slurm_job_id=slurm_id, ticket=ticket, dry_run=dry_run
)


@deliver.command(name="concatenate")
@deliver.command(
name="case",
help="Deliver all case files based on delivery type to the customer inbox on the HPC "
"and start an Rsync job to clinical-delivery. "
"NOTE: the dry-run flag will copy files to the customer inbox on Hasta, "
"but will not perform the Rsync job.",
)
@click.pass_obj
@click.option(
"-c",
"--case-id",
required=True,
help="Deliver the files for a specific case.",
)
@DELIVERY_TYPE
@DRY_RUN
@TICKET_ID_ARG
@click.pass_context
def concatenate(context: click.Context, ticket: str, dry_run: bool):
"""The fastq files in the folder generated using "cg deliver analysis"
will be concatenated into one forward and one reverse fastq file
def deliver_case(
context: CGConfig,
case_id: str,
delivery_type: FileDeliveryOption,
dry_run: bool,
):
"""
cg_context: CGConfig = context.obj
deliver_ticket_api = DeliverTicketAPI(config=cg_context)
deliver_ticket_api.concatenate_fastq_files(ticket=ticket, dry_run=dry_run)
Deliver all case files based on delivery type to the customer inbox on the HPC
"""
inbox: str = context.delivery_path
service_builder: DeliveryServiceFactory = context.delivery_service_factory
case: Case = context.status_db.get_case_by_internal_id(internal_id=case_id)
if not case:
LOG.error(f"Could not find case with id {case_id}")
return
delivery_service: DeliverFilesService = service_builder.build_delivery_service(
delivery_type=delivery_type if delivery_type else case.data_delivery,
workflow=case.data_analysis,
)
delivery_service.deliver_files_for_case(
case=case, delivery_base_path=Path(inbox), dry_run=dry_run
)


@deliver.command(name="ticket")
@deliver.command(
name="ticket",
help="Deliver all case files for cases in a ticket based on delivery type to the customer inbox on the HPC "
"and start an Rsync job to clinical-delivery. "
"NOTE: the dry-run flag will copy files to the customer inbox on Hasta, "
"but will not perform the Rsync job.",
)
@click.pass_obj
@TICKET_ID_ARG
@DELIVERY_TYPE
@DRY_RUN
@FORCE_ALL
@IGNORE_MISSING_BUNDLES
@click.option(
"-t",
"--ticket",
type=str,
help="Deliver and rsync the files for ALL cases connected to a ticket",
required=True,
)
@click.pass_context
def deliver_ticket(
context: click.Context,
delivery_type: list[str],
dry_run: bool,
force_all: bool,
context: CGConfig,
ticket: str,
ignore_missing_bundles: bool,
delivery_type: FileDeliveryOption,
dry_run: bool,
):
"""Will first collect hard links in the customer inbox then
concatenate fastq files if needed and finally send the folder
from customer inbox hasta to the customer inbox on the delivery server
"""
cg_context: CGConfig = context.obj
deliver_ticket_api = DeliverTicketAPI(config=cg_context)
is_upload_needed = deliver_ticket_api.check_if_upload_is_needed(ticket=ticket)
if is_upload_needed or force_all:
LOG.info("Delivering files to customer inbox on the HPC")
context.invoke(
deliver_analysis,
delivery_type=delivery_type,
dry_run=dry_run,
force_all=force_all,
ticket=ticket,
ignore_missing_bundles=ignore_missing_bundles,
)
else:
LOG.info("Files already delivered to customer inbox on the HPC")
Deliver all case files based on delivery type to the customer inbox on the HPC for cases connected to a ticket.
"""
inbox: str = context.delivery_path
service_builder: DeliveryServiceFactory = context.delivery_service_factory
cases: list[Case] = context.status_db.get_cases_by_ticket_id(ticket_id=ticket)
if not cases:
LOG.error(f"Could not find case connected to ticket {ticket}")
return
delivery_service: DeliverFilesService = service_builder.build_delivery_service(
delivery_type=delivery_type if delivery_type else cases[0].data_delivery,
workflow=cases[0].data_analysis,
)
delivery_service.deliver_files_for_ticket(
ticket_id=ticket, delivery_base_path=Path(inbox), dry_run=dry_run
)

deliver_ticket_api.report_missing_samples(ticket=ticket, dry_run=dry_run)
context.invoke(rsync, ticket=ticket, dry_run=dry_run)

@deliver.command(name="auto-raw-data")
@click.pass_obj
@DRY_RUN
def deliver_auto_raw_data(context: CGConfig, dry_run: bool):
"""
Deliver all case files for the raw data workflow to the customer inbox on the HPC and start a Rsync job.
1. get all cases with analysis type fastq that need to be delivered
2. check if their upload has started
3. if not, start the upload
4. update the uploaded at field
"""
"""Starts upload of all not previously uploaded cases with analysis type fastq to
clinical-delivery."""
service_builder: DeliveryServiceFactory = context.delivery_service_factory
analyses: list[Analysis] = context.analysis_service.get_analyses_to_upload_for_workflow(
workflow=Workflow.FASTQ
)
deliver_raw_data_for_analyses(
analyses=analyses,
status_db=context.status_db,
delivery_path=Path(context.delivery_path),
service_builder=service_builder,
dry_run=dry_run,
)
46 changes: 46 additions & 0 deletions cg/cli/deliver/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import logging
from datetime import datetime
from pathlib import Path

from cg.constants import Workflow
from cg.services.deliver_files.deliver_files_service.deliver_files_service import (
DeliverFilesService,
)
from cg.services.deliver_files.deliver_files_service.deliver_files_service_factory import (
DeliveryServiceFactory,
)
from cg.store.models import Case, Analysis
from cg.store.store import Store


LOG = logging.getLogger(__name__)


def deliver_raw_data_for_analyses(
analyses: list[Analysis],
status_db: Store,
delivery_path: Path,
service_builder: DeliveryServiceFactory,
dry_run: bool,
):
"""Deliver raw data for a list of analyses"""
for analysis in analyses:
try:
case: Case = analysis.case
delivery_service: DeliverFilesService = service_builder.build_delivery_service(
delivery_type=case.data_delivery,
workflow=Workflow.FASTQ,
)

delivery_service.deliver_files_for_case(
case=case, delivery_base_path=delivery_path, dry_run=dry_run
)
status_db.update_analysis_upload_started_at(
analysis_id=analysis.id, upload_started_at=datetime.now()
)
except Exception as error:
status_db.update_analysis_upload_started_at(
analysis_id=analysis.id, upload_started_at=None
)
LOG.error(f"Could not deliver files for analysis {analysis.id}: {error}")
continue
3 changes: 0 additions & 3 deletions cg/cli/upload/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import click

from cg.cli.upload.clinical_delivery import auto_fastq, upload_clinical_delivery
from cg.cli.upload.coverage import upload_coverage
from cg.cli.upload.delivery_report import upload_delivery_report_to_scout
from cg.cli.upload.fohm import fohm
Expand Down Expand Up @@ -132,15 +131,13 @@ def upload_all_completed_analyses(context: click.Context, workflow: Workflow = N
sys.exit(exit_code)


upload.add_command(auto_fastq)
upload.add_command(create_scout_load_config)
upload.add_command(fohm)
upload.add_command(nipt)
upload.add_command(process_solved)
upload.add_command(processed_solved)
upload.add_command(upload_available_observations_to_loqusdb)
upload.add_command(upload_case_to_scout)
upload.add_command(upload_clinical_delivery)
upload.add_command(upload_coverage)
upload.add_command(upload_delivery_report_to_scout)
upload.add_command(upload_genotypes)
Expand Down
Loading
Loading