第 8 / 8 章

第8章:实战项目开发

恭喜来到最终章!在本章中,我们将综合运用前面学习的所有知识,构建三个完整的企业级实战项目。这些项目涵盖智能客服、知识管理和数据分析三个热门领域,代码完整可直接运行。

项目概览

项目 技术栈 核心特性
智能客服系统 RAG + Memory + Agent + FastAPI 文档问答、多轮对话、Docker部署
个人知识管理助手 多数据源 + Vector Store + Agent 笔记管理、智能问答、思维导图
AI数据分析平台 Agent + Tools + Visualization 自动分析、图表生成、报告导出

项目1:智能客服系统(企业级)

基于公司产品文档的智能客服系统,支持RAG检索、多轮对话和复杂请求处理。

1.1 系统架构

架构流程
用户提问 → 意图识别 → RAG检索 → LLM生成回答 → (可选)Agent处理复杂请求 → 返回结果

1.2 项目结构

customer_service/
├── app/
│   ├── __init__.py
│   ├── main.py           # FastAPI主程序
│   ├── config.py         # 配置管理
│   ├── rag.py            # RAG检索模块
│   ├── agent.py          # Agent处理模块
│   ├── memory.py         # 记忆管理
│   └── models.py         # 数据模型
├── data/
│   └── documents/        # 产品文档
├── vector_db/            # 向量数据库
├── Dockerfile
├── requirements.txt
└── docker-compose.yml

1.3 核心代码实现

# ==================== config.py ====================
"""配置管理"""
import os
from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    """Application configuration, loaded from environment variables / .env."""
    # --- API credentials ---
    # NOTE(review): BaseSettings already reads matching environment variables
    # for these fields, so the explicit os.getenv defaults are presumably
    # redundant -- confirm before simplifying.
    OPENAI_API_KEY: str = os.getenv("OPENAI_API_KEY", "")
    OPENAI_BASE_URL: str = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
    
    # --- Model selection ---
    LLM_MODEL: str = "gpt-3.5-turbo"
    EMBEDDING_MODEL: str = "text-embedding-3-small"
    
    # --- RAG tuning ---
    CHUNK_SIZE: int = 500      # characters per chunk
    CHUNK_OVERLAP: int = 100   # overlap between adjacent chunks
    TOP_K: int = 5             # results returned per retrieval
    
    # --- Vector database ---
    VECTOR_DB_PATH: str = "./vector_db"
    
    # --- Conversation memory ---
    MAX_HISTORY: int = 10      # max turns kept per session
    
    # --- Application ---
    APP_NAME: str = "智能客服系统"
    DEBUG: bool = False
    
    class Config:
        # Also load variables from a local .env file.
        env_file = ".env"


@lru_cache()
def get_settings() -> Settings:
    """Build the Settings object once and hand the cached instance to every caller."""
    settings = Settings()
    return settings


# ==================== models.py ====================
"""数据模型"""
from datetime import datetime
from typing import Dict, List, Optional

from pydantic import BaseModel, Field


class ChatMessage(BaseModel):
    """A single chat turn.

    Attributes:
        role: Message author -- "user", "assistant" or "system".
        content: The message text.
        timestamp: Creation time of the message.  Uses ``default_factory`` so
            every instance gets a fresh timestamp; the original
            ``= datetime.now()`` default was evaluated once at
            class-definition time and then shared by all messages.
    """
    role: str  # user / assistant / system
    content: str
    timestamp: datetime = Field(default_factory=datetime.now)


class ChatRequest(BaseModel):
    """Request body for the /chat endpoint."""
    message: str  # the user's question
    session_id: str = "default"  # conversation key; the same id shares memory
    user_id: Optional[str] = None
    stream: bool = False  # NOTE(review): accepted but never read by /chat -- confirm intent


class ChatResponse(BaseModel):
    """Response body for the /chat endpoint."""
    message: str  # assistant answer text
    # Retrieval sources ({"content", "score"}); empty when the Agent answered.
    # Pydantic copies field defaults per instance, so the shared-list pitfall
    # does not apply here.
    sources: List[Dict] = []
    session_id: str
    used_agent: bool = False  # True when the Agent path produced the answer


class DocumentUpload(BaseModel):
    """Payload describing a document to be indexed."""
    filename: str
    content: str
    doc_type: str = "product"  # product / faq / policy


# ==================== memory.py ====================
"""对话记忆管理"""
from typing import List, Dict
from langchain.memory import ConversationBufferWindowMemory
from langchain.schema import HumanMessage, AIMessage
import json
import os


class SessionMemory:
    """Per-session conversation memory.

    Instances are pooled by ``session_id`` via ``__new__``, so constructing
    ``SessionMemory("abc")`` twice yields the same object and therefore the
    same underlying chat history.
    """

    _instances: Dict[str, 'SessionMemory'] = {}

    def __new__(cls, session_id: str):
        instance = cls._instances.get(session_id)
        if instance is None:
            instance = super().__new__(cls)
            cls._instances[session_id] = instance
        return instance

    def __init__(self, session_id: str):
        # __init__ runs on every construction, even for pooled instances;
        # the flag makes initialisation happen exactly once per session.
        if hasattr(self, 'initialized'):
            return
        self.session_id = session_id
        self.memory = ConversationBufferWindowMemory(
            k=10,  # keep only the 10 most recent exchanges
            return_messages=True,
            memory_key="chat_history"
        )
        self.initialized = True

    def add_user_message(self, message: str):
        """Append a user utterance to this session's history."""
        self.memory.chat_memory.add_user_message(message)

    def add_ai_message(self, message: str):
        """Append an assistant reply to this session's history."""
        self.memory.chat_memory.add_ai_message(message)

    def get_history(self) -> List[Dict]:
        """Return the history as a list of {"role", "content"} dicts."""
        history = []
        for msg in self.memory.chat_memory.messages:
            role = "user" if isinstance(msg, HumanMessage) else "assistant"
            history.append({"role": role, "content": msg.content})
        return history

    def clear(self):
        """Drop all stored messages for this session."""
        self.memory.clear()


# ==================== rag.py ====================
"""RAG检索模块"""
from langchain_community.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.schema import Document
from typing import List, Dict
import os

from app.config import get_settings


