第5章:向量数据库与RAG

本章将带你深入了解向量数据库的工作原理,学习如何使用Chroma存储和检索向量数据,并掌握RAG(检索增强生成)技术,让你的AI应用能够基于私有知识库回答问题。

1. 什么是向量数据库

向量数据库(Vector Database)是一种专门设计用于存储和查询高维向量数据的数据库系统。在AI应用中,我们经常需要将文本、图像等内容转换为向量(Embedding),向量数据库就是用来高效存储和检索这些向量的工具。

向量数据库的核心特点

  • 向量存储:将文本、图像等数据转换为高维向量(通常是768维或1536维)并存储
  • 相似度搜索:通过计算向量间的距离(如余弦相似度、欧氏距离)来找到最相似的数据
  • 近似最近邻(ANN):在大规模数据下快速找到近似最相似的结果
  • 元数据过滤:可以结合向量相似度和元数据条件进行混合查询
💡 为什么需要向量数据库?
想象你要在海量文档中找到与"LangChain入门教程"相关的内容。传统关键词搜索可能找不到包含"LangChain"但语义相关的文档(如"LLM应用开发框架")。向量数据库通过语义相似度搜索,能够理解查询的深层含义,找到真正相关的内容。

向量相似度计算

向量数据库通过计算向量之间的距离来判断相似度,常用的度量方式包括:

  • 余弦相似度(Cosine Similarity):计算两个向量夹角的余弦值,值越接近1表示越相似
  • 欧氏距离(Euclidean Distance):计算向量间的直线距离,距离越小越相似
  • 点积(Dot Product):向量的点积结果,常用于归一化向量

2. 常用向量数据库

目前市面上有多种向量数据库可供选择,下面介绍几种最常用的:

Chroma - 轻量级本地数据库 ⭐ 推荐入门使用

Chroma是一个开源的嵌入式向量数据库,特别适合本地开发和中小型项目。

优点:
  • 安装简单,纯Python实现
  • 支持内存模式和持久化模式
  • 与LangChain集成非常好
  • API简洁易用
局限:不适合超大规模数据(亿级以上),适合百万级以下数据量

FAISS - Facebook的向量检索库

FAISS(Facebook AI Similarity Search)是Facebook开发的高效相似度搜索库,采用C++实现,性能优异。

优点:
  • 搜索速度极快,支持GPU加速
  • 支持多种索引类型(Flat、IVF、HNSW等)
  • 适合亿级向量检索
注意:主要是检索库,需要配合其他存储方案管理原始数据

Pinecone - 云端托管服务

Pinecone是一个全托管的向量数据库云服务,无需维护基础设施。

优点:
  • 完全托管,无需运维
  • 自动扩缩容
  • 高可用性保障
  • 支持元数据过滤和混合搜索
注意:商业化服务,大规模使用需要付费

其他向量数据库

数据库 特点 适用场景
Qdrant Rust编写,高性能,支持过滤 生产环境,需要过滤功能
Weaviate 支持GraphQL查询,模块化AI集成 复杂查询场景
Milvus/Zilliz 分布式架构,企业级 超大规模数据
pgvector PostgreSQL扩展 已有PG基础设施

3. Chroma使用详解

Chroma是最适合入门的向量数据库,本节将详细介绍其使用方法。

3.1 安装Chroma

# 安装Chroma
pip install chromadb

# 如果需要持久化存储,确保目录可写
# 同时需要安装LangChain的Chroma集成
pip install langchain-chroma

3.2 Chroma基础概念

  • Client:客户端对象,用于连接Chroma
  • Collection:集合,类似于关系型数据库中的表,存储相关的文档向量
  • Document:文档,包含原始文本、嵌入向量、元数据

3.3 创建集合和添加数据

import chromadb
from chromadb.utils import embedding_functions

# ========== 方式1:使用内存模式(数据不保存)==========
client = chromadb.Client()

# ========== 方式2:持久化模式(推荐)==========
# 数据会保存在指定目录
client = chromadb.PersistentClient(path="./chroma_db")

# 创建或获取集合
# 使用OpenAI的嵌入模型
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key="your-api-key",
    model_name="text-embedding-ada-002"
)

# 创建集合,指定嵌入函数
collection = client.create_collection(
    name="my_documents",
    embedding_function=openai_ef,
    metadata={"hnsw:space": "cosine"}  # 使用余弦相似度
)

