Spring Cloud Stream 應用的 Java 函式介紹 - 第 1 部分

工程 | David Turanski | 2020 年 7 月 20 日 | ...

Spring Cloud Stream 應用的 Java 函式介紹 - 第 1 部分

上週我們釋出了 Spring Cloud Stream 應用的 Java 函式介紹 - 第 0 部分
以宣佈 Spring Cloud Stream applications 2020.0.0-M2 的釋出。
在這裡,我們探討 函式組合,這是第 0 部分介紹的面向函式架構帶來的更強大的特性之一。如果您還沒有機會閱讀 第 0 部分,現在是很好的時機!

函式組合

函式組合在數學和計算機科學中擁有堅實的理論基礎。
實際上,它是將一系列函式連線起來建立更復雜函式的一種方式。

我們來看一個使用 Java 函式的簡單例子。我們有兩個函式,reverseupper
每個函式都接受一個 String 作為輸入,併產生一個 String 作為輸出。我們可以使用內建的 andThen 方法來組合它們。組合後的函式本身是一個 Function<String, String>
如果您執行這段程式碼,它將列印 ESREVER

Function<String, String> reverse = s -> new StringBuilder(s).reverse().toString(); Function<String, String> upper = String::toUpperCase; Function<String, String> reverseUpper = reverse.andThen(upper); System.out.println(reverseUpper.apply("reverse"));

提示

除了 andThen 之外,java.util.Function 還包括 compose 方法,它首先應用引數 (b),然後將 a 應用到結果上。
因此,a.compose(b).apply(s) 等同於 a.apply(b.apply(s))

Spring Cloud Function 中的函式組合

Spring Cloud Function 包含一些出色的功能,可以將函式組合提升到新的水平。

宣告式組合

如果我們將上面例子中的函式定義為 Spring bean,

@Bean Function<String, String> reverse() { return s -> new StringBuilder(s).reverse().toString(); }

@Bean Function<String, String> upper() { return String::toUpperCase;

}

我們可以使用 spring.cloud.function.definition 屬性來組合這些函式,例如 spring.cloud.function.definition=upper|reverse

這裡 | 是一個組合運算子,它會生成一個自動配置的 bean 來實現組合函式,並提供相關資源,讓您可以無縫地呼叫組合函式。

與 Supplier 和 Consumer 的組合

Spring Cloud Function 擴充套件了原生的 Java 函式組合,支援與 Supplier 和 Consumer 的組合。

