在 ASP.NET Core 中构建基于分布式锁的幂等性API中间件


一个请求超时,客户端没有收到响应,于是它重试了。这个操作是创建一个订单。结果,数据库里出现了两条一模一样的订单,唯一的区别是ID和创建时间。这是真实项目中再常见不过的场景,尤其是在支付、交易这类核心模块,一次网络抖动就可能造成资损。解决这个问题的核心,就是实现API的幂等性(Idempotency)。

幂等性要求一个操作无论执行一次还是多次,产生的影响都是相同的。要实现它,技术方案有很多,每种方案都有其适用场景与妥协。

方案A:依赖数据库唯一约束

最直接的思路是在数据入口层做防线。客户端在每次请求时生成一个唯一的请求ID(例如 Idempotency-Key 请求头),服务端在创建订单时,将这个请求ID与订单数据一同存入数据库。

// 这是一个简化的模型,仅用于说明
public class Order
{
    public Guid Id { get; set; }
    public decimal Amount { get; set; }
    public string ClientRequestId { get; set; } // 幂等键
    // ... 其他属性
}

public class OrderDbContext : DbContext
{
    public DbSet<Order> Orders { get; set; }

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // 在数据库层面增加唯一约束
        modelBuilder.Entity<Order>()
            .HasIndex(o => o.ClientRequestId)
            .IsUnique();
    }
}

在业务逻辑中,我们只需要 try-catch 数据库的唯一约束异常。

优势:

  1. 实现简单: 核心逻辑由数据库保证,应用层代码干净。
  2. 强一致性: 依赖数据库事务的原子性,不存在中间状态。

劣势:

  1. 业务入侵: ClientRequestId 这个纯粹的技术概念污染了 Order 这个领域模型。领域模型应该只关心业务,而不是实现细节。
  2. 灵活性差: 如果幂等性逻辑需要扩展,比如不仅要防止重复创建,还要在重复请求时返回第一次成功的结果,这个方案就很难处理。当捕获到唯一约束异常时,我们无法轻易地从异常信息中反查出已存在的订单ID并返回其完整信息。
  3. 性能考量: 每次都依赖数据库抛出异常来控制业务逻辑,是一种反模式。在高并发下,大量的异常抛出与捕获对性能有明确的负面影响。
  4. 适用场景有限: 此方案仅适用于“创建”操作。对于“更新”操作(比如“为订单支付100元”),就无法用简单的唯一约束来保证幂等。

在真实项目中,这种方案通常只作为最后的防线,而非首选。我们需要一个更通用、与业务逻辑解耦的架构方案。

方案B:框架层的通用幂等性中间件

一个更优的架构是将幂等性处理逻辑从业务代码中抽离出来,下沉到框架层。在 ASP.NET Core 中,中间件(Middleware)和过滤器(Filter)是实现这种横切关注点的理想选择。我们可以构建一个通用的幂等性处理管道,对需要幂等的接口进行无感植入。

这个方案的核心流程如下:

sequenceDiagram
    participant Client
    participant Middleware as Idempotency Middleware
    participant Controller
    participant Storage as Distributed Cache (Redis)

    Client->>Middleware: POST /api/orders (Header: Idempotency-Key: xyz)
    Middleware->>Storage: GET idempotency_key:xyz
    alt Key Not Found
        Middleware->>Storage: SET lock:xyz (Acquire Lock)
        alt Lock Acquired
            Middleware->>Controller: Execute Action()
            Controller-->>Middleware: Result (e.g., 201 Created)
            Middleware->>Storage: SET idempotency_key:xyz = Result (with TTL)
            Middleware->>Storage: DEL lock:xyz (Release Lock)
            Middleware-->>Client: Result (201 Created)
        else Lock Failed
            Middleware-->>Client: 409 Conflict (Request in progress)
        end
    else Key Found (Cached Result)
        Middleware-->>Client: Return Cached Result
    end

优势:

  1. 关注点分离: 业务开发者只需关心业务逻辑,通过一个简单的特性(Attribute)即可为接口启用幂等性,无需编写任何重复的幂等处理代码。
  2. 通用性强: 可以处理各种HTTP方法(POST, PUT, PATCH)。可以配置缓存第一次请求的完整响应(状态码、响应体),并在后续重复请求中直接返回,这对于客户端来说是完全透明的。
  3. 可扩展性: 存储机制是可插拔的。虽然通常选用Redis,但可以轻松替换为其他分布式缓存或数据库。

