案例研究:使用 CDC Debezium 源和 Analytics Sink 進行即時變更資料捕獲 (CDC) 分析

工程 | Christian Tzolov | 2020 年 12 月 14 日 | ...

本文是探索基於 Java Functions 新設計的 Spring Cloud Stream 應用系列部落格的一部分。

以下是本系列部落格的所有之前部分。

在本篇博文中,我們將探討允許我們從 MySQL、PostgreSQL、MongoDB、Oracle、DB2 和 SQL Server 等資料庫捕獲資料庫變更,並透過各種訊息繫結器(如 RabbitMQ、Apache Kafka、Azure Event Hubs、Google PubSub 和 Solace PubSub+ 等)即時處理這些變更的Debezium CDC source

我們還將揭示如何使用Analytics sink將捕獲的資料庫變更轉換為指標,並將其釋出到各種監控系統進行進一步分析。

本文首先解釋了 CDC supplierAnalytics consumer 元件,展示瞭如何在自己的 Spring 應用中以程式設計方式定製和使用它們。接下來我們解釋了CDC sourceAnalytics sink是如何基於 supplier 和 consumer 構建,以提供開箱即用、即時可用的流應用。

最後,我們將演示使用Spring Cloud Data Flow (SCDF)部署即時響應資料庫更新、將變更事件轉換為分析指標並將其釋出到 Prometheus 以便使用 Grafana 進行分析和視覺化展示的流管道是多麼容易。

變更資料捕獲

變更資料捕獲(CDC)是一種技術,用於觀察寫入資料庫的所有資料變更,並將其釋出為可以流式處理的事件。由於應用資料庫總是在變化,CDC 允許您對這些變更做出反應,並使您的應用能夠以與提交到資料庫相同的順序流式傳輸每個行級變更。

CDC 支援多種用例,例如:快取失效、記憶體資料檢視、更新搜尋索引、透過保持不同資料來源同步實現資料複製、即時欺詐檢測、儲存審計跟蹤、資料溯源等等。

Spring Cloud Data Flow CDC Source 應用基於Debezium構建,Debezium 是一種流行的開源、基於日誌的 CDC 實現,支援各種資料庫。CDC Source 支援多種訊息繫結器,包括 Apache Kafka、Rabbit MQ、Azure Event Hubs、Google PubSub、Solace PubSub+。

注意

CDC source 實現嵌入了Debezium Engine,它不依賴於 Apache Kafka 和 ZooKeeper!您可以將 CDC source 與任何受支援的訊息繫結器一起使用!然而,Debezium Engine 確實存在一些需要考慮的限制

CDC Debezium Supplier

CDC Debezium Supplier 被實現為一個 java.util.function.Supplier bean,呼叫時將傳遞給定目錄中的檔案內容。該檔案 supplier 具有以下簽名

Supplier<Flux<Message<?>>>

該 supplier 的使用者可以訂閱返回的 Flux<Message<?>,它是一個訊息流或結構複雜的CDC Change Events流。每個事件由三個部分組成(例如 metadatabeforeafter),如下面的 payload 樣本所示

{
  "before": { ... },  // row data before the change.
  "after": { ... },  // row data after the change.
  "source": {  //  the names of the database and table where the change was made.
    "connector": "mysql", "server_id": 223344,"snapshot": "false",
    "name": "my-app-connector", "file": "mysql-bin.000003", "pos": 355, "row": 0,
    "db": "inventory",  // source database name.
    "table": "customers", // source table name.
  },
  "op": "u",  // operation that made the change.
  "ts_ms": 1607440256301, // timestamp - when the change was made.
  "transaction": null  // transaction information (optional).
}

如果 cdc.flattening.enabled 屬性設定為 true,則只有 after 部分作為獨立訊息傳遞。

為了呼叫 CDC supplier,我們需要指定一個源資料庫來接收 CDC 事件。cdc.connector 屬性用於選擇受支援的源資料庫型別,包括 mysqlpostgressql serverdb2oraclecassandramongocdc.config.database.* 屬性用於配置源訪問。以下是連線到 MySQL 資料庫的示例配置

# DB type
cdc.connector=mysql

# DB access
cdc.config.database.user=debezium
cdc.config.database.password=dbz
cdc.config.database.hostname=localhost
cdc.config.database.port=3306

# DB source metadata
cdc.name=my-sql-connector
cdc.config.database.server.id=85744
cdc.config.database.server.name=my-app-connector

cdc.namecdc.config.database.server.idcdc.config.database.server.name 屬性用於識別和分派傳入事件。您可以選擇設定 cdc.flattening.enabled=true 以平展 CDC 事件,用其 after 欄位替換原始變更事件,從而建立一個簡單的 Kafka 記錄。您還可以選擇使用 cdc.schema=true 將 DB schema 包含到 CDC 事件中。