# 或者获取已存在的集合
collection = client.get_collection(
    name="my_documents",
    embedding_function=openai_ef
)

print(f"集合创建成功:{collection.name}")

3.4 添加文档和嵌入

# 添加文档到集合
documents = [
    "LangChain是一个用于开发LLM应用的框架",
    "RAG是检索增强生成技术,结合外部知识库",
    "向量数据库用于存储和检索高维向量",
    "OpenAI提供了强大的嵌入模型服务",
    "Python是最流行的AI开发语言"
]

# 文档ID(必须唯一)
ids = ["doc_1", "doc_2", "doc_3", "doc_4", "doc_5"]

# 元数据(可选,用于过滤)
metadatas = [
    {"category": "framework", "source": "tutorial"},
    {"category": "technology", "source": "tutorial"},
    {"category": "database", "source": "tutorial"},
    {"category": "model", "source": "official"},
    {"category": "language", "source": "general"}
]

# 添加文档
# Chroma会自动调用嵌入函数生成向量
collection.add(
    documents=documents,
    ids=ids,
    metadatas=metadatas
)

print(f"成功添加 {len(documents)} 个文档")

# 查看集合信息
print(f"集合中的文档数:{collection.count()}")

3.5 相似度搜索

# 基本相似度搜索
results = collection.query(
    query_texts=["什么是RAG技术?"],
    n_results=3  # 返回最相似的3个结果
)

print("搜索结果:")
for i, (doc, distance, metadata) in enumerate(zip(
    results['documents'][0], 
    results['distances'][0],
    results['metadatas'][0]
)):
    print(f"\n结果 {i+1}:")
    print(f"  内容:{doc}")
    print(f"  距离:{distance:.4f}")
    print(f"  元数据:{metadata}")

# ========== 带过滤条件的搜索 ==========
# 只搜索特定类别的文档
results = collection.query(
    query_texts=["AI开发工具"],
    n_results=2,
    where={"category": "framework"}  # 元数据过滤
)

print("\n过滤搜索结果:")
for doc in results['documents'][0]:
    print(f"  - {doc}")

# ========== 使用嵌入向量搜索 ==========
# 如果已有嵌入向量,可以直接使用
# query_embedding = [...]  # 你的嵌入向量
# results = collection.query(
#     query_embeddings=[query_embedding],
#     n_results=3
# )

3.6 更新和删除文档

# 更新文档
collection.update(
    ids=["doc_1"],
    documents=["LangChain是一个强大的LLM应用开发框架,支持多种模型"],
    metadatas=[{"category": "framework", "version": "0.1"}]
)

# 删除特定文档
collection.delete(ids=["doc_5"])

# 按条件删除(删除所有source为tutorial的文档)
# collection.delete(where={"source": "tutorial"})

print(f"删除后文档数:{collection.count()}")

3.7 Chroma与LangChain集成完整示例

from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document

# 初始化嵌入模型
embeddings = OpenAIEmbeddings(
    model="text-embedding-ada-002",
    api_key="your-api-key"
)

# 创建文档列表
docs = [
    Document(
        page_content="LangChain提供了丰富的组件来构建LLM应用",
        metadata={"source": "langchain_doc", "page": 1}
    ),
    Document(
        page_content="向量存储是RAG系统的核心组件之一",
        metadata={"source": "rag_guide", "page": 10}
    ),
    Document(
        page_content="OpenAI的GPT模型在文本生成方面表现出色",
        metadata={"source": "model_comparison", "page": 5}
    )
]

# 创建Chroma向量存储
# persist_directory指定持久化目录
vectorstore = Chroma.from_documents(
    documents=docs,
    embedding=embeddings,
    persist_directory="./chroma_langchain_db",
    collection_name="tutorial_docs"
)

print("向量存储创建成功!")

# ========== 相似度搜索 ==========
# 搜索与查询最相似的文档
results = vectorstore.similarity_search(
    query="RAG系统需要什么组件?",
    k=2  # 返回2个结果
)

print("\n相似度搜索结果:")
for i, doc in enumerate(results, 1):
    print(f"{i}. {doc.page_content}")
    print(f"   来源:{doc.metadata['source']}, 页码:{doc.metadata['page']}\n")

# ========== 带分数的相似度搜索 ==========
results_with_score = vectorstore.similarity_search_with_score(
    query="如何构建LLM应用?",
    k=2
)

