基于 Rust 与 Solid.js 构建处理高频更新的服务发现实时仪表盘


在维护一个由数百个微服务组成的系统时,我们面临的一个棘手问题是实时状态的可观测性。传统的服务发现仪表盘通常采用轮询机制,这在服务状态变化不频繁时尚可接受。但在一个高度动态的环境中,例如服务因自动扩缩容、灰度发布或故障而频繁上下线,轮-polling-based的UI不仅延迟高,还会给服务注册中心带来不必要的周期性压力。我们需要的是一个真正的实时视图,一个能在服务状态变更的瞬间就将信息推送至所有观察者的“驾驶舱”。

我的初步构想是构建一个基于WebSocket的推送系统。后端服务负责监控服务注册表的变化,并将增量更新实时广播给所有连接的前端客户端。前端则需要一个能高效处理这种高频数据流的框架,避免因UI重绘导致浏览器卡顿或内存溢出。

技术选型决策

后端: 为何选择Rust?

对于这个承担着实时推送核心任务的后端,我需要它满足几个关键条件:高性能、高并发处理能力、内存安全。

  • Go 是一个强有力的竞争者,它的Goroutine模型非常适合处理大量并发连接。
  • Node.js 配合 ws 库也能实现,但其单线程事件循环在处理CPU密集型任务(如果未来需要对服务状态做复杂聚合)时可能成为瓶颈,且动态类型的特性在构建长期维护的系统级工具时让我稍有顾虑。

最终我选择了 Rust。其所有权和借用检查器能在编译期消除一整类内存安全问题,这对于一个需要7x24小时稳定运行的基础设施组件至关重要。Tokio生态系统为编写异步网络应用提供了世界级的支持,其性能与资源利用率几乎是无与伦比的。对于一个需要管理数千个WebSocket连接并进行高效广播的场景,Rust的精确内存控制和零成本抽象能力正是我所需要的。

前端: 为何是Solid.js而非React?

前端的选择更为关键。数据流将是持续不断的,可能每秒都有多次更新。

  • React 是业界标准,但其基于Virtual DOM的diff/patch机制在这种场景下可能成为性能瓶颈。每次数据更新,即使很小,也可能触发组件树的大范围比对,产生不必要的计算开销。
  • Vue 的响应式系统比React更优,但其实现依然涉及组件级别的重新渲染。

我选择了 Solid.js。它的核心魅力在于其真正的响应式(fine-grained reactivity)。Solid没有Virtual DOM。它的编译器会将JSX模板转换成直接操作DOM的优化代码,并创建一组响应式的“信号”(Signals)。当数据源(我们的WebSocket消息)更新一个信号时,只有依赖该信号的特定DOM节点或文本内容会被精准更新。这意味着,如果一个服务列表中的某个服务的状态从UP变为DOWN,只有显示状态的那个<span>标签的文本内容会被修改,整个列表项<li>甚至其他部分都不会被触动。这种“外科手术式”的DOM更新,正是处理高频数据流的理想模型。

步骤化实现:构建实时管道

第一部分: Rust后端 - 服务监控与WebSocket广播器

我们的后端需要做三件事:

  1. 维护一个服务状态的内存视图。
  2. 模拟服务状态的随机变化。
  3. 通过WebSocket将这些变化广播给所有连接的客户端。

项目结构与依赖 (Cargo.toml)

[package]
name = "service-discovery-streamer"
version = "0.1.0"
edition = "2021"

