Spring Cloud Stream - 與 Spring Integration。

工程 | Artem Bilan | 2019 年 10 月 25 日 | ...

如果您聽過 Oleg Zhurakousky 在 Spring One Platform 2019 大會上關於 Spring Cloud Stream & Functions 的演講,或者閱讀過他最近關於 Simplified Spring Cloud StreamFunctional Spring Cloud Stream 的部落格文章,您可能會想問:“等等!Spring Integration 支援怎麼了?我現在如何處理我的 @ServiceActivatorIntegrationFlow?我過去常常將 Sink.input() 作為通道來使用一些 Spring Integration 邏輯消費 binder 目標!”正如 Oleg 在他的部落格文章中提到的,使用現有的 @EnableBinding 等方式仍然可能,但我們正逐漸遠離這種模型,那麼在函式式 Spring Cloud Stream 的世界中,我們如何才能繼續受益於 Spring Integration 的所有特性呢?

在這篇部落格文章中,我將圍繞 Spring Integration 進一步闡述 Spring Cloud Stream 的函式式特性,以及它在現代基於函式的流中的重要性!

Spring Integration 作為函式?!

是的,我們確實可以構建一個簡單的 Function 橋,它可以呼叫 MessageChannel.send(),但我們也可以使用 Spring Integration 中的 Messaging Gateway 抽象來實現,如下所示:

@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
                  extends Function<Message<InputData>, Message<OutputData>> { }

鑑於生成的 bean 是 java.util.function.Function 的擴充套件,它是一個完全有效的 Spring Cloud Function 和 Spring Cloud Stream 繫結候選。Spring Cloud Stream 利用其泛型輸入/輸出引數型別來執行適當的負載轉換。此外,headers 從 binder 傳遞到下游 integration flow 並返回。這很好,但我們仍然需要了解通道並提供一些 SI 特定的註解來連線這種 gateway 與我們的 flow (樣板程式碼)。

藉助 Spring Integration 的 Java DSL,我們可以進一步減少樣板程式碼,同時獲得使用函式式 Spring Cloud Stream 的優勢。我們需要的是與 gateway 類似的方法,但採用 DSL 風格。Oleg 部落格文章中的 uppercase 示例如果使用 Spring Integration 將會如下所示:

@SpringBootApplication
public class SampleApplication  {

    @Bean
    public IntegrationFlow uppercaseFlow() {
        return IntegrationFlows.from(Function.class,
                             gateway -> gateway.beanName("uppercase"))
                   .<String, String>transform(String::toUpperCase)
                   .get();
    }
}

雖然使用 Spring Integration 實現大小寫轉換用例可能顯得有些簡單,但想象一下,我們需要執行一些複雜的邏輯,比如 split(拆分)、帶有並行呼叫外部服務的 scatter-gather(分散-聚集),然後 aggregate(聚合)、進行審計,最後才從我們的函式返回結果到輸出目標。所有這些以及更多功能都可以使用 Spring Integration、其 EIP 支援、Java DSL 抽象以及當然還有前面提到的函式包裝器來實現。

java.util.function.Consumerjava.util.function.Supplier 介面也可以以類似的方式使用,根據它們的契約在 gateway 代理周圍應用適當的邏輯。

您可以在 Spring Integration 的參考手冊中找到有關函式支援的更多資訊。

響應式流呢?

我們之前展示的所有內容都涉及命令式函式,它們是按事件觸發的。而響應式函式則是透過將整個事件流作為一個 Flux 傳遞給函式來一次性觸發的。Spring Integration 中的 Reactive Streams 支援幫助您編寫響應式 Spring Integration flow,這些 flow 可以作為 Spring Cloud Stream 中的函式暴露出來。

以下示例展示瞭如何圍繞響應式 Spring Integration 呼叫構建一個響應式函式包裝器:

public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
                           ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlows.from(FluxFunction.class,
                        gateway -> gateway.beanName("uppercase"))
            .handle(RSockets.outboundGateway("/uppercase")
                    .command(RSocketOutboundGateway.Command.requestStreamOrChannel)
                    .expectedResponseType(String.class)
                    .clientRSocketConnector(clientRSocketConnector))
            .get();
}

雖然透過 RSocket 實現 uppercase 仍然顯得簡單,但此示例的目的是讓您瞭解如何使用 Spring Integration 處理更復雜的用例。

在這裡,我們獲得一個傳遞給函式的 Flux,並將其傳播到 RSocket requester,用於 request channel 互動模型。結果 Flux 透過 Spring Integration 內部的 replyChannel header 傳遞迴函式返回。

另一個響應式示例可能類似於將資料從推送模型轉移到拉取模型。換句話說,將事件流表示為 Supplier

@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
    return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
            .toReactivePublisher();
}

@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
                    Publisher<Message<byte[]>> httpRequestPublisher) {
    return () -> Flux.from(httpRequestPublisher);
}

這樣,傳入的 HTTP 請求會進入一個源 Flux,供輸出 binder 目標下游拉取,同時遵守背壓(back-pressure)和其他 Reactive Streams 要求。

有關 Spring Integration 中 Reactive Streams 支援的更多資訊,請參閱參考手冊

總結

Spring Integration 仍然是 Spring Cloud Stream 微服務開發的重要組成部分。它的函式式支援使得屬於企業整合模式類別的複雜用例能夠作為 Java 函式暴露出來,從而在 Spring Cloud Stream 中提供一致的執行模型。事實上,透過使用這個基礎,Spring Cloud Stream App Starters 最終將被函式實現所取代。

請隨時提供任何反饋!

附註:對於那些對 Kotlin 感到期待的人,我想分享一個最近啟動的Spring Integration Kotlin DSL 專案。

獲取 Spring 電子報

透過 Spring 電子報保持聯絡

訂閱

快人一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲取支援

Tanzu Spring 透過一份簡單的訂閱提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群中的所有即將到來的活動。

檢視全部