第7章

多智能体协作

学习如何让多个AI智能体像团队一样协同工作,共同完成复杂任务

7.1 什么是多智能体系统

在前面的章节中,我们学习了如何构建单个智能体。但在现实世界中,复杂任务往往需要多个人协作完成。多智能体系统(Multi-Agent System,MAS)就是模拟这种协作模式的AI架构。

核心概念:多智能体系统是由多个自主智能体组成的群体,它们通过相互通信、协调和合作,共同完成单个智能体难以处理的复杂任务。

想象一下软件公司的开发团队:

  • 产品经理负责需求分析
  • UI设计师负责界面设计
  • 程序员负责编写代码
  • 测试工程师负责质量保证

每个人都在自己的专业领域工作,同时又与其他成员协作,最终共同交付产品。多智能体系统正是模仿这种组织形式。

┌─────────────────────────────────────────────────────────────┐
│                    多智能体系统架构                          │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐             │
│   │ 智能体 A │◄──►│ 智能体 B │◄──►│ 智能体 C │             │
│   │ (规划者) │    │ (执行者) │    │ (检查者) │             │
│   └────┬─────┘    └────┬─────┘    └────┬─────┘             │
│        │               │               │                    │
│        └───────────────┼───────────────┘                    │
│                        ▼                                    │
│                 ┌────────────┐                             │
│                 │ 共享环境/  │                             │
│                 │ 消息总线   │                             │
│                 └────────────┘                             │
│                                                             │
└─────────────────────────────────────────────────────────────┘

7.2 为什么需要多智能体

7.2.1 专业分工

单个智能体很难在所有领域都表现优秀。就像一个人不可能同时是顶级程序员、优秀设计师和专业律师一样,不同的智能体可以专注于不同的专业能力。

7.2.2 并行处理

多个智能体可以同时处理不同子任务,大幅提高整体效率。例如,在数据分析任务中:

  • 智能体A负责清洗数据
  • 智能体B同时进行可视化分析
  • 智能体C撰写分析报告

7.2.3 互相检查与纠错

多个智能体可以相互审查工作成果,减少错误。一个智能体的输出可以作为另一个智能体的输入进行验证。

类比理解:多智能体就像一支足球队——有前锋、中场、后卫、守门员等不同角色,只有他们协同配合,才能赢得比赛。
单智能体 多智能体
适合简单、单一领域任务 适合复杂、跨领域任务
架构简单,易于实现 架构复杂,但能力更强
难以并行处理 可以并行提高效率
容易陷入固定思维模式 多角度思考,减少盲区

7.3 常见协作模式

7.3.1 主从模式(Master-Slave)

一个主管智能体负责分配任务和协调,多个执行智能体负责完成具体工作。

            主从模式
┌─────────────────────────────────────┐
│                                     │
│        ┌─────────────┐              │
│        │   主管Agent  │              │
│        │  (Master)   │              │
│        └──────┬──────┘              │
│               │ 分配任务             │
│       ┌───────┼───────┐             │
│       ▼       ▼       ▼             │
│   ┌──────┐ ┌──────┐ ┌──────┐       │
│   │Agent │ │Agent │ │Agent │       │
│   │  A   │ │  B   │ │  C   │       │
│   └──┬───┘ └──┬───┘ └──┬───┘       │
│      │ 汇总结果 │        │           │
│      └───────┼────────┘             │
│              ▼                      │
│        ┌─────────────┐              │
│        │  整合输出   │              │
│        └─────────────┘              │
│                                     │
└─────────────────────────────────────┘

适用场景:任务分解明确、需要集中决策的场景,如项目管理、数据处理流水线。

7.3.2 对等模式(Peer-to-Peer)

所有智能体地位平等,通过协商达成共识,共同决策。

            对等模式
