第5章:向量数据库与RAG
本章将带你深入了解向量数据库的工作原理,学习如何使用Chroma存储和检索向量数据,并掌握RAG(检索增强生成)技术,让你的AI应用能够基于私有知识库回答问题。
1. 什么是向量数据库
向量数据库(Vector Database)是一种专门设计用于存储和查询高维向量数据的数据库系统。在AI应用中,我们经常需要将文本、图像等内容转换为向量(Embedding),向量数据库就是用来高效存储和检索这些向量的工具。
向量数据库的核心特点
- 向量存储:将文本、图像等数据转换为高维向量(通常是768维或1536维)并存储
- 相似度搜索:通过计算向量间的距离(如余弦相似度、欧氏距离)来找到最相似的数据
- 近似最近邻(ANN):在大规模数据下快速找到近似最相似的结果
- 元数据过滤:可以结合向量相似度和元数据条件进行混合查询
想象你要在海量文档中找到与"LangChain入门教程"相关的内容。传统关键词搜索可能找不到包含"LangChain"但语义相关的文档(如"LLM应用开发框架")。向量数据库通过语义相似度搜索,能够理解查询的深层含义,找到真正相关的内容。
向量相似度计算
向量数据库通过计算向量之间的距离来判断相似度,常用的度量方式包括:
- 余弦相似度(Cosine Similarity):计算两个向量夹角的余弦值,值越接近1表示越相似
- 欧氏距离(Euclidean Distance):计算向量间的直线距离,距离越小越相似
- 点积(Dot Product):向量的点积结果,常用于归一化向量
2. 常用向量数据库
目前市面上有多种向量数据库可供选择,下面介绍几种最常用的:
Chroma - 轻量级本地数据库 ⭐ 推荐入门使用
Chroma是一个开源的嵌入式向量数据库,特别适合本地开发和中小型项目。
- 安装简单,纯Python实现
- 支持内存模式和持久化模式
- 与LangChain集成非常好
- API简洁易用
FAISS - Facebook的向量检索库
FAISS(Facebook AI Similarity Search)是Facebook开发的高效相似度搜索库,采用C++实现,性能优异。
- 搜索速度极快,支持GPU加速
- 支持多种索引类型(Flat、IVF、HNSW等)
- 适合亿级向量检索
Pinecone - 云端托管服务
Pinecone是一个全托管的向量数据库云服务,无需维护基础设施。
- 完全托管,无需运维
- 自动扩缩容
- 高可用性保障
- 支持元数据过滤和混合搜索
其他向量数据库
| 数据库 | 特点 | 适用场景 |
|---|---|---|
| Qdrant | Rust编写,高性能,支持过滤 | 生产环境,需要过滤功能 |
| Weaviate | 支持GraphQL查询,模块化AI集成 | 复杂查询场景 |
| Milvus/Zilliz | 分布式架构,企业级 | 超大规模数据 |
| pgvector | PostgreSQL扩展 | 已有PG基础设施 |
3. Chroma使用详解
Chroma是最适合入门的向量数据库,本节将详细介绍其使用方法。
3.1 安装Chroma
# 安装Chroma
pip install chromadb
# 如果需要持久化存储,确保目录可写
# 同时需要安装LangChain的Chroma集成
pip install langchain-chroma
3.2 Chroma基础概念
- Client:客户端对象,用于连接Chroma
- Collection:集合,类似于关系型数据库中的表,存储相关的文档向量
- Document:文档,包含原始文本、嵌入向量、元数据
3.3 创建集合和添加数据
import chromadb
from chromadb.utils import embedding_functions
# ========== 方式1:使用内存模式(数据不保存)==========
client = chromadb.Client()
# ========== 方式2:持久化模式(推荐)==========
# 数据会保存在指定目录
client = chromadb.PersistentClient(path="./chroma_db")
# 创建或获取集合
# 使用OpenAI的嵌入模型
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
api_key="your-api-key",
model_name="text-embedding-ada-002"
)
# 创建集合,指定嵌入函数
collection = client.create_collection(
name="my_documents",
embedding_function=openai_ef,
metadata={"hnsw:space": "cosine"} # 使用余弦相似度
)
# 或者获取已存在的集合
collection = client.get_collection(
name="my_documents",
embedding_function=openai_ef
)
print(f"集合创建成功:{collection.name}")
3.4 添加文档和嵌入
# 添加文档到集合
documents = [
"LangChain是一个用于开发LLM应用的框架",
"RAG是检索增强生成技术,结合外部知识库",
"向量数据库用于存储和检索高维向量",
"OpenAI提供了强大的嵌入模型服务",
"Python是最流行的AI开发语言"
]
# 文档ID(必须唯一)
ids = ["doc_1", "doc_2", "doc_3", "doc_4", "doc_5"]
# 元数据(可选,用于过滤)
metadatas = [
{"category": "framework", "source": "tutorial"},
{"category": "technology", "source": "tutorial"},
{"category": "database", "source": "tutorial"},
{"category": "model", "source": "official"},
{"category": "language", "source": "general"}
]
# 添加文档
# Chroma会自动调用嵌入函数生成向量
collection.add(
documents=documents,
ids=ids,
metadatas=metadatas
)
print(f"成功添加 {len(documents)} 个文档")
# 查看集合信息
print(f"集合中的文档数:{collection.count()}")
3.5 相似度搜索
# 基本相似度搜索
results = collection.query(
query_texts=["什么是RAG技术?"],
n_results=3 # 返回最相似的3个结果
)
print("搜索结果:")
for i, (doc, distance, metadata) in enumerate(zip(
results['documents'][0],
results['distances'][0],
results['metadatas'][0]
)):
print(f"\n结果 {i+1}:")
print(f" 内容:{doc}")
print(f" 距离:{distance:.4f}")
print(f" 元数据:{metadata}")
# ========== 带过滤条件的搜索 ==========
# 只搜索特定类别的文档
results = collection.query(
query_texts=["AI开发工具"],
n_results=2,
where={"category": "framework"} # 元数据过滤
)
print("\n过滤搜索结果:")
for doc in results['documents'][0]:
print(f" - {doc}")
# ========== 使用嵌入向量搜索 ==========
# 如果已有嵌入向量,可以直接使用
# query_embedding = [...] # 你的嵌入向量
# results = collection.query(
# query_embeddings=[query_embedding],
# n_results=3
# )
3.6 更新和删除文档
# 更新文档
collection.update(
ids=["doc_1"],
documents=["LangChain是一个强大的LLM应用开发框架,支持多种模型"],
metadatas=[{"category": "framework", "version": "0.1"}]
)
# 删除特定文档
collection.delete(ids=["doc_5"])
# 按条件删除(删除所有source为tutorial的文档)
# collection.delete(where={"source": "tutorial"})
print(f"删除后文档数:{collection.count()}")
3.7 Chroma与LangChain集成完整示例
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_core.documents import Document
# 初始化嵌入模型
embeddings = OpenAIEmbeddings(
model="text-embedding-ada-002",
api_key="your-api-key"
)
# 创建文档列表
docs = [
Document(
page_content="LangChain提供了丰富的组件来构建LLM应用",
metadata={"source": "langchain_doc", "page": 1}
),
Document(
page_content="向量存储是RAG系统的核心组件之一",
metadata={"source": "rag_guide", "page": 10}
),
Document(
page_content="OpenAI的GPT模型在文本生成方面表现出色",
metadata={"source": "model_comparison", "page": 5}
)
]
# 创建Chroma向量存储
# persist_directory指定持久化目录
vectorstore = Chroma.from_documents(
documents=docs,
embedding=embeddings,
persist_directory="./chroma_langchain_db",
collection_name="tutorial_docs"
)
print("向量存储创建成功!")
# ========== 相似度搜索 ==========
# 搜索与查询最相似的文档
results = vectorstore.similarity_search(
query="RAG系统需要什么组件?",
k=2 # 返回2个结果
)
print("\n相似度搜索结果:")
for i, doc in enumerate(results, 1):
print(f"{i}. {doc.page_content}")
print(f" 来源:{doc.metadata['source']}, 页码:{doc.metadata['page']}\n")
# ========== 带分数的相似度搜索 ==========
results_with_score = vectorstore.similarity_search_with_score(
query="如何构建LLM应用?",
k=2
)
print("带分数的搜索结果:")
for doc, score in results_with_score:
print(f"内容:{doc.page_content}")
print(f"相似度分数:{score:.4f}\n")
# ========== 加载已有的向量存储 ==========
# 后续可以加载已创建的向量存储
loaded_vectorstore = Chroma(
persist_directory="./chroma_langchain_db",
embedding_function=embeddings,
collection_name="tutorial_docs"
)
print(f"加载的向量存储文档数:{loaded_vectorstore._collection.count()}")
在LangChain中,
Chroma.from_documents()会自动处理文档的分割、嵌入和存储。如果数据量很大,建议使用Chroma.add_documents()分批添加。
4. 什么是RAG
RAG(Retrieval-Augmented Generation,检索增强生成)是一种将外部知识检索与大语言模型生成能力相结合的技术架构。它让AI能够基于特定的知识库回答问题,而不是仅依赖训练时的知识。
为什么需要RAG
大语言模型虽然知识丰富,但存在以下局限:
- 知识截止时间:模型训练后发生的事件不知道
- 幻觉问题:可能生成看似合理但实际错误的内容
- 私有知识:无法访问企业内部文档、个人笔记等私有数据
- 知识更新:难以实时更新知识库
RAG通过检索外部知识库,将相关信息作为上下文提供给模型,解决了上述问题:
- ✅ 可以访问最新信息
- ✅ 基于检索到的真实文档生成,减少幻觉
- ✅ 能够使用私有数据
- ✅ 知识库可以随时更新
不要试图让模型记住所有知识,而是教会它如何查找和使用知识。就像开卷考试,带着参考资料去答题,比死记硬背更可靠。
5. RAG完整流程
RAG系统分为两个阶段:索引阶段(离线)和查询阶段(在线)。
5.1 索引阶段(Indexing)
这个阶段将文档处理成可检索的向量索引,通常在系统部署前完成。
from langchain.document_loaders import TextLoader, PyPDFLoader, DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
# ========== 第1步:加载文档 ==========
# 加载单个文本文件
# loader = TextLoader("./docs/intro.txt", encoding="utf-8")
# 加载PDF文件
# loader = PyPDFLoader("./docs/manual.pdf")
# 加载整个目录(支持多种格式)
loader = DirectoryLoader(
"./knowledge_base", # 知识库目录
glob="**/*.txt", # 匹配所有txt文件
loader_cls=TextLoader,
loader_kwargs={'encoding': 'utf-8'}
)
documents = loader.load()
print(f"共加载 {len(documents)} 个文档")
# ========== 第2步:分割文档 ==========
# 长文档需要分割成小块,原因:
# 1. 嵌入模型有输入长度限制
# 2. 小块更精确匹配查询意图
# 3. 避免无关信息干扰
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 每块最大字符数
chunk_overlap=50, # 块间重叠字符数(保持上下文连贯)
separators=["\n\n", "\n", "。", "!", "?", " ", ""] # 分割优先级
)
chunks = text_splitter.split_documents(documents)
print(f"分割为 {len(chunks)} 个文本块")
# 查看一个块的内容示例
print(f"\n示例块内容(长度:{len(chunks[0].page_content)}):")
print(chunks[0].page_content[:200] + "...")
# ========== 第3步:生成嵌入并存储 ==========
embeddings = OpenAIEmbeddings(
model="text-embedding-ada-002",
api_key="your-api-key"
)
# 创建向量存储
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory="./rag_vector_db",
collection_name="knowledge_base"
)
print("\n索引创建完成!")
print(f"向量存储路径:./rag_vector_db")
print(f"文档块数:{vectorstore._collection.count()}")
5.2 查询阶段(Querying)
用户提问时,系统检索相关文档并生成回答。
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
# 加载已创建的向量存储
vectorstore = Chroma(
persist_directory="./rag_vector_db",
embedding_function=embeddings
)
# 创建检索器(用于搜索相关文档)
retriever = vectorstore.as_retriever(
search_type="similarity", # 相似度搜索
search_kwargs={"k": 4} # 返回最相似的4个文档
)
# 用户查询
query = "公司的请假流程是什么?"
# ========== 第1步:检索相关文档 ==========
relevant_docs = retriever.get_relevant_documents(query)
print(f"查询:{query}")
print(f"\n检索到 {len(relevant_docs)} 个相关文档:\n")
for i, doc in enumerate(relevant_docs, 1):
print(f"文档 {i}:")
print(f" 内容:{doc.page_content[:150]}...")
print(f" 来源:{doc.metadata.get('source', 'unknown')}\n")
# ========== 第2步:构建提示词 ==========
# 将检索到的文档组合成上下文
context = "\n\n".join([doc.page_content for doc in relevant_docs])
# 构建RAG提示词模板
rag_template = """基于以下上下文信息回答问题。如果上下文中没有相关信息,请明确说明"根据提供的资料,我无法回答这个问题"。
上下文信息:
{context}
用户问题:{question}
请提供准确、简洁的回答:"""
prompt = ChatPromptTemplate.from_template(rag_template)
final_prompt = prompt.format(context=context, question=query)
print("\n" + "="*50)
print("最终发送给模型的提示词:")
print("="*50)
print(final_prompt[:500] + "...")
# ========== 第3步:生成回答 ==========
llm = ChatOpenAI(
model="gpt-3.5-turbo",
api_key="your-api-key",
temperature=0 # 降低随机性,提高准确性
)
response = llm.invoke(final_prompt)
print("\n" + "="*50)
print("AI回答:")
print("="*50)
print(response.content)
- 索引阶段:加载文档 → 分割 → 嵌入 → 存储到向量数据库
- 查询阶段:用户提问 → 检索相关文档 → 组合提示词 → LLM生成回答
6. RetrievalQA链
LangChain提供了RetrievalQA链,将RAG流程封装成可复用的组件。
6.1 基础用法
from langchain.chains import RetrievalQA
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_chroma import Chroma
# 初始化组件
llm = ChatOpenAI(
model="gpt-3.5-turbo",
api_key="your-api-key",
temperature=0
)
embeddings = OpenAIEmbeddings(api_key="your-api-key")
# 加载向量存储
vectorstore = Chroma(
persist_directory="./rag_vector_db",
embedding_function=embeddings
)
# 创建RetrievalQA链
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff", # 链类型,后面详细解释
retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),
return_source_documents=True, # 返回来源文档
verbose=True # 打印中间过程
)
# 执行问答
query = "请介绍一下公司的产品优势"
result = qa_chain.invoke({"query": query})
print("回答:")
print(result["result"])
print("\n来源文档:")
for i, doc in enumerate(result["source_documents"], 1):
print(f"{i}. {doc.page_content[:100]}...")
6.2 链类型详解
LangChain支持多种链类型来处理检索到的文档:
| 链类型 | 工作原理 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
stuff |
将所有文档"塞"进一个提示词 | 简单、只调用一次LLM | 文档过多时可能超出上下文限制 | 文档数量少、总计token数少 |
map_reduce |
每个文档单独处理,再综合结果 | 可处理大量文档 | 需要多次LLM调用,速度慢 | 文档多、需要全面分析 |
refine |
迭代式精炼答案 | 答案质量通常更好 | 调用次数多,依赖文档顺序 | 追求高质量答案 |
map_rerank |
给每个文档打分,选最高分 | 自动选择最相关文档 | 需要设计评分prompt | 有明确答案的问题 |
6.3 不同链类型使用示例
# ===== stuff链(默认)=====
# 直接将所有文档放入提示词
qa_stuff = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever(search_kwargs={"k": 3})
)
# ===== map_reduce链 =====
# 适合处理大量文档
qa_map_reduce = RetrievalQA.from_chain_type(
llm=llm,
chain_type="map_reduce",
retriever=vectorstore.as_retriever(search_kwargs={"k": 10}),
# 可以自定义map和reduce的prompt
chain_type_kwargs={
"question_prompt": ChatPromptTemplate.from_template(
"基于以下文档片段回答问题:\n{context}\n\n问题:{question}\n回答:"
),
"combine_prompt": ChatPromptTemplate.from_template(
"综合以下各片段的回答,给出最终答案:\n{summaries}\n\n最终答案:"
)
}
)
# ===== refine链 =====
# 迭代优化答案
qa_refine = RetrievalQA.from_chain_type(
llm=llm,
chain_type="refine",
retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)
# 测试不同链的效果
question = "公司有哪些核心产品?"
print("Stuff链结果:", qa_stuff.invoke({"query": question})["result"])
print("\nMapReduce链结果:", qa_map_reduce.invoke({"query": question})["result"])
print("\nRefine链结果:", qa_refine.invoke({"query": question})["result"])
6.4 自定义提示词模板
from langchain.prompts import PromptTemplate
# 自定义RAG提示词模板
custom_template = """你是一个专业的企业知识库助手。请基于提供的参考资料回答问题。
回答要求:
1. 如果资料中有明确答案,请准确回答
2. 如果资料中没有相关信息,请说"抱歉,根据现有资料无法回答"
3. 回答要简洁明了,避免冗余
4. 可以适当引用资料中的关键信息
参考资料:
{context}
用户问题:{question}
请提供你的回答:"""
CUSTOM_PROMPT = PromptTemplate(
template=custom_template,
input_variables=["context", "question"]
)
# 使用自定义提示词创建QA链
qa_chain_custom = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever(search_kwargs={"k": 4}),
chain_type_kwargs={"prompt": CUSTOM_PROMPT},
return_source_documents=True
)
result = qa_chain_custom.invoke({"query": "公司的请假政策是什么?"})
print(result["result"])
7. 多查询检索 MultiQueryRetriever
用户的问题可能表达方式多样,单一查询可能无法召回所有相关文档。MultiQueryRetriever通过生成多个查询变体来提高召回率。
工作原理
- LLM根据原始查询生成多个语义等价的不同表达
- 对每个查询执行检索
- 合并去重后的结果返回
from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain_openai import ChatOpenAI
# 初始化基础检索器
base_retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# 创建MultiQueryRetriever
multi_retriever = MultiQueryRetriever.from_llm(
retriever=base_retriever,
llm=ChatOpenAI(temperature=0, api_key="your-api-key"),
# 生成的查询数量(默认3个)
# include_original=True # 是否包含原始查询
)
# 使用多查询检索
query = "怎么请假?"
docs = multi_retriever.get_relevant_documents(query)
print(f"原始查询:{query}")
print(f"检索到 {len(docs)} 个文档\n")
# 查看生成的查询(需要设置日志级别)
import logging
logging.basicConfig(level=logging.INFO)
logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)
# 再次执行以查看生成的查询
docs = multi_retriever.get_relevant_documents(query)
MultiQueryRetriever会增加LLM调用次数,适合以下场景:
- 用户查询简短、表达多样时
- 对召回率要求高时
- 可以接受稍高延迟的场景
8. 父文档检索 ParentDocumentRetriever
一个常见的问题:小块文本便于精确匹配查询,但可能丢失上下文;大块文本保留完整上下文,但可能包含无关信息。
ParentDocumentRetriever提供了解决方案:
- 检索阶段:使用小块(child chunk)进行相似度匹配
- 生成阶段:返回大块(parent document)给LLM
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_openai import OpenAIEmbeddings
# 创建两种分割器
# 子分割器:用于索引和检索(小块,精确匹配)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=200, chunk_overlap=20)
# 父分割器:用于返回给LLM(大块,完整上下文)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
# 创建向量存储(存储子块)
vectorstore = Chroma(
collection_name="parent_doc_store",
embedding_function=OpenAIEmbeddings(api_key="your-api-key"),
persist_directory="./parent_doc_db"
)
# 创建文档存储(存储父块)
docstore = InMemoryStore()
# 创建ParentDocumentRetriever
retriever = ParentDocumentRetriever(
vectorstore=vectorstore,
docstore=docstore,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
# 可选:限制返回的父文档数量
# search_kwargs={"k": 2}
)
# 加载原始文档
from langchain.document_loaders import TextLoader
loader = TextLoader("./long_document.txt", encoding="utf-8")
docs = loader.load()
# 添加文档
# 文档会被父分割器分割,存储到docstore
# 同时子分割器分割后的块会嵌入存储到vectorstore
retriever.add_documents(docs, ids=None)
print(f"向量存储中的子块数:{vectorstore._collection.count()}")
# 查询 - 使用子块匹配,但返回父块
query = "某个具体问题"
relevant_docs = retriever.get_relevant_documents(query)
print(f"\n检索到 {len(relevant_docs)} 个父文档")
for i, doc in enumerate(relevant_docs):
print(f"\n父文档 {i+1} (长度:{len(doc.page_content)}):")
print(doc.page_content[:500] + "...")
- 小块检索:更精确的语义匹配
- 大块生成:保留完整的上下文信息
- 减少无关信息:避免返回过多不相关的文本块
- 提高回答质量:LLM有更完整的上下文来生成回答
9. 完整实战:公司知识库问答系统
本节将构建一个完整的公司知识库问答系统,包含文档加载、索引构建、问答交互等全部功能。
项目结构
company_kb_qa/
├── knowledge_base/ # 知识库文档目录
│ ├── 员工手册.txt
│ ├── 产品说明.txt
│ └── 常见问题.txt
├── create_index.py # 构建索引脚本
├── qa_system.py # 问答系统主程序
└── chroma_db/ # 向量数据库目录
9.1 构建索引脚本 create_index.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
公司知识库索引构建脚本
运行此脚本将知识库文档转换为向量索引
"""
import os
from langchain.document_loaders import DirectoryLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
def create_knowledge_base_index(
docs_dir: str = "./knowledge_base",
db_dir: str = "./chroma_db",
collection_name: str = "company_kb"
):
"""
从文档目录构建向量索引
Args:
docs_dir: 知识库文档目录路径
db_dir: 向量数据库存储路径
collection_name: 集合名称
"""
# 检查文档目录是否存在
if not os.path.exists(docs_dir):
print(f"错误:文档目录 {docs_dir} 不存在")
print("请创建目录并添加知识库文档(.txt, .pdf等)")
return False
print(f"开始构建知识库索引...")
print(f"文档目录:{os.path.abspath(docs_dir)}")
# ========== 1. 加载文档 ==========
print("\n📄 正在加载文档...")
# 支持多种文件类型的加载器字典
loaders = {
"*.txt": TextLoader,
# "*.pdf": PyPDFLoader,
# "*.md": UnstructuredMarkdownLoader,
}
all_documents = []
for glob_pattern, loader_cls in loaders.items():
try:
loader = DirectoryLoader(
docs_dir,
glob=glob_pattern,
loader_cls=loader_cls,
loader_kwargs={'encoding': 'utf-8'} if 'TextLoader' in str(loader_cls) else {},
show_progress=True
)
docs = loader.load()
all_documents.extend(docs)
print(f" ✓ {glob_pattern}: 加载 {len(docs)} 个文档")
except Exception as e:
print(f" ✗ {glob_pattern}: 加载失败 - {e}")
if not all_documents:
print("\n❌ 没有加载到任何文档,请检查文档目录")
return False
print(f"\n✅ 共加载 {len(all_documents)} 个文档")
# ========== 2. 分割文档 ==========
print("\n✂️ 正在分割文档...")
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 每块500字符
chunk_overlap=50, # 重叠50字符保持上下文
separators=["\n\n", "\n", "。", "!", "?", ";", " ", ""],
length_function=len,
)
chunks = text_splitter.split_documents(all_documents)
print(f"✅ 分割为 {len(chunks)} 个文本块")
# 显示分割统计信息
chunk_lengths = [len(chunk.page_content) for chunk in chunks]
print(f" 平均块长度:{sum(chunk_lengths) // len(chunk_lengths)} 字符")
print(f" 最大块长度:{max(chunk_lengths)} 字符")
print(f" 最小块长度:{min(chunk_lengths)} 字符")
# ========== 3. 创建向量存储 ==========
print("\n🔤 正在生成嵌入向量...")
# 确保存储目录存在
os.makedirs(db_dir, exist_ok=True)
# 初始化嵌入模型
embeddings = OpenAIEmbeddings(
model="text-embedding-ada-002",
# api_key="your-api-key" # 也可从环境变量读取
)
# 创建向量存储
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory=db_dir,
collection_name=collection_name
)
print(f"\n✅ 索引构建完成!")
print(f" 向量数据库路径:{os.path.abspath(db_dir)}")
print(f" 集合名称:{collection_name}")
print(f" 文档块总数:{vectorstore._collection.count()}")
return True
if __name__ == "__main__":
# 从环境变量读取API密钥
import os
if not os.getenv("OPENAI_API_KEY"):
print("⚠️ 警告:未设置 OPENAI_API_KEY 环境变量")
print("请在运行前设置:export OPENAI_API_KEY='your-key'")
# 执行索引构建
success = create_knowledge_base_index()
if success:
print("\n🎉 知识库索引构建成功!可以运行问答系统了。")
else:
print("\n❌ 索引构建失败,请检查错误信息。")
9.2 问答系统主程序 qa_system.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
公司知识库问答系统
支持连续对话、来源展示、历史记录等功能
"""
import os
import sys
from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime
from langchain_chroma import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from langchain_core.documents import Document
@dataclass
class QAResponse:
"""问答响应数据结构"""
answer: str
source_documents: List[Document]
query_time: datetime
confidence: Optional[str] = None
class CompanyKBQA:
"""公司知识库问答系统"""
def __init__(
self,
db_dir: str = "./chroma_db",
collection_name: str = "company_kb",
model: str = "gpt-3.5-turbo",
temperature: float = 0.0
):
"""
初始化问答系统
Args:
db_dir: 向量数据库目录
collection_name: 集合名称
model: 使用的LLM模型
temperature: 生成随机性
"""
self.db_dir = db_dir
self.collection_name = collection_name
# 检查API密钥
if not os.getenv("OPENAI_API_KEY"):
raise ValueError("请设置 OPENAI_API_KEY 环境变量")
# 初始化嵌入模型
print("🔤 正在加载嵌入模型...")
self.embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")
# 加载向量存储
print("📚 正在加载知识库...")
if not os.path.exists(db_dir):
raise FileNotFoundError(
f"向量数据库不存在:{db_dir}\n"
"请先运行 create_index.py 构建索引"
)
self.vectorstore = Chroma(
persist_directory=db_dir,
embedding_function=self.embeddings,
collection_name=collection_name
)
doc_count = self.vectorstore._collection.count()
print(f" ✓ 加载了 {doc_count} 个文档块")
# 初始化LLM
print("🤖 正在初始化语言模型...")
self.llm = ChatOpenAI(
model=model,
temperature=temperature
)
# 创建自定义提示词模板
self.qa_prompt = PromptTemplate(
template="""你是公司的智能助手,专门回答员工关于公司政策、流程、产品等方面的问题。
请严格基于以下参考资料回答问题:
{context}
回答要求:
1. 如果参考资料中有明确答案,请准确、简洁地回答
2. 如果参考资料中没有相关信息,请明确说"抱歉,根据现有知识库资料,我无法回答这个问题"
3. 不要编造信息,不要基于你的训练数据回答
4. 可以适当引用资料中的关键内容
5. 保持友好、专业的语气
用户问题:{question}
请提供你的回答:""",
input_variables=["context", "question"]
)
# 创建QA链
self.qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=self.vectorstore.as_retriever(
search_type="similarity",
search_kwargs={"k": 5} # 检索最相似的5个文档
),
chain_type_kwargs={"prompt": self.qa_prompt},
return_source_documents=True,
verbose=False
)
# 对话历史
self.history: List[dict] = []
print("\n✅ 问答系统初始化完成!\n")
def ask(self, question: str) -> QAResponse:
"""
提问并获取回答
Args:
question: 用户问题
Returns:
QAResponse对象,包含回答和来源文档
"""
start_time = datetime.now()
# 执行问答
result = self.qa_chain.invoke({"query": question})
# 构建响应
response = QAResponse(
answer=result["result"],
source_documents=result["source_documents"],
query_time=datetime.now()
)
# 记录历史
self.history.append({
"question": question,
"answer": response.answer,
"time": response.query_time,
"sources": [doc.metadata.get('source', 'unknown') for doc in response.source_documents]
})
return response
def print_response(self, response: QAResponse, show_sources: bool = True):
"""格式化打印回答"""
print("=" * 60)
print("🤖 AI助手回答:")
print("=" * 60)
print(response.answer)
print()
if show_sources and response.source_documents:
print("📚 参考来源:")
print("-" * 60)
seen_sources = set()
for i, doc in enumerate(response.source_documents, 1):
source = doc.metadata.get('source', '未知来源')
if source not in seen_sources:
print(f" {i}. {source}")
seen_sources.add(source)
print()
def interactive_mode(self):
"""交互式问答模式"""
print("=" * 60)
print("🏢 公司知识库问答系统")
print("=" * 60)
print("\n使用方法:")
print(" • 直接输入问题开始问答")
print(" • 输入 'quit' 或 'exit' 退出")
print(" • 输入 'history' 查看对话历史")
print(" • 输入 'clear' 清空历史")
print("\n" + "=" * 60 + "\n")
while True:
try:
# 获取用户输入
question = input("❓ 请输入您的问题:").strip()
# 处理特殊命令
if not question:
continue
if question.lower() in ['quit', 'exit', 'q']:
print("\n👋 感谢使用,再见!")
break
if question.lower() == 'history':
self.show_history()
continue
if question.lower() == 'clear':
self.history.clear()
print("✅ 对话历史已清空\n")
continue
# 执行问答
print("🤔 正在思考...")
response = self.ask(question)
self.print_response(response)
except KeyboardInterrupt:
print("\n\n👋 再见!")
break
except Exception as e:
print(f"\n❌ 出错:{e}\n")
def show_history(self):
"""显示对话历史"""
if not self.history:
print("\n📭 暂无对话历史\n")
return
print("\n" + "=" * 60)
print("📜 对话历史")
print("=" * 60)
for i, record in enumerate(self.history, 1):
print(f"\n【对话 {i}】{record['time'].strftime('%Y-%m-%d %H:%M:%S')}")
print(f"问:{record['question']}")
print(f"答:{record['answer'][:100]}..." if len(record['answer']) > 100 else f"答:{record['answer']}")
print(f"来源:{', '.join(record['sources'])}")
print("\n")
def main():
"""主函数"""
# 检查命令行参数
if len(sys.argv) > 1 and sys.argv[1] == '--create-index':
# 只构建索引模式
from create_index import create_knowledge_base_index
create_knowledge_base_index()
return
try:
# 初始化并启动问答系统
qa_system = CompanyKBQA()
qa_system.interactive_mode()
except Exception as e:
print(f"\n❌ 系统启动失败:{e}")
print("\n解决方案:")
print(" 1. 确保已设置 OPENAI_API_KEY 环境变量")
print(" 2. 确保知识库文档已放入 knowledge_base 目录")
print(" 3. 运行 'python qa_system.py --create-index' 构建索引")
sys.exit(1)
if __name__ == "__main__":
main()
9.3 使用示例
# 1. 设置环境变量
export OPENAI_API_KEY="your-api-key"
# 2. 准备知识库文档
mkdir -p knowledge_base
echo "公司年假政策:员工每年享有10天带薪年假..." > knowledge_base/员工手册.txt
echo "产品A特点:高性能、易用、支持多平台..." > knowledge_base/产品说明.txt
# 3. 构建索引
python qa_system.py --create-index
# 或
python create_index.py
# 4. 启动问答系统
python qa_system.py
# 交互示例:
# ❓ 请输入您的问题:公司年假有多少天?
# 🤔 正在思考...
# ============================================================
# 🤖 AI助手回答:
# ============================================================
# 根据公司员工手册,员工每年享有10天带薪年假。
#
# 📚 参考来源:
# ------------------------------------------------------------
# 1. knowledge_base/员工手册.txt
# ❓ 请输入您的问题:产品A有什么特点?
# ...
9.4 扩展功能建议
在此基础上,你可以进一步扩展系统功能:
- Web界面:使用Gradio或Streamlit构建可视化界面
- 多轮对话:集成记忆功能(下章讲解)
- 用户管理:添加登录、权限控制
- 文档管理:支持动态添加、删除文档
- 回答评分:收集用户反馈优化系统
- 多语言支持:支持中英文等不同语言的知识库
本章小结
本章我们深入学习了向量数据库和RAG技术:
- 向量数据库:Chroma的使用方法和与LangChain的集成
- RAG架构:索引阶段(加载→分割→嵌入→存储)和查询阶段(检索→组合提示→生成)
- RetrievalQA链:快速构建RAG问答系统,支持多种链类型
- 高级检索:MultiQueryRetriever提高召回率,ParentDocumentRetriever平衡精度与上下文
- 完整项目:公司知识库问答系统的实战开发
- 使用Chroma创建一个个人笔记知识库
- 尝试不同的文本分割参数,观察对检索效果的影响
- 对比不同链类型(stuff、map_reduce、refine)的效果差异
- 将本章的问答系统扩展为支持多轮对话(预习下一章内容)