集成Saga与ELK实现从Compose UI到Hudi数据湖的分布式事务追踪


一台内部运营设备上,操作员在Jetpack Compose构建的界面上点击了“启动月度数据对账”按钮。屏幕显示任务已提交,进度条开始缓缓移动。三十分钟后,进度条卡在65%,最终弹出一个模糊的“对账失败”提示。现在,问题来了:故障点在哪?是客户端网络请求超时?是订单服务在执行补偿逻辑时出错?是资金服务调用下游依赖失败?还是最终写入数据湖的Spark作业因为数据倾斜而崩溃?日志散落在四个微服务、一个客户端和Hadoop集群的YARN日志中,时间戳是我们唯一的线索,一场跨越多个技术栈的“日志风暴”开始了。

定义问题:跨域追踪的鸿沟

在我们的系统中,一个完整的业务流程横跨了三个完全不同的技术领域:

  1. 移动客户端 (Jetpack Compose): 作为业务流程的起点,负责参数收集和任务触发。其运行环境和生命周期与后端完全隔离。
  2. 后端微服务 (Spring Boot): 采用Saga模式编排多个服务(如订单、库存、支付)来完成一个长周期的分布式事务。服务间通信可能是同步HTTP调用,也可能是异步消息。
  3. 数据平台 (Spark & Apache Hudi): Saga的最终结果或中间状态,需要以事务方式写入Apache Hudi数据湖,供后续的分析和审计。

当失败发生时,我们面临的根本挑战是缺乏一个统一的上下文标识符,能够将用户在Compose UI上的一次点击,与Saga事务中的每一个步骤,以及最终写入Hudi的那个Commit串联起来。没有这个标识符,故障排查效率极低,平均故障恢复时间(MTTR)居高不下。

方案A:依赖人工关联的临时方案

在项目初期,我们采用了最直接的方法:在每个组件中尽可能多地记录业务标识符,例如 userId, taskId, orderId 等,并依赖工程师的人工经验在Kibana中组合查询。

  • 实现方式:

    • 客户端日志:记录当前登录的 userId 和生成的 taskId
    • 后端服务日志:在业务逻辑中手动传递 taskId,并在日志中打印。
    • Spark作业:通过启动参数传入 taskId,在日志中输出。
  • 优势:

    • 实现成本极低,无需引入新的技术栈或对架构进行大的改动。
  • 劣势:

    • 不可靠: 严重依赖开发人员的自觉性。任何一个环节忘记传递或记录 taskId,追踪链条就会断裂。
    • 效率低下: 当问题复杂,涉及多个业务ID时,查询变得异常困难。例如,一个对账任务可能涉及数千个 orderId,无法通过单一ID进行有效筛选。
    • 无法扩展: 无法形成标准化的监控和告警。我们不能基于“某个Saga事务失败”来创建告警,因为系统本身无法识别一个完整的事务链。

在真实项目中,这种方式在系统复杂度提升后迅速失效。一次线上故障的排查耗费了两位工程师近四个小时,最终才定位到一个参与方服务的幂等性处理缺陷。这坚定了我们寻找一个系统性解决方案的决心。

方案B:全面的OpenTelemetry分布式追踪

另一个备选方案是全面拥抱OpenTelemetry标准,为客户端、所有微服务乃至Spark作业都集成SDK,构建一个完整的分布式追踪体系。

  • 实现方式:

    • 在Compose应用中引入OpenTelemetry SDK,自动或手动创建Span来包裹用户操作和网络请求。
    • 在Spring Boot微服务中,使用OTel Java Agent进行无侵入式的埋点,自动完成服务间调用的上下文传递。
    • 为Spark作业编写特定的Instrumentation逻辑,确保TraceContext能够跨越Driver和Executor进行传递。
    • 部署OpenTelemetry Collector来接收、处理和导出Trace数据到Jaeger或Zipkin,同时将日志导出到ELK。
  • 优势:

    • 标准化: 遵循业界通用标准,生态成熟。
    • 功能强大: 提供丰富的Trace和Span信息,不仅能用于故障排查,还能用于性能瓶颈分析、服务依赖拓扑可视化等。
  • 劣势:

    • 实施复杂性高: 对团队的技术要求高。特别是在Spark等非标准HTTP服务环境中正确地传递上下文,存在不少坑。
    • 资源开销大: 完整的追踪数据量巨大,对存储和计算资源有较高要求,可能带来显著的成本增加。
    • 过度设计: 我们当前的核心痛点是Saga事务的失败审计和端到端调试,而非微秒级的性能分析。引入一个完整的追踪系统,有点“杀鸡用牛刀”的意味。

