基于自研Raft库构建一个Neo4j模式的分布式图状态机


最近一个项目遇到了一个棘手的状态同步问题。我们需要一个高可用的内存数据结构,但它并非简单的键值对。其内部状态是一个复杂的、相互关联的实体网络——本质上是一个图。常规的分布式 K/V 存储,如 etcd 或 Consul,其模型无法直接满足需求,而引入一个成熟的分布式图数据库如 Neo4j Causal Cluster 或 Dgraph 又显得过于重型,我们需要的仅仅是其数据模型和事务性操作的子集,并且希望对底层的一致性协议有更强的掌控力。

这个场景的本质是状态机复制。图本身是状态机(State Machine),而 CREATE (n:User {name: 'Alice'}) 这样的操作则是改变状态的指令(Command)。要保证高可用,就需要将这些指令序列(Log)可靠地复制到多个节点上。Raft 协议是解决这类问题的标准答案。

市面上不乏成熟的 Raft 库,比如 HashiCorp 的 hashicorp/raft 或 etcd 的 etcd/raft。但为了彻底吃透协议的每个细节,并为后续可能的深度定制(例如,针对图的特殊快照逻辑)铺平道路,我们决定自己动手实现一个精简的 Raft 核心库,并在此之上构建一个通用的分布式状态机框架,最终将我们的图数据模型作为第一个实例化的状态机。这个过程不仅是造轮子,更是对分布式系统第一性原理的深度复盘。

第一步:Raft 核心协议的实现

任何 Raft 实现都始于三个核心角色(Follower, Candidate, Leader)和两种关键的 RPC(RequestVote, AppendEntries)。我们的目标不是实现 Raft 论文中的全部功能(比如成员变更),而是先搭建一个能稳定完成领导者选举和日志复制的骨架。

stateDiagram-v2
    [*] --> Follower

    Follower --> Candidate: election timeout
    Candidate --> Candidate: election timeout
    Candidate --> Follower: discovers leader or new term
    Candidate --> Leader: receives majority votes

    Leader --> Follower: discovers higher term
    state Leader {
        [*] --> Broadcasting
        Broadcasting --> Broadcasting: sends heartbeats
    }

我们的 Raft 节点需要维护一些持久化状态(currentTerm, votedFor, log)和易失性状态。在真实项目中,持久化状态需要写入磁盘,这里为了简化,我们暂时保留在内存中。

// raft_node.go

package raft

import (
	"context"
	"log"
	"math/rand"
	"sync"
	"time"
)

type State string

const (
	Follower  State = "Follower"
	Candidate State = "Candidate"
	Leader    State = "Leader"
)

// LogEntry 代表 Raft 日志中的一个条目
type LogEntry struct {
	Term    int
	Command []byte // 将要应用到状态机的命令
}

// Node 是 Raft 集群中的一个节点
type Node struct {
	mu sync.Mutex

	// 节点标识和集群信息
	id    int
	peers map[int]string // peerId -> address

	// 持久化状态 (为了演示,存储在内存中)
	currentTerm int
	votedFor    int
	log         []LogEntry

	// 易失性状态
	state       State
	commitIndex int
	lastApplied int

	// Leader 的易失性状态
	nextIndex  map[int]int
	matchIndex map[int]int

	// 定时器
	electionTimer  *time.Timer
	heartbeatTimer *time.Timer

	// 通信通道
	rpcCh chan RPC

	// 状态机应用接口
	stateMachine StateMachine
}

// StateMachine 是应用层状态机的抽象接口
type StateMachine interface {
	Apply(command []byte) ([]byte, error)
}

// RPC 封装了网络请求
type RPC struct {
	Request  interface{}
	Response chan<- interface{}
}

// RequestVoteArgs 是 RequestVote RPC 的参数
type RequestVoteArgs struct {
	Term         int
	CandidateID  int
	LastLogIndex int
	LastLogTerm  int
}

// RequestVoteReply 是 RequestVote RPC 的返回值
type RequestVoteReply struct {
	Term        int
	VoteGranted bool
}

// AppendEntriesArgs 是 AppendEntries RPC 的参数
type AppendEntriesArgs struct {
	Term         int
	LeaderID     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []LogEntry
	LeaderCommit int
}

