領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多此文章是系列文章的一部分
我們在上一篇文章中得出的結論是,Spring Cloud Sleuth 的 MANUAL 上下文傳播策略既高效又提供了正確的語義。基於大量經驗,Spring、Micrometer 和 Reactor 團隊建立了一個新的上下文傳播庫。其目標是封裝在 ThreadLocal 值和類似 Map 的結構之間傳輸上下文資料的關注點。Micrometer 1.10 和 Reactor 3.5 都以此為基礎,在 Reactor 和命令式程式碼之間提供一流的體驗。透過使用 Reactor Context,我們隱式地暴露了 ThreadLocal 值,這些值被 Micrometer 用於儀表化跟蹤庫,並用於填充 SLF4J 的 MDC 以提供包含跟蹤識別符號的日誌。
在本文中,我們將採用與以前不同的方法。我們將從可用的頂層 API 開始,然後解釋幕後發生的事情,而不是從頭開始構建我們的知識。最後,您將能夠:
讓我們回顧第一篇文章中的示例,其中我們展示了 delayElement 運算子如何導致響應式鏈丟失關聯識別符號。讓我們回想一下程式碼,從我們的操作開始:
Mono<Void> addProduct(String productName) {
log("Adding product: " + productName);
return Mono.empty(); // Assume we’re actually storing the product
}
Mono<Boolean> notifyShop(String productName) {
log("Notifying shop about: " + productName);
return Mono.just(true); // Assume we’re actually notifying the shop
}
然後我們需要回憶繫結請求處理程式。
Mono<Void> handleRequest() {
initRequest(); <1>
log("Assembling the chain"); // <2>
return Mono.just("test-product")
.delayElement(Duration.ofMillis(1)) // <3>
.flatMap(product ->
Flux.concat(
addProduct(product), // <4>
notifyShop(product)).then())
}
從 Reactor 3.5.0 開始,Reactor Context 能夠與 Micrometer 旗下的一個新庫(名為 context-propagation)整合。我們將在本文末尾更詳細地描述這種整合。在 Reactor 3.5+ 中,當 context-propagation 庫在類路徑上時,我們可以預期在 handle 運算子和新的 tap 運算子中進行日誌記錄時,我們的 ThreadLocal 值將存在。
要傳播我們的自定義 ThreadLocal,我們需要註冊一個 ThreadLocalContextAccessor。
ContextRegistry.getInstance()
.registerThreadLocalAccessor("CORRELATION_ID",
CORRELATION_ID::get,
CORRELATION_ID::set,
CORRELATION_ID::remove);
目前,context-propagation 庫的細節對於實現我們的需求並不重要。我們唯一需要知道的是,我們使用了鍵 CORRELATION_ID,它將與 Reactor Context 一起用於在我們的特殊運算子中恢復 ThreadLocal。讓我們修改其餘程式碼以使用它們並在指定位置進行日誌記錄。
我們只需要對請求處理程式進行一項更改。
Mono<Void> handleRequest() {
initRequest(); // <1>
log("Assembling the chain");
return Mono.just("test-product")
.delayElement(Duration.ofMillis(1))
.flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.contextCapture(); // <2>
}
我們所做的唯一修改是返回給呼叫者的鏈末尾的 <2> contextCapture 運算子。此運算子的作用是捕獲當前 ThreadLocal 值(已在 ContextRegistry 中註冊 ThreadLocalAccessor 例項),並將其儲存在 Reactor Context 中的相同鍵下。在此特定實現中,我們唯一的希望是訂閱在組裝階段之後立即發生,如 <1> 中,我們設定了 ThreadLocal 值。
接下來,我們將使用 tap 運算子新增日誌記錄。
Mono<Void> addProduct(String productName) {
return Mono.<Void>empty()
.tap(() -> new DefaultSignalListener<>() {
@Override
public void doOnComplete() throws Throwable {
log("Adding product: " + productName);
}
});
}
在這裡,我們正在擴充套件 reactor-core 的 reactor.core.observability 包中的 DefaultSignalListener。我們只對完成訊號感興趣,在該訊號處我們執行日誌操作。
對於 handle 運算子,我們將修改 notifyShop 方法。
Mono<Boolean> notifyShop(String productName) {
return Mono.just(true)
.handle((result, sink) -> {
log("Notifying shop about: " + productName);
sink.next(result);
});
}
現在讓我們看看,當我們呼叫處理程式時,是否能得到正確的輸出。
handleRequest().block();
結果如下:
[ main][ 643805344761543048] Assembling the chain
[parallel-1][ 643805344761543048] Adding product: test-product
[parallel-1][ 643805344761543048] Notifying shop about: test-product
太棒了!這實際上與 Spring Cloud Sleuth 的 MANUAL 策略相同,但已整合到 Reactor 的內部,因此您無需手動恢復 ThreadLocal 值。我們選擇 tap 和 handle 是因為這些運算子可以訪問繫結到 Subscriber 的 Context,並允許對具體的 Reactive Streams 訊號採取行動。
記住:Reactor Context 用於寫入,ThreadLocals 用於讀取。
事實上,我們的請求處理程式有點危險。如果延遲訂閱操作,我們將丟失關聯識別符號。考慮:
Mono<Void> requestHandler = handleRequest(); // <1>
Thread subscriberThread = new Thread(requestHandler::block); // <2>
subscriberThread.start();
subscriberThread.join();
輸出如下:
[ main][ 1388809065574796038] Assembling the chain
[parallel-1][ null] Adding product: test-product
[parallel-1][ null] Notifying shop about: test-product
組裝發生在 <1>,ThreadLocal 在 main 中設定。然而,訂閱發生在 <2> 中的一個新 Thread 上,該 Thread 沒有 ThreadLocal 值可供捕獲。因此,我們的日誌沒有關聯識別符號。我們可以用 Mono.defer() 包裝處理程式的正文來解決此問題。但是,與其這樣做,不如考慮我們是否首先需要實際設定 ThreadLocal。
在呼叫 Reactor 鏈的命令式應用程式中,例如呼叫 WebClient 的 Spring MVC 控制器方法,ThreadLocal 值已經建立,contextCapture 將捕獲它們並將其儲存在 Context 中。
另一方面,在像 WebFlux 這樣的響應式棧中,直接使用 contextWrite 更合理。
我們知道 Reactor 將使用其 Context 的內容來恢復 ThreadLocal 值。如果我們直接將所需值儲存在 Context 中,而不是從當前狀態捕獲它們,我們將稍微提高效能,但也會提高與函數語言程式設計正規化的符合性。讓我們試試看。
Mono<Void> handleRequest() {
// initRequest(); -- no write to ThreadLocal
log("Assembling the chain");
return Mono.just("test-product")
// <1>
.delayElement(Duration.ofMillis(1))
.flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then())
.contextWrite(
Context.of("CORRELATION_ID", correlationId())); // <2>
}
我們來執行它。
[ main][ null] Assembling the chain
[parallel-1][ 7059587638538899074] Adding product: test-product
[parallel-1][ 7059587638538899074] Notifying shop about: test-product
太棒了!我們實際的響應式鏈包含一個正確的關聯識別符號。
不幸的是,我們在組裝階段丟失了一個。其中一個原因是日誌沒有發生在 handle 或 tap 運算子中。如果我們在 <1> 中使用 tap 運算子新增一個初始日誌,我們就會沒問題。帶有關聯識別符號的 Context 繫結到 <2> 上游的鏈。如果我們在 contextWrite 呼叫之後新增一個日誌 tap 運算子,我們將看不到正確的關聯識別符號——在該階段附加的 Context 是一個不同的上下文,不包含我們的識別符號。我們稍後會回到這個問題,但首先,讓我們考慮是否可以簡化程式碼並避免使用特殊運算子。
當 reactor-core 3.5.0 釋出時,它被包含在 Spring Framework 6.0 和 Spring Boot 3.0 中。使用 Spring Cloud Sleuth 進行跟蹤的現有 Spring 使用者習慣於日誌中填充 trace-id 和 span-id 值(類似於我們的關聯識別符號)。切換到新正規化(可觀察性是 Spring 核心產品套件的一部分)將要求現有應用程式重寫其日誌記錄以使用 handle 和 tap 運算子。我們繼續思考如何使更多運算子能夠恢復 ThreadLocal 值。
正如我們在上一篇文章中看到的,恢復可以跨多個運算子的 ThreadLocal 值並非易事。選擇 handle 和 tap 是因為它們不會讓 ThreadLocal 值洩漏。執行使用者程式碼不會傳播任何訊號。當用戶程式碼執行時,ThreadLocal 值是存在的。然後捕獲結果。最後,清除 ThreadLocal 上下文。只有之後,訊號才會響應式地傳播到下游運算子。此外,我們希望更具選擇性,因為在每個運算子中執行恢復會產生大量開銷,正如第 2 部分中所討論的。
我們仔細重新思考了一切,並提出了一個可以組合到以下呼叫的想法(從 reactor-core 3.5.3 開始)
Hooks.enableAutomaticContextPropagation();
我們可以將其新增到應用程式的 main 方法中。
我們現在可以恢復動作方法的初始實現。
Mono<Void> addProduct(String productName) {
log("Adding product: " + productName);
return Mono.empty();
}
Mono<Boolean> notifyShop(String productName) {
log("Notifying shop about: " + productName);
return Mono.just(true);
}
我們保持 handleRequest 方法和在新 Thread 上的訂閱不變。讓我們執行它。
[ main][ null] Assembling the chain
[parallel-1][ 8362586195225273235] Adding product: test-product
[parallel-1][ 8362586195225273235] Notifying shop about: test-product
成功!
有了這個功能,我們可以將使用 Spring Cloud Sleuth 的現有程式碼庫遷移到新的 Spring Framework,而無需對日誌記錄方式進行任何更改。有了上述鉤子,如果您將 Spring Boot Actuator 與 Micrometer Tracing 結合使用,SLF4J 日誌將填充跟蹤資訊,而無需執行任何操作。很快,Spring Boot 將自動為您呼叫該鉤子。
我們提到我們將回到組裝時日誌的問題。到目前為止,我們一直在請求處理邏輯中啟動關聯識別符號生成過程。理想情況下,我們的處理程式應該由伺服器呼叫,並且返回的 Publisher(Flux 或 Mono)由呼叫程式碼訂閱。我們的處理程式恢復到初始狀態:
Mono<Void> handleRequest() {
log("Assembling the chain");
return Mono.just("test-product")
.delayElement(Duration.ofMillis(1))
.flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product)).then());
}
讓我們透過將上下文附加到返回的 Mono 來模擬伺服器程式碼。
Mono<Void> requestHandler = handleRequest()
.contextWrite(Context.of("CORRELATION_ID", correlationId()));
然後我們需要執行它。
requestHandler.block();
組裝時間仍然缺少關聯識別符號。
[ main][ null] Assembling the chain
[parallel-1][ 5508113792645841519] Adding product: test-product
[parallel-1][ 5508113792645841519] Notifying shop about: test-product
contextWrite 運算子在訂閱時(以及其他生命週期事件)恢復 ThreadLocal 值。為了讓使用者程式碼在組裝時擁有日誌,對該程式碼的整個呼叫需要成為響應式鏈的一部分。這樣,使用者程式碼在外部 Mono 訂閱時執行,並且返回的內部 Mono 立即訂閱。對於整個執行,如果我們這樣做,外部 Mono 的 Context 將在 ThreadLocal 中可用,在我們的“框架”程式碼中:
Mono<Void> requestHandler = Mono.defer(() -> handleRequest())
.contextWrite(Context.of("CORRELATION_ID", correlationId()));
我們所需要做的就是使用 Mono.defer() 並將 Context 附加到它上面。
幸運的是,Spring Framework 執行良好,並且也在訂閱階段處理了我們的組裝。
這種新方法看起來非常有前景。人們可能會想,鑑於過去採取的方法,這種新機制將如何被打破?我們對這種方法更有信心,因為它更符合 Reactive Streams 的本質。過去不基於 Reactor Context 的方法存在一個主要的誤解——它們將 ThreadLocal 值向下傳播——希望在某個時候進行清理。然而,傳播並沒有語義邊界來停止。
依賴 ThreadLocal 值下游傳播也可能是錯誤的來源。響應式庫向上遊和下游傳播訊號。一個訊號可能觸發另一個訊號,但它並非必須如此。不同的 Thread 可以繼續處理。flatMap 類運算子執行的某些最佳化(例如預取)可以從上游請求和排隊值,而無需我們下游傳播機制的參與。如果我們希望即使在反壓或取消時記錄日誌時也能擁有上下文資訊,我們需要考慮所有可能的訊號。
一個重要的觀察來自 Context 決定邏輯邊界的方式。當您呼叫 contextWrite 並在 Context 中儲存一個值時,所有上游運算子都可以訪問修改後的版本。所有下游運算子將看不到修改,但會看到您的修改所基於的狀態。
繫結到 Subscriber 的 Context 的性質是我們新方法的基礎。我們修改了 contextWrite 運算子,使其在訂閱時、取消時和請求時訊號向上遊傳播時,將 ThreadLocal 值設定為反映當前 Context。但是,當訊號向下遊傳播時,它將這些 ThreadLocal 值重置為下游 Context 中表示的值。
我們仍然需要使用 Scheduler 包裝方法。我們還需要 Queue 包裝方法(為此我們需要改進生命週期語義)。
但是,我們可以考慮透過在這些情況下傳輸 Reactor Context 而不是捕獲 ThreadLocal 值來改善情況。這可以提高效能。
此外,當我們使用不受 Reactor 控制的 Publisher 或使用我們無法控制的 Thread 的源(例如使用 Mono.fromFuture() 示例來模擬遠端呼叫)時,我們仍然會丟失 ThreadLocal 值。目前的一種緩解措施是引入 contextWrite 運算子的語義邊界,這並不會真正改變 Context,就像 notifyShop 方法的這個變體一樣:
Mono<Boolean> notifyShop(String productName) {
log("Notifying shop about: " + productName);
return makeRequest(productName) // <1>
.contextWrite(Function.identity()) // <2>
.doOnNext(r -> log("Request done.")); // <3>
}
makeRequest 方法在系列的上一篇文章中定義。如果我們假設 makeRequest 是一個第三方庫呼叫,它使用我們無法控制的 Thread,我們也無法包裝它在 <1> 中以及在完成其操作的非同步程式碼中執行的程式碼。該鏈的任何日誌都不會填充關聯識別符號。傳播此類上下文將是庫作者的責任。但是,因為我們在 <2> 中使用了邊界,所以我們在 <3> 中的日誌包含關聯識別符號。
我們打算在 reactor-core 中新增必要的功能,為那些以 Reactor 無法控制的方式更改 Threads 的源提供這樣的邊界。
在命令式場景中,只調用響應式程式碼以使用阻塞訂閱(例如透過使用 block()),我們計劃自動執行 contextCapture 以透明地將當前 ThreadLocal 值傳播到響應式鏈中。例如,這在 Spring MVC 應用程式中與 WebClient 互動時將非常有用。
捕獲 ThreadLocal 狀態並在各個位置恢復它本身就是一個有趣的話題。通常,我們會想到多個 ThreadLocal 值,它們之間具有邏輯連線,或者與各種關注點對應的類似 Map 的結構。我們建立了一個專用庫,透過捕獲 ThreadLocal 的狀態並將其恢復到相應的目標中,從而在 ThreadLocal 和任意物件之間進行轉換。在前面的示例中,我們使用了 context-propagation 庫的一些 API。它在 Micrometer 旗下開源,如果您想在程式碼中使用它,它還提供了帶有示例的參考文件。
Project Reactor 使用 ServiceLoader JDK 機制註冊了一個處理 Reactor Context 的 ContextAccessor。另一方面,Micrometer 註冊了一個 ObservationThreadLocalAccessor,它處理 Micrometer Tracing 和其他檢測機制工作所需的 ThreadLocal 狀態,使用單一 Observation 概念。
我們強烈建議嘗試將 Spring Boot 與 Spring Boot Actuator 結合使用,以啟用跟蹤功能,親身體驗這種無縫的體驗。
在本系列部落格文章中,我們介紹了上下文傳播的基礎知識,並探討了命令式和響應式程式設計正規化之間橋接的歷史和現狀。我們衷心希望您現在能夠自信地使用我們實現的功能。在最佳情況下,如果您使用自動上下文傳播功能,您無需做太多工作。此外,在這種有趣的情況下,我們希望您的自定義傳播邏輯能夠利用本文中描述的原語。如果您有任何疑問,可以聯絡我們,或者在 GitHub 上報告問題。
本系列若無同事們逐字審查,便無法發表。我要感謝(按字母順序排列):Simon Baslé、Jay Bryant、Pierre De Rop、Oleh Dokuka、Marcin Grzejszczak、Robert McNees、Rossen Stoyanchev 和 Tadaya Tsuyukubo。
要使用所用的示例,請隨時使用我的 GitHub 倉庫中的相關包。