把 RAG 检索服务改造成 Agent:重构过程、持久化选型与踩坑实录

背景:原来是"FastAPI + command 脚本"拼起来的 RAG 检索服务,这次把它重构成了一个会自己决定何时调工具、并且记得住上次聊了什么的 ReAct Agent。真正费劲的不是写循环,而是三件事——想清楚 Agent 循环到底在循环什么、给对话历史选对存储、以及填平不同 LLM 供应商之间的格式坑。


一、背景:这个项目原来是什么

它解决的问题很具体:让 Claude Code 在跨对话的场景下也能查到你自己的文档。前身是一个本地 RAG 服务——用户敲 /rag-retrieve 手动触发检索,或者打开自动模式,让每条 prompt 都先盲检索一遍,再把结果塞进 system prompt 传给 Claude。

老架构长这样:

用户在 Claude Code 里输入 prompt
          ↓
UserPromptSubmit Hook(hook_script.py)
          ↓ POST /retrieve
FastAPI server(server.py)
          ↓
FAISS 向量检索 + BM25 混合评分
          ↓
把检索结果注入 system prompt 区域
          ↓
Claude 看到 [RAG 结果] + [用户 prompt]

每次检索都是一锤子买卖:单次、无状态。要么用户手动敲 /rag-retrieve <问题>,要么自动模式下每条 prompt 都触发一次盲检索。没有规划,没有记忆,也没有"先看看这事要不要查"的判断。

这套设计有两个硬伤。

一是不会规划。用户说"帮我找一下 Redis 缓存穿透的处理方法,然后结合我们的代码库写个方案",这其实是检索加生成两步,得用户自己在脑子里串起来,服务本身串不动。

二是没有跨轮记忆。重启服务或者 /clear 之后,上下文清零,用户得重新交代"上次我们说的那个接口"是哪个。

盲检索本身也是个麻烦。不管你问的是不是文档相关——哪怕只是"帮我把这段代码的缩进改一下"——它都照查不误。检索结果不相关的时候,反而会干扰 Claude。

重构目标因此很明确:把 RAG 操作从"用户手动触发的命令"变成"AI 自己判断要不要调的工具"。该查的时候查,不该查的时候别查,决定权交给 LLM。


二、什么是 Agent(从代码视角看)

这里说的 Agent 不是"会思考的 AI"那种玄学定义,而是工程上一个具体的结构。这几年"Agent"这个词被用滥了,框架人人都自称 Agent 框架,但剥到底,内核没怎么变过。

Agent 就是一个循环,循环体是:感知 → 规划 → 调工具 → 更新记忆。

用户输入
    ↓
加载历史对话(memory)
    ↓
LLM 推理:要回复还是要调工具?
    ├─ 直接回复 → 追加到历史,返回给用户
    └─ 工具调用 → 执行工具 → 追加结果到历史 → 继续推理

它跟老流程"收到请求 → 执行检索 → 返回结果"的根本差别,在于谁来做决策。老流程是用户驱动,新循环是 LLM 驱动。

要判断一个东西是不是真 Agent,有个很省事的问法:它能不能根据当前状态决定下一步?如果下一步是写死的("先检索,再生成,永远这个顺序"),那它是个 pipeline,不是 Agent。

我们实现的是 ReAct 模式:LLM 先推理(Reason),再决定直接回复还是调工具(Act),工具结果追加进上下文后接着推理。核心代码在 agent/loop.py

MAX_TOOL_ROUNDS = 5  # 防止无限循环

async def run(session_id: str, user_input: str) -> str:
    append_message(session_id, "user", user_input)
    history = load_history(session_id)

    for _ in range(MAX_TOOL_ROUNDS):
        response = await _llm.complete(
            [{"role": "system", "content": _SYSTEM_PROMPT}] + history,
            tools=_anthropic_tools(TOOLS),
            tool_choice={"type": "auto"},
        )

        # 纯文本回复 → 结束循环
        if isinstance(response, str):
            append_message(session_id, "assistant", response)
            return response

        # 工具调用 → 执行 → 追加结果 → 继续
        tool_name, tool_args = _extract_tool_call(response, provider_name)
        tool_result = await execute_tool(tool_name, tool_args)
        append_message(session_id, "tool", json.dumps({
            "tool": tool_name, "args": tool_args, "result": tool_result
        }))
        history = load_history(session_id)  # 刷新历史,带上工具结果

    return "已达到最大工具调用轮次,请重新提问。"