class RAGService:
    """RAG retrieval service.

    Indexes product documents into a persistent Chroma vector store and
    serves similarity search over them.
    """

    def __init__(self):
        self.settings = get_settings()
        self.embeddings = OpenAIEmbeddings(
            model=self.settings.EMBEDDING_MODEL,
            api_key=self.settings.OPENAI_API_KEY,
            base_url=self.settings.OPENAI_BASE_URL
        )
        self.vectorstore = None
        self._init_vectorstore()

    def _init_vectorstore(self):
        """Open (or create) the persistent vector store.

        Chroma loads an existing collection from ``persist_directory`` and
        creates an empty one otherwise, so a single construction covers both
        cases.  (The original code branched on ``os.path.exists`` but both
        branches were byte-identical -- dead code, removed.)
        """
        self.vectorstore = Chroma(
            persist_directory=self.settings.VECTOR_DB_PATH,
            embedding_function=self.embeddings
        )

    def load_documents(self, docs_path: str = "./data/documents") -> List[Document]:
        """Load all ``*.txt`` files under *docs_path* (recursive, UTF-8)."""
        loader = DirectoryLoader(
            docs_path,
            glob="**/*.txt",
            loader_cls=TextLoader,
            loader_kwargs={'encoding': 'utf-8'}
        )
        return loader.load()

    def process_and_index(self, documents: List[Document]) -> int:
        """Split *documents* into chunks, tag them, and add them to the index.

        Returns:
            The number of chunks indexed.
        """
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.settings.CHUNK_SIZE,
            chunk_overlap=self.settings.CHUNK_OVERLAP,
            # Prefer paragraph, line and CJK sentence boundaries before
            # falling back to arbitrary character splits.
            separators=["\n\n", "\n", "。", ";", " ", ""]
        )

        chunks = text_splitter.split_documents(documents)

        # Attach bookkeeping metadata to every chunk.
        for i, chunk in enumerate(chunks):
            chunk.metadata["chunk_id"] = i
            chunk.metadata["source_type"] = "document"

        self.vectorstore.add_documents(chunks)
        self.vectorstore.persist()

        return len(chunks)

    def search(self, query: str, top_k: int = None) -> List[Dict]:
        """Return the *top_k* chunks most similar to *query*.

        Each result is ``{"content", "metadata", "score"}``; ``score`` is the
        raw value from ``similarity_search_with_score`` (a distance in
        Chroma's default setup -- lower is closer).  ``top_k`` defaults to
        ``settings.TOP_K``.
        """
        if top_k is None:
            top_k = self.settings.TOP_K

        results = self.vectorstore.similarity_search_with_score(query, k=top_k)

        return [
            {
                "content": doc.page_content,
                "metadata": doc.metadata,
                "score": float(score)
            }
            for doc, score in results
        ]

    def add_texts(self, texts: List[str], metadatas: List[Dict] = None):
        """Add raw text snippets (with optional per-text metadata) to the index."""
        self.vectorstore.add_texts(texts, metadatas=metadatas)
        self.vectorstore.persist()


# ==================== agent.py ====================
"""Agent处理模块 - 处理复杂请求"""
from langchain.agents import create_react_agent, AgentExecutor
from langchain import hub
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from typing import List, Dict

from app.config import get_settings
from app.rag import RAGService


class CustomerServiceAgent:
    """ReAct agent for requests that need tools (orders, policies, escalation)."""
    
    def __init__(self):
        self.settings = get_settings()
        self.rag_service = RAGService()
        self.agent_executor = self._create_agent()
    
    def _create_agent(self):
        """Build the ReAct agent with its customer-service tool set."""
        
        # NOTE: each @tool's docstring doubles as the tool description shown
        # to the LLM, so the Chinese docstrings below are runtime behavior
        # and are deliberately left untouched.
        @tool
        def search_products(keyword: str) -> str:
            """搜索产品信息"""
            results = self.rag_service.search(keyword, top_k=3)
            if not results:
                return "未找到相关产品信息"
            return "\n".join([r["content"] for r in results])
        
        @tool
        def check_order_status(order_id: str) -> str:
            """查询订单状态(模拟)"""
            # Stub: a real deployment would query the order system here.
            return f"订单 {order_id} 状态:已发货,预计明天送达"
        
        @tool
        def get_return_policy() -> str:
            """获取退换货政策"""
            return """
退换货政策:
1. 7天无理由退货
2. 质量问题30天包换
3. 需提供购买凭证
4. 特殊商品除外
"""
        
        @tool
        def escalate_to_human(reason: str) -> str:
            """转人工客服"""
            return f"已为您转接人工客服,原因:{reason}"
        
        tools = [
            search_products,
            check_order_status,
            get_return_policy,
            escalate_to_human
        ]
        
        llm = ChatOpenAI(
            model=self.settings.LLM_MODEL,
            temperature=0,  # deterministic tool-use decisions
            api_key=self.settings.OPENAI_API_KEY,
            base_url=self.settings.OPENAI_BASE_URL
        )
        
        # Standard ReAct prompt from LangChain Hub (network fetch at startup).
        prompt = hub.pull("hwchase17/react")
        agent = create_react_agent(llm, tools, prompt)
        
        return AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            max_iterations=5,  # hard stop against reasoning loops
            handle_parsing_errors=True  # retry instead of crashing on malformed LLM output
        )
    
    def process(self, query: str) -> str:
        """Run *query* through the agent and return its final answer."""
        result = self.agent_executor.invoke({"input": query})
        return result["output"]


# ==================== main.py ====================
"""FastAPI主程序"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager

from app.config import get_settings
from app.models import ChatRequest, ChatResponse
from app.memory import SessionMemory
from app.rag import RAGService
from app.agent import CustomerServiceAgent
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate


# Global service instances, created once at startup in lifespan().
rag_service: RAGService = None
agent_service: CustomerServiceAgent = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: build the RAG and Agent services before serving."""
    global rag_service, agent_service
    
    # Startup: initialize heavyweight services once, shared by all requests.
    settings = get_settings()
    rag_service = RAGService()
    agent_service = CustomerServiceAgent()
    
    print(f"✅ {settings.APP_NAME} 启动成功")
    # NOTE(review): _collection is a private Chroma attribute and may break
    # on upgrades -- confirm no public count API exists in the pinned version.
    print(f"📚 向量库文档数: {rag_service.vectorstore._collection.count()}")
    
    yield
    
    # Shutdown: nothing to release; services are in-process only.
    print("👋 应用关闭")


app = FastAPI(
    title="智能客服系统",
    description="基于RAG和Agent的企业级智能客服",
    version="1.0.0",
    lifespan=lifespan
)

# CORS: wide open for development.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# unsafe for production -- restrict origins before deploying.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Intent-classification prompt: decides per request whether the (slower)
# Agent path is needed.  The Chinese prompt text below is runtime behavior
# and is kept verbatim; it instructs the model to answer only "AGENT" or
# "DIRECT".
AGENT_DETECTION_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """你是一个意图分类器。判断用户问题是否需要Agent处理。
    
Agent适合处理:
- 需要多步骤查询的问题
- 涉及订单操作的问题
- 需要检索政策/条款的问题
- 复杂的退换货咨询

直接回答适合:
- 简单的产品咨询
- 一般性问题
- 闲聊

只回答 "AGENT" 或 "DIRECT",不要解释。"""),
    ("human", "{query}")
])


def should_use_agent(query: str) -> bool:
    """Classify *query* with the LLM and decide whether the Agent path is needed.

    Returns True when the intent classifier replies "AGENT" (case-insensitive
    substring match), False otherwise.
    """
    settings = get_settings()
    classifier_llm = ChatOpenAI(
        model=settings.LLM_MODEL,
        temperature=0,
        api_key=settings.OPENAI_API_KEY,
        base_url=settings.OPENAI_BASE_URL
    )

    verdict = (AGENT_DETECTION_PROMPT | classifier_llm).invoke({"query": query})
    return "AGENT" in verdict.content.upper()