// AppendEntriesReply 是 AppendEntries RPC 的返回值
type AppendEntriesReply struct {
	Term    int
	Success bool
}

func NewNode(id int, peers map[int]string, sm StateMachine) *Node {
	node := &Node{
		id:           id,
		peers:        peers,
		state:        Follower,
		votedFor:     -1, // -1 表示没有投票给任何人
		log:          make([]LogEntry, 1), // log[0] is a dummy entry
		rpcCh:        make(chan RPC),
		stateMachine: sm,
	}
	node.resetElectionTimer()
	return node
}

// electionTimeout 返回一个随机的选举超时时间
func (n *Node) electionTimeout() time.Duration {
	// 在 150ms 到 300ms 之间选择一个随机值
	return time.Duration(150+rand.Intn(150)) * time.Millisecond
}

// resetElectionTimer 重置选举定时器
func (n *Node) resetElectionTimer() {
	if n.electionTimer != nil {
		n.electionTimer.Stop()
	}
	n.electionTimer = time.AfterFunc(n.electionTimeout(), func() {
		n.startElection()
	})
}

// 主循环,处理事件
func (n *Node) Run(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case rpc := <-n.rpcCh:
			// 处理 RPC 请求
			n.handleRPC(rpc)
		}
	}
}

func (n *Node) handleRPC(rpc RPC) {
	n.mu.Lock()
	defer n.mu.Unlock()

	switch req := rpc.Request.(type) {
	case *RequestVoteArgs:
		reply := n.handleRequestVote(req)
		rpc.Response <- reply
	case *AppendEntriesArgs:
		reply := n.handleAppendEntries(req)
		rpc.Response <- reply
	}
}

// ... handleRequestVote 和 handleAppendEntries 的具体实现 ...

这里的核心是 Run 函数中的事件循环。所有外部交互,无论是 RPC 请求还是定时器触发,都应该被封装成事件,在这个单 goroutine 循环中处理。这样做的好处是极大地简化了并发控制,Node 结构体中的所有字段都由这个 goroutine 独占,只需要在 handleRPC 的入口和出口处加锁即可,避免了复杂的细粒度锁。

领导者选举的逻辑是关键。当一个 Follower 的选举定时器超时,它会转变为 Candidate,增加自己的 currentTerm,给自己投票,然后向所有其他节点发送 RequestVote RPC。

// raft_election.go

func (n *Node) startElection() {
	n.mu.Lock()
	n.state = Candidate
	n.currentTerm++
	n.votedFor = n.id
	log.Printf("[Node %d] became Candidate for term %d", n.id, n.currentTerm)
	n.resetElectionTimer()
	n.mu.Unlock()

	votes := 1
	var wg sync.WaitGroup

	for peerId := range n.peers {
		if peerId == n.id {
			continue
		}

		wg.Add(1)
		go func(peerId int) {
			defer wg.Done()
			
			n.mu.Lock()
			args := &RequestVoteArgs{
				Term:         n.currentTerm,
				CandidateID:  n.id,
				LastLogIndex: len(n.log) - 1,
				LastLogTerm:  n.log[len(n.log)-1].Term,
			}
			n.mu.Unlock()

			// 这是一个简化的 RPC 调用,真实项目中会是网络调用
			// reply := n.sendRequestVote(peerId, args) 
			// 这里我们假设有一个 sendRequestVote 的实现
			
			// 模拟 RPC 返回
			// if reply != nil && reply.VoteGranted {
			// 	n.mu.Lock()
			// 	votes++
			// 	n.mu.Unlock()
			// }
		}(peerId)
	}
	
	// 这里需要等待并检查 `votes` 是否超过半数
	// 如果是,则成为 Leader
}

