Reactor Californium-M1,今夏的里程碑版本列車

工程 | Simon Baslé | 2018 年 8 月 7 日 | ...

我代表 Reactor 團隊高興地宣佈最新的 Reactor 里程碑版本 Californium-M1 ? ?

團隊一直忙於開發 Reactor 3 的第三個主要版本 Californium。我們現在準備就一些選定的問題收集您的反饋,並且我們還準備了許多增強功能和錯誤修復供您使用。

Californium-M1 BOM

作為第三個釋出列車,我們繼續採用元素週期表中按字母順序遞增的名稱主題。鉲 (Californium) 是一種首次在加利福尼亞合成的元素。

該里程碑的 BOM 包含

  • reactor-core 3.2.0.M3
  • reactor-extra 3.2.0.M1(包含一些 API 對齊更改)
  • reactor-netty 0.8.0.M1

今年早些時候(M1),reactor-core 釋出了一個早期預覽版,主要聚焦於“錯誤模式繼續”功能,並且核心庫還在 6 月份釋出了一個脫離釋出列車的里程碑版本(M2)。這篇部落格文章涵蓋了後者的更改以及全新的 M3 中的更改。

Reactor Netty 0.8.0.M1

這裡的重點是 reactor-netty。預計會有一篇更全面的部落格文章詳細介紹 API 更改和新功能的理由,其中包括:

API 重構

團隊對 API 進行了大量重構,使其在構建客戶端和伺服器時更具指導性,避免了在 0.7.x 版本中容易發生的難以容忍的配置錯誤。

新的 API 也更清晰地闡明瞭生命週期。

HTTP2 支援

是的,HTTP2 支援 ? 目前主要以透明方式升級到 HTTP2,但我們正在努力在不久的將來將 HTTP2 獨立流作為一等公民加入。

Reactor Core 3.2.0

總的來說,與之前的 Bismuth 迭代相比,M2 和 M3 帶來了 70 多項更改。

reactor-core 中的 API 更改比 reactor-netty 少,更新注意事項主要涉及里程碑版本本身的差異。有關更多詳細資訊,請參閱 M2M3 的“更新注意事項”部分。

我們最需要您對以下功能的反饋

指標

