使用 Project Reactor 2 進行上下文傳播 - Spring Cloud Sleuth 的坎坷之路

工程 | Dariusz Jędrzejczyk | 2023年3月29日 | ...

本文是系列文章的一部分

  1. 基礎知識
  2. Spring Cloud Sleuth 的坎坷之路
  3. 響應式與命令式之間的統一橋接

Spring Cloud Sleuth 最近成為了 Micrometer Tracing,它是 Micrometer 專案的一部分。大多數跟蹤插樁都集中在 Micrometer 中,位於新的可觀測性 API之下。這些專案的目標是讓任何應用程式都具備可觀測性——以包含關聯識別符號的指標、跟蹤和日誌的形式。為了實現這一目標,庫需要一種方式來傳輸上下文資訊。當應用程式以任何形式處理非同步時,這項任務就變得相當具有挑戰性。在上一篇文章中,我們回顧了使用 ThreadLocal 和 Reactor Context 進行上下文傳播的基礎知識。

Spring Cloud Sleuth 在處理非同步上下文傳播的方式上經歷了多次重大調整。由於 Sleuth 需要與不一定具有響應式 API 的第三方插樁庫打交道,因此擁有一種既定的方式來使上下文對它們可用至關重要。這些庫通常不假設非同步,而是依賴靜態的 ThreadLocal 狀態。多年來,ThreadLocal 為 JVM 應用程式提供了隱式的上下文儲存,用於驅動可觀測性功能。隨著時間的推移,Project Reactor 在底層原語之上引入了各種鉤子(hook)和包裝(wrapping)機制,以便實現響應式和命令式之間的橋接。在本文中,我們將探討將上下文傳播到 ThreadLocal 值的方法,並討論它們可能出現的問題。我們將探討 Sleuth 採取的方法,並總結我們發現的一種兼顧效能和語義合理性的良好折衷方案。

在描述 Sleuth 引入的方法之前,我們應該考慮在命令式世界和響應式世界之間進行橋接所存在的危險。

隱藏併發下的副作用陷阱 (Pitfalls of Side Effects in the Face of Hidden Concurrency)

在上一篇文章中,我們討論了一些 Thread 切換和相關副作用的潛在問題。現在,我們將利用 Reactor 的外掛機制來解決我們可能遇到的問題,從而更深入地探索響應式程式設計的特性。

總結 Spring Cloud Sleuth 遇到的所有問題是一個不斷變化的目標。此外,許多組織都有自己的上下文傳播機制實現,例如用於填充 SLF4J 的 MDC。本文無意全面總結所有潛在陷阱。它旨在建立一些直覺,幫助你理解最終的真理:要麼遵循響應式程式設計規則,要麼準備在最意想不到的時刻失敗。

排程器鉤子 (Scheduler Hook)

正如我們所知,響應式鏈可以使用不同的 Thread 來傳播訊號。從上一篇文章中我們瞭解到,當執行在另一個 Thread 上繼續時,在任務執行時恢復上下文是合理的。Project Reactor 將管理 Thread 的任務委託給 Scheduler。它還提供了一個專用的鉤子,允許攔截特定工作單元的排程和執行:Schedulers.onScheduleHook。它的工作方式類似於上一篇文章中的 WrappedExecutor。讓我們來看一個可能考慮使用它的場景。

清理 (Cleanup)

在第一部分中,我們瞭解到在響應式鏈中不能持續依賴 ThreadLocal 值可用。如果我們嘗試在訂閱時初始化它,並在 doFinally 運算子中清除它呢?我們的應用程式可以使用有限數量的 Thread 來處理許多請求,其中一些是併發的。由於這些平臺 Thread 可以被重用,我們需要在處理另一個請求之前對與前一個請求相關的任何 ThreadLocal 狀態進行清理,以確保不同的請求不會使用殘留的關聯識別符號。

接下來的程式碼示例是對上一部分中我們編寫的未使用 Reactor Context 的程式碼進行的修改。

handleRequest 方法的一種潛在實現可能如下所示

Mono<Void> handleRequest() {
  return Mono.fromSupplier(() -> {
    initRequest(); // <1>
    return "test-product";
  }).flatMap(product ->
    Flux.concat(
      addProduct(product),
      notifyShop(product)).then())
    .doOnSuccess(v -> log("Done."))
    .doFinally(signalType ->
      CORRELATION_ID.remove()); // <2>
}

