領先一步
VMware 提供培訓和認證,助您加速前進。
瞭解更多我很高興宣佈 Reactor,一個用於在 JVM 上構建響應式、快資料應用的強大基礎庫已達到正式釋出 (GA) 階段!
Reactor 提供了必要的抽象來構建高吞吐量、低延遲的應用——我們現在稱之為“快資料”應用——這些應用絕對必須能夠處理每秒數千、數萬甚至數百萬的併發請求。
您應該關心 Reactor,因為現代應用(擁有非人類消費者,例如手機及其上的應用)生成的資料量超過了傳統的每連線一個執行緒伺服器所能支援的能力,因此 Reactor 為您提供了構建這類高擴充套件性應用所需的工具和抽象,而不會讓您陷入非同步應用中管理狀態和傳遞事件的繁瑣事務。現代 JVM 應用必須構建在堅實的非同步和反應式元件基礎上,這些元件可以在極少數系統執行緒上高效地管理大量任務的執行。Reactor 專門設計用於幫助您構建這類應用,而不會阻礙您或強迫您遵循某種固定的模式。
Reactor 本身深受同名著名設計模式的影響——但它並不僅僅受此模式啟發。它也包含 Actor 模型和傳統事件驅動回撥程式設計的元素。
儘管它是 Spring IO 平臺基礎的一部分,但 核心 Reactor 庫不依賴於 Spring。Reactor Core 是一個獨立的庫,其唯一的外部依賴是 SLF4J 和優秀的 LMAX Disruptor RingBuffer 庫。
在 Reactor Core 的基礎上構建了其他可選元件,以便於按照常見模式開發應用。Reactor 的一些內建的一流支援包括:
@CompileStatic
,並提供全面的環境構建和事件連線 DSL。Reactor 從頭開始設計,旨在靈活高效地工作,以便它能夠不妨礙您,並幫助您的應用盡可能快地處理資料。在最快的配置下,一個標準的基於 RingBuffer 的 Reactor 可以在一臺標準的開發者筆記本上每秒釋出超過 1000-1500 萬個事件。高效能的 Processor 抽象可以將每秒超過 1 億個事件傳輸到您的應用中。您的應用對資料進行的何種處理會降低 Reactor 的速度,這取決於具體的任務。但在最佳的無操作模式下,吞吐量如此之高,應用不會因為等待 Reactor 完成工作而停滯!
Reactor core 包含一些基本抽象,這些抽象受到 JDK 8 新函式式抽象(如 Function<T,V>
、Consumer<T>
、Supplier<T>
和 Predicate<T>
)的啟發(在某些情況下甚至直接基於它們)。Reactor 本身不僅構建在這些抽象的基礎上,您的應用也可以利用它們。未來某個時候,當 JDK 8 的普及率足夠高時,Reactor 就可以直接從庫中刪除這些抽象,並依賴於 JDK 8 中的對應部分。在此之前,您的 JDK 6 和 7 應用現在就可以受益於這些函式式抽象。
受 .NET 的 Reactive Extensions、Netflix 的 RxJava、JDK 8 Stream 抽象以及許多其他庫(更不用說 20 年的事件驅動計算機科學)的啟發,Reactor 提供了一種“響應式”程式設計模型,使得協調非同步任務變得更加容易。像 Stream<T>
和 Promise<T>
這樣的抽象使得連結非阻塞操作變得簡單而簡潔——告別回撥函式巢狀!
@Inject
AsyncDataLoader loader;
Promise<Buffer> p = loader.get("U-U-I-D")
.map(new Function<Buffer, Data>() {
public Data apply(Buffer buff) {
// transform data
Data data = parser.parse(buff);
return data;
}
})
.filter(new Predicate<Data>() {
public boolean test(Data data) {
// check Data for certain conditions being true
return null != data.getName();
}
})
.consume(new Consumer<Data>() {
public void accept(Data data) {
// only Data that passes the Predicate test makes it here...
}
});
// Promises can also block like a Future
Buffer buff = p.await();
這些操作(map
、filter
、consume
)中的每一個都是獨立執行的(可能)非同步操作。在傳統的多執行緒環境中,需要新增大量圍繞阻塞 Future 和等待完成的冗餘程式碼。然而,使用 Reactor,您只需以響應式的方式將操作連結起來,這樣當上一個操作完成後,下一個操作就會對資料“做出反應”。
Reactor 包含對 Groovy 語言的一流支援。它支援使用閉包作為回撥函式,擁有一個強大的 DSL 用於配置 Reactor 環境,並提供了一些非常酷的運算子過載來編寫簡潔的程式碼。
Clojurewerkz 有一個名為 Meltdown 的庫,它是基於 Reactor 構建的。可以毫不費力地新增對其他 JVM 語言的支援。Reactor 的 API 設計為可擴充套件的,以便非 Java 語言也能從 Reactor 的工具中受益。
Reactor 已經支援 Java 8,所以我們先來看一些使用 JDK 8 強大 Lambda 特性的 Reactor 程式碼。
import static reactor.event.selector.Selectors.*;
// Only create one of these per JVM
static Environment env = new Environment();
// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
.<String>on($("topic"), ev -> System.out.prinltn("greeting: " + ev.getData()));
r.notify("topic", Event.wrap("Hello World!"));
Reactor 希望實現的目標之一是減少您需要編寫的程式碼量;上面的程式碼非常簡潔。即使在 Java 6 和 7 中,它也非常簡潔。
import static reactor.event.selector.Selectors.*;
// Only create one of these per JVM
static Environment env = new Environment();
// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
.on($("topic"), new Consumer<Event<String>>() {
public void accept(Event<String> ev) {
System.out.prinltn("greeting: " + ev.getData());
}
});
r.notify("topic", Event.wrap("Hello World!"));
在 Groovy 中則更加簡潔(正如您所預期的),因為語言支援會處理一些物件的型別轉換,並允許使用閉包。
def env = new Environment()
def r = Reactors.reactor(env).on("topic") { String greeting ->
println "greeting: $greeting"
}
r.notify "topic", "Hello World!"
一個 Dispatcher
負責在給定的 Thread
上執行任務。Dispatcher
有多種內建實現,可以在呼叫執行緒中、執行緒池中的執行緒上、使用單執行緒事件迴圈式排程,或者最快的排程器:使用 LMAX Disruptor RingBuffer 排程任務的 RingBufferDispatcher
。
每當您在 Reactor 中建立元件時,通常會指定用於排程事件的 Dispatcher
。在高容量應用中,使用執行緒池可能會對 CPU 和 GC 產生極高的開銷,而將事件排程到 RingBuffer 中則極其高效。使用 RingBufferDispatcher
每秒可以排程數千萬個事件。
一個 Selector
是動作與事件鍵的動態對映。當您向 Reactor
分配一個動作時,透過註冊一個 Selector
來告訴它響應哪些事件鍵。有幾種內建實現,可以匹配諸如 Object.equals()
之類的內容,進行基於字串的正則表示式匹配,URI 模板匹配(您可以使用熟悉的括號定界佔位符表示法來匹配 URI),Class.isAssignableFrom()
匹配(只選擇繼承自某個共同抽象的鍵),Predicate
匹配(允許您基於特定範圍的謂詞建立任意 Predicate<T>
選擇器),甚至還有一個可選的 JsonPathSelector
,它使用 JsonPath 透過 JsonPath 表示式從鍵中查詢資料。
您可能已經注意到,在示例中使用了 Java 開發者可能有點困惑的東西:用於建立 Selector
的 $
快捷方法 [1]。如果您使用過 jQuery 進行 Web 開發,那麼您會感到非常熟悉,因為 $
方法只是建立 Selector
的一個快捷方式,就像 jQuery 在編寫 $(".css-class")
時建立 CSS Query 一樣。如果美元符號對您來說太不尋常,Reactor 總是嘗試提供多種方法來完成同一件事;您可以使用 Selectors.object(T)
或 ObjectSelector.objectSelector()
靜態建立方法代替(或者直接使用建構函式建立 ObjectSelector
例項)。
[1]: 除了 $(T)
,還有其他建立 Selectors 的快捷輔助方法。例如,R(String)
用於建立 RegexSelectors,T(Class<?>)
用於建立 ClassSelectors,U(String)
用於建立 UriTemplateSelectors。
Reactor 的 Promise
和 Stream
提供了一種響應式的、可組合的方式來協調多個非同步任務,而不會產生過多的回撥函式巢狀。Promise
是一個有狀態元件,可以在應用中傳遞,代表一個將從另一個執行緒填充的值。像傳統的 Future
一樣,Promise
可以阻塞呼叫執行緒。但更重要的是,Promise
使得轉換值和執行整個處理鏈變得容易。
一個 Stream
與 Promise
類似,因為它提供了一個組合 API 來響應未來的值。但是 Stream
與 Promise
的不同之處在於,它被設計用來處理透過的多個值。
要在 Promise
或 Stream
中填充值,您需要建立一個 Deferred
,它是一個 Consumer<T>
。您可以將此 Deferred
傳遞到您的服務層,以便將最終值傳回給呼叫者。
// Only create one of these per JVM
static Environment env = new Environment();
public class DataLoader {
public Promise<Buffer> load(String key) {
Deferred<Buffer, Promise<Buffer>> deferred = Promises.defer(env);
// submit work to be done in another thread
// like reading data from a datastore
datastore.load(key, deferred);
return deferred.compose();
}
}
// Your service layer uses this API
@Inject
DataLoader loader;
loader.load("obj-key")
.onSuccess(new Consumer<Buffer>() {
public void accept(Buffer b) {
// handle eventual data
}
})
.onError(new Consumer<Throwable>() {
public void accept(Throwable t) {
// handle errors
}
});
Scala 的 Tuple 類是一種型別安全的方式,用於傳遞一個封裝了其他值的單一物件,而無需建立特定於應用的、一次性的“holder”bean。Reactor 將此功能融入到其自己的 Tuple
類實現中。
元組使用起來非常簡單。您可以使用 Tuple.from(T1, T2, …)
方法建立一個元組,然後可以使用 Tuple.getT1()
到 Tuple.getTN()
方法獲取其中的值。
reactor.on($("topic"), new Consumer<Event<Tuple2<URI, Buffer>>>() {
public void accept(Event<Tuple2<URI, Buffer>> ev) {
URI uri = tup.getT1();
Buffer buff = tup.getT2();
// deal with request from uri.getPath()
}
});
// notify consumers of new request
reactor.notify("topic", Event.wrap(Tuple.from(requestUri, request)));
檢視 Tuple API 文件以瞭解所有可能性。
Reactor 提供功能齊全的 TCP 客戶端和伺服器抽象。它們提供了一種簡單的方式來構建可以支援大量客戶端的基於 TCP 的應用。Reactor TCP 支援中的基本抽象是通用的,可以建立多種實現來利用不同的 TCP 技術。然而,內建的實現利用了優秀的 Netty 庫來進行非同步 IO。
Reactor 是開源的,採用 Apache 許可。開發者和使用者社群是一群普通人,他們希望一起工作,為在 JVM 上構建響應式、快資料應用建立一個奇妙的基礎。加入我們的社群,瞭解更多關於 Reactor 的資訊,或者透過您希望看到的任何改進來貢獻力量。
要快速開始使用 Reactor 並檢視不同上下文中的一些程式碼,請檢視快速入門指南:
https://github.com/reactor/reactor-quickstart
或示例:
https://github.com/reactor/reactor-samples
要 fork 原始碼、閱讀 wiki 或提交問題,請訪問我們在 GitHub 上的頁面:
https://github.com/reactor/reactor
您可以加入 Google Group 來提問或參與圍繞 Reactor 的討論:
https://groups.google.com/forum/#!forum/reactor-framework
獲取 Maven Artifacts 以包含到您的專案中
<dependencies>
<!-- core components -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- groovy support -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-groovy</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- tcp client/server -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-tcp</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<!-- spring support -->
<dependency>
<groupId>org.projectreactor</groupId>
<artifactId>reactor-spring</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
</dependencies>