Flux 的飛行 3 - 執行緒切換與排程器 (Hopping Threads and Schedulers)

工程 | Simon Baslé | 2019 年 12 月 13 日 | ...

這篇博文是系列文章中的第三篇,旨在深入探討 Reactor 的更高階概念和內部工作原理。

在這篇文章中,我們將探討執行緒模型、大多數運算子如何與併發無關、Scheduler 抽象以及如何使用 publishOn 等運算子在序列中間切換執行緒。

本系列文章源自 Flight of the Flux 演講,我發現其內容更適合博文形式。

下表將在其他帖子釋出時更新連結,以下是計劃中的內容:

  1. 組裝 (Assembly) vs 訂閱 (Subscription)
  2. 除錯注意事項
  3. 執行緒切換與排程器 (本文)
  4. 內部工作原理:工作竊取 (work stealing)
  5. 內部工作原理:運算子融合 (operator fusion)

如果您還不瞭解 Reactive Streams 和 Reactor 的基本概念,請前往網站的 學習部分參考指南

事不宜遲,讓我們開始吧。

執行緒模型

Reactor 的運算子通常與併發無關:它們不強制特定的執行緒模型,只在其 onNext 方法被呼叫的 Thread 上執行。

正如本系列第一篇帖子中看到的,執行訂閱呼叫的 Thread 也會產生影響:subscribe 呼叫會一直鏈式傳遞,直到到達資料生成者 Publisher (鏈中最左邊的部分),然後此 Publisher 透過 onSubscribe 提供一個 Subscription,該 Subscription 反過來向下傳遞,被請求等等。預設情況下,同樣,此資料生成過程在啟動訂閱的 Thread 上開始。

對此有一個普遍例外:處理時間概念的運算子。任何此類運算子預設將在 Schedulers.parallel() 排程器上執行計時器/延遲等。

還存在一些其他例外,它們也執行在 parallel() Scheduler 上。可以透過其至少有一個接受 Scheduler 引數的過載方法來識別它們。

但是 Scheduler 是什麼?我們為什麼需要它?

Scheduler 抽象

在 Reactor 中,Scheduler 是一個抽象,它賦予使用者對執行緒的控制。一個 Scheduler 可以生成 Worker,這些 Worker 在概念上是 Thread,但不一定由一個 Thread 支援(稍後會看到一個例子)。Scheduler 也包含一個時鐘的概念,而 Worker 純粹是關於任務排程。

interface Scheduler extends Disposable {
    
  Disposable schedule(Runnable task);
  Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
  Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  
  long now(TimeUnit unit);
  
  Worker createWorker();
  
  interface Worker extends Disposable {
    Disposable schedule(Runnable task);
    Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
    Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
  }
}

Reactor 提供了幾種預設的 Scheduler 實現,每種都有其管理 Workers 的特殊性。它們可以透過 Schedulers 工廠方法例項化。以下是它們典型用法的經驗法則:

  • Schedulers.immediate() 可以用作一個空物件 (null object),用於當 API 需要一個 Scheduler 但您不想改變執行緒時。
  • Schedulers.single() 用於可以在一個唯一的 ExecutorService 上執行的一次性任務。
  • Schedulers.parallel() 適合 CPU 密集但生命週期短的任務。它可以並行執行 N 個此類任務(預設情況下,N == CPU 數量)。
  • Schedulers.elastic()Schedulers.boundedElastic() 適用於生命週期較長的任務(例如,阻塞式 I/O 任務)。elastic 按需生成執行緒,沒有限制,而最近引入的 boundedElastic 也這樣做,但對建立的執行緒數量有上限。

每種 Scheduler 型別都有一個由上述方法返回的預設全域性例項,但可以使用 Schedulers.new*** 工廠方法建立新例項(例如,Schedulers.newParallel("myParallel", 10)) 用於建立一個自定義並行 Scheduler,其中 N = 10)。

parallel 型別由 N 個基於 ScheduledExecutorService 的工作執行緒支援。如果您向其提交 N 個長期任務,則無法再執行其他工作,因此它更適合短生命週期任務。

elastic 型別也由基於 ScheduledExecutorService 的工作執行緒支援,不同之處在於它按需建立這些工作執行緒並將它們放入池中。不再使用的 Workerdispose() 時返回到池中,並在此處保留配置的 TTL 時長,因此新的入站任務可以重用空閒的工作執行緒。但是,如果沒有任何空閒的 Worker 可用,它會繼續建立新的工作執行緒。

