Reactive 程式設計筆記(二):編寫一些程式碼

工程 | Dave Syer | June 13, 2016 | ...

在本文中,我們將繼續探討 Reactive 程式設計系列,並著重透過實際程式碼示例來解釋一些概念。最終目標是讓您更好地理解 Reactive 的不同之處以及它為何是函式式的。這裡的示例相當抽象,但它們為您提供了一種思考 API 和程式設計風格的方式,並開始感受它的不同之處。我們將看到 Reactive 的組成元素,並學習如何控制資料流,以及如何在必要時在後臺執行緒中進行處理。

設定專案

我們將使用 Reactor 庫來闡述我們需要說明的要點。程式碼使用其他工具編寫同樣容易。如果您想自己動手嘗試程式碼並檢視其執行情況,而無需複製貼上任何內容,可以在 Github 中找到帶有測試的示例程式碼。

要開始,請從 https://start.spring.io 獲取一個空白專案並新增 Reactor Core 依賴。使用 Maven

		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-core</artifactId>
			<version>3.0.0.RC2</version>
		</dependency>

使用 Gradle 也非常相似

    compile 'io.projectreactor:reactor-core:3.0.0.RC2'

現在我們來編寫一些程式碼。

它為何是函式式的?

Reactive 的基本構建塊是一個事件序列,以及兩個主角:事件的釋出者(publisher)和訂閱者(subscriber)。將序列稱為“流”(stream)也可以,因為它就是流。如果需要,我們將使用小寫字母開頭的“stream”,但 Java 8 有一個 java.util.Stream,它有所不同,所以儘量不要混淆。無論如何,我們將嘗試把重點放在釋出者和訂閱者上(Reactive Streams 就是這樣做的)。

Reactor 是我們將在示例中使用的庫,因此我們將遵循其表示法,並將釋出者稱為 Flux(它實現了 Reactive Streams 的 Publisher 介面)。RxJava 庫非常相似,並且有很多並行功能,因此在這種情況下,我們將討論 Observable,但程式碼會非常相似。(Reactor 2.0 將其稱為 Stream,如果還需要討論 Java 8 Streams 會引起混淆,所以我們將只使用 Reactor 3.0 中的新程式碼。)

生成器

Flux 是特定 POJO 型別事件序列的釋出者,因此它是泛型的,即 Flux<T>T 的釋出者。Flux 提供了一些靜態便利方法,可以從多種源建立其自身例項。例如,從陣列建立 Flux

Flux<String> flux = Flux.just("red", "white", "blue");

我們剛剛生成了一個 Flux,現在可以用它做一些事情。實際上,您只能用它做兩件事:對其進行操作(轉換它或與其他序列組合),以及訂閱它(它是一個釋出者)。

單值序列

您經常會遇到已知僅包含一個或零個元素的序列,例如按 ID 查詢實體的 repository 方法。Reactor 有一個 Mono 型別,表示單值或空的 FluxMono 的 API 與 Flux 非常相似,但更具針對性,因為並非所有運算子都適用於單值序列。RxJava 也有一個附加元件(在 1.x 版本中)稱為 Single,還有一個用於空序列的 Completable。Reactor 中的空序列是 Mono<Void>

運算子

Flux 上有很多方法,幾乎所有方法都是運算子。我們不會在這裡全部介紹,因為有更好的地方(例如 Javadoc)可以查閱。我們只需要瞭解運算子是什麼以及它能為您做什麼。

例如,要請求將 Flux 內部事件記錄到標準輸出,您可以呼叫 .log() 方法。或者您可以使用 map() 進行轉換

Flux<String> flux = Flux.just("red", "white", "blue");

Flux<String> upper = flux
  .log()
  .map(String::toUpperCase);

在這段程式碼中,我們透過將字串轉換為大寫來轉換輸入中的字串。到目前為止,這很簡單。

這個小示例有趣的地方在於——如果您不習慣,甚至會感到震撼——就是目前還沒有處理任何資料。甚至沒有記錄任何內容,因為實際上什麼也沒發生(試試看就知道了)。在 Flux 上呼叫運算子相當於構建一個稍後執行的計劃。它是完全宣告式的,這就是為什麼人們稱其為“函式式”的原因。運算子中實現的邏輯只在資料開始流動時執行,而這隻有當有人訂閱了 Flux(或等效地訂閱了 Publisher)時才會發生。

