領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多在本文中,我們將繼續響應式程式設計系列,重點透過實際的程式碼示例來解釋一些概念。最終結果應該能讓你更好地理解響應式程式設計的不同之處,以及它的函式式特性。這裡的例子相當抽象,但它們提供了一種思考 API 和程式設計風格的方式,讓你開始感受它的不同。我們將看到響應式程式設計的元素,並學習如何控制資料流,以及必要時如何在後臺執行緒中處理資料。
我們將使用 Reactor 庫來闡明我們需要說明的要點。程式碼也可以很輕易地用其他工具編寫。如果你想嘗試程式碼並檢視其執行情況,而無需複製貼上任何內容,可以在 Github 上找到包含測試的工作示例。
首先,從 https://start.spring.io 獲取一個空白專案,並新增 Reactor Core 依賴。使用 Maven 如下:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.0.RC2</version>
</dependency>
使用 Gradle 也非常相似:
compile 'io.projectreactor:reactor-core:3.0.0.RC2'
現在我們來編寫一些程式碼。
響應式程式設計的基本構建塊是事件序列,以及兩個主角:釋出者和這些事件的訂閱者。將序列稱為“流”也可以,因為它確實是流。如果需要,我們將使用小寫字母“s”的“stream”,但 Java 8 有一個不同的 java.util.Stream,所以儘量不要混淆。無論如何,我們將嘗試將敘述集中在釋出者和訂閱者上(這就是 Reactive Streams 所做的)。
Reactor 是我們將在示例中使用的庫,所以我們將堅持其表示法,並稱釋出者為 Flux(它實現了 Reactive Streams 中的 Publisher 介面)。RxJava 庫非常相似,並且具有許多並行功能,因此在這種情況下,我們將討論 Observable,但程式碼會非常相似。(Reactor 2.0 稱之為 Stream,如果我們需要同時討論 Java 8 Streams,這會造成混淆,所以我們只使用 Reactor 3.0 中的新程式碼。)
Flux 是特定 POJO 型別事件序列的釋出者,因此它是泛型的,即 Flux<T> 是 T 的釋出者。Flux 有一些靜態便利方法可以從各種來源建立自身例項。例如,從陣列建立 Flux:
Flux<String> flux = Flux.just("red", "white", "blue");
我們剛剛生成了一個 Flux,現在我們可以用它做一些事情。實際上,你只能做兩件事:操作它(轉換它,或與其他序列組合),訂閱它(它是一個釋出者)。
你經常會遇到一個你知道只包含一個或零個元素的序列,例如按 ID 查詢實體的儲存庫方法。Reactor 有一個 Mono 型別,表示單值或空的 Flux。Mono 具有與 Flux 非常相似的 API,但更專注於此,因為並非所有運算子都對單值序列有意義。RxJava 也有一個附加元件(在 1.x 版本中)稱為 Single,以及一個用於空序列的 Completable。Reactor 中的空序列是 Mono<Void>。
Flux 有*很多*方法,幾乎所有這些方法都是運算子。我們不會在這裡全部檢視它們,因為有更好的地方可以查詢(例如 Javadoc)。我們只需要瞭解運算子是什麼以及它能為你做什麼。
例如,要將 Flux 內部事件記錄到標準輸出,你可以呼叫 .log() 方法。或者你可以使用 map() 來轉換它:
Flux<String> flux = Flux.just("red", "white", "blue");
Flux<String> upper = flux
.log()
.map(String::toUpperCase);
在這段程式碼中,我們透過將輸入中的字串轉換為大寫來對其進行轉換。到目前為止,一切都很簡單。
關於這個小樣本有趣的是——如果你不習慣的話,甚至令人震驚——還沒有處理任何資料。甚至沒有記錄任何內容,因為實際上什麼都沒有發生(試試看你就知道了)。在 Flux 上呼叫運算子相當於為以後構建了一個執行計劃。它是完全宣告性的,這也是為什麼人們稱之為“函式式”的原因。運算子中實現的邏輯只有在資料開始流動時才執行,而這隻有在有人訂閱 Flux(或等效地訂閱 Publisher)時才會發生。
所有響應式庫以及 Java 8 Streams 中都存在這種處理資料序列的宣告式函式式方法。考慮這段類似的程式碼,使用與 Flux 內容相同的 Stream:
Stream<String> stream = Streams.of("red", "white", "blue");
Stream<String> upper = stream.map(value -> {
System.out.println(value);
return value.toUpperCase();
});
我們對 Flux 的觀察同樣適用於這裡:沒有資料被處理,它只是一個執行計劃。然而,Flux 和 Stream 之間存在一些重要的差異,這使得 Stream 不適用於響應式用例。Flux 有更多的運算子,其中大部分只是為了方便,但真正的區別在於你想消費資料時,所以接下來我們需要看看這一點。
提示
Sebastien Deleuze 有一篇關於響應式型別的有用部落格,他在其中透過檢視它們定義的型別以及如何使用它們來描述各種流式和響應式 API 之間的差異。Flux 和 Stream 之間的差異在那裡得到了更詳細的強調。
要讓資料流動,你必須使用 subscribe() 方法之一訂閱 Flux。只有這些方法才能讓資料流動。它們會回溯到你在序列上宣告的運算子鏈(如果有的話),並請求釋出者開始建立資料。在我們一直在使用的示例中,這意味著遍歷底層字串集合。在更復雜的用例中,它可能會觸發從檔案系統讀取檔案、從資料庫拉取資料或呼叫 HTTP 服務。
以下是 subscribe() 方法的實際呼叫:
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe();
輸出為:
09:17:59.665 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog - onComplete()
由此我們可以看出,不帶引數的 subscribe() 的效果是請求釋出者傳送*所有*資料——只記錄了一個 request(),它是“無限制的”。我們還可以看到釋出每個專案(onNext())、序列結束(onComplete())和原始訂閱(onSubscribe())的回撥。如果你需要,你可以使用 Flux 中的 doOn*() 方法自己監聽這些事件,這些方法本身是運算子,而不是訂閱者,所以它們不會自行導致任何資料流動。
subscribe() 方法被過載,其他變體為你提供了不同的選項來控制發生的事情。一種重要且方便的形式是帶有回撥作為引數的 subscribe()。第一個引數是一個 Consumer,它為你提供了每個專案的回撥,你還可以選擇新增一個 Consumer 用於錯誤(如果有的話),以及一個普通的 Runnable 以在序列完成時執行。例如,只使用每個專案的回撥:
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe(System.out::println);
以下是輸出:
09:56:12.680 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - request(unbounded)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
RED
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
WHITE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
BLUE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog - onComplete()
我們可以透過多種方式控制資料流,使其“有界”。用於控制的原始 API 是從 Subscriber 獲取的 Subscription。上述對 subscribe() 的短呼叫等效的完整形式是:
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String t) {
System.out.println(t);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
為了控制流,例如一次最多消費2個專案,你可以更智慧地使用 Subscription:
.subscribe(new Subscriber<String>() {
private long count = 0;
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(2);
}
@Override
public void onNext(String t) {
count++;
if (count>=2) {
count = 0;
subscription.request(2);
}
}
...
這個 Subscriber 正在“批次”處理專案,每次2個。這是一個常見的用例,所以你可能會考慮將實現提取到一個便利類中,這樣程式碼也會更具可讀性。輸出如下:
09:47:13.562 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - request(2)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - request(2)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog - onComplete()
事實上,批次訂閱者是一個非常常見的用例,以至於 Flux 中已經提供了便利方法。上面的批次處理示例可以這樣實現:
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribe(null, 2);
(注意帶有請求限制的 subscribe() 呼叫)。這是輸出:
10:25:43.739 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - request(2)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - request(2)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onNext(blue)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog - onComplete()
提示
像 Spring Reactive Web 這樣為你處理序列的庫可以處理訂閱。能夠將這些關注點下推到堆疊中是件好事,因為這可以讓你避免程式碼中充斥著非業務邏輯,使其更具可讀性,更易於測試和維護。因此,通常來說,如果你能**避免訂閱**序列,或者至少將該程式碼推入處理層,並從業務邏輯中移除,這是一件好事。
上述所有日誌的一個有趣特點是它們都在“主”執行緒上,也就是呼叫 subscribe() 的執行緒。這突出了一點:Reactor 對執行緒非常節儉,因為這為你提供了獲得最佳效能的最大機會。如果你在過去 5 年裡一直在處理執行緒和執行緒池以及非同步執行,試圖從服務中榨取更多效能,這可能是一個令人驚訝的說法。但這是真的:在沒有強制切換執行緒的情況下,即使 JVM 經過最佳化以非常高效地處理執行緒,在單個執行緒上進行計算也總是更快。Reactor 已經將控制所有非同步處理的許可權交給了你,它假定你知道自己在做什麼。
Flux 提供了一些配置方法來控制執行緒邊界。例如,你可以使用 Flux.subscribeOn() 配置在後臺執行緒中處理訂閱:
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.parallel())
.subscribe(null, 2);
結果可以在輸出中看到:
13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
提示
如果你自己編寫這段程式碼,或者複製貼上它,請記住在 JVM 退出之前等待處理停止。
請注意,訂閱和所有處理都在單個後臺執行緒“parallel-1-1”上進行——這是因為我們要求主 Flux 的訂閱者在後臺執行。如果專案處理是 CPU 密集型的,這很好(但實際上在後臺執行緒中沒有意義,因為你支付了上下文切換的代價,但並沒有更快地獲得結果)。你可能還希望能夠執行 I/O 密集型且可能阻塞的專案處理。在這種情況下,你希望儘可能快地完成它,而不會阻塞呼叫者。執行緒池仍然是你的朋友,這就是你從 Schedulers.parallel() 中獲得的。要將單個專案的處理切換到單獨的執行緒(最多達到池的限制),我們需要將它們分解為單獨的釋出者,併為每個釋出者請求在後臺執行緒中獲取結果。實現此目的的一種方法是使用名為 flatMap() 的運算子,它將專案對映到 Publisher(可能型別不同),然後映射回新型別的序列:
Flux.just("red", "white", "blue")
.log()
.flatMap(value ->
Mono.just(value.toUpperCase())
.subscribeOn(Schedulers.parallel()),
2)
.subscribe(value -> {
log.info("Consumed: " + value);
})
這裡要注意 flatMap() 的使用,它將專案下推到“子”釋出者中,我們可以在其中控制每個專案的訂閱,而不是整個序列的訂閱。Reactor 內建的預設行為是儘可能長時間地掛在一個執行緒上,所以如果我們要它在後臺執行緒中處理特定的專案或專案組,我們需要明確指定。實際上,這是少數幾種公認的強制並行處理的技巧之一(有關更多詳細資訊,請參閱 Reactive Gems 問題)。
輸出看起來像這樣:
15:24:36.596 [main] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - request(2)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog - onNext(red)
15:24:36.613 [main] INFO reactor.core.publisher.FluxLog - onNext(white)
15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - request(1)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
請注意,現在有多個執行緒正在消費專案,並且 flatMap() 中的併發提示確保在任何給定時間都有 2 個專案正在處理,只要它們可用。我們看到很多 request(1),因為系統試圖在管道中保持 2 個專案,並且它們通常不會同時完成處理。事實上,Reactor 在這裡試圖非常智慧,它會從上游 Publisher 預取專案,以嘗試消除訂閱者的等待時間(我們在這裡沒有看到這一點,因為數量很低——我們只處理了 3 個專案)。
提示
三個專案(“紅色”、“白色”、“藍色”)可能太少,無法令人信服地看到不止一個後臺執行緒,因此最好生成更多資料。例如,你可以使用隨機數生成器來做到這一點。
Flux 還有一個 publishOn() 方法,它的作用相同,但用於監聽器(即 onNext() 或消費者回調),而不是訂閱者本身:
Flux.just("red", "white", "blue")
.log()
.map(String::toUpperCase)
.subscribeOn(Schedulers.newParallel("sub"))
.publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
log.info("Consumed: " + value);
});
輸出看起來像這樣:
15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog - onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(red)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(white)
15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - request(2)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onNext(blue)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog - onComplete()
15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
請注意,消費者回調(記錄“Consumed: …”)位於釋出者執行緒 pub-1-1 上。如果你刪除 subscribeOn() 呼叫,你可能會看到第二批資料的所有處理也都在 pub-1-1 執行緒上。這再次是 Reactor 線上程使用上節儉的表現——如果R沒有明確的執行緒切換請求,它將保持在同一個執行緒上進行下一次呼叫,無論那是什麼。
注意
我們在此示例中將程式碼從 subscribe(null, 2) 更改為在 publishOn() 中新增 prefetch=2。在這種情況下,subscribe() 中的獲取大小提示將被忽略。
還有另一種訂閱序列的方法,即呼叫 Mono.block() 或 Mono.toFuture() 或 Flux.toStream()(這些是“提取器”方法——它們將你從響應式型別中拉出,進入一個不那麼靈活的阻塞抽象)。Flux 也有轉換器 collectList() 和 collectMap(),可以將 Flux 轉換為 Mono。它們實際上不訂閱序列,但它們會拋棄你可能對單個專案訂閱的任何控制。
警告
一個好的經驗法則是“**永遠不要呼叫提取器**”。當然也有一些例外(否則這些方法就不會存在)。一個值得注意的例外是在測試中,因為能夠阻塞以允許結果累積是很有用的。
這些方法作為一種逃生艙口存在,用於從響應式橋接到阻塞式;例如,如果你需要適應遺留 API,如 Spring MVC。當你呼叫 Mono.block() 時,你就會拋棄響應式流的所有優勢。這是響應式流和 Java 8 Streams 之間的關鍵區別——原生 Java Stream 只有“全有或全無”的訂閱模型,相當於 Mono.block()。當然,subscribe() 也可以阻塞呼叫執行緒,所以它和轉換器方法一樣危險,但你擁有更多的控制權——你可以透過使用 subscribeOn() 來防止它阻塞,並且你可以透過施加背壓並定期決定是否繼續來逐個處理專案。
在本文中,我們涵蓋了 Reactive Streams 和 Reactor API 的基礎知識。如果你需要了解更多,有很多地方可以查詢,但沒有什麼能替代動手編碼,所以請使用 GitHub 中的程式碼(本文在專案中名為“flux”的測試中),或者前往 Lite RX Hands On 工作坊。到目前為止,這真的只是開銷,我們還沒有學到太多用非響應式工具無法以更明顯的方式完成的事情。本系列的下一篇文章將更深入地探討響應式模型的阻塞、排程和非同步方面,並向你展示如何獲得真正的好處。