@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    """Main chat endpoint: routes each message to the Agent or the RAG chain.

    Flow: record the user message -> classify intent -> either run the Agent
    (tool-using path) or do retrieval + LLM generation -> record the reply.
    """
    try:
        settings = get_settings()
        # Session-scoped memory (SessionMemory pools instances by session_id).
        session_memory = SessionMemory(request.session_id)
        
        # Record the user's message in the session history.
        session_memory.add_user_message(request.message)
        
        # LLM-based intent classification: Agent path or direct RAG answer.
        use_agent = should_use_agent(request.message)
        
        if use_agent:
            # Agent path: multi-step / tool-using requests; no RAG sources.
            answer = agent_service.process(request.message)
            sources = []
        else:
            # RAG path: retrieve supporting chunks, then generate an answer.
            search_results = rag_service.search(request.message)
            
            # Build the context block from the top-3 retrieved chunks.
            context = "\n\n".join([
                f"[来源 {i+1}] {r['content']}"
                for i, r in enumerate(search_results[:3])
            ])
            
            # Grounded-answer prompt (the Chinese prompt text is runtime behavior).
            prompt = ChatPromptTemplate.from_messages([
                ("system", """你是专业的客服助手。基于以下参考资料回答用户问题。
                
参考资料:
{context}

回答要求:
1. 基于参考资料回答
2. 如果资料不足,明确告知用户
3. 语气友好专业
4. 回答简洁明了"""),
                ("human", "{question}")
            ])
            
            llm = ChatOpenAI(
                model=settings.LLM_MODEL,
                temperature=0.7,
                api_key=settings.OPENAI_API_KEY,
                base_url=settings.OPENAI_BASE_URL
            )
            
            chain = prompt | llm
            result = chain.invoke({
                "context": context,
                "question": request.message
            })
            
            answer = result.content
            sources = [{"content": r["content"], "score": r["score"]} 
                      for r in search_results[:3]]
        
        # Record the assistant's reply in the session history.
        session_memory.add_ai_message(answer)
        
        return ChatResponse(
            message=answer,
            sources=sources,
            session_id=request.session_id,
            used_agent=use_agent
        )
        
    except Exception as e:
        # NOTE(review): str(e) is returned to the client and may leak internal
        # details; consider a generic message plus server-side logging.
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/documents/index")
async def index_documents():
    """Load every document from disk and (re)index it into the vector store."""
    try:
        loaded = rag_service.load_documents()
        chunk_count = rag_service.process_and_index(loaded)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"message": f"成功索引 {chunk_count} 个文档块"}


@app.get("/health")
async def health_check():
    """Liveness probe; also reports the vector store's document count."""
    return {
        "status": "healthy",
        # NOTE(review): _collection is private Chroma API -- may break on upgrade.
        "vector_db_count": rag_service.vectorstore._collection.count()
    }


# Fix: the original guard compared __name__ against "__main__:" (stray colon
# inside the string), so `python main.py` never actually started the server.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

1.4 Docker部署配置

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install dependencies first so this layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code.
COPY . .

# Expose the API port.
EXPOSE 8000

# Start the FastAPI app with uvicorn.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  customer_service:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - OPENAI_BASE_URL=${OPENAI_BASE_URL:-https://api.openai.com/v1}
      - LLM_MODEL=gpt-3.5-turbo
    volumes:
      # Persist documents and the vector DB outside the container.
      - ./data:/app/data
      - ./vector_db:/app/vector_db
    restart: unless-stopped
    
  # Optional: monitoring sidecar.
  # NOTE(review): verify this image name -- a public "langchain/langsmith"
  # image may not exist; LangSmith is normally a hosted service.
  langsmith:
    image: langchain/langsmith
    ports:
      - "1984:1984"
    environment:
      - LANGCHAIN_API_KEY=${LANGCHAIN_API_KEY}
# requirements.txt
fastapi==0.104.1
uvicorn==0.24.0
langchain==0.1.0
langchain-openai==0.0.5
langchain-community==0.0.10
chromadb==0.4.18
pydantic==2.5.0
pydantic-settings==2.1.0
python-multipart==0.0.6
✅ 项目1亮点
  • 分层架构:配置、模型、服务分离
  • 智能路由:自动判断使用RAG还是Agent
  • 会话记忆:支持多轮对话上下文
  • Docker化:一键部署,易于扩展

项目2:个人知识管理助手

管理个人笔记、论文、网页收藏,支持智能问答、自动总结和思维导图生成。

2.1 系统架构

多数据源 → 统一处理 → 向量化存储 → RAG问答 / Agent分析 / 自动总结

2.2 核心代码实现

# ==================== knowledge_manager.py ====================
"""个人知识管理助手 - 完整实现"""

import os
import json
import hashlib
from pathlib import Path
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass
from datetime import datetime
import sqlite3

from langchain_community.document_loaders import (
    TextLoader, 
    UnstructuredMarkdownLoader,
    PyPDFLoader,
    WebBaseLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import Chroma
from langchain.chains.summarize import load_summarize_chain
from langchain.prompts import PromptTemplate
from langchain.schema import Document
from langchain.tools import tool
from langchain.agents import create_react_agent, AgentExecutor
from langchain import hub


@dataclass
class KnowledgeItem:
    """A single unit of stored knowledge (note, paper, article or bookmark)."""
    # Empty string means "assign on insert"; KnowledgeStore.add_item then
    # derives an md5-based id from source_path + created_at.
    id: str
    title: str
    content: str
    source: str  # origin kind: file / web / note
    source_path: str  # file path, URL, or "manual" for hand-written notes
    doc_type: str  # note / paper / article / bookmark
    tags: List[str]
    created_at: datetime
    summary: Optional[str] = None  # AI-generated summary, filled in lazily


class KnowledgeStore:
    """Dual storage for knowledge items: SQLite for structured metadata,
    Chroma for semantic (vector) search over the content."""
    
    def __init__(self, db_path: str = "./knowledge.db"):
        self.db_path = db_path
        self._init_db()
        
        # Vector store for semantic retrieval.
        # NOTE(review): OpenAIEmbeddings() here presumably relies on the
        # OPENAI_API_KEY environment variable -- confirm configuration.
        self.embeddings = OpenAIEmbeddings()
        self.vectorstore = Chroma(
            collection_name="knowledge",
            embedding_function=self.embeddings,
            persist_directory="./knowledge_vectors"
        )
    
    def _init_db(self):
        """Create the SQLite tables on first use (no-op when they exist)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS knowledge_items (
                    id TEXT PRIMARY KEY,
                    title TEXT,
                    content TEXT,
                    source TEXT,
                    source_path TEXT,
                    doc_type TEXT,
                    tags TEXT,
                    created_at TEXT,
                    summary TEXT
                )
            """)
            
            conn.execute("""
                CREATE TABLE IF NOT EXISTS tags (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE
                )
            """)
    
    def add_item(self, item: KnowledgeItem) -> str:
        """Insert (or replace) *item* in both stores; returns its id.

        An empty item.id is filled with a 12-char md5 hash of
        source_path + created_at, used as the key in both stores.
        """
        # Assign a deterministic id when the caller left it empty.
        if not item.id:
            item.id = hashlib.md5(
                f"{item.source_path}{item.created_at}".encode()
            ).hexdigest()[:12]
        
        # Structured copy into SQLite (tags serialized as JSON).
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """INSERT OR REPLACE INTO knowledge_items 
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    item.id, item.title, item.content,
                    item.source, item.source_path, item.doc_type,
                    json.dumps(item.tags), 
                    item.created_at.isoformat(),
                    item.summary
                )
            )
            
            # Keep the distinct-tag table up to date.
            for tag in item.tags:
                conn.execute(
                    "INSERT OR IGNORE INTO tags (name) VALUES (?)",
                    (tag,)
                )
        
        # Embedded copy into the vector store; the shared id links both stores.
        self.vectorstore.add_texts(
            texts=[item.content],
            metadatas=[{
                "id": item.id,
                "title": item.title,
                "doc_type": item.doc_type,
                "tags": ",".join(item.tags),
                "source": item.source
            }],
            ids=[item.id]
        )
        
        return item.id
    
    def search(self, query: str, doc_type: str = None, tags: List[str] = None) -> List[Dict]:
        """Semantic search, optionally narrowed by doc_type and/or tags.

        doc_type is filtered inside the vector store; tags are filtered in
        Python afterwards (matching ANY requested tag keeps the result).
        """
        # Vector-store-side metadata filter.
        filter_dict = {}
        if doc_type:
            filter_dict["doc_type"] = doc_type
        
        # Retrieve candidates with similarity scores.
        results = self.vectorstore.similarity_search_with_score(
            query, 
            k=10,
            filter=filter_dict if filter_dict else None
        )
        
        # Post-filter on tags (stored as a comma-joined string in metadata).
        filtered_results = []
        for doc, score in results:
            if tags:
                doc_tags = doc.metadata.get("tags", "").split(",")
                if not any(tag in doc_tags for tag in tags):
                    continue
            
            filtered_results.append({
                "id": doc.metadata.get("id"),
                "title": doc.metadata.get("title"),
                # Truncated preview; "..." is appended unconditionally.
                "content": doc.page_content[:500] + "...",
                "doc_type": doc.metadata.get("doc_type"),
                "score": float(score)
            })
        
        return filtered_results
    
    def get_all_tags(self) -> List[str]:
        """Return every distinct tag ever stored."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute("SELECT name FROM tags")
            return [row[0] for row in cursor.fetchall()]
    
    def get_items_by_type(self, doc_type: str) -> List[Dict]:
        """List id/title/tags/created_at for all items of *doc_type* (SQLite only)."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT id, title, tags, created_at FROM knowledge_items WHERE doc_type=?",
                (doc_type,)
            )
            return [
                {
                    "id": row[0],
                    "title": row[1],
                    "tags": json.loads(row[2]),
                    "created_at": row[3]
                }
                for row in cursor.fetchall()
            ]


class DocumentProcessor:
    """Turns local files and web pages into KnowledgeItem objects."""

    def __init__(self):
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            separators=["\n\n", "\n", "。", ";", " ", ""]
        )

    def load_file(self, file_path: str) -> List[Document]:
        """Pick a loader by file extension and load the file."""
        suffix = Path(file_path).suffix.lower()
        if suffix == '.pdf':
            return PyPDFLoader(file_path).load()
        if suffix in ['.md', '.markdown']:
            return UnstructuredMarkdownLoader(file_path).load()
        # Anything else is treated as UTF-8 plain text.
        return TextLoader(file_path, encoding='utf-8').load()

    def load_web(self, url: str) -> List[Document]:
        """Fetch and parse a web page."""
        return WebBaseLoader(url).load()

    def process(self, documents: List[Document], doc_type: str, tags: List[str]) -> KnowledgeItem:
        """Merge *documents* into a single KnowledgeItem of the given type/tags."""
        merged = "\n\n".join(doc.page_content for doc in documents)

        # Title: explicit metadata title, else the source file's stem.
        first_meta = documents[0].metadata
        title = first_meta.get('title', '')
        if not title:
            title = Path(first_meta.get('source', '未命名')).stem

        source_path = first_meta.get('source', '')
        return KnowledgeItem(
            id="",
            title=title,
            content=merged,
            # Crude origin detection: URLs contain "http", files don't.
            source='web' if 'http' in source_path else 'file',
            source_path=source_path,
            doc_type=doc_type,
            tags=tags,
            created_at=datetime.now()
        )


class Summarizer:
    """LLM-backed summarizer offering several summary styles."""

    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3)

    def summarize(self, content: str, summary_type: str = "general") -> str:
        """Produce a summary of *content*.

        *summary_type* picks the prompt template: "general", "academic" or
        "key_points"; unknown values fall back to "general".  Only the first
        4000 characters of the content are sent to the model.
        """
        templates = {
            "general": """请对以下内容进行简洁的总结:

