基于Quarkus与Event Sourcing构建支持时态查询的图谱化IAM核心


一、 定义困境:超越“现在”的权限质询

在构建一个复杂的、多租户的身份与访问管理(IAM)系统时,核心挑战通常不仅仅是回答“用户A当前是否能访问资源B?”。在金融、医疗或任何需要严格审计的领域,问题会迅速演变成:“用户A在上周二下午3点15分时,拥有哪些权限?”,“是什么操作导致了管理员B获得了对生产环境数据库的临时访问权?”,以及“重现24小时前整个租户C的权限拓扑结构”。

这些“时态查询”(Temporal Query)的需求,暴露了传统基于状态存储的IAM设计的根本性缺陷。

二、 方案A:关系型数据库与审计日志的挣扎

一个常规的实现方案是采用关系型数据库。其模型通常如下:

CREATE TABLE users (id UUID PRIMARY KEY, name VARCHAR(255));
CREATE TABLE roles (id UUID PRIMARY KEY, name VARCHAR(255));
CREATE TABLE permissions (id UUID PRIMARY KEY, action VARCHAR(100), resource VARCHAR(255));
CREATE TABLE user_roles (user_id UUID, role_id UUID); -- 用户与角色的多对多关系
CREATE TABLE role_permissions (role_id UUID, permission_id UUID); -- 角色与权限的多对多关系

-- 再加上一个巨大的审计日志表
CREATE TABLE audit_log (
    id BIGSERIAL PRIMARY KEY,
    timestamp TIMESTAMPTZ NOT NULL,
    actor_id UUID,
    operation VARCHAR(50), -- e.g., 'GRANT_ROLE', 'REVOKE_PERMISSION'
    target_entity VARCHAR(50),
    target_id UUID,
    details JSONB
);

优势分析:

  • 成熟稳定:关系型数据库技术栈成熟,事务性(ACID)得到完全保障。
  • 查询直观:对于“当前”权限的查询,通过JOIN操作即可完成,易于理解。

劣势分析:

  1. 时态查询的无力感:要回答“用户A在过去某个时间点的权限”,几乎是不可能完成的任务。你无法信任审计日志能完美地用于状态重放。审计日志记录的是“意图”,而数据库的当前状态是“结果”。两者之间可能存在数据不一致、失败的事务或手动数据修正。唯一的办法是定期对整个权限关系图进行全量快照,这会带来巨大的存储开销和管理复杂性。
  2. 复杂关系查询的性能瓶颈:在深度嵌套的组织架构或资源继承体系中(例如,用户属于部门,部门属于分公司,分公司拥有某个文件夹的权限,该权限被子文件夹继承),查询一个用户的最终有效权限集需要执行多次、甚至递归的SQL查询。这在关系数据库中是性能重灾区。
  3. 状态与历史的分离audit_log表与实际的权限关系表是物理分离的。这种分离本身就是一种风险。业务逻辑的任何一个微小bug都可能导致状态被修改而审计日志未能正确记录,从而破坏了系统的可审计性。

这种方案在简单的场景下尚可应付,但在需要强审计和处理复杂层级关系的场景下,其维护成本和系统风险会呈指数级增长。

三、 方案B:事件溯源与图数据库的融合架构

我们选择了一条截然不同的路径:将“事件”作为系统唯一的事实来源(Single Source of Truth),并使用图数据库作为针对复杂关系进行优化的“读模型”。

graph TD
    subgraph "命令侧 (Write Side)"
        A[API Endpoint: Command] --> B{Command Handler}
        B --> C[Aggregate: Tenant/User]
        C -- Generates --> D[Event]
        D -- Persist --> E[(Event Store)]
    end

    subgraph "查询侧 (Read Side)"
        F[Projector] -- Subscribes --> E
        F -- Translates Event to Cypher --> G[(Graph Database - Neo4j)]
        H[API Endpoint: Query] --> I{Query Service}
        I -- Cypher Query --> G
    end

    style E fill:#f9f,stroke:#333,stroke-width:2px
    style G fill:#ccf,stroke:#333,stroke-width:2px

