第4章:文档加载与处理

本章将带你掌握 LangChain 中文档加载与处理的核心技能。这是构建知识库、RAG(检索增强生成)应用的基础,让你的 AI 能够"阅读"和理解各种文档。

4.1 Document 概念

在 LangChain 中,Document(文档)是处理文本数据的基本单位。它是一个简单的数据结构,包含两个主要属性:

Document 的数据结构

from langchain.schema import Document

# 创建一个 Document 对象
doc = Document(
    page_content="这是文档的主要内容,可以是任意长度的文本。",
    metadata={
        "source": "example.txt",      # 文档来源
        "page": 1,                   # 页码(可选)
        "author": "张三",            # 作者(可选)
        "created_date": "2024-01-01" # 创建日期(可选)
    }
)

# 访问文档内容
print(f"内容: {doc.page_content}")
print(f"来源: {doc.metadata['source']}")
print(f"完整文档: {doc}")
属性 类型 说明
page_content str 文档的文本内容,是后续处理的核心数据
metadata dict 文档的元数据,用于存储来源、页码、作者等信息

为什么需要 metadata?在处理大量文档时,metadata 能帮助我们:

  • 追踪文档来源(知道答案来自哪份文件)
  • 过滤特定文档(只处理某个作者或日期的文档)
  • 显示引用信息(回答问题时标注出处)

4.2 文档加载器(Loaders)

文档加载器(Document Loaders)负责从不同来源和格式加载文档。LangChain 提供了丰富的加载器,支持几乎所有常见的文档格式。

常用文档加载器概览

加载器 支持格式 适用场景
TextLoader .txt 加载纯文本文件
PyPDFLoader .pdf 加载 PDF 文档
CSVLoader .csv 加载表格数据
UnstructuredFileLoader 多种格式 通用加载器,自动识别格式
DirectoryLoader 目录 批量加载整个文件夹的文档
WebBaseLoader 网页 加载网页内容

TextLoader - 加载文本文件

from langchain.document_loaders import TextLoader

# ==================== 基础用法 ====================

# 加载单个文本文件
loader = TextLoader("./data/example.txt", encoding="utf-8")
documents = loader.load()

# TextLoader.load() 返回 Document 对象列表
print(f"加载了 {len(documents)} 个文档")

# 查看第一个文档
if documents:
    doc = documents[0]
    print(f"\n文档内容预览(前200字):")
    print(doc.page_content[:200])
    print(f"\n元数据:{doc.metadata}")

# ==================== 处理多个文件 ====================

import os
from pathlib import Path

# 批量加载一个目录中的所有 txt 文件
data_dir = Path("./data/texts")
txt_files = list(data_dir.glob("*.txt"))

all_documents = []
for txt_file in txt_files:
    try:
        loader = TextLoader(str(txt_file), encoding="utf-8")
        docs = loader.load()
        all_documents.extend(docs)
        print(f"✓ 加载成功: {txt_file.name}")
    except Exception as e:
        print(f"✗ 加载失败: {txt_file.name} - {e}")

print(f"\n总共加载了 {len(all_documents)} 个文档")

# ==================== 处理编码问题 ====================

# 如果文件编码不确定,可以尝试多种编码
encodings = ["utf-8", "gbk", "gb2312", "latin-1"]

def load_with_fallback_encoding(file_path):
    """尝试多种编码加载文件"""
    for encoding in encodings:
        try:
            loader = TextLoader(file_path, encoding=encoding)
            return loader.load()
        except UnicodeDecodeError:
            continue
    raise ValueError(f"无法解码文件: {file_path}")

# 使用
# docs = load_with_fallback_encoding("./data/legacy_file.txt")

PyPDFLoader - 加载 PDF 文档

from langchain.document_loaders import PyPDFLoader

# ==================== 基础用法 ====================

# 加载单个 PDF 文件
loader = PyPDFLoader("./data/document.pdf")
pages = loader.load()

# PyPDFLoader 会为每一页创建一个 Document
print(f"PDF 共 {len(pages)} 页")

# 查看每一页的内容
for i, page in enumerate(pages[:3], 1):  # 只显示前3页
    print(f"\n--- 第 {i} 页 ---")
    print(f"内容预览: {page.page_content[:150]}...")
    print(f"页码: {page.metadata.get('page', 'N/A')}")

# ==================== 提取 PDF 元数据 ====================

# 获取 PDF 文件的基本信息
if pages:
    first_page = pages[0]
    print(f"\nPDF 元数据:")
    print(f"来源文件: {first_page.metadata.get('source', 'N/A')}")
    print(f"总页数: {len(pages)}")
    print(f"完整元数据: {first_page.metadata}")

# ==================== 批量加载多个 PDF ====================

from pathlib import Path

def load_pdfs_from_directory(directory):
    """加载目录中的所有 PDF 文件"""
    pdf_files = Path(directory).glob("*.pdf")
    all_pages = []
    
    for pdf_file in pdf_files:
        try:
            loader = PyPDFLoader(str(pdf_file))
            pages = loader.load()
            all_pages.extend(pages)
            print(f"✓ 已加载: {pdf_file.name} ({len(pages)} 页)")
        except Exception as e:
            print(f"✗ 加载失败: {pdf_file.name} - {e}")
    
    return all_pages

# 使用
# all_pdf_pages = load_pdfs_from_directory("./data/pdfs")

# ==================== PDF 内容清理 ====================

# PDF 提取的文本经常包含多余的换行符,可以清理
def clean_pdf_text(text):
    """清理 PDF 提取的文本"""
    # 移除多余的换行(保留段落分隔)
    text = text.replace('\n\n', '[PARA]')  # 临时标记段落
    text = text.replace('\n', ' ')         # 移除单个换行
    text = text.replace('[PARA]', '\n\n')  # 恢复段落
    # 移除多余空格
    text = ' '.join(text.split())
    return text.strip()