┌─────────────────────────────────────┐
│                                     │
│        ┌───────────┐                │
│        │  Agent A  │◄──────────────┤
│        │ (专家A)   │                │
│        └─────┬─────┘                │
│              │                      │
│              ▼                      │
│   ┌──────►┌─────┐◄──────┐          │
│   │       │协商 │       │          │
│   │       │中心 │       │          │
│   │       └──┬──┘       │          │
│   │          │          │          │
│   │    ┌─────┴─────┐    │          │
│   │    ▼           ▼    │          │
│   └──►┌──────┐  ┌──────┐◄───┐      │
│       │Agent │  │Agent │    │      │
│       │  B   │  │  C   │────┘      │
│       └──────┘  └──────┘           │
│                                     │
│  特点:平等协商,投票决策,共识机制   │
└─────────────────────────────────────┘

适用场景:需要集思广益、多角度分析的场景,如头脑风暴、复杂决策。

7.3.3 流水线模式(Pipeline)

智能体按顺序连接,每个智能体完成一道工序,输出传递给下一个智能体。

           流水线模式
┌──────────────────────────────────────────────┐
│                                              │
│   输入 ──► ┌─────┐ ──► ┌─────┐ ──► ┌─────┐  │
│            │步骤1│     │步骤2│     │步骤3│  │
│            │Agent│     │Agent│     │Agent│  │
│            └──┬──┘     └──┬──┘     └──┬──┘  │
│               │           │           │      │
│            原始数据    中间结果    最终输出   │
│                                              │
│   示例: 需求分析 → 代码编写 → 代码审查 → 测试  │
│                                              │
└──────────────────────────────────────────────┘

适用场景:任务可以分解为连续步骤的场景,如内容创作、软件开发流程。

模式选择建议:
  • 需要强控制选主从模式
  • 需要创意碰撞选对等模式
  • 流程固定明确选流水线模式

7.4 角色分工设计

设计多智能体系统的关键是合理划分角色。以下是常见的角色设计方法:

7.4.1 按专业领域划分

# 软件开发团队角色设计
roles:
  product_manager:
    name: "产品经理"
    expertise: ["需求分析", "用户研究", "产品设计"]
    responsibilities: ["理解需求", "制定计划", "验收成果"]
    
  architect:
    name: "系统架构师"
    expertise: ["系统设计", "技术选型", "架构规划"]
    responsibilities: ["设计方案", "技术决策", "代码审查"]
    
  developer:
    name: "开发工程师"
    expertise: ["编码实现", "单元测试", "bug修复"]
    responsibilities: ["编写代码", "自测", "文档编写"]
    
  tester:
    name: "测试工程师"
    expertise: ["测试用例", "自动化测试", "质量保障"]
    responsibilities: ["编写测试", "执行测试", "报告bug"]

7.4.2 按工作职能划分

职能类型 角色名称 主要职责
规划类 Planner/Manager 任务分解、进度跟踪、资源分配
执行类 Worker/Executor 具体任务执行、成果产出
检查类 Reviewer/Critic 质量检查、错误发现、改进建议
协调类 Coordinator 冲突解决、信息同步、团队协作

7.4.3 角色设计的最佳实践

  1. 职责清晰:每个角色有明确的职责边界,避免重叠或遗漏
  2. 能力互补:不同角色的能力应该相互补充,形成完整解决方案
  3. 粒度适中:角色不宜过多(管理复杂)或过少(失去协作意义)
  4. 可扩展性:设计时考虑未来可能需要增加的角色

7.5 通信与协调

7.5.1 通信机制

智能体之间需要交换信息才能协作。常见的通信方式包括:

# 消息传递系统示例
class Message:
    def __init__(self, sender, receiver, msg_type, content):
        self.sender = sender      # 发送者ID
        self.receiver = receiver  # 接收者ID(None表示广播)
        self.type = msg_type      # 消息类型:request/response/notify
        self.content = content    # 消息内容
        self.timestamp = time.now()