print("带分数的搜索结果:")
for doc, score in results_with_score:
    print(f"内容:{doc.page_content}")
    print(f"相似度分数:{score:.4f}\n")

# ========== 加载已有的向量存储 ==========
# 后续可以加载已创建的向量存储
loaded_vectorstore = Chroma(
    persist_directory="./chroma_langchain_db",
    embedding_function=embeddings,
    collection_name="tutorial_docs"
)

print(f"加载的向量存储文档数:{loaded_vectorstore._collection.count()}")
💡 小贴士
在LangChain中,Chroma.from_documents()会自动处理文档的分割、嵌入和存储。如果数据量很大,建议使用Chroma.add_documents()分批添加。

4. 什么是RAG

RAG(Retrieval-Augmented Generation,检索增强生成)是一种将外部知识检索与大语言模型生成能力相结合的技术架构。它让AI能够基于特定的知识库回答问题,而不是仅依赖训练时的知识。

为什么需要RAG

大语言模型虽然知识丰富,但存在以下局限:

  • 知识截止时间:模型训练后发生的事件不知道
  • 幻觉问题:可能生成看似合理但实际错误的内容
  • 私有知识:无法访问企业内部文档、个人笔记等私有数据
  • 知识更新:难以实时更新知识库

RAG通过检索外部知识库,将相关信息作为上下文提供给模型,解决了上述问题:

  • ✅ 可以访问最新信息
  • ✅ 基于检索到的真实文档生成,减少幻觉
  • ✅ 能够使用私有数据
  • ✅ 知识库可以随时更新
🎯 RAG的核心思想
不要试图让模型记住所有知识,而是教会它如何查找和使用知识。就像开卷考试,带着参考资料去答题,比死记硬背更可靠。

5. RAG完整流程

RAG系统分为两个阶段:索引阶段(离线)和查询阶段(在线)。

5.1 索引阶段(Indexing)

这个阶段将文档处理成可检索的向量索引,通常在系统部署前完成。

from langchain.document_loaders import TextLoader, PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma

# ========== 第1步:加载文档 ==========
# 加载单个文本文件
# loader = TextLoader("./docs/intro.txt", encoding="utf-8")

# 加载PDF文件
# loader = PyPDFLoader("./docs/manual.pdf")

# 加载整个目录(支持多种格式)
loader = DirectoryLoader(
    "./knowledge_base",  # 知识库目录
    glob="**/*.txt",     # 匹配所有txt文件
    loader_cls=TextLoader,
    loader_kwargs={'encoding': 'utf-8'}
)

documents = loader.load()
print(f"共加载 {len(documents)} 个文档")

# ========== 第2步:分割文档 ==========
# 长文档需要分割成小块,原因:
# 1. 嵌入模型有输入长度限制
# 2. 小块更精确匹配查询意图
# 3. 避免无关信息干扰

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,      # 每块最大字符数
    chunk_overlap=50,    # 块间重叠字符数(保持上下文连贯)
    separators=["\n\n", "\n", "。", "!", "?", " ", ""]  # 分割优先级
)

chunks = text_splitter.split_documents(documents)
print(f"分割为 {len(chunks)} 个文本块")

# 查看一个块的内容示例
print(f"\n示例块内容(长度:{len(chunks[0].page_content)}):")
print(chunks[0].page_content[:200] + "...")

# ========== 第3步:生成嵌入并存储 ==========
embeddings = OpenAIEmbeddings(
    model="text-embedding-ada-002",
    api_key="your-api-key"
)

# 创建向量存储
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=embeddings,
    persist_directory="./rag_vector_db",
    collection_name="knowledge_base"
)

print("\n索引创建完成!")
print(f"向量存储路径:./rag_vector_db")
print(f"文档块数:{vectorstore._collection.count()}")

5.2 查询阶段(Querying)

用户提问时,系统检索相关文档并生成回答。

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

# 加载已创建的向量存储
vectorstore = Chroma(
    persist_directory="./rag_vector_db",
    embedding_function=embeddings
)

# 创建检索器(用于搜索相关文档)
retriever = vectorstore.as_retriever(
    search_type="similarity",  # 相似度搜索
    search_kwargs={"k": 4}     # 返回最相似的4个文档
)

# 用户查询
query = "公司的请假流程是什么?"

# ========== 第1步:检索相关文档 ==========
relevant_docs = retriever.get_relevant_documents(query)

