搶佔先機
VMware 提供培訓和認證,助力您快速進步。
瞭解更多到目前為止,本系列已經介紹了基於 Java 函式的流式應用程式 和 函式組合。我們還提供了詳細的示例,說明如何從 supplier 構建 source 以及如何從 consumer 構建 sink。在這裡,我們將繼續這一旅程,展示接下來的幾個案例研究中的第一個。每個案例研究都演示瞭如何在各種場景下使用一個或多個現有的預打包 Spring Boot 流式應用程式來構建資料流管道。
今天我們將展示兩個最常用的應用程式:HTTP source 和 JDBC sink。我們將使用它們構建一個簡單的服務,該服務接受 HTTP POST 請求並將內容儲存到資料庫表中。我們將首先將它們作為獨立的 Spring Cloud Stream 應用程式執行,然後展示如何使用 Spring Cloud Data Flow 編排相同的管道。本文將以分步教程的形式呈現,我們鼓勵您在閱讀時按照步驟進行操作。
這個簡單的流式應用程式由兩個透過訊息代理通訊的遠端程序組成。預打包的流式應用程式開箱即用,可與 Apache Kafka 或 RabbitMQ 一起工作。在這裡,我們將使用 Apache Kafka。JDBC sink 將資料插入到資料庫中。我們將使用 MySQL 作為此示例的資料庫。
假設我們從頭開始,開發環境中沒有 Kafka 或 MySQL。為了執行這個示例,我們將使用 Docker 來玩一下。所以我們需要在本地機器上執行 Docker。稍後我們將使用 Spring Cloud Data Flow,因此我們將利用 Data Flow 的 docker-compose 安裝。這是開始使用 Data Flow 最簡單的方法。它會啟動幾個容器,包括 MySQL 和 Kafka。為了使這些後端服務可供獨立應用程式使用,我們需要調整標準安裝以釋出埠,並更改 Kafka 的 advertised host name。
注意
我已經在 Mac OS 上執行過這種設定,預計類似的設定也可以在 Windows 上工作。如果您遇到問題或有一些有用的技巧要分享,請在評論區留言。
首先,讓我們建立一個名為 http-jdbc-demo
的目錄,然後將 docker-compose.yml
從 github 下載到該目錄中。
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml
或者
curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml -o docker-compose.yml
為了啟用從本地主機到 Kafka 和 MySQL 的連線,我們將下載另一部分 YAML 檔案來覆蓋或自定義配置。
wget -O shared-kafka-mysql.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/shared-kafka-mysql.yml
接下來,我們需要獲取本地機器的區域網 IP 地址。在 Mac 上,您可以通過幾種方式之一執行此操作,例如
dig +short $(hostname)
或者
ping $(hostname)
區域網 IP 地址對於 docker 容器也是可訪問的,而容器內的 localhost
或 127.0.0.1
指的是自身。我們需要將環境變數 KAFKA_ADVERTISED_HOST_NAME
設定為此值。我們還需要設定其他一些環境變數
export KAFKA_ADVERTISED_HOST_NAME=$(dig +short $(hostname))
export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0
並將最新的流應用程式註冊到 Data Flow 中
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven
現在,從我們的專案目錄中,我們可以啟動 Data Flow 叢集
docker-compose -f docker-compose.yml -f shared-kafka-mysql.yml up
這將顯示大量的日誌訊息,並將持續執行直到您終止它(例如,Ctrl-C),這將停止所有容器。保持此終端開啟。
開啟一個新終端並輸入
docker ps
這將列出 Data Flow 叢集中正在執行的容器。稍後我們將檢視 Data Flow。此時,請確保 dataflow-kafka
容器在 PORTS
下顯示 0.0.0.0:9092→9092/tcp
,並且 dataflow-mysql
類似地顯示 0.0.0.0:3306→3306/tcp
。
我們可以配置 JDBC sink 應用程式自動初始化資料庫,但為了簡單起見,我們將手動建立它。我們可以使用任何 JDBC 資料庫工具或在 dataflow-mysql
容器內執行 mysql
來完成此操作。
docker exec -it dataflow-mysql mysql -uroot -p
系統將提示您輸入密碼。資料庫憑據配置在 docker-compose.yml 中。如果您不想檢視那裡,使用者名稱是 root
,密碼是 rootpw
。
輸入以下命令(您應該能夠複製並貼上整個內容)來建立資料庫和表。
CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
PRIMARY KEY (id));
輸入 exit;
退出。
此時,我們準備好執行 HTTP source 和 JDBC sink 了。Spring Boot 可執行 jar 已釋出到 Spring Maven 倉庫。我們需要使用 Kafka binder 構建的 jar 包。
wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/http-source-kafka/3.0.0-SNAPSHOT/http-source-kafka-3.0.0-SNAPSHOT.jar
wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/jdbc-sink-kafka/3.0.0-SNAPSHOT/jdbc-sink-kafka-3.0.0-SNAPSHOT.jar
我們將在不同的終端會話中執行這些應用程式。我們需要配置這些應用程式使用同一個 Kafka topic,我們將其命名為 jdbc-demo-topic
。Spring Cloud Stream Kafka binder 將自動建立此 topic。我們還需要配置 JDBC sink 連線到我們的資料庫,並將資料對映到我們建立的表中。我們將釋出如下所示的 JSON 資料:
{
“name”:”My Name”,
“address”: {
“street”:”My Street”,
“city”: “My City”
}
}
我們希望將這些值插入到 Demo
資料庫的 People
表中的 name
、street
和 city
列。
開啟一個下載了 jar 包的新終端會話並執行
java -jar jdbc-sink-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.consumer.table-name=People --jdbc.consumer.columns=name,city:address.city,street:address.street --spring.cloud.stream.bindings.input.destination=jdbc-demo-topic
注意用於將欄位對映到列的 jdbc.consumer.columns
語法。
開啟一個下載了 jar 包的新終端會話並執行
java -jar http-source-kafka-3.0.0-SNAPSHOT.jar --server.port=9000 --spring.cloud.stream.bindings.output.destination=jdbc-demo-topic
這裡我們將 source 的 HTTP 埠設定為 9000(預設是 8080)。另外,source 的輸出目的地與 sink 的輸入目的地匹配非常重要。
接下來,我們需要向 https://:9000 傳送一些資料。
curl https://:9000 -H'Content-Type:application/json' -d '{"name":"My Name","address":{"street":"My Street","city":"My City"}}}
再次,找到一個開啟的終端會話並
docker exec -it dataflow-mysql mysql -uroot -p
使用 rootpw
登入並查詢表
如果您看到此結果,恭喜!獨立的 Spring Cloud Stream 應用程式按預期工作。我們現在可以終止獨立應用程式(Ctrl-C)了。讓 docker-compose 程序保持執行,以便我們可以檢視 Data Flow。
正如我們所見,即使我們不必編寫任何程式碼,在“裸機”上執行這些應用程式也需要許多手動步驟。這些步驟包括:
自定義 docker-compose 配置,或另外在本地機器上安裝 kafka 和 mysql
使用 Maven URL 下載所需版本的流應用程式(我們只是恰好知道在這裡應該使用哪些版本)
確保 Spring Cloud Stream destination binding 配置正確,以便應用程式能夠通訊
查詢並閱讀 文件 以獲取配置屬性(我們已經為準備此示例做了此工作)並正確設定它們。
管理多個終端會話
在接下來的章節中,我們將看到使用 Spring Cloud Data Flow 完成此操作可以消除所有這些步驟,並提供更豐富的整體開發體驗。
要開始使用,請在 https://:9393/dashboard 開啟 Data Flow 控制檯。這將帶您進入應用程式檢視,在那裡我們可以看到已註冊的預打包應用程式。我們之前執行的 docker-compose 命令執行了此步驟,它使用了我們提供的 URL 來獲取流應用程式的最新快照版本,包括我們剛剛執行的相同 jar 檔案。
在控制檯中,從左側選單中選擇 Streams
,然後單擊 Create Streams
開啟圖形化流編輯器。
將 http source 和 jdbc sink 拖放到編輯器面板中,並使用滑鼠連線這兩個控制代碼。或者,您可以直接在頂部的文字框中鍵入 Data Flow 流定義 DSL:http | jdbc
。
接下來我們需要配置應用程式。如果您單擊其中任何一個應用程式,您將看到一個 Options
連結。開啟 JDBC sink 的選項視窗。您將看到所有可用的配置屬性及其簡要描述。以下截圖顯示了部分檢視;我們需要滾動才能看到其餘部分。
和之前一樣,我們需要提供 url、使用者名稱、密碼、表和列。在這裡,我們需要將 JDBC URL 更改為 jdbc:mariadb://mysql:3306/Demo
,因為主機名 mysql
對應於 docker-compose.yml
中定義的 mysql 服務名稱。此外,我們將 http 埠設定為 20000
,因為它在配置的已釋出埠範圍內。有關更多詳細資訊,請參閱 skipper-server 配置。
讓我們看看自動生成的流定義 DSL
http --port=20000 | jdbc --password=rootpw --username=root --url=jdbc:mariadb://mysql:3306/Demo --columns=name,city:address.city,street:address.street --table-name=People
這個 DSL 可以用於指令碼或 Data Flow 客戶端應用程式,以自動化流建立。我們的配置已完成,但是 Spring Cloud Stream destination bindings 在哪裡?我們不需要它們,因為 Data Flow 會為我們處理連線。
選擇 Create Stream
按鈕並將流命名為 http-jdbc
。
要部署流,請單擊播放按鈕
接受預設部署屬性,然後單擊頁面底部的 Deploy stream
。
如有必要,單擊 Refresh
按鈕。大約一分鐘後,您應該會看到我們的流已部署。
在這裡,我們將傳送一些不同的值到埠 20000
curl https://:20000 -H'Content-Type:application/json' -d '{"name":"Your Name","address":{"street":"Your Street","city":"Your City"}}}'
當我們再次執行查詢時,應該會看到表中添加了一條新記錄。
幹得漂亮!
細心的讀者會注意到,儘管平臺本身執行在容器中,但已部署的應用程式並沒有建立 Docker 容器。在 Data Flow 的 架構 中,Skipper 伺服器負責部署流應用程式。在本地配置中,Skipper 使用 Local Deployer 在其 localhost
上執行 jar 檔案,就像我們獨立執行應用程式時一樣。要驗證這一點,我們可以在 skipper 容器中執行 ps
命令。
docker exec -it skipper ps -ef
要檢視控制檯日誌,請使用 stdout
路徑
docker exec -it skipper more /tmp/1596916545104/http-jdbc.jdbc-v4/stdout_0.log
tail -f
命令也同樣適用。
如果部署成功,應用程式日誌也可以從 UI 中檢視。
但如果部署失敗,我們可能需要深入檢視以進行故障排除。
注意
本地 Data Flow 安裝適用於本地開發和探索,但我們不推薦用於生產環境。生產級別的 Spring Cloud Data Flow OSS 以及商業許可產品,都適用於 Kubernetes 和 Cloud Foundry。
我們剛剛詳細介紹瞭如何使用預打包的 Spring Cloud Stream 應用程式構建一個簡單的資料流管道,將透過 HTTP POST 的 JSON 內容儲存到關係型資料庫中。我們使用 Docker 和 docker-compose 安裝了本地環境,然後首先在“裸機”上部署了 source 和 sink 應用程式,然後使用 Spring Cloud Data Flow 進行部署。希望我們透過與 Spring Cloud Stream、Data Flow、Docker 容器、HTTP source 和 JDBC sink 的互動學到了一些有趣的東西。
在接下來的幾周裡,我們將為大家帶來更多 Spring Cloud Stream 和 Spring Cloud Data Flow 的案例研究,每個案例都將探討不同的流應用程式和功能。