Spring Cloud Data Flow 中的組合函式支援

工程 | Ilayaperumal Gopinathan | 2019年1月9日 | ...

Spring Cloud Stream 最近新增了一項功能,可以將函式定義組合到現有的 Spring Cloud Stream 應用程式中。在本篇部落格中,我們將瞭解 Spring Cloud Data Flow 如何利用此功能在流式管道中組合函式。

有何不同?

在 Spring Cloud Data Flow 中,流式資料管道由 Spring Cloud Stream 應用程式組成。開發人員可以選擇並使用開箱即用的流應用程式,這些應用程式涵蓋了許多常見用例。開發人員還可以擴充套件這些開箱即用的應用程式,或使用 Spring Cloud Stream 框架建立自定義應用程式。

Spring Cloud Stream 2.1.0 GA 版本的釋出包含了基於Spring Cloud Function的程式設計模型的整合,該模型允許將業務邏輯表示為 java.util.Functionjava.util.Consumerjava.util.Supplier,它們分別代表 ProcessorSinkSource 的角色。鑑於這種靈活性,Spring Cloud Stream 框架現在支援一種簡單但強大的函式組合方法。在這種上下文中,組合可以是將源和處理器組合成一個應用程式:一個“新源”。或者,它可以是將處理器 + 接收器組合成一個應用程式:“一個新接收器”。這種靈活性為流應用程式開發人員開闢了有趣的新機會。

讓我們看看如何使用三個應用程式建立一個管道來執行簡單的轉換,然後再看看如何使用兩個利用函式組合的應用程式來實現同樣的管道。

使用三個應用程式的流式管道

對於第一個流,

我們將使用開箱即用的 http-sourcetransform-processorlog-sink 應用程式。

第一步,啟動 Spring Cloud Data Flow local 伺服器

java -jar spring-cloud-dataflow-server-local-1.7.3.RELEASE.jar

現在,啟動 Spring Cloud Data Flow shell

java -jar spring-cloud-dataflow-shell-1.7.3.RELEASE.jar

現在讓我們註冊使用 RabbitMQ 繫結的 HTTP 源、轉換器處理器和日誌接收器應用程式

dataflow:>app register --name http --type source --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/http-source-rabbit/2.1.0.M2/http-source-rabbit-2.1.0.M2.jar
dataflow:>app register --name transformer --type processor --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/transform-processor-rabbit/2.1.0.M2/transform-processor-rabbit-2.1.0.M2.jar
dataflow:>app register --name log --type sink --uri https://repo.spring.io/milestone/org/springframework/cloud/stream/app/log-sink-rabbit/2.1.0.M2/log-sink-rabbit-2.1.0.M2.jar

現在我們可以建立一個沒有函式組合的簡單流

dataflow:>stream create hello --definition "http --server.port=9000 | transformer --expression=(\"Hello \"+payload.toString().toUpperCase()) | log"

然後我們可以部署這個流

dataflow:>stream deploy hello --properties "deployer.*.local.inheritLogging=true"
dataflow:>http post --data "friend" --target "https://:9000"
POST (text/plain) https://:9000 friend
202 ACCEPTED

您可以在 log 應用程式中看到以下日誌訊息

[sformer.hello-1] log-sink                                 : Hello FRIEND

在這個流中,我們將 http (source)、transformer (processor) 和 log (sink) 應用程式作為獨立應用程式部署在目標平臺(本例中是 local)上。對於某些用例,例如簡單的負載轉換邏輯,我們可能希望將 Processor 應用程式與 SourceSink 應用程式結合使用。例如,在源輸出資料中遮蔽某些特定使用者欄位的轉換場景不一定需要作為單獨的獨立應用程式部署。相反,它可以在 Source 或 Sink 應用程式中進行組合。

要將 Processor 函式組合到 Source 或 Sink 應用程式中,我們使用 Spring Cloud Stream 的函式組合支援。

Spring Cloud Stream 中的函式組合支援基於 Spring Cloud Function 的能力,即允許將 java.util.Supplierjava.util.Consumerjava.util.Function 註冊為 Spring @Bean 定義。這些函式 @Bean 定義在執行時可用於組合。

Spring Cloud Stream 引入了一個名為 spring.cloud.stream.function.definition 的新屬性,該屬性對應於 Spring Cloud Function 中的函式定義 DSL。設定此屬性後,所需的函式 bean 會在執行時自動連結。

函式組合的發生方式如下:

當 Spring Cloud Stream 應用程式型別為 Source 時,組合函式應用於源的 output 之後。

當 Spring Cloud Stream 應用程式型別為 Sink 時,組合函式應用於接收器的 input 之前。

這使得可以將函式(在 Spring Cloud Function DSL 中定義)組合到現有的 Spring Cloud Stream 應用程式中,然後由 Spring Cloud Data Flow 在流式資料管道中進行編排。

將函式組合到流應用程式中

讓我們建立並部署一個流,將前面示例中的轉換器表示式組合到 Source 應用程式本身中。轉換邏輯透過使用兩個 java.util.Function 實現來完成。

我們將建立一個新的源應用程式,稱之為 http-transformer,它擴充套件了開箱即用的 http source 應用程式。新源應用程式的原始碼可以在這裡找到。

