知识模块
🤖 Agent 知识模块
十、多 Agent 协作
Agent 通信协议

Agent 通信协议

Agent 通信协议是多个 Agent 协作的基础设施,定义了消息格式、通信模式和交互规则。设计良好的通信协议可以降低通信开销、提高协作效率、增强系统可扩展性。


一、核心原理

1.1 通信协议的设计目标

┌─────────────────────────────────────────────────────────────┐
│                    通信协议设计目标                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   1. 标准化                                                 │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ 统一的消息格式,所有 Agent 遵循相同规范             │  │
│   │                                                      │  │
│   │ ❌ 不标准:每个 Agent 自定义消息格式                │  │
│   │ ✅ 标准化:统一的 Message 结构                      │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   2. 可扩展                                                 │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ 支持新增消息类型、字段和功能                        │  │
│   │                                                      │  │
│   │ • 元数据扩展                                        │  │
│   │ • 自定义消息类型                                    │  │
│   │ • 版本兼容                                          │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   3. 高效性                                                 │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ 最小化通信开销,减少不必要的数据传输                │  │
│   │                                                      │  │
│   │ • 消息压缩                                          │  │
│   │ • 增量传输                                          │  │
│   │ • 批量处理                                          │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   4. 可靠性                                                 │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ 保证消息的可靠传递,处理异常情况                    │  │
│   │                                                      │  │
│   │ • 消息确认机制                                      │  │
│   │ • 重试策略                                          │  │
│   │ • 错误处理                                          │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1.2 消息格式标准

字段类型必需描述
senderstring发送者 Agent ID
receiverstring接收者 Agent ID 或 "broadcast"
message_typestring消息类型
contentany消息内容
timestampdatetime消息时间戳
message_idstring消息唯一标识
correlation_idstring关联消息 ID(用于请求-响应)
priorityint消息优先级
metadatadict扩展元数据

1.3 消息类型分类

┌─────────────────────────────────────────────────────────────┐
│                    消息类型分类                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   1. 控制类消息(Control Messages)                         │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ • REGISTER: Agent 注册                              │  │
│   │ • HEARTBEAT: 心跳检测                               │  │
│   │ • STATUS: 状态查询                                  │  │
│   │ • SHUTDOWN: 关闭指令                                │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   2. 任务类消息(Task Messages)                            │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ • TASK_ASSIGN: 任务分配                             │  │
│   │ • TASK_STATUS: 任务状态更新                         │  │
│   │ • TASK_RESULT: 任务结果                             │  │
│   │ • TASK_CANCEL: 任务取消                             │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   3. 数据类消息(Data Messages)                            │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ • DATA_REQUEST: 数据请求                            │  │
│   │ • DATA_RESPONSE: 数据响应                           │  │
│   │ • DATA_UPDATE: 数据更新                             │  │
│   │ • DATA_SYNC: 数据同步                               │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   4. 协作类消息(Collaboration Messages)                   │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ • COLLAB_REQUEST: 协作请求                          │  │
│   │ • COLLAB_RESPONSE: 协作响应                         │  │
│   │ • COLLAB_NOTIFY: 协作通知                           │  │
│   │ • COLLAB_VOTE: 协作投票                             │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   5. 错误类消息(Error Messages)                           │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ • ERROR_REPORT: 错误报告                            │  │
│   │ • ERROR_RECOVERY: 错误恢复                          │  │
│   │ • TIMEOUT: 超时通知                                 │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

二、通信模式

2.1 直接通信模式

