Reactor Californium-M1,今年夏天的里程碑釋出列車

工程 | Simon Baslé | 2018 年 8 月 7 日 | ...

我謹代表 Reactor 團隊,很高興宣佈最新的 Reactor 里程碑 Californium-M1 ? ?

團隊一直忙於 Californium,這是 Reactor 3 的第三個主要版本。我們現在已準備好聽取您對一些特定問題的反饋,並且我們還準備了大量增強功能和錯誤修復供您使用。

Californium-M1 BOM

對於其第三個釋出列車,我們繼續沿用元素週期表中按字母順序遞增的名稱主題。Californium 是第一個在加利福尼亞合成的元素。

該里程碑的 BOM 包含

  • reactor-core 3.2.0.M3
  • reactor-extra 3.2.0.M1(帶有一些 API 對齊更改)
  • reactor-netty 0.8.0.M1

今年早些時候 (M1),reactor-core 釋出了一個早期預覽版,該版本只專注於“錯誤模式繼續”功能,並且核心庫在 6 月份還發布了一個非定期里程碑版本 (M2)。本部落格文章除了涵蓋全新的 M3 中的變更,還包括了 M2 中的變更。

Reactor Netty 0.8.0.M1

這裡的重要更新是 reactor-netty。敬請期待一篇更詳細的部落格文章,它將詳細介紹 API 變更和新功能背後的原因,其中包括:

API 重構

團隊對 API 進行了大規模重構,使其在構建客戶端和伺服器時更具指導性,避免了 0.7.x 版本中容易出現的難以除錯的配置錯誤。

新 API 還更清晰地勾勒出了生命週期。

HTTP2 支援

是的,HTTP2 支援!目前主要是透明地升級到 HTTP2,但我們正在努力在不久的將來將 HTTP2 獨立流作為一等公民新增進來。

Reactor Core 3.2.0

M2 和 M3 總共帶來了 70 多項變更,相較於之前的 Bismuth 版本。

reactor-core 的 API 變更少於 reactor-netty,更新考慮主要在於里程碑版本之間的差異。有關更多詳細資訊,請參閱 M2M3 的“更新考慮”部分。

我們最需要您對以下功能的反饋:

指標

