知识模块
🤖 Agent 知识模块
五、Agent 架构模式
DAG 工作流架构

DAG 工作流架构

DAG(Directed Acyclic Graph)工作流架构 是一种基于有向无环图的任务编排架构模式。通过将任务组织为节点、依赖关系组织为边,实现任务的并行执行、依赖管理和错误恢复,是构建大规模数据处理和自动化工作流的核心架构。


一、核心原理

1.1 设计哲学

DAG 工作流的核心思想是任务的结构化编排

┌─────────────────────────────────────────────────────────────┐
│                    DAG 工作流设计哲学                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   传统顺序执行:                                            │
│   ┌─────────┐     ┌─────────┐     ┌─────────┐             │
│   │ Task A  │ ──→ │ Task B  │ ──→ │ Task C  │             │
│   └─────────┘     └─────────┘     └─────────┘             │
│                   所有任务串行执行,效率低                   │
│                                                             │
│   DAG 工作流:                                              │
│   ┌─────────────────────────────────────────────────────┐  │
│  │                                                       │  │
│  │           ┌─────────┐                                │  │
│  │           │ Task A  │                                │  │
│  │           └────┬────┘                                │  │
│  │                │                                      │  │
│  │        ┌───────┴───────┐                             │  │
│  │        ↓               ↓                             │  │
│  │   ┌─────────┐     ┌─────────┐                        │  │
│  │   │ Task B  │     │ Task C  │  ← 可并行执行           │  │
│  │   └────┬────┘     └────┬────┘                        │  │
│  │        │               │                             │  │
│  │        └───────┬───────┘                             │  │
│  │                ↓                                      │  │
│  │           ┌─────────┐                                │  │
│  │           │ Task D  │                                │  │
│  │           └─────────┘                                │  │
│  │                                                       │  │
│  └─────────────────────────────────────────────────────┘  │
│                                                             │
│   核心优势:并行执行 + 依赖管理 + 容错恢复                  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1.2 核心概念

概念英文说明示例
节点Node代表一个任务或操作数据加载、模型训练
Edge代表任务间的依赖关系Task B 依赖 Task A
拓扑排序Topological Sort确定执行顺序的算法A→B,C→D
并行度Parallelism同时执行的任务数B 和 C 可并行
关键路径Critical Path最长的依赖链A→B→D

1.3 DAG 特性

┌─────────────────────────────────────────────────────────────┐
│                    DAG 核心特性                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 有向性(Directed)                                      │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 任务有明确的执行方向                                 │   │
│  │                                                      │   │
│  │     A ──→ B    正确:A 完成后执行 B                   │   │
│  │     A ←── B    错误:依赖关系反向                     │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  2. 无环性(Acyclic)                                       │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 不存在循环依赖,保证有终止                           │   │
│  │                                                      │   │
│  │     A → B → C → A    ❌ 循环依赖,无法执行            │   │
│  │     A → B → C → D    ✓ 无环,可执行                  │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  3. 依赖传递                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 任务依赖可以传递                                     │   │
│  │                                                      │   │
│  │     A → B → C    等价于:C 依赖 A 和 B               │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  4. 并行潜力                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 无依赖关系的任务可并行执行                           │   │
│  │                                                      │   │
│  │     A → B                                            │   │
│  │     A → C    B 和 C 可并行执行                        │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1.4 与其他架构对比

对比维度DAG 工作流ReActPlan-and-Execute
执行方式按拓扑序动态循环两阶段
并行能力有限
确定性
灵活性
适用场景数据管道探索任务结构化任务

二、工作流程

2.1 完整工作流程图

