使用 Project Reactor 2 進行上下文傳播 - Spring Cloud Sleuth 的坎坷之路

工程 | Dariusz Jędrzejczyk | 2023年3月29日 | ...

此文章是系列文章的一部分

  1. 基礎知識
  2. Spring Cloud Sleuth 的坎坷之路
  3. 響應式與命令式之間的統一橋接

Spring Cloud Sleuth 最近更名為 Micrometer Tracing,它是 Micrometer 專案的一部分。大部分追蹤檢測都集中在 Micrometer 中新的可觀察性 API 下。這些專案的目標是使任何應用程式都可觀察——以包含關聯識別符號的指標、追蹤和日誌的形式。為了實現這個目標,庫需要一種傳輸上下文資訊的方法。當應用程式以任何形式處理非同步時,這項任務就變得相當具有挑戰性。在上一篇文章中,我們介紹了使用 ThreadLocal 和 Reactor Context 進行上下文傳播的基礎知識。

Spring Cloud Sleuth 在非同步上下文傳播的方法上經歷了多次調整。由於 Sleuth 處理不需要響應式 API 的第三方檢測庫,因此建立一種向它們提供上下文的穩定方式至關重要。這些庫通常不假設非同步,而是依賴於靜態的 ThreadLocal 狀態。多年來,ThreadLocal 為 JVM 應用程式提供了隱式的上下文儲存,以驅動可觀察性功能。隨著時間的推移,Project Reactor 在底層原語之上引入了各種鉤子和包裝機制,以使響應式和命令式之間的橋接成為可能。在本文中,我們將探討將上下文傳播到 ThreadLocal 值的方法,並討論其中可能存在的錯誤。我們將探討 Sleuth 採取的方法,並總結我們發現的既高效能又語義健全的良好折衷方案。

在我們描述 Sleuth 引入的方法之前,我們應該考慮在命令式和響應式世界之間進行橋接所帶來的危險。

隱藏併發下的副作用陷阱

我們在上一篇文章中討論了 Thread 切換和相關副作用的一些潛在問題。現在我們將透過使用 Reactor 的外掛機制來解決可能遇到的問題,從而更多地探索響應式程式設計的屬性。

總結 Spring Cloud Sleuth 遇到的所有問題是一個不斷變化的目標。此外,許多組織都有自己的上下文傳播機制實現,例如,用於填充 SLF4J 的 MDC。本文無意全面總結所有潛在陷阱。它旨在建立一些直覺,幫助你理解最終的真理:你要麼遵守響應式程式設計規則,要麼就準備好在最意想不到的時刻失敗。

排程器鉤子

眾所周知,響應式鏈可以使用不同的 Thread 來傳播訊號。從我們上一篇文章中學到的,當執行在另一個 Thread 上繼續時,在任務執行時恢復上下文是有意義的。Project Reactor 將管理 Thread 的任務委託給 Scheduler。它還提供了一個專用的鉤子,允許攔截特定工作單元的排程和執行:Schedulers.onScheduleHook。它的工作方式與上一篇文章中的 WrappedExecutor 類似。讓我們看看何時可能考慮使用它的場景。

清理

在第一部分中,我們瞭解到不能依賴 ThreadLocal 值在響應式鏈中始終可用。如果我們嘗試在訂閱時初始化它,並在 doFinally 運算子中清除它呢?我們的應用程式可以使用有限數量的 Thread 處理許多請求,其中一些是併發的。由於這些平臺 Thread 可以重用,我們需要在處理另一個請求之前清理與一個請求相關的任何 ThreadLocal 狀態,以免不同的請求使用殘留的關聯識別符號。

接下來的程式碼示例是對我們之前編寫的未使用 Reactor Context 的程式碼的修改。

handleRequest 方法的潛在實現可能如下所示

Mono<Void> handleRequest() {
  return Mono.fromSupplier(() -> {
    initRequest(); // <1>
    return "test-product";
  }).flatMap(product ->
    Flux.concat(
      addProduct(product),
      notifyShop(product)).then())
    .doOnSuccess(v -> log("Done."))
    .doFinally(signalType ->
      CORRELATION_ID.remove()); // <2>
}