http-transformer 應用程式包含 upperconcat 函式 bean,定義如下:

@SpringBootApplication
@Import(org.springframework.cloud.stream.app.http.source.HttpSourceConfiguration.class)
public class HttpSourceRabbitApplication {

	@Bean
	public Function<String, String> upper() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> concat() {
		return value -> "Hello "+ value;
	}


	public static void main(String[] args) {
		SpringApplication.run(HttpSourceRabbitApplication.class, args);
	}
}

克隆 github 倉庫後,您可以使用 maven 構建應用程式

cd function-composition/http-transformer ./mvnw clean package

現在使用 Data Flow Shell 註冊 http-transformer 應用程式。

注意

對於下面的應用程式註冊,在 --uri 選項中,將 artifact 的目錄名和路徑替換為您系統上的相應值。

dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar

現在讓我們建立流

dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log"

部署流時,我們傳遞 spring.cloud.stream.function.definition 屬性來定義組合函式 DSL(如 Spring Cloud Function 中定義)。在本例中,它是

dataflow:>stream deploy helloComposed --properties "app.http-transformer.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"

上述部署將 upperconcat 函式 bean 組合到 http source 應用程式中。

然後我們可以將負載傳送到 http 應用程式

dataflow:>http post --data "friend" --target "https://:9001"
> POST (text/plain) https://:9001 friend
> 202 ACCEPTED

然後在 log 應用程式中看到輸出,如下所示:

[helloComposed-1] log-sink                                 : Hello FRIEND

注意

請注意,函式組合支援不適用於開箱即用的 Spring Cloud Stream Processor 應用程式,因為在函式應該應用於現有處理器應用程式邏輯之前還是之後存在歧義。

但是,您可以使用標準的 java.util.Function API 建立利用函式組合的自己的處理器應用程式,如下例所示:

@Configuration
public static class FunctionProcessorConfiguration {

@Bean
public Function<String, String> upperAndConcat() {
return upper().andThen(concat());
}

  @Bean
  public Function<String, String> upper() {
     return value -> value.toUpperCase();
  }

  @Bean
  public Function<String, String> concat() {
     return value -> "Hello "+ value;
  }
}

然後您需要使用以下屬性進行部署:spring.cloud.stream.function.definition=upperAndConcat

Kotlin 支援

另一個有趣的功能是 Spring Cloud Function 支援 Kotlin 函式的函式組合。這允許我們將任何 Kotlin 函式 bean 新增到 SourceSink 應用程式的可組合函式中。

為了展示其工作原理,讓我們使用我們示例 github 倉庫中的 http-transformer-kotlin-processor 應用程式。

Kotlin 函式 bean 配置為處理器。在這裡,Kotlin 函式 bean 是下面定義的 transform 函式

@Bean
open fun transform(): (String) -> String {
   return { "How are you ".plus(it) }
}

此外,該專案包含 spring-cloud-function-kotlin 依賴項,用於為 Kotlin 函式應用函式配置支援,定義如下:

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-function-kotlin</artifactId>
      <version>2.0.0.RELEASE</version>
    </dependency>

cd function-composition/http-transformer-kotlin ./mvnw clean package

注意

對於下面的應用程式註冊,在 --uri 選項中,將 artifact 的目錄名和路徑替換為您系統上的相應值。

dataflow:>app register --name http-transformer-kotlin --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer-kotlin/target/http-transformer-kotlin-2.1.0.BUILD-SNAPSHOT.jar

使用此應用程式作為 Source 建立流

dataflow:>stream create helloComposedKotlin --definition "http-transformer-kotlin --server.port=9002 | log"

正如我們在 http-transformer 示例中所做的那樣,我們可以使用 spring.cloud.stream.function.definition 屬性來指定任何有效的組合函式 DSL 來構建函式組合。在本例中,我們將透過 Java 配置註冊的函式 bean 與來自 Kotlin 處理器配置的函式 bean 結合起來。

dataflow:>stream deploy helloComposedKotlin --properties "app.http-transformer-kotlin.spring.cloud.stream.function.definition=upper|transform|concat,deployer.*.local.inheritLogging=true"

這裡,函式名 transform 對應於 Kotlin 函式。

注意:我們可以進行 Kotlin 函式和 Java 函式之間的組合,因為 Kotlin 函式在內部被轉換為 java.util.Function

dataflow:>http post --data "friend" --target "https://:9002"
> POST (text/plain) https://:9002 friend
> 202 ACCEPTED

並且,您可以在 log 應用程式中看到輸出,如下所示:

[omposedKotlin-1] log-sink               : Hello How are you FRIEND

在此示例中,http-transformer 也包含了函式的原始碼。但是,您可以透過在單獨的 artifact 中定義函式 bean 來使應用程式更加模組化。然後,您只需向專案中新增一個 maven 依賴項並設定 spring.cloud.stream.function.definition 屬性即可構建應用程式。透過這種方式,您可以將大部分業務邏輯編寫為函式,並在需要時將其與 Source 或 Sink 組合使用。

獲取 Spring 通訊

訂閱 Spring 通訊,保持連線

訂閱

先行一步

VMware 提供培訓和認證,助您快速提升。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

近期活動

檢視 Spring 社群的所有近期活動。

檢視全部