Spring Cloud Stream - 以及 Spring Integration。

工程 | Artem Bilan | 2019年10月25日 | ...

如果你聽過 Oleg Zhurakousky 在 Spring One Platform 2019 上關於 Spring Cloud Stream & Functions 的演講,或者讀過他最近關於 簡化 Spring Cloud Stream函式式 Spring Cloud Stream 的博文,你可能會說:“等等!Spring Integration 支援怎麼了?我的 @ServiceActivatorIntegrationFlow 現在該怎麼辦?我以前把 Sink.input() 作為通道來消費繫結器目的地,並帶有 Spring Integration 邏輯!” 正如 Oleg 在他的博文中所提到的,這仍然可以透過現有的 @EnableBinding 等方式實現,但我們正在擺脫這種模式,那麼我們如何在函式式 Spring Cloud Stream 的世界中仍然受益於 Spring Integration 的所有功能呢?

在這篇博文中,我將結合 Spring Integration 及其在現代基於函式的流中的重要性,對 Spring Cloud Stream 的函式式特性進行闡述!

Spring Integration 作為一個函式?!

是的,我們確實可以建立一個簡單的 Function 橋接,它會呼叫 MessageChannel.send(),但我們也可以使用 Spring Integration 中的 Messaging Gateway 抽象來實現,如下所示

@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
                  extends Function<Message<InputData>, Message<OutputData>> { }

鑑於生成的 bean 是 java.util.function.Function 的擴充套件,它是一個完全有效的 Spring Cloud Function 和 Spring Cloud Stream 繫結候選物件。Spring Cloud Stream 會使用其泛型輸入/輸出引數型別在之前和之後執行適當的有效負載轉換。此外,頭資訊會從繫結器傳遞到下游整合流,然後再返回。這很好,但我們仍然需要了解通道並提供一些 SI 特定註解來將此類閘道器與我們的流連線起來(樣板程式碼)。

使用 Spring Integration 的 Java DSL,我們可以進一步消除更多的樣板程式碼,同時受益於使用函式式 Spring Cloud Stream。我們需要的是相同的 gateway 方法,但採用 DSL 風格。Oleg 博文中的 uppercase 示例使用 Spring Integration 將如下所示

@SpringBootApplication
public class SampleApplication  {

    @Bean
    public IntegrationFlow uppercaseFlow() {
        return IntegrationFlows.from(Function.class,
                             gateway -> gateway.beanName("uppercase"))
                   .<String, String>transform(String::toUpperCase)
                   .get();
    }
}

用 Spring Integration 實現大小寫轉換用例雖然很傻,但想象一下我們需要進行一些複雜邏輯,例如 splitscatter-gather(並行呼叫外部服務),然後 aggregate、進行一些審計,最後才將函式結果返回到輸出目的地。所有這些以及更多都可以透過 Spring Integration 及其 EIP 支援、Java DSL 抽象以及當然是前面提到的函式包裝器來實現。

java.util.function.Consumerjava.util.function.Supplier 介面可以以類似的方式使用,在其周圍的閘道器代理中根據其契約提供適當的邏輯。

您可以在 Spring Integration 參考手冊中檢視有關函式支援的更多資訊。

響應式流如何?

我們之前展示的所有內容都與命令式函式有關,它們是按事件觸發的。響應式函式僅透過將整個事件流作為 Flux 傳遞給函式來觸發一次。Spring Integration 中的響應式流支援可幫助您編寫響應式 Spring Integration 流,這些流可以作為 Spring Cloud Stream 中的函式公開。

以下示例展示瞭如何圍繞響應式 Spring Integration 呼叫構建響應式函式包裝器

public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
                           ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlows.from(FluxFunction.class,
                        gateway -> gateway.beanName("uppercase"))
            .handle(RSockets.outboundGateway("/uppercase")
                    .command(RSocketOutboundGateway.Command.requestStreamOrChannel)
                    .expectedResponseType(String.class)
                    .clientRSocketConnector(clientRSocketConnector))
            .get();
}

雖然在 RSocket 上實現 uppercase 仍然很愚蠢,但此示例的目標是讓您瞭解如何使用 Spring Integration 解決更復雜的用例。

在這裡,我們將一個 Flux 傳遞給一個函式,並將其傳播到 RSocket 請求者以進行 request channel 互動模型。結果 Flux 透過 Spring Integration 內部的 replyChannel 頭傳回函式返回。

另一個響應式示例可能類似於將資料從模型傳輸到模型。換句話說,將事件流表示為 Supplier

@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
    return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
            .toReactivePublisher();
}

@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
                    Publisher<Message<byte[]>> httpRequestPublisher) {
    return () -> Flux.from(httpRequestPublisher);
}

這樣,傳入的 HTTP 請求會進入一個源 Flux,供輸出繫結器目的地向下遊拉取,同時遵守背壓和其他響應式流要求。

有關 Spring Integration 中響應式流支援的更多資訊,請參閱參考手冊

總結

Spring Integration 仍然是 Spring Cloud Stream 微服務開發的重要組成部分。它的函式式支援允許將屬於企業整合模式類別的複雜用例公開為 Java 函式,從而在 Spring Cloud Stream 中提供一致的執行模型。事實上,透過使用這個基礎,Spring Cloud Stream App Starters 最終將被函式實現所取代。

請隨時提供任何反饋!

附言:對於那些不耐煩的 Kotlin 使用者,我想分享一個最近啟動的 Spring Integration Kotlin DSL 專案。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有