Spring Cloud Data Flow 中的組合函式支援

工程 | Ilayaperumal Gopinathan | 2019年1月9日 | ...

Spring Cloud Stream 最近添加了一項功能,可以將函式定義組合到現有的 Spring Cloud Stream 應用程式中。在這篇部落格中,我們將瞭解 Spring Cloud Data Flow 如何利用此功能在流式管道中組合函式。

有什麼不同之處?

在 Spring Cloud Data Flow 中,流式資料管道由 Spring Cloud Stream 應用程式組成。開發人員可以選擇現成的流式應用程式,這些應用程式涵蓋了許多常見用例。開發人員還可以擴充套件這些現成的應用程式,或使用 Spring Cloud Stream 框架建立自定義應用程式。

隨著Spring Cloud Stream 2.1.0 GA的釋出,集成了Spring Cloud Function支援的程式設計模型,該模型允許業務邏輯表示為 java.util.Functionjava.util.Consumerjava.util.Supplier,分別代表 ProcessorSinkSource 的角色。鑑於這種靈活性,Spring Cloud Stream 框架現在支援一種簡單但強大的函式組合方法。在這種情況下,組合可以是將源和處理器合併到一個應用程式中:一個“新源”。否則,它可以是將處理器+接收器合併到一個應用程式中:“新接收器”。這種靈活性為流應用程式開發人員打開了有趣的機遇。

讓我們考慮一下如何使用三個應用程式建立一個用於執行簡單轉換的管道,然後看看如何使用兩個使用函式組合的應用程式將其實現為管道。

三個應用程式的流式管道

對於第一個流,

我們將使用現成的 http-sourcetransform-processorlog-sink 應用程式。

首先,啟動 Spring Cloud Data Flow local 伺服器

java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar

現在,啟動 Spring Cloud Data Flow shell

java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar

現在我們註冊使用 RabbitMQ 繫結器的 HTTP 源、轉換器處理器和日誌接收器應用程式

dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar
dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar
dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar

現在我們可以建立一個簡單的流,而無需進行函式組合

dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"

然後我們可以部署流

dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"
dataflow:>http post --data "friend" --target "https://:9000"
POST (text/plain) https://:9000 friend
202 ACCEPTED

您可以在 log 應用程式中看到以下日誌訊息

[sformer.hello-1] log-sink                                 : Hello FRIEND

在此流中,我們將 http (源)、轉換器 (處理器) 和日誌 (接收器) 應用程式作為獨立應用程式部署在目標平臺 (在本例中是 local) 上。對於某些用例,對於簡單的有效負載轉換邏輯,我們可能希望將 Processor 應用程式與 SourceSink 應用程式結合起來。例如,在源輸出資料時掩蓋某些特定使用者欄位的轉換場景不一定需要部署為單獨的獨立應用程式。相反,它可以組合在源或接收器應用程式中。

要將處理器函式組合到源或接收器應用程式中,我們使用 Spring Cloud Stream 的函式組合支援。

Spring Cloud Stream 中的函式組合支援基於 Spring Cloud Function 允許將 java.util.Supplierjava.util.Consumerjava.util.Function 註冊為 Spring @Bean 定義的能力。這些函式 @Bean 定義可在執行時進行組合。

Spring Cloud Stream 引入了一個名為 spring.cloud.stream.function.definition 的新屬性,該屬性對應於 Spring Cloud Function 中的函式定義 DSL。設定此屬性後,所需的函式 Bean 將在執行時自動連結。

函式組合發生的方式如下

當 Spring Cloud Stream 應用程式的型別為 Source 時,組合函式將在源 output 之後應用。

當 Spring Cloud Stream 應用程式的型別為 Sink 時,組合函式將在接收器 input 之前應用。

這使得能夠將函式 (在 Spring Cloud Function DSL 中定義) 組合到現有的 Spring Cloud Stream 應用程式中,並隨後由 Spring Cloud Data Flow 在流式資料管道中進行編排。

將函式組合到流應用程式中

讓我們建立並部署一個流,將前面示例中的轉換器表示式組合到 Source 應用程式本身。轉換器邏輯是透過使用兩個 java.util.Function 實現來完成的。

我們將建立一個新的源應用程式,我們稱之為 http-transformer,它擴充套件了現成的 http 源應用程式。新源應用程式的源可以在這裡找到。

