響應式程式設計筆記(第三部分):一個簡單的 HTTP 伺服器應用

工程 | Dave Syer | 2016年7月20日 | ...

在本文中,我們將繼續關於響應式程式設計的系列文章。本篇的重點不再是學習基本 API,而是更具體的用例和編寫實際有用的程式碼。我們將看到響應式是如何成為併發程式設計的有用抽象的,同時也會了解到它有一些非常底層的特性,我們需要謹慎對待。如果我們開始充分利用這些特性,就可以控制應用程式中以前由容器、平臺和框架隱藏的層。

使用 Spring MVC 連線阻塞式與響應式

響應式迫使您以不同的方式看待世界。不再是請求某物並獲得它(或未獲得它),而是所有東西都作為序列(Publisher)交付,您必須訂閱它。不再等待答案,而是需要註冊一個回撥。一旦習慣了,這並不難,但除非整個世界都發生了翻天覆地的變化並變得響應式,否則您會發現需要與舊式的阻塞式 API 互動。

假設我們有一個返回 HttpStatus 的阻塞方法

private RestTemplate restTemplate = new RestTemplate();

private HttpStatus block(int value) {
    return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
            .getStatusCode();
}

並且我們想用不同的引數重複呼叫它並聚合結果。這是一個經典的“雜湊-聚集”用例,例如,如果您有一個分頁的後端需要彙總跨多個頁面的“前 N”項,就會遇到這種情況。由於非響應式(阻塞式)操作的細節與雜湊-聚集模式無關,我們可以將它們下推到名為 block() 的方法中,並在稍後實現。這是一個(糟糕的)示例,它呼叫後端並將結果聚合成一個 Result 型別的物件

Flux.range(1, 10) // (1)
    .log()
    .map(this::block) // (2)
    .collect(Result::new, Result::add) // (3)
    .doOnSuccess(Result::stop) // (4)
  1. 呼叫 10 次

  2. 此處是阻塞式程式碼

  3. 收集結果並聚合成一個物件

  4. 最後停止計時(結果是 Mono<Result>

請勿在家中嘗試。這是一個“糟糕的”示例,因為雖然 API 在技術上使用正確,但我們知道它會阻塞呼叫執行緒;這段程式碼或多或少等同於一個 for 迴圈,其中包含 block() 方法的呼叫。更好的實現是將 block() 呼叫推送到後臺執行緒。我們可以透過將其包裝在返回 Mono<HttpStatus> 的方法中來實現

private Mono<HttpStatus> fetch(int value) {
    return Mono.fromCallable(() -> block(value)) // (1)
        .subscribeOn(this.scheduler);            // (2)
}
  1. 此處是 Callable 內部的阻塞式程式碼,用於延遲執行

  2. 在後臺執行緒上訂閱慢速的 Publisher

scheduler 被單獨宣告為一個共享欄位:Scheduler scheduler = Schedulers.parallel()。然後我們可以宣告我們想要使用 flatMap() 代替 map() 來處理序列

Flux.range(1, 10)
    .log()
    .flatMap(                             // (1)
        this::fetch, 4)                   // (2)
    .collect(Result::new, Result::add)
    .doOnSuccess(Result::stop)
  1. 進入新的 Publisher 以並行處理

  2. flatMap 中的併發提示

嵌入非響應式應用

如果想在非響應式環境中執行上面的雜湊-聚集程式碼,可以使用 Spring MVC,如下所示

@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
    return Flux.range(1, 10)
      ...
      .doOnSuccess(Result::stop)
      .toFuture();
}

如果你閱讀 @RequestMapping 的 Javadoc 文件,你會發現一個方法可以返回一個 CompletableFuture,“應用程式使用它在由其自行選擇的單獨執行緒中生成返回值”。在這種情況下,這個單獨的執行緒由“scheduler”提供,它是一個執行緒池,因此處理正在多個執行緒上進行,每次4個,因為 flatMap() 被呼叫的方式。

沒有免費的午餐

