Flux 之旅 1 - 組裝與訂閱

工程 | Simon Baslé | 2019 年 3 月 6 日 | ...

這篇博文是系列文章的第一篇,旨在深入探討 Reactor 更高階的概念和內部工作原理。

它源自我關於 Flight of the Flux 的演講,我發現其內容更適合部落格文章的形式。

其他文章釋出後,我會更新下面的表格,但以下是計劃中的內容:

  1. 組裝與訂閱(本文)
  2. 除錯注意事項
  3. 執行緒跳躍與排程器
  4. 內部工作原理:任務竊取
  5. 內部工作原理:運算子融合

如果你還不瞭解 Reactive Streams 和 Reactor 的基本概念,請前往網站的學習區參考指南

事不宜遲,我們開始吧

組裝時機

初次接觸 JVM 上的 Reactive Streams 和響應式程式設計時,你首先了解的是 PublisherSubscriber 之間的高階關係:一個產生資料,另一個消費資料。很簡單對吧?此外,Publisher 似乎將資料 推送Subscriber

但在使用像 Reactor (或 RxJava2) 這樣的 Reactive Streams 庫時,你很快就會遇到這句格言:

不訂閱,一切都不會發生

有時,你可能會讀到這兩個庫實現了“推-拉混合模型”。等一下!

稍後我們會回到這一點,但要理解這句話,首先需要認識到,預設情況下,Reactor 的響應式型別是 懶惰的

呼叫 FluxMono 上的方法(即 運算子)不會立即觸發行為。相反,它會返回一個新的 Flux(或 Mono)例項,你可以在其上繼續組合更多運算子。因此,你建立了一個 運算子鏈(或運算子的非迴圈圖),它代表你的 非同步處理管道

這個 宣告式 階段稱為 組裝時機

我們來看一個例子,客戶端應用向伺服器發起 HTTP 請求,期望獲得 HttpResponse:

Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made

使用流暢 API 可以簡化如下:

Mono<String> quote = makeHttpRequest()
    .map(req -> parseJson(req))
    .map(json -> json.getString("quote"));

宣告完管道後,有兩種情況:要麼將表示處理管道的 Flux/Mono 傳遞給另一段程式碼,要麼觸發管道。

前一種情況意味著你返回 Mono 的程式碼可能會應用其他運算子,從而產生一個新的派生管道。由於運算子建立新例項(就像洋蔥一樣),你自己的 Mono 不會被修改,因此它可以被多次進一步裝飾,產生截然不同的結果:

//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);

//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));

//by this point, none of the 3 "pipelines" have triggered an HTTP request

CompletableFuture 相比,CompletableFuture 本質上不是懶惰的:一旦你獲得了 CompletableFuture 的引用,就意味著處理已經在進行中了……

考慮到這一點,讓我們來看看如何觸發響應式管道。

訂閱時機

到目前為止,我們已經 組裝了一個非同步管道。也就是說,我們透過使用運算子例項化了 FluxMono 變數,從而生成了行為像洋蔥一樣層層疊加的其他 Flux/Mono 例項。

但資料還沒有開始流經這些宣告好的管道。

這是因為資料流動的觸發點不是管道的宣告,而是對它的 訂閱。請記住:

不訂閱,一切都不會發生

訂閱的行為就是在說“好的,這個管道代表資料的轉換,我對最終形式的資料感興趣”。最常見的方式是呼叫 Flux.subscribe(valueConsumer, errorConsumer)

這種興趣訊號會透過運算子鏈反向傳播,直到源運算子,即實際產生初始資料的 Publisher

makeHttpRequest() //<5>
    .map(req -> parseJson(req)) //<4>
    .map(json -> json.getString("quote")) //<3>
    .flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
    .subscribe(System.out::println, Throwable::printStackTrace); //<1>
  1. 我們訂閱 words Flux,表示希望將每個單詞列印到控制檯(並列印任何錯誤的堆疊跟蹤)。
  2. 這個興趣訊號被傳遞給 flatMapMany 步驟……
  3. ……然後沿著鏈向上傳遞給 json map 步驟……
  4. ……再然後是 request map 步驟……
  5. ……最終到達 makeHttpRequest()(我們將其視為源)。

