介紹 Spring Cloud Stream 應用程式的 Java 函式 - 第 1 部分

工程 | David Turanski | 2020 年 7 月 20 日 | ...

介紹 Spring Cloud Stream 應用程式的 Java 函式 - 第 1 部分

上週我們釋出了介紹 Spring Cloud Stream 應用程式的 Java 函式 - 第 0 部分
以宣佈 Spring Cloud Stream 應用程式 2020.0.0-M2 的釋出。
在這裡,我們探討了函式組合,這是第 0 部分中介紹的面向函式架構所啟用的更強大功能之一。如果您還沒有機會閱讀第 0 部分,現在正是時候!

函式組合

函式組合在數學和計算機科學中擁有堅實的理論基礎。
實際上,它是一種將一系列函式連線起來以建立更復雜函式的方法。

讓我們看一個使用 Java 函式的簡單示例。我們有兩個函式,reverseupper
每個函式都接受一個字串作為輸入並生成一個字串作為輸出。我們可以使用內建的 andThen 方法組合它們。複合函式本身是一個 Function<String, String>
如果您執行它,它將列印 ESREVER

Function<String, String> reverse = s -> new StringBuilder(s).reverse().toString(); Function<String, String> upper = String::toUpperCase; Function<String, String> reverseUpper = reverse.andThen(upper); System.out.println(reverseUpper.apply("reverse"));

提示

除了 andThenjava.util.Function 還包括 compose,它首先應用引數 (b),然後將 a 應用於結果。
因此,a.compose(b).apply(s) 等效於 a.apply(b.apply(s))

Spring Cloud Function 中的函式組合

Spring Cloud Function 包含一些很棒的功能,可以將組合函式提升到另一個層次。

宣告式組合

如果我們將上述示例中的函式定義為 Spring bean,

@Bean Function<String, String> reverse() { return s -> new StringBuilder(s).reverse().toString(); }

@Bean Function<String, String> upper() { return String::toUpperCase;

}

我們可以使用 spring.cloud.function.definition 屬性 spring.cloud.function.definition=upper|reverse 來組合這些函式

這裡 | 是一個組合運算子,它會生成一個自動配置的 bean,實現複合函式,以及相關的資源,讓您可以無縫呼叫複合函式。

與 Supplier 和 Consumer 的組合

Spring Cloud Function 擴充套件了原生 Java 函式組合,以支援與 Supplier 和 Consumer 的組合。

