一个必须正视的技术决策场景:业务方需要通过 Web 界面,对存储在数据湖中的 TB 级 Parquet 数据集执行自定义分析。这些分析查询的执行时间从三十秒到数十分钟不等。前端应用需要一个 API 来提交这些分析任务,并在任务完成后获取结果。直接的技术挑战是,如何设计一个兼具成本效益、高可扩展性且能提供良好用户体验的 API 服务层。
方案权衡:从单体阻塞到混合异步架构
在真实项目中,我们通常会评估几种架构路径。
方案A:传统的长轮询或 WebSocket 后端服务
这是最直接的思路。构建一个常驻的 Java 后端服务(例如使用 Spring Boot),它暴露一个 RESTful API 端点。当收到请求时,该服务在内部创建一个 SparkSession,以编程方式提交作业,然后阻塞 HTTP 连接,直到 Spark 作业完成并返回结果。
- 优势:
- 架构相对简单,开发心智负担较低。
- 所有逻辑都在一个代码库中,易于维护。
- 劣势:
- 成本高昂: 为了应对不频繁的查询请求,必须维持一组始终在线的服务器实例。在闲置时段,这造成了巨大的资源浪费。
- 用户体验差: 没有任何一个现代 Web 应用可以容忍一个长达数分钟的 HTTP 请求挂起。这几乎必然会被客户端、负载均衡器或网关超时中断。
- 扩展性耦合: API 的 Web 流量扩展能力与 Spark 的计算资源扩展能力被绑定在一起,无法独立伸缩。
方案B:纯粹的云厂商 Serverless 数据管道
另一种思路是完全拥抱 Serverless。例如,使用 AWS Lambda 触发一个 Step Functions 工作流,该工作流再启动一个 AWS Glue (托管的 Spark 服务) 作业。
- 优势:
- 完全托管,无需关心底层服务器。
- 理论上具有无限的扩展能力。
- 劣势:
- 厂商锁定: 整个解决方案与特定云服务商深度绑定。
- 控制力受限: 对 Spark 的版本、依赖库、JVM 参数等底层环境的控制力较弱,难以进行深度性能调优。
- 调试复杂: 跨越多个云服务的分布式系统,日志追踪和问题定位的链路更长。
最终决策:Vercel Functions + 自有 Spark 集群的混合异步架构
我们最终选择的路径,旨在结合上述方案的优点,同时规避其核心缺陷。该架构的核心思想是将前端请求处理与后端重计算彻底解耦。
- Vercel Functions (JavaScript/Node.js): 扮演轻量级、高弹性的 API 网关角色。它只负责接收请求、校验参数、生成唯一作业 ID,并将作业任务委托给后端,然后立即向客户端返回作业 ID。它的成本模型是按需付费,没有请求时几乎不产生费用。
- 内部作业提交服务 (Java): 一个轻量级的、不对外公开的 Java 服务。它的唯一职责是接收来自 Vercel Functions 的内部请求,并安全地向 Spark 集群提交作业。
- Apache Spark 集群 (Java): 专用于执行长时间运行的计算密集型任务。它可以是自建的 Standalone 集群,也可以是运行在 Kubernetes 或 YARN 上的集群。资源可以根据计算需求独立于 API 层进行伸缩。
- 共享状态与结果存储 (例如 Redis/S3): 作为 Vercel Functions 和 Spark 作业之间通信的桥梁。用于存储作业状态(PENDING, RUNNING, SUCCEEDED, FAILED)和最终的计算结果。
其工作流程如下:
sequenceDiagram
participant User as 用户
participant Vercel as Vercel Functions (JS)
participant JobAPI as 内部作业提交服务 (Java)
participant Spark as Spark Cluster
participant Store as 状态/结果存储 (Redis/S3)
User->>+Vercel: POST /api/submit (含分析参数)
Vercel->>Vercel: 1. 参数校验, 生成 Job ID
Vercel->>Store: 2. 记录状态: Job ID -> PENDING
Vercel->>+JobAPI: 3. POST /internal/submit (Job ID, 参数)
Vercel-->>-User: 202 Accepted (返回 Job ID)
JobAPI->>+Spark: 4. 提交 Spark 作业 (携带 Job ID)
JobAPI-->>-Vercel: 确认提交
Spark->>Store: 5. 更新状态: Job ID -> RUNNING
loop 轮询状态
User->>+Vercel: GET /api/status/{Job ID}
Vercel->>Store: 查询 Job ID 状态
Store-->>Vercel: 返回状态 (e.g., RUNNING)
Vercel-->>-User: 返回状态
end
Spark->>Spark: 6. 执行计算...
Spark->>Store: 7. 将结果写入 S3 (路径含 Job ID)
Spark->>Store: 8. 更新状态: Job ID -> SUCCEEDED
User->>+Vercel: GET /api/status/{Job ID}
Vercel->>Store: 查询 Job ID 状态
Store-->>Vercel: 返回状态 (SUCCEEDED)
Vercel-->>-User: 返回状态与结果路径
User->>Store: 9. 直接从 S3 下载结果
这个架构的务实之处在于,它将正确的技术用在了正确的地方。Vercel Functions 擅长处理海量、并发、短时的 HTTP 请求,而 Spark 专注于其核心优势——大规模分布式数据处理。
核心实现细节
1. Vercel Functions API 层 (JavaScript)
Vercel Functions 使用基于文件的路由。我们创建 api/submit.js 和 api/status/[jobId].js 两个文件。
api/submit.js - 作业提交端点
这个函数必须做到轻量且快速响应。它的职责是验证、授权、创建作业记录,然后触发后端,而不是等待作业完成。
// /api/submit.js
import { v4 as uuidv4 } from 'uuid';
import { createClient } from 'redis';
// 在生产环境中,这些配置应该来自环境变量
const INTERNAL_JOB_API_ENDPOINT = process.env.INTERNAL_JOB_API_ENDPOINT;
const INTERNAL_JOB_API_KEY = process.env.INTERNAL_JOB_API_KEY;
const REDIS_URL = process.env.REDIS_URL;
export default async function handler(req, res) {
if (req.method !== 'POST') {
return res.status(405).json({ error: 'Method Not Allowed' });
}
// 1. 参数校验: 在真实项目中,这里会使用 Zod 或 Joi 等库进行严格的模式校验
const { analysisType, dataRange } = req.body;
if (!analysisType || !dataRange) {
return res.status(400).json({ error: 'Missing required parameters: analysisType, dataRange' });
}
const jobId = uuidv4();
let redisClient;
try {
// 2. 初始化状态: 将作业初始状态写入 Redis
redisClient = createClient({ url: REDIS_URL });
await redisClient.connect();
const jobMetadata = {
status: 'PENDING',
submittedAt: new Date().toISOString(),
analysisType,
dataRange,
};
await redisClient.hSet(`job:${jobId}`, jobMetadata);
// 3. 异步触发内部作业提交服务
// 这是一个 fire-and-forget 的调用。我们不等待其完成。
// 使用内部 API Key 进行服务间认证
fetch(INTERNAL_JOB_API_ENDPOINT, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Internal-Api-Key': INTERNAL_JOB_API_KEY,
},
body: JSON.stringify({
jobId,
analysisType,
dataRange,
// 传递 Spark 作业所需的具体参数
sparkJobParams: {
jarPath: 's3://my-spark-jobs/analyzer.jar',
mainClass: 'com.example.spark.CohortAnalysisJob',
}
}),
}).catch(err => {
// 这里的错误处理至关重要。如果触发失败,必须更新作业状态为 FAILED。
// 在生产环境中,这里应该有重试逻辑或告警机制。
console.error(`[Job ${jobId}] Failed to trigger internal job API:`, err);
redisClient.hSet(`job:${jobId}`, { status: 'TRIGGER_FAILED', error: err.message });
});
// 4. 立即返回 202 Accepted
return res.status(202).json({
message: 'Job accepted for processing.',
jobId: jobId,
statusUrl: `/api/status/${jobId}`,
});
} catch (error) {
console.error('Error in /api/submit:', error);
// 如果在触发前发生任何错误,返回 500
return res.status(500).json({ error: 'Internal Server Error' });
} finally {
if (redisClient?.isOpen) {
await redisClient.quit();
}
}
}
api/status/[jobId].js - 作业状态轮询端点
这个端点允许前端以低成本的方式检查作业进度。
// /api/status/[jobId].js
import { createClient } from 'redis';
const REDIS_URL = process.env.REDIS_URL;
const S3_RESULT_BUCKET = process.env.S3_RESULT_BUCKET;
export default async function handler(req, res) {
const { jobId } = req.query;
if (!jobId) {
return res.status(400).json({ error: 'Job ID is required.' });
}
let redisClient;
try {
redisClient = createClient({ url: REDIS_URL });
await redisClient.connect();
const jobData = await redisClient.hGetAll(`job:${jobId}`);
if (Object.keys(jobData).length === 0) {
return res.status(404).json({ error: 'Job not found.' });
}
const response = {
jobId,
status: jobData.status,
submittedAt: jobData.submittedAt,
updatedAt: jobData.updatedAt || jobData.submittedAt,
};
// 如果作业成功,附带结果位置信息
// 在真实项目中,应该返回一个预签名的 S3 URL (presigned URL) 以确保安全访问
if (jobData.status === 'SUCCEEDED') {
response.resultUrl = `s3://${S3_RESULT_BUCKET}/results/${jobId}/output.parquet`;
response.completedAt = jobData.completedAt;
} else if (jobData.status === 'FAILED') {
response.error = jobData.error;
response.failedAt = jobData.failedAt;
}
return res.status(200).json(response);
} catch (error) {
console.error(`Error fetching status for job ${jobId}:`, error);
return res.status(500).json({ error: 'Internal Server Error' });
} finally {
if (redisClient?.isOpen) {
await redisClient.quit();
}
}
}
2. 内部作业提交服务 (Java)
我们选择使用一个轻量级的 Java HTTP 框架,如 Javalin,来构建这个内部服务。它的职责单一:接收请求,构造 spark-submit 命令并执行。这层服务是安全的关键,它必须部署在内网中,仅对 Vercel 的 IP 地址或通过共享密钥开放。
// JobSubmitterService.java
import io.javalin.Javalin;
import io.javalin.http.Handler;
import org.json.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class JobSubmitterService {
private static final String INTERNAL_API_KEY = System.getenv("INTERNAL_JOB_API_KEY");
private static final String SPARK_MASTER_URL = System.getenv("SPARK_MASTER_URL"); // e.g., spark://host:port
private static final String REDIS_HOST = System.getenv("REDIS_HOST");
private static final JedisPool jedisPool = new JedisPool(REDIS_HOST, 6379);
// 使用线程池来异步执行 spark-submit,避免阻塞 HTTP 请求线程
private static final ExecutorService sparkSubmitExecutor = Executors.newFixedThreadPool(10);
public static void main(String[] args) {
Javalin app = Javalin.create().start(7070);
app.before("/internal/*", ctx -> {
String apiKey = ctx.header("X-Internal-Api-Key");
if (apiKey == null || !apiKey.equals(INTERNAL_API_KEY)) {
ctx.status(401).result("Unauthorized");
ctx.skipFutureHandlers();
}
});
app.post("/internal/submit", handleSubmit);
}
private static final Handler handleSubmit = ctx -> {
JSONObject body = new JSONObject(ctx.body());
String jobId = body.getString("jobId");
// 提交 spark-submit 是一个耗时操作, 必须异步处理
sparkSubmitExecutor.submit(() -> {
try (Jedis jedis = jedisPool.getResource()) {
// 更新状态为 RUNNING
jedis.hset("job:" + jobId, "status", "RUNNING");
jedis.hset("job:" + jobId, "updatedAt", java.time.Instant.now().toString());
// 构建 spark-submit 命令
List<String> command = buildSparkSubmitCommand(body);
System.out.println("Executing command: " + String.join(" ", command));
ProcessBuilder processBuilder = new ProcessBuilder(command);
processBuilder.redirectErrorStream(true); // 合并标准输出和错误输出
Process process = processBuilder.start();
// 读取子进程输出,用于调试
try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
String line;
while ((line = reader.readLine()) != null) {
// 在生产环境中,应将这些日志发送到集中的日志系统
System.out.println("[SparkSubmit " + jobId + "]: " + line);
}
}
int exitCode = process.waitFor();
if (exitCode != 0) {
throw new RuntimeException("Spark job failed with exit code: " + exitCode);
}
// Spark 作业本身负责在完成时更新最终状态为 SUCCEEDED
// 此处我们不更新状态,以防作业仍在运行中
} catch (Exception e) {
System.err.println("Failed to submit or monitor Spark job " + jobId + ": " + e.getMessage());
// 如果提交过程本身失败,则更新状态为 FAILED
try (Jedis jedis = jedisPool.getResource()) {
jedis.hset("job:" + jobId, "status", "FAILED");
jedis.hset("job:" + jobId, "error", "Failed during submission: " + e.getMessage());
jedis.hset("job:" + jobId, "failedAt", java.time.Instant.now().toString());
}
}
});
ctx.status(200).json("{\"message\":\"Job submission initiated.\"}");
};
private static List<String> buildSparkSubmitCommand(JSONObject requestBody) {
String jobId = requestBody.getString("jobId");
JSONObject params = requestBody.getJSONObject("sparkJobParams");
JSONObject analysisParams = requestBody.getJSONObject("dataRange");
List<String> command = new ArrayList<>();
command.add("spark-submit");
command.add("--master");
command.add(SPARK_MASTER_URL);
command.add("--deploy-mode");
command.add("cluster"); // 在生产环境中,应使用 cluster 模式
command.add("--class");
command.add(params.getString("mainClass"));
// ... 其他 Spark 配置, 如 driver memory, executor cores 等
command.add("--conf");
command.add("spark.app.name=AnalysisJob-" + jobId);
command.add(params.getString("jarPath"));
// 将业务参数作为应用程序参数传递
command.add(jobId);
command.add(analysisParams.getString("startDate"));
command.add(analysisParams.getString("endDate"));
return command;
}
}
3. Apache Spark 分析作业 (Java)
这是实际执行数据处理的 Spark 应用。它必须被设计成一个独立的、可参数化的 JAR 包。关键在于,它需要接收 jobId 作为参数,并在作业开始、成功或失败时,主动更新 Redis 中的状态。
// com.example.spark.CohortAnalysisJob.java
package com.example.spark;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
public class CohortAnalysisJob {
public static void main(String[] args) {
if (args.length < 3) {
System.err.println("Usage: CohortAnalysisJob <jobId> <startDate> <endDate>");
System.exit(1);
}
String jobId = args[0];
String startDate = args[1];
String endDate = args[2];
// 从环境变量或配置文件中获取 Redis 和 S3 的配置
String redisHost = System.getenv("REDIS_HOST");
String s3InputPath = System.getenv("S3_INPUT_PATH");
String s3OutputPath = System.getenv("S3_OUTPUT_PATH");
JedisPool jedisPool = new JedisPool(redisHost, 6379);
// 使用 try-with-resources 确保 SparkSession 被关闭
try (SparkSession spark = SparkSession.builder()
.appName("AnalysisJob-" + jobId)
.getOrCreate()) {
runAnalysis(spark, jobId, startDate, endDate, s3InputPath, s3OutputPath, jedisPool);
} catch (Exception e) {
System.err.println("Spark job " + jobId + " failed catastrophically.");
e.printStackTrace();
// 最终的保障: 如果应用崩溃,更新状态为 FAILED
updateJobStatus(jedisPool, jobId, "FAILED", e.getMessage());
}
}
private static void runAnalysis(SparkSession spark, String jobId, String startDate, String endDate,
String s3InputPath, String s3OutputPath, JedisPool jedisPool) {
try {
// 这是一个模拟的分析过程
System.out.println("Starting analysis for job: " + jobId);
Dataset<Row> sourceData = spark.read()
.parquet(s3InputPath)
.filter("event_date >= '" + startDate + "' AND event_date <= '" + endDate + "'");
// 复杂的聚合逻辑...
Dataset<Row> result = sourceData.groupBy("user_cohort")
.count()
.withColumnRenamed("count", "user_count");
result.show();
// 将结果写入 S3,使用 jobId 作为路径的一部分,以便于查找
String finalOutputPath = s3OutputPath + "/results/" + jobId;
result.write()
.mode("overwrite")
.parquet(finalOutputPath + "/output.parquet");
System.out.println("Job " + jobId + " completed successfully. Results written to " + finalOutputPath);
// 作业成功,更新最终状态
updateJobStatus(jedisPool, jobId, "SUCCEEDED", null);
} catch (Exception e) {
System.err.println("Error during analysis for job " + jobId + ": " + e.getMessage());
e.printStackTrace();
// 作业执行失败,更新状态并记录错误信息
updateJobStatus(jedisPool, jobId, "FAILED", e.getMessage());
}
}
private static void updateJobStatus(JedisPool pool, String jobId, String status, String errorMessage) {
try (Jedis jedis = pool.getResource()) {
Map<String, String> fields = new HashMap<>();
fields.put("status", status);
fields.put("updatedAt", Instant.now().toString());
if ("SUCCEEDED".equals(status)) {
fields.put("completedAt", Instant.now().toString());
} else if ("FAILED".equals(status)) {
fields.put("failedAt", Instant.now().toString());
fields.put("error", errorMessage != null ? errorMessage.substring(0, Math.min(errorMessage.length(), 1024)) : "Unknown error");
}
jedis.hset("job:" + jobId, fields);
} catch (Exception e) {
System.err.println("CRITICAL: Failed to update job status in Redis for jobId " + jobId + ". Error: " + e.getMessage());
}
}
}
架构的局限性与未来优化路径
此架构虽然解决了核心问题,但也引入了新的复杂性,其边界和局限性必须明确。
状态管理中心化: Redis/S3 成为了系统的核心,其可用性和性能直接影响整个系统的稳定性。在更高要求的场景下,需要考虑 Redis 的高可用部署和 S3 的跨区域复制。
作业提交服务的单点问题:
JobSubmitterService虽然轻量,但如果它宕机,新的 Spark 作业将无法提交。生产环境中,需要将其容器化并部署多个实例,通过负载均衡确保高可用。缺乏精细化作业管理: 当前实现依赖于执行
spark-submit进程。更成熟的方案是引入 Apache Livy 这样的 Spark REST Job Server。Livy 提供了更强大的作业生命周期管理、安全性和并发控制能力,可以替代我们自研的JobSubmitterService。轮询机制的效率: 客户端轮询虽然简单,但在作业执行时间很长的情况下,会产生大量无效的 API 请求。对于需要更实时通知的场景,可以引入 WebSocket 或 Server-Sent Events (SSE),在作业完成时由后端主动推送消息给客户端。但这会增加 Vercel Functions 层的复杂性,因为它需要管理长连接,这与 Serverless 的初衷有所相悖,需要仔细权衡。
该架构并非万能药,它最适用于用户发起的、对延迟不敏感、但计算成本高昂的分析查询场景。它是在极致成本效益、用户体验和技术栈灵活性之间做出的一种务实权衡。