┌─────────────────────────────────────────────────────────────────────┐
│                    DAG 工作流完整流程                                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌─────────────┐                                                  │
│   │  定义 DAG   │                                                  │
│   │  (Define)   │                                                  │
│   └──────┬──────┘                                                  │
│          │                                                         │
│          ↓                                                         │
│   ┌─────────────────────────────────────────────────────────────┐ │
│   │                    验证阶段                                   │ │
│   │  ┌───────────────────────────────────────────────────────┐  │ │
│   │  │ 1. 检查是否有环                                        │  │ │
│   │  │ 2. 检查节点是否可达                                    │  │ │
│   │  │ 3. 检查依赖是否完整                                    │  │ │
│   │  └───────────────────────────────────────────────────────┘  │ │
│   └─────────────────────────────────────────────────────────────┘ │
│          │                                                         │
│          ↓                                                         │
│   ┌─────────────────────────────────────────────────────────────┐ │
│   │                    调度阶段                                   │ │
│   │                                                              │ │
│   │   ┌─────────────┐                                           │ │
│   │   │ 拓扑排序    │                                           │ │
│   │   │ 确定执行顺序│                                           │ │
│   │   └──────┬──────┘                                           │ │
│   │          │                                                   │ │
│   │          ↓                                                   │ │
│   │   ┌─────────────────────────────────────────────────────┐  │ │
│   │   │                  任务队列                            │  │ │
│   │   │                                                      │  │ │
│   │   │   [A] → [B, C] → [D, E] → [F]                       │  │ │
│   │   │    ↑       ↑          ↑        ↑                    │  │ │
│   │   │   层1    层2        层3      层4                     │  │ │
│   │   │                                                      │  │ │
│   │   └─────────────────────────────────────────────────────┘  │ │
│   │                                                              │ │
│   └─────────────────────────────────────────────────────────────┘ │
│          │                                                         │
│          ↓                                                         │
│   ┌─────────────────────────────────────────────────────────────┐ │
│   │                    执行阶段                                   │ │
│   │                                                              │ │
│   │   ┌─────────────────────────────────────────────────────┐  │ │
│   │   │                                                      │  │ │
│   │   │   Layer 1:  [A] ────────────────────────→ 完成       │  │ │
│   │   │                                                      │  │ │
│   │   │   Layer 2:  [B] ──────→ 完成                         │  │ │
│   │   │             [C] ──────→ 完成    ← 并行执行           │  │ │
│   │   │                                                      │  │ │
│   │   │   Layer 3:  [D] ──────→ 完成                         │  │ │
│   │   │             [E] ──────→ 失败 → 重试 → 完成          │  │ │
│   │   │                                                      │  │ │
│   │   │   Layer 4:  [F] ────────────────────────→ 完成       │  │ │
│   │   │                                                      │  │ │
│   │   └─────────────────────────────────────────────────────┘  │ │
│   │                                                              │ │
│   └─────────────────────────────────────────────────────────────┘ │
│          │                                                         │
│          ↓                                                         │
│   ┌─────────────────────┐                                          │
│   │   输出结果          │                                          │
│   │   (Results)         │                                          │
│   └─────────────────────┘                                          │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

2.2 节点状态管理

from enum import Enum
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
 
class NodeState(Enum):
    """节点状态"""
    PENDING = "pending"        # 等待执行
    QUEUED = "queued"          # 已入队
    RUNNING = "running"        # 执行中
    SUCCESS = "success"        # 执行成功
    FAILED = "failed"          # 执行失败
    SKIPPED = "skipped"        # 跳过
    RETRYING = "retrying"      # 重试中
    UPSTREAM_FAILED = "upstream_failed"  # 上游失败
 
 
@dataclass
class NodeResult:
    """节点执行结果"""
    node_id: str
    state: NodeState
    output: Any = None
    error: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    retry_count: int = 0
    
    @property
    def duration(self) -> float:
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0

2.3 执行策略