<1> 中我們設定 ThreadLocal 值,在 <{2}gt; 中我們嘗試清除它。

我們還修改了我們執行的操作,以便能夠在 addProduct 方法中新增人工延遲

Mono<Void> addProduct(String productName) {
  return Mono.defer(() -> {
    log("Adding product: " + productName);
    return Mono.<Void>empty()
      .delaySubscription(Duration.ofMillis(10),
        Schedulers.single()); // <1>
  });
}

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return Mono.just(true);
  });
}

請注意,在 <1> 中,我們透過延遲訂閱引入了非同步性,並使用 Schedulers.single() 在 10ms 後啟動訂閱。delaySubscription 將使用該 Scheduler 的底層 ScheduledExecutorService,並在延遲後在另一個 Thread 上啟動訂閱。

從上一篇文章中我們知道,在這種情況下我們需要恢復 ThreadLocals,所以我們使用提到的 Scheduler 外掛來實現

Schedulers.onScheduleHook("context.propagation", WrappedRunnable::new);

在 Reactor 的 Scheduler 上執行的每個任務都將恢復 ThreadLocal 值,所以我們應該很安全。

現在,讓我們模仿兩個連續的請求,中間用一條日誌驗證 CORRELATION_ID 是否已正確清除

log("Got first request, calling handler");
handleRequest().block();

log("Got second request, calling handler");
log("There should be no correlationId on this line!");

handleRequest().block();

日誌如下:

[      main][                null] Got first request, calling handler // <1>
[      main][ 8658769170992364531] Adding product: test-product
[  single-1][ 8658769170992364531] Notifying shop about: test-product
[  single-1][ 8658769170992364531] Done.
[      main][ 8658769170992364531] Got second request, calling handler
[      main][ 8658769170992364531] There should be no correlationId on this line!
[      main][  711436174608061530] Adding product: test-product
[  single-1][  711436174608061530] Notifying shop about: test-product
[  single-1][  711436174608061530] Done.

“test-product” 處理相關的日誌具有正確的關聯識別符號。然而,請求之間發生了什麼?我們期望 ThreadLocaldoFinally 中被清除。不幸的是,請求之間的日誌仍然包含一個識別符號。那麼發生了什麼?

請注意,“Notifying shop about” 日誌發生在 single-1 Thread 上。訊號是在那個 Thread 上傳遞的,所以我們在那裡清除了 ThreadLocal,但主 Thread 仍然被汙染(在 <1>)。現在,我們處理程式之外的執行可以使用錯誤的關聯識別符號用於不同的目的。我們可以嘗試透過在伺服器層(分派請求)新增清理邏輯來緩解這個問題,並確保用於請求的每個 Thread 都未被汙染。如果我們的管道更復雜,這並不能儲存所有其他潛在的 Scheduler Thread

這種方法在允許應用程式在響應式鏈中透明地使用 ThreadLocal 值方面取得了相當大的進展。從效能角度來看,它也是合理的,因為它不會在每個運算子周圍設定和重置 ThreadLocal,而只在處理專案時發生 Thread 切換時才進行。然而,它也表明仍有一些未解決的副作用。在接下來的示例中,我們將體驗並嘗試解決不同的場景。

外部源和匯的難題

使用 ThreadLocal 作為上下文元資料傳輸機制的策略,另一個常見問題是當使用了不同於 Reactor 的非同步庫並且它自己切換 Thread 時。當執行切換到不受包裝的 ExecutorService 控制的不同 Thread 時,上下文會丟失。

讓我們看看實際情況。我們將重用目前為止看到的大部分程式碼,只對 notifyShop 方法進行一次更改。它現在使用以下方法模擬遠端呼叫:

Mono<Boolean> makeRequest(String productName) {
  return Mono.fromFuture(CompletableFuture.supplyAsync(() -> true,
    CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)));
}

所以 notifyShop 看起來像這樣

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return makeRequest(productName);
  });
}

如果我們觸發一次處理程式

handleRequest().block();

我們得到以下輸出

[      main][  683056557275963407] Adding product: test-product
[  single-1][  683056557275963407] Notifying shop about: test-product
[l-worker-1][                null] Done!

