构建基于 Redis Streams 的 Fastify 微服务核心事件库与配套 IaC 实践


团队内部的 Node.js 微服务数量开始失控时,一个棘手的问题浮出水面:服务间的通信方式碎片化严重。有的团队习惯用 RabbitMQ,有的偏爱轻量的 Redis Pub/Sub,还有的为了省事直接暴露 HTTP 端点进行同步调用。这种技术栈的“多样性”直接导致了统一监控、错误处理和消息治理的缺失。每个新服务都需要重新实现一套消息收发的逻辑,并且几乎没有哪个实现考虑到了消息丢失、重复消费或处理失败后的自动重试,这是一场悄无声息的灾难。

我们的目标是终结这种混乱。我们需要一个标准化的、内部共享的核心事件库,它必须:

  1. 技术栈统一: 强制所有异步事件通信收敛到 Redis Streams。
  2. API 抽象: 提供极其简洁、类型安全的事件发布与订阅 API。
  3. 韧性内建: 自动处理消费者组的创建、消息确认(ACK)、失败消息的重试以及死信队列(DLQ)逻辑。
  4. 框架集成: 作为 Fastify 插件无缝集成,因为 Fastify 是我们选定的 Node.js 后端框架。
  5. 设施先行: 库所依赖的 Redis 实例,其生命周期必须由基础设施即代码(IaC)严格管理,杜绝手动创建。

这个库,我们称之为 @internal/event-core,将成为我们内部开发者平台(IDP)的第一块基石。

技术选型决策

在真实项目中,选择比实现更重要。

  • Fastify: 它的性能和极低的开销是我们需要的。更关键的是,其强大的插件体系和装饰器能力,让我们能将事件库的能力优雅地注入到每个服务的请求上下文中。
  • Redis Streams: 相比 Pub/Sub,它提供了持久化、消费者组和消息确认机制,这对于构建可靠的异步系统至关重要。它本质上是一个轻量级的、内嵌在 Redis 中的“类 Kafka”解决方案,完美满足我们中等规模的需求。
  • Babel: 我们希望用最新的 JavaScript 语法编写这个核心库,以提高代码的可读性和可维护性。Babel 允许我们这样做,同时能将代码编译成兼容线上 Node.js 版本的形式。
  • Terraform (IaC): 手动在云控制台创建资源是不可追溯、不可复现的。我们选择 Terraform 来定义和部署 Redis 实例。这保证了开发、测试、生产环境的一致性,并将基础设施的变更纳入代码审查流程。

第一步:基础设施先行 (IaC with Terraform)

在编写任何应用代码之前,我们先用 Terraform 来声明我们的 Redis 实例。这是原则问题,应用代码应该依赖于被管理的、明确定义的基础设施,而不是反过来。为了演示,这里使用 Docker Provider 创建一个本地的 Redis 实例,但在真实项目中,只需替换为相应的云厂商 Provider (如 aws_elasticache_replication_group) 即可。

创建一个 infra 目录,并放入 main.tf

# infra/main.tf

# 使用 Docker Provider 进行本地演示
terraform {
  required_providers {
    docker = {
      source  = "kreuzwerker/docker"
      version = "~> 3.0.1"
    }
  }
}

# 定义一个可复用的网络
resource "docker_network" "private_network" {
  name = "app-network"
}

# 定义 Redis 服务
resource "docker_container" "redis" {
  name  = "event-redis"
  image = "redis:7.0-alpine"
  # 附加到我们的网络中,以便其他服务可以访问
  networks_advanced {
    name = docker_network.private_network.name
  }
  # 将端口映射到主机,方便调试
  ports {
    internal = 6379
    external = 6379
  }
  # 启用持久化,这在生产中至关重要
  command = ["redis-server", "--save", "60 1", "--loglevel", "warning"]
  volumes {
    host_path      = "/path/to/local/redis-data" # 替换为你的本地路径
    container_path = "/data"
  }
}

# 输出 Redis 的连接地址,供应用程序使用
output "redis_host" {
  description = "The hostname of the Redis container."
  value       = docker_container.redis.name
}

output "redis_port" {
  description = "The port of the Redis container."
  value       = docker_container.redis.ports[0].external
}

执行 terraform initterraform apply 后,一个名为 event-redis 的 Redis 容器就会被创建并运行。这个过程是声明式的、可重复的。

第二步:核心库 (@internal/event-core) 的构建