使用後臺執行緒的雜湊-聚集是一種有用的模式,但它並非完美無缺——它沒有阻塞呼叫者,但它阻塞了某些東西,所以只是將問題轉移了。這有一些實際意義。我們有一個 HTTP 伺服器,它有(可能)非阻塞的 IO 處理器,將工作傳遞迴一個執行緒池,每個 HTTP 請求一個執行緒——所有這些都發生在 Servlet 容器(例如 Tomcat)內部。請求是非同步處理的,因此 Tomcat 中的工作執行緒沒有被阻塞,而我們在“scheduler”中建立的執行緒池最多可以在 4 個併發執行緒上進行處理。我們正在處理 10 個後端請求(對 block() 的呼叫),因此使用 scheduler 理論上可以帶來最多 4 倍的延遲降低。換句話說,如果在一個執行緒中按順序處理所有 10 個請求需要 1000 毫秒,那麼對於我們 HTTP 服務接收到的單個傳入請求,處理時間可能會降至 250 毫秒。但是我們應該強調“可能”:只有在處理執行緒(在兩個階段,Tomcat 工作執行緒和應用 scheduler)沒有爭用的情況下,它才會這麼快。如果您的伺服器有大量核心,併發性非常低,即連線到您應用程式的客戶端數量很少,並且幾乎沒有兩個客戶端同時發出請求的可能性,那麼您可能會看到接近理論上的改進。一旦有多個客戶端試圖連線,它們都會競爭相同的 4 個執行緒,延遲會上升,甚至可能比單個客戶端沒有後臺處理時更差。我們可以透過建立一個更大的執行緒池來改善併發客戶端的延遲,例如

    private Scheduler scheduler = Schedulers.newParallel("sub", 16);

(16 個執行緒。)現在我們為執行緒及其堆疊使用了更多記憶體,並且在低併發情況下可以預期延遲會降低,但在高併發情況下則不一定,如果我們的硬體核心數少於 16 個。我們也不期望在高負載下獲得更高的吞吐量:如果執行緒存在競爭,管理這些資源的成本很高,這必須體現在某個重要的指標中。如果您對這種權衡的更詳細分析感興趣,可以參考 Rob Harrop 的系列部落格中關於負載下效能指標如何擴充套件的詳細分析。

提示

Tomcat 預設分配 100 個執行緒來處理 HTTP 請求。如果我們知道所有處理都將在我們的 scheduler 執行緒池上進行,那麼這數量就過多了。存在阻抗不匹配:scheduler 執行緒池可能成為瓶頸,因為它比上游的 Tomcat 執行緒池執行緒少。這突出了效能調優非常困難,並且雖然您可以控制所有配置,但這是一種微妙的平衡。

如果我們使用一個根據需求調整容量的 scheduler,我們可以做得比固定執行緒池更好。Reactor 為此提供了便利,所以如果您使用 Schedulers.elastic() 嘗試相同的程式碼(您可以在任何地方呼叫它;只有一個例項),您會發現在負載下會建立更多執行緒。

響應式貫穿始終

從阻塞式到響應式的橋接是一種有用的模式,並且使用 Spring MVC 中的現有技術很容易實現(如上所示)。響應式旅程的下一階段是完全擺脫應用程式執行緒中的阻塞,這需要新的 API 和新工具。最終,我們必須從上到下都是響應式的,包括伺服器和客戶端。這就是 Spring Reactive 的目標,它是一個新的框架,與 Spring MVC 正交,但滿足同樣的需求,並使用類似的程式設計模型。

注意

Spring Reactive 最初是一個獨立專案,但在 Spring Framework 5.0 版本中被整合進去(第一個里程碑於 2016 年 6 月釋出)。

在我們的雜湊-聚集示例中,完全實現響應式的第一步是在 classpath 中用 spring-boot-starter-webflux 替換 spring-boot-starter-web。對於 Maven:

<dependencies>
  <dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>

或者對於 Gradle:

dependencies {
	compile('org.springframework.boot:spring-boot-starter-webflux')
    ...
}

然後在控制器中,我們可以簡單地去掉與 CompletableFuture 的橋接,直接返回一個 Mono 型別的物件

