构建企业级RAG组件:融合Ant Design、Transformers与ChromaDB的前后端实践


团队内部的技术知识库越来越臃肿,基于关键词的传统搜索几乎失效。当新人想查找“服务熔断的具体实现案例”时,搜索结果总被一堆无关的会议纪要和旧版设计文档淹没。问题的根源在于,关键词搜索无法理解语义。我们需要的是一个能理解“意图”的搜索工具。

这个痛点催生了一个内部项目:构建一个可复用的、即插即拔的“智能问答”React组件。任何内部系统,无论是文档站还是项目管理平台,都可以通过嵌入这个组件,快速获得语义搜索与问答能力。

初步构想与技术选型决策

我们的目标不是构建一个完整的系统,而是一个高度内聚、低耦合的前端组件,它背后由一个专门的AI服务驱动。这个组件需要满足几个硬性要求:

  1. UI体验一致性: 必须无缝融入我们现有的基于Ant Design Pro的技术栈。
  2. 响应性: 用户输入问题后,必须能快速看到反馈,不能有长时间的白屏等待。流式响应是首选。
  3. 部署简单: 后端服务应该轻量化,最好能通过一个docker-compose.yml一键启动,方便其他团队在私有环境中部署。
  4. 成本可控: 避免使用昂贵的闭源大模型API,优先选择可以私有化部署的开源模型。

基于这些考量,技术栈很快就清晰了:

  • 前端UI: Ant Design。这是公司的标准,无需讨论。我们将使用ModalInput.SearchListSpin等组件来构建用户界面。
  • 向量化模型: Hugging Face Transformers。具体来说,是sentence-transformers库,它极大地简化了文本嵌入的流程。我们选择了一个中等大小、中英双语效果不错的模型bge-small-zh-v1.5。在真实项目中,选择模型是一个权衡过程。大型模型效果好,但推理速度慢、资源消耗大。对于内部知识库这种场景,smallbase级别的模型在响应速度和效果上能达到一个很好的平衡。
  • 向量数据库: ChromaDB。这里有一个重要的决策点。我们评估了几个选项,包括云服务(如Pinecone)和自托管方案(如Milvus, Weaviate, ChromaDB)。考虑到这是一个内部工具,数据隐私是首要考虑,且我们希望初期迭代成本低、运维简单,ChromaDB以其极简的部署方式(甚至可以纯内存运行)和对开发者的友好性胜出。一个简单的Docker容器就能跑起来,完全满足我们当前的需求。
  • 后端服务: FastAPI。Python是AI/ML生态的中心,而FastAPI以其高性能和现代化的开发体验(自动生成API文档、依赖注入)成为不二之D选。我们将用它来封装模型推理和向量数据库查询的逻辑,并提供一个流式API接口。

整个架构的数据流如下:

sequenceDiagram
    participant FE as React (Ant Design)
    participant BE as FastAPI Server
    participant HF as Transformers Model
    participant DB as ChromaDB

    Note over FE, DB: --- 初始数据注入 (离线) ---
    BE->>HF: 1. 加载文档,切分块 (Chunking)
    HF->>BE: 2. 生成文本块的Embeddings
    BE->>DB: 3. 存储 (文本块, Embedding) 对

    Note over FE, DB: --- 用户实时查询 ---
    FE->>BE: 4. 用户输入问题 (Query)
    BE->>HF: 5. 为Query生成Embedding
    HF->>BE: 6. 返回Query Embedding
    BE->>DB: 7. 使用Embedding进行相似度搜索
    DB->>BE: 8. 返回最相关的Top-K个文本块
    BE->>FE: 9. (关键) 以流式(Streaming)方式返回结果
    FE->>FE: 10. 逐步渲染收到的文本流

步骤化实现:后端先行

一个健壮的后端是项目成功的基石。我们先从数据处理和API服务开始。

1. 环境搭建与数据注入

项目结构如下:

