使用 Project Reactor 1 進行上下文傳播 - 基礎知識

工程 | Dariusz Jędrzejczyk | 2023年3月28日 | ...

此文章是系列文章的一部分

  1. 基礎知識
  2. Spring Cloud Sleuth 的坎坷之路
  3. 響應式與命令式之間的統一橋接

Spring Boot 3 和 Spring Framework 6 為我們帶來了一種統一且一致的方式,可以在使用 Micrometer 的應用程式中啟用可觀測性。從 Spring Cloud Sleuth 到 Micrometer 的演變,以及 Observation API 和 Micrometer Tracing,使我們整合了各種上下文傳播方法。在本系列部落格文章中,我們旨在解釋我們如何在 Project Reactor 中支援上下文傳播,以滿足命令式庫的需求。透過從基礎開始構建您的理解,您將能夠使用這些構造並理解其底層原理。我們假設您對響應式程式設計概念有基本的理解。如果您是新手或想複習知識,請檢視 Project Reactor 文件中的響應式程式設計簡介

在本文中,我們將開發一個簡單的電子商務應用程式。我們有限的視角考慮了一個新增產品並通知商店有新產品新增到庫存的請求。作為負責任的開發人員,我們希望記錄特定請求所採取的所有步驟,以便在調查問題時,我們可以檢視日誌並瞭解發生了什麼。我們將探討如何以命令式風格實現為日誌實用程式提供有關請求的上下文元資料的目標,並將其與 Project Reactor 更具函式式、宣告式風格進行比較。接下來的文章將更詳細地探討為什麼以及如何我們需要在這兩種程式設計風格之間建立橋樑。

ThreadLocal

為了識別屬於特定請求的日誌,我們需要一種關聯它們的方法。我們可以生成一個簡單的隨機識別符號,如下所示

static long correlationId() {
  return Math.abs(ThreadLocalRandom.current().nextLong());
}

我們需要一種方法使關聯識別符號在日誌實用程式中可用。我們可以將關聯作為業務邏輯中每個方法呼叫的一部分,但這會非常侵入性和冗長。

通常,第三方庫使用 JDK 的 ThreadLocal 來傳遞與我們應用程式業務邏輯不直接相關的隱式資訊。

讓我們為關聯識別符號宣告一個靜態欄位

static final ThreadLocal<Long> CORRELATION_ID = new ThreadLocal<>();

這是我們的日誌方法。它列印當前的 Thread 名稱並格式化輸出

static void log(String message) {
  String threadName = Thread.currentThread().getName();
  String threadNameTail = threadName.substring(
    Math.max(0, threadName.length() - 10));
  System.out.printf("[%10s][%20s] %s%n",
    threadNameTail, CORRELATION_ID.get(), message);
}

現在我們擁有了處理請求和使用隱式關聯識別符號記錄每個步驟所需的一切。

在每個請求開始時,應用程式都會呼叫以下方法來啟動關聯

static void initRequest() {
  CORRELATION_ID.set(correlationId()));
}

我們簡化的請求處理程式執行以下操作

void handleRequest() {
  initRequest();

  addProduct("test-product");
  notifyShop("test-product");
}

業務邏輯中的日誌記錄如下所示

void addProduct(String productName) {
  log("Adding product: " + productName);
  // ...
}

void notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  // ...
}

我們可以期望我們的應用程式記錄這些行

[      main][ 8592000019542134146] Adding product: test-product
[      main][ 8592000019542134146] Notifying shop about: test-product

只要特定請求的執行發生在同一個 Thread 上,並且不與其他關注點交錯,ThreadLocal 就可以讓我們將業務邏輯與用於日誌記錄的元資料解耦。

非同步處理

假設這個應用程式開始承受更高的負載,需要處理許多併發請求。假設我們可以使用一個非同步和非阻塞的伺服器實現,它要求我們提供非同步宣告而不是命令式和阻塞步驟。

我們的請求處理程式可以返回一個 CompletableFuture 以非同步和非阻塞方式處理請求

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(() -> addProduct("test-product"))
    .thenRunAsync(() -> notifyShop("test-product"))
}

不幸的是,當我們執行非同步版本時,日誌不再包含關聯識別符號

