領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多本文是探索基於 Java Functions 的全新 Spring Cloud Stream 應用程式的部落格系列的一部分。在本集中,我們將研究 Aggregator 函式及其與 Splitter 函式的關係。我們將看到如何自定義預設行為。我們還將探討配置共享訊息儲存對聚合器的重要性。
以下是本部落格系列的所有先前部分。
Aggregator 函式是 Spring Integration 中 [AggregatingMessageHandler](https://docs.springframework.tw/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的基礎,繼承了其大部分功能,並將常用的聚合器選項作為配置屬性公開。有關更多資訊,請參閱 AggregatorFunctionProperties(或下一節)。聚合器函式是完全響應式的,定義為 Function<Flux<Message<?>>, Flux<Message<?>>。這是因為聚合器邏輯不需要它立即產生回覆。相反,它將當前訊息儲存在訊息儲存中,與其他訊息分組以收集或歸約到某個結果,直到滿足釋放結果的條件為止。這樣,將入站訊息視為流(Flux)並透過聚合器將其合併為輸出流(也是 Flux)是很自然的。因此,我們只需要 subscribe() Aggregator 函式的結果即可啟動流。事實上,當我們在 Spring Cloud Stream 應用程式中使用這種響應式函式時,這正是自動發生的:框架為我們構建了來自輸入目標的郵件流,並在輸出目標上處理生成的郵件流。
通常,聚合器與 Splitter 結合使用,Splitter 將單個入站訊息轉換為多個出站訊息,包括一些序列詳細資訊頭。在對各個專案進行一些處理(轉換、豐富等)之後,我們新增一個聚合器以將這些專案重新組合成一條訊息。上述序列詳細資訊頭用作預設的相關性和釋放策略,以將訊息分組儲存並決定何時以及如何組合和生成單個訊息。透過函式組合來構建這種處理邏輯是很自然的,我們稍後將討論這一點。但現在,讓我們(為簡單起見)想象一下,我們有一些資料想要合併成一條訊息!
首先,我們需要在我們的 Spring Boot 專案中新增聚合器函式的依賴項
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>aggregator-function</artifactId>
</dependency>
就是這樣!聚合器函式 bean 將被自動配置,足以讓我們將函式自動裝入程式碼並使用它
@Autowired
Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction;
...
Flux<Message<?>> input =
Flux.just(MessageBuilder.withPayload("2")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
.build(),
MessageBuilder.withPayload("1")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 1)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
.build());
Flux<Message<?>> output = this.aggregatorFunction.apply(input);
output.as(StepVerifier::create)
.assertNext((message) ->
assertThat(message)
.extracting(Message::getPayload)
.isInstanceOf(List.class)
.asList()
.hasSize(2)
.contains("1", "2"))
.thenCancel()
.verify();
此程式碼片段演示了兩個具有預定義序列詳細資訊的簡單訊息如何合併為單個 List<String>。所有繁重的工作都在 aggregatorFunction 及其預設的相關性和釋放策略中完成。這還包括預設的組組合器選項,它從已釋放組中的訊息構建一個有效負載列表。
我們將在下一節中回顧更復雜的用例和配置選項。
我們在應用程式中處理和操作的資料和資訊是應用程式最重要的部分。我們需要仔細考慮何時將資料保留在記憶體中,而不是保留在外部儲存中。在大多數情況下,我們將使用某個資料庫作為狀態儲存和/或訊息中介軟體,以防止生產者和消費者之間丟失資料。作為額外的好處,這使得叢集中的不同例項可以訪問共享儲存,以實現順暢的分散式計算。
聚合器函式正常工作不需要持久狀態儲存,但在生產環境中是必需的,以避免資料丟失並確保故障轉移。
聚合器函式的配置選項(AggregatorFunctionProperties)非常直接,並且與 [AggregatingMessageHandler](https://docs.springframework.tw/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的高階選項完全一致。它們如下:
correlation - 用於從入站訊息確定相關鍵(組 ID)的 SpEL 表示式。Such an expression (if provided) builds an ExpressionEvaluatingCorrelationStrategy for the underlying AggregatingMessageHandler. By default (when not provided), the AggregatingMessageHandler uses a HeaderAttributeCorrelationStrategy which is based on the IntegrationMessageHeaderAccessor.CORRELATION_ID - a header which can be populated upstream by the splitter, or PublishSubscribeChannel or recipient-list router.
release - 用於確定是否應釋放已儲存的訊息組並將其作為輸出訊息發出的 SpEL 表示式。Such an expression (if provided) builds an ExpressionEvaluatingReleaseStrategy for the underlying AggregatingMessageHandler. By default (when not provided), the AggregatingMessageHandler uses a SimpleSequenceSizeReleaseStrategy which is based on the stored group size and an IntegrationMessageHeaderAccessor.SEQUENCE_SIZE - a header which can be populated upstream by the splitter, or PublishSubscribeChannel or recipient-list router.
aggregation - 用於從已釋放的訊息組構建輸出結果的 SpEL 表示式。This expression (if provided) contributes to the ExpressionEvaluatingMessageGroupProcessor for the underlying AggregatingMessageHandler. By default (when not provided), the AggregatingMessageHandler uses a DefaultAggregatingMessageGroupProcessor which just combines payloads of messages in group into the List and merge their headers.
groupTimeout - 用於安排後臺任務以在沒有更多訊息到達組時使組過期的 SpEL 表示式。有關此選項的更多資訊,請參閱 Spring Integration。
messageStoreType - 來自 AggregatorFunctionProperties.MessageStoreType 常量類的值,用於指示使用哪個 MessageGroupStore 實現來儲存訊息,直到組被釋放。支援的 MessageGroupStore 包括:ConfigurableMongoDbMessageStore、RedisMessageStore、GemfireMessageStore、JdbcMessageStore 和 SimpleMessageStore(預設值,將訊息儲存在記憶體中)。這是最重要的選項,應根據目標環境和可用永續性儲存進行選擇。當聚合器函式作為叢集例項部署時(例如,透過 Spring Cloud Data Flow 作為 aggregator-processor 的一部分使用),共享狀態時,您可以在一個例項上將訊息傳送到聚合器,但可以在另一個例項上釋放它們。這樣,應用程式崩潰時就不會丟失訊息。MessageGroupStore 實現的依賴項打包在最終的函式 uber jar 中,並根據這些選項進行自動配置。唯一不同的是 JDBC,我們必須根據目標環境的要求提供適當的驅動程式。有關 MessageGroupStore 抽象的更多資訊,請參閱 Spring Integration System Management 和上一篇部落格中如何 提供 JDBC 驅動程式。所有這些永續性儲存的配置選項與 Spring Boot 為我們自動配置它們所提供的選項相同。
messageStoreEntity - 此選項僅適用於某些 MessageGroupStore 實現:它指的是 Gemfire/Geode 的客戶端區域;JDBC 的表字首;MongoDB 的集合名稱。對於其他實現,它將被忽略。
有關這些元件的更多資訊,請參閱 Spring Integration 和 Stream Applications 專案中的相應函式實現(如果有)。
因此,如果我們想執行一個聚合器函式(作為獨立函式、Spring Cloud Stream 處理器 或作為 Spring Cloud Data Flow 流定義的一部分),並帶有某些自定義屬性,並且針對共享的 MongoDB 儲存,我們可以這樣宣告:
java -jar aggregator-processor-kafka-3.0.0-SNAPSHOT.jar --aggregator.correlation=T(Thread).currentThread().id --aggregator.release=!messages.?[payload == 'bar'].empty --aggregator.aggregation=#this.?[payload == 'foo'].![payload] --aggregator.messageStoreType=mongodb --aggregator.message-store-entity=aggregatorTest --spring.data.mongodb.uri=mongodb:///test
其中這些屬性的值是:
aggregator.correlation - 消費者執行緒 ID 作為訊息分組的鍵;
aggregator.release - 一個針對訊息組的 SpEL 表示式,只有當 bar 有效負載到達時才釋放它;
aggregator.aggregation - 一個 SpEL 表示式,用於選擇和投影訊息組集合,其中只有有效負載為 foo 的訊息才會被合併到最終結果中;
aggregator.messageStoreType - 使用 MongoDb MessageGroupStore 實現;
aggregator.message-store-entity - MongoDb 資料庫中的集合名稱;
spring.data.mongodb.uri - MongoDb 資料庫連線。
即使我們將此函式與其他函式組合到自定義 Spring Boot 應用程式中,相同的配置屬性集也保持不變。有關更多資訊,請參閱下一節。
在生產解決方案中,Aggregator 函式本身可能沒有意義。當然,在大多數情況下,它與其他上游和下游函式結合使用。如前所述,通常將聚合器與 splitter 進行預處理組合。可以使用 Java API 透過 Function.andThan() 和 Function.compose() 以程式設計方式組合這些函式,但由於這些方法對型別非常嚴格,我們需要進行一些中間轉換才能滿足函式的輸入和輸出。藉助 Spring Cloud Function 庫,我們可以繞過編寫各種轉換的繁重程式設計工作,同時保留所需的序列詳細資訊訊息頭。我們寧願依賴框架中的型別推斷和開箱即用的轉換功能,使我們的組合儘可能簡單。
假設我們有一個如下所示的輸入 JSON:
{
"store": {
"book": [
{
"category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{
"category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
},
{
"category": "fiction",
"author": "Herman Melville",
"title": "Moby Dick",
"isbn": "0-553-21311-3",
"price": 8.99
},
{
"category": "fiction",
"author": "J. R. R. Tolkien",
"title": "The Lord of the Rings",
"isbn": "0-395-19395-8",
"price": 22.99
}
]
}
}
我們的任務是將書名提供為單個逗號分隔的字串。
我們只需要將三個開箱即用的函式組合成一個 Spring Cloud Function(或 Stream)應用程式。這些函式的依賴項是:splitter-function、spel-function 和我們的 aggregator-function。此類應用程式的配置屬性可能如下所示:
spring.cloud.function.definition=splitterFunction|spelFunction|aggregatorFunction splitter.expression=#jsonPath(payload,'$.store.book') spel.function.expression=title aggregator.aggregation=T(org.springframework.util.StringUtils).collectionToCommaDelimitedString(#this.![payload])
我們可以在 Spring Cloud Data Flow 中使用類似的流定義和配置。唯一的區別是函式之間的訊息將在繫結器上傳輸,使用預先構建的處理器應用程式。您實際上可以在 Mongo DB source 之類的內容中使用這種組合。在使用 Spring Cloud Data Flow 時需要注意的另一件事是,Aggregator 函式是型別無關的,並且使用具有 byte[] 有效負載的訊息。如果您打算對有效負載執行一些複雜的邏輯,如上面的表示式所示,您可能需要將此函式與一個上游函式組合,將 byte[] 有效負載轉換為域物件或其他相容型別,如 HashMap。如果有效負載是 JSON 表示形式,則始終可以使用我們在上面為 splitter 表示式顯示的 #jsonPath() SpEL 函式進行訪問。
有關 函式組合 的更多資訊,請參閱本系列之前的部落格文章之一。
這篇部落格詳細介紹了 Aggregator Function 的細節及其在 Spring Cloud Stream Aggregator Processor 中的使用。我們還研究瞭如何使用此函式的配置屬性。然後,我們深入探討了在獨立應用程式中使用聚合器的幾種變體,並在此過程中探索了各種功能。最後,我們看到了如何輕鬆地在訊息之間切換持久狀態聚合器中的 MessageGroupStore 實現。
本系列將繼續。在接下來的幾周內,我們將介紹更多函式和應用程式。