采用 Saga 模式实现 Flask 后端与 Next.js 前端的分布式事务状态同步


在一个典型的微服务架构中,一个用户操作往往需要跨越多个独立的服务边界。例如,一个“创建项目”的请求可能需要依次调用用户服务来扣除积分,调用资源服务来分配计算资源,最后调用通知服务来发送确认邮件。如果采用简单的同步HTTP调用链,一旦中间某个环节失败,整个系统的数据状态将陷入不一致的泥潭:积分已经扣除,但资源分配失败。这种不一致性在生产环境中是灾难性的。

传统数据库中的两阶段提交(2PC)是解决这类问题的经典方案,它能提供强一致性保证。但在微服务和互联网规模的应用中,2PC的弊端显而易见。它是一个同步阻塞协议,事务的协调者在等待所有参与者响应时会长时间锁定资源。这不仅严重影响系统吞吐量,而且协调者的单点故障会冻结整个分布式事务,导致可用性急剧下降。对于需要高可用和松耦合的现代系统,2PC往往不是一个可行的选项。

相比之下,基于最终一致性的 Saga 模式提供了一种更具韧性和扩展性的替代方案。Saga 将一个长事务拆分为一系列本地事务,每个本地事务由一个对应的服务执行。如果某个本地事务失败,Saga 会执行一系列“补偿事务”来撤销之前已成功提交的事务,从而使系统数据恢复到一致状态。

在众多 Saga 实现中,我们选择“编排式 Saga”(Orchestration-based Saga)。与服务间通过事件相互触发的“协同式 Saga”(Choreography)相比,编排式模型引入一个中心化的“编排器”来明确定义和控制整个事务流程。这种方式使得业务逻辑更集中,状态更清晰,对于复杂流程的监控和调试也更为友好。

本文将完整构建一个基于 Flask、Next.js 和 RabbitMQ 的编排式 Saga 解决方案,解决从前端发起一个长事务请求,到后端多服务协同处理,再到将事务的实时状态流式反馈给前端的全链路挑战。

架构决策与核心组件设计

我们的目标是构建一个系统,前端(Next.js)发起一个创建订单的请求,后端需要协同 OrderServiceStockServicePaymentService

  1. Saga Orchestrator (Flask): 核心协调者。它负责驱动整个 Saga 流程,维护事务状态机,并与参与者服务通信。
  2. Participant Services (Flask): 具体的业务服务,如库存服务和支付服务。它们只负责执行自己的本地事务和补偿事务。
  3. Message Broker (RabbitMQ): 服务间的通信总线。编排器通过它向参与者发送命令,参与者通过它返回结果。这种异步通信是实现服务解耦和系统韧性的关键。
  4. Real-time Frontend (Next.js): 用户界面。它不仅要能触发 Saga,更重要的是能够实时、清晰地展示这个长事务的每一步进展,包括成功、失败与补偿状态。我们将使用 Server-Sent Events (SSE) 实现这一目标。
  5. Styling (Tailwind CSS): 我们将使用 Tailwind CSS 来构建一个动态的状态展示组件,通过原子化的 CSS 类直观地反映 Saga 的生命周期。