@RequestMapping("/parallel")
public Mono<Result> parallel() {
    return Flux.range(1, 10)
            .log()
            .flatMap(this::fetch, 4)
            .collect(Result::new, Result::add)
            .doOnSuccess(Result::stop);
}

將這段程式碼放入 Spring Boot 應用程式中,它將在 Tomcat、Jetty 或 Netty 中執行,具體取決於 classpath 中找到了什麼。Tomcat 是該 starter 中的預設伺服器,因此如果您想切換,必須排除它並提供不同的伺服器。就啟動時間、記憶體使用和執行時資源使用而言,這三個伺服器特性非常相似。

我們仍然在 block() 中有阻塞的後端呼叫,所以我們仍然需要線上程池上訂閱,以避免阻塞呼叫者。如果我們有一個非阻塞客戶端,就可以改變這一點,例如,代替使用 RestTemplate,我們使用新的 WebClient,然後可以使用以下方式來使用非阻塞客戶端:

private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());

private Mono<HttpStatus> fetch(int value) {
    return this.client.perform(HttpRequestBuilders.get("http://example.com"))
            .extract(WebResponseExtractors.response(String.class))
            .map(response -> response.getStatusCode());
}

請注意,WebClient.perform()(或者更確切地說,WebResponseExtractor)有一個響應式返回型別,我們已將其轉換為 Mono<HttpStatus>,但我們尚未訂閱它。我們希望框架完成所有訂閱,所以現在我們是響應式貫穿始終的。

警告

Spring Reactive 中返回 Publisher 的方法**是**非阻塞的,但一般來說,返回 Publisher(或 FluxMonoObservable)的方法僅僅是一個提示,表明它**可能**是非阻塞的。如果您正在編寫此類方法,務必分析(最好進行測試)它們是否阻塞,並明確告知呼叫者它們可能阻塞。

注意

我們剛才使用的非阻塞客戶端來簡化 HTTP 棧的技巧,在常規 Spring MVC 中也適用。上面 fetch() 方法的結果可以轉換為 CompletableFuture 並從常規的 @RequestMapping 方法中返回(例如在 Spring Boot 1.3 中)。

控制反轉

現在我們可以移除 HTTP 請求處理器中呼叫 fetch() 後的併發提示了

@RequestMapping("/netty")
public Mono<Result> netty() {
    return Flux.range(1, 10) // (1)
        .log() //
        .flatMap(this::fetch) // (2)
        .collect(Result::new, Result::add)
        .doOnSuccess(Result::stop);
}
  1. 呼叫 10 次

  2. 進入新的 Publisher 以並行處理

考慮到我們完全不需要額外的 callable 和訂閱執行緒,這段程式碼比我們必須橋接到阻塞客戶端時乾淨得多,這歸功於程式碼完全響應式。響應式 WebClient 返回一個 Mono,這立即促使我們在轉換鏈中選擇 flatMap(),所需的程式碼也就自然生成了。編寫起來體驗更好,可讀性也更高,因此更容易維護。此外,由於沒有執行緒池和併發提示,我們的效能預期中不再有那個神奇的 4 倍因子需要考慮。某個地方仍然存在限制,但它不再是我們應用層面的選擇所施加的,也不再受伺服器“容器”中任何東西的限制。這並非魔法,物理定律依然存在,所以所有後端呼叫仍將花費大約 100 毫秒,但在低爭用情況下,我們甚至可能看到所有 10 個請求在大致與單個請求相同的時間內完成。隨著伺服器負載的增加,延遲和吞吐量自然會劣化,但這種劣化受限於緩衝區競爭和核心網路,而不是應用執行緒管理。這是一種控制反轉,控制下移到了應用程式碼之下的棧層。

請記住,相同的應用程式程式碼可以在 Tomcat、Jetty 或 Netty 上執行。目前,Tomcat 和 Jetty 的支援是建立在 Servlet 3.1 非同步處理之上的,因此不再侷限於每個請求一個執行緒。它是建立在一個響應式橋接器之上,該橋接器將 Servlet 3.1 的概念適配到響應式正規化。對於 Reactor Netty,背壓和響應式支援是內建的。根據您選擇的 HTTP 客戶端庫,伺服器和客戶端可能會共享相同的 HTTP 資源並進一步最佳化。我們將在本系列的另一篇文章中再討論這一點。

