領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多本月初,我們釋出了 Reactor 2020.0 的第一個里程碑。這個代號為 Europium 的週期,緊隨 Dysprosium 週期(其中包括 reactor-core 3.3.x 和 reactor-netty 0.9.x)。
它包括 reactor-core 3.4.0 和 reactor-netty 1.0.0。
在這篇博文中,我們將介紹 reactor-core 里程碑的一些亮點,並簡要提及 M2 的計劃。
對於 reactor-netty,我們將在此處連結到單獨的博文,一旦釋出。
另請注意,目前採用了一種新的版本控制方案,該方案已在整個 Spring 產品組合中採用:請參閱參考指南和這篇部落格文章。
Processor的變更核心的主要變化是對 Reactor 中Processor實現及其公開方式的長期努力。
這是reactor-core 3.4.0-M1的主要關注點,目標是逐步淘汰具體風格的FluxProcessor(並在某種程度上淘汰MonoProcessor)的使用。
Processor是 Reactive Streams 的一個介面,最初旨在表示可由庫共享的反應式管道中的“步驟”。但如今,運算子很大程度上直接作為Publisher/Subscriber對實現,因此在 Reactor 中,處理器最終覆蓋了不同的用例(最常見的是從一個Publisher向多個Subscriber進行多播)。
因此,最常見的是,使用者將處理器視為“手動建立Flux”的一種方式:他們不將Processor連線到父釋出者(即將其用作Subscriber),而是直接呼叫其onNext/onComplete/onError方法。不幸的是,這是一種有問題的方法,因為此類呼叫必須以符合 Reactive Streams 規範的方式進行,這意味著它們需要外部同步。
歷史上,透過在FluxProcessor上引入sink()方法來緩解了這個問題。其思想是,如果您想以這種手動方式使用FluxProcessor,您需要例項化所需的處理器風格,然後呼叫其sink()方法一次,並從那裡使用生成的FluxSink來觸發訊號給訂閱者。在下游,FluxProcessor本身被公開(作為可以組合運算子的Flux)。
從可發現性的角度來看,這仍然有問題,因為滿足最常見用例的“正確方法”是最難想到的。
在 3.4.0 中,我們打算扭轉這種局面,將Sink使用模式作為一流公民放在聚光燈下,並使Processor使用模式更難意外發現或誤用。
第一個里程碑透過以下方式邁出了第一步:
FluxProcessor的具體實現,這些實現現已計劃在3.5.0中移除Sinks實用程式類,該類包含用於手動觸發的接收器的工廠方法在 M1 中,處理器的風格仍然存在,但工廠方法已複製到Processors類中,但這在 M2 中已在重做。我們打算在 M2 中將風格的選擇移至Sinks。然後,將有一種方法可以將Sink轉換為FluxProcessor,從而消除了 M2 中對Processors的需求。
在M1中,所有具體xxxProcessor(例如UnicastProcessor.create())上的工廠方法都已移至Processors(用於基本情況)或Processors.more()(用於允許更精細調整的過載)。這些方法透過字首區分風格
UnicastProcessor -> Processors.unicast() 和 Processors.more().unicast(...)EmitterProcessor -> Processors.multicast() 和 Processors.more().multicast(...)DirectProcessor -> Processors.more().multicastNoBackpressure()ReplayProcessor -> Processors.replayAll()/replay(int)/replayTimeout(Duration)/replaySizeAndTimeout(int, Duration) 以及Processors.more()上的類似方法所有這些處理器在概念上都具有相同的輸入和輸出型別<T>,因此它們是FluxProcessor<T,T>。在 M1 中引入了一個便利介面FluxIdentityProcessor<T>,但它除了減少泛型數量外沒有帶來太多其他功能,因此可能會在 M2 中移除。
但我們說過,不應使用Processors中的FluxProcessor,而應優先使用Sinks。在這種情況下,首先獲取一個接收器,然後將其轉換為Flux或Mono,供應用程式的其餘部分進行組合,如下例所示
//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();
//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();
//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();
請注意,該類目前提供的變體少於Processors,但這正在 M2 中重新考慮。
早在 3.3.0 中已棄用的幾個類已被移除
TopicProcessorWorkQueueProcessorSchedulers.boundedElastic()自 3.3.0 釋出以來已經推出,我們認為我們現在可以棄用其祖先elastic(),而不僅僅是建議使用 boundedElastic 而不是 elastic。
未來,在 3.5.0 中,elastic Scheduler將被移除。
這裡有很多內容需要介紹,我們將在單獨的部落格文章中進行介紹。
請嘗試M1!
我們已經在 M2 中對接收器和處理器進行了進一步的更改,以及其他主題,如Context運算子,避免在subscribe中丟擲異常,並改進指標方面。
一如既往,非常歡迎對 M1 和當前 M2 快照的反饋。
在此期間,祝您反應式程式設計愉快! Reactor 團隊。