构建连接Google Cloud Pub/Sub与Azure的OpenSearch及Qdrant双写数据管道


核心挑战在于设计一个具备韧性的、低延迟的跨云数据摄取系统。该系统需要稳定消费来自 Google Cloud Pub/Sub 主题的事件流,并将其可靠地写入两个托管在 Azure 环境中的、异构的索引引擎:一个用于全文与结构化搜索的 OpenSearch 集群,以及一个用于向量相似度检索的 Qdrant 集合。这种双写模式是构建多模态搜索系统的基础,但它也引入了数据一致性、故障隔离和跨云网络延迟的复杂性。

架构决策:从紧耦合到扇出模式

在真实的工程实践中,我们很少会一次性就得到完美的架构。通常是从一个看似直接的方案开始,在识别出其内在的脆弱性后,逐步演进到一个更健壮的设计。

方案 A:Azure Function 直接双写(紧耦合)

初步构想通常是最直接的。我们可以创建一个由 Pub/Sub 推送触发的 Azure Function。每当新消息到达,函数被激活,其内部逻辑按顺序执行两个操作:

  1. 连接 OpenSearch 并索引文档。
  2. 连接 Qdrant 并上报向量。
graph TD
    A[Google Cloud Pub/Sub] -- Push Subscription --> B{Azure Function};
    B -- 1. Index Document --> C[OpenSearch on Azure];
    B -- 2. Upsert Vector --> D[Qdrant on Azure];

这个方案的优点是实现简单,部署快速。但在生产环境中,它的缺陷是致命的:

  • 单点故障: 如果 Qdrant 的写入操作因为网络抖动或服务过载而失败,整个函数执行会失败。这意味着即使 OpenSearch 的写入已经成功,Pub/Sub 也会因为没有收到成功的确认(ACK)而重试投递该消息。这会导致数据在 OpenSearch 中重复。
  • 性能瓶颈: 两个写入操作是串行的。系统的总延迟是 T_network + T_opensearch + T_qdrant。任何一个下游系统的抖动都会直接影响整个管道的吞吐量。
  • 缺乏隔离: OpenSearch 的写入失败会影响到 Qdrant,反之亦然。我们无法对其中一个系统的故障进行独立处理,例如,如果 OpenSearch 集群正在进行滚动升级导致写入暂时不可用,我们不希望这中断向 Qdrant 的数据流入。

方案 B:解耦的消费者与扇出式写入器(最终选择)

一个更成熟的架构是引入一个解耦层。我们不再依赖 Function 的触发模型,而是部署一个或多个常驻的消费者服务(例如,在 Azure Kubernetes Service 或 Azure Container Apps 上)。这个服务作为管道的核心,负责从 Pub/Sub 拉取消息,然后将写入任务分发给专门的、并行的写入器。

graph TD
    subgraph GCP
        A[Pub/Sub Topic]
    end

    subgraph Azure
        B[Ingestion Service on AKS/ACA]
        C[OpenSearch Sink]
        D[Qdrant Sink]
        E[Dead Letter Queue]

        B -- Pull Messages --> A;
        B -- Parse & Validate --> B;
        B -- Async Dispatch --> C;
        B -- Async Dispatch --> D;
        C -- Write Failure --> E;
        D -- Write Failure --> E;
    end

    C -- Index --> F[OpenSearch Cluster];
    D -- Upsert --> G[Qdrant Cluster];

此方案的核心优势在于:

  • 故障隔离: 向 OpenSearch 和 Qdrant 的写入是并行且独立的操作。其中一个写入器的失败不会影响另一个。例如,如果 Qdrant 写入失败,我们可以将其放入一个死信队列(Dead Letter Queue)进行后续处理,同时正常地向 Pub/Sub 发送 ACK,只要 OpenSearch 的写入是成功的(或根据业务定义的主要路径是成功的)。
  • 性能提升: 通过异步和并行处理,总延迟取决于最慢的那个写入操作,即 T_network + max(T_opensearch, T_qdrant),而不是两者之和。这在高吞吐量场景下至关重要。
  • 可维护性与可扩展性: 每个 Sink(写入器)都可以作为独立的模块进行开发、测试和优化。如果未来需要增加第三个数据目的地(如一个时序数据库),我们只需增加一个新的 Sink 模块,而无需改动核心的消费逻辑。消费者服务本身也可以通过增加实例数量来水平扩展,以匹配 Pub/Sub 的流量。

基于对生产环境稳定性和可扩展性的考量,我们最终选择了方案 B。

核心实现:基于 Python 和 asyncio 的消费者服务

