基于 Pulsar 和 CQRS 模式构建高并发读写分离架构的工程实践


一个线上监控系统的核心挑战在于其读写负载的极端不平衡。写入端是海量的、持续不断的遥测数据点上报,要求低延迟和高吞吐;读取端则是运维人员或自动化系统进行的大范围、高频率的聚合查询,要求快速响应。传统的单体应用配合单一数据库的架构,在这种场景下会迅速遭遇瓶颈:写入操作的事务锁会严重影响查询性能,而复杂的查询又会占用大量数据库连接和CPU资源,反过来拖慢写入。数据库的垂直扩展成本高昂,而水平扩展又会引入分片带来的数据一致性与查询复杂性问题。

为解决此问题,我们必须放弃将读写操作耦合在同一模型和存储中的传统思路。命令查询职责分离(CQRS)模式提供了一个清晰的架构指引:将系统操作分为命令(Commands)和查询(Queries)两部分。命令负责处理状态变更,查询负责读取状态,两者使用不同的数据模型,甚至可以部署在完全不同的物理服务和存储上。这种分离允许我们针对性地优化每一端。

然而,CQRS 模式的引入也带来了新的复杂度,核心在于如何可靠、高效地同步写模型(Command Side)的状态变更到读模型(Query Side)。方案A是双写,即在应用层同时写入主数据库和读数据库。这种方式看似简单,但缺乏事务保证,极易导致数据不一致,在生产环境中是不可接受的。方案B是基于数据库日志的变更数据捕获(CDC),例如使用 Debezium。这是一种成熟的方案,但它强耦合了特定的数据库技术,且配置和维护CDC管道本身就有不小的运维成本。

我们最终选择了方案C:基于事件驱动的架构,使用消息队列作为写模型和读模型之间的通信总线。命令服务在处理完业务逻辑、持久化到写数据库后,会发布一个领域事件(Domain Event)到消息队列。查询服务订阅这些事件,并根据事件内容异步地更新自己的读模型。这种方式实现了服务间的彻底解耦,并提供了极高的灵活性和弹性。

技术选型上,我们确定使用 Apache Pulsar 作为消息总线。相比于 Kafka 或 RabbitMQ,Pulsar 的分层架构(Broker 无状态,BookKeeper 负责存储)提供了更强的水平扩展能力和更灵活的数据保留策略(例如,利用其分层存储特性将旧事件归档到S3)。更重要的是,Pulsar 的强一致性消息投递保证和灵活的订阅模式(独占、故障转移、共享)为我们处理分布式一致性问题提供了坚实的基础。

整个系统的架构图如下所示:

graph TD
    subgraph "客户端"
        A[Client/Agent]
    end

    subgraph "写模型 (Command Side)"
        B(Command API Service - Go)
        C(Write DB - PostgreSQL)
    end

    subgraph "消息总线"
        D{Apache Pulsar}
    end

    subgraph "读模型 (Query Side)"
        E(Query Service - Go)
        F(Read DB - Elasticsearch/Redis)
        G(Query API Service - Go)
    end

    subgraph "前端展现"
        H[Dashboard - Vue.js + Tailwind CSS]
    end

    A -- HTTP POST /metrics --> B
    B -- 1. Persist Command --> C
    B -- 2. Publish MetricReceivedEvent --> D
    D -- 3. Consume Event --> E
    E -- 4. Update Read Model --> F
    G -- REST API --> F
    H -- Polls/WebSocket --> G

    style F fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#ccf,stroke:#333,stroke-width:2px

代码规范先行:事件契约的定义

在分布式系统中,服务间的通信契约至关重要。任何模糊不清或随意的契约定义都会在未来演进中造成灾难。因此,我们的第一步是建立严格的代码规范,并使用 Protocol Buffers (Protobuf) 来定义事件结构。Protobuf 提供了强类型、向后兼容的 schema 演进能力和高效的序列化性能。

这是我们的核心事件 MetricReceivedEvent 的定义。

protos/metrics/v1/events.proto:

syntax = "proto3";

package metrics.v1;

import "google/protobuf/timestamp.proto";

option go_package = "github.com/my-org/monitoring-system/gen/go/metrics/v1;metricsv1";

// MetricReceivedEvent 代表一个遥测数据点被成功接收的领域事件
// 这是写模型和读模型之间通信的唯一契约
message MetricReceivedEvent {
  // 事件的唯一标识符,用于幂等性处理和追踪
  string event_id = 1;

  // 事件发生的时间戳 (UTC)
  google.protobuf.Timestamp occurred_at = 2;

  // 遥测数据点的详细信息
  MetricDataPoint data_point = 3;
}

