領先一步
VMware 提供培訓和認證,助您加速前進。
瞭解更多本文是系列文章的一部分
Spring Cloud Sleuth 最近成為了 Micrometer Tracing,它是 Micrometer 專案的一部分。大多數跟蹤插樁都集中在 Micrometer 中,位於新的可觀測性 API之下。這些專案的目標是讓任何應用程式都具備可觀測性——以包含關聯識別符號的指標、跟蹤和日誌的形式。為了實現這一目標,庫需要一種方式來傳輸上下文資訊。當應用程式以任何形式處理非同步時,這項任務就變得相當具有挑戰性。在上一篇文章中,我們回顧了使用 ThreadLocal
和 Reactor Context
進行上下文傳播的基礎知識。
Spring Cloud Sleuth 在處理非同步上下文傳播的方式上經歷了多次重大調整。由於 Sleuth 需要與不一定具有響應式 API 的第三方插樁庫打交道,因此擁有一種既定的方式來使上下文對它們可用至關重要。這些庫通常不假設非同步,而是依賴靜態的 ThreadLocal
狀態。多年來,ThreadLocal
為 JVM 應用程式提供了隱式的上下文儲存,用於驅動可觀測性功能。隨著時間的推移,Project Reactor 在底層原語之上引入了各種鉤子(hook)和包裝(wrapping)機制,以便實現響應式和命令式之間的橋接。在本文中,我們將探討將上下文傳播到 ThreadLocal
值的方法,並討論它們可能出現的問題。我們將探討 Sleuth 採取的方法,並總結我們發現的一種兼顧效能和語義合理性的良好折衷方案。
在描述 Sleuth 引入的方法之前,我們應該考慮在命令式世界和響應式世界之間進行橋接所存在的危險。
在上一篇文章中,我們討論了一些 Thread
切換和相關副作用的潛在問題。現在,我們將利用 Reactor 的外掛機制來解決我們可能遇到的問題,從而更深入地探索響應式程式設計的特性。
總結 Spring Cloud Sleuth 遇到的所有問題是一個不斷變化的目標。此外,許多組織都有自己的上下文傳播機制實現,例如用於填充 SLF4J 的 MDC
。本文無意全面總結所有潛在陷阱。它旨在建立一些直覺,幫助你理解最終的真理:要麼遵循響應式程式設計規則,要麼準備在最意想不到的時刻失敗。
正如我們所知,響應式鏈可以使用不同的 Thread
來傳播訊號。從上一篇文章中我們瞭解到,當執行在另一個 Thread
上繼續時,在任務執行時恢復上下文是合理的。Project Reactor 將管理 Thread
的任務委託給 Scheduler
。它還提供了一個專用的鉤子,允許攔截特定工作單元的排程和執行:Schedulers.onScheduleHook
。它的工作方式類似於上一篇文章中的 WrappedExecutor
。讓我們來看一個可能考慮使用它的場景。
在第一部分中,我們瞭解到在響應式鏈中不能持續依賴 ThreadLocal
值可用。如果我們嘗試在訂閱時初始化它,並在 doFinally
運算子中清除它呢?我們的應用程式可以使用有限數量的 Thread
來處理許多請求,其中一些是併發的。由於這些平臺 Thread
可以被重用,我們需要在處理另一個請求之前對與前一個請求相關的任何 ThreadLocal
狀態進行清理,以確保不同的請求不會使用殘留的關聯識別符號。
接下來的程式碼示例是對上一部分中我們編寫的未使用 Reactor Context
的程式碼進行的修改。
handleRequest
方法的一種潛在實現可能如下所示
Mono<Void> handleRequest() {
return Mono.fromSupplier(() -> {
initRequest(); // <1>
return "test-product";
}).flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.doOnSuccess(v -> log("Done."))
.doFinally(signalType ->
CORRELATION_ID.remove()); // <2>
}
在 <1>
中,我們設定了 ThreadLocal
的值,在 <2>
中,我們嘗試清除它。
我們還修改了執行的操作,以便能夠在 addProduct
方法中新增一個人工延遲
Mono<Void> addProduct(String productName) {
return Mono.defer(() -> {
log("Adding product: " + productName);
return Mono.<Void>empty()
.delaySubscription(Duration.ofMillis(10),
Schedulers.single()); // <1>
});
}
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return Mono.just(true);
});
}
注意,在 <1>
中,我們透過延遲訂閱並使用 Schedulers.single()
在 10ms 後啟動訂閱來引入非同步。delaySubscription
將使用該 Scheduler
底層的 ScheduledExecutorService
並在延遲後在另一個 Thread
上啟動訂閱。
從上一篇文章中我們知道,在這種情況下我們需要恢復 ThreadLocals
,因此我們使用提到的 Scheduler
外掛來實現這一點
Schedulers.onScheduleHook("context.propagation", WrappedRunnable::new);
在 Reactor 的 Scheduler
上執行的每個任務都會恢復 ThreadLocal
的值,因此我們應該安全無虞。
現在,讓我們模擬兩個順序請求,它們之間有一個日誌,用於驗證 CORRELATION_ID
是否已正確清除
log("Got first request, calling handler");
handleRequest().block();
log("Got second request, calling handler");
log("There should be no correlationId on this line!");
handleRequest().block();
日誌如下所示
[ main][ null] Got first request, calling handler // <1>
[ main][ 8658769170992364531] Adding product: test-product
[ single-1][ 8658769170992364531] Notifying shop about: test-product
[ single-1][ 8658769170992364531] Done.
[ main][ 8658769170992364531] Got second request, calling handler
[ main][ 8658769170992364531] There should be no correlationId on this line!
[ main][ 711436174608061530] Adding product: test-product
[ single-1][ 711436174608061530] Notifying shop about: test-product
[ single-1][ 711436174608061530] Done.
與 “test-product”
處理相關的日誌具有正確的關聯識別符號。然而,在請求之間發生了什麼?我們期望 ThreadLocal
在 doFinally
中被清除。不幸的是,請求之間的日誌仍然包含一個識別符號。那麼發生了什麼?
注意 “Notifying shop about”
的日誌發生在 Thread
single-1
上。訊號是在該 Thread
上傳遞的,所以我們在那裡清除了 ThreadLocal
,但主 Thread
仍然被汙染了(在 <1>
中)。現在,處理程式之外的執行可以使用錯誤的關聯識別符號用於不同的目的。我們可以嘗試透過在伺服器層(負責分派請求)新增清理邏輯來緩解此問題,並確保用於請求的每個 Thread
不被汙染。但是,如果我們的管道更復雜,這並不能挽救所有其他潛在的 Scheduler
Thread
。
這種方法在允許應用程式在響應式鏈中透明地使用 ThreadLocal
值方面取得了相當大的進展。從效能角度來看,它也是合理的,因為它不會在每個運算子周圍設定和重置 ThreadLocal
,而只在處理專案時發生 Thread
切換時進行。然而,它也表明仍然存在一些未解決的副作用。在接下來的示例中,我們將體驗並嘗試解決不同的場景。
使用 ThreadLocal
作為上下文元資料傳輸機制的策略還有一個常見問題,即使用了 Reactor 之外的非同步庫,並且該庫自行切換了 Thread
。當執行切換到不受包裝的 ExecutorService
控制的不同 Thread
時,上下文就會丟失。
讓我們實際看看。我們將重用目前為止看過的大部分程式碼,但對 notifyShop
方法進行一項更改。它現在透過使用以下方法來模擬遠端呼叫
Mono<Boolean> makeRequest(String productName) {
return Mono.fromFuture(CompletableFuture.supplyAsync(() -> true,
CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS)));
}
所以 notifyShop
看起來像這樣
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return makeRequest(productName);
});
}
如果我們觸發處理程式一次
handleRequest().block();
我們得到以下輸出
[ main][ 683056557275963407] Adding product: test-product
[ single-1][ 683056557275963407] Notifying shop about: test-product
[l-worker-1][ null] Done!
日誌縮短了 Thread
名稱以提高可見性,但 l-worker-1
實際上是 ForkJoinPool.commonPool-worker-1
的縮短版。
正如我們所見,我們的執行在一個我們無法控制的通用 ForkJoinPool
上繼續進行。一個問題是,從該 Thread
切換開始,我們就再也看不到我們的關聯識別符號了;另一個問題是,我們在一個實際上缺少關聯資訊的 Thread
上執行了清理操作。
我們可以透過 Executor
或任務包裝來潛在地改善(部分)情況,如前一篇文章所述,但我們並非總是擁有這樣的控制權——例如,如果我們呼叫一個使用 CompletableFuture
的外部庫。
我們幾乎準備好討論 Sleuth 的策略了。Schedulers.onScheduleHook
對於響應式處理中可能發生的非顯而易見的 Thread
切換提供了有限的能力。我們需要對操作的執行有更多的控制。我們將透過引入兩種外部服務通訊方式來演示這些限制。
addProduct
方法現在發起一個遠端請求,並在我們控制的 Scheduler
上釋出結果。將繁重的計算轉移到不同的 Thread
上是很常見的做法。為此,我們使用了 publishOn
運算子
Mono<Void> addProduct(String productName) {
return Mono.defer(() -> {
log("Adding product: " + productName);
return makeRequest(productName)
.publishOn(Schedulers.single())
.then();
});
}
notifyShop
方法模擬將結果對映到可能多個 Publisher
中。這在響應是一個複合結果的情況下是典型的場景——例如,如果響應是一個 JSON 陣列,我們打算將每個項作為對另一個服務的單獨呼叫進行處理,或者豐富單個結果。我們使用一個簡化版本,只取一個結果
Mono<Boolean> notifyShop(String productName) {
return Mono.defer(() -> {
log("Notifying shop about: " + productName);
return makeRequest(productName)
.flatMapMany(result ->
Flux.just("result")
.map(x -> result))
.take(1)
.single();
});
}
現在我們跳過處理程式,手動初始化關聯識別符號,然後訂閱這些鏈
initRequest();
addProduct("test-product")
.doOnSuccess(v -> log("Added."))
.block();
initRequest();
notifyShop("test-product")
.doOnSuccess(v -> log("Notified."))
.block();
讓我們看看輸出
[ main][ 6606077262934500649] Adding product: test-product
[ single-1][ null] Added.
[ main][ 182687922231622589] Notifying shop about: test-product
[l-worker-1][ null] Notified.
這是預料之中的,因為 doOnSuccess
中發生的兩個日誌都是由 CompletableFuture
在 ForkJoinPool
Thread
上傳遞值觸發的。即使我們有 Scheduler
包裝,結果首先是在我們無法控制的 Thread
上傳遞的,因此即使在 addProduct
中使用 publishOn
也無濟於事。
我們能做些什麼來改善這種情況嗎?Reactor 有一個細粒度的外掛系統,它允許我們在任何管道中裝飾任何運算子。我們可以嘗試使用它來恢復關聯識別符號。
這些外掛將使用自定義的 Subscriber
實現,該實現在訂閱時捕獲關聯識別符號
static class CorrelatingSubscriber<T> implements CoreSubscriber<T> {
final CoreSubscriber<T> delegate;
Long correlationId;
public CorrelatingSubscriber(CoreSubscriber<T> delegate) {
this.delegate = delegate;
}
@Override
public void onSubscribe(Subscription s) {
delegate.onSubscribe(s);
this.correlationId = CORRELATION_ID.get();
}
@Override
public void onNext(T t) {
CORRELATION_ID.set(this.correlationId);
delegate.onNext(t);
}
@Override
public void onError(Throwable t) {
CORRELATION_ID.set(this.correlationId);
delegate.onError(t);
}
@Override
public void onComplete() {
CORRELATION_ID.set(this.correlationId);
delegate.onComplete();
}
}
要更改運算子,使其實現將呼叫委託給實際的 Subscriber
例項,我們可以使用 Operators.lift
方法
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber));
首先,我們嘗試一個外掛,它允許我們修改鏈中的每一個運算子
Hooks.onEachOperator(
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber)));
讓我們再次執行示例並檢視輸出
[ main][ 7295088917002526647] Adding product: test-product
[ single-1][ 7295088917002526647] Added.
[ main][ 383851863754448684] Notifying shop about: test-product
[l-worker-1][ 383851863754448684] Notified.
哇!即使在如此複雜的場景中,我們也設法獲取了關聯識別符號。最初的訂閱行為捕獲了 ThreadLocal
值並在每一步都恢復了它。即使 notifyShop
方法中使用的 flatMap
(它會自行訂閱)也奏效了,因為在訂閱另一個 Thread
之前,ThreadLocal
已從先前的捕獲中填充!這聽起來確實很棒,但這種方法也有缺點。第一個也是最明顯的缺點是效能。傳播發生在每個運算子上。使用這種技術,我們首先包裝(decorate)每個物件,並在每一步都進行 ThreadLocal
訪問。所有這些操作都很昂貴。要了解更多資訊,請觀看Oleh 關於響應式效能的演講。
所以我們嘗試另一種方法。這次,我們將使用一個外掛,它會附著到鏈中被視為最後一個運算子的運算子上——也就是緊接在 subscribe()
呼叫之前的運算子。
對於響應式鏈可以得出一個觀察結論:對於同步運算子,我們不需要在每個單獨的操作(例如 filter
或 map
)中恢復最初捕獲的上下文,而只需要在鏈中最後一個運算子被訂閱時恢復。只要不涉及 Thread
邊界的跨越,這種機制就能奏效。為了支援可能跨越這些邊界的運算子(例如 flatMap
,它涉及訂閱一個新的 Publisher
),需要一個特殊的技巧。它將對映的結果視為它們操作的內部 Publisher
的最後一個運算子。
讓我們嘗試這種方法
Hooks.onLastOperator(
Operators.lift((scannable, subscriber) ->
new CorrelatingSubscriber<>(subscriber)));
並執行
[ main][ 2122332013640150746] Adding product: test-product
[ single-1][ 2122332013640150746] Added.
[ main][ 459477771449275997] Notifying shop about: test-product
[l-worker-1][ null] Notified.
它在 addProduct
中與 publishOn
一起工作正常,但在 notifyShop
中的 flatMap
上失敗了。
讓我們分析一下為什麼 notifyShop
會失敗。我們呼叫 block()
會捕獲 ThreadLocal
併為每個向下遊傳播的訊號恢復它。透過在 flatMapMany
中完成的對映,我們正在處理之前提到的非同步邊界。我們的外掛實際上應用於內部源 (Flux.just().map().single()
)。
然而,這些努力仍然沒有奏效,儘管自定義的 Subscriber
在 flatMapMany
內部被呼叫並嘗試恢復 ThreadLocal
的值。觸發內部訂閱的訊號是在我們無法控制的 Thread
上啟動的,因此我們根本沒有 ThreadLocal
可以捕獲。
在使用 publishOn
運算子的情況下則不同。對其的訂閱始於我們控制的 Thread
。因此,當處理來自 makeRequest()
方法的結果訊號時,它只會在我們控制的 Thread
上傳遞。.doOnSuccess(v -> log("Added."))
的執行發生在與 flatMapMany
不同步的 Thread
邊界之後。
這就是為什麼 onEachOperator
能覆蓋更多情況——它在每一步都恢復初始值,無論是否存在非同步邊界。儘管如此,onLastOperator
的效能略優於 onEachOperator
。
還有一個外掛,如果將其與之前的鉤子結合使用,我們可以完全控制響應式傳遞。Spring Cloud Sleuth 也使用了它。我們考慮的是最近引入的外掛 Hooks.addQueueWrapper
。不過,我們將不詳細探討它。它可以解決 Reactor 中工作竊取機制引入的問題。非同步運算子,例如 flatMap
,可以在將訊號傳遞給運算子的各種 Thread
上取得進展。想象一個背壓場景,處理暫停了一段時間。在某個時刻,一個新的 Thread
可以接管併發出 Subscription.request(n)
呼叫,這將導致累積的值立即傳遞。現在你可以問自己:“什麼累積的值?”這是一個好問題。Reactor 中的許多運算子使用內部 Queue
來實現背壓或保留序列傳遞語義。因為這些 Queue
的 draining 可以在任何 Thread
上發生,所以上下文資訊應該附加到儲存在 Queue
中的每個訊號上——也就是說,用於我們關聯目的的 ThreadLocal
值。這就是為什麼我們需要一個 Queue
包裝器——在將值提交到 Queue
時,我們捕獲 ThreadLocal
狀態。當從 Queue
中檢索值時,狀態就會恢復。
在展示了在 reactive-streams 術語之外操作的風險以及我們可以用來傳播 ThreadLocal
上下文的機制後,讓我們總結一下 Spring Cloud Sleuth 使用的四種策略
DECORATE_ON_EACH
DECORATE_ON_LAST
DECORATE_QUEUES
MANUAL
前三種策略試圖利用響應式運算子的一些特性,結合 Reactor 的外掛機制,並使用 ThreadLocal
作為內部傳輸機制以及與插樁庫共享上下文資料的方式。前三種策略還假定使用 Schedulers.onScheduleHook 進行 Scheduler
包裝。另一方面,最後一種策略則利用了 Reactor 繫結到 Subscriber
的 Context
。
此策略使用我們之前見過的 Hooks.onEachOperator
外掛。效能影響巨大,儘管 Sleuth 添加了許多最佳化以在非必要時不進行恢復。通常,這種方法非常有效。但它也非常激進,因此如果某個運算子需要更改上下文,就可能難以處理。下游運算子將看不到更改,因為來自初始訂閱的上下文在每一步都被恢復了。
使用 Hooks.onLastOperator
是為了提高效能。這種方法可能會失敗,因為它提供了靈活性。如果上游運算子修改了上下文,下游操作會看到更改。這帶來了風險,如果某個運算子清除了該上下文,那麼在該上下文被排程到包裝的 Scheduler
之前,上下文就會丟失。另一個風險是我們之前示例中看到的情況,即訂閱發生在某個 Thread
上,但請求資料發生在另一個不受 Reactor 控制的 Thread
上。
DECORATE_QUEUES
是前一種策略的演進,它糾正了一些錯誤場景(請求資料帶外發生或多個 Thread
釋出資料),但並非所有場景。我們之前描述過如何使用 Hooks.addQueueWrapper
外掛。Queue
包裝的一個已知問題是,在處理完一個專案後,沒有可靠的方法進行清理。上下文在從 Queue
中檢索專案時恢復。沒有圍繞透過下游運算子傳輸的專案處理的範圍。因此,這種方法也容易汙染 ThreadLocal
儲存。最近在 draining 過程中進行了一些改進以限制影響。
在這種策略中,Sleuth 唯一做的事情是在訂閱時將 ThreadLocal
的值作為快照捕獲到 Reactor 的 Context
中。使用者需要在相關位置提取該快照並填充 ThreadLocal
,以便將其提供給插樁庫。對於支援的跟蹤插樁,例如 Zipkin 和 Brave,Sleuth 透過使用作用域(scoping)的概念來恢復 ThreadLocal
—— ThreadLocal
會為插樁而恢復,並在快照關閉後立即消失。這是效能最高的方法,儘管它需要使用者進行手動(正如其名稱所示)處理。
在區域性作用域中使用 Reactor Context 填充 ThreadLocal
被證明既具有高效能,又符合響應式鏈的工作方式。將上下文與 Subscriber
關聯是一種經過驗證的方法,不會意外導致上下文資料丟失。在下一篇文章中,我們將展示 Reactor 3.5 和 Micrometer 1.10 如何將手動方法提升到新的水平,並提供一種結構化的方法來跨越響應式和命令式邊界傳播上下文。