Reactor 1.0.0.M2 – JVM 上響應式快速資料應用的基礎

工程 | Jon Brisbin | 2013 年 8 月 27 日 | ...

我很高興地宣佈 Reactor 的第二個里程碑版本釋出,這是我們邁向 1.0 的一步! Reactor 1.0.0.M2 的 Maven 工件可在常用的里程碑倉庫中找到。

什麼是 Reactor?

Reactor 是一個基礎框架,用於在 JVM 上構建高吞吐量、非同步、響應式應用程式。 它為事件路由提供選擇器風格的主題匹配、動態消費者分配、超快的任務處理器以及用於非同步處理資料和協調非同步任務的響應式 Stream 和 Promise API。 它透過提供語言擴充套件來全面支援 Groovy 語言,從而使在 Groovy 中編寫 Reactor 應用程式非常 Groovy! 它還具有易於使用的 Spring 支援,可以自動將帶註釋的 POJO 連線到 Reactor。

此版本包含什麼?

第二個里程碑包括許多錯誤修復和一些非常令人興奮的新功能。 Reactor 現在包含一個 Processor 抽象,它是一個基於 LMAX Disruptor RingBuffer 的高度最佳化的任務處理器。 它使用 Reactor 中的通用抽象來配置 RingBuffer,並允許您使用 Reactor 的通用 API 而不是 Disruptor 特定的 API。 透過設計,它還跳過了 Reactor 提供的選擇器匹配和動態消費者分配,以便榨取它可以榨取的最後一滴吞吐量。 MacBook Pro 上的軼事基準測試表明,處理器每秒可以透過管道泵送大約 100,000,000 個事件。 是的,你沒看錯:每秒 1 億個!

1.0.0.M2 還包括 Reactor API 中的一個小但重要的新功能,該功能優化了 Reactor 中的事件釋出,從而實現了大約 30-50% 的更高吞吐量。 由於它從 Reactor 準備了一個最佳化的消費者列表,因此它並不適合所有情況,但對於每秒 1000 萬個事件的吞吐量來說,這是一個很棒的新功能。

最佳化釋出

Reactor 的強大之處之一是選擇器匹配主題(ish)釋出/訂閱。 它允許您使用主題、匿名物件、可分配型別層次結構、URI 路徑匹配或正則表示式(或者如果您實現自己的特定於域的選擇器,則可以使用任何其他型別的選擇器匹配)輕鬆地將處理程式分配給事件。 但是許多應用程式可以在啟動時分配其處理程式,這意味著可以最佳化到這些消費者的路徑,以實現高效的事件釋出。 新的 Reactor 方法 prepare(Object) 允許您預先選擇金鑰的消費者。 它返回一個消費者本身,事件釋出者可以使用該消費者來有效地通知新事件。

// Create Environment in which Reactors operate
Environment env = new Environment();
Reactor reactor = Reactors.reactor().env(env).get();

reactor.on($("say.hello"), new Consumer<Event<String>>() {
	public void accept(Event<String> ev) {
		System.out.println("Hello " + ev.getData() + "!");
	}
});

Consumer<Event<String>> sayHello = reactor.prepare("say.hello");
for(String name : listOfNames) {
	sayHello.accept(name);
}

RingBuffer 任務處理器

Reactor 1.0.0.M2 包含 Processor 抽象。 這是一個由 LMAX Disruptor RingBuffer 支援的簡單任務處理器,旨在將其無縫整合到 Reactor 中使用的響應式 API 中,因此它使用常見的抽象,例如 Supplier 和 Consumer。 可以在單個表示式中建立一個完全配置的處理器,並且使用 Java 8 lambda 表示式更為簡潔

Processor<Message> proc = new ProcessorSpec<Message>()
	.dataSupplier({ return new Message(); })
	.consume({ msg -> // handle the updated Message object })
	.get();

Processor 提供了兩種與底層 RingBuffer 互動的方式。 單操作模式透過呼叫 prepare() 方法從 Processor 請求一個 Operation 物件來工作。 Operation 上有一個 get() 方法,用於訪問 RingBuffer 建立時填充的預先分配的事件物件。 可以使用新資料更新此物件的成員。 準備好釋出操作並觸發事件處理程式後,只需呼叫 Operation 的 commit() 方法即可。

public class Message {
	int type;
	Buffer buffer;
}

@Autowired
Processor<Message> proc;

public void handle(Buffer buff) {
	Operation<Message> op = proc.prepare();

	op.get().type = buff.readInt();
	op.get().buffer = buff;

	op.commit();
}

如果您可以處理批次資料,則 Processor 提供一個 batch(int, Consumer) 方法,該方法允許您指定批次大小並傳遞一個突變器,其形式為 Consumer,其工作是更新每個事件的資料。 如果批處理大小大於底層 RingBuffer 的大小,則批處理將隱式重新整理,否則釋出步驟將延遲到達到批處理大小為止。 這通常會提高吞吐量和效率。

public class Message {
	int type;
	Buffer buffer;
}

@Autowired
Processor<Message> proc;

public void handle(List<Buffer> buffs) {

	proc.batch(buffs.size(), new Consumer<Message>() {
		ListIterator<Buffer> it = buffs.listIterator();

		public void accept(Message msg) {
			Buffer next = it.next();

			msg.type = next.readInt();
			msg.buffer = next;
		}
	});

}

SpringOne2GX

Reactor 將在今年的 SpringOne2GX 會議上得到突出展示,距離現在不到兩週的時間。 將有 一個由 Stephane Maldini 和 Jon Brisbin 主導的完整會議,以及幾乎不停歇的關於這項技術如何改變您構建應用程式方式的茶歇討論。 還有時間 註冊 並預訂房間。 但是快點!

資源

GitHub:(原始碼、問題跟蹤器)https://github.com/reactor/reactor/
Wiki:https://github.com/reactor/reactor/wiki
API 文件:http://reactor.github.io/docs/api/

Maven 工件

ext {
	reactorVersion = '1.0.0.M2'
}

repositories {
	mavenCentral()
	maven { url 'http://repo.springsource.org/libs-release' }
	maven { url 'http://repo.springsource.org/libs-milestone' }
}

dependencies {
	// Reactor core
	compile "org.projectreactor:reactor-core:$reactorVersion"

	// Reactor Groovy support
	compile "org.projectreactor:reactor-groovy:$reactorVersion"

	// Reactor TCP client/server
	compile "org.projectreactor:reactor-tcp:$reactorVersion"

	// Reactor Spring support
	compile "org.projectreactor:reactor-spring:$reactorVersion"
}

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

搶先一步

VMware 提供培訓和認證,以加速您的進步。

瞭解更多

獲得支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群中所有即將到來的活動。

檢視全部