所有 Reactive 庫以及 Java 8 Streams 中都存在這種宣告式、函式式處理資料序列的方法。考慮這段看起來相似的程式碼,使用與 Flux 內容相同的 Stream

Stream<String> stream = Streams.of("red", "white", "blue");
Stream<String> upper = stream.map(value -> {
    System.out.println(value);
    return value.toUpperCase();
});

我們對 Flux 的觀察在這裡也適用:沒有資料被處理,它只是一個執行計劃。然而,FluxStream 之間有一些重要的區別,這使得 Stream 不適合 Reactive 用例。Flux 有更多的運算子,其中很多隻是為了方便,但真正的區別在於您想要消費資料時,所以這就是我們接下來需要探討的內容。

提示

Sebastien Deleuze 寫了一篇關於 Reactive 型別的有用部落格,他在其中透過檢視各種流式和響應式 API 定義的型別以及如何使用它們來描述它們之間的差異。其中更詳細地強調了 FluxStream 之間的區別。

訂閱者

要使資料流動,您必須使用其中一個 subscribe() 方法訂閱 Flux。只有這些方法才能使資料流動。它們會沿著您在序列上宣告的運算子鏈(如果存在)回溯,並請求釋出者開始建立資料。在我們一直在使用的示例中,這意味著遍歷底層字串集合。在更復雜的用例中,它可能會觸發從檔案系統讀取檔案,或者從資料庫拉取資料,或者呼叫 HTTP 服務。

下面是一個 subscribe() 的實際呼叫

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe();

輸出如下

09:17:59.665 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@3ffc5af1)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:17:59.666 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:17:59.667 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

因此我們可以從中看到,不帶引數的 subscribe() 的作用是請求釋出者傳送所有資料——只記錄了一個 request() 並且它是“無限制的”(unbounded)。我們還可以看到釋出每個專案時的回撥(onNext()),序列結束時的回撥(onComplete()),以及原始訂閱時的回撥(onSubscribe())。如果需要,您可以使用 Flux 中的 doOn*() 方法自己監聽這些事件,這些方法本身是運算子,而不是訂閱者,所以它們本身不會導致資料流動。

subscribe() 方法是過載的,其他變體為您提供了不同的選項來控制行為。一種重要且方便的形式是帶回調函式作為引數的 subscribe()。第一個引數是一個 Consumer,它為每個專案提供一個回撥;您還可以選擇新增一個用於處理錯誤的 Consumer(如果發生錯誤),以及一個在序列完成時執行的普通 Runnable。例如,只使用逐項回撥

Flux.just("red", "white", "blue")
    .log()
    .map(String::toUpperCase)
.subscribe(System.out::println);

輸出如下

09:56:12.680 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@59f99ea)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  request(unbounded)
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
RED
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
WHITE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
BLUE
09:56:12.682 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

我們可以透過多種方式控制資料流,使其“有界”(bounded)。原始控制 API 是您從 Subscriber 獲得的 Subscription。上面簡短的 subscribe() 呼叫的等效完整形式是

.subscribe(new Subscriber<String>() {

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE);
    }
    @Override
    public void onNext(String t) {
        System.out.println(t);
    }
    @Override
    public void onError(Throwable t) {
    }
    @Override
    public void onComplete() {
    }

});

要控制流,例如一次最多消費 2 個專案,您可以更智慧地使用 Subscription

.subscribe(new Subscriber<String>() {

    private long count = 0;
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(2);
    }

    @Override
    public void onNext(String t) {
        count++;
        if (count>=2) {
            count = 0;
            subscription.request(2);
        }
     }
...

這個 Subscriber 正在“批次”處理專案,每次 2 個。這是一個常見用例,因此您可以考慮將實現提取到一個便利類中,這樣也會使程式碼更具可讀性。輸出如下

09:47:13.562 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@61832929)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.564 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  request(2)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
09:47:13.565 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

事實上,批次處理訂閱者是一個如此常見的用例,以至於 Flux 中已經提供了便利方法。上面的批次處理示例可以這樣實現

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
.subscribe(null, 2);