现在,我们开始构建核心库。项目结构如下:

event-core/
├── package.json
├── .babelrc
└── src/
    ├── index.js         # 插件主入口
    ├── EventBus.js      # 发布者逻辑
    ├── Consumer.js      # 消费者逻辑
    └── errors.js        # 自定义错误

Babel 配置

我们需要一个稳健的 Babel 配置来转换现代 JS 语法。

// .babelrc
{
  "presets": [
    [
      "@babel/preset-env",
      {
        "targets": {
          "node": "18" // 目标 Node.js 版本
        }
      }
    ]
  ],
  "plugins": [
    "@babel/plugin-proposal-class-properties"
  ]
}

事件发布器: EventBus.js

发布逻辑相对简单,核心是 XADD 命令。但我们需要封装好连接管理和事件结构。

// src/EventBus.js
import { createClient } from 'redis';

/**
 * 负责向 Redis Streams 发布事件。
 * 封装了连接管理和标准化的事件格式。
 */
export class EventBus {
  #redisClient;
  #config;
  #logger;
  #isConnected = false;

  /**
   * @param {object} config
   * @param {string} config.url - Redis 连接 URL
   * @param {object} logger - 兼容 pino 的日志记录器
   */
  constructor(config, logger) {
    this.#config = config;
    this.#logger = logger;
    this.#redisClient = createClient({ url: this.#config.url });

    this.#redisClient.on('error', (err) => {
      this.#logger.error({ err }, 'Redis client error');
      this.#isConnected = false;
    });

    this.#redisClient.on('connect', () => {
        this.#logger.info('Redis client connected');
        this.#isConnected = true;
    });
  }

  async connect() {
    if (this.#isConnected) {
      this.#logger.warn('Redis client is already connected.');
      return;
    }
    await this.#redisClient.connect();
  }

  /**
   * 发布一个事件到指定的 stream
   * @param {string} stream - Stream 的名称 (e.g., 'user-events')
   * @param {string} eventType - 事件类型 (e.g., 'UserCreated')
   * @param {object} payload - 事件的载荷
   * @returns {Promise<string>} - 返回消息 ID
   */
  async publish(stream, eventType, payload) {
    if (!this.#isConnected) {
      throw new Error('Redis client is not connected. Cannot publish event.');
    }

    // 这里的坑在于:payload 必须是扁平的 string-string 键值对。
    // 直接传入复杂对象会被 Redis client 库自动 toString(),导致数据失真。
    // 因此,我们必须序列化它。JSON 是最通用的选择。
    const message = {
      type: eventType,
      version: '1.0', // 事件版本,用于未来的 schema 演进
      timestamp: Date.now().toString(),
      payload: JSON.stringify(payload),
    };

    try {
      const messageId = await this.#redisClient.xAdd(stream, '*', message);
      this.#logger.info({ stream, eventType, messageId }, 'Event published successfully');
      return messageId;
    } catch (err) {
      this.#logger.error({ err, stream, eventType }, 'Failed to publish event');
      // 可以在这里引入重试逻辑,但对于发布失败,通常需要让上游业务决定如何处理
      throw err;
    }
  }

  async disconnect() {
    await this.#redisClient.quit();
    this.#isConnected = false;
    this.#logger.info('Redis client disconnected');
  }
}

这个类处理了连接逻辑,并强制所有事件都包含 type, version, timestamppayload。关键点在于 payload 必须 JSON.stringify,这是新手使用 Redis Streams 时常见的陷阱。

事件消费者: Consumer.js

消费者是整个库最复杂、最核心的部分。它需要处理消费者组、读取消息、确认、处理失败以及认领(claim)僵尸消息。

// src/Consumer.js
import { setTimeout } from 'timers/promises';

const DEFAULT_BLOCK_MS = 5000; // XREADGROUP 阻塞等待时间
const DEFAULT_CLAIM_INTERVAL_MS = 60000; // 1分钟检查一次僵尸消息
const DEFAULT_MAX_RETRY = 3; // 消息最大重试次数

/**
 * 消费者,负责处理一个 Stream 的事件。
 * 自动管理消费者组、消息确认和死信逻辑。
 */
export class Consumer {
  #redisClient;
  #stream;
  #group;
  #consumerId;
  #handler;
  #logger;
  #isStopped = true;

