關於響應式程式設計的筆記 第三部分:一個簡單的 HTTP 伺服器應用程式

工程 | Dave Syer | 2016年7月20日 | ...

在本文中,我們將繼續響應式程式設計系列,重點不再是學習基本 API,而是更側重於具體的用例和編寫真正有用的程式碼。我們將看到響應式程式設計是併發程式設計的一個有用抽象,但它也有一些非常底層的特性,我們應該學會謹慎對待。如果我們開始充分利用這些特性,我們就可以控制應用程式中那些以前不可見、被容器、平臺和框架隱藏的層。

使用 Spring MVC 從阻塞式橋接到響應式

響應式程式設計迫使你從不同的角度看待世界。不再是請求某物然後獲取它(或者不獲取),而是所有事物都作為序列(Publisher)傳遞,你必須訂閱它。不再等待答覆,你必須註冊一個回撥。習慣之後,這並不難,但除非整個世界天翻地覆地變成響應式,否則你將發現自己需要與舊式的阻塞式 API 進行互動。

假設我們有一個返回 HttpStatus 的阻塞方法

private RestTemplate restTemplate = new RestTemplate();

private HttpStatus block(int value) {
    return this.restTemplate.getForEntity("http://example.com/{value}", String.class, value)
            .getStatusCode();
}

我們想用不同的引數反覆呼叫它並聚合結果。這是一個經典的“散射-收集”用例,例如,當你有一個分頁的後端需要跨多個頁面彙總“前 N”項時。由於非響應式(阻塞)操作的細節與散射-收集模式無關,我們可以將它們推到一個名為 block() 的方法中,稍後實現它。下面是一個(糟糕的)示例,它呼叫後端並將結果聚合到一個型別為 Result 的物件中。