此時,源被觸發。它以適當的方式生成資料:在這裡,它會向一個產生 JSON 的端點發起 HTTP 請求,然後發出 HTTP 響應。

從那時起,我們處於 執行時機。資料已經開始流經管道(按照更自然的從上到下順序,即從上游到下游)。

  1. HttpResponse 被髮出到 parseJson map
  2. 它提取 JSON 主體並將其發出到 getString map
  3. 它提取引用並將其傳遞給 flatMapMany
  4. flatMapMany 將引用拆分成單詞,並單獨發出每個單詞。
  5. subscribe 中的值處理器會收到每個單詞的通知,並將它們列印到控制檯,每行一個。

希望這能幫助你理解組裝時機、訂閱/執行時機之間的區別!

冷流 vs 熱流

解釋完區別並介紹了這句格言之後,可能是引入一個例外的好時機 :laughing

不訂閱,一切都不會發生……直到某些事情發生

冷流

到目前為止,我們一直在處理一種稱為 冷流 PublisherFluxMono 源。正如我們所解釋的,這些 Publisher 是懶惰的,只有在有 Subscription 時才會生成資料。此外,它們會為每個獨立的 Subscription 重新生成資料。

在我們關於 HTTP 響應 Mono 的例子中,每次訂閱都會執行 HTTP 請求。

Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));

evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request

順帶一提,某些運算子的行為意味著多次訂閱。例如,retry 會在發生錯誤(onError 訊號)時重新訂閱其源,而 repeat 會在 onComplete 訊號時做同樣的事情。

因此,對於像 HTTP 請求這樣的冷流源,像 retry 這樣的運算子會重新執行請求,從而可以從短暫的伺服器端錯誤中恢復過來。

熱流

另一方面,熱流 Publisher 則不那麼明確:它不一定需要 Subscriber 來開始推送資料。它也不一定為每個新的 Subscriber 重新生成專門的資料。

為了說明這一點,我們引入一個新的冷流 publisher 示例,然後展示如何將該冷流 publisher 轉換為熱流:

Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");

輸出如下:

clock1 1s
clock1 2s
clock1 3s
    clock2 1s
clock1 4s
    clock2 2s
clock1 5s
    clock2 3s
clock1 6s
    clock2 4s

我們可以透過呼叫 share()clockTicks 源轉換為熱流。

Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");

結果變為如下:

clock1 1s
clock1 2s
clock1 3s
    clock2 3s
clock1 4s
    clock2 4s
clock1 5s
    clock2 5s
clock1 6s
    clock2 6s

你會看到這兩個訂閱現在共享時鐘的相同 tick。share() 將冷流轉換為熱流,透過讓源將元素多播給新的 Subscribers,但僅多播在這些新訂閱之後發出的元素。由於 clock2 晚訂閱了 2 秒,它錯過了早期的 1s2s 的發出。

所以熱流 publishers 可以不那麼懶惰,儘管它們通常至少需要一個初始 Subscription 來觸發資料流動。

結論

在本文中,我們瞭解了例項化 Flux / 鏈式呼叫運算子(即 組裝時機)、觸發它(即 訂閱時機)和執行它(即 執行時機)之間的區別。

因此,我們瞭解到 FluxMono 大部分是懶惰的(即 冷流 Publisher):不訂閱,一切都不會發生。

最後,我們瞭解了另一種風味的 FluxMono,稱為 熱流 Publisher,它的行為略有不同,並且不那麼懶惰。

在下一篇中,我們將看到這三個階段為何在你除錯基於 reactor 的程式碼時會產生重大差異。

祝你響應式程式設計愉快!

獲取 Spring 資訊

訂閱 Spring 資訊,保持連線

訂閱

領先一步

VMware 提供培訓和認證,助你加速進步。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一次簡單訂閱。

瞭解更多

即將到來的活動

檢視 Spring 社群所有即將到來的活動。

檢視全部