超越
VMware 提供培訓和認證,助力您的飛速發展。
瞭解更多在本文中,我們將繼續關於響應式程式設計的系列文章。本篇的重點不再是學習基本 API,而是更具體的用例和編寫實際有用的程式碼。我們將看到響應式是如何成為併發程式設計的有用抽象的,同時也會了解到它有一些非常底層的特性,我們需要謹慎對待。如果我們開始充分利用這些特性,就可以控制應用程式中以前由容器、平臺和框架隱藏的層。
響應式迫使您以不同的方式看待世界。不再是請求某物並獲得它(或未獲得它),而是所有東西都作為序列(Publisher
)交付,您必須訂閱它。不再等待答案,而是需要註冊一個回撥。一旦習慣了,這並不難,但除非整個世界都發生了翻天覆地的變化並變得響應式,否則您會發現需要與舊式的阻塞式 API 互動。
假設我們有一個返回 HttpStatus
的阻塞方法
private RestTemplate restTemplate = new RestTemplate();
private HttpStatus block(int value) {
return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
.getStatusCode();
}
並且我們想用不同的引數重複呼叫它並聚合結果。這是一個經典的“雜湊-聚集”用例,例如,如果您有一個分頁的後端需要彙總跨多個頁面的“前 N”項,就會遇到這種情況。由於非響應式(阻塞式)操作的細節與雜湊-聚集模式無關,我們可以將它們下推到名為 block()
的方法中,並在稍後實現。這是一個(糟糕的)示例,它呼叫後端並將結果聚合成一個 Result
型別的物件
Flux.range(1, 10) // (1)
.log()
.map(this::block) // (2)
.collect(Result::new, Result::add) // (3)
.doOnSuccess(Result::stop) // (4)
呼叫 10 次
此處是阻塞式程式碼
收集結果並聚合成一個物件
最後停止計時(結果是 Mono<Result>
)
請勿在家中嘗試。這是一個“糟糕的”示例,因為雖然 API 在技術上使用正確,但我們知道它會阻塞呼叫執行緒;這段程式碼或多或少等同於一個 for 迴圈,其中包含 block()
方法的呼叫。更好的實現是將 block()
呼叫推送到後臺執行緒。我們可以透過將其包裝在返回 Mono<HttpStatus>
的方法中來實現
private Mono<HttpStatus> fetch(int value) {
return Mono.fromCallable(() -> block(value)) // (1)
.subscribeOn(this.scheduler); // (2)
}
此處是 Callable 內部的阻塞式程式碼,用於延遲執行
在後臺執行緒上訂閱慢速的 Publisher
scheduler
被單獨宣告為一個共享欄位:Scheduler scheduler = Schedulers.parallel()
。然後我們可以宣告我們想要使用 flatMap()
代替 map()
來處理序列
Flux.range(1, 10)
.log()
.flatMap( // (1)
this::fetch, 4) // (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop)
進入新的 Publisher 以並行處理
flatMap 中的併發提示
如果想在非響應式環境中執行上面的雜湊-聚集程式碼,可以使用 Spring MVC,如下所示
@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
return Flux.range(1, 10)
...
.doOnSuccess(Result::stop)
.toFuture();
}
如果你閱讀 @RequestMapping
的 Javadoc 文件,你會發現一個方法可以返回一個 CompletableFuture
,“應用程式使用它在由其自行選擇的單獨執行緒中生成返回值”。在這種情況下,這個單獨的執行緒由“scheduler”提供,它是一個執行緒池,因此處理正在多個執行緒上進行,每次4個,因為 flatMap()
被呼叫的方式。
使用後臺執行緒的雜湊-聚集是一種有用的模式,但它並非完美無缺——它沒有阻塞呼叫者,但它阻塞了某些東西,所以只是將問題轉移了。這有一些實際意義。我們有一個 HTTP 伺服器,它有(可能)非阻塞的 IO 處理器,將工作傳遞迴一個執行緒池,每個 HTTP 請求一個執行緒——所有這些都發生在 Servlet 容器(例如 Tomcat)內部。請求是非同步處理的,因此 Tomcat 中的工作執行緒沒有被阻塞,而我們在“scheduler”中建立的執行緒池最多可以在 4 個併發執行緒上進行處理。我們正在處理 10 個後端請求(對 block()
的呼叫),因此使用 scheduler 理論上可以帶來最多 4 倍的延遲降低。換句話說,如果在一個執行緒中按順序處理所有 10 個請求需要 1000 毫秒,那麼對於我們 HTTP 服務接收到的單個傳入請求,處理時間可能會降至 250 毫秒。但是我們應該強調“可能”:只有在處理執行緒(在兩個階段,Tomcat 工作執行緒和應用 scheduler)沒有爭用的情況下,它才會這麼快。如果您的伺服器有大量核心,併發性非常低,即連線到您應用程式的客戶端數量很少,並且幾乎沒有兩個客戶端同時發出請求的可能性,那麼您可能會看到接近理論上的改進。一旦有多個客戶端試圖連線,它們都會競爭相同的 4 個執行緒,延遲會上升,甚至可能比單個客戶端沒有後臺處理時更差。我們可以透過建立一個更大的執行緒池來改善併發客戶端的延遲,例如
private Scheduler scheduler = Schedulers.newParallel("sub", 16);
(16 個執行緒。)現在我們為執行緒及其堆疊使用了更多記憶體,並且在低併發情況下可以預期延遲會降低,但在高併發情況下則不一定,如果我們的硬體核心數少於 16 個。我們也不期望在高負載下獲得更高的吞吐量:如果執行緒存在競爭,管理這些資源的成本很高,這必須體現在某個重要的指標中。如果您對這種權衡的更詳細分析感興趣,可以參考 Rob Harrop 的系列部落格中關於負載下效能指標如何擴充套件的詳細分析。
提示
Tomcat 預設分配 100 個執行緒來處理 HTTP 請求。如果我們知道所有處理都將在我們的 scheduler 執行緒池上進行,那麼這數量就過多了。存在阻抗不匹配:scheduler 執行緒池可能成為瓶頸,因為它比上游的 Tomcat 執行緒池執行緒少。這突出了效能調優非常困難,並且雖然您可以控制所有配置,但這是一種微妙的平衡。
如果我們使用一個根據需求調整容量的 scheduler,我們可以做得比固定執行緒池更好。Reactor 為此提供了便利,所以如果您使用 Schedulers.elastic()
嘗試相同的程式碼(您可以在任何地方呼叫它;只有一個例項),您會發現在負載下會建立更多執行緒。
從阻塞式到響應式的橋接是一種有用的模式,並且使用 Spring MVC 中的現有技術很容易實現(如上所示)。響應式旅程的下一階段是完全擺脫應用程式執行緒中的阻塞,這需要新的 API 和新工具。最終,我們必須從上到下都是響應式的,包括伺服器和客戶端。這就是 Spring Reactive 的目標,它是一個新的框架,與 Spring MVC 正交,但滿足同樣的需求,並使用類似的程式設計模型。
注意
Spring Reactive 最初是一個獨立專案,但在 Spring Framework 5.0 版本中被整合進去(第一個里程碑於 2016 年 6 月釋出)。
在我們的雜湊-聚集示例中,完全實現響應式的第一步是在 classpath 中用 spring-boot-starter-webflux
替換 spring-boot-starter-web
。對於 Maven:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
或者對於 Gradle:
dependencies {
compile('org.springframework.boot:spring-boot-starter-webflux')
...
}
然後在控制器中,我們可以簡單地去掉與 CompletableFuture
的橋接,直接返回一個 Mono
型別的物件
@RequestMapping("/parallel")
public Mono<Result> parallel() {
return Flux.range(1, 10)
.log()
.flatMap(this::fetch, 4)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop);
}
將這段程式碼放入 Spring Boot 應用程式中,它將在 Tomcat、Jetty 或 Netty 中執行,具體取決於 classpath 中找到了什麼。Tomcat 是該 starter 中的預設伺服器,因此如果您想切換,必須排除它並提供不同的伺服器。就啟動時間、記憶體使用和執行時資源使用而言,這三個伺服器特性非常相似。
我們仍然在 block()
中有阻塞的後端呼叫,所以我們仍然需要線上程池上訂閱,以避免阻塞呼叫者。如果我們有一個非阻塞客戶端,就可以改變這一點,例如,代替使用 RestTemplate
,我們使用新的 WebClient
,然後可以使用以下方式來使用非阻塞客戶端:
private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());
private Mono<HttpStatus> fetch(int value) {
return this.client.perform(HttpRequestBuilders.get("http://example.com"))
.extract(WebResponseExtractors.response(String.class))
.map(response -> response.getStatusCode());
}
請注意,WebClient.perform()
(或者更確切地說,WebResponseExtractor
)有一個響應式返回型別,我們已將其轉換為 Mono<HttpStatus>
,但我們尚未訂閱它。我們希望框架完成所有訂閱,所以現在我們是響應式貫穿始終的。
警告
Spring Reactive 中返回 Publisher
的方法**是**非阻塞的,但一般來說,返回 Publisher
(或 Flux
、Mono
或 Observable
)的方法僅僅是一個提示,表明它**可能**是非阻塞的。如果您正在編寫此類方法,務必分析(最好進行測試)它們是否阻塞,並明確告知呼叫者它們可能阻塞。
注意
我們剛才使用的非阻塞客戶端來簡化 HTTP 棧的技巧,在常規 Spring MVC 中也適用。上面 fetch()
方法的結果可以轉換為 CompletableFuture
並從常規的 @RequestMapping
方法中返回(例如在 Spring Boot 1.3 中)。
現在我們可以移除 HTTP 請求處理器中呼叫 fetch()
後的併發提示了
@RequestMapping("/netty")
public Mono<Result> netty() {
return Flux.range(1, 10) // (1)
.log() //
.flatMap(this::fetch) // (2)
.collect(Result::new, Result::add)
.doOnSuccess(Result::stop);
}
呼叫 10 次
進入新的 Publisher 以並行處理
考慮到我們完全不需要額外的 callable 和訂閱執行緒,這段程式碼比我們必須橋接到阻塞客戶端時乾淨得多,這歸功於程式碼完全響應式。響應式 WebClient
返回一個 Mono
,這立即促使我們在轉換鏈中選擇 flatMap()
,所需的程式碼也就自然生成了。編寫起來體驗更好,可讀性也更高,因此更容易維護。此外,由於沒有執行緒池和併發提示,我們的效能預期中不再有那個神奇的 4 倍因子需要考慮。某個地方仍然存在限制,但它不再是我們應用層面的選擇所施加的,也不再受伺服器“容器”中任何東西的限制。這並非魔法,物理定律依然存在,所以所有後端呼叫仍將花費大約 100 毫秒,但在低爭用情況下,我們甚至可能看到所有 10 個請求在大致與單個請求相同的時間內完成。隨著伺服器負載的增加,延遲和吞吐量自然會劣化,但這種劣化受限於緩衝區競爭和核心網路,而不是應用執行緒管理。這是一種控制反轉,控制下移到了應用程式碼之下的棧層。
請記住,相同的應用程式程式碼可以在 Tomcat、Jetty 或 Netty 上執行。目前,Tomcat 和 Jetty 的支援是建立在 Servlet 3.1 非同步處理之上的,因此不再侷限於每個請求一個執行緒。它是建立在一個響應式橋接器之上,該橋接器將 Servlet 3.1 的概念適配到響應式正規化。對於 Reactor Netty,背壓和響應式支援是內建的。根據您選擇的 HTTP 客戶端庫,伺服器和客戶端可能會共享相同的 HTTP 資源並進一步最佳化。我們將在本系列的另一篇文章中再討論這一點。
提示
在示例程式碼中,“reactive”示例有 Maven 配置檔案 "tomcat"、"tomcatNext"(用於 Tomcat 8.5)、"jetty" 和 "netty",因此您無需更改一行程式碼即可輕鬆嘗試所有不同的伺服器選項。
注意
許多應用程式中的阻塞程式碼並非 HTTP 後端呼叫,而是資料庫互動。目前支援非阻塞客戶端的資料庫非常少(MongoDB 和 Couchbase 是明顯的例外,但即使是它們也不如 HTTP 客戶端成熟)。在所有資料庫廠商都在客戶端側跟上之前,執行緒池和阻塞到響應式模式將長期存在。
我們已經將基本的雜湊-聚集用例簡化到程式碼非常乾淨,並且對它執行的硬體非常友好。我們編寫了一些簡單的程式碼,並且使用 Spring 非常好地堆疊和編排成了一個工作的 HTTP 服務。在晴天,每個人都對結果非常滿意。但一旦出現錯誤,例如行為不端的網路連線,或者後端服務延遲很高,我們就會遭殃。
首先,最明顯的麻煩是,我們編寫的程式碼是宣告式的,因此很難除錯。發生錯誤時,診斷資訊可能非常不透明。使用原始的低階 API,例如沒有 Spring 的 Reactor,甚至沒有 Reactor 的 Netty,情況可能會更糟,因為那時我們必須自己構建大量的錯誤處理,每次與網路互動時都重複樣板程式碼。至少有了 Spring 和 Reactor 的組合,我們可以期望看到未捕獲的異常被記錄堆疊跟蹤。儘管如此,它們可能不容易理解,因為它們發生在不受我們控制的執行緒上,並且有時表現為相當低階的問題,來自棧中不熟悉的部分。
另一個痛點是,如果我們在某個響應式回撥中犯了錯誤並阻塞了,我們將導致同一執行緒上的**所有**請求都被阻塞。對於基於 Servlet 的容器,每個請求都被隔離到一個執行緒中,阻塞不會阻塞其他請求,因為它們在不同的執行緒上處理。阻塞所有請求仍然是麻煩的根源,但在這種情況下,它只會表現為每個請求延遲增加一個大致恆定的因子。在響應式世界中,阻塞單個請求可能導致所有請求延遲增加,而阻塞所有請求可能導致伺服器癱瘓,因為沒有額外的緩衝層和執行緒來彌補不足。
能夠控制非同步處理中的所有活動元件是件好事:每一層都有執行緒池大小和佇列。我們可以使其中一些層具有彈性,並嘗試根據它們的工作量進行調整。但在某個時候,這會成為一種負擔,我們開始尋找更簡單或更精簡的東西。對可伸縮性的分析得出結論,通常最好丟棄額外的執行緒,並根據物理硬體施加的限制工作。這是“機械同情”(mechanical sympathy)的一個例子,LMAX 在Disruptor 模式中成功地利用了這一點,並取得了巨大成效。
我們已經開始看到響應式方法的強大之處,但請記住,能力越大,責任越大。它是激進的,它是基礎性的。它屬於“推倒重來”的領域。所以您也會希望理解響應式並非解決所有問題的方案。事實上,它並非解決任何特定問題的方案,它只是 تسهیل (facilitates) 解決某一類問題。使用它所帶來的好處可能被學習它、修改您的 API 使其完全響應式以及後續維護程式碼的成本所抵消,因此請小心行事。