核心组件剖析:

  • **事件溯源 (Event Sourcing)**:我们不存储当前状态。相反,我们存储导致状态变化的每一个不可变的领域事件。例如,不是将user_roles表中的一行记录删除,而是记录一个RoleRevokedFromUserEvent事件。系统的当前状态是通过从头到尾重放这些事件计算得出的。
  • **图数据库 (Graph Database)**:权限模型天然是一种图。用户、角色、权限、资源都是图中的节点(Node),它们之间的关系(如HAS_ROLE, CAN_ACCESS)则是边(Edge)。图数据库对这种多对多、深度遍历的查询场景有无与伦比的性能优势。
  • **CQRS (命令查询职责分离)**:写操作(命令)被发送到处理业务逻辑并产生事件的聚合根。读操作(查询)则直接访问为特定查询场景优化的读模型(在这里是图数据库)。
  • Quarkus: 作为高性能的云原生Java框架,其快速启动、低内存占用和响应式编程模型非常适合构建事件驱动的微服务。

优势分析:

  1. 内置完整审计与时态查询:事件存储本身就是一份完美的、不可篡改的审计日志。要获取任何历史时刻的权限状态,我们只需重放该时间点之前的所有事件即可。这从根本上解决了方案A中状态与历史分离的问题。
  2. 卓越的复杂关系查询性能:查询“用户A是否能访问资源B?”变成了一个图遍历问题:“是否存在从节点(User A)到节点(Resource B)的路径?”。这类查询在图数据库中极其高效。
  3. 模型与业务一致:权限和组织结构在现实世界中就是一张关系网。使用图来建模,使得代码和数据库结构能更直观地反映业务领域,降低了沟通和维护成本。

四、 最终决策与核心实现

我们最终选择了方案B。它虽然引入了事件溯源和CQRS带来的认知复杂性,但其为我们核心的审计和复杂查询需求提供了根本性的解决方案。下面是基于Quarkus的核心实现概览。

4.1. 领域事件与命令定义

一切始于对领域行为的建模。我们使用Java的Record来定义不可变的命令和事件。

// src/main/java/com/example/iam/domain/model/Commands.java
package com.example.iam.domain.model;

import java.util.UUID;

public final class Commands {
    // 命令:意图执行一个操作
    public record CreateTenantCommand(UUID tenantId, String tenantName) {}
    public record DefineRoleCommand(UUID tenantId, UUID roleId, String roleName) {}
    public record GrantRoleToUserCommand(UUID tenantId, UUID userId, UUID roleId) {}
    public record RevokeRoleFromUserCommand(UUID tenantId, UUID userId, UUID roleId) {}
    public record AddPermissionToRoleCommand(UUID tenantId, UUID roleId, String permissionAction, String resourcePattern) {}
}

// src/main/java/com/example/iam/domain/model/Events.java
package com.example.iam.domain.model;

import java.time.Instant;
import java.util.UUID;

public sealed interface DomainEvent {
    UUID aggregateId();
    long sequence();
    Instant timestamp();

    // 事件:已经发生的事实
    record TenantCreatedEvent(UUID aggregateId, long sequence, Instant timestamp, String tenantName) implements DomainEvent {}
    record RoleDefinedEvent(UUID aggregateId, long sequence, Instant timestamp, UUID roleId, String roleName) implements DomainEvent {}
    record RoleGrantedToUserEvent(UUID aggregateId, long sequence, Instant timestamp, UUID userId, UUID roleId) implements DomainEvent {}
    record RoleRevokedFromUserEvent(UUID aggregateId, long sequence, Instant timestamp, UUID userId, UUID roleId) implements DomainEvent {}
    record PermissionAddedToRoleEvent(UUID aggregateId, long sequence, Instant timestamp, UUID roleId, String permissionAction, String resourcePattern) implements DomainEvent {}
}

4.2. 聚合根:业务规则的守护者

聚合根(Aggregate)是命令的处理者。它加载历史事件来重建自身状态,然后基于当前状态验证命令的有效性,如果有效,则生成新的事件。

// src/main/java/com/example/iam/domain/TenantAggregate.java
package com.example.iam.domain;

import com.example.iam.domain.model.Commands.*;
import com.example.iam.domain.model.Events.*;
import com.example.iam.domain.model.DomainEvent;

import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