class MessageBus:
    """消息总线 - 智能体通信的基础设施"""
    
    def __init__(self):
        self.agents = {}          # 注册的智能体
        self.message_queue = []   # 消息队列
    
    def register(self, agent_id, agent):
        """智能体注册到总线"""
        self.agents[agent_id] = agent
    
    def send(self, message):
        """发送消息"""
        if message.receiver:
            # 点对点发送
            target = self.agents.get(message.receiver)
            if target:
                target.receive(message)
        else:
            # 广播给所有智能体
            for agent in self.agents.values():
                agent.receive(message)

7.5.2 消息类型设计

消息类型 用途 示例
Task Request 分配任务 "请完成用户登录模块的开发"
Task Result 提交成果 "登录模块代码已完成,见附件"
Query 信息查询 "请提供API接口文档"
Notification 状态通知 "任务已完成,进入测试阶段"
Conflict 冲突报告 "发现需求与现有设计冲突"

7.5.3 冲突解决

当多个智能体产生分歧时,需要冲突解决机制:

# 冲突解决策略
class ConflictResolver:
    """冲突解决器"""
    
    def resolve(self, conflict_type, proposals):
        """
        conflict_type: 冲突类型
        proposals: 各方的提案列表
        """
        
        if conflict_type == "priority":
            # 优先级冲突:按预设优先级排序
            return self._priority_based(proposals)
        
        elif conflict_type == "resource":
            # 资源竞争:轮流或按需分配
            return self._round_robin(proposals)
        
        elif conflict_type == "opinion":
            # 意见分歧:投票或主管裁决
            return self._voting_or_authority(proposals)
        
        elif conflict_type == "deadlock":
            # 死锁:超时机制或第三方协调
            return self._timeout_or_mediation(proposals)
通信注意事项:
  • 设置消息超时机制,避免无限等待
  • 处理网络延迟和消息丢失
  • 避免循环依赖导致的死锁

7.6 完整示例:软件开发团队

下面我们实现一个完整的软件开发多智能体系统,包含产品经理、开发工程师、代码审查员和测试工程师四个角色。

7.6.1 系统架构

         软件开发多智能体系统
┌──────────────────────────────────────────────────────────┐
│                                                          │
│   需求输入 ──► ┌─────────────┐                          │
│                │   PM Agent  │                          │
│                │  (产品经理)  │                          │
│                └──────┬──────┘                          │
│                       │ 分解任务                          │
│           ┌───────────┼───────────┐                      │
│           ▼           ▼           ▼                      │
│      ┌────────┐  ┌────────┐  ┌────────┐                 │
│      │  Dev   │  │Review  │  │  Test  │                 │
│      │ Agent  │  │ Agent  │  │ Agent  │                 │
│      │(开发)  │  │(审查)  │  │(测试)  │                 │
│      └───┬────┘  └───┬────┘  └───┬────┘                 │
│          │           │           │                       │
│          │   代码    │   报告    │                       │
│          └───────────┼───────────┘                       │
│                      ▼                                   │
│               ┌────────────┐                            │
│               │  产品交付  │                            │
│               └────────────┘                            │
│                                                          │
└──────────────────────────────────────────────────────────┘

7.6.2 代码实现

import asyncio
from typing import List, Dict, Any
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    PENDING = "待处理"
    IN_PROGRESS = "进行中"
    REVIEWING = "审查中"
    TESTING = "测试中"
    COMPLETED = "已完成"

@dataclass
class Task:
    id: str
    title: str
    description: str
    assignee: str = None
    status: TaskStatus = TaskStatus.PENDING
    result: str = None

class MessageBus:
    """消息总线"""
    def __init__(self):
        self.agents = {}
        self.history = []
    
    def register(self, agent_id: str, agent):
        self.agents[agent_id] = agent
    
    async def send(self, sender: str, receiver: str, content: str):
        message = {
            "sender": sender,
            "receiver": receiver,
            "content": content,
            "timestamp": asyncio.get_event_loop().time()
        }
        self.history.append(message)
        print(f"[{sender} -> {receiver}]: {content}")
        
        if receiver in self.agents:
            await self.agents[receiver].receive(message)

