領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多這篇博文是系列文章中的第三篇,旨在深入探討Reactor更高階的概念和內部工作原理。
在這篇文章中,我們探索了執行緒模型,一些(大多數)運算子如何與併發無關,Scheduler 抽象以及如何使用 publishOn 等運算子在序列中從一個執行緒跳躍到另一個執行緒。
這個系列源於 Flight of the Flux 演講,我發現其內容更適合博文格式。
下表將在其他文章釋出時更新連結,但這是計劃中的內容
如果您錯過了 Reactive Streams 簡介和 Reactor 的基本概念,請訪問網站的學習部分和參考指南。
話不多說,讓我們開始吧
Reactor 運算子通常是併發無關的:它們不施加特定的執行緒模型,只在呼叫其 onNext 方法的 Thread 上執行。
正如我們在這個系列的第一篇文章中看到的,執行訂閱呼叫的 Thread 也有影響:subscribe 呼叫會一直鏈接,直到達到資料生產的 Publisher(運算子鏈的最左側部分),然後這個 Publisher 透過 onSubscribe 提供一個 Subscription,反過來向下傳遞,被請求等等... 預設情況下,這個資料生產過程再次在發起訂閱的 Thread 上開始。
對此有一個普遍的例外:涉及時間概念的運算子。任何此類運算子預設都在 Schedulers.parallel() 排程器上執行計時器/延遲等。
還有其他一些例外,它們也在此 parallel() Scheduler 上執行。它們的特徵是至少有一個接受 Scheduler 引數的過載。
但是 Scheduler 是什麼,為什麼我們需要它?
Scheduler 抽象在 Reactor 中,Scheduler 是一個抽象,它讓使用者可以控制執行緒。Scheduler 可以生成 Worker,它們概念上是 Thread,但不一定由 Thread 支援(我們稍後會看到一個例子)。Scheduler 還包括時鐘的概念,而 Worker 純粹是關於排程任務的。
interface Scheduler extends Disposable {
Disposable schedule(Runnable task);
Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
long now(TimeUnit unit);
Worker createWorker();
interface Worker extends Disposable {
Disposable schedule(Runnable task);
Disposable schedule(Runnable task, long initialDelay, TimeUnit delayUnit);
Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit);
}
}
Reactor 附帶了幾個預設的 Scheduler 實現,每個實現都有其管理 Workers 的特定方式。它們可以透過 Schedulers 工廠方法例項化。以下是它們典型用法的經驗法則
Schedulers.immediate() 可以用作 null 物件,當 API 需要 Scheduler 但您不想改變執行緒時Schedulers.single() 用於可以在一個唯一的 ExecutorService 上執行的一次性任務Schedulers.parallel() 適用於 CPU 密集但生命週期短的任務。它可以並行執行 N 個此類任務(預設 N == CPU 數量)Schedulers.elastic() 和 Schedulers.boundedElastic() 適用於更長時間的任務(例如阻塞式 IO 任務)。elastic 按需生成執行緒,沒有限制,而最近引入的 boundedElastic 則對建立的執行緒數量設定了上限。每種 Scheduler 都有一個由上述方法返回的預設全域性例項,但可以使用 Schedulers.new*** 工廠方法建立新例項(例如 Schedulers.newParallel("myParallel", 10)) 建立一個自定義並行 Scheduler,其中 N = 10)。
parallel 型別由 N 個工作執行緒支援,每個工作執行緒都基於 ScheduledExecutorService。如果您向它提交 N 個長時間任務,就無法再執行其他工作,因此它適合短生命週期任務。
elastic 型別也由基於 ScheduledExecutorService 的工作執行緒支援,但它按需建立這些工作執行緒並將其池化。不再使用的 Worker 在 dispose() 時返回到池中,並在此處保留配置的 TTL 時長,因此新傳入的任務可以重用空閒的工作執行緒。但是,如果沒有空閒的 Worker 可用,它會繼續建立新的工作執行緒。
boundedElastic 型別在概念上與 elastic 非常相似,只是它對建立的由 ScheduledExecutorService 支援的 Worker 數量設定了上限。超過此點,其 createWorker() 方法返回一個門面 Worker,該門面 Worker 將任務排隊而不是立即提交。一旦具體的 Worker 可用,它就會與門面交換並開始實際提交任務(使其行為就像您剛剛提交了任務一樣,包括延遲的任務)。此外,可以對所有門面工作執行緒(屬於 Scheduler 例項)可以排隊的延遲任務的總數設定上限。
如上所述,不。我們實際上已經看到了一個例子:immediate() Scheduler。這個排程器不會修改程式碼執行的 Thread。
但在 reactor-test 庫中有一個更有用的例子:VirtualTimeScheduler。這個 Scheduler 在當前 Thread 上執行,但會用提交給它的任務應該執行的時間戳來標記所有任務。
然後它管理一個虛擬時鐘(得益於 Scheduler 還具有時鐘的職責),可以手動推進。當這樣做時,排隊在新的虛擬時間戳之前或同時執行的任務將被執行。
這在測試場景中非常有用,當您有一個帶有長間隔/延遲的 Flux 或 Mono,並且您想測試邏輯而不是時間。例如,像 Mono.delay(Duration.ofHours(4)) 這樣的程式碼可以在 100ms 內執行...
還可以想象圍繞 Actor 系統、ForkJoinPool、即將推出的 Loom 協程等實現一個 Scheduler...
關於 main
Thread人們經常詢問在
Scheduler的執行緒和 main 執行緒之間來回切換。從主執行緒切換到排程器顯然是可能的,但從任意執行緒切換到 main 執行緒是不可能的。這純粹是 Java 的限制,因為沒有辦法向 main 執行緒提交任務(例如,沒有 MainThreadExecutorService)。
現在我們熟悉了 Reactor 中執行緒的構建塊,讓我們看看這在運算子世界中如何體現。
我們已經確定,大多數運算子會繼續在其收到訊號的 Thread 上工作,除了基於時間的運算子(例如 Mono.delay、bufferTimeout() 等)。
Reactor 的理念是為您提供透過組合運算子來做正確事情的工具。執行緒也不例外:請看 subscribeOn 和 publishOn。
這兩個運算子只需一個 Scheduler,然後會在該排程器的一個 Worker 上切換執行。當然,兩者之間存在主要區別:)
publishOn(Scheduler s) 運算子這是當您想切換執行緒時所需的基本運算子。來自其源的傳入訊號在給定的 Scheduler 上釋出,有效地將執行緒切換到該排程器的一個工作執行緒。
這對於 onNext、onComplete 和 onError 訊號都是有效的。也就是說,從上游源流向S下游訂閱者的訊號。
因此,本質上,出現在此運算子之下的每個處理步驟都將在新的 Scheduler s 上執行,直到另一個運算子再次切換(例如,另一個 publishOn)。
讓我們以一個故意粗略的阻塞呼叫示例,但請記住,反應式鏈中的阻塞呼叫總是粗略的!:)
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
在上面的示例中,假設此程式碼在 main 執行緒上執行,每個 Flux.fromIterable 都在同一個 Thread 上發出其 List 的內容。然後我們在 map 中使用一個命令式阻塞 Web 客戶端來獲取每個 url 的主體,它“繼承”該執行緒(從而阻塞它)。因此,每個 subscribe 中的資料消費 lambda 也都在主執行緒上執行。
因此,所有這些 URL 都在主執行緒上按順序處理。
main from first list, got A
main from first list, got B
main from first list, got C
main from second list, got D
main from second list, got E
如果我們引入 publishOn,我們可以使程式碼效能更高,這樣 Flux 之間就不會相互阻塞。
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
這可能會給我們以下輸出
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
第一個列表和第二個列表現在交錯排列了,太棒了!
subscribeOn(Scheduler s) 運算子在前面的例子中,我們看到了 publishOn 如何透過將阻塞工作的觸發器(要獲取的 URL)的釋出切換到提供的 Scheduler,從而將阻塞工作分流到單獨的執行緒。
由於 map 運算子在其源執行緒上執行,因此在 map 之前放置 publishOn 來切換該源執行緒是有效的。
但是,如果那個獲取 URL 的方法是其他人編寫的,並且他們很遺憾忘記新增 publishOn 怎麼辦?有沒有辦法影響上游的 Thread?
在某種程度上,是有的。這就是 subscribeOn 派上用場的地方。
此運算子改變 subscribe 方法的執行位置。由於訂閱訊號向上流動,它直接影響源 Flux 訂閱並開始生成資料的位置。
因此,它似乎可以作用於反應式運算子鏈的向上和向下部分(只要沒有混入 publishOn)
//code provided in library you have no write access to
final Flux<String> fetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.map(url -> blockingWebClient.get(url)); //oops!
}
//your code:
fetchUrls(A, B, C)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
fetchUrls(D, E)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));
就像我們的第二個 publishOn 示例一樣,該程式碼將正確輸出類似以下內容
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
那麼發生了什麼?
subscribe 呼叫仍然在 main 執行緒上執行,但它們將 subscribe 訊號傳播到它們的源 subscribeOn。反過來,subscribeOn 將相同的訊號傳播到它自己的源 fetchUrls,但在一個 boundedElastic Worker 上。
在 fetchUrls 返回的 Flux 序列中,map 訂閱在 boundedElastic 工作執行緒上,range 也是。range 開始生成資料,仍然在 boundedElastic 工作執行緒上。
這沿著資料路徑繼續向下,每個訂閱者在其源執行緒(boundedElastic 執行緒)上執行 onNext。
最後,在 subscribe(...) 呼叫中配置的 lambdas 也在 boundedElastic 執行緒上執行。
重要
區分訂閱的行為和傳遞給
subscribe()方法的 lambda 至關重要。此方法訂閱其源Flux,但 lambda 在處理結束時執行,即資料流經所有步驟(包括切換到另一個執行緒的步驟)之後。因此,lambda 執行的
Thread可能與訂閱Thread不同,即呼叫subscribe方法的執行緒。
如果我是 fetchUrls 庫的作者,我可以透過略微不同的方式利用 subscribeOn,讓每個獲取操作都在自己的 Worker 上執行,從而使程式碼效能更高。
final Flux<String> betterFetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.flatMap(url ->
//wrap the blocking call in a Mono
Mono.fromCallable(() -> blockingWebClient.get(url))
//ensure that Mono is subscribed in an boundedElastic Worker
.subscribeOn(Schedulers.boundedElastic())
); //each individual URL fetch runs in its own thread!
}
subscribeOn 將在訂閱階段全程作用,從底部到頂部,然後沿資料路徑直到遇到 publishOn(或基於時間的運算子)。
讓我們考慮以下示例
Flux.just("hello")
.doOnNext(v -> System.out.println("just " + Thread.currentThread().getName()))
.publishOn(Scheduler.boundedElastic())
.doOnNext(v -> System.out.println("publish " + Thread.currentThread().getName()))
.delayElements(Duration.ofMillis(500))
.subscribeOn(Schedulers.elastic())
.subscribe(v -> System.out.println(v + " delayed " + Thread.currentThread().getName()));
這將列印
just elastic-1
publish boundedElastic-1
hello delayed parallel-1
我們應該逐步解釋發生了什麼
subscribe 在 main 執行緒上呼叫,但由於緊隨其上的 subscribeOn,訂閱迅速切換到 elastic 排程器。elastic 上訂閱,從底部到頂部。just 在 elastic 排程器上發出其值。doOnNext 在同一個執行緒上接收該值並將其打印出來:just elastic-1publishOn:來自 doOnNext 的資料在 boundedElastic 排程器上傳播到下游。doOnNext 在 boundedElastic 上接收其資料並相應地列印 publish bounderElastic-1。delayElements 是一個時間運算子,因此預設情況下它在 Schedulers.parallel() 排程器上釋出資料。subscribeOn 除了在同一執行緒上傳播訊號外,不做任何事情。subscribe(...) 的 lambda 在接收資料訊號的執行緒中執行,因此 lambda 列印 hello delayed parallel-1在本文中,我們瞭解了 Scheduler 抽象以及它如何實現 VirtualTimeScheduler 等高階用法。
然後我們學習瞭如何在反應式序列中間切換執行緒(或者說是 Scheduler 工作執行緒),以及 publishOn 和 subscribeOn 之間的區別。
在下一部分中,我們將深入探討庫的內部,以描述為確保 Reactor 效能而採取的一些最佳化措施。
與此同時,祝您反應式程式設計愉快!