(注意呼叫 subscribe() 時帶有請求限制)。輸出如下

10:25:43.739 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@4667ae56)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.740 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  request(2)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onNext(blue)
10:25:43.741 [main] INFO reactor.core.publisher.FluxLog -  onComplete()

提示

像 Spring Reactive Web 這樣會為您處理序列的庫可以處理訂閱。將這些關注點下推到更底層是一個好主意,因為這可以避免用非業務邏輯程式碼混淆您的程式碼,使其更具可讀性,更容易測試和維護。因此,通常來說,如果可以避免訂閱序列,或者至少將該程式碼推到處理層,使其脫離業務邏輯,這是件好事。

執行緒、排程器和後臺處理

上面所有日誌的一個有趣的特點是它們都在“主”執行緒上,也就是呼叫 subscribe() 的執行緒。這突出了一個重要點:Reactor 線上程使用上極其節儉,因為這為您提供了獲得最佳效能的最大機會。如果您在過去 5 年裡一直在與執行緒、執行緒池和非同步執行打交道,試圖從服務中榨取更多效能,這可能是一個令人驚訝的說法。但這是真的:在沒有明確指令要求切換執行緒的情況下,即使 JVM 經過最佳化可以非常高效地處理執行緒,在單個執行緒上進行計算總是更快。Reactor 已經把控制所有非同步處理的鑰匙交給了您,並假設您知道自己在做什麼。

Flux 提供了一些配置方法來控制執行緒邊界。例如,您可以使用 Flux.subscribeOn() 將訂閱配置為在後臺執行緒中處理

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.parallel())
.subscribe(null, 2);

結果可以在輸出中看到

13:43:41.279 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxArray$ArraySubscription@58663fc3)
13:43:41.280 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
13:43:41.281 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()

提示

如果您自己編寫或複製貼上這段程式碼,請記住在 JVM 退出之前等待處理停止。

注意,訂閱以及所有處理都在一個後臺執行緒“parallel-1-1”上進行——這是因為我們要求主 Flux 的訂閱者在後臺執行。如果專案處理是 CPU 密集型的,這樣做是沒問題的(但實際上在後臺執行緒中是無意義的,因為您付出了上下文切換的成本,但並不能更快地獲得結果)。您可能還希望執行 I/O 密集型且可能阻塞的專案處理。在這種情況下,您會希望儘快完成它而不會阻塞呼叫者。執行緒池仍然是您的朋友,這就是您從 Schedulers.parallel() 獲得的。要將單個專案的處理切換到單獨的執行緒(最多達到執行緒池的限制),我們需要將它們分解成單獨的釋出者,併為每個釋出者請求在後臺執行緒中處理結果。一種方法是使用名為 flatMap() 的運算子,它將專案對映到一個 Publisher(可能型別不同),然後映射回一個新型別的序列

Flux.just("red", "white", "blue")
  .log()
  .flatMap(value ->
     Mono.just(value.toUpperCase())
       .subscribeOn(Schedulers.parallel()),
     2)
.subscribe(value -> {
  log.info("Consumed: " + value);
})

這裡要注意使用 flatMap() 將專案下推到“子”釋出者中,我們可以在其中控制每個專案的訂閱,而不是整個序列的訂閱。Reactor 內建的預設行為是儘可能地保持在單個執行緒上,因此如果希望它在後臺執行緒中處理特定專案或專案組,我們需要明確指定。實際上,這是強制並行處理的少數公認技巧之一(詳情請參閱 Reactive Gems 問題)。

輸出如下

