使用 Pulumi 构建基于 gRPC 和 AWS Lambda 的 MLOps Serverless 推理架构


在 MLOps 体系中,模型推理服务的性能是衡量整个系统价值的关键瓶颈。低延迟和高吞吐是硬性指标。常规的方案,即通过 AWS API Gateway 触发 Lambda,使用 RESTful API 和 JSON 进行数据交换,虽然简单快捷,但在性能敏感的场景下很快会暴露其天花板。JSON 的解析和序列化开销、文本协议的冗余,以及 API Gateway 本身的延迟,在每秒需要处理成千上万次推理请求时,这些微小的损耗会累积成显著的性能问题。

这就迫使我们必须评估替代方案。

方案 A: 传统 REST over API Gateway

这是最常见的 Serverless 实现。

  • 架构: 客户端 -> API Gateway (REST) -> Lambda Function
  • 优势:
    • 原生集成: 与 AWS 生态无缝衔接,配置简单。
    • 生态成熟: 工具链、文档、社区支持都非常完善。
    • 按需付费: API Gateway 的定价模型对低流量应用非常友好。
  • 劣势:
    • 性能开销: HTTP/1.1 + JSON 的组合在序列化/反序列化上有显著开销,尤其是在处理大规模特征向量(例如图像嵌入)时。
    • 协议限制: REST 缺乏严格的契约定义,依赖 OpenAPI/Swagger 等外部工具约束。不支持服务端流式传输等高级通信模式。
    • 延迟: 每一跳(Client -> AGW -> Lambda)都会引入延迟。

一个典型的 Python 客户端调用看起来是这样的:

# client_rest.py
import requests
import json
import base64
import time

# 假设这是一个图像分类服务的API Gateway端点
API_ENDPOINT = "https://xxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod/predict"

def predict_rest(image_path):
    with open(image_path, "rb") as f:
        image_bytes = f.read()
    
    # 特征数据通常需要编码
    encoded_string = base64.b64encode(image_bytes).decode("utf-8")
    
    payload = json.dumps({
        "image_data": encoded_string
    })
    
    headers = {
        'Content-Type': 'application/json'
    }

    try:
        start_time = time.time()
        response = requests.post(API_ENDPOINT, headers=headers, data=payload, timeout=10)
        response.raise_for_status() # 抛出HTTP错误
        end_time = time.time()
        
        print(f"REST request took: {end_time - start_time:.4f} seconds")
        print("Response:", response.json())

    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")

if __name__ == "__main__":
    # 使用一个示例图片文件
    predict_rest("path/to/your/image.jpg")

这个方案的问题在于,当 image_data 变得非常大,或者调用频率极高时,json.dumps 和网络传输的文本数据量会成为实实在在的瓶颈。

方案 B: gRPC over Application Load Balancer (ALB)

一个更激进但也更高效的方案是使用 gRPC。

  • 架构: 客户端 -> Application Load Balancer (ALB) -> Lambda Function (Target Group)
  • 优势:
    • 高性能: Protobuf 作为二进制序列化协议,其编解码速度和数据压缩率远超 JSON。HTTP/2 作为底层传输协议,支持多路复用,减少了连接开销。
    • 强类型契约: .proto 文件定义了服务接口和数据结构,为客户端和服务器生成代码,消除了联调时的不确定性,这在 MLOps 流程中对于数据格式的稳定性至关重要。
    • 流式处理: gRPC 原生支持双向流,可以处理如实时视频分析等复杂推理任务。
  • 劣势:
    • 架构复杂性: 需要配置 VPC、Subnet、ALB、Target Group,基础设施的管理成本远高于 API Gateway。
    • 成本: ALB 是按小时计费的,即使没有流量也会产生费用。对于流量波动极大的应用,成本可能高于 API Gateway。
    • 冷启动: Lambda 的冷启动问题依然存在,并且由于需要加载 gRPC 服务器和更大的容器镜像,首次调用的延迟可能更高。