┌─────────────────────────────────────────────────────────────┐
│                    DAG 执行策略                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 层次并行执行                                            │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 按拓扑层级执行,每层内的任务并行执行                  │   │
│  │                                                      │   │
│  │ Layer 1: [A]           → 执行 A                      │   │
│  │ Layer 2: [B, C]        → 并行执行 B 和 C             │   │
│  │ Layer 3: [D, E, F]     → 并行执行 D, E, F            │   │
│  │ Layer 4: [G]           → 执行 G                      │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  2. 最大并行执行                                            │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 限制最大并行任务数,防止资源过载                      │   │
│  │                                                      │   │
│  │ max_parallelism = 2                                  │   │
│  │                                                      │   │
│  │ Ready Queue: [B, C, D, E]                            │   │
│  │ Running:     [B, C]      (并行度=2)                  │   │
│  │ Waiting:     [D, E]                                  │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  3. 条件分支执行                                            │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 根据条件选择执行路径                                  │   │
│  │                                                      │   │
│  │              ┌─── [B] (if success)                   │   │
│  │     [A] ────┤                                        │   │
│  │              └─── [C] (if failed)                    │   │
│  │                                                      │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  4. 错误处理策略                                            │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ • 重试:失败后自动重试 N 次                          │   │
│  │ • 跳过:标记为跳过,继续执行其他任务                 │   │
│  │ • 终止:停止整个工作流                               │   │
│  │ • 补偿:执行补偿任务                                 │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

三、代码实现

3.1 基础实现

"""
DAG 工作流基础实现
"""
 
from typing import Dict, List, Set, Any, Callable, Optional
from dataclasses import dataclass, field
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
 
 
@dataclass
class Node:
    """DAG 节点"""
    id: str
    task: Callable
    dependencies: List[str] = field(default_factory=list)
    retries: int = 0
    max_retries: int = 3
    state: NodeState = NodeState.PENDING
    result: Any = None
    error: Optional[str] = None
 
 
class DAG:
    """有向无环图"""
    
    def __init__(self):
        self.nodes: Dict[str, Node] = {}
        self.edges: Dict[str, List[str]] = defaultdict(list)  # 邻接表
        self.in_degree: Dict[str, int] = defaultdict(int)     # 入度
    
    def add_node(self, node: Node):
        """添加节点"""
        self.nodes[node.id] = node
        
        # 更新依赖关系
        for dep in node.dependencies:
            self.edges[dep].append(node.id)
            self.in_degree[node.id] += 1
        
        # 确保所有节点都有入度记录
        if node.id not in self.in_degree:
            self.in_degree[node.id] = 0
    
    def validate(self) -> bool:
        """验证 DAG 是否有效(无环)"""
        # 使用 Kahn 算法检测环
        in_degree = self.in_degree.copy()
        queue = deque([node_id for node_id, degree in in_degree.items() if degree == 0])
        visited = 0
        
        while queue:
            node_id = queue.popleft()
            visited += 1
            
            for neighbor in self.edges[node_id]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)
        
        return visited == len(self.nodes)
    
    def topological_sort(self) -> List[str]:
        """拓扑排序"""
        in_degree = self.in_degree.copy()
        queue = deque([node_id for node_id, degree in in_degree.items() if degree == 0])
        result = []
        
        while queue:
            node_id = queue.popleft()
            result.append(node_id)
            
            for neighbor in self.edges[node_id]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)
        
        return result
    
    def get_layers(self) -> List[List[str]]:
        """获取按层级分组的节点"""
        in_degree = self.in_degree.copy()
        layers = []
        
        while True:
            # 找到当前层(入度为 0 的节点)
            current_layer = [
                node_id for node_id, degree in in_degree.items()
                if degree == 0 and self.nodes[node_id].state == NodeState.PENDING
            ]
            
            if not current_layer:
                break
            
            layers.append(current_layer)
            
            # 更新入度
            for node_id in current_layer:
                self.nodes[node_id].state = NodeState.QUEUED
                for neighbor in self.edges[node_id]:
                    in_degree[neighbor] -= 1
        
        # 重置状态
        for node in self.nodes.values():
            if node.state == NodeState.QUEUED:
                node.state = NodeState.PENDING
        
        return layers
 
 
