領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多Reactor 團隊非常興奮地宣佈 Reactor 2.0 的初始里程碑版本!此更新包括在完全重寫的 Stream 和 Promise API 中實現完全相容的 Reactive Streams!這對 Reactor 使用者來說是一個巨大的進步。它開啟了與其他 Reactive Streams 實現(如 Akka Streams、Ratpack、RxJava 等)的整合。Reactor 為構建具有高吞吐量和低延遲要求的現代 #uberfastdata 應用程式提供了堅實的基礎。
Reactor 2.0 的主要變化是 Stream API。事實上,1.1 和 2.0 之間,程式碼庫的其他大部分要麼只是略有改進,要麼保持不變。但 Stream 和 Promise 則不同。這些元件已經完全重寫,從頭開始利用 Reactive Streams 規範,在功能性響應式流管道中提供完全非阻塞的背壓。
Akka 創始人 Roland Kuhn 博士對此話題有過非常精彩的闡述,如果您有興趣瞭解非阻塞背壓背後的原因,我們鼓勵您觀看他在會議上的相關演講,其中大部分都可以在 YouTube 上找到。
TL;DR
背壓是釋出者和訂閱者關係的一種反轉,其中 Subscriber 對 Publisher 說“給我接下來的 N 個可用項”,而不是 Publisher 對 Subscriber 說“無論你是否能處理,都拿走我所有的這些項”。由於 Publisher 是被動地向 Subscriber 提供資料元素,而不是反過來,因此(在完全 Reactive Streams 管道中)不需要緩衝資料,因為你永遠不會有超過你所能處理的資料。實際上,一些緩衝或排隊是必要的,但像 Reactor 這樣的庫讓你無需擔心如何實現這一點,這樣你就可以編寫完全響應式的程式碼,在資料可用時對其進行響應,而無需費心找出 BlockingQueue 或其他低效方案的神秘組合,以確保非同步元件彼此正確隔離。
Reactor 團隊投入了大量令人沮喪的時間,實現了 Reactive Streams 規範的全面實現。Reactor 的 Stream 元件為您提供了有用且易於理解的掛鉤,您可以在其上掛載您的業務邏輯,這樣您只需關注編寫適當作用域的功能元件,該元件將響應單個數據元素,而無需用大量樣板式的物流程式碼來處理資料在不同執行緒之間的傳遞、執行有界佇列和緩衝以及處理響應式非同步元件通常所需的各種其他任務。
您可以在響應式地理編碼器演示中找到一個程式碼示例,Reactor 團隊在今年的達拉斯 SpringOne 會議上討論了該演示(回放可在 InfoQ 上供 SpringOne2GX 2014 與會者觀看,稍後將公開)。
以下是一個小片段,展示瞭如何建立一個新的 Stream,將業務邏輯附加到它,然後將資料釋出到其中。
// by default Streams use the Disruptor RingBufferDispatcher
HotStream<String> helloStream = Streams.defer(env);
helloStream.map(s -> "Hello " + s + "!")
.consume(log::info);
helloStream.broadcastNext("World");
當你執行這段程式碼時,你將看到“Hello World!”的文字被記錄。你還應該注意到日誌是從 RingBuffer 執行緒而不是你的主執行緒進行的。換句話說,你剛剛提交了一個任務到另一個執行緒非同步執行,將結果轉換為其他內容,然後使用 Reactive Streams 非阻塞、基於需求的背壓響應結果,而沒有任何嘈雜的基於 Future 的阻塞程式碼!
您也可以建立“冷”流,這與使用 RxJava 的 Observable 非常相似。
// stream contains the single value "Hello World!"
Stream<String> helloStream = Streams.just("World");
helloStream.map(s -> "Hello " + s + "!")
.consume(log::info);
執行此程式碼時,您會看到“Hello World!”文字被記錄,與前面的示例類似。這裡的區別在於我們從未呼叫 broadcastNext(String) 方法,因為當我們附加 Consumer<String> 時,它已經為我們處理了。您可以像建立 RxJava Observable 一樣,從任何值或值集合建立流。這使您可以將標準 Java Collection API 與響應式流 API 結合使用。
像 Spark、Storm 和其他大資料庫這樣的流式 API 證明,在資源有限的系統(基本上是我們所有在雲中執行的系統)上,以更函式式和響應式的方式處理資料效率更高,而且由於用於構建處理管道的 DSL 具有宣告式、自文件的特性,因此(在許多情況下)也更容易理解。當你將業務邏輯提煉到其本質時,你確實會注意到沒有多少事情不能表達為轉換或消費者函式。你要麼接收輸入併產生輸出,要麼只是接收輸入。Reactor 的 Stream API 沉浸在這種正規化中,因此為您提供了大量的選項(誰能告訴我這個電影引用:“你認為我有大量的皮納塔嗎?”)來處理資料,因為它透過您的管道。除了像 map(Function<T,V>) 和 filter(Predicate<T>) 這樣的簡單函式之外,還有更復雜的選項,如 buffer(int) 或 buffer(int, long, TimeUnit)。後者提供了極其有用的基於長度和時間的“微批處理”。例如,為了微批處理一組傳送到廣域網連線代價高昂的資料庫更新,您可能希望將它們緩衝起來,直到達到設定數量或經過一定超時。
// create a stream backed by a load-balanced, round-robin assigned Dispatcher
Stream<Update> updateStream = Streams.defer(env, env.getDefaultDispatcherFactory().get());
updateStream.buffer(1024, 350, TimeUnit.MILLISECONDS)
.consume(driver::batchUpdate);
這將收集流式更新,直到收集到 1024 個更新或 350 毫秒已過去,以先發生者為準。然後,它將透過傳遞一個包含 1024 個元素或 350 毫秒內收集到的元素數量的 List<Update> 來觸發下游處理。這使您能夠編寫非常高效的系統,以批處理方式處理大量資料,以最小化網路頻寬使用並最大化吞吐量(同時仍保持可預測的延遲)。
除了微批處理,Stream 還提供了諸如 filter、flatMap、movingBuffer、join、merge、sample、sort 等許多自解釋的操作。與 Scala 的集合 API 或 RxJava 的 Observable 非常相似,Reactor 的 Stream 提供了功能性和響應式的方式來快速、高效、以極高的吞吐量處理資料,同時保持可預測的低延遲。毫不誇張地說,您可以使用 Stream 作為基礎元件來編寫整個應用程式,它既可以用於提交非同步任務執行,也可以以響應式方式處理傳統資料集合——然後透過結合即時資料和歷史資料來混合這兩種方法。
有時需要將資料流拆分為並行管道以進行併發處理。Reactor 的 Stream 透過 parallel(int) 操作提供了極其便捷的方式。您只需將業務邏輯附加到 parallel 呼叫後提供的 Stream,資料將在下游管道之間進行輪詢,以進行併發處理。
HotStream<String> stream = Streams.defer(env);
// by default uses number of CPUs as thread count
stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info));
這裡有一個有趣的 Reactive Streams 實現示例,它在您的程式碼中展現出來:當您執行這段程式碼時,您將不會得到任何輸出。.parallel() 操作不會在管道上建立“需求”。在 Reactive Streams 系統中,是管道的末端將資料拉入操作,而不是生產者將其推出。由於此管道的末端沒有終端操作,因此資料無法被拉過。實際上,這通常不是問題,因為在實際應用程式中您確實想要處理資料。在此示例中,我們可以在 .parallel() 之後新增 .drain() 呼叫來產生需求並拉取資料。我們可能不會在生產系統中使用這種方法,但對於測試和演示,我們可以透過 drain 輕鬆實現。
stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info))
.drain();
使用響應式系統時,有時會很沮喪,不明白為什麼事情沒有按預期工作。雖然庫無法在 IDE 中更好地除錯非同步流方面做太多事情,但總有經過驗證的“大量日誌記錄”方法。Reactor 添加了幾個有點隱藏的方法,稱為 .debug() 和 .log(),它們應該可以幫助您弄清楚您的流是如何構建以及它們正在做什麼。.debug() 方法將為您提供流如何連線的輸出。它將顯示哪些操作連線到哪些操作,以及每個操作中當前可用的容量。.log() 方法將日誌記錄操作附加到您的流並輸出訂閱和釋出事件。
如果我們在上面的示例中,在 .parallel() 之前新增一個 .log() 呼叫,我們將獲得額外的日誌,告訴我們發生了什麼。
stream.log()
.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info))
.drain();
將產生
[ringBuffer-1] INFO r.r.a.LoggerAction - onSubscribe: {capacity=0/8188 [0%], current=0, pending=0, waiting=0}
[main] INFO r.r.a.LoggerAction - subscribe: ConcurrentAction-{dispatcher=RingBuffer:8192, max-capacity=8188}
[ringBuffer-1] INFO r.r.a.LoggerAction - request: 9223372036854775807
[ringBuffer-1] INFO r.r.a.LoggerAction - onNext: World
[ringBufferGroup-2] INFO r.r.StreamTests - Hello World!
要將現有應用程序升級到 Reactor 2.0,您可能只需調整幾處。如果您在 Reactor 1.1 中使用流,您會發現 Reactor 2.0 流在值釋出方面有所不同。.broadcastNext() 方法在 Action 子類和 HotStream 上定義,但不在其他一些操作上。Reactor 1.1 使用 Deferred 來發布值,因此您的程式碼需要調整,將釋出者型別更改為可以訪問 .broadcastNext() 方法的型別。如果您使用的是普通的 Reactor 或基於 Spring 和註解的事件處理,則幾乎不需要更改任何內容。
要訪問里程碑工件,請在您選擇的構建系統中使用 http://repo.spring.io/libs-milestone 儲存庫。例如,如果您正在使用 Gradle(當然您正在使用 Gradle,對吧?),只需像這樣配置您的 repositories 塊:
repositories {
maven { url 'http://repo.spring.io/libs-milestone' }
mavenCentral()
}
要報告錯誤、關注 Reactor 2.0 的開發、閱讀 wiki 或以其他方式參與 Reactor 社群,請訪問 Reactor 的 GitHub 主頁:https://github.com/reactor/reactor。您還可以線上閱讀 JavaDoc:http://reactor.github.io/docs/api/2.0.0.M1/index.html