8  计算流程管理:targets & Nextflow

Modified

2026-05-08

8.1 背景

一开始,大毛觉得分析流程无非就是“按顺序运行几个脚本”。可项目一复杂,事情就变味了:先跑质控,再跑比对,再做定量,再画图;中间某一步挂了,他不知道该从哪一步接着跑;换一批样本后,又怕自己漏改了参数;过了两周回头看,连哪个结果对应哪次运行都分不清。

最让他崩溃的是,明明每一步单独看都不难,一旦串起来就像一堆散落的零件,越做越乱。大毛开始意识到,问题不在于不会写脚本,而在于缺少一条能自动衔接、自动记录、自动续跑的“流水线”。

8.2 简介

当分析从单个脚本扩展到几十个样本、多个物种或需要在集群上长时间运行时,靠手动运行脚本很快会失控:

  • 这个结果是用哪个脚本跑的?是最新版本吗?
  • 这个脚本我跑过了吗?结果文件在哪儿?
  • 这个脚本应该使用哪个脚本产生的结果作为输入?
  • 上次使用的参数是什么?我忘记记录了。

在分析任务比较简短的时候,这些问题似乎还不太重要。但当您需要进行一篇文章的完整分析流程时,使用流程管理工具就十分必要了。它可以把所有分析步骤变成一条可重复、可追踪、可扩展的“装配线”,让任何人都能用同一条命令从原始FASTQ跑到最终图表。

流程管理的核心收益:

  1. 可复现:所有步骤、参数、输入输出被声明在图中,重新执行不靠记忆。
  2. 容错与续跑:流程管理工具知道哪些文件已经产出,只补跑失败或缺失的节点。让您能够以最小代价完成分析。
  3. 并行与调度:天然支持多核/集群,把任务拆成安全的并行单元。
  4. 审计与记录:日志、时间、资源使用可追踪,方便排错和复盘。

下面我们重点介绍两种流程管理工具:R 语言使用 targets,生物信息学项目常用 Nextflow。两者都遵循本书的配置约定:可调参数放 config/,原始数据放 data/raw/,中间结果放 results/

8.3 R:targets

8.3.1 安装 R 包

在开始使用 {targets} 之前,需要安装必要的 R 包:

pixi add r-targets r-qs r-qs2 r-tarchetypes

8.3.2 _targets.R 最小骨架

_targets.R 是流水线“主开关”,通常包含四段:

library(targets)
library(tarchetypes)

# 1) 配置文件和存储位置
tar_config_set(
    script = "_targets.R",
    store = "_targets"
)

# 2) 全局选项
tar_option_set(
  packages = c("readr", "dplyr", "yaml"),
  format = "auto",            # 中间对象存储格式,自动选择,R对象使用qs存储,文件使用file
  error = "continue",        # 单节点失败不阻塞其余
  seed = 42                  # 全局随机种子
)

# 3) 加载自定义函数
tar_source("R/")              # 加载 R/ 目录下所有 .R 文件

# 4) 读取配置(参数集中放 config/)
config <- yaml::read_yaml("config/config.yaml")

# 5) 声明流水线
list(
  tar_target(name, command, format, cue, pattern, …)
)

