一个请求超时,客户端没有收到响应,于是它重试了。这个操作是创建一个订单。结果,数据库里出现了两条一模一样的订单,唯一的区别是ID和创建时间。这是真实项目中再常见不过的场景,尤其是在支付、交易这类核心模块,一次网络抖动就可能造成资损。解决这个问题的核心,就是实现API的幂等性(Idempotency)。
幂等性要求一个操作无论执行一次还是多次,产生的影响都是相同的。要实现它,技术方案有很多,每种方案都有其适用场景与妥协。
方案A:依赖数据库唯一约束
最直接的思路是在数据入口层做防线。客户端在每次请求时生成一个唯一的请求ID(例如 Idempotency-Key
请求头),服务端在创建订单时,将这个请求ID与订单数据一同存入数据库。
// 这是一个简化的模型,仅用于说明
public class Order
{
public Guid Id { get; set; }
public decimal Amount { get; set; }
public string ClientRequestId { get; set; } // 幂等键
// ... 其他属性
}
public class OrderDbContext : DbContext
{
public DbSet<Order> Orders { get; set; }
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
// 在数据库层面增加唯一约束
modelBuilder.Entity<Order>()
.HasIndex(o => o.ClientRequestId)
.IsUnique();
}
}
在业务逻辑中,我们只需要 try-catch
数据库的唯一约束异常。
优势:
- 实现简单: 核心逻辑由数据库保证,应用层代码干净。
- 强一致性: 依赖数据库事务的原子性,不存在中间状态。
劣势:
- 业务入侵:
ClientRequestId
这个纯粹的技术概念污染了Order
这个领域模型。领域模型应该只关心业务,而不是实现细节。 - 灵活性差: 如果幂等性逻辑需要扩展,比如不仅要防止重复创建,还要在重复请求时返回第一次成功的结果,这个方案就很难处理。当捕获到唯一约束异常时,我们无法轻易地从异常信息中反查出已存在的订单ID并返回其完整信息。
- 性能考量: 每次都依赖数据库抛出异常来控制业务逻辑,是一种反模式。在高并发下,大量的异常抛出与捕获对性能有明确的负面影响。
- 适用场景有限: 此方案仅适用于“创建”操作。对于“更新”操作(比如“为订单支付100元”),就无法用简单的唯一约束来保证幂等。
在真实项目中,这种方案通常只作为最后的防线,而非首选。我们需要一个更通用、与业务逻辑解耦的架构方案。
方案B:框架层的通用幂等性中间件
一个更优的架构是将幂等性处理逻辑从业务代码中抽离出来,下沉到框架层。在 ASP.NET Core 中,中间件(Middleware)和过滤器(Filter)是实现这种横切关注点的理想选择。我们可以构建一个通用的幂等性处理管道,对需要幂等的接口进行无感植入。
这个方案的核心流程如下:
sequenceDiagram participant Client participant Middleware as Idempotency Middleware participant Controller participant Storage as Distributed Cache (Redis) Client->>Middleware: POST /api/orders (Header: Idempotency-Key: xyz) Middleware->>Storage: GET idempotency_key:xyz alt Key Not Found Middleware->>Storage: SET lock:xyz (Acquire Lock) alt Lock Acquired Middleware->>Controller: Execute Action() Controller-->>Middleware: Result (e.g., 201 Created) Middleware->>Storage: SET idempotency_key:xyz = Result (with TTL) Middleware->>Storage: DEL lock:xyz (Release Lock) Middleware-->>Client: Result (201 Created) else Lock Failed Middleware-->>Client: 409 Conflict (Request in progress) end else Key Found (Cached Result) Middleware-->>Client: Return Cached Result end
优势:
- 关注点分离: 业务开发者只需关心业务逻辑,通过一个简单的特性(Attribute)即可为接口启用幂等性,无需编写任何重复的幂等处理代码。
- 通用性强: 可以处理各种HTTP方法(POST, PUT, PATCH)。可以配置缓存第一次请求的完整响应(状态码、响应体),并在后续重复请求中直接返回,这对于客户端来说是完全透明的。
- 可扩展性: 存储机制是可插拔的。虽然通常选用Redis,但可以轻松替换为其他分布式缓存或数据库。
劣势:
- 实现复杂: 需要处理分布式锁、并发控制、缓存穿透等问题,实现一个生产级的中间件需要周密的考虑。
- 引入新依赖: 需要引入一个外部的分布式缓存/锁服务,比如Redis,增加了系统的复杂度和运维成本。
最终选择与实现
对于一个需要长期演进的系统而言,方案B的优势远大于其劣势。一次投入构建一个健壮的幂等性框架,可以为整个团队节省大量重复劳动,并从架构层面统一幂等性实现,避免出错。
下面,我们将从零开始,在 ASP.NET Core 7 中构建这个基于分布式锁的幂等性中间件。
1. 定义核心抽象
首先,定义幂等性检查的标记。一个 Attribute 是最清晰的声明方式。
IdempotentAttribute.cs
/// <summary>
/// 标记一个Action需要进行幂等性检查。
/// </summary>
[AttributeUsage(AttributeTargets.Method, Inherited = true, AllowMultiple = false)]
public sealed class IdempotentAttribute : Attribute
{
/// <summary>
/// 幂等性检查的有效期(秒)。请求结果将被缓存此段时间。
/// 默认为24小时。
/// </summary>
public int ExpiresInSeconds { get; set; } = 24 * 60 * 60;
/// <summary>
/// 包含幂等键的HTTP请求头名称。
/// 默认为 "Idempotency-Key"。
/// </summary>
public string HeaderKey { get; set; } = "Idempotency-Key";
}
接下来,是缓存结果的存储接口。这个接口是解耦的关键,它使得底层存储可以是Redis,也可以是内存缓存(用于测试)或其他任何实现。
IIdempotencyCache.cs
using Microsoft.AspNetCore.Mvc;
namespace IdempotencyMiddleware.Core;
/// <summary>
/// 定义幂等性缓存操作的接口。
/// </summary>
public interface IIdempotencyCache
{
/// <summary>
/// 尝试获取缓存的响应。
/// </summary>
/// <param name="idempotencyKey">幂等键。</param>
/// <returns>如果找到缓存,则返回IActionResult;否则返回null。</returns>
Task<IActionResult?> GetResponseAsync(string idempotencyKey);
/// <summary>
/// 缓存响应。
/// </summary>
/// <param name="idempotencyKey">幂等键。</param>
/// <param name="response">要缓存的响应结果。</param>
/// <param name="expiresIn">过期时间。</param>
Task SetResponseAsync(string idempotencyKey, IActionResult response, TimeSpan expiresIn);
}
2. Redis 实现与分布式锁
我们将使用 StackExchange.Redis
来实现缓存,并利用其 LockTakeAsync
功能来实现一个简单的分布式锁。在真实项目中,使用 RedLock.net 这样的库或者基于Lua脚本的原子操作会更健壮,但这里的实现足以阐明核心思想。
RedisIdempotencyCache.cs
using System.Net;
using System.Text.Json;
using IdempotencyMiddleware.Core;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.Formatters;
using Microsoft.AspNetCore.Mvc.Infrastructure;
using StackExchange.Redis;
namespace IdempotencyMiddleware.Infrastructure;
public class RedisIdempotencyCache : IIdempotencyCache
{
private readonly IDatabase _database;
private readonly IActionContextAccessor _actionContextAccessor;
private static readonly JsonSerializerOptions _jsonSerializerOptions = new()
{
// 配置以支持复杂的 MVC 类型序列化
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
};
public RedisIdempotencyCache(IConnectionMultiplexer redis, IActionContextAccessor actionContextAccessor)
{
_database = redis.GetDatabase();
_actionContextAccessor = actionContextAccessor;
}
public async Task<IActionResult?> GetResponseAsync(string idempotencyKey)
{
var cacheKey = $"idempotency:{idempotencyKey}";
var cachedValue = await _database.StringGetAsync(cacheKey);
if (cachedValue.IsNullOrEmpty)
{
return null;
}
// 反序列化存储的响应
var cachedResponse = JsonSerializer.Deserialize<CachedResponse>(cachedValue!, _jsonSerializerOptions);
if (cachedResponse == null) return null;
return ToActionResult(cachedResponse);
}
public async Task SetResponseAsync(string idempotencyKey, IActionResult response, TimeSpan expiresIn)
{
var cachedResponse = await ToCachedResponseAsync(response);
if (cachedResponse == null) return;
var cacheKey = $"idempotency:{idempotencyKey}";
var serializedResponse = JsonSerializer.Serialize(cachedResponse, _jsonSerializerOptions);
await _database.StringSetAsync(cacheKey, serializedResponse, expiresIn);
}
// 将 IActionResult 转换为可序列化的对象
private async Task<CachedResponse?> ToCachedResponseAsync(IActionResult result)
{
if (result is not ObjectResult objectResult)
{
// 对于非 ObjectResult (如 StatusCodeResult),我们只保存状态码
if (result is IStatusCodeActionResult statusCodeResult)
{
return new CachedResponse
{
StatusCode = statusCodeResult.StatusCode ?? 500, // 默认500
ContentType = string.Empty,
Body = null
};
}
return null;
}
var httpContext = _actionContextAccessor.ActionContext?.HttpContext;
if (httpContext == null) return null;
// 这里的坑在于: IActionResult 的 Body 是一个流, 不能直接序列化。
// 我们需要重新执行一次内容协商和序列化过程,把它变成一个字符串。
// 这是一个简化的实现,生产环境可能需要更精细的处理。
var originalBodyStream = httpContext.Response.Body;
using var memoryStream = new MemoryStream();
httpContext.Response.Body = memoryStream;
await result.ExecuteResultAsync(_actionContextAccessor.ActionContext!);
memoryStream.Seek(0, SeekOrigin.Begin);
var bodyAsText = await new StreamReader(memoryStream).ReadToEndAsync();
// 恢复原始流
httpContext.Response.Body = originalBodyStream;
return new CachedResponse
{
StatusCode = objectResult.StatusCode ?? (int)HttpStatusCode.OK,
ContentType = objectResult.ContentTypes.FirstOrDefault() ?? "application/json",
Body = bodyAsText
};
}
// 将可序列化的对象转换回 IActionResult
private IActionResult ToActionResult(CachedResponse cachedResponse)
{
var result = new ContentResult
{
Content = cachedResponse.Body,
ContentType = cachedResponse.ContentType,
StatusCode = cachedResponse.StatusCode
};
return result;
}
// 用于序列化的数据传输对象
private class CachedResponse
{
public int StatusCode { get; set; }
public string ContentType { get; set; } = string.Empty;
public string? Body { get; set; }
}
}
这个实现中有个关键的难点:如何序列化 IActionResult
。IActionResult
本身是一个接口,其具体实现(如 OkObjectResult
)包含不可序列化的组件(如 IOutputFormatter
)。我们的解决方案是“重放”ExecuteResultAsync
方法,捕获其输出到内存流,然后将流内容序列化。这是一个取巧但有效的方法。
3. 核心中间件实现
这是所有逻辑的汇集点。我们将使用一个中间件来拦截请求,检查 IdempotentAttribute
,并执行核心的 “Lock -> Check -> Execute -> Save” 流程。
IdempotencyMiddleware.cs
using System.Net;
using IdempotencyMiddleware.Core;
using Microsoft.AspNetCore.Mvc.Controllers;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
namespace IdempotencyMiddleware;
public class IdempotencyMiddleware
{
private readonly RequestDelegate _next;
private readonly ILogger<IdempotencyMiddleware> _logger;
private readonly IIdempotencyCache _cache;
private readonly IDatabase _redisDatabase; // 用于分布式锁
public IdempotencyMiddleware(RequestDelegate next, ILogger<IdempotencyMiddleware> logger, IIdempotencyCache cache, IConnectionMultiplexer redis)
{
_next = next;
_logger = logger;
_cache = cache;
_redisDatabase = redis.GetDatabase();
}
public async Task InvokeAsync(HttpContext context)
{
var endpoint = context.GetEndpoint();
var idempotentAttribute = endpoint?.Metadata.GetMetadata<IdempotentAttribute>();
// 如果 Action 没有标记 [Idempotent],则直接跳过
if (idempotentAttribute == null)
{
await _next(context);
return;
}
// 提取幂等键
if (!context.Request.Headers.TryGetValue(idempotentAttribute.HeaderKey, out var idempotencyKey) || string.IsNullOrEmpty(idempotencyKey))
{
_logger.LogWarning("Idempotency key header '{HeaderKey}' not found.", idempotentAttribute.HeaderKey);
context.Response.StatusCode = (int)HttpStatusCode.BadRequest;
await context.Response.WriteAsync($"Header '{idempotentAttribute.HeaderKey}' is required.");
return;
}
// 检查缓存
var cachedResponse = await _cache.GetResponseAsync(idempotencyKey!);
if (cachedResponse != null)
{
_logger.LogInformation("Idempotency key '{IdempotencyKey}' found in cache. Returning cached response.", idempotencyKey);
await ExecuteResultAsync(context, cachedResponse);
return;
}
// 核心逻辑:获取分布式锁来处理并发请求
var lockKey = $"lock:idempotency:{idempotencyKey}";
var lockToken = Guid.NewGuid().ToString();
// 锁的过期时间应该很短,仅用于防止并发执行,而不是长期锁定
var lockExpiry = TimeSpan.FromSeconds(10);
if (!await _redisDatabase.LockTakeAsync(lockKey, lockToken, lockExpiry))
{
_logger.LogWarning("Could not acquire lock for idempotency key '{IdempotencyKey}'. Another request is in progress.", idempotencyKey);
context.Response.StatusCode = (int)HttpStatusCode.Conflict; // 409 Conflict 是一个合适的代码
await context.Response.WriteAsync("A request with this idempotency key is already in progress.");
return;
}
try
{
// 在获得锁之后,必须再次检查缓存。
// 这是为了处理一个经典竞态条件:
// 请求A获取锁 -> 请求B等待锁 -> 请求A执行完毕,释放锁,写入缓存 -> 请求B获取到锁。
// 如果请求B不再次检查,它会重复执行操作。
cachedResponse = await _cache.GetResponseAsync(idempotencyKey!);
if (cachedResponse != null)
{
_logger.LogInformation("Double-checked cache hit for '{IdempotencyKey}'.", idempotencyKey);
await ExecuteResultAsync(context, cachedResponse);
return;
}
// 捕获下游执行结果,以便缓存
var originalBodyStream = context.Response.Body;
using var responseBody = new MemoryStream();
context.Response.Body = responseBody;
await _next(context);
// 只有成功的请求 (2xx) 才应该被缓存
if (context.Response.StatusCode >= 200 && context.Response.StatusCode < 300)
{
responseBody.Seek(0, SeekOrigin.Begin);
var responseBodyString = await new StreamReader(responseBody).ReadToEndAsync();
responseBody.Seek(0, SeekOrigin.Begin);
// 构建一个 IActionResult 用于缓存
var resultToCache = new ContentResult
{
Content = responseBodyString,
ContentType = context.Response.ContentType,
StatusCode = context.Response.StatusCode
};
var expiresIn = TimeSpan.FromSeconds(idempotentAttribute.ExpiresInSeconds);
await _cache.SetResponseAsync(idempotencyKey!, resultToCache, expiresIn);
}
// 将捕获的响应写回原始的响应流
await responseBody.CopyToAsync(originalBodyStream);
}
finally
{
// 确保锁被释放
await _redisDatabase.LockReleaseAsync(lockKey, lockToken);
}
}
private async Task ExecuteResultAsync(HttpContext context, IActionResult result)
{
var actionContext = new ActionContext(context, new RouteData(), new ControllerActionDescriptor());
await result.ExecuteResultAsync(actionContext);
}
}
4. 注册与使用
最后,将所有组件注册到ASP.NET Core的依赖注入容器中,并启用中间件。
Program.cs
using IdempotencyMiddleware;
using IdempotencyMiddleware.Core;
using IdempotencyMiddleware.Infrastructure;
using Microsoft.AspNetCore.Mvc.Infrastructure;
using StackExchange.Redis;
var builder = WebApplication.CreateBuilder(args);
// 1. 添加控制器和服务
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
// 2. 注册 Redis 连接
// 在生产环境中,配置应该来自 appsettings.json
builder.Services.AddSingleton<IConnectionMultiplexer>(
ConnectionMultiplexer.Connect("localhost:6379,allowAdmin=true")
);
// 3. 注册我们的幂等性服务
builder.Services.AddSingleton<IActionContextAccessor, ActionContextAccessor>();
builder.Services.AddScoped<IIdempotencyCache, RedisIdempotencyCache>();
var app = builder.Build();
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
app.UseHttpsRedirection();
// 4. 在认证和授权之后,但在端点执行之前,启用幂等性中间件
app.UseAuthorization();
app.UseMiddleware<IdempotencyMiddleware>(); // <<-- 启用中间件
app.MapControllers();
app.Run();
现在,在任何需要幂等的Controller Action上,只需添加 [Idempotent]
特性即可。
OrdersController.cs
[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
private readonly ILogger<OrdersController> _logger;
public OrdersController(ILogger<OrdersController> logger)
{
_logger = logger;
}
[HttpPost]
[Idempotent(ExpiresInSeconds = 60 * 60)] // 幂等性保护,结果缓存1小时
public async Task<IActionResult> CreateOrder([FromBody] OrderRequest request, [FromHeader(Name = "Idempotency-Key")] string idempotencyKey)
{
// 在真实项目中,这里会执行数据库操作、调用其他服务等
_logger.LogInformation("Processing order creation for key: {key}", idempotencyKey);
// 模拟耗时操作
await Task.Delay(1000);
var order = new { Id = Guid.NewGuid(), request.ProductId, request.Quantity, CreatedAt = DateTime.UtcNow };
// 返回一个 201 Created 结果,并包含新创建的资源位置
return CreatedAtAction(nameof(GetOrderById), new { id = order.Id }, order);
}
[HttpGet("{id}")]
public IActionResult GetOrderById(Guid id)
{
return Ok(new { Id = id, ProductId = "PROD-123", Quantity = 1 });
}
}
public record OrderRequest(string ProductId, int Quantity);
架构的局限性与未来迭代
这个方案虽然健壮,但并非银弹。它存在一些固有的局限性:
- Redis 依赖: 整个幂等性保障强依赖于Redis的可用性和性能。如果Redis集群出现故障,所有被
[Idempotent]
保护的接口都将失效。必须为Redis做高可用部署。 - 锁的性能开销: 每次请求都需要与Redis进行至少一次网络交互以获取锁,这会增加请求的延迟。对于非幂等接口,这个开销是不存在的。
- 缓存内容的大小: 我们缓存了完整的HTTP响应体。如果接口返回的数据非常大(比如一个巨大的JSON数组或文件),这会占用大量Redis内存,并增加序列化/反序列化的开销。
- 键的冲突:
Idempotency-Key
的生成策略由客户端决定。如果客户端的UUID生成算法有问题,可能导致不同请求产生相同的键,从而错误地返回缓存结果。
未来的迭代方向可以包括:
- 存储策略优化: 对于大响应体,可以考虑只缓存一个指向持久化存储(如S3或数据库)的引用,而不是将整个响应体塞进Redis。
- 无锁化方案探索: 对于某些场景,可以探索基于状态机的幂等性实现,或者使用支持原子性
SET if Not Exists
操作的存储,减少对显式锁的依赖。 - 与API网关集成: 更进一步,可以将幂等性检查上移到API网关层(如Nginx+Lua, Kong, APISIX)。这样做的好处是,重复的请求甚至不会到达后端服务,进一步节省了计算资源。但这会增加网关的复杂性,且网关层通常难以获取完整的业务执行结果。