領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多本文是關於基於 Java Functions 的全新 Spring Cloud Stream 應用程式的部落格系列的一部分。
以下是本部落格系列的所有先前部分。
在本文中,我們將介紹 Debezium CDC 源,它允許我們從 MySQL、PostgreSQL、MongoDB、Oracle、DB2 和 SQL Server 等資料庫捕獲資料庫更改,並透過 RabbitMQ、Apache Kafka、Azure Event Hubs、Google PubSub 和 Solace PubSub+ 等各種訊息繫結程式即時處理這些更改。
此外,我們將揭示如何使用 Analytics sink 將捕獲的資料庫更改轉換為指標,並將其釋出到各種監控系統以進行進一步分析。
本文首先解釋 CDC supplier 和 Analytics consumer 元件,展示如何在您自己的 Spring 應用程式中以程式設計方式進行自定義和使用它們。接下來,我們將解釋 CDC 源 和 Analytics sink 如何構建在 supplier 和 consumer 之上,提供開箱即用、立即可用的流式應用程式。
最後,我們將演示如何輕鬆使用 Spring Cloud Data Flow (SCDF) 來部署響應資料庫更新、將更改事件轉換為分析指標併發布到 Prometheus 以便與 Grafana 進行分析和視覺化進行即時處理的流式管道。
資料更改捕獲 (CDC) 是一種用於觀察寫入資料庫的所有資料更改並將它們作為事件釋出以進行流式處理的技術。由於您的應用程式資料庫一直在變化,CDC 允許您對這些更改做出反應,並讓您的應用程式以與提交到資料庫相同的順序流式傳輸每一行級別的更改。
CDC 支援多種用例,例如:快取無效化、記憶體資料檢視、更新搜尋索引、透過保持不同資料來源同步來進行資料複製、即時欺詐檢測、儲存審計跟蹤、資料來源等等。
Spring Cloud Data Flow CDC Source 應用程式是圍繞 Debezium 構建的,Debezium 是一個流行的開源日誌式 CDC 實現,支援各種資料庫。CDC Source 支援多種訊息繫結程式,包括 Apache Kafka、Rabbit MQ、Azure Event Hubs、Google PubSub、Solace PubSub+。
注意
CDC source 實現嵌入了 Debezium Engine,並且不依賴於 Apache Kafka 或 ZooKeeper!您可以將 CDC source 與任何支援的訊息繫結程式一起使用!不過,Debezium Engine 存在一些 侷限性 需要考慮。
CDC Debezium Supplier 實現為 java.util.function.Supplier bean,當呼叫時,它將提供給定目錄中檔案的內容。檔案 supplier 具有以下簽名:
Supplier<Flux<Message<?>>>
Supplier 的使用者可以訂閱返回的 Flux<Message<?>,這是一個訊息流或 CDC 更改事件,它們具有複雜的結構。每個事件包含三個部分(例如 metadata、before 和 after),如下面的 payload 示例所示:
{
"before": { ... }, // row data before the change.
"after": { ... }, // row data after the change.
"source": { // the names of the database and table where the change was made.
"connector": "mysql", "server_id": 223344,"snapshot": "false",
"name": "my-app-connector", "file": "mysql-bin.000003", "pos": 355, "row": 0,
"db": "inventory", // source database name.
"table": "customers", // source table name.
},
"op": "u", // operation that made the change.
"ts_ms": 1607440256301, // timestamp - when the change was made.
"transaction": null // transaction information (optional).
}
如果 cdc.flattening.enabled 屬性設定為 true,則只有 after 部分作為獨立訊息傳遞。
為了呼叫 CDC supplier,我們需要指定一個源資料庫來接收 CDC 事件。cdc.connector 屬性用於在支援的 mysql、postgres、sql server、db2、oracle、cassandra 和 mongo 源資料庫型別之間進行選擇。cdc.config.database.* 屬性有助於配置源訪問。以下是連線到 MySQL 資料庫的示例配置:
# DB type
cdc.connector=mysql
# DB access
cdc.config.database.user=debezium
cdc.config.database.password=dbz
cdc.config.database.hostname=localhost
cdc.config.database.port=3306
# DB source metadata
cdc.name=my-sql-connector
cdc.config.database.server.id=85744
cdc.config.database.server.name=my-app-connector
cdc.name、cdc.config.database.server.id 和 cdc.config.database.server.name 屬性用於標識和分發傳入的事件。可選地,您可以設定 cdc.flattening.enabled=true 來展平 CDC 事件,用其 after 欄位替換原始更改事件,從而建立一個簡單的 Kafka 記錄。可選地,使用 cdc.schema=true 將 DB schema 包含到 CDC 事件中。
注意
源資料庫必須配置為公開其 Write-Ahead Log API,以便 Debezium 能夠連線並消耗 CDC 事件。 Debezium Connector Documentation 提供了關於如何為任何支援的資料庫啟用 CDC 的詳細描述。出於演示目的,我們將使用預配置的 MySQL docker 映象。
CDC supplier 是一個可重用的 Spring bean,我們可以將其注入到終端使用者的自定義應用程式中。注入後,可以直接呼叫它並結合自定義資料處理。這是一個示例。
@Autowired
Supplier<Flux<Message<?>>> cdcSupplier;
public void consumeDataAndSendEmail() {
Flux<Message<?> cdcData = cdcSupplier.get();
messageFlux.subscribe(t -> {
if (t == something)
//send the email here.
}
}
}
在上面的虛擬碼中,我們注入了 CDC supplier bean,然後使用它來呼叫其 get() 方法獲取 Flux。然後我們訂閱該 Flux,每次收到 Flux 中的任何資料時,應用一些過濾,並根據這些資料採取行動。這只是一個簡單的說明,展示了我們如何重用 CDC supplier。當您在實際應用程式中嘗試此操作時,可能需要對實現進行更多調整,例如在進行條件檢查之前將接收資料的預設資料型別從 byte[] 轉換為其他型別。
提示
要構建獨立的、非流式應用程式,您可以利用 cdc-debezium-boot-starter。只需新增 cdc-debezium-boot-starter 依賴項,並實現自定義的 Consumer<SourceRecord> 處理程式來處理傳入的資料庫更改事件。
正如我們在本部落格系列中所見,所有開箱即用的 Spring Cloud Stream 源應用程式都已通過幾個通用的預置處理器進行了自動配置。您可以將這些處理器作為 CDC source 的一部分進行啟用。這是一個示例,其中我們執行 CDC 源並接收資料,然後在傳送到中介軟體目標之前轉換消耗的資料。
java -jar cdc-debezium-source.jar
--cdc.connector=mysql --cdc.name=my-sql-connector
--cdc.config.database.server.name=my-app-connector
--cdc.config.database.user=debezium --cdc.config.database.password=dbz
--cdc.config.database.hostname=localhost --cdc.config.database.port=3306
--cdc.schema=true
--cdc.flattening.enabled=true
--spring.cloud.function.definition=cdcSupplier|spelFunction
--spel.function.expression=payload.toUpperCase()
透過為 spring.cloud.function.definition 屬性提供值 cdcSupplier|spelFunction,我們激活了與 CDC supplier 組合的 spel 函式。然後,我們使用 spel.function.expression 提供用於轉換資料的 SpEL 表示式。還有許多其他函式可以這樣組合。有關更多詳細資訊,請參閱 此處。
Analytics consumer 提供了一個函式,該函式從輸入資料訊息計算分析,並將它們作為指標釋出到各種監控系統。它利用 micrometer 庫 在大多數流行的 監控系統 中提供統一的程式設計體驗,並使用 Spring Expression Language (SpEL) 來定義如何從輸入資料計算指標名稱、值和標籤。
我們可以在自定義應用程式中直接使用 consumer bean 來計算傳遞訊息的分析。以下是 Analytics consumer bean 的型別簽名:
Consumer<Message<?>> analyticsConsumer
一旦注入到自定義應用程式中,使用者就可以直接呼叫 consumer 的 accept() 方法,並提供一個 Message<?> 物件來計算分析並將它們釋出到後端監控系統。
Message 是資料的通用容器。每個 Message 例項都包含一個 payload 和 headers,其中包含使用者可擴充套件的鍵值對屬性。SpEL 表示式用於訪問訊息的 headers 和 payload 來計算指標數量和標籤。例如,計數器指標可以有一個值 amount,該值由輸入訊息 payload 的大小計算得出,並新增一個 my_tag 標籤,該標籤從 kind header 值中提取:
analytics.amount-expression=payload.lenght()
analytics.tag.expression.my_tag=headers['kind']
Analytics consumer 的配置屬性以 analytics.* 字首開頭。有關可用的分析屬性,請參閱 AnalyticsConsumerProperties。監控配置屬性以 management.metrics.export 字首開頭。要配置特定的監控系統,請遵循提供的 配置說明。
與 CDC Source 一樣,Spring Cloud Stream 的開箱即用應用程式已經提供了一個基於 Analytics consumer 的 Analytics sink。
該 sink 可用於 Apache Kafka 和 RabbitMQ 繫結程式變體。當用作 Spring Cloud Stream sink 時,Analytics consumer 會自動配置為接受來自相應中介軟體系統的資料,例如,來自 Kafka 主題或 RabbitMQ 交換。
單獨執行 CDC source 和 Analytics sink 已經很方便了,但 Spring Cloud Data Flow 可以讓它們作為管道輕鬆執行。基本上,我們希望編排如下所示的資料流:

