領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多Reactor 2.0 的開發始於 2014 年底,與 Reactive Streams 大致同期。我們熱衷於加入這項工作並早期採用背壓協議來緩解我們主要的消息傳遞限制:有界容量。我們在 Reactor 2.0 中首次嘗試實現基於環形緩衝區的排程器的 Reactive Streams 實現,並衍生出一種日益流行的響應式模式:Reactive Extensions。
與此同時,Reactive Streams 開始受到關注,整個庫生態系統都在討論這一轉變。常見的擔憂?實現 Reactive Streams 語義絕非易事。我們觀察到對響應式基礎的需求日益增加,以解決訊息傳遞並實現常見的流式運算子。因此,我們為 Reactor Core 建立了一個專門的專案空間,並與 Spring Framework 團隊開始了一項重點工作。
從 2.5 開始,Reactor 現在被組織成多個專案,2.0.x 等維護分支保持不變。這反映在釋出管理中,例如Reactor Core 2.5 M1是唯一可用的里程碑,其他專案將隨其獨有的版本釋出。
為了支援這種新的專案模型,我們在http://projectreactor.io 上部署了一個新的、希望更受歡迎的網站。
這種新的組織結構大大降低了參與專案活動的成本。該專案受益於 Spring API 設計協作以及來自 Sébastien Deleuze 和 Brian Clozel 的直接貢獻。Reactor 還歡迎新的外部貢獻者和評審員的幫助。
![]() |
Reactive Streams Commons 儲存庫是一項開放研究工作,專注於 Reactive Extensions 及更多領域的效率,以實現 Reactive Streams 規範。它完全由 Reactor Core 和 Stream 內聯,它們作為這項工作所關注的許多變革的契約門戶。
"RSC" 因此是一個類似於 JCTools 處理併發佇列的自由形式專案。它最大的進展之一是“融合”協議,以減少響應式處理鏈中大多數同步和某些非同步階段的開銷。最後,這項工作幫助修復了上百個流式錯誤,我們的測試過程現在包括 RSC 單元/整合測試和 JMH 基準測試,並結合 Reactor 自己的整合測試和 基準測試。
今天的 Reactor 部落格系列以一個令人愉快的事件開始:Reactor Core 2.5.0.M1 釋出!在其新的範圍和與 Reactive Streams Commons 的緊密聯絡下,Reactor Core 提供了足夠的 Rx 覆蓋範圍來構建響應式應用程式或庫,例如 Spring Reactive Web 支援。對於不耐煩的讀者,請檢視 GitHub 上已有的快速入門。
快速瀏覽一下分散-聚集場景
Mono.from(userRequestPublisher)
.then(userRepository::findUserProfile,
userRepository::findUserPaymentMethod)
.log("user.requests")
.or(Mono.delay(5)
.then(n -> Mono.error(new TimeoutException()))
.mergeWith(userRepository::findSimilarUserDetails)
.map(userDetailsTuple -> userDetailsTuple.t1.username)
.publishOn(SchedulerGroup.io())
.subscribe(responseSubscriber);
詳情
create()、interval()、merge()、zip()、concat()、switchOnError() 和 switchOnEmpty()。
delay()、then()、any()、and()、or()、otherwise()、otherwiseIfEmpty()、where() 和阻塞的 get()。
-- 包含 SchedulerGroup、TopicProcessor 和 WorkQueueProcessor。 -- 取代了之前的 Enviroment/Dispatcher 組合,同時解決了相同的需求,並且很快將記錄簡單的遷移路徑。不再有靜態狀態持有排程器的引用。 -- 連結運算子:publishOn() 和 dispatchOn()
Publisher 源的測試。Callable、Runnable、Iterable、Java 8 CompletableFuture、Java 9 Flow.Publisher、RxJava 1 Observable 和 Single 轉換為支援 Reactive Streams 的 Flux 和 Mono,無需額外的橋接依賴。-- 一種經濟高效的定時器 API 和實現(雜湊輪定時器)。 -- 新的融合 API,用於虛擬地合併響應式鏈中的兩個或更多階段。 -- 一個經過調整的 QueueSupplier,將為正確的容量提供正確的佇列。
-- Publisher 日誌記錄,如果可用,則回退到 java.util.logging 或 SLF4J。可直接在 Flux 和 Mono 上使用 log() 運算子。 -- 與包括 Reactive Streams 在內的任何其他契約正交,一切都可以是 Backpressurable、Completable 或作為 Receiver 生成泛型 Object(可能是 Subscriber),這反過來又允許我們追蹤流的完整圖並使用狀態指示器對其進行增強:
我們希望收集您的反饋意見,您可以訪問相應的 issues 儲存庫或加入我們最近建立的 Gitter 頻道。敬請關注下一篇關於 Reactor Stream 2.5.0.M1 的文章,它是 Reactive Streams 的完整 Rx 實現。