{content}

要求:
- 控制在200字以内
- 保留关键信息
- 使用 bullet points

总结:""",
            
            "academic": """请对以下学术论文进行总结:

{content}

总结格式:
1. 研究背景
2. 主要方法
3. 关键发现
4. 研究意义

总结:""",
            
            "key_points": """从以下内容中提取关键要点:

{content}

请列出5-7个最重要的要点:"""
        }
        
        selected = templates.get(summary_type, templates["general"])
        chain = PromptTemplate.from_template(selected) | self.llm
        response = chain.invoke({"content": content[:4000]})
        return response.content


class MindMapGenerator:
    """Generates mind-map structures from free text via an LLM."""
    
    def __init__(self):
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
    
    def generate(self, content: str, title: str = "思维导图") -> Dict:
        """Ask the LLM for a mind-map structure and return it as a dict.

        The prompt requests pure JSON, but models frequently wrap it in a
        Markdown code fence anyway, so any fence is stripped before parsing.
        On parse failure a minimal fallback structure is returned instead of
        raising.  (Fix: the original used a bare ``except:`` which would also
        swallow KeyboardInterrupt and real bugs.)
        """
        prompt = f"""分析以下内容,生成思维导图结构。

内容:
{content[:3000]}

请以JSON格式输出思维导图结构:
{{
    "中心主题": "{title}",
    "分支": [
        {{
            "主题": "分支1",
            "子主题": ["子主题1-1", "子主题1-2"]
        }},
        ...
    ]
}}

