領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多這篇博文是系列文章中的第二篇,旨在深入探討Reactor更高階的概念和內部工作原理。
它源於我的 Flight of the Flux 演講,我發現其內容更適合博文格式。
我將在其他文章釋出時更新下表中的連結,但這是計劃中的內容
如果您錯過了 Reactive Streams 和 Reactor 的基本概念介紹,請訪問網站的學習部分和參考指南。
話不多說,讓我們開始吧
從命令式、阻塞式正規化切換到反應式、非阻塞式正規化會帶來好處,但也會帶來一些問題。其中之一就是除錯體驗。這是為什麼呢?
主要是因為您已經習慣了依賴老式的 堆疊跟蹤,但突然之間,由於反應式程式設計的非同步特性,這個寶貴的工具變得不那麼有價值了。但這並非反應式程式設計所特有:一旦引入非同步程式碼,您就會在程式中建立一個邊界,將排程程式碼和非同步執行程式碼分隔開來。
我們以 ExecutorService 和 Future 為例(這裡沒有 Reactor 程式碼)
private static void imperative() throws ExecutionException, InterruptedException {
final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor();
int seconds = LocalTime.now().getSecond();
List<Integer> source;
if (seconds % 2 == 0) {
source = IntStream.range(1, 11).boxed().collect(Collectors.toList());
}
else if (seconds % 3 == 0) {
source = IntStream.range(0, 4).boxed().collect(Collectors.toList());
}
else {
source = Arrays.asList(1, 2, 3, 4);
}
executor.submit(() -> source.get(5)) //line 76
.get();
}
這個例子有點做作,但假設我們程式碼中有三分之二的路徑會導致非同步任務丟擲 IndexOutOfBoundsException... 堆疊跟蹤會有多大幫助呢?
java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
at Scratch.imperative(Scratch.java:77)
at Scratch.main(Scratch.java:50)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
at java.base/java.util.Arrays$ArrayList.get(Arrays.java:4351)
at Scratch.lambda$imperative$0(Scratch.java:76)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
我們看到
Future 的 get() 方法丟擲了 ExecutionExceptionIndexOutOfBoundsExceptionsubmit(() -> source.get(5)) lambda 的第 76 行FutureTask 中執行,來自一個名為 ThreadPoolExecutor 的東西,它本身在 Thread 中執行...submit() 之前採用了哪條路徑)。用處不大 :-(
如果我們尋找上面程式碼的 Reactor 等效項,我們可以得到以下程式碼
private static void reactive() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5);
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5);
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5);
}
source.subscribeOn(Schedulers.parallel())
.block(); //line 97
}
這將觸發以下堆疊跟蹤
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1495)
at Scratch.reactive(Scratch.java:97)
at Scratch.main(Scratch.java:51)
ArrayIndexOutOfBoundsException,這暗示源對於 MonoElementAt 運算子來說太短了onComplete,本身由 request 觸發...以及 reactor.core.publisher 中的一堆其他步驟range (FluxRange.subscribe)、elementAt 和 subscribeOn 組成...ThreadPoolExecutor 的工作 Thread 執行的更糟糕的是,即使我們去掉了 subscribeOn,我們仍然無法發現是哪條可能的錯誤路徑被觸發了
private static void reactiveNoSubscribeOn() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5);
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5);
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5);
}
source.block(); //line 116
}
給出堆疊跟蹤
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
at reactor.core.publisher.Mono.block(Mono.java:1494)
at Scratch.reactiveNoSubscribeOn(Scratch.java:116)
at Scratch.main(Scratch.java:52)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
at reactor.core.publisher.Mono.block(Mono.java:1495)
... 2 more
這是因為,正如我們之前看到的,在組裝和訂閱之間存在一個額外的“邊界”。線索只能追溯到訂閱點(這裡是 block()):-(
因此,在非同步世界中,使用堆疊跟蹤進行分析和除錯變得更加困難,在 Reactor 中甚至更困難(因為它既是非同步的,又採用預設的懶惰組裝與訂閱方法)。但幸運的是,庫中提供了工具來緩解這一事實。
log還記得你在命令式程式碼中散佈 print 語句嗎?它可能不如啟動單步偵錯程式酷炫,但有時它就是你需要的快速而粗暴的解決方案。
在 Reactor 中,您有 log() 運算子
onNext、onComplete、onError(甚至包括 onSubscribe、cancel 和 request!)Logger簡而言之,log 是一個快速而粗糙的解決方案,可以輕鬆地鳥瞰序列中某個步驟的發生情況。在開發過程中盡情使用它,並可以為每個 log 呼叫指定一個“名稱”以區分它們。
使用 log(String) 可以用來暗示哪個源導致了錯誤
private static void log() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5)
.log("source A");
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5)
.log("source B");
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5)
.log("source C");
}
source.block(); //line 138
}
堆疊跟蹤本身並沒有更有趣(除了提到 MonoLogFuseable 類,但日誌本身包含這個有趣的小細節)
17:01:23.711 [main] INFO source C - | onSubscribe([Fuseable] MonoElementAt.ElementAtSubscriber)
17:01:23.716 [main] INFO source C - | request(unbounded)
17:01:23.717 [main] ERROR source C - | onError(java.lang.IndexOutOfBoundsException)
17:01:23.721 [main] ERROR source C -
java.lang.IndexOutOfBoundsException: null
至少我們得到了我們硬編碼的 source C 標籤...
Reactor 中可用的另一種方法是嘗試在執行時堆疊跟蹤中找回組裝資訊。
這可以透過 Hooks 類啟用所謂的“除錯模式”來完成
Hooks.onOperatorDebug();
它有什麼作用?它使每個運算子例項化(即組裝)都捕獲一個堆疊跟蹤並保留它以供以後使用。
如果 onError 到達某個運算子,它會將該組裝堆疊跟蹤附加到 onError 的 Throwable 上(作為抑制的 Exception)。因此,當您看到堆疊跟蹤時,您將獲得更完整的執行時和組裝資訊。
在除錯模式開啟的情況下,在我們之前的示例中,我們將能夠看到採用了哪個組裝路徑以及實際處理了哪個源
private static void hook() {
Hooks.onOperatorDebug();
try {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5); //line 149
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5); //line 153
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5); //line 157
}
source.block(); //line 160
}
finally {
Hooks.resetOnOperatorDebug();
}
}
這會產生以下堆疊跟蹤
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
(...)
at reactor.core.publisher.Mono.block(Mono.java:1494)
at Scratch.hook(Scratch.java:160)
at Scratch.main(Scratch.java:54)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoElementAt] :
reactor.core.publisher.Flux.elementAt(Flux.java:4367)
Scratch.hook(Scratch.java:157)
Error has been observed by the following operator(s):
|_ Flux.elementAt ⇢ Scratch.hook(Scratch.java:157)
注意最後一行了嗎?太棒了 :-D
checkpoint 降低成本使用 Hooks.onOperatorDebug() 的一個缺點是,它會為應用程式中使用的每個運算子捕獲組裝堆疊跟蹤。填充單個堆疊跟蹤是一個昂貴的操作,因此不言而喻,這可能會對效能產生重大影響。因此,這僅建議在開發環境中使用。
幸運的是,如果您識別出程式碼庫中容易出現那種源模糊不清的部分,您可以稍微降低成本。
透過使用 checkpoint() 運算子,可以僅在該程式碼庫的特定點啟用組裝跟蹤捕獲。如果您使用 checkpoint(String) 為檢查點提供一個唯一且有意義的名稱,甚至可以完全不填充堆疊跟蹤
private static void checkpoint() {
int seconds = LocalTime.now().getSecond();
Mono<Integer> source;
if (seconds % 2 == 0) {
source = Flux.range(1, 10)
.elementAt(5)
.checkpoint("source range(1,10)");
}
else if (seconds % 3 == 0) {
source = Flux.range(0, 4)
.elementAt(5)
.checkpoint("source range(0,4)");
}
else {
source = Flux.just(1, 2, 3, 4)
.elementAt(5)
.checkpoint("source just(1,2,3,4)");
}
source.block(); //line 186
}
這將產生以下堆疊跟蹤
java.lang.IndexOutOfBoundsException
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:438)
at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:422)
at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
at reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:61)
at reactor.core.publisher.Mono.block(Mono.java:1494)
at Scratch.checkpoint(Scratch.java:186)
at Scratch.main(Scratch.java:55)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly site of producer [reactor.core.publisher.MonoElementAt] is identified by light checkpoint [source just(1,2,3,4)].
注意到最後一行了嗎?is identified by light checkpoint [source just(1,2,3,4)].,這告訴我們罪魁禍首(因為我們為檢查點使用了有意義的描述)。
在本文中,我們瞭解到在非同步程式設計中,堆疊跟蹤可能不那麼有用。這種影響因 Reactor 允許您以惰性方式構建反應式序列而進一步加劇。
我們研究了可能遇到的最壞情況以及減輕此問題的幾種方法。
完整的程式碼可以在 這裡 的 gist 中找到。
在下一篇中,我們將學習排程器以及如何在不同執行緒之間切換。
與此同時,祝您反應式程式設計愉快!