搶先一步
VMware 提供培訓和認證,助您快速進步。
瞭解更多Reactor 團隊非常興奮,終於能夠宣佈 Reactor 2.0 的一個初步里程碑版本!此更新在完全重寫的 Stream
和 Promise
API 中包含了完全相容 Reactive Streams 的實現!這對於 Reactor 使用者來說是巨大的一步。它打開了與 Akka Streams, Ratpack, RxJava 等其他 Reactive Streams 實現整合的可能性。Reactor 提供了一個堅實的基礎,可以在此基礎上構建具有高吞吐量和低延遲要求的現代 #uberfastdata
應用。
Reactor 2.0 的主要變化是 Stream API。事實上,程式碼庫的大部分其他部分在 1.1 和 2.0 之間只是進行了輕微改進或保持不變。但 Stream
和 Promise
則不同。這些元件已從頭完全重寫,以利用 Reactive Streams 規範,在功能性響應式流管道中提供完全非阻塞的背壓。
Akka 的著名人物 Dr. Roland Kuhn 在這個問題上發表過非常雄辯的演講,如果您有興趣探索非阻塞背壓背後的原因,我們鼓勵您觀看他關於該主題的會議演示,其中大部分可在 YouTube 上找到。
長話短說 (TL;DR)
背壓是釋出者和訂閱者關係的一種反轉,其中 Subscriber
對 Publisher
說“給我接下來的 N 個可用項”,而不是 Publisher
對 Subscriber
說“無論你是否能夠處理,都把這些項全部拿走”。由於 Publisher
是被動地向 Subscriber
提供資料元素,而不是反過來,因此(在完全 Reactive Streams 管道中)沒有必要緩衝資料,因為流入的資料永遠不會超過你能夠處理的量。實際上,一些緩衝或排隊是必要的,但像 Reactor 這樣的庫消除了你擔心如何實現這一點的需要,這樣你就可以編寫完全響應式的程式碼,隨著資料變得可用而做出響應,而不是試圖找出必須採用哪種神奇的 BlockingQueue
或其他低效方案的組合,以確保非同步元件彼此正確隔離。
Reactor 團隊投入了數量驚人的時間來全面實現 Reactive Streams 規範。Reactor 的 Stream
元件為您提供了有用且易於理解的鉤子,您可以將業務邏輯掛在上面,這樣您只需關注編寫適當範圍的功能元件,該元件將響應單個數據元素,而無需用大量樣板式的邏輯來處理資料從一個執行緒傳遞到另一個執行緒、執行有界佇列和緩衝以及其他在處理響應式非同步元件時通常必要的各種任務。
關於這在您的程式碼中如何體現的示例,可以在 Reactive Geocoder Demo 中找到,Reactor 團隊在今年德克薩斯州達拉斯舉行的 SpringOne 會議上討論了該示例(回放可在 InfoQ 上供 SpringOne2GX 2014 參會者觀看,稍後將對公眾開放)。
以下是一個小片段,展示瞭如何建立新的 Stream
,為其附加業務邏輯,然後向其釋出資料。
// by default Streams use the Disruptor RingBufferDispatcher
HotStream<String> helloStream = Streams.defer(env);
helloStream.map(s -> "Hello " + s + "!")
.consume(log::info);
helloStream.broadcastNext("World");
當您執行此程式碼時,您將看到日誌中打印出“Hello World!”文字。您還應該注意到日誌是從 RingBuffer 執行緒進行的,而不是從您的主執行緒。換句話說,您剛剛提交了一個任務到另一個執行緒進行非同步執行,將其結果轉換為其他內容,然後使用 Reactive Streams 非阻塞、基於需求的背壓響應了結果,沒有任何基於 Future 的嘈雜阻塞程式碼!
您也可以建立“冷”流,這與使用 RxJava 的 Observable
非常相似。
// stream contains the single value "Hello World!"
Stream<String> helloStream = Streams.just("World");
helloStream.map(s -> "Hello " + s + "!")
.consume(log::info);
當您執行此程式碼時,您將看到日誌中打印出“Hello World!”文字,與上一個示例類似。這裡的區別在於我們無需呼叫 broadcastNext(String)
方法,因為當我們附加 Consumer<String>
時,這一切都已為我們處理。您可以從任何值或值的集合建立流,就像建立 RxJava Observable
一樣。這使您可以將標準的 Java Collection API 與響應式流 API 混合使用。
像 Spark、Storm 和其他大資料庫這樣的流 API 證明,在資源有限的系統(基本上就是我們在雲中執行的任何東西)上執行時,以更具功能性和響應式的方式處理資料更有效,並且由於用於構建處理管道的 DSL 具有宣告性和自文件化的特性,在許多情況下也更容易理解。當你將業務邏輯提煉到其本質時,你會注意到很少有事物不能表達為轉換或消費函式。你要麼接受輸入併產生輸出,要麼只接受輸入。Reactor 的 Stream API 根植於這種正規化,因此提供了豐富(誰能告訴我這是哪部電影臺詞:“你會說我有很多皮納塔嗎?”)的選擇,用於處理流經管道的資料。除了像 map(Function<T,V>)
和 filter(Predicate<T>)
這樣的簡單函式外,還有更復雜的選項,如 buffer(int)
或 buffer(int, long, TimeUnit)
。後者提供了極其有用的基於長度和時間的“微批處理”。例如,對於透過 WAN 連線傳送開銷較大的資料庫更新集,您可能希望緩衝它們,直到收集到設定的數量或經過一定的超時時間。
// create a stream backed by a load-balanced, round-robin assigned Dispatcher
Stream<Update> updateStream = Streams.defer(env, env.getDefaultDispatcherFactory().get());
updateStream.buffer(1024, 350, TimeUnit.MILLISECONDS)
.consume(driver::batchUpdate);
這將收集流式更新,直到收集到 1024 個或者經過 350 毫秒,以先到者為準。然後,它將透過傳遞一個包含 1024 個元素或在 350 毫秒內收集到的元素數量的 List<Update>
來觸發下游處理。這使您可以編寫非常高效的系統,以批處理方式處理大量資料,以最大程度地減少網路頻寬使用並最大化吞吐量(同時仍保持可預測的延遲)。
除了微批處理,Stream
還提供了諸如 filter
, flatMap
, movingBuffer
, join
, merge
, sample
, sort
等許多其他操作,這些操作大多不言自明。很像 Scala 的集合 API 或 RxJava 的 Observable,Reactor 的 Stream
提供功能性和響應式的方法,以快速、高效、高吞吐量地處理資料,同時保持可預測的低延遲。毫不誇張地說,您可以使用 Stream
作為基礎元件來編寫整個應用程式,該元件用於提交非同步任務執行,並以響應式方式處理傳統資料集合——然後透過結合即時資料和歷史資料來混合這兩種方法。
有時需要將資料流分割成並行管道進行併發處理。Reactor 的 Stream
透過 parallel(int)
操作提供了極其便捷的方式來實現這一點。您只需將業務邏輯附加到 parallel
呼叫後提供的 Stream
上,資料就會在下游管道之間進行輪詢,以進行併發處理。
HotStream<String> stream = Streams.defer(env);
// by default uses number of CPUs as thread count
stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info));
以下是一個有趣的 Reactive Streams 實現示例,展示了它在您的程式碼中如何體現:當您執行此程式碼時,您不會得到任何輸出。.parallel()
操作不會在管道上建立“需求”。在 Reactive Streams 系統中,是由管道的末端將資料拉入操作,而不是由生產者推送。由於此管道末端沒有終端操作,資料無法被拉出。在實際應用中,這通常不是問題,因為您實際上希望處理資料。在此示例中,我們只需在 .parallel()
之後新增一個 .drain()
呼叫即可產生需求並將資料拉出。我們在生產系統中不太可能這樣做,但在測試和演示中,我們可以透過 drain 輕鬆解決問題。
stream.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info))
.drain();
對於響應式系統,有時很難理解為什麼事情不像預期的那樣工作。雖然庫在改善 IDE 內非同步流的即時除錯過程方面能做的事情不多,但總有屢試不爽的方法,那就是大量的日誌記錄。Reactor 添加了兩個有點隱藏的方法,名為 .debug()
和 .log()
,它們應該能幫助您弄清楚流是如何構建的以及它們正在做什麼。.debug()
方法將輸出流的連線方式。它會顯示哪些操作連線到哪些操作,以及每個操作當前可用的容量。.log()
方法會為您的流附加一個日誌操作,並輸出訂閱和釋出事件。
如果在上面的示例中,我們在 .parallel()
呼叫之前新增一個 .log()
呼叫,我們將獲得額外的日誌資訊,告訴我們正在發生什麼。
stream.log()
.parallel(substream -> substream.map(greeting -> "Hello " + greeting + "!")
.consume(log::info))
.drain();
將產生
[ringBuffer-1] INFO r.r.a.LoggerAction - onSubscribe: {capacity=0/8188 [0%], current=0, pending=0, waiting=0} [main] INFO r.r.a.LoggerAction - subscribe: ConcurrentAction-{dispatcher=RingBuffer:8192, max-capacity=8188} [ringBuffer-1] INFO r.r.a.LoggerAction - request: 9223372036854775807 [ringBuffer-1] INFO r.r.a.LoggerAction - onNext: World [ringBufferGroup-2] INFO r.r.StreamTests - Hello World!
要將現有應用程序升級到 Reactor 2.0,您可能只需要調整幾處。如果您在 Reactor 1.1 中使用了 stream,您會發現 Reactor 2.0 的 stream 在值的釋出方式上有所不同。.broadcastNext()
方法定義在 Action
子類和 HotStream
上,而不是其他一些操作上。Reactor 1.1 使用 Deferred
釋出值,因此您的程式碼需要進行調整,將釋出者型別更改為可以訪問 .broadcastNext()
方法的型別。如果您使用普通的 Reactor
或基於 Spring 和註解的事件處理,您幾乎不需要更改任何東西。
要訪問里程碑構件,請在您選擇的構建系統中使用 http://repo.spring.io/libs-milestone
倉庫。例如,如果使用 Gradle(當然您在使用 Gradle,對吧?),只需像這樣配置您的 repositories
塊
repositories {
maven { url 'http://repo.spring.io/libs-milestone' }
mavenCentral()
}
要報告 bug、關注 Reactor 2.0 的開發、閱讀 wiki 或以其他方式參與 Reactor 社群,請訪問 Reactor 的 GitHub 主頁:https://github.com/reactor/reactor。您還可以線上閱讀 JavaDoc:http://reactor.github.io/docs/api/2.0.0.M1/index.html