boundedElastic 型別在概念上與 elastic 非常相似,不同之處在於它對建立的基於 ScheduledExecutorServiceWorker 數量設定了上限。超過此上限後,其 createWorker() 方法將返回一個門面 (facade) Worker,該門面 Worker 會將任務排隊而不是立即提交。一旦有具體的 Worker 可用,它就會與門面互換,並開始實際提交任務(使其表現得就像您剛剛提交了任務一樣,包括延遲的任務)。此外,可以設定所有該 Scheduler 例項的門面工作執行緒可以排隊的延遲任務總數的上限。

排程器總是由 ExecutorService 支援嗎?

正如我們上面所說,不是。我們已經看到一個例子:immediate() Scheduler。這個排程器不會改變程式碼執行的 Thread

但在 reactor-test 庫中有一個更有用的例子:VirtualTimeScheduler。這個 Scheduler 在當前 Thread 上執行,但會將提交給它的所有任務都打上它們應該執行的時間戳。

然後它管理一個虛擬時鐘(得益於 Scheduler 也具有時鐘的職責),可以手動推進。推進時,所有排隊等待在新虛擬時間戳之前或之時執行的任務都將被執行。

這在測試場景中非常有用,當您有一個帶有長間隔/延遲的 FluxMono,並且您想測試邏輯而不是時序時。例如,像 Mono.delay(Duration.ofHours(4)) 這樣的程式碼可以在 100ms 內執行...

我們也可以想象圍繞 Actor 系統、ForkJoinPool、即將推出的 Loom 纖維等實現一個 Scheduler...

關於執行緒

人們經常詢問如何在 Scheduler 的執行緒和執行緒之間來回切換。從主執行緒切換到排程器顯然是可能的,**但從任意執行緒切換到執行緒是不可能的**。這完全是 Java 的限制,因為沒有辦法向執行緒提交任務(例如,沒有 MainThreadExecutorService)。

將排程器應用於運算子

既然我們熟悉了 Reactor 中執行緒的基本構建塊,讓我們看看這在運算子世界中如何體現。

我們已經確定,大多數運算子繼續在其發出訊號的 Thread 上工作,時間類運算子(如 Mono.delaybufferTimeout() 等)除外。

Reactor 的理念是為您提供正確的工具,透過組合運算子來實現。執行緒也不例外:請看 subscribeOnpublishOn

這兩個運算子只是接受一個 Scheduler,並將執行切換到該排程器的一個 Worker 上。當然,兩者之間有重大區別:)

publishOn(Scheduler s) 運算子

這是您想要切換執行緒時需要的基本運算子。來自其源的入站訊號將在給定的 Scheduler釋出,從而有效地將執行緒切換到該排程器的一個工作執行緒。

這適用於 onNextonCompleteonError 訊號。也就是說,從上游源流向下游訂閱者的訊號。

因此,從本質上講,此運算子下方出現的每個處理步驟都將在新的 Scheduler s 上執行,直到另一個運算子再次切換(例如,另一個 publishOn)。

讓我們看一個故意粗糙的阻塞呼叫示例。但請記住,響應式鏈中的阻塞呼叫總是粗糙的!:)

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

在上面的示例中,假設此程式碼在執行緒上執行,每個 Flux.fromIterable 都會在該同一個 Thread 上發出其 List 的內容。然後我們在一個 map 內部使用一個命令式阻塞式 Web 客戶端來獲取每個 url 的主體,這“繼承”了該執行緒(從而阻塞了它)。因此,每個 subscribe 中的資料消費 lambda 也在主執行緒上執行。

結果是,所有這些 URL 都將在主執行緒上依次處理。

main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E

如果我們引入 publishOn,我們可以使此程式碼更具效能,這樣 Flux 之間就不會相互阻塞。

Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

這可能產生類似以下輸出:

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

現在,第一個列表和第二個列表交錯出現,太棒了!

subscribeOn(Scheduler s) 運算子

在前面的例子中,我們看到了如何使用 publishOn 將阻塞工作轉移到單獨的執行緒上,方法是將這些阻塞工作的觸發器(要抓取的 URL)的釋出切換到提供的 Scheduler 上。

由於 map 運算子在其源執行緒上執行,因此透過在 map 之前放置 publishOn 來切換該源執行緒可以按預期工作。

但是,如果這個 URL 抓取方法是別人寫的,並且他們遺憾地忘記新增 publishOn 怎麼辦?有沒有辦法影響上游Thread

在某種程度上,有。這就是 subscribeOn 可以派上用場的地方。

此運算子更改了 subscribe 方法的執行位置。並且由於訂閱訊號向上流動,它直接影響源 Flux 在何處訂閱並開始生成資料。