public class TenantAggregate {

    private UUID id;
    private String name;
    private long version = 0;
    private final Set<UUID> existingRoles = new HashSet<>();
    private final Set<String> userRolePairs = new HashSet<>(); // Format: "userId:roleId"

    private final List<DomainEvent> uncommittedChanges = new ArrayList<>();

    // Public method to handle commands
    public List<DomainEvent> handle(CreateTenantCommand cmd) {
        // Business rule: Tenant cannot be created if already exists
        if (this.id != null) {
            throw new IllegalStateException("Tenant " + this.id + " already exists.");
        }
        applyNewChange(new TenantCreatedEvent(cmd.tenantId(), 1, Instant.now(), cmd.tenantName()));
        return getUncommittedChanges();
    }
    
    public List<DomainEvent> handle(DefineRoleCommand cmd) {
        // Business rule: Role cannot be redefined
        if (existingRoles.contains(cmd.roleId())) {
            throw new IllegalArgumentException("Role " + cmd.roleId() + " already defined.");
        }
        applyNewChange(new RoleDefinedEvent(id, newVersion(), Instant.now(), cmd.roleId(), cmd.roleName()));
        return getUncommittedChanges();
    }
    
    public List<DomainEvent> handle(GrantRoleToUserCommand cmd) {
        if (!existingRoles.contains(cmd.roleId())) {
            throw new IllegalArgumentException("Role " + cmd.roleId() + " does not exist.");
        }
        String pair = cmd.userId().toString() + ":" + cmd.roleId().toString();
        if (userRolePairs.contains(pair)) {
            // Idempotency: granting an existing role is not an error, just do nothing.
            return List.of();
        }
        applyNewChange(new RoleGrantedToUserEvent(id, newVersion(), Instant.now(), cmd.userId(), cmd.roleId()));
        return getUncommittedChanges();
    }

    // ... other command handlers (revoke, add permission etc.)

    // State reconstruction from events
    public void apply(DomainEvent event) {
        if (event instanceof TenantCreatedEvent e) {
            this.id = e.aggregateId();
            this.name = e.tenantName();
        } else if (event instanceof RoleDefinedEvent e) {
            this.existingRoles.add(e.roleId());
        } else if (event instanceof RoleGrantedToUserEvent e) {
            this.userRolePairs.add(e.userId().toString() + ":" + e.roleId().toString());
        } else if (event instanceof RoleRevokedFromUserEvent e) {
            this.userRolePairs.remove(e.userId().toString() + ":" + e.roleId().toString());
        }
        // ... apply other events
        this.version = event.sequence();
    }
    
    public static TenantAggregate fromHistory(UUID id, List<DomainEvent> history) {
        TenantAggregate aggregate = new TenantAggregate();
        history.forEach(aggregate::apply);
        return aggregate;
    }

    private void applyNewChange(DomainEvent event) {
        apply(event);
        uncommittedChanges.add(event);
    }

    private long newVersion() {
        return this.version + 1;
    }
    
    public List<DomainEvent> getUncommittedChanges() {
        return List.copyOf(uncommittedChanges);
    }
}

4.3. 事件存储与命令处理器 (Quarkus Service)

命令处理器是协调者。它从事件存储中加载事件,重建聚合,调用聚合处理命令,最后将新生成的事件存回事件存储。

// src/main/java/com/example/iam/application/TenantCommandService.java
package com.example.iam.application;

import com.example.iam.domain.TenantAggregate;
import com.example.iam.domain.model.Commands.CreateTenantCommand;
import com.example.iam.domain.model.DomainEvent;
import com.example.iam.infra.EventStore; // Assume an EventStore interface exists

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.List;
import java.util.UUID;
import org.jboss.logging.Logger;

@ApplicationScoped
public class TenantCommandService {

    private static final Logger LOG = Logger.getLogger(TenantCommandService.class);

    @Inject
    EventStore eventStore;

