Agent 通信协议
Agent 通信协议是多个 Agent 协作的基础设施,定义了消息格式、通信模式和交互规则。设计良好的通信协议可以降低通信开销、提高协作效率、增强系统可扩展性。
一、核心原理
1.1 通信协议的设计目标
┌─────────────────────────────────────────────────────────────┐
│ 通信协议设计目标 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 标准化 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 统一的消息格式,所有 Agent 遵循相同规范 │ │
│ │ │ │
│ │ ❌ 不标准:每个 Agent 自定义消息格式 │ │
│ │ ✅ 标准化:统一的 Message 结构 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. 可扩展 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 支持新增消息类型、字段和功能 │ │
│ │ │ │
│ │ • 元数据扩展 │ │
│ │ • 自定义消息类型 │ │
│ │ • 版本兼容 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. 高效性 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 最小化通信开销,减少不必要的数据传输 │ │
│ │ │ │
│ │ • 消息压缩 │ │
│ │ • 增量传输 │ │
│ │ • 批量处理 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 4. 可靠性 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 保证消息的可靠传递,处理异常情况 │ │
│ │ │ │
│ │ • 消息确认机制 │ │
│ │ • 重试策略 │ │
│ │ • 错误处理 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘1.2 消息格式标准
| 字段 | 类型 | 必需 | 描述 |
|---|---|---|---|
sender | string | 是 | 发送者 Agent ID |
receiver | string | 是 | 接收者 Agent ID 或 "broadcast" |
message_type | string | 是 | 消息类型 |
content | any | 是 | 消息内容 |
timestamp | datetime | 是 | 消息时间戳 |
message_id | string | 是 | 消息唯一标识 |
correlation_id | string | 否 | 关联消息 ID(用于请求-响应) |
priority | int | 否 | 消息优先级 |
metadata | dict | 否 | 扩展元数据 |
1.3 消息类型分类
┌─────────────────────────────────────────────────────────────┐
│ 消息类型分类 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. 控制类消息(Control Messages) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • REGISTER: Agent 注册 │ │
│ │ • HEARTBEAT: 心跳检测 │ │
│ │ • STATUS: 状态查询 │ │
│ │ • SHUTDOWN: 关闭指令 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 2. 任务类消息(Task Messages) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • TASK_ASSIGN: 任务分配 │ │
│ │ • TASK_STATUS: 任务状态更新 │ │
│ │ • TASK_RESULT: 任务结果 │ │
│ │ • TASK_CANCEL: 任务取消 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 3. 数据类消息(Data Messages) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • DATA_REQUEST: 数据请求 │ │
│ │ • DATA_RESPONSE: 数据响应 │ │
│ │ • DATA_UPDATE: 数据更新 │ │
│ │ • DATA_SYNC: 数据同步 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 4. 协作类消息(Collaboration Messages) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • COLLAB_REQUEST: 协作请求 │ │
│ │ • COLLAB_RESPONSE: 协作响应 │ │
│ │ • COLLAB_NOTIFY: 协作通知 │ │
│ │ • COLLAB_VOTE: 协作投票 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 5. 错误类消息(Error Messages) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ • ERROR_REPORT: 错误报告 │ │
│ │ • ERROR_RECOVERY: 错误恢复 │ │
│ │ • TIMEOUT: 超时通知 │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘二、通信模式
2.1 直接通信模式
┌─────────────────────────────────────────────────────────────┐
│ 直接通信模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Point-to-Point(点对点) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Agent A ──────────────────────→ Agent B │ │
│ │ (direct message) │ │
│ │ │ │
│ │ 特点: │ │
│ │ • 目标明确,效率高 │ │
│ │ • 适合私有通信 │ │
│ │ • 不适合大规模协作 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Request-Response(请求-响应) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Agent A Agent B │ │
│ │ │ │ │ │
│ │ │──── Request ────────────→│ │ │
│ │ │ │ │ │
│ │ │←─── Response ────────────│ │ │
│ │ │ │ │ │
│ │ 特点: │ │
│ │ • 同步交互模式 │ │
│ │ • 需要等待响应 │ │
│ │ • 使用 correlation_id 关联请求响应 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 示例代码: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ message = Message( │ │
│ │ sender="agent_a", │ │
│ │ receiver="agent_b", │ │
│ │ message_type="DATA_REQUEST", │ │
│ │ content={"query": "get_user_data"}, │ │
│ │ correlation_id="req-001" │ │
│ │ ) │ │
│ │ │ │
│ │ # 发送并等待响应 │ │
│ │ response = await bus.send_and_wait(message) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘2.2 广播通信模式
┌─────────────────────────────────────────────────────────────┐
│ 广播通信模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Broadcast(广播) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Agent B │ │
│ │ ↑ │ │
│ │ │ │ │
│ │ Agent A ────────→ Bus ────────→ Agent C │ │
│ │ │ │ │
│ │ ↓ │ │
│ │ Agent D │ │
│ │ │ │
│ │ 特点: │ │
│ │ • 一对多通信 │ │
│ │ • 所有订阅者收到消息 │ │
│ │ • 适合状态通知、事件广播 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Pub-Sub(发布订阅) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Publisher A ────┐ │ │
│ │ │ │ │
│ │ Publisher B ────┼──→ Topic ───→ Subscriber X │ │
│ │ │ ↓ │ │
│ │ Publisher C ────┘ Subscriber Y │ │
│ │ │ │
│ │ 特点: │ │
│ │ • 解耦发布者和订阅者 │ │
│ │ • 支持主题过滤 │ │
│ │ • 适合事件驱动架构 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 示例代码: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ # 订阅主题 │ │
│ │ bus.subscribe("task_events", handler=on_task_event) │ │
│ │ │ │
│ │ # 发布消息 │ │
│ │ bus.publish("task_events", { │ │
│ │ "event": "task_completed", │ │
│ │ "task_id": "task-001" │ │
│ │ }) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘2.3 共享记忆模式
┌─────────────────────────────────────────────────────────────┐
│ 共享记忆模式 │
├─────────────────────────────────────────────────────────────┤
│ │
│ Shared Memory(共享记忆) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ Agent A ──┐ │ │
│ │ │ │ │
│ │ Agent B ──┼──→ Shared Knowledge Base ──→ Read │ │
│ │ │ (共享知识库) │ │
│ │ Agent C ──┘ │ │
│ │ │ │
│ │ 特点: │ │
│ │ • 无需直接通信 │ │
│ │ • 信息持久化 │ │
│ │ • 适合知识共享场景 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ Blackboard(黑板模式) │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────────────┐ │ │
│ │ │ Blackboard │ │ │
│ │ │ (公共黑板) │ │ │
│ │ │ │ │ │
│ │ │ ┌───────────────┐ │ │ │
│ │ │ │ Task Status │ │ │ │
│ │ │ │ Progress: 60% │ │ │ │
│ │ │ │ Current: B │ │ │ │
│ │ │ └───────────────┘ │ │ │
│ │ └──────────┬──────────┘ │ │
│ │ │ │ │
│ │ ┌───────────────┼───────────────┐ │ │
│ │ ↓ ↓ ↓ │ │
│ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │
│ │ │ Agent A │ │ Agent B │ │ Agent C │ │ │
│ │ │ (读取) │ │ (写入) │ │ (读取) │ │ │
│ │ └───────────┘ └───────────┘ └───────────┘ │ │
│ │ │ │
│ │ 特点: │ │
│ │ • 中央信息存储 │ │
│ │ • 支持读写分离 │ │
│ │ • 实时状态同步 │ │
│ │ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ 示例代码: │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ class Blackboard: │ │
│ │ def __init__(self): │ │
│ │ self.data = {} │ │
│ │ self.subscribers = {} │ │
│ │ │ │
│ │ def write(self, key, value): │ │
│ │ self.data[key] = value │ │
│ │ self._notify(key, value) │ │
│ │ │ │
│ │ def read(self, key): │ │
│ │ return self.data.get(key) │ │
│ │ │ │
│ │ def subscribe(self, key, callback): │ │
│ │ self.subscribers[key].append(callback) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘三、代码实现
3.1 标准消息类
"""
Agent 通信协议 - 标准消息类实现
"""
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from datetime import datetime
import uuid
import json
@dataclass
class Message:
"""标准消息格式"""
# 必需字段
sender: str # 发送者 ID
receiver: str # 接收者 ID 或 "broadcast"
message_type: str # 消息类型
content: Any # 消息内容
# 自动生成字段
message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
# 可选字段
correlation_id: Optional[str] = None # 关联消息 ID
priority: int = 0 # 优先级(0=普通,1=高,2=紧急)
ttl: Optional[int] = None # 消息有效期(秒)
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
"sender": self.sender,
"receiver": self.receiver,
"message_type": self.message_type,
"content": self.content,
"message_id": self.message_id,
"timestamp": self.timestamp.isoformat(),
"correlation_id": self.correlation_id,
"priority": self.priority,
"ttl": self.ttl,
"metadata": self.metadata,
}
def to_json(self) -> str:
"""转换为 JSON"""
return json.dumps(self.to_dict())
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Message":
"""从字典创建"""
data["timestamp"] = datetime.fromisoformat(data["timestamp"])
return cls(**data)
@classmethod
def from_json(cls, json_str: str) -> "Message":
"""从 JSON 创建"""
return cls.from_dict(json.loads(json_str))
# 消息类型常量
class MessageType:
"""消息类型定义"""
# 控制类
REGISTER = "REGISTER"
HEARTBEAT = "HEARTBEAT"
STATUS = "STATUS"
SHUTDOWN = "SHUTDOWN"
# 任务类
TASK_ASSIGN = "TASK_ASSIGN"
TASK_STATUS = "TASK_STATUS"
TASK_RESULT = "TASK_RESULT"
TASK_CANCEL = "TASK_CANCEL"
# 数据类
DATA_REQUEST = "DATA_REQUEST"
DATA_RESPONSE = "DATA_RESPONSE"
DATA_UPDATE = "DATA_UPDATE"
# 协作类
COLLAB_REQUEST = "COLLAB_REQUEST"
COLLAB_RESPONSE = "COLLAB_RESPONSE"
COLLAB_NOTIFY = "COLLAB_NOTIFY"
COLLAB_VOTE = "COLLAB_VOTE"
# 错误类
ERROR_REPORT = "ERROR_REPORT"
TIMEOUT = "TIMEOUT"3.2 通信总线实现
"""
Agent 通信协议 - 通信总线实现
"""
from abc import ABC, abstractmethod
from typing import Dict, List, Callable, Optional, Any
from collections import defaultdict
import asyncio
from queue import Queue
import threading
class CommunicationBus(ABC):
"""通信总线抽象基类"""
@abstractmethod
def register(self, agent_id: str) -> None:
"""注册 Agent"""
pass
@abstractmethod
def send(self, message: Message) -> bool:
"""发送消息"""
pass
@abstractmethod
def receive(self, agent_id: str) -> Optional[Message]:
"""接收消息"""
pass
@abstractmethod
def broadcast(self, message: Message) -> bool:
"""广播消息"""
pass
class InMemoryBus(CommunicationBus):
"""内存通信总线(单机模式)"""
def __init__(self):
self.mailboxes: Dict[str, Queue] = {}
self.subscriptions: Dict[str, List[str]] = defaultdict(list) # topic -> [agent_ids]
self.lock = threading.Lock()
def register(self, agent_id: str) -> None:
"""注册 Agent,创建邮箱"""
with self.lock:
if agent_id not in self.mailboxes:
self.mailboxes[agent_id] = Queue()
def subscribe(self, agent_id: str, topic: str) -> None:
"""订阅主题"""
with self.lock:
if agent_id not in self.subscriptions[topic]:
self.subscriptions[topic].append(agent_id)
def send(self, message: Message) -> bool:
"""发送消息到指定 Agent"""
with self.lock:
if message.receiver not in self.mailboxes:
return False
self.mailboxes[message.receiver].put(message)
return True
def receive(self, agent_id: str) -> Optional[Message]:
"""接收消息"""
with self.lock:
if agent_id not in self.mailboxes:
return None
if self.mailboxes[agent_id].empty():
return None
return self.mailboxes[agent_id].get()
def broadcast(self, message: Message) -> bool:
"""广播消息"""
with self.lock:
# 如果有订阅主题,只发送给订阅者
if message.message_type in self.subscriptions:
for agent_id in self.subscriptions[message.message_type]:
if agent_id != message.sender:
self.mailboxes[agent_id].put(message)
else:
# 否则发送给所有 Agent
for agent_id, mailbox in self.mailboxes.items():
if agent_id != message.sender:
mailbox.put(message)
return True
def publish(self, topic: str, content: Any, sender: str) -> bool:
"""发布主题消息"""
message = Message(
sender=sender,
receiver="broadcast",
message_type=topic,
content=content,
)
return self.broadcast(message)
class AsyncBus(CommunicationBus):
"""异步通信总线"""
def __init__(self):
self.mailboxes: Dict[str, asyncio.Queue] = {}
self.subscriptions: Dict[str, List[str]] = defaultdict(list)
async def register(self, agent_id: str) -> None:
"""注册 Agent"""
if agent_id not in self.mailboxes:
self.mailboxes[agent_id] = asyncio.Queue()
async def send(self, message: Message) -> bool:
"""发送消息"""
if message.receiver not in self.mailboxes:
return False
await self.mailboxes[message.receiver].put(message)
return True
async def receive(self, agent_id: str, timeout: float = 1.0) -> Optional[Message]:
"""接收消息"""
if agent_id not in self.mailboxes:
return None
try:
return await asyncio.wait_for(
self.mailboxes[agent_id].get(),
timeout=timeout
)
except asyncio.TimeoutError:
return None
async def broadcast(self, message: Message) -> bool:
"""广播消息"""
for agent_id, mailbox in self.mailboxes.items():
if agent_id != message.sender:
await mailbox.put(message)
return True
async def send_and_wait(
self,
message: Message,
timeout: float = 30.0
) -> Optional[Message]:
"""发送消息并等待响应"""
await self.send(message)
# 等待响应
start_time = asyncio.get_event_loop().time()
while asyncio.get_event_loop().time() - start_time < timeout:
msg = await self.receive(message.sender, timeout=0.1)
if msg and msg.correlation_id == message.message_id:
return msg
return None3.3 黑板模式实现
"""
Agent 通信协议 - 黑板模式实现
"""
from typing import Dict, Any, List, Callable, Optional
from dataclasses import dataclass
from datetime import datetime
import threading
@dataclass
class BlackboardEntry:
"""黑板条目"""
key: str
value: Any
writer: str
timestamp: datetime
version: int = 1
class Blackboard:
"""
黑板模式实现
提供共享信息存储和变更通知
"""
def __init__(self):
self.data: Dict[str, BlackboardEntry] = {}
self.subscribers: Dict[str, List[Callable]] = {}
self.lock = threading.Lock()
def write(self, key: str, value: Any, writer: str) -> None:
"""
写入黑板
Args:
key: 键名
value: 值
writer: 写入者 Agent ID
"""
with self.lock:
version = 1
if key in self.data:
version = self.data[key].version + 1
self.data[key] = BlackboardEntry(
key=key,
value=value,
writer=writer,
timestamp=datetime.now(),
version=version,
)
# 通知订阅者
self._notify(key, value, writer)
def read(self, key: str) -> Optional[Any]:
"""读取黑板内容"""
with self.lock:
entry = self.data.get(key)
return entry.value if entry else None
def read_entry(self, key: str) -> Optional[BlackboardEntry]:
"""读取完整条目"""
with self.lock:
return self.data.get(key)
def delete(self, key: str, writer: str) -> bool:
"""删除黑板内容"""
with self.lock:
if key in self.data:
del self.data[key]
self._notify(key, None, writer)
return True
return False
def subscribe(self, key: str, callback: Callable[[str, Any, str], None]) -> None:
"""
订阅黑板变更
Args:
key: 订阅的键名
callback: 回调函数 (key, value, writer)
"""
with self.lock:
if key not in self.subscribers:
self.subscribers[key] = []
self.subscribers[key].append(callback)
def subscribe_all(self, callback: Callable[[str, Any, str], None]) -> None:
"""订阅所有变更"""
self.subscribe("*", callback)
def _notify(self, key: str, value: Any, writer: str) -> None:
"""通知订阅者"""
# 通知特定键的订阅者
if key in self.subscribers:
for callback in self.subscribers[key]:
callback(key, value, writer)
# 通知通配符订阅者
if "*" in self.subscribers:
for callback in self.subscribers["*"]:
callback(key, value, writer)
def get_all(self) -> Dict[str, Any]:
"""获取所有内容"""
with self.lock:
return {k: v.value for k, v in self.data.items()}
def clear(self) -> None:
"""清空黑板"""
with self.lock:
self.data.clear()
# 使用示例
def example_blackboard():
"""黑板模式使用示例"""
blackboard = Blackboard()
# 定义回调
def on_change(key, value, writer):
print(f"[通知] {writer} 更新了 {key}: {value}")
# 订阅变更
blackboard.subscribe("task_status", on_change)
blackboard.subscribe_all(lambda k, v, w: print(f"[全局] {w} 修改了 {k}"))
# Agent 写入
blackboard.write("task_status", "进行中", "agent_a")
blackboard.write("progress", 60, "agent_b")
# Agent 读取
status = blackboard.read("task_status")
print(f"当前状态: {status}")四、通信优化
4.1 消息压缩
"""
消息压缩优化
"""
import gzip
import json
from typing import Any
class CompressedMessage:
"""压缩消息"""
@staticmethod
def compress(data: Dict[str, Any]) -> bytes:
"""压缩消息"""
json_str = json.dumps(data)
return gzip.compress(json_str.encode('utf-8'))
@staticmethod
def decompress(data: bytes) -> Dict[str, Any]:
"""解压消息"""
json_str = gzip.decompress(data).decode('utf-8')
return json.loads(json_str)
class MessageCompressor:
"""消息压缩器"""
def __init__(self, threshold: int = 1024):
"""
Args:
threshold: 压缩阈值(字节)
"""
self.threshold = threshold
def should_compress(self, message: Message) -> bool:
"""判断是否需要压缩"""
size = len(message.to_json().encode('utf-8'))
return size > self.threshold
def compress_if_needed(self, message: Message) -> Any:
"""按需压缩"""
data = message.to_dict()
if self.should_compress(message):
return ("compressed", CompressedMessage.compress(data))
return ("raw", data)4.2 消息过滤
"""
消息过滤优化
"""
from typing import Callable, List
class MessageFilter:
"""消息过滤器"""
def __init__(self):
self.filters: List[Callable[[Message], bool]] = []
def add_filter(self, filter_func: Callable[[Message], bool]) -> None:
"""添加过滤器"""
self.filters.append(filter_func)
def filter(self, messages: List[Message]) -> List[Message]:
"""过滤消息"""
result = messages
for f in self.filters:
result = [m for m in result if f(m)]
return result
# 预定义过滤器
def filter_by_type(message_type: str) -> Callable:
"""按消息类型过滤"""
return lambda m: m.message_type == message_type
def filter_by_sender(sender: str) -> Callable:
"""按发送者过滤"""
return lambda m: m.sender == sender
def filter_by_priority(min_priority: int) -> Callable:
"""按优先级过滤"""
return lambda m: m.priority >= min_priority五、适用场景
5.1 通信模式选择
| 场景 | 推荐模式 | 理由 |
|---|---|---|
| 私有对话 | 点对点通信 | 信息隔离,效率高 |
| 状态通知 | 广播模式 | 一对多,实时性 |
| 事件驱动 | Pub-Sub | 解耦,可扩展 |
| 知识共享 | 共享记忆 | 持久化,无需通信 |
| 协作决策 | 黑板模式 | 实时同步,可见性 |
5.2 性能优化建议
| 优化方向 | 方法 | 效果 |
|---|---|---|
| 减少消息量 | 压缩、增量传输 | 降低带宽 |
| 减少通信次数 | 批量处理、缓存 | 降低延迟 |
| 提高可靠性 | 确认机制、重试 | 提高成功率 |
六、面试问答
Q1: 设计 Agent 通信协议需要考虑哪些因素?
回答:
- 消息格式标准化:统一的字段定义
- 通信模式选择:点对点、广播、发布订阅
- 可靠性保证:确认、重试、错误处理
- 性能优化:压缩、过滤、批量处理
- 可扩展性:支持新消息类型和字段
Q2: 广播模式和发布订阅有什么区别?
回答:
| 对比项 | 广播模式 | 发布订阅 |
|---|---|---|
| 接收者 | 所有 Agent | 订阅者 |
| 过滤 | 无 | 主题过滤 |
| 解耦程度 | 低 | 高 |
| 适用场景 | 状态通知 | 事件驱动 |
Q3: 黑板模式的优缺点是什么?
回答:
- 优点:无需直接通信、信息持久化、易于审计
- 缺点:中央存储可能成为瓶颈、需要同步机制
Q4: 如何优化多 Agent 系统的通信开销?
回答:
- 消息压缩和增量传输
- 智能路由和消息过滤
- 批量处理和异步通信
- 本地缓存减少重复请求
七、小结
| 概念 | 一句话总结 | 面试关键词 |
|---|---|---|
| 消息格式 | 标准化的消息结构 | sender、receiver、type、content |
| 通信模式 | 点对点、广播、发布订阅 | Pub-Sub、Blackboard |
| 优化策略 | 压缩、过滤、批量处理 | 性能优化、可靠性 |
| 模式选择 | 根据场景选择合适模式 | 场景匹配、权衡取舍 |
一句话总结:Agent 通信协议是协作的基础设施,需要平衡标准化、灵活性、性能和可靠性,根据具体场景选择合适的通信模式和优化策略。
最后更新:2026年3月19日