工具组合与编排
复杂任务往往需要多个工具协作完成。工具组合与编排研究如何组织多个工具的执行顺序、处理依赖关系、支持并行执行,是构建复杂 Agent 系统的核心能力。
一、核心原理
1.1 为什么需要工具编排?
┌─────────────────────────────────────────────────────────────┐
│ 为什么需要工具编排 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 简单任务:单工具调用 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 用户: "北京天气怎么样?" │ │
│ │ │ │
│ │ 工具调用: get_weather(city="北京") │ │
│ │ │ │
│ │ 结果: "北京晴天,18°C" │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 复杂任务:多工具协作 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 用户: "分析这份销售报告,生成图表,并发送给团队" │ │
│ │ │ │
│ │ 需要的工具链: │ │
│ │ 1. read_file() - 读取销售报告 │ │
│ │ 2. analyze_data() - 分析数据 │ │
│ │ 3. generate_chart() - 生成图表 │ │
│ │ 4. send_email() - 发送给团队 │ │
│ │ │ │
│ │ 编排挑战: │ │
│ │ • 确定执行顺序 │ │
│ │ • 处理工具依赖 │ │
│ │ • 传递中间结果 │ │
│ │ • 处理执行失败 │ │
│ │ • 支持并行执行 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘1.2 工具编排模式
┌─────────────────────────────────────────────────────────────┐
│ 工具编排模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 顺序执行 (Sequential) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Tool1 → Tool2 → Tool3 → Tool4 │ │
│ │ │ │
│ │ 特点:简单,每个工具依赖前一个结果 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. 并行执行 (Parallel) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ┌→ Tool1 →┐ │ │
│ │ Input → ├→ Tool2 →├→ Merge → Output │ │
│ │ └→ Tool3 →┘ │ │
│ │ │ │
│ │ 特点:高效,适用于无依赖的工具 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. 条件分支 (Conditional) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ┌→ ToolA →┐ (条件A) │ │
│ │ Input → 条件 ├→ ToolB →┤ (条件B) → Output │ │
│ │ └→ ToolC →┘ (条件C) │ │
│ │ │ │
│ │ 特点:灵活,根据运行时条件选择路径 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 4. 循环迭代 (Loop) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ┌────────────────┐ │ │
│ │ ↓ │ │ │
│ │ Input → Tool → 检查条件 →┘→ Output │ │
│ │ │ │
│ │ 特点:重复执行直到满足条件 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 5. DAG 编排 (Directed Acyclic Graph) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ┌→ ToolB →┐ │ │
│ │ ToolA →┤ ├→ ToolD → Output │ │
│ │ └→ ToolC →┘ │ │
│ │ │ │
│ │ 特点:复杂依赖关系,支持并行和顺序混合 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘二、DAG 工作流
2.1 DAG 概念
DAG(有向无环图) 是一种图结构,用于表示工具之间的依赖关系:
┌─────────────────────────────────────────────────────────────┐
│ DAG 示例 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 任务:数据分析报告生成 │
│ │
│ ┌──────────┐ │
│ │ 读取数据 │ (A) │
│ └────┬─────┘ │
│ │ │
│ ┌──────────────┼──────────────┐ │
│ ↓ ↓ ↓ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 数据清洗 │ │ 数据统计 │ │ 数据可视化│ (B,C,D) │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ ↓ │
│ ┌──────────┐ │
│ │ 生成报告 │ (E) │
│ └────┬─────┘ │
│ │ │
│ ↓ │
│ ┌──────────┐ │
│ │ 发送邮件 │ (F) │
│ └──────────┘ │
│ │
│ 依赖关系: │
│ A → B, A → C, A → D (读取后可并行清洗/统计/可视化) │
│ B → E, C → E, D → E (报告生成需等待所有完成) │
│ E → F (邮件发送需等待报告) │
│ │
│ 拓扑排序执行顺序: │
│ A → (B, C, D 并行) → E → F │
│ │
└─────────────────────────────────────────────────────────────┘2.2 DAG 工作流实现
"""
DAG 工作流实现
支持依赖管理、并行执行、错误处理
"""
from typing import Dict, List, Callable, Any, Set
from dataclasses import dataclass, field
from enum import Enum
import asyncio
from collections import defaultdict
class NodeStatus(Enum):
"""节点状态"""
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class NodeResult:
"""节点执行结果"""
node_id: str
status: NodeStatus
output: Any = None
error: str = None
@dataclass
class Node:
"""DAG 节点"""
id: str
func: Callable
dependencies: List[str] = field(default_factory=list)
status: NodeStatus = NodeStatus.PENDING
result: NodeResult = None
class DAGWorkflow:
"""
DAG 工作流
功能:
- 定义节点和依赖关系
- 自动拓扑排序
- 并行执行无依赖节点
- 错误处理和重试
"""
def __init__(self, name: str = "workflow"):
self.name = name
self.nodes: Dict[str, Node] = {}
self.adjacency: Dict[str, List[str]] = defaultdict(list) # 邻接表
def add_node(
self,
node_id: str,
func: Callable,
dependencies: List[str] = None
):
"""
添加节点
Args:
node_id: 节点唯一标识
func: 执行函数
dependencies: 依赖的节点 ID 列表
"""
if node_id in self.nodes:
raise ValueError(f"节点 '{node_id}' 已存在")
self.nodes[node_id] = Node(
id=node_id,
func=func,
dependencies=dependencies or []
)
# 更新邻接表
for dep in (dependencies or []):
self.adjacency[dep].append(node_id)
def topological_sort(self) -> List[List[str]]:
"""
拓扑排序,返回分层执行顺序
Returns:
每一层包含可以并行执行的节点 ID
"""
in_degree = {node_id: 0 for node_id in self.nodes}
# 计算入度
for node_id, node in self.nodes.items():
for dep in node.dependencies:
if dep in in_degree:
in_degree[node_id] += 1
# 分层 BFS
layers = []
remaining = set(self.nodes.keys())
while remaining:
# 找出入度为 0 的节点
layer = [
node_id for node_id in remaining
if in_degree[node_id] == 0
]
if not layer:
# 存在环
raise ValueError("DAG 中存在循环依赖")
layers.append(layer)
# 移除当前层节点,更新入度
for node_id in layer:
remaining.remove(node_id)
for neighbor in self.adjacency[node_id]:
if neighbor in in_degree:
in_degree[neighbor] -= 1
return layers
async def run(self, initial_input: Any = None) -> Dict[str, NodeResult]:
"""
执行工作流
Args:
initial_input: 初始输入
Returns:
每个节点的执行结果
"""
results: Dict[str, NodeResult] = {}
context = {"input": initial_input}
# 获取执行层次
layers = self.topological_sort()
print(f"\n[工作流] {self.name} 开始执行")
print(f"[工作流] 共 {len(self.nodes)} 个节点,{len(layers)} 层")
for layer_idx, layer in enumerate(layers):
print(f"\n[第 {layer_idx + 1} 层] 并行执行: {layer}")
# 并行执行当前层的所有节点
tasks = []
for node_id in layer:
node = self.nodes[node_id]
# 收集依赖节点的输出
dep_outputs = {
dep: results[dep].output
for dep in node.dependencies
if dep in results
}
tasks.append(self._execute_node(node, dep_outputs, context))
# 等待当前层完成
layer_results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
for node_id, result in zip(layer, layer_results):
if isinstance(result, Exception):
results[node_id] = NodeResult(
node_id=node_id,
status=NodeStatus.FAILED,
error=str(result)
)
else:
results[node_id] = result
print(f"\n[工作流] 执行完成")
return results
async def _execute_node(
self,
node: Node,
dep_outputs: Dict[str, Any],
context: Dict
) -> NodeResult:
"""执行单个节点"""
node.status = NodeStatus.RUNNING
print(f" [执行] {node.id}")
try:
# 调用节点函数
if asyncio.iscoroutinefunction(node.func):
output = await node.func(dep_outputs, context)
else:
output = node.func(dep_outputs, context)
node.status = NodeStatus.SUCCESS
result = NodeResult(
node_id=node.id,
status=NodeStatus.SUCCESS,
output=output
)
print(f" [成功] {node.id}")
except Exception as e:
node.status = NodeStatus.FAILED
result = NodeResult(
node_id=node.id,
status=NodeStatus.FAILED,
error=str(e)
)
print(f" [失败] {node.id}: {e}")
node.result = result
return result
# 使用示例
async def main():
"""DAG 工作流示例"""
# 定义节点函数
def read_data(_, context):
print(" 读取销售数据...")
return {"sales": [100, 200, 150, 300, 250]}
def clean_data(dep_outputs, context):
print(" 清洗数据...")
data = dep_outputs.get("read", {})
return {"cleaned": data.get("sales", [])}
def analyze_data(dep_outputs, context):
print(" 分析数据...")
data = dep_outputs.get("read", {})
sales = data.get("sales", [])
return {
"total": sum(sales),
"average": sum(sales) / len(sales) if sales else 0
}
def visualize_data(dep_outputs, context):
print(" 生成图表...")
data = dep_outputs.get("read", {})
return {"chart": "sales_chart.png"}
def generate_report(dep_outputs, context):
print(" 生成报告...")
analysis = dep_outputs.get("analyze", {})
chart = dep_outputs.get("visualize", {})
return {
"report": f"销售报告: 总额{analysis.get('total')}, 图表{chart.get('chart')}"
}
def send_email(dep_outputs, context):
print(" 发送邮件...")
report = dep_outputs.get("report", {})
return {"sent": True, "report": report.get("report")}
# 创建工作流
workflow = DAGWorkflow("数据分析报告")
# 添加节点
workflow.add_node("read", read_data)
workflow.add_node("clean", clean_data, ["read"])
workflow.add_node("analyze", analyze_data, ["read"])
workflow.add_node("visualize", visualize_data, ["read"])
workflow.add_node("report", generate_report, ["analyze", "visualize"])
workflow.add_node("email", send_email, ["report"])
# 查看执行层次
print("执行层次:", workflow.topological_sort())
# 执行
results = await workflow.run()
# 输出结果
print("\n=== 执行结果 ===")
for node_id, result in results.items():
print(f"{node_id}: {result.status.value}")
if result.output:
print(f" 输出: {result.output}")
if __name__ == "__main__":
asyncio.run(main())三、工具链编排
3.1 工具链模式
┌─────────────────────────────────────────────────────────────┐
│ 工具链编排模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 线性链 (Linear Chain) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Input → Tool1 → Tool2 → Tool3 → Output │ │
│ │ │ │
│ │ 特点:简单直接,每个工具的输出是下一个的输入 │ │
│ │ │ │
│ │ 示例:翻译链 │ │
│ │ 中文 → 翻译工具 → 英文 → 润色工具 → 最终英文 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. 分支链 (Branch Chain) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌→ 分支A → ToolA →┐ │ │
│ │ Input → 判断 → 合并 → Out │ │
│ │ └→ 分支B → ToolB →┘ │ │
│ │ │ │
│ │ 特点:根据条件选择不同的处理路径 │ │
│ │ │ │
│ │ 示例:客服分流 │ │
│ │ 用户问题 → 分类 → 技术问题/订单问题 → 对应工具 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. 循环链 (Loop Chain) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────────┐ │ │
│ │ ↓ │ │ │
│ │ Input → Tool → 检查条件 ─┘→ Output │ │
│ │ │ │
│ │ 特点:重复执行直到满足条件 │ │
│ │ │ │
│ │ 示例:代码调试 │ │
│ │ 代码 → 运行 → 有错误?→ 修复 → 再运行 → 成功 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘3.2 工具链实现
"""
工具链编排实现
"""
from typing import List, Callable, Any, Dict
from dataclasses import dataclass
from enum import Enum
class ChainNodeType(Enum):
"""节点类型"""
TOOL = "tool" # 工具节点
CONDITION = "condition" # 条件判断节点
MERGE = "merge" # 合并节点
LOOP = "loop" # 循环节点
@dataclass
class ChainNode:
"""工具链节点"""
id: str
type: ChainNodeType
func: Callable = None
condition: Callable = None # 条件函数
branches: Dict[str, str] = None # 分支映射
next_node: str = None
class ToolChain:
"""
工具链
支持线性、分支、循环等模式
"""
def __init__(self, name: str = "chain"):
self.name = name
self.nodes: Dict[str, ChainNode] = {}
self.start_node: str = None
def add_tool(
self,
node_id: str,
func: Callable,
next_node: str = None
):
"""添加工具节点"""
self.nodes[node_id] = ChainNode(
id=node_id,
type=ChainNodeType.TOOL,
func=func,
next_node=next_node
)
if self.start_node is None:
self.start_node = node_id
def add_condition(
self,
node_id: str,
condition: Callable,
branches: Dict[str, str], # {条件结果: 下一个节点ID}
default: str = None
):
"""添加条件分支节点"""
if default:
branches["default"] = default
self.nodes[node_id] = ChainNode(
id=node_id,
type=ChainNodeType.CONDITION,
condition=condition,
branches=branches
)
def add_loop(
self,
node_id: str,
func: Callable,
condition: Callable, # 返回 True 继续循环
max_iterations: int = 10
):
"""添加循环节点"""
self.nodes[node_id] = ChainNode(
id=node_id,
type=ChainNodeType.LOOP,
func=func,
condition=condition,
next_node=None # 循环结束后的下一个节点
)
def run(self, initial_input: Any = None) -> Dict[str, Any]:
"""执行工具链"""
context = {
"input": initial_input,
"current_output": initial_input,
"history": []
}
current_node_id = self.start_node
iteration_count = 0
max_iterations = 50 # 防止无限循环
print(f"\n[工具链] {self.name} 开始执行")
while current_node_id and iteration_count < max_iterations:
iteration_count += 1
if current_node_id not in self.nodes:
print(f"[警告] 节点 '{current_node_id}' 不存在")
break
node = self.nodes[current_node_id]
print(f"[节点] {node.id} ({node.type.value})")
if node.type == ChainNodeType.TOOL:
# 执行工具
output = node.func(context["current_output"], context)
context["current_output"] = output
context["history"].append({
"node": node.id,
"output": output
})
current_node_id = node.next_node
elif node.type == ChainNodeType.CONDITION:
# 条件判断
result = node.condition(context["current_output"], context)
branch_key = str(result) if str(result) in node.branches else "default"
current_node_id = node.branches.get(branch_key)
elif node.type == ChainNodeType.LOOP:
# 循环执行
loop_count = 0
while loop_count < 10: # 单个循环最大次数
output = node.func(context["current_output"], context)
context["current_output"] = output
if not node.condition(output, context):
break
loop_count += 1
current_node_id = node.next_node
print(f"\n[工具链] 执行完成")
return {
"output": context["current_output"],
"history": context["history"]
}
# 使用示例
if __name__ == "__main__":
# 创建工具链
chain = ToolChain("数据处理流水线")
# 定义工具函数
def load_data(input_data, context):
print(" 加载数据...")
return {"data": [1, 2, 3, 4, 5]}
def check_data(input_data, context):
print(" 检查数据...")
data = input_data.get("data", [])
return len(data) > 3
def process_normal(input_data, context):
print(" 正常处理...")
return {"result": "normal processed"}
def process_small(input_data, context):
print(" 小数据量处理...")
return {"result": "small processed"}
def save_result(input_data, context):
print(" 保存结果...")
return {"saved": True, "result": input_data}
# 添加节点
chain.add_tool("load", load_data, "check")
chain.add_condition("check", check_data, {
"True": "process_normal",
"False": "process_small"
})
chain.add_tool("process_normal", process_normal, "save")
chain.add_tool("process_small", process_small, "save")
chain.add_tool("save", save_result)
# 执行
result = chain.run()
print(f"\n最终结果: {result['output']}")四、LangGraph 工作流
4.1 LangGraph 简介
LangGraph 是 LangChain 推出的工作流编排框架,专门用于构建复杂的 Agent 系统:
┌─────────────────────────────────────────────────────────────┐
│ LangGraph 核心概念 │
├─────────────────────────────────────────────────────────────┤
│ │
│ StateGraph (状态图) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ • State: 共享状态,在节点间传递 │ │
│ │ • Node: 执行单元,接收状态,返回状态更新 │ │
│ │ • Edge: 节点之间的连接,可以是条件边 │ │
│ │ • Conditional Edge: 根据状态决定下一个节点 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 工作流示例: │
│ │
│ ┌───────┐ ┌───────┐ ┌───────┐ │
│ │ Start │ ──→ │ NodeA │ ──→ │ NodeB │ │
│ └───────┘ └───┬───┘ └───┬───┘ │
│ │ │ │
│ │ 条件 │ │
│ ↓ ↓ │
│ ┌───────┐ ┌───────┐ │
│ │ NodeC │ │ End │ │
│ └───────┘ └───────┘ │
│ │
└─────────────────────────────────────────────────────────────┘4.2 LangGraph 实现
"""
LangGraph 工作流实现
"""
from typing import TypedDict, Annotated, Sequence
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain.tools import tool
# 定义状态
class AgentState(TypedDict):
"""Agent 状态"""
messages: list
current_step: str
data: dict
# 定义工具
@tool
def search(query: str) -> str:
"""搜索信息"""
return f"搜索结果: {query}"
@tool
def analyze(data: str) -> str:
"""分析数据"""
return f"分析结果: {data}"
@tool
def generate_report(analysis: str) -> str:
"""生成报告"""
return f"报告: {analysis}"
def create_workflow():
"""创建工作流"""
# 初始化 LLM
llm = ChatOpenAI(model="gpt-4", temperature=0)
tools = [search, analyze, generate_report]
llm_with_tools = llm.bind_tools(tools)
# 定义节点函数
def agent_node(state: AgentState) -> AgentState:
"""Agent 节点"""
response = llm_with_tools.invoke(state["messages"])
return {
"messages": [response],
"current_step": "agent"
}
def search_node(state: AgentState) -> AgentState:
"""搜索节点"""
last_message = state["messages"][-1]
if last_message.tool_calls:
tool_call = last_message.tool_calls[0]
result = search.invoke(tool_call["args"])
return {
"messages": [{"role": "tool", "content": result}],
"data": {"search_result": result}
}
return state
def analyze_node(state: AgentState) -> AgentState:
"""分析节点"""
# 分析逻辑
return state
def should_continue(state: AgentState) -> str:
"""判断是否继续"""
last_message = state["messages"][-1]
if last_message.tool_calls:
return "tools"
return END
# 创建图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("agent", agent_node)
workflow.add_node("tools", ToolNode(tools))
# 设置入口
workflow.set_entry_point("agent")
# 添加条件边
workflow.add_conditional_edges(
"agent",
should_continue,
{
"tools": "tools",
END: END
}
)
# 添加普通边
workflow.add_edge("tools", "agent")
return workflow.compile()
# 使用示例
if __name__ == "__main__":
graph = create_workflow()
result = graph.invoke({
"messages": [{"role": "user", "content": "搜索 Python 教程"}],
"current_step": "start",
"data": {}
})
print(result["messages"][-1])五、错误处理与重试
5.1 错误处理策略
┌─────────────────────────────────────────────────────────────┐
│ 错误处理策略 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 重试策略 (Retry) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 对临时性错误(网络超时、服务繁忙)自动重试: │ │
│ │ │ │
│ │ • 固定间隔重试 │ │
│ │ • 指数退避重试 │ │
│ │ • 最大重试次数限制 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. 降级策略 (Fallback) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 主工具失败时使用备选方案: │ │
│ │ │ │
│ │ 主工具 → 失败 → 备选工具A → 失败 → 备选工具B │ │
│ │ │ │
│ │ 示例: │ │
│ │ 天气API → 失败 → 搜索引擎 → 失败 → LLM知识库 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. 补偿策略 (Compensation) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 某个步骤失败后,撤销之前已完成的操作: │ │
│ │ │ │
│ │ Step1(创建订单) → Step2(扣款) → 失败 │ │
│ │ ↓ │ │
│ │ 补偿: 取消订单 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 4. 跳过策略 (Skip) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 非关键步骤失败时跳过继续执行: │ │
│ │ │ │
│ │ Step1 → Step2(失败,跳过) → Step3 → ... │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘5.2 重试机制实现
"""
工具执行重试机制
"""
import asyncio
from typing import Callable, Any, Type, Tuple
from functools import wraps
from dataclasses import dataclass
@dataclass
class RetryConfig:
"""重试配置"""
max_retries: int = 3
base_delay: float = 1.0 # 基础延迟(秒)
max_delay: float = 60.0 # 最大延迟
exponential_base: float = 2.0 # 指数退避基数
retryable_exceptions: Tuple[Type[Exception], ...] = (
TimeoutError,
ConnectionError,
)
def with_retry(config: RetryConfig = None):
"""
重试装饰器
Args:
config: 重试配置
Returns:
装饰后的函数
"""
config = config or RetryConfig()
def decorator(func: Callable):
@wraps(func)
async def async_wrapper(*args, **kwargs):
last_exception = None
for attempt in range(config.max_retries + 1):
try:
return await func(*args, **kwargs)
except config.retryable_exceptions as e:
last_exception = e
if attempt < config.max_retries:
# 计算延迟(指数退避)
delay = min(
config.base_delay * (config.exponential_base ** attempt),
config.max_delay
)
print(f"[重试] {func.__name__} 第 {attempt + 1} 次失败,"
f"{delay}秒后重试: {e}")
await asyncio.sleep(delay)
else:
print(f"[重试] {func.__name__} 达到最大重试次数")
raise last_exception
@wraps(func)
def sync_wrapper(*args, **kwargs):
last_exception = None
for attempt in range(config.max_retries + 1):
try:
return func(*args, **kwargs)
except config.retryable_exceptions as e:
last_exception = e
if attempt < config.max_retries:
delay = min(
config.base_delay * (config.exponential_base ** attempt),
config.max_delay
)
print(f"[重试] {func.__name__} 第 {attempt + 1} 次失败,"
f"{delay}秒后重试: {e}")
import time
time.sleep(delay)
raise last_exception
import asyncio
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
return decorator
# 使用示例
@with_retry(RetryConfig(
max_retries=3,
base_delay=1.0,
exponential_base=2.0
))
async def call_api(url: str) -> dict:
"""调用 API(模拟可能失败)"""
import random
# 模拟 70% 失败率
if random.random() < 0.7:
raise TimeoutError("API 请求超时")
return {"status": "success", "url": url}
if __name__ == "__main__":
result = asyncio.run(call_api("https://api.example.com/data"))
print(result)六、面试高频问题
Q1: 什么是 DAG?为什么用于工具编排?
答案要点:
- 定义:有向无环图,节点表示任务,边表示依赖关系
- 优势:
- 明确表达依赖关系
- 支持并行执行无依赖任务
- 自动拓扑排序确定执行顺序
- 易于可视化和管理
Q2: 如何处理工具编排中的错误?
答案要点:
| 策略 | 适用场景 | 实现方式 |
|---|---|---|
| 重试 | 临时性错误 | 指数退避、最大次数限制 |
| 降级 | 主工具不可用 | 备选工具链 |
| 补偿 | 已执行的需撤销 | Saga 模式 |
| 跳过 | 非关键步骤 | 标记可选节点 |
Q3: 顺序执行和并行执行如何选择?
答案要点:
- 顺序执行:工具间有依赖关系,后一个依赖前一个的输出
- 并行执行:工具间无依赖,可同时执行
- 混合执行:DAG 编排,有依赖的顺序执行,无依赖的并行执行
Q4: LangGraph 和传统工作流引擎有什么区别?
答案要点:
| 方面 | LangGraph | 传统引擎 |
|---|---|---|
| 设计目标 | Agent 工作流 | 企业流程 |
| 状态管理 | 图级别共享状态 | 流程变量 |
| LLM 集成 | 原生支持 | 需要适配 |
| 灵活性 | 高(代码定义) | 中(配置定义) |
Q5: 如何设计一个高可用的工具编排系统?
答案要点:
- 幂等设计:工具可安全重试
- 状态持久化:断点续执行
- 超时控制:防止无限等待
- 熔断机制:故障时快速失败
- 监控告警:实时监控执行状态
七、小结
| 概念 | 一句话总结 |
|---|---|
| 工具编排 | 组织多个工具的执行顺序和依赖关系 |
| DAG | 有向无环图,支持并行和依赖管理 |
| 工具链 | 线性、分支、循环等编排模式 |
| LangGraph | LangChain 的 Agent 工作流框架 |
| 错误处理 | 重试、降级、补偿、跳过策略 |
一句话总结:工具编排是构建复杂 Agent 系统的核心能力,需要掌握 DAG、工具链、错误处理等关键技术。
最后更新:2026年3月18日