Reactor 2020.0(代號 Europium)的第一個里程碑

工程 | Simon Baslé | 2020 年 7 月 10 日 | ...

本月初,我們釋出了 Reactor 2020.0 的第一個里程碑。這個代號為 Europium 的週期,緊隨 Dysprosium 週期(其中包括 reactor-core 3.3.x 和 reactor-netty 0.9.x)。

它包括 reactor-core 3.4.0 和 reactor-netty 1.0.0

在這篇博文中,我們將介紹 reactor-core 里程碑的一些亮點,並簡要提及 M2 的計劃。

對於 reactor-netty,我們將在此處連結到單獨的博文,一旦釋出。

另請注意,目前採用了一種新的版本控制方案,該方案已在整個 Spring 產品組合中採用:請參閱參考指南這篇部落格文章

關於Processor的變更

核心的主要變化是對 Reactor 中Processor實現及其公開方式的長期努力。

這是reactor-core 3.4.0-M1的主要關注點,目標是逐步淘汰具體風格的FluxProcessor(並在某種程度上淘汰MonoProcessor)的使用。

Processor是 Reactive Streams 的一個介面,最初旨在表示可由庫共享的反應式管道中的“步驟”。但如今,運算子很大程度上直接作為Publisher/Subscriber對實現,因此在 Reactor 中,處理器最終覆蓋了不同的用例(最常見的是從一個Publisher向多個Subscriber進行多播)。

因此,最常見的是,使用者將處理器視為“手動建立Flux”的一種方式:他們不將Processor連線到父釋出者(即將其用作Subscriber),而是直接呼叫其onNext/onComplete/onError方法。不幸的是,這是一種有問題的方法,因為此類呼叫必須以符合 Reactive Streams 規範的方式進行,這意味著它們需要外部同步。

歷史上,透過在FluxProcessor上引入sink()方法來緩解了這個問題。其思想是,如果您想以這種手動方式使用FluxProcessor,您需要例項化所需的處理器風格,然後呼叫其sink()方法一次,並從那裡使用生成的FluxSink來觸發訊號給訂閱者。在下游,FluxProcessor本身被公開(作為可以組合運算子的Flux)。

從可發現性的角度來看,這仍然有問題,因為滿足最常見用例的“正確方法”是最難想到的。

在 3.4.0 中,我們打算扭轉這種局面,將Sink使用模式作為一流公民放在聚光燈下,並使Processor使用模式更難意外發現或誤用。

第一個里程碑透過以下方式邁出了第一步:

  • 棄用所有FluxProcessor的具體實現,這些實現現已計劃在3.5.0中移除
  • 公開一個Sinks實用程式類,該類包含用於手動觸發的接收器的工廠方法

在 M1 中,處理器的風格仍然存在,但工廠方法已複製到Processors類中,但這在 M2 中已在重做。我們打算在 M2 中將風格的選擇移至Sinks。然後,將有一種方法可以將Sink轉換為FluxProcessor,從而消除了 M2 中對Processors的需求。

在 M1 中從具體處理器遷移

M1中,所有具體xxxProcessor(例如UnicastProcessor.create())上的工廠方法都已移至Processors(用於基本情況)或Processors.more()(用於允許更精細調整的過載)。這些方法透過字首區分風格

  • UnicastProcessor -> Processors.unicast()Processors.more().unicast(...)
  • EmitterProcessor -> Processors.multicast()Processors.more().multicast(...)
  • DirectProcessor -> Processors.more().multicastNoBackpressure()
  • ReplayProcessor -> Processors.replayAll()/replay(int)/replayTimeout(Duration)/replaySizeAndTimeout(int, Duration) 以及Processors.more()上的類似方法

所有這些處理器在概念上都具有相同的輸入和輸出型別<T>,因此它們是FluxProcessor<T,T>。在 M1 中引入了一個便利介面FluxIdentityProcessor<T>,但它除了減少泛型數量外沒有帶來太多其他功能,因此可能會在 M2 中移除。

但我們說過,不應使用Processors中的FluxProcessor,而應優先使用Sinks。在這種情況下,首先獲取一個接收器,然後將其轉換為FluxMono,供應用程式的其餘部分進行組合,如下例所示

//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();

//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();

//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();

請注意,該類目前提供的變體少於Processors,但這正在 M2 中重新考慮。

棄用和移除

早在 3.3.0 中已棄用的幾個類已被移除

  • TopicProcessor
  • WorkQueueProcessor

Schedulers.boundedElastic()自 3.3.0 釋出以來已經推出,我們認為我們現在可以棄用其祖先elastic(),而不僅僅是建議使用 boundedElastic 而不是 elastic。

未來,在 3.5.0 中,elastic Scheduler將被移除。

Reactor-Netty 正在達到 1.0

這裡有很多內容需要介紹,我們將在單獨的部落格文章中進行介紹。

結論

請嘗試M1

我們已經在 M2 中對接收器和處理器進行了進一步的更改,以及其他主題,如Context運算子,避免在subscribe中丟擲異常,並改進指標方面。

一如既往,非常歡迎對 M1 和當前 M2 快照的反饋。

在此期間,祝您反應式程式設計愉快! Reactor 團隊。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有