print(f"查询:{query}")
print(f"\n检索到 {len(relevant_docs)} 个相关文档:\n")
for i, doc in enumerate(relevant_docs, 1):
    print(f"文档 {i}:")
    print(f"  内容:{doc.page_content[:150]}...")
    print(f"  来源:{doc.metadata.get('source', 'unknown')}\n")

# ========== 第2步:构建提示词 ==========
# 将检索到的文档组合成上下文
context = "\n\n".join([doc.page_content for doc in relevant_docs])

# 构建RAG提示词模板
rag_template = """基于以下上下文信息回答问题。如果上下文中没有相关信息,请明确说明"根据提供的资料,我无法回答这个问题"。

上下文信息:
{context}

用户问题:{question}

请提供准确、简洁的回答:"""

prompt = ChatPromptTemplate.from_template(rag_template)
final_prompt = prompt.format(context=context, question=query)

print("\n" + "="*50)
print("最终发送给模型的提示词:")
print("="*50)
print(final_prompt[:500] + "...")

# ========== 第3步:生成回答 ==========
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    api_key="your-api-key",
    temperature=0  # 降低随机性,提高准确性
)

response = llm.invoke(final_prompt)
print("\n" + "="*50)
print("AI回答:")
print("="*50)
print(response.content)
✅ RAG流程总结
  1. 索引阶段:加载文档 → 分割 → 嵌入 → 存储到向量数据库
  2. 查询阶段:用户提问 → 检索相关文档 → 组合提示词 → LLM生成回答

6. RetrievalQA链

LangChain提供了RetrievalQA链,将RAG流程封装成可复用的组件。

6.1 基础用法

from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma

# 初始化组件
llm = ChatOpenAI(
    model="gpt-3.5-turbo",
    api_key="your-api-key",
    temperature=0
)

embeddings = OpenAIEmbeddings(api_key="your-api-key")

# 加载向量存储
vectorstore = Chroma(
    persist_directory="./rag_vector_db",
    embedding_function=embeddings
)

# 创建RetrievalQA链
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",  # 链类型,后面详细解释
    retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),
    return_source_documents=True,  # 返回来源文档
    verbose=True  # 打印中间过程
)

# 执行问答
query = "请介绍一下公司的产品优势"
result = qa_chain.invoke({"query": query})

print("回答:")
print(result["result"])
print("\n来源文档:")
for i, doc in enumerate(result["source_documents"], 1):
    print(f"{i}. {doc.page_content[:100]}...")

6.2 链类型详解

LangChain支持多种链类型来处理检索到的文档:

链类型 工作原理 优点 缺点 适用场景
stuff 将所有文档"塞"进一个提示词 简单、只调用一次LLM 文档过多时可能超出上下文限制 文档数量少、总计token数少
map_reduce 每个文档单独处理,再综合结果 可处理大量文档 需要多次LLM调用,速度慢 文档多、需要全面分析
refine 迭代式精炼答案 答案质量通常更好 调用次数多,依赖文档顺序 追求高质量答案
map_rerank 给每个文档打分,选最高分 自动选择最相关文档 需要设计评分prompt 有明确答案的问题

6.3 不同链类型使用示例

# ===== stuff链(默认)=====
# 直接将所有文档放入提示词
qa_stuff = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 3})
)

# ===== map_reduce链 =====
# 适合处理大量文档
qa_map_reduce = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="map_reduce",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 10}),
    # 可以自定义map和reduce的prompt
    chain_type_kwargs={
        "question_prompt": ChatPromptTemplate.from_template(
            "基于以下文档片段回答问题:\n{context}\n\n问题:{question}\n回答:"
        ),
        "combine_prompt": ChatPromptTemplate.from_template(
            "综合以下各片段的回答,给出最终答案:\n{summaries}\n\n最终答案:"
        )
    }
)

# ===== refine链 =====
# 迭代优化答案
qa_refine = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="refine",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

# 测试不同链的效果
question = "公司有哪些核心产品?"
print("Stuff链结果:", qa_stuff.invoke({"query": question})["result"])
print("\nMapReduce链结果:", qa_map_reduce.invoke({"query": question})["result"])
print("\nRefine链结果:", qa_refine.invoke({"query": question})["result"])

6.4 自定义提示词模板

from langchain.prompts import PromptTemplate

