領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多如果你聽過 Oleg Zhurakousky 在 Spring One Platform 2019 上關於 Spring Cloud Stream & Functions 的演講,或者讀過他最近關於 簡化 Spring Cloud Stream 和 函式式 Spring Cloud Stream 的博文,你可能會說:“等等!Spring Integration 支援怎麼了?我的 @ServiceActivator 或 IntegrationFlow 現在該怎麼辦?我以前把 Sink.input() 作為通道來消費繫結器目的地,並帶有 Spring Integration 邏輯!” 正如 Oleg 在他的博文中所提到的,這仍然可以透過現有的 @EnableBinding 等方式實現,但我們正在擺脫這種模式,那麼我們如何在函式式 Spring Cloud Stream 的世界中仍然受益於 Spring Integration 的所有功能呢?
在這篇博文中,我將結合 Spring Integration 及其在現代基於函式的流中的重要性,對 Spring Cloud Stream 的函式式特性進行闡述!
是的,我們確實可以建立一個簡單的 Function 橋接,它會呼叫 MessageChannel.send(),但我們也可以使用 Spring Integration 中的 Messaging Gateway 抽象來實現,如下所示
@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
extends Function<Message<InputData>, Message<OutputData>> { }
鑑於生成的 bean 是 java.util.function.Function 的擴充套件,它是一個完全有效的 Spring Cloud Function 和 Spring Cloud Stream 繫結候選物件。Spring Cloud Stream 會使用其泛型輸入/輸出引數型別在之前和之後執行適當的有效負載轉換。此外,頭資訊會從繫結器傳遞到下游整合流,然後再返回。這很好,但我們仍然需要了解通道並提供一些 SI 特定註解來將此類閘道器與我們的流連線起來(樣板程式碼)。
使用 Spring Integration 的 Java DSL,我們可以進一步消除更多的樣板程式碼,同時受益於使用函式式 Spring Cloud Stream。我們需要的是相同的 gateway 方法,但採用 DSL 風格。Oleg 博文中的 uppercase 示例使用 Spring Integration 將如下所示
@SpringBootApplication
public class SampleApplication {
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows.from(Function.class,
gateway -> gateway.beanName("uppercase"))
.<String, String>transform(String::toUpperCase)
.get();
}
}
用 Spring Integration 實現大小寫轉換用例雖然很傻,但想象一下我們需要進行一些複雜邏輯,例如 split、scatter-gather(並行呼叫外部服務),然後 aggregate、進行一些審計,最後才將函式結果返回到輸出目的地。所有這些以及更多都可以透過 Spring Integration 及其 EIP 支援、Java DSL 抽象以及當然是前面提到的函式包裝器來實現。
java.util.function.Consumer 和 java.util.function.Supplier 介面可以以類似的方式使用,在其周圍的閘道器代理中根據其契約提供適當的邏輯。
您可以在 Spring Integration 參考手冊中檢視有關函式支援的更多資訊。
我們之前展示的所有內容都與命令式函式有關,它們是按事件觸發的。響應式函式僅透過將整個事件流作為 Flux 傳遞給函式來觸發一次。Spring Integration 中的響應式流支援可幫助您編寫響應式 Spring Integration 流,這些流可以作為 Spring Cloud Stream 中的函式公開。
以下示例展示瞭如何圍繞響應式 Spring Integration 呼叫構建響應式函式包裝器
public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows.from(FluxFunction.class,
gateway -> gateway.beanName("uppercase"))
.handle(RSockets.outboundGateway("/uppercase")
.command(RSocketOutboundGateway.Command.requestStreamOrChannel)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
雖然在 RSocket 上實現 uppercase 仍然很愚蠢,但此示例的目標是讓您瞭解如何使用 Spring Integration 解決更復雜的用例。
在這裡,我們將一個 Flux 傳遞給一個函式,並將其傳播到 RSocket 請求者以進行 request channel 互動模型。結果 Flux 透過 Spring Integration 內部的 replyChannel 頭傳回函式返回。
另一個響應式示例可能類似於將資料從推模型傳輸到拉模型。換句話說,將事件流表示為 Supplier
@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
.toReactivePublisher();
}
@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
Publisher<Message<byte[]>> httpRequestPublisher) {
return () -> Flux.from(httpRequestPublisher);
}
這樣,傳入的 HTTP 請求會進入一個源 Flux,供輸出繫結器目的地向下遊拉取,同時遵守背壓和其他響應式流要求。
有關 Spring Integration 中響應式流支援的更多資訊,請參閱參考手冊。
Spring Integration 仍然是 Spring Cloud Stream 微服務開發的重要組成部分。它的函式式支援允許將屬於企業整合模式類別的複雜用例公開為 Java 函式,從而在 Spring Cloud Stream 中提供一致的執行模型。事實上,透過使用這個基礎,Spring Cloud Stream App Starters 最終將被函式實現所取代。
請隨時提供任何反饋!
附言:對於那些不耐煩的 Kotlin 使用者,我想分享一個最近啟動的 Spring Integration Kotlin DSL 專案。