    // A real implementation would have handlers for each command.
    public void handle(CreateTenantCommand command) {
        try {
            // For creation, the history is empty
            TenantAggregate aggregate = new TenantAggregate(); 
            List<DomainEvent> newEvents = aggregate.handle(command);
            
            if (!newEvents.isEmpty()) {
                eventStore.saveEvents(command.tenantId(), 0, newEvents);
                LOG.infof("Tenant %s created successfully.", command.tenantId());
            }
        } catch (Exception e) {
            LOG.errorf(e, "Error processing command %s", command);
            // Proper error handling and possibly compensation logic
            throw new RuntimeException("Command processing failed", e);
        }
    }
    
    // Example for an update command
    public void handle(GrantRoleToUserCommand command) {
        try {
            List<DomainEvent> history = eventStore.getEventsForAggregate(command.tenantId());
            if (history.isEmpty()) {
                throw new IllegalStateException("Tenant not found: " + command.tenantId());
            }

            TenantAggregate aggregate = TenantAggregate.fromHistory(command.tenantId(), history);
            List<DomainEvent> newEvents = aggregate.handle(command);
            
            if (!newEvents.isEmpty()) {
                long expectedVersion = history.get(history.size() - 1).sequence();
                eventStore.saveEvents(command.tenantId(), expectedVersion, newEvents);
                LOG.infof("Role %s granted to user %s in tenant %s", command.roleId(), command.userId(), command.tenantId());
            }
        } catch (Exception e) {
            LOG.errorf(e, "Error processing command %s", command);
            throw new RuntimeException("Command processing failed", e);
        }
    }
}
  • 注意EventStore的实现可以是基于PostgreSQL的JSONB字段,也可以是专用的事件存储数据库如EventStoreDB。关键在于它必须支持基于聚合ID和序列号的乐观锁,以防止并发写入冲突。

4.4. 投影器:将事件翻译成图

投影器(Projector)是连接写模型和读模型的桥梁。它订阅事件流,并将每个事件转换成对图数据库的操作(Cypher语句)。

// src/main/java/com/example/iam/infra/GraphProjector.java
package com.example.iam.infra;

import com.example.iam.domain.model.DomainEvent;
import com.example.iam.domain.model.Events.*;

import io.quarkus.runtime.StartupEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Session;
import org.neo4j.driver.Values;
import org.jboss.logging.Logger;

@ApplicationScoped
public class GraphProjector {
    
    private static final Logger LOG = Logger.getLogger(GraphProjector.class);

    @Inject
    Driver neo4jDriver; // Quarkus Neo4j client

    @Inject
    EventBus eventBus; // A simple in-memory or Kafka-backed event bus

    void onStart(@Observes StartupEvent ev) {
        // In a real system, you'd subscribe to a durable message queue like Kafka.
        // For this example, we'll use a simple event bus.
        eventBus.subscribe(this::handleEvent);
        LOG.info("GraphProjector started and subscribed to the event bus.");
    }

    private void handleEvent(DomainEvent event) {
        try (Session session = neo4jDriver.session()) {
            session.executeWrite(tx -> {
                switch (event) {
                    case TenantCreatedEvent e -> tx.run(
                        "MERGE (t:Tenant {id: $id}) SET t.name = $name",
                        Values.parameters("id", e.aggregateId().toString(), "name", e.tenantName())
                    );
                    case RoleDefinedEvent e -> tx.run(
                        "MATCH (t:Tenant {id: $tenantId}) " +
                        "MERGE (r:Role {id: $roleId, tenantId: $tenantId}) SET r.name = $roleName " +
                        "MERGE (t)-[:DEFINES_ROLE]->(r)",
                        Values.parameters("tenantId", e.aggregateId().toString(), "roleId", e.roleId().toString(), "roleName", e.roleName())
                    );
                    case RoleGrantedToUserEvent e -> tx.run(
                        "MERGE (u:User {id: $userId, tenantId: $tenantId}) " +
                        "MATCH (r:Role {id: $roleId, tenantId: $tenantId}) " +
                        "MERGE (u)-[:HAS_ROLE]->(r)",
                        Values.parameters("userId", e.userId().toString(), "roleId", e.roleId().toString(), "tenantId", e.aggregateId().toString())
                    );
                    case RoleRevokedFromUserEvent e -> tx.run(
                        "MATCH (u:User {id: $userId, tenantId: $tenantId})-[rel:HAS_ROLE]->(r:Role {id: $roleId, tenantId: $tenantId}) " +
                        "DELETE rel",
                         Values.parameters("userId", e.userId().toString(), "roleId", e.roleId().toString(), "tenantId", e.aggregateId().toString())
                    );
                    case PermissionAddedToRoleEvent e -> tx.run(
                        "MATCH (r:Role {id: $roleId, tenantId: $tenantId}) " +
                        "MERGE (p:Permission {action: $action, resource: $resource, tenantId: $tenantId}) " +
                        "MERGE (r)-[:HAS_PERMISSION]->(p)",
                         Values.parameters("roleId", e.roleId().toString(), "tenantId", e.aggregateId().toString(), "action", e.permissionAction(), "resource", e.resourcePattern())
                    );
                    // default case to handle unhandled events
                    default -> LOG.warnf("Unhandled event type: %s", event.getClass().getSimpleName());
                }
                return null;
            });
        } catch (Exception e) {
            LOG.errorf(e, "Failed to project event: %s", event);
            // A real system needs a robust retry/dead-letter-queue mechanism here.
        }
    }
}

