領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多這篇博文是系列文章中的第一篇,旨在深入探討Reactor更高階的概念和內部工作原理。
它源於我的 Flight of the Flux 演講,我發現其內容更適合博文格式。
我將在其他文章釋出時更新下表中的連結,但這是計劃中的內容
如果您缺少對 Reactive Streams 和 Reactor 基本概念的介紹,請訪問網站的學習部分和參考指南。
閒話少說,我們開始吧
當您第一次瞭解 JVM 上的 Reactive Streams 和 反應式程式設計 時,首先學習的是 Publisher 和 Subscriber 之間的高階關係:一個生成資料,另一個消費資料。很簡單,對吧?此外,似乎 Publisher 將資料推送到 Subscriber。
但是當使用像 Reactor(或 RxJava2)這樣的 Reactive Streams 庫時,您會很快遇到以下口頭禪
不訂閱,什麼都不會發生
有時,您可能會讀到這兩個庫都實現了“推拉混合模型”。等一下!拉?
我們稍後再討論這個問題,但要理解這句話,您首先需要認識到,預設情況下,Reactor 的反應式型別是 惰性 的。
在 Flux 或 Mono(即 運算子)上呼叫方法不會立即觸發行為。相反,它會返回一個 Flux(或 Mono)的新例項,您可以在其上繼續組合其他運算子。因此,您建立了一個 運算子鏈(或運算子無環圖),它代表您的 非同步處理管道。
這個 宣告性 階段稱為 組裝時間。
我們來看一個客戶端應用程式向伺服器發出 HTTP 請求,期望得到 HttpResponse 的例子
Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made
這可以使用流暢的 API 進行簡化
Mono<String> quote = makeHttpRequest()
.map(req -> parseJson(req))
.map(json -> json.getString("quote"));
宣告完管道後,有兩種情況:要麼將代表處理管道的 Flux/Mono 傳遞給另一段程式碼,要麼觸發管道。
前者意味著你返回 Mono 的程式碼可能會應用其他運算子,從而產生一個新的派生管道。由於運算子會建立新的例項(就像一個洋蔥),你自己的 Mono 不會被修改,所以它可以被多次裝飾,產生截然不同的結果
//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
//by this point, none of the 3 "pipelines" have triggered an HTTP request
與 CompletableFuture 相比,它本質上不是惰性的:一旦你獲得了 CompletableFuture 的引用,就意味著處理已經進行中……
考慮到這一點,讓我們看看如何觸發反應式管道。
到目前為止,我們已經 組裝了一個非同步管道。也就是說,我們透過使用 運算子 例項化了 Flux 和 Mono 變數,這些變數會生成像洋蔥一樣分層的 Flux/Mono。
但是資料尚未開始流經這些已宣告的管道。
那是因為資料流動的觸發器不是管道的宣告,而是對它的 訂閱。記住
不訂閱,什麼都不會發生
訂閱是說“好的,這個管道代表資料的轉換,我對此資料的最終形式感興趣”的行為。最常見的方法是呼叫 Flux.subscribe(valueConsumer, errorConsumer)。
這種興趣訊號透過運算子鏈向後傳播,直到 源 運算子,即實際生成初始資料的 Publisher。
makeHttpRequest() //<5>
.map(req -> parseJson(req)) //<4>
.map(json -> json.getString("quote")) //<3>
.flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
.subscribe(System.out::println, Throwable::printStackTrace); //<1>
Flux 單詞,表示我們想將每個單詞列印到控制檯(並列印任何錯誤的堆疊跟蹤)flatMapMany 步驟...map 步驟...map 步驟...makeHttpRequest()(我們將其視為我們的來源)此時,源被觸發。它以適當的方式生成資料:在這裡,它將向產生 JSON 的端點發出 HTTP 請求,然後發出 HTTP 響應。
從那時起,我們就進入了 執行時間。資料已開始流經管道(以更自然的自上而下順序,或 上游 到 下游)
HttpResponse 被髮送到 parseJson mapgetString mapflatMapManyflatMapMany 將引用拆分為單詞並單獨發出每個單詞subscribe 中的值處理程式會收到每個單詞的通知,並將其逐行列印到控制檯。希望這能幫助您理解組裝時間與訂閱/執行時間之間的區別!
解釋完區別並引入這個口頭禪後,現在可能是引入一個例外的最佳時機 :laughing
不訂閱,什麼都不會發生……直到某些事情發生
到目前為止,我們一直在處理一種名為 冷 Publisher 的 Flux 和 Mono 源。正如我們所解釋的,這些 Publisher 是惰性的,只有在有 Subscription 時才生成資料。此外,它們為每個單獨的 Subscription 重新生成資料。
在我們的 HTTP 響應 Mono 示例中,HTTP 請求將在每次訂閱時執行
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request
另外,有些運算子的行為意味著多次訂閱。例如,retry 在發生錯誤(onError 訊號)時會重新訂閱其源,而 repeat 在 onComplete 訊號時也會這樣做。
因此,對於像 HTTP 請求這樣的冷源,像 retry 這樣的操作會重新執行請求,從而允許從瞬態伺服器端錯誤中恢復。
另一方面,熱 Publisher 則不那麼明確:它不一定需要 Subscriber 才能開始泵送資料。它也不一定為每個新的 Subscriber 重新生成專用資料。
為了說明這一點,我們先介紹一個冷的釋出者示例,然後我們將展示如何將該 冷 釋出者轉換為 熱 釋出者
Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
這會打印出
clock1 1s
clock1 2s
clock1 3s
clock2 1s
clock1 4s
clock2 2s
clock1 5s
clock2 3s
clock1 6s
clock2 4s
我們可以透過呼叫 share() 將 clockTicks 源轉換為熱源。
Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
它反而產生了以下結果
clock1 1s
clock1 2s
clock1 3s
clock2 3s
clock1 4s
clock2 4s
clock1 5s
clock2 5s
clock1 6s
clock2 6s
您會發現兩個訂閱現在共享相同的時鐘滴答。share() 透過讓源將元素多播給新的 Subscribers 來將冷轉換為熱,但 僅限在這些新訂閱之後發出的元素。由於 clock2 訂閱晚了 2 秒,它錯過了早期的發出 1s 和 2s。
因此,熱釋出者可以不那麼惰性,儘管它們通常至少需要一個初始 Subscription 來觸發資料流。
在本文中,我們瞭解了例項化 Flux / 鏈式運算子(又稱 組裝時間)、觸發它(又稱 訂閱時間)和執行它(又稱 執行時間)之間的區別。
因此我們瞭解到 Flux 和 Mono 大多是惰性的(又稱 冷 Publisher):不訂閱,什麼都不會發生。
最後,我們瞭解了 Flux 和 Mono 的另一種形式,稱為 熱 Publisher,它的行為略有不同,並且不那麼惰性。
在下一篇文章中,我們將探討這三個階段如何在開發人員除錯基於 Reactor 的程式碼時產生重大影響。
與此同時,祝您反應式程式設計愉快!