message MetricDataPoint {
  // 数据源的标识,例如 "server-01.us-west-1.prod"
  string source_id = 1;

  // 指标名称,例如 "cpu.usage.percentage"
  string metric_name = 2;

  // 指标值
  double value = 3;

  // 关联的标签,用于查询和聚合
  map<string, string> tags = 4;

  // 数据点的时间戳 (UTC)
  google.protobuf.Timestamp timestamp = 5;
}

这个 .proto 文件不仅仅是一段代码,它是我们系统架构中的法律。所有服务都必须基于它生成的代码进行交互。这 enforces 了一种跨团队、跨服务的代码规范,避免了因手写 JSON 结构不一致导致的问题。

命令服务的实现:稳定写入与事件发布

命令服务的目标是接收数据、快速持久化并可靠地发布事件。它不应该包含复杂的业务逻辑。

command-service/main.go:

package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/google/uuid"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/timestamppb"

	metricsv1 "github.com/my-org/monitoring-system/gen/go/metrics/v1"
)

// MetricIngestRequest 定义了API的入参结构
type MetricIngestRequest struct {
	SourceID   string            `json:"source_id"`
	MetricName string            `json:"metric_name"`
	Value      float64           `json:"value"`
	Tags       map[string]string `json:"tags"`
	Timestamp  time.Time         `json:"timestamp"`
}

type CommandHandler struct {
	pulsarProducer pulsar.Producer
	// 在真实项目中,这里还应该有一个数据库连接池,用于持久化到写模型DB
	// db *sql.DB
}

func (h *CommandHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
		return
	}

	var req MetricIngestRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

    // 1. 数据验证 (在生产环境中需要更完善的验证逻辑)
	if req.SourceID == "" || req.MetricName == "" {
		http.Error(w, "source_id and metric_name are required", http.StatusBadRequest)
		return
	}

	// 2. 持久化到写数据库 (此处省略,但在真实项目中至关重要)
	// tx, err := h.db.Begin()
	// ... store data ...
	// tx.Commit()
	// 如果持久化失败,则不应发布事件,直接返回错误

	// 3. 构建领域事件
	event := &metricsv1.MetricReceivedEvent{
		EventId:    uuid.New().String(),
		OccurredAt: timestamppb.Now(),
		DataPoint: &metricsv1.MetricDataPoint{
			SourceId:   req.SourceID,
			MetricName: req.MetricName,
			Value:      req.Value,
			Tags:       req.Tags,
			Timestamp:  timestamppb.New(req.Timestamp),
		},
	}

	payload, err := proto.Marshal(event)
	if err != nil {
		log.Printf("ERROR: Failed to marshal event protobuf: %v", err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return
	}

	// 4. 异步发布到 Pulsar
	// 使用 Key 可以确保同一来源的数据落在同一个 partition,保证顺序性
	msg := &pulsar.ProducerMessage{
		Payload: payload,
		Key:     req.SourceID,
	}

	// Send an asynchronous message
	h.pulsarProducer.SendAsync(context.Background(), msg,
		func(id pulsar.MessageID, err error) {
			if err != nil {
				// 这里的错误处理至关重要。在生产环境中,需要有重试机制、死信队列等
				// 来处理 Pulsar broker 不可达或消息被拒绝的情况。
				log.Printf("FATAL: Failed to publish message for source %s: %v", req.SourceID, err)
			} else {
				log.Printf("INFO: Successfully published message %s with ID: %v", event.EventId, id)
			}
		})

	w.WriteHeader(http.StatusAccepted)
	w.Write([]byte("Metric accepted"))
}

func main() {
	pulsarURL := os.Getenv("PULSAR_URL")
	if pulsarURL == "" {
		pulsarURL = "pulsar://localhost:6650"
	}

	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarURL})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "persistent://public/default/metrics-events",
        // 启用批处理可以显著提高吞吐量
		EnableBatching:      true,
		BatchingMaxMessages: 1000,
		BatchingMaxSize:     128 * 1024, // 128KB
		BatchingMaxPublishDelay: 10 * time.Millisecond,
	})
	if err != nil {
		log.Fatalf("Could not create Pulsar producer: %v", err)
	}
	defer producer.Close()

	handler := &CommandHandler{
		pulsarProducer: producer,
	}

	http.Handle("/ingest", handler)
	log.Println("INFO: Command service listening on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("Failed to start server: %v", err)
	}
}

