Reactor 2.5:JVM 的第二代響應式基礎

釋出 | Stephane Maldini | 2016年2月16日 | ...

進入響應式流時代

Reactor 2.0 的開發始於 2014 年底,與 Reactive Streams 大致在同一時間。我們熱衷於加入這項工作,並儘早採用 背壓協議,以緩解我們主要的訊息傳遞限制:有界容量。我們在 Reactor 2.0 中首次嘗試實現了基於 RingBuffer 的排程器的 Reactive Streams 版本,並衍生出一種越來越流行的響應式模式:Reactive Extensions

與此同時,Reactive Streams 開始獲得關注,整個庫生態系統都在討論這一轉變。常見的問題是什麼?實現 Reactive Streams 語義絕非易事。我們發現越來越需要一個響應式基礎來解決訊息傳遞問題並實現通用的流式運算子。因此,我們為 Reactor Core 建立了一個專門的專案空間,並與 Spring Framework 團隊一起開始了重點工作。

從 2.5 版本開始,Reactor 現在被 組織成多個專案,例如 2.0.x 等維護分支保持不變。這體現在版本釋出管理中,例如 Reactor Core 2.5 M1 是唯一可用的里程碑版本,其他專案將隨後釋出各自獨有的版本。

為了支援這個新的專案模型,我們在 http://projectreactor.io 部署了一個新的網站,希望能更加受歡迎。

協作式地重新審視 Reactive Streams

這種新的組織方式大大降低了參與專案活動的門檻。專案受益於 Spring API 設計方面的協作以及來自 Sébastien DeleuzeBrian Clozel 等人的直接貢獻。Reactor 也歡迎新的外部貢獻者和評審者的幫助

Reactor 2.5 中的依賴和協作
Reactor 2.5

Reactive Streams Commons

Reactive Streams Commons 倉庫 是一個開放的研究專案,致力於提高 Reactive Extensions 等在 Reactive Streams 規範 中的效率。它被 Reactor Core 和 Stream 完全內聯,作為該專案所關注的眾多革新的契約入口。

"RSC" 因此是一個自由形式的專案,類似於 JCTools 在併發佇列方面的做法。其最大的進展之一是一種“融合”(Fusion)協議,旨在減少響應式處理鏈中大多數同步和部分非同步階段的開銷。最後,這項工作幫助修復了一百多個流式錯誤,我們的測試流程現在包括 RSC 單元/整合測試和 JMH 基準測試,並結合 Reactor 自身的整合測試和 基準測試

Reactor Core 2.5.0.M1

今天的 Reactor 部落格系列以一個令人高興的事件開始:Reactor Core 2.5.0.M1 釋出!在其新範圍和與 Reactive Streams Commons 的緊密聯絡下,Reactor Core 提供了足夠的 Rx 支援,用於構建響應式應用或庫,例如 Spring Reactive Web 支援。對於急切的讀者,可以看看 github 上已經提供的 快速入門

快速瀏覽散射-聚集(scatter-gather)場景

Mono.from(userRequestPublisher)
    .then(userRepository::findUserProfile, 
          userRepository::findUserPaymentMethod)
    .log("user.requests")
    .or(Mono.delay(5)
            .then(n -> Mono.error(new TimeoutException()))
    .mergeWith(userRepository::findSimilarUserDetails)
    .map(userDetailsTuple -> userDetailsTuple.t1.username)
    .publishOn(SchedulerGroup.io())
    .subscribe(responseSubscriber);

詳細介紹

  • Flux,一個 釋出者 (Publisher),擁有精簡的 Rx 作用域,用於表示 0 到 N 個數據信號。運算子包括 create(), interval(), merge(), zip(), concat(), switchOnError()switchOnEmpty()

Flux in action

  • Mono,一個 釋出者 (Publisher),擁有精簡的 Rx 派生作用域,適用於強型別化這種特定數量(0 或 1 個)的訊號。運算子包括 delay(), then(), any(), and(), or(), otherwise(), otherwiseIfEmpty(), where() 以及一個阻塞式的 get()

Mono in action

  • 基於簡單的 Java 介面(Runnable, Callable)的新排程契約。

-- 包括 SchedulerGroup, TopicProcessorWorkQueueProcessor。-- 取代了之前的 Enviroment/Dispatcher 組合,同時滿足相同的需求,並且將很快提供簡單的遷移路徑。不再有持有 dispatcher 引用的靜態狀態。-- 相關運算子:publishOn()dispatchOn()

  • 使用 TestSubscriberPublisher 源提供測試支援。
  • Callable, Runnable, Iterable, Java 8 CompletableFuture, Java 9 Flow.Publisher, RxJava 1 ObservableSingle 轉換為相容 Reactive Streams 的 FluxMono,無需額外的橋接依賴。
  • 完全改版和整合的 Javadoc,包括略微調整的彈珠圖 (marble diagrams)。
  • 一個用於構建您自己的響應式元件的微型工具包,包含實用工具和基礎 Subscriber,可以隨意複用。

-- 一個經濟高效的 Timer API 和實現(雜湊輪定時器)。-- 新的 Fusion API,可以虛擬地合併響應式鏈中的兩個或多個階段 -- 一個經過調整的 QueueSupplier,將為正確的容量提供正確的佇列

  • 基於 狀態 表示的新自省 (Introspection) API。

-- Publisher 日誌記錄,如果可用,可回退到 java.util.logging 或 SLF4J。可以直接在 FluxMono 上使用 log() 運算子。-- 與包括 Reactive Streams 在內的任何其他契約正交,一切都可以是 Backpressurable,一個 Completable 或是一個生成通用物件(可能是 Subscriber)的 Receiver,這反過來允許我們追蹤流的完整圖並用狀態指標對其進行增強:

下一步計劃是什麼?

我們非常希望能收集您的反饋,您可以前往相應的 issues 倉庫或加入我們最近建立的 Gitter 頻道。請繼續關注下一篇關於 Reactor Stream 2.5.0.M1 的文章,它是 Reactive Streams 之上的完整 Rx 實現。

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持聯絡

訂閱

先行一步

VMware 提供培訓和認證,助力您的進步。

瞭解更多

獲取支援

Tanzu Spring 透過一項簡單的訂閱,為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援和二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群中所有即將到來的活動。

檢視全部