Flux 之飛翔 1 - 組裝與訂閱

工程 | Simon Baslé | 2019 年 3 月 6 日 | ...

這篇博文是系列文章中的第一篇,旨在深入探討Reactor更高階的概念和內部工作原理。

它源於我的 Flight of the Flux 演講,我發現其內容更適合博文格式。

我將在其他文章釋出時更新下表中的連結,但這是計劃中的內容

  1. 組裝與訂閱(本文)
  2. 除錯注意事項
  3. 跳躍執行緒和排程器
  4. 內部工作原理:工作竊取
  5. 內部工作原理:運算子融合

如果您缺少對 Reactive Streams 和 Reactor 基本概念的介紹,請訪問網站的學習部分參考指南

閒話少說,我們開始吧

組裝時間

當您第一次瞭解 JVM 上的 Reactive Streams反應式程式設計 時,首先學習的是 PublisherSubscriber 之間的高階關係:一個生成資料,另一個消費資料。很簡單,對吧?此外,似乎 Publisher 資料推送到 Subscriber

但是當使用像 Reactor(或 RxJava2)這樣的 Reactive Streams 庫時,您會很快遇到以下口頭禪

不訂閱,什麼都不會發生

有時,您可能會讀到這兩個庫都實現了“推拉混合模型”。等一下!

我們稍後再討論這個問題,但要理解這句話,您首先需要認識到,預設情況下,Reactor 的反應式型別是 惰性 的。

FluxMono(即 運算子)上呼叫方法不會立即觸發行為。相反,它會返回一個 Flux(或 Mono)的新例項,您可以在其上繼續組合其他運算子。因此,您建立了一個 運算子鏈(或運算子無環圖),它代表您的 非同步處理管道

這個 宣告性 階段稱為 組裝時間

我們來看一個客戶端應用程式向伺服器發出 HTTP 請求,期望得到 HttpResponse 的例子

Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made

這可以使用流暢的 API 進行簡化

Mono<String> quote = makeHttpRequest()
    .map(req -> parseJson(req))
    .map(json -> json.getString("quote"));

宣告完管道後,有兩種情況:要麼將代表處理管道的 Flux/Mono 傳遞給另一段程式碼,要麼觸發管道。

前者意味著你返回 Mono 的程式碼可能會應用其他運算子,從而產生一個新的派生管道。由於運算子會建立新的例項(就像一個洋蔥),你自己的 Mono 不會被修改,所以它可以被多次裝飾,產生截然不同的結果

//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);

//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));

//by this point, none of the 3 "pipelines" have triggered an HTTP request

CompletableFuture 相比,它本質上不是惰性的:一旦你獲得了 CompletableFuture 的引用,就意味著處理已經進行中……

考慮到這一點,讓我們看看如何觸發反應式管道。

訂閱時間

到目前為止,我們已經 組裝了一個非同步管道。也就是說,我們透過使用 運算子 例項化了 FluxMono 變數,這些變數會生成像洋蔥一樣分層的 Flux/Mono

但是資料尚未開始流經這些已宣告的管道。

那是因為資料流動的觸發器不是管道的宣告,而是對它的 訂閱。記住

不訂閱,什麼都不會發生

訂閱是說“好的,這個管道代表資料的轉換,我對此資料的最終形式感興趣”的行為。最常見的方法是呼叫 Flux.subscribe(valueConsumer, errorConsumer)

這種興趣訊號透過運算子鏈向後傳播,直到 運算子,即實際生成初始資料的 Publisher

makeHttpRequest() //<5>
    .map(req -> parseJson(req)) //<4>
    .map(json -> json.getString("quote")) //<3>
    .flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
    .subscribe(System.out::println, Throwable::printStackTrace); //<1>
  1. 我們訂閱 Flux 單詞,表示我們想將每個單詞列印到控制檯(並列印任何錯誤的堆疊跟蹤)
  2. 這個興趣訊號被髮送到 flatMapMany 步驟...
  3. ...然後將其訊號上傳到 JSON map 步驟...
  4. ...然後是請求 map 步驟...
  5. ...最終到達 makeHttpRequest()(我們將其視為我們的來源)

此時,源被觸發。它以適當的方式生成資料:在這裡,它將向產生 JSON 的端點發出 HTTP 請求,然後發出 HTTP 響應。

從那時起,我們就進入了 執行時間。資料已開始流經管道(以更自然的自上而下順序,或 上游下游

  1. HttpResponse 被髮送到 parseJson map
  2. 它提取 JSON 正文並將其傳送到 getString map
  3. 它提取 quote 並將其傳遞給 flatMapMany
  4. flatMapMany 將引用拆分為單詞並單獨發出每個單詞
  5. subscribe 中的值處理程式會收到每個單詞的通知,並將其逐行列印到控制檯。

希望這能幫助您理解組裝時間與訂閱/執行時間之間的區別!

冷與熱

解釋完區別並引入這個口頭禪後,現在可能是引入一個例外的最佳時機 :laughing

不訂閱,什麼都不會發生……直到某些事情發生

到目前為止,我們一直在處理一種名為 PublisherFluxMono 源。正如我們所解釋的,這些 Publisher 是惰性的,只有在有 Subscription 時才生成資料。此外,它們為每個單獨的 Subscription 重新生成資料。

在我們的 HTTP 響應 Mono 示例中,HTTP 請求將在每次訂閱時執行

Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));

evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request

另外,有些運算子的行為意味著多次訂閱。例如,retry 在發生錯誤(onError 訊號)時會重新訂閱其源,而 repeatonComplete 訊號時也會這樣做。

因此,對於像 HTTP 請求這樣的冷源,像 retry 這樣的操作會重新執行請求,從而允許從瞬態伺服器端錯誤中恢復。

另一方面,Publisher 則不那麼明確:它不一定需要 Subscriber 才能開始泵送資料。它也不一定為每個新的 Subscriber 重新生成專用資料。

為了說明這一點,我們先介紹一個冷的釋出者示例,然後我們將展示如何將該 釋出者轉換為 釋出者

Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");

這會打印出

clock1 1s
clock1 2s
clock1 3s
    clock2 1s
clock1 4s
    clock2 2s
clock1 5s
    clock2 3s
clock1 6s
    clock2 4s

我們可以透過呼叫 share()clockTicks 源轉換為熱源。

Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();

clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");

Thread.sleep(2000);

clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");

它反而產生了以下結果

clock1 1s
clock1 2s
clock1 3s
    clock2 3s
clock1 4s
    clock2 4s
clock1 5s
    clock2 5s
clock1 6s
    clock2 6s

您會發現兩個訂閱現在共享相同的時鐘滴答。share() 透過讓源將元素多播給新的 Subscribers 來將冷轉換為熱,但 僅限在這些新訂閱之後發出的元素。由於 clock2 訂閱晚了 2 秒,它錯過了早期的發出 1s2s

因此,熱釋出者可以不那麼惰性,儘管它們通常至少需要一個初始 Subscription 來觸發資料流。

結論

在本文中,我們瞭解了例項化 Flux / 鏈式運算子(又稱 組裝時間)、觸發它(又稱 訂閱時間)和執行它(又稱 執行時間)之間的區別。

因此我們瞭解到 FluxMono 大多是惰性的(又稱 Publisher):不訂閱,什麼都不會發生

最後,我們瞭解了 FluxMono 的另一種形式,稱為 Publisher,它的行為略有不同,並且不那麼惰性。

在下一篇文章中,我們將探討這三個階段如何在開發人員除錯基於 Reactor 的程式碼時產生重大影響。

與此同時,祝您反應式程式設計愉快!

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有