這源於一些隱含為真的概念

  • Function 與 Consumer 組合後是一個 Consumer

  • Supplier 與 Function 組合後是一個 Supplier

  • Supplier 與 Consumer 組合是一種有效的處理模型(沒有輸入或輸出,這種形式的組合不對映到函式式介面,但類似於 Runnable

正如我們將看到的,Spring Cloud Stream Applications 有效地運用了這些概念。

型別轉換

在使用函式組合時,我們必須考慮相容的引數型別。
使用原生的 Java 組合,我們可以將 Function<Integer,String> 與 Function<String, Integer> 組合成一個 Function<Integer, Integer>

Function<Integer, String> intToStr = String::valueOf; Function<String, Integer> doubleit = i -> Integer.parseInt(i) * 2; Function<Integer, Integer> composite = intToStr.andThen(doubleit); composite.apply(10);

執行 Spring 應用程式時,Spring Cloud Function 使用 Spring 標準的型別轉換支援,根據需要強制轉換函式引數。
給定以下 Function bean 定義,函式定義 intToStr|doubleit 按預期工作,將 String 轉換為 Integer。

@Bean Function<Integer, Integer> doubleit() { return i -> i * 2; }

@Bean Function<Integer, String> intToStr() { return String::valueOf;

}

除了轉換基本型別之外,Spring 函式還可以在 Message 和 POJO、JSON String 和 POJO 之間進行轉換,等等。
例如,以下函式可以按任意順序組合

@Bean Function<Integer, Integer> doubleit() { return i -> i * 2; }

@Bean Function<Integer, Message> convertIntMessage() { return i -> MessageBuilder.withPayload(String.valueOf(i)).build();

}

Spring Cloud Stream 中的函式組合

Spring Cloud Stream 3.x 構建在 Spring Cloud Function 之上,以完全支援函數語言程式設計模型。Spring Cloud Stream 的基本前提是它使得函式能夠在分散式環境中執行。繫結器將打包在 Spring Boot 應用程式中的函式的輸入和輸出繫結到配置的訊息代理目的地,這樣,一個函式產生的輸出就可以作為另一個遠端執行函式的輸入被消費。我們可以將資料流管道視為函式元件的分散式組合。

為了說明這一點,一個典型的 Spring Cloud Stream 管道,例如

source | processor1 | processor2 | processor3 | sink

在邏輯上等同於

supplier | function1 | function2 | function3 | sink

這個想法帶來了一些有趣的架構選擇,因為我們可以使用函式組合將部分或全部這些元件組合到一個應用程式中。

例如,我們可以將三個處理器序列實現為一個單一應用程式,我們稱之為 composed-processor,它打包了 function1function2function3,並透過 spring.cloud.function.definition=function1|function2|function3 進行組合。現在管道可以部署為

source | composed-processor | sink

更簡單的是,我們可以建立一個 composed-source,在 source 中完成所有處理

composed-source | sink

一如既往,這裡沒有唯一的正確答案。總是有權衡需要考慮

  • 函式組合減少了部署數量。這降低了成本、延遲、操作複雜性等等。

  • 單獨部署是松耦合的,可以獨立擴充套件。

  • 訊息代理提供有保證的交付。當一個簡單的無狀態應用程式宕機並重新啟動時,它可以從中斷的地方繼續,處理上一個處理步驟的待處理結果。

  • 執行復雜處理的單一應用程式更難理解,並且將中間處理結果儲存在記憶體中,或者可能儲存在臨時資料儲存中。當有狀態應用程式失敗時,可能導致狀態不一致,從而使恢復更加困難。

如果這些權衡看起來很熟悉,那是因為它們與微服務與單體應用之間的爭論大致相同。最終,做最適合您的選擇。

與預打包 Source 應用的函式組合

在某些情況下,函式組合是顯而易見的。從一開始,我們就提供了預打包的處理器來執行簡單的轉換或使用 SpEL 進行過濾。在使用預打包的 source 或 sink 時,傳統架構需要一個單獨的處理器。使用者普遍抱怨“為什麼我只需要評估一個 SpEL 表示式,卻需要部署一個單獨的應用程式?”為了解決這個問題,我們在早期版本中初步引入了一種對 函式組合 的支援形式。要將此功能與預打包的應用程式一起使用,需要 fork 它們以修改程式碼或構建依賴項來提供函式。

當前版本為所有預打包的 source 提供了開箱即用的函式組合。具體來說,現在可以將 source 與 預打包函式 組合,在本地執行以下任何操作

  • 執行 SpEL 轉換

  • 豐富訊息頭

  • 過濾事件

  • 產生任務啟動請求

例如,我們可以將 time source 與訊息頭豐富器和過濾器透過配置屬性進行組合,並將其作為獨立的 Spring Boot 應用程式執行

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=even --spring.cloud.function.definition=timeSupplier|headerEnricherFunction|filterFunction --header.enricher.headers=seconds=T(java.lang.Integer).valueOf(payload .substring(payload.length() - 2)) --filter.function.expression=headers[seconds]%2==0

這將每隔一秒(當秒數為偶數時)釋出時間,例如 `07/16/20 16:43:48,到配置的目的地 even

這裡我們使用一個預打包的 RabbitMQ time source,將輸出繫結到一個名為 even 的 topic exchange。如果 exchange 不存在,繫結器將建立它。函式定義擴充套件了 supplier,以提取秒數,將其轉換為整數並存儲在 seconds 訊息頭中,然後根據訊息頭的值進行過濾。只有偶數值透過過濾器。

任務啟動請求

2018 年,我們引入了一種參考架構,用於使用 Spring Cloud Data Flow 和 Spring Batch 執行檔案攝取。為此,我們將 sftp source fork 為 sftp-dataflow,專門用於實現一個生成任務啟動請求的預打包 source。任務啟動請求是一個簡單的值物件,渲染為 JSON,並由 tasklauncher-sink 消費。sink 作為 Data Flow 的客戶端,根據請求啟動批處理應用程式。我們最初選擇 sftp 是因為它是在檔案處理中最常用的協議。然而,我們意識到同樣的模式可以應用於任何 source。現在我們可以透過函式組合來實現這一點。除了標準的 sftp source 之外,我們還可以從 ftpfiles3 等觸發任務啟動。甚至 time source 也可以用於定期啟動任務。

這個有些刻意的例子生成任務啟動請求

java -jar target/time-source-rabbit-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.output.destination=time-test --spring.cloud.stream.function.definition=timeSupplier|spelFunction|headerEnricherFunction|taskLaunchRequestFunction --spel.function.expression=payload.length() --header.enricher.headers=task-id=payload*2 --task.launch.request.task-name-expression="'task-'+headers['task-id']

JSON 格式的有效載荷是 {"args":[],"deploymentProps":{},"name":"task-34"}

與使用者編寫程式碼的函式組合

實際上,當用戶開發 Spring Cloud Stream 管道時,很可能會從我們預打包的 Spring Cloud Stream Applications 中選擇 source 和 sink。處理器通常是使用者編寫的程式碼,實現特定的業務邏輯。如果您正在編寫處理器,或者想擴充套件 source 或 sink,任何 函式 都可以供您使用。由於我們將函式作為獨立構件釋出,您可以輕鬆地將它們包含在您的依賴項中。您既可以使用上面所示的宣告式組合,也可以將它們注入到您的程式碼中並透過程式設計方式呼叫它們。當然,您也可以輕鬆地整合自己的函式。

如何貢獻新的函式或應用程式?

如果您在現有函式和應用程式目錄中找不到您需要的內容,請考慮貢獻。這樣,整個開源社群都會受益。在後續的文章中,我們將透過一個實際例子來介紹如何開發函式和 stream 應用程式。

我們鼓勵社群參與到這個專案中。除了程式碼貢獻,我們非常感謝文件改進和建立 issue。

敬請期待…​

這篇部落格是系列文章中的第二篇,將涵蓋許多相關主題。未來幾周將有更多深入探討和重點主題的文章。我們將帶您全面瞭解此倉庫中包含的元件及其周圍的過程。

訂閱 Spring 電子報

隨時關注 Spring 電子報

訂閱

先人一步

VMware 提供培訓和認證,助您加速發展。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,一次簡單訂閱即可獲得。

瞭解更多

即將到來的活動

檢視 Spring 社群所有即將到來的活動。

檢視全部