sequenceDiagram
    participant Client (Next.js)
    participant API Gateway
    participant Orchestrator (Flask)
    participant RabbitMQ
    participant StockService (Flask)
    participant PaymentService (Flask)

    Client->>+API Gateway: POST /orders (创建订单)
    API Gateway->>+Orchestrator: start_order_saga(order_data)
    Orchestrator->>Orchestrator: 创建 Saga 实例, 状态: PENDING
    Orchestrator->>RabbitMQ: publish('stock.reserve', {saga_id, ...})
    Orchestrator-->>-Client: HTTP 202 Accepted { saga_id }

    Note right of Client: 客户端开始通过 SSE 监听 saga_id 的状态

    RabbitMQ->>+StockService: consume('stock.reserve')
    StockService->>StockService: 执行本地事务 (锁定库存)
    alt 成功
        StockService->>RabbitMQ: publish('saga.reply', {saga_id, status: 'SUCCESS'})
    else 失败
        StockService->>RabbitMQ: publish('saga.reply', {saga_id, status: 'FAILURE'})
    end
    StockService-->>-RabbitMQ: ack

    RabbitMQ->>+Orchestrator: consume('saga.reply')
    Orchestrator->>Orchestrator: 更新 Saga 状态, 推进或补偿
    alt 库存锁定成功
        Orchestrator->>RabbitMQ: publish('payment.process', {saga_id, ...})
    else 库存锁定失败
        Orchestrator->>Orchestrator: Saga 失败, 无需补偿
    end

    %% ... 支付流程类似 ...
    RabbitMQ->>+PaymentService: consume('payment.process')
    PaymentService->>PaymentService: 执行本地事务 (扣款)
    PaymentService->>RabbitMQ: publish('saga.reply', {saga_id, status: 'SUCCESS/FAILURE'})
    PaymentService-->>-RabbitMQ: ack

    RabbitMQ->>+Orchestrator: consume('saga.reply')
    Orchestrator->>Orchestrator: 更新 Saga 状态
    alt 支付成功
        Orchestrator->>Orchestrator: Saga 完成, 状态: COMMITTED
    else 支付失败
        Orchestrator->>Orchestrator: Saga 失败, 状态: COMPENSATING
        Orchestrator->>RabbitMQ: publish('stock.release', {saga_id, ...})
    end

Saga 编排器 (Flask) 的实现

编排器是整个架构的大脑。我们将用一个 Flask 应用来实现它,并结合一个简单的内存状态机来管理 Saga 实例。在生产环境中,这应该被替换为持久化存储,如 Redis 或数据库。

1. 项目结构与依赖

/orchestrator
|-- app.py             # Flask 主应用
|-- saga_manager.py    # Saga 状态机与流程定义
|-- sse.py             # Server-Sent Events 实现
|-- rabbit.py          # RabbitMQ 连接与消息发布/消费
|-- requirements.txt   # pika, flask, flask-cors

2. Saga 状态机与定义 (saga_manager.py)

这是编排器的核心逻辑。我们定义 Saga 的每一步、对应的执行命令和补偿命令。

# saga_manager.py
import uuid
import threading
import logging
from enum import Enum
from rabbit import RabbitMQConnection

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 使用线程安全的字典来存储Saga实例的状态
# 在生产环境中,这应该被替换为Redis或数据库
sagas = {}
sagas_lock = threading.Lock()

class SagaStatus(str, Enum):
    PENDING = "PENDING"
    EXECUTING_STOCK = "EXECUTING_STOCK_RESERVATION"
    EXECUTING_PAYMENT = "EXECUTING_PAYMENT_PROCESSING"
    COMMITTED = "COMMITTED"
    COMPENSATING_STOCK = "COMPENSATING_STOCK_RELEASE"
    FAILED = "FAILED"

# Saga流程定义
SAGA_DEFINITION = {
    'create_order': [
        {
            'name': 'reserve_stock',
            'command_queue': 'stock.reserve.queue',
            'on_success': SagaStatus.EXECUTING_PAYMENT,
            'on_failure': SagaStatus.FAILED,
            'compensation_queue': 'stock.release.queue',
        },
        {
            'name': 'process_payment',
            'command_queue': 'payment.process.queue',
            'on_success': SagaStatus.COMMITTED,
            'on_failure': SagaStatus.COMPENSATING_STOCK,
        }
    ]
}

def get_sse_manager():
    """ 延迟导入以避免循环依赖 """
    from sse import sse_manager
    return sse_manager

def find_step_by_name(saga_name, step_name):
    for step in SAGA_DEFINITION.get(saga_name, []):
        if step['name'] == step_name:
            return step
    return None
    
def find_step_by_compensation(saga_name, compensation_queue):
    for step in SAGA_DEFINITION.get(saga_name, []):
        if step.get('compensation_queue') == compensation_queue:
            return step
    return None

