案例研究:聚合器函式和處理器

工程 | Artem Bilan | 2020年10月26日 | ...

本文是部落格系列文章的一部分,該系列探討了基於 Java 函式重新設計的 Spring Cloud Stream 應用。在本篇文章中,我們將深入研究聚合器函式及其與拆分器函式的關係。我們將瞭解如何自定義預設行為。我們還將探討為聚合器配置共享訊息儲存的重要性。

以下是本部落格系列的所有先前部分。

聚合器函式

聚合器函式是 Spring Integration 中 [AggregatingMessageHandler](https://docs.springframework.tw/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的基礎,繼承了其大部分功能,並將常用的聚合器選項暴露為配置屬性。有關更多資訊,請參閱 AggregatorFunctionProperties(或下一節)。聚合器函式是完全響應式的,被定義為 Function<Flux<Message<?>>, Flux<Message<?>>。這是因為聚合器邏輯不需要立即生成回覆。相反,它將當前訊息儲存在訊息儲存中,與其他訊息分組在一起,以便收集或歸約為某個結果,直到滿足釋放結果所需的條件。這樣一來,將入站訊息作為流(Flux)處理,並透過聚合器將它們組合成出站流(也是一個 Flux),感覺非常自然。因此,我們只需要對聚合器函式的結果進行 subscribe() 即可啟動流程。事實上,當我們在 Spring Cloud Stream 應用中使用此類響應式函式時,這正是自動發生的:框架會為我們構建一個來自輸入目標的 Flux 訊息流,並在輸出目標上處理生成的 Flux

用法

聚合器通常與 Splitter(拆分器) 結合使用,拆分器將單個入站訊息轉換為多個出站訊息,包括一些序列詳細資訊頭。經過一些單獨的專案處理(轉換、豐富等)後,我們新增一個聚合器將這些專案重新組合成單個訊息。上述的序列詳細資訊頭被用作預設的關聯和釋放策略,用於將訊息儲存在組中,並決定何時以及如何組合並生成單個訊息。使用函式組合來構建這種處理邏輯感覺很自然,我們稍後會討論。但現在,讓我們(為簡單起見)假設我們有一些想要組合成單個訊息的資料!

首先,我們需要在 Spring Boot 專案中為聚合器函式新增依賴項

<dependency>
    <groupId>org.springframework.cloud.fn</groupId>
    <artifactId>aggregator-function</artifactId>
</dependency>

就這麼簡單!聚合器函式 bean 會被自動配置,以便我們可以將其自動注入到程式碼中使用。

@Autowired
Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction;
...
Flux<Message<?>> input =
        Flux.just(MessageBuilder.withPayload("2")
                .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                .build(),
         MessageBuilder.withPayload("1")
                .setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "some_mey")
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 1)
                .setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
                .build());

Flux<Message<?>> output = this.aggregatorFunction.apply(input);

output.as(StepVerifier::create)
            .assertNext((message) ->
                    assertThat(message)
                            .extracting(Message::getPayload)
                            .isInstanceOf(List.class)
                            .asList()
                            .hasSize(2)
                            .contains("1", "2"))
            .thenCancel()
            .verify();

在這段程式碼片段中,我們演示瞭如何將兩個具有預定義序列詳細資訊的簡單訊息組合成一個 List<String>。所有繁重的工作都在 aggregatorFunction 及其預設的關聯和釋放策略中完成。這也包括預設的組組合器選項,它從釋放組中的訊息構建一個 payload 列表。

更復雜的用例和配置選項將在下一節中介紹。

持久化狀態儲存

我們在應用中處理和操作的資料和資訊確實是應用最重要的部分。關於何時將資料儲存在記憶體中而不是某些外部儲存中,我們需要三思。在大多數情況下,我們會使用某些資料庫作為狀態儲存和/或訊息中介軟體,以防止生產者和消費者之間的資料丟失。作為額外的好處,這使得叢集中的不同例項能夠訪問共享儲存,以實現流暢的分散式計算。

聚合器函式並非必須使用持久化狀態儲存才能工作,但在生產環境中,它是必要的,以避免資料丟失並確保故障轉移。

配置