<1> 中,我們設定了 ThreadLocal 的值,在 <2> 中,我們嘗試清除它。

我們還修改了執行的操作,以便能夠在 addProduct 方法中新增一個人工延遲

Mono<Void> addProduct(String productName) {
  return Mono.defer(() -> {
    log("Adding product: " + productName);
    return Mono.<Void>empty()
      .delaySubscription(Duration.ofMillis(10),
        Schedulers.single()); // <1>
  });
}

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return Mono.just(true);
  });
}

注意,在 <1> 中,我們透過延遲訂閱並使用 Schedulers.single() 在 10ms 後啟動訂閱來引入非同步。delaySubscription 將使用該 Scheduler 底層的 ScheduledExecutorService 並在延遲後在另一個 Thread 上啟動訂閱。

從上一篇文章中我們知道,在這種情況下我們需要恢復 ThreadLocals,因此我們使用提到的 Scheduler 外掛來實現這一點

Schedulers.onScheduleHook("context.propagation", WrappedRunnable::new);

在 Reactor 的 Scheduler 上執行的每個任務都會恢復 ThreadLocal 的值,因此我們應該安全無虞。

現在,讓我們模擬兩個順序請求,它們之間有一個日誌,用於驗證 CORRELATION_ID 是否已正確清除

log("Got first request, calling handler");
handleRequest().block();

log("Got second request, calling handler");
log("There should be no correlationId on this line!");

handleRequest().block();

日誌如下所示

[      main][                null] Got first request, calling handler // <1>
[      main][ 8658769170992364531] Adding product: test-product
[  single-1][ 8658769170992364531] Notifying shop about: test-product
[  single-1][ 8658769170992364531] Done.
[      main][ 8658769170992364531] Got second request, calling handler
[      main][ 8658769170992364531] There should be no correlationId on this line!
[      main][  711436174608061530] Adding product: test-product
[  single-1][  711436174608061530] Notifying shop about: test-product
[  single-1][  711436174608061530] Done.

“test-product” 處理相關的日誌具有正確的關聯識別符號。然而,在請求之間發生了什麼?我們期望 ThreadLocaldoFinally 中被清除。不幸的是,請求之間的日誌仍然包含一個識別符號。那麼發生了什麼?

注意 “Notifying shop about” 的日誌發生在 Thread single-1 上。訊號是在該 Thread 上傳遞的,所以我們在那裡清除了 ThreadLocal,但主 Thread 仍然被汙染了(在 <1> 中)。現在,處理程式之外的執行可以使用錯誤的關聯識別符號用於不同的目的。我們可以嘗試透過在伺服器層(負責分派請求)新增清理邏輯來緩解此問題,並確保用於請求的每個 Thread 不被汙染。但是,如果我們的管道更復雜,這並不能挽救所有其他潛在的 Scheduler Thread

這種方法在允許應用程式在響應式鏈中透明地使用 ThreadLocal 值方面取得了相當大的進展。從效能角度來看,它也是合理的,因為它不會在每個運算子周圍設定和重置 ThreadLocal,而只在處理專案時發生 Thread 切換時進行。然而,它也表明仍然存在一些未解決的副作用。在接下來的示例中,我們將體驗並嘗試解決不同的場景。

外部源和匯合的困難 (Difficulties with External Sources and Sinks)

使用 ThreadLocal 作為上下文元資料傳輸機制的策略還有一個常見問題,即使用了 Reactor 之外的非同步庫,並且該庫自行切換了 Thread。當執行切換到不受包裝的 ExecutorService 控制的不同 Thread 時,上下文就會丟失。

讓我們實際看看。我們將重用目前為止看過的大部分程式碼,但對 notifyShop 方法進行一項更改。它現在透過使用以下方法來模擬遠端呼叫

Mono<Boolean> makeRequest(String productName) {
  return Mono.fromFuture(CompletableFuture.supplyAsync(() -> true,
    CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)));
}

所以 notifyShop 看起來像這樣

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return makeRequest(productName);
  });
}

如果我們觸發處理程式一次

handleRequest().block();

我們得到以下輸出