尽管存在劣势,但对于核心推理业务,性能的确定性往往压倒一切。因此,我们选择方案 B,并深入探讨如何用 Pulumi 实现这个相对复杂的架构。在真实项目中,基础设施的复杂性必须通过代码来管理,否则将变成运维的噩梦。

核心实现概览

我们将使用 Pulumi 和 Python 来定义和部署整个堆栈。

graph TD
    subgraph Client-Side
        A[MLOps Client]
    end

    subgraph AWS Cloud
        B[Route 53 DNS] --> C{Application Load Balancer};
        C -- gRPC Traffic --> D[Lambda Target Group];
        D --> E[AWS Lambda Function];
        E -- reads from --> F[ECR Container Image];
        G[Model Artifacts S3] --> F;
        E -- needs permissions --> H[IAM Role];
    end
    
    subgraph Code & Definition
        I[inference.proto] --> A & E;
        J[Pulumi IaC Code] -- defines --> B & C & D & E & F & G & H;
    end

    A -- gRPC Request --> B;

1. 定义服务契约 (inference.proto)

一切从定义服务接口开始。这是客户端和服务端的共同语言。

// protos/inference.proto
syntax = "proto3";

package inference;

// 定义推理服务的接口
service InferenceService {
  // RPC方法,用于执行预测
  rpc Predict (InferenceRequest) returns (InferenceResponse) {}
}

// 请求消息体
message InferenceRequest {
  // 模型的唯一标识符
  string model_name = 1;
  // 输入张量,使用 bytes 类型以适应任意二进制数据
  bytes input_tensor = 2;
}

// 响应消息体
message InferenceResponse {
  // 模型的唯一标识符
  string model_name = 1;
  // 输出张量
  bytes output_tensor = 2;
  // 响应状态码,用于业务逻辑
  int32 status_code = 3; 
}

编译 .proto 文件生成 Python 代码:
python -m grpc_tools.protoc -I./protos --python_out=./src --grpc_python_out=./src ./protos/inference.proto

2. 实现 gRPC 服务端 (Lambda Function)

这里的挑战在于,Lambda 是一个事件驱动的模型,而 gRPC 服务器是一个长连接模型。我们不能直接在 Lambda 中 server.wait_for_termination()。解决方案是使用一个适配器,将 ALB 发送过来的 HTTP/2 请求事件转换为 gRPC 调用。幸运的是,有现成的库(如 mangum 的思路,或专门的 gRPC 适配器)可以处理,但为了理解其原理,我们展示一个简化的核心逻辑。

Lambda 的容器镜像需要包含 gRPC 服务器代码、模型文件以及所有依赖。

src/server.py

# src/server.py
import grpc
import logging
from concurrent import futures
import base64
import os

# 从生成的代码中导入
from inference_pb2 import InferenceResponse
from inference_pb2_grpc import InferenceServiceServicer, add_InferenceServiceServicer_to_server

# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 一个模拟的模型加载和推理
class MockModel:
    def __init__(self, model_path):
        # 在真实场景中,这里会执行耗时的模型加载操作
        # 例如: torch.load(model_path)
        logger.info(f"Loading model from {model_path}...")
        if not os.path.exists(model_path):
            logger.warning("Model file not found, using dummy logic.")
            self._model = None
        else:
            # 假设模型加载成功
            self._model = "loaded"
        logger.info("Model loaded successfully.")

    def predict(self, data: bytes) -> bytes:
        # 模拟推理过程
        logger.info(f"Performing inference on {len(data)} bytes of data.")
        # 在真实场景中,这里是模型的 forward pass
        # output = self._model(data)
        reversed_data = data[::-1]
        return reversed_data

# 实现 proto 中定义的服务
class InferenceServiceImpl(InferenceServiceServicer):
    def __init__(self):
        # 在 Lambda 的全局作用域(构造函数之外)加载模型,以便在多次调用中复用
        # 只有在冷启动时才会执行
        model_path = os.environ.get("MODEL_PATH", "/app/model/model.bin")
        self.model = MockModel(model_path)
        super().__init__()

    def Predict(self, request, context):
        logger.info(f"Received prediction request for model: {request.model_name}")
        
        try:
            # 执行推理
            output_tensor = self.model.predict(request.input_tensor)
            
            # 构造响应
            return InferenceResponse(
                model_name=request.model_name,
                output_tensor=output_tensor,
                status_code=200
            )
        except Exception as e:
            logger.error(f"Inference failed: {e}", exc_info=True)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(f"An internal error occurred: {e}")
            return InferenceResponse(status_code=500)