[l-worker-1][                null] Adding product: test-product
[l-worker-1][                null] Notifying shop about: test-product

任務包裝

解決此問題的已知方法是包裝由非同步 API 執行的任務。透過包裝,我們指的是一種執行 ThreadLocal 上下文恢復的實現。當任務建立時,捕獲當前上下文。當工作 Thread 實際執行任務時,該上下文將恢復。讓我們看看這在我們使用 Runnable 的示例案例中如何工作

class WrappedRunnable implements Runnable {

  private final Long correlationId;
  private final Runnable wrapped;

  public WrappedRunnable(Runnable wrapped) {
    this.correlationId = CORRELATION_ID.get();
    this.wrapped = wrapped;
  }

  @Override
  public void run() {
    Long old = CORRELATION_ID.get();
    CORRELATION_ID.set(this.correlationId);
    try {
      wrapped.run();
    } finally {
      CORRELATION_ID.set(old);
    }
  }
}

我們可以像這樣重新實現我們的處理程式

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(new WrappedRunnable(
      () -> addProduct("test-product")))
    .thenRunAsync(new WrappedRunnable(
      () -> notifyShop("test-product")));
}

不幸的是,這帶來了很多開銷。幸運的是,JDK 有一個用於執行非同步任務的 API:Executor 介面。在實際場景中,我們希望使用更全面的 API,即 ExecutorService。然而,出於我們的解釋目的,Executor 應該足夠了。

讓我們看看

static class WrappedExecutor implements Executor {

  private final Executor actual;

  WrappedExecutor(Executor actual) {
    this.actual = actual;
  }

  @Override
  public void execute(Runnable command) {
    actual.execute(new WrappedRunnable(command));
  }
}

讓我們重用 CompletableFuture 框架預設使用的通用 ForkJoinPool,但用我們的實現包裝它。現在我們的程式碼看起來像這樣

static Executor executor = new WrappedExecutor(ForkJoinPool.commonPool());

CompletableFuture<Void> handleRequest() {
  return CompletableFuture
    .runAsync(() -> addProduct("test-product"), executor)
    .thenRunAsync(() -> notifyShop("test-product"), executor);
}

我們的日誌再次正常工作

[l-worker-1][ 7321040639376081961] Adding product: test-product
[l-worker-2][ 7321040639376081961] Notifying shop about: test-product

在某些情況下,CompletableFuture 框架可以提供以非阻塞方式處理非同步任務的方法。然而,在許多情況下,有限的 API 表面及其行為特性可能會受到限制。例如,當我們的系統達到容量時,我們可能希望延遲處理並在以後恢復。使用 CompletableFuture,所有建立的例項在建立後立即開始計算。我們可能還希望對資料流應用更細粒度的操作,而不是對單個計算單元進行操作。由於這些以及更多原因,我們可能會考慮使用響應式程式設計庫。我們將考慮 Project Reactor,它是 Spring 組合中預設的響應式實現。

Project Reactor

為了提供一個健壯的非同步處理框架,Java 社群提出了 Reactive Streams 規範。它有助於建立 JDK 之前所缺乏的通用詞彙——明確的訊號傳播、錯誤處理、終止和生命週期管理語義。它還允許內建背壓。Spring 透過引入 WebFlux 採用了這種方法,使 Project Reactor 及其響應式型別成為 API 的一等公民。

Reactive Streams 為非同步流處理帶來了優雅而簡潔的解決方案。然而,上下文傳播不屬於規範的一部分。響應式庫的非阻塞和非同步特性,加上潛在的複雜實現,使得使用 ThreadLocal 極其困難。原因是無法保證哪個 Thread 可以執行使用者的程式碼。只要實現保證序列傳遞,就可以執行各種最佳化,從而使使用者的程式碼與併發無關,將處理併發的負擔轉移到庫內部。

為了兌現其保證,Java 中的響應式程式設計假設使用函數語言程式設計正規化來形成宣告式和可組合的流,它與不同的 Thread 可以執行使用者提供的程式碼的事實無關。只要使用者程式碼中沒有假設在特定 Thread 中執行的副作用,響應式庫就可以提供極其高效能的執行時,同時遵守規範。ThreadLocal 顯然違反了這一要求。