這源於隱式為真的概念

  • 與 Consumer 組合的 Function 是一個 Consumer

  • 與 Function 組合的 Supplier 是一個 Supplier

  • 與 Consumer 組合的 Supplier 是一個有效的處理模型(沒有輸入或輸出,這種形式的組合不對映到函式式介面,但類似於 Runnable

正如我們將看到的,Spring Cloud Stream Applications 非常有效地運用了這些概念。

型別轉換

在使用函式組合時,我們必須考慮相容的引數型別。
使用原生 Java 組合,我們可以將 Function<Integer,String> 與 Function<String, Integer> 組合成 Function<Integer, Integer>

Function<Integer, String> intToStr = String::valueOf; Function<String, Integer> doubleit = i -> Integer.parseInt(i) * 2; Function<Integer, Integer> composite = intToStr.andThen(doubleit); composite.apply(10);

當執行 Spring 應用程式時,Spring Cloud Function 使用 Spring 的標準型別轉換支援根據需要強制轉換函式引數。
給定以下 Function bean 定義,函式定義 intToStr|doubleit 按預期工作,將字串轉換為整數。

@Bean Function<Integer, Integer> doubleit() { return i -> i * 2; }

@Bean Function<Integer, String> intToStr() { return String::valueOf;

}

除了轉換基本型別,Spring 函式還可以在 Message 和 POJO、JSON String 和 POJO 等之間進行轉換。
例如,以下函式可以按任意順序組合

@Bean Function<Integer, Integer> doubleit() { return i -> i * 2; }

@Bean Function<Integer, Message> convertIntMessage() { return i -> MessageBuilder.withPayload(String.valueOf(i)).build();

}

Spring Cloud Stream 中的函式組合

Spring Cloud Stream 3.x 基於 Spring Cloud Function 構建,以完全支援函數語言程式設計模型。Spring Cloud Stream 的基本前提是它允許函式在分散式環境中執行。繫結器將打包在 Spring Boot 應用程式中的函式的輸入和輸出繫結到配置的訊息代理目標,以便一個函式生成的輸出被用作另一個遠端執行函式的輸入。我們可以將資料流管道視為函式元件的分散式組合。

為了說明這一點,典型的 Spring Cloud Stream 管道,例如

源 | 處理器1 | 處理器2 | 處理器3 | 接收器

邏輯上等同於

供應商 | 函式1 | 函式2 | 函式3 | 接收器

這個想法帶來了一些有趣的架構選擇,因為我們可以使用函式組合將這些元件中的部分或全部組合到一個應用程式中。

例如,我們可以將三個處理器的序列實現為單個應用程式,我們稱之為 composed-processor,它打包了 function1function2function3,並透過 spring.cloud.function.definition=function1|function2|function3 組合。現在,管道可以部署為

源 | 組合處理器 | 接收器

更簡單的是,我們可以建立一個 composed-source 來在源中完成所有處理

組合源 | 接收器

一如既往,這裡沒有正確的答案。總有權衡需要考慮

  • 函式組合減少了部署。這降低了成本、延遲、操作複雜性等。

  • 單個部署是鬆散耦合的,可以獨立擴充套件。

  • 訊息代理提供有保證的交付。當一個簡單的無狀態應用程式宕機並重新啟動時,它可以從上次中斷的地方繼續,處理上一個處理步驟的待處理結果。

  • 一個執行復雜處理的單個應用程式更難理解,並將中間處理結果保留在記憶體中,或者可能在臨時資料儲存中。當有狀態應用程式失敗時,可能導致狀態不一致,使恢復更加困難。

如果這些權衡看起來很熟悉,那是因為它們與任何微服務與單體爭論幾乎相同。最終,選擇最適合您的方法。

與預打包源應用程式的函式組合

在某些情況下,函式組合是理所當然的。從一開始,我們就提供了預打包的處理器,用於執行簡單的轉換,或使用 SpEL 進行過濾。在使用預打包的源或接收器時,舊的架構需要一個單獨的處理器。使用者普遍抱怨“為什麼我需要部署一個單獨的應用程式來評估一個 SpEL 表示式?”為了解決這個問題,我們最初在早期版本中引入了一種 函式組合 支援形式。要將此功能與預打包的應用程式一起使用,需要分叉它們以修改程式碼或構建依賴項以提供這些函式。

當前版本為所有預打包的源提供了開箱即用的函式組合。具體來說,現在可以將源與預打包的函式組合,以在本地執行以下任何操作

  • 執行 SpEL 轉換

  • 豐富訊息頭

  • 過濾事件

  • 生成任務啟動請求

例如,我們可以將 time 源與頭部豐富器和過濾器與配置屬性組合,並將其作為獨立的 Spring Boot 應用程式執行

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=even --spring.cloud.function.definition=timeSupplier|headerEnricherFunction|filterFunction --header.enricher.headers=seconds=T(java.lang.Integer).valueOf(payload .substring(payload.length() - 2)) --filter.function.expression=headers[seconds]%2==0

這將每隔一秒(當秒數為偶數時)將時間(例如 07/16/20 16:43:48)釋出到配置的目標 even

這裡我們使用 RabbitMQ 的預打包時間源,將輸出繫結到名為 even 的主題交換機。如果交換機不存在,繫結器將建立它。函式定義擴充套件了供應商以提取秒數,將其轉換為整數並存儲在 seconds 訊息頭中,然後根據頭的值進行過濾。只有偶數才會透過過濾器。

任務啟動請求

2018 年,我們引入了一種參考架構,用於使用 Spring Cloud Data Flow 和 Spring Batch 執行檔案攝取。為此,我們將 sftp 源分叉為 sftp-dataflow,專門用於實現一個生成任務啟動請求的預打包源。任務啟動請求是一個簡單的值物件,呈現為 JSON,並由 tasklauncher-sink 消費。該接收器充當 Data Flow 的客戶端,根據請求啟動一個批處理應用程式。我們最初選擇 sftp 是因為它用於檔案處理最常用的協議。但是,我們意識到相同的模式可以應用於任何源。我們現在可以透過函式組合來實現這一點。除了標準的 sftp 源,我們還可以從 ftpfiles3 等觸發任務啟動。即使是時間源也可以用於定期啟動任務。

這個有點做作的例子生成了任務啟動請求

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=time-test --spring.cloud.stream.function.definition=timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction --spel.function.expression=payload.length() --header.enricher.headers=task-id=payload*2 --task.launch.request.task-name-expression="'task-'+headers['task-id']

JSON 格式的負載是 {"args":[],"deploymentProps":{},"name":"task-34"}

與使用者編寫程式碼的函式組合

實際上,當用戶開發 Spring Cloud Stream 管道時,他們很可能會從我們預打包的 Spring Cloud Stream Applications 中選擇一個源和接收器。處理器通常是使用者編寫的程式碼,實現特定的業務邏輯。如果您正在編寫處理器,或者想要擴充套件源或接收器,可以使用任何函式。由於我們將函式作為單獨的artifact釋出,您可以簡單地將它們包含在您的依賴項中。您可以像上面所示那樣使用宣告式組合,也可以將它們注入到您的程式碼中並以程式設計方式呼叫它們。當然,您也可以輕鬆整合自己的函式。

我如何貢獻新的函式或應用程式?

如果您在現有函式和應用程式目錄中找不到您需要的內容,請考慮貢獻。這樣,整個開源社群都將受益。在後續文章中,我們將透過一個真實的示例來講解如何開發函式和流應用程式。

我們鼓勵社群參與此專案。除了程式碼貢獻,我們非常感謝文件改進和建立問題。

敬請關注…​

本部落格是系列文章的第二篇,將涵蓋許多相關主題。在未來幾周內,請期待更多深入探討和重點主題。我們將帶您瞭解此儲存庫中包含的元件和周邊流程的整個範圍。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有