# 应用清理
if pages:
    clean_content = clean_pdf_text(pages[0].page_content)
    print(f"\n清理后的内容:\n{clean_content[:300]}")

CSVLoader - 加载表格数据

from langchain.document_loaders import CSVLoader

# ==================== 基础用法 ====================

# 假设有一个包含以下内容的 CSV 文件:
# name,age,city,description
# 张三,25,北京,软件工程师,喜欢编程
# 李四,30,上海,产品经理,擅长用户研究

loader = CSVLoader(
    file_path="./data/contacts.csv",
    encoding="utf-8"
)
documents = loader.load()

# CSVLoader 会为每一行创建一个 Document
print(f"加载了 {len(documents)} 行数据\n")

# 查看每一行的内容
for i, doc in enumerate(documents[:3], 1):
    print(f"--- 第 {i} 行 ---")
    print(f"内容: {doc.page_content}")
    print(f"来源: {doc.metadata.get('source', 'N/A')}")
    print(f"行号: {doc.metadata.get('row', 'N/A')}")
    print()

# ==================== 指定列内容 ====================

# 只加载特定列作为内容
loader = CSVLoader(
    file_path="./data/contacts.csv",
    source_column="name",  # 使用 name 列作为来源标识
    encoding="utf-8"
)

# ==================== 自定义内容格式 ====================

from csv import DictReader

# 如果需要更复杂的处理,可以自定义加载逻辑
def custom_csv_loader(file_path, content_columns):
    """自定义 CSV 加载器,合并多列作为内容"""
    documents = []
    
    with open(file_path, 'r', encoding='utf-8') as f:
        reader = DictReader(f)
        for i, row in enumerate(reader):
            # 合并指定列的内容
            content_parts = [f"{col}: {row.get(col, '')}" for col in content_columns]
            content = "\n".join(content_parts)
            
            doc = Document(
                page_content=content,
                metadata={
                    "source": file_path,
                    "row": i,
                    **{k: v for k, v in row.items() if k not in content_columns}
                }
            )
            documents.append(doc)
    
    return documents

# 使用:合并 name 和 description 作为内容
# docs = custom_csv_loader("./data/contacts.csv", ["name", "description"])

UnstructuredFileLoader - 通用加载器

from langchain.document_loaders import UnstructuredFileLoader

# ==================== 自动识别格式 ====================

# UnstructuredFileLoader 可以自动识别多种文件格式
# 支持的格式包括:txt, pdf, docx, pptx, html, md 等

# 加载 Word 文档
loader = UnstructuredFileLoader("./data/document.docx")
docs = loader.load()
print(f"加载了 {len(docs)} 个文档段落")

# 加载 HTML 文件
loader = UnstructuredFileLoader("./data/page.html")
docs = loader.load()

# 加载 Markdown 文件
loader = UnstructuredFileLoader("./data/readme.md")
docs = loader.load()

# ==================== 模式选择 ====================

# UnstructuredFileLoader 支持两种模式:
# 1. "single" - 整个文件作为一个 Document(默认)
# 2. "elements" - 每个元素(段落、标题等)作为单独的 Document

# 单文档模式
loader_single = UnstructuredFileLoader(
    "./data/report.pdf",
    mode="single"
)
docs_single = loader_single.load()
print(f"单文档模式: {len(docs_single)} 个文档")

# 元素模式(更细粒度)
loader_elements = UnstructuredFileLoader(
    "./data/report.pdf",
    mode="elements"
)
docs_elements = loader_elements.load()
print(f"元素模式: {len(docs_elements)} 个元素")

# 查看元素类型
for doc in docs_elements[:5]:
    element_type = doc.metadata.get("category", "text")
    print(f"[{element_type}] {doc.page_content[:50]}...")

# ==================== 安装依赖 ====================

# 使用 UnstructuredFileLoader 需要安装额外的依赖:
# pip install unstructured
# 
# 如果需要处理特定格式,安装对应的依赖:
# pip install unstructured[pdf]      # PDF 支持
# pip install unstructured[docx]     # Word 支持
# pip install unstructured[pptx]     # PPT 支持

DirectoryLoader - 批量加载目录

from langchain.document_loaders import DirectoryLoader, TextLoader

# ==================== 加载整个目录 ====================

# 加载目录中的所有 txt 文件
loader = DirectoryLoader(
    path="./data/documents",      # 目录路径
    glob="**/*.txt",              # 匹配模式,** 表示递归子目录
    loader_cls=TextLoader,        # 使用的加载器类
    loader_kwargs={"encoding": "utf-8"}  # 传递给加载器的参数
)

documents = loader.load()
print(f"共加载了 {len(documents)} 个文档")

# 查看加载的文件
sources = set(doc.metadata["source"] for doc in documents)
print(f"\n来源文件:")
for source in sorted(sources):
    print(f"  - {source}")

# ==================== 递归加载多种格式 ====================

# 加载目录中的 txt 和 pdf 文件
txt_loader = DirectoryLoader(
    "./data/documents",
    glob="**/*.txt",
    loader_cls=TextLoader,
    loader_kwargs={"encoding": "utf-8"},
    show_progress=True  # 显示加载进度
)

# 可以同时加载多种类型然后合并
# pdf_loader = DirectoryLoader("./data/documents", glob="**/*.pdf", loader_cls=PyPDFLoader)

# all_docs = txt_loader.load() + pdf_loader.load()

# ==================== 使用多进程加速 ====================

# 加载大量文件时,可以使用多进程加速
loader = DirectoryLoader(
    "./data/large_corpus",
    glob="**/*.txt",
    loader_cls=TextLoader,
    loader_kwargs={"encoding": "utf-8"},
    use_multithreading=True,  # 启用多线程
    max_concurrency=4         # 并发数
)

