不能只有大資料,還得有快資料:Reactor 1.0 正式釋出 (GA)

釋出 | Jon Brisbin | 2013 年 11 月 12 日 | ...

我很高興宣佈 Reactor,一個用於在 JVM 上構建響應式、快資料應用的強大基礎庫已達到正式釋出 (GA) 階段!

Reactor 是什麼,我為什麼要關心它?

Reactor 提供了必要的抽象來構建高吞吐量、低延遲的應用——我們現在稱之為“快資料”應用——這些應用絕對必須能夠處理每秒數千、數萬甚至數百萬的併發請求。

您應該關心 Reactor,因為現代應用(擁有非人類消費者,例如手機及其上的應用)生成的資料量超過了傳統的每連線一個執行緒伺服器所能支援的能力,因此 Reactor 為您提供了構建這類高擴充套件性應用所需的工具和抽象,而不會讓您陷入非同步應用中管理狀態和傳遞事件的繁瑣事務。現代 JVM 應用必須構建在堅實的非同步和反應式元件基礎上,這些元件可以在極少數系統執行緒上高效地管理大量任務的執行。Reactor 專門設計用於幫助您構建這類應用,而不會阻礙您或強迫您遵循某種固定的模式。

Reactor 是基礎

Reactor 本身深受同名著名設計模式的影響——但它並不僅僅受此模式啟發。它也包含 Actor 模型和傳統事件驅動回撥程式設計的元素。

儘管它是 Spring IO 平臺基礎的一部分,但 核心 Reactor 庫不依賴於 Spring。Reactor Core 是一個獨立的庫,其唯一的外部依賴是 SLF4J 和優秀的 LMAX Disruptor RingBuffer 庫

在 Reactor Core 的基礎上構建了其他可選元件,以便於按照常見模式開發應用。Reactor 的一些內建的一流支援包括:

  • 透過高速的 Processor 抽象支援 LMAX Disruptor,該抽象提供了 RingBuffer 之上的 Reactor API。
  • 透過靈活的 PersistentQueue 抽象支援高效能的 JavaChronicle 持久化訊息傳遞庫
  • 支援 Groovy 閉包和 @CompileStatic,並提供全面的環境構建和事件連線 DSL。
  • 基於 Netty 4.0 的高效能 TCP 客戶端和伺服器支援。
  • 強大的基於註解的 Spring 支援。
  • 啟動時有大量內容...

Reactor 速度很快

Reactor 從頭開始設計,旨在靈活高效地工作,以便它能夠不妨礙您,並幫助您的應用盡可能快地處理資料。在最快的配置下,一個標準的基於 RingBuffer 的 Reactor 可以在一臺標準的開發者筆記本上每秒釋出超過 1000-1500 萬個事件。高效能的 Processor 抽象可以將每秒超過 1 億個事件傳輸到您的應用中。您的應用對資料進行的何種處理會降低 Reactor 的速度,這取決於具體的任務。但在最佳的無操作模式下,吞吐量如此之高,應用不會因為等待 Reactor 完成工作而停滯!

Reactor 是函式式的

Reactor core 包含一些基本抽象,這些抽象受到 JDK 8 新函式式抽象(如 Function<T,V>Consumer<T>Supplier<T>Predicate<T>)的啟發(在某些情況下甚至直接基於它們)。Reactor 本身不僅構建在這些抽象的基礎上,您的應用也可以利用它們。未來某個時候,當 JDK 8 的普及率足夠高時,Reactor 就可以直接從庫中刪除這些抽象,並依賴於 JDK 8 中的對應部分。在此之前,您的 JDK 6 和 7 應用現在就可以受益於這些函式式抽象。

Reactor 是響應式的

.NET 的 Reactive ExtensionsNetflix 的 RxJava、JDK 8 Stream 抽象以及許多其他庫(更不用說 20 年的事件驅動計算機科學)的啟發,Reactor 提供了一種“響應式”程式設計模型,使得協調非同步任務變得更加容易。像 Stream<T>Promise<T> 這樣的抽象使得連結非阻塞操作變得簡單而簡潔——告別回撥函式巢狀!

@Inject
AsyncDataLoader loader;

Promise<Buffer> p = loader.get("U-U-I-D")
    .map(new Function<Buffer, Data>() {
      public Data apply(Buffer buff) {
        // transform data
        Data data = parser.parse(buff);
        return data;
      }
    })
    .filter(new Predicate<Data>() {
      public boolean test(Data data) {
        // check Data for certain conditions being true
        return null != data.getName();
      }
    })
    .consume(new Consumer<Data>() {
      public void accept(Data data) {
        // only Data that passes the Predicate test makes it here...
      }
    });
    