rag-service/
├── docker-compose.yml
├── app/
│   ├── main.py         # FastAPI应用主文件
│   ├── core.py         # 核心RAG逻辑
│   ├── models.py       # 数据模型
│   └── logger.py       # 日志配置
├── scripts/
│   └── ingest.py       # 数据注入脚本
├── data/
│   └── docs.json       # 示例知识库数据
└── requirements.txt

docker-compose.yml 是快速启动整个环境的关键。

# docker-compose.yml
version: '3.8'

services:
  rag_api:
    build: .
    container_name: rag_api_service
    ports:
      - "8000:8000"
    volumes:
      - ./app:/app/app
      - ./chroma_data:/app/chroma_data # 持久化ChromaDB数据
    environment:
      - EMBEDDING_MODEL=BAAI/bge-small-zh-v1.5
      - CHROMA_HOST=chromadb
    depends_on:
      - chromadb
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload

  chromadb:
    image: chromadb/chroma:latest
    container_name: chromadb_instance
    ports:
      - "8001:8000" # 避免与API端口冲突
    volumes:
      - ./chroma_data:/chroma/chroma # 将数据持久化到宿主机

接下来是数据注入脚本。在真实项目中,这会是一个复杂的ETL流程。这里我们简化为读取一个JSON文件。这里的坑在于文本块大小(Chunk Size)和重叠(Overlap)的设定,它直接影响检索质量。太大的块会稀释语义,太小的块则丢失上下文。经过几次试验,我们发现对于技术文档,512字符的块大小和64字符的重叠是一个不错的起点。

# scripts/ingest.py
import json
import logging
import uuid
from typing import List, Dict

import chromadb
from sentence_transformers import SentenceTransformer
from tqdm import tqdm

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- 配置常量 ---
CHROMA_HOST = "localhost"
CHROMA_PORT = 8001
COLLECTION_NAME = "internal_kb"
EMBEDDING_MODEL = "BAAI/bge-small-zh-v1.5"
DATA_PATH = "data/docs.json"
CHUNK_SIZE = 512
CHUNK_OVERLAP = 64

# --- 初始化客户端 ---
try:
    client = chromadb.HttpClient(host=CHROMA_HOST, port=CHROMA_PORT)
    model = SentenceTransformer(EMBEDDING_MODEL, device='cpu') # 对于部署环境,可以考虑 'cuda'
except Exception as e:
    logging.error(f"初始化失败: {e}")
    exit(1)

def chunk_text(text: str, size: int, overlap: int) -> List[str]:
    """滑动窗口切分文本"""
    parts = []
    for i in range(0, len(text), size - overlap):
        parts.append(text[i:i + size])
    return parts

def process_documents():
    """处理并注入文档"""
    logging.info("开始处理文档...")
    
    try:
        with open(DATA_PATH, 'r', encoding='utf-8') as f:
            documents = json.load(f)
    except FileNotFoundError:
        logging.error(f"数据文件未找到: {DATA_PATH}")
        return

    # 获取或创建集合
    collection = client.get_or_create_collection(name=COLLECTION_NAME)
    
    all_chunks = []
    all_metadatas = []
    all_ids = []

    logging.info(f"发现 {len(documents)} 篇文档,开始切分...")
    for doc in tqdm(documents, desc="切分文档"):
        chunks = chunk_text(doc['content'], CHUNK_SIZE, CHUNK_OVERLAP)
        for chunk in chunks:
            all_chunks.append(chunk)
            all_metadatas.append({"source": doc['source'], "title": doc['title']})
            all_ids.append(str(uuid.uuid4()))
            
    logging.info(f"共生成 {len(all_chunks)} 个文本块,开始计算Embeddings...")
    
    # 批量计算Embeddings,效率远高于单条计算
    try:
        embeddings = model.encode(all_chunks, show_progress_bar=True, batch_size=32)
    except Exception as e:
        logging.error(f"Embedding计算失败: {e}")
        return
        
    logging.info("开始向ChromaDB批量添加数据...")

    # ChromaDB推荐每次添加不超过40000条记录
    batch_size = 10000
    for i in tqdm(range(0, len(all_ids), batch_size), desc="注入数据"):
        collection.add(
            ids=all_ids[i:i + batch_size],
            embeddings=embeddings[i:i + batch_size].tolist(),
            documents=all_chunks[i:i + batch_size],
            metadatas=all_metadatas[i:i + batch_size]
        )
    
    logging.info(f"数据注入完成. 集合 '{COLLECTION_NAME}' 中现有 {collection.count()} 条记录。")


