領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多致響應式 Spring 社群的朋友們!
雖然團隊仍在大量開發 3.1 系列,但我們也希望讓社群有機會搶先了解未來 3.2 系列的計劃。
特別是,3.2.0.RELEASE 的一大亮點是增加了我們一直稱之為“錯誤模式”、“繼續模式”或最近更正式的“錯誤策略”。
這很簡單,真的:如果運算子中執行的使用者程式碼中的異常可以恢復,允許序列繼續,那會怎麼樣?
讓我們舉個例子,假設您有以下方法
public Flux<Integer> divide100By(Flux<Integer> dividers) {
return dividers.map(div -> 100 / div);
}
如果 dividers 源在某個時刻發出 0,那麼結果 Flux 將立即因 ArithmeticException 的 onError 訊號而終止。
如果源恰好是 Flux.range(0, 10),那麼仍然有 9 個完全有效的值可以被對映。
您如何才能做到只忽略這種瞬時異常(且僅忽略這種異常),並給進一步的有效值處理的機會?
在 Reactor 3.1 中,您可以採用一種權宜之計,即使用 flatMap 為每個元素建立一個內部序列,然後將錯誤恢復運算子應用於這些細粒度序列
public Flux<Integer> divide100By(Flux<Integer> dividers) {
return dividers.flatMap(div -> (1)
Mono.just(100 / div) (2)
.doOnError(e -> { (3)
if (e instanceof ArithmeticException) process(e); (4)
})
.onErrorResume(ArithmeticException.class, e -> Mono.empty()) (5)
);
}
我們使用 flatMap 而不是 map,為每個值生成一個小的內部 Mono
那個 Mono 基本上就是舊的 map 操作……
……並添加了錯誤恢復。
首先,我們確保在“恢復”之前處理(例如記錄)ArithmeticException(且僅處理這些異常)
然後我們使用 onErrorResume 和 Mono.empty() 來有效地忽略結果序列中的異常
這可行,但編寫起來有點麻煩(儘管 compose 和 transform 可以幫助共享這類程式碼)。此外,我們從一個 map 運算子轉向了帶有幾個內部運算子的 flatMap。
由於需要協調多個源,flatMap 比 map 有更多的開銷。儘管運算子融合等技術可以減少這種開銷,但它仍然存在。
如果我們想進一步減少這種處理的開銷,那麼困難在於我們現在必須在每個運算子的實現層面進行工作。
鏈中的每個運算子都必須以某種方式被告知異常應該被捕獲但不能透過 onError 傳播,而是以不同的方式處理。這是一個很大的改變,而且是橫向的!
請注意,這在概念上聽起來像是一個 filter,但用於錯誤。就像 filter 一樣,這意味著在一個 onNext 丟擲異常後繼續處理其源的運算子也應該從其源請求至少一個額外的元素。
儘管它可以隔離到一個特殊的執行路徑中,但這仍然是對運算子的複雜核心實現更改。
然後是 API 的問題:作為建構函式引數或 Flux 中帶有“錯誤恢復”布林值的額外過載來設定它會非常繁瑣……我們真的需要將 Flux API 中的方法數量翻倍以支援該功能嗎?
幸運的是,不需要:從 3.1 開始,我們有了 Context,它是將此類資訊傳播到鏈中每個(Reactor)運算子的好方法。
所以這就是我們為錯誤策略功能所採取的道路
支援只新增到**特定運算子**(map、filter、flatMap、handle 等)。這些運算子有一個特殊的 javadoc 標籤來記錄這一事實。
透過在給定 Flux 的 Context 中放置一個特殊鍵來啟用該功能
每個支援的運算子在其 onNext 實現中都有一個特殊路徑,該路徑會檢查該鍵,如果找到,則會改變其處理錯誤的方式。
該功能透過 errorStrategyContinue() API 暴露給使用者
它可以更細粒度:可以過濾哪些異常可以恢復,併為這些已恢復的異常設定自定義處理程式。
重要
需要記住的一點是,由於這是透過 Context 啟用的,因此該功能遵循與 Context 相同的傳播規則。例如,它將在 flatMap 中的內部序列上啟用。如果不需要,請在 flatMap 中使用 errorStrategyStop() 返回預設行為(這不會超出 flatMap 的範圍)。**它也會向後傳播,在 errorStrategyXXX 之前啟用運算子**。
這是我們的上一個示例在使用錯誤策略後的樣子
public Flux<Integer> divide100By(Flux<Integer> dividers) {
return dividers.map(div -> 100 / div) (1)
.errorStrategyContinue(ArithmeticException.class, (2)
(error, value) -> process(error)); (3)
}
回到簡單的 map
我們只從 ArithmeticException 中恢復
我們將此類異常傳遞給我們的內部處理程式(請注意,我們還可以訪問導致異常的原始值,如果有的話)
我們剛剛釋出了一個 3.2.0.M1 里程碑 [1],主要包含錯誤策略功能,我們希望您進行測試?
注意
這是一個非常橫向的更改,即使您不打算使用它,使用該構件執行您的測試也很有價值,以驗證如果您沒有顯式使用 errorStrategyContinue(),您應該不會看到任何行為變化(因為該功能包含在特定的執行路徑中)。
為了獲取里程碑版本,將 repo.spring.io/milestone 倉庫新增到您的 Maven 或 Gradle 構建配置中,並獲取 reactor-core 3.2.0.M1 構件
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.0.M1</version>
</dependency>
對於該功能的 API,目前還沒有定論。因此,如果您有任何反饋,請透過在 GitHub 上提出問題 或在 Gitter 上討論該功能來告訴我們。
與此同時,祝您編碼愉快!
1. PS:此里程碑提前釋出,而 3.1.3.RELEASE 仍在開發中,因此請注意,它不包含 3.1.3 和後續 3.1.x 版本中的所有修復。