// Promises can also block like a Future
Buffer buff = p.await();

這些操作(mapfilterconsume)中的每一個都是獨立執行的(可能)非同步操作。在傳統的多執行緒環境中,需要新增大量圍繞阻塞 Future 和等待完成的冗餘程式碼。然而,使用 Reactor,您只需以響應式的方式將操作連結起來,這樣當上一個操作完成後,下一個操作就會對資料“做出反應”。

Reactor 很 Groovy

Reactor 包含對 Groovy 語言的一流支援。它支援使用閉包作為回撥函式,擁有一個強大的 DSL 用於配置 Reactor 環境,並提供了一些非常酷的運算子過載來編寫簡潔的程式碼。

Reactor 是可擴充套件的

Clojurewerkz 有一個名為 Meltdown 的庫,它是基於 Reactor 構建的。可以毫不費力地新增對其他 JVM 語言的支援。Reactor 的 API 設計為可擴充套件的,以便非 Java 語言也能從 Reactor 的工具中受益。

程式碼長什麼樣?

Reactor 已經支援 Java 8,所以我們先來看一些使用 JDK 8 強大 Lambda 特性的 Reactor 程式碼。

import static reactor.event.selector.Selectors.*;

// Only create one of these per JVM
static Environment env = new Environment();

// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
  .<String>on($("topic"), ev -> System.out.prinltn("greeting: " + ev.getData()));

r.notify("topic", Event.wrap("Hello World!"));

Reactor 希望實現的目標之一是減少您需要編寫的程式碼量;上面的程式碼非常簡潔。即使在 Java 6 和 7 中,它也非常簡潔。

import static reactor.event.selector.Selectors.*;

// Only create one of these per JVM
static Environment env = new Environment();

// Create a Reactor and listen to a topic using a Selector
Reactor r = Reactors.reactor(env)
  .on($("topic"), new Consumer<Event<String>>() {
    public void accept(Event<String> ev) {
      System.out.prinltn("greeting: " + ev.getData());
    }
  });

r.notify("topic", Event.wrap("Hello World!"));

在 Groovy 中則更加簡潔(正如您所預期的),因為語言支援會處理一些物件的型別轉換,並允許使用閉包。

def env = new Environment()

def r = Reactors.reactor(env).on("topic") { String greeting ->
  println "greeting: $greeting"
}

r.notify "topic", "Hello World!"

排程器 (Dispatchers)

一個 Dispatcher 負責在給定的 Thread 上執行任務。Dispatcher 有多種內建實現,可以在呼叫執行緒中、執行緒池中的執行緒上、使用單執行緒事件迴圈式排程,或者最快的排程器:使用 LMAX Disruptor RingBuffer 排程任務的 RingBufferDispatcher

每當您在 Reactor 中建立元件時,通常會指定用於排程事件的 Dispatcher。在高容量應用中,使用執行緒池可能會對 CPU 和 GC 產生極高的開銷,而將事件排程到 RingBuffer 中則極其高效。使用 RingBufferDispatcher 每秒可以排程數千萬個事件。

選擇器 (Selectors)

一個 Selector 是動作與事件鍵的動態對映。當您向 Reactor 分配一個動作時,透過註冊一個 Selector 來告訴它響應哪些事件鍵。有幾種內建實現,可以匹配諸如 Object.equals() 之類的內容,進行基於字串的正則表示式匹配,URI 模板匹配(您可以使用熟悉的括號定界佔位符表示法來匹配 URI),Class.isAssignableFrom() 匹配(只選擇繼承自某個共同抽象的鍵),Predicate 匹配(允許您基於特定範圍的謂詞建立任意 Predicate<T> 選擇器),甚至還有一個可選的 JsonPathSelector,它使用 JsonPath 透過 JsonPath 表示式從鍵中查詢資料。

您可能已經注意到,在示例中使用了 Java 開發者可能有點困惑的東西:用於建立 Selector$ 快捷方法 [1]。如果您使用過 jQuery 進行 Web 開發,那麼您會感到非常熟悉,因為 $ 方法只是建立 Selector 的一個快捷方式,就像 jQuery 在編寫 $(".css-class") 時建立 CSS Query 一樣。如果美元符號對您來說太不尋常,Reactor 總是嘗試提供多種方法來完成同一件事;您可以使用 Selectors.object(T)ObjectSelector.objectSelector() 靜態建立方法代替(或者直接使用建構函式建立 ObjectSelector 例項)。

