在 Spinnaker 金丝雀发布中集成两阶段提交以保障 Express.js 服务升级的事务一致性


一次失败的发布所造成的线上数据不一致,其修复成本远超任何前期的预防性投入。问题起源于一个看似常规的需求:为我们的核心用户账户服务(一个基于 Express.js 的微服务)增加一个“安全等级”字段。这个字段的引入并非简单的 ALTER TABLE,它需要与另一个独立的风控规则服务进行同步,确保新等级在规则引擎中也即时生效。直接部署新服务,然后执行一个数据迁移脚本,再手动在风控服务里更新配置,这个流程充满了风险。如果在迁移数据后,风控服务更新失败,那么一部分用户的账户状态就会处于一个中间态,后果不堪设想。

传统的金丝雀发布策略在这里也显得力不从心。Spinnaker 可以优雅地部署新版本,并切分少量流量进行验证。但它本身无法感知这种跨服务的业务逻辑原子性。如果金丝雀实例健康检查通过,Spinnaker 就会继续推广,它并不知道背后风控服务的数据可能还未同步。我们需要一个机制,将“部署新代码”、“更新用户数据”、“同步风控规则”这三个操作捆绑成一个原子单元——要么全部成功,要么全部回滚,仿佛它们是一个数据库事务。

这就是我们将两阶段提交(Two-Phase Commit, 2PC)协议引入 Spinnaker 发布流程的初衷。我们将整个发布过程,从部署到最终流量切换,都视为一个分布式事务。Spinnaker Pipeline 扮演事务协调者(Transaction Coordinator)的角色,而我们的 Express.js 用户服务、一个临时的迁移任务以及风控配置服务,则扮演事务参与者(Participants)。

架构设计:将发布流程事务化

我们的目标是实现一个由 Spinnaker 驱动的、具备事务能力的发布流水线。

  1. 事务协调器 (Transaction Coordinator - TC): 一个轻量级的 Express.js 服务,独立部署。它只负责管理事务状态,暴露 start, prepare, commit, abort 四个 API。Spinnaker 将通过 Webhook Stage 调用这些 API。
  2. 事务参与者 (Participants):
    • 用户账户服务 (Account Service): 我们要升级的 Express.js 服务。新版本的代码中必须包含实现 2PC 接口的逻辑,即 /prepare-update, /commit-update, /abort-update 这类端点。
    • 风控规则服务 (Risk Rule Service): 同样需要暴露 2PC 接口,用于准备和提交规则变更。
  3. Spinnaker Pipeline: 整个流程的核心编排者。它将串联起部署、测试、2PC 投票和最终决策的各个阶段。

下面是整个流程的 Mermaid 图示,清晰地展示了各个阶段的交互:

sequenceDiagram
    participant Pipeline as Spinnaker Pipeline
    participant Tester as Integration Test Job
    participant Coordinator as 2PC Coordinator
    participant AccountSvc as Account Service (Canary)
    participant RiskSvc as Risk Rule Service

    Pipeline->>Tester: Trigger E2E Tests
    Note right of Tester: 使用 Playwright + React Testing Library 
验证新UI与Canary后端兼容性 Tester-->>Pipeline: Tests Passed Pipeline->>AccountSvc: Deploy Canary Version Pipeline->>Coordinator: POST /transactions (Start) Coordinator-->>Pipeline: { transactionId: 'xyz' } Pipeline->>AccountSvc: POST /prepare-update
{ txId: 'xyz' } Note right of AccountSvc: 锁定资源, 准备数据变更
但不实际写入 AccountSvc-->>Pipeline: 200 OK (Prepared) Pipeline->>RiskSvc: POST /prepare-rules
{ txId: 'xyz' } Note right of RiskSvc: 预加载新规则,
但不激活 RiskSvc-->>Pipeline: 200 OK (Prepared) Pipeline->>Pipeline: Manual Judgment Stage Note over Pipeline: SRE/QA 验证 Canary 功能
(流量已切入 1%) alt Judgment: "Continue" Pipeline->>Coordinator: POST /transactions/xyz/commit Coordinator->>AccountSvc: POST /commit-update Coordinator->>RiskSvc: POST /commit-rules AccountSvc-->>Coordinator: 200 OK RiskSvc-->>Coordinator: 200 OK Coordinator-->>Pipeline: 200 OK (Committed) Pipeline->>Pipeline: Promote Canary to Production else Judgment: "Stop" Pipeline->>Coordinator: POST /transactions/xyz/abort Coordinator->>AccountSvc: POST /abort-update Coordinator->>RiskSvc: POST /abort-rules AccountSvc-->>Coordinator: 200 OK RiskSvc-->>Coordinator: 200 OK Coordinator-->>Pipeline: 200 OK (Aborted) Pipeline->>Pipeline: Destroy Canary & Rollback end

