案例研究:使用 HTTP 源和 JDBC 接收器構建和執行流應用程式

工程 | David Turanski | 2020年8月10日 | ...

簡介

到目前為止,在本系列中,我們已經介紹了基於 Java 函式的新流應用程式以及函式組合。我們還提供瞭如何從供應商構建源以及從消費者構建接收器的詳細示例。在這裡,我們繼續接下來的幾個案例研究中的第一個。每個案例研究都演示瞭如何在各種場景中使用一個或多個可用的預打包 Spring Boot 流應用程式來構建資料流管道。

今天我們將展示兩個最常用的應用程式:HTTP 源JDBC 接收器。我們將使用它們構建一個簡單的服務,該服務接受 HTTP POST 請求並將內容儲存到資料庫表中。我們將首先將它們作為獨立的Spring Cloud Stream應用程式執行,然後展示如何使用Spring Cloud Data Flow編排相同的管道。本文以分步教程的形式呈現,我們鼓勵您在閱讀時遵循這些步驟。

準備環境

這個簡單的流應用程式由兩個透過訊息代理進行通訊的遠端程序組成。預打包的流應用程式可以開箱即用地與 Apache Kafka 或 RabbitMQ 配合使用。這裡我們將使用 Apache Kafka。JDBC 接收器將資料插入資料庫。在本例中,我們將使用 MySQL。

application schematic

假設我們從頭開始,開發環境中沒有 Kafka 或 MySQL。為了執行這個示例,我們將使用 Docker。因此,我們需要在本地機器上執行 Docker。稍後我們將使用 Spring Cloud Data Flow,所以我們將利用 Data Flow 的 docker-compose 安裝。這是開始使用 Data Flow 的最簡單方法。它會啟動多個容器,包括 MySQL 和 Kafka。為了使這些後端服務可用於獨立應用程式,我們需要調整標準安裝以釋出埠,並更改 Kafka 的廣告主機名。

注意

我已經在 Mac OS 上使用此設定執行,並期望類似的設定也能在 Windows 上執行。如果您遇到問題或有任何有用的提示,請在評論區留言。

首先,讓我們建立一個名為 http-jdbc-demo 的目錄,並將 docker-compose.yml 從 github 下載到該目錄。

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml

curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml -o docker-compose.yml

為了能夠從本地主機連線到 Kafka 和 MySQL,我們將下載另一個 YAML 檔案來覆蓋我們的自定義配置。

wget -O shared-kafka-mysql.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/shared-kafka-mysql.yml

接下來,我們需要獲取本地機器的區域網 IP 地址。在 Mac 上,您可以透過以下幾種方式之一完成此操作,例如

dig +short $(hostname)

ping $(hostname)

區域網 IP 地址對 docker 容器也是可訪問的,而容器內部的 localhost127.0.0.1 指的是自身。我們需要將環境變數 KAFKA_ADVERTISED_HOST_NAME 設定為這個值。我們還需要設定其他幾個環境變數

export KAFKA_ADVERTISED_HOST_NAME=$(dig +short $(hostname))
export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0

並註冊 Data Flow 中的最新流應用程式

export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven

現在,從我們的專案目錄中,我們可以啟動 Data Flow 叢集。

docker-compose -f docker-compose.yml -f shared-kafka-mysql.yml up

這將顯示大量日誌訊息並持續執行,直到您終止它(例如,Ctrl-C),這將停止所有容器。保持此終端開啟。

開啟一個新的終端並輸入

docker ps

這將列出 Data Flow 叢集中正在執行的容器。我們稍後會檢視 Data Flow。此時,請確保 dataflow-kafka 容器在 PORTS 下顯示 0.0.0.0:9092→9092/tcp,並且 dataflow-mysql 類似地顯示 0.0.0.0:3306→3306/tcp

docker ps

建立資料庫表

我們可以將 JDBC sink 應用程式配置為自動初始化資料庫,但為了簡單起見,我們將手動建立它。我們可以使用任何 JDBC 資料庫工具或透過在 dataflow-mysql 容器內執行 mysql 來完成此操作。

docker exec -it dataflow-mysql mysql -uroot -p

系統將提示您輸入密碼。資料庫憑據已在 docker-compose.yml 中配置。如果您不想在那裡查詢,使用者名稱是 root,密碼是 rootpw

