不能只有大資料,還必須是快速資料:Reactor 1.0 釋出 GA 版本

釋出 | Jon Brisbin | 2013年11月12日 | ...

我很高興地宣佈,Reactor,一個用於在 JVM 上構建響應式、快速資料應用程式的強大基礎庫,已達到 GA 版本!

什麼是 Reactor?我為什麼要關心?

Reactor 提供了必要的抽象,用於構建高吞吐量、低延遲(我們現在稱之為“快速資料”)的應用程式,這些應用程式必須處理每秒數千、數萬甚至數百萬的併發請求。

您應該關注 Reactor,因為現代應用程式(帶有非人類消費者,如手機及在其上執行的應用程式)產生的資料量超過了傳統“每連線一執行緒”伺服器所能支援的範圍,因此 Reactor 為您提供了構建這些高規模應用程式所需的工具和抽象,而無需陷入管理狀態和在非同步應用程式中傳遞事件的繁瑣物流。現代 JVM 應用程式必須建立在非同步和響應式元件的堅實基礎上,這些元件可以在極少數系統執行緒上高效管理大量任務的執行。Reactor 專門設計用於幫助您構建這類應用程式,而不會阻礙您或強迫您遵循某種特定的模式。

Reactor 是基礎

Reactor 本身深受同名知名設計模式的影響,但其靈感並非來源於該模式。它也包含Actor 模型和傳統基於事件的回撥程式設計的元素。

儘管它是 Spring IO 平臺基礎的一部分,但 Reactor 核心庫不依賴於 Spring。Reactor 核心是一個獨立的庫,其唯一的外部依賴是 SLF4J 和出色的 LMAX Disruptor RingBuffer 庫

在 Reactor 核心之上構建了其他可選元件,以方便根據常見模式開發應用程式。Reactor 的一些內建一流支援包括:

  • 透過高速 Processor 抽象提供 LMAX Disruptor 支援,該抽象透過 RingBuffer 提供 Reactor API。
  • 透過靈活的 PersistentQueue 抽象,支援高效能的 JavaChronicle 持久訊息傳遞庫
  • 支援 Groovy 閉包和 @CompileStatic,並提供全面的環境構建和事件連線 DSL。
  • 基於 Netty 4.0 的高效能 TCP 客戶端和伺服器支援。
  • 強大的基於註解的 Spring 支援。
  • 啟動時有大量內容...

Reactor 速度快

Reactor 從一開始就被設計為靈活高效,以便它能夠讓您擺脫困擾,並幫助您以最快的速度處理應用程式中的資料。在其最快的配置下,一個標準的支援 RingBuffer 的 Reactor 在一臺標準開發筆記型電腦上每秒可以釋出超過 10-15 百萬個事件。高效能的 Processor 抽象可以每秒向您的應用程式注入超過 1 億個事件。您的應用程式如何處理資料以減慢 Reactor 的速度,這取決於任務。但在最佳的空操作模式下,其吞吐量如此之高,應用程式不會因為等待 Reactor 完成工作而停滯!

Reactor 是函式式的

Reactor 核心包含一些基本抽象,這些抽象受到 JDK 8 新的函式式抽象(如 Function<T,V>Consumer<T>Supplier<T>Predicate<T>)的啟發(在某些情況下直接基於這些抽象)。Reactor 本身不僅建立在這些抽象的基礎上,您的應用程式也可以利用它們。未來某個時候,JDK 8 的普及程度將足夠高,屆時 Reactor 可以簡單地從 Reactor 中刪除這些抽象,並依賴 JDK 8 中的抽象。在此之前,您的 JDK 6 和 7 應用程式現在就可以從這些函式式抽象中受益。

Reactor 是響應式的

.NET 的 Reactive ExtensionsNetflix 的 RxJava、JDK 8 的 Stream 抽象以及許多其他庫(更不用說 20 年的事件驅動計算機科學)的啟發,Reactor 提供了一種“響應式”程式設計模型,使協調非同步任務變得更加容易。Stream<T>Promise<T> 等抽象使鏈式非阻塞操作變得簡單而簡潔——而且沒有回撥地獄!

@Inject
AsyncDataLoader loader;