聚合器函式的配置選項(AggregatorFunctionProperties)非常直觀,並且與 [AggregatingMessageHandler](https://docs.springframework.tw/spring-integration/docs/current/reference/html/message-routing.html#aggregator) 的高階選項完全一致。它們如下:

  • correlation - 一個 SpEL 表示式,用於從入站訊息中確定關聯鍵(組 ID)。如果提供此表示式,它將為底層的 AggregatingMessageHandler 構建一個 ExpressionEvaluatingCorrelationStrategy。預設情況下(未提供時),AggregatingMessageHandler 使用 HeaderAttributeCorrelationStrategy,它基於 IntegrationMessageHeaderAccessor.CORRELATION_ID - 這個頭可以由上游的拆分器、PublishSubscribeChannel 或 recipient-list router 填充。

  • release - 一個 SpEL 表示式,用於確定儲存的訊息組是否應該被釋放並作為輸出訊息發出。如果提供此表示式,它將為底層的 AggregatingMessageHandler 構建一個 ExpressionEvaluatingReleaseStrategy。預設情況下(未提供時),AggregatingMessageHandler 使用 SimpleSequenceSizeReleaseStrategy,它基於儲存的組大小和 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE - 這個頭可以由上游的拆分器、PublishSubscribeChannel 或 recipient-list router 填充。

  • aggregation - 一個 SpEL 表示式,用於從已釋放的訊息組構建輸出結果。如果提供此表示式,它將對底層的 AggregatingMessageHandlerExpressionEvaluatingMessageGroupProcessor 起作用。預設情況下(未提供時),AggregatingMessageHandler 使用 DefaultAggregatingMessageGroupProcessor,它只是將組中訊息的 payload 組合成一個 List 併合並它們的頭。

  • groupTimeout - 一個 SpEL 表示式,用於安排一個後臺任務,當沒有更多訊息到達組時使其過期。有關此選項的更多資訊,請參閱 Spring Integration

  • messageStoreType - 來自 AggregatorFunctionProperties.MessageStoreType 常量類的一個值,用於指示使用哪種 MessageGroupStore 實現來儲存訊息,直到組被釋放。支援的 MessageGroupStore 包括:ConfigurableMongoDbMessageStoreRedisMessageStoreGemfireMessageStoreJdbcMessageStore 和預設的 SimpleMessageStore,它將訊息儲存在記憶體中。這是最重要的選項,應根據目標環境和可用的持久化儲存來選擇。當聚合器函式作為叢集例項部署時(例如,在 Spring Cloud Data Flow 中作為 aggregator-processor 的一部分使用時),它的價值更大,因為透過共享狀態,你可以在一個例項上將訊息傳送到聚合器,但它們可以在另一個例項上被釋放。這樣,在應用崩潰時就不會丟失訊息。MessageGroupStore 實現的依賴項被打包到最終的函式 uber jar 中,並根據此選項進行自動配置。唯一的區別在於 JDBC,我們需要根據目標環境要求提供合適的驅動程式。有關 MessageGroupStore 抽象的更多資訊,請參閱 Spring Integration System Management 以及之前關於如何提供 JDBC 驅動程式的部落格文章。這些持久化儲存的所有配置選項與 Spring Boot 為我們提供的自動配置方式相同。

  • messageStoreEntity - 此選項僅特定於某些 MessageGroupStore 實現:它指的是 Gemfire/Geode 的客戶端區域;JDBC 的表字首;MongoDB 的集合名稱。對於其他實現,此選項將被忽略。

有關這些元件的更多資訊,請參閱 Spring Integration 以及 Stream Applications 專案中相應的函式實現(如果有)。

因此,如果我們要執行一個聚合器函式(作為獨立應用、Spring Cloud Stream 的 processor(處理器) 或 Spring Cloud Data Flow 流定義的一部分),並使用一些自定義屬性並連線共享的 MongoDB 儲存,我們可以這樣宣告:

java -jar aggregator-processor-kafka-3.0.0-SNAPSHOT.jar --aggregator.correlation=T(Thread).currentThread().id --aggregator.release=!messages.?[payload == 'bar'].empty --aggregator.aggregation=#this.?[payload == 'foo'].![payload] --aggregator.messageStoreType=mongodb --aggregator.message-store-entity=aggregatorTest --spring.data.mongodb.uri=mongodb:///test

其中這些屬性的值如下:

  • aggregator.correlation - 消費者執行緒 ID,用作訊息分組的鍵;

  • aggregator.release - 一個針對訊息組的 SpEL 表示式,僅當 bar payload 到達時才釋放該組;

  • aggregator.aggregation - 一個 SpEL 表示式,用於選擇和投影訊息組集合,其中只有帶有 foo payload 的訊息被組合到最終結果中;

  • aggregator.messageStoreType - 使用 MongoDb MessageGroupStore 實現;

  • aggregator.message-store-entity - MongoDb 資料庫中的集合名稱;

  • spring.data.mongodb.uri - MongoDb 資料庫連線。

即使我們在自定義 Spring Boot 應用中將此函式與其他函式組合,相同的配置屬性集也保持不變。有關更多資訊,請參閱下一節。

組合

聚合器函式本身在生產解決方案中可能沒有意義。當然,在大多數情況下,它會與其他上游和下游函式結合使用。如前所述,通常會將聚合器與 splitter(拆分器) 預處理組合。可以透過 Java API 使用 Function.andThen()Function.compose() 以程式設計方式組合這些函式,但是,由於這些方法對型別非常嚴格,我們需要進行一些中間轉換以滿足函式的輸入和輸出。有了 Spring Cloud Function 庫的支援,我們可以繞過編寫各種轉換的繁重程式設計工作,同時保留所需的序列詳細資訊訊息頭。我們寧願依賴框架中的型別推斷和開箱即用的轉換能力,並儘可能保持組合的簡單性。

假設我們有以下輸入 JSON:

{
  "store": {
    "book": [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      },
      {
        "category": "fiction",
        "author": "Herman Melville",
        "title": "Moby Dick",
        "isbn": "0-553-21311-3",
        "price": 8.99
      },
      {
        "category": "fiction",
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings",
        "isbn": "0-395-19395-8",
        "price": 22.99
      }
    ]
  }
}

