先人一步
VMware 提供培訓和認證,助你加速進步。
瞭解更多Reactor 團隊很高興地宣佈,Reactor 靈活、非同步、快速資料框架的一些重大更新現已在 Reactor 的 1.1.0.RELEASE 版本中提供。該版本包含大量錯誤修復和關鍵元件的重寫,使其速度更快,也許更重要的是,在記憶體使用方面更高效。Reactor 1.1 現在包含了來自高盛的優秀 gs-collections
庫 [1],該庫為處理各種 map 和 collection 提供了非常流暢的 API。
以下是 Reactor 1.0 和 1.1 之間更改的不完全列表
相對於 1.0 版本,Stream API 中一些更有用的新增功能包括 Stream.window
和 Stream.timeout
方法。這允許你在給定時間內收集值並將它們傳遞到處理鏈下游。例如,要處理每 500ms 收集到的值,可以使用 window
Deferred<Pojo, Stream<Pojo>> in = Streams.defer(env);
// add all collected values every half-second
in.compose()
.window(500)
.consume(values -> service.addAll(values));
// another service emits data into the `Deferred`
Pojo p;
while(null != (p = input.next())) {
in.accept(p);
}
RingBuffer
的穩健 HashWheelTimer 實現gs-collections
5.0 的新 Consumer Registry 實現 [1]如果你需要以更可預測的方式控制記憶體使用,Reactor 包含一個分配 API,該 API 可以由你需要的任何特定池化實現提供支援。Reactor 1.1 提供了兩種實現:基於 RingBuffer 的 Allocator
和引用計數 Allocator
。
基於 RingBuffer 的 Allocator
可以配置得非常像帶有事件處理程式的標準 Disruptor RingBuffer
。但如果你只需要阻塞生產者並使用基於槽的分配策略,那麼使用 RingBuffer 進行分配非常簡單
Allocator<Event<Buffer>> pool = new RingBufferAllocatorSpec<Event<Buffer>>()
.ringSize(16 * 1024)
.allocator(() -> new Event<Buffer>(null))
.waitStrategy(new BusySpinWaitStrategy())
.get();
// in your code, maintain a `Reference` you can release
Reference<Event<Buffer>> ref = pool.allocate();
// pass your data POJO to other services
Event<Buffer> ev = ref.get().setData(buffer);
service.invoke(ev);
// when you're done, release the reference
ref.release();
日誌記錄對非同步應用程式的效能可能非常不利——特別是使用 RingBuffer 等技術的應用程式,RingBuffer 使用單個執行緒支援許多工。如果該執行緒被某個執行 IO 寫入日誌條目的任務阻塞,那麼這可能會在應用程式中級聯回來,導致其停滯不前。
Reactor 包含一個高效的 Logback 非同步 Appender
實現 [2],它將實際的追加操作轉移到專用的日誌記錄執行緒。這應該有助於減輕大多數應用程式中由日誌記錄引起的執行緒壓力。但有時這還不夠,需要更高吞吐量的解決方案。這時 Reactor 基於 Java Chronicle 的 Appender
就派上用場了。
Java Chronicle [3] 是一個高速訊息庫,它使用記憶體對映檔案實現快速高效的資料持久化。Reactor 透過提供一個 Appender
將其與 Logback 整合,該 Appender 記錄應用程式的原始事件資料,但無需呼叫下游 appender。這意味著你的日誌事件儲存在 Chronicle
中,但處於其原始狀態。需要一個額外的工具來後處理“持久化”日誌檔案,然後將這些事件傳送到“真實”appender(例如檔案或資料庫)或檢視 Chronicle
並查詢與給定模式匹配的條目。這在生產環境中非常有用,當應用程式正常執行時你不需要關心日誌記錄,但如果出現問題,你可以輕鬆地從 Chronicle
中提取資料到標準日誌檔案中進行事後分析。
要配置 Reactor DurableAsyncAppender
以進行高速日誌記錄,只需在 Logback 配置中宣告它即可。以下是在 logback.xml
配置中使用它的示例
<appender name="chronicle" class="reactor.logback.DurableAsyncAppender">
<!-- Uncomment to have log events also sent to a "normal" file appender -->
<!--appender-ref ref="logfile"/-->
<basePath>log/</basePath>
<backlog>2097152</backlog>
</appender>
如果出現問題,你可以使用包含的工具分析 chronicle,將從 chronicle 中提取的事件定向到給定的“真實” Appender
。此示例呼叫日誌工具(reactor-logback.jar
artifact 必須位於 classpath 中),並從 log/
目錄讀取持久化日誌檔案,從 logback.xml
讀取 Logback 配置,然後將所有 ERROR 訊息輸出到 logfile
appender,該 appender 在 logback.xml
配置檔案中定義。
java reactor.logback.DurableLogUtility --path log/ --config logback.xml --output logfile --level ERROR
Groovy 2.3.0 剛剛釋出,包含 大量新功能和效能改進 以及 lambda 閉包支援和其他酷炫的 JDK 8 功能。Reactor 的 Groovy 支援已準備好在 Groovy 2.3 中使用,同時仍然相容 JDK 7 上的 Groovy 2.2。
reactor-tcp
重新命名為 reactor-net
jeromq
添加了 ZeroMQ 支援reconnect
支援對 TCP 模組進行了改進,集成了對 UDP 的支援以及基於 ZeroMQ 的新實現。[4]
Reactor 中的 ZeroMQ 支援具有 tcp
和 inproc
支援,並提供簡潔流暢的 API,可使用 Reactor 高效的編解碼器工具快速建立客戶端和伺服器。
ZeroMQ<JsonData> zmq = new ZeroMQ<>(reactorEnv)
.codec(new JacksonJsonCodec());
zmq.router("inproc://queue")
.consume(channel -> channel.consume(service::invoke));
zmq.dealer("inproc://queue")
.consume(channel -> {
JsonData data;
while(null != (data = in.next())) {
channel.sendAndForget(data);
}
});
reactor-benchmark
專案artifacts 可以在 Maven Central 和 repo.spring.io/libs-release
中獲取。請注意,Spring 支援的座標在 1.1 版本中已更改為 org.projectreactor.spring:reactor-spring-*
[6]。
參考文件可在 GitHub wiki 中獲取。
更新的 API 文件可在 GitHub pages 站點 上獲取。
[1] - https://github.com/goldmansachs/gs-collections
[2] - http://logback.qos.ch/
[3] - https://github.com/OpenHFT/Java-Chronicle
[4] - http://zeromq.org/
[5] - http://openjdk.java.net/projects/code-tools/jmh/
[6] - http://repo.spring.io/libs-release/org/projectreactor/spring/