Promise<Buffer> p = loader.get("U-U-I-D")
    .map(new Function<Buffer, Data>() {
      public Data apply(Buffer buff) {
        // transform data
        Data data = parser.parse(buff);
        return data;
      }
    })
    .filter(new Predicate<Data>() {
      public boolean test(Data data) {
        // check Data for certain conditions being true
        return null != data.getName();
      }
    })
    .consume(new Consumer<Data>() {
      public void accept(Data data) {
        // only Data that passes the Predicate test makes it here...
      }
    });
    
// Promises can also block like a Future
Buffer buff = p.await();

這些操作(mapfilterconsume)都是獨立執行的(可能)非同步操作。在傳統的多執行緒環境中,需要新增大量圍繞 Future 阻塞和等待完成的嘈雜程式碼。然而,使用 Reactor,您只需以響應式方式將操作連結在一起,以便當前一個操作完成時,下一個操作將“響應”資料。

Reactor 是 Groovy 的

Reactor 包含對 Groovy 語言的一流支援。它支援使用閉包作為回撥,為配置 Reactor 環境提供了強大的 DSL,並提供了一些非常酷的運算子過載,用於編寫簡潔的程式碼。

Reactor 是可擴充套件的

Clojurewerkz 有一個名為 Meltdown 的庫,它是基於 Reactor 的。新增其他 JVM 語言支援無需太多工作。Reactor 的 API 被設計為可擴充套件的,以便非 Java 語言可以從 Reactor 的工具中受益。

但是程式碼看起來怎麼樣?

Reactor 已經為 Java 8 做好準備,所以我們首先來看一些使用 JDK 8 出色 Lambda 特性的 Reactor 程式碼

import static reactor.event.selector.Selectors.*;

// Only create one of these per JVM
static Environment env = new Environment();

// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
  .<String>on($("topic"), ev -> System.out.prinltn("greeting: " + ev.getData()));

r.notify("topic", Event.wrap("Hello World!"));

Reactor 希望實現的目標之一是減少您需要編寫的程式碼量;上面已經非常簡潔了。但即使在 Java 6 和 7 中,它也同樣非常簡潔

import static reactor.event.selector.Selectors.*;

// Only create one of these per JVM
static Environment env = new Environment();

// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
  .on($("topic"), new Consumer<Event<String>>() {
    public void accept(Event<String> ev) {
      System.out.prinltn("greeting: " + ev.getData());
    }
  });

r.notify("topic", Event.wrap("Hello World!"));

在 Groovy 中,它甚至更簡潔(正如你所期望的),因為語言支援負責將一些物件轉換為正確的型別並允許使用閉包

def env = new Environment()

def r = Reactors.reactor(env).on("topic") { String greeting ->
  println "greeting: $greeting"
}

r.notify "topic", "Hello World!"

排程器 (Dispatchers)

Dispatcher 負責在給定的 Thread 上執行任務。有各種內建的 Dispatcher 實現,它們可以在呼叫執行緒中、從執行緒池中、使用單執行緒事件迴圈式排程,或者最快的排程器:使用 LMAX Disruptor RingBuffer 排程任務的 RingBufferDispatcher

每當您在 Reactor 中建立一個元件時,通常會指定在排程事件時使用的 Dispatcher。在處理高容量應用程式時,使用執行緒池可能會變得非常消耗 CPU 和 GC,而事件排程到 RingBuffer 中則效率極高。使用 RingBufferDispatcher 每秒可以排程數千萬個事件。

選擇器 (Selectors)

Selector 是將操作動態對映到事件鍵的元件。當您將一個操作分配給 Reactor 時,您透過註冊一個 Selector 來告訴它響應哪些事件鍵。有幾種內建實現,它們可以匹配諸如 Object.equals()、進行基於字串的正則表示式匹配、URI 模板匹配(因此您可以使用熟悉的括號分隔的佔位符表示法來匹配 URI)、Class.isAssignableFrom() 匹配(僅選擇那些從公共抽象派生的鍵)、Predicate 匹配(允許您基於作用域謂詞建立任意的 Predicate<T> 選擇器),甚至還有一個可選的 JsonPathSelector,它使用 JsonPath 透過 JsonPath 表示式查詢鍵中的資料。

您在示例中可能已經注意到,作為 Java 開發人員,您可能會對使用 $ 快捷方法建立 Selector [1] 感到有些困惑。如果您曾使用 jQuery 進行 Web 開發,那麼您會感到非常熟悉,因為 $ 方法只是建立 Selector 的快捷方式,就像 jQuery 在編寫 $(".css-class") 等時建立 CSS 查詢一樣。如果美元符號對您來說太不尋常,Reactor 總是試圖提供不止一種方法來完成任務;您可以使用 Selectors.object(T)ObjectSelector.objectSelector() 靜態建立方法(或者直接使用建構函式例項化 ObjectSelector)。