class BaseAgent:
    """智能体基类"""
    def __init__(self, agent_id: str, name: str, bus: MessageBus):
        self.id = agent_id
        self.name = name
        self.bus = bus
        self.inbox = asyncio.Queue()
    
    async def receive(self, message: Dict):
        await self.inbox.put(message)
    
    async def send_to(self, receiver: str, content: str):
        await self.bus.send(self.id, receiver, content)
    
    async def run(self):
        """主循环 - 子类需实现"""
        raise NotImplementedError

class PMAgent(BaseAgent):
    """产品经理智能体"""
    
    def __init__(self, bus: MessageBus):
        super().__init__("pm", "产品经理", bus)
        self.tasks = []
    
    async def analyze_requirement(self, requirement: str) -> List[Task]:
        """分析需求并分解任务"""
        print(f"\n[{self.name}] 正在分析需求: {requirement}")
        
        # 模拟需求分析过程
        await asyncio.sleep(0.5)
        
        tasks = [
            Task("T1", "数据库设计", "设计用户表结构和关系"),
            Task("T2", "API开发", "实现用户CRUD接口"),
            Task("T3", "前端页面", "开发用户管理界面")
        ]
        
        print(f"[{self.name}] 任务分解完成,共 {len(tasks)} 个子任务")
        return tasks
    
    async def assign_tasks(self, tasks: List[Task]):
        """分配任务给开发团队"""
        for i, task in enumerate(tasks):
            task.assignee = "dev"
            task.status = TaskStatus.IN_PROGRESS
            
            await self.send_to("dev", 
                f"新任务 #{i+1}: {task.title}\n描述: {task.description}")
    
    async def review_deliverable(self, result: str) -> bool:
        """验收最终成果"""
        print(f"\n[{self.name}] 正在验收成果...")
        await asyncio.sleep(0.3)
        print(f"[{self.name}] 验收通过!产品可以交付。")
        return True
    
    async def run(self):
        while True:
            try:
                message = await asyncio.wait_for(
                    self.inbox.get(), timeout=1.0
                )
                content = message["content"]
                
                if "任务完成" in content:
                    print(f"\n[{self.name}] 收到完成通知")
                    # 触发代码审查
                    await self.send_to("reviewer", "请审查新提交的代码")
                    
                elif "审查通过" in content:
                    # 触发测试
                    await self.send_to("tester", "请进行功能测试")
                    
                elif "测试通过" in content:
                    await self.review_deliverable(content)
                    
            except asyncio.TimeoutError:
                continue

class DevAgent(BaseAgent):
    """开发工程师智能体"""
    
    def __init__(self, bus: MessageBus):
        super().__init__("dev", "开发工程师", bus)
        self.current_task = None
    
    async def implement(self, task: Task) -> str:
        """实现任务"""
        print(f"\n[{self.name}] 开始开发: {task.title}")
        await asyncio.sleep(1.0)  # 模拟开发时间
        
        code = f"""
# {task.title} 实现代码
class UserService:
    def create_user(self, user_data):
        # 实现逻辑
        return {{"id": 1, "name": user_data["name"]}}
    
    def get_user(self, user_id):
        return {{"id": user_id, "name": "Test User"}}
"""
        print(f"[{self.name}] 开发完成")
        return code
    
    async def run(self):
        while True:
            try:
                message = await asyncio.wait_for(
                    self.inbox.get(), timeout=1.0
                )
                content = message["content"]
                
                if "新任务" in content:
                    # 解析任务并执行
                    task = Task("T1", "示例任务", "开发功能")
                    code = await self.implement(task)
                    
                    await self.send_to("pm", 
                        f"任务完成,代码已提交\n{code[:100]}...")
                    
            except asyncio.TimeoutError:
                continue

