一台内部运营设备上,操作员在Jetpack Compose构建的界面上点击了“启动月度数据对账”按钮。屏幕显示任务已提交,进度条开始缓缓移动。三十分钟后,进度条卡在65%,最终弹出一个模糊的“对账失败”提示。现在,问题来了:故障点在哪?是客户端网络请求超时?是订单服务在执行补偿逻辑时出错?是资金服务调用下游依赖失败?还是最终写入数据湖的Spark作业因为数据倾斜而崩溃?日志散落在四个微服务、一个客户端和Hadoop集群的YARN日志中,时间戳是我们唯一的线索,一场跨越多个技术栈的“日志风暴”开始了。
定义问题:跨域追踪的鸿沟
在我们的系统中,一个完整的业务流程横跨了三个完全不同的技术领域:
- 移动客户端 (Jetpack Compose): 作为业务流程的起点,负责参数收集和任务触发。其运行环境和生命周期与后端完全隔离。
- 后端微服务 (Spring Boot): 采用Saga模式编排多个服务(如订单、库存、支付)来完成一个长周期的分布式事务。服务间通信可能是同步HTTP调用,也可能是异步消息。
- 数据平台 (Spark & Apache Hudi): Saga的最终结果或中间状态,需要以事务方式写入Apache Hudi数据湖,供后续的分析和审计。
当失败发生时,我们面临的根本挑战是缺乏一个统一的上下文标识符,能够将用户在Compose UI上的一次点击,与Saga事务中的每一个步骤,以及最终写入Hudi的那个Commit串联起来。没有这个标识符,故障排查效率极低,平均故障恢复时间(MTTR)居高不下。
方案A:依赖人工关联的临时方案
在项目初期,我们采用了最直接的方法:在每个组件中尽可能多地记录业务标识符,例如 userId
, taskId
, orderId
等,并依赖工程师的人工经验在Kibana中组合查询。
实现方式:
- 客户端日志:记录当前登录的
userId
和生成的taskId
。 - 后端服务日志:在业务逻辑中手动传递
taskId
,并在日志中打印。 - Spark作业:通过启动参数传入
taskId
,在日志中输出。
- 客户端日志:记录当前登录的
优势:
- 实现成本极低,无需引入新的技术栈或对架构进行大的改动。
劣势:
- 不可靠: 严重依赖开发人员的自觉性。任何一个环节忘记传递或记录
taskId
,追踪链条就会断裂。 - 效率低下: 当问题复杂,涉及多个业务ID时,查询变得异常困难。例如,一个对账任务可能涉及数千个
orderId
,无法通过单一ID进行有效筛选。 - 无法扩展: 无法形成标准化的监控和告警。我们不能基于“某个Saga事务失败”来创建告警,因为系统本身无法识别一个完整的事务链。
- 不可靠: 严重依赖开发人员的自觉性。任何一个环节忘记传递或记录
在真实项目中,这种方式在系统复杂度提升后迅速失效。一次线上故障的排查耗费了两位工程师近四个小时,最终才定位到一个参与方服务的幂等性处理缺陷。这坚定了我们寻找一个系统性解决方案的决心。
方案B:全面的OpenTelemetry分布式追踪
另一个备选方案是全面拥抱OpenTelemetry标准,为客户端、所有微服务乃至Spark作业都集成SDK,构建一个完整的分布式追踪体系。
实现方式:
- 在Compose应用中引入OpenTelemetry SDK,自动或手动创建
Span
来包裹用户操作和网络请求。 - 在Spring Boot微服务中,使用OTel Java Agent进行无侵入式的埋点,自动完成服务间调用的上下文传递。
- 为Spark作业编写特定的Instrumentation逻辑,确保
TraceContext
能够跨越Driver和Executor进行传递。 - 部署OpenTelemetry Collector来接收、处理和导出Trace数据到Jaeger或Zipkin,同时将日志导出到ELK。
- 在Compose应用中引入OpenTelemetry SDK,自动或手动创建
优势:
- 标准化: 遵循业界通用标准,生态成熟。
- 功能强大: 提供丰富的Trace和Span信息,不仅能用于故障排查,还能用于性能瓶颈分析、服务依赖拓扑可视化等。
劣势:
- 实施复杂性高: 对团队的技术要求高。特别是在Spark等非标准HTTP服务环境中正确地传递上下文,存在不少坑。
- 资源开销大: 完整的追踪数据量巨大,对存储和计算资源有较高要求,可能带来显著的成本增加。
- 过度设计: 我们当前的核心痛点是Saga事务的失败审计和端到端调试,而非微秒级的性能分析。引入一个完整的追踪系统,有点“杀鸡用牛刀”的意味。
最终选择:基于关联ID与结构化日志的务实方案
权衡之后,我们决定采用一种介于两者之间的务实方案:强制推行一个全局唯一的关联ID (Correlation ID),并结合全链路的结构化JSON日志。
这个方案的核心思想是:为每个从客户端发起的业务流程生成一个唯一的correlationId
,并确保这个ID在后续的所有同步调用、异步消息、数据处理任务中被不间断地传递。同时,所有组件的日志都必须是结构化的(JSON格式),并且每条日志记录都必须包含这个correlationId
字段。
- 选择理由:
- 目标明确: 直击核心痛点——打通追踪链。它以最低的成本解决了最重要的问题。
- 易于实施: 相较于OpenTelemetry,该方案对现有代码的侵入性更小。主要工作在于中间件的配置(如API网关、HTTP客户端拦截器、日志框架),业务代码只需少量改动或无需改动。
- 成本可控: 仅增加了少量日志字段,对ELK的存储和查询压力影响不大。
- 足够强大: 通过在Kibana中筛选一个
correlationId
,我们就能获得该请求从客户端到数据湖的完整日志视图,足以满足绝大多数Saga事务的调试需求。
以下是该方案的整体架构和核心实现细节。
graph TD subgraph Client [Android客户端] A[Compose UI] -- 触发操作, 生成CorrelationID --> B[ViewModel] B -- 发起请求 --> C[OkHttp/Ktor Interceptor] end subgraph Backend [后端微服务集群] C -- HTTP请求 (Header: X-Correlation-ID) --> D[API Gateway] D -- 转发请求 --> E[Saga Orchestrator Service] E -- 调用 --> F[Participant Service A] E -- 调用 --> G[Participant Service B] E -- 提交异步任务 --> H[Message Queue] end subgraph DataPlatform [数据处理平台] H -- 消费消息 (Payload含CorrelationID) --> I[Spark Driver] I -- 分发任务 --> J[Spark Executors] J -- 写入数据 --> K[Apache Hudi] end subgraph Observability [可观测性平台] L[Filebeat] M[Logstash] N[Elasticsearch] O[Kibana] end C -- Log --> L E -- Log --> L F -- Log --> L G -- Log --> L I -- Log --> L J -- Log --> L L --> M M --> N N <--> O style K fill:#f9f,stroke:#333,stroke-width:2px style O fill:#bbf,stroke:#333,stroke-width:2px
核心实现概览
1. Jetpack Compose客户端:生成并传递ID
一切的源头在客户端。当用户发起一个需要追踪的业务操作时,我们必须生成correlationId
并将其注入到发出的第一个网络请求中。
在 ViewModel
中,我们使用 UUID
生成ID。
// In some ViewModel
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.launch
import java.util.UUID
// 假设这是我们的数据仓库或Use Case
class ReconciliationRepository(private val apiClient: ApiClient) {
suspend fun startReconciliation(params: Map<String, String>, correlationId: String) {
// apiClient 内部会处理 Header
apiClient.post("/reconciliation/start", params, correlationId)
}
}
class ReconciliationViewModel(private val repository: ReconciliationRepository) : ViewModel() {
fun onStartReconciliationClicked() {
viewModelScope.launch {
// 关键步骤:在业务流程开始时生成唯一的关联ID
val correlationId = UUID.randomUUID().toString()
// 使用结构化日志库 (如 Timber + 自定义JsonTree) 记录操作起点
// Log.i("Starting reconciliation process with correlationId: $correlationId")
try {
val params = mapOf("month" to "2023-10")
repository.startReconciliation(params, correlationId)
// 更新UI为成功状态
} catch (e: Exception) {
// 即使是客户端本地错误,日志中也应包含ID
// Log.e(e, "Failed to start reconciliation for correlationId: $correlationId")
// 更新UI为失败状态
}
}
}
}
为了避免在每个请求中手动添加Header,我们使用网络库的拦截器。以下是使用Ktor客户端的示例:
import io.ktor.client.*
import io.ktor.client.engine.okhttp.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
object ApiClientFactory {
private const val CORRELATION_ID_HEADER = "X-Correlation-ID"
fun create(): HttpClient {
return HttpClient(OkHttp) {
defaultRequest {
// 这是一个示例,实际中ID通过参数传入
// header(CORRELATION_ID_HEADER, UUID.randomUUID().toString())
}
}
}
// 扩展函数,用于在请求级别设置ID
suspend fun HttpClient.postWithCorrelation(
urlString: String,
body: Any,
correlationId: String
) {
post(urlString) {
// 核心:将 correlationId 注入请求头
header(CORRELATION_ID_HEADER, correlationId)
setBody(body)
}
}
}
同时,配置一个结构化日志库(如Timber配合一个自定义的JsonTree
),将correlationId
作为通用字段写入所有客户端日志,这对于排查客户端独有的问题(如UI渲染、本地数据库)至关重要。
2. Spring Boot微服务:接收、处理和传播ID
后端服务是correlationId
传递的核心枢纽。我们利用 ThreadLocal
机制(具体实现为SLF4J的MDC)来自动将correlationId
注入到所有日志中。
首先,创建一个Servlet Filter
来从请求头中提取ID并放入MDC。
import org.slf4j.MDC;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.UUID;
@Component
public class CorrelationIdFilter extends OncePerRequestFilter {
private static final String CORRELATION_ID_HEADER = "X-Correlation-ID";
private static final String CORRELATION_ID_MDC_KEY = "correlationId";
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
throws ServletException, IOException {
try {
String correlationId = request.getHeader(CORRELATION_ID_HEADER);
if (correlationId == null || correlationId.isEmpty()) {
// 如果上游没有提供,我们生成一个新的。这确保了链路的完整性。
correlationId = UUID.randomUUID().toString();
}
MDC.put(CORRELATION_ID_MDC_KEY, correlationId);
// 确保下游服务也能收到
response.setHeader(CORRELATION_ID_HEADER, correlationId);
filterChain.doFilter(request, response);
} finally {
// 在请求处理完毕后清理MDC,防止内存泄漏
MDC.remove(CORRELATION_ID_MDC_KEY);
}
}
}
接下来,配置 logback-spring.xml
以JSON格式输出日志,并自动包含MDC中的所有字段。
<configuration>
<springProperty scope="context" name="appName" source="spring.application.name"/>
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeMdc>true</includeMdc> <!-- 关键:包含MDC中的所有内容 -->
<customFields>{"app_name":"${appName}"}</customFields>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="jsonConsoleAppender"/>
</root>
</configuration>
现在,任何地方的一行 log.info("Processing order");
都会自动生成如下的JSON日志,correlationId
已内嵌其中:
{
"@timestamp": "2023-10-27T10:45:00.123Z",
"message": "Processing order",
"level": "INFO",
"thread_name": "http-nio-8080-exec-1",
"logger_name": "com.example.SagaOrchestrator",
"app_name": "saga-orchestrator-service",
"correlationId": "a1b2c3d4-e5f6-7890-1234-567890abcdef"
}
对于服务间的调用(如使用RestTemplate
或FeignClient
),还需要配置拦截器,从MDC中读取correlationId
并添加到出站请求的Header中,从而完成ID的传播。
3. Spark与Apache Hudi:将追踪链延伸至数据湖
这是最棘手但价值最高的一步。Saga流程通常会通过消息队列触发一个异步的Spark作业。我们需要确保correlationId
能被传递给这个作业,并最终记录在Hudi的元数据中。
步骤1: 通过消息传递ID
当Saga编排器服务需要触发Spark作业时,它会向Kafka或RabbitMQ发送一条消息。此时,correlationId
必须是消息体或消息头的一部分。
// 在Saga编排器中
public void triggerHudiWriteJob(ReconciliationTask task) {
String correlationId = MDC.get("correlationId"); // 从MDC获取
JobTriggerMessage message = new JobTriggerMessage(task.getData(), correlationId);
// kafkaTemplate.send("hudi-jobs-topic", message);
log.info("Submitted Hudi write job for task {}", task.getId()); // 这条日志也会带上ID
}
步骤2: Spark作业接收并使用ID
Spark作业消费消息后,提取correlationId
并将其贯穿整个作业生命周期。在Spark中,跨Stage和Task传递变量需要使用广播变量或在操作RDD/DataFrame时闭包捕获。
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.{SaveMode, SparkSession}
object HudiWriterJob {
def main(args: Array[String]): Unit = {
// 实际项目中,这些信息从消息队列中消费得到
val jsonData = """{"id": 1, "data": "some_value", "ts": 1698380400}"""
val correlationId = "a1b2c3d4-e5f6-7890-1234-567890abcdef" // 从消息中解析
val spark = SparkSession.builder()
.appName("Hudi Writer with CorrelationId")
.master("local[*]")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
// 在Spark Driver和Executor日志中也使用MDC
// 这需要一些额外的配置,比如使用 log4j-spark-appender
// org.slf4j.MDC.put("correlationId", correlationId)
// log.info("Starting Hudi job")
import spark.implicits._
val df = spark.read.json(Seq(jsonData).toDS)
val hudiTableName = "reconciliation_results"
val hudiBasePath = "/tmp/hudi_tables/" + hudiTableName
df.write.format("hudi")
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "id")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "") // 非分区表
.option(HoodieWriteConfig.TABLE_NAME.key, hudiTableName)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
// 关键步骤:将 correlationId 注入到Hudi的commit metadata中
.option(s"${HoodieWriteConfig.METADATA_KEY_PREFIX}correlation.id", correlationId)
.option(s"${HoodieWriteConfig.METADATA_KEY_PREFIX}triggered.by", "SagaOrchestrator")
.mode(SaveMode.Append)
.save(hudiBasePath)
// org.slf4j.MDC.clear()
}
}
HoodieWriteConfig.METADATA_KEY_PREFIX
这个选项是Hudi提供的强大功能,允许我们将自定义元数据附加到每一次提交(commit)上。这意味着,我们不仅可以在日志中追踪到这个作业,还可以直接在Hudi表的时间轴上,看到哪一次数据变更是由哪个correlationId
触发的。这对于数据审计和问题溯源是极其宝贵的。
4. ELK Stack:汇聚与查询
所有组件的日志都以统一的JSON格式流向ELK。
- Filebeat: 部署在所有服务器和客户端设备上(如果可能),收集日志文件。
- Logstash: (可选,但推荐) 用于对日志进行进一步的解析、丰富和路由。它的
json
filter可以轻松处理我们的结构化日志。 - Elasticsearch: 存储并索引日志。确保
correlationId
字段被映射为keyword
类型,以获得最佳的过滤性能。 - Kibana: 可视化和查询。
现在,当开头提到的那个“对账失败”问题再次出现时,我们的排查流程彻底改变了:
- 从用户处获取大致的操作时间,或从告警中得到一个失败的
taskId
。 - 在Kibana中用
taskId
进行初步搜索,找到对应的correlationId
。 - 清除所有过滤器,只保留
correlationId: "a1b2c3d4-e5f6-7890-1234-567890abcdef"
。 - 瞬间,从Compose客户端发起的请求日志,到Saga编排器、各个参与方服务的执行与补偿日志,再到Spark作业的启动和执行日志,都按时间顺序清晰地呈现在眼前。我们可以快速定位到第一个出现
ERROR
级别日志的组件,从而锁定问题根源。
架构的扩展性与局限性
这个基于关联ID的方案虽然务实,但并非万能。
扩展性:
- 协议无关: 核心思想是传递一个标识符,不限于HTTP。在消息队列、gRPC调用中同样适用。
- 易于升级: 未来如果需要更详细的性能分析,可以在此基础上平滑升级到OpenTelemetry。现有的
correlationId
可以作为traceId
的根源,MDC中的信息也可以被OTel Agent自动捕获并添加为Span的属性。 - 丰富上下文: 我们可以向MDC中添加更多有用的上下文,如
sagaId
、stepName
、userId
等,进一步增强日志的可分析性。
局限性:
- 依赖纪律: 整个体系的有效性依赖于所有开发团队都严格遵守“传递ID”和“结构化日志”的约定。任何一个环节的遗漏都会导致追踪链中断。这需要通过代码审查、静态分析工具和团队共识来保障。
- 异步难题: 在复杂的异步场景下(如
CompletableFuture
的线程切换、响应式编程框架),需要额外的工作来确保MDC上下文能正确地跨线程传递。虽然有成熟的库(如reactor-core
对MDC的支持)可以解决,但这增加了额外的复杂性。 - 非排他性: 它主要解决“发生了什么”和“在哪里发生”的问题,对于“为什么慢”这类性能问题,它提供的线索有限,这正是完整分布式追踪系统的长处。
对于一个以业务交付和系统稳定性为首要目标的工程团队而言,这套方案在投入产出比上达到了一个极佳的平衡点。它将一个原先需要数小时甚至数天的复杂分布式系统调试工作,缩短到了几分钟之内。