[dependencies]
axum = { version = "0.6", features = ["ws"] }
dashmap = "5.5"
futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.28", features = ["full"] }
tokio-stream = "0.1"
tower-http = { version = "0.4", features = ["cors"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.4", features = ["v4", "serde"] }

我们使用了axum作为Web框架,它对WebSocket的支持非常出色。dashmap提供了一个线程安全的HashMap,用于存储服务状态。tokio::sync::broadcast是实现广播的关键。

核心数据模型 (src/models.rs)

use serde::{Deserialize, Serialize};
use uuid::Uuid;
use std::time::{SystemTime, UNIX_EPOOCH};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ServiceStatus {
    Up,
    Down,
    Warning,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Service {
    pub id: Uuid,
    pub name: String,
    pub ip: String,
    pub port: u16,
    pub status: ServiceStatus,
    pub last_updated: u64,
}

impl Service {
    pub fn update_status(&mut self, new_status: ServiceStatus) {
        self.status = new_status;
        self.last_updated = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();
    }
}

// 定义广播给客户端的事件类型
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
pub enum BroadcastEvent {
    FullState(Vec<Service>),
    Update(Service),
}

清晰地定义数据结构是第一步。BroadcastEvent枚举让前端可以轻易地区分是全量状态同步还是增量更新。

应用状态与广播中心 (src/state.rs)

use crate::models::{BroadcastEvent, Service, ServiceStatus};
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::broadcast;
use uuid::Uuid;

// AppState 将在所有 axum handler 之间共享
#[derive(Clone)]
pub struct AppState {
    pub service_registry: Arc<DashMap<Uuid, Service>>,
    pub tx: broadcast::Sender<BroadcastEvent>,
}

impl AppState {
    pub fn new() -> Self {
        // broadcast channel 的容量,如果发送速度超过接收速度,旧消息会被丢弃
        // 在我们的场景里,客户端只关心最新状态,所以这个设定是合理的
        let (tx, _) = broadcast::channel(1024);

        Self {
            service_registry: Arc::new(DashMap::new()),
            tx,
        }
    }

    // 将当前所有服务状态广播出去,通常用于新客户端连接时
    pub fn broadcast_full_state(&self) {
        let services = self.service_registry
            .iter()
            .map(|entry| entry.value().clone())
            .collect::<Vec<_>>();
        
        // 在真实项目中,错误处理应该更健壮
        if let Err(e) = self.tx.send(BroadcastEvent::FullState(services)) {
            tracing::error!("Failed to broadcast full state: {}", e);
        }
    }

    // 广播单个服务的更新
    pub fn broadcast_update(&self, service: Service) {
        if let Err(e) = self.tx.send(BroadcastEvent::Update(service)) {
            tracing::error!("Failed to broadcast update: {}", e);
        }
    }
}

AppState是整个应用的核心。service_registryDashMap保证并发安全,txbroadcast::Sender,负责将消息分发给所有订阅者(即WebSocket连接)。

主程序、模拟器与Web服务器 (src/main.rs)

mod models;
mod state;

use axum::{
    extract::{
        ws::{Message, WebSocket, WebSocketUpgrade},
        State,
    },
    response::IntoResponse,
    routing::get,
    Router,
};
use futures_util::{stream::StreamExt, SinkExt};
use state::AppState;
use std::{net::SocketAddr, time::Duration};
use tokio::time::sleep;
use tower_http::cors::{Any, CorsLayer};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use rand::Rng;

use crate::models::{Service, ServiceStatus};
use uuid::Uuid;


#[tokio::main]
async fn main() {
    // 初始化日志
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| "service_discovery_streamer=debug,tower_http=debug".into()),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    let app_state = AppState::new();

    // 启动一个后台任务来模拟服务状态变化
    tokio::spawn(service_state_simulator(app_state.clone()));

    let app = Router::new()
        .route("/ws", get(ws_handler))
        .with_state(app_state)
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        );

    let addr = SocketAddr::from(([127, 0, 0, 1], 3030));
    tracing::debug!("listening on {}", addr);
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}

// WebSocket处理器
async fn ws_handler(
    ws: WebSocketUpgrade,
    State(state): State<AppState>,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| handle_socket(socket, state))
}