劣势:

  1. 实现复杂: 需要处理分布式锁、并发控制、缓存穿透等问题,实现一个生产级的中间件需要周密的考虑。
  2. 引入新依赖: 需要引入一个外部的分布式缓存/锁服务,比如Redis,增加了系统的复杂度和运维成本。

最终选择与实现

对于一个需要长期演进的系统而言,方案B的优势远大于其劣势。一次投入构建一个健壮的幂等性框架,可以为整个团队节省大量重复劳动,并从架构层面统一幂等性实现,避免出错。

下面,我们将从零开始,在 ASP.NET Core 7 中构建这个基于分布式锁的幂等性中间件。

1. 定义核心抽象

首先,定义幂等性检查的标记。一个 Attribute 是最清晰的声明方式。

IdempotentAttribute.cs

/// <summary>
/// 标记一个Action需要进行幂等性检查。
/// </summary>
[AttributeUsage(AttributeTargets.Method, Inherited = true, AllowMultiple = false)]
public sealed class IdempotentAttribute : Attribute
{
    /// <summary>
    /// 幂等性检查的有效期(秒)。请求结果将被缓存此段时间。
    /// 默认为24小时。
    /// </summary>
    public int ExpiresInSeconds { get; set; } = 24 * 60 * 60;

    /// <summary>
    /// 包含幂等键的HTTP请求头名称。
    /// 默认为 "Idempotency-Key"。
    /// </summary>
    public string HeaderKey { get; set; } = "Idempotency-Key";
}

接下来,是缓存结果的存储接口。这个接口是解耦的关键,它使得底层存储可以是Redis,也可以是内存缓存(用于测试)或其他任何实现。

IIdempotencyCache.cs

using Microsoft.AspNetCore.Mvc;

namespace IdempotencyMiddleware.Core;

/// <summary>
/// 定义幂等性缓存操作的接口。
/// </summary>
public interface IIdempotencyCache
{
    /// <summary>
    /// 尝试获取缓存的响应。
    /// </summary>
    /// <param name="idempotencyKey">幂等键。</param>
    /// <returns>如果找到缓存,则返回IActionResult;否则返回null。</returns>
    Task<IActionResult?> GetResponseAsync(string idempotencyKey);

    /// <summary>
    /// 缓存响应。
    /// </summary>
    /// <param name="idempotencyKey">幂等键。</param>
    /// <param name="response">要缓存的响应结果。</param>
    /// <param name="expiresIn">过期时间。</param>
    Task SetResponseAsync(string idempotencyKey, IActionResult response, TimeSpan expiresIn);
}

2. Redis 实现与分布式锁

我们将使用 StackExchange.Redis 来实现缓存,并利用其 LockTakeAsync 功能来实现一个简单的分布式锁。在真实项目中,使用 RedLock.net 这样的库或者基于Lua脚本的原子操作会更健壮,但这里的实现足以阐明核心思想。

RedisIdempotencyCache.cs

using System.Net;
using System.Text.Json;
using IdempotencyMiddleware.Core;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Mvc.Formatters;
using Microsoft.AspNetCore.Mvc.Infrastructure;
using StackExchange.Redis;

namespace IdempotencyMiddleware.Infrastructure;

public class RedisIdempotencyCache : IIdempotencyCache
{
    private readonly IDatabase _database;
    private readonly IActionContextAccessor _actionContextAccessor;
    private static readonly JsonSerializerOptions _jsonSerializerOptions = new()
    {
        // 配置以支持复杂的 MVC 类型序列化
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
    };

    public RedisIdempotencyCache(IConnectionMultiplexer redis, IActionContextAccessor actionContextAccessor)
    {
        _database = redis.GetDatabase();
        _actionContextAccessor = actionContextAccessor;
    }

    public async Task<IActionResult?> GetResponseAsync(string idempotencyKey)
    {
        var cacheKey = $"idempotency:{idempotencyKey}";
        var cachedValue = await _database.StringGetAsync(cacheKey);

        if (cachedValue.IsNullOrEmpty)
        {
            return null;
        }

        // 反序列化存储的响应
        var cachedResponse = JsonSerializer.Deserialize<CachedResponse>(cachedValue!, _jsonSerializerOptions);
        if (cachedResponse == null) return null;

        return ToActionResult(cachedResponse);
    }

