Spring Cloud Data Flow 的 1 個流、2 個應用程式和 3 個依賴項

工程 | Josh Long | April 05, 2016 | ...

我只是想在這裡記錄下昨天讓我會心一笑的一個經歷:讓快速改進的 Spring Cloud Data Flow 在幾分鐘內從 (Spring Boot) 啟動器 (start(-ers)) 搖擺(wiggle)到服務 (service)!

唯一的先決條件是你有一個正在執行的 Redis 例項。我的 Redis 例項執行在 127.0.0.1 上,Spring Boot 無需額外配置即可找到並使用它。

我們將使用超讚的 Spring Initializr 輕鬆生成我們的應用程式。還記得那些愚蠢的 Apple 廣告,“There's an App For That?” 別管那個了,現在是 勾選框 就行!看看你是否像我一樣喜歡這個體驗!

本地 Data Flow 伺服器

前往 Spring Initializr 選擇 Local Data Flow Server 並將 artifact 命名為 df-server。這將用於啟動一個本地 Data Flow 服務 - 一個 REST API 和一些持久化邏輯 - 用於編排和儲存關於流和任務的資訊。在舊的 Spring XD 世界中,這被稱為 Spring XD 的 Admin Server

在你選擇的 IDE 中開啟專案,並將 @EnableDataFlowServer 新增到 DfServerApplication 類中

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;

@EnableDataFlowServer
@SpringBootApplication
public class DfServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(DfServerApplication.class, args);
	}
}

df-server 專案的根目錄下執行 mvn spring-boot:run,應用程式將在埠 9393 上啟動。

提示:當你看到歡迎 ASCII 藝術字時,你就(很可能)成功了!

關於提示的提示:好吧,這可能不完全正確。它可能會因為各種原因失敗(比如服務或嵌入式 H2 資料庫的埠衝突),但高質量的 ASCII 藝術字已被證明在(我的)研究中具有治療作用……(研究物件是……我自己)。

Data Flow Shell

前往 Spring Initializr 選擇 Data Flow Shell 並將 artifact 命名為 df-shell。這將用於啟動一個由 Spring Shell 驅動的 Data Flow shell。

Data Flow shell 可以在任何作業系統上執行。它是我們剛剛啟動的 Data Flow 服務的客戶端。它允許我們使用熟悉的管道與過濾器 DSL 和命令來操作服務。我和其他開發者一樣喜歡精美的橫幅 ASCII 藝術字,但美好的事物也可能(呃!)過猶不及。預設情況下,Spring Shell Spring Boot 都試圖發出 ASCII 橫幅,所以這次我們將讓 Spring Boot 避開(這次!)。在你選擇的 IDE 中開啟專案,並將 @EnableDataFlowShell 新增到 DfShellApplication 類中,然後配置 SpringApplication 的建立方式以隱藏 Spring Boot 橫幅。

package com.example;

import org.springframework.boot.Banner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.dataflow.shell.EnableDataFlowShell;

@EnableDataFlowShell
@SpringBootApplication
public class DfShellApplication {

	public static void main(String[] args) {
		new SpringApplicationBuilder(DfShellApplication.class)
				.bannerMode(Banner.Mode.OFF)
				.run(args);
	}
}

df-shell 專案的根目錄下執行 mvn spring-boot:run。預設情況下,你應該能夠與在本地執行的 Data Flow 伺服器進行互動。嘗試執行 module list 命令。你應該會看到一個表格,其中列出了 Spring Cloud Data Flow 已知的所有內建元件。

日誌 Sink 模組

前往 Spring Initializr 選擇 Stream Redis 並將 artifact 命名為 logging-sink。我們將使用 Spring Cloud Stream 構建一個記錄傳入訊息的*自定義*模組。Spring Cloud Stream 構建於 Spring 的 MessageChannel 抽象和 Spring Integration 中的元件模型之上,可簡化描述和整合基於訊息的微服務的工作。然後,我們將使用 Spring Cloud Data Flow 來部署和編排這個模組。

Spring Cloud Data Flow 是一種強大的方式,可以用小的 Spring Boot 驅動的模組來描述複雜的整合、批處理和流處理工作負載。有幾種型別的 module(模組)。一個 source(源)生成資料,通常按固定計劃生成,下游元件可以消費和處理這些資料。一個 processor(處理器)接收資料,對其進行處理,然後輸出資料。一個 sink(匯)只接收資料,但不產生任何要傳送出去的東西。這些元件可以很好地組合在一起,描述任何潛在的連續工作負載(物聯網感測器資料、24/7 事件處理、線上事務資料攝取和整合場景等)。最終,source 通常是 Spring Integration 的入站介面卡。processor 通常是任何接收資料並輸出資料的 Spring Integration 元件(如 transformer)。sink 通常是 Spring Integration 的出站介面卡。

一個 task(任務)描述任何最終會停止的工作負載。它可能是一個簡單的 Spring Boot Command Line Runner 或一個 Spring Batch Job

儘管如此,Spring Cloud Data Flow 並沒有特定的 Spring Integration 知識。它只瞭解 Spring Cloud Stream 以及眾所周知的 Spring MessageChannels(訊息通道),例如 inputoutput。它不關心這些通道的終端是什麼。Spring Cloud Data Flow 也沒有特定的 Spring Batch 知識。它只瞭解 Spring Cloud Task。

