多智能体协作
学习如何让多个AI智能体像团队一样协同工作,共同完成复杂任务
7.1 什么是多智能体系统
在前面的章节中,我们学习了如何构建单个智能体。但在现实世界中,复杂任务往往需要多个人协作完成。多智能体系统(Multi-Agent System,MAS)就是模拟这种协作模式的AI架构。
想象一下软件公司的开发团队:
- 产品经理负责需求分析
- UI设计师负责界面设计
- 程序员负责编写代码
- 测试工程师负责质量保证
每个人都在自己的专业领域工作,同时又与其他成员协作,最终共同交付产品。多智能体系统正是模仿这种组织形式。
┌─────────────────────────────────────────────────────────────┐ │ 多智能体系统架构 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │ │ 智能体 A │◄──►│ 智能体 B │◄──►│ 智能体 C │ │ │ │ (规划者) │ │ (执行者) │ │ (检查者) │ │ │ └────┬─────┘ └────┬─────┘ └────┬─────┘ │ │ │ │ │ │ │ └───────────────┼───────────────┘ │ │ ▼ │ │ ┌────────────┐ │ │ │ 共享环境/ │ │ │ │ 消息总线 │ │ │ └────────────┘ │ │ │ └─────────────────────────────────────────────────────────────┘
7.2 为什么需要多智能体
7.2.1 专业分工
单个智能体很难在所有领域都表现优秀。就像一个人不可能同时是顶级程序员、优秀设计师和专业律师一样,不同的智能体可以专注于不同的专业能力。
7.2.2 并行处理
多个智能体可以同时处理不同子任务,大幅提高整体效率。例如,在数据分析任务中:
- 智能体A负责清洗数据
- 智能体B同时进行可视化分析
- 智能体C撰写分析报告
7.2.3 互相检查与纠错
多个智能体可以相互审查工作成果,减少错误。一个智能体的输出可以作为另一个智能体的输入进行验证。
| 单智能体 | 多智能体 |
|---|---|
| 适合简单、单一领域任务 | 适合复杂、跨领域任务 |
| 架构简单,易于实现 | 架构复杂,但能力更强 |
| 难以并行处理 | 可以并行提高效率 |
| 容易陷入固定思维模式 | 多角度思考,减少盲区 |
7.3 常见协作模式
7.3.1 主从模式(Master-Slave)
一个主管智能体负责分配任务和协调,多个执行智能体负责完成具体工作。
主从模式
┌─────────────────────────────────────┐
│ │
│ ┌─────────────┐ │
│ │ 主管Agent │ │
│ │ (Master) │ │
│ └──────┬──────┘ │
│ │ 分配任务 │
│ ┌───────┼───────┐ │
│ ▼ ▼ ▼ │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Agent │ │Agent │ │Agent │ │
│ │ A │ │ B │ │ C │ │
│ └──┬───┘ └──┬───┘ └──┬───┘ │
│ │ 汇总结果 │ │ │
│ └───────┼────────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ 整合输出 │ │
│ └─────────────┘ │
│ │
└─────────────────────────────────────┘
适用场景:任务分解明确、需要集中决策的场景,如项目管理、数据处理流水线。
7.3.2 对等模式(Peer-to-Peer)
所有智能体地位平等,通过协商达成共识,共同决策。
对等模式
┌─────────────────────────────────────┐
│ │
│ ┌───────────┐ │
│ │ Agent A │◄──────────────┤
│ │ (专家A) │ │
│ └─────┬─────┘ │
│ │ │
│ ▼ │
│ ┌──────►┌─────┐◄──────┐ │
│ │ │协商 │ │ │
│ │ │中心 │ │ │
│ │ └──┬──┘ │ │
│ │ │ │ │
│ │ ┌─────┴─────┐ │ │
│ │ ▼ ▼ │ │
│ └──►┌──────┐ ┌──────┐◄───┐ │
│ │Agent │ │Agent │ │ │
│ │ B │ │ C │────┘ │
│ └──────┘ └──────┘ │
│ │
│ 特点:平等协商,投票决策,共识机制 │
└─────────────────────────────────────┘
适用场景:需要集思广益、多角度分析的场景,如头脑风暴、复杂决策。
7.3.3 流水线模式(Pipeline)
智能体按顺序连接,每个智能体完成一道工序,输出传递给下一个智能体。
流水线模式
┌──────────────────────────────────────────────┐
│ │
│ 输入 ──► ┌─────┐ ──► ┌─────┐ ──► ┌─────┐ │
│ │步骤1│ │步骤2│ │步骤3│ │
│ │Agent│ │Agent│ │Agent│ │
│ └──┬──┘ └──┬──┘ └──┬──┘ │
│ │ │ │ │
│ 原始数据 中间结果 最终输出 │
│ │
│ 示例: 需求分析 → 代码编写 → 代码审查 → 测试 │
│ │
└──────────────────────────────────────────────┘
适用场景:任务可以分解为连续步骤的场景,如内容创作、软件开发流程。
- 需要强控制选主从模式
- 需要创意碰撞选对等模式
- 流程固定明确选流水线模式
7.4 角色分工设计
设计多智能体系统的关键是合理划分角色。以下是常见的角色设计方法:
7.4.1 按专业领域划分
# 软件开发团队角色设计
roles:
product_manager:
name: "产品经理"
expertise: ["需求分析", "用户研究", "产品设计"]
responsibilities: ["理解需求", "制定计划", "验收成果"]
architect:
name: "系统架构师"
expertise: ["系统设计", "技术选型", "架构规划"]
responsibilities: ["设计方案", "技术决策", "代码审查"]
developer:
name: "开发工程师"
expertise: ["编码实现", "单元测试", "bug修复"]
responsibilities: ["编写代码", "自测", "文档编写"]
tester:
name: "测试工程师"
expertise: ["测试用例", "自动化测试", "质量保障"]
responsibilities: ["编写测试", "执行测试", "报告bug"]
7.4.2 按工作职能划分
| 职能类型 | 角色名称 | 主要职责 |
|---|---|---|
| 规划类 | Planner/Manager | 任务分解、进度跟踪、资源分配 |
| 执行类 | Worker/Executor | 具体任务执行、成果产出 |
| 检查类 | Reviewer/Critic | 质量检查、错误发现、改进建议 |
| 协调类 | Coordinator | 冲突解决、信息同步、团队协作 |
7.4.3 角色设计的最佳实践
- 职责清晰:每个角色有明确的职责边界,避免重叠或遗漏
- 能力互补:不同角色的能力应该相互补充,形成完整解决方案
- 粒度适中:角色不宜过多(管理复杂)或过少(失去协作意义)
- 可扩展性:设计时考虑未来可能需要增加的角色
7.5 通信与协调
7.5.1 通信机制
智能体之间需要交换信息才能协作。常见的通信方式包括:
# 消息传递系统示例
class Message:
def __init__(self, sender, receiver, msg_type, content):
self.sender = sender # 发送者ID
self.receiver = receiver # 接收者ID(None表示广播)
self.type = msg_type # 消息类型:request/response/notify
self.content = content # 消息内容
self.timestamp = time.now()
class MessageBus:
"""消息总线 - 智能体通信的基础设施"""
def __init__(self):
self.agents = {} # 注册的智能体
self.message_queue = [] # 消息队列
def register(self, agent_id, agent):
"""智能体注册到总线"""
self.agents[agent_id] = agent
def send(self, message):
"""发送消息"""
if message.receiver:
# 点对点发送
target = self.agents.get(message.receiver)
if target:
target.receive(message)
else:
# 广播给所有智能体
for agent in self.agents.values():
agent.receive(message)
7.5.2 消息类型设计
| 消息类型 | 用途 | 示例 |
|---|---|---|
| Task Request | 分配任务 | "请完成用户登录模块的开发" |
| Task Result | 提交成果 | "登录模块代码已完成,见附件" |
| Query | 信息查询 | "请提供API接口文档" |
| Notification | 状态通知 | "任务已完成,进入测试阶段" |
| Conflict | 冲突报告 | "发现需求与现有设计冲突" |
7.5.3 冲突解决
当多个智能体产生分歧时,需要冲突解决机制:
# 冲突解决策略
class ConflictResolver:
"""冲突解决器"""
def resolve(self, conflict_type, proposals):
"""
conflict_type: 冲突类型
proposals: 各方的提案列表
"""
if conflict_type == "priority":
# 优先级冲突:按预设优先级排序
return self._priority_based(proposals)
elif conflict_type == "resource":
# 资源竞争:轮流或按需分配
return self._round_robin(proposals)
elif conflict_type == "opinion":
# 意见分歧:投票或主管裁决
return self._voting_or_authority(proposals)
elif conflict_type == "deadlock":
# 死锁:超时机制或第三方协调
return self._timeout_or_mediation(proposals)
- 设置消息超时机制,避免无限等待
- 处理网络延迟和消息丢失
- 避免循环依赖导致的死锁
7.6 完整示例:软件开发团队
下面我们实现一个完整的软件开发多智能体系统,包含产品经理、开发工程师、代码审查员和测试工程师四个角色。
7.6.1 系统架构
软件开发多智能体系统
┌──────────────────────────────────────────────────────────┐
│ │
│ 需求输入 ──► ┌─────────────┐ │
│ │ PM Agent │ │
│ │ (产品经理) │ │
│ └──────┬──────┘ │
│ │ 分解任务 │
│ ┌───────────┼───────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Dev │ │Review │ │ Test │ │
│ │ Agent │ │ Agent │ │ Agent │ │
│ │(开发) │ │(审查) │ │(测试) │ │
│ └───┬────┘ └───┬────┘ └───┬────┘ │
│ │ │ │ │
│ │ 代码 │ 报告 │ │
│ └───────────┼───────────┘ │
│ ▼ │
│ ┌────────────┐ │
│ │ 产品交付 │ │
│ └────────────┘ │
│ │
└──────────────────────────────────────────────────────────┘
7.6.2 代码实现
import asyncio
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
PENDING = "待处理"
IN_PROGRESS = "进行中"
REVIEWING = "审查中"
TESTING = "测试中"
COMPLETED = "已完成"
@dataclass
class Task:
id: str
title: str
description: str
assignee: str = None
status: TaskStatus = TaskStatus.PENDING
result: str = None
class MessageBus:
"""消息总线"""
def __init__(self):
self.agents = {}
self.history = []
def register(self, agent_id: str, agent):
self.agents[agent_id] = agent
async def send(self, sender: str, receiver: str, content: str):
message = {
"sender": sender,
"receiver": receiver,
"content": content,
"timestamp": asyncio.get_event_loop().time()
}
self.history.append(message)
print(f"[{sender} -> {receiver}]: {content}")
if receiver in self.agents:
await self.agents[receiver].receive(message)
class BaseAgent:
"""智能体基类"""
def __init__(self, agent_id: str, name: str, bus: MessageBus):
self.id = agent_id
self.name = name
self.bus = bus
self.inbox = asyncio.Queue()
async def receive(self, message: Dict):
await self.inbox.put(message)
async def send_to(self, receiver: str, content: str):
await self.bus.send(self.id, receiver, content)
async def run(self):
"""主循环 - 子类需实现"""
raise NotImplementedError
class PMAgent(BaseAgent):
"""产品经理智能体"""
def __init__(self, bus: MessageBus):
super().__init__("pm", "产品经理", bus)
self.tasks = []
async def analyze_requirement(self, requirement: str) -> List[Task]:
"""分析需求并分解任务"""
print(f"\n[{self.name}] 正在分析需求: {requirement}")
# 模拟需求分析过程
await asyncio.sleep(0.5)
tasks = [
Task("T1", "数据库设计", "设计用户表结构和关系"),
Task("T2", "API开发", "实现用户CRUD接口"),
Task("T3", "前端页面", "开发用户管理界面")
]
print(f"[{self.name}] 任务分解完成,共 {len(tasks)} 个子任务")
return tasks
async def assign_tasks(self, tasks: List[Task]):
"""分配任务给开发团队"""
for i, task in enumerate(tasks):
task.assignee = "dev"
task.status = TaskStatus.IN_PROGRESS
await self.send_to("dev",
f"新任务 #{i+1}: {task.title}\n描述: {task.description}")
async def review_deliverable(self, result: str) -> bool:
"""验收最终成果"""
print(f"\n[{self.name}] 正在验收成果...")
await asyncio.sleep(0.3)
print(f"[{self.name}] 验收通过!产品可以交付。")
return True
async def run(self):
while True:
try:
message = await asyncio.wait_for(
self.inbox.get(), timeout=1.0
)
content = message["content"]
if "任务完成" in content:
print(f"\n[{self.name}] 收到完成通知")
# 触发代码审查
await self.send_to("reviewer", "请审查新提交的代码")
elif "审查通过" in content:
# 触发测试
await self.send_to("tester", "请进行功能测试")
elif "测试通过" in content:
await self.review_deliverable(content)
except asyncio.TimeoutError:
continue
class DevAgent(BaseAgent):
"""开发工程师智能体"""
def __init__(self, bus: MessageBus):
super().__init__("dev", "开发工程师", bus)
self.current_task = None
async def implement(self, task: Task) -> str:
"""实现任务"""
print(f"\n[{self.name}] 开始开发: {task.title}")
await asyncio.sleep(1.0) # 模拟开发时间
code = f"""
# {task.title} 实现代码
class UserService:
def create_user(self, user_data):
# 实现逻辑
return {{"id": 1, "name": user_data["name"]}}
def get_user(self, user_id):
return {{"id": user_id, "name": "Test User"}}
"""
print(f"[{self.name}] 开发完成")
return code
async def run(self):
while True:
try:
message = await asyncio.wait_for(
self.inbox.get(), timeout=1.0
)
content = message["content"]
if "新任务" in content:
# 解析任务并执行
task = Task("T1", "示例任务", "开发功能")
code = await self.implement(task)
await self.send_to("pm",
f"任务完成,代码已提交\n{code[:100]}...")
except asyncio.TimeoutError:
continue
class ReviewAgent(BaseAgent):
"""代码审查智能体"""
def __init__(self, bus: MessageBus):
super().__init__("reviewer", "代码审查员", bus)
async def review_code(self, code: str) -> Dict[str, Any]:
"""审查代码"""
print(f"\n[{self.name}] 开始代码审查...")
await asyncio.sleep(0.5)
issues = []
# 模拟审查逻辑
if "error handling" not in code.lower():
issues.append("建议添加错误处理")
result = {
"passed": len(issues) == 0,
"issues": issues,
"comments": "代码结构清晰,符合规范"
}
print(f"[{self.name}] 审查完成: {'通过' if result['passed'] else '需修改'}")
return result
async def run(self):
while True:
try:
message = await asyncio.wait_for(
self.inbox.get(), timeout=1.0
)
content = message["content"]
if "审查" in content:
# 执行审查
review_result = await self.review_code("代码内容")
if review_result["passed"]:
await self.send_to("pm", "审查通过,代码质量良好")
else:
await self.send_to("dev",
f"需要修改: {', '.join(review_result['issues'])}")
except asyncio.TimeoutError:
continue
class TestAgent(BaseAgent):
"""测试工程师智能体"""
def __init__(self, bus: MessageBus):
super().__init__("tester", "测试工程师", bus)
async def run_tests(self) -> Dict[str, Any]:
"""执行测试"""
print(f"\n[{self.name}] 开始执行测试...")
test_cases = [
("创建用户", "PASSED"),
("查询用户", "PASSED"),
("更新用户", "PASSED"),
("删除用户", "PASSED")
]
await asyncio.sleep(0.8)
all_passed = all(result == "PASSED" for _, result in test_cases)
print(f"[{self.name}] 测试执行完成")
for case, result in test_cases:
status = "✓" if result == "PASSED" else "✗"
print(f" {status} {case}")
return {"passed": all_passed, "cases": test_cases}
async def run(self):
while True:
try:
message = await asyncio.wait_for(
self.inbox.get(), timeout=1.0
)
content = message["content"]
if "测试" in content:
test_result = await self.run_tests()
if test_result["passed"]:
await self.send_to("pm", "测试通过,所有用例执行成功")
else:
await self.send_to("dev", "发现bug,需要修复")
except asyncio.TimeoutError:
continue
# 系统启动和协调
async def main():
"""主函数 - 启动多智能体系统"""
print("=" * 60)
print(" 软件开发多智能体协作系统启动")
print("=" * 60)
# 创建消息总线
bus = MessageBus()
# 创建智能体
pm = PMAgent(bus)
dev = DevAgent(bus)
reviewer = ReviewAgent(bus)
tester = TestAgent(bus)
# 注册到总线
bus.register("pm", pm)
bus.register("dev", dev)
bus.register("reviewer", reviewer)
bus.register("tester", tester)
# 启动所有智能体
agents = [pm.run(), dev.run(), reviewer.run(), tester.run()]
# 产品经理分析需求并启动流程
tasks = await pm.analyze_requirement(
"开发一个用户管理系统,支持用户的增删改查功能"
)
await pm.assign_tasks(tasks)
# 运行一段时间观察协作
await asyncio.gather(*agents)
# 运行示例
if __name__ == "__main__":
asyncio.run(main())
7.6.3 运行结果示例
============================================================
软件开发多智能体协作系统启动
============================================================
[产品经理] 正在分析需求: 开发一个用户管理系统,支持用户的增删改查功能
[产品经理] 任务分解完成,共 3 个子任务
[pm -> dev]: 新任务 #1: 数据库设计
描述: 设计用户表结构和关系
[pm -> dev]: 新任务 #2: API开发
描述: 实现用户CRUD接口
...
[开发工程师] 开始开发: 数据库设计
[开发工程师] 开发完成
[dev -> pm]: 任务完成,代码已提交
...
[pm -> reviewer]: 请审查新提交的代码
[代码审查员] 开始代码审查...
[代码审查员] 审查完成: 通过
[reviewer -> pm]: 审查通过,代码质量良好
...
[pm -> tester]: 请进行功能测试
[测试工程师] 开始执行测试...
[测试工程师] 测试执行完成
✓ 创建用户
✓ 查询用户
✓ 更新用户
✓ 删除用户
[tester -> pm]: 测试通过,所有用例执行成功
[产品经理] 验收通过!产品可以交付。
7.7 多智能体的挑战
虽然多智能体系统功能强大,但也面临一些挑战:
7.7.1 通信开销
解决方案:
- 批量处理消息,减少通信次数
- 使用共享内存或数据库减少消息传递
- 合理设计通信粒度
7.7.2 一致性维护
多个智能体可能对同一信息有不同理解,导致决策不一致。
# 一致性检查示例
class ConsistencyChecker:
"""一致性检查器"""
def check_consensus(self, opinions: List[str]) -> bool:
"""检查是否达成共识"""
# 使用LLM判断意见是否一致
prompt = f"""
判断以下意见是否一致或相容:
{opinions}
如果相容返回: CONSISTENT
如果不相容返回: CONFLICT
解释原因。
"""
result = llm.call(prompt)
return "CONSISTENT" in result
def resolve_inconsistency(self, opinions: List[str]) -> str:
"""解决不一致"""
# 综合各方意见,生成一致的结论
prompt = f"""
综合以下意见,给出一个一致的结论:
{opinions}
"""
return llm.call(prompt)
7.7.3 错误传播
一个智能体的错误可能影响其他智能体,甚至导致连锁反应。
错误传播风险
┌────────────────────────────────────────┐
│ │
│ Agent A (出错) │
│ │ │
│ ▼ 错误输出 │
│ Agent B (基于错误继续) │
│ │ │
│ ▼ 错误放大 │
│ Agent C (严重偏离目标) │
│ │
│ ⚠️ 需要:错误检测、边界检查、回滚机制 │
│ │
└────────────────────────────────────────┘
7.7.4 其他挑战
| 挑战 | 描述 | 应对策略 |
|---|---|---|
| 死锁 | 智能体互相等待对方完成 | 设置超时、引入仲裁者 |
| 负载不均 | 某些智能体过载,其他空闲 | 动态任务调度、负载均衡 |
| 调试困难 | 多智能体交互复杂,难以追踪 | 完善的日志、可视化工具 |
| 成本增加 | 多个LLM调用增加费用 | 选择性激活、缓存机制 |
- 从简单场景开始,逐步增加复杂度
- 设计清晰的通信协议和错误处理机制
- 建立完善的监控和日志系统
- 定期进行系统审计和优化
本章小结
本章我们学习了多智能体系统的核心概念:
- 多智能体系统 - 多个AI智能体协同工作的架构
- 协作模式 - 主从模式、对等模式、流水线模式
- 角色设计 - 按专业领域或工作职能划分角色
- 通信协调 - 消息传递、冲突解决机制
- 实战示例 - 完整的软件开发团队实现
- 挑战应对 - 通信开销、一致性、错误传播等问题
多智能体系统让AI从"单兵作战"走向"团队协作",能够处理更复杂的现实问题。在下一章中,我们将通过三个完整的实战案例,综合运用所学知识,开发实用的智能体应用。