class ReviewAgent(BaseAgent):
    """代码审查智能体"""
    
    def __init__(self, bus: MessageBus):
        super().__init__("reviewer", "代码审查员", bus)
    
    async def review_code(self, code: str) -> Dict[str, Any]:
        """审查代码"""
        print(f"\n[{self.name}] 开始代码审查...")
        await asyncio.sleep(0.5)
        
        issues = []
        # 模拟审查逻辑
        if "error handling" not in code.lower():
            issues.append("建议添加错误处理")
        
        result = {
            "passed": len(issues) == 0,
            "issues": issues,
            "comments": "代码结构清晰,符合规范"
        }
        
        print(f"[{self.name}] 审查完成: {'通过' if result['passed'] else '需修改'}")
        return result
    
    async def run(self):
        while True:
            try:
                message = await asyncio.wait_for(
                    self.inbox.get(), timeout=1.0
                )
                content = message["content"]
                
                if "审查" in content:
                    # 执行审查
                    review_result = await self.review_code("代码内容")
                    
                    if review_result["passed"]:
                        await self.send_to("pm", "审查通过,代码质量良好")
                    else:
                        await self.send_to("dev", 
                            f"需要修改: {', '.join(review_result['issues'])}")
                    
            except asyncio.TimeoutError:
                continue

class TestAgent(BaseAgent):
    """测试工程师智能体"""
    
    def __init__(self, bus: MessageBus):
        super().__init__("tester", "测试工程师", bus)
    
    async def run_tests(self) -> Dict[str, Any]:
        """执行测试"""
        print(f"\n[{self.name}] 开始执行测试...")
        
        test_cases = [
            ("创建用户", "PASSED"),
            ("查询用户", "PASSED"),
            ("更新用户", "PASSED"),
            ("删除用户", "PASSED")
        ]
        
        await asyncio.sleep(0.8)
        
        all_passed = all(result == "PASSED" for _, result in test_cases)
        
        print(f"[{self.name}] 测试执行完成")
        for case, result in test_cases:
            status = "✓" if result == "PASSED" else "✗"
            print(f"  {status} {case}")
        
        return {"passed": all_passed, "cases": test_cases}
    
    async def run(self):
        while True:
            try:
                message = await asyncio.wait_for(
                    self.inbox.get(), timeout=1.0
                )
                content = message["content"]
                
                if "测试" in content:
                    test_result = await self.run_tests()
                    
                    if test_result["passed"]:
                        await self.send_to("pm", "测试通过,所有用例执行成功")
                    else:
                        await self.send_to("dev", "发现bug,需要修复")
                    
            except asyncio.TimeoutError:
                continue

# 系统启动和协调
async def main():
    """主函数 - 启动多智能体系统"""
    print("=" * 60)
    print("      软件开发多智能体协作系统启动")
    print("=" * 60)
    
    # 创建消息总线
    bus = MessageBus()
    
    # 创建智能体
    pm = PMAgent(bus)
    dev = DevAgent(bus)
    reviewer = ReviewAgent(bus)
    tester = TestAgent(bus)
    
    # 注册到总线
    bus.register("pm", pm)
    bus.register("dev", dev)
    bus.register("reviewer", reviewer)
    bus.register("tester", tester)
    
    # 启动所有智能体
    agents = [pm.run(), dev.run(), reviewer.run(), tester.run()]
    
    # 产品经理分析需求并启动流程
    tasks = await pm.analyze_requirement(
        "开发一个用户管理系统,支持用户的增删改查功能"
    )
    await pm.assign_tasks(tasks)
    
    # 运行一段时间观察协作
    await asyncio.gather(*agents)

# 运行示例
if __name__ == "__main__":
    asyncio.run(main())

7.6.3 运行结果示例

============================================================
      软件开发多智能体协作系统启动
============================================================