# 自定义RAG提示词模板
custom_template = """你是一个专业的企业知识库助手。请基于提供的参考资料回答问题。

回答要求:
1. 如果资料中有明确答案,请准确回答
2. 如果资料中没有相关信息,请说"抱歉,根据现有资料无法回答"
3. 回答要简洁明了,避免冗余
4. 可以适当引用资料中的关键信息

参考资料:
{context}

用户问题:{question}

请提供你的回答:"""

CUSTOM_PROMPT = PromptTemplate(
    template=custom_template,
    input_variables=["context", "question"]
)

# 使用自定义提示词创建QA链
qa_chain_custom = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),
    chain_type_kwargs={"prompt": CUSTOM_PROMPT},
    return_source_documents=True
)

result = qa_chain_custom.invoke({"query": "公司的请假政策是什么?"})
print(result["result"])

7. 多查询检索 MultiQueryRetriever

用户的问题可能表达方式多样,单一查询可能无法召回所有相关文档。MultiQueryRetriever通过生成多个查询变体来提高召回率。

工作原理

  1. LLM根据原始查询生成多个语义等价的不同表达
  2. 对每个查询执行检索
  3. 合并去重后的结果返回
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_openai import ChatOpenAI

# 初始化基础检索器
base_retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# 创建MultiQueryRetriever
multi_retriever = MultiQueryRetriever.from_llm(
    retriever=base_retriever,
    llm=ChatOpenAI(temperature=0, api_key="your-api-key"),
    # 生成的查询数量(默认3个)
    # include_original=True  # 是否包含原始查询
)

# 使用多查询检索
query = "怎么请假?"
docs = multi_retriever.get_relevant_documents(query)

print(f"原始查询:{query}")
print(f"检索到 {len(docs)} 个文档\n")

# 查看生成的查询(需要设置日志级别)
import logging
logging.basicConfig(level=logging.INFO)
logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)

# 再次执行以查看生成的查询
docs = multi_retriever.get_relevant_documents(query)
💡 使用建议
MultiQueryRetriever会增加LLM调用次数,适合以下场景:
  • 用户查询简短、表达多样时
  • 对召回率要求高时
  • 可以接受稍高延迟的场景

8. 父文档检索 ParentDocumentRetriever

一个常见的问题:小块文本便于精确匹配查询,但可能丢失上下文;大块文本保留完整上下文,但可能包含无关信息。

ParentDocumentRetriever提供了解决方案:

  • 检索阶段:使用小块(child chunk)进行相似度匹配
  • 生成阶段:返回大块(parent document)给LLM
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings

# 创建两种分割器
# 子分割器:用于索引和检索(小块,精确匹配)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=20)

# 父分割器:用于返回给LLM(大块,完整上下文)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)

# 创建向量存储(存储子块)
vectorstore = Chroma(
    collection_name="parent_doc_store",
    embedding_function=OpenAIEmbeddings(api_key="your-api-key"),
    persist_directory="./parent_doc_db"
)

# 创建文档存储(存储父块)
docstore = InMemoryStore()

# 创建ParentDocumentRetriever
retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=docstore,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
    # 可选:限制返回的父文档数量
    # search_kwargs={"k": 2}
)

# 加载原始文档
from langchain.document_loaders import TextLoader
loader = TextLoader("./long_document.txt", encoding="utf-8")
docs = loader.load()

# 添加文档
# 文档会被父分割器分割,存储到docstore
# 同时子分割器分割后的块会嵌入存储到vectorstore
retriever.add_documents(docs, ids=None)

print(f"向量存储中的子块数:{vectorstore._collection.count()}")

# 查询 - 使用子块匹配,但返回父块
query = "某个具体问题"
relevant_docs = retriever.get_relevant_documents(query)

print(f"\n检索到 {len(relevant_docs)} 个父文档")
for i, doc in enumerate(relevant_docs):
    print(f"\n父文档 {i+1} (长度:{len(doc.page_content)}):")
    print(doc.page_content[:500] + "...")
✅ ParentDocumentRetriever的优势
  • 小块检索:更精确的语义匹配
  • 大块生成:保留完整的上下文信息
  • 减少无关信息:避免返回过多不相关的文本块
  • 提高回答质量:LLM有更完整的上下文来生成回答

9. 完整实战:公司知识库问答系统

本节将构建一个完整的公司知识库问答系统,包含文档加载、索引构建、问答交互等全部功能。

项目结构

