飛行 Flux 2 - 除錯注意事項

工程 | Simon Baslé | April 16, 2019 | ...

這篇部落格文章是系列文章的第二篇,旨在更深入地探討 Reactor 的高階概念和內部工作原理。

它源於我的 Flight of the Flux 演講,我發現其內容更適合部落格文章的形式。

其他文章釋出後,我將更新下表中的連結,但這是計劃中的內容

  1. 裝配 vs 訂閱
  2. 除錯注意事項(本文)
  3. 執行緒和排程器切換
  4. 內部工作原理:工作竊取
  5. 內部工作原理:運算子融合

如果您想了解 Reactive Streams 的介紹以及 Reactor 的基本概念,請前往網站的 學習部分參考指南

事不宜遲,讓我們開始吧

在響應式世界中除錯

從命令式、阻塞正規化切換到響應式、非阻塞正規化帶來了好處,但也伴隨一些注意事項。其中之一就是除錯體驗。為什麼呢?

主要是因為您已經習慣依賴於經典的 堆疊跟蹤,但由於響應式程式設計的 非同步 特性,這個寶貴的工具突然變得價值大減。然而,這並非響應式程式設計所特有:一旦引入非同步程式碼,您就會在程式中建立排程程式碼和非同步執行程式碼之間的邊界。

用普通非同步程式碼演示問題

讓我們來看一個使用 ExecutorServiceFuture 的例子(這裡沒有 Reactor 程式碼)

	private static void imperative() throws ExecutionException, InterruptedException {
		final ScheduledExecutorService executor =
				Executors.newSingleThreadScheduledExecutor();

		int seconds = LocalTime.now().getSecond();
		List<Integer> source;
		if (seconds % 2 == 0) {
			source = IntStream.range(1, 11).boxed().collect(Collectors.toList());
		}
		else if (seconds % 3 == 0) {
			source = IntStream.range(0, 4).boxed().collect(Collectors.toList());
		}
		else {
			source = Arrays.asList(1, 2, 3, 4);
		}

		executor.submit(() -> source.get(5))  //line 76
		        .get();
	}

這個例子有點牽強,但讓我們想象一下,在程式碼中有三條路徑中的兩條可能導致非同步任務丟擲 IndexOutOfBoundsException... 堆疊跟蹤會有多大幫助呢?

java.util.concurrent.ExecutionException: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
	at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
	at Scratch.imperative(Scratch.java:77)
	at Scratch.main(Scratch.java:50)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 5 out of bounds for length 4
	at java.base/java.util.Arrays$ArrayList.get(Arrays.java:4351)
	at Scratch.lambda$imperative$0(Scratch.java:76)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

我們看到

  • Futureget() 方法丟擲了 ExecutionException
  • 原因是 IndexOutOfBoundsException
  • 丟擲異常的程式碼位於 submit(() -> source.get(5)) lambda 表示式 第 76 行
  • 它在 FutureTask 中執行,來自於一個叫做 ThreadPoolExecutor 的東西,它本身又執行在 Thread 中...
  • 我們有兩個潛在的源可能導致這個問題,但不知道是哪一個罪魁禍首(在呼叫 submit() 之前的測試中採用了哪條路徑)。

不太有用 :-(

在 Reactor 中演示問題

如果我們在 Reactor 中尋找上述程式碼的等價實現,我們可以得到這個

	private static void reactive() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5);
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5);
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5);
		}

		source.subscribeOn(Schedulers.parallel())
		      .block(); //line 97
	}

這會觸發以下堆疊跟蹤

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.trySchedule(MonoSubscribeOn.java:186)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.onSubscribe(MonoSubscribeOn.java:131)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3711)
	at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Mono.block(Mono.java:1495)
		at Scratch.reactive(Scratch.java:97)
		at Scratch.main(Scratch.java:51)
  • 我們再次看到了 ArrayIndexOutOfBoundsException,這暗示著源對於 MonoElementAt 運算子來說太短了
  • 我們看到它來自於一個 onComplete 事件,該事件本身由 request 觸發... 以及 reactor.core.publisher 中的一堆其他步驟
  • 對這些 reactor 方法稍加熟悉後,我們或許可以推斷出管道由 range (FluxRange.subscribe)、elementAtsubscribeOn 組成...
  • 丟擲異常的程式碼似乎是在 ThreadPoolExecutor 的工作 Thread 中執行的
  • 線索在這裡中斷了...

