搶佔先機
VMware 提供培訓和認證,助您加速發展。
瞭解更多本月早些時候,我們釋出了 Reactor 2020.0 的第一個里程碑版本。這個代號為 Europium
的週期接續了 Dysprosium 週期(其中包括 reactor-core 3.3.x 和 reactor-netty 0.9.x)。
它包括 reactor-core 3.4.0
和 reactor-netty 1.0.0
。
在這篇博文中,我們將介紹 reactor-core 里程碑版本的一些重點內容,並簡要提及 M2 版本的規劃。
關於 reactor-netty,我們會在單獨的博文釋出後立即在此處提供連結。
另請注意,新的版本控制方案已經到位,並已被 Spring 專案組合採納:請參閱參考指南和這篇博文。
Processor
的變更core 中的主要變化是針對 Reactor 中的 Processor
實現及其暴露方式進行了姍姍來遲的改進。
這是 reactor-core 3.4.0-M1
的主要關注點,目標是逐步淘汰具體 FluxProcessor
變體(以及某種程度上的 MonoProcessor
)的使用。
Processor
是 Reactive Streams 的一個介面,最初旨在表示反應式管道中可在庫之間共享的“步驟”。但如今,運算子大多直接實現為 Publisher/Subscriber
對,因此在 Reactor 中,處理器最終涵蓋了不同的用例(通常是將一個 Publisher
多播到多個 Subscriber
)。
因此,使用者最常將處理器視為一種“手動建立 Flux
”的方式:他們不是將 Processor
連線到父釋出者(即將其用作 Subscriber
),而是直接呼叫其 onNext
/onComplete
/onError
方法。不幸的是,這是一種有問題的做法,因為這些呼叫**必須**以符合 Reactive Streams 規範的方式進行,這意味著它們需要外部同步。
從歷史上看,透過在 FluxProcessor
上引入 sink()
方法緩解了這個問題。其思路是,如果您想以這種手動方式使用 FluxProcessor
,您需要例項化所需的處理器變體,然後**僅呼叫一次**其 sink()
方法,並從此以後使用生成的 FluxSink
來觸發訊號給訂閱者。在下游,FluxProcessor
本身被暴露出來(作為可在其上組合運算子的 Flux
)。
從可發現性角度來看,這仍然存在問題,因為滿足最常見用例的“正確方法”是最難想到的。
藉助 3.4.0,我們打算扭轉局面,將 Sink
的使用模式作為一流公民放在首位,並使 Processor
的使用模式更難被意外發現或誤用。
第一個里程碑透過以下方式邁出了第一步:
FluxProcessor
實現,這些實現計劃在 3.5.0
中移除。Sinks
工具類,其中包含用於手動觸發 sink 的工廠方法。在 M1 中,處理器的變體仍然存在,但工廠方法已被複制到 Processors
類中,但這已經在 M2 中進行重構。我們打算在 M2 中將變體的選擇移到 Sinks
上。屆時,將有一種方法可以將 Sink
轉換為 FluxProcessor
,從而消除 M2 中對 Processors
的需求。
在 M1
中,所有具體 xxxProcessor
上的工廠方法(例如 UnicastProcessor.create()
)已移至 Processors
用於基本情況,或移至 Processors.more()
用於允許更精細調整的過載方法。這些方法透過字首區分變體:
UnicastProcessor
-> Processors.unicast()
和 Processors.more().unicast(...)
EmitterProcessor
-> Processors.multicast()
和 Processors.more().multicast(...)
DirectProcessor
-> Processors.more().multicastNoBackpressure()
ReplayProcessor
-> Processors.replayAll()
/replay(int)
/replayTimeout(Duration)
/replaySizeAndTimeout(int, Duration)
以及 Processors.more()
上的類似方法從概念上講,所有這些處理器都具有相同的輸入和輸出型別 <T>
,因此它們是 FluxProcessor<T,T>
。M1 中引入了一個便利介面 FluxIdentityProcessor<T>
,但除了減少泛型數量之外並沒有帶來太多好處,因此它可能會在 M2 中移除。
但我們說過,相對於使用 Processors
中的 FluxProcessor
,人們應該優先使用 Sinks
。在這種情況下,首先會獲得一個 sink,並將其轉換為 Flux
或 Mono
,供應用程式的其餘部分進行組合,如下面的示例所示:
//you get the sink first and foremost
StandaloneFluxSink<Integer> sink = Sinks.multicast();
//this is what the rest of the application sees and uses
Flux<Integer> flux = sink.asFlux();
flux.map(i -> i * 10).subscribe();
flux.filter(i -> i % 2 == 0).subscribe();
//this is how you push data to the subscribers through the sink (thread safe)
sink.next(1);
sink.next(2);
sink.next(3);
sink.next(4);
sink.complete();
請注意,該類目前提供的變體比 Processors
少,但這正在 M2 中重新考慮。
一些早在 3.3.0 中已被棄用的類已被**移除**:
TopicProcessor
WorkQueueProcessor
Schedulers.boundedElastic()
自 3.3.0 起已可用,我們認為現在可以**棄用**其祖先方法 elastic()
,而不僅僅是建議使用 boundedElastic 而非 elastic。
再往後,在 3.5.0 中,elastic
Scheduler
將被移除。
這裡有很多內容需要涵蓋,我們將在另一篇博文中進行詳細介紹。
請試用 M1 版本!
我們已經在 M2 中對 sink 和 processor 進行了進一步更改,同時還處理了其他主題,如 Context
運算子、避免在 subscribe
中丟擲異常以及改進指標方面的內容。
一如既往,非常歡迎對 M1 和當前的 M2 快照版本提出反饋意見。
同時,祝您編碼愉快! Reactor 團隊。