cdc-log 管道部署一個 cdc-source,它將所有資料庫更改以 JSON 訊息格式流式傳輸到 log-sink。同時,cdc-analytics-tap 管道 訂閱 cdc-source 的輸出到 analytics-sink,以從 CDC 事件計算 DB 統計資訊並將其釋出到時間序列資料庫 (TSDB),例如 Prometheus 或 Wavefront。Grafana 儀表板用於視覺化這些更改。
Spring Cloud Data Flow 的 安裝說明 解釋瞭如何為任何支援的雲平臺安裝 Spring Cloud Data Flow。
下面,我們將簡要提供設定 Spring Cloud Data Flow 的步驟。首先,我們需要獲取用於執行 Spring Cloud Data Flow、Prometheus 和 Grafana 的 docker-compose 檔案。
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.7.0/spring-cloud-dataflow-server/docker-compose.yml
wget -O docker-compose-prometheus.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.7.0/spring-cloud-dataflow-server/docker-compose-prometheus.yml
此外,獲取此額外的 docker-compose 檔案來安裝一個源 MySQL 資料庫,該資料庫配置為公開其寫前日誌,供 cdc-debezium 連線。
wget -O mysql-cdc.yml https://gist.githubusercontent.com/tzolov/48dec8c0db44e8086916129201cc2c8c/raw/26e1bf435d58e25ff836e415dae308edeeef2784/mysql-cdc.yml
mysql-cdc 使用 debezium/example-mysql 映象,並附帶一個庫存示例資料庫。