async fn handle_socket(mut socket: WebSocket, state: AppState) {
    tracing::info!("New WebSocket connection");

    // 订阅广播
    let mut rx = state.tx.subscribe();

    // 1. 新连接建立时,立即发送一次全量状态
    let initial_state = state.service_registry.iter().map(|e| e.value().clone()).collect();
    let initial_event = models::BroadcastEvent::FullState(initial_state);
    let initial_payload = serde_json::to_string(&initial_event).unwrap();
    if socket.send(Message::Text(initial_payload)).await.is_err() {
        tracing::error!("Failed to send initial state to new client. Closing connection.");
        return;
    }
    
    // 2. 启动一个循环,将广播消息转发给这个客户端
    loop {
        tokio::select! {
            // 接收来自广播通道的消息
            Ok(event) = rx.recv() => {
                let payload = serde_json::to_string(&event).unwrap_or_else(|e| {
                    tracing::error!("Failed to serialize event: {}", e);
                    "{}".to_string()
                });

                if socket.send(Message::Text(payload)).await.is_err() {
                    // 发送失败,很可能是客户端已经断开
                    tracing::warn!("Client disconnected. Breaking send loop.");
                    break;
                }
            },
            // 接收来自客户端的消息(心跳等),这里我们忽略
            Some(Ok(msg)) = socket.next() => {
                 if let Message::Close(_) = msg {
                    tracing::info!("Client sent close frame.");
                    break;
                }
            },
            else => {
                // 两种情况都会导致循环退出: rx.recv()错误或socket.next()返回None
                tracing::info!("Client connection closed or broadcast channel lagged.");
                break;
            }
        }
    }
}


// 这个异步任务模拟了服务注册中心的动态变化
async fn service_state_simulator(state: AppState) {
    // 初始化一些服务
    for i in 0..50 {
        let service = Service {
            id: Uuid::new_v4(),
            name: format!("service-{}", i),
            ip: format!("10.0.1.{}", i),
            port: 8080 + i as u16,
            status: ServiceStatus::Up,
            last_updated: 0,
        };
        state.service_registry.insert(service.id, service);
    }
    
    // 首次广播全量状态
    state.broadcast_full_state();
    
    tracing::info!("Initial services populated. Starting simulation...");

    let mut rng = rand::thread_rng();

    loop {
        // 每 200ms 到 1s 随机更新一个服务
        sleep(Duration::from_millis(rng.gen_range(200..1000))).await;

        let service_ids: Vec<Uuid> = state.service_registry.iter().map(|e| *e.key()).collect();
        if service_ids.is_empty() {
            continue;
        }

        let random_index = rng.gen_range(0..service_ids.len());
        let service_id = service_ids[random_index];

        if let Some(mut service_entry) = state.service_registry.get_mut(&service_id) {
            let new_status = match rng.gen_range(0..10) {
                0..=1 => ServiceStatus::Down, // 20% 概率 Down
                2..=3 => ServiceStatus::Warning, // 20% 概率 Warning
                _ => ServiceStatus::Up,     // 60% 概率 Up
            };
            
            service_entry.update_status(new_status);
            let updated_service = service_entry.value().clone();

            // 广播这个更新
            state.broadcast_update(updated_service);
        }
    }
}

这里的service_state_simulator是关键,它以随机间隔更新随机服务的状态,并调用state.broadcast_updatehandle_socket函数则处理每个WebSocket连接,它首先发送一次全量数据,然后进入一个循环,监听broadcast通道并将收到的任何更新转发给客户端。tokio::select!宏优雅地处理了来自广播通道的消息和来自客户端的消息。

第二部分: Solid.js前端 - 消费与渲染数据流

现在,让我们转向前端。我们需要一个能够优雅地消化这股数据洪流的UI。

项目初始化与依赖

npm create solid@latest solid-dashboard
cd solid-dashboard
npm install

WebSocket服务 (src/services/websocket.ts)

import { createSignal } from 'solid-js';

