案例研究:關係型資料庫源和檔案匯

工程 | Soby Chacko | 2020 年 9 月 10 日 | ...

本文是探討基於 Java 函式的全新設計的 Spring Cloud Stream 應用程式的部落格系列的一部分。在本篇中,我們將探索 JDBC supplier 和基於 Spring Cloud Stream 的 source。我們將看到如何從關係型資料庫匯出資料,並使用 File Consumer 和相應的 Spring Cloud Stream File sink 將其轉儲到檔案中。我們將介紹幾種不同的執行 JDBC Source 並將資料傳送到檔案的方式。

這是本部落格系列的所有先前部分。

從 RDBMS 獲取資料

查詢資料庫並處理結果是一個非常基本的企業用例。即使追溯到資訊科技革命的主機時代,我們也能看到這種模式被廣泛使用。在過去的幾十年裡,SQL 已經成為與資料庫通訊的典型語言。Java 從其最初版本開始,就透過一個名為 Java Database Connectivity(俗稱 JDBC)的庫,為基於資料庫的企業級應用程式添加了支援。在 Java 的早期,許多應用程式都是使用原生的 JDBC 庫編寫的。Spring Framework 從一開始就透過提供基於 JDK 中 JDBC 庫的模板模式 - JdbcTemplate - 來支援處理資料庫的核心用例。Spring Data 專案在此模板的基礎上添加了許多額外功能。當 Spring Integration 出現時,它利用了 Spring 中的這種支援,並提供了許多額外的元件,以便資料可以透過通道介面卡、閘道器等方式獲取。在其最新版本中(這也是本文的主題),我們意識到可以將這些 Spring Integration 元件作為簡單的 Java supplier 來查詢資料庫。我們將詳細介紹如何訪問此 supplier,在自定義應用程式中重用它,以及將其用作 Spring Cloud Stream source。

JDBC Supplier

JDBC Supplier 是一個實現為 java.util.function.supplier bean 的元件。當呼叫時,它會從資料庫表中提供資料。JDBC supplier 具有以下簽名。

Supplier<Flux<Message<?>>> jdbcSupplier()

預設情況下,JDBC supplier 根據資料庫表的行分割資料,其中資料庫的每一行表示為一個 java.util.Map 資料結構。例如,這裡有一個填充了一些資料的資料庫表。

ID

Name

1

Bob

2

Jane

3

John

當我們對這個表呼叫 JDBC supplier 時,我們會得到一個 Flux 型別的 Message 物件,其中每個 Message 物件的 payload 是一個 Map。第一個訊息將包含一個 Map,其鍵為 IDNAME,值分別為 1Bob。第二個訊息將包含具有相同鍵但值分別為 2Jane 的 Map,依此類推。我們也可以要求提供資料時不將它們分割成單獨的訊息。為此,我們可以使用屬性 jdbc.supplier.split 並將其設定為 false(預設值為 true)。當我們停用分割時,與上面的 JDBC supplier 簽名有一個重要區別。其簽名變為 Supplier<Message<?>>,並且不再是單獨的 Message<Map>,而是得到一個單一的 Message<List<Map>。如果將此應用於上面的示例,我們將得到一個包含 3 個元素的列表,每個元素都包含一個表示資料庫表每一行的 Map

JDBC Supplier 需要在每次呼叫時執行一個 SQL 查詢。此查詢是強制性的,必須透過屬性 jdbc.supplier.query 提供。我們還可以使用屬性 jdbc.supplier.update 強制 supplier 忽略已經讀取的行。本文後面將看到如何實現此操作的示例。

在自定義應用程式中重用 JDBC Supplier

上面提到的兩種 Supplier bean——預設啟用資料分割的和我們停用資料分割的——都被命名為 jdbcSupplier。我們可以在自定義應用程式中以該名稱限定注入它們。我們需要根據是否分割資料來確保使用正確的型別。例如,如果我們採用預設設定(即分割資料),那麼可以按如下方式自動注入 JDBC supplier。

@Autowired
Supplier<Flux<Message<?>>> jdbcSupplier;

