搶先一步
VMware 提供培訓和認證,助您快速發展。
瞭解更多本文是探討基於 Java 函式的全新設計的 Spring Cloud Stream 應用程式的部落格系列的一部分。在本篇中,我們將探索 JDBC supplier 和基於 Spring Cloud Stream 的 source。我們將看到如何從關係型資料庫匯出資料,並使用 File Consumer 和相應的 Spring Cloud Stream File sink 將其轉儲到檔案中。我們將介紹幾種不同的執行 JDBC Source 並將資料傳送到檔案的方式。
這是本部落格系列的所有先前部分。
查詢資料庫並處理結果是一個非常基本的企業用例。即使追溯到資訊科技革命的主機時代,我們也能看到這種模式被廣泛使用。在過去的幾十年裡,SQL 已經成為與資料庫通訊的典型語言。Java 從其最初版本開始,就透過一個名為 Java Database Connectivity(俗稱 JDBC)的庫,為基於資料庫的企業級應用程式添加了支援。在 Java 的早期,許多應用程式都是使用原生的 JDBC 庫編寫的。Spring Framework 從一開始就透過提供基於 JDK 中 JDBC 庫的模板模式 - JdbcTemplate - 來支援處理資料庫的核心用例。Spring Data 專案在此模板的基礎上添加了許多額外功能。當 Spring Integration 出現時,它利用了 Spring 中的這種支援,並提供了許多額外的元件,以便資料可以透過通道介面卡、閘道器等方式獲取。在其最新版本中(這也是本文的主題),我們意識到可以將這些 Spring Integration 元件作為簡單的 Java supplier 來查詢資料庫。我們將詳細介紹如何訪問此 supplier,在自定義應用程式中重用它,以及將其用作 Spring Cloud Stream source。
JDBC Supplier 是一個實現為 java.util.function.supplier
bean 的元件。當呼叫時,它會從資料庫表中提供資料。JDBC supplier 具有以下簽名。
Supplier<Flux<Message<?>>> jdbcSupplier()
預設情況下,JDBC supplier 根據資料庫表的行分割資料,其中資料庫的每一行表示為一個 java.util.Map
資料結構。例如,這裡有一個填充了一些資料的資料庫表。
ID
Name
1
Bob
2
Jane
3
John
當我們對這個表呼叫 JDBC supplier 時,我們會得到一個 Flux 型別的 Message 物件,其中每個 Message 物件的 payload 是一個 Map
。第一個訊息將包含一個 Map,其鍵為 ID
和 NAME
,值分別為 1
和 Bob
。第二個訊息將包含具有相同鍵但值分別為 2
和 Jane
的 Map,依此類推。我們也可以要求提供資料時不將它們分割成單獨的訊息。為此,我們可以使用屬性 jdbc.supplier.split
並將其設定為 false
(預設值為 true
)。當我們停用分割時,與上面的 JDBC supplier 簽名有一個重要區別。其簽名變為 Supplier<Message<?>>
,並且不再是單獨的 Message<Map>
,而是得到一個單一的 Message<List<Map>
。如果將此應用於上面的示例,我們將得到一個包含 3 個元素的列表,每個元素都包含一個表示資料庫表每一行的 Map
。
JDBC Supplier 需要在每次呼叫時執行一個 SQL 查詢。此查詢是強制性的,必須透過屬性 jdbc.supplier.query
提供。我們還可以使用屬性 jdbc.supplier.update
強制 supplier 忽略已經讀取的行。本文後面將看到如何實現此操作的示例。
上面提到的兩種 Supplier
bean——預設啟用資料分割的和我們停用資料分割的——都被命名為 jdbcSupplier
。我們可以在自定義應用程式中以該名稱限定注入它們。我們需要根據是否分割資料來確保使用正確的型別。例如,如果我們採用預設設定(即分割資料),那麼可以按如下方式自動注入 JDBC supplier。
@Autowired
Supplier<Flux<Message<?>>> jdbcSupplier;
另一方面,如果使用屬性 jdbc.supplier.split
停用分割,那麼需要以 Supplier<Message<?>
型別注入它。
注入後,我們可以呼叫 Supplier
的 get
方法,然後開始接收資料。
與我們在上一篇部落格中看到的 File Supplier 類似,File Consumer 也是一個可重用 bean,我們可以將其注入到自定義應用程式中,並用它在目錄中建立新檔案。該 bean 實現為 java.util.function.Consumer
。對於初學者來說,這可能會引起一些困惑,為什麼它被稱為 consumer,但實際上並沒有從檔案中消費任何東西。雖然名稱和實現是 consumer,但檔案 consumer 並非是輪詢或讀取檔案的 consumer,而是接收資料並寫入檔案的 consumer。“消費”檔案的用例由檔案 supplier 處理。
這是檔案 consumer 的型別簽名。
Consumer<Message<?>> fileConsumer()
由於它是一個 consumer,因此只適合在資料處理管道的末端使用此元件。consumer 接受輸入資料並將其寫入檔案。當我們將檔案 consumer 與 Spring Cloud Stream 透過 binder 實現結合時,它就變成了一個 sink 應用程式,從諸如 Kafka topic 或 RabbitMQ exchange 等中介軟體目標消費資料。
檔案 consumer 在幾種企業用例中非常有用。對於任何有新資料可用時建立或附加到檔案的業務場景,檔案 consumer 都很有用。
使用檔案 consumer 時,我們可以使用屬性 file.consumer.directory
和 file.consumer.name
分別提供要寫入的檔案目錄和檔名。如果不設定這些屬性,它將使用 consumer 設定的預設值。預設情況下,資料將附加到檔案中,這可以透過屬性 file.consumer.mode
進行更改。有關更多詳細資訊,請參閱 FileConsumerProperties 的配置選項。
當與 Spring Cloud Stream 結合時,上述功能元件變得更加強大。這些函式可以在訊息傳遞應用程式中使用,而 Spring Cloud Stream 使其更容易以與中介軟體無關的方式使用。JDBC Supplier 用於構建 JDBC source,該 source 可以與許多不同的中介軟體系統配合工作。同樣,File Consumer 被用作 File Sink 應用程式的基礎,該應用程式也可以與不同的訊息系統配合工作。
在下面的章節中,我們將獨立執行這些應用程式並驗證它們是否按預期工作。
首先,建立一個新目錄 jdbc-file-demo
。
mkdir jdbc-file-demo && cd jdbc-file-demo
我們將執行 jdbc-souce 和 file-sink 應用程式的 Kafka 版本。我們將使用 Apache Kafka 作為中介軟體來執行這些應用程式。對於 JDBC source,我們將使用 MySQL 作為資料庫。我們為 Kafka 和 MySQL 提供了一個方便的 docker-compose 指令碼。
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/kafka-mysql-only.yml
啟動 docker 容器
docker-compose up
執行 docker ps
並確保看到所有三個元件都已啟動並執行(Kafka、Zookeeper 和 MySQL)。
現在我們已經準備好必要的基礎設施,接下來讓我們在執行應用程式之前設定好 MySQL 資料庫。
docker exec -it jdbc-file-blog-mysql mysql -uroot -p
使用 rootpw
作為密碼。在終端中輸入以下命令來設定資料庫和表。
CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));
上邊的 schema 很容易理解,但 tag
列需要一些解釋。它用於避免重複讀取已經讀取過的表資料。其思想是,我們更新查詢返回的每一行的 tag
列,這樣它就不會包含在後續查詢中。我們將在下面看到詳細資訊。
將進入 MySQL 的終端會話保持開啟狀態,因為稍後我們會回到那裡。
讓我們獨立執行現成的 file sink。
wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/file-sink-kafka/3.0.0-SNAPSHOT/file-sink-kafka-3.0.0-SNAPSHOT.jar
然後按如下方式執行。
java -jar file-sink-kafka-3.0.0-SNAPSHOT.jar --file.consumer.directory=/tmp/processed-file --file.consumer.name=output.txt --spring.cloud.stream.bindings.input.destination=jdbc-file-demo
讓我們詳細瞭解一下我們要做的事情。我們要求 file sink 應用程式從 Kafka topic jdbc-file-demo
中消費資料,然後在檔案系統上的目錄 /tmp/processed-file
中生成一個名為 output.txt
的檔案。預設情況下,每個傳入的 Kafka topic 記錄都會作為新行附加到檔案中。如果將 file.consumer.binary
的值設定為 true
,則檔案將以二進位制形式寫入。您可以在此處找到所有可用的配置。
與執行 file sink 的方式類似,現在我們將獲取並執行基於 Kafka 的 jdbc source 應用程式。
wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/jdbc-source-kafka/3.0.0-SNAPSHOT/jdbc-source-kafka-3.0.0-SNAPSHOT.jar
然後執行,
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0
我們為 JDBC Source 提供了以下配置資訊。
資料來源 URL - 在本例中是用於我們執行的 MySQL 版本的 JDBC URL。
資料來源使用者憑據
source 應用程式要執行的 SQL 查詢
傳送資料的 Kafka topic(這設定為與 file-sink 消費資料相同的 topic)
用於標記記錄的 update SQL 語句
請注意,當我們之前建立表時,我們添加了一個名為 tag
的列,以避免讀取我們已經讀取過的重複記錄。我們的主要 SQL 查詢(透過屬性 jdbc.supplier.query
)將僅讀取 tag
值不為空的記錄。然後每次 source 讀取一條記錄時,tag 都會更新為值 1
,以便下次執行查詢時跳過該記錄。如果不透過 jdbc.supplier.update
提供 update 語句,就像我們上面做的那樣,每次查詢都會提供表中的所有記錄。如果我們不提供此語句並且仍然想避免重複,那麼我們需要使用一些複雜的策略,例如使用元資料儲存來跟蹤我們到目前為止已經消費了哪些記錄。提供一個支援標誌(像我們示例中的 tag
)的 schema,然後在每次讀取時更新它,是避免重複的一種更容易的策略。
JDBC Source 使用 poller 進行呼叫。這與在自定義的非 Spring Cloud Stream 應用程式中直接使用 JDBC Supplier 不同,在後者的情況下,必須手動呼叫 supplier。預設情況下,Spring Cloud Stream 為 JDBC Source 提供一個 poller,它每秒輪詢一次。此排程可以透過使用屬性 spring.cloud.stream.poller.fixedDelay
進行更改。有關輪詢的更多控制,可以在此處找到。
現在我們正在執行這兩個應用程式,讓我們向表中插入資料。
轉到您的 MySQL 終端會話並輸入以下 insert 語句。
mysql> insert into People values (1, 'Bob', 'First Street', 'First City', NULL);
現在轉到 file-sink 寫入檔案的目錄 /tmp/processed-file
,查詢名為 output.txt
的檔案。開啟檔案並驗證其內容。它應該包含以下內容。
{"id":1,"name":"Bob","street":"First Street","city":"First City"}
向表中填充更多資料。
mysql> insert into People values (2, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (3, 'Mary', 'First Street', 'First City', NULL);
驗證我們是否在 output.txt
檔案中看到了新資料。
停止執行 JDBC Source 應用程式並按如下方式重新執行。
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:mariadb://:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"
正如我們在之前的部落格中以及此處所解釋的,所有現成的 source 都自動配置了許多有用的函式,因此您可以透過屬性啟用它們。在使用上述新配置選項執行時,我們將 filterFunction 與 jdbcSupplier
進行組合,從而為 JDBC Supplier 生成的資料添加了過濾功能。我們使用屬性 spring.cloud.stream.function.definition
並將其值設定為 jdbcSupplier|filterFunction
來實現組合。然後透過屬性 filter.function.expression
提供一個 JSONPath 表示式來過濾掉所有偶數 ID。
現在,如果您向表中插入更多資料,您將看到只有 ID 為奇數的記錄被寫入檔案。
嘗試將這些記錄輸入到表中。
mysql> insert into People values (200, 'John', 'First Street', 'First City', NULL);
mysql> insert into People values (201, 'Mary', 'First Street', 'First City', NULL);
mysql> insert into People values (202, 'Alice', 'First Street', 'First City', NULL);
mysql> insert into People values (203, 'Bob', 'First Street', 'First City', NULL);
mysql> insert into People values (204, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (205, 'Doe', 'First Street', 'First City', NULL);
我們將看到檔案中不包含 ID 為 200、202 和 204 的記錄,因為它們被過濾掉了。
JDBC supplier 隨附了流行的開源 JDBC 驅動程式。目前,它包含用於 MySQL、PostgreSQL 和 Microsoft SQL Server 資料庫的驅動程式。這使我們能夠快速切換針對特定資料庫(例如 MySQL)執行的同一個 JDBC Source 應用程式,使其針對 PostgreSQL 執行,而無需進行任何程式碼更改,只需在部署時更改配置即可。讓我們以我們之前針對 MySQL 執行的 JDBC Source 為例,這次針對 PostgreSQL 執行。
首先,我們將在 docker 容器中執行 PostgreSQL。
docker run --rm --name pg-docker -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=demo -d -p 5432:5432 postgres
登入到 psql
會話(或使用 PGAdmin 等 UI 工具)。
docker run -it --rm --network host postgres psql -h localhost -d demo -U test
使用 test
作為密碼。
然後建立此表
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));
停止當前的 JDBC Source 並使用以下配置選項重新執行它
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:postgresql://:5432/demo --spring.datasource.username=test --spring.datasource.password=test --jdbc.supplier.query="select id, name, street, city from people where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update people set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"
這與我們第二次針對 MySQL 執行 JDBC source 時使用的配置選項大致相同,但這次資料來源屬性已更改為針對 PostgreSQL 資料庫執行。
在 psql 提示符下插入與之前使用 MySQL 時相同的資料。您將注意到只有 ID 為奇數的資料被附加到檔案中。
如果我們想新增商業資料庫的 JDBC 驅動程式,則需要手動進行這些更改。操作步驟很簡單,如下所示。
克隆 stream-application 倉庫
在maven 配置中新增我們想要的驅動程式(例如 Oracle JDBC 驅動程式)作為依賴項。將其 scope 設定為 runtime
。
從倉庫根目錄執行:./mvnw clean install -pl :jdbc-supplier
生成包含 supplier 更改的應用程式:./mvnw clean install -pl :jdbc-source
cd applications/source/jdbc-source/apps
- 在此處,我們可以找到基於 Kafka 和 RabbitMQ 的 jdbc-source 應用程式
構建我們想要的應用程式變體。
本文詳細介紹了 JDBC Supplier 以及它如何在 Spring Cloud Stream JDBC Source 中使用。我們還了解了 file consumer 及其在 Spring Cloud Stream 中的 sink 對應部分。然後,我們深入探討了使用幾種變體獨立執行這些應用程式,並在此過程中探索了各種功能。最後,我們看到了如何在各種資料庫之間輕鬆切換 JDBC Source,以及如何新增新的資料庫驅動程式。
本系列文章將繼續更新。在接下來的幾周,我們將介紹更多的函式和應用程式。