Reactor 2020.0 (代號 Europium) 的第一個里程碑

工程 | Simon Baslé | July 10, 2020 | ...

本月早些時候,我們釋出了 Reactor 2020.0 的第一個里程碑版本。這個代號為 Europium 的週期接續了 Dysprosium 週期(其中包括 reactor-core 3.3.x 和 reactor-netty 0.9.x)。

它包括 reactor-core 3.4.0 和 reactor-netty 1.0.0

在這篇博文中,我們將介紹 reactor-core 里程碑版本的一些重點內容,並簡要提及 M2 版本的規劃。

關於 reactor-netty,我們會在單獨的博文釋出後立即在此處提供連結。

另請注意,新的版本控制方案已經到位,並已被 Spring 專案組合採納:請參閱參考指南這篇博文

關於 Processor 的變更

core 中的主要變化是針對 Reactor 中的 Processor 實現及其暴露方式進行了姍姍來遲的改進。

這是 reactor-core 3.4.0-M1 的主要關注點,目標是逐步淘汰具體 FluxProcessor 變體(以及某種程度上的 MonoProcessor)的使用。

Processor 是 Reactive Streams 的一個介面,最初旨在表示反應式管道中可在庫之間共享的“步驟”。但如今,運算子大多直接實現為 Publisher/Subscriber 對,因此在 Reactor 中,處理器最終涵蓋了不同的用例(通常是將一個 Publisher 多播到多個 Subscriber)。

因此,使用者最常將處理器視為一種“手動建立 Flux”的方式:他們不是將 Processor 連線到父釋出者(即將其用作 Subscriber),而是直接呼叫其 onNext/onComplete/onError 方法。不幸的是,這是一種有問題的做法,因為這些呼叫**必須**以符合 Reactive Streams 規範的方式進行,這意味著它們需要外部同步。

從歷史上看,透過在 FluxProcessor 上引入 sink() 方法緩解了這個問題。其思路是,如果您想以這種手動方式使用 FluxProcessor,您需要例項化所需的處理器變體,然後**僅呼叫一次**其 sink() 方法,並從此以後使用生成的 FluxSink 來觸發訊號給訂閱者。在下游,FluxProcessor 本身被暴露出來(作為可在其上組合運算子的 Flux)。

從可發現性角度來看,這仍然存在問題,因為滿足最常見用例的“正確方法”是最難想到的。

藉助 3.4.0,我們打算扭轉局面,將 Sink 的使用模式作為一流公民放在首位,並使 Processor 的使用模式更難被意外發現或誤用。

第一個里程碑透過以下方式邁出了第一步:

  • 棄用所有具體的 FluxProcessor 實現,這些實現計劃在 3.5.0 中移除。
  • 暴露一個 Sinks 工具類,其中包含用於手動觸發 sink 的工廠方法。

在 M1 中,處理器的變體仍然存在,但工廠方法已被複制到 Processors 類中,但這已經在 M2 中進行重構。我們打算在 M2 中將變體的選擇移到 Sinks 上。屆時,將有一種方法可以將 Sink 轉換為 FluxProcessor,從而消除 M2 中對 Processors 的需求。

在 M1 中從具體的 Processor 遷移

M1 中,所有具體 xxxProcessor 上的工廠方法(例如 UnicastProcessor.create())已移至 Processors 用於基本情況,或移至 Processors.more() 用於允許更精細調整的過載方法。這些方法透過字首區分變體:

  • UnicastProcessor -> Processors.unicast()Processors.more().unicast(...)
  • EmitterProcessor -> Processors.multicast()Processors.more().multicast(...)
  • DirectProcessor -> Processors.more().multicastNoBackpressure()
  • ReplayProcessor -> Processors.replayAll()/replay(int)/replayTimeout(Duration)/replaySizeAndTimeout(int, Duration) 以及 Processors.more() 上的類似方法

從概念上講,所有這些處理器都具有相同的輸入和輸出型別 <T>,因此它們是 FluxProcessor<T,T>。M1 中引入了一個便利介面 FluxIdentityProcessor<T>,但除了減少泛型數量之外並沒有帶來太多好處,因此它可能會在 M2 中移除。

但我們說過,相對於使用 Processors 中的 FluxProcessor,人們應該優先使用 Sinks。在這種情況下,首先會獲得一個 sink,並將其轉換為 FluxMono,供應用程式的其餘部分進行組合,如下面的示例所示:

//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();

//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();

//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();

請注意,該類目前提供的變體比 Processors 少,但這正在 M2 中重新考慮。

棄用和移除

一些早在 3.3.0 中已被棄用的類已被**移除**:

  • TopicProcessor
  • WorkQueueProcessor

Schedulers.boundedElastic() 自 3.3.0 起已可用,我們認為現在可以**棄用**其祖先方法 elastic(),而不僅僅是建議使用 boundedElastic 而非 elastic。

再往後,在 3.5.0 中,elastic Scheduler 將被移除。

Reactor-Netty 達到 1.0 版本

這裡有很多內容需要涵蓋,我們將在另一篇博文中進行詳細介紹。

總結

請試用 M1 版本!

我們已經在 M2 中對 sink 和 processor 進行了進一步更改,同時還處理了其他主題,如 Context 運算子、避免在 subscribe 中丟擲異常以及改進指標方面的內容。

一如既往,非常歡迎對 M1 和當前的 M2 快照版本提出反饋意見。

同時,祝您編碼愉快! Reactor 團隊。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

搶佔先機

VMware 提供培訓和認證,助您加速發展。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部