# 这个 handler 是 Lambda 的入口点
# 它需要一个适配器来将 ALB 的事件转换为 gRPC 请求
# 这里我们使用一个简化的概念,实际项目推荐使用 AWS Lambda Web Adapter
# 它允许你运行几乎任何 web 框架
# 这里我们假定有一个包装器`grpc_lambda_adapter`

# 全局gRPC服务器实例,利用Lambda的执行上下文复用
server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
add_InferenceServiceServicer_to_server(InferenceServiceImpl(), server)
# 这里的 server.start() 等逻辑被适配器隐藏了

def handler(event, context):
    # 一个假想的适配器函数,将事件路由到 gRPC 服务器
    # 在真实世界中,Lambda Web Adapter 会监听一个端口,ALB将流量转发到该端口
    # Lambda Runtime Interface Client (RIC) 会将事件发送给 Adapter
    # 我们这里只是展示一个概念性的入口
    
    # 模拟从 event 中提取 gRPC 需要的信息
    # 实际过程比这复杂得多,涉及二进制数据的解码和 HTTP/2 帧的处理
    logger.info("Handler invoked. This part would be managed by Lambda Web Adapter.")
    # 因为我们无法直接运行gRPC服务器循环,所以此handler仅作示意
    # 真正的魔法发生在容器的 ENTRYPOINT 和 Adapter 的配置上
    return {
        'statusCode': 501,
        'body': 'This handler is a placeholder. The real gRPC server runs via Lambda Web Adapter.'
    }

Dockerfile

FROM public.ecr.aws/lambda/python:3.9

# 安装 gRPC 工具和依赖
RUN pip install grpcio grpcio-tools

# 拷贝 proto 文件并生成代码
WORKDIR /app
COPY protos/ /app/protos/
RUN python -m grpc_tools.protoc -I./protos --python_out=. --grpc_python_out=. ./protos/inference.proto

# 拷贝服务端代码
COPY src/ /app/

# 拷贝模型文件 (在真实CI/CD中,这应该从S3下载)
COPY model/ /app/model/

# 设置环境变量
ENV MODEL_PATH="/app/model/model.bin"

# 设置 Lambda 的启动命令
# AWS Lambda Web Adapter 会启动我们的应用
# 这里我们仅设置CMD为我们传统的 handler
# 但在使用Adapter时,CMD会是你的gRPC服务器启动脚本
CMD [ "server.handler" ]

一个关键点是:为了让 gRPC 服务器在 Lambda 中运行,最佳实践是使用 AWS Lambda Web Adapter。它作为一个 sidecar 运行,将 Lambda 调用事件转换为 HTTP 请求,并转发到你容器中监听指定端口(如 8080)的 gRPC 服务器进程。这样,你的 server.py 就可以像一个正常的 gRPC 服务器一样启动,而无需关心 Lambda 的 handler 函数。Dockerfile 的 CMD 也会变成 python server.py

3. 基础设施即代码 (Pulumi)

这是将所有部分粘合起来的核心。我们使用 Pulumi 的 Python SDK 来定义云资源。

# __main__.py
import pulumi
import pulumi_aws as aws
import pulumi_docker as docker

# --- 配置 ---
config = pulumi.Config()
app_name = "grpc-ml-inference"
container_port = 8080 # gRPC 服务器监听的端口

# --- 网络基础设施 ---
# 一个常见的错误是忽略网络配置的复杂性。ALB需要VPC。
vpc = aws.ec2.get_vpc(default=True)
subnets = aws.ec2.get_subnets(filters=[aws.ec2.GetSubnetsFilterArgs(
    name="vpc-id",
    values=[vpc.id],
)])

