快人一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多如果您聽過 Oleg Zhurakousky 在 Spring One Platform 2019 大會上關於 Spring Cloud Stream & Functions 的演講,或者閱讀過他最近關於 Simplified Spring Cloud Stream 和 Functional Spring Cloud Stream 的部落格文章,您可能會想問:“等等!Spring Integration 支援怎麼了?我現在如何處理我的 @ServiceActivator
或 IntegrationFlow
?我過去常常將 Sink.input()
作為通道來使用一些 Spring Integration 邏輯消費 binder 目標!”正如 Oleg 在他的部落格文章中提到的,使用現有的 @EnableBinding
等方式仍然可能,但我們正逐漸遠離這種模型,那麼在函式式 Spring Cloud Stream 的世界中,我們如何才能繼續受益於 Spring Integration 的所有特性呢?
在這篇部落格文章中,我將圍繞 Spring Integration 進一步闡述 Spring Cloud Stream 的函式式特性,以及它在現代基於函式的流中的重要性!
是的,我們確實可以構建一個簡單的 Function
橋,它可以呼叫 MessageChannel.send()
,但我們也可以使用 Spring Integration 中的 Messaging Gateway 抽象來實現,如下所示:
@MessagingGateway(defaultRequestChannel = "myIntegrationServiceChannel")
public interface MessageFunction
extends Function<Message<InputData>, Message<OutputData>> { }
鑑於生成的 bean 是 java.util.function.Function
的擴充套件,它是一個完全有效的 Spring Cloud Function 和 Spring Cloud Stream 繫結候選。Spring Cloud Stream 利用其泛型輸入/輸出引數型別來執行適當的負載轉換。此外,headers 從 binder 傳遞到下游 integration flow 並返回。這很好,但我們仍然需要了解通道並提供一些 SI 特定的註解來連線這種 gateway 與我們的 flow (樣板程式碼)。
藉助 Spring Integration 的 Java DSL,我們可以進一步減少樣板程式碼,同時獲得使用函式式 Spring Cloud Stream 的優勢。我們需要的是與 gateway
類似的方法,但採用 DSL 風格。Oleg 部落格文章中的 uppercase
示例如果使用 Spring Integration 將會如下所示:
@SpringBootApplication
public class SampleApplication {
@Bean
public IntegrationFlow uppercaseFlow() {
return IntegrationFlows.from(Function.class,
gateway -> gateway.beanName("uppercase"))
.<String, String>transform(String::toUpperCase)
.get();
}
}
雖然使用 Spring Integration 實現大小寫轉換用例可能顯得有些簡單,但想象一下,我們需要執行一些複雜的邏輯,比如 split
(拆分)、帶有並行呼叫外部服務的 scatter-gather
(分散-聚集),然後 aggregate
(聚合)、進行審計,最後才從我們的函式返回結果到輸出目標。所有這些以及更多功能都可以使用 Spring Integration、其 EIP 支援、Java DSL 抽象以及當然還有前面提到的函式包裝器來實現。
java.util.function.Consumer
和 java.util.function.Supplier
介面也可以以類似的方式使用,根據它們的契約在 gateway 代理周圍應用適當的邏輯。
您可以在 Spring Integration 的參考手冊中找到有關函式支援的更多資訊。
我們之前展示的所有內容都涉及命令式函式,它們是按事件觸發的。而響應式函式則是透過將整個事件流作為一個 Flux
傳遞給函式來一次性觸發的。Spring Integration 中的 Reactive Streams 支援幫助您編寫響應式 Spring Integration flow,這些 flow 可以作為 Spring Cloud Stream 中的函式暴露出來。
以下示例展示瞭如何圍繞響應式 Spring Integration 呼叫構建一個響應式函式包裝器:
public interface FluxFunction extends Function<Flux<String>, Flux<String>> { }
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(
ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows.from(FluxFunction.class,
gateway -> gateway.beanName("uppercase"))
.handle(RSockets.outboundGateway("/uppercase")
.command(RSocketOutboundGateway.Command.requestStreamOrChannel)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
雖然透過 RSocket 實現 uppercase
仍然顯得簡單,但此示例的目的是讓您瞭解如何使用 Spring Integration 處理更復雜的用例。
在這裡,我們獲得一個傳遞給函式的 Flux
,並將其傳播到 RSocket requester,用於 request channel
互動模型。結果 Flux
透過 Spring Integration 內部的 replyChannel
header 傳遞迴函式返回。
另一個響應式示例可能類似於將資料從推送模型轉移到拉取模型。換句話說,將事件流表示為 Supplier
:
@Bean
public Publisher<Message<byte[]>> httpSupplierFlow() {
return IntegrationFlows.from(WebFlux.inboundChannelAdapter("/requests"))
.toReactivePublisher();
}
@Bean
public Supplier<Flux<Message<byte[]>>> httpSupplier(
Publisher<Message<byte[]>> httpRequestPublisher) {
return () -> Flux.from(httpRequestPublisher);
}
這樣,傳入的 HTTP 請求會進入一個源 Flux
,供輸出 binder 目標下游拉取,同時遵守背壓(back-pressure)和其他 Reactive Streams 要求。
有關 Spring Integration 中 Reactive Streams 支援的更多資訊,請參閱參考手冊。
Spring Integration 仍然是 Spring Cloud Stream 微服務開發的重要組成部分。它的函式式支援使得屬於企業整合模式類別的複雜用例能夠作為 Java 函式暴露出來,從而在 Spring Cloud Stream 中提供一致的執行模型。事實上,透過使用這個基礎,Spring Cloud Stream App Starters 最終將被函式實現所取代。
請隨時提供任何反饋!
附註:對於那些對 Kotlin 感到期待的人,我想分享一個最近啟動的Spring Integration Kotlin DSL 專案。