15:24:36.596 [main] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@6f1fba17)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  request(2)
15:24:36.610 [main] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:24:36.613 [main] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:24:36.613 [parallel-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  request(1)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:24:36.613 [parallel-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:24:36.614 [parallel-3-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE
15:24:36.617 [parallel-2-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE

請注意,現在有多個執行緒正在消費這些專案,並且 flatMap() 中的併發提示確保在任何給定時間都有 2 個專案正在處理,只要它們可用。我們經常看到 request(1),因為系統試圖在 pipeline 中保持 2 個專案,並且通常它們不會同時完成處理。實際上,Reactor 在這方面非常智慧,它會從上游 Publisher 預取專案,以儘量消除訂閱者的等待時間(這裡看不到這一點,因為數量很小——我們只處理了 3 個專案)。

提示

三個專案(“red”,“white”,“blue”)可能太少,無法令人信服地看到多個後臺執行緒,因此最好生成更多資料。例如,您可以使用隨機數生成器來實現。

Flux 還有一個 publishOn() 方法,作用類似,但用於監聽器(即 onNext() 或消費者回調),而不是訂閱者本身

Flux.just("red", "white", "blue")
  .log()
  .map(String::toUpperCase)
  .subscribeOn(Schedulers.newParallel("sub"))
  .publishOn(Schedulers.newParallel("pub"), 2)
.subscribe(value -> {
    log.info("Consumed: " + value);
});

輸出如下

15:12:09.750 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onSubscribe(reactor.core.publisher.FluxIterable$IterableSubscription@172ed57)
15:12:09.758 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(red)
15:12:09.759 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(white)
15:12:09.770 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: RED
15:12:09.771 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: WHITE
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  request(2)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onNext(blue)
15:12:09.777 [sub-1-1] INFO reactor.core.publisher.FluxLog -  onComplete()
15:12:09.783 [pub-1-1] INFO com.example.FluxFeaturesTests - Consumed: BLUE

注意,消費者回調(日誌記錄“Consumed: …​”)位於釋出者執行緒 pub-1-1 上。如果您移除 subscribeOn() 呼叫,您可能會看到第二批資料的所有處理也在 pub-1-1 執行緒上進行。這再次體現了 Reactor 線上程使用上的節儉——如果沒有明確請求切換執行緒,它會在下一個呼叫中保持在同一執行緒上,無論該呼叫是什麼。

注意

我們在此示例中將程式碼從 subscribe(null, 2) 改為在 publishOn() 中新增 prefetch=2。在這種情況下,subscribe() 中的抓取大小提示會被忽略。

提取器:來自黑暗面的訂閱者

還有另一種訂閱序列的方式,即呼叫 Mono.block()Mono.toFuture()Flux.toStream()(這些是“提取器”方法——它們將您從 Reactive 型別中帶出,進入一個不太靈活的阻塞抽象)。Flux 還有轉換器 collectList()collectMap(),可以將 Flux 轉換為 Mono。它們實際上並不訂閱序列,但它們會丟棄您可能對單個專案的訂閱擁有的任何控制權。

警告

一個好的經驗法則是“永遠不要呼叫提取器”。也有一些例外情況(否則這些方法就不會存在)。一個值得注意的例外是在測試中,因為能夠阻塞以允許結果累積是有用的。

這些方法是作為從 Reactive 到阻塞的橋樑或逃生艙而存在的;例如,如果您需要適配遺留 API,比如 Spring MVC。當您呼叫 Mono.block() 時,您就丟棄了 Reactive Streams 的所有好處。這是 Reactive Streams 和 Java 8 Streams 之間的關鍵區別——原生的 Java Stream 只有“全部或無”的訂閱模型,相當於 Mono.block()。當然,subscribe() 也可以阻塞呼叫執行緒,所以它與轉換方法一樣危險,但您有更多的控制權——您可以使用 subscribeOn() 阻止它阻塞,並且可以透過應用背壓並定期決定是否繼續來逐項推送專案。

結論

在本文中,我們介紹了 Reactive Streams 和 Reactor API 的基礎知識。如果您需要了解更多資訊,有很多地方可以查詢,但沒有比動手程式設計更好的替代方法了,因此請使用 GitHub 中的程式碼(本文的測試位於名為“flux”的專案中),或前往 Lite RX Hands On 研討會。到目前為止,這真的只是開銷,我們並沒有學到用非 Reactive 工具無法以更明顯的方式完成的很多東西。該系列的下一篇文章將更深入地探討 Reactive 模型的阻塞、排程和非同步方面,並向您展示如何獲得真正的好處的機會。

訂閱 Spring 簡報

訂閱 Spring 簡報,保持聯絡

訂閱

先人一步

VMware 提供培訓和認證,助您加速前進。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群的所有近期活動。

檢視全部