讓我們嘗試重寫我們的處理程式以使用 Project Reactor。單個操作變為

Mono<Void> addProduct(String productName) {
  log("Adding product: " + productName);
  return Mono.empty(); // Assume we’re actually storing the product
}

Mono<Boolean> notifyShop(String productName) {
  log("Notifying shop about: " + productName);
  return Mono.just(true); // Assume we’re actually notifying the shop
}

讓我們嘗試使用上面的程式碼

Mono<Void> handleRequest() {
  initRequest();
  log("Assembling the chain");

  return Mono.just("test-product")
    .flatMap(product ->
      Flux.concat(
        addProduct(product),
        notifyShop(product))
      .then())
}

我們簡單的實現產生了預期的輸出

[      main][ 7224499961623309444] Assembling the chain
[      main][ 7224499961623309444] Adding product: test-product
[      main][ 7224499961623309444] Notifying shop about: test-product

上面的實現是在 main Thread 中呼叫的,並且執行被限制在該 Thread 中。我們不應該做這樣的假設。

在處理程式中,我們在傳播處理結果之前引入了一個小的延遲。我們這樣做是為了演示幕後發生的隱式 Thread 切換。

Mono<Void> handleRequest() {
  initRequest(); <1>
  log("Assembling the chain"); // <2>

  return Mono.just("test-product")
    .delayElement(Duration.ofMillis(1)) // <3>
    .flatMap(product ->
      Flux.concat(
        addProduct(product), // <4>
        notifyShop(product))
      .then())
}

執行時,打印出以下內容

[      main][ 6265915299594887150] Assembling the chain
[parallel-1][                null] Adding product: test-product
[parallel-1][                null] Notifying shop about: test-product

發生了什麼?為什麼一個日誌有相關識別符號而其他日誌沒有?

當伺服器呼叫我們的處理程式時,<1> 處的初始化設定了 ThreadLocal 關聯識別符號,<2> 處的日誌能夠使用它。那些經驗豐富的響應式程式設計人員會告訴您問題在於執行發生在不同的階段。ThreadLocal 在組裝時設定。“您也應該在訂閱時恢復它”將是一個建議。我們稍後再討論。如果“組裝”、“訂閱”和“執行時間”這些術語讓您感到困惑,請檢視Simon 的部落格文章中的出色解釋或觀看同名演講

雖然方法會立即返回,但這並不能保證執行已經開始。這是因為返回的 Mono 必須被訂閱才能觸發處理。它可能發生在不同的 Thread 中。<3> 處的 delayElement 運算子隱式地使用 Reactor 中的共享 Scheduler(執行緒池的抽象)在指定延遲後在另一個 Thread 上傳遞訊號。該訊號傳播到下游運算子,這使我們能夠先新增產品,然後通知商店。我們組裝的管道還有更多令人驚訝的方面,但我們不要太困惑。

問題在於,在 <4> 中,如果我們記錄日誌,我們無法真正確定呼叫將在哪個 Thread 上發生。像 flatMap 這樣的運算子可以引入自己的非同步性。

在常規情況下,當鏈被訂閱時,值開始被傳遞。因此,我們可以在每次訂閱時恢復 ThreadLocal 值。但這並非總是最好的主意。Subscription 可以非同步傳遞,在不同的 Thread 上。值也可以在不同的 Thread 上傳遞。在背壓的情況下,訊號可以作為請求更多資料的結果在執行請求的 Thread 上傳遞,而不是由資料 Publisher 使用的 Thread。有很多需要考慮的活動部件和怪癖!要了解更多關於 Reactor 中的執行緒和非同步執行,請查閱我們之前部落格文章系列的另一部分

Reactor Context

Project Reactor 引入了一種與函數語言程式設計很好地對齊的機制,以提供傳輸上下文元資料的方法。它簡單地稱為 Context。儘管幕後發生了執行緒切換,但它仍然附加到響應式鏈上。

正如我們所見,Project Reactor 允許宣告性地指定意圖,同時保持併發無關性。它確實提供了在必要時控制併發的方法,透過使用專用運算子或配置引數(例如 publishOnsubscribeOnflatMap 的高階引數),但這種控制級別與核心處理邏輯抽象分離。