注意

源資料庫必須配置為暴露其 Write-Ahead Log API,以便 Debezium 能夠連線並消費 CDC 事件。Debezium Connector Documentation 詳細描述瞭如何為任何受支援的資料庫啟用 CDC。出於此處演示的目的,我們將使用預配置的MySQL docker image

在自定義應用中重用 CDC Supplier

CDC supplier 是一個可重用的 Spring bean,我們可以將其注入到終端使用者自定義應用中。注入後,可以直接呼叫它,並與自定義資料處理相結合。以下是一個示例。

@Autowired
Supplier<Flux<Message<?>>> cdcSupplier;

public void consumeDataAndSendEmail() {
  Flux<Message<?> cdcData = cdcSupplier.get();
  messageFlux.subscribe(t -> {
      if (t == something)
         //send the email here.
      }
  }
}

在上面的虛擬碼中,我們注入了 CDC supplier bean,然後使用它呼叫其 get() 方法來獲取一個 Flux。然後我們訂閱該Flux,每次透過 Flux 接收到資料時,都應用一些過濾,並根據資料執行操作。這只是一個簡單的說明,展示瞭如何重用 CDC supplier。在實際應用中嘗試時,您可能需要在實現中進行更多調整,例如在進行條件檢查之前將接收到的資料的預設資料型別從 byte[] 轉換為其他型別。

提示

為了構建一個獨立的、非流式的應用,您可以利用cdc-debezium-boot-starter。只需新增 cdc-debezium-boot-starter 依賴項並實現自定義的 Consumer<SourceRecord> handler 來處理傳入的資料庫變更事件即可。

CDC Debezium Source

正如本系列部落格中所見,所有開箱即用的 Spring Cloud Stream 源應用都已自動配置了幾個通用的處理器。您可以將這些處理器作為CDC source的一部分啟用。以下是一個示例,我們在其中執行 CDC source,接收資料,然後在將消費的資料傳送到中介軟體上的目的地之前對其進行轉換。

java -jar cdc-debezium-source.jar
 --cdc.connector=mysql --cdc.name=my-sql-connector
 --cdc.config.database.server.name=my-app-connector
 --cdc.config.database.user=debezium --cdc.config.database.password=dbz
 --cdc.config.database.hostname=localhost --cdc.config.database.port=3306
 --cdc.schema=true
 --cdc.flattening.enabled=true
 --spring.cloud.function.definition=cdcSupplier|spelFunction
 --spel.function.expression=payload.toUpperCase()

透過為 spring.cloud.function.definition 屬性提供值 cdcSupplier|spelFunction,我們激活了與 CDC supplier 組合的 spel 函式。然後我們提供一個 SpEL 表示式,用於使用 spel.function.expression 轉換資料。還有其他幾種函式可以以這種方式進行組合。有關更多詳細資訊,請參閱此處

Analytics Consumer

Analytics consumer 提供了一個函式,用於從輸入資料訊息中計算分析結果,並將其作為指標釋出到各種監控系統。它利用 micrometer library 為大多數流行的監控系統提供統一的程式設計體驗,並使用 Spring Expression Language (SpEL) 來定義如何從輸入資料計算指標名稱、值和標籤。

我們可以在自定義應用中直接使用 consumer bean 來計算傳遞訊息的分析結果。以下是 Analytics consumer bean 的型別簽名

Consumer<Message<?>> analyticsConsumer

注入到自定義應用後,使用者可以直接呼叫 consumer 的 accept() 方法,並提供一個 Message<?> 物件,以計算分析結果並將其釋出到後端監控系統。

Message 是資料的通用容器。每個 Message 例項包括一個 payloadheaders,其中包含使用者可擴充套件的屬性,格式為鍵值對。SpEL 表示式用於訪問訊息的 headers 和 payload,以計算指標的數量和標籤。例如,計數器指標可以有一個從輸入訊息 payload 大小計算出的值 amount,並新增一個從 kind header 值提取的 my_tag 標籤

analytics.amount-expression=payload.lenght()
analytics.tag.expression.my_tag=headers['kind']

Analytics consumer 的配置屬性以 analytics.* 字首開頭。請參閱AnalyticsConsumerProperties瞭解可用的 analytics 屬性。監控配置屬性以 management.metrics.export 字首開頭。要配置特定的監控系統,請遵循提供的配置說明

Analytics Sink

CDC Source 的情況一樣,Spring Cloud Stream 開箱即用的應用已經提供了一個基於 Analytics consumerAnalytics sink

