Project Reactor 3 的上下文傳播 - 統一連線響應式與命令式

工程 | Dariusz Jędrzejczyk | 2023 年 3 月 30 日 | ...

本文是系列文章的一部分

  1. 基礎知識
  2. Spring Cloud Sleuth 的坎坷之路
  3. 統一連線響應式與命令式

我們以 Spring Cloud Sleuth 的 MANUAL 上下文傳播策略既高效又提供正確語義的思考結束了上一篇文章。基於諸多經驗,Spring、Micrometer 和 Reactor 團隊建立了一個新的上下文傳播庫。其目標是封裝在 ThreadLocal 值和類似 Map 的結構之間傳輸上下文資料的關注點。Micrometer 1.10 和 Reactor 3.5 都以此為基礎,在 Reactor 和命令式程式碼之間提供一流的體驗。透過使用 Reactor Context,我們隱式暴露了 Micrometer 用於追蹤庫以及填充 SLF4J 的 MDC 以提供包含追蹤識別符號的日誌所使用的 ThreadLocal 值。

在本文中,我們將採取與之前不同的方法。我們將從現有的頂層 API 開始,然後解釋其背後的原理。最後,您將能夠:

  • 理解這些機制為何如此運作。
  • 決定在您的應用或庫中採取哪種方法。
  • 瞭解何時以及為何您無需做任何事情,並期望一切開箱即用。

響應式 Context 和 ThreadLocals

讓我們回顧第一篇文章中的示例,其中我們展示了 delayElement 運算子如何導致響應式鏈丟失關聯識別符號。讓我們回顧一下程式碼,從我們的操作開始:

Mono<Void> addProduct(String productName) {
  log("Adding product: " + productName);
  return Mono.empty(); // Assume we’re actually storing the product
}

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return Mono.just(true); // Assume we’re actually notifying the shop
}

然後我們需要回顧一下繫結請求處理器:

Mono<Void> handleRequest() {
  initRequest(); <1>
  log("Assembling the chain"); // <2>

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1)) // <3>
    .flatMap(product ->
      Flux.concat(
        addProduct(product), // <4>
        notifyShop(product)).then())
}

從 Reactor 3.5.0 開始,Reactor Context 能夠與 Micrometer 旗下的一個新庫整合,該庫名為 context-propagation。我們將在本文末尾更詳細地描述該整合。在 Reactor 3.5+ 中,當 context-propagation 庫位於類路徑上時,我們可以期望在 handle 運算子以及新的 tap 運算子中進行日誌記錄時,我們的 ThreadLocal 值能夠存在。

為了傳播我們的自定義 ThreadLocal,我們需要註冊一個 ThreadLocalContextAccessor

ContextRegistry.getInstance()
  .registerThreadLocalAccessor("CORRELATION_ID",
    CORRELATION_ID::get,
    CORRELATION_ID::set,
    CORRELATION_ID::remove);

目前,context-propagation 庫的細節對於實現我們需要的功能並不重要。我們只需要知道我們使用了鍵 CORRELATION_ID,它將與 Reactor Context 一起用於在我們特殊的 operators 中恢復 ThreadLocal。讓我們修改其餘程式碼來使用它們並在指定位置記錄日誌。

我們只需要對請求處理器進行一項更改:

Mono<Void> handleRequest() {
  initRequest(); // <1>
  log("Assembling the chain");

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product)).then())
    .contextCapture(); // <2>
}

我們所做的唯一修改是返回給呼叫者的鏈末尾的 <2> contextCapture 運算子。此運算子的任務是捕獲當前 ThreadLocal 值(ThreadLocalAccessor 例項已在 ContextRegistry 中註冊)並將其儲存在 Reactor Context 中,使用相同的鍵。在此特定實現中,我們唯一的希望是訂閱在裝配階段立即發生,就像我們在 <1> 中設定 ThreadLocal 值一樣。

接下來,我們將使用 tap 運算子新增日誌記錄:

Mono<Void> addProduct(String productName) {
  return Mono.<Void>empty()
    .tap(() -> new DefaultSignalListener<>() {
      @Override
      public void doOnComplete() throws Throwable {
        log("Adding product: " + productName);
      }
  });
}

這裡,我們擴充套件了 reactor-corereactor.core.observability 包中的 DefaultSignalListener。我們只對訂閱完成訊號感興趣,在此處執行日誌操作。

對於 handle 運算子,我們將更改 notifyShop 方法:

Mono<Boolean> notifyShop(String productName) {
  return Mono.just(true)
    .handle((result, sink) -> {
      log("Notifying shop about: " + productName);
      sink.next(result);
    });
}

讓我們看看現在當我們呼叫處理器時,是否能得到正確的輸出:

handleRequest().block();

結果如下:

[      main][  643805344761543048] Assembling the chain
[parallel-1][  643805344761543048] Adding product: test-product
[parallel-1][  643805344761543048] Notifying shop about: test-product

太棒了!這實際上與 Spring Cloud Sleuth 的 MANUAL 策略相同,但已整合到 Reactor 的內部機制中,因此您無需手動恢復 ThreadLocal 值。我們選擇 taphandle 是因為這些運算子可以訪問繫結到 SubscriberContext,並允許在具體的 Reactive Streams 訊號上採取行動。

請記住:Reactor Context 用於寫入,ThreadLocals 用於讀取。

實際上,我們的請求處理器有點危險。如果我們延遲訂閱行為,我們將丟失關聯識別符號。考慮一下:

Mono<Void> requestHandler = handleRequest(); // <1>

Thread subscriberThread = new Thread(requestHandler::block); // <2>
subscriberThread.start();
subscriberThread.join();

輸出如下:

[      main][ 1388809065574796038] Assembling the chain
[parallel-1][                null] Adding product: test-product
[parallel-1][                null] Notifying shop about: test-product

裝配發生在 <1> 中,並且在 main 中設定了 ThreadLocal。然而,訂閱發生在 <2> 中的新 Thread 上,該 Thread 沒有可捕獲的 ThreadLocal 值。因此,我們的日誌沒有關聯識別符號。我們可以用 Mono.defer() 包裝處理器的主體來解決此問題。但是,與其那樣做,不如考慮我們是否一開始就需要實際設定 ThreadLocal

在呼叫 Reactor 鏈的命令式應用中(例如呼叫 WebClient 的 Spring MVC 控制器方法),ThreadLocal 值已經建立,contextCapture 將會捕獲它們並將其儲存在 Context 中。

另一方面,在像 WebFlux 這樣的響應式棧中,直接使用 contextWrite 更有意義。

我們知道 Reactor 會使用其 Context 的內容來恢復 ThreadLocal 值。如果我們直接將預期值儲存在 Context 中,而不是從當前狀態捕獲它們,我們將稍微提高效能,同時也將提高與函數語言程式設計正規化的相容性。讓我們試試看:

Mono<Void> handleRequest() {
  // initRequest(); -- no write to ThreadLocal
  log("Assembling the chain");

  return Mono.just("test-product")
    // <1>
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product)).then())
    .contextWrite(
      Context.of("CORRELATION_ID", correlationId())); // <2>
}

讓我們執行它:

[      main][                null] Assembling the chain
[parallel-1][ 7059587638538899074] Adding product: test-product
[parallel-1][ 7059587638538899074] Notifying shop about: test-product

很好!我們的實際響應式鏈包含正確的關聯識別符號。

不幸的是,我們在裝配階段丟失了一個。原因之一是日誌記錄並非發生在 handletap 運算子內部。如果我們在 <1> 中使用 tap 運算子新增初始日誌,就不會出現問題。包含關聯識別符號的 Context 繫結到 <2> 上游的鏈。如果我們緊隨 contextWrite 呼叫後新增一個記錄日誌的 tap 運算子,我們將看不到正確的關聯識別符號——在該階段附加的 Context 是不同的,不包含我們的識別符號。我們稍後將回到這個問題,但首先,讓我們考慮是否可以簡化程式碼並避免使用特殊的運算子。

自動上下文傳播

reactor-core 3.5.0 釋出時,它被包含在 Spring Framework 6.0 和 Spring Boot 3.0 中。使用 Spring Cloud Sleuth 進行追蹤的現有 Spring 使用者習慣於日誌中填充 trace-idspan-id 值(類似於我們的關聯識別符號)。切換到新正規化(可觀察性是 Spring 核心產品套件的一部分)將要求現有應用重寫其日誌邏輯以使用 handletap 運算子。我們繼續思考如何使更多運算子能夠恢復 ThreadLocal 值。