因此,它似乎可以作用於響應式運算子鏈的上下游部分(只要鏈中沒有丟擲 publishOn

//code provided in library you have no write access to
final Flux<String> fetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .map(url -> blockingWebClient.get(url)); //oops!
}

//your code:
fetchUrls(A, B, C)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

fetchUrls(D, E)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

就像我們的第二個 publishOn 示例一樣,這段程式碼將正確地輸出類似以下內容:

boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

那麼發生了什麼?

subscribe 呼叫仍然在執行緒上執行,但它們迅速將 subscribe 訊號傳播給了它們的源 subscribeOn。然後,subscribeOn 將該相同的訊號從 fetchUrls 傳播給它自己的源,**但這次是在一個 boundedElastic Worker 上**。

fetchUrls 返回的 Flux 序列中,map 在 boundedElastic 工作執行緒上訂閱,range 也是如此。range 開始生成資料,仍然在 boundedElastic 工作執行緒上。

資料路徑沿著這條路線繼續,每個訂閱者都在其源執行緒(即 boundedElastic 執行緒)上執行 onNext

最後,在 subscribe(...) 呼叫中配置的 lambdas 也將在 boundedElastic 執行緒上執行。

重要提示

區分訂閱的行為和傳遞給 subscribe() 方法的 lambda 是很重要的。此方法訂閱其源 Flux,但 lambda 在處理結束時執行,當資料流經所有步驟(包括切換到另一個執行緒的步驟)後。

因此,執行 lambda 的 Thread 可能與訂閱的 Thread 不同,即呼叫 subscribe 方法的執行緒。

如果我們是 fetchUrls 庫的作者,我們可以透過以稍微不同的方式利用 subscribeOn,讓每次抓取都在自己的 Worker 上執行,從而使程式碼更具效能:

final Flux<String> betterFetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
    .flatMap(url -> 
             //wrap the blocking call in a Mono
             Mono.fromCallable(() -> blockingWebClient.get(url))
             //ensure that Mono is subscribed in an boundedElastic Worker
             .subscribeOn(Schedulers.boundedElastic())
    ); //each individual URL fetch runs in its own thread!
}

如果我混合使用兩者呢?

subscribeOn 將貫穿整個訂閱階段,從下到上,然後在資料路徑上作用,直到遇到 publishOn(或時間類運算子)。

讓我們考慮以下示例:

Flux.just("hello")
    .doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
    .publishOn(Scheduler.boundedElastic())
    .doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
    .delayElements(Duration.ofMillis(500))
    .subscribeOn(Schedulers.elastic())
    .subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));

這將列印:

just elastic-1
publish boundedElastic-1
hello delayed parallel-1

我們應該一步步拆解發生了什麼:

  • 在這裡,subscribe執行緒上呼叫,但由於緊鄰其上的 subscribeOn,訂閱迅速切換到 elastic 排程器。
  • 其上面的所有運算子也都從下到上在 elastic 上訂閱。
  • justelastic 排程器上發出其值。
  • 第一個 doOnNext 在同一執行緒上接收該值並打印出來:just elastic-1
  • 然後,在從上到下的資料路徑上,我們遇到了 publishOn:來自 doOnNext 的資料在 boundedElastic 排程器上向下遊傳播。
  • 第二個 doOnNextboundedElastic 上接收其資料,並相應地列印 publish bounderElastic-1
  • delayElements 是一個時間運算子,因此預設情況下它會在 Schedulers.parallel() 排程器上釋出資料。
  • 在資料路徑上,subscribeOn 只負責在同一執行緒上傳播訊號,不做其他事情。
  • 在資料路徑上,傳遞給 subscribe(...) 的 lambda(s) 在接收資料訊號的執行緒中執行,因此 lambda 列印 hello delayed parallel-1

結論

在本文中,我們瞭解了 Scheduler 抽象以及它如何支援高階用法,如 VirtualTimeScheduler

然後我們學習瞭如何在響應式序列中間切換執行緒(或者更確切地說是 Scheduler 工作執行緒),以及 publishOnsubscribeOn 之間的區別。

在下一篇文章中,我們將深入探討庫的內部機制,以描述為確保 Reactor 效能而採取的一些最佳化措施。

在此期間,祝您響應式程式設計愉快!

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持聯絡

訂閱

搶佔先機

VMware 提供培訓和認證,助您快速進步。

瞭解更多

獲取支援

Tanzu Spring 在一個簡單的訂閱中提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群的所有近期活動。

檢視所有