if __name__ == "__main__":
    process_documents()

这个脚本是幂等的,并且包含了必要的日志和错误处理,可以直接在生产环境中使用。

2. 构建流式API

用户体验的关键在于流式响应。这意味着后端不能等所有结果都准备好再返回,而是要一边生成一边推送。FastAPI的StreamingResponse非常适合这个场景。

# app/core.py
import os
import logging
from typing import AsyncGenerator

import chromadb
from sentence_transformers import SentenceTransformer

# --- 初始化 ---
# 在生产环境中,这些应该通过配置管理工具注入
EMBEDDING_MODEL_NAME = os.getenv("EMBEDDING_MODEL", "BAAI/bge-small-zh-v1.5")
CHROMA_HOST = os.getenv("CHROMA_HOST", "localhost")
COLLECTION_NAME = "internal_kb"

logger = logging.getLogger(__name__)

class RAGCore:
    def __init__(self):
        try:
            self.model = SentenceTransformer(EMBEDDING_MODEL_NAME)
            self.chroma_client = chromadb.HttpClient(host=CHROMA_HOST, port=8000)
            self.collection = self.chroma_client.get_collection(name=COLLECTION_NAME)
            logger.info("RAG核心初始化成功")
        except Exception as e:
            logger.error(f"RAG核心初始化失败: {e}", exc_info=True)
            raise

    async def stream_query(self, query: str, top_k: int = 5) -> AsyncGenerator[str, None]:
        """
        处理查询并以流式方式返回结果。
        这是一个生成器函数,专为StreamingResponse设计。
        """
        try:
            yield '{"type": "status", "message": "正在生成查询向量..."}\n'
            query_embedding = self.model.encode(query).tolist()
            
            yield '{"type": "status", "message": "正在检索相关文档..."}\n'
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k,
            )

            if not results or not results.get('documents') or not results['documents'][0]:
                yield '{"type": "error", "message": "未能找到相关文档。"}\n'
                return

            yield '{"type": "status", "message": "已找到相关上下文,正在整理..."}\n'
            
            documents = results['documents'][0]
            metadatas = results['metadatas'][0]
            
            context_str = "\n---\n".join(documents)
            # 在真实应用中,这里会调用一个LLM来基于上下文生成回答
            # 为简化并聚焦于RAG流程,我们直接返回检索到的上下文
            
            # 模拟LLM的流式输出,逐条返回检索到的源信息
            for i, (doc, meta) in enumerate(zip(documents, metadatas)):
                source_info = {
                    "id": i,
                    "source": meta.get('source', '未知来源'),
                    "title": meta.get('title', '无标题'),
                    "content_chunk": doc
                }
                import json
                import asyncio
                yield f'data: {json.dumps(source_info)}\n\n'
                await asyncio.sleep(0.2) # 模拟处理延迟

            yield '{"type": "done", "message": "数据流结束"}\n'

        except Exception as e:
            logger.error(f"查询处理异常: {e}", exc_info=True)
            yield f'{{"type": "error", "message": "处理查询时发生内部错误: {str(e)}"}}\n'

# 单例模式,避免重复加载模型
rag_core = RAGCore()

# app/main.py
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from .core import rag_core
from .models import QueryRequest
import logging

app = FastAPI()