# documents = loader.load()

加载器选择建议:

  • 纯文本 → TextLoader
  • PDF 文件 → PyPDFLoader(简单)或 UnstructuredFileLoader(复杂)
  • 表格数据 → CSVLoader
  • 多种格式混合 → UnstructuredFileLoader
  • 批量加载 → DirectoryLoader

4.3 文档分割(Splitters)

LLM 有上下文长度限制(如 GPT-3.5 通常是 4K-16K tokens),我们无法一次性处理超长文档。因此需要将文档分割成适当大小的文本块(Chunks)

为什么要分割文档

不分割的后果:

  • 超出模型上下文限制 → 报错或截断
  • 信息过于密集 → 模型难以聚焦重点
  • 检索效果差 → 无法精准定位相关信息

常用分割器对比

分割器 分割方式 优点 缺点
CharacterTextSplitter 按字符数分割 简单直接 可能切断句子
RecursiveCharacterTextSplitter 递归按分隔符分割 保持语义完整(推荐) 分割速度稍慢
TokenTextSplitter 按 Token 数分割 精确控制 Token 数 需要 Tokenizer
MarkdownHeaderTextSplitter 按 Markdown 标题分割 保持文档结构 仅适用于 Markdown

CharacterTextSplitter - 字符分割

from langchain.text_splitter import CharacterTextSplitter

# 示例长文本
long_text = """
人工智能(Artificial Intelligence,简称 AI)是计算机科学的一个分支,它企图了解智能的实质,
并生产出一种新的能以人类智能相似的方式做出反应的智能机器。该领域的研究包括机器人、
语言识别、图像识别、自然语言处理和专家系统等。人工智能从诞生以来,理论和技术日益成熟,
应用领域也不断扩大,可以设想,未来人工智能带来的科技产品,将会是人类智慧的"容器"。

机器学习是实现人工智能的一种重要方法。机器学习专门研究计算机怎样模拟或实现人类的
学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。
深度学习是机器学习的一种,它使用了多层神经网络来模拟人脑的工作方式。

自然语言处理(NLP)是人工智能和语言学领域的分支学科。它研究能实现人与计算机之间
用自然语言进行有效通信的各种理论和方法。自然语言处理包括文本分析、机器翻译、
情感分析、问答系统等多个方向。
"""

# ==================== 基础用法 ====================

# 创建字符分割器
splitter = CharacterTextSplitter(
    separator="\n",       # 分割符,按段落分割
    chunk_size=100,       # 每个块的最大字符数
    chunk_overlap=20,     # 相邻块之间的重叠字符数
    length_function=len,  # 计算长度的函数
    is_separator_regex=False  # separator 是否正则表达式
)

# 执行分割
chunks = splitter.split_text(long_text)

print(f"共分割成 {len(chunks)} 个块\n")
for i, chunk in enumerate(chunks, 1):
    print(f"--- 块 {i} ({len(chunk)} 字符) ---")
    print(chunk[:100] + "..." if len(chunk) > 100 else chunk)
    print()

# ==================== 关键参数说明 ====================

# chunk_size: 每个文本块的最大长度
# - 太小:信息碎片化,丢失上下文
# - 太大:可能超出模型限制,检索不精准
# - 建议:根据模型限制和具体场景调整,通常 500-2000 字符

# chunk_overlap: 相邻块之间的重叠长度
# - 作用:保持上下文连贯性
# - 建议:设置为 chunk_size 的 10%-20%
# - 例如:chunk_size=1000 时,overlap 可设为 100-200

# ==================== 从 Document 分割 ====================

from langchain.schema import Document

# 如果有 Document 对象,可以直接分割
docs = [Document(page_content=long_text, metadata={"source": "example.txt"})]

# split_documents 会保留原始 metadata
chunked_docs = splitter.split_documents(docs)

print(f"\n从 Document 分割:{len(chunked_docs)} 个块")
for doc in chunked_docs[:3]:
    print(f"元数据保留: {doc.metadata}")
    print(f"内容长度: {len(doc.page_content)} 字符\n")

RecursiveCharacterTextSplitter - 递归分割(推荐)

from langchain.text_splitter import RecursiveCharacterTextSplitter

# 示例文本(包含多种结构)
sample_text = """
# 第一章:Python 基础

Python 是一种解释型、面向对象、动态数据类型的高级程序设计语言。
它由 Guido van Rossum 于 1989 年底发明,第一个公开发行版发行于 1991 年。

## 1.1 Python 的特点

Python 具有以下特点:
1. 简单易学:Python 有极其简单的语法,适合初学者。
2. 开源免费:Python 是开源的,可以自由使用和分发。
3. 丰富的库:Python 拥有大量的标准库和第三方库。

## 1.2 Python 的应用领域

Python 被广泛应用于以下领域:
- Web 开发:Django、Flask 等框架
- 数据分析:Pandas、NumPy 等库
- 人工智能:TensorFlow、PyTorch 等框架
- 自动化运维:Ansible、SaltStack 等工具

# 第二章:Python 语法基础

## 2.1 变量和数据类型

Python 中的变量不需要声明类型,直接赋值即可使用。

```python
name = "张三"  # 字符串
age = 25       # 整数
height = 1.75  # 浮点数
```

## 2.2 控制结构

Python 支持常见的控制结构:
- if/elif/else:条件判断
- for:循环遍历
- while:条件循环
"""

# ==================== 基础用法 ====================

# 创建递归字符分割器(推荐!)
splitter = RecursiveCharacterTextSplitter(
    # 按优先级尝试的分隔符列表
    separators=["\n\n", "\n", "。", "!", "?", " ", ""],
    chunk_size=300,       # 目标块大小
    chunk_overlap=50,     # 重叠大小
    length_function=len,
)