company_kb_qa/
├── knowledge_base/          # 知识库文档目录
│   ├── 员工手册.txt
│   ├── 产品说明.txt
│   └── 常见问题.txt
├── create_index.py          # 构建索引脚本
├── qa_system.py             # 问答系统主程序
└── chroma_db/               # 向量数据库目录

9.1 构建索引脚本 create_index.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
公司知识库索引构建脚本
运行此脚本将知识库文档转换为向量索引
"""

import os
from langchain.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma

def create_knowledge_base_index(
    docs_dir: str = "./knowledge_base",
    db_dir: str = "./chroma_db",
    collection_name: str = "company_kb"
):
    """
    从文档目录构建向量索引
    
    Args:
        docs_dir: 知识库文档目录路径
        db_dir: 向量数据库存储路径
        collection_name: 集合名称
    """
    
    # 检查文档目录是否存在
    if not os.path.exists(docs_dir):
        print(f"错误:文档目录 {docs_dir} 不存在")
        print("请创建目录并添加知识库文档(.txt, .pdf等)")
        return False
    
    print(f"开始构建知识库索引...")
    print(f"文档目录:{os.path.abspath(docs_dir)}")
    
    # ========== 1. 加载文档 ==========
    print("\n📄 正在加载文档...")
    
    # 支持多种文件类型的加载器字典
    loaders = {
        "*.txt": TextLoader,
        # "*.pdf": PyPDFLoader,
        # "*.md": UnstructuredMarkdownLoader,
    }
    
    all_documents = []
    for glob_pattern, loader_cls in loaders.items():
        try:
            loader = DirectoryLoader(
                docs_dir,
                glob=glob_pattern,
                loader_cls=loader_cls,
                loader_kwargs={'encoding': 'utf-8'} if 'TextLoader' in str(loader_cls) else {},
                show_progress=True
            )
            docs = loader.load()
            all_documents.extend(docs)
            print(f"  ✓ {glob_pattern}: 加载 {len(docs)} 个文档")
        except Exception as e:
            print(f"  ✗ {glob_pattern}: 加载失败 - {e}")
    
    if not all_documents:
        print("\n❌ 没有加载到任何文档,请检查文档目录")
        return False
    
    print(f"\n✅ 共加载 {len(all_documents)} 个文档")
    
    # ========== 2. 分割文档 ==========
    print("\n✂️  正在分割文档...")
    
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,          # 每块500字符
        chunk_overlap=50,        # 重叠50字符保持上下文
        separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""],
        length_function=len,
    )
    
    chunks = text_splitter.split_documents(all_documents)
    print(f"✅ 分割为 {len(chunks)} 个文本块")
    
    # 显示分割统计信息
    chunk_lengths = [len(chunk.page_content) for chunk in chunks]
    print(f"   平均块长度:{sum(chunk_lengths) // len(chunk_lengths)} 字符")
    print(f"   最大块长度:{max(chunk_lengths)} 字符")
    print(f"   最小块长度:{min(chunk_lengths)} 字符")
    
    # ========== 3. 创建向量存储 ==========
    print("\n🔤 正在生成嵌入向量...")
    
    # 确保存储目录存在
    os.makedirs(db_dir, exist_ok=True)
    
    # 初始化嵌入模型
    embeddings = OpenAIEmbeddings(
        model="text-embedding-ada-002",
        # api_key="your-api-key"  # 也可从环境变量读取
    )
    
    # 创建向量存储
    vectorstore = Chroma.from_documents(
        documents=chunks,
        embedding=embeddings,
        persist_directory=db_dir,
        collection_name=collection_name
    )
    
    print(f"\n✅ 索引构建完成!")
    print(f"   向量数据库路径:{os.path.abspath(db_dir)}")
    print(f"   集合名称:{collection_name}")
    print(f"   文档块总数:{vectorstore._collection.count()}")
    
    return True


if __name__ == "__main__":
    # 从环境变量读取API密钥
    import os
    if not os.getenv("OPENAI_API_KEY"):
        print("⚠️  警告:未设置 OPENAI_API_KEY 环境变量")
        print("请在运行前设置:export OPENAI_API_KEY='your-key'")
    
    # 执行索引构建
    success = create_knowledge_base_index()
    
    if success:
        print("\n🎉 知识库索引构建成功!可以运行问答系统了。")
    else:
        print("\n❌ 索引构建失败,请检查错误信息。")

9.2 问答系统主程序 qa_system.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
公司知识库问答系统
支持连续对话、来源展示、历史记录等功能
"""