第一道防线:React Testing Library 驱动的集成测试门禁

在启动如此复杂的发布流程之前,必须确保新版本的软件质量是过关的。这里的坑在于,我们不能只做单元测试或孤立的后端 API 测试。因为这次变更会影响用户界面,必须确保前端在新的数据模型下能正确渲染。

因此,我们在 Spinnaker Pipeline 的最开始设置了一个“Run Jenkins Job”阶段,这个 Job 负责运行一套端到端的集成测试。测试框架使用 Playwright,它能模拟真实用户在浏览器中的操作,而断言部分则深度依赖 React Testing Library。我们不关心按钮的颜色或像素位置,只关心组件的逻辑状态和可访问性。

一个关键的测试用例如下:

// tests/account-security-level.spec.ts
import { test, expect } from '@playwright/test';
import { screen, within } from '@testing-library/dom';

// 在测试环境中, Playwright 会被配置为指向 Canary 实例的 URL
const CANARY_URL = process.env.CANARY_ENDPOINT || 'http://localhost:3001';

test.describe('Account Security Level Feature', () => {
  test('should display the new security level badge for eligible users', async ({ page }) => {
    // 步骤1: 使用测试账户登录
    await page.goto(`${CANARY_URL}/login`);
    await page.fill('input[name="username"]', 'test-user-with-new-level');
    await page.fill('input[name="password"]', 'password123');
    await page.click('button[type="submit"]');
    
    // 步骤2: 导航到个人资料页
    await page.waitForURL(`${CANARY_URL}/profile`);
    await page.click('a[href="/profile"]');

    // 步骤3: 使用 React Testing Library 的查询定位元素
    // 这里的 document.body 是由 Playwright 提供的浏览器上下文
    const profileCard = await screen.findByTestId(document.body, 'profile-card');

    // 在卡片内部查找 badge
    // 这是 RTL 的强大之处:它强制你从用户视角思考,而不是依赖脆弱的 CSS 选择器
    const securityBadge = await within(profileCard).findByText('Security Level: Enhanced');

    // 步骤4: 断言元素存在且可见
    expect(securityBadge).not.toBeNull();
    const isVisible = await securityBadge.isVisible();
    expect(isVisible).toBe(true);
  });

  test('should not display the badge for users without the new level', async ({ page }) => {
    // ... 针对老用户的类似测试 ...
  });
});

这个测试确保了只有当新前端代码能够正确处理新后端 API 返回的数据时,发布流程才能继续。这是至关重要的第一道质量门禁。

核心实现:具备 2PC 能力的 Express.js 服务

现在来看后端服务的改造。我们需要让 Account Service 能够响应来自 Spinnaker(通过协调器)的 prepare, commitabort 指令。

一个常见的错误是直接在业务逻辑中混入事务控制代码。更好的做法是使用中间件或装饰器来分离关注点。这里我们为 Express.js 实现一个简单的事务状态管理器。

// src/services/transactionManager.ts
import { v4 as uuidv4 } from 'uuid';
import { createClient } from 'redis';

// 在真实项目中,状态应持久化到 Redis 或 etcd,以防协调器崩溃
const redisClient = createClient({ url: process.env.REDIS_URL });
redisClient.on('error', (err) => console.error('Redis Client Error', err));
await redisClient.connect();

// 事务状态: pending -> prepared -> (committed | aborted)
interface Transaction {
    id: string;
    status: 'pending' | 'prepared' | 'committed' | 'aborted';
    participants: string[];
    preparedParticipants: Set<string>;
    createdAt: number;
}

const TX_EXPIRATION_SECONDS = 300; // 5分钟

export const transactionCoordinator = {
    async start(participants: string[]): Promise<Transaction> {
        const tx: Transaction = {
            id: uuidv4(),
            status: 'pending',
            participants,
            preparedParticipants: new Set(),
            createdAt: Date.now(),
        };
        await redisClient.set(`transaction:${tx.id}`, JSON.stringify(tx), {
            EX: TX_EXPIRATION_SECONDS,
        });
        return tx;
    },

    async recordPrepare(txId: string, participantId: string): Promise<'ok' | 'not_found' | 'already_prepared'> {
        const txData = await redisClient.get(`transaction:${txId}`);
        if (!txData) return 'not_found';

        const tx: Transaction = JSON.parse(txData);
        if (tx.preparedParticipants.has(participantId)) {
            return 'already_prepared';
        }

        tx.preparedParticipants.add(participantId);
        
        // 当所有参与者都准备好后,更新事务状态
        if (tx.preparedParticipants.size === tx.participants.length) {
            tx.status = 'prepared';
        }

        await redisClient.set(`transaction:${txId}`, JSON.stringify(tx), {
            EX: TX_EXPIRATION_SECONDS,
        });

        return 'ok';
    },

    async setState(txId: string, status: 'committed' | 'aborted'): Promise<'ok' | 'not_found' | 'invalid_state'> {
        const txData = await redisClient.get(`transaction:${txId}`);
        if (!txData) return 'not_found';
        
        const tx: Transaction = JSON.parse(txData);
        if (tx.status !== 'prepared' && status === 'committed') {
            // 只有在 prepared 状态下才能 commit
            return 'invalid_state';
        }
        
        tx.status = status;
        // 在生产环境中,commit/abort 后可以保留一段时间用于审计,而不是立即删除
        await redisClient.set(`transaction:${txId}`, JSON.stringify(tx), {
            EX: TX_EXPIRATION_SECONDS, 
        });
        
        return 'ok';
    },

    async getTransaction(txId: string): Promise<Transaction | null> {
        const txData = await redisClient.get(`transaction:${txId}`);
        return txData ? JSON.parse(txData) : null;
    }
};