    public async Task SetResponseAsync(string idempotencyKey, IActionResult response, TimeSpan expiresIn)
    {
        var cachedResponse = await ToCachedResponseAsync(response);
        if (cachedResponse == null) return;
        
        var cacheKey = $"idempotency:{idempotencyKey}";
        var serializedResponse = JsonSerializer.Serialize(cachedResponse, _jsonSerializerOptions);

        await _database.StringSetAsync(cacheKey, serializedResponse, expiresIn);
    }
    
    // 将 IActionResult 转换为可序列化的对象
    private async Task<CachedResponse?> ToCachedResponseAsync(IActionResult result)
    {
        if (result is not ObjectResult objectResult)
        {
            // 对于非 ObjectResult (如 StatusCodeResult),我们只保存状态码
            if (result is IStatusCodeActionResult statusCodeResult)
            {
                return new CachedResponse
                {
                    StatusCode = statusCodeResult.StatusCode ?? 500, // 默认500
                    ContentType = string.Empty,
                    Body = null
                };
            }
            return null;
        }

        var httpContext = _actionContextAccessor.ActionContext?.HttpContext;
        if (httpContext == null) return null;

        // 这里的坑在于: IActionResult 的 Body 是一个流, 不能直接序列化。
        // 我们需要重新执行一次内容协商和序列化过程,把它变成一个字符串。
        // 这是一个简化的实现,生产环境可能需要更精细的处理。
        var originalBodyStream = httpContext.Response.Body;
        using var memoryStream = new MemoryStream();
        httpContext.Response.Body = memoryStream;

        await result.ExecuteResultAsync(_actionContextAccessor.ActionContext!);
        
        memoryStream.Seek(0, SeekOrigin.Begin);
        var bodyAsText = await new StreamReader(memoryStream).ReadToEndAsync();

        // 恢复原始流
        httpContext.Response.Body = originalBodyStream;

        return new CachedResponse
        {
            StatusCode = objectResult.StatusCode ?? (int)HttpStatusCode.OK,
            ContentType = objectResult.ContentTypes.FirstOrDefault() ?? "application/json",
            Body = bodyAsText
        };
    }
    
    // 将可序列化的对象转换回 IActionResult
    private IActionResult ToActionResult(CachedResponse cachedResponse)
    {
        var result = new ContentResult
        {
            Content = cachedResponse.Body,
            ContentType = cachedResponse.ContentType,
            StatusCode = cachedResponse.StatusCode
        };
        return result;
    }

    // 用于序列化的数据传输对象
    private class CachedResponse
    {
        public int StatusCode { get; set; }
        public string ContentType { get; set; } = string.Empty;
        public string? Body { get; set; }
    }
}

这个实现中有个关键的难点:如何序列化 IActionResultIActionResult 本身是一个接口,其具体实现(如 OkObjectResult)包含不可序列化的组件(如 IOutputFormatter)。我们的解决方案是“重放”ExecuteResultAsync 方法,捕获其输出到内存流,然后将流内容序列化。这是一个取巧但有效的方法。

3. 核心中间件实现

这是所有逻辑的汇集点。我们将使用一个中间件来拦截请求,检查 IdempotentAttribute,并执行核心的 “Lock -> Check -> Execute -> Save” 流程。

IdempotencyMiddleware.cs

using System.Net;
using IdempotencyMiddleware.Core;
using Microsoft.AspNetCore.Mvc.Controllers;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

namespace IdempotencyMiddleware;

public class IdempotencyMiddleware
{
    private readonly RequestDelegate _next;
    private readonly ILogger<IdempotencyMiddleware> _logger;
    private readonly IIdempotencyCache _cache;
    private readonly IDatabase _redisDatabase; // 用于分布式锁

    public IdempotencyMiddleware(RequestDelegate next, ILogger<IdempotencyMiddleware> logger, IIdempotencyCache cache, IConnectionMultiplexer redis)
    {
        _next = next;
        _logger = logger;
        _cache = cache;
        _redisDatabase = redis.GetDatabase();
    }