  /**
   * @param {object} options
   * @param {object} options.redisClient - 已连接的 redis client 实例
   * @param {string} options.stream - 消费的 Stream 名称
   * @param {string} options.group - 消费者组名称
   * @param {string} options.consumerId - 当前消费者的唯一 ID
   * @param {Function} options.handler - 消息处理函数,必须是 async function
   * @param {object} options.logger - 日志记录器
   */
  constructor({ redisClient, stream, group, consumerId, handler, logger }) {
    this.#redisClient = redisClient;
    this.#stream = stream;
    this.#group = group;
    this.#consumerId = consumerId;
    this.#handler = handler;
    this.#logger = logger;
  }

  async start() {
    this.#isStopped = false;
    await this.#ensureGroupExists();
    this.#logger.info({ stream: this.#stream, group: this.#group }, 'Consumer starting...');
    
    // 启动两个并行的循环
    // 1. 主处理循环,读取新消息
    this.#poll(); 
    // 2. 僵尸消息处理循环,处理被遗弃的消息
    this.#claimStaleMessages();
  }

  stop() {
    this.#isStopped = true;
    this.#logger.info({ stream: this.#stream, group: this.#group }, 'Consumer stopping...');
  }

  async #ensureGroupExists() {
    try {
      // MKSTREAM 选项确保如果 stream 不存在,也会被创建
      await this.#redisClient.xGroupCreate(this.#stream, this.#group, '0', {
        MKSTREAM: true,
      });
      this.#logger.info({ stream: this.#stream, group: this.#group }, 'Consumer group created or already exists.');
    } catch (err) {
      // 'BUSYGROUP Consumer Group name already exists' 是正常情况,可以忽略
      if (err.message.includes('BUSYGROUP')) {
        this.#logger.info({ stream: this.#stream, group: this.#group }, 'Consumer group already exists.');
      } else {
        this.#logger.error({ err }, 'Failed to create consumer group');
        throw err;
      }
    }
  }

  // 主消息拉取循环
  async #poll() {
    while (!this.#isStopped) {
      try {
        const response = await this.#redisClient.xReadGroup(
          this.#group,
          this.#consumerId,
          { key: this.#stream, id: '>' }, // '>' 表示只读取从未被消费过的消息
          { BLOCK: DEFAULT_BLOCK_MS, COUNT: 10 }
        );

        if (!response || response.length === 0) {
          continue; // 没有新消息,继续下一次循环
        }
        
        // response 结构: [{ name: 'stream-name', messages: [...] }]
        const messages = response[0].messages;
        for (const msg of messages) {
          await this.#processMessage(msg);
        }
      } catch (err) {
        this.#logger.error({ err }, 'Error in polling loop. Retrying after delay.');
        await setTimeout(DEFAULT_BLOCK_MS); // 发生错误时,等待一段时间再重试
      }
    }
  }

  // 认领并处理僵尸消息的循环
  async #claimStaleMessages() {
    while (!this.#isStopped) {
      await setTimeout(DEFAULT_CLAIM_INTERVAL_MS);
      try {
        // 1. 查看 Pending Entries List (PEL) 中有哪些僵尸消息
        const [pendingSummary] = await this.#redisClient.xPending(this.#stream, this.#group);
        if (pendingSummary.pending === 0) continue;

        const minIdleTime = DEFAULT_CLAIM_INTERVAL_MS; // 只有超过这个闲置时间的消息才被认为是僵尸
        
        // 2. 获取详细的僵尸消息列表
        const pendingMessages = await this.#redisClient.xPendingJustId(
          this.#stream,
          this.#group,
          { IDLE: minIdleTime, COUNT: 10 }
        );

        if (!pendingMessages || pendingMessages.length === 0) continue;

        // 3. 认领这些消息
        const claimed = await this.#redisClient.xClaim(
          this.#stream,
          this.#group,
          this.#consumerId,
          minIdleTime,
          pendingMessages
        );
        
        if (claimed && claimed.length > 0) {
           this.#logger.info({ count: claimed.length }, 'Claimed stale messages');
           for (const msg of claimed) {
             await this.#processMessage(msg);
           }
        }
      } catch(err) {
         this.#logger.error({ err }, 'Error while claiming stale messages');
      }
    }
  }
  
  // 单个消息的处理逻辑,包含重试和死信队列
  async #processMessage(message) {
    const { id, message: rawMessage } = message;
    const deadLetterStream = `${this.#stream}:dlq`;

    try {
      // 反序列化 payload
      const payload = JSON.parse(rawMessage.payload);
      const event = { ...rawMessage, payload };
      
      this.#logger.info({ id, type: event.type }, 'Processing message');
      await this.#handler(event, id);
      
      // 处理成功,ACK
      await this.#redisClient.xAck(this.#stream, this.#group, id);
      this.#logger.info({ id }, 'Message acknowledged');

    } catch (handlerError) {
      this.#logger.warn({ err: handlerError, id }, 'Message handler failed');
      
      // 检查消息的投递次数
      // XPENDING 返回 [id, consumerName, idleTime, deliveryCount]
      const [, , , deliveryCount] = (await this.#redisClient.xPending(this.#stream, this.#group, {
        IDLE: 0,
        start: id,
        end: id,
        COUNT: 1,
      }))[0];

      if (deliveryCount > DEFAULT_MAX_RETRY) {
        this.#logger.error({ id, deliveryCount }, 'Message exceeded max retries. Moving to DLQ.');
        
        // 移入死信队列
        await this.#redisClient.xAdd(deadLetterStream, '*', {
          ...rawMessage,
          originalId: id,
          error: handlerError.message,
          failedAt: new Date().toISOString()
        });
        
        // 移入死信队列后,必须 ACK 原始消息,否则它会永远留在 PEL 中
        await this.#redisClient.xAck(this.#stream, this.#group, id);
      } else {
        // 未达到最大重试次数,不做任何事,消息会保留在 PEL 中
        // 等待被 claimStaleMessages 循环重新处理
        this.#logger.info({ id, deliveryCount }, 'Message will be retried later');
      }
    }
  }
}