[      main][  683056557275963407] Adding product: test-product
[  single-1][  683056557275963407] Notifying shop about: test-product
[l-worker-1][                null] Done!

日誌縮短了 Thread 名稱以提高可見性,但 l-worker-1 實際上是 ForkJoinPool.commonPool-worker-1 的縮短版。

正如我們所見,我們的執行在一個我們無法控制的通用 ForkJoinPool 上繼續進行。一個問題是,從該 Thread 切換開始,我們就再也看不到我們的關聯識別符號了;另一個問題是,我們在一個實際上缺少關聯資訊的 Thread 上執行了清理操作。

我們可以透過 Executor 或任務包裝來潛在地改善(部分)情況,如前一篇文章所述,但我們並非總是擁有這樣的控制權——例如,如果我們呼叫一個使用 CompletableFuture 的外部庫。

運算子鉤子 (Operator Hooks)

我們幾乎準備好討論 Sleuth 的策略了。Schedulers.onScheduleHook 對於響應式處理中可能發生的非顯而易見的 Thread 切換提供了有限的能力。我們需要對操作的執行有更多的控制。我們將透過引入兩種外部服務通訊方式來演示這些限制。

addProduct 方法現在發起一個遠端請求,並在我們控制的 Scheduler 上釋出結果。將繁重的計算轉移到不同的 Thread 上是很常見的做法。為此,我們使用了 publishOn 運算子

Mono<Void> addProduct(String productName) {
  return Mono.defer(() -> {
    log("Adding product: " + productName);
    return makeRequest(productName)
      .publishOn(Schedulers.single())
      .then();
  });
}

notifyShop 方法模擬將結果對映到可能多個 Publisher 中。這在響應是一個複合結果的情況下是典型的場景——例如,如果響應是一個 JSON 陣列,我們打算將每個項作為對另一個服務的單獨呼叫進行處理,或者豐富單個結果。我們使用一個簡化版本,只取一個結果

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return makeRequest(productName)
      .flatMapMany(result ->
        Flux.just("result")
          .map(x -> result))
          .take(1)
          .single();
    });
}

現在我們跳過處理程式,手動初始化關聯識別符號,然後訂閱這些鏈

initRequest();
addProduct("test-product")
  .doOnSuccess(v -> log("Added."))
  .block();

initRequest();
notifyShop("test-product")
  .doOnSuccess(v -> log("Notified."))
  .block();

讓我們看看輸出

[      main][ 6606077262934500649] Adding product: test-product
[  single-1][                null] Added.
[      main][  182687922231622589] Notifying shop about: test-product
[l-worker-1][                null] Notified.

這是預料之中的,因為 doOnSuccess 中發生的兩個日誌都是由 CompletableFutureForkJoinPool Thread 上傳遞值觸發的。即使我們有 Scheduler 包裝,結果首先是在我們無法控制的 Thread 上傳遞的,因此即使在 addProduct 中使用 publishOn 也無濟於事。

我們能做些什麼來改善這種情況嗎?Reactor 有一個細粒度的外掛系統,它允許我們在任何管道中裝飾任何運算子。我們可以嘗試使用它來恢復關聯識別符號。

這些外掛將使用自定義的 Subscriber 實現,該實現在訂閱時捕獲關聯識別符號

static class CorrelatingSubscriber<T> implements CoreSubscriber<T> {
  final CoreSubscriber<T> delegate;
  Long correlationId;

