reactor-core 3.6.0 有什麼新變化?

工程 | Oleh Dokuka | 2023年10月31日 | ...

Reactor 3.6.0 即將釋出,並將在 11 月 14 日正式釋出 (GA)。這篇博文介紹了此即將釋出的版本中包含的新功能!

支援虛擬執行緒

如今,每個人都在談論 Java 21Project Loom。Project Reactor 團隊聽取了這些意見,並看到了該專案在我們生態系統中的價值。在此即將釋出的版本中,我們引入了對 VirtualThread 實現的支援。

為何如此方便?

考慮以下程式碼示例

package io.projectreactor.samples;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class LoomSample {

   public static void main(String[] args) {
      Flux.using(
                () -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
                Flux::fromStream,
                Stream::close
           )
          .subscribeOn(Schedulers.boundedElastic())
          .map(v -> Thread.currentThread() + " " + v)
          .log()
          .blockLast();
   }
}

上面的程式碼以響應式方式讀取文字檔案中的所有行。不幸的是,Files.lines 方法是系統 I/O 呼叫,已知會阻塞。因此,我們將所有這些操作排程到共享的 Schedulers.boundedElastic() 排程器上。Schedulers.boundedElastic() 是用於解除安裝系統中可能進行的任何阻塞呼叫的主要共享排程器,這已不是秘密。它既用於簡單的 HTTP 阻塞呼叫,也用於包裝一些不可避免的阻塞系統互動,例如生成隨機的 UUID。但是,它預設使用 平臺 Thread 例項,這可能會增加系統的競爭。

現在,藉助 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler 實現可以替換預設實現,以便在 Schedulers.boundedElastic() 中使用虛擬執行緒而不是平臺執行緒。您只需在 Java 21+ 上執行您的應用並設定 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true 系統屬性即可。

Reactive Bounded Elastic on VirtualThreads

您可能已經注意到,您將擁有一個承載計劃工作的 VirtualThread 例項。

更好的自動上下文傳播

正如您可能從我們自 Reactor 3.5.0 開始的早期部落格中聽到的那樣,我們引入了一種機制,用於在 handletap 等運算子中,從 Reactor Context 自動恢復 ThreadLocal。後來,在 reactor 3.5.3 中,我們在 Project Reactor 中可用的所有運算子中添加了 ThreadLocal 值的自動恢復功能。

static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1> 
}

public static void main(String[] args) {
   logger.info("Setting Trace ID test-123-567-890");
   TRACE_ID.set("test-123-567-890"); <1>

   Hooks.enableAutomaticContextPropagation(); <2> 

   Mono.fromCallable(() -> {
          logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
          return UUID.randomUUID();
       })
       .subscribeOn(Schedulers.boundedElastic()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
       .block();
}

上面的程式碼生成一個隨機的 UUID,它將阻塞生成過程 <3> 解除安裝到專門的工作器上。要啟用自動 ThreadLocal 傳播魔法,您需要在執行時提供 Micrometer Context Propagation 庫,註冊 <1> 所需的 ThreadLocal 例項,然後呼叫 Hooks API <2> 來啟用此特定的傳播模式。如果我們檢查上面程式碼的輸出,我們會看到指定的 <1> TRACE_ID ThreadLocal 在所有位置 <3> <4> 始終可用,無論 Thread 如何切換。

[ INFO] (main) Setting Trace ID test-123-567-890 <1> 
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>

  1. Thread main 上設定了 Trace ID
  2. Thread boundedElastic-1 上可用的相同 Trace ID

儘管此機制已足夠接近每個人想要的,但它受到 Reactor 自有的生產者和轉換器的限制。要了解它可能無法完美工作的地方,讓我們修改上面的示例,並新增與外部基於 Reactive Streams 的庫(例如 JDK11 HttpClient)的整合

static HttpClient jdkHttpClient = HttpClient.newHttpClient();

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}

public static void main(String[] args) {
   logger.info("Setting Trace ID");
   TRACE_ID.set("test-123-567-890");

   Hooks.enableAutomaticContextPropagation();

   Mono.fromFuture(() -> {
          logger.info("[" + TRACE_ID.get() + "] Preparing request");
          return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
                                             .uri(URI.create("https://httpbin.org/drip"))
                                             .GET()
                                             .build(),
                HttpResponse.BodyHandlers.ofPublisher());
       })
       .flatMapMany(r -> {
          logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
          return FlowAdapters.toPublisher(r.body()); <2>
       })
       .collect(new ByteBufferToStringCollector()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
       .block();
}

在修改後的示例中,我們進行網路呼叫 <1>,然後讀迴響應。響應體表示為 Flow.Publisher <2>,我們將其展平並轉換為字串表示形式 <3>。此程式碼執行後,可能的輸出之一可能如下所示

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>

從輸出中我們可以觀察到,使用 reactor 3.5.3+,消費外部 Publisher 可能會導致上下文丟失 <1>,因為我們不知道是否需要進行額外的提升來恢復丟失的 ThreadLocal 例項。

使用 reactor 3.6.x,此輸出始終一致

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>

在此版本中,我們增強了 ThreadLocal 值恢復機制,並添加了額外的邏輯來檢測任何外部 Publisher 實現。一旦檢測到這些實現,我們就會對它們進行裝飾,以確保您在管道中操作時永遠不會丟失 ThreadLocal 值。

還有什麼?支援 Multi-Release Jar!

在 reactor 3.6.x 中,我們採納了 multi-release jar (MRJ) 支援,並已經添加了改進,在可能的情況下消除了反射。我們計劃在未來的版本中擴充套件 MRJ 的使用,並利用所有 JDK9+ 的特性!

敬請關注!所有原始碼都可以在 Github 上找到

獲取 Spring 新聞簡報

訂閱 Spring 新聞簡報以保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速發展。

瞭解更多

獲取支援

Tanzu Spring 在一個簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群所有即將到來的活動。

檢視全部