領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多我很高興地宣佈,Reactor,一個用於在 JVM 上構建響應式、快速資料應用程式的強大基礎庫,已達到 GA 版本!
Reactor 提供了必要的抽象,用於構建高吞吐量、低延遲(我們現在稱之為“快速資料”)的應用程式,這些應用程式必須處理每秒數千、數萬甚至數百萬的併發請求。
您應該關注 Reactor,因為現代應用程式(帶有非人類消費者,如手機及在其上執行的應用程式)產生的資料量超過了傳統“每連線一執行緒”伺服器所能支援的範圍,因此 Reactor 為您提供了構建這些高規模應用程式所需的工具和抽象,而無需陷入管理狀態和在非同步應用程式中傳遞事件的繁瑣物流。現代 JVM 應用程式必須建立在非同步和響應式元件的堅實基礎上,這些元件可以在極少數系統執行緒上高效管理大量任務的執行。Reactor 專門設計用於幫助您構建這類應用程式,而不會阻礙您或強迫您遵循某種特定的模式。
Reactor 本身深受同名知名設計模式的影響,但其靈感並非僅來源於該模式。它也包含Actor 模型和傳統基於事件的回撥程式設計的元素。
儘管它是 Spring IO 平臺基礎的一部分,但 Reactor 核心庫不依賴於 Spring。Reactor 核心是一個獨立的庫,其唯一的外部依賴是 SLF4J 和出色的 LMAX Disruptor RingBuffer 庫。
在 Reactor 核心之上構建了其他可選元件,以方便根據常見模式開發應用程式。Reactor 的一些內建一流支援包括:
@CompileStatic,並提供全面的環境構建和事件連線 DSL。Reactor 從一開始就被設計為靈活高效,以便它能夠讓您擺脫困擾,並幫助您以最快的速度處理應用程式中的資料。在其最快的配置下,一個標準的支援 RingBuffer 的 Reactor 在一臺標準開發筆記型電腦上每秒可以釋出超過 10-15 百萬個事件。高效能的 Processor 抽象可以每秒向您的應用程式注入超過 1 億個事件。您的應用程式如何處理資料以減慢 Reactor 的速度,這取決於任務。但在最佳的空操作模式下,其吞吐量如此之高,應用程式不會因為等待 Reactor 完成工作而停滯!
Reactor 核心包含一些基本抽象,這些抽象受到 JDK 8 新的函式式抽象(如 Function<T,V>、Consumer<T>、Supplier<T> 和 Predicate<T>)的啟發(在某些情況下直接基於這些抽象)。Reactor 本身不僅建立在這些抽象的基礎上,您的應用程式也可以利用它們。未來某個時候,JDK 8 的普及程度將足夠高,屆時 Reactor 可以簡單地從 Reactor 中刪除這些抽象,並依賴 JDK 8 中的抽象。在此之前,您的 JDK 6 和 7 應用程式現在就可以從這些函式式抽象中受益。
受 .NET 的 Reactive Extensions、Netflix 的 RxJava、JDK 8 的 Stream 抽象以及許多其他庫(更不用說 20 年的事件驅動計算機科學)的啟發,Reactor 提供了一種“響應式”程式設計模型,使協調非同步任務變得更加容易。Stream<T> 和 Promise<T> 等抽象使鏈式非阻塞操作變得簡單而簡潔——而且沒有回撥地獄!
@Inject
AsyncDataLoader loader;
Promise<Buffer> p = loader.get("U-U-I-D")
.map(new Function<Buffer, Data>() {
public Data apply(Buffer buff) {
// transform data
Data data = parser.parse(buff);
return data;
}
})
.filter(new Predicate<Data>() {
public boolean test(Data data) {
// check Data for certain conditions being true
return null != data.getName();
}
})
.consume(new Consumer<Data>() {
public void accept(Data data) {
// only Data that passes the Predicate test makes it here...
}
});
// Promises can also block like a Future
Buffer buff = p.await();
這些操作(map、filter、consume)都是獨立執行的(可能)非同步操作。在傳統的多執行緒環境中,需要新增大量圍繞 Future 阻塞和等待完成的嘈雜程式碼。然而,使用 Reactor,您只需以響應式方式將操作連結在一起,以便當前一個操作完成時,下一個操作將“響應”資料。
Reactor 包含對 Groovy 語言的一流支援。它支援使用閉包作為回撥,為配置 Reactor 環境提供了強大的 DSL,並提供了一些非常酷的運算子過載,用於編寫簡潔的程式碼。
Clojurewerkz 有一個名為 Meltdown 的庫,它是基於 Reactor 的。新增其他 JVM 語言支援無需太多工作。Reactor 的 API 被設計為可擴充套件的,以便非 Java 語言可以從 Reactor 的工具中受益。
Reactor 已經為 Java 8 做好準備,所以我們首先來看一些使用 JDK 8 出色 Lambda 特性的 Reactor 程式碼
import static reactor.event.selector.Selectors.*;
// Only create one of these per JVM
static Environment env = new Environment();
// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
.<String>on($("topic"), ev -> System.out.prinltn("greeting: " + ev.getData()));
r.notify("topic", Event.wrap("Hello World!"));
Reactor 希望實現的目標之一是減少您需要編寫的程式碼量;上面已經非常簡潔了。但即使在 Java 6 和 7 中,它也同樣非常簡潔
import static reactor.event.selector.Selectors.*;
// Only create one of these per JVM
static Environment env = new Environment();
// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
.on($("topic"), new Consumer<Event<String>>() {
public void accept(Event<String> ev) {
System.out.prinltn("greeting: " + ev.getData());
}
});
r.notify("topic", Event.wrap("Hello World!"));
在 Groovy 中,它甚至更簡潔(正如你所期望的),因為語言支援負責將一些物件轉換為正確的型別並允許使用閉包
def env = new Environment()
def r = Reactors.reactor(env).on("topic") { String greeting ->
println "greeting: $greeting"
}
r.notify "topic", "Hello World!"
Dispatcher 負責在給定的 Thread 上執行任務。有各種內建的 Dispatcher 實現,它們可以在呼叫執行緒中、從執行緒池中、使用單執行緒事件迴圈式排程,或者最快的排程器:使用 LMAX Disruptor RingBuffer 排程任務的 RingBufferDispatcher。
每當您在 Reactor 中建立一個元件時,通常會指定在排程事件時使用的 Dispatcher。在處理高容量應用程式時,使用執行緒池可能會變得非常消耗 CPU 和 GC,而事件排程到 RingBuffer 中則效率極高。使用 RingBufferDispatcher 每秒可以排程數千萬個事件。
Selector 是將操作動態對映到事件鍵的元件。當您將一個操作分配給 Reactor 時,您透過註冊一個 Selector 來告訴它響應哪些事件鍵。有幾種內建實現,它們可以匹配諸如 Object.equals()、進行基於字串的正則表示式匹配、URI 模板匹配(因此您可以使用熟悉的括號分隔的佔位符表示法來匹配 URI)、Class.isAssignableFrom() 匹配(僅選擇那些從公共抽象派生的鍵)、Predicate 匹配(允許您基於作用域謂詞建立任意的 Predicate<T> 選擇器),甚至還有一個可選的 JsonPathSelector,它使用 JsonPath 透過 JsonPath 表示式查詢鍵中的資料。
您在示例中可能已經注意到,作為 Java 開發人員,您可能會對使用 $ 快捷方法建立 Selector [1] 感到有些困惑。如果您曾使用 jQuery 進行 Web 開發,那麼您會感到非常熟悉,因為 $ 方法只是建立 Selector 的快捷方式,就像 jQuery 在編寫 $(".css-class") 等時建立 CSS 查詢一樣。如果美元符號對您來說太不尋常,Reactor 總是試圖提供不止一種方法來完成任務;您可以使用 Selectors.object(T) 或 ObjectSelector.objectSelector() 靜態建立方法(或者直接使用建構函式例項化 ObjectSelector)。
[1]: 除了 $(T),還有其他用於建立 Selectors 的快捷輔助方法。例如,R(String) 用於建立 RegexSelectors,T(Class<?>) 用於建立 ClassSelectors,以及 U(String) 用於建立 UriTemplateSelectors。
Reactor 的 Promise 和 Stream 提供了一種響應式、組合式的方式來協調多個非同步任務,而不會產生過多的回撥地獄。Promise 是一個有狀態元件,可以在您的應用程式中傳遞,它代表一個將從另一個執行緒填充的值。與傳統的 Future 一樣,Promise 可以阻塞呼叫執行緒。但更重要的是,Promise 使轉換值和執行整個處理鏈變得容易。
Stream 與 Promise 類似,它提供了一個組合 API 來響應未來的值。但 Stream 與 Promise 的不同之處在於,它旨在處理多個值流經。
要在 Promise 或 Stream 中填充值,您需要建立一個 Deferred,它是一個 Consumer<T>。您可以將此 Deferred 傳遞到您的服務層,以將最終值傳回給呼叫者。
// Only create one of these per JVM
static Environment env = new Environment();
public class DataLoader {
public Promise<Buffer> load(String key) {
Deferred<Buffer, Promise<Buffer>> deferred = Promises.defer(env);
// submit work to be done in another thread
// like reading data from a datastore
datastore.load(key, deferred);
return deferred.compose();
}
}
// Your service layer uses this API
@Inject
DataLoader loader;
loader.load("obj-key")
.onSuccess(new Consumer<Buffer>() {
public void accept(Buffer b) {
// handle eventual data
}
})
.onError(new Consumer<Throwable>() {
public void accept(Throwable t) {
// handle errors
}
});
Scala 的 Tuple 類是一種型別安全的方式,用於傳遞封裝其他值的單個物件,而無需建立應用程式特定的、一次性使用的“持有者”Bean。Reactor 將此功能融入其對 Tuple 類的自己的解釋中。
元組使用起來極其簡單。您可以使用 Tuple.from(T1, T2, …) 方法建立一個元組,並可以使用 Tuple.getT1() 到 Tuple.getTN() 方法從中獲取值。
reactor.on($("topic"), new Consumer<Event<Tuple2<URI, Buffer>>>() {
public void accept(Event<Tuple2<URI, Buffer>> ev) {
URI uri = tup.getT1();
Buffer buff = tup.getT2();
// deal with request from uri.getPath()
}
});
// notify consumers of new request
reactor.notify("topic", Event.wrap(Tuple.from(requestUri, request)));
請查閱 Tuple API 文件,瞭解所有可能性。
Reactor 提供了功能齊全的 TCP 客戶端和伺服器抽象。它們提供了一種簡單的方法來構建能夠支援大量客戶端的基於 TCP 的應用程式。Reactor TCP 支援中的基本抽象是通用的,可以建立多個實現來利用不同的 TCP 技術。但是,內建實現利用出色的 Netty 庫進行非同步 IO。
Reactor 是開源的,採用 Apache 許可。開發人員和使用者社群只是一群普通人,他們希望共同努力,為在 JVM 上構建響應式、快速資料應用程式奠定堅實基礎。加入我們的社群,瞭解更多關於 Reactor 的資訊,或者透過您希望看到的任何改進做出貢獻。
要快速開始使用 Reactor 並檢視各種上下文中的程式碼,請查閱快速入門
https://github.com/reactor/reactor-quickstart
或示例
https://github.com/reactor/reactor-samples
要分叉原始碼、閱讀維基或提交問題,請訪問我們的 GitHub 頁面
https://github.com/reactor/reactor
您可以加入一個 Google 群組,提出問題或參與關於 Reactor 的討論
https://groups.google.com/forum/#!forum/reactor-framework
獲取 Maven 工件以包含在您的專案中
<dependencies>
<!-- core components -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- groovy support -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-groovy</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- tcp client/server -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-tcp</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- spring support -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-spring</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
</dependencies>