┌─────────────────────────────────────────────────────────────┐
│                    直接通信模式                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Point-to-Point(点对点)                                  │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                                                      │  │
│   │   Agent A ──────────────────────→ Agent B           │  │
│   │            (direct message)                          │  │
│   │                                                      │  │
│   │   特点:                                            │  │
│   │   • 目标明确,效率高                                │  │
│   │   • 适合私有通信                                    │  │
│   │   • 不适合大规模协作                                │  │
│   │                                                      │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   Request-Response(请求-响应)                             │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                                                      │  │
│   │   Agent A                    Agent B                │  │
│   │      │                          │                   │  │
│   │      │──── Request ────────────→│                   │  │
│   │      │                          │                   │  │
│   │      │←─── Response ────────────│                   │  │
│   │      │                          │                   │  │
│   │   特点:                                            │  │
│   │   • 同步交互模式                                    │  │
│   │   • 需要等待响应                                    │  │
│   │   • 使用 correlation_id 关联请求响应               │  │
│   │                                                      │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   示例代码:                                                │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ message = Message(                                  │  │
│   │     sender="agent_a",                               │  │
│   │     receiver="agent_b",                             │  │
│   │     message_type="DATA_REQUEST",                    │  │
│   │     content={"query": "get_user_data"},            │  │
│   │     correlation_id="req-001"                        │  │
│   │ )                                                   │  │
│   │                                                      │  │
│   │ # 发送并等待响应                                    │  │
│   │ response = await bus.send_and_wait(message)        │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2.2 广播通信模式

┌─────────────────────────────────────────────────────────────┐
│                    广播通信模式                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Broadcast(广播)                                         │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                                                      │  │
│   │                    Agent B                          │  │
│   │                       ↑                             │  │
│   │                       │                             │  │
│   │   Agent A ────────→ Bus ────────→ Agent C          │  │
│   │                       │                             │  │
│   │                       ↓                             │  │
│   │                    Agent D                          │  │
│   │                                                      │  │
│   │   特点:                                            │  │
│   │   • 一对多通信                                      │  │
│   │   • 所有订阅者收到消息                              │  │
│   │   • 适合状态通知、事件广播                          │  │
│   │                                                      │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   Pub-Sub(发布订阅)                                       │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                                                      │  │
│   │   Publisher A ────┐                                 │  │
│   │                    │                                 │  │
│   │   Publisher B ────┼──→ Topic ───→ Subscriber X     │  │
│   │                    │              ↓                 │  │
│   │   Publisher C ────┘          Subscriber Y          │  │
│   │                                                      │  │
│   │   特点:                                            │  │
│   │   • 解耦发布者和订阅者                              │  │
│   │   • 支持主题过滤                                    │  │
│   │   • 适合事件驱动架构                                │  │
│   │                                                      │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   示例代码:                                                │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ # 订阅主题                                          │  │
│   │ bus.subscribe("task_events", handler=on_task_event) │  │
│   │                                                      │  │
│   │ # 发布消息                                          │  │
│   │ bus.publish("task_events", {                        │  │
│   │     "event": "task_completed",                      │  │
│   │     "task_id": "task-001"                           │  │
│   │ })                                                  │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

2.3 共享记忆模式