輸入以下命令——您應該能夠複製並貼上整個內容——以建立資料庫和表。

CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
	 id INT NOT NULL AUTO_INCREMENT,
	 name VARCHAR(255) NOT NULL,
	 street VARCHAR(255) NOT NULL,
	 city VARCHAR(255) NOT NULL,
	 PRIMARY KEY (id));

輸入 exit; 退出。

create database

執行應用程式

此時,我們準備執行 HTTP 源和 JDBC 接收器。Spring Boot 可執行 JAR 已釋出到 Spring Maven 倉庫。我們需要使用 Kafka 繫結器構建的 JAR。

wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/http-source-kafka/3.0.0-SNAPSHOT/http-source-kafka-3.0.0-SNAPSHOT.jar

wget https://repo.spring.io/snapshot/org/springframework/cloud/stream/app/jdbc-sink-kafka/3.0.0-SNAPSHOT/jdbc-sink-kafka-3.0.0-SNAPSHOT.jar

我們將在單獨的終端會話中執行這些。我們需要配置這些應用程式以使用相同的 Kafka 主題,我們稱之為 jdbc-demo-topic。Spring Cloud Stream Kafka 繫結器將自動建立此主題。我們還需要配置 JDBC 接收器以連線到我們的資料庫並將資料對映到我們建立的表。我們將釋出如下所示的 JSON

{
 “name”:”My Name”,
 “address”: {
      “street”:”My Street”,
       “city”: “My City”
    }
}

我們要將這些值插入到 Demo 資料庫的 People 表中,插入到 namestreetcity 列中。

啟動 JDBC 接收器

開啟我們下載 jar 的新終端會話並執行

java -jar jdbc-sink-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.consumer.table-name=People --jdbc.consumer.columns=name,city:address.city,street:address.street --spring.cloud.stream.bindings.input.destination=jdbc-demo-topic

請注意用於將欄位對映到列的 jdbc.consumer.columns 語法。

啟動 HTTP 源

開啟我們下載 jar 的新終端會話並執行

java -jar http-source-kafka-3.0.0-SNAPSHOT.jar --server.port=9000 --spring.cloud.stream.bindings.output.destination=jdbc-demo-topic

這裡我們將源的 HTTP 埠設定為 9000(預設為 8080)。此外,源的輸出目的地與接收器的輸入目的地匹配非常重要。

釋出一些資料

接下來,我們需要向 https://:9000 釋出一些資料。

curl https://:9000 -H'Content-Type:application/json' -d '{"name":"My Name","address":{"street":"My Street","city":"My City"}}}

驗證資料已儲存

再次,找到一個開放的終端會話並

docker exec -it dataflow-mysql mysql -uroot -p

使用 rootpw 登入並查詢表

query standalone

如果你看到這個,恭喜你!獨立的 Spring Cloud Stream 應用程式按預期工作。我們現在可以終止我們的獨立應用程式 (Ctrl-C)。讓 docker-compose 程序保持執行,以便我們可以檢視 Data Flow。

使用 Spring Cloud Data Flow

正如我們所看到的,即使我們不需要編寫任何程式碼,在“裸機”上執行這些應用程式也需要很多手動步驟。這些步驟包括

  • 自定義 docker-compose 配置,或者在本地機器上安裝 kafka 和 mysql

  • 使用 Maven URL 下載所需版本的流應用程式(我們碰巧知道這裡要使用哪些)

  • 確保 Spring Cloud Stream 目標繫結已正確配置,以便應用程式可以通訊

  • 查詢並閱讀 文件 以獲取配置屬性(我們已經完成了這些以準備此示例)並正確設定它們。

  • 管理多個終端會話

在接下來的部分中,我們將看到使用 Spring Cloud Data Flow 可以消除所有這些步驟,並提供更豐富的整體開發體驗。

開啟 Data Flow 控制面板

要開始,請開啟 Data Flow Dashboard,地址為 https://:9393/dashboard。這將帶您進入應用程式檢視,您可以在其中看到已註冊的預打包應用程式。我們之前執行的 docker-compose 命令執行了此步驟,它使用我們提供的 URL 獲取流應用程式的最新快照版本,包括我們剛剛執行的相同 jar 檔案。

datflow ui applications

建立並部署流