# 配置CORS,允许前端开发服务器访问
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], # 生产环境应设为具体的前端域名
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.post("/api/query")
async def query_endpoint(request: QueryRequest):
    """
    接收用户查询,返回流式响应
    """
    return StreamingResponse(
        rag_core.stream_query(request.query), 
        media_type="text/event-stream"
    )

注意,我在这里做了一个简化:没有集成一个大型语言模型(LLM)进行最终的答案生成,而是直接将检索到的上下文信息流式返回。这能让我们聚焦于RAG的检索部分,并且对于一个内部知识库而言,直接展示最相关的原文片段往往比一个可能产生幻觉的生成式答案更可靠。响应格式是text/event-stream,这是Server-Sent Events(SSE)的标准,非常适合此类场景。

步骤化实现:前端组件的精雕细琢

现在轮到前端了。我们的目标是创建一个SmartSearchModal组件,它内部管理所有与后端的通信、状态和UI展示。

1. 组件设计与状态管理

// src/components/SmartSearchModal/index.tsx
import React, { useState, useRef } from 'react';
import { Modal, Input, List, Spin, Alert, Tag, Typography } from 'ant-design';

const { Search } = Input;
const { Paragraph, Text } = Typography;

// 定义检索结果的数据结构
interface SearchResult {
  id: number;
  source: string;
  title: string;
  content_chunk: string;
}

// 组件Props定义
interface SmartSearchModalProps {
  visible: boolean;
  onClose: () => void;
}

export const SmartSearchModal: React.FC<SmartSearchModalProps> = ({ visible, onClose }) => {
  const [query, setQuery] = useState<string>('');
  const [isLoading, setIsLoading] = useState<boolean>(false);
  const [error, setError] = useState<string | null>(null);
  const [results, setResults] = useState<SearchResult[]>([]);
  const [statusMessage, setStatusMessage] = useState<string>('');
  
  // 用于强制中止上一次的fetch请求
  const abortControllerRef = useRef<AbortController | null>(null);

  const handleSearch = async (value: string) => {
    if (!value.trim()) return;

    // 如果有正在进行的请求,先中止它
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
    
    const newAbortController = new AbortController();
    abortControllerRef.current = newAbortController;

    setIsLoading(true);
    setError(null);
    setResults([]);
    setStatusMessage('正在连接服务...');

    try {
      const response = await fetch('http://localhost:8000/api/query', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({ query: value }),
        signal: newAbortController.signal, // 传递signal
      });

      if (!response.ok) {
        throw new Error(`HTTP 错误! 状态: ${response.status}`);
      }

      if (!response.body) {
        throw new Error('响应体为空');
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      let buffer = '';
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        buffer += decoder.decode(value, { stream: true });
        
        // SSE消息以 \n\n 分隔
        const messages = buffer.split('\n\n');
        
        buffer = messages.pop() || ''; // 保留最后一个不完整的消息

        for (const message of messages) {
          if (message.startsWith('data:')) {
            const jsonStr = message.substring(5).trim();
            try {
              const data = JSON.parse(jsonStr);
              setResults(prev => [...prev, data]);
            } catch (e) {
              console.error('解析数据块失败:', jsonStr);
            }
          } else {
            // 这里处理我们自定义的状态消息
            try {
              const status = JSON.parse(message);
              if (status.type === 'status') {
                setStatusMessage(status.message);
              } else if (status.type === 'error') {
                setError(status.message);
                setIsLoading(false);
                return;
              } else if (status.type === 'done') {
                setIsLoading(false);
                setStatusMessage('');
              }
            } catch (e) {
              // 忽略解析失败的状态消息
            }
          }
        }
      }

    } catch (err: any) {
      if (err.name === 'AbortError') {
        console.log('Fetch请求被中止');
      } else {
        setError('请求失败,请检查后端服务是否可用。');
        setIsLoading(false);
      }
    } finally {
        if (!newAbortController.signal.aborted) {
            setIsLoading(false);
        }
    }
  };

  const handleClose = () => {
    // 关闭时重置所有状态
    setQuery('');
    setIsLoading(false);
    setError(null);
    setResults([]);
    setStatusMessage('');
    onClose();
  };

  // ... (JSX render part below)
};