該 sink 可用於 Apache KafkaRabbitMQ 繫結器變體。當用作 Spring Cloud Stream sink 時,Analytics consumer 會自動配置為接受來自相應中介軟體系統的資料,例如,來自 Kafka topic 或 RabbitMQ exchange 的資料。

在 Spring Cloud Data Flow 上執行

單獨執行 CDC sourceAnalytics sink 沒什麼問題,但 Spring Cloud Data Flow 使將它們作為管道執行變得非常容易。基本上,我們想要編排看起來像這樣的資料流

scdf pipelines

cdc-log 管道部署了一個 cdc-source,它使用 JSON 訊息格式將所有資料庫變更流式傳輸到 log-sink。同時,cdc-analytics-tap 管道接入 cdc-source 的輸出到 analytics-sink,以便從 CDC 事件計算 DB 統計資料並將其釋出到時間序列資料庫 (TSDB),例如 Prometheus 或 Wavefront。Grafana 儀表盤用於視覺化這些變更。

Spring Cloud Data Flow 的安裝說明解釋瞭如何在任何受支援的雲平臺上安裝 Spring Cloud Data Flow。

下面,我們將簡要介紹設定 Spring Cloud Data Flow 的步驟。首先,我們需要獲取用於執行 Spring Cloud Data Flow、Prometheus 和 Grafana 的 docker-compose 檔案

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.7.0/spring-cloud-dataflow-server/docker-compose.yml

wget -O docker-compose-prometheus.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.7.0/spring-cloud-dataflow-server/docker-compose-prometheus.yml

此外,獲取這個額外的 docker-compose 檔案,用於安裝一個配置為暴露其預寫日誌(write-ahead log)的源 MySQL 資料庫,cdc-debezium 將連線到該資料庫。

wget -O mysql-cdc.yml https://gist.githubusercontent.com/tzolov/48dec8c0db44e8086916129201cc2c8c/raw/26e1bf435d58e25ff836e415dae308edeeef2784/mysql-cdc.yml

mysql-cdc 使用 debezium/example-mysql 映象,並附帶一個 inventory 示例資料庫

invetory db

為了正確執行 Spring Cloud Data Flow,我們需要設定一些環境變數。

export DATAFLOW_VERSION=2.7.1
export SKIPPER_VERSION=2.6.1
export STREAM_APPS_URI=https://dataflow.spring.io/kafka-maven-latest

現在一切準備就緒,是時候開始執行 Spring Cloud Data Flow 和所有其他輔助元件了。

docker-compose -f docker-compose.yml -f docker-compose-prometheus.yml -f mysql-cdc.yml up

提示

要使用 RabbitMQ 而不是 Apache Kafka,您可以按照RabbitMQ Instead of Kafka 說明下載額外的 docker-compose 檔案,並將 STREAM_APPS_URI 變數設定為 https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-rabbit-maven

提示

要使用 Wavefront 而不是 Prometheus & Grafana,請遵循Wavefront 說明。

SCDF 啟動並執行後,訪問 https://:9393/dashboard。然後轉到左側的 Streams 並選擇 Create Stream。從 source 應用中選擇 cdc-debezium,從 sink 應用中選擇 loganalytics,以定義 cdc-log = cdc-debezium | logcdc-analytic-tap = :cdc-log.cdc-debezium > analytics 管道。您可以點選應用選項來選擇所需的屬性。

為了更快地啟動,您可以複製/貼上以下現成的管道定義片段

cdc-log = cdc-debezium --cdc.name=mycdc --cdc.flattening.enabled=false --cdc.connector=mysql --cdc.config.database.user=debezium --cdc.config.database.password=dbz --cdc.config.database.dbname=inventory --cdc.config.database.hostname=mysql-cdc --cdc.config.database.port=3307 --cdc.stream.header.offset=true --cdc.config.database.server.name=my-app-connector --cdc.config.tombstones.on.delete=false | log

cdc-analytic-tap = :cdc-log.cdc-debezium > analytics --analytics.name=cdc --analytics.tag.expression.table=#jsonPath(payload,'$..table') --analytics.tag.expression.operation=#jsonPath(payload,'$..op') --analytics.tag.expression.db=#jsonPath(payload,'$..db')

cdc-log 管道部署了一個 cdc-debezium source,它連線到地址為 mysql-cdc:3307 的 MySQL 資料庫,並將 DB 變更事件流式傳輸到 log sink。有關可用配置選項,請參閱cdc-debezium docs