正如我們在前一篇文章中看到的,恢復可以跨多個運算子的 ThreadLocal 值並非易事。選擇 handletap 是因為它們不會導致 ThreadLocal 值洩露。使用者程式碼執行後不會傳播任何訊號。使用者程式碼執行時 ThreadLocal 值存在。然後捕獲結果。最後,清除 ThreadLocal 上下文。只有在此之後,響應式訊號才會傳播到下游運算子。此外,我們希望更加有選擇性,因為在每個運算子中執行恢復會產生大量開銷,正如第 2 部分中所討論的。

我們仔細重新思考了所有內容,並提出了一個想法,可以將其結合到以下呼叫中(從 reactor-core 3.5.3 開始):

Hooks.enableAutomaticContextPropagation();

我們可以將其新增到我們應用的 main 方法中。

現在我們可以恢復我們最初的操作方法實現:

Mono<Void> addProduct(String productName) {
  log("Adding product: " + productName);
  return Mono.empty();
}

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return Mono.just(true);
}

我們保持 handleRequest 方法和在新 Thread 上的訂閱不變。讓我們執行它:

[      main][                null] Assembling the chain
[parallel-1][ 8362586195225273235] Adding product: test-product
[parallel-1][ 8362586195225273235] Notifying shop about: test-product

成功!

有了這項功能,我們可以將使用 Spring Cloud Sleuth 的現有程式碼庫遷移到新的 Spring Framework,而無需更改日誌記錄方式。透過上述鉤子,如果您使用帶有 Micrometer Tracing 的 Spring Boot Actuator,SLF4J 日誌將填充追蹤資訊,而無需進行任何操作。很快,Spring Boot 將自動為您呼叫該鉤子。

編寫框架程式碼

我們提到過,我們將回到裝配時日誌記錄的問題。到目前為止,我們一直在請求處理邏輯中啟動關聯識別符號生成過程。理想情況下,我們的處理器應該由伺服器呼叫,並且結果 Publisher (FluxMono) 由呼叫程式碼訂閱。我們的處理器回到了最初的形狀:

Mono<Void> handleRequest() {
  log("Assembling the chain");

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product)).then());
}

讓我們透過將上下文附加到返回的 Mono 來模仿伺服器程式碼:

Mono<Void> requestHandler = handleRequest()
  .contextWrite(Context.of("CORRELATION_ID", correlationId()));

然後我們需要執行它:

requestHandler.block();

裝配時仍然缺少關聯識別符號:

[      main][                null] Assembling the chain
[parallel-1][ 5508113792645841519] Adding product: test-product
[parallel-1][ 5508113792645841519] Notifying shop about: test-product

contextWrite 運算子在訂閱時(以及其他生命週期事件)恢復 ThreadLocal 值。要在裝配時讓使用者程式碼擁有日誌,呼叫該程式碼的整個過程需要成為響應式鏈的一部分。這樣,使用者程式碼在外部 Mono 訂閱時執行,並且返回的內部 Mono 立即被訂閱。在整個執行過程中,如果我們這樣做在我們的“框架”程式碼中,外部 MonoContext 就可以在 ThreadLocal 中可用:

Mono<Void> requestHandler = Mono.defer(() -> handleRequest())
  .contextWrite(Context.of("CORRELATION_ID", correlationId()));

我們只需要做的是使用 Mono.defer() 並將 Context 附加到它。

幸運的是,Spring Framework 做得很好,並且也在訂閱階段處理了我們的裝配。

上下文傳播問題是否已解決?

這種新方法看起來非常令人鼓舞。有人可能會想,考慮到過去採用的方法,這種新機制將如何打破?我們對這種方法更有信心,因為它更符合 Reactive Streams 的本質。那些不基於 Reactor Context 的方法內嵌著一個主要的誤解——它們將 ThreadLocal 值向下傳播——希望在某個時刻進行清理。然而,傳播並沒有語義上的邊界來停止。

依賴 ThreadLocal 值向下傳播也可能是錯誤的來源。響應式庫向上遊和下游傳播訊號。一個訊號可能觸發另一個訊號,但並非必須如此。不同的 Thread 可以繼續處理。像 flatMap 之類的運算子進行的某些最佳化(例如預取)可以從上游請求並排隊值,而無需我們下游傳播機制的參與。如果我們想在背壓或取消時也能獲得上下文資訊,我們需要考慮所有可能的訊號。

一個重要的觀察來自 Context 規定邏輯邊界的方式。當你呼叫 contextWrite 並將值儲存在 Context 中時,所有上游運算子都可以訪問修改後的版本。所有下游運算子將看不到修改,但會看到你的修改所基於的狀態。