func (n *Node) handleRequestVote(args *RequestVoteArgs) *RequestVoteReply {
	reply := &RequestVoteReply{
		Term:        n.currentTerm,
		VoteGranted: false,
	}

	if args.Term < n.currentTerm {
		return reply
	}

	if args.Term > n.currentTerm {
		n.becomeFollower(args.Term, -1)
	}

	// 投票约束:
	// 1. 尚未在本任期投票
	// 2. 候选人的日志至少和自己一样新
	isLogUpToDate := args.LastLogTerm > n.log[len(n.log)-1].Term ||
		(args.LastLogTerm == n.log[len(n.log)-1].Term && args.LastLogIndex >= len(n.log)-1)

	if (n.votedFor == -1 || n.votedFor == args.CandidateID) && isLogUpToDate {
		n.votedFor = args.CandidateID
		reply.VoteGranted = true
		n.resetElectionTimer()
		log.Printf("[Node %d] granted vote to Node %d for term %d", n.id, args.CandidateID, n.currentTerm)
	}

	return reply
}

func (n *Node) becomeFollower(term int, leaderId int) {
	n.state = Follower
	n.currentTerm = term
	n.votedFor = -1 // 新任期,重置投票
	log.Printf("[Node %d] became Follower at term %d", n.id, n.currentTerm)
	n.resetElectionTimer()
}

日志复制逻辑 (handleAppendEntries) 是 Raft 最复杂的部分。Leader 通过心跳(空的 AppendEntries)维持权威,并通过携带日志条目的 RPC 来同步状态。这里的坑在于,必须正确处理日志不一致的情况。当 Follower 发现 PrevLogIndex 处的日志条目与 Leader 发来的 PrevLogTerm 不匹配时,必须拒绝该 RPC,并告知 Leader 自己当前的日志情况,以便 Leader 下次能发送正确的日志。

第二步:抽象状态机框架

Raft 本身只关心日志的一致性,它不关心日志里是什么。为了让我们的 Raft 库变得通用,必须定义一个清晰的边界。这个边界就是 StateMachine 接口。

// raft_node.go

// StateMachine 是应用层状态机的抽象接口
// Apply 方法负责解析命令字节流,执行它,并返回结果
type StateMachine interface {
	Apply(command []byte) ([]byte, error)
}

Raft 节点的核心逻辑中,当一个日志条目被确认提交(即被集群中大多数节点复制)后,就需要将其应用到状态机。

// raft_node.go in Node struct
// ...
// lastApplied int // 跟踪已应用到状态机的最高日志索引
// ...

func (n *Node) applyCommittedEntries() {
	n.mu.Lock()
	defer n.mu.Unlock()

	for n.commitIndex > n.lastApplied {
		n.lastApplied++
		entry := n.log[n.lastApplied]
		log.Printf("[Node %d] applying command from log index %d", n.id, n.lastApplied)
		
		// 这里的 stateMachine 就是我们注入的具体实现
		_, err := n.stateMachine.Apply(entry.Command)
		if err != nil {
			// 在真实项目中,这里需要有健壮的错误处理机制
			log.Fatalf("[Node %d] failed to apply command: %v", n.id, err)
		}
	}
}

这个 applyCommittedEntries 函数应该在一个独立的 goroutine 中被周期性调用,或者在 commitIndex 更新后被触发。它将 Raft 核心与业务逻辑彻底解耦。现在,我们可以实现任何满足 StateMachine 接口的业务逻辑,而无需改动 Raft 库的一行代码。

第三步:实现 Neo4j 模式的图状态机

现在,我们来构建那个激发了这一切的图状态机。我们将模拟 Neo4j 的基本模型:带标签和属性的节点,以及带类型和属性的有向关系。

// graph_sm.go

package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Node represents a node in the graph
type Node struct {
	ID         int64
	Labels     []string
	Properties map[string]interface{}
}

// Relationship represents a relationship in the graph
type Relationship struct {
	ID         int64
	StartNode  int64
	EndNode    int64
	Type       string
	Properties map[string]interface{}
}

// GraphStateMachine implements the raft.StateMachine interface
type GraphStateMachine struct {
	mu           sync.RWMutex
	nodes        map[int64]*Node
	relationships map[int64]*Relationship
	nodeCounter  int64
	relCounter   int64
}

func NewGraphStateMachine() *GraphStateMachine {
	return &GraphStateMachine{
		nodes:        make(map[int64]*Node),
		relationships: make(map[int64]*Relationship),
	}
}

// CommandType defines the type of graph operation
type CommandType string

const (
	CreateNodeCmd CommandType = "CREATE_NODE"
	CreateRelCmd  CommandType = "CREATE_REL"
)