[1]: 除了 $(T),還有其他建立 Selectors 的快捷輔助方法。例如,R(String) 用於建立 RegexSelectors,T(Class<?>) 用於建立 ClassSelectors,U(String) 用於建立 UriTemplateSelectors。

Promise 和 Stream

Reactor 的 PromiseStream 提供了一種響應式的、可組合的方式來協調多個非同步任務,而不會產生過多的回撥函式巢狀。Promise 是一個有狀態元件,可以在應用中傳遞,代表一個將從另一個執行緒填充的值。像傳統的 Future 一樣,Promise 可以阻塞呼叫執行緒。但更重要的是,Promise 使得轉換值和執行整個處理鏈變得容易。

一個 StreamPromise 類似,因為它提供了一個組合 API 來響應未來的值。但是 StreamPromise 的不同之處在於,它被設計用來處理透過的多個值。

要在 PromiseStream 中填充值,您需要建立一個 Deferred,它是一個 Consumer<T>。您可以將此 Deferred 傳遞到您的服務層,以便將最終值傳回給呼叫者。

// Only create one of these per JVM
static Environment env = new Environment();

public class DataLoader {

  public Promise<Buffer> load(String key) {  
    Deferred<Buffer, Promise<Buffer>> deferred = Promises.defer(env);

    // submit work to be done in another thread
    // like reading data from a datastore
    datastore.load(key, deferred);
    
    return deferred.compose();
  }
  
}

// Your service layer uses this API
@Inject
DataLoader loader;

loader.load("obj-key")
  .onSuccess(new Consumer<Buffer>() {
    public void accept(Buffer b) {
      // handle eventual data
    }
  })
  .onError(new Consumer<Throwable>() {
    public void accept(Throwable t) {
      // handle errors
    }
  });

元組 (Tuples)

Scala 的 Tuple 類是一種型別安全的方式,用於傳遞一個封裝了其他值的單一物件,而無需建立特定於應用的、一次性的“holder”bean。Reactor 將此功能融入到其自己的 Tuple 類實現中。

元組使用起來非常簡單。您可以使用 Tuple.from(T1, T2, …) 方法建立一個元組,然後可以使用 Tuple.getT1()Tuple.getTN() 方法獲取其中的值。

reactor.on($("topic"), new Consumer<Event<Tuple2<URI, Buffer>>>() {
  public void accept(Event<Tuple2<URI, Buffer>> ev) {
    URI uri = tup.getT1();
    Buffer buff = tup.getT2();  
    
    // deal with request from uri.getPath()
  }
});

// notify consumers of new request
reactor.notify("topic", Event.wrap(Tuple.from(requestUri, request)));

檢視 Tuple API 文件以瞭解所有可能性。

TcpClient 和 TcpServer

Reactor 提供功能齊全的 TCP 客戶端和伺服器抽象。它們提供了一種簡單的方式來構建可以支援大量客戶端的基於 TCP 的應用。Reactor TCP 支援中的基本抽象是通用的,可以建立多種實現來利用不同的 TCP 技術。然而,內建的實現利用了優秀的 Netty 庫來進行非同步 IO。

Apache 許可,社群友好

Reactor 是開源的,採用 Apache 許可。開發者和使用者社群是一群普通人,他們希望一起工作,為在 JVM 上構建響應式、快資料應用建立一個奇妙的基礎。加入我們的社群,瞭解更多關於 Reactor 的資訊,或者透過您希望看到的任何改進來貢獻力量。

要快速開始使用 Reactor 並檢視不同上下文中的一些程式碼,請檢視快速入門指南:

https://github.com/reactor/reactor-quickstart

或示例:

https://github.com/reactor/reactor-samples

要 fork 原始碼、閱讀 wiki 或提交問題,請訪問我們在 GitHub 上的頁面:

https://github.com/reactor/reactor

您可以加入 Google Group 來提問或參與圍繞 Reactor 的討論:

https://groups.google.com/forum/#!forum/reactor-framework

獲取 Maven Artifacts 以包含到您的專案中

<dependencies>

	<!-- core components -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-core</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>
	
	<!-- groovy support -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-groovy</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>

	<!-- tcp client/server -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-tcp</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>

	<!-- spring support -->
	<dependency>
		<groupId>org.projectreactor</groupId>
		<artifactId>reactor-spring</artifactId>
		<version>1.0.0.RELEASE</version>
	</dependency>

</dependencies>

獲取 Spring 電子報

透過 Spring 電子報保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速前進。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單訂閱。

瞭解更多

即將舉辦的活動

檢視 Spring 社群的所有即將舉辦的活動。

檢視全部