这段代码是整个库的心脏。它包含了两个并行的异步循环:#poll用于处理新消息,#claimStaleMessages用于处理那些被其他崩溃的消费者实例遗留下来的“僵尸消息”。#processMessage方法是关键,它不仅调用业务逻辑,还在失败时检查重试次数,并决定是将消息移入死信队列还是让它留在待处理列表(Pending Entries List, PEL)中等待下一次被认领。

sequenceDiagram
    participant Consumer
    participant Redis
    participant Handler

    loop Poll Loop
        Consumer->>Redis: XREADGROUP (block)
        Redis-->>Consumer: New Messages
        Consumer->>Consumer: processMessage(msg)
        Consumer->>Handler: execute(msg.payload)
        alt Handler Success
            Consumer->>Redis: XACK(msg.id)
        else Handler Failure
            Consumer->>Redis: XPENDING(msg.id)
            Redis-->>Consumer: deliveryCount
            alt deliveryCount > maxRetry
                Consumer->>Redis: XADD (to DLQ stream)
                Consumer->>Redis: XACK(msg.id)
            else deliveryCount <= maxRetry
                Consumer-->>Consumer: Do nothing, leave in PEL
            end
        end
    end

    loop Claim Loop (every minute)
        Consumer->>Redis: XPENDING (find stale)
        Redis-->>Consumer: Stale message IDs
        Consumer->>Redis: XCLAIM(stale IDs)
        Redis-->>Consumer: Claimed Messages
        Consumer->>Consumer: processMessage(claimed_msg)
    end

与 Fastify 集成: index.js

最后,我们用 fastify-pluginEventBusConsumer 封装起来,提供给业务代码使用。

// src/index.js
import fp from 'fastify-plugin';
import { EventBus } from './EventBus.js';
import { Consumer } from './Consumer.js';

async function eventCorePlugin(fastify, options) {
  const { redisUrl, consumers = [] } = options;
  const logger = fastify.log.child({ plugin: 'event-core' });

  if (!redisUrl) {
    throw new Error('`redisUrl` option is required for event-core plugin');
  }

  // 1. 初始化并连接 EventBus
  const eventBus = new EventBus({ url: redisUrl }, logger);
  await eventBus.connect();

  // 2. 将 EventBus 实例装饰到 Fastify 实例上
  fastify.decorate('eventBus', eventBus);

  // 3. 初始化所有声明的消费者
  const consumerInstances = consumers.map(config => {
    const consumerLogger = logger.child({ 
        stream: config.stream, 
        group: config.group 
    });
    return new Consumer({
      redisClient: eventBus._redisClient, // 复用 EventBus 的 Redis client
      consumerId: `${config.group}:${Date.now()}`, // 生成一个唯一的消费者ID
      ...config,
      logger: consumerLogger,
    });
  });

  // 4. 在 Fastify 启动时启动所有消费者
  fastify.addHook('onReady', async () => {
    logger.info('Starting all consumers...');
    await Promise.all(consumerInstances.map(c => c.start()));
  });

  // 5. 在 Fastify 关闭时优雅地停止所有组件
  fastify.addHook('onClose', async (instance) => {
    logger.info('Closing event-core connections...');
    consumerInstances.forEach(c => c.stop());
    await eventBus.disconnect();
  });
}