这是协调器服务的核心逻辑。它使用 Redis 来存储事务状态,简单而有效。

接下来是 Account Service 如何作为参与者实现接口:

// src/controllers/transactionalUpdateController.ts
import { Request, Response } from 'express';
import { dbClient } from '../db'; // 假设的数据库客户端
import { lockResource, releaseResource, hasLock } from '../utils/distributedLock';

const RESOURCE_ID = 'user_security_level_migration';

// 准备阶段:执行所有前置检查和资源锁定,但不提交数据库事务
export const prepareUpdate = async (req: Request, res: Response) => {
    const { transactionId } = req.body;
    if (!transactionId) {
        return res.status(400).json({ error: 'transactionId is required' });
    }

    // 1. 尝试获取分布式锁,防止并发的发布流程冲突
    const lockAcquired = await lockResource(transactionId, RESOURCE_ID);
    if (!lockAcquired) {
        // 这里的坑在于:如果锁被其他事务持有,应该立即返回失败,让协调器中止整个事务
        return res.status(409).json({ error: 'Resource is locked by another transaction' });
    }

    try {
        // 2. 模拟检查数据库连接、模式兼容性等
        await dbClient.query('SELECT 1');
        console.log(`[TX: ${transactionId}] PREPARE: Database connection is healthy.`);
        
        // 3. 模拟预计算或验证数据。在真实项目中,这里可能会创建一个临时表
        const { rowCount } = await dbClient.query('SELECT COUNT(*) FROM users WHERE security_level IS NULL');
        if (rowCount === 0) {
            console.log(`[TX: ${transactionId}] PREPARE: No users to update. Vote YES.`);
        } else {
            console.log(`[TX: ${transactionId}] PREPARE: Found ${rowCount} users to update. Vote YES.`);
        }

        // 所有检查通过,返回成功
        res.status(200).json({ status: 'prepared' });

    } catch (error) {
        console.error(`[TX: ${transactionId}] PREPARE_FAILED:`, error);
        // 如果准备失败,必须释放锁并返回错误
        await releaseResource(transactionId, RESOURCE_ID);
        res.status(500).json({ error: 'Preparation failed', details: error.message });
    }
};

// 提交阶段:执行真正的数据库变更
export const commitUpdate = async (req: Request, res: Response) => {
    const { transactionId } = req.body;

    // 安全检查:确保锁仍然被当前事务持有
    if (!(await hasLock(transactionId, RESOURCE_ID))) {
        return res.status(412).json({ error: 'Transaction lock lost or expired' });
    }

    const tx = await dbClient.connect();
    try {
        await tx.query('BEGIN');
        // 这是真正的业务逻辑
        await tx.query(`
            UPDATE users SET security_level = 'standard' WHERE security_level IS NULL;
        `);
        await tx.query('COMMIT');
        
        console.log(`[TX: ${transactionId}] COMMIT: Successfully updated user security levels.`);
        res.status(200).json({ status: 'committed' });

    } catch (error) {
        console.error(`[TX: ${transactionId}] COMMIT_FAILED:`, error);
        await tx.query('ROLLBACK');
        res.status(500).json({ error: 'Commit failed', details: error.message });
    } finally {
        await releaseResource(transactionId, RESOURCE_ID);
        tx.release();
    }
};

// 中止阶段:清理所有预留的资源
export const abortUpdate = async (req: Request, res: Response) => {
    const { transactionId } = req.body;

    // 只需释放分布式锁即可,因为没有实际数据被修改
    await releaseResource(transactionId, RESOURCE_ID);
    console.log(`[TX: ${transactionId}] ABORT: Rolled back any pending changes.`);
    res.status(200).json({ status: 'aborted' });
};