关键点:

  • 包与选项放在 tar_option_set(),保持一处维护。
  • 业务函数放 R/*.R,不要写在_targets.R;这样便于测试与复用。
  • 配置、路径、阈值全部进 config/,命令中不出现硬编码路径。

8.3.3 常用命令

# 运行整个流水线
tar_make()

# 在交互环境中读取结果
tar_read(norm_matrix)        # 读取对象到临时变量
tar_load(norm_matrix)        # 加载到当前环境(创建同名变量)

# 只运行特定 target
tar_make(names = c("norm_matrix", "deg_results"))

# 查看依赖图
tar_visnetwork()

# 查看哪些需要重跑
tar_outdated()

8.3.4 并行执行

targets 天然支持并行执行,可大幅提升多样本分析速度:

# 在 _targets.R 中设置并行
tar_option_set(
  packages = c("readr", "dplyr"),
  format = "auto",
  controller = crew::crew_controller_local(workers = 4)  # 使用4核并行
)

# 或使用 future 后端
library(future)
plan(multisession, workers = 4)
tar_make_future()

对于多个样本,使用 pattern = map() 自动并行:

tar_target(
  aligned_bam,
  align_reads(fastq_files),
  pattern = map(fastq_files)  # 每个样本独立并行执行
)

8.3.5 调试技巧

# 查看流水线状态
tar_manifest()              # 查看所有 targets
tar_meta()                  # 查看执行历史和元数据

# 排查错误
tar_meta() |>
  filter(!is.na(error))     # 查看失败的 target

tar_traceback(norm_matrix)  # 查看详细错误堆栈

# 强制重跑
tar_destroy(norm_matrix)    # 删除缓存,强制重跑
tar_invalidate(norm_matrix) # 标记为过期,下次 tar_make() 会重跑

# 交互式调试
tar_load(raw_matrix)        # 加载中间结果
# 手动运行函数测试
result <- normalize_counts(raw_matrix)

8.3.6 target-函数-脚本的关系

  • target:一份可重用的、有签名的结果(文件或对象)。targets 只在上游输出发生变化时重跑。
  • 函数:放在 R/ 下,保持纯函数风格(输入显式,输出返回值,不写入全局变量)。
  • 脚本文件R/normalize_matrix.R 等,专门存放这些函数。
  • 连接方式_targets.R 中的 tar_target() 调用这些函数,并声明输入/输出。

示例片段:

tar_target(raw_matrix_path, "data/matrix.csv", format = "file"),
tar_target(raw_matrix, readr::read_csv(raw_matrix_path)),
tar_target(norm_matrix, normalize_matrix(raw_matrix, method = config$normalization)),
tar_target(norm_path, write_matrix(norm_matrix, "results/normalized_matrix.csv"), format = "file")

执行流:raw_matrix_path(常量文件路径)→ raw_matrix(读入数据)→ norm_matrix(函数处理)→ norm_path(写出文件)。targets 用哈希检查输入是否变化,自动决定是否重跑。

8.3.7 示例:RNA-seq 差异表达分析流水线

目标:从 FASTQ 文件开始,完成比对、定量、归一化,最终得到差异表达基因列表。

  1. 准备配置:在 config/config.yaml 写入:

    fastq_dir: data/raw/fastq
    reference_genome: data/reference/genome.fa
    reference_gtf: data/reference/genes.gtf
    contrast:
      control: ["ctrl_1", "ctrl_2", "ctrl_3"]
      treatment: ["treat_1", "treat_2", "treat_3"]
    normalization: TPM
    fdr_threshold: 0.05
  2. 编写函数(放 R/rnaseq_functions.R)

    # 列出所有 FASTQ 文件
    list_fastq_files <- function(fastq_dir) {
      list.files(fastq_dir, pattern = "\\.fastq\\.gz$", full.names = TRUE)
    }
    
    # 比对(实际调用 STAR 或 hisat2)
    align_reads <- function(fastq_file, genome, threads = 4) {
      sample_name <- tools::file_path_sans_ext(basename(fastq_file))
      bam_file <- file.path("results/aligned", paste0(sample_name, ".bam"))
      # 这里简化,实际应调用 system2() 运行 STAR
      # system2("STAR", args = c(...))
      fs::dir_create(dirname(bam_file))
      file.create(bam_file)  # 示例:创建空文件
      return(bam_file)
    }
    
    # 计数(实际调用 featureCounts 或 HTSeq)
    count_features <- function(bam_files, gtf) {
      # 实际应调用 Rsubread::featureCounts()
      # counts <- featureCounts(bam_files, annot.ext = gtf)
      # 这里返回模拟的计数矩阵
      tibble::tibble(
        gene_id = paste0("Gene", 1:1000),
        ctrl_1 = rpois(1000, 100),
        ctrl_2 = rpois(1000, 100),
        treat_1 = rpois(1000, 150)
      )
    }
    
    # 归一化
    normalize_counts <- function(count_matrix, method = "TPM") {
      # 实际应使用 edgeR::cpm() 或 DESeq2::vst()
      count_matrix |>
        dplyr::mutate(across(where(is.numeric), ~ log1p(.x)))
    }
    
    # 差异表达分析
    find_deg <- function(normalized, contrast, fdr = 0.05) {
      # 实际应使用 DESeq2::DESeq() 或 edgeR::glmFit()
      tibble::tibble(
        gene_id = paste0("Gene", 1:100),
        log2fc = rnorm(100, mean = 2, sd = 1),
        pvalue = runif(100, 0, 0.1),
        padj = p.adjust(pvalue, method = "BH")
      ) |>
        dplyr::filter(padj < fdr, abs(log2fc) > 1)
    }
    
    # 写出结果
    write_results <- function(deg_table, path) {
      fs::dir_create(dirname(path))
      readr::write_csv(deg_table, path)
      path
    }
  3. 编写 _targets.R

    library(targets)
    library(tarchetypes)
    
    tar_option_set(
      packages = c("readr", "dplyr", "yaml", "fs", "tibble"),
      format = "auto",        # 自动选择:R对象用qs,文件用file
      seed = 42
    )
    
    tar_source("R/")
    config <- yaml::read_yaml("config/config.yaml")
    
    list(
      # 1. 列出输入文件
      tar_target(
        fastq_files,
        list_fastq_files(config$fastq_dir)
      ),
    
      # 2. 比对(每个样本并行)
      tar_target(
        aligned_bam,
        align_reads(fastq_files, config$reference_genome),
        pattern = map(fastq_files),
        format = "file"
      ),
    
      # 3. 计数
      tar_target(
        count_matrix,
        count_features(aligned_bam, config$reference_gtf)
      ),
    
      # 4. 归一化
      tar_target(
        normalized,
        normalize_counts(count_matrix, config$normalization)
      ),
    
      # 5. 差异表达分析
      tar_target(
        deg_results,
        find_deg(normalized, config$contrast, config$fdr_threshold)
      ),
    
      # 6. 写出结果
      tar_target(
        deg_output,
        write_results(deg_results, "results/deg_genes.csv"),
        format = "file"
      )
    )
  4. 运行

    # 查看依赖图
    Rscript -e "targets::tar_visnetwork()"
    
    # 运行流水线(自动并行比对步骤)
    Rscript -e "targets::tar_make()"

    产物:results/deg_genes.csv。如果只修改了差异分析的FDR阈值,tar_make() 只会重跑 find_degwrite_results,前面的比对和计数会被跳过。

  5. 扩展思路

    • 多组对比:使用 tar_map() 对不同对比组合展开多个分支
    • 质控报告:添加 tar_target 生成 FastQC 和 MultiQC 报告
    • 可视化:添加 target 绘制火山图、热图等
    tar_target(
      volcano_plot,
      plot_volcano(deg_results, "results/figures/volcano.png"),
      format = "file"
    )

8.4 Nextflow:面向生信流程的标准工具

Nextflow 很适合把“一个样本一份任务”的分析写成可续跑、可并行、可迁移的流程。它不是 Python 包,而是一门专门描述流程的 DSL,底层依赖 Java 运行。

8.4.1 安装

官方要求 Bash 3.2+ 和 Java 17+。在本书的环境里,可以先用 Pixi 准备 Java,再安装 Nextflow:

# 先安装 Java
pixi add openjdk
java -version

# 安装 Nextflow
curl -s https://get.nextflow.io | bash
chmod +x nextflow
mkdir -p $HOME/.local/bin
mv nextflow $HOME/.local/bin/

# 验证安装
nextflow info

如果暂时不想修改 PATH,也可以直接在项目目录运行:

./nextflow run main.nf

8.4.2 基本结构

main.nf              # 主流程,定义 process 和 workflow
nextflow.config      # 默认参数、资源限制、profile
modules/             # 可复用模块(DSL2 常见写法)
config/              # 你自己维护的分析参数
data/raw/            # 输入数据
results/             # 最终输出
work/                # Nextflow 自动生成的工作目录

核心概念

  • process:流程中的一个步骤。你要声明输入、输出和实际执行的命令。
  • workflow:把多个 process 串起来,决定数据怎么流动。
  • channel:数据通道。哪个输入先准备好,哪个任务就可以先启动。
  • params:流程参数。通常放在 nextflow.config 或命令行里,避免把路径和阈值写死。
  • -resume:Nextflow 会缓存任务结果。下次运行时,只重跑真正受影响的步骤。

8.4.3 常用命令

# 运行本地流程
nextflow run main.nf

# 传入参数
nextflow run main.nf --reads 'data/raw/fastq/*_{1,2}.fastq.gz' --outdir results

# 使用 profile(例如 Docker 或集群)
nextflow run main.nf -profile docker

# 中断后续跑
nextflow run main.nf -resume

# 读取参数文件
nextflow run main.nf -params-file params.json

# 生成执行报告
nextflow run main.nf -with-report report.html
nextflow run main.nf -with-timeline timeline.html
nextflow run main.nf -with-trace trace.txt
nextflow run main.nf -with-dag dag.png

# 查看历史运行
nextflow log
nextflow log <run_name> -f name,status,hash

# 清理工作目录与缓存
nextflow clean -f

8.4.4 并行执行

Nextflow 的并行不是靠你手动写线程,而是靠数据流模型自动触发。只要多个样本彼此独立,Nextflow 就会把它们拆成多个任务并行运行。

最常见的场景是“每个样本一对 FASTQ”:

nextflow.enable.dsl=2
params.reads = 'data/raw/fastq/*_{1,2}.fastq.gz'

process FASTQC {
    tag "$sample"
    cpus 2

    input:
    tuple val(sample), path(reads)

    output:
    tuple val(sample), path("${sample}_fastqc.html")

    script:
    """
    fastqc -o ./ ${reads}
    mv *_fastqc.html ${sample}_fastqc.html
    """
}

workflow {
    read_pairs = channel.fromFilePairs(params.reads, checkIfExists: true)
    FASTQC(read_pairs)
}

上面的 read_pairs 会按样本名拆成多份输入。FASTQC 会对每个样本各跑一次。样本之间互不依赖,所以会自动并行。

资源通常放进 nextflow.config 统一管理:

process {
  withName: FASTQC {
    cpus = 2
    memory = '4 GB'
  }
}

如果以后把执行环境从本地换成 Slurm、Docker 或云平台,往往只需要改 profile,而不是重写流程主体。

8.4.5 调试

初学者最常见的调试动作有四个:看通道、看日志、跑假任务、用 -resume

1. 看 channel 里到底有什么

channel.of(1, 2, 3).view { v -> "channel emits ${v}" }

如果你怀疑样本名没有正确拆出来,先在 workflow 里 view() 一次,通常比盲猜快得多。

2. 让 process 的标准输出直接打印到终端

process hello {
  debug true

  script:
  """
  echo Hello
  """
}

3. 用 -stub-run 快速验证流程结构

有些步骤会跑很久,比如比对、定量、组装。Nextflow 支持给 process 写一个 stub:,在调试时生成假输出,而不真正执行大计算。

process salmon_index {
    input:
    path transcriptome

    output:
    path 'index'

    script:
    """
    salmon index --threads $task.cpus -t $transcriptome -i index
    """

    stub:
    """
    mkdir index
    touch index/seq.bin
    touch index/info.json
    touch index/refseq.bin
    """
}

运行时:

nextflow run main.nf -stub-run

4. 看报告与历史记录

nextflow log
nextflow run main.nf -with-trace trace.txt -with-report report.html -resume

真正排错时,优先看三处:

  • .nextflow.log:主日志。
  • work/:每个任务的独立工作目录。
  • trace.txt / report.html:执行时间、资源使用、失败位置。

8.4.6 示例:RNA-seq 差异表达分析

下面给出一个最小化、可演示结构的 Nextflow DSL2 例子。它展示的是“流程骨架”,不是完整生产版。真正项目里,你会把 touchprintf 换成 STAR、featureCounts、DESeq2 等真实命令。

1. 准备配置 (nextflow.config):

params {
  reads = 'data/raw/fastq/*_{1,2}.fastq.gz'
  reference = 'data/reference/genome.fa'
  gtf = 'data/reference/genes.gtf'
  outdir = 'results'
}

process {
  executor = 'local'
  cpus = 2
  memory = '4 GB'
}
profiles {
  docker {
    docker.enabled = true
  }
}

2. 编写主流程 (main.nf):

nextflow.enable.dsl=2

process ALIGN_READS {
    tag "$sample"
    publishDir "${params.outdir}/aligned", mode: 'copy'

    input:
    tuple val(sample), path(reads)
    path reference

    output:
    tuple val(sample), path("${sample}.bam")

    script:
    """
    # 实际项目可替换为 STAR / hisat2
    touch ${sample}.bam
    """

    stub:
    """
    touch ${sample}.bam
    """
}

process COUNT_FEATURES {
    publishDir "${params.outdir}/counts", mode: 'copy'

    input:
    path bam_files
    path gtf

    output:
    path 'count_matrix.csv'

    script:
    """
    # 实际项目可替换为 featureCounts
    printf 'gene_id,ctrl_1,ctrl_2,treat_1\nGene1,100,120,180\nGene2,80,75,150\n' > count_matrix.csv
    """

    stub:
    """
    printf 'gene_id,ctrl_1,ctrl_2,treat_1\nGene1,100,120,180\n' > count_matrix.csv
    """
}

process FIND_DEG {
    publishDir "${params.outdir}/deg", mode: 'copy'

    input:
    path count_matrix

    output:
    path 'deg_genes.csv'

    script:
    """
    # 实际项目可在这里调用 Rscript 运行 DESeq2 / edgeR
    printf 'gene_id,log2fc,padj\nGene1,1.8,0.01\nGene2,1.2,0.03\n' > deg_genes.csv
    """

    stub:
    """
    printf 'gene_id,log2fc,padj\nGene1,1.8,0.01\n' > deg_genes.csv
    """
}

workflow {
    read_pairs = channel.fromFilePairs(params.reads, checkIfExists: true)

    aligned = ALIGN_READS(read_pairs, file(params.reference))
    bam_files = aligned.map { sample, bam -> bam }.collect()
    counts = COUNT_FEATURES(bam_files, file(params.gtf))
    deg = FIND_DEG(counts)

    deg.view { file -> "差异表达结果: ${file}" }
}

这个例子里,真正并行的是 ALIGN_READS。因为每个样本都能独立比对,所以 Nextflow 会把它们拆成多个任务。等所有 BAM 都准备好后,再汇总进入 COUNT_FEATURES

3. 运行流程

# 先用假任务确认结构没问题
nextflow run main.nf -stub-run

# 再正式运行,并生成报告
nextflow run main.nf -resume -with-report report.html -with-timeline timeline.html

运行完成后,重点看两个目录:

  • results/aligned/:每个样本的 BAM。
  • results/deg/:最终差异表达结果 deg_genes.csv

8.4.7 与 targets 对比

特性 targets (R) Nextflow
定义方式 tar_target() 列表 process + workflow(DSL2)
并行 pattern = map() channel 驱动的自动并行
缓存 自动哈希检查 task cache + -resume
报告 tar_visnetwork() report / timeline / trace
调试 tar_load() view()-stub-runwork/
适用场景 R 生态、统计分析 生信流程、HPC、容器化执行