export default fp(eventCorePlugin, {
  name: '@internal/event-core',
  fastify: '4.x'
});

这个插件做了几件重要的事情:

  1. 接收 Redis 连接信息和一系列消费者配置。
  2. 创建 EventBus 实例并将其挂载到 fastify.eventBus,这样在任何路由处理器中都能方便地发布事件。
  3. 根据配置创建 Consumer 实例。
  4. 利用 Fastify 的生命周期钩子 (onReady, onClose) 来自动管理消费者的启动和关闭,确保了资源的优雅释放。

第三步:在微服务中使用核心库

现在,假设我们有一个 user-service,当一个新用户注册时,它需要发布一个 UserCreated 事件。

// user-service/server.js
import Fastify from 'fastify';
import eventCorePlugin from '@internal/event-core';
import { randomUUID } from 'crypto';

const fastify = Fastify({ logger: true });

// 注册我们的核心事件库插件
fastify.register(eventCorePlugin, {
  redisUrl: process.env.REDIS_URL || 'redis://event-redis:6379',
  consumers: [
    {
      stream: 'order-events',
      group: 'user-service-group',
      // 这是一个处理来自其他服务的事件的例子
      async handler(event, id) {
        fastify.log.info(`Received order event ${id}: ${event.type}`);
        // ... 处理订单相关逻辑
      }
    }
  ]
});

// 定义一个用户注册路由
fastify.post('/register', async (request, reply) => {
  const { email, name } = request.body;
  const userId = randomUUID();
  
  // 模拟用户创建逻辑
  fastify.log.info({ userId, email }, 'Creating new user');

  // 使用注入的 eventBus 发布事件
  await fastify.eventBus.publish('user-events', 'UserCreated', {
    userId,
    email,
    name,
    registeredAt: new Date().toISOString()
  });

  return { userId, status: 'pending_activation' };
});

const start = async () => {
  try {
    await fastify.listen({ port: 3000, host: '0.0.0.0' });
  } catch (err) {
    fastify.log.error(err);
    process.exit(1);
  }
};

start();

通过这种方式,业务代码变得极其干净。开发者无需关心 Redis 连接、消费者组、ACK 或重试。他们只需要在插件配置中声明他们想消费的流和处理器,并在需要时调用 fastify.eventBus.publish。所有底层的复杂性都被 @internal/event-core 完美封装。

局限性与未来迭代

这套方案虽然解决了我们最初的痛点,但它并非银弹。在真实项目中,还有几个需要考虑的边界问题和优化路径。

  1. Redis Cluster 支持: 当前实现依赖单个 Redis 实例。对于需要极高吞吐量和可用性的系统,必须支持 Redis Cluster。这会显著增加实现的复杂性,因为对 Stream 的操作(特别是跨多个 slot 的操作)需要更精细的客户端路由逻辑。
  2. 死信队列(DLQ)处理器: 我们实现了将消息移入 DLQ 的逻辑,但没有实现处理 DLQ 的逻辑。一个完整的系统需要一个独立的“reclaimer”服务或管理界面,用于审查死信、手动重试或将其归档。
  3. 动态消费者管理: 当前的消费者是在服务启动时静态定义的。一个更高级的平台可能会允许通过 API 或配置中心动态地增删消费者,以应对变化的业务需求。
  4. 更复杂的 IaC: Terraform 脚本只管理了 Redis 本身。一个完整的平台工程实践会将其扩展到管理 Kubernetes 的部署清单(Deployments, Services)、监控仪表盘(Grafana Dashboards as Code)以及相关的告警规则。
  5. Schema Registry 与类型生成: 为了实现真正的端到端类型安全,可以引入一个 Schema Registry (如 Confluent Schema Registry 的概念),用于存储和版本化事件的 schema。核心库可以集成这个注册表,甚至自动根据 schema 生成类型定义,供服务开发者使用。

  目录