只输出JSON,不要其他内容。"""
        
        result = self.llm.invoke(prompt)
        
        text = result.content.strip()
        # Strip a surrounding Markdown fence (``` or ```json) if present.
        if text.startswith("```"):
            text = text.split("\n", 1)[-1]
            if text.endswith("```"):
                text = text[:-3].strip()
        
        try:
            return json.loads(text)
        except (json.JSONDecodeError, TypeError):
            # Fall back to a minimal structure so callers always get a dict
            # with the keys that to_markdown()/to_xmind_markdown() expect.
            return {
                "中心主题": title,
                "分支": [{"主题": "内容概要", "子主题": ["详见原文"]}]
            }
    
    def to_markdown(self, mindmap_data: Dict) -> str:
        """Render the mind-map dict as nested Markdown (headings + bullets)."""
        md = f"# {mindmap_data['中心主题']}\n\n"
        
        for branch in mindmap_data.get('分支', []):
            md += f"## {branch['主题']}\n"
            for sub in branch.get('子主题', []):
                md += f"- {sub}\n"
            md += "\n"
        
        return md
    
    def to_xmind_markdown(self, mindmap_data: Dict) -> str:
        """Render as flat heading levels (the Markdown form XMind can import)."""
        lines = [f"# {mindmap_data['中心主题']}"]
        
        for branch in mindmap_data.get('分支', []):
            lines.append(f"## {branch['主题']}")
            for sub in branch.get('子主题', []):
                lines.append(f"### {sub}")
        
        return "\n".join(lines)


class KnowledgeAgent:
    """ReAct agent exposing the knowledge store through natural language."""
    
    def __init__(self, knowledge_store: KnowledgeStore):
        self.knowledge_store = knowledge_store
        self.summarizer = Summarizer()
        self.mindmap_gen = MindMapGenerator()
        self._create_agent()
    
    def _create_agent(self):
        """Build the agent; stores the executor on self.agent_executor."""
        
        # Each @tool's docstring is the description the LLM sees, so the
        # Chinese docstrings below are runtime behavior and are left verbatim.
        @tool
        def search_knowledge(query: str) -> str:
            """搜索知识库"""
            results = self.knowledge_store.search(query)
            if not results:
                return "未找到相关内容"
            return json.dumps(results[:3], ensure_ascii=False)
        
        @tool
        def summarize_document(doc_id: str, summary_type: str = "general") -> str:
            """总结指定文档"""
            # Stub: returns a canned string; a real version would fetch the
            # document content (e.g. from SQLite) and run the Summarizer.
            return f"文档 {doc_id} 的总结:这是一个重要的文档..."
        
        @tool
        def generate_mindmap(doc_id: str) -> str:
            """生成思维导图"""
            # Stub: passes placeholder text instead of the real document body.
            mindmap = self.mindmap_gen.generate("文档内容", "知识导图")
            return self.mindmap_gen.to_markdown(mindmap)
        
        @tool
        def list_by_type(doc_type: str) -> str:
            """按类型列出文档"""
            items = self.knowledge_store.get_items_by_type(doc_type)
            return json.dumps(items, ensure_ascii=False)
        
        tools = [search_knowledge, summarize_document, generate_mindmap, list_by_type]
        
        llm = ChatOpenAI(model="gpt-4o", temperature=0)
        prompt = hub.pull("hwchase17/react")  # standard ReAct prompt (network fetch)
        
        agent = create_react_agent(llm, tools, prompt)
        
        self.agent_executor = AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            max_iterations=5  # hard stop against reasoning loops
        )
    
    def chat(self, query: str) -> str:
        """Answer *query* by letting the agent pick and call tools."""
        result = self.agent_executor.invoke({"input": query})
        return result["output"]


# ==================== 主程序 ====================

class PersonalKnowledgeAssistant:
    """Facade wiring together storage, loading, summarizing and the agent."""
    
    def __init__(self):
        self.store = KnowledgeStore()
        self.processor = DocumentProcessor()
        self.summarizer = Summarizer()
        self.mindmap_gen = MindMapGenerator()
        self.agent = KnowledgeAgent(self.store)
    
    def add_note(self, title: str, content: str, tags: List[str] = None) -> str:
        """Store a manually written note; returns the new item id."""
        item = KnowledgeItem(
            id="",
            title=title,
            content=content,
            source="note",
            source_path="manual",
            doc_type="note",
            tags=tags or [],
            created_at=datetime.now()
        )
        return self.store.add_item(item)
    
    def add_file(self, file_path: str, doc_type: str, tags: List[str] = None) -> str:
        """Import a local file (txt/md/pdf) and auto-summarize it."""
        # Load the raw document(s).
        documents = self.processor.load_file(file_path)
        
        # Convert into a knowledge item.
        item = self.processor.process(documents, doc_type, tags or [])
        
        # Attach an AI-generated summary before storing.
        item.summary = self.summarizer.summarize(item.content)
        
        return self.store.add_item(item)
    
    def add_web(self, url: str, tags: List[str] = None) -> str:
        """Import a web page as an "article" item, with auto-summary."""
        documents = self.processor.load_web(url)
        
        item = self.processor.process(documents, "article", tags or [])
        item.summary = self.summarizer.summarize(item.content)
        
        return self.store.add_item(item)
    
    def query(self, question: str, doc_type: str = None) -> List[Dict]:
        """Semantic search over stored knowledge (optionally by doc_type)."""
        return self.store.search(question, doc_type=doc_type)
    
    def chat(self, question: str) -> str:
        """Free-form interaction routed through the knowledge agent."""
        return self.agent.chat(question)
    
    def generate_mindmap(self, query: str) -> str:
        """Search for *query* and render the top hits as an XMind-style mind map."""
        results = self.store.search(query)
        
        if not results:
            return "未找到相关内容"
        
        # Feed the top-3 previews to the mind-map generator.
        combined_content = "\n".join([r["content"] for r in results[:3]])
        mindmap_data = self.mindmap_gen.generate(combined_content, query)
        
        return self.mindmap_gen.to_xmind_markdown(mindmap_data)


# Usage example
if __name__ == "__main__":
    assistant = PersonalKnowledgeAssistant()
    
    # 1. Add a hand-written note.
    print("=== 添加笔记 ===")
    note_id = assistant.add_note(
        title="LangChain学习笔记",
        content="""LangChain是一个强大的LLM应用开发框架。
        
核心组件包括:
1. Model I/O - 模型输入输出
2. Chains - 链式调用
3. Agents - 智能体
4. Memory - 记忆
5. Vector Stores - 向量存储