class SagaInstance:
    def __init__(self, saga_id, saga_name, payload):
        self.id = saga_id
        self.name = saga_name
        self.payload = payload
        self.current_step_index = 0
        self.status = SagaStatus.PENDING
        self.definition = SAGA_DEFINITION[saga_name]

    def advance(self):
        """ Saga向前推进 """
        if self.status == SagaStatus.COMMITTED:
            logging.info(f"Saga {self.id} already committed.")
            get_sse_manager().broadcast(self.id, {'status': self.status, 'message': 'Saga completed successfully.'})
            return

        step = self.definition[self.current_step_index]
        self.status = SagaStatus[f"EXECUTING_{step['name'].upper()}"]
        logging.info(f"Saga {self.id} advancing to step '{step['name']}'. Status: {self.status}")
        get_sse_manager().broadcast(self.id, {'status': self.status, 'step': step['name']})
        
        with RabbitMQConnection() as rabbit:
            rabbit.publish(
                queue=step['command_queue'],
                body={'saga_id': self.id, 'step': step['name'], 'payload': self.payload}
            )

    def process_reply(self, reply):
        """ 处理参与者的回复 """
        step_name = reply.get('step')
        step = find_step_by_name(self.name, step_name)
        if not step:
            logging.error(f"Saga {self.id}: Received reply for unknown step '{step_name}'.")
            return
            
        if reply['status'] == 'SUCCESS':
            logging.info(f"Saga {self.id}: Step '{step_name}' succeeded.")
            self.status = step['on_success']
            self.current_step_index += 1
            if self.status != SagaStatus.COMMITTED:
                self.advance()
            else:
                logging.info(f"Saga {self.id} successfully committed.")
                get_sse_manager().broadcast(self.id, {'status': self.status, 'message': 'Saga completed successfully.'})
        else: # FAILURE
            logging.warning(f"Saga {self.id}: Step '{step_name}' failed. Initiating compensation.")
            self.status = step['on_failure']
            self.compensate()

    def compensate(self):
        """ Saga向后补偿 """
        if self.current_step_index < 0:
            self.status = SagaStatus.FAILED
            logging.error(f"Saga {self.id} compensation finished. Final status: FAILED.")
            get_sse_manager().broadcast(self.id, {'status': self.status, 'message': 'Saga failed and could not be compensated.'})
            return

        step_to_compensate = self.definition[self.current_step_index]
        compensation_queue = step_to_compensate.get('compensation_queue')
        
        if not compensation_queue:
            logging.warning(f"Saga {self.id}: No compensation defined for step '{step_to_compensate['name']}'. Moving to previous step.")
            self.current_step_index -= 1
            self.compensate() # 递归补偿上一步
            return

        self.status = SagaStatus[f"COMPENSATING_{step_to_compensate['name'].upper()}"]
        logging.info(f"Saga {self.id}: Compensating step '{step_to_compensate['name']}'. Status: {self.status}")
        get_sse_manager().broadcast(self.id, {'status': self.status, 'step': step_to_compensate['name']})

        with RabbitMQConnection() as rabbit:
            rabbit.publish(
                queue=compensation_queue,
                body={'saga_id': self.id, 'step': step_to_compensate['name'], 'payload': self.payload}
            )

def start_new_saga(saga_name, payload):
    saga_id = str(uuid.uuid4())
    instance = SagaInstance(saga_id, saga_name, payload)
    
    with sagas_lock:
        sagas[saga_id] = instance
    
    instance.advance()
    return saga_id

def handle_saga_reply(reply):
    saga_id = reply.get('saga_id')
    with sagas_lock:
        instance = sagas.get(saga_id)

    if not instance:
        logging.error(f"Received reply for non-existent saga_id: {saga_id}")
        return
        
    instance.process_reply(reply)

3. Server-Sent Events (SSE) 流式更新 (sse.py)

为了将后端状态实时推送到前端,SSE 是一个比 WebSocket 更轻量的选择。它基于标准的 HTTP,非常适合单向通信场景。

# sse.py
import json
from queue import Queue
import threading

