构建由 Vercel Functions 驱动的异步 Apache Spark 作业提交与结果查询架构


一个必须正视的技术决策场景:业务方需要通过 Web 界面,对存储在数据湖中的 TB 级 Parquet 数据集执行自定义分析。这些分析查询的执行时间从三十秒到数十分钟不等。前端应用需要一个 API 来提交这些分析任务,并在任务完成后获取结果。直接的技术挑战是,如何设计一个兼具成本效益、高可扩展性且能提供良好用户体验的 API 服务层。

方案权衡:从单体阻塞到混合异步架构

在真实项目中,我们通常会评估几种架构路径。

方案A:传统的长轮询或 WebSocket 后端服务

这是最直接的思路。构建一个常驻的 Java 后端服务(例如使用 Spring Boot),它暴露一个 RESTful API 端点。当收到请求时,该服务在内部创建一个 SparkSession,以编程方式提交作业,然后阻塞 HTTP 连接,直到 Spark 作业完成并返回结果。

  • 优势:
    • 架构相对简单,开发心智负担较低。
    • 所有逻辑都在一个代码库中,易于维护。
  • 劣势:
    • 成本高昂: 为了应对不频繁的查询请求,必须维持一组始终在线的服务器实例。在闲置时段,这造成了巨大的资源浪费。
    • 用户体验差: 没有任何一个现代 Web 应用可以容忍一个长达数分钟的 HTTP 请求挂起。这几乎必然会被客户端、负载均衡器或网关超时中断。
    • 扩展性耦合: API 的 Web 流量扩展能力与 Spark 的计算资源扩展能力被绑定在一起,无法独立伸缩。

方案B:纯粹的云厂商 Serverless 数据管道

另一种思路是完全拥抱 Serverless。例如,使用 AWS Lambda 触发一个 Step Functions 工作流,该工作流再启动一个 AWS Glue (托管的 Spark 服务) 作业。

  • 优势:
    • 完全托管,无需关心底层服务器。
    • 理论上具有无限的扩展能力。
  • 劣势:
    • 厂商锁定: 整个解决方案与特定云服务商深度绑定。
    • 控制力受限: 对 Spark 的版本、依赖库、JVM 参数等底层环境的控制力较弱,难以进行深度性能调优。
    • 调试复杂: 跨越多个云服务的分布式系统,日志追踪和问题定位的链路更长。

最终决策:Vercel Functions + 自有 Spark 集群的混合异步架构

我们最终选择的路径,旨在结合上述方案的优点,同时规避其核心缺陷。该架构的核心思想是将前端请求处理与后端重计算彻底解耦。

  • Vercel Functions (JavaScript/Node.js): 扮演轻量级、高弹性的 API 网关角色。它只负责接收请求、校验参数、生成唯一作业 ID,并将作业任务委托给后端,然后立即向客户端返回作业 ID。它的成本模型是按需付费,没有请求时几乎不产生费用。
  • 内部作业提交服务 (Java): 一个轻量级的、不对外公开的 Java 服务。它的唯一职责是接收来自 Vercel Functions 的内部请求,并安全地向 Spark 集群提交作业。
  • Apache Spark 集群 (Java): 专用于执行长时间运行的计算密集型任务。它可以是自建的 Standalone 集群,也可以是运行在 Kubernetes 或 YARN 上的集群。资源可以根据计算需求独立于 API 层进行伸缩。
  • 共享状态与结果存储 (例如 Redis/S3): 作为 Vercel Functions 和 Spark 作业之间通信的桥梁。用于存储作业状态(PENDING, RUNNING, SUCCEEDED, FAILED)和最终的计算结果。

其工作流程如下:

sequenceDiagram
    participant User as 用户
    participant Vercel as Vercel Functions (JS)
    participant JobAPI as 内部作业提交服务 (Java)
    participant Spark as Spark Cluster
    participant Store as 状态/结果存储 (Redis/S3)

    User->>+Vercel: POST /api/submit (含分析参数)
    Vercel->>Vercel: 1. 参数校验, 生成 Job ID
    Vercel->>Store: 2. 记录状态: Job ID -> PENDING
    Vercel->>+JobAPI: 3. POST /internal/submit (Job ID, 参数)
    Vercel-->>-User: 202 Accepted (返回 Job ID)
    
    JobAPI->>+Spark: 4. 提交 Spark 作业 (携带 Job ID)
    JobAPI-->>-Vercel: 确认提交
    Spark->>Store: 5. 更新状态: Job ID -> RUNNING

    loop 轮询状态
        User->>+Vercel: GET /api/status/{Job ID}
        Vercel->>Store: 查询 Job ID 状态
        Store-->>Vercel: 返回状态 (e.g., RUNNING)
        Vercel-->>-User: 返回状态
    end

    Spark->>Spark: 6. 执行计算...
    Spark->>Store: 7. 将结果写入 S3 (路径含 Job ID)
    Spark->>Store: 8. 更新状态: Job ID -> SUCCEEDED
    
    User->>+Vercel: GET /api/status/{Job ID}
    Vercel->>Store: 查询 Job ID 状态
    Store-->>Vercel: 返回状态 (SUCCEEDED)
    Vercel-->>-User: 返回状态与结果路径
    
    User->>Store: 9. 直接从 S3 下载结果

这个架构的务实之处在于,它将正确的技术用在了正确的地方。Vercel Functions 擅长处理海量、并发、短时的 HTTP 请求,而 Spark 专注于其核心优势——大规模分布式数据处理。

核心实现细节

1. Vercel Functions API 层 (JavaScript)

Vercel Functions 使用基于文件的路由。我们创建 api/submit.jsapi/status/[jobId].js 两个文件。

api/submit.js - 作业提交端点

这个函数必须做到轻量且快速响应。它的职责是验证、授权、创建作业记录,然后触发后端,而不是等待作业完成。

// /api/submit.js
import { v4 as uuidv4 } from 'uuid';
import { createClient } from 'redis';

// 在生产环境中,这些配置应该来自环境变量
const INTERNAL_JOB_API_ENDPOINT = process.env.INTERNAL_JOB_API_ENDPOINT;
const INTERNAL_JOB_API_KEY = process.env.INTERNAL_JOB_API_KEY;
const REDIS_URL = process.env.REDIS_URL;

export default async function handler(req, res) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method Not Allowed' });
  }

  // 1. 参数校验: 在真实项目中,这里会使用 Zod 或 Joi 等库进行严格的模式校验
  const { analysisType, dataRange } = req.body;
  if (!analysisType || !dataRange) {
    return res.status(400).json({ error: 'Missing required parameters: analysisType, dataRange' });
  }

  const jobId = uuidv4();
  let redisClient;

  try {
    // 2. 初始化状态: 将作业初始状态写入 Redis
    redisClient = createClient({ url: REDIS_URL });
    await redisClient.connect();

    const jobMetadata = {
      status: 'PENDING',
      submittedAt: new Date().toISOString(),
      analysisType,
      dataRange,
    };
    await redisClient.hSet(`job:${jobId}`, jobMetadata);

    // 3. 异步触发内部作业提交服务
    // 这是一个 fire-and-forget 的调用。我们不等待其完成。
    // 使用内部 API Key 进行服务间认证
    fetch(INTERNAL_JOB_API_ENDPOINT, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'X-Internal-Api-Key': INTERNAL_JOB_API_KEY,
      },
      body: JSON.stringify({
        jobId,
        analysisType,
        dataRange,
        // 传递 Spark 作业所需的具体参数
        sparkJobParams: {
          jarPath: 's3://my-spark-jobs/analyzer.jar',
          mainClass: 'com.example.spark.CohortAnalysisJob',
        }
      }),
    }).catch(err => {
      // 这里的错误处理至关重要。如果触发失败,必须更新作业状态为 FAILED。
      // 在生产环境中,这里应该有重试逻辑或告警机制。
      console.error(`[Job ${jobId}] Failed to trigger internal job API:`, err);
      redisClient.hSet(`job:${jobId}`, { status: 'TRIGGER_FAILED', error: err.message });
    });

    // 4. 立即返回 202 Accepted
    return res.status(202).json({
      message: 'Job accepted for processing.',
      jobId: jobId,
      statusUrl: `/api/status/${jobId}`,
    });

  } catch (error) {
    console.error('Error in /api/submit:', error);
    // 如果在触发前发生任何错误,返回 500
    return res.status(500).json({ error: 'Internal Server Error' });
  } finally {
    if (redisClient?.isOpen) {
      await redisClient.quit();
    }
  }
}