这里的关键点在于,HTTP Handler 的职责非常单一:验证、持久化、发布事件。它通过 SendAsync 异步发送消息,快速响应客户端,将消息处理的延迟和可靠性保证交给了 Pulsar 客户端和 Broker。这是保证写入端低延迟的核心。

查询服务的实现:事件消费与读模型构建

查询服务是整个架构中体现分布式一致性思考最多的地方。它的任务是消费 MetricReceivedEvent 事件,并将数据转换为适合快速查询的格式,存入读数据库(例如 Elasticsearch 或 Redis)。

query-service/main.go:

package main

import (
	"context"
	"log"
	"os"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
	"google.golang.org/protobuf/proto"

	metricsv1 "github.com/my-org/monitoring-system/gen/go/metrics/v1"
)

// ReadModelUpdater 是一个接口,抽象了更新读模型的行为
// 具体的实现可以是写入 Elasticsearch, Redis, 或任何其他适合查询的存储
type ReadModelUpdater interface {
	Update(ctx context.Context, point *metricsv1.MetricDataPoint) error
}

// LogUpdater 是一个简单的实现,仅将数据点打印到日志
// 在真实项目中,这里会是 ElasticsearchUpdater
type LogUpdater struct{}

func (u *LogUpdater) Update(ctx context.Context, point *metricsv1.MetricDataPoint) error {
	log.Printf("INFO: Updating read model for metric '%s' from source '%s' with value %.2f",
		point.MetricName, point.SourceId, point.Value)
	// 模拟写入延迟
	time.Sleep(10 * time.Millisecond)
	return nil
}

func main() {
	pulsarURL := os.Getenv("PULSAR_URL")
	if pulsarURL == "" {
		pulsarURL = "pulsar://localhost:6650"
	}

	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: pulsarURL})
	if err != nil {
		log.Fatalf("Could not instantiate Pulsar client: %v", err)
	}
	defer client.Close()

	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic: "persistent://public/default/metrics-events",
		// SubscriptionName 是持久化的,即使服务重启也会从上次消费的位置继续
		SubscriptionName: "query-service-builder",
		// Failover 模式确保在多个服务实例中,只有一个实例会消费一个 partition 的消息
		// 这对于保证顺序性和避免重复处理非常重要
		Type: pulsar.Failover,
        // Nack 红字策略,当消息处理失败时,会在延迟后重新投递
        // 防止 "poison pill" 消息阻塞整个消费流程
        NackRedeliveryDelay: 10 * time.Second,
	})
	if err != nil {
		log.Fatalf("Could not create Pulsar consumer: %v", err)
	}
	defer consumer.Close()

	updater := &LogUpdater{}
	ctx := context.Background()

	log.Println("INFO: Query service started. Waiting for events...")
	
	// 主消费循环
	for {
		msg, err := consumer.Receive(ctx)
		if err != nil {
			// 在 context canceled 的情况下,优雅退出
			if ctx.Err() != nil {
				log.Println("INFO: Context cancelled, shutting down consumer.")
				return
			}
			log.Printf("ERROR: Failed to receive message: %v", err)
			continue
		}

		var event metricsv1.MetricReceivedEvent
		if err := proto.Unmarshal(msg.Payload(), &event); err != nil {
			log.Printf("ERROR: Failed to unmarshal protobuf message ID %s. Sending to dead-letter topic. Error: %v", msg.ID(), err)
			// 对于无法解析的 "poison pill" 消息,不能简单地 Nack
			// 最佳实践是将其发送到死信队列(DLQ),并 Ack 原始消息,防止阻塞
			// Pulsar 有内置的 DLQ 策略,这里仅作演示
			consumer.Ack(msg)
			continue
		}

		// 核心业务逻辑:更新读模型
		if err := updater.Update(ctx, event.DataPoint); err != nil {
			log.Printf("ERROR: Failed to update read model for event %s. Nacking message. Error: %v", event.EventId, err)
			// 如果更新读模型失败(例如,数据库暂时不可用),
			// 我们 Nack 这条消息,Pulsar 会在稍后重新投递它。
			// 这实现了 "at-least-once" 语义。
			consumer.Nack(msg)
			continue
		}

		// 只有在所有处理都成功后,才 Ack 消息,通知 Pulsar 这条消息已成功处理
		consumer.Ack(msg)
		log.Printf("INFO: Successfully processed and acked message for event %s", event.EventId)
	}
}