class DAGExecutor:
    """DAG 执行器"""
    
    def __init__(
        self,
        dag: DAG,
        max_parallelism: int = 4,
        on_node_complete: Callable = None
    ):
        self.dag = dag
        self.max_parallelism = max_parallelism
        self.on_node_complete = on_node_complete
        self.results: Dict[str, Any] = {}
        self.lock = threading.Lock()
    
    def execute(self) -> Dict[str, Any]:
        """执行 DAG"""
        # 验证 DAG
        if not self.dag.validate():
            raise ValueError("DAG 包含循环依赖")
        
        # 获取层级
        layers = self.dag.get_layers()
        
        print(f"[DAG] 共 {len(self.dag.nodes)} 个节点,{len(layers)} 层")
        
        # 逐层执行
        for layer_idx, layer in enumerate(layers):
            print(f"\n[Layer {layer_idx + 1}] 执行: {layer}")
            
            # 并行执行当前层
            self._execute_layer(layer)
        
        return self.results
    
    def _execute_layer(self, layer: List[str]):
        """并行执行一层"""
        with ThreadPoolExecutor(max_workers=self.max_parallelism) as executor:
            futures = {}
            
            for node_id in layer:
                node = self.dag.nodes[node_id]
                future = executor.submit(self._execute_node, node)
                futures[future] = node_id
            
            for future in as_completed(futures):
                node_id = futures[future]
                try:
                    result = future.result()
                    print(f"  [{node_id}] 完成")
                except Exception as e:
                    print(f"  [{node_id}] 失败: {e}")
    
    def _execute_node(self, node: Node) -> Any:
        """执行单个节点"""
        node.state = NodeState.RUNNING
        
        # 准备输入(上游节点的输出)
        inputs = {
            dep: self.results[dep]
            for dep in node.dependencies
            if dep in self.results
        }
        
        # 执行任务(带重试)
        for attempt in range(node.max_retries + 1):
            try:
                result = node.task(inputs)
                
                with self.lock:
                    node.state = NodeState.SUCCESS
                    node.result = result
                    self.results[node.id] = result
                
                if self.on_node_complete:
                    self.on_node_complete(node)
                
                return result
            
            except Exception as e:
                node.retries = attempt + 1
                node.error = str(e)
                
                if attempt < node.max_retries:
                    node.state = NodeState.RETRYING
                    print(f"  [{node.id}] 重试 {attempt + 1}/{node.max_retries}")
                else:
                    node.state = NodeState.FAILED
                    raise
        
        return None
 
 
# 使用示例
if __name__ == "__main__":
    # 定义任务
    def task_a(inputs):
        print("    执行任务 A")
        return {"data": "A 的结果"}
    
    def task_b(inputs):
        print("    执行任务 B")
        return {"data": f"B 处理了 {inputs['a']['data']}"}
    
    def task_c(inputs):
        print("    执行任务 C")
        return {"data": f"C 处理了 {inputs['a']['data']}"}
    
    def task_d(inputs):
        print("    执行任务 D")
        return {"data": f"D 聚合了 {inputs['b']['data']}{inputs['c']['data']}"}
    
    # 构建 DAG
    dag = DAG()
    dag.add_node(Node(id="a", task=task_a))
    dag.add_node(Node(id="b", task=task_b, dependencies=["a"]))
    dag.add_node(Node(id="c", task=task_c, dependencies=["a"]))
    dag.add_node(Node(id="d", task=task_d, dependencies=["b", "c"]))
    
    # 执行
    executor = DAGExecutor(dag, max_parallelism=2)
    results = executor.execute()
    
    print(f"\n最终结果: {results}")

3.2 条件分支实现

"""
支持条件分支的 DAG 实现
"""
 
@dataclass
class ConditionalNode(Node):
    """条件节点"""
    condition: Callable[[Any], str] = None  # 条件函数,返回分支名称
    branches: Dict[str, List[str]] = None    # 分支映射
    
    def get_next_nodes(self, result: Any) -> List[str]:
        """根据条件获取下一批节点"""
        if not self.condition or not self.branches:
            return []
        
        branch = self.condition(result)
        return self.branches.get(branch, [])
 
 
