領先一步
VMware 提供培訓和認證,助你加速進步。
瞭解更多這篇博文是系列文章的第一篇,旨在深入探討 Reactor 更高階的概念和內部工作原理。
它源自我關於 Flight of the Flux 的演講,我發現其內容更適合部落格文章的形式。
其他文章釋出後,我會更新下面的表格,但以下是計劃中的內容:
如果你還不瞭解 Reactive Streams 和 Reactor 的基本概念,請前往網站的學習區和參考指南。
事不宜遲,我們開始吧
初次接觸 JVM 上的 Reactive Streams 和響應式程式設計時,你首先了解的是 Publisher
和 Subscriber
之間的高階關係:一個產生資料,另一個消費資料。很簡單對吧?此外,Publisher
似乎將資料 推送 給 Subscriber
。
但在使用像 Reactor (或 RxJava2) 這樣的 Reactive Streams 庫時,你很快就會遇到這句格言:
不訂閱,一切都不會發生
有時,你可能會讀到這兩個庫實現了“推-拉混合模型”。等一下!拉?
稍後我們會回到這一點,但要理解這句話,首先需要認識到,預設情況下,Reactor 的響應式型別是 懶惰的。
呼叫 Flux
或 Mono
上的方法(即 運算子)不會立即觸發行為。相反,它會返回一個新的 Flux
(或 Mono
)例項,你可以在其上繼續組合更多運算子。因此,你建立了一個 運算子鏈(或運算子的非迴圈圖),它代表你的 非同步處理管道。
這個 宣告式 階段稱為 組裝時機。
我們來看一個例子,客戶端應用向伺服器發起 HTTP 請求,期望獲得 HttpResponse:
Mono<HttpResponse> httpSource = makeHttpRequest();
Mono<Json> jsonSource = httpSource.map(req -> parseJson(req));
Mono<String> quote = jsonSource.map(json -> json.getString("quote"));
//at this point, no HTTP request has been made
使用流暢 API 可以簡化如下:
Mono<String> quote = makeHttpRequest()
.map(req -> parseJson(req))
.map(json -> json.getString("quote"));
宣告完管道後,有兩種情況:要麼將表示處理管道的 Flux
/Mono
傳遞給另一段程式碼,要麼觸發管道。
前一種情況意味著你返回 Mono
的程式碼可能會應用其他運算子,從而產生一個新的派生管道。由於運算子建立新例項(就像洋蔥一樣),你自己的 Mono
不會被修改,因此它可以被多次進一步裝飾,產生截然不同的結果:
//you could derive a `Mono<String>` of odd-length strings vs even-length ones
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
//or even a `Flux<String>` of words in a quote
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
//by this point, none of the 3 "pipelines" have triggered an HTTP request
與 CompletableFuture
相比,CompletableFuture
本質上不是懶惰的:一旦你獲得了 CompletableFuture
的引用,就意味著處理已經在進行中了……
考慮到這一點,讓我們來看看如何觸發響應式管道。
到目前為止,我們已經 組裝了一個非同步管道。也就是說,我們透過使用運算子例項化了 Flux
和 Mono
變數,從而生成了行為像洋蔥一樣層層疊加的其他 Flux
/Mono
例項。
但資料還沒有開始流經這些宣告好的管道。
這是因為資料流動的觸發點不是管道的宣告,而是對它的 訂閱。請記住:
不訂閱,一切都不會發生
訂閱的行為就是在說“好的,這個管道代表資料的轉換,我對最終形式的資料感興趣”。最常見的方式是呼叫 Flux.subscribe(valueConsumer, errorConsumer)
。
這種興趣訊號會透過運算子鏈反向傳播,直到源運算子,即實際產生初始資料的 Publisher
:
makeHttpRequest() //<5>
.map(req -> parseJson(req)) //<4>
.map(json -> json.getString("quote")) //<3>
.flatMapMany(quote -> Flux.fromArray(quote.split(" "))) //<2>
.subscribe(System.out::println, Throwable::printStackTrace); //<1>
Flux
,表示希望將每個單詞列印到控制檯(並列印任何錯誤的堆疊跟蹤)。flatMapMany
步驟……map
步驟……map
步驟……makeHttpRequest()
(我們將其視為源)。此時,源被觸發。它以適當的方式生成資料:在這裡,它會向一個產生 JSON 的端點發起 HTTP 請求,然後發出 HTTP 響應。
從那時起,我們處於 執行時機。資料已經開始流經管道(按照更自然的從上到下順序,即從上游到下游)。
HttpResponse
被髮出到 parseJson map
。map
。flatMapMany
。flatMapMany
將引用拆分成單詞,並單獨發出每個單詞。subscribe
中的值處理器會收到每個單詞的通知,並將它們列印到控制檯,每行一個。希望這能幫助你理解組裝時機、訂閱/執行時機之間的區別!
解釋完區別並介紹了這句格言之後,可能是引入一個例外的好時機 :laughing
不訂閱,一切都不會發生……直到某些事情發生
到目前為止,我們一直在處理一種稱為 冷流 Publisher 的 Flux
和 Mono
源。正如我們所解釋的,這些 Publisher
是懶惰的,只有在有 Subscription
時才會生成資料。此外,它們會為每個獨立的 Subscription
重新生成資料。
在我們關於 HTTP 響應 Mono
的例子中,每次訂閱都會執行 HTTP 請求。
Mono<String> evenLength = quote.filter(str -> str.length() % 2 == 0);
Mono<String> oddLength = quote.filter(str -> str.length() % 2 == 1);
Flux<String> words = quote.flatMapMany(quote -> Flux.fromArray(quote.split(" ")));
evenLength.subscribe(); //this triggers an HTTP request
oddLength.subscribe(); //this triggers another HTTP request
words.subscribe(); //this triggers a third HTTP request
順帶一提,某些運算子的行為意味著多次訂閱。例如,retry
會在發生錯誤(onError
訊號)時重新訂閱其源,而 repeat
會在 onComplete
訊號時做同樣的事情。
因此,對於像 HTTP 請求這樣的冷流源,像 retry
這樣的運算子會重新執行請求,從而可以從短暫的伺服器端錯誤中恢復過來。
另一方面,熱流 Publisher 則不那麼明確:它不一定需要 Subscriber
來開始推送資料。它也不一定為每個新的 Subscriber
重新生成專門的資料。
為了說明這一點,我們引入一個新的冷流 publisher 示例,然後展示如何將該冷流 publisher 轉換為熱流:
Flux<Long> clockTicks = Flux.interval(Duration.ofSeconds(1));
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
輸出如下:
clock1 1s
clock1 2s
clock1 3s
clock2 1s
clock1 4s
clock2 2s
clock1 5s
clock2 3s
clock1 6s
clock2 4s
我們可以透過呼叫 share()
將 clockTicks
源轉換為熱流。
Flux<Long> coldTicks = Flux.interval(Duration.ofSeconds(1));
Flux<Long> clockTicks = coldTicks.share();
clockTicks.subscribe(tick -> System.out.println("clock1 " + tick + "s");
Thread.sleep(2000);
clockTicks.subscribe(tick -> System.out.println("\tclock2 " + tick + "s");
結果變為如下:
clock1 1s
clock1 2s
clock1 3s
clock2 3s
clock1 4s
clock2 4s
clock1 5s
clock2 5s
clock1 6s
clock2 6s
你會看到這兩個訂閱現在共享時鐘的相同 tick。share()
將冷流轉換為熱流,透過讓源將元素多播給新的 Subscribers
,但僅多播在這些新訂閱之後發出的元素。由於 clock2
晚訂閱了 2 秒,它錯過了早期的 1s
和 2s
的發出。
所以熱流 publishers 可以不那麼懶惰,儘管它們通常至少需要一個初始 Subscription
來觸發資料流動。
在本文中,我們瞭解了例項化 Flux
/ 鏈式呼叫運算子(即 組裝時機)、觸發它(即 訂閱時機)和執行它(即 執行時機)之間的區別。
因此,我們瞭解到 Flux
和 Mono
大部分是懶惰的(即 冷流 Publisher):不訂閱,一切都不會發生。
最後,我們瞭解了另一種風味的 Flux
和 Mono
,稱為 熱流 Publisher,它的行為略有不同,並且不那麼懶惰。
在下一篇中,我們將看到這三個階段為何在你除錯基於 reactor 的程式碼時會產生重大差異。
祝你響應式程式設計愉快!