最终选择:基于关联ID与结构化日志的务实方案

权衡之后,我们决定采用一种介于两者之间的务实方案:强制推行一个全局唯一的关联ID (Correlation ID),并结合全链路的结构化JSON日志。

这个方案的核心思想是:为每个从客户端发起的业务流程生成一个唯一的correlationId,并确保这个ID在后续的所有同步调用、异步消息、数据处理任务中被不间断地传递。同时,所有组件的日志都必须是结构化的(JSON格式),并且每条日志记录都必须包含这个correlationId字段。

  • 选择理由:
    1. 目标明确: 直击核心痛点——打通追踪链。它以最低的成本解决了最重要的问题。
    2. 易于实施: 相较于OpenTelemetry,该方案对现有代码的侵入性更小。主要工作在于中间件的配置(如API网关、HTTP客户端拦截器、日志框架),业务代码只需少量改动或无需改动。
    3. 成本可控: 仅增加了少量日志字段,对ELK的存储和查询压力影响不大。
    4. 足够强大: 通过在Kibana中筛选一个correlationId,我们就能获得该请求从客户端到数据湖的完整日志视图,足以满足绝大多数Saga事务的调试需求。

以下是该方案的整体架构和核心实现细节。

graph TD
    subgraph Client [Android客户端]
        A[Compose UI] -- 触发操作, 生成CorrelationID --> B[ViewModel]
        B -- 发起请求 --> C[OkHttp/Ktor Interceptor]
    end

    subgraph Backend [后端微服务集群]
        C -- HTTP请求 (Header: X-Correlation-ID) --> D[API Gateway]
        D -- 转发请求 --> E[Saga Orchestrator Service]
        E -- 调用 --> F[Participant Service A]
        E -- 调用 --> G[Participant Service B]
        E -- 提交异步任务 --> H[Message Queue]
    end

    subgraph DataPlatform [数据处理平台]
        H -- 消费消息 (Payload含CorrelationID) --> I[Spark Driver]
        I -- 分发任务 --> J[Spark Executors]
        J -- 写入数据 --> K[Apache Hudi]
    end

    subgraph Observability [可观测性平台]
        L[Filebeat]
        M[Logstash]
        N[Elasticsearch]
        O[Kibana]
    end

    C -- Log --> L
    E -- Log --> L
    F -- Log --> L
    G -- Log --> L
    I -- Log --> L
    J -- Log --> L
    L --> M
    M --> N
    N <--> O

    style K fill:#f9f,stroke:#333,stroke-width:2px
    style O fill:#bbf,stroke:#333,stroke-width:2px

核心实现概览

1. Jetpack Compose客户端:生成并传递ID

一切的源头在客户端。当用户发起一个需要追踪的业务操作时,我们必须生成correlationId并将其注入到发出的第一个网络请求中。

ViewModel 中,我们使用 UUID 生成ID。

// In some ViewModel
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.launch
import java.util.UUID

// 假设这是我们的数据仓库或Use Case
class ReconciliationRepository(private val apiClient: ApiClient) {
    suspend fun startReconciliation(params: Map<String, String>, correlationId: String) {
        // apiClient 内部会处理 Header
        apiClient.post("/reconciliation/start", params, correlationId)
    }
}

class ReconciliationViewModel(private val repository: ReconciliationRepository) : ViewModel() {

    fun onStartReconciliationClicked() {
        viewModelScope.launch {
            // 关键步骤:在业务流程开始时生成唯一的关联ID
            val correlationId = UUID.randomUUID().toString()
            
            // 使用结构化日志库 (如 Timber + 自定义JsonTree) 记录操作起点
            // Log.i("Starting reconciliation process with correlationId: $correlationId")
            
            try {
                val params = mapOf("month" to "2023-10")
                repository.startReconciliation(params, correlationId)
                // 更新UI为成功状态
            } catch (e: Exception) {
                // 即使是客户端本地错误,日志中也应包含ID
                // Log.e(e, "Failed to start reconciliation for correlationId: $correlationId")
                // 更新UI为失败状态
            }
        }
    }
}

为了避免在每个请求中手动添加Header,我们使用网络库的拦截器。以下是使用Ktor客户端的示例:

import io.ktor.client.*
import io.ktor.client.engine.okhttp.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*

object ApiClientFactory {
    
    private const val CORRELATION_ID_HEADER = "X-Correlation-ID"

