Flux 之飛翔 2 - 除錯注意事項

工程 | Simon Baslé | 2019年4月16日 | ...

這篇博文是系列文章中的第二篇,旨在深入探討Reactor更高階的概念和內部工作原理。

它源於我的 Flight of the Flux 演講,我發現其內容更適合博文格式。

我將在其他文章釋出時更新下表中的連結,但這是計劃中的內容

  1. 組裝與訂閱
  2. 除錯注意事項(本文)
  3. 跳躍執行緒和排程器
  4. 內部工作原理:工作竊取
  5. 內部工作原理:運算子融合

如果您錯過了 Reactive Streams 和 Reactor 的基本概念介紹,請訪問網站的學習部分參考指南

話不多說,讓我們開始吧

在 Reactive 世界中除錯

從命令式、阻塞式正規化切換到反應式、非阻塞式正規化會帶來好處,但也會帶來一些問題。其中之一就是除錯體驗。這是為什麼呢?

主要是因為您已經習慣了依賴老式的 堆疊跟蹤,但突然之間,由於反應式程式設計的非同步特性,這個寶貴的工具變得不那麼有價值了。但這並非反應式程式設計所特有:一旦引入非同步程式碼,您就會在程式中建立一個邊界,將排程程式碼和非同步執行程式碼分隔開來。

用普通非同步程式碼演示問題

我們以 ExecutorServiceFuture 為例(這裡沒有 Reactor 程式碼)

	private static void imperative() throws ExecutionException, InterruptedException {
		final ScheduledExecutorService executor =
				Executors.newSingleThreadScheduledExecutor();

		int seconds = LocalTime.now().getSecond();
		List<Integer> source;
		if (seconds % 2 == 0) {
			source = IntStream.range(1, 11).boxed().collect(Collectors.toList());
		}
		else if (seconds % 3 == 0) {
			source = IntStream.range(0, 4).boxed().collect(Collectors.toList());
		}
		else {
			source = Arrays.asList(1, 2, 3, 4);
		}

		executor.submit(() -> source.get(5))  //line 76
		        .get();
	}

這個例子有點做作,但假設我們程式碼中有三分之二的路徑會導致非同步任務丟擲 IndexOutOfBoundsException... 堆疊跟蹤會有多大幫助呢?

java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at Scratch.imperative(Scratch.java:77)
	at Scratch.main(Scratch.java:50)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
	at java.base/java.util.Arrays$ArrayList.get(Arrays.java:4351)
	at Scratch.lambda$imperative$0(Scratch.java:76)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

我們看到

  • Futureget() 方法丟擲了 ExecutionException
  • 原因是 IndexOutOfBoundsException
  • 丟擲異常的程式碼在 submit(() -> source.get(5)) lambda 的第 76 行
  • 它在 FutureTask 中執行,來自一個名為 ThreadPoolExecutor 的東西,它本身在 Thread 中執行...
  • 我們有兩個潛在的源頭可能導致這種情況,但不知道哪個是罪魁禍首(在呼叫 submit() 之前採用了哪條路徑)。

用處不大 :-(

在 Reactor 中演示問題

如果我們尋找上面程式碼的 Reactor 等效項,我們可以得到以下程式碼

	private static void reactive() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5);
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5);
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5);
		}

		source.subscribeOn(Schedulers.parallel())
		      .block(); //line 97
	}

這將觸發以下堆疊跟蹤

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Mono.block(Mono.java:1495)
		at Scratch.reactive(Scratch.java:97)
		at Scratch.main(Scratch.java:51)
  • 我們再次看到了 ArrayIndexOutOfBoundsException,這暗示源對於 MonoElementAt 運算子來說太短了
  • 我們看到它來自 onComplete,本身由 request 觸發...以及 reactor.core.publisher 中的一堆其他步驟
  • 對這些 reactor 方法稍有熟悉,我們可能會推斷出管道由 range (FluxRange.subscribe)、elementAtsubscribeOn 組成...
  • 丟擲程式碼似乎是從 ThreadPoolExecutor 的工作 Thread 執行的
  • 線索在這裡中斷...

更糟糕的是,即使我們去掉了 subscribeOn,我們仍然無法發現是哪條可能的錯誤路徑被觸發了

	private static void reactiveNoSubscribeOn() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5);
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5);
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5);
		}

		source.block(); //line 116
	}

給出堆疊跟蹤

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
	at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
	at reactor.core.publisher.Mono.block(Mono.java:1494)
	at Scratch.reactiveNoSubscribeOn(Scratch.java:116)
	at Scratch.main(Scratch.java:52)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Mono.block(Mono.java:1495)
		... 2 more

