領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多本文是探索基於 Java Functions 新設計的 Spring Cloud Stream 應用系列部落格的一部分。
以下是本系列部落格的所有之前部分。
在本篇博文中,我們將探討允許我們從 MySQL、PostgreSQL、MongoDB、Oracle、DB2 和 SQL Server 等資料庫捕獲資料庫變更,並透過各種訊息繫結器(如 RabbitMQ、Apache Kafka、Azure Event Hubs、Google PubSub 和 Solace PubSub+ 等)即時處理這些變更的Debezium CDC source。
我們還將揭示如何使用Analytics sink將捕獲的資料庫變更轉換為指標,並將其釋出到各種監控系統進行進一步分析。
本文首先解釋了 CDC supplier
和 Analytics consumer
元件,展示瞭如何在自己的 Spring 應用中以程式設計方式定製和使用它們。接下來我們解釋了CDC source和Analytics sink是如何基於 supplier 和 consumer 構建,以提供開箱即用、即時可用的流應用。
最後,我們將演示使用Spring Cloud Data Flow (SCDF)部署即時響應資料庫更新、將變更事件轉換為分析指標並將其釋出到 Prometheus 以便使用 Grafana 進行分析和視覺化展示的流管道是多麼容易。
變更資料捕獲(CDC)是一種技術,用於觀察寫入資料庫的所有資料變更,並將其釋出為可以流式處理的事件。由於應用資料庫總是在變化,CDC 允許您對這些變更做出反應,並使您的應用能夠以與提交到資料庫相同的順序流式傳輸每個行級變更。
CDC 支援多種用例,例如:快取失效、記憶體資料檢視、更新搜尋索引、透過保持不同資料來源同步實現資料複製、即時欺詐檢測、儲存審計跟蹤、資料溯源等等。
Spring Cloud Data Flow CDC Source 應用基於Debezium構建,Debezium 是一種流行的開源、基於日誌的 CDC 實現,支援各種資料庫。CDC Source 支援多種訊息繫結器,包括 Apache Kafka、Rabbit MQ、Azure Event Hubs、Google PubSub、Solace PubSub+。
注意
CDC source
實現嵌入了Debezium Engine,它不依賴於 Apache Kafka 和 ZooKeeper!您可以將 CDC source
與任何受支援的訊息繫結器一起使用!然而,Debezium Engine
確實存在一些需要考慮的限制。
CDC Debezium Supplier 被實現為一個 java.util.function.Supplier
bean,呼叫時將傳遞給定目錄中的檔案內容。該檔案 supplier 具有以下簽名
Supplier<Flux<Message<?>>>
該 supplier 的使用者可以訂閱返回的 Flux<Message<?>
,它是一個訊息流或結構複雜的CDC Change Events流。每個事件由三個部分組成(例如 metadata
、before
和 after
),如下面的 payload 樣本所示
{
"before": { ... }, // row data before the change.
"after": { ... }, // row data after the change.
"source": { // the names of the database and table where the change was made.
"connector": "mysql", "server_id": 223344,"snapshot": "false",
"name": "my-app-connector", "file": "mysql-bin.000003", "pos": 355, "row": 0,
"db": "inventory", // source database name.
"table": "customers", // source table name.
},
"op": "u", // operation that made the change.
"ts_ms": 1607440256301, // timestamp - when the change was made.
"transaction": null // transaction information (optional).
}
如果 cdc.flattening.enabled
屬性設定為 true
,則只有 after
部分作為獨立訊息傳遞。
為了呼叫 CDC supplier,我們需要指定一個源資料庫來接收 CDC 事件。cdc.connector
屬性用於選擇受支援的源資料庫型別,包括 mysql
、postgres
、sql server
、db2
、oracle
、cassandra
和 mongo
。cdc.config.database.*
屬性用於配置源訪問。以下是連線到 MySQL 資料庫的示例配置
# DB type
cdc.connector=mysql
# DB access
cdc.config.database.user=debezium
cdc.config.database.password=dbz
cdc.config.database.hostname=localhost
cdc.config.database.port=3306
# DB source metadata
cdc.name=my-sql-connector
cdc.config.database.server.id=85744
cdc.config.database.server.name=my-app-connector
cdc.name
、cdc.config.database.server.id
和 cdc.config.database.server.name
屬性用於識別和分派傳入事件。您可以選擇設定 cdc.flattening.enabled=true
以平展 CDC 事件,用其 after 欄位替換原始變更事件,從而建立一個簡單的 Kafka 記錄。您還可以選擇使用 cdc.schema=true
將 DB schema 包含到 CDC 事件中。
注意
源資料庫必須配置為暴露其 Write-Ahead Log API
,以便 Debezium 能夠連線並消費 CDC 事件。Debezium Connector Documentation 詳細描述瞭如何為任何受支援的資料庫啟用 CDC。出於此處演示的目的,我們將使用預配置的MySQL docker image。
CDC supplier 是一個可重用的 Spring bean,我們可以將其注入到終端使用者自定義應用中。注入後,可以直接呼叫它,並與自定義資料處理相結合。以下是一個示例。
@Autowired
Supplier<Flux<Message<?>>> cdcSupplier;
public void consumeDataAndSendEmail() {
Flux<Message<?> cdcData = cdcSupplier.get();
messageFlux.subscribe(t -> {
if (t == something)
//send the email here.
}
}
}
在上面的虛擬碼中,我們注入了 CDC supplier bean,然後使用它呼叫其 get()
方法來獲取一個 Flux。然後我們訂閱該Flux,每次透過 Flux 接收到資料時,都應用一些過濾,並根據資料執行操作。這只是一個簡單的說明,展示瞭如何重用 CDC supplier。在實際應用中嘗試時,您可能需要在實現中進行更多調整,例如在進行條件檢查之前將接收到的資料的預設資料型別從 byte[]
轉換為其他型別。
提示
為了構建一個獨立的、非流式的應用,您可以利用cdc-debezium-boot-starter。只需新增 cdc-debezium-boot-starter
依賴項並實現自定義的 Consumer<SourceRecord>
handler 來處理傳入的資料庫變更事件即可。
正如本系列部落格中所見,所有開箱即用的 Spring Cloud Stream 源應用都已自動配置了幾個通用的處理器。您可以將這些處理器作為CDC source的一部分啟用。以下是一個示例,我們在其中執行 CDC source,接收資料,然後在將消費的資料傳送到中介軟體上的目的地之前對其進行轉換。
java -jar cdc-debezium-source.jar
--cdc.connector=mysql --cdc.name=my-sql-connector
--cdc.config.database.server.name=my-app-connector
--cdc.config.database.user=debezium --cdc.config.database.password=dbz
--cdc.config.database.hostname=localhost --cdc.config.database.port=3306
--cdc.schema=true
--cdc.flattening.enabled=true
--spring.cloud.function.definition=cdcSupplier|spelFunction
--spel.function.expression=payload.toUpperCase()
透過為 spring.cloud.function.definition
屬性提供值 cdcSupplier|spelFunction
,我們激活了與 CDC supplier 組合的 spel 函式。然後我們提供一個 SpEL 表示式,用於使用 spel.function.expression
轉換資料。還有其他幾種函式可以以這種方式進行組合。有關更多詳細資訊,請參閱此處。
Analytics consumer 提供了一個函式,用於從輸入資料訊息中計算分析結果,並將其作為指標釋出到各種監控系統。它利用 micrometer library 為大多數流行的監控系統提供統一的程式設計體驗,並使用 Spring Expression Language (SpEL) 來定義如何從輸入資料計算指標名稱、值和標籤。
我們可以在自定義應用中直接使用 consumer bean 來計算傳遞訊息的分析結果。以下是 Analytics consumer bean 的型別簽名
Consumer<Message<?>> analyticsConsumer
注入到自定義應用後,使用者可以直接呼叫 consumer 的 accept()
方法,並提供一個 Message<?>
物件,以計算分析結果並將其釋出到後端監控系統。
Message 是資料的通用容器。每個 Message 例項包括一個 payload
和 headers
,其中包含使用者可擴充套件的屬性,格式為鍵值對。SpEL 表示式用於訪問訊息的 headers 和 payload,以計算指標的數量和標籤。例如,計數器指標可以有一個從輸入訊息 payload 大小計算出的值 amount
,並新增一個從 kind
header 值提取的 my_tag
標籤
analytics.amount-expression=payload.lenght()
analytics.tag.expression.my_tag=headers['kind']
Analytics consumer 的配置屬性以 analytics.*
字首開頭。請參閱AnalyticsConsumerProperties瞭解可用的 analytics 屬性。監控配置屬性以 management.metrics.export
字首開頭。要配置特定的監控系統,請遵循提供的配置說明。
與 CDC Source
的情況一樣,Spring Cloud Stream 開箱即用的應用已經提供了一個基於 Analytics consumer
的Analytics sink。
該 sink 可用於 Apache Kafka 和 RabbitMQ 繫結器變體。當用作 Spring Cloud Stream sink 時,Analytics consumer 會自動配置為接受來自相應中介軟體系統的資料,例如,來自 Kafka topic 或 RabbitMQ exchange 的資料。
單獨執行 CDC source
和 Analytics sink
沒什麼問題,但 Spring Cloud Data Flow 使將它們作為管道執行變得非常容易。基本上,我們想要編排看起來像這樣的資料流
cdc-log
管道部署了一個 cdc-source
,它使用 JSON 訊息格式將所有資料庫變更流式傳輸到 log-sink
。同時,cdc-analytics-tap
管道接入 cdc-source
的輸出到 analytics-sink
,以便從 CDC 事件計算 DB 統計資料並將其釋出到時間序列資料庫 (TSDB),例如 Prometheus 或 Wavefront。Grafana 儀表盤用於視覺化這些變更。
Spring Cloud Data Flow 的安裝說明解釋瞭如何在任何受支援的雲平臺上安裝 Spring Cloud Data Flow。
下面,我們將簡要介紹設定 Spring Cloud Data Flow 的步驟。首先,我們需要獲取用於執行 Spring Cloud Data Flow、Prometheus 和 Grafana 的 docker-compose 檔案
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.7.0/spring-cloud-dataflow-server/docker-compose.yml
wget -O docker-compose-prometheus.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.7.0/spring-cloud-dataflow-server/docker-compose-prometheus.yml
此外,獲取這個額外的 docker-compose 檔案,用於安裝一個配置為暴露其預寫日誌(write-ahead log)的源 MySQL 資料庫,cdc-debezium 將連線到該資料庫。
wget -O mysql-cdc.yml https://gist.githubusercontent.com/tzolov/48dec8c0db44e8086916129201cc2c8c/raw/26e1bf435d58e25ff836e415dae308edeeef2784/mysql-cdc.yml
mysql-cdc 使用 debezium/example-mysql 映象,並附帶一個 inventory 示例資料庫
為了正確執行 Spring Cloud Data Flow,我們需要設定一些環境變數。
export DATAFLOW_VERSION=2.7.1
export SKIPPER_VERSION=2.6.1
export STREAM_APPS_URI=https://dataflow.spring.io/kafka-maven-latest
現在一切準備就緒,是時候開始執行 Spring Cloud Data Flow 和所有其他輔助元件了。
docker-compose -f docker-compose.yml -f docker-compose-prometheus.yml -f mysql-cdc.yml up
提示
要使用 RabbitMQ
而不是 Apache Kafka
,您可以按照RabbitMQ Instead of Kafka 說明下載額外的 docker-compose 檔案,並將 STREAM_APPS_URI
變數設定為 https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-rabbit-maven。
提示
要使用 Wavefront 而不是 Prometheus & Grafana,請遵循Wavefront 說明。
SCDF 啟動並執行後,訪問 https://:9393/dashboard。然後轉到左側的 Streams
並選擇 Create Stream
。從 source 應用中選擇 cdc-debezium
,從 sink 應用中選擇 log
和 analytics
,以定義 cdc-log = cdc-debezium | log
和 cdc-analytic-tap = :cdc-log.cdc-debezium > analytics
管道。您可以點選應用選項來選擇所需的屬性。
為了更快地啟動,您可以複製/貼上以下現成的管道定義片段
cdc-log = cdc-debezium --cdc.name=mycdc --cdc.flattening.enabled=false --cdc.connector=mysql --cdc.config.database.user=debezium --cdc.config.database.password=dbz --cdc.config.database.dbname=inventory --cdc.config.database.hostname=mysql-cdc --cdc.config.database.port=3307 --cdc.stream.header.offset=true --cdc.config.database.server.name=my-app-connector --cdc.config.tombstones.on.delete=false | log
cdc-analytic-tap = :cdc-log.cdc-debezium > analytics --analytics.name=cdc --analytics.tag.expression.table=#jsonPath(payload,'$..table') --analytics.tag.expression.operation=#jsonPath(payload,'$..op') --analytics.tag.expression.db=#jsonPath(payload,'$..db')
cdc-log
管道部署了一個 cdc-debezium source
,它連線到地址為 mysql-cdc:3307
的 MySQL 資料庫,並將 DB 變更事件流式傳輸到 log sink
。有關可用配置選項,請參閱cdc-debezium docs。
cdc-analytic-tap
管道接入 cdc-debezium source
的輸出,並將 cdc 事件流式傳輸到 analytics sink
。analytics 建立一個metrics counter(稱為 cdc),並使用SpEL expressions 從流式傳輸的訊息 payloads 計算指標標籤(例如 db、table 和 operations)。
例如,讓我們修改 MySQL inventory
資料庫中的 customers
表。更新事務作為變更事件傳送到 cdc-debezium source
,它將原生 DB 事件轉換為統一的訊息 payload,如下所示
{
"before": {
"id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "[email protected]"
},
"after": {
"id": 1004, "first_name": "Anne2", "last_name": "Kretchmar", "email": "[email protected]"
},
"source": {
"version": "1.3.1.Final", "connector": "mysql", "server_id": 223344, "thread": 5,
"name": "my-app-connector", "file": "mysql-bin.000003", "pos": 355, "row": 0,
"db": "inventory",
"table": "customers",
},
"op": "u",
"ts_ms": 1607440256301,
"transaction": null
}
以下 SpEL 表示式用於從 CDC 訊息 payload 中計算 3 個標籤(db
、table
、operation
)。這些標籤會分配給釋出到 Prometheus 的每個 cdc 指標。
--analytics.tag.expression.db=#jsonPath(payload,'$..db')
--analytics.tag.expression.table=#jsonPath(payload,'$..table')
--analytics.tag.expression.operation=#jsonPath(payload,'$..op')
以下是選擇所有屬性後的截圖示例
建立並部署 cdc-log
和 cdc-analytics-tap
管道,接受所有預設選項。您也可以選擇使用 Group Actions
同時部署這兩個流。
流部署後,您可以透過 SCDF UI 或使用 Skipper docker 容器檢視已部署應用的日誌,具體方法如文件中解釋。如果您檢視 Log sink 應用的日誌,應該會看到類似於這些的 CDC JSON 訊息
接下來使用按鈕(或直接開啟 localhost:3000)進入 Grafana 儀表盤,並以使用者:admin
和密碼:admin
登入。您可以探索 Applications
儀表盤來檢查已部署管道的效能。現在您可以匯入 CDC Grafana Dashboard-Prometheus.json 儀表盤,並看到類似於此的儀表盤
以下查詢用於在 Prometheus 中聚合 cdc_total
指標
sort_desc(topk(10, sum(cdc_total) by (db, table)))
sort_desc(topk(100, sum(cdc_total) by (op)))
提示
您可以在 https://:9090 開啟 Prometheus UI,檢查配置並執行一些臨時 PQL 查詢。
您可以連線到地址為 localhost:3307
的 inventory CDC MySQL 資料庫(使用者:root
,密碼:debezium
),並開始修改資料。
以下 docker 命令展示瞭如何連線到 mysql-cdc
docker exec -it mysql-cdc mysql -uroot -pdebezium --database=inventory
以下指令碼有助於生成多個 insert、update 和 delete DB 事務
for i in {1..100}; do docker exec -it mysql-cdc mysql -uroot -pdebezium --database=inventory -e'INSERT INTO customers (first_name, last_name, email) VALUES ("value1", "value2", "val@bla"); UPDATE customers SET first_name="value2" WHERE first_name="value1"; DELETE FROM customers where first_name="value2";'; done
您將看到 log-sink
的日誌反映這些變更,以及 CDC 儀表盤圖表的更新。
在本篇部落格中,我們瞭解了 CDC-debezium supplier 和 Analytics consumer 函式及其對應的 Spring Cloud Stream source 和 sink 是如何工作的。supplier 和 consumer 函式可以注入到自定義應用中,與其他業務邏輯相結合。
source 和 sink 應用是開箱即用的,可用於 Kafka 和 RabbitMQ 中介軟體變體。
您可以輕鬆構建將 Cdc-debezium supplier 與 Geode consumer 相結合的獨立應用,以建立和維護資料庫資料的記憶體檢視。類似地,您可以將 Cdc-debezium supplier 與 Elasticsearch consumer 相結合,以即時維護資料庫資料的可搜尋索引。
更令人興奮的是,您可以使用開箱即用的 cdc-debezium source、geode sink 和 elasticsearch sink 應用實現上述場景。您可以在不同的訊息繫結器和源資料庫上構建這些管道。
這個Spring One 簡報展示了一個高階用例,使用 CDC-debezium 和機器學習構建用於信用卡欺詐檢測的流式資料管道。
本系列部落格還有幾篇後續文章即將釋出。敬請期待。