团队内部的 Node.js 微服务数量开始失控时,一个棘手的问题浮出水面:服务间的通信方式碎片化严重。有的团队习惯用 RabbitMQ,有的偏爱轻量的 Redis Pub/Sub,还有的为了省事直接暴露 HTTP 端点进行同步调用。这种技术栈的“多样性”直接导致了统一监控、错误处理和消息治理的缺失。每个新服务都需要重新实现一套消息收发的逻辑,并且几乎没有哪个实现考虑到了消息丢失、重复消费或处理失败后的自动重试,这是一场悄无声息的灾难。
我们的目标是终结这种混乱。我们需要一个标准化的、内部共享的核心事件库,它必须:
- 技术栈统一: 强制所有异步事件通信收敛到 Redis Streams。
- API 抽象: 提供极其简洁、类型安全的事件发布与订阅 API。
- 韧性内建: 自动处理消费者组的创建、消息确认(ACK)、失败消息的重试以及死信队列(DLQ)逻辑。
- 框架集成: 作为 Fastify 插件无缝集成,因为 Fastify 是我们选定的 Node.js 后端框架。
- 设施先行: 库所依赖的 Redis 实例,其生命周期必须由基础设施即代码(IaC)严格管理,杜绝手动创建。
这个库,我们称之为 @internal/event-core,将成为我们内部开发者平台(IDP)的第一块基石。
技术选型决策
在真实项目中,选择比实现更重要。
- Fastify: 它的性能和极低的开销是我们需要的。更关键的是,其强大的插件体系和装饰器能力,让我们能将事件库的能力优雅地注入到每个服务的请求上下文中。
- Redis Streams: 相比 Pub/Sub,它提供了持久化、消费者组和消息确认机制,这对于构建可靠的异步系统至关重要。它本质上是一个轻量级的、内嵌在 Redis 中的“类 Kafka”解决方案,完美满足我们中等规模的需求。
- Babel: 我们希望用最新的 JavaScript 语法编写这个核心库,以提高代码的可读性和可维护性。Babel 允许我们这样做,同时能将代码编译成兼容线上 Node.js 版本的形式。
- Terraform (IaC): 手动在云控制台创建资源是不可追溯、不可复现的。我们选择 Terraform 来定义和部署 Redis 实例。这保证了开发、测试、生产环境的一致性,并将基础设施的变更纳入代码审查流程。
第一步:基础设施先行 (IaC with Terraform)
在编写任何应用代码之前,我们先用 Terraform 来声明我们的 Redis 实例。这是原则问题,应用代码应该依赖于被管理的、明确定义的基础设施,而不是反过来。为了演示,这里使用 Docker Provider 创建一个本地的 Redis 实例,但在真实项目中,只需替换为相应的云厂商 Provider (如 aws_elasticache_replication_group) 即可。
创建一个 infra 目录,并放入 main.tf:
# infra/main.tf
# 使用 Docker Provider 进行本地演示
terraform {
required_providers {
docker = {
source = "kreuzwerker/docker"
version = "~> 3.0.1"
}
}
}
# 定义一个可复用的网络
resource "docker_network" "private_network" {
name = "app-network"
}
# 定义 Redis 服务
resource "docker_container" "redis" {
name = "event-redis"
image = "redis:7.0-alpine"
# 附加到我们的网络中,以便其他服务可以访问
networks_advanced {
name = docker_network.private_network.name
}
# 将端口映射到主机,方便调试
ports {
internal = 6379
external = 6379
}
# 启用持久化,这在生产中至关重要
command = ["redis-server", "--save", "60 1", "--loglevel", "warning"]
volumes {
host_path = "/path/to/local/redis-data" # 替换为你的本地路径
container_path = "/data"
}
}
# 输出 Redis 的连接地址,供应用程序使用
output "redis_host" {
description = "The hostname of the Redis container."
value = docker_container.redis.name
}
output "redis_port" {
description = "The port of the Redis container."
value = docker_container.redis.ports[0].external
}
执行 terraform init 和 terraform apply 后,一个名为 event-redis 的 Redis 容器就会被创建并运行。这个过程是声明式的、可重复的。
第二步:核心库 (@internal/event-core) 的构建
现在,我们开始构建核心库。项目结构如下:
event-core/
├── package.json
├── .babelrc
└── src/
├── index.js # 插件主入口
├── EventBus.js # 发布者逻辑
├── Consumer.js # 消费者逻辑
└── errors.js # 自定义错误
Babel 配置
我们需要一个稳健的 Babel 配置来转换现代 JS 语法。
// .babelrc
{
"presets": [
[
"@babel/preset-env",
{
"targets": {
"node": "18" // 目标 Node.js 版本
}
}
]
],
"plugins": [
"@babel/plugin-proposal-class-properties"
]
}
事件发布器: EventBus.js
发布逻辑相对简单,核心是 XADD 命令。但我们需要封装好连接管理和事件结构。
// src/EventBus.js
import { createClient } from 'redis';
/**
* 负责向 Redis Streams 发布事件。
* 封装了连接管理和标准化的事件格式。
*/
export class EventBus {
#redisClient;
#config;
#logger;
#isConnected = false;
/**
* @param {object} config
* @param {string} config.url - Redis 连接 URL
* @param {object} logger - 兼容 pino 的日志记录器
*/
constructor(config, logger) {
this.#config = config;
this.#logger = logger;
this.#redisClient = createClient({ url: this.#config.url });
this.#redisClient.on('error', (err) => {
this.#logger.error({ err }, 'Redis client error');
this.#isConnected = false;
});
this.#redisClient.on('connect', () => {
this.#logger.info('Redis client connected');
this.#isConnected = true;
});
}
async connect() {
if (this.#isConnected) {
this.#logger.warn('Redis client is already connected.');
return;
}
await this.#redisClient.connect();
}
/**
* 发布一个事件到指定的 stream
* @param {string} stream - Stream 的名称 (e.g., 'user-events')
* @param {string} eventType - 事件类型 (e.g., 'UserCreated')
* @param {object} payload - 事件的载荷
* @returns {Promise<string>} - 返回消息 ID
*/
async publish(stream, eventType, payload) {
if (!this.#isConnected) {
throw new Error('Redis client is not connected. Cannot publish event.');
}
// 这里的坑在于:payload 必须是扁平的 string-string 键值对。
// 直接传入复杂对象会被 Redis client 库自动 toString(),导致数据失真。
// 因此,我们必须序列化它。JSON 是最通用的选择。
const message = {
type: eventType,
version: '1.0', // 事件版本,用于未来的 schema 演进
timestamp: Date.now().toString(),
payload: JSON.stringify(payload),
};
try {
const messageId = await this.#redisClient.xAdd(stream, '*', message);
this.#logger.info({ stream, eventType, messageId }, 'Event published successfully');
return messageId;
} catch (err) {
this.#logger.error({ err, stream, eventType }, 'Failed to publish event');
// 可以在这里引入重试逻辑,但对于发布失败,通常需要让上游业务决定如何处理
throw err;
}
}
async disconnect() {
await this.#redisClient.quit();
this.#isConnected = false;
this.#logger.info('Redis client disconnected');
}
}
这个类处理了连接逻辑,并强制所有事件都包含 type, version, timestamp 和 payload。关键点在于 payload 必须 JSON.stringify,这是新手使用 Redis Streams 时常见的陷阱。
事件消费者: Consumer.js
消费者是整个库最复杂、最核心的部分。它需要处理消费者组、读取消息、确认、处理失败以及认领(claim)僵尸消息。
// src/Consumer.js
import { setTimeout } from 'timers/promises';
const DEFAULT_BLOCK_MS = 5000; // XREADGROUP 阻塞等待时间
const DEFAULT_CLAIM_INTERVAL_MS = 60000; // 1分钟检查一次僵尸消息
const DEFAULT_MAX_RETRY = 3; // 消息最大重试次数
/**
* 消费者,负责处理一个 Stream 的事件。
* 自动管理消费者组、消息确认和死信逻辑。
*/
export class Consumer {
#redisClient;
#stream;
#group;
#consumerId;
#handler;
#logger;
#isStopped = true;
/**
* @param {object} options
* @param {object} options.redisClient - 已连接的 redis client 实例
* @param {string} options.stream - 消费的 Stream 名称
* @param {string} options.group - 消费者组名称
* @param {string} options.consumerId - 当前消费者的唯一 ID
* @param {Function} options.handler - 消息处理函数,必须是 async function
* @param {object} options.logger - 日志记录器
*/
constructor({ redisClient, stream, group, consumerId, handler, logger }) {
this.#redisClient = redisClient;
this.#stream = stream;
this.#group = group;
this.#consumerId = consumerId;
this.#handler = handler;
this.#logger = logger;
}
async start() {
this.#isStopped = false;
await this.#ensureGroupExists();
this.#logger.info({ stream: this.#stream, group: this.#group }, 'Consumer starting...');
// 启动两个并行的循环
// 1. 主处理循环,读取新消息
this.#poll();
// 2. 僵尸消息处理循环,处理被遗弃的消息
this.#claimStaleMessages();
}
stop() {
this.#isStopped = true;
this.#logger.info({ stream: this.#stream, group: this.#group }, 'Consumer stopping...');
}
async #ensureGroupExists() {
try {
// MKSTREAM 选项确保如果 stream 不存在,也会被创建
await this.#redisClient.xGroupCreate(this.#stream, this.#group, '0', {
MKSTREAM: true,
});
this.#logger.info({ stream: this.#stream, group: this.#group }, 'Consumer group created or already exists.');
} catch (err) {
// 'BUSYGROUP Consumer Group name already exists' 是正常情况,可以忽略
if (err.message.includes('BUSYGROUP')) {
this.#logger.info({ stream: this.#stream, group: this.#group }, 'Consumer group already exists.');
} else {
this.#logger.error({ err }, 'Failed to create consumer group');
throw err;
}
}
}
// 主消息拉取循环
async #poll() {
while (!this.#isStopped) {
try {
const response = await this.#redisClient.xReadGroup(
this.#group,
this.#consumerId,
{ key: this.#stream, id: '>' }, // '>' 表示只读取从未被消费过的消息
{ BLOCK: DEFAULT_BLOCK_MS, COUNT: 10 }
);
if (!response || response.length === 0) {
continue; // 没有新消息,继续下一次循环
}
// response 结构: [{ name: 'stream-name', messages: [...] }]
const messages = response[0].messages;
for (const msg of messages) {
await this.#processMessage(msg);
}
} catch (err) {
this.#logger.error({ err }, 'Error in polling loop. Retrying after delay.');
await setTimeout(DEFAULT_BLOCK_MS); // 发生错误时,等待一段时间再重试
}
}
}
// 认领并处理僵尸消息的循环
async #claimStaleMessages() {
while (!this.#isStopped) {
await setTimeout(DEFAULT_CLAIM_INTERVAL_MS);
try {
// 1. 查看 Pending Entries List (PEL) 中有哪些僵尸消息
const [pendingSummary] = await this.#redisClient.xPending(this.#stream, this.#group);
if (pendingSummary.pending === 0) continue;
const minIdleTime = DEFAULT_CLAIM_INTERVAL_MS; // 只有超过这个闲置时间的消息才被认为是僵尸
// 2. 获取详细的僵尸消息列表
const pendingMessages = await this.#redisClient.xPendingJustId(
this.#stream,
this.#group,
{ IDLE: minIdleTime, COUNT: 10 }
);
if (!pendingMessages || pendingMessages.length === 0) continue;
// 3. 认领这些消息
const claimed = await this.#redisClient.xClaim(
this.#stream,
this.#group,
this.#consumerId,
minIdleTime,
pendingMessages
);
if (claimed && claimed.length > 0) {
this.#logger.info({ count: claimed.length }, 'Claimed stale messages');
for (const msg of claimed) {
await this.#processMessage(msg);
}
}
} catch(err) {
this.#logger.error({ err }, 'Error while claiming stale messages');
}
}
}
// 单个消息的处理逻辑,包含重试和死信队列
async #processMessage(message) {
const { id, message: rawMessage } = message;
const deadLetterStream = `${this.#stream}:dlq`;
try {
// 反序列化 payload
const payload = JSON.parse(rawMessage.payload);
const event = { ...rawMessage, payload };
this.#logger.info({ id, type: event.type }, 'Processing message');
await this.#handler(event, id);
// 处理成功,ACK
await this.#redisClient.xAck(this.#stream, this.#group, id);
this.#logger.info({ id }, 'Message acknowledged');
} catch (handlerError) {
this.#logger.warn({ err: handlerError, id }, 'Message handler failed');
// 检查消息的投递次数
// XPENDING 返回 [id, consumerName, idleTime, deliveryCount]
const [, , , deliveryCount] = (await this.#redisClient.xPending(this.#stream, this.#group, {
IDLE: 0,
start: id,
end: id,
COUNT: 1,
}))[0];
if (deliveryCount > DEFAULT_MAX_RETRY) {
this.#logger.error({ id, deliveryCount }, 'Message exceeded max retries. Moving to DLQ.');
// 移入死信队列
await this.#redisClient.xAdd(deadLetterStream, '*', {
...rawMessage,
originalId: id,
error: handlerError.message,
failedAt: new Date().toISOString()
});
// 移入死信队列后,必须 ACK 原始消息,否则它会永远留在 PEL 中
await this.#redisClient.xAck(this.#stream, this.#group, id);
} else {
// 未达到最大重试次数,不做任何事,消息会保留在 PEL 中
// 等待被 claimStaleMessages 循环重新处理
this.#logger.info({ id, deliveryCount }, 'Message will be retried later');
}
}
}
}
这段代码是整个库的心脏。它包含了两个并行的异步循环:#poll用于处理新消息,#claimStaleMessages用于处理那些被其他崩溃的消费者实例遗留下来的“僵尸消息”。#processMessage方法是关键,它不仅调用业务逻辑,还在失败时检查重试次数,并决定是将消息移入死信队列还是让它留在待处理列表(Pending Entries List, PEL)中等待下一次被认领。
sequenceDiagram
participant Consumer
participant Redis
participant Handler
loop Poll Loop
Consumer->>Redis: XREADGROUP (block)
Redis-->>Consumer: New Messages
Consumer->>Consumer: processMessage(msg)
Consumer->>Handler: execute(msg.payload)
alt Handler Success
Consumer->>Redis: XACK(msg.id)
else Handler Failure
Consumer->>Redis: XPENDING(msg.id)
Redis-->>Consumer: deliveryCount
alt deliveryCount > maxRetry
Consumer->>Redis: XADD (to DLQ stream)
Consumer->>Redis: XACK(msg.id)
else deliveryCount <= maxRetry
Consumer-->>Consumer: Do nothing, leave in PEL
end
end
end
loop Claim Loop (every minute)
Consumer->>Redis: XPENDING (find stale)
Redis-->>Consumer: Stale message IDs
Consumer->>Redis: XCLAIM(stale IDs)
Redis-->>Consumer: Claimed Messages
Consumer->>Consumer: processMessage(claimed_msg)
end
与 Fastify 集成: index.js
最后,我们用 fastify-plugin 将 EventBus 和 Consumer 封装起来,提供给业务代码使用。
// src/index.js
import fp from 'fastify-plugin';
import { EventBus } from './EventBus.js';
import { Consumer } from './Consumer.js';
async function eventCorePlugin(fastify, options) {
const { redisUrl, consumers = [] } = options;
const logger = fastify.log.child({ plugin: 'event-core' });
if (!redisUrl) {
throw new Error('`redisUrl` option is required for event-core plugin');
}
// 1. 初始化并连接 EventBus
const eventBus = new EventBus({ url: redisUrl }, logger);
await eventBus.connect();
// 2. 将 EventBus 实例装饰到 Fastify 实例上
fastify.decorate('eventBus', eventBus);
// 3. 初始化所有声明的消费者
const consumerInstances = consumers.map(config => {
const consumerLogger = logger.child({
stream: config.stream,
group: config.group
});
return new Consumer({
redisClient: eventBus._redisClient, // 复用 EventBus 的 Redis client
consumerId: `${config.group}:${Date.now()}`, // 生成一个唯一的消费者ID
...config,
logger: consumerLogger,
});
});
// 4. 在 Fastify 启动时启动所有消费者
fastify.addHook('onReady', async () => {
logger.info('Starting all consumers...');
await Promise.all(consumerInstances.map(c => c.start()));
});
// 5. 在 Fastify 关闭时优雅地停止所有组件
fastify.addHook('onClose', async (instance) => {
logger.info('Closing event-core connections...');
consumerInstances.forEach(c => c.stop());
await eventBus.disconnect();
});
}
export default fp(eventCorePlugin, {
name: '@internal/event-core',
fastify: '4.x'
});
这个插件做了几件重要的事情:
- 接收 Redis 连接信息和一系列消费者配置。
- 创建
EventBus实例并将其挂载到fastify.eventBus,这样在任何路由处理器中都能方便地发布事件。 - 根据配置创建
Consumer实例。 - 利用 Fastify 的生命周期钩子 (
onReady,onClose) 来自动管理消费者的启动和关闭,确保了资源的优雅释放。
第三步:在微服务中使用核心库
现在,假设我们有一个 user-service,当一个新用户注册时,它需要发布一个 UserCreated 事件。
// user-service/server.js
import Fastify from 'fastify';
import eventCorePlugin from '@internal/event-core';
import { randomUUID } from 'crypto';
const fastify = Fastify({ logger: true });
// 注册我们的核心事件库插件
fastify.register(eventCorePlugin, {
redisUrl: process.env.REDIS_URL || 'redis://event-redis:6379',
consumers: [
{
stream: 'order-events',
group: 'user-service-group',
// 这是一个处理来自其他服务的事件的例子
async handler(event, id) {
fastify.log.info(`Received order event ${id}: ${event.type}`);
// ... 处理订单相关逻辑
}
}
]
});
// 定义一个用户注册路由
fastify.post('/register', async (request, reply) => {
const { email, name } = request.body;
const userId = randomUUID();
// 模拟用户创建逻辑
fastify.log.info({ userId, email }, 'Creating new user');
// 使用注入的 eventBus 发布事件
await fastify.eventBus.publish('user-events', 'UserCreated', {
userId,
email,
name,
registeredAt: new Date().toISOString()
});
return { userId, status: 'pending_activation' };
});
const start = async () => {
try {
await fastify.listen({ port: 3000, host: '0.0.0.0' });
} catch (err) {
fastify.log.error(err);
process.exit(1);
}
};
start();
通过这种方式,业务代码变得极其干净。开发者无需关心 Redis 连接、消费者组、ACK 或重试。他们只需要在插件配置中声明他们想消费的流和处理器,并在需要时调用 fastify.eventBus.publish。所有底层的复杂性都被 @internal/event-core 完美封装。
局限性与未来迭代
这套方案虽然解决了我们最初的痛点,但它并非银弹。在真实项目中,还有几个需要考虑的边界问题和优化路径。
- Redis Cluster 支持: 当前实现依赖单个 Redis 实例。对于需要极高吞吐量和可用性的系统,必须支持 Redis Cluster。这会显著增加实现的复杂性,因为对 Stream 的操作(特别是跨多个 slot 的操作)需要更精细的客户端路由逻辑。
- 死信队列(DLQ)处理器: 我们实现了将消息移入 DLQ 的逻辑,但没有实现处理 DLQ 的逻辑。一个完整的系统需要一个独立的“reclaimer”服务或管理界面,用于审查死信、手动重试或将其归档。
- 动态消费者管理: 当前的消费者是在服务启动时静态定义的。一个更高级的平台可能会允许通过 API 或配置中心动态地增删消费者,以应对变化的业务需求。
- 更复杂的 IaC: Terraform 脚本只管理了 Redis 本身。一个完整的平台工程实践会将其扩展到管理 Kubernetes 的部署清单(Deployments, Services)、监控仪表盘(Grafana Dashboards as Code)以及相关的告警规则。
- Schema Registry 与类型生成: 为了实现真正的端到端类型安全,可以引入一个 Schema Registry (如 Confluent Schema Registry 的概念),用于存储和版本化事件的 schema。核心库可以集成这个注册表,甚至自动根据 schema 生成类型定义,供服务开发者使用。