api/status/[jobId].js - 作业状态轮询端点

这个端点允许前端以低成本的方式检查作业进度。

// /api/status/[jobId].js
import { createClient } from 'redis';

const REDIS_URL = process.env.REDIS_URL;
const S3_RESULT_BUCKET = process.env.S3_RESULT_BUCKET;

export default async function handler(req, res) {
  const { jobId } = req.query;

  if (!jobId) {
    return res.status(400).json({ error: 'Job ID is required.' });
  }

  let redisClient;
  try {
    redisClient = createClient({ url: REDIS_URL });
    await redisClient.connect();

    const jobData = await redisClient.hGetAll(`job:${jobId}`);

    if (Object.keys(jobData).length === 0) {
      return res.status(404).json({ error: 'Job not found.' });
    }

    const response = {
      jobId,
      status: jobData.status,
      submittedAt: jobData.submittedAt,
      updatedAt: jobData.updatedAt || jobData.submittedAt,
    };
    
    // 如果作业成功,附带结果位置信息
    // 在真实项目中,应该返回一个预签名的 S3 URL (presigned URL) 以确保安全访问
    if (jobData.status === 'SUCCEEDED') {
      response.resultUrl = `s3://${S3_RESULT_BUCKET}/results/${jobId}/output.parquet`;
      response.completedAt = jobData.completedAt;
    } else if (jobData.status === 'FAILED') {
      response.error = jobData.error;
      response.failedAt = jobData.failedAt;
    }

    return res.status(200).json(response);

  } catch (error) {
    console.error(`Error fetching status for job ${jobId}:`, error);
    return res.status(500).json({ error: 'Internal Server Error' });
  } finally {
    if (redisClient?.isOpen) {
      await redisClient.quit();
    }
  }
}

2. 内部作业提交服务 (Java)

我们选择使用一个轻量级的 Java HTTP 框架,如 Javalin,来构建这个内部服务。它的职责单一:接收请求,构造 spark-submit 命令并执行。这层服务是安全的关键,它必须部署在内网中,仅对 Vercel 的 IP 地址或通过共享密钥开放。