# 执行分割
chunks = splitter.split_text(sample_text)

print(f"共分割成 {len(chunks)} 个块\n")
for i, chunk in enumerate(chunks[:5], 1):
    print(f"--- 块 {i} ({len(chunk)} 字符) ---")
    print(chunk[:120] + "..." if len(chunk) > 120 else chunk)
    print()

# ==================== 分隔符优先级说明 ====================

# RecursiveCharacterTextSplitter 的工作方式:
# 1. 首先尝试用第一个分隔符 "\n\n"(段落)分割
# 2. 如果某段仍大于 chunk_size,尝试用第二个分隔符 "\n"(换行)分割
# 3. 如果还大,尝试用 "。"(句号)分割
# 4. 依此类推,直到 " "(空格)和 ""(单个字符)
#
# 这样能最大程度保持语义完整性,避免切断句子

# ==================== 针对代码的分割 ====================

code_splitter = RecursiveCharacterTextSplitter.from_language(
    language="python",  # 指定编程语言
    chunk_size=200,
    chunk_overlap=30
)

python_code = """
def calculate_fibonacci(n):
    if n <= 0:
        return []
    elif n == 1:
        return [0]
    elif n == 2:
        return [0, 1]
    
    fib = [0, 1]
    for i in range(2, n):
        fib.append(fib[i-1] + fib[i-2])
    return fib

class Calculator:
    def __init__(self):
        self.history = []
    
    def add(self, a, b):
        result = a + b
        self.history.append(f"{a} + {b} = {result}")
        return result
    
    def subtract(self, a, b):
        result = a - b
        self.history.append(f"{a} - {b} = {result}")
        return result
"""

code_chunks = code_splitter.split_text(python_code)
print(f"\n代码分割结果:{len(code_chunks)} 个块")
for i, chunk in enumerate(code_chunks, 1):
    print(f"\n--- 代码块 {i} ---")
    print(chunk)

# ==================== 支持的语言列表 ====================

# 支持的语言:"python", "js", "ts", "java", "go", "rust", "cpp", "c", 
#             "csharp", "php", "ruby", "swift", "scala", "r", "matlab",
#             "kotlin", "lua", "perl", "haskell" 等

为什么推荐 RecursiveCharacterTextSplitter?

  • 优先在段落、句子边界处分割,保持语义完整
  • 只有当必要时才切断,减少信息丢失
  • 支持多种编程语言的智能分割

TokenTextSplitter - Token 分割

from langchain.text_splitter import TokenTextSplitter

# ==================== 基础用法 ====================

# TokenTextSplitter 按 Token 数分割
# 这对于控制 API 调用成本特别有用

splitter = TokenTextSplitter(
    chunk_size=100,      # 每个块最多 100 个 token
    chunk_overlap=20     # 重叠 20 个 token
)

text = """
OpenAI 的 GPT 模型使用 token 作为计费单位。一个 token 大约相当于 4 个英文字符
或 0.75 个汉字。理解 token 的概念对于控制 API 成本非常重要。

当你调用 OpenAI API 时,费用取决于输入和输出的 token 数量。不同的模型
有不同的价格和 token 限制。例如:
- GPT-3.5-turbo: 4K 或 16K token 限制
- GPT-4: 8K 或 32K token 限制

因此,在将文本发送到 API 之前,按 token 分割可以确保不会超出限制,
同时也能优化成本。
"""

chunks = splitter.split_text(text)

print(f"分割成 {len(chunks)} 个块\n")
for i, chunk in enumerate(chunks, 1):
    # 估算 token 数(实际使用时可用 tiktoken 精确计算)
    approx_tokens = len(chunk) // 4
    print(f"块 {i}: 约 {approx_tokens} tokens, {len(chunk)} 字符")

# ==================== 使用 tiktoken 精确计算 ====================

# 安装: pip install tiktoken
import tiktoken

def count_tokens(text, model="gpt-3.5-turbo"):
    """使用 tiktoken 精确计算 token 数"""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

# 精确统计
for i, chunk in enumerate(chunks, 1):
    tokens = count_tokens(chunk)
    print(f"\n块 {i}: 精确 {tokens} tokens")
    print(f"内容: {chunk[:100]}...")

# ==================== 根据模型限制调整 ====================

# 不同模型有不同的 token 限制
MODEL_LIMITS = {
    "gpt-3.5-turbo": 4096,
    "gpt-3.5-turbo-16k": 16384,
    "gpt-4": 8192,
    "gpt-4-32k": 32768,
}

def create_splitter_for_model(model_name, reserve_tokens=500):
    """
    为特定模型创建合适的分割器
    reserve_tokens: 为输出和提示预留的 token 数
    """
    limit = MODEL_LIMITS.get(model_name, 4096)
    chunk_size = limit - reserve_tokens
    
    return TokenTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=min(200, chunk_size // 10)
    )

# 为 GPT-4 创建分割器
gpt4_splitter = create_splitter_for_model("gpt-4", reserve_tokens=1000)
print(f"\nGPT-4 分割器配置: chunk_size={gpt4_splitter._chunk_size}")

Markdown 专用分割器

from langchain.text_splitter import MarkdownHeaderTextSplitter

# ==================== 按标题分割 Markdown ====================

markdown_text = """
# 人工智能导论

人工智能是计算机科学的重要分支。

## 历史发展

### 早期探索(1950-1980)

图灵测试的提出标志着 AI 概念的诞生。
早期的专家系统取得了一些成功。

### 机器学习时代(1980-2010)

统计学习方法开始流行。
支持向量机、随机森林等算法被广泛应用。

### 深度学习时代(2010-至今)

神经网络的复兴带来了突破性的进展。
ImageNet 竞赛推动了计算机视觉的发展。

## 主要技术

### 机器学习

机器学习是 AI 的核心技术之一。
包括监督学习、无监督学习、强化学习。

### 深度学习

深度学习使用多层神经网络。
在图像、语音、文本等领域表现出色。

## 应用场景

- 计算机视觉
- 自然语言处理
- 语音识别
- 推荐系统
"""