class SSEManager:
    def __init__(self):
        self.listeners = {}
        self.lock = threading.Lock()

    def subscribe(self, client_id):
        q = Queue()
        with self.lock:
            self.listeners[client_id] = q
        return q

    def unsubscribe(self, client_id):
        with self.lock:
            if client_id in self.listeners:
                del self.listeners[client_id]

    def broadcast(self, client_id, data):
        """ 发送消息给特定的订阅者 """
        with self.lock:
            q = self.listeners.get(client_id)
            if q:
                # 必须格式化为 SSE 规范的 "data: ...\n\n"
                message = f"data: {json.dumps(data)}\n\n"
                q.put(message)

sse_manager = SSEManager()

4. 主应用与 API 端点 (app.py)

这是将所有部分粘合起来的 Flask 应用。它提供一个 API 来启动 Saga,一个 SSE 端点来监听状态,并启动一个后台线程来消费 RabbitMQ 的回复。

# app.py
import json
import threading
from flask import Flask, request, Response
from flask_cors import CORS
import pika

from saga_manager import start_new_saga, handle_saga_reply
from sse import sse_manager
from rabbit import RabbitMQConnection

app = Flask(__name__)
# 允许来自 Next.js 开发服务器的跨域请求
CORS(app, resources={r"/*": {"origins": "http://localhost:3000"}})

@app.route('/orders', methods=['POST'])
def create_order():
    """ 触发一个新的 'create_order' Saga """
    data = request.get_json()
    if not data:
        return {"error": "Invalid payload"}, 400
    
    # 在真实项目中,这里会有负载验证
    saga_id = start_new_saga('create_order', data)
    
    # 返回202 Accepted,表示请求已被接受处理,但尚未完成
    # 客户端应使用返回的 saga_id 来监听状态更新
    return {"saga_id": saga_id}, 202

@app.route('/saga-status/<saga_id>')
def saga_status_stream(saga_id):
    """ SSE 端点,用于流式传输 Saga 状态 """
    def event_stream():
        q = sse_manager.subscribe(saga_id)
        try:
            while True:
                message = q.get()
                yield message
        except GeneratorExit:
            # 当客户端断开连接时,清理订阅者
            sse_manager.unsubscribe(saga_id)

    return Response(event_stream(), mimetype='text/event-stream')

def start_reply_consumer():
    """ 启动一个后台线程来消费来自参与者的回复 """
    def callback(ch, method, properties, body):
        try:
            reply = json.loads(body)
            handle_saga_reply(reply)
        except json.JSONDecodeError as e:
            app.logger.error(f"Failed to decode reply message: {body}, error: {e}")
        finally:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    with RabbitMQConnection() as rabbit:
        rabbit.consume('saga.reply.queue', callback)

if __name__ == '__main__':
    # 在单独的线程中启动 RabbitMQ 消费者
    consumer_thread = threading.Thread(target=start_reply_consumer, daemon=True)
    consumer_thread.start()
    
    app.run(port=5001, debug=True)

参与者服务 (Flask) 的实现

StockService 为例,所有参与者服务的结构都类似。它们是独立的 Flask 应用,只关心自己的业务逻辑和与 RabbitMQ 的交互。

# stock_service/app.py
import json
import threading
import pika
import time
import random

# 这是一个模拟的库存数据库
inventory = {"product-123": 10}
inventory_lock = threading.Lock()

def reserve_stock(ch, method, properties, body):
    data = json.loads(body)
    saga_id = data['saga_id']
    payload = data['payload']
    product_id = payload['product_id']
    quantity = payload['quantity']
    
    reply_status = 'FAILURE'
    with inventory_lock:
        if inventory.get(product_id, 0) >= quantity:
            # 模拟操作延迟
            time.sleep(2) 
            inventory[product_id] -= quantity
            reply_status = 'SUCCESS'
            print(f"Stock for {product_id} reserved. Remaining: {inventory[product_id]}")
    
    # 模拟随机失败
    if random.random() < 0.2: # 20% 的失败率
        print(f"Simulating failure for stock reservation for saga {saga_id}")
        reply_status = 'FAILURE'

    send_reply(saga_id, 'reserve_stock', reply_status)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def release_stock(ch, method, properties, body):
    """ 补偿事务:必须是幂等的且不能失败 """
    data = json.loads(body)
    payload = data['payload']
    product_id = payload['product_id']
    quantity = payload['quantity']

    with inventory_lock:
        inventory[product_id] = inventory.get(product_id, 0) + quantity
    
    print(f"COMPENSATION: Stock for {product_id} released. Current: {inventory[product_id]}")
    ch.basic_ack(delivery_tag=method.delivery_tag)
    # 补偿事务不发送回复,因为它标志着一个分支的结束

