Reactor 除錯體驗

工程 | Sergei Egorov | 2019年3月28日 | ...

Project Reactor 團隊中,我們認為你所依賴庫的除錯體驗與例如功能集或效能同樣重要。

今天,我們很高興地宣佈 Reactor 家族中的兩個新的實驗性專案!

BlockHound - 初來乍到

初學者最常見的錯誤之一是阻塞那些應該只執行非阻塞程式碼的 Java 執行緒(例如,Schedulers.parallel())。
這是最有害的問題之一,因為它可能會阻塞不相關的處理,甚至導致死鎖!

考慮以下程式碼

Flux.range(0, Runtime.getRuntime().availableProcessors() * 2)
        .subscribeOn(Schedulers.parallel())
        .map(i -> {
            CountDownLatch latch = new CountDownLatch(1);

            Mono.delay(Duration.ofMillis(i * 100))
                .subscribe(it -> latch.countDown());

            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return i;
        })
        .blockLast();

執行這段程式碼需要多久?1 秒?10 秒?
如果我告訴你它永遠不會退出並會建立死鎖呢?原因如下:

  1. 我們建立 N * 2 個訊號,其中 N 是 JVM 可以使用的 CPU 數量。
  2. 我們使用 Schedulers.parallel 進行訂閱 - 這是一個限定大小的執行緒池,限制為 N 個執行緒。
  3. 對於每個訊號,我們都為 parallel 排程器安排另一個任務(Mono.delay 隱式地使用 Schedulers.parallel)。
  4. 我們的邏輯假設延遲很快就會被處理,並且閂鎖最終會解除阻塞。
  5. 然而,所有 N 個執行緒都會等待它們的閂鎖,而延遲任務將永遠不會被執行!

即使你沒有阻塞所有執行緒,而只阻塞了一部分,你也會阻止其他不相關的任務前進。最可能的結果是效能會下降。

當你正在 將舊的阻塞程式碼遷移到響應式方法時,這個問題尤其突出。即使是最有經驗的程式碼審查員也可能在你的 函式顏色相同時未能發現阻塞呼叫!

這就是我們建立 BlockHound 的原因 - 它是一個 Java Agent,用於檢測來自非阻塞執行緒的阻塞呼叫。與其他解決方案不同,它會修改原始方法(甚至是 native 方法!),使得即使使用反射也無法呼叫阻塞方法!

現在,如果按照 文件中的描述 將其新增到我們的應用程式中,我們將得到以下異常:

java.lang.Error: Blocking call! sun.misc.Unsafe#park
  at reactor.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
  at reactor.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
  at reactor.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
  at sun.misc.Unsafe.park(Unsafe.java)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
  at com.example.demo.BlockingCode.lambda$main$1(BlockingCode.java:24)

請注意,await 在內部呼叫了 Unsafe#park 來實現等待邏輯。我們不希望我們的執行緒被 park 或阻塞,BlockHound 會保護我們免受此害!

如果你想了解實現細節,請閱讀 工作原理 頁面。
TL;DR: 它包裝了原始方法,並且只添加了兩個方法呼叫。

你可以在測試環境或 QA/staging 環境中執行它,而不會損失效能。天哪(Holy Atom),考慮到它的低開銷,你甚至可以嘗試在生產環境中執行它! :)

BlockHound 可用於 Project Reactor 和 RxJava 2,並且你可以 編寫自己的整合

Reactor Debug agent - 生產就緒的組裝回溯

由於其函數語言程式設計特性,除錯響應式程式碼有時可能具有挑戰性:你不是精確地命令如何處理資料,而是宣告資料應該如何在系統中流動。這意味著宣告和執行發生在不同的時間點。

你可以在 Simon 的精彩文章中瞭解更多資訊:https://springframework.tw/blog/2019/03/06/flight-of-the-flux-1-assembly-vs-subscription

在 Reactor 中,我們稱之為“組裝時(Assembly time)”和“執行時(Execution time)”。在組裝時,你透過呼叫 myFlux.map(i -> i * 2).filter(5 % i == 1).single() 和其他運算子來“設計”你的管道。稍後,這個“管道定義”將被用來處理由 myFlux 釋出的訊號。但是當發生錯誤時會怎樣?

