領先一步
VMware 提供培訓和認證,助您加速發展。
瞭解更多Reactor 3.6.0 即將釋出,並將在 11 月 14 日正式釋出 (GA)。這篇博文介紹了此即將釋出的版本中包含的新功能!
如今,每個人都在談論 Java 21 和 Project Loom。Project Reactor 團隊聽取了這些意見,並看到了該專案在我們生態系統中的價值。在此即將釋出的版本中,我們引入了對 VirtualThread
實現的支援。
考慮以下程式碼示例
package io.projectreactor.samples;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class LoomSample {
public static void main(String[] args) {
Flux.using(
() -> Files.lines(Paths.get(ClassLoader.getSystemResource("testfile.txt").toURI())),
Flux::fromStream,
Stream::close
)
.subscribeOn(Schedulers.boundedElastic())
.map(v -> Thread.currentThread() + " " + v)
.log()
.blockLast();
}
}
上面的程式碼以響應式方式讀取文字檔案中的所有行。不幸的是,Files.lines
方法是系統 I/O 呼叫,已知會阻塞。因此,我們將所有這些操作排程到共享的 Schedulers.boundedElastic()
排程器上。Schedulers.boundedElastic()
是用於解除安裝系統中可能進行的任何阻塞呼叫的主要共享排程器,這已不是秘密。它既用於簡單的 HTTP 阻塞呼叫,也用於包裝一些不可避免的阻塞系統互動,例如生成隨機的 UUID
。但是,它預設使用 平臺 Thread
例項,這可能會增加系統的競爭。
現在,藉助 Java 21+ 和新的 reactor-core 3.6.x,新的 BoundedElasticThreadPerTaskScheduler
實現可以替換預設實現,以便在 Schedulers.boundedElastic()
中使用虛擬執行緒而不是平臺執行緒。您只需在 Java 21+ 上執行您的應用並設定 -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true
系統屬性即可。
您可能已經注意到,您將擁有一個承載計劃工作的 VirtualThread
例項。
正如您可能從我們自 Reactor 3.5.0 開始的早期部落格中聽到的那樣,我們引入了一種機制,用於在 handle
和 tap
等運算子中,從 Reactor Context
自動恢復 ThreadLocal
。後來,在 reactor 3.5.3 中,我們在 Project Reactor 中可用的所有運算子中添加了 ThreadLocal
值的自動恢復功能。
static final ThreadLocal<String> TRACE_ID = ThreadLocal.withInitial(() -> "");
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID); <1>
}
public static void main(String[] args) {
logger.info("Setting Trace ID test-123-567-890");
TRACE_ID.set("test-123-567-890"); <1>
Hooks.enableAutomaticContextPropagation(); <2>
Mono.fromCallable(() -> {
logger.info("[" + TRACE_ID.get() + "] Generating UUID"); <4>
return UUID.randomUUID();
})
.subscribeOn(Schedulers.boundedElastic()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Generated UUID " + v)) <5>
.block();
}
上面的程式碼生成一個隨機的 UUID
,它將阻塞生成過程 <3> 解除安裝到專門的工作器上。要啟用自動 ThreadLocal
傳播魔法,您需要在執行時提供 Micrometer Context Propagation 庫,註冊 <1> 所需的 ThreadLocal
例項,然後呼叫 Hooks
API <2> 來啟用此特定的傳播模式。如果我們檢查上面程式碼的輸出,我們會看到指定的 <1> TRACE_ID
ThreadLocal
在所有位置 <3> <4> 始終可用,無論 Thread
如何切換。
[ INFO] (main) Setting Trace ID test-123-567-890 <1>
[ INFO] (boundedElastic-1) [test-123-567-890] Generating UUID <2>
[ INFO] (boundedElastic-1) [test-123-567-890] Generated UUID baa79b8a-7808-4c27-a426-8464e4372269 <2>
Thread main
上設定了 Trace IDThread boundedElastic-1
上可用的相同 Trace ID儘管此機制已足夠接近每個人想要的,但它受到 Reactor 自有的生產者和轉換器的限制。要了解它可能無法完美工作的地方,讓我們修改上面的示例,並新增與外部基於 Reactive Streams 的庫(例如 JDK11 HttpClient
)的整合
static HttpClient jdkHttpClient = HttpClient.newHttpClient();
static {
ContextRegistry.getInstance()
.registerThreadLocalAccessor("TRACE_ID", TRACE_ID);
}
public static void main(String[] args) {
logger.info("Setting Trace ID");
TRACE_ID.set("test-123-567-890");
Hooks.enableAutomaticContextPropagation();
Mono.fromFuture(() -> {
logger.info("[" + TRACE_ID.get() + "] Preparing request");
return jdkHttpClient.sendAsync(HttpRequest.newBuilder() <1>
.uri(URI.create("https://httpbin.org/drip"))
.GET()
.build(),
HttpResponse.BodyHandlers.ofPublisher());
})
.flatMapMany(r -> {
logger.info("[" + TRACE_ID.get() + "] " + "Handling response[" + r.statusCode() + "] and reading body");
return FlowAdapters.toPublisher(r.body()); <2>
})
.collect(new ByteBufferToStringCollector()) <3>
.doOnNext(v -> logger.info("[" + TRACE_ID.get() + "] " + "Response body is " + v))
.block();
}
在修改後的示例中,我們進行網路呼叫 <1>,然後讀迴響應。響應體表示為 Flow.Publisher
<2>,我們將其展平並轉換為字串表示形式 <3>。此程式碼執行後,可能的輸出之一可能如下所示
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [] Response body is ********** <1>
從輸出中我們可以觀察到,使用 reactor 3.5.3+,消費外部 Publisher
可能會導致上下文丟失 <1>,因為我們不知道是否需要進行額外的提升來恢復丟失的 ThreadLocal
例項。
使用 reactor 3.6.x,此輸出始終一致
[ INFO] (main) Setting Trace ID test-123-567-890
[ INFO] (main) [test-123-567-890] Preparing request
[ INFO] (ForkJoinPool.commonPool-worker-1) [test-123-567-890] Handling response[200] and reading body
[ INFO] (HttpClient-1-Worker-0) [test-123-567-890] Response body is ********** <1>
在此版本中,我們增強了 ThreadLocal
值恢復機制,並添加了額外的邏輯來檢測任何外部 Publisher
實現。一旦檢測到這些實現,我們就會對它們進行裝飾,以確保您在管道中操作時永遠不會丟失 ThreadLocal
值。
在 reactor 3.6.x 中,我們採納了 multi-release jar (MRJ) 支援,並已經添加了改進,在可能的情況下消除了反射。我們計劃在未來的版本中擴充套件 MRJ 的使用,並利用所有 JDK9+ 的特性!
敬請關注!所有原始碼都可以在 Github 上找到