    fun create(): HttpClient {
        return HttpClient(OkHttp) {
            defaultRequest {
                // 这是一个示例,实际中ID通过参数传入
                // header(CORRELATION_ID_HEADER, UUID.randomUUID().toString())
            }
        }
    }
    
    // 扩展函数,用于在请求级别设置ID
    suspend fun HttpClient.postWithCorrelation(
        urlString: String, 
        body: Any, 
        correlationId: String
    ) {
        post(urlString) {
            // 核心:将 correlationId 注入请求头
            header(CORRELATION_ID_HEADER, correlationId)
            setBody(body)
        }
    }
}

同时,配置一个结构化日志库(如Timber配合一个自定义的JsonTree),将correlationId作为通用字段写入所有客户端日志,这对于排查客户端独有的问题(如UI渲染、本地数据库)至关重要。

2. Spring Boot微服务:接收、处理和传播ID

后端服务是correlationId传递的核心枢纽。我们利用 ThreadLocal 机制(具体实现为SLF4J的MDC)来自动将correlationId注入到所有日志中。

首先,创建一个Servlet Filter 来从请求头中提取ID并放入MDC。

import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.UUID;

@Component
public class CorrelationIdFilter extends OncePerRequestFilter {

    private static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
    private static final String CORRELATION_ID_MDC_KEY = "correlationId";

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
            throws ServletException, IOException {
        try {
            String correlationId = request.getHeader(CORRELATION_ID_HEADER);
            if (correlationId == null || correlationId.isEmpty()) {
                // 如果上游没有提供,我们生成一个新的。这确保了链路的完整性。
                correlationId = UUID.randomUUID().toString();
            }
            MDC.put(CORRELATION_ID_MDC_KEY, correlationId);
            // 确保下游服务也能收到
            response.setHeader(CORRELATION_ID_HEADER, correlationId);
            filterChain.doFilter(request, response);
        } finally {
            // 在请求处理完毕后清理MDC,防止内存泄漏
            MDC.remove(CORRELATION_ID_MDC_KEY);
        }
    }
}

接下来,配置 logback-spring.xml 以JSON格式输出日志,并自动包含MDC中的所有字段。

<configuration>
    <springProperty scope="context" name="appName" source="spring.application.name"/>

    <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder">
            <includeMdc>true</includeMdc> <!-- 关键:包含MDC中的所有内容 -->
            <customFields>{"app_name":"${appName}"}</customFields>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="jsonConsoleAppender"/>
    </root>
</configuration>

现在,任何地方的一行 log.info("Processing order"); 都会自动生成如下的JSON日志,correlationId 已内嵌其中:

{
  "@timestamp": "2023-10-27T10:45:00.123Z",
  "message": "Processing order",
  "level": "INFO",
  "thread_name": "http-nio-8080-exec-1",
  "logger_name": "com.example.SagaOrchestrator",
  "app_name": "saga-orchestrator-service",
  "correlationId": "a1b2c3d4-e5f6-7890-1234-567890abcdef"
}

对于服务间的调用(如使用RestTemplateFeignClient),还需要配置拦截器,从MDC中读取correlationId并添加到出站请求的Header中,从而完成ID的传播。

3. Spark与Apache Hudi:将追踪链延伸至数据湖

这是最棘手但价值最高的一步。Saga流程通常会通过消息队列触发一个异步的Spark作业。我们需要确保correlationId能被传递给这个作业,并最终记录在Hudi的元数据中。

步骤1: 通过消息传递ID

当Saga编排器服务需要触发Spark作业时,它会向Kafka或RabbitMQ发送一条消息。此时,correlationId必须是消息体或消息头的一部分。

// 在Saga编排器中
public void triggerHudiWriteJob(ReconciliationTask task) {
    String correlationId = MDC.get("correlationId"); // 从MDC获取
    JobTriggerMessage message = new JobTriggerMessage(task.getData(), correlationId);
    // kafkaTemplate.send("hudi-jobs-topic", message);
    log.info("Submitted Hudi write job for task {}", task.getId()); // 这条日志也会带上ID
}

步骤2: Spark作业接收并使用ID

Spark作业消费消息后,提取correlationId并将其贯穿整个作业生命周期。在Spark中,跨Stage和Task传递变量需要使用广播变量或在操作RDD/DataFrame时闭包捕获。

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}

