8  计算流程管理:targets & Dagster

Modified

2026-01-16

当分析从单个脚本扩展到几十个样本、多个物种或需要在集群上长时间运行时,靠手动运行脚本很快会失控:

在分析任务比较简短的时候,这些问题似乎还不太重要。但当您需要进行一篇文章的完整分析流程时,使用流程管理工具就十分必要了。它可以把所有分析步骤变成一条可重复、可追踪、可扩展的“装配线”,让任何人都能用同一条命令从原始FASTQ跑到最终图表。

流程管理的核心收益:

  1. 可复现:所有步骤、参数、输入输出被声明在图中,重新执行不靠记忆。
  2. 容错与续跑:流程管理工具知道哪些文件已经产出,只补跑失败或缺失的节点。让您能够以最小代价完成分析。
  3. 并行与调度:天然支持多核/集群,把任务拆成安全的并行单元。
  4. 审计与记录:日志、时间、资源使用可追踪,方便排错和复盘。

下面我们重点介绍两种流程管理工具:R语言使用targets,Python语言使用Dagster。两者都遵循本书的配置约定:可调参数放 config/,原始数据放 data/raw/,中间结果放 results/

8.1 R:targets

8.1.1 安装 R 包

在开始使用 {targets} 之前,需要安装必要的 R 包:

pixi add r-targets r-qs r-qs2 r-tarchetypes

8.1.2 _targets.R 最小骨架

_targets.R 是流水线“主开关”,通常包含四段:

library(targets)
library(tarchetypes)

# 1) 配置文件和存储位置
tar_config_set(
    script = "_targets.R",
    store = "_targets"
)

# 2) 全局选项
tar_option_set(
  packages = c("readr", "dplyr", "yaml"),
  format = "auto",            # 中间对象存储格式,自动选择,R对象使用qs存储,文件使用file
  error = "continue",        # 单节点失败不阻塞其余
  seed = 42                  # 全局随机种子
)

# 3) 加载自定义函数
tar_source("R/")              # 加载 R/ 目录下所有 .R 文件

# 4) 读取配置(参数集中放 config/)
config <- yaml::read_yaml("config/config.yaml")

# 5) 声明流水线
list(
  tar_target(name, command, format, cue, pattern, …)
)