# 创建 Markdown 分割器
# 指定要按哪些标题级别分割
splitter = MarkdownHeaderTextSplitter(
    headers_to_split_on=[
        ("#", "Header 1"),      # 按一级标题分割
        ("##", "Header 2"),     # 按二级标题分割
        ("###", "Header 3"),    # 按三级标题分割
    ]
)

# 执行分割
chunks = splitter.split_text(markdown_text)

print(f"分割成 {len(chunks)} 个块\n")
for i, chunk in enumerate(chunks, 1):
    print(f"--- 块 {i} ---")
    print(f"标题层级: {chunk.metadata}")
    print(f"内容: {chunk.page_content[:150]}...")
    print()

# ==================== 优势 ====================

# 按标题分割的优势:
# 1. 保持文档结构完整性
# 2. 方便后续按章节检索
# 3. 元数据包含标题信息,有助于理解上下文

4.4 文本嵌入(Embeddings)

文本嵌入(Embeddings)是将文本转换为数值向量的技术。相似的文本在向量空间中距离更近,这使得我们可以通过向量相似度来检索相关内容。

什么是文本嵌入

文本嵌入的可视化理解:

高维向量空间(简化为2D):

        机器学习
           ●
          / \
         /   \
        /     \
   深度学习 ●─────● 神经网络
        \     /
         \   /
          \ /
           ●
        人工智能

相似的概念在向量空间中距离更近

常用 Embedding 模型

模型 类型 特点 适用场景
OpenAIEmbeddings 云端 API 效果好,按 token 计费 快速原型、生产环境
HuggingFaceEmbeddings 本地模型 免费,需下载模型 数据敏感、离线环境
SentenceTransformer 本地模型 专门针对句子优化 语义相似度任务

OpenAIEmbeddings 使用

from langchain.embeddings import OpenAIEmbeddings
import os

os.environ["OPENAI_API_KEY"] = "your-api-key-here"

# ==================== 基础用法 ====================

# 创建嵌入模型实例
embeddings = OpenAIEmbeddings(
    model="text-embedding-ada-002",  # 默认模型,效果好且便宜
    # model="text-embedding-3-small",  # 新版小模型,更快更便宜
    # model="text-embedding-3-large",  # 新版大模型,效果更好
)

# 嵌入单个文本
text = "人工智能是计算机科学的一个分支"
single_vector = embeddings.embed_query(text)

print(f"文本: {text}")
print(f"向量维度: {len(single_vector)}")
print(f"向量前10个值: {single_vector[:10]}")

# ==================== 批量嵌入 ====================

texts = [
    "人工智能是计算机科学的一个分支",
    "机器学习是实现人工智能的一种方法",
    "深度学习是机器学习的一个子领域",
    "自然语言处理研究计算机与人类语言的交互",
    "计算机视觉让机器能够"看见"和理解图像",
]

# embed_documents 用于批量嵌入(更高效)
vectors = embeddings.embed_documents(texts)

print(f"\n批量嵌入:")
print(f"文档数: {len(vectors)}")
print(f"每个向量维度: {len(vectors[0])}")

# ==================== 计算相似度 ====================

import numpy as np

def cosine_similarity(v1, v2):
    """计算两个向量的余弦相似度"""
    return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

# 计算相似度矩阵
print("\n相似度矩阵:")
print("                AI      ML      DL      NLP     CV")
for i, text_i in enumerate(texts):
    similarities = []
    for j, text_j in enumerate(texts):
        sim = cosine_similarity(vectors[i], vectors[j])
        similarities.append(f"{sim:.3f}")
    short_name = ["AI", "ML", "DL", "NLP", "CV"][i]
    print(f"{short_name:4s} {'  '.join(similarities)}")

# ==================== 语义搜索示例 ====================

query = "如何让机器理解语言"
query_vector = embeddings.embed_query(query)

print(f"\n查询: {query}")
print("相似度排名:")

# 计算查询与每个文档的相似度
scores = []
for i, doc_vector in enumerate(vectors):
    sim = cosine_similarity(query_vector, doc_vector)
    scores.append((sim, texts[i]))

# 按相似度排序
scores.sort(reverse=True)
for rank, (score, text) in enumerate(scores[:3], 1):
    print(f"{rank}. [{score:.4f}] {text[:30]}...")

HuggingFaceEmbeddings 本地模型

from langchain.embeddings import HuggingFaceEmbeddings

# ==================== 基础用法 ====================

# 使用 HuggingFace 的本地嵌入模型
# 首次使用时会自动下载模型

embeddings = HuggingFaceEmbeddings(
    model_name="sentence-transformers/all-MiniLM-L6-v2",
    # 这是一个轻量级模型,效果好且速度快
    # 向量维度: 384
    # 模型大小: 约 80MB
    model_kwargs={'device': 'cpu'},  # 使用 CPU(也可设置为 'cuda')
    encode_kwargs={'normalize_embeddings': True}  # 归一化向量
)

# 测试嵌入
texts = [
    "机器学习是人工智能的核心技术",
    "深度学习使用神经网络",
    "Python 是一种编程语言",
]

vectors = embeddings.embed_documents(texts)
print(f"向量维度: {len(vectors[0])}")

# ==================== 推荐的模型 ====================