(实际代码里 tool_choice 会按 provider 分两种写法,这里为了看清主干省掉了分支,下一节会展开。)

MAX_TOOL_ROUNDS = 5 这道闸不能省。没有它,一旦 LLM 卡进"我还需要更多信息 → 调检索 → 我还需要更多信息"的怪圈,服务就会一直空转。这个数字怎么定,第五节有专门的复盘。

还有个容易忽略的细节:每轮工具调用之后都重新 load_history(session_id)。因为 append_message 刚把工具结果写进了 SQLite,下一轮推理必须带上完整上下文(含工具返回值),LLM 才知道上一步干了什么、拿到了什么。


三、重构过程与关键决策

3.1 工具注册表

第一步是把 RAG 操作抽象成工具。注意,工具不等于函数——工具是带描述的函数,而那段描述才是 LLM 决定何时调用它的依据。

一共定义了 5 个:

TOOLS = [
    {
        "name": "rag_retrieve",
        "description": (
            "Semantically retrieve relevant document chunks from the knowledge base. "
            "Use when the user asks a question that might be answered by ingested documents."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "The question or search query"}
            },
            "required": ["query"],
        },
    },
    # rag_ingest, rag_status, rag_list_sources, rag_delete_source ...
]

第一版描述只写了 "Search the knowledge base",四个词。结果 LLM 在不少场景下拿不准该不该调,干脆直接回"我不知道"。把描述改成讲清使用场景——"Use when the user asks a question that might be answered by ingested documents"——之后召回率立竿见影地涨了。

这件事后面还会反复出现,所以先把结论摆这儿:LLM 判断要不要调某个工具,几乎全靠这段描述。描述含糊,它就在拿不准时跳过,或者挑错工具。这不是模型笨,是接口没写明白。

3.2 LLM 供应商抽象层

为了能在 OpenAI 和 Anthropic 之间随时切换,llm/ 下放了一层抽象:

# llm/base.py
class LLMProvider(ABC):
    @abstractmethod
    async def complete(self, messages: list[dict], **kwargs) -> str: ...

    @abstractmethod
    async def stream(self, messages: list[dict], **kwargs) -> AsyncIterator[str]: ...

切换供应商,config.yaml 改一行就行:

llm:
  provider: "anthropic"   # 改成 "openai" 即可切换
  model: "claude-sonnet-4-6"
  api_key_env: "ANTHROPIC_API_KEY"

工厂函数负责按名字实例化:

# llm/factory.py
def get_provider(cfg: dict) -> LLMProvider:
    name = cfg.get("provider", "anthropic")
    cls = _REGISTRY.get(name)
    if cls is None:
        raise ValueError(f"Unknown LLM provider: {name!r}. Available: {list(_REGISTRY)}")
    return cls(cfg)

加新供应商两步搞定:写个子类,注册一行。开闭原则在这里落得很自然。

3.3 RAG 操作变成工具调用

重构前,/retrieve 是个同步端点,HTTP 请求直接打过来。重构后,Agent 通过 execute_tool 去调它:

# agent/tools.py
async def execute_tool(name: str, args: dict) -> str:
    async with httpx.AsyncClient(timeout=10.0) as client:
        if name == "rag_retrieve":
            r = await client.post(f"{_BASE}/retrieve", json={"text": args["query"]})
            r.raise_for_status()
            chunks = r.json().get("chunks", [])
            return "\n---\n".join(chunks) if chunks else "No relevant results found."
        # ...

这样 Agent 和 RAG 服务就解耦了。Agent 是个 HTTP 客户端,FastAPI 那头压根不知道 Agent 的存在,现有代码一行都不用改。RAG 服务照样能被 slash command、MCP 工具或者一条 curl 调用——Agent 只是其中一个调用方,不是唯一入口。

比起把 Agent 逻辑硬塞进 FastAPI 路由,"Agent 当 HTTP 客户端"这种分法干净得多:两边各自独立部署、独立测试,测工具函数时把 HTTP 调用 mock 掉就够了,不用真把整个 RAG 服务拉起来。

