DAG 工作流架构
DAG(Directed Acyclic Graph)工作流架构 是一种基于有向无环图的任务编排架构模式。通过将任务组织为节点、依赖关系组织为边,实现任务的并行执行、依赖管理和错误恢复,是构建大规模数据处理和自动化工作流的核心架构。
一、核心原理
1.1 设计哲学
DAG 工作流的核心思想是任务的结构化编排:
┌─────────────────────────────────────────────────────────────┐
│ DAG 工作流设计哲学 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 传统顺序执行: │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Task A │ ──→ │ Task B │ ──→ │ Task C │ │
│ └─────────┘ └─────────┘ └─────────┘ │
│ 所有任务串行执行,效率低 │
│ │
│ DAG 工作流: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────┐ │ │
│ │ │ Task A │ │ │
│ │ └────┬────┘ │ │
│ │ │ │ │
│ │ ┌───────┴───────┐ │ │
│ │ ↓ ↓ │ │
│ │ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Task B │ │ Task C │ ← 可并行执行 │ │
│ │ └────┬────┘ └────┬────┘ │ │
│ │ │ │ │ │
│ │ └───────┬───────┘ │ │
│ │ ↓ │ │
│ │ ┌─────────┐ │ │
│ │ │ Task D │ │ │
│ │ └─────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 核心优势:并行执行 + 依赖管理 + 容错恢复 │
│ │
└─────────────────────────────────────────────────────────────┘1.2 核心概念
| 概念 | 英文 | 说明 | 示例 |
|---|---|---|---|
| 节点 | Node | 代表一个任务或操作 | 数据加载、模型训练 |
| 边 | Edge | 代表任务间的依赖关系 | Task B 依赖 Task A |
| 拓扑排序 | Topological Sort | 确定执行顺序的算法 | A→B,C→D |
| 并行度 | Parallelism | 同时执行的任务数 | B 和 C 可并行 |
| 关键路径 | Critical Path | 最长的依赖链 | A→B→D |
1.3 DAG 特性
┌─────────────────────────────────────────────────────────────┐
│ DAG 核心特性 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 有向性(Directed) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 任务有明确的执行方向 │ │
│ │ │ │
│ │ A ──→ B 正确:A 完成后执行 B │ │
│ │ A ←── B 错误:依赖关系反向 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. 无环性(Acyclic) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 不存在循环依赖,保证有终止 │ │
│ │ │ │
│ │ A → B → C → A ❌ 循环依赖,无法执行 │ │
│ │ A → B → C → D ✓ 无环,可执行 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. 依赖传递 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 任务依赖可以传递 │ │
│ │ │ │
│ │ A → B → C 等价于:C 依赖 A 和 B │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 4. 并行潜力 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 无依赖关系的任务可并行执行 │ │
│ │ │ │
│ │ A → B │ │
│ │ A → C B 和 C 可并行执行 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘1.4 与其他架构对比
| 对比维度 | DAG 工作流 | ReAct | Plan-and-Execute |
|---|---|---|---|
| 执行方式 | 按拓扑序 | 动态循环 | 两阶段 |
| 并行能力 | 强 | 无 | 有限 |
| 确定性 | 高 | 低 | 中 |
| 灵活性 | 低 | 高 | 中 |
| 适用场景 | 数据管道 | 探索任务 | 结构化任务 |
二、工作流程
2.1 完整工作流程图
┌─────────────────────────────────────────────────────────────────────┐
│ DAG 工作流完整流程 │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ 定义 DAG │ │
│ │ (Define) │ │
│ └──────┬──────┘ │
│ │ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 验证阶段 │ │
│ │ ┌───────────────────────────────────────────────────────┐ │ │
│ │ │ 1. 检查是否有环 │ │ │
│ │ │ 2. 检查节点是否可达 │ │ │
│ │ │ 3. 检查依赖是否完整 │ │ │
│ │ └───────────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 调度阶段 │ │
│ │ │ │
│ │ ┌─────────────┐ │ │
│ │ │ 拓扑排序 │ │ │
│ │ │ 确定执行顺序│ │ │
│ │ └──────┬──────┘ │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ 任务队列 │ │ │
│ │ │ │ │ │
│ │ │ [A] → [B, C] → [D, E] → [F] │ │ │
│ │ │ ↑ ↑ ↑ ↑ │ │ │
│ │ │ 层1 层2 层3 层4 │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 执行阶段 │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────────┐ │ │
│ │ │ │ │ │
│ │ │ Layer 1: [A] ────────────────────────→ 完成 │ │ │
│ │ │ │ │ │
│ │ │ Layer 2: [B] ──────→ 完成 │ │ │
│ │ │ [C] ──────→ 完成 ← 并行执行 │ │ │
│ │ │ │ │ │
│ │ │ Layer 3: [D] ──────→ 完成 │ │ │
│ │ │ [E] ──────→ 失败 → 重试 → 完成 │ │ │
│ │ │ │ │ │
│ │ │ Layer 4: [F] ────────────────────────→ 完成 │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │ │
│ ↓ │
│ ┌─────────────────────┐ │
│ │ 输出结果 │ │
│ │ (Results) │ │
│ └─────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘2.2 节点状态管理
from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
class NodeState(Enum):
"""节点状态"""
PENDING = "pending" # 等待执行
QUEUED = "queued" # 已入队
RUNNING = "running" # 执行中
SUCCESS = "success" # 执行成功
FAILED = "failed" # 执行失败
SKIPPED = "skipped" # 跳过
RETRYING = "retrying" # 重试中
UPSTREAM_FAILED = "upstream_failed" # 上游失败
@dataclass
class NodeResult:
"""节点执行结果"""
node_id: str
state: NodeState
output: Any = None
error: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
retry_count: int = 0
@property
def duration(self) -> float:
if self.start_time and self.end_time:
return (self.end_time - self.start_time).total_seconds()
return 02.3 执行策略
┌─────────────────────────────────────────────────────────────┐
│ DAG 执行策略 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 层次并行执行 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 按拓扑层级执行,每层内的任务并行执行 │ │
│ │ │ │
│ │ Layer 1: [A] → 执行 A │ │
│ │ Layer 2: [B, C] → 并行执行 B 和 C │ │
│ │ Layer 3: [D, E, F] → 并行执行 D, E, F │ │
│ │ Layer 4: [G] → 执行 G │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. 最大并行执行 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 限制最大并行任务数,防止资源过载 │ │
│ │ │ │
│ │ max_parallelism = 2 │ │
│ │ │ │
│ │ Ready Queue: [B, C, D, E] │ │
│ │ Running: [B, C] (并行度=2) │ │
│ │ Waiting: [D, E] │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. 条件分支执行 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 根据条件选择执行路径 │ │
│ │ │ │
│ │ ┌─── [B] (if success) │ │
│ │ [A] ────┤ │ │
│ │ └─── [C] (if failed) │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 4. 错误处理策略 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • 重试:失败后自动重试 N 次 │ │
│ │ • 跳过:标记为跳过,继续执行其他任务 │ │
│ │ • 终止:停止整个工作流 │ │
│ │ • 补偿:执行补偿任务 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘三、代码实现
3.1 基础实现
"""
DAG 工作流基础实现
"""
from typing import Dict, List, Set, Any, Callable, Optional
from dataclasses import dataclass, field
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
@dataclass
class Node:
"""DAG 节点"""
id: str
task: Callable
dependencies: List[str] = field(default_factory=list)
retries: int = 0
max_retries: int = 3
state: NodeState = NodeState.PENDING
result: Any = None
error: Optional[str] = None
class DAG:
"""有向无环图"""
def __init__(self):
self.nodes: Dict[str, Node] = {}
self.edges: Dict[str, List[str]] = defaultdict(list) # 邻接表
self.in_degree: Dict[str, int] = defaultdict(int) # 入度
def add_node(self, node: Node):
"""添加节点"""
self.nodes[node.id] = node
# 更新依赖关系
for dep in node.dependencies:
self.edges[dep].append(node.id)
self.in_degree[node.id] += 1
# 确保所有节点都有入度记录
if node.id not in self.in_degree:
self.in_degree[node.id] = 0
def validate(self) -> bool:
"""验证 DAG 是否有效(无环)"""
# 使用 Kahn 算法检测环
in_degree = self.in_degree.copy()
queue = deque([node_id for node_id, degree in in_degree.items() if degree == 0])
visited = 0
while queue:
node_id = queue.popleft()
visited += 1
for neighbor in self.edges[node_id]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
return visited == len(self.nodes)
def topological_sort(self) -> List[str]:
"""拓扑排序"""
in_degree = self.in_degree.copy()
queue = deque([node_id for node_id, degree in in_degree.items() if degree == 0])
result = []
while queue:
node_id = queue.popleft()
result.append(node_id)
for neighbor in self.edges[node_id]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
return result
def get_layers(self) -> List[List[str]]:
"""获取按层级分组的节点"""
in_degree = self.in_degree.copy()
layers = []
while True:
# 找到当前层(入度为 0 的节点)
current_layer = [
node_id for node_id, degree in in_degree.items()
if degree == 0 and self.nodes[node_id].state == NodeState.PENDING
]
if not current_layer:
break
layers.append(current_layer)
# 更新入度
for node_id in current_layer:
self.nodes[node_id].state = NodeState.QUEUED
for neighbor in self.edges[node_id]:
in_degree[neighbor] -= 1
# 重置状态
for node in self.nodes.values():
if node.state == NodeState.QUEUED:
node.state = NodeState.PENDING
return layers
class DAGExecutor:
"""DAG 执行器"""
def __init__(
self,
dag: DAG,
max_parallelism: int = 4,
on_node_complete: Callable = None
):
self.dag = dag
self.max_parallelism = max_parallelism
self.on_node_complete = on_node_complete
self.results: Dict[str, Any] = {}
self.lock = threading.Lock()
def execute(self) -> Dict[str, Any]:
"""执行 DAG"""
# 验证 DAG
if not self.dag.validate():
raise ValueError("DAG 包含循环依赖")
# 获取层级
layers = self.dag.get_layers()
print(f"[DAG] 共 {len(self.dag.nodes)} 个节点,{len(layers)} 层")
# 逐层执行
for layer_idx, layer in enumerate(layers):
print(f"\n[Layer {layer_idx + 1}] 执行: {layer}")
# 并行执行当前层
self._execute_layer(layer)
return self.results
def _execute_layer(self, layer: List[str]):
"""并行执行一层"""
with ThreadPoolExecutor(max_workers=self.max_parallelism) as executor:
futures = {}
for node_id in layer:
node = self.dag.nodes[node_id]
future = executor.submit(self._execute_node, node)
futures[future] = node_id
for future in as_completed(futures):
node_id = futures[future]
try:
result = future.result()
print(f" [{node_id}] 完成")
except Exception as e:
print(f" [{node_id}] 失败: {e}")
def _execute_node(self, node: Node) -> Any:
"""执行单个节点"""
node.state = NodeState.RUNNING
# 准备输入(上游节点的输出)
inputs = {
dep: self.results[dep]
for dep in node.dependencies
if dep in self.results
}
# 执行任务(带重试)
for attempt in range(node.max_retries + 1):
try:
result = node.task(inputs)
with self.lock:
node.state = NodeState.SUCCESS
node.result = result
self.results[node.id] = result
if self.on_node_complete:
self.on_node_complete(node)
return result
except Exception as e:
node.retries = attempt + 1
node.error = str(e)
if attempt < node.max_retries:
node.state = NodeState.RETRYING
print(f" [{node.id}] 重试 {attempt + 1}/{node.max_retries}")
else:
node.state = NodeState.FAILED
raise
return None
# 使用示例
if __name__ == "__main__":
# 定义任务
def task_a(inputs):
print(" 执行任务 A")
return {"data": "A 的结果"}
def task_b(inputs):
print(" 执行任务 B")
return {"data": f"B 处理了 {inputs['a']['data']}"}
def task_c(inputs):
print(" 执行任务 C")
return {"data": f"C 处理了 {inputs['a']['data']}"}
def task_d(inputs):
print(" 执行任务 D")
return {"data": f"D 聚合了 {inputs['b']['data']} 和 {inputs['c']['data']}"}
# 构建 DAG
dag = DAG()
dag.add_node(Node(id="a", task=task_a))
dag.add_node(Node(id="b", task=task_b, dependencies=["a"]))
dag.add_node(Node(id="c", task=task_c, dependencies=["a"]))
dag.add_node(Node(id="d", task=task_d, dependencies=["b", "c"]))
# 执行
executor = DAGExecutor(dag, max_parallelism=2)
results = executor.execute()
print(f"\n最终结果: {results}")3.2 条件分支实现
"""
支持条件分支的 DAG 实现
"""
@dataclass
class ConditionalNode(Node):
"""条件节点"""
condition: Callable[[Any], str] = None # 条件函数,返回分支名称
branches: Dict[str, List[str]] = None # 分支映射
def get_next_nodes(self, result: Any) -> List[str]:
"""根据条件获取下一批节点"""
if not self.condition or not self.branches:
return []
branch = self.condition(result)
return self.branches.get(branch, [])
class ConditionalDAGExecutor(DAGExecutor):
"""支持条件分支的执行器"""
def execute(self) -> Dict[str, Any]:
"""执行带条件分支的 DAG"""
# 获取初始可执行节点
ready_nodes = [
node_id for node_id, node in self.dag.nodes.items()
if not node.dependencies
]
while ready_nodes:
# 执行当前批次
completed = self._execute_batch(ready_nodes)
# 获取下一批可执行节点
ready_nodes = []
for node_id in completed:
node = self.dag.nodes[node_id]
if isinstance(node, ConditionalNode):
# 条件节点:根据结果选择分支
next_nodes = node.get_next_nodes(node.result)
ready_nodes.extend(next_nodes)
else:
# 普通节点:检查下游节点
for neighbor_id in self.dag.edges[node_id]:
neighbor = self.dag.nodes[neighbor_id]
if self._can_execute(neighbor_id):
ready_nodes.append(neighbor_id)
return self.results
def _can_execute(self, node_id: str) -> bool:
"""检查节点是否可执行"""
node = self.dag.nodes[node_id]
# 检查所有依赖是否完成
for dep in node.dependencies:
if self.dag.nodes[dep].state != NodeState.SUCCESS:
return False
return True
# 使用示例
def check_result(result):
"""根据结果选择分支"""
if result.get("score", 0) > 0.5:
return "high"
return "low"
# 构建条件 DAG
dag = DAG()
dag.add_node(Node(id="start", task=lambda x: {"score": 0.8}))
dag.add_node(ConditionalNode(
id="check",
task=lambda x: x["start"],
dependencies=["start"],
condition=check_result,
branches={
"high": ["process_high"],
"low": ["process_low"]
}
))
dag.add_node(Node(id="process_high", task=lambda x: "高级处理", dependencies=["check"]))
dag.add_node(Node(id="process_low", task=lambda x: "低级处理", dependencies=["check"]))3.3 使用 LangGraph 实现
"""
使用 LangGraph 实现 DAG 工作流
"""
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
class WorkflowState(TypedDict):
"""工作流状态"""
inputs: dict
results: Annotated[dict, operator.or_]
errors: Annotated[list, operator.add]
def create_dag_workflow():
"""创建 DAG 工作流"""
# 定义节点函数
def load_data(state: WorkflowState):
print("Loading data...")
return {"results": {"data": "loaded_data"}}
def process_a(state: WorkflowState):
print("Processing A...")
return {"results": {"a": "processed_a"}}
def process_b(state: WorkflowState):
print("Processing B...")
return {"results": {"b": "processed_b"}}
def merge_results(state: WorkflowState):
print("Merging results...")
return {"results": {"merged": "final_result"}}
# 创建图
workflow = StateGraph(WorkflowState)
# 添加节点
workflow.add_node("load", load_data)
workflow.add_node("process_a", process_a)
workflow.add_node("process_b", process_b)
workflow.add_node("merge", merge_results)
# 添加边(定义依赖关系)
workflow.set_entry_point("load")
workflow.add_edge("load", "process_a")
workflow.add_edge("load", "process_b")
workflow.add_edge("process_a", "merge")
workflow.add_edge("process_b", "merge")
workflow.add_edge("merge", END)
return workflow.compile()
# 使用示例
if __name__ == "__main__":
workflow = create_dag_workflow()
result = workflow.invoke({
"inputs": {"query": "test"},
"results": {},
"errors": []
})
print(f"Results: {result['results']}")四、适用场景
4.1 最佳适用场景
| 场景类型 | 具体示例 | DAG 优势 |
|---|---|---|
| 数据处理管道 | ETL、数据清洗 | 并行处理、依赖管理 |
| ML 工作流 | 特征工程、模型训练 | 任务编排、资源调度 |
| 自动化测试 | 测试套件执行 | 并行测试、依赖顺序 |
| CI/CD 管道 | 构建、测试、部署 | 流程自动化、错误处理 |
4.2 场景详解
┌─────────────────────────────────────────────────────────────┐
│ DAG 适用场景详解 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 数据处理管道 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ [数据加载] → [清洗] → [转换] → [加载] │ │
│ │ ↓ ↓ ↓ │ │
│ │ [验证] [验证] [验证] │ │
│ │ ↓ ↓ ↓ │ │
│ │ [日志] [日志] [日志] │ │
│ │ │ │
│ │ 优势:各步骤并行、错误隔离、可监控 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. ML 工作流 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ [数据获取] ─────────────────────────────┐ │ │
│ │ ↓ ↓ │ │
│ │ [特征提取A] [特征提取B] [特征提取C] │ │
│ │ ↓ ↓ ↓ │ │
│ │ [特征融合] ─────→ [模型训练] → [评估] │ │
│ │ ↓ │ │
│ │ [部署] │ │
│ │ │ │
│ │ 优势:特征并行提取、流程可复现 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. CI/CD 管道 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ [代码拉取] │ │
│ │ ↓ │ │
│ │ [单元测试] ←── 可并行 │ │
│ │ [集成测试] ←── 可并行 │ │
│ │ [代码扫描] ←── 可并行 │ │
│ │ ↓ │ │
│ │ [构建镜像] │ │
│ │ ↓ │ │
│ │ [部署测试环境] → [自动化测试] │ │
│ │ ↓ │ │
│ │ [部署生产环境] │ │
│ │ │ │
│ │ 优势:测试并行、流程标准化 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘4.3 不适用场景
| 场景 | 原因 | 推荐替代方案 |
|---|---|---|
| 动态决策任务 | 无法预先确定执行顺序 | ReAct 模式 |
| 循环迭代任务 | DAG 无环特性限制 | 状态机 + 循环 |
| 探索性任务 | 流程不确定 | Agent 自主决策 |
| 实时交互任务 | 需要即时响应 | 事件驱动架构 |
五、局限性与优化
5.1 主要局限性
| 局限性 | 具体表现 | 影响 |
|---|---|---|
| 静态结构 | 运行时无法改变 DAG | 灵活性受限 |
| 无循环 | 不支持迭代任务 | 某些场景不适用 |
| 复杂性 | 大规模 DAG 难以维护 | 可读性差 |
| 资源竞争 | 并行任务资源冲突 | 需要资源调度 |
5.2 优化策略
┌─────────────────────────────────────────────────────────────┐
│ DAG 优化策略 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 动态 DAG │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 问题:静态 DAG 无法适应动态变化 │ │
│ │ │ │
│ │ 优化方案: │ │
│ │ • 条件分支:根据运行时结果选择分支 │ │
│ │ • 动态生成:基于输入动态构建 DAG │ │
│ │ • 子图嵌套:将 DAG 作为节点嵌套 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. 资源调度 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 问题:并行任务资源竞争 │ │
│ │ │ │
│ │ 优化方案: │ │
│ │ • 资源标签:为节点标记资源需求 │ │
│ │ • 优先级队列:高优先级任务优先调度 │ │
│ │ • 背压机制:资源不足时暂停调度 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. 容错处理 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 问题:单个节点失败影响整体 │ │
│ │ │ │
│ │ 优化方案: │ │
│ │ • 检查点:定期保存执行状态 │ │
│ │ • 断点续传:从失败点恢复执行 │ │
│ │ • 补偿机制:执行补偿任务回滚 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 4. 可观测性 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 问题:大规模 DAG 难以监控 │ │
│ │ │ │
│ │ 优化方案: │ │
│ │ • 可视化:生成 DAG 执行图 │ │
│ │ • 指标收集:记录每个节点的执行指标 │ │
│ │ • 日志聚合:集中收集和分析日志 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘5.3 与 Agent 结合
"""
DAG + Agent 混合架构
"""
class DAGAgent:
"""结合 Agent 的 DAG 执行"""
def __init__(self, dag_template, agent):
self.dag_template = dag_template
self.agent = agent
def run(self, task: str):
# 1. Agent 分析任务,生成 DAG 配置
config = self.agent.analyze(task)
# 2. 动态构建 DAG
dag = self._build_dag(config)
# 3. 执行 DAG
results = dag.execute()
# 4. Agent 处理结果
return self.agent.summarize(results)
def _build_dag(self, config):
"""根据配置动态构建 DAG"""
dag = DAG()
for node_config in config["nodes"]:
node = Node(
id=node_config["id"],
task=node_config["task"],
dependencies=node_config.get("dependencies", [])
)
dag.add_node(node)
return dag六、面试常见问题
Q1: DAG 工作流与普通工作流有什么区别?
A:
| 对比维度 | DAG 工作流 | 普通工作流 |
|---|---|---|
| 执行顺序 | 拓扑排序 | 固定顺序 |
| 并行能力 | 强(自动识别) | 弱(需手动配置) |
| 依赖管理 | 自动管理 | 手动处理 |
| 容错能力 | 节点级重试 | 流程级重试 |
| 可视化 | 图形化展示 | 线性展示 |
核心差异:DAG 工作流通过图结构自动管理依赖关系,实现最大化并行。
Q2: 如何检测 DAG 中的循环依赖?
A:
检测方法:
def has_cycle(dag) -> bool:
"""使用 Kahn 算法检测环"""
in_degree = dag.in_degree.copy()
queue = deque([n for n, d in in_degree.items() if d == 0])
visited = 0
while queue:
node = queue.popleft()
visited += 1
for neighbor in dag.edges[node]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
# 如果访问的节点数小于总节点数,说明有环
return visited < len(dag.nodes)Q3: 如何处理 DAG 节点执行失败?
A:
失败处理策略:
| 策略 | 适用场景 | 实现方式 |
|---|---|---|
| 重试 | 临时性错误 | 自动重试 N 次 |
| 跳过 | 非关键任务 | 标记跳过,继续执行 |
| 终止 | 关键任务失败 | 停止整个工作流 |
| 补偿 | 需要回滚 | 执行补偿任务 |
| 降级 | 有备选方案 | 执行备选任务 |
Q4: DAG 工作流如何实现最大化并行?
A:
并行优化策略:
- 层次并行:同一层的节点并行执行
- 资源感知:根据资源限制调整并行度
- 关键路径优化:优先调度关键路径上的任务
- 预取:提前准备下游任务所需数据
def get_max_parallelism(dag):
"""计算最大并行度"""
layers = dag.get_layers()
return max(len(layer) for layer in layers)Q5: DAG 工作流在 Agent 系统中如何应用?
A:
应用场景:
| 场景 | DAG 作用 | 示例 |
|---|---|---|
| 多步骤任务 | 任务编排 | 数据分析流程 |
| 多工具协作 | 工具调度 | 搜索→分析→生成 |
| 并行处理 | 提升效率 | 并行信息收集 |
| 流程自动化 | 标准化执行 | 自动化测试 |
Q6: 主流的 DAG 工作流框架有哪些?
A:
| 框架 | 语言 | 特点 | 适用场景 |
|---|---|---|---|
| Airflow | Python | 成熟稳定 | 数据管道 |
| Prefect | Python | 现代化 | ML 工作流 |
| Luigi | Python | 轻量级 | 数据处理 |
| Dagster | Python | 类型安全 | 数据工程 |
| Temporal | Go | 分布式 | 微服务编排 |
| LangGraph | Python | AI 原生 | Agent 工作流 |
七、总结
| 概念 | 一句话总结 | 面试关键词 |
|---|---|---|
| DAG | 有向无环图,任务编排的数据结构 | 无环、依赖管理 |
| 拓扑排序 | 确定 DAG 执行顺序的算法 | 层次化、并行识别 |
| 节点 | DAG 中的任务单元 | 任务、状态管理 |
| 边 | 节点间的依赖关系 | 依赖、流向 |
| 核心优势 | 并行执行、依赖管理、容错 | 效率、可靠性 |
| 适用场景 | 数据管道、ML 工作流、CI/CD | 结构化流程 |
一句话总结:DAG 工作流通过有向无环图结构实现任务的并行执行和依赖管理,是构建大规模数据处理和自动化工作流的核心架构模式。
最后更新:2026年3月18日