构建完整的 RAG 流水线
把所有东西串起来
前面几章分别讲了 Embedding、向量数据库、分块和检索。现在把它们组合成一个完整的 RAG 系统。
整体架构
RAG 系统分两条流水线:
离线:文档摄入流水线
原始文档 → 解析 → 分块 → Embedding → 存入向量数据库
这条线在数据准备阶段运行,用户查询之前就完成了。
在线:查询流水线
用户问题 → Embedding → 检索 → 重排序 → 构建 Prompt → LLM 生成 → 回答
这条线在用户提问时实时运行。
完整代码示例
以下是一个用 Python 实现的完整 RAG 系统(使用 Chroma + OpenAI):
文档摄入
import chromadb
from openai import OpenAI
import os
client = OpenAI()
chroma = chromadb.PersistentClient(path="./chroma_db")
collection = chroma.get_or_create_collection("knowledge_base")
def load_and_chunk(file_path, chunk_size=500, overlap=50):
"""加载文件并分块"""
with open(file_path, "r") as f:
text = f.read()
chunks = []
start = 0
while start < len(text):
end = min(start + chunk_size, len(text))
chunks.append({
"text": text[start:end],
"source": os.path.basename(file_path),
})
start = end - overlap
return chunks
def embed_text(text):
"""获取文本的 Embedding"""
response = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def ingest(directory):
"""摄入目录下的所有文档"""
for filename in os.listdir(directory):
if not filename.endswith(".md"):
continue
filepath = os.path.join(directory, filename)
chunks = load_and_chunk(filepath)
for i, chunk in enumerate(chunks):
doc_id = f"{filename}_{i}"
collection.add(
ids=[doc_id],
documents=[chunk["text"]],
embeddings=[embed_text(chunk["text"])],
metadatas=[{"source": chunk["source"]}]
)
print(f"已摄入 {collection.count()} 个文档块")
# 运行摄入
ingest("./knowledge_base/")
查询与生成
def retrieve(query, top_k=5):
"""检索最相关的文档块"""
query_embedding = embed_text(query)
results = collection.query(
query_embeddings=[query_embedding],
n_results=top_k
)
return results
def build_prompt(query, context_docs):
"""构建 RAG Prompt"""
context = "\n\n---\n\n".join(context_docs)
return f"""根据以下参考资料回答用户的问题。
规则:
1. 只使用参考资料中的信息来回答
2. 如果参考资料中没有相关信息,明确说"根据现有资料,我无法回答这个问题"
3. 引用信息来源
参考资料:
{context}
用户问题:{query}"""
def ask(query):
"""完整的 RAG 查询流程"""
# 1. 检索
results = retrieve(query)
docs = results["documents"][0]
sources = results["metadatas"][0]
# 2. 构建 Prompt
prompt = build_prompt(query, docs)
# 3. 生成
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{"role": "system", "content": "你是一个基于知识库回答问题的助手。"},
{"role": "user", "content": prompt}
],
temperature=0.3
)
answer = response.choices[0].message.content
# 4. 附上来源
source_list = list(set(s["source"] for s in sources))
return f"{answer}\n\n📎 来源:{', '.join(source_list)}"
# 使用
print(ask("退款需要多长时间?"))
RAG Prompt 设计要点
Prompt 是 RAG 系统中经常被忽略但非常重要的环节。
处理"找不到信息"
如果参考资料中没有足够的信息来回答问题,
请说"根据现有资料无法确定",不要编造答案。
没有这条指令,模型可能会忽略参考资料,用自己的知识回答——这就失去了 RAG 的意义。
引用来源
每个关键信息后标注来源,格式:[来源:文件名]
这让用户可以验证答案的准确性。
处理矛盾信息
如果不同资料中的信息相互矛盾,请指出矛盾,
并说明每个资料的说法,由用户判断。
常见问题与调试
问题 1:检索到了不相关的文档
可能原因:
- 分块太大,每块包含多个主题
- Embedding 模型选择不当
- 缺少元数据过滤
解决:减小块大小、换 Embedding 模型、添加元数据过滤。
问题 2:答案忽略了参考资料
可能原因:
- Prompt 没有强调"只用参考资料回答"
- 检索到的文档和问题相关度太低
- 模型倾向于用自己的知识
解决:强化 Prompt 中"只用参考资料"的指令,提高检索质量。
问题 3:答案太笼统
可能原因:
- 检索到的文档块缺少细节
- 分块太小,上下文不够
解决:增大块大小或使用"小块检索、大块返回"策略。
问题 4:延迟太高
RAG 的延迟 = Embedding 时间 + 检索时间 + LLM 生成时间。
优化方向:
- 用更小的 Embedding 模型
- 减少 Top-K
- 用流式输出改善体感
- 缓存热门查询的结果
框架选择
你可以像上面一样自己写,也可以用现成的框架:
| 框架 | 特点 |
|---|---|
| LangChain | 生态最完善,组件丰富,学习曲线较陡 |
| LlamaIndex | 专注于 RAG 场景,数据连接器丰富 |
| Haystack | 生产级框架,管道设计清晰 |
| 自己写 | 最灵活,完全可控,适合简单场景或深度定制 |
建议:先自己实现一个最小 RAG 来理解原理,然后根据需要决定是否引入框架。 很多场景下,几十行代码的自定义实现比引入一个大框架更合适。
要点总结
- RAG 系统 = 离线摄入 + 在线查询两条流水线。 摄入处理文档,查询服务用户。
- Prompt 设计是 RAG 效果的关键环节。 明确要求模型"只用参考资料回答",处理好"找不到信息"的情况。
- 从简单开始,按需优化。 先跑通基本流程,再根据实际问题(检索不准、答案太泛等)针对性改进。
- 先自己写,再考虑框架。 理解原理比学框架 API 更重要。