更糟的是,即使我們去掉了 subscribeOn,我們仍然無法發現觸發的是兩種可能的錯誤路徑中的哪一個

	private static void reactiveNoSubscribeOn() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5);
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5);
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5);
		}

		source.block(); //line 116
	}

產生堆疊跟蹤

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
	at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
	at reactor.core.publisher.Mono.block(Mono.java:1494)
	at Scratch.reactiveNoSubscribeOn(Scratch.java:116)
	at Scratch.main(Scratch.java:52)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93)
		at reactor.core.publisher.Mono.block(Mono.java:1495)
		... 2 more

這是因為,正如我們之前所見,程式碼在 裝配訂閱 之間存在一個額外的“邊界”。線索只能追溯到 訂閱 的點(這裡是 block()):-(

因此,在非同步世界中使用堆疊跟蹤進行分析和除錯會更困難,而在 Reactor 中甚至更難(因為它既是非同步的,又採用預設惰性的裝配 vs 訂閱方式)。但幸運的是,庫中提供了一些工具來嘗試緩解這一問題。

改進方法

迴歸經典:log

還記得您在命令式程式碼中散佈 print 語句的日子嗎?它可能不像啟動步進偵錯程式那樣酷炫,但有時它正是您需要的快速而粗糙的解決方案。

在 Reactor 中,您有 log() 運算子

  • 它會記錄 Reactive Stream 訊號:onNextonCompleteonError(甚至包括 onSubscribecancelrequest!)
  • 您可以調整它,只白名單其中的一部分訊號
  • 您也可以選擇一個特定的 Logger

簡而言之,log 是獲取序列中某一步驟正在發生的情況的便捷概覽(鳥瞰圖)的快速而粗糙的解決方案。在開發過程中請隨意使用它,並可以透過為每個 log 呼叫指定一個“名稱”來區分它們。

使用 log(String) 可以用來獲取導致錯誤的源的提示

	private static void log() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5)
			             .log("source A");
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5)
			             .log("source B");
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5)
			             .log("source C");
		}

		source.block(); //line 138
	}

堆疊跟蹤本身並沒有更有趣的資訊(除了提到了 MonoLogFuseable 類之外),但日誌本身包含了這個有趣的小細節

17:01:23.711 [main] INFO  source C - | onSubscribe([Fuseable] MonoElementAt.ElementAtSubscriber)
17:01:23.716 [main] INFO  source C - | request(unbounded)
17:01:23.717 [main] ERROR source C - | onError(java.lang.IndexOutOfBoundsException)
17:01:23.721 [main] ERROR source C - 
java.lang.IndexOutOfBoundsException: null

至少我們得到了硬編碼的 source C 標籤...

使用除錯模式豐富堆疊跟蹤

Reactor 中另一種可用的方法是嘗試在執行時堆疊跟蹤中找回裝配資訊。

這可以透過 Hooks 類啟用所謂的“除錯模式”來實現

Hooks.onOperatorDebug();

它有什麼作用?它使得每個運算子例項化(即裝配)捕獲一個堆疊跟蹤並保留下來供以後使用。

如果一個 onError 到達某個運算子,它會將該裝配堆疊跟蹤附加到 onErrorThrowable 上(作為 suppressed Exception)。結果是,當您檢視堆疊跟蹤時,您將獲得執行時和裝配時更完整的檢視。

開啟除錯模式後,在我們之前的例子中,我們將能夠看到採用了哪條裝配路徑以及實際處理的是哪個源

	private static void hook() {
		Hooks.onOperatorDebug();
		try {
			int seconds = LocalTime.now().getSecond();
			Mono<Integer> source;
			if (seconds % 2 == 0) {
				source = Flux.range(1, 10)
				             .elementAt(5); //line 149
			}
			else if (seconds % 3 == 0) {
				source = Flux.range(0, 4)
				             .elementAt(5); //line 153
			}
			else {
				source = Flux.just(1, 2, 3, 4)
				             .elementAt(5); //line 157
			}

			source.block(); //line 160
		}
		finally {
			Hooks.resetOnOperatorDebug();
		}
	}