  public CorrelatingSubscriber(CoreSubscriber<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void onSubscribe(Subscription s) {
    delegate.onSubscribe(s);
    this.correlationId = CORRELATION_ID.get();
  }

  @Override
  public void onNext(T t) {
    CORRELATION_ID.set(this.correlationId);
    delegate.onNext(t);
  }

  @Override
  public void onError(Throwable t) {
    CORRELATION_ID.set(this.correlationId);
    delegate.onError(t);
  }

  @Override
  public void onComplete() {
    CORRELATION_ID.set(this.correlationId);
    delegate.onComplete();
  }
}

要更改運算子,使其實現將呼叫委託給實際的 Subscriber 例項,我們可以使用 Operators.lift 方法

Operators.lift((scannable, subscriber) ->
  new CorrelatingSubscriber<>(subscriber));

onEachOperator 鉤子 (onEachOperator Hook)

首先,我們嘗試一個外掛,它允許我們修改鏈中的每一個運算子

Hooks.onEachOperator(
  Operators.lift((scannable, subscriber) ->
    new CorrelatingSubscriber<>(subscriber)));

讓我們再次執行示例並檢視輸出

[      main][ 7295088917002526647] Adding product: test-product
[  single-1][ 7295088917002526647] Added.
[      main][  383851863754448684] Notifying shop about: test-product
[l-worker-1][  383851863754448684] Notified.

哇!即使在如此複雜的場景中,我們也設法獲取了關聯識別符號。最初的訂閱行為捕獲了 ThreadLocal 值並在每一步都恢復了它。即使 notifyShop 方法中使用的 flatMap(它會自行訂閱)也奏效了,因為在訂閱另一個 Thread 之前,ThreadLocal 已從先前的捕獲中填充!這聽起來確實很棒,但這種方法也有缺點。第一個也是最明顯的缺點是效能。傳播發生在每個運算子上。使用這種技術,我們首先包裝(decorate)每個物件,並在每一步都進行 ThreadLocal 訪問。所有這些操作都很昂貴。要了解更多資訊,請觀看Oleh 關於響應式效能的演講

onLastOperator 鉤子 (onLastOperator Hook)

所以我們嘗試另一種方法。這次,我們將使用一個外掛,它會附著到鏈中被視為最後一個運算子的運算子上——也就是緊接在 subscribe() 呼叫之前的運算子。

對於響應式鏈可以得出一個觀察結論:對於同步運算子,我們不需要在每個單獨的操作(例如 filtermap)中恢復最初捕獲的上下文,而只需要在鏈中最後一個運算子被訂閱時恢復。只要不涉及 Thread 邊界的跨越,這種機制就能奏效。為了支援可能跨越這些邊界的運算子(例如 flatMap,它涉及訂閱一個新的 Publisher),需要一個特殊的技巧。它將對映的結果視為它們操作的內部 Publisher 的最後一個運算子。

讓我們嘗試這種方法

Hooks.onLastOperator(
  Operators.lift((scannable, subscriber) ->
    new CorrelatingSubscriber<>(subscriber)));

並執行

[      main][ 2122332013640150746] Adding product: test-product
[  single-1][ 2122332013640150746] Added.
[      main][  459477771449275997] Notifying shop about: test-product
[l-worker-1][                null] Notified.

它在 addProduct 中與 publishOn 一起工作正常,但在 notifyShop 中的 flatMap 上失敗了。

讓我們分析一下為什麼 notifyShop 會失敗。我們呼叫 block() 會捕獲 ThreadLocal 併為每個向下遊傳播的訊號恢復它。透過在 flatMapMany 中完成的對映,我們正在處理之前提到的非同步邊界。我們的外掛實際上應用於內部源 (Flux.just().map().single())。

然而,這些努力仍然沒有奏效,儘管自定義的 SubscriberflatMapMany 內部被呼叫並嘗試恢復 ThreadLocal 的值。觸發內部訂閱的訊號是在我們無法控制的 Thread 上啟動的,因此我們根本沒有 ThreadLocal 可以捕獲。

在使用 publishOn 運算子的情況下則不同。對其的訂閱始於我們控制的 Thread。因此,當處理來自 makeRequest() 方法的結果訊號時,它只會在我們控制的 Thread 上傳遞。.doOnSuccess(v -> log("Added.")) 的執行發生在與 flatMapMany 不同步的 Thread 邊界之後。

這就是為什麼 onEachOperator 能覆蓋更多情況——它在每一步都恢復初始值,無論是否存在非同步邊界。儘管如此,onLastOperator 的效能略優於 onEachOperator

addQueueWrapper 鉤子 (addQueueWrapper Hook)

還有一個外掛,如果將其與之前的鉤子結合使用,我們可以完全控制響應式傳遞。Spring Cloud Sleuth 也使用了它。我們考慮的是最近引入的外掛 Hooks.addQueueWrapper。不過,我們將不詳細探討它。它可以解決 Reactor 中工作竊取機制引入的問題。非同步運算子,例如 flatMap,可以在將訊號傳遞給運算子的各種 Thread 上取得進展。想象一個背壓場景,處理暫停了一段時間。在某個時刻,一個新的 Thread 可以接管併發出 Subscription.request(n) 呼叫,這將導致累積的值立即傳遞。現在你可以問自己:“什麼累積的值?”這是一個好問題。Reactor 中的許多運算子使用內部 Queue 來實現背壓或保留序列傳遞語義。因為這些 Queue 的 draining 可以在任何 Thread 上發生,所以上下文資訊應該附加到儲存在 Queue 中的每個訊號上——也就是說,用於我們關聯目的的 ThreadLocal 值。這就是為什麼我們需要一個 Queue 包裝器——在將值提交到 Queue 時,我們捕獲 ThreadLocal 狀態。當從 Queue 中檢索值時,狀態就會恢復。

Spring Cloud Sleuth 中的上下文傳播 (Context Propagation in Spring Cloud Sleuth)

在展示了在 reactive-streams 術語之外操作的風險以及我們可以用來傳播 ThreadLocal 上下文的機制後,讓我們總結一下 Spring Cloud Sleuth 使用的四種策略