学习建议:循序渐进,从基础开始。""",
        tags=["AI", "LangChain", "学习"]
    )
    print(f"笔记已添加,ID: {note_id}")
    
    # 2. Semantic search over stored items.
    print("\n=== 查询 ===")
    results = assistant.query("LangChain核心组件")
    for r in results:
        print(f"[{r['doc_type']}] {r['title']} (相关度: {r['score']:.3f})")
    
    # 3. Conversational access through the agent.
    print("\n=== Agent对话 ===")
    response = assistant.chat("我有哪些关于AI的笔记?")
    print(response)
✅ 项目2亮点
  • 多数据源支持:文件、PDF、网页、手动笔记
  • 自动总结:AI生成文档摘要
  • 思维导图:一键生成知识图谱
  • Agent交互:自然语言管理知识

项目3:AI数据分析平台

上传CSV/Excel文件,自动分析数据、生成图表、输出洞察报告。

3.1 系统架构

文件上传 → 数据预览 → Agent分析 → 可视化 → 报告生成

3.2 核心代码实现

# ==================== data_analysis_platform.py ====================
"""AI数据分析平台 - 完整实现"""

import os
import io
import json
import base64
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from matplotlib.backends.backend_agg import FigureCanvasAgg

from langchain.agents import create_react_agent, AgentExecutor
from langchain.tools import tool, StructuredTool
from langchain_openai import ChatOpenAI
from langchain import hub
from pydantic import BaseModel, Field


# Configure matplotlib for Chinese labels: prefer a CJK-capable font and
# keep the minus sign renderable when such a font is active.
plt.rcParams['font.sans-serif'] = ['SimHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False


@dataclass
class AnalysisResult:
    """Bundle of everything produced by one analysis run."""
    summary: str
    insights: List[str]
    charts: List[str]  # base64-encoded images
    recommendations: List[str]


class DataManager:
    """In-memory registry of uploaded tabular datasets, keyed by id."""

    def __init__(self):
        # dataset_id -> DataFrame
        self.datasets: Dict[str, pd.DataFrame] = {}

    def load_csv(self, file_path: str, dataset_id: str = None) -> str:
        """Load a CSV file and register it; returns the dataset id
        (defaults to the file's basename)."""
        if dataset_id is None:
            dataset_id = os.path.basename(file_path)
        self.datasets[dataset_id] = pd.read_csv(file_path)
        return dataset_id

    def load_excel(self, file_path: str, sheet_name: int = 0, dataset_id: str = None) -> str:
        """Load one sheet of an Excel file and register it; returns the id."""
        if dataset_id is None:
            dataset_id = os.path.basename(file_path)
        self.datasets[dataset_id] = pd.read_excel(file_path, sheet_name=sheet_name)
        return dataset_id

    def get_dataset(self, dataset_id: str) -> pd.DataFrame:
        """Return the DataFrame for *dataset_id*; raises ValueError if unknown."""
        try:
            return self.datasets[dataset_id]
        except KeyError:
            raise ValueError(f"数据集 {dataset_id} 不存在")

    def get_info(self, dataset_id: str) -> Dict:
        """Summarize a dataset: shape, columns, dtypes, numeric columns,
        per-column missing counts, and a 5-row sample."""
        frame = self.get_dataset(dataset_id)
        return {
            "shape": frame.shape,
            "columns": frame.columns.tolist(),
            "dtypes": frame.dtypes.to_dict(),
            "numeric_columns": frame.select_dtypes(include=[np.number]).columns.tolist(),
            "missing_values": frame.isnull().sum().to_dict(),
            "sample": frame.head(5).to_dict(orient='records'),
        }

    def list_datasets(self) -> List[str]:
        """Ids of all registered datasets, in insertion order."""
        return [*self.datasets]


class DataAnalysisTools:
    """Factory for the LangChain tools the analysis agent can call.

    NOTE: the docstring of every ``@tool`` function below is sent verbatim
    to the LLM as the tool's description, so those docstrings are runtime
    data and are deliberately left in their original wording.
    """
    
    def __init__(self, data_manager: DataManager):
        # Shared dataset registry; every tool resolves dataset_id through it.
        self.data_manager = data_manager
    
    def create_tools(self) -> List:
        """Build and return the list of tool callables (closures over self)."""
        
        @tool
        def get_dataframe_info(dataset_id: str) -> str:
            """获取数据框的基本信息"""
            try:
                info = self.data_manager.get_info(dataset_id)
                # default=str keeps non-JSON types (dtypes, timestamps) printable.
                return json.dumps(info, indent=2, default=str)
            except Exception as e:
                # Tools must return strings, so failures are reported as text
                # rather than raised into the agent loop.
                return f"错误: {str(e)}"
        
        @tool
        def describe_column(dataset_id: str, column: str) -> str:
            """描述指定列的统计信息"""
            try:
                df = self.data_manager.get_dataset(dataset_id)
                
                if column not in df.columns:
                    return f"列 '{column}' 不存在"
                
                # pandas describe(): count/mean/std/min/quartiles/max for numeric,
                # count/unique/top/freq for object columns.
                desc = df[column].describe()
                return desc.to_json()
            except Exception as e:
                return f"错误: {str(e)}"
        
        @tool
        def execute_pandas_query(dataset_id: str, query: str) -> str:
            """执行Pandas查询代码
            
            可用的变量:
            - df: 数据框
            - pd: pandas库
            - np: numpy库
            
            示例:
            - df.groupby('category')['sales'].sum()
            - df[df['age'] > 30].shape[0]
            """
            try:
                df = self.data_manager.get_dataset(dataset_id)
                
                # 限制可用函数(安全考虑)
                allowed_names = {
                    'df': df,
                    'pd': pd,
                    'np': np
                }
                
                # NOTE(review): emptying __builtins__ narrows eval but does NOT
                # make it a real sandbox — crafted expressions can still escape
                # via attribute access. Treat `query` as trusted (agent-generated)
                # input only; use a proper sandbox for untrusted callers.
                result = eval(query, {"__builtins__": {}}, allowed_names)
                
                # Normalize the result to a readable string for the agent.
                if hasattr(result, 'to_string'):
                    return result.to_string()
                elif hasattr(result, 'tolist'):
                    return str(result.tolist())
                else:
                    return str(result)
                    
            except Exception as e:
                return f"查询错误: {str(e)}"
        
        @tool
        def correlation_analysis(dataset_id: str) -> str:
            """相关性分析"""
            try:
                df = self.data_manager.get_dataset(dataset_id)
                # Correlation is only defined over numeric columns.
                numeric_df = df.select_dtypes(include=[np.number])
                
                if numeric_df.empty:
                    return "没有数值列可供分析"
                
                corr = numeric_df.corr()
                return corr.to_json()
            except Exception as e:
                return f"错误: {str(e)}"
        
        @tool
        def detect_outliers(dataset_id: str, column: str) -> str:
            """检测异常值(使用IQR方法)"""
            try:
                df = self.data_manager.get_dataset(dataset_id)
                
                if column not in df.columns:
                    return f"列 '{column}' 不存在"
                
                # Standard Tukey fence: values beyond 1.5*IQR from the quartiles.
                data = df[column].dropna()
                Q1 = data.quantile(0.25)
                Q3 = data.quantile(0.75)
                IQR = Q3 - Q1
                
                outliers = data[(data < Q1 - 1.5*IQR) | (data > Q3 + 1.5*IQR)]
                
                return json.dumps({
                    "column": column,
                    "outlier_count": len(outliers),
                    "outlier_percentage": f"{len(outliers)/len(data)*100:.2f}%",
                    "Q1": Q1,
                    "Q3": Q3,
                    "IQR": IQR,
                    "outlier_sample": outliers.head(5).tolist()
                })
            except Exception as e:
                return f"错误: {str(e)}"
        
        return [
            get_dataframe_info,
            describe_column,
            execute_pandas_query,
            correlation_analysis,
            detect_outliers
        ]


class VisualizationService:
    """Renders charts for a registered dataset and returns them as base64 PNGs."""

    def __init__(self, data_manager: DataManager):
        # Registry used to resolve dataset ids into DataFrames.
        self.data_manager = data_manager

    def create_chart(self, dataset_id: str, chart_type: str,
                     x_column: str = None, y_column: str = None,
                     title: str = None) -> str:
        """Draw a single chart and return it as a base64-encoded PNG string.

        Supported chart_type values: line, bar, histogram, scatter, heatmap,
        box. An unrecognized type yields an empty set of axes.
        """
        frame = self.data_manager.get_dataset(dataset_id)

        figure, axes = plt.subplots(figsize=(10, 6))

        if chart_type == "line":
            if x_column and y_column:
                axes.plot(frame[x_column], frame[y_column], marker='o')
            else:
                frame.plot(ax=axes)  # one line per column

        elif chart_type == "bar":
            if x_column and y_column:
                frame.plot(x=x_column, y=y_column, kind='bar', ax=axes)
            else:
                frame.plot(kind='bar', ax=axes)

        elif chart_type == "histogram":
            if y_column:
                frame[y_column].hist(ax=axes, bins=20)
            else:
                frame.hist(ax=axes, bins=20)  # one histogram per numeric column

        elif chart_type == "scatter":
            if x_column and y_column:
                axes.scatter(frame[x_column], frame[y_column], alpha=0.6)

        elif chart_type == "heatmap":
            # Correlation heatmap over the numeric columns only.
            numeric_part = frame.select_dtypes(include=[np.number])
            sns.heatmap(numeric_part.corr(), annot=True, cmap='coolwarm', ax=axes)

        elif chart_type == "box":
            if y_column:
                frame.boxplot(column=y_column, ax=axes)
            else:
                frame.boxplot(ax=axes)

        if title:
            axes.set_title(title, fontsize=14)

        plt.tight_layout()

        # Serialize the figure to PNG bytes and base64-encode them.
        buffer = io.BytesIO()
        figure.savefig(buffer, format='png', dpi=150)
        plt.close(figure)  # release the figure so repeated calls don't leak

        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def create_multi_chart(self, dataset_id: str, charts_config: List[Dict]) -> List[str]:
        """Render one chart per config dict (keys: type, x, y, title)."""
        return [
            self.create_chart(
                dataset_id=dataset_id,
                chart_type=cfg.get('type', 'line'),
                x_column=cfg.get('x'),
                y_column=cfg.get('y'),
                title=cfg.get('title', ''),
            )
            for cfg in charts_config
        ]


class ReportGenerator:
    """Turns raw analysis output into an LLM-written report and insight list."""
    
    def __init__(self):
        # Low temperature keeps the report wording relatively stable across runs.
        self.llm = ChatOpenAI(model="gpt-4o", temperature=0.3)
    
    def generate_report(self, dataset_id: str, analysis_results: Dict, 
                       user_question: str = None) -> str:
        """Ask the LLM to write a Markdown analysis report.

        analysis_results may contain 'info' (dataset overview) and 'stats'
        (statistics); both are truncated before being embedded in the prompt
        to keep it within context limits.
        """
        
        prompt = f"""基于以下数据分析结果,生成一份专业的分析报告。

数据集: {dataset_id}
用户问题: {user_question or '全面数据分析'}

数据概况:
{json.dumps(analysis_results.get('info', {}), indent=2, default=str)[:2000]}

统计数据:
{json.dumps(analysis_results.get('stats', {}), indent=2, default=str)[:1500]}

请按以下结构生成报告:

# 数据分析报告

## 1. 数据概览
- 数据规模、字段说明
- 数据质量评估

## 2. 统计分析
- 关键指标
- 分布特征
- 趋势分析

## 3. 关键发现
- 数据洞察(3-5条)
- 异常点说明

## 4. 建议与结论
- 业务建议
- 后续分析方向

报告要求:
- 专业但易懂
- 使用Markdown格式
- 包含具体数字支撑
"""
        
        result = self.llm.invoke(prompt)
        return result.content
    
    def generate_insights(self, data_sample: str) -> List[str]:
        """Extract 3-5 key insights from a data sample.

        Falls back to generic insights when the LLM reply is not valid JSON
        or is not a JSON object.
        """
        
        prompt = f"""分析以下数据,提取3-5条关键洞察:

{data_sample}

请以JSON格式返回:
{{"insights": ["洞察1", "洞察2", "洞察3"]}}"""
        
        result = self.llm.invoke(prompt)
        
        try:
            data = json.loads(result.content)
            return data.get('insights', [])
        except (json.JSONDecodeError, AttributeError, TypeError):
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit and
            # genuine bugs propagate. JSONDecodeError: reply is not JSON;
            # AttributeError: reply parsed to a non-dict (no .get); TypeError:
            # content is not str/bytes.
            return ["数据整体趋势良好", "建议关注异常值", "可考虑进一步细分分析"]


class DataAnalysisAgent:
    """Orchestrates the full pipeline: agent analysis, charts, report, insights."""
    
    def __init__(self, data_manager: DataManager):
        self.data_manager = data_manager
        self.analysis_tools = DataAnalysisTools(data_manager)
        self.viz_service = VisualizationService(data_manager)
        self.report_gen = ReportGenerator()
        self._create_agent()
    
    def _create_agent(self):
        """Assemble the ReAct agent executor from the analysis + viz tools."""
        
        tools = self.analysis_tools.create_tools()
        
        # Extra tool: maps an analysis goal to a chart-type suggestion.
        # NOTE: the docstring and the returned Chinese strings below are
        # consumed by the LLM at runtime — do not edit them casually.
        @tool
        def suggest_visualization(dataset_id: str, purpose: str) -> str:
            """根据分析目的推荐可视化方案"""
            suggestions = {
                "trend": "使用折线图展示趋势",
                "distribution": "使用直方图或箱线图展示分布",
                "comparison": "使用柱状图进行对比",
                "relationship": "使用散点图或热力图展示关系"
            }
            return suggestions.get(purpose, "建议根据数据特点选择合适的图表")
        
        tools.append(suggest_visualization)
        
        # temperature=0 keeps the agent's tool selection as deterministic as possible.
        llm = ChatOpenAI(model="gpt-4o", temperature=0)
        # Standard ReAct prompt pulled from LangChain Hub (needs network access).
        prompt = hub.pull("hwchase17/react")
        
        agent = create_react_agent(llm, tools, prompt)
        
        self.agent_executor = AgentExecutor(
            agent=agent,
            tools=tools,
            verbose=True,
            max_iterations=10,  # hard stop against reasoning loops
            handle_parsing_errors=True  # recover instead of crashing on malformed LLM output
        )
    
    def analyze(self, dataset_id: str, question: str = None) -> AnalysisResult:
        """Run the end-to-end analysis for one dataset and return an AnalysisResult.

        Steps: dataset summary -> agent-driven analysis -> chart generation
        -> LLM report -> LLM insights.
        """
        
        # 1. Dataset overview (shape, columns, dtypes, sample rows).
        info = self.data_manager.get_info(dataset_id)
        
        # 2. Agent-driven analysis; the instructions below are runtime prompt text.
        agent_input = f"""分析数据集 '{dataset_id}'

数据概况: {json.dumps(info, default=str)}

用户问题: {question or '请进行全面数据分析'}

请执行以下分析:
1. 查看数据基本信息
2. 分析数值列的统计特征
3. 进行相关性分析(如果有多个数值列)
4. 检查异常值
5. 总结关键发现"""
        
        agent_result = self.agent_executor.invoke({"input": agent_input})
        
        # 3. Charts: histogram of the first numeric column, plus a correlation
        #    heatmap when at least two numeric columns exist.
        charts = []
        numeric_cols = info.get('numeric_columns', [])
        
        if numeric_cols:
            # Distribution of the first numeric column.
            charts.append(self.viz_service.create_chart(
                dataset_id, "histogram", y_column=numeric_cols[0],
                title=f"{numeric_cols[0]} 分布"
            ))
            
            # Correlation heatmap requires at least two numeric columns.
            if len(numeric_cols) >= 2:
                charts.append(self.viz_service.create_chart(
                    dataset_id, "heatmap", title="相关性热力图"
                ))
        
        # 4. LLM-written Markdown report based on the overview + agent output.
        analysis_results = {
            'info': info,
            'agent_analysis': agent_result['output']
        }
        
        report = self.report_gen.generate_report(
            dataset_id, analysis_results, question
        )
        
        # 5. Short insight bullets extracted from the sample rows.
        insights = self.report_gen.generate_insights(
            json.dumps(info.get('sample', []), default=str)
        )
        
        return AnalysisResult(
            summary=report,
            insights=insights,
            charts=charts,
            recommendations=["建议定期更新数据", "关注关键指标变化"]
        )


# ==================== 主程序 ====================

class DataAnalysisPlatform:
    """Facade tying together data loading, the analysis agent, and reporting."""

    def __init__(self):
        self.data_manager = DataManager()
        # Built lazily on first upload/analysis — agent construction is expensive.
        self.agent = None

    def upload_csv(self, file_path: str, dataset_id: str = None) -> str:
        """Register a CSV file and make sure the agent is ready; returns the id."""
        new_id = self.data_manager.load_csv(file_path, dataset_id)
        self._ensure_agent()
        return new_id

    def upload_excel(self, file_path: str, sheet_name: int = 0, 
                    dataset_id: str = None) -> str:
        """Register one sheet of an Excel workbook; returns the dataset id."""
        new_id = self.data_manager.load_excel(file_path, sheet_name, dataset_id)
        self._ensure_agent()
        return new_id

    def _ensure_agent(self):
        """Lazily construct the analysis agent on first use."""
        if self.agent is None:
            self.agent = DataAnalysisAgent(self.data_manager)

    def preview(self, dataset_id: str, n: int = 10) -> pd.DataFrame:
        """Return the first n rows of a registered dataset."""
        return self.data_manager.get_dataset(dataset_id).head(n)

    def analyze(self, dataset_id: str, question: str = None) -> AnalysisResult:
        """Run the full analysis pipeline and return an AnalysisResult."""
        self._ensure_agent()
        return self.agent.analyze(dataset_id, question)

    def query(self, dataset_id: str, question: str) -> str:
        """Ask the agent a one-off question about a dataset."""
        self._ensure_agent()
        payload = {"input": f"数据集: {dataset_id}\n问题: {question}"}
        return self.agent.agent_executor.invoke(payload)['output']

    def list_datasets(self) -> List[str]:
        """Return the ids of every registered dataset."""
        return self.data_manager.list_datasets()


# 使用示例
if __name__ == "__main__":
    platform = DataAnalysisPlatform()

    # Fabricate a small sales dataset so the demo is self-contained.
    demo_frame = pd.DataFrame({
        'date': pd.date_range('2024-01-01', periods=100),
        'sales': np.random.normal(1000, 200, 100),
        'customers': np.random.randint(50, 200, 100),
        'region': np.random.choice(['North', 'South', 'East', 'West'], 100)
    })
    demo_frame.to_csv('sample_sales.csv', index=False)

    # Step 1: register the CSV with the platform.
    print("=== 上传数据 ===")
    dataset_id = platform.upload_csv('sample_sales.csv', 'sales_data')
    print(f"数据集ID: {dataset_id}")

    # Step 2: eyeball the first rows.
    print("\n=== 数据预览 ===")
    print(platform.preview(dataset_id, 5))

    # Step 3: run the agent-driven analysis.
    print("\n=== 开始分析 ===")
    analysis = platform.analyze(dataset_id, "分析销售趋势和客户分布")

    print("\n关键洞察:")
    for finding in analysis.insights:
        print(f"- {finding}")

    print(f"\n生成图表数: {len(analysis.charts)}")
    print("\n报告预览(前500字):")
    print(analysis.summary[:500])

    # Remove the temporary demo file.
    os.remove('sample_sales.csv')
✅ 项目3亮点
  • 全流程自动化:上传→分析→可视化→报告
  • 安全查询:通过限制eval执行环境降低代码注入风险(注意:清空 __builtins__ 并非完全的沙箱,生产环境建议使用专用沙箱方案)
  • Agent驱动:智能选择分析策略
  • 丰富图表:支持6种可视化类型

LangServe部署

使用LangServe将LangChain应用部署为API服务。

# ==================== langserve_demo.py ====================
"""Deploy LangChain runnables as a REST API using LangServe."""

from fastapi import FastAPI
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langserve import add_routes

# Create the FastAPI application that hosts all LangServe routes.
app = FastAPI(
    title="LangChain服务",
    version="1.0",
    description="使用LangServe部署的LangChain API",
)

# 1. Basic chain example: prompt piped into the chat model (LCEL syntax).
llm = ChatOpenAI(model="gpt-3.5-turbo")
prompt = ChatPromptTemplate.from_template("用一句话总结:{topic}")
summary_chain = prompt | llm

# 2. Mount the chain under /summary (adds invoke/batch/stream endpoints).
add_routes(
    app,
    summary_chain,
    path="/summary",
    enable_feedback_endpoint=True,  # expose a feedback endpoint
    enable_public_trace_link_endpoint=True,  # expose public trace links
)

# 3. Chain with conversation memory.
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationChain

# NOTE(review): this single module-level memory object is shared by ALL
# clients of /chat — acceptable for a demo, wrong for multi-user production.
memory = ConversationBufferMemory()
conversation = ConversationChain(
    llm=llm,
    memory=memory,
    verbose=True
)

add_routes(
    app,
    conversation,
    path="/chat",
)

# 4. Custom input/output models for typed API docs.
from pydantic import BaseModel, Field

# NOTE: model docstrings and Field descriptions below surface in the
# generated OpenAPI schema, so they are runtime data — left as-is.
class TranslateRequest(BaseModel):
    """翻译请求"""
    text: str = Field(description="要翻译的文本")
    target_language: str = Field(description="目标语言", default="中文")

class TranslateResponse(BaseModel):
    """翻译响应"""
    translation: str = Field(description="翻译结果")
    source_language: str = Field(description="检测到的源语言")

translate_prompt = ChatPromptTemplate.from_template(
    "将以下文本翻译成{target_language}:\n\n{text}"
)
translate_chain = translate_prompt | llm

# NOTE(review): the chain itself returns the model's chat message; output_type
# documents the intended schema — verify LangServe coerces it as expected.
add_routes(
    app,
    translate_chain,
    path="/translate",
    input_type=TranslateRequest,
    output_type=TranslateResponse,
)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

部署命令

# 安装依赖
pip install langserve[all] fastapi uvicorn

# 启动服务
python langserve_demo.py

# 访问API文档
open http://localhost:8000/docs

# 调用API
curl -X POST http://localhost:8000/summary/invoke \
  -H "Content-Type: application/json" \
  -d '{"input": {"topic": "人工智能的发展"}}'

LangSmith监控

LangSmith是LangChain的监控平台,用于追踪、调试和评估应用。

# ==================== langsmith_setup.py ====================
"""LangSmith monitoring configuration examples."""

import os
from langchain.callbacks.tracers import LangChainTracer

# Configure tracing through environment variables.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production-app"  # project name shown in LangSmith

# Option 1: automatic tracing (recommended).
# Once the env vars above are set, every LangChain call is traced automatically.

from langchain_openai import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

# NOTE(review): LLMChain is a legacy API in recent LangChain releases —
# confirm the installed version still ships it before copying this example.
llm = ChatOpenAI()
prompt = PromptTemplate.from_template("你好,{name}!")
chain = LLMChain(llm=llm, prompt=prompt)

# This invocation is recorded to LangSmith automatically.
result = chain.invoke({"name": "张三"})

# Option 2: attach a tracer manually (e.g. to route a run to another project).
tracer = LangChainTracer(
    project_name="debug-project"
)

result = chain.invoke(
    {"name": "李四"},
    callbacks=[tracer]
)

# Option 3: enable tracing for a specific runnable via a callback manager.
from langchain.callbacks.manager import CallbackManager

callback_manager = CallbackManager([tracer])

llm_with_tracing = ChatOpenAI(
    callback_manager=callback_manager
)
LangSmith功能
  • 追踪:记录每次调用的输入、输出、中间步骤
  • 调试:查看详细的执行流程和时间消耗
  • 评估:批量评估链的性能
  • 数据集:创建测试数据集用于回归测试

学习路径总结

从入门到精通的LangChain学习路线:

阶段 内容 目标
入门 (1-2周) 环境搭建、基础概念、简单Chain 能写出第一个LangChain程序
进阶 (2-3周) Prompt模板、文档处理、RAG 能构建知识库问答系统
提高 (2-3周) 记忆、Agent、工具开发 能开发智能助手类产品
实战 (持续) 项目开发、部署、监控 能独立完成企业级项目

资源推荐

  • LangChain 官方文档与 API 参考
  • LangChain GitHub 仓库与 cookbook 示例
  • 社区论坛与技术博客

课程总结

恭喜完成LangChain入门教程的全部内容!让我们一起回顾所学:

📚 八章内容回顾
  1. 环境搭建:OpenAI API、基础库安装
  2. 基础概念:Model、Prompt、Chain核心组件
  3. 文档处理:加载、分割、向量化
  4. RAG应用:检索增强生成完整流程
  5. 存储与记忆:向量库、对话记忆
  6. 智能体:Agent、工具、执行器
  7. 实战项目:客服系统、知识管理、数据分析
  8. 部署与监控:LangServe服务化、LangSmith追踪

进阶建议

  • 深入学习:阅读LangChain源码,理解内部机制
  • 扩展技能:学习LlamaIndex、AutoGPT等框架
  • 关注前沿:跟进Multi-Agent、Function Calling等新特性
  • 实践出真知:动手做项目是最好的学习方式
🎉 最后的话
LangChain是一个快速发展的框架,新功能层出不穷。保持学习热情,多动手实践,你一定能在AI应用开发领域取得成功!

如果你在学习和实践中遇到问题,欢迎查阅官方文档或在社区寻求帮助。

祝你编码愉快,创造无限可能!