這會產生以下堆疊跟蹤

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
(...)
	at reactor.core.publisher.Mono.block(Mono.java:1494)
	at Scratch.hook(Scratch.java:160)
	at Scratch.main(Scratch.java:54)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoElementAt] :
	reactor.core.publisher.Flux.elementAt(Flux.java:4367)
	Scratch.hook(Scratch.java:157)
Error has been observed by the following operator(s):
	|_	Flux.elementAt ⇢ Scratch.hook(Scratch.java:157)

注意最後一行了嗎?耶 :-D

使用 checkpoint 降低開銷

使用 Hooks.onOperatorDebug() 的一個缺點是它會 為應用程式中使用的每一個運算子 執行裝配堆疊跟蹤捕獲。填充單個堆疊跟蹤是一項昂貴的操作,因此毫無疑問這會對效能產生嚴重影響。因此,這隻推薦在開發環境中使用。

幸運的是,如果您能識別出程式碼庫中容易出現此類源模糊性的部分,可以稍微降低開銷。

透過使用 checkpoint() 運算子,可以在程式碼庫的特定點才啟用裝配跟蹤捕獲。如果您使用 checkpoint(String) 為檢查點提供一個唯一且有意義的名稱,甚至可以完全不做堆疊跟蹤填充。

	private static void checkpoint() {
		int seconds = LocalTime.now().getSecond();
		Mono<Integer> source;
		if (seconds % 2 == 0) {
			source = Flux.range(1, 10)
			             .elementAt(5)
			             .checkpoint("source range(1,10)");
		}
		else if (seconds % 3 == 0) {
			source = Flux.range(0, 4)
			             .elementAt(5)
			             .checkpoint("source range(0,4)");
		}
		else {
			source = Flux.just(1, 2, 3, 4)
			             .elementAt(5)
			             .checkpoint("source just(1,2,3,4)");
		}

		source.block(); //line 186
	}

這會產生以下堆疊跟蹤

java.lang.IndexOutOfBoundsException
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onComplete(MonoElementAt.java:153)
	at reactor.core.publisher.FluxArray$ArraySubscription.fastPath(FluxArray.java:176)
	at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:96)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.request(MonoElementAt.java:92)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:438)
	at reactor.core.publisher.BlockingSingleSubscriber.onSubscribe(BlockingSingleSubscriber.java:49)
	at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:422)
	at reactor.core.publisher.MonoElementAt$ElementAtSubscriber.onSubscribe(MonoElementAt.java:107)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:53)
	at reactor.core.publisher.FluxArray.subscribe(FluxArray.java:59)
	at reactor.core.publisher.MonoElementAt.subscribe(MonoElementAt.java:59)
	at reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:61)
	at reactor.core.publisher.Mono.block(Mono.java:1494)
	at Scratch.checkpoint(Scratch.java:186)
	at Scratch.main(Scratch.java:55)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly site of producer [reactor.core.publisher.MonoElementAt] is identified by light checkpoint [source just(1,2,3,4)].

請注意 is identified by light checkpoint [source just(1,2,3,4)].,這指出了我們的罪魁禍首(因為我們為檢查點使用了有意義的描述)。

結論

在本文中,我們瞭解到堆疊跟蹤在非同步程式設計中可能不太有用。Reactor 允許以惰性方式構建響應式序列,這使得這種影響進一步加劇。

我們研究了可能遇到的最糟糕情況,以及幾種可以減輕此問題的方法。

完整的程式碼可以在一個 gist 這裡找到。

在下一部分中,我們將瞭解排程器以及如何線上程之間切換。

與此同時,祝您響應式編碼愉快!

訂閱 Spring 電子報

透過 Spring 電子報保持聯絡

訂閱

搶佔先機

VMware 提供培訓和認證,助您飛速進步。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱,為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群的所有近期活動。

檢視全部