-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain.nf
550 lines (494 loc) · 19.9 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
#!/usr/bin/env nextflow
import groovy.json.JsonBuilder
nextflow.enable.dsl = 2
include { fastq_ingress } from "./lib/ingress"
include { configure_igv } from "./lib/common"
include { pipeline as variantCallingPipeline } from "./modules/local/variant-calling"
include {
pipeline as deNovoPipeline_asm; pipeline as deNovoPipeline_spoa;
} from "./modules/local/de-novo"
OPTIONAL_FILE = file("$projectDir/data/OPTIONAL_FILE")
process getVersions {
label "wfamplicon"
cpus 1
memory "2 GB"
output: path "versions.txt"
script:
"""
python --version | tr ' ' ',' | sed 's/P/p/' > versions.txt
fastcat --version | sed 's/^/fastcat,/' >> versions.txt
python -c "import ezcharts; print(f'ezcharts,{ezcharts.__version__}')" >> versions.txt
python -c "import dominate; print(f'dominate,{dominate.__version__}')" >> versions.txt
python -c "import numpy; print(f'numpy,{numpy.__version__}')" >> versions.txt
python -c "import pandas; print(f'pandas,{pandas.__version__}')" >> versions.txt
python -c "import pysam; print(f'pysam,{pysam.__version__}')" >> versions.txt
python -c "import si_prefix; print(f'si-prefix,{si_prefix.__version__}')" >> versions.txt
seqkit version | tr ' ' ',' >> versions.txt
printf "porechop,%s\\n" \$(porechop --version) >> versions.txt
samtools --version | head -n1 | tr ' ' ',' >> versions.txt
printf "minimap2,%s\\n" \$(minimap2 --version) >> versions.txt
mosdepth --version | tr ' ' ',' >> versions.txt
printf "miniasm,%s\\n" \$(miniasm -V) >> versions.txt
printf "racon,%s\\n" \$(racon --version) >> versions.txt
printf "csvtk,%s\\n" \$(csvtk version | grep -oP '\\d+\\.\\d+.\\d+') >> versions.txt
"""
}
process addMedakaToVersionsFile {
publishDir "${params.out_dir}", mode: 'copy', pattern: "versions.txt"
label "medaka"
cpus 1
memory "2 GB"
input: path "old_versions.txt"
output: path "versions.txt"
script:
"""
cp old_versions.txt versions.txt
medaka --version | tr ' ' ',' >> versions.txt
"""
}
process getParams {
label "wfamplicon"
publishDir "${params.out_dir}", mode: 'copy', pattern: "params.json"
cpus 1
memory "2 GB"
output:
path "params.json"
script:
String paramsJSON = new JsonBuilder(params).toPrettyString()
"""
# Output nextflow params object to JSON
echo '$paramsJSON' > params.json
"""
}
process downsampleReads {
label "wfamplicon"
cpus Math.min(params.threads, 3)
memory "2 GB"
input:
tuple val(meta), path("reads.fastq.gz")
val n_reads
output: tuple val(meta), path("downsampled.fastq.gz")
script:
int bgzip_threads = task.cpus == 1 ? 1 : task.cpus - 1
"""
seqkit sample -2 reads.fastq.gz -n $n_reads \
| bgzip -@ $bgzip_threads > downsampled.fastq.gz
"""
}
/*
Instead of downsampling randomly, this process subsets input reads based on length.
Depending on parameters, it drops a fraction (e.g. `0.05` for 5%) of longest reads and
then selects a number of the next-longest reads (if `take_longest_remaining_reads` is
`true`; if `false`, it will randomly downsample the reads after dropping the longest).
Selected reads will always be sorted by length.
*/
process subsetReads {
label "wfamplicon"
cpus 1
memory "2 GB"
input:
tuple val(meta), path("reads.fastq.gz"), path("fastcat-stats")
val drop_longest_frac
val take_longest_remaining_reads
val n_reads
output: tuple val(meta), path("subset.fastq.gz")
script:
String take_longest_remaining_arg = ""
if (take_longest_remaining_reads) {
take_longest_remaining_arg = "--take-longest-remaining"
}
"""
echo $meta.alias # sometimes useful for debugging
workflow-glue subset_reads \
fastcat-stats/per-read-stats.tsv.gz \
$drop_longest_frac \
$n_reads \
$take_longest_remaining_arg \
> target_ids.txt
# only attempt to subset reads if neither the FASTQ or the file with target IDs is
# empty (`samtools fqidx` would fail otherwise)
if [[ ( -n \$(zcat reads.fastq.gz | head -c1) ) && ( -s target_ids.txt ) ]]; then
samtools fqidx reads.fastq.gz -r target_ids.txt | bgzip > subset.fastq.gz
else
echo -n | bgzip > subset.fastq.gz
fi
"""
}
process porechop {
label "wfamplicon"
cpus Math.min(params.threads, 5)
memory {
// `porechop` quite annoyingly loads the whole FASTQ into memory
// (https://github.com/rrwick/Porechop/issues/77) and uses up to 4x the size of
// the `.fastq.gz` file (depending on compression ratio); we give another factor
// of 2 for extra margin to be on the safe side
def fastq_size = fastq.size()
fastq_size > 2e9 ? "32 GB" : (fastq_size > 2.5e8 ? "16 GB" : "2 GB")
}
input: tuple val(meta), path(fastq, stageAs: "reads.fastq.gz")
output:
tuple val(meta), path("porechopped.fastq.gz"), emit: seqs
tuple val(meta), path("porechopped-per-file-stats.tsv"), emit: stats
script:
int porechop_threads = Math.max(task.cpus - 2, 1)
// run fastcat on porechopped reads so that we can include the post-trimming stats
// in the report
"""
(
# only run porechop if the FASTQ isn't empty
if [[ -n \$(zcat reads.fastq.gz | head -c1) ]]; then
porechop -i reads.fastq.gz -t $porechop_threads --discard_middle
else
echo -n
fi
) \
| fastcat /dev/stdin -s $meta.alias -f porechopped-per-file-stats.tsv \
| bgzip > porechopped.fastq.gz
"""
}
process collectFilesInDir {
label "wfamplicon"
cpus 1
memory "2 GB"
input: tuple path("staging_dir/*"), val(dirname)
output: path(dirname)
script:
"""
mv staging_dir $dirname
"""
}
process concatTSVs {
label "wfamplicon"
cpus 1
memory "2 GB"
input:
tuple val(meta), path("input/f*.tsv")
val fname
output: tuple val(meta), path(fname)
script:
"""
csvtk concat -t input/* > $fname
"""
}
process makeReport {
label "wfamplicon"
publishDir "${params.out_dir}", mode: 'copy', pattern: "wf-amplicon-report.html"
cpus 1
memory "8 GB"
input:
path "data/*"
path ref, stageAs: "ref/*"
path sample_sheet, stageAs: "sample_sheet/*"
path "versions.txt"
path "params.json"
output:
path "wf-amplicon-report.html"
script:
// we need to check `.fileName.name` here because `TaskPath.name` includes the
// staging directory (https://github.com/nextflow-io/nextflow/issues/3574)
String sample_sheet_arg = sample_sheet.fileName.name == OPTIONAL_FILE.name ? "" : \
"--sample-sheet \"$sample_sheet\""
String ref_arg = ref.fileName.name == OPTIONAL_FILE.name ? "" : \
"--reference \"$ref\""
"""
workflow-glue report \
--report-fname wf-amplicon-report.html \
--data data \
$ref_arg \
$sample_sheet_arg \
--downsampling-size $params.reads_downsampling_size \
--versions versions.txt \
--params params.json \
--wf-version $workflow.manifest.version
"""
}
// process to combine all de-novo consensus seqs into a single FASTA file for use as
// reference in the IGV config
process catFastqIntoFasta {
label "wfamplicon"
cpus params.threads
cpus 1
memory "2 GB"
input: path "cons_seqs/seq*.fastq"
output:
path("all-consensus-seqs.fasta"), emit: fasta
path("all-consensus-seqs.fasta.fai"), emit: index
script:
"""
cat cons_seqs/* | seqkit fq2fa > all-consensus-seqs.fasta
samtools faidx all-consensus-seqs.fasta
"""
}
// See https://github.com/nextflow-io/nextflow/issues/1636. This is the only way to
// publish files from a workflow whilst decoupling the publish from the process steps.
// The process takes a tuple containing the filename and the name of a sub-directory to
// put the file into. If the latter is `null`, puts it into the top-level directory.
process publish {
// publish inputs to output directory
label "wfamplicon"
cpus 1
memory "2 GB"
publishDir (
params.out_dir,
mode: "copy",
saveAs: { dirname ? "$dirname/$fname" : fname }
)
input:
tuple path(fname), val(dirname)
output:
path fname
"""
"""
}
// workflow module
workflow pipeline {
take:
ch_reads
main:
def software_versions = getVersions() | addMedakaToVersionsFile
def workflow_params = getParams()
// Make channel to hold the results required for the report. It will have the
// shape `[meta, file(s)]` where the second item can be a list of files or an
// individual file.
ch_results_for_report = Channel.empty()
// Make a channel for the files to be published. It will look like `[file, name
// of sub-dir to be published in | null]` (when `null`, the file will be
// published in the top level output directory).
ch_to_publish = Channel.empty()
// drop elements in `ch_reads` that only contain a metamap (this occurs when
// there was an entry in a sample sheet but no corresponding barcode dir)
ch_reads = ch_reads.filter { meta, reads, stats_dir -> reads }
// warn the user if overriding the basecall models found in the inputs
if (params.override_basecaller_cfg) {
log.warn \
"Overriding basecall model with '${params.override_basecaller_cfg}'."
}
// add fastcat stats of raw reads to channel with results for report
ch_results_for_report = ch_reads
| map { meta, reads, stats_dir -> [
meta,
*file(stats_dir.resolve("*.hist")),
file(stats_dir.resolve("per-file-stats.tsv")),
] }
// either subset reads (i.e. drop the longest and then take the next longest) or
// downsample naively or take all reads
if (params.drop_frac_longest_reads || params.take_longest_remaining_reads) {
// subset reads
ch_reads = subsetReads(
ch_reads,
params.drop_frac_longest_reads,
params.take_longest_remaining_reads,
params.reads_downsampling_size,
)
} else if (params.reads_downsampling_size) {
// downsample reads
ch_reads = downsampleReads(
ch_reads | map { meta, reads, stats -> [meta, reads] },
params.reads_downsampling_size
)
} else {
// take all reads
ch_reads = ch_reads.map { meta, reads, stats_dir -> [meta, reads] }
}
// trim adapters
ch_reads = porechop(ch_reads).seqs
// add the post-trim stats to the report channel
ch_results_for_report = ch_results_for_report.join(porechop.out.stats)
// parse the post-porechop fastcat per-file stats to get the number of reads
// after filtering for each sample (`splitCsv` actually works on lists as well;
// it will emit `[meta, row]` for each row in the CSV, but in this case there is
// only one row in each CSV file anyway)
ch_post_filter_n_reads = porechop.out.stats
.splitCsv(sep: "\t", header: true)
.map { meta, stats -> [meta, stats["n_seqs"] as int] }
// make sure that there are reads left after pre-processing and print a warning
// otherwise
ch_post_filter_n_reads
.collect { meta, n_seqs -> n_seqs }
// sum up the `n_seqs` column and print warning if the total is `0`
.map { if (it.sum() == 0) {
log.warn "No reads left after pre-processing; report will only show " +
"limited results. Perhaps consider relaxing the filtering " +
"criteria (`--min_read_qual` etc.)."
}
}
// drop samples with fewer than `params.min_n_reads` reads after filtering
ch_reads = ch_reads
| join(ch_post_filter_n_reads)
| filter { meta, reads, n_seqs -> n_seqs >= params.min_n_reads }
| map { meta, reads, n_seqs -> [meta, reads] }
def ref = OPTIONAL_FILE
def ref_idx = OPTIONAL_FILE
if (params.reference) {
// variant calling mode; make sure the ref file exists and run the variant
// calling pipeline
ref = file(params.reference, checkIfExists: true)
variantCallingPipeline(ch_reads, ref)
// add variant calling results to channels for the report + to publish
ch_results_for_report = ch_results_for_report
| join(variantCallingPipeline.out.mapping_stats, remainder: true)
| join(variantCallingPipeline.out.depth, remainder: true)
| join(variantCallingPipeline.out.variants, remainder: true)
// drop any `null`s from the joined list
| map { it - null }
// set `ref` to the sanitised reference
ref = variantCallingPipeline.out.sanitized_ref
ref_idx = variantCallingPipeline.out.sanitized_ref_index
ch_to_publish = ch_to_publish
| mix(
variantCallingPipeline.out.sanitized_ref | map { [it, null] },
variantCallingPipeline.out.sanitized_ref_index | map { [it, null] },
variantCallingPipeline.out.variants
| map { meta, vcf, idx -> [[vcf, idx], "$meta.alias/variants"] }
| transpose,
variantCallingPipeline.out.mapped
| map { meta, bam, bai -> [[bam, bai], "$meta.alias/alignments"] }
| transpose,
variantCallingPipeline.out.consensus
| map { meta, cons -> [cons, "$meta.alias/consensus"] },
)
// combine VCF and BAM files if requested
if (params.combine_results) {
ch_to_publish = ch_to_publish
| mix(
variantCallingPipeline.out.combined_vcfs | map { [it, null] }
| transpose,
variantCallingPipeline.out.combined_bams | map { [it, null] }
| transpose,
)
}
} else {
// de-novo consensus mode: run pipeline for de-novo consensus; we try
// `miniasm` first and then run `spoa` on all samples that failed
deNovoPipeline_asm(ch_reads, "miniasm")
deNovoPipeline_spoa(
deNovoPipeline_asm.out.metas_failed
// use `combine` instead of `join` here because `join` with `remainder:
// true` does not work as expected if one channel contains only the key
// (as is the case with the `metas_failed` channel here)
| combine(ch_reads, by: 0),
"spoa",
)
// combine the results for `miniasm` and `spoa`
ch_de_novo_results = deNovoPipeline_asm.out.passed
| mix(deNovoPipeline_spoa.out.passed)
| multiMap { meta, cons, bam, bai, bamstats, flagstat, depth ->
consensus: [meta, cons]
mapped: [meta, bam, bai]
for_report: [meta, bamstats, flagstat, depth]
}
// get the QC summary TSVs and concat them for each sample (if the `miniasm`
// and `spoa` pipelines were run for a sample, it will have two QC summary
// TSVs)
ch_concat_qc_summaries = concatTSVs(
deNovoPipeline_asm.out.qc_summaries
| mix(deNovoPipeline_spoa.out.qc_summaries)
| groupTuple,
"qc-summary.tsv"
)
// create a FASTA file containing the consensus seqs for all samples (this
// is needed as reference for IGV)
all_de_novo_cons_seqs = ch_de_novo_results.consensus
// use `collect` instead of `toSortedList` here as `collect` returns an
// empty channel if there are no elements in the input channel
| collect(flat: false)
| map { list -> list.sort { it[0].alias }.collect { it[1] } }
| catFastqIntoFasta
// add de-novo results to main results channel
ch_results_for_report = ch_results_for_report
| join(ch_de_novo_results.for_report, remainder: true)
| join(ch_concat_qc_summaries, remainder: true)
// drop any `null`s from the joined list
| map { it -> it.findAll { it } }
// add the consensus and alignments (re-aligned against the consensus) to
// the output channel for publishing
ch_to_publish = ch_to_publish
| mix(
ch_de_novo_results.consensus
| map { meta, cons -> [cons, "$meta.alias/consensus"] },
ch_de_novo_results.mapped
| map { meta, bam, bai -> [[bam, bai], "$meta.alias/alignments"] }
| transpose,
all_de_novo_cons_seqs.fasta | map { [it, null] },
all_de_novo_cons_seqs.index | map { [it, null] },
)
}
// collect files for report in directories (one dir per sample)
ch_results_for_report = ch_results_for_report
| map {
meta = it[0]
files = it[1..-1]
[files, meta.alias]
}
| collectFilesInDir
// create the report and add it to the publishing channel
makeReport(
ch_results_for_report | collect,
ref,
file(params.sample_sheet ?: OPTIONAL_FILE),
software_versions,
workflow_params,
)
if (params.igv) {
// create the IGV config file and add it to the publishing channel
igv_fnames = ch_to_publish
| map { path, dirname ->
// get a string of the relative path in the output directory after
// publishing the output files
String published_path_str = "${dirname ? "$dirname/" : ""}$path.name"
if (dirname) {
String sample_name = dirname.tokenize( '/' )[0]
// the BAM + VCF files are published in per-sample sub-directories
if (path.name.endsWith(".bam") || path.name.endsWith(".vcf.gz")) {
"$sample_name,$published_path_str"
}
} else {
// the ref file and index are in the top-level output directory
if (path.name.endsWith(".fasta") || path.name.endsWith(".fasta.fai")) {
published_path_str
}
}
}
| collectFile(sort: true, newLine: true)
configure_igv(
igv_fnames,
Channel.of(null),
[displayMode: "SQUISHED", colorBy: "strand"],
Channel.of(null),
)
}
emit:
combined_results_to_publish = ch_to_publish
}
// entrypoint workflow
WorkflowMain.initialise(workflow, params, log)
workflow {
Pinguscript.ping_start(nextflow, workflow, params)
ArrayList fastcat_extra_args = []
if (params.min_read_length) { fastcat_extra_args << "-a $params.min_read_length" }
if (params.max_read_length) { fastcat_extra_args << "-b $params.max_read_length" }
if (params.min_read_qual) { fastcat_extra_args << "-q $params.min_read_qual" }
samples = fastq_ingress([
"input":params.fastq,
"sample":params.sample,
"sample_sheet":params.sample_sheet,
"analyse_unclassified":params.analyse_unclassified,
"stats": true,
"per_read_stats": true,
"fastcat_extra_args": fastcat_extra_args.join(" "),
"allow_multiple_basecall_models": false,
])
// run workflow
pipeline(samples)
// publish results
pipeline.out.combined_results_to_publish
| toList
| flatMap
| publish
}
workflow.onComplete {
Pinguscript.ping_complete(nextflow, workflow, params)
}
workflow.onError {
Pinguscript.ping_error(nextflow, workflow, params)
}