我们将使用 Python 构建这个消费者服务,因为它拥有出色的云原生库支持和强大的异步处理能力。

项目结构

一个清晰的项目结构是可维护性的基础。

data-pipeline-service/
├── app/
│   ├── __init__.py
│   ├── config.py           # 配置管理 (环境变量)
│   ├── main.py             # 服务入口, Pub/Sub 监听器
│   ├── models.py           # 数据模型 (Pydantic)
│   └── sinks/
│       ├── __init__.py
│       ├── base.py         # 抽象 Sink 基类
│       ├── opensearch_sink.py # OpenSearch 写入逻辑
│       └── qdrant_sink.py    # Qdrant 写入逻辑
├── requirements.txt
└── Dockerfile

1. 配置管理 (app/config.py)

在真实项目中,配置绝不能硬编码。使用环境变量是标准实践,它允许我们在不同环境(开发、测试、生产)中灵活部署。

# app/config.py
import os
from dataclasses import dataclass
from dotenv import load_dotenv

load_dotenv() # 加载 .env 文件用于本地开发

@dataclass(frozen=True)
class Settings:
    # Google Cloud Pub/Sub 配置
    gcp_project_id: str = os.environ["GCP_PROJECT_ID"]
    pubsub_subscription_id: str = os.environ["PUBSUB_SUBSCRIPTION_ID"]

    # OpenSearch 配置
    opensearch_host: str = os.environ["OPENSEARCH_HOST"]
    opensearch_port: int = int(os.environ.get("OPENSEARCH_PORT", 9200))
    opensearch_user: str = os.environ["OPENSEARCH_USER"]
    opensearch_password: str = os.environ["OPENSEARCH_PASSWORD"]
    opensearch_index_name: str = os.environ["OPENSEARCH_INDEX_NAME"]

    # Qdrant 配置
    qdrant_host: str = os.environ["QDRANT_HOST"]
    qdrant_port: int = int(os.environ.get("QDRANT_PORT", 6333))
    qdrant_api_key: str = os.environ.get("QDRANT_API_KEY")
    qdrant_collection_name: str = os.environ["QDRANT_COLLECTION_NAME"]
    
    # 服务运行时配置
    max_messages: int = int(os.environ.get("MAX_MESSAGES", 10))
    max_wait_seconds: int = int(os.environ.get("MAX_WAIT_SECONDS", 5))

def get_settings() -> Settings:
    return Settings()

settings = get_settings()

2. 数据模型 (app/models.py)

使用 Pydantic 定义清晰的数据模型,可以为我们提供免费的数据校验、转换和文档化。

# app/models.py
from typing import List, Optional
from uuid import UUID
from pydantic import BaseModel, Field

class ProductPayload(BaseModel):
    """从 Pub/Sub 接收的原始事件负载"""
    product_id: UUID = Field(..., description="产品的唯一标识符")
    product_name: str = Field(..., description="产品名称")
    description: str = Field(..., description="产品详细描述")
    tags: List[str] = Field(default_factory=list, description="产品标签")
    embedding: List[float] = Field(..., description="产品描述的向量表示")

    class Config:
        # Pydantic 2.x 中使用 frozen=True, 1.x 中是 allow_mutation = False
        frozen = True

class OpenSearchDocument(BaseModel):
    """为 OpenSearch 准备的文档结构"""
    product_name: str
    description: str
    tags: List[str]

class QdrantPoint(BaseModel):
    """为 Qdrant 准备的数据点结构"""
    id: str # Qdrant 要求 id 是 string, integer or UUID
    vector: List[float]
    payload: Optional[dict] = None