我們需要設定一些環境變數才能正確執行 Spring Cloud Data Flow。
export DATAFLOW_VERSION=2.7.1
export SKIPPER_VERSION=2.6.1
export STREAM_APPS_URI=https://dataflow.spring.io/kafka-maven-latest
現在一切準備就緒,是時候開始執行 Spring Cloud Data Flow 和所有其他輔助元件了。
docker-compose -f docker-compose.yml -f docker-compose-prometheus.yml -f mysql-cdc.yml up
提示
要使用 RabbitMQ 而不是 Apache Kafka,您可以下載一個額外的 docker-compose 檔案,如 RabbitMQ Instead of Kafka 說明中所述,並將 STREAM_APPS_URI 變數設定為 https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-rabbit-maven。
提示
要使用 Wavefront 而不是 Prometheus & Grafana,請遵循 Wavefront 說明。
SCDF 啟動並執行後,請轉到 https://:9393/dashboard。然後,在左側轉到 Streams 並選擇 Create Stream。從源應用程式中選擇 cdc-debezium,從 sink 應用程式中選擇 log 和 analytics,以定義 cdc-log = cdc-debezium | log 和 cdc-analytic-tap = :cdc-log.cdc-debezium > analytics 管道。您可以單擊應用程式的選項來選擇所需的屬性。
為了更快地啟動,您可以複製/貼上以下現成的管道定義片段:
cdc-log = cdc-debezium --cdc.name=mycdc --cdc.flattening.enabled=false --cdc.connector=mysql --cdc.config.database.user=debezium --cdc.config.database.password=dbz --cdc.config.database.dbname=inventory --cdc.config.database.hostname=mysql-cdc --cdc.config.database.port=3307 --cdc.stream.header.offset=true --cdc.config.database.server.name=my-app-connector --cdc.config.tombstones.on.delete=false | log
cdc-analytic-tap = :cdc-log.cdc-debezium > analytics --analytics.name=cdc --analytics.tag.expression.table=#jsonPath(payload,'$..table') --analytics.tag.expression.operation=#jsonPath(payload,'$..op') --analytics.tag.expression.db=#jsonPath(payload,'$..db')
cdc-log 管道部署一個 cdc-debezium source,該源連線到 mysql-cdc:3307 的 MySQL 資料庫,並將 DB 更改事件流式傳輸到 log sink。有關可用的配置選項,請參閱 cdc-debezium 文件。
cdc-analytic-tap 管道訂閱 cdc-debezium source 的輸出,並將 cdc 事件流式傳輸到 analytics sink。分析功能會建立一個 指標計數器(稱為 cdc),並使用 SpEL 表示式 從流式訊息 payload 計算指標標籤(例如,db、table 和 operation)。
例如,讓我們修改 MySQL inventory 資料庫中的 customers 表。更新事務作為更改事件傳送到 cdc-debezium source,後者將本地 DB 事件轉換為統一的訊息 payload,如下所示:
{
"before": {
"id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "[email protected]"
},
"after": {
"id": 1004, "first_name": "Anne2", "last_name": "Kretchmar", "email": "[email protected]"
},
"source": {
"version": "1.3.1.Final", "connector": "mysql", "server_id": 223344, "thread": 5,
"name": "my-app-connector", "file": "mysql-bin.000003", "pos": 355, "row": 0,
"db": "inventory",
"table": "customers",
},
"op": "u",
"ts_ms": 1607440256301,
"transaction": null
}
以下 SpEL 表示式用於從 CDC 訊息 payload 計算 3 個標籤(db、table、operation)。這些標籤會分配給釋出到 Prometheus 的每個 cdc 指標。
--analytics.tag.expression.db=#jsonPath(payload,'$..db')
--analytics.tag.expression.table=#jsonPath(payload,'$..table')
--analytics.tag.expression.operation=#jsonPath(payload,'$..op')
以下是選擇所有屬性後的截圖:

建立 並 部署 cdc-log 和 cdc-analytics-tap 管道,接受所有預設選項。可選地,您可以使用 Group Actions 同時部署兩個流。
流部署後,您可以透過 SCDF UI 或使用 Skipper docker 容器 如文件中所述 來檢查已部署應用程式的日誌。如果檢查 Log sink 應用程式的日誌,您應該會看到類似的 CDC JSON 訊息:

接下來,透過 按鈕(或直接開啟 localhost:3000)訪問 Grafana 儀表板,使用使用者名稱:admin 和密碼:admin 登入。您可以瀏覽 Applications 儀表板來檢查已部署管道的效能。現在,您可以匯入 CDC Grafana Dashboard-Prometheus.json 儀表板,並看到類似以下的儀表板:

以下查詢已用於在 Prometheus 中聚合 cdc_total 指標:
sort_desc(topk(10, sum(cdc_total) by (db, table)))
sort_desc(topk(100, sum(cdc_total) by (op)))
提示
您可以在 https://:9090 開啟 Prometheus UI 來檢查配置,以及執行一些臨時的 PQL 查詢。
您可以連線到 localhost:3307 的 inventory CDC MySQL 資料庫(使用者:root 密碼:debezium)並開始修改資料。
以下 docker 命令顯示瞭如何連線到 mysql-cdc:
docker exec -it mysql-cdc mysql -uroot -pdebezium --database=inventory
以下指令碼有助於生成多個插入、更新和刪除的 DB 事務:
for i in {1..100}; do docker exec -it mysql-cdc mysql -uroot -pdebezium --database=inventory -e'INSERT INTO customers (first_name, last_name, email) VALUES ("value1", "value2", "val@bla"); UPDATE customers SET first_name="value2" WHERE first_name="value1"; DELETE FROM customers where first_name="value2";'; done
您將看到 log-sink 日誌反映了這些更改,以及 CDC 儀表板圖表的更新。
在本部落格中,我們瞭解了 CDC-debezium supplier 和 Analytics consumer 函式及其相應的 Spring Cloud Stream source 和 sink 的工作原理。supplier 和 consumer 函式可以注入到自定義應用程式中,以與其他業務邏輯結合。
source 和 sink 應用程式是開箱即用的,可用於 Kafka 和 RabbitMQ 中介軟體變體。
您可以輕鬆地構建獨立的應用程式,將 Cdc-debezium supplier 與 Geode consumer 結合起來,建立和維護資料庫資料的記憶體檢視。類似地,您可以將 Cdc-debezium supplier 與 Elasticsearch consumer 結合起來,即時維護資料庫資料的可搜尋索引。
更令人興奮的是,您可以使用 OOTB 的 cdc-debezium source、geode sink 和 elasticsearch sink 應用程式來實現上述場景。您可以在不同的訊息繫結程式和源資料庫之上構建這些管道。
這次 Spring One 演示 展示了一個高階用例,即使用 CDC-debezium 和機器學習構建用於信用卡欺詐檢測的流式資料管道。

我們還有幾期將在本部落格系列中釋出。請繼續關注。