cdc-analytic-tap 管道接入 cdc-debezium source 的輸出,並將 cdc 事件流式傳輸到 analytics sink。analytics 建立一個metrics counter(稱為 cdc),並使用SpEL expressions 從流式傳輸的訊息 payloads 計算指標標籤(例如 db、table 和 operations)。

例如,讓我們修改 MySQL inventory 資料庫中的 customers 表。更新事務作為變更事件傳送到 cdc-debezium source,它將原生 DB 事件轉換為統一的訊息 payload,如下所示

{
  "before": {
    "id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "[email protected]"
  },
  "after": {
    "id": 1004, "first_name": "Anne2", "last_name": "Kretchmar", "email": "[email protected]"
  },
  "source": {
    "version": "1.3.1.Final", "connector": "mysql", "server_id": 223344, "thread": 5,
    "name": "my-app-connector", "file": "mysql-bin.000003", "pos": 355, "row": 0,
    "db": "inventory",
    "table": "customers",
  },
  "op": "u",
  "ts_ms": 1607440256301,
  "transaction": null
}

以下 SpEL 表示式用於從 CDC 訊息 payload 中計算 3 個標籤(dbtableoperation)。這些標籤會分配給釋出到 Prometheus 的每個 cdc 指標。

--analytics.tag.expression.db=#jsonPath(payload,'$..db')
--analytics.tag.expression.table=#jsonPath(payload,'$..table')
--analytics.tag.expression.operation=#jsonPath(payload,'$..op')

以下是選擇所有屬性後的截圖示例

scdf create streams

建立部署 cdc-logcdc-analytics-tap 管道,接受所有預設選項。您也可以選擇使用 Group Actions 同時部署這兩個流。

流部署後,您可以透過 SCDF UI 或使用 Skipper docker 容器檢視已部署應用的日誌,具體方法如文件中解釋。如果您檢視 Log sink 應用的日誌,應該會看到類似於這些的 CDC JSON 訊息

cdc event log

接下來使用按鈕(或直接開啟 localhost:3000)進入 Grafana 儀表盤,並以使用者:admin 和密碼:admin 登入。您可以探索 Applications 儀表盤來檢查已部署管道的效能。現在您可以匯入 CDC Grafana Dashboard-Prometheus.json 儀表盤,並看到類似於此的儀表盤

grafana dashboard

以下查詢用於在 Prometheus 中聚合 cdc_total 指標

sort_desc(topk(10, sum(cdc_total) by (db, table)))
sort_desc(topk(100, sum(cdc_total) by (op)))

提示

您可以在 https://:9090 開啟 Prometheus UI,檢查配置並執行一些臨時 PQL 查詢。

生成資料庫活動

您可以連線到地址為 localhost:3307 的 inventory CDC MySQL 資料庫(使用者:root,密碼:debezium),並開始修改資料。

以下 docker 命令展示瞭如何連線到 mysql-cdc

docker exec -it mysql-cdc  mysql -uroot -pdebezium --database=inventory

以下指令碼有助於生成多個 insert、update 和 delete DB 事務

for i in {1..100}; do docker exec -it mysql-cdc  mysql -uroot -pdebezium --database=inventory -e'INSERT INTO customers (first_name, last_name, email) VALUES ("value1", "value2", "val@bla"); UPDATE customers SET first_name="value2" WHERE first_name="value1"; DELETE FROM customers where first_name="value2";'; done

您將看到 log-sink 的日誌反映這些變更,以及 CDC 儀表盤圖表的更新。

結論與未來工作

在本篇部落格中,我們瞭解了 CDC-debezium supplier 和 Analytics consumer 函式及其對應的 Spring Cloud Stream source 和 sink 是如何工作的。supplier 和 consumer 函式可以注入到自定義應用中,與其他業務邏輯相結合。

source 和 sink 應用是開箱即用的,可用於 Kafka 和 RabbitMQ 中介軟體變體。

您可以輕鬆構建將 Cdc-debezium supplier 與 Geode consumer 相結合的獨立應用,以建立和維護資料庫資料的記憶體檢視。類似地,您可以將 Cdc-debezium supplier 與 Elasticsearch consumer 相結合,以即時維護資料庫資料的可搜尋索引。

更令人興奮的是,您可以使用開箱即用的 cdc-debezium sourcegeode sinkelasticsearch sink 應用實現上述場景。您可以在不同的訊息繫結器和源資料庫上構建這些管道。

這個Spring One 簡報展示了一個高階用例,使用 CDC-debezium 和機器學習構建用於信用卡欺詐檢測的流式資料管道。

cdc fraud detection

本系列部落格還有幾篇後續文章即將釋出。敬請期待。

獲取 Spring 時事通訊

訂閱 Spring 時事通訊,保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部