    public async Task InvokeAsync(HttpContext context)
    {
        var endpoint = context.GetEndpoint();
        var idempotentAttribute = endpoint?.Metadata.GetMetadata<IdempotentAttribute>();

        // 如果 Action 没有标记 [Idempotent],则直接跳过
        if (idempotentAttribute == null)
        {
            await _next(context);
            return;
        }

        // 提取幂等键
        if (!context.Request.Headers.TryGetValue(idempotentAttribute.HeaderKey, out var idempotencyKey) || string.IsNullOrEmpty(idempotencyKey))
        {
            _logger.LogWarning("Idempotency key header '{HeaderKey}' not found.", idempotentAttribute.HeaderKey);
            context.Response.StatusCode = (int)HttpStatusCode.BadRequest;
            await context.Response.WriteAsync($"Header '{idempotentAttribute.HeaderKey}' is required.");
            return;
        }

        // 检查缓存
        var cachedResponse = await _cache.GetResponseAsync(idempotencyKey!);
        if (cachedResponse != null)
        {
            _logger.LogInformation("Idempotency key '{IdempotencyKey}' found in cache. Returning cached response.", idempotencyKey);
            await ExecuteResultAsync(context, cachedResponse);
            return;
        }

        // 核心逻辑:获取分布式锁来处理并发请求
        var lockKey = $"lock:idempotency:{idempotencyKey}";
        var lockToken = Guid.NewGuid().ToString();
        // 锁的过期时间应该很短,仅用于防止并发执行,而不是长期锁定
        var lockExpiry = TimeSpan.FromSeconds(10); 

        if (!await _redisDatabase.LockTakeAsync(lockKey, lockToken, lockExpiry))
        {
            _logger.LogWarning("Could not acquire lock for idempotency key '{IdempotencyKey}'. Another request is in progress.", idempotencyKey);
            context.Response.StatusCode = (int)HttpStatusCode.Conflict; // 409 Conflict 是一个合适的代码
            await context.Response.WriteAsync("A request with this idempotency key is already in progress.");
            return;
        }

        try
        {
            // 在获得锁之后,必须再次检查缓存。
            // 这是为了处理一个经典竞态条件:
            // 请求A获取锁 -> 请求B等待锁 -> 请求A执行完毕,释放锁,写入缓存 -> 请求B获取到锁。
            // 如果请求B不再次检查,它会重复执行操作。
            cachedResponse = await _cache.GetResponseAsync(idempotencyKey!);
            if (cachedResponse != null)
            {
                _logger.LogInformation("Double-checked cache hit for '{IdempotencyKey}'.", idempotencyKey);
                await ExecuteResultAsync(context, cachedResponse);
                return;
            }

            // 捕获下游执行结果,以便缓存
            var originalBodyStream = context.Response.Body;
            using var responseBody = new MemoryStream();
            context.Response.Body = responseBody;

            await _next(context);

            // 只有成功的请求 (2xx) 才应该被缓存
            if (context.Response.StatusCode >= 200 && context.Response.StatusCode < 300)
            {
                responseBody.Seek(0, SeekOrigin.Begin);
                var responseBodyString = await new StreamReader(responseBody).ReadToEndAsync();
                responseBody.Seek(0, SeekOrigin.Begin);

                // 构建一个 IActionResult 用于缓存
                var resultToCache = new ContentResult
                {
                    Content = responseBodyString,
                    ContentType = context.Response.ContentType,
                    StatusCode = context.Response.StatusCode
                };
                
                var expiresIn = TimeSpan.FromSeconds(idempotentAttribute.ExpiresInSeconds);
                await _cache.SetResponseAsync(idempotencyKey!, resultToCache, expiresIn);
            }
            
            // 将捕获的响应写回原始的响应流
            await responseBody.CopyToAsync(originalBodyStream);
        }
        finally
        {
            // 确保锁被释放
            await _redisDatabase.LockReleaseAsync(lockKey, lockToken);
        }
    }

    private async Task ExecuteResultAsync(HttpContext context, IActionResult result)
    {
        var actionContext = new ActionContext(context, new RouteData(), new ControllerActionDescriptor());
        await result.ExecuteResultAsync(actionContext);
    }
}

4. 注册与使用

