Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### `Changed`

- [#932](https://github.com/nf-core/mag/pull/932) - Replaced usages of the deprecated `Channel.*` factory calls with `channel.*` and fixed other LSP warnings (by @dialvarezs)

### `Fixed`

- [#894](https://github.com/nf-core/mag/pull/894) - Fix read order in metaSPAdes to allow co-assembly of paired-end data of multiple samples (reported by @maartenciers, fix by @jfy133 with contributions from @prototaxites, @d4straub and @dialvarezs)
Expand Down
6 changes: 3 additions & 3 deletions subworkflows/local/ancient_dna/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ include { SAMTOOLS_FAIDX as FAIDX } from '../../../modules/nf-core

workflow ANCIENT_DNA_ASSEMBLY_VALIDATION {
take:
ch_input //channel: [val(meta), path(contigs), path(bam), path(bam_index)]
ch_input // [val(meta), path(contigs), path(bam), path(bam_index)]

main:
ch_versions = Channel.empty()
ch_versions = channel.empty()

PYDAMAGE_ANALYZE(
ch_input.map { meta, _contigs, bam, bai ->
Expand All @@ -28,7 +28,7 @@ workflow ANCIENT_DNA_ASSEMBLY_VALIDATION {
ch_versions = ch_versions.mix(PYDAMAGE_FILTER.out.versions)

if (params.skip_ancient_damagecorrection) {
ch_corrected_contigs = Channel.empty()
ch_corrected_contigs = channel.empty()
}

if (!params.skip_ancient_damagecorrection) {
Expand Down
43 changes: 24 additions & 19 deletions subworkflows/local/assembly/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ include { GUNZIP as GUNZIP_LONGREAD_ASSEMBLIES } from '../../../modules/nf-core

workflow ASSEMBLY {
take:
ch_short_reads // [ [meta] , fastq1, fastq2] (mandatory)
ch_long_reads // [ [meta] , fastq] (mandatory)
ch_short_reads // [val(meta), path(fastq1), path(fastq2)] (mandatory)
ch_long_reads // [val(meta), path(fastq)] (mandatory)

main:

ch_versions = Channel.empty()
ch_versions = channel.empty()

/*
================================================================================
Expand All @@ -39,7 +39,7 @@ workflow ASSEMBLY {
sr_platform: metas.sr_platform[0]
]
if (assemble_as_single) {
[meta, reads.collect { file -> file }, []]
[meta, reads.sort { files -> files[0].getName() }, []]
}
else {
[meta] + reads.sort { files -> files[0].getName() }.transpose()
Expand All @@ -63,23 +63,28 @@ workflow ASSEMBLY {
meta.id = "group-${group}"
meta.group = group
meta.lr_platform = metas.lr_platform[0]
[meta, reads.collect { it }]
[meta, reads]
Comment thread
jfy133 marked this conversation as resolved.
}
}
else {
ch_short_reads_grouped = ch_short_reads
.filter { it[0].single_end }
.map { meta, reads -> [meta, [reads], []] }
.mix(
ch_short_reads.filter { !it[0].single_end }.map { meta, reads -> [meta, [reads[0]], [reads[1]]] }
Comment thread
dialvarezs marked this conversation as resolved.
)
ch_short_reads_grouped = ch_short_reads.map { meta, reads ->
if (meta.single_end) {
[meta, [reads], []]
}
else {
[meta, [reads[0]], [reads[1]]]
}
}

ch_long_reads_grouped = ch_long_reads
}

if (!params.skip_spades || !params.skip_spadeshybrid) {
if (params.coassemble_group) {
if (params.bbnorm) {
ch_short_reads_spades = ch_short_reads_grouped.map { [it[0], it[1]] }
// When doing co-assembly and using bbnorm, all sample reads get pooled in a single file
// That's why we can drop R2 here (it's empty)
ch_short_reads_spades = ch_short_reads_grouped.map { meta, r1, _r2 -> [meta, r1] }
Comment thread
dialvarezs marked this conversation as resolved.
}
else {
POOL_SHORT_READS(ch_short_reads_grouped_for_pooling)
Expand All @@ -95,21 +100,21 @@ workflow ASSEMBLY {

ch_long_reads_grouped_for_pool = ch_long_reads_grouped
.map { meta, reads -> [meta.id, meta, reads] }
.combine(ch_short_reads_grouped.map { meta, _reads1, _reads2 -> [meta.id, meta] }, by: 0)
.map { [it[1], it[2]] }
.combine(ch_short_reads_grouped.map { meta, _reads1, _reads2 -> [meta.id] }, by: 0)
.map { _id, lr_meta, lr_reads -> [lr_meta, lr_reads] }
//make sure no long reads are pooled for spades if there are no short reads

POOL_LONG_READS(ch_long_reads_grouped_for_pool)
ch_versions = ch_versions.mix(POOL_LONG_READS.out.versions)
ch_long_reads_spades = POOL_LONG_READS.out.reads
}
else {
ch_long_reads_spades = Channel.empty()
ch_long_reads_spades = channel.empty()
}
}
else {
ch_short_reads_spades = Channel.empty()
ch_long_reads_spades = Channel.empty()
ch_short_reads_spades = channel.empty()
ch_long_reads_spades = channel.empty()
}

/*
Expand All @@ -118,8 +123,8 @@ workflow ASSEMBLY {
================================================================================
*/

ch_shortread_assembled_contigs = Channel.empty()
ch_longread_assembled_contigs = Channel.empty()
ch_shortread_assembled_contigs = channel.empty()
ch_longread_assembled_contigs = channel.empty()

// SHORTREAD ASSEMBLY
SHORTREAD_ASSEMBLY(
Expand Down
8 changes: 4 additions & 4 deletions subworkflows/local/assembly_hybrid/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ include { SPADES as METASPADESHYBRID } from '../../../modules/nf-core/spades/mai

workflow HYBRID_ASSEMBLY {
take:
ch_short_reads_spades // [ [meta] , fastq1, fastq2] (mandatory)
ch_long_reads_spades // [ [meta] , fastq] (mandatory)
ch_short_reads_spades // [val(meta), path(fastq1), path(fastq2)] (mandatory)
ch_long_reads_spades // [val(meta), path(fastq)] (mandatory)

main:

ch_versions = Channel.empty()
ch_assembled_contigs = Channel.empty()
ch_versions = channel.empty()
ch_assembled_contigs = channel.empty()

if (!params.single_end && !params.skip_spadeshybrid) {
ch_short_reads_spades_tmp = ch_short_reads_spades.map { meta, reads -> [meta.id, meta, reads] }
Expand Down
6 changes: 3 additions & 3 deletions subworkflows/local/assembly_longread/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ include { METAMDBG_ASM } from '../../../modules/nf-core/metamdbg/asm/main'

workflow LONGREAD_ASSEMBLY {
take:
ch_long_reads // [ [meta] , fastq] (mandatory)
ch_long_reads // [val(meta), path(fastq)] (mandatory)

main:
ch_assembled_contigs = Channel.empty()
ch_versions = Channel.empty()
ch_assembled_contigs = channel.empty()
ch_versions = channel.empty()

if (!params.skip_flye) {

Expand Down
8 changes: 4 additions & 4 deletions subworkflows/local/assembly_shortread/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ include { SPADES as METASPADES } from '../../../modules/nf-core/spades/main'

workflow SHORTREAD_ASSEMBLY {
take:
ch_short_reads_grouped // [ [meta] , fastq1, fastq2] (mandatory)
ch_short_reads_spades
ch_short_reads_grouped // [val(meta), path(fastq1), path(fastq2)] (mandatory)
ch_short_reads_spades // [val(meta), path(fastq1)] (mandatory)

main:
ch_versions = Channel.empty()
ch_assembled_contigs = Channel.empty()
ch_versions = channel.empty()
ch_assembled_contigs = channel.empty()

if (!params.single_end && !params.skip_spades) {
METASPADES(ch_short_reads_spades.map { meta, reads -> [meta, reads, [], []] }, [], [])
Expand Down
56 changes: 37 additions & 19 deletions subworkflows/local/bin_qc/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ include { UNTAR as CHECKM_UNTAR } from '../../../modules/nf-core/un

workflow BIN_QC {
take:
ch_bins // [ [ meta] , fasta ], input bins (mandatory)
ch_bins // [val(meta), path(fasta)], input bins (mandatory)

main:
ch_qc_summaries = Channel.empty()
ch_qc_summaries = channel.empty()
ch_input_bins_for_qc = ch_bins.transpose()
ch_versions = Channel.empty()
ch_multiqc_files = Channel.empty()
ch_busco_final_summaries = Channel.empty()
ch_checkm_final_summaries = Channel.empty()
ch_checkm2_final_summaries = Channel.empty()
ch_versions = channel.empty()
ch_multiqc_files = channel.empty()
ch_busco_final_summaries = channel.empty()
ch_checkm_final_summaries = channel.empty()
ch_checkm2_final_summaries = channel.empty()


/*
Expand All @@ -53,7 +53,7 @@ workflow BIN_QC {
CHECKM_UNTAR(ch_checkm_db)
ch_versions = ch_versions.mix(CHECKM_UNTAR.out.versions)

ch_checkm_db = CHECKM_UNTAR.out.untar.map { it[1] }
ch_checkm_db = CHECKM_UNTAR.out.untar.map { _meta, db -> db }
}
else {
ch_checkm_db = []
Expand All @@ -75,7 +75,7 @@ workflow BIN_QC {
ch_gunc_db = file(params.gunc_db, checkIfExists: true)
}
else {
ch_gunc_db = Channel.empty()
ch_gunc_db = channel.empty()
}

/*
Expand All @@ -93,7 +93,7 @@ workflow BIN_QC {
if (ch_busco_db && ch_busco_db.extension in ['gz', 'tgz']) {
BUSCO_UNTAR([[id: ch_busco_db.getSimpleName()], ch_busco_db])
ch_versions = ch_versions.mix(BUSCO_UNTAR.out.versions)
ch_busco_db = BUSCO_UNTAR.out.untar.map { it[1] }
ch_busco_db = BUSCO_UNTAR.out.untar.map { _meta, db -> db }
}
else if (ch_busco_db && ch_busco_db.isDirectory()) {
ch_busco_db = ch_busco_db
Expand All @@ -109,13 +109,19 @@ workflow BIN_QC {
.map { _meta, summary -> [[id: 'busco'], summary] }
.groupTuple()
ch_multiqc_files = ch_multiqc_files.mix(
BUSCO_BUSCO.out.short_summaries_txt.map { it[1] }.flatten()
BUSCO_BUSCO.out.short_summaries_txt.map { _meta, summary -> summary }.flatten()
)

CONCAT_BUSCO_TSV(ch_busco_summaries, 'tsv', 'tsv')
ch_versions = ch_versions.mix(CONCAT_BUSCO_TSV.out.versions)
ch_busco_final_summaries = ch_busco_final_summaries.mix(CONCAT_BUSCO_TSV.out.csv.map { it[1] })
ch_qc_summaries = ch_qc_summaries.mix(CONCAT_BUSCO_TSV.out.csv.splitCsv(header: true, sep: '\t').map { _meta, summary -> [bin_qc_tool: 'busco'] + summary })
ch_busco_final_summaries = ch_busco_final_summaries.mix(
CONCAT_BUSCO_TSV.out.csv.map { _meta, csv -> csv }
)
ch_qc_summaries = ch_qc_summaries.mix(
CONCAT_BUSCO_TSV.out.csv
.splitCsv(header: true, sep: '\t')
.map { _meta, summary -> [bin_qc_tool: 'busco'] + summary }
)
}
if (params.run_checkm) {
/*
Expand Down Expand Up @@ -147,13 +153,19 @@ workflow BIN_QC {
.map { _meta, summary -> [[id: 'checkm'], summary] }
.groupTuple()
ch_multiqc_files = ch_multiqc_files.mix(
CHECKM_QA.out.output.map { it[1] }.flatten()
CHECKM_QA.out.output.map { _meta, summary -> summary }.flatten()
)

CONCAT_CHECKM_TSV(ch_checkm_summaries, 'tsv', 'tsv')
ch_versions = ch_versions.mix(CONCAT_CHECKM_TSV.out.versions)
ch_checkm_final_summaries = ch_checkm_final_summaries.mix(CONCAT_CHECKM_TSV.out.csv.map { it[1] })
ch_qc_summaries = ch_qc_summaries.mix(CONCAT_CHECKM_TSV.out.csv.splitCsv(header: true, sep: '\t').map { _meta, summary -> [bin_qc_tool: 'checkm'] + summary })
ch_checkm_final_summaries = ch_checkm_final_summaries.mix(
CONCAT_CHECKM_TSV.out.csv.map { _meta, csv -> csv }
)
ch_qc_summaries = ch_qc_summaries.mix(
CONCAT_CHECKM_TSV.out.csv
.splitCsv(header: true, sep: '\t')
.map { _meta, summary -> [bin_qc_tool: 'checkm'] + summary }
)
}
if (params.run_checkm2) {
/*
Expand All @@ -166,13 +178,19 @@ workflow BIN_QC {
.map { _meta, summary -> [[id: 'checkm2'], summary] }
.groupTuple()
ch_multiqc_files = ch_multiqc_files.mix(
CHECKM2_PREDICT.out.checkm2_tsv.map { it[1] }.flatten()
CHECKM2_PREDICT.out.checkm2_tsv.map { _meta, summary -> summary }.flatten()
)

CONCAT_CHECKM2_TSV(ch_checkm2_summaries, 'tsv', 'tsv')
ch_versions = ch_versions.mix(CONCAT_CHECKM2_TSV.out.versions)
ch_checkm2_final_summaries = ch_checkm2_final_summaries.mix(CONCAT_CHECKM2_TSV.out.csv.map { it[1] })
ch_qc_summaries = ch_qc_summaries.mix(CONCAT_CHECKM2_TSV.out.csv.splitCsv(header: true, sep: '\t').map { _meta, summary -> [bin_qc_tool: 'checkm2'] + summary })
ch_checkm2_final_summaries = ch_checkm2_final_summaries.mix(
CONCAT_CHECKM2_TSV.out.csv.map { _meta, csv -> csv }
)
ch_qc_summaries = ch_qc_summaries.mix(
CONCAT_CHECKM2_TSV.out.csv
.splitCsv(header: true, sep: '\t')
.map { _meta, summary -> [bin_qc_tool: 'checkm2'] + summary }
)
}

if (params.run_gunc) {
Expand Down
30 changes: 16 additions & 14 deletions subworkflows/local/binning/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ include { SPLIT_FASTA

workflow BINNING {
take:
ch_assemblies // channel: [ val(meta), path(assembly), path(bams), path(bais) ]
val_bin_min_size
val_bin_max_size
ch_assemblies // [val(meta), path(assembly), path(bams), path(bais)]
val_bin_min_size // val(int)
val_bin_max_size // val(int)

main:

ch_versions = Channel.empty()
ch_input_splitfasta = Channel.empty()
ch_versions = channel.empty()
ch_input_splitfasta = channel.empty()

// generate coverage depths for each contig and branch by assembler type
ch_summarizedepth_input = ch_assemblies
.map { meta, _assembly, bams, bais ->
[meta, bams, bais]
}
.branch {
longread: it[0].assembler in ['FLYE', 'METAMDBG']
.branch { meta, _bams, _bais ->
longread: meta.assembler in ['FLYE', 'METAMDBG']
shortread: true
}

Expand All @@ -47,7 +47,9 @@ workflow BINNING {
ch_versions = ch_versions.mix(METABAT2_JGISUMMARIZEBAMCONTIGDEPTHS_SHORTREAD.out.versions)

// Merge the outputs
ch_combined_depths = METABAT2_JGISUMMARIZEBAMCONTIGDEPTHS_LONGREAD.out.depth.mix(METABAT2_JGISUMMARIZEBAMCONTIGDEPTHS_SHORTREAD.out.depth)
ch_combined_depths = METABAT2_JGISUMMARIZEBAMCONTIGDEPTHS_LONGREAD.out.depth.mix(
METABAT2_JGISUMMARIZEBAMCONTIGDEPTHS_SHORTREAD.out.depth
)
ch_metabat_depths = ch_combined_depths.map { meta, depths ->
def meta_new = meta + [binner: 'MetaBAT2']
[meta_new, depths]
Expand Down Expand Up @@ -76,10 +78,10 @@ workflow BINNING {
}

// main bins for decompressing for MAG_DEPTHS
ch_bins_for_seqkit = Channel.empty()
ch_bins_for_seqkit = channel.empty()

// final gzipped bins
ch_binning_results_gzipped_final = Channel.empty()
ch_binning_results_gzipped_final = channel.empty()

// MetaBAT2
if (!params.skip_metabat2) {
Expand Down Expand Up @@ -128,7 +130,7 @@ workflow BINNING {
// COMEBin
if (!params.skip_comebin) {
ch_comebin_input = ch_assemblies
.map { meta, assembly, bams, bais ->
.map { meta, assembly, bams, _bais ->
def meta_new = meta + [binner: 'COMEBin']
[meta_new, assembly, bams]
}
Expand Down Expand Up @@ -179,12 +181,12 @@ workflow BINNING {
// filtering is 0. Error if so, but only if we had bins to begin with.
//
ch_seqkitstats_results
.map { meta, stats -> stats.bin_total_length }
.map { _meta, stats -> stats.bin_total_length }
.collect().ifEmpty([])
.subscribe { stats ->
def n_bins = stats.size()
def n_filtered_bins = stats.findAll {
it >= val_bin_min_size && (val_bin_max_size ? it <= val_bin_max_size : true)
def n_filtered_bins = stats.findAll { bin_size ->
bin_size >= val_bin_min_size && (val_bin_max_size ? bin_size <= val_bin_max_size : true)
}.size()
if (n_bins > 0 && n_filtered_bins == 0) {
error(
Expand Down
Loading