提示

示例程式碼中,“reactive”示例有 Maven 配置檔案 "tomcat"、"tomcatNext"(用於 Tomcat 8.5)、"jetty" 和 "netty",因此您無需更改一行程式碼即可輕鬆嘗試所有不同的伺服器選項。

注意

許多應用程式中的阻塞程式碼並非 HTTP 後端呼叫,而是資料庫互動。目前支援非阻塞客戶端的資料庫非常少(MongoDB 和 Couchbase 是明顯的例外,但即使是它們也不如 HTTP 客戶端成熟)。在所有資料庫廠商都在客戶端側跟上之前,執行緒池和阻塞到響應式模式將長期存在。

仍然沒有免費的午餐

我們已經將基本的雜湊-聚集用例簡化到程式碼非常乾淨,並且對它執行的硬體非常友好。我們編寫了一些簡單的程式碼,並且使用 Spring 非常好地堆疊和編排成了一個工作的 HTTP 服務。在晴天,每個人都對結果非常滿意。但一旦出現錯誤,例如行為不端的網路連線,或者後端服務延遲很高,我們就會遭殃。

首先,最明顯的麻煩是,我們編寫的程式碼是宣告式的,因此很難除錯。發生錯誤時,診斷資訊可能非常不透明。使用原始的低階 API,例如沒有 Spring 的 Reactor,甚至沒有 Reactor 的 Netty,情況可能會更糟,因為那時我們必須自己構建大量的錯誤處理,每次與網路互動時都重複樣板程式碼。至少有了 Spring 和 Reactor 的組合,我們可以期望看到未捕獲的異常被記錄堆疊跟蹤。儘管如此,它們可能不容易理解,因為它們發生在不受我們控制的執行緒上,並且有時表現為相當低階的問題,來自棧中不熟悉的部分。

另一個痛點是,如果我們在某個響應式回撥中犯了錯誤並阻塞了,我們將導致同一執行緒上的**所有**請求都被阻塞。對於基於 Servlet 的容器,每個請求都被隔離到一個執行緒中,阻塞不會阻塞其他請求,因為它們在不同的執行緒上處理。阻塞所有請求仍然是麻煩的根源,但在這種情況下,它只會表現為每個請求延遲增加一個大致恆定的因子。在響應式世界中,阻塞單個請求可能導致所有請求延遲增加,而阻塞所有請求可能導致伺服器癱瘓,因為沒有額外的緩衝層和執行緒來彌補不足。

結論

能夠控制非同步處理中的所有活動元件是件好事:每一層都有執行緒池大小和佇列。我們可以使其中一些層具有彈性,並嘗試根據它們的工作量進行調整。但在某個時候,這會成為一種負擔,我們開始尋找更簡單或更精簡的東西。對可伸縮性的分析得出結論,通常最好丟棄額外的執行緒,並根據物理硬體施加的限制工作。這是“機械同情”(mechanical sympathy)的一個例子,LMAX 在Disruptor 模式中成功地利用了這一點,並取得了巨大成效。

我們已經開始看到響應式方法的強大之處,但請記住,能力越大,責任越大。它是激進的,它是基礎性的。它屬於“推倒重來”的領域。所以您也會希望理解響應式並非解決所有問題的方案。事實上,它並非解決任何特定問題的方案,它只是 تسهیل (facilitates) 解決某一類問題。使用它所帶來的好處可能被學習它、修改您的 API 使其完全響應式以及後續維護程式碼的成本所抵消,因此請小心行事。

訂閱 Spring 快訊

訂閱 Spring 快訊,保持聯通

訂閱

超越

VMware 提供培訓和認證,助力您的飛速發展。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,一站式訂閱。

瞭解更多

即將舉辦的活動

檢視 Spring 社群即將舉辦的所有活動。

檢視全部