最后,将所有组件注册到ASP.NET Core的依赖注入容器中,并启用中间件。

Program.cs

using IdempotencyMiddleware;
using IdempotencyMiddleware.Core;
using IdempotencyMiddleware.Infrastructure;
using Microsoft.AspNetCore.Mvc.Infrastructure;
using StackExchange.Redis;

var builder = WebApplication.CreateBuilder(args);

// 1. 添加控制器和服务
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// 2. 注册 Redis 连接
// 在生产环境中,配置应该来自 appsettings.json
builder.Services.AddSingleton<IConnectionMultiplexer>(
    ConnectionMultiplexer.Connect("localhost:6379,allowAdmin=true")
);

// 3. 注册我们的幂等性服务
builder.Services.AddSingleton<IActionContextAccessor, ActionContextAccessor>();
builder.Services.AddScoped<IIdempotencyCache, RedisIdempotencyCache>();

var app = builder.Build();

if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

// 4. 在认证和授权之后,但在端点执行之前,启用幂等性中间件
app.UseAuthorization();
app.UseMiddleware<IdempotencyMiddleware>(); // <<-- 启用中间件

app.MapControllers();

app.Run();

现在,在任何需要幂等的Controller Action上,只需添加 [Idempotent] 特性即可。

OrdersController.cs

[ApiController]
[Route("api/[controller]")]
public class OrdersController : ControllerBase
{
    private readonly ILogger<OrdersController> _logger;

    public OrdersController(ILogger<OrdersController> logger)
    {
        _logger = logger;
    }

    [HttpPost]
    [Idempotent(ExpiresInSeconds = 60 * 60)] // 幂等性保护,结果缓存1小时
    public async Task<IActionResult> CreateOrder([FromBody] OrderRequest request, [FromHeader(Name = "Idempotency-Key")] string idempotencyKey)
    {
        // 在真实项目中,这里会执行数据库操作、调用其他服务等
        _logger.LogInformation("Processing order creation for key: {key}", idempotencyKey);
        
        // 模拟耗时操作
        await Task.Delay(1000); 

        var order = new { Id = Guid.NewGuid(), request.ProductId, request.Quantity, CreatedAt = DateTime.UtcNow };
        
        // 返回一个 201 Created 结果,并包含新创建的资源位置
        return CreatedAtAction(nameof(GetOrderById), new { id = order.Id }, order);
    }
    
    [HttpGet("{id}")]
    public IActionResult GetOrderById(Guid id)
    {
        return Ok(new { Id = id, ProductId = "PROD-123", Quantity = 1 });
    }
}

public record OrderRequest(string ProductId, int Quantity);

架构的局限性与未来迭代

这个方案虽然健壮,但并非银弹。它存在一些固有的局限性:

  1. Redis 依赖: 整个幂等性保障强依赖于Redis的可用性和性能。如果Redis集群出现故障,所有被 [Idempotent] 保护的接口都将失效。必须为Redis做高可用部署。
  2. 锁的性能开销: 每次请求都需要与Redis进行至少一次网络交互以获取锁,这会增加请求的延迟。对于非幂等接口,这个开销是不存在的。
  3. 缓存内容的大小: 我们缓存了完整的HTTP响应体。如果接口返回的数据非常大(比如一个巨大的JSON数组或文件),这会占用大量Redis内存,并增加序列化/反序列化的开销。
  4. 键的冲突: Idempotency-Key 的生成策略由客户端决定。如果客户端的UUID生成算法有问题,可能导致不同请求产生相同的键,从而错误地返回缓存结果。

未来的迭代方向可以包括:

  • 存储策略优化: 对于大响应体,可以考虑只缓存一个指向持久化存储(如S3或数据库)的引用,而不是将整个响应体塞进Redis。
  • 无锁化方案探索: 对于某些场景,可以探索基于状态机的幂等性实现,或者使用支持原子性SET if Not Exists操作的存储,减少对显式锁的依赖。
  • 与API网关集成: 更进一步,可以将幂等性检查上移到API网关层(如Nginx+Lua, Kong, APISIX)。这样做的好处是,重复的请求甚至不会到达后端服务,进一步节省了计算资源。但这会增加网关的复杂性,且网关层通常难以获取完整的业务执行结果。

  目录