搶佔先機
VMware 提供培訓和認證,助您加速進步。
瞭解更多我只是想在這裡記錄下昨天讓我會心一笑的一個經歷:讓快速改進的 Spring Cloud Data Flow 在幾分鐘內從 (Spring Boot) 啟動器 (start(-ers)) 搖擺(wiggle)到服務 (service)!
唯一的先決條件是你有一個正在執行的 Redis 例項。我的 Redis 例項執行在
127.0.0.1
上,Spring Boot 無需額外配置即可找到並使用它。
我們將使用超讚的 Spring Initializr 輕鬆生成我們的應用程式。還記得那些愚蠢的 Apple 廣告,“There's an App For That?” 別管那個了,現在是 勾選框 就行!看看你是否像我一樣喜歡這個體驗!
前往 Spring Initializr 選擇 Local Data Flow Server
並將 artifact 命名為 df-server
。這將用於啟動一個本地 Data Flow 服務 - 一個 REST API 和一些持久化邏輯 - 用於編排和儲存關於流和任務的資訊。在舊的 Spring XD 世界中,這被稱為 Spring XD 的 Admin Server。
在你選擇的 IDE 中開啟專案,並將 @EnableDataFlowServer
新增到 DfServerApplication
類中
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;
@EnableDataFlowServer
@SpringBootApplication
public class DfServerApplication {
public static void main(String[] args) {
SpringApplication.run(DfServerApplication.class, args);
}
}
在 df-server
專案的根目錄下執行 mvn spring-boot:run
,應用程式將在埠 9393
上啟動。
提示:當你看到歡迎 ASCII 藝術字時,你就(很可能)成功了!
關於提示的提示:好吧,這可能不完全正確。它可能會因為各種原因失敗(比如服務或嵌入式 H2 資料庫的埠衝突),但高質量的 ASCII 藝術字已被證明在(我的)研究中具有治療作用……(研究物件是……我自己)。
前往 Spring Initializr 選擇 Data Flow Shell
並將 artifact 命名為 df-shell
。這將用於啟動一個由 Spring Shell 驅動的 Data Flow shell。
Data Flow shell 可以在任何作業系統上執行。它是我們剛剛啟動的 Data Flow 服務的客戶端。它允許我們使用熟悉的管道與過濾器 DSL 和命令來操作服務。我和其他開發者一樣喜歡精美的橫幅 ASCII 藝術字,但美好的事物也可能(呃!)過猶不及。預設情況下,Spring Shell 和 Spring Boot 都試圖發出 ASCII 橫幅,所以這次我們將讓 Spring Boot 避開(這次!)。在你選擇的 IDE 中開啟專案,並將 @EnableDataFlowShell
新增到 DfShellApplication
類中,然後配置 SpringApplication
的建立方式以隱藏 Spring Boot 橫幅。
package com.example;
import org.springframework.boot.Banner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.dataflow.shell.EnableDataFlowShell;
@EnableDataFlowShell
@SpringBootApplication
public class DfShellApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(DfShellApplication.class)
.bannerMode(Banner.Mode.OFF)
.run(args);
}
}
在 df-shell
專案的根目錄下執行 mvn spring-boot:run
。預設情況下,你應該能夠與在本地執行的 Data Flow 伺服器進行互動。嘗試執行 module list
命令。你應該會看到一個表格,其中列出了 Spring Cloud Data Flow 已知的所有內建元件。
前往 Spring Initializr 選擇 Stream Redis
並將 artifact 命名為 logging-sink
。我們將使用 Spring Cloud Stream 構建一個記錄傳入訊息的*自定義*模組。Spring Cloud Stream 構建於 Spring 的 MessageChannel
抽象和 Spring Integration 中的元件模型之上,可簡化描述和整合基於訊息的微服務的工作。然後,我們將使用 Spring Cloud Data Flow 來部署和編排這個模組。
Spring Cloud Data Flow 是一種強大的方式,可以用小的 Spring Boot 驅動的模組來描述複雜的整合、批處理和流處理工作負載。有幾種型別的 module
(模組)。一個 source
(源)生成資料,通常按固定計劃生成,下游元件可以消費和處理這些資料。一個 processor
(處理器)接收資料,對其進行處理,然後輸出資料。一個 sink
(匯)只接收資料,但不產生任何要傳送出去的東西。這些元件可以很好地組合在一起,描述任何潛在的連續工作負載(物聯網感測器資料、24/7 事件處理、線上事務資料攝取和整合場景等)。最終,source 通常是 Spring Integration 的入站介面卡。processor 通常是任何接收資料並輸出資料的 Spring Integration 元件(如 transformer)。sink 通常是 Spring Integration 的出站介面卡。
一個 task
(任務)描述任何最終會停止的工作負載。它可能是一個簡單的 Spring Boot Command Line Runner
或一個 Spring Batch Job
。
儘管如此,Spring Cloud Data Flow 並沒有特定的 Spring Integration 知識。它只瞭解 Spring Cloud Stream 以及眾所周知的 Spring MessageChannels
(訊息通道),例如 input
和 output
。它不關心這些通道的終端是什麼。Spring Cloud Data Flow 也沒有特定的 Spring Batch 知識。它只瞭解 Spring Cloud Task。
正如 UNIX sh
shell 環境允許我們透過將資料分別傳遞到 stdin
和從 stdout
傳遞資料,從而從單一功能的命令列工具組合出任意多且任意複雜的解決方案一樣,Spring Cloud Data Flow 也允許我們從單一功能的訊息傳遞元件組合出任意多且任意複雜的解決方案。
Spring Cloud Data Flow 已經內建了許多開箱即用的功能。我們將開發並安裝一個簡單的模組來記錄東西 - 在我們的例子中,是時間。值得注意的是,我們這樣做是為了自己的學習,但實際上我們並*不需要*這樣做;Spring Cloud Data Flow 已經提供了 log
模組!(還有一個 time
模組!)
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import java.util.Map;
@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {
@MessageEndpoint
public static class LoggingMessageEndpoint {
@ServiceActivator(inputChannel = Sink.INPUT)
public void logIncomingMessages(
@Payload String msg,
@Headers Map<String, Object> headers) {
System.out.println(msg);
headers.entrySet().forEach(e ->
System.out.println(e.getKey() + '=' + e.getValue()));
}
}
public static void main(String[] args) {
SpringApplication.run(LoggingSinkApplication.class, args);
}
}
這是一個簡單的 Spring Cloud Stream 繫結。Sink.class
是一個定義了 MessageChannel input()
的介面。Spring Cloud Stream 會將其轉換為一個即時、命名的通道,連線到一個訊息代理(在本例中是 Redis,儘管未來幾個月 Spring Cloud Data Flow 的預設設定可能會更改為 RabbitMQ),我們任何訊息傳遞程式碼都可以使用這個通道。這個示例使用 Spring Integration 在訊息到達時打印出傳入的訊息資料。首先,讓我們向 Data Flow 註冊我們的自定義模組,然後組成一個流,該流從 time
元件接收包含時間的傳入訊息,然後記錄結果。
首先,對 logging-sink
專案執行 mvn clean install
,以便它能在本地 Maven 倉庫中被解析。Spring Cloud Data Flow 使用可插拔的策略來解析自定義模組的例項。在我們的示例中,它將嘗試在我們的系統本地 Maven 倉庫中解析它們。
返回 Data Flow Shell 並輸入以下內容
dataflow:>module register --name custom-log --type sink --uri maven://com.example:logging-sink:jar:0.0.1-SNAPSHOT
Successfully registered module 'sink:custom-log'
dataflow:>module list
╔══════════════╤════════════════╤═══════════════════╤═════════╗
║ source │ processor │ sink │ task ║
╠══════════════╪════════════════╪═══════════════════╪═════════╣
║file │bridge │aggregate-counter │timestamp║
║ftp │filter │cassandra │ ║
║http │groovy-filter │counter │ ║
║jdbc │groovy-transform│custom-log │ ║
║jms │httpclient │field-value-counter│ ║
║load-generator│pmml │file │ ║
║rabbit │splitter │ftp │ ║
║sftp │transform │gemfire │ ║
║tcp │ │gpfdist │ ║
║time │ │hdfs │ ║
║trigger │ │jdbc │ ║
║twitterstream │ │log │ ║
║ │ │rabbit │ ║
║ │ │redis │ ║
║ │ │router │ ║
║ │ │tcp │ ║
║ │ │throughput │ ║
║ │ │websocket │ ║
╚══════════════╧════════════════╧═══════════════════╧═════════╝
dataflow:>stream create --name time-to-log --definition 'time | custom-log'
Created new stream 'time-to-log'
dataflow:>stream list
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│ Status ║
╠═══════════╪═════════════════╪══════════╣
║time-to-log│time | custom-log│undeployed║
╚═══════════╧═════════════════╧══════════╝
dataflow:>stream deploy --name time-to-log
Deployed stream 'time-to-log'
你會在 Data Flow 服務日誌中看到模組已經啟動並連線在一起。在我的具體日誌中,我觀察到
2016-04-05 09:09:18.067 INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.custom-log instance 0
Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log
2016-04-05 09:09:30.838 INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer : deploying app time-to-log.time instance 0
Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.time
檢視日誌尾部,確認你內心深處已經知道的事情:我們的自定義 logging-sink
正在工作!
tail -f /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log/std*
邁向雲!我們使用的是本地 Data Flow 伺服器。還有其他實現可用於像 Cloud Foundry 這樣的處理平臺。Cloud Foundry Data Flow Server 會啟動應用程式例項,而不是本地 Java 程序。現在,構建一個可擴充套件的資料攝取和處理流就像 cf push ..
和 cf scale -i $MOAR
一樣簡單!
我們只使用了 Spring Cloud Data Flow 的一小部分功能!使用 Spring Cloud Data Flow 可以編排任意數量由 Spring Cloud Stream 驅動的基於訊息的微服務。我建議檢視一些內建的 Spring Cloud Stream 模組以獲取靈感。