class ConditionalDAGExecutor(DAGExecutor):
    """支持条件分支的执行器"""
    
    def execute(self) -> Dict[str, Any]:
        """执行带条件分支的 DAG"""
        # 获取初始可执行节点
        ready_nodes = [
            node_id for node_id, node in self.dag.nodes.items()
            if not node.dependencies
        ]
        
        while ready_nodes:
            # 执行当前批次
            completed = self._execute_batch(ready_nodes)
            
            # 获取下一批可执行节点
            ready_nodes = []
            for node_id in completed:
                node = self.dag.nodes[node_id]
                
                if isinstance(node, ConditionalNode):
                    # 条件节点:根据结果选择分支
                    next_nodes = node.get_next_nodes(node.result)
                    ready_nodes.extend(next_nodes)
                else:
                    # 普通节点:检查下游节点
                    for neighbor_id in self.dag.edges[node_id]:
                        neighbor = self.dag.nodes[neighbor_id]
                        if self._can_execute(neighbor_id):
                            ready_nodes.append(neighbor_id)
        
        return self.results
    
    def _can_execute(self, node_id: str) -> bool:
        """检查节点是否可执行"""
        node = self.dag.nodes[node_id]
        
        # 检查所有依赖是否完成
        for dep in node.dependencies:
            if self.dag.nodes[dep].state != NodeState.SUCCESS:
                return False
        
        return True
 
 
# 使用示例
def check_result(result):
    """根据结果选择分支"""
    if result.get("score", 0) > 0.5:
        return "high"
    return "low"
 
# 构建条件 DAG
dag = DAG()
dag.add_node(Node(id="start", task=lambda x: {"score": 0.8}))
dag.add_node(ConditionalNode(
    id="check",
    task=lambda x: x["start"],
    dependencies=["start"],
    condition=check_result,
    branches={
        "high": ["process_high"],
        "low": ["process_low"]
    }
))
dag.add_node(Node(id="process_high", task=lambda x: "高级处理", dependencies=["check"]))
dag.add_node(Node(id="process_low", task=lambda x: "低级处理", dependencies=["check"]))

3.3 使用 LangGraph 实现

"""
使用 LangGraph 实现 DAG 工作流
"""
 
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
 
 
class WorkflowState(TypedDict):
    """工作流状态"""
    inputs: dict
    results: Annotated[dict, operator.or_]
    errors: Annotated[list, operator.add]
 
 
def create_dag_workflow():
    """创建 DAG 工作流"""
    
    # 定义节点函数
    def load_data(state: WorkflowState):
        print("Loading data...")
        return {"results": {"data": "loaded_data"}}
    
    def process_a(state: WorkflowState):
        print("Processing A...")
        return {"results": {"a": "processed_a"}}
    
    def process_b(state: WorkflowState):
        print("Processing B...")
        return {"results": {"b": "processed_b"}}
    
    def merge_results(state: WorkflowState):
        print("Merging results...")
        return {"results": {"merged": "final_result"}}
    
    # 创建图
    workflow = StateGraph(WorkflowState)
    
    # 添加节点
    workflow.add_node("load", load_data)
    workflow.add_node("process_a", process_a)
    workflow.add_node("process_b", process_b)
    workflow.add_node("merge", merge_results)
    
    # 添加边(定义依赖关系)
    workflow.set_entry_point("load")
    workflow.add_edge("load", "process_a")
    workflow.add_edge("load", "process_b")
    workflow.add_edge("process_a", "merge")
    workflow.add_edge("process_b", "merge")
    workflow.add_edge("merge", END)
    
    return workflow.compile()
 
 
# 使用示例
if __name__ == "__main__":
    workflow = create_dag_workflow()
    
    result = workflow.invoke({
        "inputs": {"query": "test"},
        "results": {},
        "errors": []
    })
    
    print(f"Results: {result['results']}")

四、适用场景

4.1 最佳适用场景