import os
import sys
from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime

from langchain_chroma import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_core.documents import Document


@dataclass
class QAResponse:
    """问答响应数据结构"""
    answer: str
    source_documents: List[Document]
    query_time: datetime
    confidence: Optional[str] = None


class CompanyKBQA:
    """公司知识库问答系统"""
    
    def __init__(
        self,
        db_dir: str = "./chroma_db",
        collection_name: str = "company_kb",
        model: str = "gpt-3.5-turbo",
        temperature: float = 0.0
    ):
        """
        初始化问答系统
        
        Args:
            db_dir: 向量数据库目录
            collection_name: 集合名称
            model: 使用的LLM模型
            temperature: 生成随机性
        """
        self.db_dir = db_dir
        self.collection_name = collection_name
        
        # 检查API密钥
        if not os.getenv("OPENAI_API_KEY"):
            raise ValueError("请设置 OPENAI_API_KEY 环境变量")
        
        # 初始化嵌入模型
        print("🔤 正在加载嵌入模型...")
        self.embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
        
        # 加载向量存储
        print("📚 正在加载知识库...")
        if not os.path.exists(db_dir):
            raise FileNotFoundError(
                f"向量数据库不存在:{db_dir}\n"
                "请先运行 create_index.py 构建索引"
            )
        
        self.vectorstore = Chroma(
            persist_directory=db_dir,
            embedding_function=self.embeddings,
            collection_name=collection_name
        )
        
        doc_count = self.vectorstore._collection.count()
        print(f"   ✓ 加载了 {doc_count} 个文档块")
        
        # 初始化LLM
        print("🤖 正在初始化语言模型...")
        self.llm = ChatOpenAI(
            model=model,
            temperature=temperature
        )
        
        # 创建自定义提示词模板
        self.qa_prompt = PromptTemplate(
            template="""你是公司的智能助手,专门回答员工关于公司政策、流程、产品等方面的问题。

请严格基于以下参考资料回答问题:
{context}

回答要求:
1. 如果参考资料中有明确答案,请准确、简洁地回答
2. 如果参考资料中没有相关信息,请明确说"抱歉,根据现有知识库资料,我无法回答这个问题"
3. 不要编造信息,不要基于你的训练数据回答
4. 可以适当引用资料中的关键内容
5. 保持友好、专业的语气

用户问题:{question}

请提供你的回答:""",
            input_variables=["context", "question"]
        )
        
        # 创建QA链
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(
                search_type="similarity",
                search_kwargs={"k": 5}  # 检索最相似的5个文档
            ),
            chain_type_kwargs={"prompt": self.qa_prompt},
            return_source_documents=True,
            verbose=False
        )
        
        # 对话历史
        self.history: List[dict] = []
        
        print("\n✅ 问答系统初始化完成!\n")
    
    def ask(self, question: str) -> QAResponse:
        """
        提问并获取回答
        
        Args:
            question: 用户问题
            
        Returns:
            QAResponse对象,包含回答和来源文档
        """
        start_time = datetime.now()
        
        # 执行问答
        result = self.qa_chain.invoke({"query": question})
        
        # 构建响应
        response = QAResponse(
            answer=result["result"],
            source_documents=result["source_documents"],
            query_time=datetime.now()
        )
        
        # 记录历史
        self.history.append({
            "question": question,
            "answer": response.answer,
            "time": response.query_time,
            "sources": [doc.metadata.get('source', 'unknown') for doc in response.source_documents]
        })
        
        return response
    
    def print_response(self, response: QAResponse, show_sources: bool = True):
        """格式化打印回答"""
        print("=" * 60)
        print("🤖 AI助手回答:")
        print("=" * 60)
        print(response.answer)
        print()
        
        if show_sources and response.source_documents:
            print("📚 参考来源:")
            print("-" * 60)
            seen_sources = set()
            for i, doc in enumerate(response.source_documents, 1):
                source = doc.metadata.get('source', '未知来源')
                if source not in seen_sources:
                    print(f"  {i}. {source}")
                    seen_sources.add(source)
            print()
    
    def interactive_mode(self):
        """交互式问答模式"""
        print("=" * 60)
        print("🏢 公司知识库问答系统")
        print("=" * 60)
        print("\n使用方法:")
        print("  • 直接输入问题开始问答")
        print("  • 输入 'quit' 或 'exit' 退出")
        print("  • 输入 'history' 查看对话历史")
        print("  • 输入 'clear' 清空历史")
        print("\n" + "=" * 60 + "\n")
        
        while True:
            try:
                # 获取用户输入
                question = input("❓ 请输入您的问题:").strip()
                
                # 处理特殊命令
                if not question:
                    continue
                    
                if question.lower() in ['quit', 'exit', 'q']:
                    print("\n👋 感谢使用,再见!")
                    break
                
                if question.lower() == 'history':
                    self.show_history()
                    continue
                
                if question.lower() == 'clear':
                    self.history.clear()
                    print("✅ 对话历史已清空\n")
                    continue
                
                # 执行问答
                print("🤔 正在思考...")
                response = self.ask(question)
                self.print_response(response)
                
            except KeyboardInterrupt:
                print("\n\n👋 再见!")
                break
            except Exception as e:
                print(f"\n❌ 出错:{e}\n")
    
    def show_history(self):
        """显示对话历史"""
        if not self.history:
            print("\n📭 暂无对话历史\n")
            return
        
        print("\n" + "=" * 60)
        print("📜 对话历史")
        print("=" * 60)
        
        for i, record in enumerate(self.history, 1):
            print(f"\n【对话 {i}】{record['time'].strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"问:{record['question']}")
            print(f"答:{record['answer'][:100]}..." if len(record['answer']) > 100 else f"答:{record['answer']}")
            print(f"来源:{', '.join(record['sources'])}")
        
        print("\n")


def main():
    """主函数"""
    # 检查命令行参数
    if len(sys.argv) > 1 and sys.argv[1] == '--create-index':
        # 只构建索引模式
        from create_index import create_knowledge_base_index
        create_knowledge_base_index()
        return
    
    try:
        # 初始化并启动问答系统
        qa_system = CompanyKBQA()
        qa_system.interactive_mode()
    except Exception as e:
        print(f"\n❌ 系统启动失败:{e}")
        print("\n解决方案:")
        print("  1. 确保已设置 OPENAI_API_KEY 环境变量")
        print("  2. 确保知识库文档已放入 knowledge_base 目录")
        print("  3. 运行 'python qa_system.py --create-index' 构建索引")
        sys.exit(1)


if __name__ == "__main__":
    main()

9.3 使用示例

# 1. 设置环境变量
export OPENAI_API_KEY="your-api-key"

# 2. 准备知识库文档
mkdir -p knowledge_base
echo "公司年假政策:员工每年享有10天带薪年假..." > knowledge_base/员工手册.txt
echo "产品A特点:高性能、易用、支持多平台..." > knowledge_base/产品说明.txt

# 3. 构建索引
python qa_system.py --create-index
# 或
python create_index.py

# 4. 启动问答系统
python qa_system.py

# 交互示例:
# ❓ 请输入您的问题:公司年假有多少天?
# 🤔 正在思考...
# ============================================================
# 🤖 AI助手回答:
# ============================================================
# 根据公司员工手册,员工每年享有10天带薪年假。
#
# 📚 参考来源:
# ------------------------------------------------------------
#   1. knowledge_base/员工手册.txt

# ❓ 请输入您的问题:产品A有什么特点?
# ...

9.4 扩展功能建议

在此基础上,你可以进一步扩展系统功能:

  • Web界面:使用Gradio或Streamlit构建可视化界面
  • 多轮对话:集成记忆功能(下章讲解)
  • 用户管理:添加登录、权限控制
  • 文档管理:支持动态添加、删除文档
  • 回答评分:收集用户反馈优化系统
  • 多语言支持:支持中英文等不同语言的知识库

本章小结

本章我们深入学习了向量数据库和RAG技术:

  • 向量数据库:Chroma的使用方法和与LangChain的集成
  • RAG架构:索引阶段(加载→分割→嵌入→存储)和查询阶段(检索→组合提示→生成)
  • RetrievalQA链:快速构建RAG问答系统,支持多种链类型
  • 高级检索:MultiQueryRetriever提高召回率,ParentDocumentRetriever平衡精度与上下文
  • 完整项目:公司知识库问答系统的实战开发
📝 练习建议
  1. 使用Chroma创建一个个人笔记知识库
  2. 尝试不同的文本分割参数,观察对检索效果的影响
  3. 对比不同链类型(stuff、map_reduce、refine)的效果差异
  4. 将本章的问答系统扩展为支持多轮对话(预习下一章内容)