Flux.range(1, 10) // (1)
    .log()
    .map(this::block) // (2)
    .collect(Result::new, Result::add) // (3)
    .doOnSuccess(Result::stop) // (4)
  1. 進行 10 次呼叫

  2. 此處為阻塞程式碼

  3. 收集結果並聚合到單個物件中

  4. 在最後停止計時(結果是 Mono<Result>

不要在家裡這樣做。這是一個“糟糕”的示例,因為雖然 API 在技術上使用正確,但我們知道它將阻塞呼叫執行緒;這段程式碼或多或少等同於迴圈體中帶有 block() 呼叫的 for 迴圈。更好的實現會將 block() 呼叫推到後臺執行緒。我們可以透過將其包裝在一個返回 Mono<HttpStatus> 的方法中來實現。

private Mono<HttpStatus> fetch(int value) {
    return Mono.fromCallable(() -> block(value)) // (1)
        .subscribeOn(this.scheduler);            // (2)
}
  1. 此處為封裝在 Callable 中以延遲執行的阻塞程式碼

  2. 在後臺執行緒上訂閱慢速的 Publisher

scheduler 被宣告為一個單獨的共享欄位:Scheduler scheduler = Schedulers.parallel()。然後我們可以宣告我們想用 flatMap() 來處理序列,而不是使用 map()

Flux.range(1, 10)
    .log()
    .flatMap(                             // (1)
        this::fetch, 4)                   // (2)
    .collect(Result::new, Result::add)
    .doOnSuccess(Result::stop)
  1. 降級到新的 Publisher 以並行處理

  2. flatMap 中的併發提示

嵌入非響應式應用程式

如果我們想在非響應式環境中執行上述散射-收集程式碼,我們可以像這樣使用 Spring MVC。

@RequestMapping("/parallel")
public CompletableFuture<Result> parallel() {
    return Flux.range(1, 10)
      ...
      .doOnSuccess(Result::stop)
      .toFuture();
}

如果你閱讀 @RequestMapping 的 Javadoc,你會發現一個方法可以返回一個 CompletableFuture,“應用程式使用它來選擇一個單獨的執行緒來產生返回值”。在這種情況下,單獨的執行緒由“scheduler”提供,它是一個執行緒池,因此處理發生在多個執行緒上,由於 flatMap() 的呼叫方式,一次有 4 個執行緒。

沒有免費的午餐

帶有後臺執行緒的散射-收集是一種有用的模式,但它並不完美——它不會阻塞呼叫者,但它會阻塞其他東西,所以它只是在移動問題。有一些實際的含義。我們有一個 HTTP 伺服器,它(可能)有非阻塞 IO 處理程式,將工作傳遞迴執行緒池,每個 HTTP 請求一個執行緒——所有這些都發生在 Servlet 容器(例如 Tomcat)內部。請求是非同步處理的,因此 Tomcat 中的工作執行緒不會被阻塞,而我們在“scheduler”中建立的執行緒池最多有 4 個併發執行緒在處理。我們處理 10 個後端請求(呼叫 block()),因此使用排程器的最大理論收益是延遲降低 4 倍。換句話說,如果在一個執行緒中一個接一個地處理所有 10 個請求需要 1000 毫秒,那麼對於我們 HTTP 服務的一個傳入請求,我們可能會看到 250 毫秒的處理時間。但我們應該強調“可能”:只有在處理執行緒(在兩個階段,Tomcat 工作執行緒和應用程式排程器)沒有爭用時,它才會如此之快。如果你有一個具有大量核心、非常低的併發性(即,連線到應用程式的客戶端數量很少)並且兩個客戶端幾乎不可能同時發出請求的伺服器,那麼你可能會看到接近理論上的改進。一旦有多個客戶端試圖連線,它們將爭奪相同的 4 個執行緒,延遲將上升,甚至可能比沒有後臺處理的單個客戶端的延遲更差。我們可以透過建立具有更大執行緒池的排程器來提高併發客戶端的延遲,例如:

    private Scheduler scheduler = Schedulers.newParallel("sub", 16);

(16 個執行緒。)現在我們為執行緒及其堆疊使用了更多的記憶體,並且我們可以預期低併發下的延遲會降低,但對於高併發下的延遲不一定,如果我們的硬體核心少於 16 個。我們也不期望在高負載下吞吐量更高:如果存線上程爭用,管理這些資源會產生很高的成本,而這必須在某個指標上有所體現。如果你對這種權衡的更詳細分析感興趣,可以在 Rob Harrop 的部落格系列中找到一些關於效能指標如何在負載下擴充套件的詳細分析。

提示

Tomcat 預設分配 100 個執行緒來處理 HTTP 請求。如果我們知道所有處理都將在我們的排程器執行緒池上進行,那麼這是過多的。存在一個阻抗不匹配:排程器執行緒池可能是一個瓶頸,因為它擁有的執行緒少於上游 Tomcat 執行緒池。這突顯了效能調優可能非常困難的事實,並且,雖然你可能可以控制所有配置,但這是一個微妙的平衡。

如果我們使用一個根據需求調整其容量的排程器,我們可以做得比固定執行緒池更好。Reactor 提供了便利,所以如果你嘗試使用 Schedulers.elastic()(你可以在任何地方呼叫它;只有一個例項),你會發現在負載下會建立更多的執行緒。

完全響應式

從阻塞式到響應式的橋接是一種有用的模式,並且使用 Spring MVC 中現有的技術很容易實現(如上所示)。響應式旅程的下一個階段是完全擺脫應用程式執行緒的阻塞,為此需要新的 API 和新工具。最終,我們必須在整個堆疊中完全響應式,包括伺服器和客戶端。這是 Spring Reactive 的目標,這是一個新的框架,與 Spring MVC 正交,但滿足相同的需求,並使用相似的程式設計模型。

注意

Spring Reactive 最初是一個獨立專案,但在 5.0 版本(2016 年 6 月第一個里程碑)中被整合到 Spring Framework 中。

在我們的散射-收集示例中,完全響應式的第一步是將 spring-boot-starter-web 替換為類路徑上的 spring-boot-starter-webflux。在 Maven 中

<dependencies>
  <dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-webflux</artifactId>
  </dependency>

或在 Gradle 中

dependencies {
	compile('org.springframework.boot:spring-boot-starter-webflux')
    ...
}

然後在控制器中,我們可以直接放棄橋接到 CompletableFuture,並返回一個型別為 Mono 的物件。

@RequestMapping("/parallel")
public Mono<Result> parallel() {
    return Flux.range(1, 10)
            .log()
            .flatMap(this::fetch, 4)
            .collect(Result::new, Result::add)
            .doOnSuccess(Result::stop);
}

將此程式碼放入 Spring Boot 應用程式中,它將執行在 Tomcat、Jetty 或 Netty 上,具體取決於類路徑上的內容。Tomcat 是該 starter 中的預設伺服器,因此如果你想切換,必須排除它並提供另一個。三者在啟動時間、記憶體使用和執行時資源使用方面都具有非常相似的特徵。

我們仍然在 block() 中有阻塞的後端呼叫,所以我們仍然必須線上程池上訂閱以避免阻塞呼叫者。如果我們有一個非阻塞客戶端,我們可以改變這一點,例如,使用新的 WebClient 而不是 RestTemplate,那麼我們可能會這樣做以使用非阻塞客戶端。

private WebClient client = new WebClient(new ReactorHttpClientRequestFactory());

private Mono<HttpStatus> fetch(int value) {
    return this.client.perform(HttpRequestBuilders.get("http://example.com"))
            .extract(WebResponseExtractors.response(String.class))
            .map(response -> response.getStatusCode());
}

請注意,WebClient.perform()(或更確切地說,WebResponseExtractor)有一個響應式返回型別,我們已將其轉換為 Mono<HttpStatus>,但我們沒有訂閱它。我們希望框架完成所有訂閱工作,所以現在我們是完全響應式的。

警告

Spring Reactive 中返回 Publisher 的方法**是**非阻塞的,但總的來說,返回 Publisher(或 FluxMonoObservable)的方法只是一個提示,表明它可能是非阻塞的。如果你編寫這樣的方法,分析(最好是測試)它們是否會阻塞,並明確告知呼叫者它們是否可能這樣做非常重要。

注意

我們剛才使用非阻塞客戶端簡化 HTTP 堆疊的技巧也適用於常規的 Spring MVC。上面 fetch() 方法的結果可以轉換為 CompletableFuture 並從常規的 @RequestMapping 方法中傳遞出去(例如,在 Spring Boot 1.3 中)。

控制反轉

現在我們可以從 HTTP 請求處理程式中移除 fetch() 呼叫後的併發提示。

@RequestMapping("/netty")
public Mono<Result> netty() {
    return Flux.range(1, 10) // (1)
        .log() //
        .flatMap(this::fetch) // (2)
        .collect(Result::new, Result::add)
        .doOnSuccess(Result::stop);
}
  1. 進行 10 次呼叫

  2. 降級到新的 Publisher 以並行處理

考慮到我們根本不需要額外的 Callable 和 Subscriber 執行緒,這段程式碼比之前不得不橋接到阻塞客戶端時要乾淨得多,這歸功於程式碼是完全響應式的。響應式的 WebClient 返回一個 Mono,這讓我們立即選擇轉換鏈中的 flatMap(),我們需要的程式碼就自然產生了。它寫起來更愉快,也更易讀,因此更容易維護。此外,由於沒有執行緒池和併發提示,因此在我們的效能預期中沒有“魔法”的 4 倍因素。確實存在一個限制,但它不再由我們選擇的應用程式層施加,也不受伺服器“容器”的限制。這不是魔法,仍然有物理定律,所以後端呼叫仍然需要大約 100 毫秒,但低爭用下,我們甚至可能看到所有 10 個請求的完成時間與單個請求大致相同。隨著伺服器負載的增加,延遲和吞吐量自然會下降,但下降的方式是由緩衝區爭用和核心網路控制,而不是由應用程式執行緒管理。這是一種控制反轉,深入到應用程式程式碼之下的較低層。

請記住,相同的應用程式程式碼可以在 Tomcat、Jetty 或 Netty 上執行。目前,Tomcat 和 Jetty 的支援是基於 Servlet 3.1 非同步處理的,因此不再限於每個執行緒一個請求。它建立在一個響應式橋接之上,該橋接將 Servlet 3.1 的概念適配到響應式正規化。對於 Reactor Netty,背壓和響應式支援是內建的。根據你選擇的 HTTP 客戶端庫,伺服器和客戶端可以共享相同的 HTTP 資源並進一步最佳化。我們將在本系列文章的另一篇文章中詳細介紹這一點。

提示

示例程式碼 中,“reactive”示例具有 Maven 配置檔案“tomcat”、“tomcatNext”(用於 Tomcat 8.5)、“jetty”和“netty”,因此你可以輕鬆地嘗試所有不同的伺服器選項而無需更改一行程式碼。

注意

許多應用程式中的阻塞程式碼不是 HTTP 後端呼叫,而是資料庫互動。目前支援非阻塞客戶端的資料庫非常少(MongoDB 和 Couchbase 是值得注意的例外,但即使是它們也不如 HTTP 客戶端成熟)。在所有資料庫供應商追趕客戶端方面之前,執行緒池和阻塞到響應式的模式將長期存在。

仍然沒有免費的午餐

我們已經精簡了基本的散射-收集用例,直到程式碼變得非常乾淨,並且與執行它的硬體非常相容。我們編寫了一些簡單的程式碼,並且它被很好地堆疊和編排成一個使用 Spring 的有效 HTTP 服務。在好天氣裡,每個人都對結果非常滿意。但一旦出現錯誤,例如,一個表現糟糕的網路連線,或一個後端服務出現延遲問題,我們將遭受痛苦。

首先,最明顯的痛苦是程式碼是宣告性的,所以很難除錯。當錯誤發生時,診斷可能非常不透明。使用原始的、底層的 API,例如沒有 Spring 的 Reactor,甚至低到沒有 Reactor 的 Netty,可能會使情況更糟,因為那時我們必須自己構建大量的錯誤處理,每次與網路互動時都會重複樣板程式碼。至少有了 Spring 和 Reactor 的結合,我們可以期望看到 Stray、未捕獲異常的堆疊跟蹤日誌。但它們可能不容易理解,因為它們發生在我們無法控制的執行緒上,並且有時會顯示為相當底層的關注點,來自不熟悉的堆疊部分。

另一個痛苦的來源是,如果我們犯了一個錯誤並在我們的一個響應式回撥中阻塞,我們將佔用**所有**相同執行緒上的請求。使用基於 Servlet 的容器,每個請求都與一個執行緒隔離,阻塞不會耽誤其他請求,因為它們是在不同的執行緒上處理的。阻塞所有請求仍然是一種麻煩,但它只會表現為延遲的增加,每個請求的增加因子大致恆定。在響應式世界中,阻塞單個請求會導致所有請求的延遲增加,而阻塞所有請求可能會使伺服器癱瘓,因為沒有額外的緩衝區和執行緒層來彌補缺口。

結論

我們可以控制我們非同步處理的所有活動部件,這很好:每一層都有一個執行緒池大小和一個佇列。我們可以使其中一些層具有彈性,並嘗試根據它們的工作量進行調整。但到某個點,它就成了一個負擔,我們開始尋找更簡單、更輕便的東西。對可伸縮性的分析得出結論,通常最好拋棄額外的執行緒,並利用物理硬體施加的約束。這是“機械同情”的一個例子,正如 LMAX 在 Disruptor Pattern 中所充分利用的。

我們已經開始看到響應式方法的強大之處,但請記住,伴隨著強大的力量而來的是責任。它是激進的,它是根本性的。它是“推倒重來”的領域。所以,你也應該能夠理解,響應式並不是解決所有問題的辦法。事實上,它本身並不是任何問題的解決方案,它只是促進了一類特定問題的解決方案。使用它的好處可能會被學習它的成本、修改 API 使其完全響應式以及之後維護程式碼的成本所抵消,所以要謹慎行事。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有