3.4 顺手把 MCP 服务器也做了

重构 Agent 的同时,把 MCP(Model Context Protocol)接入也一并做了——同一批 RAG 工具,再以 stdio JSON-RPC 暴露一份。用户在 ~/.claude/settings.json 里注册之后,Claude Code 就能在对话里直接调 rag_retrieve,slash command 都省了:

# mcp_server.py(核心部分)
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[types.TextContent]:
    await _ensure_fastapi_running()  # FastAPI 没跑就自动拉起
    dispatch = {
        "rag_retrieve":      lambda: rag_retrieve(**arguments),
        "rag_ingest":        lambda: rag_ingest(**arguments),
        "rag_delete_source": lambda: rag_delete_source(**arguments),
        "rag_status":        lambda: rag_status(),
        "rag_list_sources":  lambda: rag_list_sources(),
    }
    result = await dispatch[name]()
    return [types.TextContent(type="text", text=result)]

MCP 和 Agent 是两条并行的路。MCP 服务于 Claude Code 里"AI 自主调用"的场景,Agent CLI 服务于"独立对话应用"的场景。底层 RAG 服务共用一套,只是触发方式不同。

_ensure_fastapi_running() 值得单拎出来说一句:MCP server 启动时先探一下 FastAPI 在不在,不在就 subprocess.Popen 把它拉起来,最多等 10 秒等它就绪。这样用户配一次 MCP 就够了,打开 Claude Code 直接能用,不用每次先手动跑 ./start.sh


四、Agent 对话持久化怎么选

这是整个重构里我最想展开的一块。

4.1 内存存储为什么不行

会不会持久化,是 Agent 和普通 chatbot 的一道分水岭。chatbot 不在乎上次说了什么,每次都从头开始;Agent 得记住——上次聊到哪、用户有什么偏好、之前那次工具调用返回了什么。

开发阶段图省事,我们先拿一个 Python 字典存历史:

_sessions: dict[str, list[dict]] = {}

本地测的时候它工作得好好的,直到第一次重启服务。进程一重启,对话历史全蒸发。用户又得把背景从头讲一遍。对一个主打"长期记忆"的工具来说,这是要命的缺陷。

还有个隐患是字典没有事务保护。写到一半进程崩了,状态就是不一致的;要是又加了序列化落盘,下次启动很可能读到半截脏数据。项目里原本给 FAISS 索引加 WAL,就是为了治这个病,对话历史一样需要同等的保护。

4.2 为什么选 SQLite

这个项目定位是本地工具,不碰多用户、也没有多进程并发写。这种场景下 SQLite 几乎是默认答案:

  • Python 自带import sqlite3,零额外依赖,不用装、不用起任何服务。
  • ACID 事务:项目早先用 pickle 存 RAG 数据,吃过进程中断导致半写的亏。SQLite 的事务保证写入要么完整要么回滚。
  • WAL 模式:开了 WAL(Write-Ahead Logging),读写不互斥,检索和写对话历史能并发。
  • 结构够简单sessions + messages 两张表,十来行 DDL 收工。
CREATE TABLE sessions (
    id         TEXT PRIMARY KEY,
    created_at INTEGER NOT NULL,
    metadata   TEXT NOT NULL DEFAULT '{}'
);

CREATE TABLE messages (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT NOT NULL REFERENCES sessions(id) ON DELETE CASCADE,
    role       TEXT NOT NULL,     -- 'user' | 'assistant' | 'tool'
    content    TEXT NOT NULL,
    timestamp  INTEGER NOT NULL
);

初始化时(agent/db.py)顺手开了 WAL 和外键约束:

conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")

ON DELETE CASCADE 保证删会话时消息一起清掉,不留孤儿数据。

4.3 上规模的话怎么选(2026 视角)

本项目选 SQLite 纯粹因为场景简单。如果你的 Agent 要面对多用户、要上云、要扛并发,下面这张表是 2026 年比较主流的几条路:

方案 适用场景 优势 劣势
SQLite 本地单机工具 零依赖、ACID、Python 内置 不支持多进程写
PostgreSQL + pgvector 多用户云端 SaaS 关系数据和向量统一管,Supabase 托管省心 要运维,向量查询要额外配置调优
Redis(热)+ PostgreSQL(冷) 高并发 SaaS 活跃会话全内存,响应极快 架构复杂,冷热同步有延迟
Mem0 需要长期语义记忆 自动摘要压缩历史、按语义检索 偏黑盒,自托管偏麻烦
LangGraph Platform 团队协作 Agent 托管 checkpointer + 可观测性 付费,有 vendor lock-in
Zep(开源) 长窗口 Agent 自动摘要,防 context 溢出 多一个服务要部署运维

一句话给个倾向:本地工具 SQLite,团队级云端上 PostgreSQL,并发起来了再加 Redis 热层,对话长到 context 扛不住了再考虑 Mem0 / Zep。

4.4 决策树

你的 Agent 在哪里跑?
├─ 本地单机(个人工具、CLI)
   └─  SQLite 

└─ 云端 / 多用户
    ├─ DAU < 1000,团队级工具
       └─  PostgreSQL(Supabase 最省心)✅
    
    ├─ DAU > 1000,高并发
       └─  Redis(热)+ PostgreSQL(冷)✅
    
    └─ 需要跨会话语义记忆 / 历史压缩
        └─  Mem0  Zep(按需选) 

五、踩坑记录

坑 1:Anthropic 的 system message 不是消息

把 OpenAI 风格的 messages 数组原封不动丢给 Anthropic SDK,直接报错。

原因是两家对 system 的处理根本不一样。OpenAI 把 system 当成普通消息塞在 messages 数组里({"role": "system", "content": "..."});Anthropic 的 messages.create() 只认 userassistant 两种 role,碰到 role: system 会回一个 400 invalid_request_error——system 必须作为独立的顶层参数传。

解法是在 AnthropicProvider 内部做格式转换,让上层调用方完全无感:

# llm/anthropic_provider.py
def _split_messages(self, messages: list[dict]) -> tuple[str | None, list[dict]]:
    system = next((m["content"] for m in messages if m["role"] == "system"), None)
    turns = [m for m in messages if m["role"] != "system"]
    return system, turns

async def complete(self, messages: list[dict], **kwargs) -> str:
    system, turns = self._split_messages(messages)
    params = dict(model=self._model, max_tokens=4096, messages=turns, **kwargs)
    if system:
        params["system"] = system   # system 单独传
    resp = await self._client.messages.create(**params)
    return resp.content[0].text

抽象层的价值正体现在这儿:供应商之间的脏活在子类里消化掉,agent/loop.py 通篇看不到一行 provider 特定的代码。

坑 2:MAX_TOOL_ROUNDS 到底设几

设 3 的时候,稍微复杂点的任务(先检索 A、再检索 B、最后综合)会被拦腰截断,吐一句"已达到最大轮次"。设 10,又偶尔撞见 LLM 抱着 rag_retrieve 来回换关键词、却没什么实质进展的"原地打转"。

这个数其实没有标准答案,它跟工具描述的质量和任务复杂度绑在一起:描述越准,LLM 的"探索性"调用就越少。

所以处理方式是两头一起收。先把工具描述磨清楚(把"Use when..."写明白),再把上限定在 5。实测下来 5 轮能覆盖 95% 的正常任务("检索 A → 检索 B → 综合回答"撑死 3 轮工具调用),又不至于在边界 case 上空转到死。

# agent/loop.py
MAX_TOOL_ROUNDS = 5

# 工具描述写明使用条件
"description": (
    "Semantically retrieve relevant document chunks from the knowledge base. "
    "Use when the user asks a question that might be answered by ingested documents."
),

坑 3:同一次入库被调了两遍

用户说"帮我把这个文档入库",Agent 偶尔会把 rag_ingest 调两次,结果 chunks 重了一份。

刨根问底,是 rag_ingest 的返回值太含糊。如果只回一个 "ok",LLM 有时拿不准第一次到底成没成,就再补一刀。

两道防线一起上。一是让 rag_ingest 回包含 chunk 数量的明确信息;二是 server.py 本来就有基于内容 hash 的幂等校验——同样的内容调两次,第二次会直接告诉你"内容没变,跳过":

# agent/tools.py — 返回带 chunk 数的明确响应
r = await client.post(f"{_BASE}/ingest", json={"text": text, "source": source})
return r.json().get("message", "Ingestion complete.")