正如 UNIX sh shell 環境允許我們透過將資料分別傳遞到 stdin 和從 stdout 傳遞資料,從而從單一功能的命令列工具組合出任意多且任意複雜的解決方案一樣,Spring Cloud Data Flow 也允許我們從單一功能的訊息傳遞元件組合出任意多且任意複雜的解決方案。

Spring Cloud Data Flow 已經內建了許多開箱即用的功能。我們將開發並安裝一個簡單的模組來記錄東西 - 在我們的例子中,是時間。值得注意的是,我們這樣做是為了自己的學習,但實際上我們並*不需要*這樣做;Spring Cloud Data Flow 已經提供了 log 模組!(還有一個 time 模組!)

package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

import java.util.Map;

@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {

	@MessageEndpoint
	public static class LoggingMessageEndpoint {

		@ServiceActivator(inputChannel = Sink.INPUT)
		public void logIncomingMessages(
				@Payload String msg,
				@Headers Map<String, Object> headers) {

			System.out.println(msg);
			headers.entrySet().forEach(e ->
					System.out.println(e.getKey() + '=' + e.getValue()));

		}
	}

	public static void main(String[] args) {
		SpringApplication.run(LoggingSinkApplication.class, args);
	}
}

這是一個簡單的 Spring Cloud Stream 繫結。Sink.class 是一個定義了 MessageChannel input() 的介面。Spring Cloud Stream 會將其轉換為一個即時、命名的通道,連線到一個訊息代理(在本例中是 Redis,儘管未來幾個月 Spring Cloud Data Flow 的預設設定可能會更改為 RabbitMQ),我們任何訊息傳遞程式碼都可以使用這個通道。這個示例使用 Spring Integration 在訊息到達時打印出傳入的訊息資料。首先,讓我們向 Data Flow 註冊我們的自定義模組,然後組成一個流,該流從 time 元件接收包含時間的傳入訊息,然後記錄結果。

首先,對 logging-sink 專案執行 mvn clean install,以便它能在本地 Maven 倉庫中被解析。Spring Cloud Data Flow 使用可插拔的策略來解析自定義模組的例項。在我們的示例中,它將嘗試在我們的系統本地 Maven 倉庫中解析它們。

返回 Data Flow Shell 並輸入以下內容

dataflow:>module register --name custom-log --type sink --uri maven://com.example:logging-sink:jar:0.0.1-SNAPSHOT
Successfully registered module 'sink:custom-log'

dataflow:>module list
╔══════════════╤════════════════╤═══════════════════╤═════════╗
║    source    │   processor    │       sink        │  task   ║
╠══════════════╪════════════════╪═══════════════════╪═════════╣
║file          │bridge          │aggregate-counter  │timestamp║
║ftp           │filter          │cassandra          │         ║
║http          │groovy-filter   │counter            │         ║
║jdbc          │groovy-transform│custom-log         │         ║
║jms           │httpclient      │field-value-counter│         ║
║load-generator│pmml            │file               │         ║
║rabbit        │splitter        │ftp                │         ║
║sftp          │transform       │gemfire            │         ║
║tcp           │                │gpfdist            │         ║
║time          │                │hdfs               │         ║
║trigger       │                │jdbc               │         ║
║twitterstream │                │log                │         ║
║              │                │rabbit             │         ║
║              │                │redis              │         ║
║              │                │router             │         ║
║              │                │tcp                │         ║
║              │                │throughput         │         ║
║              │                │websocket          │         ║
╚══════════════╧════════════════╧═══════════════════╧═════════╝

dataflow:>stream create --name time-to-log --definition 'time | custom-log'
Created new stream 'time-to-log'

dataflow:>stream list
╔═══════════╤═════════════════╤══════════╗
║Stream Name│Stream Definition│  Status  ║
╠═══════════╪═════════════════╪══════════╣
║time-to-log│time | custom-log│undeployed║
╚═══════════╧═════════════════╧══════════╝

dataflow:>stream deploy --name time-to-log
Deployed stream 'time-to-log'

你會在 Data Flow 服務日誌中看到模組已經啟動並連線在一起。在我的具體日誌中,我觀察到

2016-04-05 09:09:18.067  INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.custom-log instance 0
   Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log
2016-04-05 09:09:30.838  INFO 58339 --- [nio-9393-exec-3] o.s.c.d.spi.local.LocalAppDeployer       : deploying app time-to-log.time instance 0
   Logs will be in /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.time

檢視日誌尾部,確認你內心深處已經知道的事情:我們的自定義 logging-sink 正在工作!

tail -f /var/folders/cr/grkckb753fld3lbmt386jp740000gn/T/spring-cloud-dataflow-2481763302070183542/time-to-log-1459861757641/time-to-log.custom-log/std*

下一步

邁向雲!我們使用的是本地 Data Flow 伺服器。還有其他實現可用於像 Cloud Foundry 這樣的處理平臺。Cloud Foundry Data Flow Server 會啟動應用程式例項,而不是本地 Java 程序。現在,構建一個可擴充套件的資料攝取和處理流就像 cf push ..cf scale -i $MOAR 一樣簡單!

我們只使用了 Spring Cloud Data Flow 的一小部分功能!使用 Spring Cloud Data Flow 可以編排任意數量由 Spring Cloud Stream 驅動的基於訊息的微服務。我建議檢視一些內建的 Spring Cloud Stream 模組以獲取靈感。

訂閱 Spring 新聞通訊

保持與 Spring 新聞通訊的連線

訂閱

搶佔先機

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一個簡單訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部