先行一步
VMware 提供培訓和認證,助您快速提升。
瞭解更多Spring Cloud Stream 最近新增了一項功能,可以將函式定義組合到現有的 Spring Cloud Stream 應用程式中。在本篇部落格中,我們將瞭解 Spring Cloud Data Flow 如何利用此功能在流式管道中組合函式。
在 Spring Cloud Data Flow 中,流式資料管道由 Spring Cloud Stream 應用程式組成。開發人員可以選擇並使用開箱即用的流應用程式,這些應用程式涵蓋了許多常見用例。開發人員還可以擴充套件這些開箱即用的應用程式,或使用 Spring Cloud Stream 框架建立自定義應用程式。
Spring Cloud Stream 2.1.0 GA 版本的釋出包含了基於Spring Cloud Function的程式設計模型的整合,該模型允許將業務邏輯表示為 java.util.Function
、java.util.Consumer
和 java.util.Supplier
,它們分別代表 Processor
、Sink
和 Source
的角色。鑑於這種靈活性,Spring Cloud Stream 框架現在支援一種簡單但強大的函式組合方法。在這種上下文中,組合可以是將源和處理器組合成一個應用程式:一個“新源”。或者,它可以是將處理器 + 接收器組合成一個應用程式:“一個新接收器”。這種靈活性為流應用程式開發人員開闢了有趣的新機會。
讓我們看看如何使用三個應用程式建立一個管道來執行簡單的轉換,然後再看看如何使用兩個利用函式組合的應用程式來實現同樣的管道。
對於第一個流,
我們將使用開箱即用的 http-source
、transform-processor
和 log-sink
應用程式。
第一步,啟動 Spring Cloud Data Flow local
伺服器
java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar
現在,啟動 Spring Cloud Data Flow shell
java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar
現在讓我們註冊使用 RabbitMQ 繫結的 HTTP 源、轉換器處理器和日誌接收器應用程式
dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar
dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar
dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar
現在我們可以建立一個沒有函式組合的簡單流
dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"
然後我們可以部署這個流
dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"
dataflow:>http post --data "friend" --target "https://:9000"
POST (text/plain) https://:9000 friend
202 ACCEPTED
您可以在 log
應用程式中看到以下日誌訊息
[sformer.hello-1] log-sink : Hello FRIEND
在這個流中,我們將 http (source)、transformer (processor) 和 log (sink) 應用程式作為獨立應用程式部署在目標平臺(本例中是 local
)上。對於某些用例,例如簡單的負載轉換邏輯,我們可能希望將 Processor
應用程式與 Source
或 Sink
應用程式結合使用。例如,在源輸出資料中遮蔽某些特定使用者欄位的轉換場景不一定需要作為單獨的獨立應用程式部署。相反,它可以在 Source 或 Sink 應用程式中進行組合。
要將 Processor 函式組合到 Source 或 Sink 應用程式中,我們使用 Spring Cloud Stream 的函式組合支援。
Spring Cloud Stream 中的函式組合支援基於 Spring Cloud Function 的能力,即允許將 java.util.Supplier
、java.util.Consumer
和 java.util.Function
註冊為 Spring @Bean
定義。這些函式 @Bean
定義在執行時可用於組合。
Spring Cloud Stream 引入了一個名為 spring.cloud.stream.function.definition
的新屬性,該屬性對應於 Spring Cloud Function 中的函式定義 DSL。設定此屬性後,所需的函式 bean 會在執行時自動連結。
函式組合的發生方式如下:
當 Spring Cloud Stream 應用程式型別為 Source
時,組合函式應用於源的 output
之後。
當 Spring Cloud Stream 應用程式型別為 Sink
時,組合函式應用於接收器的 input
之前。
這使得可以將函式(在 Spring Cloud Function DSL 中定義)組合到現有的 Spring Cloud Stream 應用程式中,然後由 Spring Cloud Data Flow 在流式資料管道中進行編排。
讓我們建立並部署一個流,將前面示例中的轉換器表示式組合到 Source
應用程式本身中。轉換邏輯透過使用兩個 java.util.Function
實現來完成。
我們將建立一個新的源應用程式,稱之為 http-transformer
,它擴充套件了開箱即用的 http source 應用程式。新源應用程式的原始碼可以在這裡找到。
http-transformer
應用程式包含 upper
和 concat
函式 bean,定義如下:
@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> concat() {
return value -> "Hello "+ value;
}
public static void main(String[] args) {
SpringApplication.run(HttpSourceRabbitApplication.class, args);
}
}
克隆 github 倉庫後,您可以使用 maven 構建應用程式
cd function-composition/http-transformer ./mvnw clean package
現在使用 Data Flow Shell 註冊 http-transformer
應用程式。
注意
對於下面的應用程式註冊,在
--uri
選項中,將 artifact 的目錄名和路徑替換為您系統上的相應值。
dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
現在讓我們建立流
dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"
部署流時,我們傳遞 spring.cloud.stream.function.definition
屬性來定義組合函式 DSL(如 Spring Cloud Function 中定義)。在本例中,它是
dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
上述部署將 upper
和 concat
函式 bean 組合到 http
source 應用程式中。
然後我們可以將負載傳送到 http
應用程式
dataflow:>http post --data "friend" --target "https://:9001"
> POST (text/plain) https://:9001 friend
> 202 ACCEPTED
然後在 log
應用程式中看到輸出,如下所示:
[helloComposed-1] log-sink : Hello FRIEND
注意
請注意,函式組合支援不適用於開箱即用的 Spring Cloud Stream Processor
應用程式,因為在函式應該應用於現有處理器應用程式邏輯之前還是之後存在歧義。
但是,您可以使用標準的 java.util.Function API 建立利用函式組合的自己的處理器應用程式,如下例所示:
@Configuration
public static class FunctionProcessorConfiguration {
@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}
@Bean
public Function<String, String> upper() {
return value -> value.toUpperCase();
}
@Bean
public Function<String, String> concat() {
return value -> "Hello "+ value;
}
}
然後您需要使用以下屬性進行部署:spring.cloud.stream.function.definition=upperAndConcat
另一個有趣的功能是 Spring Cloud Function 支援 Kotlin 函式的函式組合。這允許我們將任何 Kotlin 函式 bean 新增到 Source
或 Sink
應用程式的可組合函式中。
為了展示其工作原理,讓我們使用我們示例 github 倉庫中的 http-transformer-kotlin-processor
應用程式。
Kotlin 函式 bean 配置為處理器。在這裡,Kotlin 函式 bean 是下面定義的 transform
函式
@Bean
open fun transform(): (String) -> String {
return { "How are you ".plus(it) }
}
此外,該專案包含 spring-cloud-function-kotlin
依賴項,用於為 Kotlin 函式應用函式配置支援,定義如下:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-kotlin</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
cd function-composition/http-transformer-kotlin ./mvnw clean package
注意
對於下面的應用程式註冊,在
--uri
選項中,將 artifact 的目錄名和路徑替換為您系統上的相應值。
dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar
使用此應用程式作為 Source
建立流
dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"
正如我們在 http-transformer
示例中所做的那樣,我們可以使用 spring.cloud.stream.function.definition
屬性來指定任何有效的組合函式 DSL 來構建函式組合。在本例中,我們將透過 Java 配置註冊的函式 bean 與來自 Kotlin 處理器配置的函式 bean 結合起來。
dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"
這裡,函式名 transform
對應於 Kotlin 函式。
注意:我們可以進行 Kotlin 函式和 Java 函式之間的組合,因為 Kotlin 函式在內部被轉換為 java.util.Function
。
dataflow:>http post --data "friend" --target "https://:9002"
> POST (text/plain) https://:9002 friend
> 202 ACCEPTED
並且,您可以在 log
應用程式中看到輸出,如下所示:
[omposedKotlin-1] log-sink : Hello How are you FRIEND
在此示例中,http-transformer
也包含了函式的原始碼。但是,您可以透過在單獨的 artifact 中定義函式 bean 來使應用程式更加模組化。然後,您只需向專案中新增一個 maven 依賴項並設定 spring.cloud.stream.function.definition
屬性即可構建應用程式。透過這種方式,您可以將大部分業務邏輯編寫為函式,並在需要時將其與 Source 或 Sink 組合使用。