另一方面,如果使用屬性 jdbc.supplier.split 停用分割,那麼需要以 Supplier<Message<?> 型別注入它。

注入後,我們可以呼叫 Supplierget 方法,然後開始接收資料。

File Consumer

與我們在上一篇部落格中看到的 File Supplier 類似,File Consumer 也是一個可重用 bean,我們可以將其注入到自定義應用程式中,並用它在目錄中建立新檔案。該 bean 實現為 java.util.function.Consumer。對於初學者來說,這可能會引起一些困惑,為什麼它被稱為 consumer,但實際上並沒有從檔案中消費任何東西。雖然名稱和實現是 consumer,但檔案 consumer 並非是輪詢或讀取檔案的 consumer,而是接收資料並寫入檔案的 consumer。“消費”檔案的用例由檔案 supplier 處理。

這是檔案 consumer 的型別簽名。

Consumer<Message<?>> fileConsumer()

由於它是一個 consumer,因此只適合在資料處理管道的末端使用此元件。consumer 接受輸入資料並將其寫入檔案。當我們將檔案 consumer 與 Spring Cloud Stream 透過 binder 實現結合時,它就變成了一個 sink 應用程式,從諸如 Kafka topic 或 RabbitMQ exchange 等中介軟體目標消費資料。

檔案 consumer 在幾種企業用例中非常有用。對於任何有新資料可用時建立或附加到檔案的業務場景,檔案 consumer 都很有用。

使用檔案 consumer 時,我們可以使用屬性 file.consumer.directoryfile.consumer.name 分別提供要寫入的檔案目錄和檔名。如果不設定這些屬性,它將使用 consumer 設定的預設值。預設情況下,資料將附加到檔案中,這可以透過屬性 file.consumer.mode 進行更改。有關更多詳細資訊,請參閱 FileConsumerProperties 的配置選項。

執行應用程式

當與 Spring Cloud Stream 結合時,上述功能元件變得更加強大。這些函式可以在訊息傳遞應用程式中使用,而 Spring Cloud Stream 使其更容易以與中介軟體無關的方式使用。JDBC Supplier 用於構建 JDBC source,該 source 可以與許多不同的中介軟體系統配合工作。同樣,File Consumer 被用作 File Sink 應用程式的基礎,該應用程式也可以與不同的訊息系統配合工作。

在下面的章節中,我們將獨立執行這些應用程式並驗證它們是否按預期工作。

設定 Apache Kafka 和 MySQL 資料庫

首先,建立一個新目錄 jdbc-file-demo

mkdir jdbc-file-demo && cd jdbc-file-demo

我們將執行 jdbc-souce 和 file-sink 應用程式的 Kafka 版本。我們將使用 Apache Kafka 作為中介軟體來執行這些應用程式。對於 JDBC source,我們將使用 MySQL 作為資料庫。我們為 Kafka 和 MySQL 提供了一個方便的 docker-compose 指令碼。

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/kafka-mysql-only.yml

啟動 docker 容器

docker-compose up

執行 docker ps 並確保看到所有三個元件都已啟動並執行(Kafka、Zookeeper 和 MySQL)。

現在我們已經準備好必要的基礎設施,接下來讓我們在執行應用程式之前設定好 MySQL 資料庫。

docker exec -it jdbc-file-blog-mysql mysql -uroot -p

使用 rootpw 作為密碼。在終端中輸入以下命令來設定資料庫和表。

CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
	 id INT NOT NULL AUTO_INCREMENT,
	 name VARCHAR(255) NOT NULL,
	 street VARCHAR(255) NOT NULL,
	 city VARCHAR(255) NOT NULL,
	 tag CHAR(1),
	 PRIMARY KEY (id));

上邊的 schema 很容易理解,但 tag 列需要一些解釋。它用於避免重複讀取已經讀取過的表資料。其思想是,我們更新查詢返回的每一行的 tag 列,這樣它就不會包含在後續查詢中。我們將在下面看到詳細資訊。

將進入 MySQL 的終端會話保持開啟狀態,因為稍後我們會回到那裡。

作為獨立應用程式執行 file sink