繫結到 SubscriberContext 的性質是我們新方法的基礎。我們修改了 contextWrite 運算子,使其在訊號在訂閱時、取消和請求時向上遊傳播時設定 ThreadLocal 值以反映當前的 Context。但是,當訊號向下遊傳播時,它會將這些 ThreadLocal 值重置為下游 Context 中表示的值。

我們仍然需要使用 Scheduler 包裝方法。我們還需要 Queue 包裝方法(為此我們需要改進生命週期語義)。

但我們可以考慮透過在這些情況下傳輸 Reactor Context 而不是捕獲 ThreadLocal 值來改善情況。這可以提高效能。

此外,當我們使用不在 Reactor 控制範圍內的 Publishers 或使用我們無法控制的 Threads 的源(例如使用 Mono.fromFuture() 來模擬遠端呼叫)時,我們仍然會丟失 ThreadLocal 值。目前的緩解措施是引入 contextWrite 運算子的語義邊界,它並不真正改變 Context,就像 notifyShop 方法的這個變體一樣:

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return makeRequest(productName) // <1>
    .contextWrite(Function.identity()) // <2>
    .doOnNext(r -> log("Request done.")); // <3>
}

makeRequest 方法定義在本系列的上一篇文章中。如果我們假設 makeRequest 是第三方庫呼叫,它使用了我們無法控制的 Threads,那麼我們也無法包裝它在 <1> 中執行的程式碼以及完成其操作的非同步程式碼。該鏈部分進行的任何日誌記錄都不會填充關聯識別符號。傳播此類上下文將是庫作者的責任。但是,由於我們在 <2> 中使用了邊界,因此我們在 <3> 中的日誌包含關聯識別符號。

我們打算在 reactor-core 中新增必要的功能,以便為那些可能以 Reactor 無法控制的方式改變 Threads 的源提供此類邊界。

在命令式場景中,只調用響應式程式碼以使用阻塞訂閱(例如透過使用 block()),我們計劃自動執行 contextCapture,以透明地將當前 ThreadLocal 值傳播到響應式鏈中。這在使用 WebClient 的 Spring MVC 應用程式中將非常有用。

Context-propagation 庫

捕獲 ThreadLocal 狀態並在不同位置恢復它本身就是一個有趣的主題。通常,我們會想到多個相互之間具有邏輯連線或與各種關注點相對應的類似 Map 的結構。我們建立了一個專用庫,允許透過捕獲其狀態並將其恢復到各自的目標來在 ThreadLocals 和任意物件之間進行轉換。在前面的示例中,我們使用了 context-propagation 庫的一些 API。它在 Micrometer 旗下開源,如果您想在程式碼中使用它,它也有包含示例的參考文件

Project Reactor 使用 ServiceLoader JDK 機制註冊了一個處理 Reactor ContextContextAccessor。另一方面,Micrometer 註冊了一個 ObservationThreadLocalAccessor,它處理 Micrometer Tracing 和其他儀器機制透過單一 Observation 概念工作所需的 ThreadLocal 狀態。

我們強烈建議嘗試使用 Spring Boot 和 Spring Boot Actuator 來啟用追蹤功能,親身感受一下這種體驗的連貫性。

總結

在本系列部落格文章中,我們介紹了上下文傳播的基礎知識,並探討了連線命令式和響應式程式設計範例的歷史和現狀。我們希望您現在可以自信地使用我們實現的功能。在最好的情況下,如果您使用自動上下文傳播功能,您幾乎無需進行任何工作。此外,在這種有趣的場景下,我們希望您的自定義傳播邏輯可以利用我們在本文中描述的原語。如果您有疑問或想在 GitHub 上報告問題,可以聯絡我們

致謝

如果沒有同事們逐字逐句的審閱,本系列文章將無法發表。我要感謝(按字母順序排列):Simon Baslé, Jay Bryant, Pierre De Rop, Oleh Dokuka, Marcin Grzejszczak, Robert McNees, Rossen Stoyanchev, 和 Tadaya Tsuyukubo。

彩蛋

要玩轉使用的示例,請隨時使用我的 GitHub 倉庫中的相關包

訂閱 Spring 時事通訊

透過 Spring 時事通訊保持聯絡

訂閱

保持領先

VMware 提供培訓和認證,助力您突飛猛進。

瞭解更多

獲取支援

Tanzu Spring 透過簡單的訂閱,提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群的所有近期活動。

檢視全部