Reactor 2.5:JVM 的第二代響應式基礎

釋出 | Stephane Maldini | 2016 年 2 月 16 日 | ...

進入響應流時代

Reactor 2.0 的開發始於 2014 年底,與 Reactive Streams 大致同期。我們熱衷於加入這項工作並早期採用背壓協議來緩解我們主要的消​​息傳遞限制:有界容量。我們在 Reactor 2.0 中首次嘗試實現基於環形緩衝區的排程器的 Reactive Streams 實現,並衍生出一種日益流行的響應式模式:Reactive Extensions

與此同時,Reactive Streams 開始受到關注,整個庫生態系統都在討論這一轉變。常見的擔憂?實現 Reactive Streams 語義絕非易事。我們觀察到對響應式基礎的需求日益增加,以解決訊息傳遞並實現常見的流式運算子。因此,我們為 Reactor Core 建立了一個專門的專案空間,並與 Spring Framework 團隊開始了一項重點工作。

從 2.5 開始,Reactor 現在被組織成多個專案,2.0.x 等維護分支保持不變。這反映在釋出管理中,例如Reactor Core 2.5 M1是唯一可用的里程碑,其他專案將隨其獨有的版本釋出。

為了支援這種新的專案模型,我們在http://projectreactor.io 上部署了一個新的、希望更受歡迎的網站。

響應流的協作式新方法

這種新的組織結構大大降低了參與專案活動的成本。該專案受益於 Spring API 設計協作以及來自 Sébastien DeleuzeBrian Clozel 的直接貢獻。Reactor 還歡迎新的外部貢獻者和評審員的幫助。

Reactor 2.5 中的依賴和協作
Reactor 2.5

Reactive Streams Commons

Reactive Streams Commons 儲存庫是一項開放研究工作,專注於 Reactive Extensions 及更多領域的效率,以實現 Reactive Streams 規範。它完全由 Reactor Core 和 Stream 內聯,它們作為這項工作所關注的許多變革的契約門戶。

"RSC" 因此是一個類似於 JCTools 處理併發佇列的自由形式專案。它最大的進展之一是“融合”協議,以減少響應式處理鏈中大多數同步和某些非同步階段的開銷。最後,這項工作幫助修復了上百個流式錯誤,我們的測試過程現在包括 RSC 單元/整合測試和 JMH 基準測試,並結合 Reactor 自己的整合測試和 基準測試

Reactor Core 2.5.0.M1

今天的 Reactor 部落格系列以一個令人愉快的事件開始:Reactor Core 2.5.0.M1 釋出!在其新的範圍和與 Reactive Streams Commons 的緊密聯絡下,Reactor Core 提供了足夠的 Rx 覆蓋範圍來構建響應式應用程式或庫,例如 Spring Reactive Web 支援。對於不耐煩的讀者,請檢視 GitHub 上已有的快速入門

快速瀏覽一下分散-聚集場景

Mono.from(userRequestPublisher)
    .then(userRepository::findUserProfile, 
          userRepository::findUserPaymentMethod)
    .log("user.requests")
    .or(Mono.delay(5)
            .then(n -> Mono.error(new TimeoutException()))
    .mergeWith(userRepository::findSimilarUserDetails)
    .map(userDetailsTuple -> userDetailsTuple.t1.username)
    .publishOn(SchedulerGroup.io())
    .subscribe(responseSubscriber);

詳情

  • Flux,一個具有精簡 Rx 範圍的 Publisher,用於釋出 0 到 N 個數據信號。運算子包括 create()interval()merge()zip()concat()switchOnError()switchOnEmpty()

Flux in action

  • Mono,一個具有精簡 Rx 派生範圍的 Publisher,適用於嚴格型別化這種特定數量性質的 0 或 1 個數據信號。運算子包括 delay()then()any()and()or()otherwise()otherwiseIfEmpty()where() 和阻塞的 get()

Mono in action

  • 基於純 Java 介面(Runnable、Callable)的新簡單排程契約。

-- 包含 SchedulerGroupTopicProcessorWorkQueueProcessor。 -- 取代了之前的 Enviroment/Dispatcher 組合,同時解決了相同的需求,並且很快將記錄簡單的遷移路徑。不再有靜態狀態持有排程器的引用。 -- 連結運算子:publishOn()dispatchOn()

  • 使用 TestSubscriber 支援 Publisher 源的測試。
  • CallableRunnableIterable、Java 8 CompletableFuture、Java 9 Flow.Publisher、RxJava 1 ObservableSingle 轉換為支援 Reactive Streams 的 FluxMono,無需額外的橋接依賴。
  • 全面修訂並整合的 Javadoc,包括略微調整的 marble 圖。
  • 一個微型工具包,包含實用工具和基礎 Subscriber,可隨意重用以實現您自己的響應式元件。

-- 一種經濟高效的定時器 API 和實現(雜湊輪定時器)。 -- 新的融合 API,用於虛擬地合併響應式鏈中的兩個或更多階段。 -- 一個經過調整的 QueueSupplier,將為正確的容量提供正確的佇列。

  • 基於狀態表示的新內省 API。

-- Publisher 日誌記錄,如果可用,則回退到 java.util.logging 或 SLF4J。可直接在 FluxMono 上使用 log() 運算子。 -- 與包括 Reactive Streams 在內的任何其他契約正交,一切都可以是 BackpressurableCompletable 或作為 Receiver 生成泛型 Object(可能是 Subscriber),這反過來又允許我們追蹤流的完整圖並使用狀態指示器對其進行增強:

下一步是什麼?

我們希望收集您的反饋意見,您可以訪問相應的 issues 儲存庫或加入我們最近建立的 Gitter 頻道。敬請關注下一篇關於 Reactor Stream 2.5.0.M1 的文章,它是 Reactive Streams 的完整 Rx 實現。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有