讓我們獨立執行現成的 file sink。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/file-sink-kafka/3.0.0-SNAPSHOT/file-sink-kafka-3.0.0-SNAPSHOT.jar

然後按如下方式執行。

java -jar file-sink-kafka-3.0.0-SNAPSHOT.jar --file.consumer.directory=/tmp/processed-file --file.consumer.name=output.txt --spring.cloud.stream.bindings.input.destination=jdbc-file-demo

讓我們詳細瞭解一下我們要做的事情。我們要求 file sink 應用程式從 Kafka topic jdbc-file-demo 中消費資料,然後在檔案系統上的目錄 /tmp/processed-file 中生成一個名為 output.txt 的檔案。預設情況下,每個傳入的 Kafka topic 記錄都會作為新行附加到檔案中。如果將 file.consumer.binary 的值設定為 true,則檔案將以二進位制形式寫入。您可以在此處找到所有可用的配置。

執行 JDBC Source

與執行 file sink 的方式類似,現在我們將獲取並執行基於 Kafka 的 jdbc source 應用程式。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/jdbc-source-kafka/3.0.0-SNAPSHOT/jdbc-source-kafka-3.0.0-SNAPSHOT.jar

然後執行,

java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0

我們為 JDBC Source 提供了以下配置資訊。

  • 資料來源 URL - 在本例中是用於我們執行的 MySQL 版本的 JDBC URL。

  • 資料來源使用者憑據

  • source 應用程式要執行的 SQL 查詢

  • 傳送資料的 Kafka topic(這設定為與 file-sink 消費資料相同的 topic)

  • 用於標記記錄的 update SQL 語句

請注意,當我們之前建立表時,我們添加了一個名為 tag 的列,以避免讀取我們已經讀取過的重複記錄。我們的主要 SQL 查詢(透過屬性 jdbc.supplier.query)將僅讀取 tag 值不為空的記錄。然後每次 source 讀取一條記錄時,tag 都會更新為值 1,以便下次執行查詢時跳過該記錄。如果不透過 jdbc.supplier.update 提供 update 語句,就像我們上面做的那樣,每次查詢都會提供表中的所有記錄。如果我們不提供此語句並且仍然想避免重複,那麼我們需要使用一些複雜的策略,例如使用元資料儲存來跟蹤我們到目前為止已經消費了哪些記錄。提供一個支援標誌(像我們示例中的 tag)的 schema,然後在每次讀取時更新它,是避免重複的一種更容易的策略。

輪詢 JDBC Source

JDBC Source 使用 poller 進行呼叫。這與在自定義的非 Spring Cloud Stream 應用程式中直接使用 JDBC Supplier 不同,在後者的情況下,必須手動呼叫 supplier。預設情況下,Spring Cloud Stream 為 JDBC Source 提供一個 poller,它每秒輪詢一次。此排程可以透過使用屬性 spring.cloud.stream.poller.fixedDelay 進行更改。有關輪詢的更多控制,可以在此處找到。

驗證流程

現在我們正在執行這兩個應用程式,讓我們向表中插入資料。

轉到您的 MySQL 終端會話並輸入以下 insert 語句。

mysql> insert into People values (1, 'Bob', 'First Street', 'First City', NULL);

現在轉到 file-sink 寫入檔案的目錄 /tmp/processed-file,查詢名為 output.txt 的檔案。開啟檔案並驗證其內容。它應該包含以下內容。

 {"id":1,"name":"Bob","street":"First Street","city":"First City"}

向表中填充更多資料。

