8 计算流程管理:targets & Nextflow
8.1 背景
一开始,大毛觉得分析流程无非就是“按顺序运行几个脚本”。可项目一复杂,事情就变味了:先跑质控,再跑比对,再做定量,再画图;中间某一步挂了,他不知道该从哪一步接着跑;换一批样本后,又怕自己漏改了参数;过了两周回头看,连哪个结果对应哪次运行都分不清。
最让他崩溃的是,明明每一步单独看都不难,一旦串起来就像一堆散落的零件,越做越乱。大毛开始意识到,问题不在于不会写脚本,而在于缺少一条能自动衔接、自动记录、自动续跑的“流水线”。
8.2 简介
当分析从单个脚本扩展到几十个样本、多个物种或需要在集群上长时间运行时,靠手动运行脚本很快会失控:
- 这个结果是用哪个脚本跑的?是最新版本吗?
- 这个脚本我跑过了吗?结果文件在哪儿?
- 这个脚本应该使用哪个脚本产生的结果作为输入?
- 上次使用的参数是什么?我忘记记录了。
在分析任务比较简短的时候,这些问题似乎还不太重要。但当您需要进行一篇文章的完整分析流程时,使用流程管理工具就十分必要了。它可以把所有分析步骤变成一条可重复、可追踪、可扩展的“装配线”,让任何人都能用同一条命令从原始FASTQ跑到最终图表。
流程管理的核心收益:
- 可复现:所有步骤、参数、输入输出被声明在图中,重新执行不靠记忆。
- 容错与续跑:流程管理工具知道哪些文件已经产出,只补跑失败或缺失的节点。让您能够以最小代价完成分析。
- 并行与调度:天然支持多核/集群,把任务拆成安全的并行单元。
- 审计与记录:日志、时间、资源使用可追踪,方便排错和复盘。
下面我们重点介绍两种流程管理工具:R 语言使用 targets,生物信息学项目常用 Nextflow。两者都遵循本书的配置约定:可调参数放 config/,原始数据放 data/raw/,中间结果放 results/。
8.3 R:targets
8.3.1 安装 R 包
在开始使用 {targets} 之前,需要安装必要的 R 包:
pixi add r-targets r-qs r-qs2 r-tarchetypes8.3.2 _targets.R 最小骨架
_targets.R 是流水线“主开关”,通常包含四段:
library(targets)
library(tarchetypes)
# 1) 配置文件和存储位置
tar_config_set(
script = "_targets.R",
store = "_targets"
)
# 2) 全局选项
tar_option_set(
packages = c("readr", "dplyr", "yaml"),
format = "auto", # 中间对象存储格式,自动选择,R对象使用qs存储,文件使用file
error = "continue", # 单节点失败不阻塞其余
seed = 42 # 全局随机种子
)
# 3) 加载自定义函数
tar_source("R/") # 加载 R/ 目录下所有 .R 文件
# 4) 读取配置(参数集中放 config/)
config <- yaml::read_yaml("config/config.yaml")
# 5) 声明流水线
list(
tar_target(name, command, format, cue, pattern, …)
)关键点:
- 包与选项放在
tar_option_set(),保持一处维护。 - 业务函数放
R/*.R,不要写在_targets.R;这样便于测试与复用。 - 配置、路径、阈值全部进
config/,命令中不出现硬编码路径。
8.3.3 常用命令
# 运行整个流水线
tar_make()
# 在交互环境中读取结果
tar_read(norm_matrix) # 读取对象到临时变量
tar_load(norm_matrix) # 加载到当前环境(创建同名变量)
# 只运行特定 target
tar_make(names = c("norm_matrix", "deg_results"))
# 查看依赖图
tar_visnetwork()
# 查看哪些需要重跑
tar_outdated()8.3.4 并行执行
targets 天然支持并行执行,可大幅提升多样本分析速度:
# 在 _targets.R 中设置并行
tar_option_set(
packages = c("readr", "dplyr"),
format = "auto",
controller = crew::crew_controller_local(workers = 4) # 使用4核并行
)
# 或使用 future 后端
library(future)
plan(multisession, workers = 4)
tar_make_future()对于多个样本,使用 pattern = map() 自动并行:
tar_target(
aligned_bam,
align_reads(fastq_files),
pattern = map(fastq_files) # 每个样本独立并行执行
)8.3.5 调试技巧
# 查看流水线状态
tar_manifest() # 查看所有 targets
tar_meta() # 查看执行历史和元数据
# 排查错误
tar_meta() |>
filter(!is.na(error)) # 查看失败的 target
tar_traceback(norm_matrix) # 查看详细错误堆栈
# 强制重跑
tar_destroy(norm_matrix) # 删除缓存,强制重跑
tar_invalidate(norm_matrix) # 标记为过期,下次 tar_make() 会重跑
# 交互式调试
tar_load(raw_matrix) # 加载中间结果
# 手动运行函数测试
result <- normalize_counts(raw_matrix)8.3.6 target-函数-脚本的关系
- target:一份可重用的、有签名的结果(文件或对象)。
targets只在上游输出发生变化时重跑。 - 函数:放在
R/下,保持纯函数风格(输入显式,输出返回值,不写入全局变量)。 - 脚本文件:
R/normalize_matrix.R等,专门存放这些函数。 - 连接方式:
_targets.R中的tar_target()调用这些函数,并声明输入/输出。
示例片段:
tar_target(raw_matrix_path, "data/matrix.csv", format = "file"),
tar_target(raw_matrix, readr::read_csv(raw_matrix_path)),
tar_target(norm_matrix, normalize_matrix(raw_matrix, method = config$normalization)),
tar_target(norm_path, write_matrix(norm_matrix, "results/normalized_matrix.csv"), format = "file")执行流:raw_matrix_path(常量文件路径)→ raw_matrix(读入数据)→ norm_matrix(函数处理)→ norm_path(写出文件)。targets 用哈希检查输入是否变化,自动决定是否重跑。
8.3.7 示例:RNA-seq 差异表达分析流水线
目标:从 FASTQ 文件开始,完成比对、定量、归一化,最终得到差异表达基因列表。
准备配置:在
config/config.yaml写入:fastq_dir: data/raw/fastq reference_genome: data/reference/genome.fa reference_gtf: data/reference/genes.gtf contrast: control: ["ctrl_1", "ctrl_2", "ctrl_3"] treatment: ["treat_1", "treat_2", "treat_3"] normalization: TPM fdr_threshold: 0.05编写函数(放 R/rnaseq_functions.R):
# 列出所有 FASTQ 文件 list_fastq_files <- function(fastq_dir) { list.files(fastq_dir, pattern = "\\.fastq\\.gz$", full.names = TRUE) } # 比对(实际调用 STAR 或 hisat2) align_reads <- function(fastq_file, genome, threads = 4) { sample_name <- tools::file_path_sans_ext(basename(fastq_file)) bam_file <- file.path("results/aligned", paste0(sample_name, ".bam")) # 这里简化,实际应调用 system2() 运行 STAR # system2("STAR", args = c(...)) fs::dir_create(dirname(bam_file)) file.create(bam_file) # 示例:创建空文件 return(bam_file) } # 计数(实际调用 featureCounts 或 HTSeq) count_features <- function(bam_files, gtf) { # 实际应调用 Rsubread::featureCounts() # counts <- featureCounts(bam_files, annot.ext = gtf) # 这里返回模拟的计数矩阵 tibble::tibble( gene_id = paste0("Gene", 1:1000), ctrl_1 = rpois(1000, 100), ctrl_2 = rpois(1000, 100), treat_1 = rpois(1000, 150) ) } # 归一化 normalize_counts <- function(count_matrix, method = "TPM") { # 实际应使用 edgeR::cpm() 或 DESeq2::vst() count_matrix |> dplyr::mutate(across(where(is.numeric), ~ log1p(.x))) } # 差异表达分析 find_deg <- function(normalized, contrast, fdr = 0.05) { # 实际应使用 DESeq2::DESeq() 或 edgeR::glmFit() tibble::tibble( gene_id = paste0("Gene", 1:100), log2fc = rnorm(100, mean = 2, sd = 1), pvalue = runif(100, 0, 0.1), padj = p.adjust(pvalue, method = "BH") ) |> dplyr::filter(padj < fdr, abs(log2fc) > 1) } # 写出结果 write_results <- function(deg_table, path) { fs::dir_create(dirname(path)) readr::write_csv(deg_table, path) path }编写
_targets.R:library(targets) library(tarchetypes) tar_option_set( packages = c("readr", "dplyr", "yaml", "fs", "tibble"), format = "auto", # 自动选择:R对象用qs,文件用file seed = 42 ) tar_source("R/") config <- yaml::read_yaml("config/config.yaml") list( # 1. 列出输入文件 tar_target( fastq_files, list_fastq_files(config$fastq_dir) ), # 2. 比对(每个样本并行) tar_target( aligned_bam, align_reads(fastq_files, config$reference_genome), pattern = map(fastq_files), format = "file" ), # 3. 计数 tar_target( count_matrix, count_features(aligned_bam, config$reference_gtf) ), # 4. 归一化 tar_target( normalized, normalize_counts(count_matrix, config$normalization) ), # 5. 差异表达分析 tar_target( deg_results, find_deg(normalized, config$contrast, config$fdr_threshold) ), # 6. 写出结果 tar_target( deg_output, write_results(deg_results, "results/deg_genes.csv"), format = "file" ) )运行:
# 查看依赖图 Rscript -e "targets::tar_visnetwork()" # 运行流水线(自动并行比对步骤) Rscript -e "targets::tar_make()"产物:
results/deg_genes.csv。如果只修改了差异分析的FDR阈值,tar_make()只会重跑find_deg和write_results,前面的比对和计数会被跳过。扩展思路:
- 多组对比:使用
tar_map()对不同对比组合展开多个分支 - 质控报告:添加
tar_target生成 FastQC 和 MultiQC 报告 - 可视化:添加 target 绘制火山图、热图等
tar_target( volcano_plot, plot_volcano(deg_results, "results/figures/volcano.png"), format = "file" )- 多组对比:使用
8.4 Nextflow:面向生信流程的标准工具
Nextflow 很适合把“一个样本一份任务”的分析写成可续跑、可并行、可迁移的流程。它不是 Python 包,而是一门专门描述流程的 DSL,底层依赖 Java 运行。
8.4.1 安装
官方要求 Bash 3.2+ 和 Java 17+。在本书的环境里,可以先用 Pixi 准备 Java,再安装 Nextflow:
# 先安装 Java
pixi add openjdk
java -version
# 安装 Nextflow
curl -s https://get.nextflow.io | bash
chmod +x nextflow
mkdir -p $HOME/.local/bin
mv nextflow $HOME/.local/bin/
# 验证安装
nextflow info如果暂时不想修改 PATH,也可以直接在项目目录运行:
./nextflow run main.nf8.4.2 基本结构
main.nf # 主流程,定义 process 和 workflow
nextflow.config # 默认参数、资源限制、profile
modules/ # 可复用模块(DSL2 常见写法)
config/ # 你自己维护的分析参数
data/raw/ # 输入数据
results/ # 最终输出
work/ # Nextflow 自动生成的工作目录
核心概念:
- process:流程中的一个步骤。你要声明输入、输出和实际执行的命令。
- workflow:把多个 process 串起来,决定数据怎么流动。
- channel:数据通道。哪个输入先准备好,哪个任务就可以先启动。
- params:流程参数。通常放在
nextflow.config或命令行里,避免把路径和阈值写死。 -resume:Nextflow 会缓存任务结果。下次运行时,只重跑真正受影响的步骤。
8.4.3 常用命令
# 运行本地流程
nextflow run main.nf
# 传入参数
nextflow run main.nf --reads 'data/raw/fastq/*_{1,2}.fastq.gz' --outdir results
# 使用 profile(例如 Docker 或集群)
nextflow run main.nf -profile docker
# 中断后续跑
nextflow run main.nf -resume
# 读取参数文件
nextflow run main.nf -params-file params.json
# 生成执行报告
nextflow run main.nf -with-report report.html
nextflow run main.nf -with-timeline timeline.html
nextflow run main.nf -with-trace trace.txt
nextflow run main.nf -with-dag dag.png
# 查看历史运行
nextflow log
nextflow log <run_name> -f name,status,hash
# 清理工作目录与缓存
nextflow clean -f8.4.4 并行执行
Nextflow 的并行不是靠你手动写线程,而是靠数据流模型自动触发。只要多个样本彼此独立,Nextflow 就会把它们拆成多个任务并行运行。
最常见的场景是“每个样本一对 FASTQ”:
nextflow.enable.dsl=2
params.reads = 'data/raw/fastq/*_{1,2}.fastq.gz'
process FASTQC {
tag "$sample"
cpus 2
input:
tuple val(sample), path(reads)
output:
tuple val(sample), path("${sample}_fastqc.html")
script:
"""
fastqc -o ./ ${reads}
mv *_fastqc.html ${sample}_fastqc.html
"""
}
workflow {
read_pairs = channel.fromFilePairs(params.reads, checkIfExists: true)
FASTQC(read_pairs)
}上面的 read_pairs 会按样本名拆成多份输入。FASTQC 会对每个样本各跑一次。样本之间互不依赖,所以会自动并行。
资源通常放进 nextflow.config 统一管理:
process {
withName: FASTQC {
cpus = 2
memory = '4 GB'
}
}如果以后把执行环境从本地换成 Slurm、Docker 或云平台,往往只需要改 profile,而不是重写流程主体。
8.4.5 调试
初学者最常见的调试动作有四个:看通道、看日志、跑假任务、用 -resume。
1. 看 channel 里到底有什么
channel.of(1, 2, 3).view { v -> "channel emits ${v}" }如果你怀疑样本名没有正确拆出来,先在 workflow 里 view() 一次,通常比盲猜快得多。
2. 让 process 的标准输出直接打印到终端
process hello {
debug true
script:
"""
echo Hello
"""
}3. 用 -stub-run 快速验证流程结构
有些步骤会跑很久,比如比对、定量、组装。Nextflow 支持给 process 写一个 stub:,在调试时生成假输出,而不真正执行大计算。
process salmon_index {
input:
path transcriptome
output:
path 'index'
script:
"""
salmon index --threads $task.cpus -t $transcriptome -i index
"""
stub:
"""
mkdir index
touch index/seq.bin
touch index/info.json
touch index/refseq.bin
"""
}运行时:
nextflow run main.nf -stub-run4. 看报告与历史记录
nextflow log
nextflow run main.nf -with-trace trace.txt -with-report report.html -resume真正排错时,优先看三处:
.nextflow.log:主日志。work/:每个任务的独立工作目录。trace.txt/report.html:执行时间、资源使用、失败位置。
8.4.6 示例:RNA-seq 差异表达分析
下面给出一个最小化、可演示结构的 Nextflow DSL2 例子。它展示的是“流程骨架”,不是完整生产版。真正项目里,你会把 touch 或 printf 换成 STAR、featureCounts、DESeq2 等真实命令。
1. 准备配置 (nextflow.config):
params {
reads = 'data/raw/fastq/*_{1,2}.fastq.gz'
reference = 'data/reference/genome.fa'
gtf = 'data/reference/genes.gtf'
outdir = 'results'
}
process {
executor = 'local'
cpus = 2
memory = '4 GB'
}
profiles {
docker {
docker.enabled = true
}
}2. 编写主流程 (main.nf):
nextflow.enable.dsl=2
process ALIGN_READS {
tag "$sample"
publishDir "${params.outdir}/aligned", mode: 'copy'
input:
tuple val(sample), path(reads)
path reference
output:
tuple val(sample), path("${sample}.bam")
script:
"""
# 实际项目可替换为 STAR / hisat2
touch ${sample}.bam
"""
stub:
"""
touch ${sample}.bam
"""
}
process COUNT_FEATURES {
publishDir "${params.outdir}/counts", mode: 'copy'
input:
path bam_files
path gtf
output:
path 'count_matrix.csv'
script:
"""
# 实际项目可替换为 featureCounts
printf 'gene_id,ctrl_1,ctrl_2,treat_1\nGene1,100,120,180\nGene2,80,75,150\n' > count_matrix.csv
"""
stub:
"""
printf 'gene_id,ctrl_1,ctrl_2,treat_1\nGene1,100,120,180\n' > count_matrix.csv
"""
}
process FIND_DEG {
publishDir "${params.outdir}/deg", mode: 'copy'
input:
path count_matrix
output:
path 'deg_genes.csv'
script:
"""
# 实际项目可在这里调用 Rscript 运行 DESeq2 / edgeR
printf 'gene_id,log2fc,padj\nGene1,1.8,0.01\nGene2,1.2,0.03\n' > deg_genes.csv
"""
stub:
"""
printf 'gene_id,log2fc,padj\nGene1,1.8,0.01\n' > deg_genes.csv
"""
}
workflow {
read_pairs = channel.fromFilePairs(params.reads, checkIfExists: true)
aligned = ALIGN_READS(read_pairs, file(params.reference))
bam_files = aligned.map { sample, bam -> bam }.collect()
counts = COUNT_FEATURES(bam_files, file(params.gtf))
deg = FIND_DEG(counts)
deg.view { file -> "差异表达结果: ${file}" }
}这个例子里,真正并行的是 ALIGN_READS。因为每个样本都能独立比对,所以 Nextflow 会把它们拆成多个任务。等所有 BAM 都准备好后,再汇总进入 COUNT_FEATURES。
3. 运行流程:
# 先用假任务确认结构没问题
nextflow run main.nf -stub-run
# 再正式运行,并生成报告
nextflow run main.nf -resume -with-report report.html -with-timeline timeline.html运行完成后,重点看两个目录:
results/aligned/:每个样本的 BAM。results/deg/:最终差异表达结果deg_genes.csv。
8.4.7 与 targets 对比
| 特性 | targets (R) | Nextflow |
|---|---|---|
| 定义方式 | tar_target() 列表 |
process + workflow(DSL2) |
| 并行 | pattern = map() |
channel 驱动的自动并行 |
| 缓存 | 自动哈希检查 | task cache + -resume |
| 报告 | tar_visnetwork() 等 |
report / timeline / trace |
| 调试 | tar_load() |
view()、-stub-run、work/ |
| 适用场景 | R 生态、统计分析 | 生信流程、HPC、容器化执行 |