MODELS = {
    # 轻量级(速度快,效果中等)
    "all-MiniLM-L6-v2": {
        "dim": 384,
        "size": "~80MB",
        "desc": "推荐入门使用"
    },
    
    # 中等(平衡速度和效果)
    "all-mpnet-base-v2": {
        "dim": 768,
        "size": "~420MB",
        "desc": "更好的语义理解"
    },
    
    # 中文专用
    "shibing624/text2vec-base-chinese": {
        "dim": 768,
        "size": "~400MB",
        "desc": "专门针对中文优化"
    },
    
    # 多语言
    "paraphrase-multilingual-MiniLM-L12-v2": {
        "dim": 384,
        "size": "~120MB",
        "desc": "支持50+语言"
    }
}

# ==================== 中文优化示例 ====================

# 使用中文专用模型
chinese_embeddings = HuggingFaceEmbeddings(
    model_name="shibing624/text2vec-base-chinese",
    model_kwargs={'device': 'cpu'}
)

chinese_texts = [
    "人工智能改变世界",
    "机器学习是AI的核心技术",
    "深度学习使用多层神经网络",
    "今天天气真好",
]

chinese_vectors = chinese_embeddings.embed_documents(chinese_texts)

# 测试中文语义相似度
import numpy as np

def cosine_similarity(v1, v2):
    return np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))

query_vec = chinese_embeddings.embed_query("什么是AI技术")

print("\n中文语义搜索:")
scores = []
for i, text in enumerate(chinese_texts):
    sim = cosine_similarity(query_vec, chinese_vectors[i])
    scores.append((sim, text))

scores.sort(reverse=True)
for rank, (score, text) in enumerate(scores, 1):
    print(f"{rank}. [{score:.4f}] {text}")

# ==================== 离线环境使用 ====================

# 如果处于离线环境,可以先在有网络的环境下下载模型:
# from sentence_transformers import SentenceTransformer
# model = SentenceTransformer('all-MiniLM-L6-v2')
# model.save('./local_models/all-MiniLM-L6-v2')

# 然后在离线环境加载本地模型:
# embeddings = HuggingFaceEmbeddings(
#     model_name="./local_models/all-MiniLM-L6-v2"
# )

模型选择建议:

  • 快速原型:使用 OpenAIEmbeddings
  • 数据敏感/离线:使用 HuggingFaceEmbeddings
  • 中文内容:使用 shibing624/text2vec-base-chinese
  • 资源受限:使用 all-MiniLM-L6-v2

4.5 完整流程示例

现在我们将所有知识点串联起来,展示从加载文档 → 分割 → 生成嵌入 → 保存到向量数据库的完整流程。

"""
完整文档处理流程
步骤:
1. 加载 PDF 文档
2. 分割文档
3. 生成嵌入向量
4. 保存到向量数据库
"""

from langchain.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
import os

os.environ["OPENAI_API_KEY"] = "your-api-key-here"

# ==================== 步骤1: 加载 PDF ====================

print("=" * 60)
print("步骤1: 加载 PDF 文档")
print("=" * 60)

pdf_path = "./data/人工智能导论.pdf"
loader = PyPDFLoader(pdf_path)
pages = loader.load()

print(f"✓ 成功加载 PDF")
print(f"  - 文件路径: {pdf_path}")
print(f"  - 总页数: {len(pages)}")
print(f"  - 示例内容(第1页前100字):")
print(f"    {pages[0].page_content[:100]}...")

# ==================== 步骤2: 分割文档 ====================

print("\n" + "=" * 60)
print("步骤2: 分割文档")
print("=" * 60)

# 创建递归字符分割器
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", "。", "!", "?", " ", ""],
    chunk_size=500,       # 每块约500字符
    chunk_overlap=100,    # 重叠100字符
    length_function=len,
)

# 分割文档
chunks = text_splitter.split_documents(pages)

print(f"✓ 文档分割完成")
print(f"  - 原始页数: {len(pages)}")
print(f"  - 分割后块数: {len(chunks)}")
print(f"  - 平均每页分割: {len(chunks)/len(pages):.1f} 块")

# 查看分割后的示例
sample_chunk = chunks[0]
print(f"\n  - 示例块:")
print(f"    内容: {sample_chunk.page_content[:100]}...")
print(f"    元数据: {sample_chunk.metadata}")
print(f"    长度: {len(sample_chunk.page_content)} 字符")

# ==================== 步骤3: 生成嵌入 ====================

print("\n" + "=" * 60)
print("步骤3: 生成嵌入向量")
print("=" * 60)

embeddings = OpenAIEmbeddings(model="text-embedding-ada-002")

# 测试单个块的嵌入
sample_vector = embeddings.embed_query(chunks[0].page_content)
print(f"✓ 嵌入模型: text-embedding-ada-002")
print(f"  - 向量维度: {len(sample_vector)}")
print(f"  - 待处理块数: {len(chunks)}")
print(f"  - 预计 API 调用量: ~{len(chunks) * len(sample_vector) / 1000:.1f}K tokens")

# ==================== 步骤4: 保存到向量数据库 ====================

print("\n" + "=" * 60)
print("步骤4: 保存到向量数据库 (Chroma)")
print("=" * 60)

# 指定持久化目录
persist_directory = "./chroma_db/ai_book"

# 创建向量数据库
vectorstore = Chroma.from_documents(
    documents=chunks,           # 文档块
    embedding=embeddings,       # 嵌入模型
    persist_directory=persist_directory,  # 持久化目录
    collection_name="ai_book"   # 集合名称
)

# 持久化到磁盘
vectorstore.persist()

print(f"✓ 向量数据库创建完成")
print(f"  - 存储位置: {persist_directory}")
print(f"  - 集合名称: ai_book")
print(f"  - 文档数量: {vectorstore._collection.count()}")

# ==================== 步骤5: 测试检索 ====================