def send_reply(saga_id, step, status):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='saga.reply.queue', durable=True)
    message = json.dumps({'saga_id': saga_id, 'step': step, 'status': status})
    channel.basic_publish(
        exchange='',
        routing_key='saga.reply.queue',
        body=message
    )
    connection.close()

# 主消费逻辑
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='stock.reserve.queue', durable=True)
channel.queue_declare(queue='stock.release.queue', durable=True)
channel.basic_consume(queue='stock.reserve.queue', on_message_callback=reserve_stock)
channel.basic_consume(queue='stock.release.queue', on_message_callback=release_stock)

print('StockService is waiting for messages...')
channel.start_consuming()

前端状态同步 (Next.js) 的实现

前端的挑战在于如何优雅地处理这种异步、长周期的后端任务。用户不能被一个无尽的加载动画阻塞。

1. 状态管理与 API 调用

我们使用一个 React Hook 来封装与 Saga 后端的交互。

// hooks/useOrderSaga.js
import { useState, useCallback } from 'react';

export const useOrderSaga = () => {
  const [sagaId, setSagaId] = useState(null);
  const [status, setStatus] = useState('IDLE');
  const [steps, setSteps] = useState([]); // [{ name, status }]
  const [error, setError] = useState(null);

  const startOrderSaga = useCallback(async (orderData) => {
    setStatus('INITIATING');
    setSteps([]);
    setError(null);
    setSagaId(null);

    try {
      const response = await fetch('http://localhost:5001/orders', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(orderData),
      });

      if (response.status !== 202) {
        throw new Error('Failed to initiate order process.');
      }

      const { saga_id } = await response.json();
      setSagaId(saga_id);
      setStatus('LISTENING');

      // 建立 SSE 连接
      const eventSource = new EventSource(`http://localhost:5001/saga-status/${saga_id}`);
      
      eventSource.onmessage = (event) => {
        const data = JSON.parse(event.data);
        setStatus(data.status);
        
        // 更新步骤状态
        setSteps(prevSteps => {
          const newSteps = [...prevSteps];
          const stepIndex = newSteps.findIndex(s => s.name === data.step);
          
          if (stepIndex > -1) {
            // 更新现有步骤
            if (data.status.startsWith('COMPENSATING')) {
              newSteps[stepIndex].status = 'COMPENSATING';
            } else {
              newSteps[stepIndex].status = 'SUCCESS';
            }
          } else if (data.step) {
            // 添加新步骤
            if (prevSteps.length > 0) {
              prevSteps[prevSteps.length - 1].status = 'SUCCESS';
            }
            newSteps.push({ name: data.step, status: 'IN_PROGRESS' });
          }
          
          return newSteps;
        });
        
        if (data.status === 'COMMITTED' || data.status === 'FAILED') {
          eventSource.close();
        }
      };
      
      eventSource.onerror = () => {
        setError('Connection to status stream lost.');
        setStatus('ERROR');
        eventSource.close();
      };

    } catch (err) {
      setError(err.message);
      setStatus('ERROR');
    }
  }, []);

  return { sagaId, status, steps, error, startOrderSaga };
};

2. UI 组件与样式方案 (Tailwind CSS)

这个组件将使用 useOrderSaga hook,并用 Tailwind CSS 动态地渲染 Saga 的状态。

// components/OrderProcessor.jsx
import { useOrderSaga } from '../hooks/useOrderSaga';

const stepFriendlyNames = {
  reserve_stock: 'Reserving Stock',
  process_payment: 'Processing Payment',
};