我們前面提到過副作用。我們如何擺脫它們,同時仍然能夠傳輸上下文元資料?

為了與函數語言程式設計良好配合,Context 繫結到 Subscriber,即 Publisher 發出的訊號的消費者。訂閱時,Subscriber 對組裝管道中的所有先行運算子可見。當我們使用不可變的 Map 狀資料結構關聯到 Subscriber 例項時,它允許在響應式管道的部分中附加和檢索上下文資訊。

透過控制影響以及在響應式鏈中的步驟之間提供繼承的方法,Reactor Context 是一個無副作用的概念,可用於為處理提供元資訊。“這正是我們關聯請求所需要的!”

讓我們重寫我們的應用程式以使用 Reactor Context 而不是 ThreadLocal

首先,我們需要將關聯識別符號設為日誌方法的顯式引數

static void log(String message, long correlationId) {
  String threadName = Thread.currentThread().getName();
  String threadNameTail = threadName.substring(
    Math.max(0, threadName.length() - 10));
  System.out.printf("[%10s][%20s] %s%n",
    threadNameTail, correlationId, message);
}

我們的行動如下

Mono<Void> addProduct(String productName) {
  return Mono.deferContextual(ctx -> {
    log("Adding product: " + productName, ctx.get("CORRELATION_ID"));
    return Mono.empty(); // Assume we’re actually storing the product
  });
}

Mono<Boolean> notifyShop(String productName) {
  return Mono.deferContextual(ctx -> {
    log("Notifying shop about: " + productName,
      ctx.get("CORRELATION_ID"));
    return Mono.just(true);
  });
}

有趣的是我們如何提供關聯識別符號。我們使用一個特殊的運算子 Mono.deferContextual,它可以訪問 Context。我們從 ContextView(一個簡化的、只讀的 Context 版本)中提取關聯識別符號,然後返回一個實際的 Mono 供呼叫者訂閱。

我們的處理程式看起來像這樣

Mono<Void> handleRequest() {
  long correlationId = correlationId();
  log("Assembling the chain", correlationId);

  Mono.just("test-product")
    .delayElement(Duration.ofMillis(1))
    .flatMap(product ->
      Flux.concat(addProduct(product), notifyShop(product))
          .then())
    .contextWrite(Context.of("CORRELATION_ID", correlationId));

訂閱後,輸出符合預期

[      main][ 6328001264807824115] Assembling the chain
[parallel-1][ 6328001264807824115] Adding product: test-product
[parallel-1][ 6328001264807824115] Notifying shop about: test-product

資訊流的反轉是顯而易見的。與任何響應式鏈一樣,我們透過組裝運算子鏈來定義處理流。一旦我們(或實際上是伺服器)訂閱此鏈,資訊就會從下游運算子流向上游運算子以啟動處理。之後,實際資料訊號從上游傳遞到下游——例如,“test-product”值傳遞給 flatMap 運算子,然後傳遞給 concat 運算子,後者又將值提供給 addProductnotifyShop。由於這種邏輯流,我們在最後(使用 contextWrite 方法)寫入 Context,就在任何 Subscriber 訂閱鏈之前。我們可以想象 Context 隨後變得與 Subscriber 一起可供上游運算子中的所有階段訪問。

無論響應式管道在執行使用者業務邏輯的過程中進行了多少次執行緒切換,上下文都不會丟失。

您可以在我們的文件中閱讀有關 Reactor Context 的更多資訊。

第三方庫

不幸的是,我們不能指望第三方庫使用 Reactor Context 來提供可觀察性功能。傳播隱式元資訊的實際“貨幣”是 ThreadLocal。像 SLF4J 這樣的庫採用命令式風格,並在 Java 社群中佔據穩定地位。如果我們能讓它們與響應式正規化協同工作,而不是期望它們適應響應式正規化,那將是一個明顯的勝利。在下一部分中,我們將討論在 Spring Cloud Sleuth(一個可以與 Reactor 一起使用的追蹤庫)中傳播響應式鏈中 ThreadLocal 值的歷史和挑戰。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有