reactor-core 3.6.0 中有哪些新特性?

工程 | Oleh Dokuka | 2023 年 10 月 31 日 | ...

Reactor 3.6.0 即將釋出,並將於 11 月 14 日全面上市。這篇博文介紹了此即將釋出的版本中包含的新功能!

虛擬執行緒支援

如今,每個人都在談論 Java 21Project Loom。Project Reactor 團隊也聽到了這些聲音,並看到了該專案在我們的生態系統中的價值。透過此即將釋出的版本,我們引入了對 VirtualThread 實現的支援。

為什麼它很方便?

讓我們考慮以下程式碼示例

package io.projectreactor.samples;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class LoomSample {

   public static void main(String[] args) {
      Flux.using(
                () -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
                Flux::fromStream,
                Stream::close
           )
          .subscribeOn(Schedulers.boundedElastic())
          .map(v -> Thread.currentThread() + " " + v)
          .log()
          .blockLast();
   }
}

上面的程式碼以響應式方式讀取文字檔案中的所有行。不幸的是,Files.lines 方法是一個系統 I/O 呼叫,已知是阻塞的。因此,我們將所有這些操作安排在共享的 Schedulers.boundedElastic() 排程器上。眾所周知,Schedulers.boundedElastic() 是主要的共享排程器,用於 解除安裝 系統中可能進行的所有阻塞呼叫。它既用於簡單的 HTTP 阻塞呼叫,也用於封裝一些不可避免的阻塞系統互動,例如生成隨機的 UUID。然而,它預設使用 平臺 Thread 例項,這可能會增加系統的爭用。

現在,隨著 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler 實現可以取代預設實現,以便在 Schedulers.boundedElastic() 中使用虛擬執行緒而不是平臺執行緒。您所需要做的就是將您的應用程式執行在 Java 21+ 上,並設定 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true 系統屬性。

Reactive Bounded Elastic on VirtualThreads

正如您可能已經注意到的,您將有一個 VirtualThread 例項來承載計劃的工作。

更好的自動上下文傳播

正如您可能從我們之前的部落格中聽到的,從 Reactor 3.5.0 開始,我們引入了一種機制,用於在 handletap 等運算子中,從 Reactor Context 自動恢復 ThreadLocal。後來,在 reactor 3.5.3 中,我們在 Project Reactor 中可用的所有運算子集中添加了 ThreadLocal 值的自動恢復。

static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1> 
}

public static void main(String[] args) {
   logger.info("Setting Trace ID test-123-567-890");
   TRACE_ID.set("test-123-567-890"); <1>

   Hooks.enableAutomaticContextPropagation(); <2> 

   Mono.fromCallable(() -> {
          logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
          return UUID.randomUUID();
       })
       .subscribeOn(Schedulers.boundedElastic()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
       .block();
}

上面的程式碼生成一個隨機的 UUID,它將阻塞的生成過程 <3> 解除安裝到一個專門的工作器上。要啟用 ThreadLocal 自動傳播的魔力,您需要在執行時提供 Micrometer Context Propagation 庫,註冊 <1> 所需的 ThreadLocal 例項,然後呼叫 Hooks API <2> 來啟用這種特定的 傳播模式。如果我們檢查上面的程式碼輸出,我們會看到指定的 <1> TRACE_ID ThreadLocal 在所有地方 <3> <4> 都始終可用,無論 Thread 是否切換。

[ INFO] (main) Setting Trace ID test-123-567-890 <1> 
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>

  1. 跟蹤 ID 在 Thread main 上設定
  2. 相同的跟蹤 ID 在 Thread boundedElastic-1 上可用

儘管這種機制已經足夠接近每個人想要的,但它受到 Reactor 自己的生產者和轉換器的限制。為了理解它可能無法完美工作的地方,讓我們修改上面的示例並新增與外部基於 Reactive Streams 的庫(例如 JDK11 HttpClient)的整合。

static HttpClient jdkHttpClient = HttpClient.newHttpClient();

static {
   ContextRegistry.getInstance()
                  .registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}

public static void main(String[] args) {
   logger.info("Setting Trace ID");
   TRACE_ID.set("test-123-567-890");

   Hooks.enableAutomaticContextPropagation();

   Mono.fromFuture(() -> {
          logger.info("[" + TRACE_ID.get() + "] Preparing request");
          return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
                                             .uri(URI.create("https://httpbin.org/drip"))
                                             .GET()
                                             .build(),
                HttpResponse.BodyHandlers.ofPublisher());
       })
       .flatMapMany(r -> {
          logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
          return FlowAdapters.toPublisher(r.body()); <2>
       })
       .collect(new ByteBufferToStringCollector()) <3>
       .doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
       .block();
}

在修改後的示例中,我們進行網路呼叫 <1>,然後讀取響應。響應體表示為 Flow.Publisher <2>,我們將其展平並轉換為字串表示 <3>。一旦這段程式碼執行,其中一種可能的輸出可能如下所示:

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>

從輸出中我們可以觀察到,在 reactor 3.5.3+ 中,消費外部 Publisher 可能會導致上下文丟失 <1>,因為我們不知道是否需要額外的工作來恢復丟失的 ThreadLocal 例項。

使用 reactor 3.6.x,此輸出始終一致

[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>

在此版本中,我們增強了 ThreadLocal 值恢復機制,並添加了額外的邏輯,用於檢測任何外部 Publisher 實現。一旦檢測到這些實現,我們就會對其進行裝飾,以確保您在我們的管道中操作時永遠不會丟失 ThreadLocal 值。

還有什麼?多版本 Jar 支援!

在 reactor 3.6.x 中,我們採納了 多版本 jar (MRJ) 支援,並已新增 改進,儘可能消除了反射。我們 計劃 在即將釋出的版本中擴充套件 MRJ 使用,並使用所有 JDK9+ 特性!

敬請期待!所有原始碼都可以在 Github 上找到。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有