領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多在 Project Reactor 團隊,我們認為您所依賴的庫的除錯體驗與功能集或效能同樣重要。
今天,我們很高興宣佈 Reactor 家族的兩個新實驗專案!
最常見的初學者錯誤之一是阻塞那些本應只執行非阻塞程式碼的 Java 執行緒(例如,Schedulers.parallel())。
這是一個最具破壞性的問題,因為您可能會阻塞不相關的處理甚至造成死鎖!
考慮以下程式碼
Flux.range(0, Runtime.getRuntime().availableProcessors() * 2)
.subscribeOn(Schedulers.parallel())
.map(i -> {
CountDownLatch latch = new CountDownLatch(1);
Mono.delay(Duration.ofMillis(i * 100))
.subscribe(it -> latch.countDown());
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return i;
})
.blockLast();
執行這段程式碼需要多久?1秒?10秒?
如果我告訴你它永遠不會退出並且會造成死鎖呢?原因如下:
N * 2 個訊號,其中 N 是 JVM 可以使用的 CPU 數量。Schedulers.parallel 進行訂閱——這是一個有界池,限制為 N 個執行緒。Mono.delay 隱式使用 Schedulers.parallel)。即使您不阻塞所有執行緒而只阻塞部分執行緒,您也會阻止其他不相關的任務的推進。最可能的結果是效能會下降。
當您將舊的阻塞程式碼遷移到響應式方法時,這個問題尤其明顯。即使是最有經驗的程式碼審查員也可能無法發現阻塞呼叫,因為您的函式顏色相同!
這就是我們建立 BlockHound 的原因——一個 Java 代理,用於檢測非阻塞執行緒中的阻塞呼叫。與其他解決方案不同,它會檢測原始方法(甚至是本地方法!),並且沒有辦法呼叫阻塞方法,即使使用反射也不行!
現在,如果我們將它新增到我們的應用程式中,如文件中所述,我們將得到以下異常:
java.lang.Error: Blocking call! sun.misc.Unsafe#park
at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
at sun.misc.Unsafe.park(Unsafe.java)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at com.example.demo.BlockingCode.lambda$main$1(BlockingCode.java:24)
請注意,await 在內部為等待邏輯呼叫 Unsafe#park。我們不希望我們的執行緒被暫停或阻塞,BlockHound 可以保護我們免受此影響!
如果您想了解實現細節,請閱讀工作原理頁面。
TL;DR: 它包裝了原始方法,並僅向它們添加了兩個方法呼叫。
您可以在測試中或在您的 QA/staging 環境中執行它,而不會損失效能。天哪,鑑於其低開銷,您甚至可以嘗試在生產環境中執行它! :)
BlockHound 適用於 Project Reactor 和 RxJava 2,您可以編寫自己的整合。
除錯響應式程式碼有時會很有挑戰性,因為它具有函數語言程式設計的特性:您不是精確地命令要對資料執行什麼操作,而是宣告資料應該如何在系統中流動。這意味著宣告和執行發生在不同的時間點。
您可以在 Simon 的精彩文章中閱讀更多相關資訊:https://springframework.tw/blog/2019/03/06/flight-of-the-flux-1-assembly-vs-subscription
在 Reactor 中,我們稱之為“組裝時間”和“執行時間”。在組裝時間,您透過呼叫 myFlux.map(i -> i * 2).filter(5 % i == 1).single() 和其他運算子來“設計”您的管道。稍後,這個“管道定義”將用於處理由 myFlux 釋出的訊號。但是當發生錯誤時會發生什麼呢?
你們中的一些人可能已經知道 Hooks.onOperatorDebug()。這是 reactor-core 中一個非常有用的鉤子。它將堆疊跟蹤從這樣
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:129)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:107)
at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
轉換為這樣
java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
...
at java.lang.Thread.run(Thread.java:748)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
reactor.core.publisher.Flux.single(Flux.java:7380)
com.example.demo.Example.run(Example.java:13)
Error has been observed by the following operator(s):
|_ Flux.single ⇢ com.example.demo.Example.run(Example.java:13)
|_ Mono.subscribeOn ⇢ com.example.demo.Example.run(Example.java:14)
對於以下程式碼
9: public class Example {
10:
11: public static void run() {
12: Flux.range(0, 5)
13: .single() // <-- Aha!
14: .subscribeOn(Schedulers.parallel())
15: .block();
16: }
17: }
正如您所看到的,啟用除錯模式後,我們可以清楚地識別發生錯誤的組裝操作。它就像一個堆疊跟蹤,但是(由於執行與組裝分離)它是一個回溯。
您可能會想:“太好了,現在我想在生產環境中使用它!”——我們也是。但是當您使用 Hooks.onOperatorDebug() 時,即使您的程式碼永遠不會丟擲錯誤,我們也必須在組裝時執行繁重的堆疊遍歷,以便在您每次呼叫 .map(...) 等運算子時捕獲呼叫站點!這是由於 Java 中缺乏呼叫站點跟蹤,其中唯一的替代方案是 new Exception().getStackTrace() 或 StackWalker(在 Java 9+ 中)。
顯然,我們不能在生產環境中使用這種方法,所以我們為此製作了一個工具!
來自 reactor-tools 專案的 ReactorDebugAgent 是一個 Java 代理,可以幫助您除錯應用程式中的異常,而無需支付執行時成本(與 Hooks.onOperatorDebug() 不同)。
⚠️ 此專案處於孵化階段,將來可能會也可能不會成為一個獨立的M專案或 https://github.com/reactor/reactor-core 的模組。
它透過位元組碼轉換將鏈式呼叫從這樣
Flux.range(0, 5)
.single()
轉換為
Flux flux = Flux.range(0, 5);
flux = Hooks.addCallSiteInfo(flux, "Flux.range\n foo.Bar.baz(Bar.java:21)"));
flux = flux.single();
flux = Hooks.addCallSiteInfo(flux, "Flux.single\n foo.Bar.baz(Bar.java:22)"));
要啟用它,您需要首先初始化代理
ReactorDebugAgent.init();
ℹ️ 由於實現會在類載入時檢測您的類,因此最佳放置位置是在 main(String[]) 方法中的所有其他內容之前。
public static void main(String[] args) {
ReactorDebugAgent.init();
SpringApplication.run(Application.class, args);
}
我們希望這些工具能讓您作為開發人員的生活更輕鬆,並讓您更舒適地使用 Project Reactor!