mysql> insert into People values (2, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (3, 'Mary', 'First Street', 'First City', NULL);

驗證我們是否在 output.txt 檔案中看到了新資料。

執行帶過濾器的 JDBC Source

停止執行 JDBC Source 應用程式並按如下方式重新執行。

java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:mariadb://:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"

正如我們在之前的部落格中以及此處所解釋的,所有現成的 source 都自動配置了許多有用的函式,因此您可以透過屬性啟用它們。在使用上述新配置選項執行時,我們將 filterFunctionjdbcSupplier 進行組合,從而為 JDBC Supplier 生成的資料添加了過濾功能。我們使用屬性 spring.cloud.stream.function.definition 並將其值設定為 jdbcSupplier|filterFunction 來實現組合。然後透過屬性 filter.function.expression 提供一個 JSONPath 表示式來過濾掉所有偶數 ID。

現在,如果您向表中插入更多資料,您將看到只有 ID 為奇數的記錄被寫入檔案。

嘗試將這些記錄輸入到表中。

mysql> insert into People values (200, 'John', 'First Street', 'First City', NULL);
mysql> insert into People values (201, 'Mary', 'First Street', 'First City', NULL);
mysql> insert into People values (202, 'Alice', 'First Street', 'First City', NULL);
mysql> insert into People values (203, 'Bob', 'First Street', 'First City', NULL);
mysql> insert into People values (204, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (205, 'Doe', 'First Street', 'First City', NULL);

我們將看到檔案中不包含 ID 為 200、202 和 204 的記錄,因為它們被過濾掉了。

執行帶其他資料庫的 JDBC Source

JDBC supplier 隨附了流行的開源 JDBC 驅動程式。目前,它包含用於 MySQL、PostgreSQL 和 Microsoft SQL Server 資料庫的驅動程式。這使我們能夠快速切換針對特定資料庫(例如 MySQL)執行的同一個 JDBC Source 應用程式,使其針對 PostgreSQL 執行,而無需進行任何程式碼更改,只需在部署時更改配置即可。讓我們以我們之前針對 MySQL 執行的 JDBC Source 為例,這次針對 PostgreSQL 執行。

首先,我們將在 docker 容器中執行 PostgreSQL。

docker run --rm   --name pg-docker -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=demo  -d -p 5432:5432  postgres

登入到 psql 會話(或使用 PGAdmin 等 UI 工具)。

docker run -it --rm --network host postgres psql -h localhost -d demo -U test

使用 test 作為密碼。

然後建立此表

CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));

停止當前的 JDBC Source 並使用以下配置選項重新執行它

java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:postgresql://:5432/demo --spring.datasource.username=test --spring.datasource.password=test --jdbc.supplier.query="select id, name, street, city from people where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update people set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"

這與我們第二次針對 MySQL 執行 JDBC source 時使用的配置選項大致相同,但這次資料來源屬性已更改為針對 PostgreSQL 資料庫執行。

在 psql 提示符下插入與之前使用 MySQL 時相同的資料。您將注意到只有 ID 為奇數的資料被附加到檔案中。

向 JDBC Supplier 新增商業資料庫驅動程式

如果我們想新增商業資料庫的 JDBC 驅動程式,則需要手動進行這些更改。操作步驟很簡單,如下所示。

  • 克隆 stream-application 倉庫

  • maven 配置中新增我們想要的驅動程式(例如 Oracle JDBC 驅動程式)作為依賴項。將其 scope 設定為 runtime

  • 從倉庫根目錄執行:./mvnw clean install -pl :jdbc-supplier

  • 生成包含 supplier 更改的應用程式:./mvnw clean install -pl :jdbc-source

  • cd applications/source/jdbc-source/apps - 在此處,我們可以找到基於 Kafka 和 RabbitMQ 的 jdbc-source 應用程式

  • 構建我們想要的應用程式變體。

總結

本文詳細介紹了 JDBC Supplier 以及它如何在 Spring Cloud Stream JDBC Source 中使用。我們還了解了 file consumer 及其在 Spring Cloud Stream 中的 sink 對應部分。然後,我們深入探討了使用幾種變體獨立執行這些應用程式,並在此過程中探索了各種功能。最後,我們看到了如何在各種資料庫之間輕鬆切換 JDBC Source,以及如何新增新的資料庫驅動程式。

敬請關注

本系列文章將繼續更新。在接下來的幾周,我們將介紹更多的函式和應用程式。

訂閱 Spring 新聞稿

透過 Spring 新聞稿保持聯絡

訂閱

搶先一步

VMware 提供培訓和認證,助您快速發展。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,一個簡單訂閱即可獲得。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部