3. Sink 实现 (app/sinks/*.py)

每个 Sink 负责与一个下游系统通信。它们应该是无状态的,并且处理所有与连接、数据格式化和错误处理相关的细节。

# app/sinks/opensearch_sink.py
import logging
from opensearchpy import OpenSearch, helpers
from app.config import settings
from app.models import ProductPayload, OpenSearchDocument

logger = logging.getLogger(__name__)

class OpenSearchSink:
    def __init__(self):
        try:
            self.client = OpenSearch(
                hosts=[{'host': settings.opensearch_host, 'port': settings.opensearch_port}],
                http_auth=(settings.opensearch_user, settings.opensearch_password),
                use_ssl=True,
                verify_certs=True,
                ssl_assert_hostname=False,
                ssl_show_warn=False,
            )
            logger.info("OpenSearch client initialized successfully.")
        except Exception as e:
            logger.critical(f"Failed to initialize OpenSearch client: {e}", exc_info=True)
            raise

    async def write(self, message: ProductPayload) -> bool:
        """
        将单个消息写入 OpenSearch。
        这里的关键是使用 product_id 作为文档的 _id,这确保了写入的幂等性。
        如果消息被重试,它只会覆盖现有文档,而不会创建重复项。
        """
        doc_id = str(message.product_id)
        document = OpenSearchDocument(
            product_name=message.product_name,
            description=message.description,
            tags=message.tags
        ).model_dump()

        try:
            self.client.index(
                index=settings.opensearch_index_name,
                body=document,
                id=doc_id,
                refresh="wait_for" # 在生产中可能希望设置为 false 或 true
            )
            logger.debug(f"Successfully indexed document {doc_id} to OpenSearch.")
            return True
        except Exception as e:
            logger.error(f"Failed to index document {doc_id} to OpenSearch: {e}", exc_info=True)
            return False

# app/sinks/qdrant_sink.py
import logging
from qdrant_client import QdrantClient, models
from app.config import settings
from app.models import ProductPayload

logger = logging.getLogger(__name__)

class QdrantSink:
    def __init__(self):
        try:
            self.client = QdrantClient(
                host=settings.qdrant_host, 
                port=settings.qdrant_port,
                api_key=settings.qdrant_api_key,
                https=True # 假设生产环境使用 TLS
            )
            logger.info("Qdrant client initialized successfully.")
        except Exception as e:
            logger.critical(f"Failed to initialize Qdrant client: {e}", exc_info=True)
            raise

    async def write(self, message: ProductPayload) -> bool:
        """
        将单个消息的向量和元数据写入 Qdrant。
        同样,使用 product_id 作为 Point ID 来保证幂等性。
        """
        point_id = str(message.product_id)
        payload = {"product_name": message.product_name}

        try:
            self.client.upsert(
                collection_name=settings.qdrant_collection_name,
                points=[
                    models.PointStruct(
                        id=point_id,
                        vector=message.embedding,
                        payload=payload
                    )
                ],
                wait=True
            )
            logger.debug(f"Successfully upserted point {point_id} to Qdrant.")
            return True
        except Exception as e:
            logger.error(f"Failed to upsert point {point_id} to Qdrant: {e}", exc_info=True)
            return False

4. 主消费逻辑 (app/main.py)

这是将所有部分粘合在一起的地方。它负责监听 Pub/Sub,解析消息,并异步地将任务分派给 Sinks。

# app/main.py
import asyncio
import logging
import json
from uuid import UUID
from concurrent.futures import TimeoutError
from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1.subscriber.message import Message
from pydantic import ValidationError

from app.config import settings
from app.models import ProductPayload
from app.sinks.opensearch_sink import OpenSearchSink
from app.sinks.qdrant_sink import QdrantSink

# 配置结构化日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

class PubSubConsumer:
    def __init__(self, os_sink: OpenSearchSink, qdrant_sink: QdrantSink):
        self.subscriber = SubscriberClient()
        self.subscription_path = self.subscriber.subscription_path(
            settings.gcp_project_id, settings.pubsub_subscription_id
        )
        self.os_sink = os_sink
        self.qdrant_sink = qdrant_sink
        logger.info(f"Consumer configured for subscription: {self.subscription_path}")

    async def process_message(self, message: Message) -> None:
        """
        核心处理逻辑,包含解析、扇出写入和确认/拒绝。
        """
        message_id = message.message_id
        try:
            data = json.loads(message.data.decode("utf-8"))
            payload = ProductPayload(**data)
            logger.info(f"Received message {message_id} for product {payload.product_id}")

            # 并行执行对 OpenSearch 和 Qdrant 的写入
            # asyncio.gather 允许我们并发运行多个 awaitable 对象
            results = await asyncio.gather(
                self.os_sink.write(payload),
                self.qdrant_sink.write(payload),
                return_exceptions=True # 关键:捕获异常而不是让 gather 失败
            )

            os_success = results[0] is True
            qdrant_success = results[1] is True

            # 错误处理与确认逻辑
            # 在真实项目中,这里的策略需要仔细定义。
            # 例如,如果主要依赖全文搜索,可以容忍 Qdrant 的暂时性失败。
            if os_success and qdrant_success:
                logger.info(f"Message {message_id} successfully processed by both sinks.")
                message.ack()
            elif os_success and not qdrant_success:
                logger.warning(f"Message {message_id} processed by OpenSearch, but failed for Qdrant. Acknowledging message. Failure reason: {results[1]}")
                # 业务决策:主路径成功,消息依然确认,失败的写入需要其他补偿机制(如死信队列日志报警)
                message.ack()
            else:
                # OpenSearch 写入失败是关键路径失败,必须重试。
                logger.error(f"Critical path failure: OpenSearch write failed for message {message_id}. Nacking message for retry. Reason: {results[0]}")
                message.nack()

        except ValidationError as e:
            logger.error(f"Data validation failed for message {message_id}: {e}. Acknowledging to avoid retry loop.", exc_info=True)
            message.ack() # 确认消息,防止毒丸消息(Poison Pill)阻塞队列
        except Exception as e:
            logger.error(f"An unexpected error occurred processing message {message_id}: {e}. Nacking for retry.", exc_info=True)
            message.nack()

    def start(self):
        loop = asyncio.get_event_loop()
        
        def callback_wrapper(message: Message):
            # Pub/Sub Python 客户端的 callback 是同步的,我们需要一个桥梁来调用我们的 async 方法
            asyncio.run_coroutine_threadsafe(self.process_message(message), loop)

        streaming_pull_future = self.subscriber.subscribe(
            self.subscription_path, callback=callback_wrapper
        )
        logger.info(f"Listening for messages on {self.subscription_path}...")

        try:
            # 在一个 event loop 中运行,直到程序被中断
            loop.run_forever()
        except KeyboardInterrupt:
            streaming_pull_future.cancel()
            streaming_pull_future.result(timeout=30)
            loop.close()
        finally:
            self.subscriber.close()
            logger.info("Pub/Sub subscriber shut down.")


if __name__ == "__main__":
    # 初始化 Sinks
    opensearch_sink = OpenSearchSink()
    qdrant_sink = QdrantSink()

    # 启动消费者
    consumer = PubSubConsumer(os_sink=opensearch_sink, qdrant_sink=qdrant_sink)
    consumer.start()

部署与运维考量

代码只是故事的一半。在生产环境中运行此服务还需要考虑以下几点:

  1. 容器化: 使用 Dockerfile 将应用打包成一个镜像,这使得在 Azure Kubernetes Service (AKS) 或 Azure Container Apps (ACA) 上的部署变得标准化和可重复。
  2. 身份认证: 在跨云场景中,服务主体(Service Principal)或 Workload Identity 是管理认证的最佳实践。代码中不应出现静态的密钥文件。Azure 服务需要一个有权访问 Google Cloud Pub/Sub 的身份。
  3. 可观测性:
    • 结构化日志: 将日志输出为 JSON 格式,并发送到 Azure Monitor Logs,便于查询和告警。
    • 指标: 暴露关键业务指标,如每秒处理消息数、各 Sink 的写入延迟、错误率等。使用 Prometheus 格式并通过 Azure Monitor for Prometheus 进行采集。
    • 分布式追踪: 在更复杂的系统中,使用 OpenTelemetry 为跨服务的请求添加追踪,以诊断延迟问题。
  4. 单元与集成测试:
    • 单元测试应覆盖 models.py 的数据校验逻辑。
    • 集成测试应 Mock OpenSearchSinkQdrantSinkwrite 方法,以测试 process_message 中的扇出和 ACK/NACK 逻辑是否正确。

架构的局限性与未来演进

这个架构虽然健壮,但并非没有缺点。它最大的局限在于缺乏跨两个 Sink 的原子性。我们的 ACK/NACK 逻辑是一种妥协:如果 OpenSearch 写入成功而 Qdrant 失败,我们会确认消息,导致数据在两个系统间出现短暂不一致。

对于需要更强一致性保证的场景,可以考虑以下演进路径:

  • Transactional Outbox 模式: 将事件写入应用的主数据库的一个 “outbox” 表中,与业务操作在同一个事务内完成。然后,使用 Debezium 等 CDC (Change Data Capture) 工具读取 outbox 表的变更,并将其推送到 Pub/Sub。这样可以保证事件的”至少一次”投递,并且与业务数据严格一致。

  • 引入流处理引擎: 当业务逻辑变得更复杂,例如需要在写入前进行数据聚合或转换时,可以将这个简单的消费者服务升级为基于 Apache Flink 或 Spark Streaming 的流处理作业。这些引擎提供了更高级的状态管理、窗口操作和容错机制,可以托管在 Azure HDInsight 或 Databricks 上。

该方案的适用边界在于那些可以容忍最终一致性、且对故障隔离和水平扩展能力有较高要求的实时数据同步场景。它是在简单性和生产级可靠性之间取得的一个务实平衡。


  目录