这个消费者体现了对分布式一致性的深入考量:

  1. 持久订阅 (SubscriptionName): 保证了即使服务重启,也不会丢失消息。
  2. 故障转移订阅 (Failover): 当我们水平扩展查询服务时,Pulsar 保证每个消息分区只会被一个消费者实例处理,避免了多个实例处理同一事件导致的写竞争和数据错乱。
  3. 消息确认机制 (Ack/Nack): 这是实现最终一致性的基石。我们遵循的原则是:只有当读模型成功更新后,才向 Pulsar 发送 Ack。如果更新失败,就发送 Nack,让 Pulsar 在延迟后重新投递。这保证了消息至少被处理一次(at-least-once delivery)。要实现精确一次(exactly-once),则需要在读模型存储端实现幂等性写入,例如使用事件ID作为唯一键。

前端展现与 Tailwind CSS 的价值

查询API服务(代码未展示)会直接从读数据库(如 Elasticsearch)中读取数据,提供给前端。对于一个内部监控仪表盘,快速迭代和清晰的数据展示是关键。这里就是 Tailwind CSS 发挥价值的地方。

传统上,我们会为每个组件编写独立的 CSS 文件,例如 .dashboard-panel, .metric-chart 等。但随着仪表盘复杂度增加,CSS 会变得难以维护。Tailwind CSS 采用原子化/功能优先(utility-first)的方法,让我们直接在 HTML 中组合原子类来构建界面。

一个简单的指标展示卡片可能看起来是这样的:

<!-- 使用 Vue.js 和 Tailwind CSS 构建的指标卡片组件 -->
<div class="bg-gray-800 rounded-lg shadow-md p-4 flex flex-col justify-between transition-transform duration-300 hover:scale-105">
  <div>
    <div class="flex items-center justify-between mb-2">
      <h3 class="text-sm font-medium text-gray-400">{{ metric.name }}</h3>
      <span class="px-2 py-1 text-xs font-semibold rounded-full"
            :class="{
              'bg-green-500 text-green-900': metric.status === 'OK',
              'bg-yellow-500 text-yellow-900': metric.status === 'WARN',
              'bg-red-500 text-red-900': metric.status === 'CRIT'
            }">
        {{ metric.status }}
      </span>
    </div>
    <p class="text-3xl font-bold text-white">{{ metric.value.toFixed(2) }}</p>
    <p class="text-xs text-gray-500">{{ metric.source }}</p>
  </div>
  <div class="mt-4 h-16 bg-gray-700 rounded">
    <!-- 此处可以放置一个小型图表 -->
  </div>
</div>

使用 Tailwind CSS 的好处在于:

  1. 开发效率: 无需在 HTML 和 CSS 文件之间频繁切换,心智负担小。
  2. 代码规范: 样式是内联且确定的,避免了全局 CSS 命名冲突和样式覆盖问题。团队成员可以快速理解一个组件的样式构成,这本身就是一种视觉上的代码规范。
  3. 可维护性: 当需要修改样式时,直接在模板中修改类名即可,不会意外影响到其他组件。

架构的局限性与演进方向

这个基于 CQRS 和 Pulsar 的架构并非银弹。首先,最显著的代价是最终一致性。从命令服务接收数据到查询服务更新完毕,中间存在一个可感知的延迟。这个延迟对于大多数监控场景是可以接受的,但对于需要强读写一致性的场景(如交易系统)则不适用。

其次,系统的复杂性增加了。我们需要维护命令和查询两个服务、Pulsar 集群以及两种不同的数据存储。这要求团队具备更强的分布式系统运维和调试能力。例如,排查一个数据未在仪表盘上显示的问题,需要追溯 API 调用、事件发布、事件消费、读模型更新等多个环节。引入分布式追踪系统(如 OpenTelemetry)是解决此类问题的必要投资。

未来的优化路径包括:

  1. 引入 Pulsar Functions: 对于简单的事件转换和更新逻辑,可以使用 Pulsar Functions 替代独立的查询服务,进一步简化部署和运维。
  2. 读模型快照: 对于生命周期很长的实体,如果每次重建读模型都需要消费海量历史事件,性能会很差。可以定期为读模型创建快照,新服务实例启动时从快照恢复,再消费增量事件。
  3. 多租户与隔离: Pulsar 的多租户(Tenant/Namespace)特性可以为不同的业务或客户提供逻辑隔离,确保一个租户的流量高峰不会影响到其他租户,这是架构向平台化演进的关键一步。

  目录