這是因為,正如我們之前看到的,在組裝訂閱之間存在一個額外的“邊界”。線索只能追溯到訂閱點(這裡是 block()):-(

因此,在非同步世界中,使用堆疊跟蹤進行分析和除錯變得更加困難,在 Reactor 中甚至更困難(因為它既是非同步的,又採用預設的懶惰組裝與訂閱方法)。但幸運的是,庫中提供了工具來緩解這一事實。

讓事情變得更好

回到經典:log

還記得你在命令式程式碼中散佈 print 語句嗎?它可能不如啟動單步偵錯程式酷炫,但有時它就是你需要的快速而粗暴的解決方案。

在 Reactor 中,您有 log() 運算子

  • 它會記錄 Reactive Stream 訊號:onNextonCompleteonError(甚至包括 onSubscribecancelrequest!)
  • 您可以對其進行調整,只允許這些訊號中的一部分
  • 您也可以選擇特定的 Logger

簡而言之,log 是一個快速而粗糙的解決方案,可以輕鬆地鳥瞰序列中某個步驟的發生情況。在開發過程中盡情使用它,並可以為每個 log 呼叫指定一個“名稱”以區分它們。

使用 log(String) 可以用來暗示哪個源導致了錯誤

	private static void log() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5)
			             .log("source A");
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5)
			             .log("source B");
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5)
			             .log("source C");
		}

		source.block(); //line 138
	}

堆疊跟蹤本身並沒有更有趣(除了提到 MonoLogFuseable 類,但日誌本身包含這個有趣的小細節)

17:01:23.711 [main] INFO  source C - | onSubscribe([Fuseable] MonoElementAt.ElementAtSubscriber)
17:01:23.716 [main] INFO  source C - | request(unbounded)
17:01:23.717 [main] ERROR source C - | onError(java.lang.IndexOutOfBoundsException)
17:01:23.721 [main] ERROR source C - 
java.lang.IndexOutOfBoundsException: null

至少我們得到了我們硬編碼的 source C 標籤...

使用除錯模式豐富堆疊跟蹤

Reactor 中可用的另一種方法是嘗試在執行時堆疊跟蹤中找回組裝資訊。

這可以透過 Hooks 類啟用所謂的“除錯模式”來完成

Hooks.onOperatorDebug();

它有什麼作用?它使每個運算子例項化(即組裝)都捕獲一個堆疊跟蹤並保留它以供以後使用。

如果 onError 到達某個運算子,它會將該組裝堆疊跟蹤附加到 onErrorThrowable 上(作為抑制的 Exception)。因此,當您看到堆疊跟蹤時,您將獲得更完整的執行時和組裝資訊。

在除錯模式開啟的情況下,在我們之前的示例中,我們將能夠看到採用了哪個組裝路徑以及實際處理了哪個源

	private static void hook() {
		Hooks.onOperatorDebug();
		try {
			int seconds = LocalTime.now().getSecond();
			Mono<Integer> source;
			if (seconds % 2 == 0) {
				source = Flux.range(1, 10)
				             .elementAt(5); //line 149
			}
			else if (seconds % 3 == 0) {
				source = Flux.range(0, 4)
				             .elementAt(5); //line 153
			}
			else {
				source = Flux.just(1, 2, 3, 4)
				             .elementAt(5); //line 157
			}

			source.block(); //line 160
		}
		finally {
			Hooks.resetOnOperatorDebug();
		}
	}

這會產生以下堆疊跟蹤

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
(...)
	at reactor.core.publisher.Mono.block(Mono.java:1494)
	at Scratch.hook(Scratch.java:160)
	at Scratch.main(Scratch.java:54)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoElementAt] :
	reactor.core.publisher.Flux.elementAt(Flux.java:4367)
	Scratch.hook(Scratch.java:157)
Error has been observed by the following operator(s):
	|_	Flux.elementAt ⇢ Scratch.hook(Scratch.java:157)

注意最後一行了嗎?太棒了 :-D

使用 checkpoint 降低成本

使用 Hooks.onOperatorDebug() 的一個缺點是,它會為應用程式中使用的每個運算子捕獲組裝堆疊跟蹤。填充單個堆疊跟蹤是一個昂貴的操作,因此不言而喻,這可能會對效能產生重大影響。因此,這僅建議在開發環境中使用。

幸運的是,如果您識別出程式碼庫中容易出現那種源模糊不清的部分,您可以稍微降低成本。

透過使用 checkpoint() 運算子,可以僅在該程式碼庫的特定點啟用組裝跟蹤捕獲。如果您使用 checkpoint(String) 為檢查點提供一個唯一且有意義的名稱,甚至可以完全不填充堆疊跟蹤

	private static void checkpoint() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5)
			             .checkpoint("source range(1,10)");
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5)
			             .checkpoint("source range(0,4)");
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5)
			             .checkpoint("source just(1,2,3,4)");
		}

		source.block(); //line 186
	}

這將產生以下堆疊跟蹤

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:438)
	at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:422)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
	at reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:61)
	at reactor.core.publisher.Mono.block(Mono.java:1494)
	at Scratch.checkpoint(Scratch.java:186)
	at Scratch.main(Scratch.java:55)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly site of producer [reactor.core.publisher.MonoElementAt] is identified by light checkpoint [source just(1,2,3,4)].

注意到最後一行了嗎?is identified by light checkpoint [source just(1,2,3,4)].,這告訴我們罪魁禍首(因為我們為檢查點使用了有意義的描述)。

結論

在本文中,我們瞭解到在非同步程式設計中,堆疊跟蹤可能不那麼有用。這種影響因 Reactor 允許您以惰性方式構建反應式序列而進一步加劇。

我們研究了可能遇到的最壞情況以及減輕此問題的幾種方法。

完整的程式碼可以在 這裡 的 gist 中找到。

在下一篇中,我們將學習排程器以及如何在不同執行緒之間切換。

與此同時,祝您反應式程式設計愉快!

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有