為了更好的可讀性,日誌縮短了 Thread 名稱,但 l-worker-1 實際上是 ForkJoinPool.commonPool-worker-1 的縮寫。

正如我們所看到的,我們的執行在一個我們不控制的通用 ForkJoinPool 上繼續。一個問題是,從那個 Thread 切換開始,我們就不再看到我們的關聯識別符號了,而另一個問題是,我們在一個實際上缺少關聯資訊的 Thread 上執行了清理。

我們可能可以透過 Executor 或任務包裝來(部分)改善這種情況,如上一篇文章所述,但我們並不總是擁有這種控制權——例如,如果我們呼叫使用 CompletableFuture 的外部庫。

運算子鉤子

我們已經準備好討論 Sleuth 的策略了。Schedulers.onScheduleHook 在響應式處理中可能發生的非顯式 Thread 切換方面提供了有限的能力。我們需要對操作的執行有更多的控制。我們將透過引入兩種外部服務通訊方式來演示其侷限性。

addProduct 方法現在發出遠端請求,並在我們控制的 Scheduler 上釋出結果。將繁重計算解除安裝到不同的 Thread 上是很常見的。為此,我們使用 publishOn 運算子

Mono<Void> addProduct(String productName) {
  return Mono.defer(() -> {
    log("Adding product: " + productName);
    return makeRequest(productName)
      .publishOn(Schedulers.single())
      .then();
  });
}

notifyShop 方法模擬將結果對映到可能多個 Publisher。這在響應是複合結果的典型場景中很常見——例如,如果響應是 JSON 陣列,我們打算將每個專案作為對另一個服務的單獨呼叫進行處理或豐富單個結果。讓我們使用簡化版本,只取一個結果

Mono<Boolean> notifyShop(String productName) {
  return Mono.defer(() -> {
    log("Notifying shop about: " + productName);
    return makeRequest(productName)
      .flatMapMany(result ->
        Flux.just("result")
          .map(x -> result))
          .take(1)
          .single();
    });
}

現在讓我們跳過處理程式,手動啟動關聯識別符號,然後訂閱這些鏈

initRequest();
addProduct("test-product")
  .doOnSuccess(v -> log("Added."))
  .block();

initRequest();
notifyShop("test-product")
  .doOnSuccess(v -> log("Notified."))
  .block();

我們看看輸出

[      main][ 6606077262934500649] Adding product: test-product
[  single-1][                null] Added.
[      main][  182687922231622589] Notifying shop about: test-product
[l-worker-1][                null] Notified.

這是預期的,因為 doOnSuccess 中發生的兩個日誌都是由於 CompletableFutureForkJoinPool Thread 上傳遞值而觸發的。儘管我們有 Scheduler 包裝,但結果首先在不受我們控制的 Thread 上傳遞,因此即使 addProduct 中使用的 publishOn 也沒有幫助。

我們能做些什麼來改善這種情況嗎?Reactor 有一個細粒度的外掛系統,它允許我們裝飾任何管道中的任何運算子。我們可以嘗試使用它來恢復關聯識別符號。

外掛將使用自定義的 Subscriber 實現,它在訂閱時捕獲關聯識別符號

static class CorrelatingSubscriber<T> implements CoreSubscriber<T> {
  final CoreSubscriber<T> delegate;
  Long correlationId;