const StepIndicator = ({ step }) => {
  const statusClasses = {
    IN_PROGRESS: 'bg-blue-100 text-blue-800 animate-pulse',
    SUCCESS: 'bg-green-100 text-green-800',
    COMPENSATING: 'bg-yellow-100 text-yellow-800',
  };

  const statusText = {
    IN_PROGRESS: 'In Progress...',
    SUCCESS: '✓ Completed',
    COMPENSATING: 'Compensating...',
  };

  return (
    <li className={`p-4 rounded-md shadow-sm flex justify-between items-center mb-2 ${statusClasses[step.status] || 'bg-gray-100'}`}>
      <span className="font-medium">{stepFriendlyNames[step.name] || step.name}</span>
      <span className="text-sm font-semibold">{statusText[step.status] || ''}</span>
    </li>
  );
};

export default function OrderProcessor() {
  const { sagaId, status, steps, error, startOrderSaga } = useOrderSaga();

  const handleCreateOrder = () => {
    const orderData = {
      user_id: 'user-456',
      product_id: 'product-123',
      quantity: 1,
      amount: 99.99,
    };
    startOrderSaga(orderData);
  };

  const isProcessing = status !== 'IDLE' && status !== 'COMMITTED' && status !== 'FAILED' && status !== 'ERROR';

  return (
    <div className="max-w-2xl mx-auto p-8 font-sans bg-white shadow-lg rounded-lg mt-10">
      <h1 className="text-3xl font-bold text-gray-800 mb-6">Saga Pattern Demo</h1>
      
      <button
        onClick={handleCreateOrder}
        disabled={isProcessing}
        className="w-full bg-indigo-600 text-white font-bold py-3 px-6 rounded-lg hover:bg-indigo-700 disabled:bg-gray-400 disabled:cursor-not-allowed transition duration-300"
      >
        {isProcessing ? 'Processing Order...' : 'Create a New Order'}
      </button>

      {sagaId && (
        <div className="mt-8">
          <p className="text-sm text-gray-500 mb-4">
            Saga ID: <code className="bg-gray-200 px-1 rounded">{sagaId}</code>
          </p>
          
          <h2 className="text-xl font-semibold text-gray-700 mb-4">Transaction Status: {status}</h2>
          
          <ul className="space-y-2">
            {steps.map((step, index) => <StepIndicator key={index} step={step} />)}
          </ul>
        </div>
      )}
      
      {status === 'COMMITTED' && (
        <div className="mt-6 p-4 bg-green-50 border-l-4 border-green-500 text-green-700">
          <p className="font-bold">Success!</p>
          <p>Your order has been processed successfully.</p>
        </div>
      )}
      
      {(status === 'FAILED' || error) && (
        <div className="mt-6 p-4 bg-red-50 border-l-4 border-red-500 text-red-700">
          <p className="font-bold">Process Failed</p>
          <p>{error || 'The order could not be completed. Any completed steps have been rolled back.'}</p>
        </div>
      )}
    </div>
  );
}

架构的局限性与未来展望

此架构虽然解决了分布式事务的一致性问题,但也引入了新的复杂性。首先,编排器本身可能成为单点瓶颈。在生产环境中,编排器需要被设计为无状态的,并将 Saga 实例的状态持久化到高可用的存储(如 Redis Cluster 或 CockroachDB)中,自身则可以水平扩展。

其次,最终一致性意味着数据在短时间内可能是不一致的。业务方需要理解并接受这一点。对于某些需要强一致性读的场景(例如,用户查询订单状态时),可能需要设计特定的查询服务或采用 CQRS 模式来提供一个一致性的数据视图。

最后,调试和监控的难度大大增加。一个请求的完整生命周期跨越了多个服务和消息队列。如果没有健全的分布式追踪系统(例如,集成 OpenTelemetry),在生产环境中定位问题将是一场噩梦。每个 Saga 步骤的日志都必须包含唯一的 saga_idcorrelation_id,以便将分散的日志串联起来,还原完整的业务流程。


  目录