http-transformer 應用程式包含 upperconcat 函式 Bean,如下定義

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {

	@Bean
	public Function<String, String> upper() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> concat() {
		return value -> "Hello "+ value;
	}


	public static void main(String[] args) {
		SpringApplication.run(HttpSourceRabbitApplication.class, args);
	}
}

克隆 github 倉庫後,您可以使用 maven 構建應用程式

cd function-composition/http-transformer ./mvnw clean package

現在,使用 Data Flow Shell 註冊 http-transformer 應用程式。

注意

對於下面的應用程式註冊 --uri 選項,請將工件的目錄名稱和路徑替換為適合您系統的名稱和路徑。

dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar

現在我們來建立流

dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"

在部署流時,我們傳遞 spring.cloud.stream.function.definition 屬性來定義組合函式 DSL (在 Spring Cloud Function 中定義)。在這種情況下,它是

dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"

上面的部署將 upperconcat 函式 Bean 組合到 http 源應用程式中。

然後我們可以將有效負載傳送到 http 應用程式

dataflow:>http post --data "friend" --target "https://:9001"
> POST (text/plain) https://:9001 friend
> 202 ACCEPTED

然後您可以在 log 應用程式中看到輸出,如下所示

[helloComposed-1] log-sink                                 : Hello FRIEND

注意

請注意,函式組合支援不適用於現成的 Spring Cloud Stream Processor 應用程式,因為無法明確是應該在現有處理器應用程式邏輯之前還是之後應用函式。

但是,您可以建立自己的處理器應用程式,這些應用程式使用函式組合和標準的 java.util.Function API,如下例所示

@Configuration
public static class FunctionProcessorConfiguration {

@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}

  @Bean
  public Function<String, String> upper() {
     return value -> value.toUpperCase();
  }

  @Bean
  public Function<String, String> concat() {
     return value -> "Hello "+ value;
  }
}

然後您需要使用以下屬性進行部署: spring.cloud.stream.function.definition=upperAndConcat

Kotlin 支援

另一個有趣的功能是 Spring Cloud Function 支援 Kotlin 函式的函式組合。這使我們可以將任何 Kotlin 函式 Bean 新增到 SourceSink 應用程式的可組合函式中。

要檢視此功能的工作原理,讓我們使用我們示例 github 倉庫中的 http-transformer-kotlin-processor 應用程式。

Kotlin 函式 Bean 被配置為處理器。這裡,Kotlin 函式 Bean 是定義的 transform 函式如下

@Bean
open fun transform(): (String) -> String {
   return { "How are you ".plus(it) }
}

此外,該專案還具有 spring-cloud-function-kotlin 作為依賴項,用於為 Kotlin 函式應用函式配置支援,定義如下

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-function-kotlin</artifactId>
      <version>2.0.0.RELEASE</version>
    </dependency>

cd function-composition/http-transformer-kotlin ./mvnw clean package

注意

對於下面的應用程式註冊 --uri 選項,請將工件的目錄名稱和路徑替換為適合您系統的名稱和路徑。

dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar

要使用此應用程式建立流作為 Source

dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"

與我們在 http-transformer 示例中所做的一樣,我們可以使用 spring.cloud.stream.function.definition 屬性來指定任何有效的組合函式 DSL 來構建函式組合。在這種情況下,它是

dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"

這裡,函式名 transform 對應於 Kotlin 函式。

注意:我們可以執行 Kotlin 函式和 Java 函式之間的組合,因為 Kotlin 函式在內部會被轉換為 java.util.Function

dataflow:>http post --data "friend" --target "https://:9002"
> POST (text/plain) https://:9002 friend
> 202 ACCEPTED

並且,您可以在 log 應用程式中看到輸出,如下所示

[omposedKotlin-1] log-sink               : Hello How are you FRIEND

在此示例中,http-transformer 也包含函式的原始碼。但是,您可以將應用程式定義為更模組化,將函式 Bean 定義在單獨的工件中。然後,您可以透過僅向專案新增 maven 依賴項並透過設定 spring.cloud.stream.function.definition 屬性來構建應用程式。這樣,您就可以將大部分業務邏輯編碼為函式,並在必要時將其與源或接收器進行組合。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有