4.5. 查询服务:释放图的力量

现在,我们可以编写强大而简洁的查询来回答复杂的权限问题。

// src/main/java/com/example/iam/application/PermissionQueryService.java
package com.example.iam.application;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.neo4j.driver.Driver;
import org.neo4j.driver.Session;
import org.neo4j.driver.Values;

import java.util.UUID;

@ApplicationScoped
public class PermissionQueryService {

    @Inject
    Driver neo4jDriver;

    public boolean canAccess(UUID tenantId, UUID userId, String requiredAction, String resource) {
        // This is a powerful query. It checks if a user has a role, which in turn
        // has a permission that matches the required action and resource.
        // The power of graph is that this query remains efficient even with many
        // intermediate layers (e.g., user -> group -> role -> permission).
        String cypherQuery = """
            MATCH (u:User {id: $userId, tenantId: $tenantId})
            // Find all paths from user to a permission
            // The *1..5 means it will traverse up to 5 relationships deep
            // (e.g., User->Group->Role->InheritedRole->Permission)
            MATCH p = (u)-[:HAS_ROLE*1..5]->(role:Role)-[:HAS_PERMISSION]->(perm:Permission)
            WHERE perm.action = $action AND $resource =~ perm.resource
            // If any such path exists, return true
            RETURN count(p) > 0 AS hasPermission
        """;
        
        try (Session session = neo4jDriver.session()) {
            return session.readTransaction(tx -> 
                tx.run(cypherQuery, Values.parameters(
                        "userId", userId.toString(),
                        "tenantId", tenantId.toString(),
                        "action", requiredAction,
                        "resource", resource))
                  .single()
                  .get("hasPermission")
                  .asBoolean()
            );
        }
    }
}

$resource =~ perm.resource 这部分展示了图数据库的灵活性,我们可以将perm.resource存储为正则表达式,如/api/v1/tenants/.*/documents/.*,从而实现基于模式的资源匹配。

五、 架构的扩展性与局限性

此架构并非银弹。一个显而易见的代价是最终一致性。从一个事件被持久化到它被投影器处理并反映在图数据库中,存在一个(通常是毫秒级的)延迟。对于大多数IAM场景,这种延迟是可以接受的,但在某些极端情况下(例如,刚刚撤销了一个用户的权限,而用户立即尝试访问),可能会出现问题。这可以通过一些策略缓解,比如在执行关键命令后让API端点短暂等待或直接查询聚合状态,但这会破坏CQRS的纯粹性。

另一个挑战是**事件模型的演进 (Schema Evolution)**。一旦事件被写入,它们就是不可变的。如果将来需要为RoleDefinedEvent添加一个description字段,就需要处理版本控制。通常的策略是“向后兼容”地添加新字段,并让新版本的代码能够处理旧版本的事件。

然而,这个架构带来的最大收益——完整的、可查询的历史状态——是传统方法难以企及的。实现一个“时间旅行”API,重现任何时间点的权限图谱,在技术上是可行的:只需从事件存储中读取截止到特定时间点的所有事件,并将它们投影到一个临时的、内存中的图模型或一个单独的数据库实例中,即可进行历史状态的分析与审计。


  目录