// JobSubmitterService.java
import io.javalin.Javalin;
import io.javalin.http.Handler;
import org.json.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class JobSubmitterService {

    private static final String INTERNAL_API_KEY = System.getenv("INTERNAL_JOB_API_KEY");
    private static final String SPARK_MASTER_URL = System.getenv("SPARK_MASTER_URL"); // e.g., spark://host:port
    private static final String REDIS_HOST = System.getenv("REDIS_HOST");
    private static final JedisPool jedisPool = new JedisPool(REDIS_HOST, 6379);
    // 使用线程池来异步执行 spark-submit,避免阻塞 HTTP 请求线程
    private static final ExecutorService sparkSubmitExecutor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        Javalin app = Javalin.create().start(7070);

        app.before("/internal/*", ctx -> {
            String apiKey = ctx.header("X-Internal-Api-Key");
            if (apiKey == null || !apiKey.equals(INTERNAL_API_KEY)) {
                ctx.status(401).result("Unauthorized");
                ctx.skipFutureHandlers();
            }
        });

        app.post("/internal/submit", handleSubmit);
    }

    private static final Handler handleSubmit = ctx -> {
        JSONObject body = new JSONObject(ctx.body());
        String jobId = body.getString("jobId");
        
        // 提交 spark-submit 是一个耗时操作, 必须异步处理
        sparkSubmitExecutor.submit(() -> {
            try (Jedis jedis = jedisPool.getResource()) {
                // 更新状态为 RUNNING
                jedis.hset("job:" + jobId, "status", "RUNNING");
                jedis.hset("job:" + jobId, "updatedAt", java.time.Instant.now().toString());

                // 构建 spark-submit 命令
                List<String> command = buildSparkSubmitCommand(body);
                
                System.out.println("Executing command: " + String.join(" ", command));

                ProcessBuilder processBuilder = new ProcessBuilder(command);
                processBuilder.redirectErrorStream(true); // 合并标准输出和错误输出
                Process process = processBuilder.start();

                // 读取子进程输出,用于调试
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        // 在生产环境中,应将这些日志发送到集中的日志系统
                        System.out.println("[SparkSubmit " + jobId + "]: " + line);
                    }
                }

                int exitCode = process.waitFor();

                if (exitCode != 0) {
                    throw new RuntimeException("Spark job failed with exit code: " + exitCode);
                }
                
                // Spark 作业本身负责在完成时更新最终状态为 SUCCEEDED
                // 此处我们不更新状态,以防作业仍在运行中

            } catch (Exception e) {
                System.err.println("Failed to submit or monitor Spark job " + jobId + ": " + e.getMessage());
                // 如果提交过程本身失败,则更新状态为 FAILED
                try (Jedis jedis = jedisPool.getResource()) {
                    jedis.hset("job:" + jobId, "status", "FAILED");
                    jedis.hset("job:" + jobId, "error", "Failed during submission: " + e.getMessage());
                    jedis.hset("job:" + jobId, "failedAt", java.time.Instant.now().toString());
                }
            }
        });

        ctx.status(200).json("{\"message\":\"Job submission initiated.\"}");
    };

    private static List<String> buildSparkSubmitCommand(JSONObject requestBody) {
        String jobId = requestBody.getString("jobId");
        JSONObject params = requestBody.getJSONObject("sparkJobParams");
        JSONObject analysisParams = requestBody.getJSONObject("dataRange");

        List<String> command = new ArrayList<>();
        command.add("spark-submit");
        command.add("--master");
        command.add(SPARK_MASTER_URL);
        command.add("--deploy-mode");
        command.add("cluster"); // 在生产环境中,应使用 cluster 模式
        command.add("--class");
        command.add(params.getString("mainClass"));
        // ... 其他 Spark 配置, 如 driver memory, executor cores 等
        command.add("--conf");
        command.add("spark.app.name=AnalysisJob-" + jobId);
        
        command.add(params.getString("jarPath"));
        
        // 将业务参数作为应用程序参数传递
        command.add(jobId);
        command.add(analysisParams.getString("startDate"));
        command.add(analysisParams.getString("endDate"));
        
        return command;
    }
}

3. Apache Spark 分析作业 (Java)

这是实际执行数据处理的 Spark 应用。它必须被设计成一个独立的、可参数化的 JAR 包。关键在于,它需要接收 jobId 作为参数,并在作业开始、成功或失败时,主动更新 Redis 中的状态。

// com.example.spark.CohortAnalysisJob.java
package com.example.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