关键点:

  • 包与选项放在 tar_option_set(),保持一处维护。
  • 业务函数放 R/*.R,不要写在_targets.R;这样便于测试与复用。
  • 配置、路径、阈值全部进 config/,命令中不出现硬编码路径。

8.1.3 常用命令

# 运行整个流水线
tar_make()

# 在交互环境中读取结果
tar_read(norm_matrix)        # 读取对象到临时变量
tar_load(norm_matrix)        # 加载到当前环境(创建同名变量)

# 只运行特定 target
tar_make(names = c("norm_matrix", "deg_results"))

# 查看依赖图
tar_visnetwork()

# 查看哪些需要重跑
tar_outdated()

8.1.4 并行执行

targets 天然支持并行执行,可大幅提升多样本分析速度:

# 在 _targets.R 中设置并行
tar_option_set(
  packages = c("readr", "dplyr"),
  format = "auto",
  controller = crew::crew_controller_local(workers = 4)  # 使用4核并行
)

# 或使用 future 后端
library(future)
plan(multisession, workers = 4)
tar_make_future()

对于多个样本,使用 pattern = map() 自动并行:

tar_target(
  aligned_bam,
  align_reads(fastq_files),
  pattern = map(fastq_files)  # 每个样本独立并行执行
)

8.1.5 调试技巧

# 查看流水线状态
tar_manifest()              # 查看所有 targets
tar_meta()                  # 查看执行历史和元数据

# 排查错误
tar_meta() |>
  filter(!is.na(error))     # 查看失败的 target

tar_traceback(norm_matrix)  # 查看详细错误堆栈

# 强制重跑
tar_destroy(norm_matrix)    # 删除缓存,强制重跑
tar_invalidate(norm_matrix) # 标记为过期,下次 tar_make() 会重跑

# 交互式调试
tar_load(raw_matrix)        # 加载中间结果
# 手动运行函数测试
result <- normalize_counts(raw_matrix)

8.1.6 target-函数-脚本的关系

  • target:一份可重用的、有签名的结果(文件或对象)。targets 只在上游输出发生变化时重跑。
  • 函数:放在 R/ 下,保持纯函数风格(输入显式,输出返回值,不写入全局变量)。
  • 脚本文件R/normalize_matrix.R 等,专门存放这些函数。
  • 连接方式_targets.R 中的 tar_target() 调用这些函数,并声明输入/输出。

示例片段:

tar_target(raw_matrix_path, "data/matrix.csv", format = "file"),
tar_target(raw_matrix, readr::read_csv(raw_matrix_path)),
tar_target(norm_matrix, normalize_matrix(raw_matrix, method = config$normalization)),
tar_target(norm_path, write_matrix(norm_matrix, "results/normalized_matrix.csv"), format = "file")

执行流:raw_matrix_path(常量文件路径)→ raw_matrix(读入数据)→ norm_matrix(函数处理)→ norm_path(写出文件)。targets 用哈希检查输入是否变化,自动决定是否重跑。

8.1.7 示例:RNA-seq 差异表达分析流水线

目标:从 FASTQ 文件开始,完成比对、定量、归一化,最终得到差异表达基因列表。

  1. 准备配置:在 config/config.yaml 写入:

    fastq_dir: data/raw/fastq
    reference_genome: data/reference/genome.fa
    reference_gtf: data/reference/genes.gtf
    contrast:
      control: ["ctrl_1", "ctrl_2", "ctrl_3"]
      treatment: ["treat_1", "treat_2", "treat_3"]
    normalization: TPM
    fdr_threshold: 0.05
  2. 编写函数(放 R/rnaseq_functions.R)

    # 列出所有 FASTQ 文件
    list_fastq_files <- function(fastq_dir) {
      list.files(fastq_dir, pattern = "\\.fastq\\.gz$", full.names = TRUE)
    }
    
    # 比对(实际调用 STAR 或 hisat2)
    align_reads <- function(fastq_file, genome, threads = 4) {
      sample_name <- tools::file_path_sans_ext(basename(fastq_file))
      bam_file <- file.path("results/aligned", paste0(sample_name, ".bam"))
      # 这里简化,实际应调用 system2() 运行 STAR
      # system2("STAR", args = c(...))
      fs::dir_create(dirname(bam_file))
      file.create(bam_file)  # 示例:创建空文件
      return(bam_file)
    }
    
    # 计数(实际调用 featureCounts 或 HTSeq)
    count_features <- function(bam_files, gtf) {
      # 实际应调用 Rsubread::featureCounts()
      # counts <- featureCounts(bam_files, annot.ext = gtf)
      # 这里返回模拟的计数矩阵
      tibble::tibble(
        gene_id = paste0("Gene", 1:1000),
        ctrl_1 = rpois(1000, 100),
        ctrl_2 = rpois(1000, 100),
        treat_1 = rpois(1000, 150)
      )
    }
    
    # 归一化
    normalize_counts <- function(count_matrix, method = "TPM") {
      # 实际应使用 edgeR::cpm() 或 DESeq2::vst()
      count_matrix |>
        dplyr::mutate(across(where(is.numeric), ~ log1p(.x)))
    }
    
    # 差异表达分析
    find_deg <- function(normalized, contrast, fdr = 0.05) {
      # 实际应使用 DESeq2::DESeq() 或 edgeR::glmFit()
      tibble::tibble(
        gene_id = paste0("Gene", 1:100),
        log2fc = rnorm(100, mean = 2, sd = 1),
        pvalue = runif(100, 0, 0.1),
        padj = p.adjust(pvalue, method = "BH")
      ) |>
        dplyr::filter(padj < fdr, abs(log2fc) > 1)
    }
    
    # 写出结果
    write_results <- function(deg_table, path) {
      fs::dir_create(dirname(path))
      readr::write_csv(deg_table, path)
      path
    }
  3. 编写 _targets.R

    library(targets)
    library(tarchetypes)
    
    tar_option_set(
      packages = c("readr", "dplyr", "yaml", "fs", "tibble"),
      format = "auto",        # 自动选择:R对象用qs,文件用file
      seed = 42
    )
    
    tar_source("R/")
    config <- yaml::read_yaml("config/config.yaml")
    
    list(
      # 1. 列出输入文件
      tar_target(
        fastq_files,
        list_fastq_files(config$fastq_dir)
      ),
    
      # 2. 比对(每个样本并行)
      tar_target(
        aligned_bam,
        align_reads(fastq_files, config$reference_genome),
        pattern = map(fastq_files),
        format = "file"
      ),
    
      # 3. 计数
      tar_target(
        count_matrix,
        count_features(aligned_bam, config$reference_gtf)
      ),
    
      # 4. 归一化
      tar_target(
        normalized,
        normalize_counts(count_matrix, config$normalization)
      ),
    
      # 5. 差异表达分析
      tar_target(
        deg_results,
        find_deg(normalized, config$contrast, config$fdr_threshold)
      ),
    
      # 6. 写出结果
      tar_target(
        deg_output,
        write_results(deg_results, "results/deg_genes.csv"),
        format = "file"
      )
    )
  4. 运行

    # 查看依赖图
    Rscript -e "targets::tar_visnetwork()"
    
    # 运行流水线(自动并行比对步骤)
    Rscript -e "targets::tar_make()"

    产物:results/deg_genes.csv。如果只修改了差异分析的FDR阈值,tar_make() 只会重跑 find_degwrite_results,前面的比对和计数会被跳过。

  5. 扩展思路

    • 多组对比:使用 tar_map() 对不同对比组合展开多个分支
    • 质控报告:添加 tar_target 生成 FastQC 和 MultiQC 报告
    • 可视化:添加 target 绘制火山图、热图等
    tar_target(
      volcano_plot,
      plot_volcano(deg_results, "results/figures/volcano.png"),
      format = "file"
    )

8.2 Python:Dagster

Python 项目可以用 Dagster 构建可复现流水线。

8.2.1 安装

pixi add dagster

验证安装:

dagster --version

8.2.2 基本结构

pipeline/dagster_defs.py   # Dagster definitions
config/config.yaml         # 参数配置
data/raw/                  # 输入数据
results/                   # 输出结果

核心概念

  • 资产(asset):等价于 targets 中的 target,每个资产返回一个数据对象或文件路径,Dagster 会记录其物化状态与依赖。
  • Definitions:把资产/资源/调度组合成一个仓库入口。dagster dev 会自动加载并渲染依赖图。
  • 配置文件:放在 config/config.yaml,在资产内读取,保持与本书其他章节一致。

8.2.3 常用命令

# 启动开发服务器(带 UI)
dagster dev -f pipeline/dagster_defs.py

# 查看所有资产
dagster asset list -f pipeline/dagster_defs.py

# 物化(运行)特定资产
dagster asset materialize --select deg_results -f pipeline/dagster_defs.py

# 物化多个资产
dagster asset materialize --select count_matrix,normalized -f pipeline/dagster_defs.py

# 查看运行历史
dagster run list -f pipeline/dagster_defs.py

# 查看特定运行的日志
dagster run logs <run_id> -f pipeline/dagster_defs.py

# 清理特定资产的缓存
dagster asset wipe --select deg_results -f pipeline/dagster_defs.py

8.2.4 并行执行

Dagster 支持多种并行策略:

1. 使用多进程执行器

from dagster import Definitions, multiprocess_executor

defs = Definitions(
    assets=[...],
    executor=multiprocess_executor.configured({"max_concurrent": 4})
)

2. 动态分区(多样本并行)

from dagster import asset, DynamicPartitionsDefinition

# 定义动态分区
samples_partitions = DynamicPartitionsDefinition(name="samples")

@asset(partitions_def=samples_partitions)
def aligned_bam(context):
    sample = context.partition_key
    # 每个样本独立执行
    return align_reads(f"data/raw/{sample}.fastq.gz")

# 添加分区
from dagster import add_dynamic_partitions
add_dynamic_partitions(samples_partitions, ["sample1", "sample2", "sample3"])

3. 使用 @multi_asset 批量处理

from dagster import multi_asset, AssetOut

@multi_asset(
    outs={
        "sample1_bam": AssetOut(),
        "sample2_bam": AssetOut(),
        "sample3_bam": AssetOut(),
    }
)
def align_all_samples():
    # 一次性处理多个样本
    results = {}
    for sample in ["sample1", "sample2", "sample3"]:
        results[f"{sample}_bam"] = align_reads(sample)
    return results

8.2.5 调试

# 1. 添加日志
from dagster import get_dagster_logger

@asset
def my_asset():
    logger = get_dagster_logger()
    logger.info("开始处理...")
    logger.warning("发现潜在问题")
    logger.error("处理失败")
    return result

# 2. 查看资产元数据
from dagster import MetadataValue

@asset
def count_matrix():
    counts = perform_counting()
    return Output(
        counts,
        metadata={
            "num_genes": len(counts),
            "num_samples": counts.shape[1],
            "preview": MetadataValue.md(counts.head().to_markdown())
        }
    )

# 3. 本地交互测试(不通过 Dagster 运行)
# 直接在 Python 中测试函数
if __name__ == "__main__":
    import yaml
    config = yaml.safe_load(open("config/config.yaml"))

    # 手动测试资产函数
    fastq_files = list_fastq_files(config["fastq_dir"])
    bam = align_reads(fastq_files[0])
    print(f"生成 BAM: {bam}")

# 4. 失败重试配置
from dagster import asset, RetryPolicy

@asset(retry_policy=RetryPolicy(max_retries=3, delay=60))
def unstable_asset():
    # 失败会自动重试3次,每次间隔60秒
    return process_data()

调试工作流

# 1. 查看失败的运行
dagster run list --status FAILURE -f pipeline/dagster_defs.py

# 2. 查看详细日志
dagster run logs <run_id> -f pipeline/dagster_defs.py

# 3. 清理失败资产的缓存重跑
dagster asset wipe --select failed_asset -f pipeline/dagster_defs.py
dagster asset materialize --select failed_asset -f pipeline/dagster_defs.py

# 4. 在 UI 中查看
# 访问 http://localhost:3000 查看可视化依赖图和运行状态

8.2.6 示例:RNA-seq 差异表达分析

与 targets 示例对应,实现从 FASTQ 到差异表达基因的完整流程。

1. 准备配置 (config/config.yaml):

fastq_dir: data/raw/fastq
reference_genome: data/reference/genome.fa
reference_gtf: data/reference/genes.gtf
contrast:
  control: ["ctrl_1", "ctrl_2", "ctrl_3"]
  treatment: ["treat_1", "treat_2", "treat_3"]
normalization: TPM
fdr_threshold: 0.05

2. 编写资产定义 (pipeline/dagster_defs.py):

from pathlib import Path
import pandas as pd
import yaml
from dagster import asset, Definitions, Output, MetadataValue

# 辅助函数(实际应放在单独的模块中)
def list_fastq_files(fastq_dir):
    return list(Path(fastq_dir).glob("*.fastq.gz"))

def align_reads(fastq_file, genome):
    # 实际调用 STAR 或 hisat2
    sample = fastq_file.stem.replace(".fastq", "")
    bam_file = Path(f"results/aligned/{sample}.bam")
    bam_file.parent.mkdir(parents=True, exist_ok=True)
    # system(f"STAR --genomeDir {genome} --readFilesIn {fastq_file} ...")
    bam_file.touch()  # 示例
    return str(bam_file)

def count_features(bam_files, gtf):
    # 实际调用 featureCounts
    counts = pd.DataFrame({
        "gene_id": [f"Gene{i}" for i in range(1000)],
        **{Path(bam).stem: [100] * 1000 for bam in bam_files}
    })
    return counts

def normalize_counts(counts, method="TPM"):
    # 实际使用 edgeR 或 DESeq2
    numeric_cols = counts.select_dtypes(include="number").columns
    counts[numeric_cols] = counts[numeric_cols].apply(lambda x: np.log1p(x))
    return counts

def find_deg(normalized, contrast, fdr=0.05):
    # 实际使用 DESeq2 或 edgeR
    import numpy as np
    deg = pd.DataFrame({
        "gene_id": [f"Gene{i}" for i in range(100)],
        "log2fc": np.random.randn(100) * 2,
        "pvalue": np.random.uniform(0, 0.1, 100)
    })
    deg["padj"] = deg["pvalue"] * 10  # 简化的 BH 校正
    return deg[deg["padj"] < fdr]

# 资产定义
@asset
def config():
    """加载配置文件"""
    with open("config/config.yaml") as f:
        return yaml.safe_load(f)

@asset
def fastq_files(config) -> list[str]:
    """列出所有 FASTQ 文件"""
    files = list_fastq_files(config["fastq_dir"])
    return [str(f) for f in files]

@asset
def aligned_bams(fastq_files, config) -> list[str]:
    """比对所有样本"""
    bams = []
    for fastq in fastq_files:
        bam = align_reads(fastq, config["reference_genome"])
        bams.append(bam)
    return bams

@asset
def count_matrix(aligned_bams, config) -> pd.DataFrame:
    """对所有 BAM 进行计数"""
    counts = count_features(aligned_bams, config["reference_gtf"])
    return Output(
        counts,
        metadata={
            "num_genes": len(counts),
            "num_samples": counts.shape[1] - 1,
            "preview": MetadataValue.md(counts.head(10).to_markdown())
        }
    )

@asset
def normalized(count_matrix, config) -> pd.DataFrame:
    """归一化表达矩阵"""
    return normalize_counts(count_matrix, config["normalization"])

@asset
def deg_results(normalized, config) -> pd.DataFrame:
    """差异表达分析"""
    deg = find_deg(
        normalized,
        config["contrast"],
        config["fdr_threshold"]
    )
    return Output(
        deg,
        metadata={
            "num_deg": len(deg),
            "significant_genes": MetadataValue.md(deg.to_markdown())
        }
    )

@asset
def deg_output(deg_results) -> str:
    """写出结果文件"""
    out = Path("results/deg_genes.csv")
    out.parent.mkdir(parents=True, exist_ok=True)
    deg_results.to_csv(out, index=False)
    return str(out)

# 定义仓库
defs = Definitions(assets=[
    config,
    fastq_files,
    aligned_bams,
    count_matrix,
    normalized,
    deg_results,
    deg_output
])

3. 运行流水线

# 启动 UI 查看依赖图
dagster dev -f pipeline/dagster_defs.py

# 或直接命令行运行
dagster asset materialize --select deg_output -f pipeline/dagster_defs.py

在 UI (http://localhost:3000) 中可以看到完整的依赖图:

config → fastq_files → aligned_bams → count_matrix → normalized → deg_results → deg_output

8.2.7 与 targets 对比

特性 targets (R) Dagster (Python)
定义方式 tar_target() 列表 @asset 装饰器
并行 pattern = map() 分区或多进程执行器
缓存 自动哈希检查 资产版本化
UI tar_visnetwork() Web UI (http://localhost:3000)
调试 tar_load() 直接调用函数
适用场景 R 生态、统计分析 Python 生态、数据工程