領先一步
VMware 提供培訓和認證,助您快速進步。
瞭解更多本文屬於系列文章
Spring Boot 3 和 Spring Framework 6 帶來了一種統一且一致的方式,以在使用了 Micrometer 的應用程式中啟用可觀測性 (Observability)。從 Spring Cloud Sleuth 到 Micrometer 的演進,以及 Observation API 和 Micrometer Tracing,促使我們整合了各種上下文傳播 (Context Propagation) 的方法。在本系列博文中,我們將解釋我們是如何在 Project Reactor 中實現上下文傳播支援,以滿足命令式 (Imperative) 庫的需求。透過從基礎開始構建理解,您將能夠使用這些構造並瞭解底層發生的情況。我們假設您對響應式程式設計概念有基本瞭解。如果您是新手或想複習知識,請參閱 Project Reactor 文件中的響應式程式設計簡介。
在本文中,我們將開發一個簡單的電商應用。我們的討論角度限定於這樣一個請求:新增一個產品並通知商店庫存中新增了產品。作為負責任的開發者,我們希望記錄針對特定請求所採取的所有步驟,以便在調查問題時,我們可以檢視日誌並瞭解發生了什麼。我們將探討如何在命令式風格下實現為日誌工具提供關於請求的上下文元資料的目標,並將其與 Project Reactor 更具函式式、宣告式的風格進行比較。接下來的文章將更詳細地探討我們為何以及如何需要在兩種程式設計風格之間建立橋樑。
為了識別屬於特定請求的日誌,我們需要一種關聯它們的方式。我們可以生成一個簡單的隨機識別符號,如下所示:
static long correlationId() {
return Math.abs(ThreadLocalRandom.current().nextLong());
}
我們需要一種方法,使關聯識別符號在日誌工具中可用。我們可以將關聯作為業務邏輯中每個方法呼叫的一部分,但這會非常侵入且冗長。
通常,第三方庫使用 JDK 的 ThreadLocal
來傳遞並非應用程式業務邏輯主要關注點的隱式資訊。
讓我們為關聯識別符號宣告一個靜態欄位
static final ThreadLocal<Long> CORRELATION_ID = new ThreadLocal<>();
這是我們的日誌方法。它列印當前 Thread
的名稱並格式化輸出
static void log(String message) {
String threadName = Thread.currentThread().getName();
String threadNameTail = threadName.substring(
Math.max(0, threadName.length() - 10));
System.out.printf("[%10s][%20s] %s%n",
threadNameTail, CORRELATION_ID.get(), message);
}
現在我們擁有了處理請求並使用隱式關聯識別符號記錄每個步驟所需的一切。
在每個請求開始時,應用程式會呼叫以下方法來初始化關聯
static void initRequest() {
CORRELATION_ID.set(correlationId()));
}
我們簡化的請求處理器執行以下操作
void handleRequest() {
initRequest();
addProduct("test-product");
notifyShop("test-product");
}
業務邏輯中的日誌記錄如下所示
void addProduct(String productName) {
log("Adding product: " + productName);
// ...
}
void notifyShop(String productName) {
log("Notifying shop about: " + productName);
// ...
}
我們可以預期應用程式會記錄這些行
[ main][ 8592000019542134146] Adding product: test-product
[ main][ 8592000019542134146] Notifying shop about: test-product
只要特定請求的執行發生在同一個 Thread
上,並且不與其他關注點交錯,ThreadLocal
就允許我們將業務邏輯與用於日誌記錄的元資料解耦。
假設此應用程式開始面臨更高的負載,需要處理大量併發請求。想象一下,我們可以使用一個非同步且非阻塞的伺服器實現,它要求我們提供非同步宣告,而不是命令式和阻塞步驟。
我們的請求處理器可以返回一個 CompletableFuture
,以非同步且非阻塞的方式處理請求
CompletableFuture<Void> handleRequest() {
return CompletableFuture
.runAsync(() -> addProduct("test-product"))
.thenRunAsync(() -> notifyShop("test-product"))
}
不幸的是,當我們執行非同步版本時,日誌不再包含關聯識別符號
[l-worker-1][ null] Adding product: test-product
[l-worker-1][ null] Notifying shop about: test-product
解決此問題的一個已知緩解方法是包裝非同步 API 執行的任務。包裝是指一種執行 ThreadLocal
上下文恢復的實現。當任務建立時,捕獲當前上下文。當工作 Thread
實際執行任務時,恢復該上下文。讓我們看看這在我們使用 Runnable
的示例中如何工作
class WrappedRunnable implements Runnable {
private final Long correlationId;
private final Runnable wrapped;
public WrappedRunnable(Runnable wrapped) {
this.correlationId = CORRELATION_ID.get();
this.wrapped = wrapped;
}
@Override
public void run() {
Long old = CORRELATION_ID.get();
CORRELATION_ID.set(this.correlationId);
try {
wrapped.run();
} finally {
CORRELATION_ID.set(old);
}
}
}
我們可以重新實現我們的處理器,如下所示
CompletableFuture<Void> handleRequest() {
return CompletableFuture
.runAsync(new WrappedRunnable(
() -> addProduct("test-product")))
.thenRunAsync(new WrappedRunnable(
() -> notifyShop("test-product")));
}
不幸的是,這帶來了很多開銷。幸運的是,JDK 有一個用於執行非同步任務的 API:Executor
介面。在實際場景中,我們希望使用更全面的 API,即 ExecutorService
。然而,為了說明目的,Executor
就足夠了。
讓我們來看看
static class WrappedExecutor implements Executor {
private final Executor actual;
WrappedExecutor(Executor actual) {
this.actual = actual;
}
@Override
public void execute(Runnable command) {
actual.execute(new WrappedRunnable(command));
}
}
讓我們重用 CompletableFuture
框架預設使用的通用 ForkJoinPool
,但用我們的實現對其進行包裝。現在我們的程式碼看起來像這樣
static Executor executor = new WrappedExecutor(ForkJoinPool.commonPool());
CompletableFuture<Void> handleRequest() {
return CompletableFuture
.runAsync(() -> addProduct("test-product"), executor)
.thenRunAsync(() -> notifyShop("test-product"), executor);
}
我們的日誌再次正常工作
[l-worker-1][ 7321040639376081961] Adding product: test-product
[l-worker-2][ 7321040639376081961] Notifying shop about: test-product
在某些場景下,CompletableFuture
框架可以提供以非阻塞方式處理非同步任務的手段。然而,在許多情況下,其有限的 API 表面及其行為特性可能會受到限制。例如,我們可能希望延遲處理並在系統達到容量時稍後恢復。使用 CompletableFuture
,所有建立的例項一旦建立就立即開始計算。我們可能還希望對資料流應用更精細的操作,而不是僅對單一計算單元進行操作。出於這些原因以及更多原因,我們可能會考慮使用響應式程式設計庫。我們將考慮使用 Project Reactor,它是 Spring 產品組合中預設的響應式實現。
為了提供一個健壯的非同步處理框架,Java 社群提出了 Reactive Streams 規範。它幫助建立了 JDK 之前所缺乏的通用詞彙——明確的訊號傳播、錯誤處理、終止和生命週期管理語義。它還允許內建背壓 (backpressure)。Spring 透過引入 WebFlux 採用了這種方法,使 Project Reactor 及其響應式型別成為 API 的一等公民。
Reactive Streams 為非同步流處理帶來了優雅且極簡的解決方案。然而,上下文傳播 (Context Propagation) 並非規範的一部分。響應式庫的非阻塞和非同步特性,加上潛在複雜的實現,使得使用 ThreadLocal
變得極其困難。原因是無法保證使用者的程式碼可以在哪個 Thread
上執行。只要實現保證序列交付,就允許進行各種最佳化,從而使使用者的程式碼與併發無關 (concurrency-agnostic),將處理併發的負擔轉移到庫的內部。
為了履行其保證,Java 中的響應式程式設計假定使用函數語言程式設計正規化來形成宣告式且可組合的流,這與不同 Thread
可以執行使用者提供的程式碼這一事實無關。只要使用者程式碼中沒有假設在特定 Thread
內執行的副作用,響應式庫就可以提供極其高效能的執行時,同時遵守規範。ThreadLocal
顯然違反了這一要求。
讓我們嘗試重寫我們的處理器以使用 Project Reactor。單個操作變為
Mono<Void> addProduct(String productName) {
log("Adding product: " + productName);
return Mono.empty(); // Assume we’re actually storing the product
}
Mono<Boolean> notifyShop(String productName) {
log("Notifying shop about: " + productName);
return Mono.just(true); // Assume we’re actually notifying the shop
}
讓我們嘗試使用上述程式碼
Mono<Void> handleRequest() {
initRequest();
log("Assembling the chain");
return Mono.just("test-product")
.flatMap(product ->
Flux.concat(
addProduct(product),
notifyShop(product))
.then())
}
我們幼稚的實現產生了預期的輸出
[ main][ 7224499961623309444] Assembling the chain
[ main][ 7224499961623309444] Adding product: test-product
[ main][ 7224499961623309444] Notifying shop about: test-product
上述實現在 main
Thread
中呼叫,且執行被限制在該 Thread
中。但是我們不應該做這樣的假設。
在處理器中,我們在傳播處理結果之前引入了輕微延遲。我們這樣做是為了演示幕後發生的隱式 Thread
切換。
Mono<Void> handleRequest() {
initRequest(); <1>
log("Assembling the chain"); // <2>
return Mono.just("test-product")
.delayElement(Duration.ofMillis(1)) // <3>
.flatMap(product ->
Flux.concat(
addProduct(product), // <4>
notifyShop(product))
.then())
}
執行時,會列印以下內容
[ main][ 6265915299594887150] Assembling the chain
[parallel-1][ null] Adding product: test-product
[parallel-1][ null] Notifying shop about: test-product
發生了什麼?為什麼其中一個日誌有關聯識別符號而其他日誌沒有?
當伺服器呼叫我們的處理器時,<1>
處的初始化設定了 ThreadLocal
關聯識別符號,並且 <2>
處的日誌能夠使用它。有響應式程式設計經驗的人會告訴您問題在於執行發生在不同的階段。ThreadLocal
是在組裝時 (assembly time) 設定的。“您也應該在訂閱時 (subscription time) 恢復它”會是一個建議。稍後我們會回到這一點。如果“組裝時”、“訂閱時”和“執行時 (execution time)”這些術語讓您感到困惑,請參閱 Simon 的博文或觀看同名演講中的精彩解釋。
雖然方法會立即返回,但這不保證執行已經開始。這是因為返回的 Mono
必須被訂閱 (subscribe) 才能觸發處理。它可能在不同的 Thread
中發生。<3>
處的 delayElement
運算子隱式地使用了 Reactor 中的一個共享 Scheduler
(執行緒池的抽象)在指定的延遲後將訊號傳送到另一個 Thread
上。該訊號傳播到下游運算子,這讓我們能夠按順序先新增產品再通知商店。我們組裝的管道還有更多令人驚訝的方面,但我們先不要過於困惑。
問題在於,在 <4>
處,如果我們記錄日誌,我們無法真正確定呼叫將發生在哪個 Thread
上。像 flatMap
這樣的運算子可以引入它們自己的非同步性。
在通常情況下,當鏈被訂閱時,值開始被髮送。因此,我們可以在每次訂閱時恢復 ThreadLocal
的值。但這並非總是最好的主意。Subscription
可以在不同的 Thread
上非同步傳送。值也可以在不同的 Thread
上傳送。在背壓 (backpressure) 的情況下,訊號可以作為請求更多資料的結果傳送到執行請求的 Thread
上,而不是資料 Publisher
使用的 Thread
上。有很多活動部件和奇怪之處需要考慮!要了解更多關於 Reactor 中的執行緒和非同步執行的資訊,請查閱我們之前系列博文的另一部分。
Project Reactor 引入了一種與函數語言程式設計良好契合的機制,用於傳輸上下文元資料 (contextual metadata)。它簡稱為 Context
。儘管幕後會發生 Thread 切換,但它始終附著在響應式鏈上。
正如我們所見,Project Reactor 允許宣告式地指定意圖,同時保持與併發無關。它確實在必要時提供了控制併發的手段,透過使用專用的運算子或配置引數(例如 publishOn
, subscribeOn
或 flatMap
的高階引數),但該級別的控制已從核心處理邏輯中抽象出來。
我們之前提到了副作用。我們如何擺脫這些副作用,同時仍然能夠傳輸上下文元資料 (contextual metadata)?
為了與函數語言程式設計很好地配合,Context
繫結到 Subscriber
,即 Publisher
發出的訊號的消費者。訂閱後,Subscriber
對組裝管道中的所有前置運算子可見。當我們為一個 Subscriber
例項關聯一個不可變的類似 Map
的資料結構時,它允許在響應式管道的各個部分附加和檢索上下文資訊。
透過控制影響以及在響應式鏈的各個步驟之間提供繼承的手段,Reactor Context
是一個無副作用的概念,可用於為處理提供元資訊。“這正是我們需要關聯請求的方式!”
讓我們重寫我們的應用程式,使用 Reactor Context
代替 ThreadLocal
。
首先,我們需要將關聯識別符號作為日誌方法的顯式引數
static void log(String message, long correlationId) {
String threadName = Thread.currentThread().getName();
String threadNameTail = threadName.substring(
Math.max(0, threadName.length() - 10));
System.out.printf("[%10s][%20s] %s%n",
threadNameTail, correlationId, message);
}
我們的操作如下
Mono<Void> addProduct(String productName) {
return Mono.deferContextual(ctx -> {
log("Adding product: " + productName, ctx.get("CORRELATION_ID"));
return Mono.empty(); // Assume we’re actually storing the product
});
}
Mono<Boolean> notifyShop(String productName) {
return Mono.deferContextual(ctx -> {
log("Notifying shop about: " + productName,
ctx.get("CORRELATION_ID"));
return Mono.just(true);
});
}
有趣的是我們如何提供關聯識別符號。我們使用一個特殊的運算子,Mono.deferContextual
,它可以訪問 Context
。我們從 ContextView
(一個簡化的只讀 Context
版本)中提取關聯識別符號,然後返回一個實際的 Mono
供呼叫者訂閱。
我們的處理器看起來像這樣
Mono<Void> handleRequest() {
long correlationId = correlationId();
log("Assembling the chain", correlationId);
Mono.just("test-product")
.delayElement(Duration.ofMillis(1))
.flatMap(product ->
Flux.concat(addProduct(product), notifyShop(product))
.then())
.contextWrite(Context.of("CORRELATION_ID", correlationId));
訂閱後,輸出正如預期
[ main][ 6328001264807824115] Assembling the chain
[parallel-1][ 6328001264807824115] Adding product: test-product
[parallel-1][ 6328001264807824115] Notifying shop about: test-product
資訊流的反轉非常明顯。就像任何響應式鏈一樣,我們透過組裝一系列運算子來定義處理流程。一旦我們(或者實際上是伺服器)訂閱此鏈,資訊就會從下游運算子流向上游運算子以啟動處理。之後,實際的資料訊號從上游傳送到下游——例如,“test-product”值會傳遞給 flatMap
運算子,然後傳遞給 concat
運算子,後者又將值提供給 addProduct
和 notifyShop
。由於這種邏輯流,我們在非常末端(使用 contextWrite
方法)寫入 Context
,就在任何 Subscriber
訂閱鏈之前。我們可以想象,Context
隨後會與 Subscriber
一起對上游運算子中的所有階段都可訪問。
無論響應式管道在執行使用者業務邏輯的過程中進行多少次執行緒跳轉,上下文都不會丟失。
您可以在我們的文件中閱讀更多關於 Reactor Context
的資訊。
不幸的是,我們不能指望第三方庫使用 Reactor Context
來提供可觀測性 (observability) 能力。傳播隱式元資訊的實際標準是 ThreadLocal
。像 SLF4J 這樣的庫使用命令式 (imperative) 風格,在 Java 社群中地位穩固。如果我們可以讓它們與響應式正規化一起工作,而不是期望它們去適應響應式正規化,那將是顯而易見的勝利。在下一部分中,我們將討論在 Spring Cloud Sleuth(一個可以與 Reactor 一起使用的追蹤庫)的響應式鏈中傳播 ThreadLocal
值的歷史和挑戰。