已新增 Micrometer 和 .metrics() 支援(#1183, #1123)。新的 .metrics() 運算子僅在 classpath 中存在 Micrometer 時才會執行某些操作。

它記錄 onNext 時間、訂閱到完成時間、訊號計數等指標——所有這些都從緊鄰上游運算子產生訊號的角度進行衡量。

它是在 M2 中引入的,但在 M3 中得到了改進,並且某些標籤名稱發生了(破壞性)更改(#1245)。

請注意,一個重要目標是避免在 Reactor 公共 API 中暴露 Micrometer 的內容。我們不希望對 Micrometer 產生強制依賴,並且努力將其使用限制在僅當我們在 classpath 中檢測到它時才載入的內部類中。

下一步:在 GA 版本之前,還應該為 Schedulers(或者更確切地說,支援某些 SchedulersExecutorServices)提供基本的測量支援。我們還在尋找一種方法,可以在全域性範圍內為 Reactor 選擇特定的 MeterRegistry,同樣不暴露引用 MeterRegistry 介面的公共 API。

高階重試

我們添加了一個預配置的 retryWhen 替代方案,具有指數回退和抖動 (retryBackoff())。詳見 #1122

此版本的重試反映了我們認為是業界在重試方面的最佳實踐。它是過於簡單的 retry(n)、複雜的 retryWhen(Function)reactor-addons 中更可配置的 RetryFunction 之間的一個很好的折中方案。

基於資源的響應式閉包

為了幫助您構建響應式事務塊,我們添加了 usingWhen。與 using 類似,它包裝一個資源,從中生成一個 Flux,並確保當 Flux 終止時資源得到妥善清理。

主要區別在於

  • 資源透過 Publisher 非同步提供。
  • 清理也是非同步的 (Function<Resource, Publisher>),並且只延遲終止訊號的傳播,不延遲 onNext 訊號。
  • 該運算子可以針對完成、錯誤和取消終止分別進行非同步“清理”。

這在 M2 中引入,但在 M3 中略有更改,以修復 Context 傳播並支援取消 Publisher<Resource>。透過在 Resource 甚至發出之前取消此運算子返回的主 Flux<T>,您的取消指令將傳播到 Publisher<Resource>

onDiscard 鉤子

這個全域性鉤子以 Consumer<Object> 的形式出現,旨在為處理需要特殊清理的堆外物件的高階使用者提供最後一塊缺失的功能。

通常,Netty 的 ByteBuf 或 Spring 5 的 DataBuffer 屬於這一類:它們是池化的堆外物件,在使用完畢後需要呼叫 release(),否則可能導致記憶體洩漏。

這些元素可能在響應式序列的縫隙中丟失,並在以下三種主要情況下永遠不會到達使用者程式碼:

  1. 運算子的源格式錯誤且不遵守 RS 規範(例如,在發出 onComplete 訊號之後發出 ByteBuf)。
  2. 運算子作為其語義的一部分過濾某些元素(例如 filter)。
  3. 運算子為了背壓最佳化目的進行預取,並且被取消,從而丟棄其預取佇列。

情況 (1) 已經被 onNextDropped 鉤子覆蓋,但情況 (3) 絕對沒有。情況 (2)(過濾語義)有點居中,例如可以在過濾 Predicate 內部進行清理。但這很麻煩且容易被遺忘。

因此,我們將 onDiscard 新增到我們的 Hooks 庫中,以涵蓋 (2) 和 (3)。請注意,與“錯誤時繼續”功能不同,目前還沒有公共 API 可以在特定的 Flux 例項上設定該鉤子。存在一個透過 Context 實現的未支援的變通方法,並且官方 API 很可能在 GA 版本或以後的維護版本中出現。

onDiscard 鉤子具有以下特性和要求

  • 它是累加的,這意味著兩次呼叫 Hooks.onDiscard(Consumer) 將使用 Consumer#andThen 組合這兩個消費者。
  • 它不是按鍵的,這意味著多次呼叫是累加的,但只能完全重置(而不是按每個 Consumer 進行重置)。
  • Consumer 在型別轉換之前必須執行 instanceof 檢查,因為它將與不同型別的物件一起使用。
  • Consumer 不得丟擲異常,並且應根據需要包含 try catch 塊。
  • Consumer 必須是冪等的,因為它可能在同一個例項上被多次呼叫(例如,在緩衝區重疊的情況下)。

另外值得一提的是,errorStrategyContinue() 在 M3 中已重新命名為 onErrorContinue()

Reactor Extra 3.2.0.M1

最後,reactor-extra 在 retry/repeat 工具方面進行了一些次要的 API 更改。它與 core 運算子對齊,使用相同的預設值,並且使用 Long 而不是 Integer 作為索引。

下一步

reactor-core 的下一步是對 Processor 物件的暴露方式進行重構。當前的 FluxProcessor<IN, OUT> 有點臃腫,因為它擴充套件並暴露了整個 Flux API。

此外,FluxProcessor#sink() 及其相關的 FluxSink 太容易被誤用,尤其是在既希望將 Processor 訂閱到 Publisher 源,又希望透過 sink() 手動向其推送資料時,這目前實際上是不支援的。另外,sink() 應該只調用一次,並且返回的 FluxSink<T> 例項應該被重用的事實也不夠清晰。

因此,我們正在考慮在 Processor<T, T> 上構建一個外觀模式,它直接實現 FluxSink(而不是 Flux),可以在同時用作訂閱者和 sink 時工作,並且包含一個 asFlux() 檢視方法,以便選擇性地在其之上構建一個 Flux 運算子鏈。

MonoProcessor 很可能會沿著這些步驟發展,成為一個(更簡單的)介面,具體實現將被重新命名為 MonoNextProcessor。我們還在考慮提供一個獨立的 MonoSink 實現,使用者可以直接操作它,而無需使用 Mono.create()

結論

酷人從不等待 GA 版本釋出!趕快去試試那個閃亮的里程碑版本吧,一如既往地歡迎您的反饋。 :)

祝您響應式程式設計愉快!

訂閱 Spring 郵件列表

透過 Spring 郵件列表保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,為您的進步加速助力。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部