# --- ECR 镜像仓库 ---
ecr_repo = aws.ecr.Repository(f"{app_name}-repo")

# 构建并推送 Docker 镜像到 ECR
# Pulumi 的 Docker provider 简化了 CI/CD 流程
image = docker.Image(f"{app_name}-image",
    build=docker.DockerBuildArgs(
        context=".", # Dockerfile 所在目录
    ),
    image_name=ecr_repo.repository_url,
    registry=docker.ImageRegistryArgs(
        server=ecr_repo.repository_url,
        # 授权 Pulumi 推送镜像
        # AWS CLI 必须已配置好凭证
    )
)

# --- IAM 角色与权限 ---
# Lambda 执行所需的最小权限角色
lambda_role = aws.iam.Role(f"{app_name}-lambda-role",
    assume_role_policy="""{
        "Version": "2012-10-17",
        "Statement": [{
            "Action": "sts:AssumeRole",
            "Principal": { "Service": "lambda.amazonaws.com" },
            "Effect": "Allow",
            "Sid": ""
        }]
    }"""
)

# 附加 Lambda 执行和 VPC 访问策略
aws.iam.RolePolicyAttachment(f"{app_name}-lambda-exec-policy",
    role=lambda_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole"
)

# --- Lambda 函数 ---
# 使用容器镜像创建 Lambda 函数
lambda_function = aws.lambda_.Function(f"{app_name}-function",
    package_type="Image",
    image_uri=image.image_name,
    role=lambda_role.arn,
    timeout=60, # 推理可能需要更长时间
    memory_size=1024, # 根据模型大小调整
    vpc_config=aws.lambda_.FunctionVpcConfigArgs(
        subnet_ids=subnets.ids,
        security_group_ids=[], # 最好创建一个专用的安全组
    )
    # 使用 Web Adapter 时,需要覆盖 CMD 并配置环境变量
)

# --- ALB 和目标组 ---
# 安全组,允许来自任何地方的 HTTP/HTTPS 流量
alb_sg = aws.ec2.SecurityGroup(f"{app_name}-alb-sg",
    vpc_id=vpc.id,
    ingress=[
        aws.ec2.SecurityGroupIngressArgs(protocol="tcp", from_port=80, to_port=80, cidr_blocks=["0.0.0.0/0"]),
    ],
    egress=[
        aws.ec2.SecurityGroupEgressArgs(protocol="-1", from_port=0, to_port=0, cidr_blocks=["0.0.0.0/0"]),
    ]
)

# 创建 ALB
alb = aws.lb.LoadBalancer(f"{app_name}-alb",
    internal=False,
    load_balancer_type="application",
    security_groups=[alb_sg.id],
    subnets=subnets.ids,
)

# 创建指向 Lambda 的目标组
target_group = aws.lb.TargetGroup(f"{app_name}-tg",
    target_type="lambda",
    protocol="HTTP", # ALB 与 Lambda 之间总是 HTTP
    port=80,
    vpc_id=vpc.id,
)

# 授予 ALB 调用 Lambda 的权限
permission = aws.lambda_.Permission(f"{app_name}-lambda-permission",
    action="lambda:InvokeFunction",
    function=lambda_function.name,
    principal="elasticloadbalancing.amazonaws.com",
    source_arn=target_group.arn
)

# 将 Lambda 函数注册到目标组
attachment = aws.lb.TargetGroupAttachment(f"{app_name}-tg-attachment",
    target_group_arn=target_group.arn,
    target_id=lambda_function.arn,
    opts=pulumi.ResourceOptions(depends_on=[permission]),
)

# 创建 ALB 监听器和规则
# 这里的关键是 `conditions` 中的 `host_header` 可以用来路由 gRPC 服务
# 或者直接将所有流量转发
listener = aws.lb.Listener(f"{app_name}-listener",
    load_balancer_arn=alb.arn,
    port=80,
    default_actions=[aws.lb.ListenerDefaultActionArgs(
        type="forward",
        target_group_arn=target_group.arn,
    )],
    # 在生产环境中,应该使用 443 端口和 ACM 证书
)