┌─────────────────────────────────────────────────────────────┐
│                    共享记忆模式                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   Shared Memory(共享记忆)                                 │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                                                      │  │
│   │   Agent A ──┐                                       │  │
│   │             │                                       │  │
│   │   Agent B ──┼──→ Shared Knowledge Base ──→ Read    │  │
│   │             │         (共享知识库)                  │  │
│   │   Agent C ──┘                                       │  │
│   │                                                      │  │
│   │   特点:                                            │  │
│   │   • 无需直接通信                                    │  │
│   │   • 信息持久化                                      │  │
│   │   • 适合知识共享场景                                │  │
│   │                                                      │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   Blackboard(黑板模式)                                    │
│   ┌─────────────────────────────────────────────────────┐  │
│   │                                                      │  │
│   │              ┌─────────────────────┐                │  │
│   │              │     Blackboard      │                │  │
│   │              │    (公共黑板)       │                │  │
│   │              │                     │                │  │
│   │              │  ┌───────────────┐  │                │  │
│   │              │  │ Task Status   │  │                │  │
│   │              │  │ Progress: 60% │  │                │  │
│   │              │  │ Current: B    │  │                │  │
│   │              │  └───────────────┘  │                │  │
│   │              └──────────┬──────────┘                │  │
│   │                         │                           │  │
│   │         ┌───────────────┼───────────────┐          │  │
│   │         ↓               ↓               ↓          │  │
│   │   ┌───────────┐  ┌───────────┐  ┌───────────┐     │  │
│   │   │  Agent A  │  │  Agent B  │  │  Agent C  │     │  │
│   │   │ (读取)    │  │ (写入)    │  │ (读取)    │     │  │
│   │   └───────────┘  └───────────┘  └───────────┘     │  │
│   │                                                      │  │
│   │   特点:                                            │  │
│   │   • 中央信息存储                                    │  │
│   │   • 支持读写分离                                    │  │
│   │   • 实时状态同步                                    │  │
│   │                                                      │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
│   示例代码:                                                │
│   ┌─────────────────────────────────────────────────────┐  │
│   │ class Blackboard:                                   │  │
│   │     def __init__(self):                             │  │
│   │         self.data = {}                              │  │
│   │         self.subscribers = {}                       │  │
│   │                                                      │  │
│   │     def write(self, key, value):                    │  │
│   │         self.data[key] = value                      │  │
│   │         self._notify(key, value)                    │  │
│   │                                                      │  │
│   │     def read(self, key):                            │  │
│   │         return self.data.get(key)                   │  │
│   │                                                      │  │
│   │     def subscribe(self, key, callback):             │  │
│   │         self.subscribers[key].append(callback)      │  │
│   └─────────────────────────────────────────────────────┘  │
│                                                             │
└─────────────────────────────────────────────────────────────┘

三、代码实现

3.1 标准消息类

"""
Agent 通信协议 - 标准消息类实现
"""
 
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from datetime import datetime
import uuid
import json
 
 
@dataclass
class Message:
    """标准消息格式"""
    
    # 必需字段
    sender: str                              # 发送者 ID
    receiver: str                            # 接收者 ID 或 "broadcast"
    message_type: str                        # 消息类型
    content: Any                             # 消息内容
    
    # 自动生成字段
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)
    
    # 可选字段
    correlation_id: Optional[str] = None     # 关联消息 ID
    priority: int = 0                        # 优先级(0=普通,1=高,2=紧急)
    ttl: Optional[int] = None                # 消息有效期(秒)
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return {
            "sender": self.sender,
            "receiver": self.receiver,
            "message_type": self.message_type,
            "content": self.content,
            "message_id": self.message_id,
            "timestamp": self.timestamp.isoformat(),
            "correlation_id": self.correlation_id,
            "priority": self.priority,
            "ttl": self.ttl,
            "metadata": self.metadata,
        }
    
    def to_json(self) -> str:
        """转换为 JSON"""
        return json.dumps(self.to_dict())
    
    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Message":
        """从字典创建"""
        data["timestamp"] = datetime.fromisoformat(data["timestamp"])
        return cls(**data)
    
    @classmethod
    def from_json(cls, json_str: str) -> "Message":
        """从 JSON 创建"""
        return cls.from_dict(json.loads(json_str))
 
 
# 消息类型常量
class MessageType:
    """消息类型定义"""
    
    # 控制类
    REGISTER = "REGISTER"
    HEARTBEAT = "HEARTBEAT"
    STATUS = "STATUS"
    SHUTDOWN = "SHUTDOWN"
    
    # 任务类
    TASK_ASSIGN = "TASK_ASSIGN"
    TASK_STATUS = "TASK_STATUS"
    TASK_RESULT = "TASK_RESULT"
    TASK_CANCEL = "TASK_CANCEL"
    
    # 数据类
    DATA_REQUEST = "DATA_REQUEST"
    DATA_RESPONSE = "DATA_RESPONSE"
    DATA_UPDATE = "DATA_UPDATE"
    
    # 协作类
    COLLAB_REQUEST = "COLLAB_REQUEST"
    COLLAB_RESPONSE = "COLLAB_RESPONSE"
    COLLAB_NOTIFY = "COLLAB_NOTIFY"
    COLLAB_VOTE = "COLLAB_VOTE"
    
    # 错误类
    ERROR_REPORT = "ERROR_REPORT"
    TIMEOUT = "TIMEOUT"