  1. DECORATE_ON_EACH
  2. DECORATE_ON_LAST
  3. DECORATE_QUEUES
  4. MANUAL

前三種策略試圖利用響應式運算子的一些特性,結合 Reactor 的外掛機制,並使用 ThreadLocal 作為內部傳輸機制以及與插樁庫共享上下文資料的方式。前三種策略還假定使用 Schedulers.onScheduleHook 進行 Scheduler 包裝。另一方面,最後一種策略則利用了 Reactor 繫結到 SubscriberContext

DECORATE_ON_EACH

此策略使用我們之前見過的 Hooks.onEachOperator 外掛。效能影響巨大,儘管 Sleuth 添加了許多最佳化以在非必要時不進行恢復。通常,這種方法非常有效。但它也非常激進,因此如果某個運算子需要更改上下文,就可能難以處理。下游運算子將看不到更改,因為來自初始訂閱的上下文在每一步都被恢復了。

DECORATE_ON_LAST

使用 Hooks.onLastOperator 是為了提高效能。這種方法可能會失敗,因為它提供了靈活性。如果上游運算子修改了上下文,下游操作會看到更改。這帶來了風險,如果某個運算子清除了該上下文,那麼在該上下文被排程到包裝的 Scheduler 之前,上下文就會丟失。另一個風險是我們之前示例中看到的情況,即訂閱發生在某個 Thread 上,但請求資料發生在另一個不受 Reactor 控制的 Thread 上。

DECORATE_QUEUES

DECORATE_QUEUES 是前一種策略的演進,它糾正了一些錯誤場景(請求資料帶外發生或多個 Thread 釋出資料),但並非所有場景。我們之前描述過如何使用 Hooks.addQueueWrapper 外掛。Queue 包裝的一個已知問題是,在處理完一個專案後,沒有可靠的方法進行清理。上下文在從 Queue 中檢索專案時恢復。沒有圍繞透過下游運算子傳輸的專案處理的範圍。因此,這種方法也容易汙染 ThreadLocal 儲存。最近在 draining 過程中進行了一些改進以限制影響。

手動 (MANUAL)

在這種策略中,Sleuth 唯一做的事情是在訂閱時將 ThreadLocal 的值作為快照捕獲到 Reactor 的 Context 中。使用者需要在相關位置提取該快照並填充 ThreadLocal,以便將其提供給插樁庫。對於支援的跟蹤插樁,例如 Zipkin 和 Brave,Sleuth 透過使用作用域(scoping)的概念來恢復 ThreadLocal —— ThreadLocal 會為插樁而恢復,並在快照關閉後立即消失。這是效能最高的方法,儘管它需要使用者進行手動(正如其名稱所示)處理。

演進 (Evolution)

在區域性作用域中使用 Reactor Context 填充 ThreadLocal 被證明既具有高效能,又符合響應式鏈的工作方式。將上下文與 Subscriber 關聯是一種經過驗證的方法,不會意外導致上下文資料丟失。在下一篇文章中,我們將展示 Reactor 3.5 和 Micrometer 1.10 如何將手動方法提升到新的水平,並提供一種結構化的方法來跨越響應式和命令式邊界傳播上下文。

獲取 Spring 時事通訊

訂閱 Spring 時事通訊,保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速前進。

瞭解更多

獲取支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部