export type ServiceStatus = 'Up' | 'Down' | 'Warning';

export interface Service {
  id: string;
  name: string;
  ip: string;
  port: number;
  status: ServiceStatus;
  last_updated: number;
}

export type BroadcastEvent = 
  | { type: 'FullState', payload: Service[] }
  | { type: 'Update', payload: Service };

// 使用 Signal 来表示连接状态,这样UI可以响应性地展示
export const [connectionStatus, setConnectionStatus] = createSignal<'Connecting' | 'Open' | 'Closed'>('Connecting');

let socket: WebSocket | null = null;
const eventListeners: Map<string, ((data: BroadcastEvent) => void)[]> = new Map();

const connect = () => {
    if (socket && socket.readyState === WebSocket.OPEN) {
        return;
    }

    setConnectionStatus('Connecting');
    socket = new WebSocket('ws://127.0.0.1:3030/ws');

    socket.onopen = () => {
        console.log('WebSocket connection established.');
        setConnectionStatus('Open');
    };

    socket.onmessage = (event) => {
        try {
            const data: BroadcastEvent = JSON.parse(event.data);
            const listeners = eventListeners.get('message') || [];
            listeners.forEach(cb => cb(data));
        } catch (error) {
            console.error('Failed to parse message:', error);
        }
    };

    socket.onerror = (error) => {
        console.error('WebSocket error:', error);
    };

    socket.onclose = () => {
        console.log('WebSocket connection closed. Reconnecting in 3s...');
        setConnectionStatus('Closed');
        socket = null;
        setTimeout(connect, 3000); // 简单的重连逻辑
    };
};

// 允许组件订阅消息
export const onMessage = (callback: (data: BroadcastEvent) => void) => {
    const listeners = eventListeners.get('message') || [];
    listeners.push(callback);
    eventListeners.set('message', listeners);

    // 返回一个取消订阅的函数
    return () => {
        const currentListeners = eventListeners.get('message') || [];
        eventListeners.set('message', currentListeners.filter(cb => cb !== callback));
    };
};

// 初始化连接
connect();

这个服务封装了WebSocket的连接逻辑,包括自动重连,并提供了一个简单的事件订阅机制。连接状态被包装在一个Solid的Signal中,以便UI可以动态反应。

核心UI组件 (src/App.tsx)

import { createStore, produce } from 'solid-js/store';
import { For, createEffect, onCleanup, Show } from 'solid-js';
import { onMessage, connectionStatus, Service } from './services/websocket';
import type { Component } from 'solid-js';
import './App.css';

const App: Component = () => {
  // 使用 createStore 来管理复杂对象数组
  // 这使得对单个服务属性的更新具有细粒度的响应性
  const [services, setServices] = createStore<Record<string, Service>>({});

  const serviceList = () => Object.values(services).sort((a, b) => a.name.localeCompare(b.name));

  createEffect(() => {
    const unsubscribe = onMessage(data => {
      if (data.type === 'FullState') {
        const newState: Record<string, Service> = {};
        for (const service of data.payload) {
          newState[service.id] = service;
        }
        setServices(newState);
      } else if (data.type === 'Update') {
        // 使用 produce 来进行不可变更新,这是 createStore 的最佳实践
        setServices(
          produce(s => {
            s[data.payload.id] = data.payload;
          })
        );
      }
    });

    onCleanup(() => unsubscribe());
  });

  const getStatusClass = (status: Service['status']) => {
    switch(status) {
      case 'Up': return 'status-up';
      case 'Down': return 'status-down';
      case 'Warning': return 'status-warning';
    }
  };

  return (
    <div class="container">
      <header>
        <h1>Service Discovery Dashboard</h1>
        <div class="connection-status">
          Status: <span class={connectionStatus().toLowerCase()}>{connectionStatus()}</span>
        </div>
      </header>
      <main>
        <div class="service-grid">
          <For each={serviceList()}>
            {(service) => (
              <div class="service-card">
                <div class="card-header">
                  <span class={`status-indicator ${getStatusClass(service.status)}`}></span>
                  <h3 class="service-name">{service.name}</h3>
                </div>
                <div class="card-body">
                  <p><strong>ID:</strong> {service.id}</p>
                  <p><strong>Address:</strong> {service.ip}:{service.port}</p>
                  <p><strong>Status:</strong> <span class={`status-text ${getStatusClass(service.status)}`}>{service.status}</span></p>
                </div>
              </div>
            )}
          </For>
        </div>
        <Show when={serviceList().length === 0}>
           <div class="no-services">
              <p>Waiting for service data...</p>
           </div>
        </Show>
      </main>
    </div>
  );
};

