8 计算流程管理:targets & Dagster
当分析从单个脚本扩展到几十个样本、多个物种或需要在集群上长时间运行时,靠手动运行脚本很快会失控:
- 这个结果是用哪个脚本跑的?是最新版本吗?
- 这个脚本我跑过了吗?结果文件在哪儿?
- 这个脚本应该使用哪个脚本产生的结果作为输入?
- 上次使用的参数是什么?我忘记记录了。
在分析任务比较简短的时候,这些问题似乎还不太重要。但当您需要进行一篇文章的完整分析流程时,使用流程管理工具就十分必要了。它可以把所有分析步骤变成一条可重复、可追踪、可扩展的“装配线”,让任何人都能用同一条命令从原始FASTQ跑到最终图表。
流程管理的核心收益:
- 可复现:所有步骤、参数、输入输出被声明在图中,重新执行不靠记忆。
- 容错与续跑:流程管理工具知道哪些文件已经产出,只补跑失败或缺失的节点。让您能够以最小代价完成分析。
- 并行与调度:天然支持多核/集群,把任务拆成安全的并行单元。
- 审计与记录:日志、时间、资源使用可追踪,方便排错和复盘。
下面我们重点介绍两种流程管理工具:R语言使用targets,Python语言使用Dagster。两者都遵循本书的配置约定:可调参数放 config/,原始数据放 data/raw/,中间结果放 results/。
8.1 R:targets
8.1.1 安装 R 包
在开始使用 {targets} 之前,需要安装必要的 R 包:
pixi add r-targets r-qs r-qs2 r-tarchetypes8.1.2 _targets.R 最小骨架
_targets.R 是流水线“主开关”,通常包含四段:
library(targets)
library(tarchetypes)
# 1) 配置文件和存储位置
tar_config_set(
script = "_targets.R",
store = "_targets"
)
# 2) 全局选项
tar_option_set(
packages = c("readr", "dplyr", "yaml"),
format = "auto", # 中间对象存储格式,自动选择,R对象使用qs存储,文件使用file
error = "continue", # 单节点失败不阻塞其余
seed = 42 # 全局随机种子
)
# 3) 加载自定义函数
tar_source("R/") # 加载 R/ 目录下所有 .R 文件
# 4) 读取配置(参数集中放 config/)
config <- yaml::read_yaml("config/config.yaml")
# 5) 声明流水线
list(
tar_target(name, command, format, cue, pattern, …)
)关键点:
- 包与选项放在
tar_option_set(),保持一处维护。 - 业务函数放
R/*.R,不要写在_targets.R;这样便于测试与复用。 - 配置、路径、阈值全部进
config/,命令中不出现硬编码路径。
8.1.3 常用命令
# 运行整个流水线
tar_make()
# 在交互环境中读取结果
tar_read(norm_matrix) # 读取对象到临时变量
tar_load(norm_matrix) # 加载到当前环境(创建同名变量)
# 只运行特定 target
tar_make(names = c("norm_matrix", "deg_results"))
# 查看依赖图
tar_visnetwork()
# 查看哪些需要重跑
tar_outdated()8.1.4 并行执行
targets 天然支持并行执行,可大幅提升多样本分析速度:
# 在 _targets.R 中设置并行
tar_option_set(
packages = c("readr", "dplyr"),
format = "auto",
controller = crew::crew_controller_local(workers = 4) # 使用4核并行
)
# 或使用 future 后端
library(future)
plan(multisession, workers = 4)
tar_make_future()对于多个样本,使用 pattern = map() 自动并行:
tar_target(
aligned_bam,
align_reads(fastq_files),
pattern = map(fastq_files) # 每个样本独立并行执行
)8.1.5 调试技巧
# 查看流水线状态
tar_manifest() # 查看所有 targets
tar_meta() # 查看执行历史和元数据
# 排查错误
tar_meta() |>
filter(!is.na(error)) # 查看失败的 target
tar_traceback(norm_matrix) # 查看详细错误堆栈
# 强制重跑
tar_destroy(norm_matrix) # 删除缓存,强制重跑
tar_invalidate(norm_matrix) # 标记为过期,下次 tar_make() 会重跑
# 交互式调试
tar_load(raw_matrix) # 加载中间结果
# 手动运行函数测试
result <- normalize_counts(raw_matrix)8.1.6 target-函数-脚本的关系
- target:一份可重用的、有签名的结果(文件或对象)。
targets只在上游输出发生变化时重跑。 - 函数:放在
R/下,保持纯函数风格(输入显式,输出返回值,不写入全局变量)。 - 脚本文件:
R/normalize_matrix.R等,专门存放这些函数。 - 连接方式:
_targets.R中的tar_target()调用这些函数,并声明输入/输出。
示例片段:
tar_target(raw_matrix_path, "data/matrix.csv", format = "file"),
tar_target(raw_matrix, readr::read_csv(raw_matrix_path)),
tar_target(norm_matrix, normalize_matrix(raw_matrix, method = config$normalization)),
tar_target(norm_path, write_matrix(norm_matrix, "results/normalized_matrix.csv"), format = "file")执行流:raw_matrix_path(常量文件路径)→ raw_matrix(读入数据)→ norm_matrix(函数处理)→ norm_path(写出文件)。targets 用哈希检查输入是否变化,自动决定是否重跑。
8.1.7 示例:RNA-seq 差异表达分析流水线
目标:从 FASTQ 文件开始,完成比对、定量、归一化,最终得到差异表达基因列表。
准备配置:在
config/config.yaml写入:fastq_dir: data/raw/fastq reference_genome: data/reference/genome.fa reference_gtf: data/reference/genes.gtf contrast: control: ["ctrl_1", "ctrl_2", "ctrl_3"] treatment: ["treat_1", "treat_2", "treat_3"] normalization: TPM fdr_threshold: 0.05编写函数(放 R/rnaseq_functions.R):
# 列出所有 FASTQ 文件 list_fastq_files <- function(fastq_dir) { list.files(fastq_dir, pattern = "\\.fastq\\.gz$", full.names = TRUE) } # 比对(实际调用 STAR 或 hisat2) align_reads <- function(fastq_file, genome, threads = 4) { sample_name <- tools::file_path_sans_ext(basename(fastq_file)) bam_file <- file.path("results/aligned", paste0(sample_name, ".bam")) # 这里简化,实际应调用 system2() 运行 STAR # system2("STAR", args = c(...)) fs::dir_create(dirname(bam_file)) file.create(bam_file) # 示例:创建空文件 return(bam_file) } # 计数(实际调用 featureCounts 或 HTSeq) count_features <- function(bam_files, gtf) { # 实际应调用 Rsubread::featureCounts() # counts <- featureCounts(bam_files, annot.ext = gtf) # 这里返回模拟的计数矩阵 tibble::tibble( gene_id = paste0("Gene", 1:1000), ctrl_1 = rpois(1000, 100), ctrl_2 = rpois(1000, 100), treat_1 = rpois(1000, 150) ) } # 归一化 normalize_counts <- function(count_matrix, method = "TPM") { # 实际应使用 edgeR::cpm() 或 DESeq2::vst() count_matrix |> dplyr::mutate(across(where(is.numeric), ~ log1p(.x))) } # 差异表达分析 find_deg <- function(normalized, contrast, fdr = 0.05) { # 实际应使用 DESeq2::DESeq() 或 edgeR::glmFit() tibble::tibble( gene_id = paste0("Gene", 1:100), log2fc = rnorm(100, mean = 2, sd = 1), pvalue = runif(100, 0, 0.1), padj = p.adjust(pvalue, method = "BH") ) |> dplyr::filter(padj < fdr, abs(log2fc) > 1) } # 写出结果 write_results <- function(deg_table, path) { fs::dir_create(dirname(path)) readr::write_csv(deg_table, path) path }编写
_targets.R:library(targets) library(tarchetypes) tar_option_set( packages = c("readr", "dplyr", "yaml", "fs", "tibble"), format = "auto", # 自动选择:R对象用qs,文件用file seed = 42 ) tar_source("R/") config <- yaml::read_yaml("config/config.yaml") list( # 1. 列出输入文件 tar_target( fastq_files, list_fastq_files(config$fastq_dir) ), # 2. 比对(每个样本并行) tar_target( aligned_bam, align_reads(fastq_files, config$reference_genome), pattern = map(fastq_files), format = "file" ), # 3. 计数 tar_target( count_matrix, count_features(aligned_bam, config$reference_gtf) ), # 4. 归一化 tar_target( normalized, normalize_counts(count_matrix, config$normalization) ), # 5. 差异表达分析 tar_target( deg_results, find_deg(normalized, config$contrast, config$fdr_threshold) ), # 6. 写出结果 tar_target( deg_output, write_results(deg_results, "results/deg_genes.csv"), format = "file" ) )运行:
# 查看依赖图 Rscript -e "targets::tar_visnetwork()" # 运行流水线(自动并行比对步骤) Rscript -e "targets::tar_make()"产物:
results/deg_genes.csv。如果只修改了差异分析的FDR阈值,tar_make()只会重跑find_deg和write_results,前面的比对和计数会被跳过。扩展思路:
- 多组对比:使用
tar_map()对不同对比组合展开多个分支 - 质控报告:添加
tar_target生成 FastQC 和 MultiQC 报告 - 可视化:添加 target 绘制火山图、热图等
tar_target( volcano_plot, plot_volcano(deg_results, "results/figures/volcano.png"), format = "file" )- 多组对比:使用
8.2 Python:Dagster
Python 项目可以用 Dagster 构建可复现流水线。
8.2.1 安装
pixi add dagster验证安装:
dagster --version8.2.2 基本结构
pipeline/dagster_defs.py # Dagster definitions
config/config.yaml # 参数配置
data/raw/ # 输入数据
results/ # 输出结果
核心概念:
- 资产(asset):等价于 targets 中的 target,每个资产返回一个数据对象或文件路径,Dagster 会记录其物化状态与依赖。
- Definitions:把资产/资源/调度组合成一个仓库入口。
dagster dev会自动加载并渲染依赖图。 - 配置文件:放在
config/config.yaml,在资产内读取,保持与本书其他章节一致。
8.2.3 常用命令
# 启动开发服务器(带 UI)
dagster dev -f pipeline/dagster_defs.py
# 查看所有资产
dagster asset list -f pipeline/dagster_defs.py
# 物化(运行)特定资产
dagster asset materialize --select deg_results -f pipeline/dagster_defs.py
# 物化多个资产
dagster asset materialize --select count_matrix,normalized -f pipeline/dagster_defs.py
# 查看运行历史
dagster run list -f pipeline/dagster_defs.py
# 查看特定运行的日志
dagster run logs <run_id> -f pipeline/dagster_defs.py
# 清理特定资产的缓存
dagster asset wipe --select deg_results -f pipeline/dagster_defs.py8.2.4 并行执行
Dagster 支持多种并行策略:
1. 使用多进程执行器:
from dagster import Definitions, multiprocess_executor
defs = Definitions(
assets=[...],
executor=multiprocess_executor.configured({"max_concurrent": 4})
)2. 动态分区(多样本并行):
from dagster import asset, DynamicPartitionsDefinition
# 定义动态分区
samples_partitions = DynamicPartitionsDefinition(name="samples")
@asset(partitions_def=samples_partitions)
def aligned_bam(context):
sample = context.partition_key
# 每个样本独立执行
return align_reads(f"data/raw/{sample}.fastq.gz")
# 添加分区
from dagster import add_dynamic_partitions
add_dynamic_partitions(samples_partitions, ["sample1", "sample2", "sample3"])3. 使用 @multi_asset 批量处理:
from dagster import multi_asset, AssetOut
@multi_asset(
outs={
"sample1_bam": AssetOut(),
"sample2_bam": AssetOut(),
"sample3_bam": AssetOut(),
}
)
def align_all_samples():
# 一次性处理多个样本
results = {}
for sample in ["sample1", "sample2", "sample3"]:
results[f"{sample}_bam"] = align_reads(sample)
return results8.2.5 调试
# 1. 添加日志
from dagster import get_dagster_logger
@asset
def my_asset():
logger = get_dagster_logger()
logger.info("开始处理...")
logger.warning("发现潜在问题")
logger.error("处理失败")
return result
# 2. 查看资产元数据
from dagster import MetadataValue
@asset
def count_matrix():
counts = perform_counting()
return Output(
counts,
metadata={
"num_genes": len(counts),
"num_samples": counts.shape[1],
"preview": MetadataValue.md(counts.head().to_markdown())
}
)
# 3. 本地交互测试(不通过 Dagster 运行)
# 直接在 Python 中测试函数
if __name__ == "__main__":
import yaml
config = yaml.safe_load(open("config/config.yaml"))
# 手动测试资产函数
fastq_files = list_fastq_files(config["fastq_dir"])
bam = align_reads(fastq_files[0])
print(f"生成 BAM: {bam}")
# 4. 失败重试配置
from dagster import asset, RetryPolicy
@asset(retry_policy=RetryPolicy(max_retries=3, delay=60))
def unstable_asset():
# 失败会自动重试3次,每次间隔60秒
return process_data()调试工作流:
# 1. 查看失败的运行
dagster run list --status FAILURE -f pipeline/dagster_defs.py
# 2. 查看详细日志
dagster run logs <run_id> -f pipeline/dagster_defs.py
# 3. 清理失败资产的缓存重跑
dagster asset wipe --select failed_asset -f pipeline/dagster_defs.py
dagster asset materialize --select failed_asset -f pipeline/dagster_defs.py
# 4. 在 UI 中查看
# 访问 http://localhost:3000 查看可视化依赖图和运行状态8.2.6 示例:RNA-seq 差异表达分析
与 targets 示例对应,实现从 FASTQ 到差异表达基因的完整流程。
1. 准备配置 (config/config.yaml):
fastq_dir: data/raw/fastq
reference_genome: data/reference/genome.fa
reference_gtf: data/reference/genes.gtf
contrast:
control: ["ctrl_1", "ctrl_2", "ctrl_3"]
treatment: ["treat_1", "treat_2", "treat_3"]
normalization: TPM
fdr_threshold: 0.052. 编写资产定义 (pipeline/dagster_defs.py):
from pathlib import Path
import pandas as pd
import yaml
from dagster import asset, Definitions, Output, MetadataValue
# 辅助函数(实际应放在单独的模块中)
def list_fastq_files(fastq_dir):
return list(Path(fastq_dir).glob("*.fastq.gz"))
def align_reads(fastq_file, genome):
# 实际调用 STAR 或 hisat2
sample = fastq_file.stem.replace(".fastq", "")
bam_file = Path(f"results/aligned/{sample}.bam")
bam_file.parent.mkdir(parents=True, exist_ok=True)
# system(f"STAR --genomeDir {genome} --readFilesIn {fastq_file} ...")
bam_file.touch() # 示例
return str(bam_file)
def count_features(bam_files, gtf):
# 实际调用 featureCounts
counts = pd.DataFrame({
"gene_id": [f"Gene{i}" for i in range(1000)],
**{Path(bam).stem: [100] * 1000 for bam in bam_files}
})
return counts
def normalize_counts(counts, method="TPM"):
# 实际使用 edgeR 或 DESeq2
numeric_cols = counts.select_dtypes(include="number").columns
counts[numeric_cols] = counts[numeric_cols].apply(lambda x: np.log1p(x))
return counts
def find_deg(normalized, contrast, fdr=0.05):
# 实际使用 DESeq2 或 edgeR
import numpy as np
deg = pd.DataFrame({
"gene_id": [f"Gene{i}" for i in range(100)],
"log2fc": np.random.randn(100) * 2,
"pvalue": np.random.uniform(0, 0.1, 100)
})
deg["padj"] = deg["pvalue"] * 10 # 简化的 BH 校正
return deg[deg["padj"] < fdr]
# 资产定义
@asset
def config():
"""加载配置文件"""
with open("config/config.yaml") as f:
return yaml.safe_load(f)
@asset
def fastq_files(config) -> list[str]:
"""列出所有 FASTQ 文件"""
files = list_fastq_files(config["fastq_dir"])
return [str(f) for f in files]
@asset
def aligned_bams(fastq_files, config) -> list[str]:
"""比对所有样本"""
bams = []
for fastq in fastq_files:
bam = align_reads(fastq, config["reference_genome"])
bams.append(bam)
return bams
@asset
def count_matrix(aligned_bams, config) -> pd.DataFrame:
"""对所有 BAM 进行计数"""
counts = count_features(aligned_bams, config["reference_gtf"])
return Output(
counts,
metadata={
"num_genes": len(counts),
"num_samples": counts.shape[1] - 1,
"preview": MetadataValue.md(counts.head(10).to_markdown())
}
)
@asset
def normalized(count_matrix, config) -> pd.DataFrame:
"""归一化表达矩阵"""
return normalize_counts(count_matrix, config["normalization"])
@asset
def deg_results(normalized, config) -> pd.DataFrame:
"""差异表达分析"""
deg = find_deg(
normalized,
config["contrast"],
config["fdr_threshold"]
)
return Output(
deg,
metadata={
"num_deg": len(deg),
"significant_genes": MetadataValue.md(deg.to_markdown())
}
)
@asset
def deg_output(deg_results) -> str:
"""写出结果文件"""
out = Path("results/deg_genes.csv")
out.parent.mkdir(parents=True, exist_ok=True)
deg_results.to_csv(out, index=False)
return str(out)
# 定义仓库
defs = Definitions(assets=[
config,
fastq_files,
aligned_bams,
count_matrix,
normalized,
deg_results,
deg_output
])3. 运行流水线:
# 启动 UI 查看依赖图
dagster dev -f pipeline/dagster_defs.py
# 或直接命令行运行
dagster asset materialize --select deg_output -f pipeline/dagster_defs.py在 UI (http://localhost:3000) 中可以看到完整的依赖图:
config → fastq_files → aligned_bams → count_matrix → normalized → deg_results → deg_output
8.2.7 与 targets 对比
| 特性 | targets (R) | Dagster (Python) |
|---|---|---|
| 定义方式 | tar_target() 列表 |
@asset 装饰器 |
| 并行 | pattern = map() |
分区或多进程执行器 |
| 缓存 | 自动哈希检查 | 资产版本化 |
| UI | tar_visnetwork() |
Web UI (http://localhost:3000) |
| 调试 | tar_load() |
直接调用函数 |
| 适用场景 | R 生态、统计分析 | Python 生态、数据工程 |