[产品经理] 正在分析需求: 开发一个用户管理系统,支持用户的增删改查功能
[产品经理] 任务分解完成,共 3 个子任务
[pm -> dev]: 新任务 #1: 数据库设计
描述: 设计用户表结构和关系
[pm -> dev]: 新任务 #2: API开发
描述: 实现用户CRUD接口
...

[开发工程师] 开始开发: 数据库设计
[开发工程师] 开发完成
[dev -> pm]: 任务完成,代码已提交
...

[pm -> reviewer]: 请审查新提交的代码

[代码审查员] 开始代码审查...
[代码审查员] 审查完成: 通过
[reviewer -> pm]: 审查通过,代码质量良好
...

[pm -> tester]: 请进行功能测试

[测试工程师] 开始执行测试...
[测试工程师] 测试执行完成
  ✓ 创建用户
  ✓ 查询用户
  ✓ 更新用户
  ✓ 删除用户
[tester -> pm]: 测试通过,所有用例执行成功

[产品经理] 验收通过!产品可以交付。

7.7 多智能体的挑战

虽然多智能体系统功能强大,但也面临一些挑战:

7.7.1 通信开销

问题:智能体之间的频繁通信会消耗大量资源和时间,可能成为系统瓶颈。

解决方案:
  • 批量处理消息,减少通信次数
  • 使用共享内存或数据库减少消息传递
  • 合理设计通信粒度

7.7.2 一致性维护

多个智能体可能对同一信息有不同理解,导致决策不一致。

# 一致性检查示例
class ConsistencyChecker:
    """一致性检查器"""
    
    def check_consensus(self, opinions: List[str]) -> bool:
        """检查是否达成共识"""
        # 使用LLM判断意见是否一致
        prompt = f"""
        判断以下意见是否一致或相容:
        {opinions}
        
        如果相容返回: CONSISTENT
        如果不相容返回: CONFLICT
        解释原因。
        """
        result = llm.call(prompt)
        return "CONSISTENT" in result
    
    def resolve_inconsistency(self, opinions: List[str]) -> str:
        """解决不一致"""
        # 综合各方意见,生成一致的结论
        prompt = f"""
        综合以下意见,给出一个一致的结论:
        {opinions}
        """
        return llm.call(prompt)

7.7.3 错误传播

一个智能体的错误可能影响其他智能体,甚至导致连锁反应。

        错误传播风险
┌────────────────────────────────────────┐
│                                        │
│   Agent A (出错)                       │
│        │                               │
│        ▼ 错误输出                      │
│   Agent B (基于错误继续)               │
│        │                               │
│        ▼ 错误放大                      │
│   Agent C (严重偏离目标)               │
│                                        │
│   ⚠️ 需要:错误检测、边界检查、回滚机制  │
│                                        │
└────────────────────────────────────────┘

7.7.4 其他挑战

挑战 描述 应对策略
死锁 智能体互相等待对方完成 设置超时、引入仲裁者
负载不均 某些智能体过载,其他空闲 动态任务调度、负载均衡
调试困难 多智能体交互复杂,难以追踪 完善的日志、可视化工具
成本增加 多个LLM调用增加费用 选择性激活、缓存机制
最佳实践总结:
  1. 从简单场景开始,逐步增加复杂度
  2. 设计清晰的通信协议和错误处理机制
  3. 建立完善的监控和日志系统
  4. 定期进行系统审计和优化

本章小结

本章我们学习了多智能体系统的核心概念:

  • 多智能体系统 - 多个AI智能体协同工作的架构
  • 协作模式 - 主从模式、对等模式、流水线模式
  • 角色设计 - 按专业领域或工作职能划分角色
  • 通信协调 - 消息传递、冲突解决机制
  • 实战示例 - 完整的软件开发团队实现
  • 挑战应对 - 通信开销、一致性、错误传播等问题

多智能体系统让AI从"单兵作战"走向"团队协作",能够处理更复杂的现实问题。在下一章中,我们将通过三个完整的实战案例,综合运用所学知识,开发实用的智能体应用。