构建完整的 RAG 流水线

把所有东西串起来

前面几章分别讲了 Embedding、向量数据库、分块和检索。现在把它们组合成一个完整的 RAG 系统。

整体架构

RAG 系统分两条流水线:

离线:文档摄入流水线

原始文档 → 解析 → 分块 → Embedding → 存入向量数据库

这条线在数据准备阶段运行,用户查询之前就完成了。

在线:查询流水线

用户问题 → Embedding → 检索 → 重排序 → 构建 Prompt → LLM 生成 → 回答

这条线在用户提问时实时运行。

完整代码示例

以下是一个用 Python 实现的完整 RAG 系统(使用 Chroma + OpenAI):

文档摄入

import chromadb
from openai import OpenAI
import os

client = OpenAI()
chroma = chromadb.PersistentClient(path="./chroma_db")
collection = chroma.get_or_create_collection("knowledge_base")

def load_and_chunk(file_path, chunk_size=500, overlap=50):
    """加载文件并分块"""
    with open(file_path, "r") as f:
        text = f.read()

    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunks.append({
            "text": text[start:end],
            "source": os.path.basename(file_path),
        })
        start = end - overlap
    return chunks

def embed_text(text):
    """获取文本的 Embedding"""
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text
    )
    return response.data[0].embedding

def ingest(directory):
    """摄入目录下的所有文档"""
    for filename in os.listdir(directory):
        if not filename.endswith(".md"):
            continue
        filepath = os.path.join(directory, filename)
        chunks = load_and_chunk(filepath)

        for i, chunk in enumerate(chunks):
            doc_id = f"{filename}_{i}"
            collection.add(
                ids=[doc_id],
                documents=[chunk["text"]],
                embeddings=[embed_text(chunk["text"])],
                metadatas=[{"source": chunk["source"]}]
            )

    print(f"已摄入 {collection.count()} 个文档块")

# 运行摄入
ingest("./knowledge_base/")

查询与生成

def retrieve(query, top_k=5):
    """检索最相关的文档块"""
    query_embedding = embed_text(query)
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=top_k
    )
    return results

def build_prompt(query, context_docs):
    """构建 RAG Prompt"""
    context = "\n\n---\n\n".join(context_docs)

    return f"""根据以下参考资料回答用户的问题。

规则:
1. 只使用参考资料中的信息来回答
2. 如果参考资料中没有相关信息,明确说"根据现有资料,我无法回答这个问题"
3. 引用信息来源

参考资料:
{context}

用户问题:{query}"""

def ask(query):
    """完整的 RAG 查询流程"""
    # 1. 检索
    results = retrieve(query)
    docs = results["documents"][0]
    sources = results["metadatas"][0]

    # 2. 构建 Prompt
    prompt = build_prompt(query, docs)

    # 3. 生成
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "你是一个基于知识库回答问题的助手。"},
            {"role": "user", "content": prompt}
        ],
        temperature=0.3
    )

    answer = response.choices[0].message.content

    # 4. 附上来源
    source_list = list(set(s["source"] for s in sources))
    return f"{answer}\n\n📎 来源:{', '.join(source_list)}"

# 使用
print(ask("退款需要多长时间?"))

RAG Prompt 设计要点

Prompt 是 RAG 系统中经常被忽略但非常重要的环节。

处理"找不到信息"

如果参考资料中没有足够的信息来回答问题,
请说"根据现有资料无法确定",不要编造答案。

没有这条指令,模型可能会忽略参考资料,用自己的知识回答——这就失去了 RAG 的意义。

引用来源

每个关键信息后标注来源,格式:[来源:文件名]

这让用户可以验证答案的准确性。

处理矛盾信息

如果不同资料中的信息相互矛盾,请指出矛盾,
并说明每个资料的说法,由用户判断。

常见问题与调试

问题 1:检索到了不相关的文档

可能原因:

  • 分块太大,每块包含多个主题
  • Embedding 模型选择不当
  • 缺少元数据过滤

解决:减小块大小、换 Embedding 模型、添加元数据过滤。

问题 2:答案忽略了参考资料

可能原因:

  • Prompt 没有强调"只用参考资料回答"
  • 检索到的文档和问题相关度太低
  • 模型倾向于用自己的知识

解决:强化 Prompt 中"只用参考资料"的指令,提高检索质量。

问题 3:答案太笼统

可能原因:

  • 检索到的文档块缺少细节
  • 分块太小,上下文不够

解决:增大块大小或使用"小块检索、大块返回"策略。

问题 4:延迟太高

RAG 的延迟 = Embedding 时间 + 检索时间 + LLM 生成时间。

优化方向:

  • 用更小的 Embedding 模型
  • 减少 Top-K
  • 用流式输出改善体感
  • 缓存热门查询的结果

框架选择

你可以像上面一样自己写,也可以用现成的框架:

框架特点
LangChain生态最完善,组件丰富,学习曲线较陡
LlamaIndex专注于 RAG 场景,数据连接器丰富
Haystack生产级框架,管道设计清晰
自己写最灵活,完全可控,适合简单场景或深度定制

建议:先自己实现一个最小 RAG 来理解原理,然后根据需要决定是否引入框架。 很多场景下,几十行代码的自定义实现比引入一个大框架更合适。

要点总结

  1. RAG 系统 = 离线摄入 + 在线查询两条流水线。 摄入处理文档,查询服务用户。
  2. Prompt 设计是 RAG 效果的关键环节。 明确要求模型"只用参考资料回答",处理好"找不到信息"的情况。
  3. 从简单开始,按需优化。 先跑通基本流程,再根据实际问题(检索不准、答案太泛等)针对性改进。
  4. 先自己写,再考虑框架。 理解原理比学框架 API 更重要。