3.2 通信总线实现

"""
Agent 通信协议 - 通信总线实现
"""
 
from abc import ABC, abstractmethod
from typing import Dict, List, Callable, Optional, Any
from collections import defaultdict
import asyncio
from queue import Queue
import threading
 
 
class CommunicationBus(ABC):
    """通信总线抽象基类"""
    
    @abstractmethod
    def register(self, agent_id: str) -> None:
        """注册 Agent"""
        pass
    
    @abstractmethod
    def send(self, message: Message) -> bool:
        """发送消息"""
        pass
    
    @abstractmethod
    def receive(self, agent_id: str) -> Optional[Message]:
        """接收消息"""
        pass
    
    @abstractmethod
    def broadcast(self, message: Message) -> bool:
        """广播消息"""
        pass
 
 
class InMemoryBus(CommunicationBus):
    """内存通信总线(单机模式)"""
    
    def __init__(self):
        self.mailboxes: Dict[str, Queue] = {}
        self.subscriptions: Dict[str, List[str]] = defaultdict(list)  # topic -> [agent_ids]
        self.lock = threading.Lock()
    
    def register(self, agent_id: str) -> None:
        """注册 Agent,创建邮箱"""
        with self.lock:
            if agent_id not in self.mailboxes:
                self.mailboxes[agent_id] = Queue()
    
    def subscribe(self, agent_id: str, topic: str) -> None:
        """订阅主题"""
        with self.lock:
            if agent_id not in self.subscriptions[topic]:
                self.subscriptions[topic].append(agent_id)
    
    def send(self, message: Message) -> bool:
        """发送消息到指定 Agent"""
        with self.lock:
            if message.receiver not in self.mailboxes:
                return False
            self.mailboxes[message.receiver].put(message)
            return True
    
    def receive(self, agent_id: str) -> Optional[Message]:
        """接收消息"""
        with self.lock:
            if agent_id not in self.mailboxes:
                return None
            if self.mailboxes[agent_id].empty():
                return None
            return self.mailboxes[agent_id].get()
    
    def broadcast(self, message: Message) -> bool:
        """广播消息"""
        with self.lock:
            # 如果有订阅主题,只发送给订阅者
            if message.message_type in self.subscriptions:
                for agent_id in self.subscriptions[message.message_type]:
                    if agent_id != message.sender:
                        self.mailboxes[agent_id].put(message)
            else:
                # 否则发送给所有 Agent
                for agent_id, mailbox in self.mailboxes.items():
                    if agent_id != message.sender:
                        mailbox.put(message)
            return True
    
    def publish(self, topic: str, content: Any, sender: str) -> bool:
        """发布主题消息"""
        message = Message(
            sender=sender,
            receiver="broadcast",
            message_type=topic,
            content=content,
        )
        return self.broadcast(message)
 
 