场景类型具体示例DAG 优势
数据处理管道ETL、数据清洗并行处理、依赖管理
ML 工作流特征工程、模型训练任务编排、资源调度
自动化测试测试套件执行并行测试、依赖顺序
CI/CD 管道构建、测试、部署流程自动化、错误处理

4.2 场景详解

┌─────────────────────────────────────────────────────────────┐
│                    DAG 适用场景详解                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 数据处理管道                                            │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                                                      │   │
│  │     [数据加载] → [清洗] → [转换] → [加载]            │   │
│  │         ↓            ↓         ↓                     │   │
│  │     [验证]       [验证]   [验证]                      │   │
│  │         ↓            ↓         ↓                     │   │
│  │     [日志]       [日志]   [日志]                      │   │
│  │                                                      │   │
│  │ 优势:各步骤并行、错误隔离、可监控                    │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  2. ML 工作流                                               │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                                                      │   │
│  │     [数据获取] ─────────────────────────────┐       │   │
│  │         ↓                                    ↓       │   │
│  │     [特征提取A]  [特征提取B]  [特征提取C]           │   │
│  │         ↓            ↓           ↓                  │   │
│  │     [特征融合] ─────→ [模型训练] → [评估]           │   │
│  │                           ↓                         │   │
│  │                       [部署]                        │   │
│  │                                                      │   │
│  │ 优势:特征并行提取、流程可复现                       │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  3. CI/CD 管道                                              │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                                                      │   │
│  │     [代码拉取]                                       │   │
│  │         ↓                                            │   │
│  │     [单元测试] ←── 可并行                            │   │
│  │     [集成测试] ←── 可并行                            │   │
│  │     [代码扫描] ←── 可并行                            │   │
│  │         ↓                                            │   │
│  │     [构建镜像]                                       │   │
│  │         ↓                                            │   │
│  │     [部署测试环境] → [自动化测试]                    │   │
│  │         ↓                                            │   │
│  │     [部署生产环境]                                   │   │
│  │                                                      │   │
│  │ 优势:测试并行、流程标准化                           │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

4.3 不适用场景

场景原因推荐替代方案
动态决策任务无法预先确定执行顺序ReAct 模式
循环迭代任务DAG 无环特性限制状态机 + 循环
探索性任务流程不确定Agent 自主决策
实时交互任务需要即时响应事件驱动架构

五、局限性与优化

5.1 主要局限性

局限性具体表现影响
静态结构运行时无法改变 DAG灵活性受限
无循环不支持迭代任务某些场景不适用
复杂性大规模 DAG 难以维护可读性差
资源竞争并行任务资源冲突需要资源调度

5.2 优化策略

┌─────────────────────────────────────────────────────────────┐
│                    DAG 优化策略                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. 动态 DAG                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 问题:静态 DAG 无法适应动态变化                      │   │
│  │                                                      │   │
│  │ 优化方案:                                           │   │
│  │ • 条件分支:根据运行时结果选择分支                   │   │
│  │ • 动态生成:基于输入动态构建 DAG                     │   │
│  │ • 子图嵌套:将 DAG 作为节点嵌套                      │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  2. 资源调度                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 问题:并行任务资源竞争                               │   │
│  │                                                      │   │
│  │ 优化方案:                                           │   │
│  │ • 资源标签:为节点标记资源需求                       │   │
│  │ • 优先级队列:高优先级任务优先调度                   │   │
│  │ • 背压机制:资源不足时暂停调度                       │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  3. 容错处理                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 问题:单个节点失败影响整体                           │   │
│  │                                                      │   │
│  │ 优化方案:                                           │   │
│  │ • 检查点:定期保存执行状态                           │   │
│  │ • 断点续传:从失败点恢复执行                         │   │
│  │ • 补偿机制:执行补偿任务回滚                         │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  4. 可观测性                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │ 问题:大规模 DAG 难以监控                            │   │
│  │                                                      │   │
│  │ 优化方案:                                           │   │
│  │ • 可视化:生成 DAG 执行图                            │   │
│  │ • 指标收集:记录每个节点的执行指标                   │   │
│  │ • 日志聚合:集中收集和分析日志                       │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

