領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多我非常高興地宣佈 Project Reactor 的首個里程碑版本釋出!Project Reactor 是一個用於在 JVM 上構建非同步、FastData 應用程式的基礎框架。Reactor 1.0.0.M1 包含了一些優秀的特性,包括:反應式組合助手 Stream 和 Promise、TcpServer 和 TcpClient,以及 Groovy 和 Spring 支援。受到 Reactive Extensions、RxJava、新的 JDK 8 Stream API(以及 Scala 等)的啟發,這些可組合元件讓協調非同步任務變得異常簡單。它們支援使用 Consumers 進行傳統的基於回撥的程式設計,但同時也提供了簡潔的組合 API,包含 map(Function fn)、filter(Predicate、batch(int size) 等方法。
Reactor 從一開始就設計為一個高效能、高可擴充套件的平臺,用於構建下一代大資料應用程式。在將應用程式擴充套件到成百上千甚至數百萬使用者時,非同步架構在效能上明顯優於每請求一執行緒的架構。Reactor 的非同步基礎為每秒處理數萬、數十萬甚至數百萬事件的大資料應用程式提供了堅實的基礎。它提供了簡單的工具來串聯非同步任務,並使執行這些任務變得像呼叫一個方法一樣簡單。
Stream 是一種簡單的方式,可以在資料非同步流經應用程式時對其進行處理。在 Reactor 中,一個 Stream 實際上由兩部分組成:Deferred(釋出者)和實際的 Stream(消費者)。您可以透過組合方法和簡單回撥的組合,在 Stream 上分配處理程式來處理資料。
在將資料放入佇列進行進一步處理之前,使用 Stream 和 JDK Lambdas 對進入應用程式的資料進行轉換和過濾,程式碼看起來會像這樣:
// Create Environment in which Reactors operate
Environment env = new Environment();
// Create a Stream using the high-speed LMAX Disruptor RingBuffer
Deferred<Trade, Stream<Trade>> incoming = Streams.<Trade>defer()
.env(env)
.dispatcher(Environment.RING_BUFFER)
.get();
// Work with the incoming trades
Stream<Trade> trades = incoming.compose();
Stream<Order> orders = trades.map(trade -> tradeService.placeTrade(trade));
// Filter out large orders from small
Stream<Order> highPriority = orders.filter(order -> order.getSize() >= 1000);
Stream<Order> lowPriority = orders.filter(order -> order.getSize() < 1000);
// Consume the orders in different ways
highPriority.consume(order -> orderService.executeNow(order));
lowPriority.consume(order -> orderService.executeLater(order));
M1 版本還包含一個易於使用的 TCP 客戶端和伺服器。由超高速的 Netty 網路庫提供支援,一個由 Reactor 驅動的 syslog 伺服器在伺服器級硬體上每秒可以攝取大約 100 萬條訊息。Reactor 的 TCP 支援包含一個簡單的 Codec 工具,它易於擴充套件,超越了核心庫中提供的預設編解碼器集,並且設計得輕量化,透過使用 Reactor 的 Buffer 類,該類提供了對資料極其高效的檢視,以及大量用於處理標準 Java NIO ByteBuffers 的輔助方法——但避免了直接處理 ByteBuffer 的痛苦。
Reactor 的 TCP 支援開箱即用地提供了 JSON。建立一個使用 JSON 作為協議的基於 TCP 的 RPC 伺服器就像這樣簡單:
TcpServer<Pojo, Pojo> server = new TcpServerSpec<Pojo, Pojo>(NettyTcpServer.class)
.env(env)
.codec(new JsonCodec<>(Pojo.class))
.consume(conn -> {
conn.consume(data -> {
// handle incoming data
});
})
.get()
.start();
Reactor M1 還提供了出色的 Groovy 支援。它提供了助手,使得使用 Closures 消費事件變得非常簡潔。毋庸置疑,用 Groovy 編寫 Reactor 事件處理程式碼非常簡單。使用 Closures 處理 Reactors 讓非同步程式碼實際上可讀!
def env = new Environment()
// Create Reactor using default RingBuffer Dispatcher
def reactor = Reactors.reactor().env(env).get()
reactor.on('topic') { String s ->
// handle data
}
// Publish an event to a topic
r1.notify 'topic', 'Hello World!'
Reactor M1 還包括 Spring 支援,使得編寫事件驅動的 POJO 像 MVC 控制器一樣簡單。透過使用 @On 註解標記方法,一個透過元件掃描發現的 bean 可以自動連線到一個 Reactor,並接收事件通知。
一個簡單的基於 JavaConfig 的 Spring 配置可能看起來像這樣:
public class HandlerBean {
@On(reactor = "@rootReactor", selector = '$("test")')
public void handleTest() {
// event 'test' was fired
}
}
@Configuration
public class AnnotatedHandlerConfig {
@Bean
public Environment env() {
return new Environment();
}
@Bean
public Reactor rootReactor() {
return env().getRootReactor();
}
}
只需將您的 Reactor 注入到服務層,當事件準備就緒時,使用 notify() 方法在 Reactor 上釋出它們。
Maven 工件可在 SpringSource Artifactory 倉庫中獲取。在一個 Gradle 專案中,您會像這樣引入 Reactor:
ext {
reactorVersion = '1.0.0.M1'
}
repositories {
maven { url 'http://repo.springsource.org/libs-milestone' }
mavenCentral()
}
dependencies {
// Reactor Core
compile 'org.projectreactor:reactor-core:$reactorVersion'
}
原始碼可在 GitHub 上獲取:https://github.com/reactor/reactor
加入 Reactor Google+ 社群以瞭解 Reactor 的最新動態,或在 Twitter 上 關注我們 @ProjectReactor。
文件可在 GitHub Wiki 和 API Javadoc 上獲取。
您也可以在 GitHub Issues 上提交問題並跟蹤開發進度。
我們將在今年的 SpringOne 上 就 Reactor 進行一次完整的專題演講。如果您還沒有計劃參加,那麼您真的應該去!會議日程安排滿了關於 Spring 社群正在做的激動人心的事情的精彩專題。快來加入我們吧!
SpringOne2GX 2013,9 月 9-12 日在加利福尼亞州聖克拉拉舉行
我迫不及待地想投入到下一個衝刺中,努力實現 1.0 GA。我們非常歡迎您的加入!