print("\n" + "=" * 60)
print("步骤5: 测试语义检索")
print("=" * 60)

# 定义测试查询
test_queries = [
    "什么是机器学习",
    "深度学习的基本原理",
    "神经网络的结构",
]

for query in test_queries:
    print(f"\n查询: '{query}'")
    print("-" * 40)
    
    # 相似度搜索
    results = vectorstore.similarity_search(
        query=query,
        k=2  # 返回最相似的2个结果
    )
    
    for i, doc in enumerate(results, 1):
        print(f"  {i}. {doc.page_content[:80]}...")
        print(f"     来源: {doc.metadata.get('source', 'N/A')}, 页码: {doc.metadata.get('page', 'N/A')}")

# ==================== 重新加载向量数据库 ====================

print("\n" + "=" * 60)
print("步骤6: 重新加载向量数据库")
print("=" * 60)

# 从磁盘加载(无需重新处理文档)
loaded_vectorstore = Chroma(
    persist_directory=persist_directory,
    embedding_function=embeddings,
    collection_name="ai_book"
)

print(f"✓ 数据库加载成功")
print(f"  - 文档数量: {loaded_vectorstore._collection.count()}")

# 测试加载后的数据库
results = loaded_vectorstore.similarity_search("人工智能的未来发展", k=1)
print(f"\n  - 检索测试: '{results[0].page_content[:60]}...'")

print("\n" + "=" * 60)
print("流程完成!")
print("=" * 60)

4.6 实战:构建文档处理管道

现在我们来完成一个完整的实战项目:处理一本电子书,准备用于问答系统

"""
电子书文档处理管道

功能:
1. 加载多格式文档(PDF、TXT、MD)
2. 智能分割(根据文档类型选择不同策略)
3. 生成嵌入并保存到向量数据库
4. 提供检索接口
5. 统计和日志功能
"""

from langchain.document_loaders import (
    PyPDFLoader,
    TextLoader,
    UnstructuredMarkdownLoader,
    DirectoryLoader
)
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    MarkdownHeaderTextSplitter
)
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from pathlib import Path
import json
from datetime import datetime
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ==================== 配置类 ====================

class ProcessingConfig:
    """文档处理配置"""
    
    # 嵌入模型配置
    EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
    
    # 分割配置
    CHUNK_SIZE = 500
    CHUNK_OVERLAP = 100
    
    # 向量数据库配置
    VECTOR_DB_PATH = "./vector_db/ebook_collection"
    COLLECTION_NAME = "ebook_qa"
    
    # 支持文件类型
    SUPPORTED_EXTENSIONS = ['.pdf', '.txt', '.md']

# ==================== 文档处理器类 ====================

class EbookProcessor:
    """电子书文档处理器"""
    
    def __init__(self, config: ProcessingConfig = None):
        self.config = config or ProcessingConfig()
        self.embeddings = HuggingFaceEmbeddings(
            model_name=self.config.EMBEDDING_MODEL
        )
        self.stats = {
            "processed_files": [],
            "total_chunks": 0,
            "errors": []
        }
    
    def load_document(self, file_path: str):
        """根据文件类型选择合适的加载器"""
        path = Path(file_path)
        extension = path.suffix.lower()
        
        logger.info(f"加载文件: {path.name}")
        
        if extension == '.pdf':
            loader = PyPDFLoader(file_path)
        elif extension == '.txt':
            loader = TextLoader(file_path, encoding='utf-8')
        elif extension == '.md':
            loader = UnstructuredMarkdownLoader(file_path)
        else:
            raise ValueError(f"不支持的文件类型: {extension}")
        
        documents = loader.load()
        
        # 添加文件信息到 metadata
        for doc in documents:
            doc.metadata.update({
                'filename': path.name,
                'file_type': extension,
                'file_path': str(path.absolute())
            })
        
        return documents
    
    def split_document(self, documents, file_type: str):
        """根据文档类型选择合适的分割策略"""
        
        if file_type == '.md':
            # Markdown 使用标题分割
            splitter = MarkdownHeaderTextSplitter(
                headers_to_split_on=[("#", "Header 1"), ("##", "Header 2")]
            )
            # MarkdownHeaderTextSplitter 需要字符串输入
            all_chunks = []
            for doc in documents:
                chunks = splitter.split_text(doc.page_content)
                # 继承原始 metadata
                for chunk in chunks:
                    chunk.metadata.update(doc.metadata)
                all_chunks.extend(chunks)
            return all_chunks
        else:
            # PDF 和 TXT 使用递归字符分割
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.config.CHUNK_SIZE,
                chunk_overlap=self.config.CHUNK_OVERLAP,
                separators=["\n\n", "\n", "。", "!", "?", " ", ""]
            )
            return splitter.split_documents(documents)
    
    def process_file(self, file_path: str):
        """处理单个文件"""
        try:
            # 1. 加载
            documents = self.load_document(file_path)
            logger.info(f"  加载了 {len(documents)} 页/段落")
            
            # 2. 分割
            file_type = Path(file_path).suffix
            chunks = self.split_document(documents, file_type)
            logger.info(f"  分割成 {len(chunks)} 个块")
            
            # 3. 更新统计
            self.stats["processed_files"].append({
                "file": file_path,
                "pages": len(documents),
                "chunks": len(chunks)
            })
            self.stats["total_chunks"] += len(chunks)
            
            return chunks
            
        except Exception as e:
            logger.error(f"处理失败: {file_path} - {e}")
            self.stats["errors"].append({
                "file": file_path,
                "error": str(e)
            })
            return []
    
    def process_directory(self, directory: str):
        """批量处理目录中的所有文档"""
        dir_path = Path(directory)
        all_chunks = []
        
        logger.info(f"开始处理目录: {directory}")
        
        for ext in self.config.SUPPORTED_EXTENSIONS:
            files = list(dir_path.glob(f"*{ext}"))
            logger.info(f"找到 {len(files)} 个 {ext} 文件")
            
            for file_path in files:
                chunks = self.process_file(str(file_path))
                all_chunks.extend(chunks)
        
        logger.info(f"\n目录处理完成,共 {len(all_chunks)} 个块")
        return all_chunks
    
    def save_to_vectorstore(self, chunks):
        """保存到向量数据库"""
        if not chunks:
            logger.warning("没有块需要保存")
            return
        
        logger.info(f"保存 {len(chunks)} 个块到向量数据库...")
        
        vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.config.VECTOR_DB_PATH,
            collection_name=self.config.COLLECTION_NAME
        )
        vectorstore.persist()
        
        logger.info(f"✓ 已保存到: {self.config.VECTOR_DB_PATH}")
        return vectorstore
    
    def save_stats(self, output_path: str = "./processing_stats.json"):
        """保存处理统计"""
        self.stats["processing_time"] = datetime.now().isoformat()
        self.stats["config"] = {
            "embedding_model": self.config.EMBEDDING_MODEL,
            "chunk_size": self.config.CHUNK_SIZE,
            "chunk_overlap": self.config.CHUNK_OVERLAP
        }
        
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(self.stats, f, ensure_ascii=False, indent=2)
        
        logger.info(f"统计信息已保存: {output_path}")

