領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多Reactor 團隊很高興地宣佈,Reactor 靈活、非同步、快速資料框架的 1.1.0.RELEASE 版本現已推出了一些重要更新。此版本包含了大量的 bug 修復和關鍵元件的重寫,以使其更快,也許更重要的是,在記憶體使用方面更高效。Reactor 1.1 現在包含了高盛 [1] 出色的 gs-collections 庫,它為處理各種對映和集合提供了非常流暢的 API。
以下是 Reactor 1.0 和 1.1 之間變更的非詳盡列表
與 1.0 版本相比,Stream API 最有用的新增功能包括 Stream.window 和 Stream.timeout 方法。這允許您在給定時間內收集值並將其傳遞到處理鏈中。例如,要每 500 毫秒處理一次收集到的值,請使用 window
Deferred<Pojo, Stream<Pojo>> in = Streams.defer(env);
// add all collected values every half-second
in.compose()
.window(500)
.consume(values -> service.addAll(values));
// another service emits data into the `Deferred`
Pojo p;
while(null != (p = input.next())) {
in.accept(p);
}
RingBuffer 的健壯 HashWheelTimer 實現gs-collections 5.0 [1] 的新 Consumer Registry 實現如果您需要以更可預測的方式控制記憶體使用,Reactor 包含一個分配 API,該 API 可以由您需要的任何特定池實現支援。Reactor 1.1 提供了兩種實現:基於 RingBuffer 的 Allocator 和基於引用計數的 Allocator。
基於 RingBuffer 的 Allocator 可以配置為非常類似於帶有事件處理程式的標準 Disruptor RingBuffer。但是,如果您只需要阻塞生產者並使用基於槽的分配策略,那麼使用 RingBuffer 進行分配就非常簡單了
Allocator<Event<Buffer>> pool = new RingBufferAllocatorSpec<Event<Buffer>>()
.ringSize(16 * 1024)
.allocator(() -> new Event<Buffer>(null))
.waitStrategy(new BusySpinWaitStrategy())
.get();
// in your code, maintain a `Reference` you can release
Reference<Event<Buffer>> ref = pool.allocate();
// pass your data POJO to other services
Event<Buffer> ev = ref.get().setData(buffer);
service.invoke(ev);
// when you're done, release the reference
ref.release();
日誌記錄對非同步應用程式的效能非常有害——特別是對於使用 RingBuffer 等技術(使用單個執行緒支援許多工)的應用程式。如果該執行緒被某個任務在寫入日誌條目時進行 IO 阻塞,那麼這可能會反向級聯到應用程式中,導致應用程式停滯。
Reactor 包含一個高效的 Logback [2] 非同步 Appender 實現,它將實際的附加操作轉移到專用的日誌執行緒上。這應該有助於緩解大多數應用程式中由日誌記錄引起的執行緒壓力。但有時這還不夠,需要更高吞吐量的解決方案。這就是 Reactor 基於 Java Chronicle 的 Appender 發揮作用的地方。
Java Chronicle [3] 是一個高速訊息庫,它使用記憶體對映檔案實現快速高效的資料持久化。Reactor 透過提供一個 Appender 將其與 Logback 整合,該 Appender 記錄應用程式的原始事件資料,但無需呼叫下游 appender。這意味著您的日誌事件儲存在 Chronicle 中,但處於原始狀態。需要一個額外的實用程式來後處理“持久”日誌檔案,然後將這些事件傳送到“真實”的 appender(例如檔案或資料庫),或者檢視 Chronicle 並查詢與給定模式匹配的條目。這在生產環境中非常有用,在應用程式正常執行時您不關心日誌記錄,但如果出現問題,您可以輕鬆地將資料從 Chronicle 提取到標準日誌檔案中進行取證分析。
要配置 Reactor DurableAsyncAppender 進行高速日誌記錄,只需在 Logback 配置中宣告它。以下是 logback.xml 配置中使用的示例
<appender name="chronicle" class="reactor.logback.DurableAsyncAppender">
<!-- Uncomment to have log events also sent to a "normal" file appender -->
<!--appender-ref ref="logfile"/-->
<basePath>log/</basePath>
<backlog>2097152</backlog>
</appender>
如果出現問題,您可以使用隨附的實用程式分析 chronicle,方法是將從 chronicle 中提取的事件定向到給定的“真實” Appender。此示例呼叫日誌實用程式(reactor-logback.jar 工件必須在類路徑上),並從 log/ 目錄讀取持久日誌檔案,從 logback.xml 讀取 Logback 配置,然後將所有 ERROR 訊息輸出到 logfile appender,該 appender 在 logback.xml 配置檔案中定義。
java reactor.logback.DurableLogUtility --path log/ --config logback.xml --output logfile --level ERROR
Groovy 2.3.0 剛剛釋出,它包含大量新功能和效能改進,以及 lambda 閉包支援和其他酷炫的 JDK 8 功能。Reactor 的 Groovy 支援已準備好在 Groovy 2.3 中使用,同時仍相容 JDK 7 上的 Groovy 2.2。
reactor-tcp 重新命名為 reactor-netjeromq 添加了 ZeroMQ 支援reconnect 支援TCP 模組已進行改進,其中包括對 UDP 的支援以及基於 ZeroMQ [4] 的新實現。
Reactor 中的 ZeroMQ 支援具有 tcp 和 inproc 支援,並提供簡潔流暢的 API,可以使用 Reactor 高效的編解碼器設施快速建立客戶端和伺服器。
ZeroMQ<JsonData> zmq = new ZeroMQ<>(reactorEnv)
.codec(new JacksonJsonCodec());
zmq.router("inproc://queue")
.consume(channel -> channel.consume(service::invoke));
zmq.dealer("inproc://queue")
.consume(channel -> {
JsonData data;
while(null != (data = in.next())) {
channel.sendAndForget(data);
}
});
reactor-benchmark 專案Artifacts 可在 Maven Central 和 repo.spring.io/libs-release 中找到。請注意,Spring 支援的座標在 1.1 版本中已更改為 org.projectreactor.spring:reactor-spring-* [6]。
參考文件可在 GitHub wiki 中找到。
更新的 API 文件可在 GitHub pages 網站上找到。
[1] - https://github.com/goldmansachs/gs-collections
[2] - http://logback.qos.ch/
[3] - https://github.com/OpenHFT/Java-Chronicle
[4] - http://zeromq.org/
[5] - http://openjdk.java.net/projects/code-tools/jmh/
[6] - http://repo.spring.io/libs-release/org/projectreactor/spring/