class AsyncBus(CommunicationBus):
    """异步通信总线"""
    
    def __init__(self):
        self.mailboxes: Dict[str, asyncio.Queue] = {}
        self.subscriptions: Dict[str, List[str]] = defaultdict(list)
    
    async def register(self, agent_id: str) -> None:
        """注册 Agent"""
        if agent_id not in self.mailboxes:
            self.mailboxes[agent_id] = asyncio.Queue()
    
    async def send(self, message: Message) -> bool:
        """发送消息"""
        if message.receiver not in self.mailboxes:
            return False
        await self.mailboxes[message.receiver].put(message)
        return True
    
    async def receive(self, agent_id: str, timeout: float = 1.0) -> Optional[Message]:
        """接收消息"""
        if agent_id not in self.mailboxes:
            return None
        try:
            return await asyncio.wait_for(
                self.mailboxes[agent_id].get(),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            return None
    
    async def broadcast(self, message: Message) -> bool:
        """广播消息"""
        for agent_id, mailbox in self.mailboxes.items():
            if agent_id != message.sender:
                await mailbox.put(message)
        return True
    
    async def send_and_wait(
        self, 
        message: Message, 
        timeout: float = 30.0
    ) -> Optional[Message]:
        """发送消息并等待响应"""
        await self.send(message)
        
        # 等待响应
        start_time = asyncio.get_event_loop().time()
        while asyncio.get_event_loop().time() - start_time < timeout:
            msg = await self.receive(message.sender, timeout=0.1)
            if msg and msg.correlation_id == message.message_id:
                return msg
        
        return None

3.3 黑板模式实现

"""
Agent 通信协议 - 黑板模式实现
"""
 
from typing import Dict, Any, List, Callable, Optional
from dataclasses import dataclass
from datetime import datetime
import threading
 
 
@dataclass
class BlackboardEntry:
    """黑板条目"""
    key: str
    value: Any
    writer: str
    timestamp: datetime
    version: int = 1
 
 
class Blackboard:
    """
    黑板模式实现
    提供共享信息存储和变更通知
    """
    
    def __init__(self):
        self.data: Dict[str, BlackboardEntry] = {}
        self.subscribers: Dict[str, List[Callable]] = {}
        self.lock = threading.Lock()
    
    def write(self, key: str, value: Any, writer: str) -> None:
        """
        写入黑板
        
        Args:
            key: 键名
            value: 值
            writer: 写入者 Agent ID
        """
        with self.lock:
            version = 1
            if key in self.data:
                version = self.data[key].version + 1
            
            self.data[key] = BlackboardEntry(
                key=key,
                value=value,
                writer=writer,
                timestamp=datetime.now(),
                version=version,
            )
            
            # 通知订阅者
            self._notify(key, value, writer)
    
    def read(self, key: str) -> Optional[Any]:
        """读取黑板内容"""
        with self.lock:
            entry = self.data.get(key)
            return entry.value if entry else None
    
    def read_entry(self, key: str) -> Optional[BlackboardEntry]:
        """读取完整条目"""
        with self.lock:
            return self.data.get(key)
    
    def delete(self, key: str, writer: str) -> bool:
        """删除黑板内容"""
        with self.lock:
            if key in self.data:
                del self.data[key]
                self._notify(key, None, writer)
                return True
            return False
    
    def subscribe(self, key: str, callback: Callable[[str, Any, str], None]) -> None:
        """
        订阅黑板变更
        
        Args:
            key: 订阅的键名
            callback: 回调函数 (key, value, writer)
        """
        with self.lock:
            if key not in self.subscribers:
                self.subscribers[key] = []
            self.subscribers[key].append(callback)
    
    def subscribe_all(self, callback: Callable[[str, Any, str], None]) -> None:
        """订阅所有变更"""
        self.subscribe("*", callback)
    
    def _notify(self, key: str, value: Any, writer: str) -> None:
        """通知订阅者"""
        # 通知特定键的订阅者
        if key in self.subscribers:
            for callback in self.subscribers[key]:
                callback(key, value, writer)
        
        # 通知通配符订阅者
        if "*" in self.subscribers:
            for callback in self.subscribers["*"]:
                callback(key, value, writer)
    
    def get_all(self) -> Dict[str, Any]:
        """获取所有内容"""
        with self.lock:
            return {k: v.value for k, v in self.data.items()}
    
    def clear(self) -> None:
        """清空黑板"""
        with self.lock:
            self.data.clear()
 
 
# 使用示例
def example_blackboard():
    """黑板模式使用示例"""
    
    blackboard = Blackboard()
    
    # 定义回调
    def on_change(key, value, writer):
        print(f"[通知] {writer} 更新了 {key}: {value}")
    
    # 订阅变更
    blackboard.subscribe("task_status", on_change)
    blackboard.subscribe_all(lambda k, v, w: print(f"[全局] {w} 修改了 {k}"))
    
    # Agent 写入
    blackboard.write("task_status", "进行中", "agent_a")
    blackboard.write("progress", 60, "agent_b")
    
    # Agent 读取
    status = blackboard.read("task_status")
    print(f"当前状态: {status}")

四、通信优化

4.1 消息压缩

"""
消息压缩优化
"""
 
import gzip
import json
from typing import Any
 
 
class CompressedMessage:
    """压缩消息"""
    
    @staticmethod
    def compress(data: Dict[str, Any]) -> bytes:
        """压缩消息"""
        json_str = json.dumps(data)
        return gzip.compress(json_str.encode('utf-8'))
    
    @staticmethod
    def decompress(data: bytes) -> Dict[str, Any]:
        """解压消息"""
        json_str = gzip.decompress(data).decode('utf-8')
        return json.loads(json_str)
 
 
class MessageCompressor:
    """消息压缩器"""
    
    def __init__(self, threshold: int = 1024):
        """
        Args:
            threshold: 压缩阈值(字节)
        """
        self.threshold = threshold
    
    def should_compress(self, message: Message) -> bool:
        """判断是否需要压缩"""
        size = len(message.to_json().encode('utf-8'))
        return size > self.threshold
    
    def compress_if_needed(self, message: Message) -> Any:
        """按需压缩"""
        data = message.to_dict()
        if self.should_compress(message):
            return ("compressed", CompressedMessage.compress(data))
        return ("raw", data)

4.2 消息过滤

"""
消息过滤优化
"""
 
from typing import Callable, List
 
 
class MessageFilter:
    """消息过滤器"""
    
    def __init__(self):
        self.filters: List[Callable[[Message], bool]] = []
    
    def add_filter(self, filter_func: Callable[[Message], bool]) -> None:
        """添加过滤器"""
        self.filters.append(filter_func)
    
    def filter(self, messages: List[Message]) -> List[Message]:
        """过滤消息"""
        result = messages
        for f in self.filters:
            result = [m for m in result if f(m)]
        return result
 
 
# 预定义过滤器
def filter_by_type(message_type: str) -> Callable:
    """按消息类型过滤"""
    return lambda m: m.message_type == message_type
 
 
def filter_by_sender(sender: str) -> Callable:
    """按发送者过滤"""
    return lambda m: m.sender == sender
 
 
def filter_by_priority(min_priority: int) -> Callable:
    """按优先级过滤"""
    return lambda m: m.priority >= min_priority

五、适用场景

5.1 通信模式选择

场景推荐模式理由
私有对话点对点通信信息隔离,效率高
状态通知广播模式一对多,实时性
事件驱动Pub-Sub解耦,可扩展
知识共享共享记忆持久化,无需通信
协作决策黑板模式实时同步,可见性

5.2 性能优化建议

优化方向方法效果
减少消息量压缩、增量传输降低带宽
减少通信次数批量处理、缓存降低延迟
提高可靠性确认机制、重试提高成功率

六、面试问答

Q1: 设计 Agent 通信协议需要考虑哪些因素?

回答

  1. 消息格式标准化:统一的字段定义
  2. 通信模式选择:点对点、广播、发布订阅
  3. 可靠性保证:确认、重试、错误处理
  4. 性能优化:压缩、过滤、批量处理
  5. 可扩展性:支持新消息类型和字段

Q2: 广播模式和发布订阅有什么区别?

回答

对比项广播模式发布订阅
接收者所有 Agent订阅者
过滤主题过滤
解耦程度
适用场景状态通知事件驱动

Q3: 黑板模式的优缺点是什么?

回答

  • 优点:无需直接通信、信息持久化、易于审计
  • 缺点:中央存储可能成为瓶颈、需要同步机制

Q4: 如何优化多 Agent 系统的通信开销?

回答

  1. 消息压缩和增量传输
  2. 智能路由和消息过滤
  3. 批量处理和异步通信
  4. 本地缓存减少重复请求

七、小结

概念一句话总结面试关键词
消息格式标准化的消息结构sender、receiver、type、content
通信模式点对点、广播、发布订阅Pub-Sub、Blackboard
优化策略压缩、过滤、批量处理性能优化、可靠性
模式选择根据场景选择合适模式场景匹配、权衡取舍

一句话总结:Agent 通信协议是协作的基础设施,需要平衡标准化、灵活性、性能和可靠性,根据具体场景选择合适的通信模式和优化策略。


最后更新:2026年3月19日