領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多Reactor 3.6.0 即將釋出,並將於 11 月 14 日全面上市。這篇博文介紹了此即將釋出的版本中包含的新功能!
如今,每個人都在談論 Java 21 和 Project Loom。Project Reactor 團隊也聽到了這些聲音,並看到了該專案在我們的生態系統中的價值。透過此即將釋出的版本,我們引入了對 VirtualThread 實現的支援。
讓我們考慮以下程式碼示例
package io.projectreactor.samples;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class LoomSample {
public static void main(String[] args) {
Flux.using(
() -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
Flux::fromStream,
Stream::close
)
.subscribeOn(Schedulers.boundedElastic())
.map(v -> Thread.currentThread() + " " + v)
.log()
.blockLast();
}
}
上面的程式碼以響應式方式讀取文字檔案中的所有行。不幸的是,Files.lines 方法是一個系統 I/O 呼叫,已知是阻塞的。因此,我們將所有這些操作安排在共享的 Schedulers.boundedElastic() 排程器上。眾所周知,Schedulers.boundedElastic() 是主要的共享排程器,用於 解除安裝 系統中可能進行的所有阻塞呼叫。它既用於簡單的 HTTP 阻塞呼叫,也用於封裝一些不可避免的阻塞系統互動,例如生成隨機的 UUID。然而,它預設使用 平臺 Thread 例項,這可能會增加系統的爭用。
現在,隨著 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler 實現可以取代預設實現,以便在 Schedulers.boundedElastic() 中使用虛擬執行緒而不是平臺執行緒。您所需要做的就是將您的應用程式執行在 Java 21+ 上,並設定 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true 系統屬性。

正如您可能已經注意到的,您將有一個 VirtualThread 例項來承載計劃的工作。
正如您可能從我們之前的部落格中聽到的,從 Reactor 3.5.0 開始,我們引入了一種機制,用於在 handle 和 tap 等運算子中,從 Reactor Context 自動恢復 ThreadLocal。後來,在 reactor 3.5.3 中,我們在 Project Reactor 中可用的所有運算子集中添加了 ThreadLocal 值的自動恢復。
static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1>
}
public static void main(String[] args) {
logger.info("Setting Trace ID test-123-567-890");
TRACE_ID.set("test-123-567-890"); <1>
Hooks.enableAutomaticContextPropagation(); <2>
Mono.fromCallable(() -> {
logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
return UUID.randomUUID();
})
.subscribeOn(Schedulers.boundedElastic()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
.block();
}
上面的程式碼生成一個隨機的 UUID,它將阻塞的生成過程 <3> 解除安裝到一個專門的工作器上。要啟用 ThreadLocal 自動傳播的魔力,您需要在執行時提供 Micrometer Context Propagation 庫,註冊 <1> 所需的 ThreadLocal 例項,然後呼叫 Hooks API <2> 來啟用這種特定的 傳播模式。如果我們檢查上面的程式碼輸出,我們會看到指定的 <1> TRACE_ID ThreadLocal 在所有地方 <3> <4> 都始終可用,無論 Thread 是否切換。
[ INFO] (main) Setting Trace ID test-123-567-890 <1>
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>
Thread main 上設定Thread boundedElastic-1 上可用儘管這種機制已經足夠接近每個人想要的,但它受到 Reactor 自己的生產者和轉換器的限制。為了理解它可能無法完美工作的地方,讓我們修改上面的示例並新增與外部基於 Reactive Streams 的庫(例如 JDK11 HttpClient)的整合。
static HttpClient jdkHttpClient = HttpClient.newHttpClient();
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}
public static void main(String[] args) {
logger.info("Setting Trace ID");
TRACE_ID.set("test-123-567-890");
Hooks.enableAutomaticContextPropagation();
Mono.fromFuture(() -> {
logger.info("[" + TRACE_ID.get() + "] Preparing request");
return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
.uri(URI.create("https://httpbin.org/drip"))
.GET()
.build(),
HttpResponse.BodyHandlers.ofPublisher());
})
.flatMapMany(r -> {
logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
return FlowAdapters.toPublisher(r.body()); <2>
})
.collect(new ByteBufferToStringCollector()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
.block();
}
在修改後的示例中,我們進行網路呼叫 <1>,然後讀取響應。響應體表示為 Flow.Publisher <2>,我們將其展平並轉換為字串表示 <3>。一旦這段程式碼執行,其中一種可能的輸出可能如下所示:
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>
從輸出中我們可以觀察到,在 reactor 3.5.3+ 中,消費外部 Publisher 可能會導致上下文丟失 <1>,因為我們不知道是否需要額外的工作來恢復丟失的 ThreadLocal 例項。
使用 reactor 3.6.x,此輸出始終一致
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>
在此版本中,我們增強了 ThreadLocal 值恢復機制,並添加了額外的邏輯,用於檢測任何外部 Publisher 實現。一旦檢測到這些實現,我們就會對其進行裝飾,以確保您在我們的管道中操作時永遠不會丟失 ThreadLocal 值。
在 reactor 3.6.x 中,我們採納了 多版本 jar (MRJ) 支援,並已新增 改進,儘可能消除了反射。我們 計劃 在即將釋出的版本中擴充套件 MRJ 使用,並使用所有 JDK9+ 特性!
敬請期待!所有原始碼都可以在 Github 上找到。