  public CorrelatingSubscriber(CoreSubscriber<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void onSubscribe(Subscription s) {
    delegate.onSubscribe(s);
    this.correlationId = CORRELATION_ID.get();
  }

  @Override
  public void onNext(T t) {
    CORRELATION_ID.set(this.correlationId);
    delegate.onNext(t);
  }

  @Override
  public void onError(Throwable t) {
    CORRELATION_ID.set(this.correlationId);
    delegate.onError(t);
  }

  @Override
  public void onComplete() {
    CORRELATION_ID.set(this.correlationId);
    delegate.onComplete();
  }
}

要更改運算子,使其實現將呼叫委託給實際的 Subscriber 例項,我們可以使用 Operators.lift 方法

Operators.lift((scannable, subscriber) ->
  new CorrelatingSubscriber<>(subscriber));

onEachOperator 鉤子

首先,我們將嘗試一個外掛,它允許我們更改鏈中的每個運算子

Hooks.onEachOperator(
  Operators.lift((scannable, subscriber) ->
    new CorrelatingSubscriber<>(subscriber)));

我們再執行一次示例,檢查輸出

[      main][ 7295088917002526647] Adding product: test-product
[  single-1][ 7295088917002526647] Added.
[      main][  383851863754448684] Notifying shop about: test-product
[l-worker-1][  383851863754448684] Notified.

哇!我們甚至在如此複雜的場景中也成功獲取了關聯識別符號。訂閱的初始動作捕獲了 ThreadLocal 值,並在每一步恢復了它。即使 notifyShop 方法中使用的 flatMap (它自己訂閱)也有效,因為在訂閱到另一個 Thread 之前,ThreadLocal 會從之前的捕獲中填充!這聽起來確實很棒,但這種方法也有缺點。第一個也是最明顯的缺點是效能。傳播發生在每個運算子中。使用這種技術,我們首先裝飾每個物件,並在每一步進行 ThreadLocal 訪問。所有這些操作都很昂貴。要了解更多資訊,請觀看Oleh 關於響應式效能的演講

onLastOperator 鉤子

所以我們嘗試另一種方法。這次,我們將使用一個外掛,它附加到鏈中被認為是最後一個運算子——一個直接在 subscribe() 呼叫之前的運算子。

關於響應式鏈,可以觀察到一點:在同步運算子的情況下,我們不需要在每個單獨的操作(例如,filtermap)中恢復最初捕獲的上下文,而只需在鏈中的最後一個運算子被訂閱時恢復。只要不涉及 Thread 邊界交叉,這種機制就有效。為了支援可能跨越這些邊界的運算子(例如,flatMap,它涉及訂閱一個新的 Publisher),有一個特殊的技巧。它將對映結果視為它們所操作的內部 Publishers 的最後一個運算子。

我們試試這個方法

Hooks.onLastOperator(
  Operators.lift((scannable, subscriber) ->
    new CorrelatingSubscriber<>(subscriber)));

並執行

[      main][ 2122332013640150746] Adding product: test-product
[  single-1][ 2122332013640150746] Added.
[      main][  459477771449275997] Notifying shop about: test-product
[l-worker-1][                null] Notified.

它在 addProduct 中使用了 publishOn 成功了,但在 notifyShop 中的 flatMap 失敗了。

讓我們分析一下 notifyShop 失敗的原因。我們對 block() 的呼叫捕獲了 ThreadLocal 併為每個向下遊傳輸的訊號恢復了它。在 flatMapMany 中完成的對映,我們正在處理我們之前提到的非同步邊界。我們的外掛實際上應用於內部源(Flux.just().map().single())。

然而,儘管自定義 SubscriberflatMapMany 內部被呼叫並嘗試恢復 ThreadLocal 值,這些努力仍然沒有幫助。觸發內部訂閱的訊號是在我們不控制的 Thread 上啟動的,所以我們根本沒有 ThreadLocal 可供捕獲。

publishOn 運算子的情況下則不同。它的訂閱開始於我們控制的 Thread。因此,當由於 makeRequest() 方法而處理訊號時,它只在受我們控制的 Thread 上傳遞。.doOnSuccess(v -> log("Added.")) 的執行發生在與 flatMapMany 不同步的 Thread 邊界之後。

這就是為什麼 onEachOperator 涵蓋了更多情況——它在每一步恢復初始值,無論非同步邊界如何。儘管如此,onLastOperator 的效能略優於 onEachOperator

addQueueWrapper 鉤子

還有一個外掛,我們可以用它來完全控制響應式傳遞,如果我們將它與之前的鉤子結合使用的話。Spring Cloud Sleuth 也使用了它。我們正在考慮最近引入的一個外掛,Hooks.addQueueWrapper。不過,我們不會詳細探討它。它可以解決 Reactor 中工作竊取機制引入的問題。像 flatMap 這樣的非同步運算子,可以在向運算子傳遞訊號的各種 Thread 上取得進展。想象一個背壓場景,處理過程暫停了一段時間。在某個時刻,一個新的 Thread 可以接管併發出 Subscription.request(n) 呼叫,這會導致累積的值立即傳遞。現在你可能會問自己:“什麼累積值?”這是一個好問題。Reactor 中的許多運算子都使用內部 Queue 來實現背壓或保持序列傳遞語義。因為這些 Queue 的 draining 可以在任何 Thread 上發生,所以上下文資訊應該附加到儲存在 Queue 中的每個訊號——也就是說,我們用於關聯目的的 ThreadLocal 值。這就是我們需要 Queue 包裝器的原因——當一個值被提交到 Queue 時,我們捕獲 ThreadLocal 狀態。當一個值從 Queue 中檢索時,狀態就會恢復。

Spring Cloud Sleuth 中的上下文傳播

在展示了在響應式流之外操作的風險以及我們可以使用哪些機制來傳播 ThreadLocal 上下文之後,讓我們總結一下 Spring Cloud Sleuth 使用的四種策略