已新增 Micrometer 和 .metrics() 支援 (#1183, #1123)。新的 .metrics() 運算子僅在 Micrometer 存在於類路徑上時才執行操作。

它會記錄 onNext 計時、訂閱到完成計時、訊號計數等指標——所有這些都從上游運算子產生的訊號的角度來看。

它在 M2 中引入,但在 M3 中得到了改進,並且一些標籤名稱也發生了(破壞性)更改 (#1245)。

請注意,一個重要的目標是避免在公共 Reactor API 中暴露 Micrometer 內容。我們不希望強制依賴 Micrometer,我們努力將其使用限制在僅當我們檢測到它在類路徑上時才載入的內部類中。

接下來:在 GA 之前,還應該有對 Schedulers(或者說支援某些 SchedulersExecutorServices)的基本檢測支援。我們還在研究一種為 Reactor 全域性選擇特定 MeterRegistry 的方法,同樣不暴露引用 MeterRegistry 介面的公共 API。

高階重試

我們添加了一個預配置的 retryWhen 替代方案,帶有指數退避和抖動 (retryBackoff())。請參閱 #1122

此版本的重試反映了我們認為在重試方面的行業最佳實踐。它是過於簡單的 retry(n)、複雜的 retryWhen(Function)reactor-addons 中更可配置的 RetryFunction 之間的一個良好折衷。

基於資源的響應式閉包

為了幫助您構建響應式事務塊,我們添加了 usingWhen。與 using 類似,它包裝一個資源,從中生成一個 Flux,並確保在 Flux 終止時正確清理資源。

主要區別在於:

  • 資源透過 Publisher 非同步提供。
  • 清理也是非同步的 (Function<Resource, Publisher>),並且只延遲終端訊號的傳播,而不延遲 onNext 訊號的傳播。
  • 運算子可以為完成、錯誤和取消終止擁有單獨的非同步“清理”。

這在 M2 中引入,但在 M3 中進行了微小更改,以修復 Context 傳播並支援 Publisher<Resource> 的取消。透過在此運算子返回的主 Flux<T> Resource 發出之前取消,您的取消指令將傳播到 Publisher<Resource>

onDiscard 鉤子

這個全域性鉤子,以 Consumer<Object> 的形式出現,旨在成為處理需要特殊清理的堆外物件的高階使用者所需的最後一塊拼圖。

通常,Netty 的 ByteBuf 或 Spring 5 的 DataBuffer 屬於此類別:它們是池化的、堆外的,並且一旦不再使用就需要呼叫 release(),否則會導致記憶體洩漏。

此類元素在響應式序列中可能被遺漏,並且在三種主要情況下永遠不會到達使用者程式碼:

  1. 運算子的源格式不正確,不符合 RS 規範(例如,它在發出 onComplete 之後才發出 ByteBuf)。
  2. 運算子根據其語義過濾某些元素(例如 filter)。
  3. 運算子出於背壓最佳化目的進行預取,但被取消,從而丟棄其預取佇列。

情況 (1) 已由 onNextDropped 鉤子涵蓋,但情況 (3) 絕對沒有。情況 (2)(過濾語義)則介於兩者之間,例如,可以在過濾 Predicate 內部進行清理。但這很麻煩,而且容易被遺忘。

因此,我們在 Hooks 的功能集中添加了 onDiscard 來涵蓋 (2) 和 (3)。請注意,與“出錯時繼續”功能不同,目前沒有公共 API 可以為特定的 Flux 例項設定鉤子。有一個使用 Context 的不受支援的變通方法,官方 API 很可能會在 GA 或後續維護版本中出現。

onDiscard 鉤子具有以下特性和要求

  • 它是附加的,這意味著呼叫 Hooks.onDiscard(Consumer) 兩次將使用 Consumer#andThen 組合這兩個消費者。
  • 它不是鍵控的,這意味著雖然多次呼叫是附加的,但它只能完全重置(不能按每個 Consumer 進行重置)。
  • Consumer 在型別轉換之前必須執行 instanceof 檢查,因為它將用於不同型別的物件。
  • Consumer 決不能丟擲異常,並且應根據需要包含 try catch 塊。
  • Consumer 必須是冪等的,因為它可能會在同一例項上多次呼叫(例如,在緩衝區重疊的情況下)。

附帶說明一下,errorStrategyContinue() 在 M3 中已重新命名為 onErrorContinue()

Reactor Extra 3.2.0.M1

最後,reactor-extra 在 retry/repeat 工具方面有較小的 API 更改。它與 core 運算子對齊,使用相同的預設值和 Long 而不是 Integer 索引。

下一步

reactor-core 的下一步是重構 Processor 物件的暴露方式。當前的 FluxProcessor<IN, OUT> 有點臃腫,因為它擴充套件並暴露了整個 Flux API。

此外,FluxProcessor#sink() 和相關的 FluxSink 太容易被誤用,特別是當人們既想將 Processor 訂閱到 Publisher 源,又想透過 sink() 手動推送資料時,這目前並不真正受支援。sink() 應該只調用一次,並且返回的 FluxSink<T> 例項應該重用,這一點也不夠清晰。

因此,我們正在研究一個 Processor<T, T> 的門面,它直接實現 FluxSink(而不是 Flux),在同時用作訂閱者和接收器時工作,並且有一個 asFlux() 檢視方法來可選地在其之上構建 Flux 運算子鏈。

MonoProcessor 很可能會遵循這些步驟,成為一個(更簡單的)介面,具體實現將重新命名為 MonoNextProcessor。我們還在考慮提供一個獨立的 MonoSink 實現,使用者可以直接操作,而無需使用 Mono.create()

結論

酷的人不會等待 GA 釋出!快去試駕那個閃亮的里程碑版本吧,一如既往,歡迎反饋。:)

祝您反應式程式設計愉快!

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有