# server.py — 幂等校验(已有逻辑)
if new_hash == existing_hash:
    return {"message": f"来源 '{source}' 内容未变更,跳过入库"}

这里其实是坑 1 那条结论的另一面:工具描述决定 LLM 要不要调,工具返回值决定 LLM 调完之后怎么判断"下一步还要不要做"。回 "ok" 是最坏的设计,LLM 从里头啥也推不出来;回 "入库完成,新增 8 个 chunks,来源标识:auth-v2.md",它一眼就知道成了,不会再补一次。


六、重构前后对比

重构前(脚本 + Hook)

用户 prompt
    ↓
Hook 拦截(Python 脚本,单次)
    ↓ POST /retrieve(一次固定调用)
FastAPI → FAISS → chunks
    ↓
注入 system prompt,发给 Claude

重构后(ReAct Agent)

用户 prompt
    ↓
agent/loop.py(循环,最多 5 轮)
    ├─ LLM 决策:需要检索吗?
    │       ↓ yes
    │   agent/tools.py → POST /retrieve
    │       ↓ 结果追加到历史
    │   LLM 决策:还需要其他工具吗?
    │       ↓ no
    └─ LLM 直接回复
    ↓
agent/memory.py → SQLite(持久化)

能力上的变化:

能力 重构前 重构后
RAG 检索 每次固定触发 按需自主调用
多步任务 用户手动串联 LLM 自动规划
对话记忆 无(进程内临时) SQLite 持久化,重启不丢
供应商切换 没有 LLM 调用能力 改一行配置切 OpenAI/Anthropic
MCP 接入 只有 slash command stdio JSON-RPC,Claude Code 原生工具
查询改写 不支持 三种策略(expansion / HyDE / multi_query)

七、结论与下一步

回头看,这次重构能拎出三条经验。

第一,Agent 的本质是个 LLM 驱动的循环,不是"更聪明的函数"。写代码之前先把循环画清楚——什么时候停、什么时候继续——能省掉一大堆返工。检验标准只有一条:每个节点上,是 LLM 看着当前上下文做决策,还是被写死的控制流推着走。

第二,持久化选型先回答"我的场景是什么",而不是先选技术。本地工具 SQLite 就够(零依赖、ACID、Python 自带),云端多用户上 PostgreSQL,并发起来再加 Redis 热层。别在本地工具上硬怼 Redis,也别在生产 SaaS 里裸用 SQLite——过度设计和欠设计一样坑。

第三,工具的描述和返回值,直接决定 Agent 的表现。写工具时多花十分钟把描述("Use when...")和返回值(带上足够上下文)打磨好,比反复调 MAX_TOOL_ROUNDS、temperature 这些参数管用得多。Agent 的"智力"很大一部分是接口设计出来的,不是调参调出来的。

另外几个常见误区也顺带说一下:

  • "能调 LLM 就是 Agent"。如果流程是 prompt → LLM → 固定后处理 → 输出,那是 LLM pipeline,不是 Agent。循环加自主决策,缺一个都不算。
  • "框架越重越好"。LangChain、AutoGen、CrewAI 都是好东西,但工具就 3-5 个、场景也固定的话,自己写一段 ReAct 循环比抬出一个大框架更可控、更好调。这个项目就是自己写的,核心循环没超过 60 行。
  • "持久化用文件就够了"。JSON 文件能序列化,可它没事务、没并发保护、没查询能力。等你想做"列出所有会话"或者"按时间翻历史",文件方案的天花板立刻撞上来。SQLite 不过多三行初始化,后面省下的调试时间是按小时算的。

接下来想做的几个方向:

  • 接 Mem0:把 SQLite 的对话历史接进 Mem0,做跨会话的语义记忆压缩,治长对话 context 溢出。
  • 流式输出:现在 /agent/chat 是攒齐整段回复再一次性返回,改成 SSE streaming 体验会顺很多。
  • 多 Agent 协作:把这个 RAG Agent 当成一个 sub-agent,挂进更大的任务规划框架(比如 LangGraph 的 multi-agent)。

代码已在 GitHub 开源,欢迎 Star 和 PR。

Comments

No Data
Total 0
  • 1