你們中的一些人可能已經知道 Hooks.onOperatorDebug()。這是 reactor-core 中一個非常有用的鉤子。它可以將堆疊跟蹤從這樣轉換

java.lang.IndexOutOfBoundsException: Source emitted more than one item
  at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
  at reactor.core.publisher.FluxRange$RangeSubscription.fastPath(FluxRange.java:129)
  at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:107)
  at reactor.core.publisher.MonoSingle$SingleSubscriber.request(MonoSingle.java:94)
  at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
  at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
  at reactor.core.publisher.MonoSingle$SingleSubscriber.onSubscribe(MonoSingle.java:114)
  at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
  at reactor.core.publisher.MonoSingle.subscribe(MonoSingle.java:58)
  at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
  at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
  at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

轉換成這樣

java.lang.IndexOutOfBoundsException: Source emitted more than one item
  at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
   ...
  at java.lang.Thread.run(Thread.java:748)
  Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
  reactor.core.publisher.Flux.single(Flux.java:7380)
  com.example.demo.Example.run(Example.java:13)
Error has been observed by the following operator(s):
  |_	Flux.single ⇢ com.example.demo.Example.run(Example.java:13)
  |_	Mono.subscribeOn ⇢ com.example.demo.Example.run(Example.java:14)

對於以下程式碼

9:  public class Example {
10:
11:     public static void run() {
12:        Flux.range(0, 5)
13:            .single() // <-- Aha!
14:            .subscribeOn(Schedulers.parallel())
15:            .block();
16:     }
17: }

如你所見,啟用除錯模式後,我們可以清楚地識別出發生錯誤的組裝操作。它就像一個堆疊跟蹤,但(由於執行與組裝分離)是一個回溯。

你可能會想:“太棒了,現在我想在生產環境中使用它!”——我們也是這麼想的。但是當你使用 Hooks.onOperatorDebug() 時,我們必須在組裝時進行繁重的堆疊遍歷,以便在你每次呼叫諸如 .map(...) 這樣的運算子時捕獲呼叫點,即使你的程式碼永遠不會丟擲錯誤!這是因為 Java 缺乏呼叫點跟蹤功能,唯一的替代方案是 new Exception().getStackTrace()StackWalker(在 Java 9+ 中)。

顯然,我們不能在生產環境中使用這種方法,所以我們為此建立了一個工具!

ReactorDebugAgent 是來自 reactor-tools 專案 的一個 Java agent,它可以幫助你在應用程式中除錯異常,而無需付出執行時開銷(與 Hooks.onOperatorDebug() 不同)。

⚠️ 該專案正處於孵化階段,未來可能成為一個獨立專案或 https://github.com/reactor/reactor-core 的一個模組,也可能不會。

它(透過位元組碼轉換)轉換鏈式呼叫,例如

Flux.range(0, 5)
       .single()

轉換為

Flux flux = Flux.range(0, 5);
flux = Hooks.addCallSiteInfo(flux, "Flux.range\n foo.Bar.baz(Bar.java:21)"));
flux = flux.single();
flux = Hooks.addCallSiteInfo(flux, "Flux.single\n foo.Bar.baz(Bar.java:22)"));

要啟用它,你需要先初始化 agent

ReactorDebugAgent.init();

ℹ️ 由於此實現會在載入你的類時對其進行修改,因此最佳位置是將其放在你的 main(String[]) 方法中的所有其他內容之前

public static void main(String[] args) {
    ReactorDebugAgent.init();
    SpringApplication.run(Application.class, args);
}

結論

我們希望這些工具能讓你的開發者生涯更輕鬆,並讓你在使用 Project Reactor 時感到更舒適!

獲取 Spring 新聞簡報

透過 Spring 新聞簡報保持聯絡

訂閱

先行一步

VMware 提供培訓和認證,為你的進步加速。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群的所有近期活動。

檢視全部