# ==================== 检索器类 ====================

class EbookRetriever:
    """电子书检索器"""
    
    def __init__(self, config: ProcessingConfig = None):
        self.config = config or ProcessingConfig()
        self.embeddings = HuggingFaceEmbeddings(
            model_name=self.config.EMBEDDING_MODEL
        )
        self.vectorstore = None
        self._load_vectorstore()
    
    def _load_vectorstore(self):
        """加载向量数据库"""
        try:
            self.vectorstore = Chroma(
                persist_directory=self.config.VECTOR_DB_PATH,
                embedding_function=self.embeddings,
                collection_name=self.config.COLLECTION_NAME
            )
            logger.info(f"向量数据库加载成功")
        except Exception as e:
            logger.error(f"加载向量数据库失败: {e}")
    
    def search(self, query: str, k: int = 3):
        """搜索相关内容"""
        if not self.vectorstore:
            raise ValueError("向量数据库未加载")
        
        results = self.vectorstore.similarity_search(query, k=k)
        return results
    
    def search_with_scores(self, query: str, k: int = 3):
        """搜索并返回相似度分数"""
        if not self.vectorstore:
            raise ValueError("向量数据库未加载")
        
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        return results
    
    def format_results(self, query: str, results):
        """格式化搜索结果"""
        output = f"查询: '{query}'\n"
        output += "=" * 60 + "\n\n"
        
        for i, doc in enumerate(results, 1):
            output += f"[{i}] 相似度: {doc.metadata.get('score', 'N/A'):.4f}\n"
            output += f"来源: {doc.metadata.get('filename', 'N/A')}\n"
            output += f"内容: {doc.page_content[:200]}...\n\n"
        
        return output

# ==================== 主程序 ====================

def main():
    """主程序"""
    config = ProcessingConfig()
    
    # ===== 阶段1: 处理文档 =====
    print("=" * 70)
    print("阶段1: 文档处理")
    print("=" * 70)
    
    processor = EbookProcessor(config)
    
    # 处理单个文件示例
    # chunks = processor.process_file("./data/book.pdf")
    
    # 处理整个目录
    chunks = processor.process_directory("./data/ebooks")
    
    # 保存到向量数据库
    if chunks:
        vectorstore = processor.save_to_vectorstore(chunks)
        processor.save_stats()
    
    # ===== 阶段2: 测试检索 =====
    print("\n" + "=" * 70)
    print("阶段2: 测试检索")
    print("=" * 70)
    
    retriever = EbookRetriever(config)
    
    # 测试查询
    test_queries = [
        "什么是深度学习",
        "神经网络的工作原理",
        "如何训练机器学习模型",
    ]
    
    for query in test_queries:
        print(f"\n{'='*70}")
        results = retriever.search_with_scores(query, k=2)
        
        # 添加分数到 metadata(用于显示)
        for doc, score in results:
            doc.metadata['score'] = score
        
        docs_only = [doc for doc, _ in results]
        print(retriever.format_results(query, docs_only))

if __name__ == "__main__":
    main()

实战项目亮点:

  • 多格式支持:自动识别 PDF、TXT、Markdown 文件
  • 智能分割策略:根据文档类型选择最佳分割方式
  • 本地嵌入:使用 HuggingFace 模型,无需联网
  • 完整统计:记录处理日志,便于追踪
  • 模块化设计:Processor 和 Retriever 分离,易于扩展

本章小结

组件 核心功能 关键参数/概念
Document 文本数据的基本单位 page_content, metadata
Loaders 从不同格式加载文档 TextLoader, PyPDFLoader, CSVLoader, DirectoryLoader
Splitters 将长文档分割成小块 chunk_size, chunk_overlap
Embeddings 文本转向量表示 OpenAIEmbeddings, HuggingFaceEmbeddings
VectorStore 存储和检索向量 Chroma, similarity_search

文档处理最佳实践

  1. 选择合适的分割器:优先使用 RecursiveCharacterTextSplitter
  2. 合理设置 chunk_size:根据模型限制和应用场景调整,通常 500-1000 字符
  3. 保留 metadata:始终保留来源信息,方便追溯和引用
  4. 测试检索效果:处理完成后务必测试检索质量
  5. 选择合适的嵌入模型:中文内容选择中文优化模型

下一章我们将学习向量数据库与检索,深入了解如何高效存储和检索向量数据,构建完整的 RAG(检索增强生成)系统。