public class CohortAnalysisJob {

    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("Usage: CohortAnalysisJob <jobId> <startDate> <endDate>");
            System.exit(1);
        }

        String jobId = args[0];
        String startDate = args[1];
        String endDate = args[2];
        
        // 从环境变量或配置文件中获取 Redis 和 S3 的配置
        String redisHost = System.getenv("REDIS_HOST");
        String s3InputPath = System.getenv("S3_INPUT_PATH");
        String s3OutputPath = System.getenv("S3_OUTPUT_PATH");

        JedisPool jedisPool = new JedisPool(redisHost, 6379);

        // 使用 try-with-resources 确保 SparkSession 被关闭
        try (SparkSession spark = SparkSession.builder()
                .appName("AnalysisJob-" + jobId)
                .getOrCreate()) {
            
            runAnalysis(spark, jobId, startDate, endDate, s3InputPath, s3OutputPath, jedisPool);

        } catch (Exception e) {
            System.err.println("Spark job " + jobId + " failed catastrophically.");
            e.printStackTrace();
            // 最终的保障: 如果应用崩溃,更新状态为 FAILED
            updateJobStatus(jedisPool, jobId, "FAILED", e.getMessage());
        }
    }

    private static void runAnalysis(SparkSession spark, String jobId, String startDate, String endDate,
                                    String s3InputPath, String s3OutputPath, JedisPool jedisPool) {
        try {
            // 这是一个模拟的分析过程
            System.out.println("Starting analysis for job: " + jobId);
            
            Dataset<Row> sourceData = spark.read()
                    .parquet(s3InputPath)
                    .filter("event_date >= '" + startDate + "' AND event_date <= '" + endDate + "'");
            
            // 复杂的聚合逻辑...
            Dataset<Row> result = sourceData.groupBy("user_cohort")
                    .count()
                    .withColumnRenamed("count", "user_count");
            
            result.show();

            // 将结果写入 S3,使用 jobId 作为路径的一部分,以便于查找
            String finalOutputPath = s3OutputPath + "/results/" + jobId;
            result.write()
                  .mode("overwrite")
                  .parquet(finalOutputPath + "/output.parquet");

            System.out.println("Job " + jobId + " completed successfully. Results written to " + finalOutputPath);
            // 作业成功,更新最终状态
            updateJobStatus(jedisPool, jobId, "SUCCEEDED", null);

        } catch (Exception e) {
            System.err.println("Error during analysis for job " + jobId + ": " + e.getMessage());
            e.printStackTrace();
            // 作业执行失败,更新状态并记录错误信息
            updateJobStatus(jedisPool, jobId, "FAILED", e.getMessage());
        }
    }

    private static void updateJobStatus(JedisPool pool, String jobId, String status, String errorMessage) {
        try (Jedis jedis = pool.getResource()) {
            Map<String, String> fields = new HashMap<>();
            fields.put("status", status);
            fields.put("updatedAt", Instant.now().toString());

            if ("SUCCEEDED".equals(status)) {
                fields.put("completedAt", Instant.now().toString());
            } else if ("FAILED".equals(status)) {
                fields.put("failedAt", Instant.now().toString());
                fields.put("error", errorMessage != null ? errorMessage.substring(0, Math.min(errorMessage.length(), 1024)) : "Unknown error");
            }
            jedis.hset("job:" + jobId, fields);
        } catch (Exception e) {
            System.err.println("CRITICAL: Failed to update job status in Redis for jobId " + jobId + ". Error: " + e.getMessage());
        }
    }
}

架构的局限性与未来优化路径

此架构虽然解决了核心问题,但也引入了新的复杂性,其边界和局限性必须明确。

  1. 状态管理中心化: Redis/S3 成为了系统的核心,其可用性和性能直接影响整个系统的稳定性。在更高要求的场景下,需要考虑 Redis 的高可用部署和 S3 的跨区域复制。

  2. 作业提交服务的单点问题: JobSubmitterService 虽然轻量,但如果它宕机,新的 Spark 作业将无法提交。生产环境中,需要将其容器化并部署多个实例,通过负载均衡确保高可用。

  3. 缺乏精细化作业管理: 当前实现依赖于执行 spark-submit 进程。更成熟的方案是引入 Apache Livy 这样的 Spark REST Job Server。Livy 提供了更强大的作业生命周期管理、安全性和并发控制能力,可以替代我们自研的 JobSubmitterService

  4. 轮询机制的效率: 客户端轮询虽然简单,但在作业执行时间很长的情况下,会产生大量无效的 API 请求。对于需要更实时通知的场景,可以引入 WebSocket 或 Server-Sent Events (SSE),在作业完成时由后端主动推送消息给客户端。但这会增加 Vercel Functions 层的复杂性,因为它需要管理长连接,这与 Serverless 的初衷有所相悖,需要仔细权衡。

该架构并非万能药,它最适用于用户发起的、对延迟不敏感、但计算成本高昂的分析查询场景。它是在极致成本效益、用户体验和技术栈灵活性之间做出的一种务实权衡。


  目录