这段代码展示了参与者的关键职责:

  • Prepare: 锁定资源,执行所有必要的检查,确保 commit 阶段有极大概率会成功。这是 2PC 的精髓。
  • Commit: 执行实际操作。这个阶段必须是幂等的,并且要尽可能快地完成。
  • Abort: 回滚所有在 prepare 阶段所做的预备操作,释放资源。

Spinnaker Pipeline 的编排实现

理论和代码都有了,最后一步是将其在 Spinnaker 中组装起来。这通常通过编辑 Pipeline 的 JSON 来实现。

// ...部分 Pipeline JSON 定义...
"stages": [
    {
      "name": "Run E2E Integration Tests",
      "type": "jenkins",
      "refId": "1",
      "requisiteStageRefIds": [],
      "parameters": {
        "job": "E2E-Account-Service-Tests",
        "parameters": { "CANARY_ENDPOINT": "${trigger['artifacts'][0]['reference']}" }
      }
    },
    {
      "name": "Deploy Canary",
      "type": "deploy",
      "refId": "2",
      "requisiteStageRefIds": ["1"],
      // ... deploy configuration ...
    },
    {
      "name": "Start Transaction",
      "type": "webhook",
      "refId": "3",
      "requisiteStageRefIds": ["2"],
      "payload": {
        "participants": ["account-service", "risk-rule-service"]
      },
      "url": "http://tx-coordinator.internal/transactions",
      "method": "POST",
      "failOnNon2xx": true
    },
    {
      "name": "Prepare Participants",
      "type": "parallel",
      "refId": "4",
      "requisiteStageRefIds": ["3"],
      "stages": [
        {
          "name": "Prepare Account Service",
          "type": "webhook",
          "payload": { "transactionId": "${ #stage('Start Transaction')['outputs']['body']['transactionId'] }" },
          "url": "http://account-service-canary.internal/prepare-update",
          "method": "POST"
        },
        {
          "name": "Prepare Risk Service",
          "type": "webhook",
          "payload": { "transactionId": "${ #stage('Start Transaction')['outputs']['body']['transactionId'] }" },
          "url": "http://risk-rule-service.internal/prepare-rules",
          "method": "POST"
        }
      ]
    },
    {
      "name": "Manual Judgment: Verify Canary",
      "type": "manualJudgment",
      "refId": "5",
      "requisiteStageRefIds": ["4"]
    },
    {
      "name": "Commit Transaction",
      "type": "webhook",
      "refId": "6",
      "requisiteStageRefIds": ["5"],
      "url": "http://tx-coordinator.internal/transactions/${ #stage('Start Transaction')['outputs']['body']['transactionId'] }/commit",
      "method": "POST",
      "failOnNon2xx": true
    },
    // ... 后续的 Promote to Production 和 Cleanup 阶段 ...
    {
      "name": "Abort Transaction (On Failure)",
      "type": "webhook",
      "refId": "7",
      "requisiteStageRefIds": ["5"], // This stage would be triggered on judgment rejection
      "url": "http://tx-coordinator.internal/transactions/${ #stage('Start Transaction')['outputs']['body']['transactionId'] }/abort",
      "method": "POST"
    }
]
// ...

这里使用了 Spinnaker 的 webhook stage 和 SpEL 表达式 (${...}) 来传递从上一个 stage 获取的 transactionIdparallel stage 确保了对所有参与者的 prepare 请求是同时发出的,以缩短投票阶段的时间。

局限性与未来展望

我们成功地构建了一个事务性的发布流程,它极大地增强了复杂服务变更的安全性。然而,这个方案并非银弹。两阶段提交协议本身存在一些固有的问题:

  1. 同步阻塞:prepare 阶段之后,所有参与者都必须锁定资源等待协调器的最终指令。如果协调器宕机,这些资源将被无限期锁定,这在分布式系统中是致命的。
  2. 协调器单点故障: 尽管我们的协调器使用了 Redis 做状态持久化,但协调器服务本身仍然是单点。生产环境需要为其设计高可用方案。
  3. 性能开销: 整个流程引入了多次网络调用,相比常规发布,耗时会显著增加。这只适用于那些绝对不能出现中间状态的、低频但高风险的发布。

对于更复杂的、涉及多个服务长时间交互的场景,Saga 模式可能是更好的选择。Saga 通过一系列的本地事务和对应的补偿操作来保证最终一致性,它避免了 2PC 的同步阻塞问题,但实现起来更为复杂,需要精心设计每个步骤的补偿逻辑。我们当前的 2PC 方案,可以看作是在特定场景下,为了追求强一致性而做出的一个务实且有效的工程权衡。下一步的迭代方向,可能是为这个协调器增加对 Saga 模式的支持,让团队可以根据发布变更的风险等级,在 Pipeline 中选择使用 2PC 还是 Saga。


  目录