// GraphCommand is the structure for commands to be applied
type GraphCommand struct {
	Type    CommandType     `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

type CreateNodePayload struct {
	Labels     []string               `json:"labels"`
	Properties map[string]interface{} `json:"properties"`
}

type CreateRelationshipPayload struct {
	StartNodeID int64                  `json:"startNodeId"`
	EndNodeID   int64                  `json:"endNodeId"`
	Type        string                 `json:"type"`
	Properties  map[string]interface{} `json:"properties"`
}


// Apply applies a command to the graph state machine.
// 这里的关键是:Apply 方法必须是确定性的。对于相同的输入 command,
// 在任何节点上执行都必须产生完全相同的状态变更和输出。
func (g *GraphStateMachine) Apply(command []byte) ([]byte, error) {
	g.mu.Lock()
	defer g.mu.Unlock()

	var cmd GraphCommand
	if err := json.Unmarshal(command, &cmd); err != nil {
		return nil, fmt.Errorf("failed to unmarshal command: %w", err)
	}

	switch cmd.Type {
	case CreateNodeCmd:
		var p CreateNodePayload
		if err := json.Unmarshal(cmd.Payload, &p); err != nil {
			return nil, err
		}
		g.nodeCounter++
		newNode := &Node{
			ID:         g.nodeCounter,
			Labels:     p.Labels,
			Properties: p.Properties,
		}
		g.nodes[newNode.ID] = newNode
		log.Printf("[GraphSM] Created node %d with labels %v", newNode.ID, newNode.Labels)
		return json.Marshal(newNode)

	case CreateRelCmd:
		var p CreateRelationshipPayload
		if err := json.Unmarshal(cmd.Payload, &p); err != nil {
			return nil, err
		}
		if _, ok := g.nodes[p.StartNodeID]; !ok {
			return nil, fmt.Errorf("start node %d not found", p.StartNodeID)
		}
		if _, ok := g.nodes[p.EndNodeID]; !ok {
			return nil, fmt.Errorf("end node %d not found", p.EndNodeID)
		}
		
		g.relCounter++
		newRel := &Relationship{
			ID:         g.relCounter,
			StartNode:  p.StartNodeID,
			EndNode:    p.EndNodeID,
			Type:       p.Type,
			Properties: p.Properties,
		}
		g.relationships[newRel.ID] = newRel
		log.Printf("[GraphSM] Created relationship %d (%d)-[%s]->(%d)", newRel.ID, newRel.StartNode, newRel.Type, newRel.EndNode)
		return json.Marshal(newRel)

	default:
		return nil, fmt.Errorf("unknown command type: %s", cmd.Type)
	}
}

// GetNode is a read-only operation.
// 在真实系统中,需要处理读请求的一致性问题。
// 直接读本地状态机可能读到旧数据 (stale read)。
// 强一致性读需要通过 Raft 协议或与 Leader 确认。
func (g *GraphStateMachine) GetNode(id int64) (*Node, bool) {
	g.mu.RLock()
	defer g.mu.RUnlock()
	node, ok := g.nodes[id]
	return node, ok
}

GraphStateMachineApply 方法是整个系统的业务核心。它解析序列化后的命令,并对内存中的图结构进行修改。一个常见的错误是,在 Apply 方法中包含任何不确定性的操作,比如使用本地时间 time.Now() 或随机数。这会破坏 Raft 的基本假设,导致不同副本的状态发生偏离。在我们的实现中,节点和关系的 ID 是通过一个简单的计数器 nodeCounter 生成的,由于所有节点都以完全相同的顺序应用相同的日志条目,这个计数器的值在所有副本上都会保持一致,因此 ID 的分配是确定性的。

第四步:组装与运行

现在,我们把所有部分串联起来。我们需要一个服务层来暴露 API,接收客户端请求,并将其提交给 Raft 集群。

sequenceDiagram
    participant Client
    participant APIServer (Leader)
    participant RaftNode (Leader)
    participant GraphSM (Leader)
    participant RaftNode (Follower)
    participant GraphSM (Follower)

    Client->>APIServer (Leader): POST /command (e.g., CREATE_NODE)
    APIServer (Leader)->>RaftNode (Leader): Propose(command)
    RaftNode (Leader)->>RaftNode (Follower): AppendEntries(command)
    Note over RaftNode (Leader), RaftNode (Follower): Log Replication & Commit
    RaftNode (Follower)-->>RaftNode (Leader): AppendEntriesReply(Success)
    RaftNode (Leader)->>GraphSM (Leader): Apply(command)
    RaftNode (Follower)->>GraphSM (Follower): Apply(command)
    GraphSM (Leader)-->>RaftNode (Leader): Apply Result
    RaftNode (Leader)-->>APIServer (Leader): Proposal Result
    APIServer (Leader)-->>Client: 200 OK (Node Created)

客户端的写请求必须发送给 Leader。Leader 节点接收到命令后,将其序列化,并通过 Raft 的 Propose 方法提交。这个方法会将命令追加到自己的日志中,然后通过 AppendEntries RPC 复制给所有 Follower。一旦命令被大多数节点确认,Raft 模块就会将该命令应用到本地的状态机,并通知 API 服务层请求处理完成。

// main.go

func main() {
    // 简化版集群配置
	peers := map[int]string{
		1: "localhost:8001",
		2: "localhost:8002",
		3: "localhost:8003",
	}
	nodeId := 1 // 假设我们是节点1

	// 1. 创建状态机实例
	graphSM := NewGraphStateMachine()

	// 2. 创建并运行 Raft 节点
	raftNode := raft.NewNode(nodeId, peers, graphSM)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go raftNode.Run(ctx)
	
	// 3. 启动一个简单的 HTTP 服务器来接收命令
	http.HandleFunc("/command", func(w http.ResponseWriter, r *http.Request) {
		// 在真实项目中,需要检查当前节点是否是 Leader
		// if raftNode.State() != raft.Leader {
		// 	http.Error(w, "Not a leader", http.StatusServiceUnavailable)
		// 	return
		// }
		
		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Failed to read body", http.StatusBadRequest)
			return
		}

        // 假设 body 是我们定义好的 GraphCommand JSON
        // resultChan := raftNode.Propose(body)
        // select {
        // case result := <- resultChan:
        //      // 处理应用结果
        //      w.WriteHeader(http.StatusOK)
        //      w.Write(result)
        // case <-time.After(5 * time.Second):
        //      http.Error(w, "Proposal timed out", http.StatusRequestTimeout)
        // }
	})

	log.Printf("Starting API server for node %d on port %d", nodeId, 9000+nodeId)
	http.ListenAndServe(fmt.Sprintf(":%d", 9000+nodeId), nil)
}

这个 main 函数展示了如何将 GraphStateMachine 注入 RaftNode,并启动 Raft 的主循环。一个完整的系统还需要实现节点间的 RPC 通信(例如使用 gRPC),以及一个健壮的 Propose 机制,它应该返回一个 channel,当命令被成功应用后,通过这个 channel 返回结果。

局限与展望

我们从零构建了一个基于 Raft 的分布式图状态机框架。这个过程虽然复杂,但对一致性协议和状态机复制模型的理解是无价的。然而,当前这个实现距离生产可用还有很长的路要走。

首先,日志持久化与快照是缺失的。目前日志完全在内存中,节点重启后状态会丢失。一个生产级实现必须将 Raft 日志和状态机快照持久化到磁盘。当日志增长到一定大小时,必须触发快照,将当前状态机的完整状态(整个图)序列化到磁盘,然后才能安全地丢弃旧的日志条目。为图结构设计高效的快照和恢复机制本身就是一个不小的挑战。

其次,客户端交互逻辑过于简单。客户端需要知道谁是 Leader,并在 Leader 变更时自动重试或重定向。这通常通过在客户端维护集群成员列表,并在请求失败时轮询不同的节点来实现。

最后,成员变更没有实现。一个动态的集群需要支持节点的增减,这是 Raft 协议中一个相当复杂的部分,需要通过特殊的日志条目来协调整个集群对配置变化的共识。

尽管存在这些局限,这个从零开始的构建过程清晰地展示了如何将一个复杂的一致性协议工程化,并将其与具体的业务模型相结合,最终形成一个可扩展、高可用的分布式组件库。


  目录