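In a typical microservice architecture, a single user action often crosses several independent service boundaries. For example, a "create project" request might first call the user service to deduct credits, then the resource service to allocate compute resources, and finally the notification service to send a confirmation email. With a plain chain of synchronous HTTP calls, a failure at any step in the middle leaves the system's data inconsistent: the credits have been deducted, but resource allocation failed. In production, this kind of inconsistency is disastrous.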
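Two-phase commit (2PC), familiar from traditional databases, is the classic answer to this problem and provides strong consistency. In microservice and internet-scale applications, however, its drawbacks are obvious. It is a synchronous, blocking protocol: resources stay locked for as long as the coordinator waits for every participant to respond. This severely limits throughput. The coordinator is also a single point of failure, and if it fails, the entire distributed transaction freezes and availability drops sharply. For modern systems that need high availability and loose coupling, 2PC is often not a viable option.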
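The Saga pattern, which is based on eventual consistency, offers a more resilient and scalable alternative. A Saga splits a long-running transaction into a sequence of local transactions, each executed by the corresponding service. If one local transaction fails, the Saga runs a series of compensating transactions that undo the local transactions that have already committed, returning the system's data to a consistent state.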
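The compensation rule above can be illustrated with a minimal in-process sketch. Committed steps are undone newest first, and the failed step itself is not compensated because it never committed. This is not the distributed design built later in this article. `SagaStep` and `run_saga` are hypothetical names, and a raised exception stands in for a failed local transaction.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class SagaStep:
    name: str
    action: Callable[[], None]                         # local transaction; raises on failure
    compensation: Optional[Callable[[], None]] = None  # semantic undo of `action`


def run_saga(steps: List[SagaStep]) -> bool:
    """Run steps in order; on failure, compensate the completed steps in reverse order."""
    completed: List[SagaStep] = []
    for step in steps:
        try:
            step.action()
        except Exception:
            # The failed step did not commit, so only earlier steps are undone
            for done in reversed(completed):
                if done.compensation:
                    done.compensation()
            return False
        completed.append(step)
    return True


log = []


def fail():
    raise RuntimeError("email service unavailable")


ok = run_saga([
    SagaStep("deduct_credits", lambda: log.append("deduct"), lambda: log.append("refund")),
    SagaStep("allocate_resources", lambda: log.append("allocate"), lambda: log.append("free")),
    SagaStep("send_email", fail),
])
print(ok, log)  # False ['deduct', 'allocate', 'free', 'refund']
```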
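Of the ways to implement a Saga, we choose the orchestration-based Saga. In a choreography-based Saga, services trigger one another through events. The orchestration model instead introduces a central orchestrator that explicitly defines and controls the whole transaction flow. This keeps the business logic in one place and the state explicit, and it makes complex flows easier to monitor and debug.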
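This article builds a complete orchestration-based Saga solution with Flask, Next.js, and RabbitMQ. It covers the full path: the frontend starts a long-running transaction, several backend services process it together, and the transaction's live status is streamed back to the frontend.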
Architectural Decisions and Core Component Design
Our goal is a system in which the frontend (Next.js) sends a create-order request and the backend coordinates OrderService, StockService, and PaymentService.
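- Saga Orchestrator (Flask): The central coordinator. It drives the Saga flow, maintains the transaction state machine, and communicates with the participant services.
- Participant Services (Flask): The business services, such as the stock service and the payment service. Each one is responsible only for executing its own local transaction and its compensating transaction.
- Message Broker (RabbitMQ): The communication bus between services. The orchestrator sends commands to participants through it, and participants return their results through it. This asynchronous communication is key to decoupling the services and making the system resilient.
- Real-time Frontend (Next.js): The user interface. It must trigger the Saga, and more importantly, it must show each step of the long-running transaction clearly and in real time, including success, failure, and compensation. We use Server-Sent Events (SSE) for this.
- Styling (Tailwind CSS): We use Tailwind CSS to build a dynamic status component whose utility classes directly reflect the Saga's lifecycle.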
sequenceDiagram
    participant Client as Client (Next.js)
    participant Gateway as API Gateway
    participant Orchestrator as Orchestrator (Flask)
    participant RabbitMQ
    participant StockService as StockService (Flask)
    participant PaymentService as PaymentService (Flask)

    Client->>+Gateway: POST /orders (create order)
    Gateway->>+Orchestrator: start_order_saga(order_data)
    Orchestrator->>Orchestrator: Create Saga instance, status: PENDING
    Orchestrator->>RabbitMQ: publish('stock.reserve', {saga_id, ...})
    Orchestrator-->>-Gateway: HTTP 202 Accepted { saga_id }
    Gateway-->>-Client: HTTP 202 Accepted { saga_id }
    Note right of Client: The client starts listening for saga_id status updates via SSE

    RabbitMQ->>+StockService: consume('stock.reserve')
    StockService->>StockService: Run local transaction (reserve stock)
    alt Success
        StockService->>RabbitMQ: publish('saga.reply', {saga_id, status: 'SUCCESS'})
    else Failure
        StockService->>RabbitMQ: publish('saga.reply', {saga_id, status: 'FAILURE'})
    end
    StockService-->>-RabbitMQ: ack

    RabbitMQ->>Orchestrator: consume('saga.reply')
    Orchestrator->>Orchestrator: Update Saga state, advance or compensate
    alt Stock reserved
        Orchestrator->>RabbitMQ: publish('payment.process', {saga_id, ...})
    else Stock reservation failed
        Orchestrator->>Orchestrator: Saga failed, nothing to compensate
    end

    %% ... the payment flow is analogous ...
    RabbitMQ->>+PaymentService: consume('payment.process')
    PaymentService->>PaymentService: Run local transaction (charge payment)
    PaymentService->>RabbitMQ: publish('saga.reply', {saga_id, status: 'SUCCESS/FAILURE'})
    PaymentService-->>-RabbitMQ: ack
    RabbitMQ->>Orchestrator: consume('saga.reply')
    Orchestrator->>Orchestrator: Update Saga state
    alt Payment succeeded
        Orchestrator->>Orchestrator: Saga complete, status: COMMITTED
    else Payment failed
        Orchestrator->>Orchestrator: Saga failed, status: COMPENSATING
        Orchestrator->>RabbitMQ: publish('stock.release', {saga_id, ...})
    end
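The diagram implies a small message contract. In the code that follows, commands carry `saga_id`, `step`, and `payload`. Replies carry `saga_id`, `step`, and a `status` of `SUCCESS` or `FAILURE`. The sketch below validates that contract at the boundary, so malformed replies are rejected before they reach the state machine. `ReplyMessage`, `parse_reply`, and `encode_command` are hypothetical helpers that the article's code does not use.

```python
import json
from dataclasses import dataclass

VALID_STATUSES = {"SUCCESS", "FAILURE"}


@dataclass(frozen=True)
class ReplyMessage:
    saga_id: str
    step: str
    status: str


def parse_reply(body: bytes) -> ReplyMessage:
    """Decode a participant reply and reject malformed messages early."""
    data = json.loads(body)
    missing = [k for k in ("saga_id", "step", "status") if k not in data]
    if missing:
        raise ValueError(f"reply is missing fields: {missing}")
    if data["status"] not in VALID_STATUSES:
        raise ValueError(f"unknown status: {data['status']!r}")
    return ReplyMessage(data["saga_id"], data["step"], data["status"])


def encode_command(saga_id: str, step: str, payload: dict) -> bytes:
    """Build a command message in the shape the orchestrator publishes."""
    return json.dumps({"saga_id": saga_id, "step": step, "payload": payload}).encode()
```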
Implementing the Saga Orchestrator (Flask)
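The orchestrator is the brain of the architecture. We implement it as a Flask application paired with a simple in-memory state machine that manages Saga instances. In production, the in-memory store should be replaced with persistent storage such as Redis or a database.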
1. Project Structure and Dependencies
/orchestrator
|-- app.py            # Main Flask application
|-- saga_manager.py   # Saga state machine and flow definition
|-- sse.py            # Server-Sent Events implementation
|-- rabbit.py         # RabbitMQ connection, publishing and consuming
|-- requirements.txt  # pika, flask, flask-cors
2. Saga State Machine and Definition (saga_manager.py)
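This is the core logic of the orchestrator. We define each step of the Saga together with its execution command and its compensation command.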
# saga_manager.py
import uuid
import threading
import logging
from enum import Enum
from rabbit import RabbitMQConnection

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Saga instances live in a dict guarded by a lock.
# In production this should be replaced with Redis or a database.
sagas = {}
sagas_lock = threading.Lock()


class SagaStatus(str, Enum):
    PENDING = "PENDING"
    EXECUTING_STOCK = "EXECUTING_STOCK_RESERVATION"
    EXECUTING_PAYMENT = "EXECUTING_PAYMENT_PROCESSING"
    COMMITTED = "COMMITTED"
    COMPENSATING_STOCK = "COMPENSATING_STOCK_RELEASE"
    FAILED = "FAILED"


# Saga flow definition. Each step names its own executing/compensating status
# explicitly; deriving them from the step name (e.g. "EXECUTING_RESERVE_STOCK")
# would not match any SagaStatus member and raise KeyError.
SAGA_DEFINITION = {
    'create_order': [
        {
            'name': 'reserve_stock',
            'command_queue': 'stock.reserve.queue',
            'executing_status': SagaStatus.EXECUTING_STOCK,
            'compensation_queue': 'stock.release.queue',
            'compensating_status': SagaStatus.COMPENSATING_STOCK,
        },
        {
            'name': 'process_payment',
            'command_queue': 'payment.process.queue',
            'executing_status': SagaStatus.EXECUTING_PAYMENT,
            # Last step: no compensation, because a failed payment committed nothing
        },
    ]
}


def get_sse_manager():
    """Lazy import to avoid a circular dependency."""
    from sse import sse_manager
    return sse_manager


class SagaInstance:
    def __init__(self, saga_id, saga_name, payload):
        self.id = saga_id
        self.name = saga_name
        self.payload = payload
        self.current_step_index = 0
        self.status = SagaStatus.PENDING
        self.definition = SAGA_DEFINITION[saga_name]
        # Serializes state changes (HTTP request thread vs. reply consumer thread)
        self.lock = threading.Lock()

    def _notify(self, **data):
        get_sse_manager().broadcast(self.id, {'status': self.status, **data})

    def advance(self):
        """Move the Saga forward: run the current step, or commit if none is left."""
        if self.current_step_index >= len(self.definition):
            self.status = SagaStatus.COMMITTED
            logging.info(f"Saga {self.id} successfully committed.")
            self._notify(message='Saga completed successfully.')
            return
        step = self.definition[self.current_step_index]
        self.status = step['executing_status']
        logging.info(f"Saga {self.id} advancing to step '{step['name']}'. Status: {self.status.value}")
        self._notify(step=step['name'])
        with RabbitMQConnection() as rabbit:
            rabbit.publish(
                queue=step['command_queue'],
                body={'saga_id': self.id, 'step': step['name'], 'payload': self.payload},
            )

    def process_reply(self, reply):
        """Handle a participant's reply for the step that is currently executing."""
        step_name = reply.get('step')
        if self.status in (SagaStatus.COMMITTED, SagaStatus.FAILED):
            logging.warning(f"Saga {self.id}: ignoring reply for '{step_name}', saga already finished.")
            return
        current = self.definition[self.current_step_index]
        if current['name'] != step_name:
            # Unknown step or a redelivered reply for a step we already handled
            logging.error(f"Saga {self.id}: unexpected reply for step '{step_name}'.")
            return
        if reply.get('status') == 'SUCCESS':
            logging.info(f"Saga {self.id}: step '{step_name}' succeeded.")
            self.current_step_index += 1
            self.advance()
        else:  # FAILURE
            logging.warning(f"Saga {self.id}: step '{step_name}' failed. Initiating compensation.")
            # The failed step's local transaction never committed, so compensation
            # starts with the step before it.
            self.current_step_index -= 1
            self.compensate()

    def compensate(self):
        """Compensate the completed steps in reverse order, then mark the Saga FAILED.

        Participants do not reply to compensations, so this relies on durable
        queues and on compensation handlers that are idempotent and cannot fail.
        """
        while self.current_step_index >= 0:
            step = self.definition[self.current_step_index]
            compensation_queue = step.get('compensation_queue')
            if compensation_queue:
                self.status = step['compensating_status']
                logging.info(f"Saga {self.id}: compensating step '{step['name']}'. Status: {self.status.value}")
                self._notify(step=step['name'])
                with RabbitMQConnection() as rabbit:
                    rabbit.publish(
                        queue=compensation_queue,
                        body={'saga_id': self.id, 'step': step['name'], 'payload': self.payload},
                    )
            else:
                logging.warning(f"Saga {self.id}: no compensation defined for step '{step['name']}'.")
            self.current_step_index -= 1
        self.status = SagaStatus.FAILED
        logging.error(f"Saga {self.id} failed. Final status: FAILED.")
        self._notify(message='Saga failed; compensation was requested for all completed steps.')


def start_new_saga(saga_name, payload):
    saga_id = str(uuid.uuid4())
    instance = SagaInstance(saga_id, saga_name, payload)
    with sagas_lock:
        sagas[saga_id] = instance
    with instance.lock:
        instance.advance()
    return saga_id


def handle_saga_reply(reply):
    saga_id = reply.get('saga_id')
    with sagas_lock:
        instance = sagas.get(saga_id)
    if not instance:
        logging.error(f"Received reply for non-existent saga_id: {saga_id}")
        return
    with instance.lock:
        instance.process_reply(reply)
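`process_reply` mixes decisions with I/O (RabbitMQ publishing and SSE broadcasts), which makes it hard to unit-test. One option is to express the transition rules as a pure function that only returns the actions to perform, and to leave the I/O to a thin caller. This assumes you are willing to restructure the module. `next_actions` is a hypothetical name, and the step dicts reuse the `command_queue` and `compensation_queue` keys from `SAGA_DEFINITION`.

```python
from typing import Dict, List, Tuple

# ("execute", queue) | ("compensate", queue) | ("finish", final_status)
Action = Tuple[str, str]


def next_actions(definition: List[Dict], step_index: int, reply_status: str) -> Tuple[int, List[Action]]:
    """Return the new step index and the actions triggered by a reply for step_index."""
    if reply_status == "SUCCESS":
        nxt = step_index + 1
        if nxt == len(definition):
            return nxt, [("finish", "COMMITTED")]
        return nxt, [("execute", definition[nxt]["command_queue"])]
    # FAILURE: the failed step did not commit; undo the earlier steps, newest first
    actions: List[Action] = [
        ("compensate", step["compensation_queue"])
        for step in reversed(definition[:step_index])
        if step.get("compensation_queue")
    ]
    return -1, actions + [("finish", "FAILED")]


DEMO_DEFINITION = [
    {"name": "reserve_stock", "command_queue": "stock.reserve.queue",
     "compensation_queue": "stock.release.queue"},
    {"name": "process_payment", "command_queue": "payment.process.queue"},
]
```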
3. Streaming Updates with Server-Sent Events (sse.py)
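To push backend state to the frontend in real time, SSE is a lighter-weight choice than WebSocket. It runs over standard HTTP and is well suited to one-way, server-to-client communication.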
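# sse.py
import json
import threading
from queue import Queue


class SSEManager:
    def __init__(self):
        self.listeners = {}  # saga_id -> Queue of the subscribed client (one per saga)
        self.history = {}    # saga_id -> messages already broadcast (demo only: never pruned)
        self.lock = threading.Lock()

    def subscribe(self, client_id):
        q = Queue()
        with self.lock:
            # Replay earlier events: the first status is broadcast while the POST
            # request is still running, before the client can open the SSE stream.
            for message in self.history.get(client_id, []):
                q.put(message)
            self.listeners[client_id] = q
        return q

    def unsubscribe(self, client_id):
        with self.lock:
            self.listeners.pop(client_id, None)

    def broadcast(self, client_id, data):
        """Send a message to a saga's subscriber and record it for late subscribers."""
        # Must follow the SSE format: "data: ...\n\n"
        message = f"data: {json.dumps(data)}\n\n"
        with self.lock:
            self.history.setdefault(client_id, []).append(message)
            q = self.listeners.get(client_id)
            if q is not None:
                q.put(message)


sse_manager = SSEManager()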
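The `data: ...\n\n` line above is the simplest form of an SSE event. The SSE format also defines `id:`, `event:`, and `retry:` fields, and a payload that contains newlines must be split across several `data:` lines. The helper below formats events that way. `format_sse` is hypothetical and is not used by the code above. Setting an `id` lets a reconnecting browser send a `Last-Event-ID` header. Setting `event` dispatches the message to `addEventListener(event, ...)` instead of `onmessage`.

```python
import json
from typing import Optional


def format_sse(data: object, event: Optional[str] = None,
               event_id: Optional[str] = None, retry_ms: Optional[int] = None) -> str:
    """Serialize one Server-Sent Event: one field per line, a blank line to end it."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    payload = data if isinstance(data, str) else json.dumps(data)
    # Every line of the payload needs its own "data:" prefix
    lines.extend(f"data: {line}" for line in payload.split("\n"))
    return "\n".join(lines) + "\n\n"


print(repr(format_sse({"status": "COMMITTED"}, event_id="3")))
# 'id: 3\ndata: {"status": "COMMITTED"}\n\n'
```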
4. Main Application and API Endpoints (app.py)
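This Flask application ties everything together. It exposes an API that starts a Saga and an SSE endpoint for listening to its status, and it starts a background thread that consumes replies from RabbitMQ.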
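# app.py
import json
import threading
from queue import Empty
from flask import Flask, request, Response
from flask_cors import CORS
from saga_manager import start_new_saga, handle_saga_reply
from sse import sse_manager
from rabbit import RabbitMQConnection

TERMINAL_STATUSES = ('COMMITTED', 'FAILED')

app = Flask(__name__)
# Allow cross-origin requests from the Next.js dev server
CORS(app, resources={r"/*": {"origins": "http://localhost:3000"}})


@app.route('/orders', methods=['POST'])
def create_order():
    """Trigger a new 'create_order' Saga."""
    data = request.get_json(silent=True)  # None instead of an error response for bad JSON
    if not data:
        return {"error": "Invalid payload"}, 400
    # A real project would validate the payload here
    saga_id = start_new_saga('create_order', data)
    # 202 Accepted: the request was accepted for processing but is not complete yet.
    # The client uses the returned saga_id to listen for status updates.
    return {"saga_id": saga_id}, 202


@app.route('/saga-status/<saga_id>')
def saga_status_stream(saga_id):
    """SSE endpoint that streams the status of a Saga."""
    def event_stream():
        q = sse_manager.subscribe(saga_id)
        try:
            while True:
                try:
                    message = q.get(timeout=15)
                except Empty:
                    # SSE comment line: keeps proxies from timing out the connection
                    # and gives the server a chance to notice a disconnected client
                    yield ": keep-alive\n\n"
                    continue
                yield message
                # Messages look like "data: {...}\n\n"; end the stream after a terminal status
                payload = json.loads(message[len("data: "):])
                if payload.get('status') in TERMINAL_STATUSES:
                    break
        finally:
            # Runs when the stream ends or the client disconnects (GeneratorExit)
            sse_manager.unsubscribe(saga_id)

    return Response(event_stream(), mimetype='text/event-stream',
                    headers={'Cache-Control': 'no-cache'})


def start_reply_consumer():
    """Consume participant replies (runs in a background thread)."""
    def callback(ch, method, properties, body):
        try:
            handle_saga_reply(json.loads(body))
        except json.JSONDecodeError as e:
            app.logger.error(f"Failed to decode reply message: {body}, error: {e}")
        except Exception:
            # Log and keep consuming: an uncaught exception would stop this thread
            app.logger.exception(f"Failed to handle reply message: {body}")
        finally:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    with RabbitMQConnection() as rabbit:
        rabbit.consume('saga.reply.queue', callback)


if __name__ == '__main__':
    # Start the RabbitMQ consumer in its own thread
    consumer_thread = threading.Thread(target=start_reply_consumer, daemon=True)
    consumer_thread.start()
    # use_reloader=False: with the debug reloader this block runs in two processes,
    # and the extra consumer would take replies for sagas it has no record of
    app.run(port=5001, debug=True, use_reloader=False, threaded=True)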
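To exercise these endpoints without the frontend, a small client using only the Python standard library can POST an order and then read the status stream. The sketch assumes the server from app.py is running on localhost:5001. `parse_sse_data` and `read_saga_events` are hypothetical helpers. The parser handles only `data:` lines and ignores comments and other fields.

```python
import json
import urllib.request
from typing import Iterable, Iterator

BASE_URL = "http://localhost:5001"  # assumption: app.py running locally


def parse_sse_data(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the JSON payload of each SSE event; comments and other fields are ignored."""
    buffer = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":  # a blank line ends the current event
            if buffer:
                yield json.loads("\n".join(buffer))
                buffer = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            buffer.append(value[1:] if value.startswith(" ") else value)


def read_saga_events(order: dict) -> Iterator[dict]:
    """Start a saga over HTTP and yield its status events until a terminal one."""
    req = urllib.request.Request(
        f"{BASE_URL}/orders",
        data=json.dumps(order).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        saga_id = json.load(resp)["saga_id"]
    with urllib.request.urlopen(f"{BASE_URL}/saga-status/{saga_id}") as stream:
        for event in parse_sse_data(raw.decode("utf-8") for raw in stream):
            yield event
            if event.get("status") in ("COMMITTED", "FAILED"):
                return
```

For example, `for e in read_saga_events({"product_id": "product-123", "quantity": 1, "amount": 99.99}): print(e)` prints each status update as it arrives.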
Implementing the Participant Services (Flask)
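Take StockService as an example; all participant services are structured similarly. Each runs as an independent process that cares only about its own business logic and its interaction with RabbitMQ. The StockService below is in fact a plain pika consumer script and does not need Flask at all.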
# stock_service/app.py
import json
import random
import threading
import time

import pika

# A mock inventory database
inventory = {"product-123": 10}
# saga_id -> (product_id, quantity) for reservations that have not been released
reservations = {}
inventory_lock = threading.Lock()


def reserve_stock(ch, method, properties, body):
    data = json.loads(body)
    saga_id = data['saga_id']
    payload = data['payload']
    product_id = payload['product_id']
    quantity = payload['quantity']
    reply_status = 'FAILURE'
    with inventory_lock:
        if saga_id in reservations:
            # Redelivered command: stock is already reserved for this saga
            reply_status = 'SUCCESS'
        elif random.random() < 0.2:
            # Simulate a 20% failure rate *before* touching the inventory, so that
            # a FAILURE reply always means no stock was deducted
            print(f"Simulating failure for stock reservation for saga {saga_id}")
        elif inventory.get(product_id, 0) >= quantity:
            time.sleep(2)  # simulate processing latency
            inventory[product_id] -= quantity
            reservations[saga_id] = (product_id, quantity)
            reply_status = 'SUCCESS'
            print(f"Stock for {product_id} reserved. Remaining: {inventory[product_id]}")
    send_reply(ch, saga_id, 'reserve_stock', reply_status)
    ch.basic_ack(delivery_tag=method.delivery_tag)


def release_stock(ch, method, properties, body):
    """Compensating transaction: must be idempotent and must not fail."""
    saga_id = json.loads(body)['saga_id']
    with inventory_lock:
        # pop() makes this idempotent: a duplicate message, or a release for a saga
        # that never reserved anything, is a no-op
        reservation = reservations.pop(saga_id, None)
        if reservation:
            product_id, quantity = reservation
            inventory[product_id] = inventory.get(product_id, 0) + quantity
            print(f"COMPENSATION: Stock for {product_id} released. Current: {inventory[product_id]}")
    ch.basic_ack(delivery_tag=method.delivery_tag)
    # Compensations send no reply: they mark the end of a branch


def send_reply(ch, saga_id, step, status):
    # Reuse the consumer's channel; publishing from inside a callback is allowed
    message = json.dumps({'saga_id': saga_id, 'step': step, 'status': status})
    ch.basic_publish(
        exchange='',
        routing_key='saga.reply.queue',
        body=message,
        properties=pika.BasicProperties(delivery_mode=2),  # persistent message
    )


# Main consume loop
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='stock.reserve.queue', durable=True)
channel.queue_declare(queue='stock.release.queue', durable=True)
channel.queue_declare(queue='saga.reply.queue', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='stock.reserve.queue', on_message_callback=reserve_stock)
channel.basic_consume(queue='stock.release.queue', on_message_callback=release_stock)
print('StockService is waiting for messages...')
channel.start_consuming()
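`release_stock` must be idempotent because RabbitMQ delivers messages at least once: a message is redelivered if the consumer dies before acking it. One general way to get there is to record which `(saga_id, step)` pairs have already been processed and skip duplicates. The sketch below does this with hypothetical names (`idempotent`, `release`) and an in-memory set. A real service would store the record in the same database transaction as the business change.

```python
import functools
import json
import threading
from typing import Callable, Set, Tuple

_processed: Set[Tuple[str, str]] = set()
_processed_lock = threading.Lock()


def idempotent(step: str) -> Callable:
    """Run the wrapped handler at most once per (saga_id, step)."""
    def decorator(handler: Callable[[dict], None]) -> Callable[[bytes], bool]:
        @functools.wraps(handler)
        def wrapper(body: bytes) -> bool:
            message = json.loads(body)
            key = (message["saga_id"], step)
            with _processed_lock:
                if key in _processed:
                    return False  # duplicate delivery: nothing to do
                handler(message)
                _processed.add(key)  # recorded only after the handler succeeds
            return True
        return wrapper
    return decorator


released = []


@idempotent("release_stock")
def release(message: dict) -> None:
    released.append(message["saga_id"])


body = json.dumps({"saga_id": "s1", "step": "reserve_stock"}).encode()
print(release(body), release(body), released)  # True False ['s1']
```

Inside a pika callback, you would call the wrapped function with `body` and then call `ch.basic_ack(...)` whether or not the message was a duplicate.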
Implementing Frontend State Synchronization (Next.js)
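The challenge on the frontend is handling an asynchronous, long-running backend task gracefully. The user should not be stuck behind an endless loading spinner.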
1. State Management and API Calls
We wrap the interaction with the Saga backend in a React hook.
// hooks/useOrderSaga.js
import { useState, useCallback } from 'react';

// Return a copy of `steps` in which every step with status `from` gets status `to`
const markSteps = (steps, from, to) =>
  steps.map((s) => (s.status === from ? { ...s, status: to } : s));

// Derive the next step list from one SSE event without mutating React state
const applyEvent = (prevSteps, data) => {
  if (data.status === 'COMMITTED') {
    return markSteps(prevSteps, 'IN_PROGRESS', 'SUCCESS');
  }
  if (data.status === 'FAILED') {
    // The running step failed; steps under compensation are shown as rolled back
    return markSteps(markSteps(prevSteps, 'IN_PROGRESS', 'FAILED'), 'COMPENSATING', 'COMPENSATED');
  }
  if (!data.step) {
    return prevSteps;
  }
  if (data.status.startsWith('COMPENSATING')) {
    return prevSteps.map((s) => (s.name === data.step ? { ...s, status: 'COMPENSATING' } : s));
  }
  if (prevSteps.some((s) => s.name === data.step)) {
    return prevSteps; // repeated event for a step we already track
  }
  // A new step has started, so the previous one must have succeeded
  return [...markSteps(prevSteps, 'IN_PROGRESS', 'SUCCESS'), { name: data.step, status: 'IN_PROGRESS' }];
};

export const useOrderSaga = () => {
  const [sagaId, setSagaId] = useState(null);
  const [status, setStatus] = useState('IDLE');
  const [steps, setSteps] = useState([]); // [{ name, status }]
  const [error, setError] = useState(null);

  const startOrderSaga = useCallback(async (orderData) => {
    setStatus('INITIATING');
    setSteps([]);
    setError(null);
    setSagaId(null);
    try {
      const response = await fetch('http://localhost:5001/orders', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(orderData),
      });
      if (response.status !== 202) {
        throw new Error('Failed to initiate order process.');
      }
      const { saga_id } = await response.json();
      setSagaId(saga_id);
      setStatus('LISTENING');

      // Open the SSE connection
      const eventSource = new EventSource(`http://localhost:5001/saga-status/${saga_id}`);
      let finished = false;
      eventSource.onmessage = (event) => {
        const data = JSON.parse(event.data);
        setStatus(data.status);
        setSteps((prevSteps) => applyEvent(prevSteps, data));
        if (data.status === 'COMMITTED' || data.status === 'FAILED') {
          finished = true;
          eventSource.close();
        }
      };
      eventSource.onerror = () => {
        // The browser also fires `error` when the server ends the stream;
        // ignore it once a terminal status has arrived
        if (finished) return;
        setError('Connection to status stream lost.');
        setStatus('ERROR');
        eventSource.close();
      };
    } catch (err) {
      setError(err.message);
      setStatus('ERROR');
    }
  }, []);

  return { sagaId, status, steps, error, startOrderSaga };
};
2. UI Component and Styling (Tailwind CSS)
This component uses the useOrderSaga hook and renders the Saga's state dynamically with Tailwind CSS.
// components/OrderProcessor.jsx
'use client'; // needed with the Next.js App Router, since this component uses hooks
import { useOrderSaga } from '../hooks/useOrderSaga';

const stepFriendlyNames = {
  reserve_stock: 'Reserving Stock',
  process_payment: 'Processing Payment',
};

const StepIndicator = ({ step }) => {
  const statusClasses = {
    IN_PROGRESS: 'bg-blue-100 text-blue-800 animate-pulse',
    SUCCESS: 'bg-green-100 text-green-800',
    COMPENSATING: 'bg-yellow-100 text-yellow-800',
    COMPENSATED: 'bg-gray-100 text-gray-600',
    FAILED: 'bg-red-100 text-red-800',
  };
  const statusText = {
    IN_PROGRESS: 'In Progress...',
    SUCCESS: '✓ Completed',
    COMPENSATING: 'Compensating...',
    COMPENSATED: '↺ Rolled Back',
    FAILED: '✗ Failed',
  };
  return (
    <li className={`p-4 rounded-md shadow-sm flex justify-between items-center mb-2 ${statusClasses[step.status] || 'bg-gray-100'}`}>
      <span className="font-medium">{stepFriendlyNames[step.name] || step.name}</span>
      <span className="text-sm font-semibold">{statusText[step.status] || ''}</span>
    </li>
  );
};

export default function OrderProcessor() {
  const { sagaId, status, steps, error, startOrderSaga } = useOrderSaga();

  const handleCreateOrder = () => {
    const orderData = {
      user_id: 'user-456',
      product_id: 'product-123',
      quantity: 1,
      amount: 99.99,
    };
    startOrderSaga(orderData);
  };

  const isProcessing = status !== 'IDLE' && status !== 'COMMITTED' && status !== 'FAILED' && status !== 'ERROR';

  return (
    <div className="max-w-2xl mx-auto p-8 font-sans bg-white shadow-lg rounded-lg mt-10">
      <h1 className="text-3xl font-bold text-gray-800 mb-6">Saga Pattern Demo</h1>
      <button
        onClick={handleCreateOrder}
        disabled={isProcessing}
        className="w-full bg-indigo-600 text-white font-bold py-3 px-6 rounded-lg hover:bg-indigo-700 disabled:bg-gray-400 disabled:cursor-not-allowed transition duration-300"
      >
        {isProcessing ? 'Processing Order...' : 'Create a New Order'}
      </button>

      {sagaId && (
        <div className="mt-8">
          <p className="text-sm text-gray-500 mb-4">
            Saga ID: <code className="bg-gray-200 px-1 rounded">{sagaId}</code>
          </p>
          <h2 className="text-xl font-semibold text-gray-700 mb-4">Transaction Status: {status}</h2>
          <ul className="space-y-2">
            {steps.map((step) => <StepIndicator key={step.name} step={step} />)}
          </ul>
        </div>
      )}

      {status === 'COMMITTED' && (
        <div className="mt-6 p-4 bg-green-50 border-l-4 border-green-500 text-green-700">
          <p className="font-bold">Success!</p>
          <p>Your order has been processed successfully.</p>
        </div>
      )}

      {(status === 'FAILED' || error) && (
        <div className="mt-6 p-4 bg-red-50 border-l-4 border-red-500 text-red-700">
          <p className="font-bold">Process Failed</p>
          <p>{error || 'The order could not be completed. Any completed steps have been rolled back.'}</p>
        </div>
      )}
    </div>
  );
}
Limitations and Future Directions
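This architecture solves the consistency problem for distributed transactions, but it introduces new complexity. First, the orchestrator itself can become a single-point bottleneck. In production, the orchestrator should be designed as a stateless service that persists Saga instance state to highly available storage (such as Redis Cluster or CockroachDB), so that the orchestrator can scale horizontally.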
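When several orchestrator instances share persisted Saga state, two of them may handle messages for the same saga at the same time. A common guard is optimistic concurrency. Each row carries a version, and an update succeeds only if the version has not changed since the row was read. The sketch below uses `sqlite3` only so that it runs standalone. The table layout and `SagaStore` are hypothetical. The same compare-and-set idea maps to a conditional `UPDATE ... WHERE version = ?` in CockroachDB or a `WATCH`/`MULTI` transaction in Redis.

```python
import json
import sqlite3
from typing import Optional, Tuple


class SagaStore:
    def __init__(self, path: str = ":memory:"):
        self.db = sqlite3.connect(path)
        self.db.execute(
            "CREATE TABLE IF NOT EXISTS sagas ("
            " id TEXT PRIMARY KEY, status TEXT, step_index INTEGER,"
            " payload TEXT, version INTEGER NOT NULL)"
        )

    def create(self, saga_id: str, payload: dict) -> None:
        with self.db:  # commits on success, rolls back on error
            self.db.execute(
                "INSERT INTO sagas VALUES (?, 'PENDING', 0, ?, 0)",
                (saga_id, json.dumps(payload)),
            )

    def load(self, saga_id: str) -> Optional[Tuple[str, int, int]]:
        """Return (status, step_index, version), or None if the saga is unknown."""
        return self.db.execute(
            "SELECT status, step_index, version FROM sagas WHERE id = ?", (saga_id,)
        ).fetchone()

    def save(self, saga_id: str, status: str, step_index: int, expected_version: int) -> bool:
        """Compare-and-set: succeeds only if nobody updated the row since it was read."""
        with self.db:
            cur = self.db.execute(
                "UPDATE sagas SET status = ?, step_index = ?, version = version + 1"
                " WHERE id = ? AND version = ?",
                (status, step_index, saga_id, expected_version),
            )
        return cur.rowcount == 1


store = SagaStore()
store.create("s1", {"product_id": "product-123", "quantity": 1})
status, step, version = store.load("s1")
first = store.save("s1", "EXECUTING_PAYMENT_PROCESSING", 1, version)  # wins
stale = store.save("s1", "FAILED", 0, version)  # loses: the version has moved on
```

An instance whose `save` returns `False` should reload the saga and re-evaluate the message rather than overwrite the newer state.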
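Second, eventual consistency means that data can be temporarily inconsistent, and the business side needs to understand and accept this. Some scenarios need strongly consistent reads, for example when a user queries an order's status. These may need a dedicated query service or the CQRS pattern to provide a consistent view of the data.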
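Finally, debugging and monitoring become much harder, because the complete lifecycle of a request spans several services and message queues. Without a solid distributed tracing system (for example, one built on OpenTelemetry), locating problems in production becomes a nightmare. Every log entry for a Saga step must include the unique saga_id and a correlation_id, so that scattered logs can be stitched together to reconstruct the complete business flow.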
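One way to make every log line carry these identifiers, without passing them to each logging call, is to keep them in `contextvars` and inject them with a `logging.Filter`. The sketch below covers only the local logging side; propagating the IDs across services is the tracing system's job. The variable names, logger name, and format string are assumptions. A message handler would set both variables at the start of processing each message.

```python
import contextvars
import io
import logging

saga_id_var = contextvars.ContextVar("saga_id", default="-")
correlation_id_var = contextvars.ContextVar("correlation_id", default="-")


class SagaContextFilter(logging.Filter):
    """Copy the current saga/correlation IDs onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.saga_id = saga_id_var.get()
        record.correlation_id = correlation_id_var.get()
        return True


def build_logger(stream) -> logging.Logger:
    logger = logging.getLogger("saga")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate lines via the root logger
    handler = logging.StreamHandler(stream)
    handler.addFilter(SagaContextFilter())
    handler.setFormatter(logging.Formatter(
        "%(levelname)s saga=%(saga_id)s corr=%(correlation_id)s %(message)s"))
    logger.addHandler(handler)
    return logger


buf = io.StringIO()
log = build_logger(buf)
saga_id_var.set("3f2a")
correlation_id_var.set("req-42")
log.info("step reserve_stock succeeded")
print(buf.getvalue(), end="")  # INFO saga=3f2a corr=req-42 step reserve_stock succeeded
```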