在控制面板中,從左側選單中選擇 Streams,然後單擊 Create Streams 以開啟圖形化流編輯器。

dataflow create stream

將 http 源和 jdbc 接收器拖放到編輯器窗格中,然後使用滑鼠連線兩個控制代碼。或者,您可以直接在頂部的文字框中鍵入 Data Flow 流定義 DSL:http | jdbc

接下來我們需要配置應用程式。如果您點選任何一個應用程式,您將看到一個 Options 連結。開啟 JDBC sink 的選項視窗。您將看到所有可用的配置屬性以及簡短描述。以下螢幕截圖顯示了部分檢視;我們需要滾動才能看到其餘部分。

datflow ui jdbc options

與之前一樣,我們需要提供 URL、使用者名稱、密碼、表和列。在這裡,我們需要將 JDBC URL 更改為 jdbc:mariadb://mysql:3306/Demo,因為主機名 mysql 對應於 docker-compose.yml 中定義的 mysql 服務名稱。此外,我們將 http 埠設定為 20000,因為它在已配置的已釋出埠範圍內。有關更多詳細資訊,請參閱 skipper-server 配置

![]](https://github.com/spring-cloud/stream-applications/blob/gh-pages/img/http-jdbc/datflow-ui-stream-configured.png?raw=true)

讓我們看看自動生成的流定義 DSL

http --port=20000 | jdbc --password=rootpw --username=root --url=jdbc:mariadb://mysql:3306/Demo --columns=name,city:address.city,street:address.street --table-name=People

此 DSL 可用於指令碼或 Data Flow 客戶端應用程式以自動化流建立。我們的配置已完成,但 Spring Cloud Stream 目標繫結在哪裡?我們不需要它們,因為 Data Flow 為我們處理了連線。

選擇 Create Stream 按鈕並將流命名為 http-jdbc

dataflow ui deploy 1

要部署流,請單擊播放按鈕

play button

接受預設部署屬性,然後單擊頁面底部的 Deploy stream

根據需要點選 Refresh 按鈕。大約一分鐘後,您應該會看到我們的流已部署。

dataflow ui deploy 2

釋出一些資料並驗證其是否已儲存

這裡我們將向埠 20000 釋出一些不同的值

curl https://:20000 -H'Content-Type:application/json' -d '{"name":"Your Name","address":{"street":"Your Street","city":"Your City"}}}'

當我們再次執行查詢時,我們應該會看到表中添加了一行新資料。

query dataflow

幹得好!

資料流應用程式部署

敏銳的讀者會注意到,即使平臺本身在容器中執行,也沒有為部署的應用程式建立 Docker 容器。在 Data Flow 架構中,Skipper 伺服器負責部署流應用程式。在本地配置中,Skipper 使用本地部署器在其 localhost 上執行 jar 檔案,就像我們執行獨立應用程式時一樣。為了證明這一點,我們可以在 Skipper 容器中執行 ps

docker exec -it skipper ps -ef

docker exec ps

要檢視控制檯日誌,請使用 stdout 路徑

docker exec -it skipper more /tmp/1596916545104/http-jdbc.jdbc-v4/stdout_0.log

tail -f 命令也適用。

如果部署成功,應用程式日誌也可以從 UI 中檢視。

dataflow ui app log

但如果部署失敗,我們可能需要深入研究以解決問題。

注意

本地 Data Flow 安裝適用於本地開發和探索,但我們不建議將其用於生產環境。生產級別的 Spring Cloud Data Flow OSS 以及商業許可產品可用於 KubernetesCloud Foundry

總結

我們剛剛仔細研究瞭如何使用預打包的 Spring Cloud Stream 應用程式構建一個簡單的資料流管道,以將透過 HTTP 釋出​​的 JSON 內容儲存到關係資料庫中。我們使用 Docker 和 docker-compose 安裝了一個本地環境,然後我們部署了源和接收器應用程式,首先在“裸機”上,然後使用 Spring Cloud Data Flow。希望我們學到了一些關於使用 Spring Cloud Stream、Data Flow、Docker 容器、HTTP 源和 JDBC 接收器有趣的知識。

敬請關注…​

在接下來的幾周,我們將展示更多 Spring Cloud Stream 和 Spring Cloud Data Flow 的案例研究,每個案例都將探索不同的流應用程式和功能。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有