  1. DECORATE_ON_EACH
  2. DECORATE_ON_LAST
  3. DECORATE_QUEUES
  4. MANUAL

前三種策略試圖利用響應式運算子的一些特性,結合 Reactor 的外掛機制,並將 ThreadLocal 用作內部傳輸機制以及與檢測庫共享上下文資料的方式。前三種策略也假設使用Schedulers.onScheduleHook進行 Scheduler 包裝。另一方面,最後一種策略利用了 Reactor 的 Subscriber 繫結的 Context

DECORATE_ON_EACH

此策略使用我們之前看過的Hooks.onEachOperator外掛。儘管 Sleuth 添加了許多最佳化以避免不必要的恢復,但效能影響仍然非常顯著。通常,這種方法非常有效。不過,它非常激進,因此如果運算子需要更改上下文,就可能難以應對。下游運算子將看不到更改,因為每個步驟都會恢復初始訂閱的上下文。

DECORATE_ON_LAST

Hooks.onLastOperator 用於提高效能。這種方法可能因為其提供的靈活性而失敗。如果上游運算子修改了上下文,下游操作將看到此更改。這帶來了風險,如果某個運算子清除該上下文,那麼在該上下文丟失後,直到另一個訊號排程到包裝的 Scheduler,該上下文都將丟失。另一個風險是我們早期示例中看到的,訂閱發生在某個 Thread 上,但請求資料發生在另一個不受 Reactor 控制的 Thread 上。

DECORATE_QUEUES

作為前一種策略的演進,DECORATE_QUEUES 糾正了一些錯誤場景(請求資料帶外發生或多個 Thread 釋出資料),但並非所有場景。我們之前描述過Hooks.addQueueWrapper外掛的使用方式。Queue 包裝的一個已知問題是,在處理專案之後沒有可靠的清理方式。上下文在從 Queue 中檢索專案時恢復。沒有圍繞專案處理的範圍,該專案會透過下游運算子傳播。因此,這種方法也容易汙染 ThreadLocal 儲存。最近在 draining 過程中進行了一些改進,以限制其影響。

手動

在此策略中,Sleuth 所做的唯一事情是,在訂閱時將 ThreadLocal 中的值作為快照捕獲到 Reactor 的 Context 中。使用者需要自行在相關位置提取該快照,並填充 ThreadLocal,以便檢測庫可以使用它們。對於受支援的追蹤檢測,例如 Zipkin 和 Brave,Sleuth 透過使用範圍概念來恢復 ThreadLocal——ThreadLocal 在檢測期間恢復,並在快照關閉後立即消失。這是效能最高的方法,儘管它需要使用者手動處理(正如其名稱所示)。

演變

在區域性範圍內使用 Reactor Context 填充 ThreadLocal 既高效又符合響應式鏈的工作方式。將上下文與 Subscriber 關聯是一種行之有效的方法,它不會意外地導致上下文資料丟失。在下一篇文章中,我們將展示 Reactor 3.5 和 Micrometer 1.10 如何將手動方法提升到新的水平,並提供一種跨響應式和命令式邊界進行上下文傳播的結構化方法。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有