1. 解决的问题

当一个线程执行过程中开启了新的异步线程,会导致异步线程与当前线程的traceId不一致的问题。

在线程池中,traceId可能在线程创建那一刻就已经固定了,不会跟着使用场景上下文traceId变动,在后面的线程复用环节中一直都是这个traceId,会带来traceId混乱在一起的情况,同样也会带来异步线程与当前线程的traceId不一致的问题。

最终,导致异步线程在日志上无法准确追踪到整个调用链路。

2. 环境

总的来说基于 Zipkin server + Brave library,这一块基本使用需要参考一下文档:

https://zipkin.io/pages/instrumenting.html

https://github.com/openzipkin/brave

3. Brave的currentTraceContext

以MDCCurrentTraceContext作为上下文容器进行初始化方式为例:

var Tracing = Tracing.newBuilder().endpoint(endpoint)
   .spanReporter(spanReporter()).currentTraceContext(MDCCurrentTraceContext.create()).build();

当调用tracing#Tracer#nextSpan()来开启一个新的执行片段(Span)时:

/**
   * Returns a new child span if there's a {@link #currentSpan()} or a new trace if there isn't.
   *
   * <p>Prefer {@link #startScopedSpan(String)} if you are tracing a synchronous function or code
   * block.
   */
  public Span nextSpan() {
    TraceContext parent = currentTraceContext.get();
    return parent != null ? newChild(parent) : newTrace();
  }

本质上是从currentTraceContext中取出TraceContext再进行后续操作,而TraceContext中又有我们需要的traceId:TraceContext#traceIdString

因此,认为只要将这个currentTraceContext从当前线程“传递”到异步线程中就可以满足需求。

我这里追了下代码,MDCCurrentTraceContext是通过ThreadLocal(准确的说是InheritableThreadLocal)绑定到线程中的,TraceContext有多种实现,也有可能有其他的方式。猜想作为框架的Brave其实应该提供一个方法来统一处理这种需求。

4. 用Brave提供的API实现

Brave提供了以下API来装饰线程(池),帮我们做这个“传递”的动作:

  • brave.propagation.CurrentTraceContext#wrap(java.lang.Runnable)
  • brave.propagation.CurrentTraceContext#wrap(java.util.concurrent.Callable)
  • brave.propagation.CurrentTraceContext#executor
  • brave.propagation.CurrentTraceContext#executorService

下文是几个Demo代码。

4.1. Runnable使用

// Ignore DI Tracing
// Ignore DI THREAD_POOL
CompletableFuture<Void> smartFuture = CompletableFuture.runAsync(tracing.currentTraceContext().wrap(() -> {
    var tracer = tracing.tracer();
    var span = getNextSpan(tracer, "spanName");
    try (var ignored = tracer.withSpanInScope(span)) {
        // biz code
    } catch (Exception e) {
        span.error(e);
        throw e;
    } finally {
        span.finish();
    }
}), THREAD_POOL);

4.2. Spring线程池ThreadPoolTaskExecutor中使用

// Ignore DI Tracing
@Bean
public ThreadPoolTaskExecutor getThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(20);
    threadPoolTaskExecutor.setMaxPoolSize(100);
    threadPoolTaskExecutor.setQueueCapacity(100);
    threadPoolTaskExecutor.setKeepAliveSeconds(60);
    threadPoolTaskExecutor.setThreadNamePrefix("thread-prefix");
    threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    // decorate runnable used in thread pool
    threadPoolTaskExecutor.setTaskDecorator(tracing.currentTraceContext()::wrap);
    return threadPoolTaskExecutor;
}

Java自带的ThreadPoolExecutor也是一样的,重写即可,方法同Spring的ThreadPoolTaskExecutor类中的setTaskDecorator方法。