我們的任務是將書名以逗號分隔的單個字串形式提供。

我們只需要將三個開箱即用的函式組合成一個 Spring Cloud Function(或 Stream)應用。這些函式的依賴項是:splitter-functionspel-function 和我們的 aggregator-function。此類應用的配置屬性可能如下所示:

spring.cloud.function.definition=splitterFunction|spelFunction|aggregatorFunction splitter.expression=#jsonPath(payload,'$.store.book') spel.function.expression=title aggregator.aggregation=T(org.springframework.util.StringUtils).collectionToCommaDelimitedString(#this.![payload])

我們可以使用類似的流定義和 Spring Cloud Data Flow 配置。唯一的區別是,函式之間的訊息將透過 binder 傳輸,使用預構建的處理器應用。實際上,你可以在諸如 Mongo DB source(MongoDB 源) 之類的場景中使用這種組合。使用 Spring Cloud Data Flow 時還需要記住一點,聚合器函式是型別無關的,並接收帶有 byte[] payload 的訊息。如果你計劃對 payload 執行一些複雜的邏輯,如上述表示式所示,你可能需要將其與上游的某個函式組合,將 byte[] payload 轉換為域物件或其他相容型別,如 HashMap。如果 payload 是 JSON 表示形式,則始終可以使用我們上面為拆分器表示式展示的 #jsonPath() SpEL 函式來訪問它。

有關此係列先前部落格文章中關於函式組合的更多資訊,請參閱其中一篇。

結論

本文詳細介紹了聚合器 Function 以及它如何在 Spring Cloud Stream Aggregator Processor 中使用。我們還探討了如何使用此函式的配置屬性。然後,我們深入研究了在獨立應用中使用聚合器的幾種變體,並在此過程中探索了各種功能。最後,我們瞭解瞭如何在聚合器中輕鬆切換 MessageGroupStore 實現,以實現訊息間的持久化狀態。

敬請關注

本系列將繼續。在接下來的幾周內,我們將探討更多的函式和應用。

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持聯絡

訂閱

提升自我

VMware 提供培訓和認證,助您快速提升。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一次簡單訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部