[1]: 除了 $(T),還有其他用於建立 Selectors 的快捷輔助方法。例如,R(String) 用於建立 RegexSelectors,T(Class<?>) 用於建立 ClassSelectors,以及 U(String) 用於建立 UriTemplateSelectors。

Promise 和 Stream

Reactor 的 PromiseStream 提供了一種響應式、組合式的方式來協調多個非同步任務,而不會產生過多的回撥地獄。Promise 是一個有狀態元件,可以在您的應用程式中傳遞,它代表一個將從另一個執行緒填充的值。與傳統的 Future 一樣,Promise 可以阻塞呼叫執行緒。但更重要的是,Promise 使轉換值和執行整個處理鏈變得容易。

StreamPromise 類似,它提供了一個組合 API 來響應未來的值。但 StreamPromise 的不同之處在於,它旨在處理多個值流經。

要在 PromiseStream 中填充值,您需要建立一個 Deferred,它是一個 Consumer<T>。您可以將此 Deferred 傳遞到您的服務層,以將最終值傳回給呼叫者。

// Only create one of these per JVM
static Environment env = new Environment();

public class DataLoader {

  public Promise<Buffer> load(String key) {  
    Deferred<Buffer, Promise<Buffer>> deferred = Promises.defer(env);

    // submit work to be done in another thread
    // like reading data from a datastore
    datastore.load(key, deferred);
    
    return deferred.compose();
  }
  
}

// Your service layer uses this API
@Inject
DataLoader loader;

loader.load("obj-key")
  .onSuccess(new Consumer<Buffer>() {
    public void accept(Buffer b) {
      // handle eventual data
    }
  })
  .onError(new Consumer<Throwable>() {
    public void accept(Throwable t) {
      // handle errors
    }
  });

元組 (Tuples)

Scala 的 Tuple 類是一種型別安全的方式,用於傳遞封裝其他值的單個物件,而無需建立應用程式特定的、一次性使用的“持有者”Bean。Reactor 將此功能融入其對 Tuple 類的自己的解釋中。

元組使用起來極其簡單。您可以使用 Tuple.from(T1, T2, …) 方法建立一個元組,並可以使用 Tuple.getT1()Tuple.getTN() 方法從中獲取值。

reactor.on($("topic"), new Consumer<Event<Tuple2<URI, Buffer>>>() {
  public void accept(Event<Tuple2<URI, Buffer>> ev) {
    URI uri = tup.getT1();
    Buffer buff = tup.getT2();  
    
    // deal with request from uri.getPath()
  }
});

// notify consumers of new request
reactor.notify("topic", Event.wrap(Tuple.from(requestUri, request)));

請查閱 Tuple API 文件,瞭解所有可能性。

TcpClient 和 TcpServer

Reactor 提供了功能齊全的 TCP 客戶端和伺服器抽象。它們提供了一種簡單的方法來構建能夠支援大量客戶端的基於 TCP 的應用程式。Reactor TCP 支援中的基本抽象是通用的,可以建立多個實現來利用不同的 TCP 技術。但是,內建實現利用出色的 Netty 庫進行非同步 IO。

採用 Apache 許可並擁有友好的社群

Reactor 是開源的,採用 Apache 許可。開發人員和使用者社群只是一群普通人,他們希望共同努力,為在 JVM 上構建響應式、快速資料應用程式奠定堅實基礎。加入我們的社群,瞭解更多關於 Reactor 的資訊,或者透過您希望看到的任何改進做出貢獻。

要快速開始使用 Reactor 並檢視各種上下文中的程式碼,請查閱快速入門

https://github.com/reactor/reactor-quickstart

或示例

https://github.com/reactor/reactor-samples

要分叉原始碼、閱讀維基或提交問題,請訪問我們的 GitHub 頁面

https://github.com/reactor/reactor

您可以加入一個 Google 群組,提出問題或參與關於 Reactor 的討論

https://groups.google.com/forum/#!forum/reactor-framework

獲取 Maven 工件以包含在您的專案中

<dependencies>

	<!-- core components -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-core</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>
	
	<!-- groovy support -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-groovy</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>

	<!-- tcp client/server -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-tcp</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>

	<!-- spring support -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-spring</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>

</dependencies>

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有