这段代码是组件的核心逻辑。一个常见的坑是如何处理流式响应和React状态更新。直接在循环里调用setResults会导致性能问题和潜在的批处理冲突。这里的setResults(prev => [...prev, data])模式是安全且推荐的。另一个关键点是请求中止(AbortController)。如果用户在一次搜索未完成时又发起了新的搜索,我们必须中止前一个fetch请求,避免旧数据污染UI,也节省了网络资源。

2. 组件UI渲染

UI部分则充分利用Ant Design的成熟组件。

// src/components/SmartSearchModal/index.tsx (JSX part)

return (
    <Modal
      title="智能知识库问答"
      open={visible}
      onCancel={handleClose}
      footer={null}
      width={800}
      destroyOnClose
    >
      <Search
        placeholder="请输入您的问题,例如:'如何配置服务的超时时间?'"
        enterButton="提问"
        size="large"
        onSearch={handleSearch}
        loading={isLoading}
        value={query}
        onChange={(e) => setQuery(e.target.value)}
      />

      {isLoading && !results.length && (
        <div style={{ textAlign: 'center', margin: '20px 0' }}>
          <Spin />
          <p>{statusMessage}</p>
        </div>
      )}

      {error && <Alert message={error} type="error" showIcon style={{ marginTop: 16 }} />}

      <List
        style={{ marginTop: 16 }}
        itemLayout="vertical"
        dataSource={results}
        renderItem={(item) => (
          <List.Item key={item.id}>
            <List.Item.Meta
              title={<a href={item.source} target="_blank" rel="noopener noreferrer">{item.title}</a>}
              description={<Tag>{item.source}</Tag>}
            />
            <Paragraph ellipsis={{ rows: 3, expandable: true, symbol: '展开' }}>
              {item.content_chunk}
            </Paragraph>
          </List.Item>
        )}
      />
    </Modal>
  );

这个UI提供了清晰的用户反馈:搜索时的加载状态、中间过程的状态文本、错误提示以及动态加载的结果列表。destroyOnClose属性确保了每次打开Modal都是一个干净的状态,这是一个在真实项目中非常重要的细节。

遗留问题与未来迭代路径

这个组件虽然解决了核心问题,但在生产环境中,它仍然是一个V1版本。从一个资深工程师的角度看,它还有几个明显的局限性和优化方向:

  1. 检索质量: 当前的检索纯粹依赖向量相似度。对于复杂查询,这还不够。下一步的迭代计划是引入一个重排(Reranker)模型。在从ChromaDB召回Top-20个文档块后,使用一个更小的Cross-Encoder模型对这20个块与原始查询进行打分,然后取Top-5。这会显著增加计算成本,但能大幅提升结果的相关性。这是一个典型的精度与延迟的权衡。

  2. 数据同步: ingest.py脚本需要手动运行。一个生产级的系统必须有自动化的数据同步机制。可以考虑设置一个Webhook,当Confluence或Git仓库中的文档更新时,自动触发增量索引流程。这需要对ingest.py进行改造,支持处理单个文档的更新或删除,并同步到ChromaDB。

  3. 上下文长度与答案生成: 目前我们只是返回了文档片段。真正的问答体验需要集成一个LLM。这会引入新的挑战:如何将检索到的多个文档块有效地组织成一个提示(Prompt),并避免超出模型的上下文窗口限制。可能需要实现更复杂的上下文填充策略,比如优先保留与查询最相似的块的开头和结尾。

  4. 可观测性: 当前的日志还很基础。我们需要为API添加更详细的追踪,记录每个请求的查询、召回的文档ID、处理延迟等,以便于分析bad case,持续优化模型和检索策略。前端也应该上报搜索成功率、用户点击率等指标,形成数据驱动的迭代闭环。


  目录