export default App;

这里的关键在于 createStore。我们用一个对象(以服务ID为键)来存储服务列表,而不是数组。这使得更新特定服务变得非常高效。当onMessage回调接收到一个Update事件时,它通过setServices(produce(s => { s[data.payload.id] = data.payload; }))来更新store。

Solid的魔力就在这里:这个更新操作只会导致与data.payload.id对应的那个服务卡片中,值发生变化的部分(比如status文本和状态指示灯的class)进行DOM更新。整个卡片、其他卡片、乃至整个列表的DOM结构都保持不变。没有diffing,没有虚拟DOM。这就是为什么即使每秒有几十上百次更新,UI依然流畅如丝。

第三部分: 架构与数据流

为了更清晰地展示整个系统的工作流程,可以用Mermaid图来描绘:

sequenceDiagram
    participant Simulator as Service State Simulator (Rust)
    participant Registry as Service Registry (DashMap)
    participant Broadcaster as WebSocket Broadcaster (Tokio Broadcast)
    participant ClientA as Solid.js Client A
    participant ClientB as Solid.js Client B

    Simulator->>Registry: Periodically update a service's status
    Registry-->>Simulator: Confirms update
    Simulator->>Broadcaster: Send BroadcastEvent::Update
    
    Broadcaster->>ClientA: Push updated service JSON
    Broadcaster->>ClientB: Push updated service JSON

    ClientA->>ClientA: Parse JSON, update Solid Store
    ClientB->>ClientB: Parse JSON, update Solid Store

    Note right of ClientA: Solid.js fine-grained reactivity updates only the specific DOM element (e.g., a )
    Note right of ClientB: Same precise DOM update happens here

这个流程展示了从后端状态变更到多个前端UI精准更新的完整链路。Rust后端的高效广播能力和Solid.js前端的精细化渲染能力在此形成了完美的协同。

局限性与未来迭代路径

这个实现虽然验证了核心架构的可行性和性能优势,但在生产环境中仍有几处需要完善。

首先,后端的单点广播模型是其可伸缩性的瓶颈。当客户端数量达到数千乃至数万时,单个Rust实例的CPU和网络带宽将成为限制。一个更健壮的架构会引入一个中间消息队列(如Redis Pub/Sub或NATS),将服务监控进程与WebSocket网关进程解耦。这样,WebSocket网关可以水平扩展,独立地处理大量客户端连接。

其次,当前的服务发现机制是纯粹的模拟。在真实项目中,Rust代理需要实现与具体服务注册中心(如Kubernetes API Server、Consul、etcd)的集成。这需要编写相应的客户端逻辑,并处理好与这些系统交互时的网络分区、超时等容错问题。

再者,前端的UI功能相对基础。一个完整的仪表盘需要搜索、过滤、排序以及可能的服务详情历史视图。这些功能的加入,会对Solid.js的状态管理提出更高要求,但其底层的响应式模型依然是应对这些挑战的坚实基础。

最后,安全性方面完全没有涉及。生产环境的WebSocket端点必须通过认证(如JWT Token)和授权机制来保护,确保只有合法的用户或系统才能接收到服务状态信息。


  目录