object HudiWriterJob {
  def main(args: Array[String]): Unit = {
    // 实际项目中,这些信息从消息队列中消费得到
    val jsonData = """{"id": 1, "data": "some_value", "ts": 1698380400}"""
    val correlationId = "a1b2c3d4-e5f6-7890-1234-567890abcdef" // 从消息中解析

    val spark = SparkSession.builder()
      .appName("Hudi Writer with CorrelationId")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
      
    // 在Spark Driver和Executor日志中也使用MDC
    // 这需要一些额外的配置,比如使用 log4j-spark-appender
    // org.slf4j.MDC.put("correlationId", correlationId)
    // log.info("Starting Hudi job")

    import spark.implicits._
    val df = spark.read.json(Seq(jsonData).toDS)

    val hudiTableName = "reconciliation_results"
    val hudiBasePath = "/tmp/hudi_tables/" + hudiTableName

    df.write.format("hudi")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "") // 非分区表
      .option(HoodieWriteConfig.TABLE_NAME.key, hudiTableName)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      // 关键步骤:将 correlationId 注入到Hudi的commit metadata中
      .option(s"${HoodieWriteConfig.METADATA_KEY_PREFIX}correlation.id", correlationId)
      .option(s"${HoodieWriteConfig.METADATA_KEY_PREFIX}triggered.by", "SagaOrchestrator")
      .mode(SaveMode.Append)
      .save(hudiBasePath)
      
    // org.slf4j.MDC.clear()
  }
}

HoodieWriteConfig.METADATA_KEY_PREFIX 这个选项是Hudi提供的强大功能,允许我们将自定义元数据附加到每一次提交(commit)上。这意味着,我们不仅可以在日志中追踪到这个作业,还可以直接在Hudi表的时间轴上,看到哪一次数据变更是由哪个correlationId触发的。这对于数据审计和问题溯源是极其宝贵的。

4. ELK Stack:汇聚与查询

所有组件的日志都以统一的JSON格式流向ELK。

  • Filebeat: 部署在所有服务器和客户端设备上(如果可能),收集日志文件。
  • Logstash: (可选,但推荐) 用于对日志进行进一步的解析、丰富和路由。它的json filter可以轻松处理我们的结构化日志。
  • Elasticsearch: 存储并索引日志。确保correlationId字段被映射为keyword类型,以获得最佳的过滤性能。
  • Kibana: 可视化和查询。

现在,当开头提到的那个“对账失败”问题再次出现时,我们的排查流程彻底改变了:

  1. 从用户处获取大致的操作时间,或从告警中得到一个失败的taskId
  2. 在Kibana中用taskId进行初步搜索,找到对应的correlationId
  3. 清除所有过滤器,只保留 correlationId: "a1b2c3d4-e5f6-7890-1234-567890abcdef"
  4. 瞬间,从Compose客户端发起的请求日志,到Saga编排器、各个参与方服务的执行与补偿日志,再到Spark作业的启动和执行日志,都按时间顺序清晰地呈现在眼前。我们可以快速定位到第一个出现ERROR级别日志的组件,从而锁定问题根源。

架构的扩展性与局限性

这个基于关联ID的方案虽然务实,但并非万能。

  • 扩展性:

    • 协议无关: 核心思想是传递一个标识符,不限于HTTP。在消息队列、gRPC调用中同样适用。
    • 易于升级: 未来如果需要更详细的性能分析,可以在此基础上平滑升级到OpenTelemetry。现有的correlationId可以作为traceId的根源,MDC中的信息也可以被OTel Agent自动捕获并添加为Span的属性。
    • 丰富上下文: 我们可以向MDC中添加更多有用的上下文,如sagaIdstepNameuserId等,进一步增强日志的可分析性。
  • 局限性:

    • 依赖纪律: 整个体系的有效性依赖于所有开发团队都严格遵守“传递ID”和“结构化日志”的约定。任何一个环节的遗漏都会导致追踪链中断。这需要通过代码审查、静态分析工具和团队共识来保障。
    • 异步难题: 在复杂的异步场景下(如CompletableFuture的线程切换、响应式编程框架),需要额外的工作来确保MDC上下文能正确地跨线程传递。虽然有成熟的库(如reactor-core对MDC的支持)可以解决,但这增加了额外的复杂性。
    • 非排他性: 它主要解决“发生了什么”和“在哪里发生”的问题,对于“为什么慢”这类性能问题,它提供的线索有限,这正是完整分布式追踪系统的长处。

对于一个以业务交付和系统稳定性为首要目标的工程团队而言,这套方案在投入产出比上达到了一个极佳的平衡点。它将一个原先需要数小时甚至数天的复杂分布式系统调试工作,缩短到了几分钟之内。


  目录