Workflows
In Nextflow, a workflow is a specialized function for composing processes and dataflow logic:
An entry workflow is the entrypoint of a pipeline. It can take parameters as inputs using the params block, and it can publish outputs using the output block.
A named workflow is a workflow that can be called by other workflows. It can define its own inputs and outputs, which are called takes and emits.
Both entry workflows and named workflows can contain dataflow logic such as calling processes, workflows, and channel operators.
Entry workflow
A script can define up to one entry workflow, which does not have a name and serves as the entrypoint of the script:
workflow {
channel.of('Bonjour', 'Ciao', 'Hello', 'Hola')
.map { v -> "$v world!" }
.view()
}
Parameters
Parameters can be declared by assigning a default value to a params property:
params.input = '/some/data/file'
params.save_intermeds = false
workflow {
if( params.input )
analyze(params.input, params.save_intermeds)
else
analyze(fake_input(), params.save_intermeds)
}
The default value can be overridden by the command line, params file, or config file. Parameters from multiple sources are resolved in the order described in Pipeline parameters.
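For example, assuming the script above is saved as main.nf, its defaults could be overridden at launch time (the file names here are illustrative):

```bash
# override individual parameters on the command line (note the double dash)
nextflow run main.nf --input /other/data/file --save_intermeds true

# or load several parameters at once from a params file
nextflow run main.nf -params-file params.yaml
```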
Outputs
New in version 25.10.0. Workflow outputs were previously available as a preview feature in Nextflow 24.04, 24.10, and 25.04.
Note
Workflow outputs are intended to replace the publishDir directive. See Migrating to workflow outputs for guidance on migrating from publishDir to workflow outputs.
A script can define an output block to declare the top-level workflow outputs. Each output should be assigned in the publish section of the entry workflow. Any channel in the workflow can be assigned to an output, including process and subworkflow outputs.
Example:
process fetch {
// ...
output:
path 'sample.txt'
// ...
}
workflow {
main:
ch_samples = fetch(params.input)
publish:
samples = ch_samples
}
output {
samples {
path '.'
}
}
In the above example, the output of process fetch is assigned to the samples workflow output. How this output is published to a directory structure is described in the next section.
Publishing files
Each workflow output can define how files are published from the work directory to a designated output directory.
Output directory
You can set the top-level output directory for a run using the -output-dir command-line option or the outputDir config option:
nextflow run main.nf -output-dir 'my-results'
// nextflow.config
outputDir = 'my-results'
The default output directory is results in the launch directory.
Publish path
By default, Nextflow publishes all output files to the output directory. Each workflow output can define where to publish files within the output directory using the path directive:
workflow {
main:
ch_step1 = step1()
ch_step2 = step2(ch_step1)
publish:
step1 = ch_step1
step2 = ch_step2
}
output {
step1 {
path 'step1'
}
step2 {
path 'step2'
}
}
The following directory structure is created:
results/
├── step1/
│   └── ...
└── step2/
    └── ...
Nextflow publishes all files received by an output into the specified directory. Nextflow recursively scans lists, maps, and tuples for nested files:
workflow {
main:
ch_samples = channel.of(
tuple( [id: 'SAMP1'], [ file('1.txt'), file('2.txt') ] )
)
publish:
samples = ch_samples // 1.txt and 2.txt are published
}
Note
Files that do not originate from the work directory are not published.
Dynamic publish path
The path directive can also be a closure which defines a custom publish path for each channel value:
workflow {
main:
ch_samples = channel.of(
[id: 'SAMP1', fastq_1: file('1.fastq'), fastq_2: file('2.fastq')]
)
publish:
samples = ch_samples
}
output {
samples {
path { sample -> "fastq/${sample.id}/" }
}
}
The above example publishes each channel value to a different subdirectory. In this case, each pair of FASTQ files is published into a subdirectory based on the sample ID.
Alternatively, you can define a different path for each individual file using the >> operator:
output {
samples {
path { sample ->
sample.fastq_1 >> "fastq/${sample.id}/"
sample.fastq_2 >> "fastq/${sample.id}/"
}
}
}
Each >> specifies a source file and publish target. The source file should be a file or collection of files, and the publish target should be a directory or file name. If the publish target ends with a slash, Nextflow treats it as the directory in which to publish source files.
When using this syntax, only files captured with the >> operator are saved to the output directory.
Conditional publishing
Outputs can be conditionally published using pipeline parameters:
output {
samples {
path { sample ->
sample.fastqc >> "fastqc/"
sample.bam >> (params.save_bams ? "align/" : null)
}
}
}
In the above example, the BAM files specified by sample.bam are published only when params.save_bams is true.
Index files
Index files are structured metadata files that catalog published outputs and their associated metadata. An index file preserves the structure of channel values, including metadata, which is more robust than encoding this information into file paths. The index file can be a CSV (.csv), JSON (.json), or YAML (.yml, .yaml) file. The channel values should be files, lists, maps, or tuples.
Each output can create an index file of its published values:
workflow {
main:
ch_samples = channel.of(
[id: 1, name: 'sample 1', fastq_1: '1a.fastq', fastq_2: '1b.fastq'],
[id: 2, name: 'sample 2', fastq_1: '2a.fastq', fastq_2: '2b.fastq'],
[id: 3, name: 'sample 3', fastq_1: '3a.fastq', fastq_2: null]
)
publish:
samples = ch_samples
}
output {
samples {
path 'fastq'
index {
path 'samples.csv'
}
}
}
The above example writes the following CSV file to results/samples.csv:
"1","sample 1","results/fastq/1a.fastq","results/fastq/1b.fastq"
"2","sample 2","results/fastq/2a.fastq","results/fastq/2b.fastq"
"3","sample 3","results/fastq/3a.fastq",""
You can customize the index file with additional directives, for example:
index {
path 'samples.csv'
header true
sep '|'
}
This example produces the following index file:
"id"|"name"|"fastq_1"|"fastq_2"
"1"|"sample 1"|"results/fastq/1a.fastq"|"results/fastq/1b.fastq"
"2"|"sample 2"|"results/fastq/2a.fastq"|"results/fastq/2b.fastq"
"3"|"sample 3"|"results/fastq/3a.fastq"|""
Note
Files that do not originate from the work directory are not published, but are included in the index file.
See Output directives for the list of available index directives.
Labels
You can apply labels to each workflow output using the label directive:
output {
multiqc_report {
label 'qc'
label 'summary'
}
}
Labels can be used to find and filter output files across workflow runs with data lineage. See Use lineage with workflow outputs for details on how to query output files by label.
Output directives
The following directives are available for each output in the output block:
index
Create an index file containing a record of each published value.
The following directives are available in an index definition:
header
When true, the keys of the first record are used as the column names (default: false). Can also be a list of column names. Only used for CSV files.
path
The name of the index file relative to the base output directory (required). Can be a CSV, JSON, or YAML file.
sep
The character used to separate values (default: ','). Only used for CSV files.
label
Attach a label to every file published by this output. Can be specified multiple times to attach multiple labels. Labels are stored in the labels field of FileOutput records in the lineage store.
path
Specify the publish path relative to the output directory (default: '.'). Can be a path, a closure that defines a custom directory for each published value, or a closure that publishes individual files using the >> operator.
Additionally, the following options from the workflow config scope can be specified as directives:
contentType, enabled, ignoreErrors, mode, overwrite, storageClass, tags
For example:
output {
samples {
mode 'copy'
}
}
Named workflows
A named workflow is a workflow that can be called by other workflows:
workflow my_workflow {
hello()
bye( hello.out.collect() )
}
workflow {
my_workflow()
}
The above example defines a workflow named my_workflow which is called by the entry workflow. Both hello and bye could be any other process or workflow.
Takes and emits
The take: section declares the inputs of a named workflow:
workflow my_workflow {
take:
data1
data2
main:
hello(data1, data2)
bye(hello.out)
}
Inputs can be specified like arguments when calling the workflow:
workflow {
my_workflow( channel.of('/some/data') )
}
The emit: section declares the outputs of a named workflow:
workflow my_workflow {
main:
hello(data)
bye(hello.out)
emit:
bye.out
}
When calling the workflow, the output can be accessed using the out property, i.e. my_workflow.out.
If an output is assigned to a name, the name can be used to reference the output from the calling workflow. For example:
workflow my_workflow {
main:
hello(data)
bye(hello.out)
emit:
my_data = bye.out
}
The result of the above workflow can be accessed using my_workflow.out.my_data.
Note
Every output must be assigned to a name when multiple outputs are declared.
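For instance, a workflow with two outputs must name both of them. A minimal sketch, reusing the hello and bye calls from the example above:

```nextflow
workflow my_workflow {
    main:
    hello(data)
    bye(hello.out)

    emit:
    greeting = hello.out   // accessed as my_workflow.out.greeting
    farewell = bye.out     // accessed as my_workflow.out.farewell
}
```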
Dataflow
Workflows consist of dataflow logic, in which processes are connected to each other through dataflow channels and dataflow values.
Channels
A dataflow channel (or simply channel) is an asynchronous sequence of values.
The values in a channel cannot be accessed directly, but only through an operator or process. For example:
channel.of(1, 2, 3).view { v -> "channel emits ${v}" }
channel emits 1
channel emits 2
channel emits 3
Factories
A channel can be created by factories in the channel namespace. For example, the channel.fromPath() factory creates a channel from a file name or glob pattern, similar to the files() function:
channel.fromPath('input/*.txt').view()
See Channel factories for the full list of channel factories.
Operators
Channel operators, or operators for short, are functions that consume and produce channels. Because channels are asynchronous, operators are necessary to manipulate the values in a channel. Operators are particularly useful for implementing glue logic between processes.
Commonly used operators include:
collect: collect the channel values into a collection
combine: emit the combinations of two channels
filter: emit only the channel values that satisfy a condition
flatMap: emit multiple values for each channel value with a closure
groupTuple: group the channel values based on a grouping key
join: join the values from two channels based on a matching key
map: transform each channel value with a mapping function
mix: emit the values from multiple channels
view: print each channel value to standard output
See Operators (legacy) for the full set of operators.
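A minimal sketch combining a few of these operators (the sample IDs and counts are made up for illustration):

```nextflow
workflow {
    channel.of(
            ['SAMP1', 10],
            ['SAMP1', 20],
            ['SAMP2', 5]
        )
        .filter { id, count -> count >= 10 }   // keep only values that satisfy a condition
        .map { id, count -> [id, count * 2] }  // transform each value
        .groupTuple()                          // group values by the first element (the key)
        .view()
}
```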
Values
A dataflow value is an asynchronous value.
Dataflow values can be created using the channel.value factory; they are also produced by processes under certain conditions, for example when a process is invoked with only value inputs.
A dataflow value cannot be accessed directly, but only through an operator or process. For example:
channel.value(1).view { v -> "dataflow value is ${v}" }
dataflow value is 1
See Value<V> for the set of available methods for dataflow values.
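To illustrate the process case mentioned above, here is a minimal sketch of a hypothetical process that is invoked with a single value input and therefore emits a dataflow value:

```nextflow
process double {
    input:
    val x

    output:
    val result

    exec:
    result = x * 2
}

workflow {
    // called with a dataflow value, the process emits a dataflow value,
    // which can be consumed any number of times
    double(channel.value(21)).view()
}
```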
Calling processes and workflows
Processes and workflows are called like functions, passing their inputs as arguments:
process hello {
output:
path 'hello.txt', emit: txt
script:
"""
your_command > hello.txt
"""
}
process bye {
input:
path 'hello.txt'
output:
path 'bye.txt', emit: txt
script:
"""
another_command hello.txt > bye.txt
"""
}
workflow hello_bye {
take:
data
main:
hello()
bye(data)
}
workflow {
data = channel.fromPath('/some/path/*.txt')
hello_bye(data)
}
Processes and workflows have a few extra rules for how they can be called:
Processes and workflows can only be called by workflows
A given process or workflow can only be called once in a given workflow. To use a process or workflow multiple times in the same workflow, use Module aliases.
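For example, a module alias allows the same process to be called twice within one workflow (the module path and the greet process here are illustrative):

```nextflow
include { greet; greet as greet_again } from './modules/greet.nf'

workflow {
    ch = channel.of('Hello', 'Hola')
    greet(ch)
    greet_again(ch)
}
```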
The “return value” of a process or workflow call is the process outputs or workflow emits, respectively. The return value can be assigned to a variable or passed into another call:
workflow hello_bye {
take:
data
main:
bye_out = bye(hello(data))
emit:
bye_out
}
workflow {
data = channel.fromPath('/some/path/*.txt')
bye_out = hello_bye(data)
}
Named outputs can be accessed as properties of the return value:
workflow hello_bye {
take:
data
main:
hello_out = hello(data)
bye_out = bye(hello_out.txt)
emit:
bye = bye_out.txt
}
workflow {
data = channel.fromPath('/some/path/*.txt')
flow_out = hello_bye(data)
bye_out = flow_out.bye
}
As a convenience, process and workflow outputs can also be accessed without first assigning to a variable, by using the .out property of the process or workflow name:
workflow hello_bye {
take:
data
main:
hello(data)
bye(hello.out)
emit:
bye = bye.out
}
workflow {
data = channel.fromPath('/some/path/*.txt')
hello_bye(data)
hello_bye.out.bye.view()
}
Note
Process named outputs are defined using the emit option on a process output. See naming process outputs for more information.
Note
Process and workflow outputs can also be accessed by index (e.g., hello.out[0], hello.out[1], etc.). As a best practice, multiple outputs should be accessed by name.
Workflows can be composed in the same way:
workflow flow1 {
take:
data
main:
tick(data)
tack(tick.out)
emit:
tack.out
}
workflow flow2 {
take:
data
main:
tick(data)
tock(tick.out)
emit:
tock.out
}
workflow {
data = channel.fromPath('/some/path/*.txt')
flow1(data)
flow2(flow1.out)
}
Note
The same process can be called in different workflows without using an alias, like tick in the above example, which is used in both flow1 and flow2. The workflow call stack determines the fully qualified process name, which is used to distinguish the different process calls, i.e. flow1:tick and flow2:tick in the above example.
Tip
The fully qualified process name can be used as a process selector in a Nextflow configuration file, and it takes priority over the simple process name.
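For example, a configuration file could target only the tick calls made from flow1 (a sketch based on the processes above; the resource values are arbitrary):

```groovy
// nextflow.config
process {
    // applies only to tick when called from flow1
    withName: 'flow1:tick' {
        cpus = 2
    }

    // applies to every other process, including flow2:tick
    cpus = 1
}
```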
Special operators
The following operators have a special meaning when used in a workflow with process and workflow calls.
Note
As a best practice, avoid these operators when type checking is enabled. Using these operators will prevent the type checker from validating your code.
Pipe |
The | pipe operator can be used to chain processes, operators, and workflows:
process greet {
input:
val data
output:
val result
exec:
result = "$data world"
}
workflow {
channel.of('Hello', 'Hola', 'Ciao')
| greet
| map { v -> v.toUpperCase() }
| view
}
The above snippet defines a process named greet and invokes it with the input channel. The result is then piped to the map operator, which converts each string to uppercase, and finally to the view operator which prints it.
The same code can also be written as:
workflow {
ch_input = channel.of('Hello', 'Hola', 'Ciao')
ch_greet = greet(ch_input)
ch_greet
.map { v -> v.toUpperCase() }
.view()
}
And &
The & (and) operator can be used to call multiple processes in parallel with the same channel(s):
process greet {
input:
val data
output:
val result
exec:
result = "$data world"
}
process to_upper {
input:
val data
output:
val result
exec:
result = data.toUpperCase()
}
workflow {
channel.of('Hello')
| map { v -> v.reverse() }
| (greet & to_upper)
| mix
| view
}
In the above snippet, the initial channel is piped to the map operator, which reverses the string value. Then, the result is passed to the processes greet and to_upper, which are executed in parallel. Each process outputs a channel, and the two channels are combined using the mix operator. Finally, the result is printed using the view operator.
The same code can also be written as:
workflow {
ch = channel.of('Hello').map { v -> v.reverse() }
ch_greet = greet(ch)
ch_upper = to_upper(ch)
ch_greet.mix(ch_upper).view()
}
Process and workflow recursion
New in version 22.04.0.
Note
This is a preview feature and requires the nextflow.preview.recursion feature flag to be enabled. The syntax and behavior may change in future releases.
Processes can be invoked recursively using the recurse method:
nextflow.preview.recursion = true
params.start = 10
workflow {
count_down
.recurse(params.start)
.until { v -> v == 0 }
.view { v -> "${v}..." }
}
process count_down {
input:
val v
output:
val v
exec:
sleep(1000)
v = v - 1
}
9...
8...
7...
6...
5...
4...
3...
2...
1...
0...
In the above example, the count_down process is first invoked with the value params.start. On each subsequent iteration, the process is invoked again using the output from the previous iteration. The recursion continues until the specified condition is satisfied, as defined by the until method, which terminates the recursion.
The recursive output can also be limited using the times method:
count_down
.recurse(params.start)
.times(3)
.view { v -> "${v}..." }
Workflows can also be invoked recursively:
nextflow.preview.recursion = true
params.input = "recurse-workflow.in"
workflow {
clock
.recurse(file(params.input))
.until { file -> file.size() > 64 }
.view { file -> file.text }
}
workflow clock {
take:
logfile
emit:
tock(tick(logfile))
}
process tick {
input:
path 'input.txt'
output:
path 'result.txt'
script:
"""
cat input.txt > result.txt
echo "Task ${task.index} : tick" >> result.txt
"""
}
process tock {
input:
path 'input.txt'
output:
path 'result.txt'
script:
"""
cat input.txt > result.txt
echo "Task ${task.index} : tock" >> result.txt
"""
}
hello
Task 1 : tick
Task 1 : tock
hello
Task 1 : tick
Task 1 : tock
Task 2 : tick
Task 2 : tock
hello
Task 1 : tick
Task 1 : tock
Task 2 : tick
Task 2 : tock
Task 3 : tick
Task 3 : tock
Limitations
A recursive process or workflow must have matching inputs and outputs, such that the outputs for each iteration can be supplied as the inputs for the next iteration.
Recursive workflows cannot use reduction operators such as collect, reduce, and toList, because these operators cause the recursion to hang indefinitely after the initial iteration.