5.3 与 Agent 结合

"""
DAG + Agent 混合架构
"""
 
class DAGAgent:
    """结合 Agent 的 DAG 执行"""
    
    def __init__(self, dag_template, agent):
        self.dag_template = dag_template
        self.agent = agent
    
    def run(self, task: str):
        # 1. Agent 分析任务,生成 DAG 配置
        config = self.agent.analyze(task)
        
        # 2. 动态构建 DAG
        dag = self._build_dag(config)
        
        # 3. 执行 DAG
        results = dag.execute()
        
        # 4. Agent 处理结果
        return self.agent.summarize(results)
    
    def _build_dag(self, config):
        """根据配置动态构建 DAG"""
        dag = DAG()
        
        for node_config in config["nodes"]:
            node = Node(
                id=node_config["id"],
                task=node_config["task"],
                dependencies=node_config.get("dependencies", [])
            )
            dag.add_node(node)
        
        return dag

六、面试常见问题

Q1: DAG 工作流与普通工作流有什么区别?

A:

对比维度DAG 工作流普通工作流
执行顺序拓扑排序固定顺序
并行能力强(自动识别)弱(需手动配置)
依赖管理自动管理手动处理
容错能力节点级重试流程级重试
可视化图形化展示线性展示

核心差异:DAG 工作流通过图结构自动管理依赖关系,实现最大化并行。

Q2: 如何检测 DAG 中的循环依赖?

A:

检测方法

def has_cycle(dag) -> bool:
    """使用 Kahn 算法检测环"""
    in_degree = dag.in_degree.copy()
    queue = deque([n for n, d in in_degree.items() if d == 0])
    visited = 0
    
    while queue:
        node = queue.popleft()
        visited += 1
        
        for neighbor in dag.edges[node]:
            in_degree[neighbor] -= 1
            if in_degree[neighbor] == 0:
                queue.append(neighbor)
    
    # 如果访问的节点数小于总节点数,说明有环
    return visited < len(dag.nodes)

Q3: 如何处理 DAG 节点执行失败?

A:

失败处理策略

策略适用场景实现方式
重试临时性错误自动重试 N 次
跳过非关键任务标记跳过,继续执行
终止关键任务失败停止整个工作流
补偿需要回滚执行补偿任务
降级有备选方案执行备选任务

Q4: DAG 工作流如何实现最大化并行?

A:

并行优化策略

  1. 层次并行:同一层的节点并行执行
  2. 资源感知:根据资源限制调整并行度
  3. 关键路径优化:优先调度关键路径上的任务
  4. 预取:提前准备下游任务所需数据
def get_max_parallelism(dag):
    """计算最大并行度"""
    layers = dag.get_layers()
    return max(len(layer) for layer in layers)

Q5: DAG 工作流在 Agent 系统中如何应用?

A:

应用场景

场景DAG 作用示例
多步骤任务任务编排数据分析流程
多工具协作工具调度搜索→分析→生成
并行处理提升效率并行信息收集
流程自动化标准化执行自动化测试

Q6: 主流的 DAG 工作流框架有哪些?

A:

框架语言特点适用场景
AirflowPython成熟稳定数据管道
PrefectPython现代化ML 工作流
LuigiPython轻量级数据处理
DagsterPython类型安全数据工程
TemporalGo分布式微服务编排
LangGraphPythonAI 原生Agent 工作流

七、总结

概念一句话总结面试关键词
DAG有向无环图,任务编排的数据结构无环、依赖管理
拓扑排序确定 DAG 执行顺序的算法层次化、并行识别
节点DAG 中的任务单元任务、状态管理
节点间的依赖关系依赖、流向
核心优势并行执行、依赖管理、容错效率、可靠性
适用场景数据管道、ML 工作流、CI/CD结构化流程

一句话总结:DAG 工作流通过有向无环图结构实现任务的并行执行和依赖管理,是构建大规模数据处理和自动化工作流的核心架构模式。


最后更新:2026年3月18日