# --- 输出 ---
pulumi.export("alb_dns_name", alb.dns_name)

这段 Pulumi 代码定义了从网络到应用部署的每一个环节。它解决了手动配置易出错、难以复现的问题。一个 pulumi up 命令就可以部署或更新整个服务。

4. 实现客户端调用

客户端代码现在需要使用 gRPC stub 来调用部署在 ALB 上的服务。

# client_grpc.py
import grpc
import time

# 导入生成的代码
from inference_pb2 import InferenceRequest
from inference_pb2_grpc import InferenceServiceStub

# ALB 的 DNS 名称,从 Pulumi 输出中获取
ALB_ENDPOINT = "grpc-ml-inference-alb-xxxxxxxx.us-east-1.elb.amazonaws.com:80"

def predict_grpc(input_data: bytes):
    # 创建一个不安全的通道(生产环境应使用 TLS)
    with grpc.insecure_channel(ALB_ENDPOINT) as channel:
        stub = InferenceServiceStub(channel)
        
        request = InferenceRequest(
            model_name="image-classifier-v1",
            input_tensor=input_data
        )

        try:
            start_time = time.time()
            response = stub.Predict(request, timeout=10)
            end_time = time.time()
            
            print(f"gRPC request took: {end_time - start_time:.4f} seconds")
            print(f"Response status: {response.status_code}, output tensor length: {len(response.output_tensor)}")
            
        except grpc.RpcError as e:
            # 这里的错误处理至关重要
            # e.g., grpc.StatusCode.UNAVAILABLE 可能意味着 Lambda 冷启动超时或 ALB 5xx 错误
            print(f"An gRPC error occurred: {e.code()} - {e.details()}")

if __name__ == "__main__":
    # 使用一些模拟的二进制数据
    dummy_tensor = b'\x01\x02\x03' * 1024 # 3KB
    predict_grpc(dummy_tensor)

架构的扩展性与局限性

这个架构并非银弹,它有明确的适用边界。

  • 局限性 - 冷启动: 这是 Serverless 固有的问题。gRPC 方案由于容器镜像更大,可能加剧该问题。唯一的有效解法是使用 Provisioned Concurrency,但这会带来持续的成本,违背了 Serverless “按需付费”的初衷。在真实项目中,需要精确计算业务能容忍的延迟和愿意付出的成本之间的平衡点。
  • 局限性 - 成本陷阱: ALB 按小时计费,且数据处理费用也不菲。如果你的推理服务流量非常稀疏,例如每天只有几次调用,那么这个架构的固定成本会远高于 API Gateway。它只适用于持续有一定流量负载的场景。
  • 局限性 - 连接模型: gRPC 的优势之一是长连接。但在 Lambda 架构下,这个优势被削弱了。每次调用,ALB 都会触发一次新的 Lambda 调用,不存在真正意义上的长连接复用。它本质上是利用了 gRPC 的协议效率,而非其连接模型。对于需要服务端推送或长时间双向流的场景,这个架构可能不适用,需要考虑 ECS Fargate 或 EKS。
  • 扩展性与未来路径:
    • 集成 MLOps 流水线: Pulumi 的优势在于可以用编程逻辑定义基础设施。可以轻易地将此部署脚本集成到 CI/CD 中,当模型仓库(S3)有新模型时,自动触发 Pulumi 更新,构建新镜像并部署新的 Lambda 版本,实现模型的自动化上线。
    • 金丝雀部署: ALB 和 Lambda 的版本与别名功能结合,可以实现精细的流量切换,支持金丝雀发布。Pulumi 可以对这些部署策略进行编程,例如,定义一个 CanaryDeployment 组件,自动管理 10% 流量到新版本,监控 CloudWatch 指标,然后逐步全量。
    • Lambda Web Adapter 的标准化: 随着 AWS Lambda Web Adapter 的成熟,在 Lambda 中运行任意 web 服务(包括 gRPC)的模式会变得更加标准化,这将进一步降低该架构的实现门槛。

  目录