領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多本文是探討基於Java函式的全新設計的Spring Cloud Stream應用程式的系列部落格文章之一。在本期中,我們將探討JDBC Supplier以及基於Spring Cloud Stream的源。我們將瞭解如何從關係資料庫中匯出資料,並使用檔案消費者和相應的Spring Cloud Stream檔案接收器將其轉儲到檔案中。我們將介紹幾種不同的執行JDBC源並將資料傳送到檔案的方式。
以下是本部落格系列的所有先前部分。
查詢資料庫並處理結果是一個非常基本的企業用例。追溯到資訊科技革命的主機時代,我們就可以看到這種模式被廣泛使用。在過去的幾十年裡,SQL已經成為與資料庫通訊的典型語言。Java從其最初的版本開始,就透過一個名為Java資料庫連線(JDBC)的庫,為基於資料庫的企業級應用提供了支援,JDBC廣為人知。在Java的早期,許多應用程式都是使用原生的JDBC庫編寫的。Spring Framework從一開始就透過提供一個基於JDK中JDBC庫的模板模式——JdbcTemplate——來支援處理資料庫的核心用例。Spring Data專案圍繞這個模板添加了許多額外的功能。當Spring Integration出現時,它在Spring中吸收了這種支援,並提供了許多額外的元件,以便資料可以透過通道介面卡、閘道器等方式獲取。在其最新版本中,也是本部落格的主題,我們意識到我們可以將這些Spring Integration元件作為簡單的Java Supplier來查詢資料庫。我們將詳細瞭解如何訪問這個Supplier,在自定義應用程式中重用它,以及將其用作Spring Cloud Stream源。
JDBC Supplier是一個實現為java.util.function.Supplier bean的元件。當被呼叫時,它將從資料庫表中提供資料。JDBC Supplier具有以下簽名。
Supplier<Flux<Message<?>>> jdbcSupplier()
預設情況下,JDBC Supplier根據資料庫表的行來拆分資料,其中資料庫的每一行都表示為一個java.util.Map資料結構。例如,這裡有一個填充了一些資料的資料庫表。
ID
姓名
1
Bob
2
Jane
3
John
當我們針對此表呼叫JDBC supplier時,我們會得到一個包含Map作為有效負載的Flux Message物件。第一個訊息將包含一個鍵為ID和NAME,值分別為1和Bob的對映。第二個訊息將包含具有相同鍵但值為2和Jane的對映,依此類推。我們還可以要求在不將資料拆分為單獨訊息的情況下提供資料。為此,我們可以使用屬性jdbc.supplier.split並將其設定為false(預設值為true)。當我們停用拆分時,與上述JDBC supplier簽名有一個重要區別。它的簽名變為Supplier<Message<?>>,並且我們得到一個單獨的Message<List<Map>,而不是單獨的Message<Map>。如果我們將此應用於上面給出的示例,我們將得到一個包含3個元素的列表,每個元素都將包含一個包含資料庫表中每行的Map。
JDBC Supplier需要在每次呼叫時執行SQL查詢。此查詢是強制性的,必須透過屬性jdbc.supplier.query提供。我們還可以使用屬性jdbc.supplier.update強制Supplier忽略已讀取的行。我們將在本文後面看到如何實現此操作的示例。
上面提到的兩種Supplier bean——一種是我們看到的預設資料拆分方式,另一種是我們停用資料拆分的方式——都命名為jdbcSupplier。我們可以用這個名稱在自定義應用程式中注入它們。我們需要根據是否正在拆分資料來確保使用正確的型別。例如,如果我們按照預設值(即拆分資料),那麼我們可以如下自動裝配JDBC supplier。
@Autowired
Supplier<Flux<Message<?>>> jdbcSupplier;
另一方面,如果我們使用屬性jdbc.supplier.split停用拆分,那麼我們需要以Supplier<Message<?>型別注入它。
一旦注入,我們就可以呼叫Supplier的get方法,然後開始接收資料。
與我們在上一篇部落格中看到的檔案Supplier類似,檔案消費者也是一個可重用的bean,我們可以將其注入到自定義應用程式中,並用它在目錄中建立新檔案。該bean被實現為java.util.function.Consumer。對於新手讀者來說,這可能會導致一些困惑,為什麼它被稱為消費者,但實際上並沒有從檔案中消費任何東西。儘管檔案消費者被命名並實現為消費者,但它不是輪詢或讀取檔案的檔案消費者,而是一個接受資料然後寫入檔案的消費者。“消費”檔案的用例由檔案Supplier處理。
這是檔案消費者的型別簽名。
Consumer<Message<?>> fileConsumer()
由於它是一個消費者,因此只有在資料處理管道的末端使用此元件才有意義。消費者接受傳入資料並使用它寫入檔案。當我們將檔案消費者透過繫結器實現與Spring Cloud Stream結合時,它就成為一個接收器應用程式,它從中介軟體目標(例如Kafka主題或RabbitMQ交換)消費資料。
在許多企業用例中,檔案消費者都很有用。對於任何在有新資料可用時建立或追加檔案的業務案例,檔案消費者都很有用。
使用檔案消費者時,我們可以分別透過屬性file.consumer.directory和file.consumer.name提供要寫入的目錄和檔名。如果我們不設定它們,它將使用消費者設定的預設值。預設情況下,資料將附加到檔案中,這可以透過屬性file.consumer.mode更改。有關更多詳細資訊,請參閱FileConsumerProperties的配置選項。
當上述功能元件與Spring Cloud Stream結合時,它們變得更加強大。這些功能可以在訊息傳遞應用程式中使用,Spring Cloud Stream使其更容易以與中介軟體無關的方式使用它們。JDBC Supplier用於構建可以與許多不同中介軟體系統一起工作的JDBC源。同樣,檔案消費者被用作檔案接收器應用程式的骨幹,它也可以與不同的訊息傳遞系統一起工作。
在以下部分中,我們將獨立執行這些應用程式並驗證它們是否按預期工作。
首先,建立一個新目錄jdbc-file-demo。
mkdir jdbc-file-demo && cd jdbc-file-demo
我們將執行jdbc-souce和file-sink應用程式的Kafka變體。我們將使用Apache Kafka作為中介軟體來執行這些應用程式。對於JDBC源,我們將使用MySQL作為資料庫。我們為Kafka和MySQL提供了方便的docker-compose指令碼。
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/kafka-mysql-only.yml
啟動docker容器
docker-compose up
執行docker ps,確保您看到所有三個元件(Kafka、Zookeeper和MySQL)都已啟動並正在執行。
現在我們已經準備好了必要的基礎設施,讓我們在執行應用程式之前設定好MySQL資料庫。
docker exec -it jdbc-file-blog-mysql mysql -uroot -p
使用rootpw作為密碼。在終端中,輸入以下命令來設定資料庫和表。
CREATE DATABASE IF NOT EXISTS Demo;
USE Demo;
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));
上面的模式很直觀,但tag列需要一些解釋。它用於避免從已讀取的表中讀取重複資料。其思想是,我們更新查詢返回的每一行的tag列,以便它不會包含在後續查詢中。我們將在下面看到詳細資訊。
保持MySQL終端會話開啟,我們稍後會再回來。
讓我們獨立執行開箱即用的檔案接收器。
wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/file-sink-kafka/3.0.0-SNAPSHOT/file-sink-kafka-3.0.0-SNAPSHOT.jar
然後如下執行。
java -jar file-sink-kafka-3.0.0-SNAPSHOT.jar --file.consumer.directory=/tmp/processed-file --file.consumer.name=output.txt --spring.cloud.stream.bindings.input.destination=jdbc-file-demo
讓我們詳細瞭解一下我們要做的。我們要求檔案接收器應用程式從kafka主題jdbc-file-demo消費資料,然後在檔案系統上的/tmp/processed-file目錄中生成一個名為output.txt的檔案。預設情況下,每個傳入的Kafka主題記錄都作為新行附加到檔案中。如果將file.consumer.binary值設定為true,則檔案將以二進位制形式寫入。您可以在此處找到所有可用配置。
與我們執行檔案接收器的方式類似,我們現在將獲取並執行基於Kafka的jdbc源應用程式。
wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/jdbc-source-kafka/3.0.0-SNAPSHOT/jdbc-source-kafka-3.0.0-SNAPSHOT.jar
然後執行,
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.datasource.url=jdbc:mariadb://:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0
我們正在向JDBC Source提供以下配置資訊。
資料來源URL - 在此情況下為我們正在執行的MySQL變體的JDBC URL。
資料來源使用者憑據
源應用程式要執行的SQL查詢
傳送資料的Kafka主題(這與檔案接收器正在消費資料的主題相同)
用於標記記錄的更新SQL語句
請注意,當我們之前建立表時,我們添加了一個名為tag的列,以避免讀取我們已經讀取的重複記錄。我們的主要SQL查詢(透過屬性jdbc.supplier.query)將只讀取tag值不為空的記錄。然後,每次源讀取一條記錄時,tag都會更新為值1,這樣下次執行查詢時,該記錄就會被忽略。如果不像我們上面那樣透過jdbc.supplier.update提供更新語句,每次查詢都將提供表中的所有記錄。如果我們不提供此功能並且仍然想避免重複,那麼我們需要使用一些複雜的策略,即使用一些元資料儲存來跟蹤我們迄今為止消費了什麼。提供一個支援標誌(如我們示例中的tag)的模式,然後在每次讀取時更新它,是一種更簡單的避免重複的策略。
JDBC Source是使用輪詢器呼叫的。這與在自定義非Spring Cloud Stream應用程式中直接使用JDBC Supplier不同,在這種情況下,必須手動呼叫Supplier。預設情況下,Spring Cloud Stream為JDBC Source提供了一個輪詢器,它每秒輪詢一次。此排程可以透過使用屬性spring.cloud.stream.poller.fixedDelay進行更改。有關輪詢的更多控制可以在此處找到。
現在我們正在執行這兩個應用程式,讓我們將資料插入到表中。
轉到您的MySQL終端會話並輸入以下插入語句。
mysql> insert into People values (1, 'Bob', 'First Street', 'First City', NULL);
現在轉到檔案接收器正在寫入檔案的目錄/tmp/processed-file,查詢名為output.txt的檔案。開啟該檔案並驗證其內容。它應該包含以下內容。
{"id":1,"name":"Bob","street":"First Street","city":"First City"}
向表中填充更多資料。
mysql> insert into People values (2, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (3, 'Mary', 'First Street', 'First City', NULL);
驗證我們是否在output.txt檔案中看到了新資料。
停止執行JDBC Source應用程式,並按如下重新執行。
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:mariadb://:3306/Demo --spring.datasource.username=root --spring.datasource.password=rootpw --jdbc.supplier.query="select id, name, street, city from People where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update People set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"
正如我們在之前的部落格中看到的,以及此處所解釋的,所有開箱即用的源都配置了許多有用的函式,您可以透過屬性啟用它們。在上述新配置選項下執行時,我們將filterFunction與jdbcSupplier組合,從而為JDBC Supplier生成的資料新增過濾功能。我們使用屬性spring.cloud.stream.function.definition將其組合,並將其值設定為jdbcSupplier|filterFunction。然後透過屬性filter.function.expression,我們提供一個JSONPath表示式來過濾掉所有偶數ID。
現在,如果您向表中插入更多資料,您將看到只有具有奇數ID的記錄被寫入檔案。
嘗試將這些記錄輸入到表中。
mysql> insert into People values (200, 'John', 'First Street', 'First City', NULL);
mysql> insert into People values (201, 'Mary', 'First Street', 'First City', NULL);
mysql> insert into People values (202, 'Alice', 'First Street', 'First City', NULL);
mysql> insert into People values (203, 'Bob', 'First Street', 'First City', NULL);
mysql> insert into People values (204, 'Jane', 'First Street', 'First City', NULL);
mysql> insert into People values (205, 'Doe', 'First Street', 'First City', NULL);
我們將看到檔案不包含ID為200、202和204的記錄,因為它們被過濾掉了。
JDBC supplier隨流行的開源JDBC驅動程式一起提供。目前,它包含MySQL、PostgreSQL和Microsoft SQL Server資料庫的驅動程式。這使我們能夠快速切換同一個針對特定資料庫(例如MySQL)執行的JDBC Source應用程式,使其在不進行任何程式碼更改的情況下針對PostgreSQL執行,而只需在部署時進行配置更改。讓我們以我們針對MySQL執行的JDBC Source為例,這次針對PostgreSQL執行。
首先,我們將在docker容器中執行PostgreSQL。
docker run --rm --name pg-docker -e POSTGRES_PASSWORD=test -e POSTGRES_USER=test -e POSTGRES_DB=demo -d -p 5432:5432 postgres
登入到psql會話(或使用PGAdmin等UI工具)。
docker run -it --rm --network host postgres psql -h localhost -d demo -U test
使用test作為密碼。
然後建立此表
CREATE TABLE IF NOT EXISTS People (
id INT NOT NULL,
name VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
tag CHAR(1),
PRIMARY KEY (id));
停止當前的JDBC Source,並使用以下配置選項重新執行它。
java -jar jdbc-source-kafka-3.0.0-SNAPSHOT.jar --spring.cloud.stream.function.definition="jdbcSupplier|filterFunction" --spring.datasource.url=jdbc:postgresql://:5432/demo --spring.datasource.username=test --spring.datasource.password=test --jdbc.supplier.query="select id, name, street, city from people where tag is NULL order by id" --spring.cloud.stream.bindings.output.destination=jdbc-file-demo --jdbc.supplier.update="update people set tag='1' where id in (:id)" --server.port=0 --filter.function.expression="#jsonPath(payload,'$.id')%2!=0"
這與我們第二次針對MySQL執行JDBC源時的配置選項基本相同,但這次資料來源屬性已更改為針對PostgreSQL資料庫執行。
在psql提示符下插入與之前MySQL相同的資料。您會注意到只有ID為奇數的記錄被追加到檔案中。
如果我們要新增商業資料庫的JDBC驅動程式,那麼我們需要手動進行這些更改。具體步驟如下所示。
在maven配置中新增所需的驅動程式(例如Oracle JDBC驅動程式)作為依賴項。將其作用域設定為runtime。
從倉庫根目錄執行:./mvnw clean install -pl :jdbc-supplier
透過Supplier更改生成應用程式:./mvnw clean install -pl :jdbc-source
cd applications/source/jdbc-source/apps - 在這裡,我們可以找到基於Kafka和RabbitMQ的jdbc-source應用程式。
構建我們想要的應用程式變體。
這篇部落格詳細介紹了JDBC Supplier以及該Supplier在Spring Cloud Stream JDBC Source中的用法。我們還看到了檔案消費者及其在Spring Cloud Stream中的接收器對應物。然後,我們深入探討了使用幾種變體獨立執行這些應用程式,並在此過程中探索了各種功能。最後,我們瞭解瞭如何輕鬆地在各種資料庫之間切換JDBC Source,以及如何新增新的資料庫驅動程式。
本系列將繼續。在接下來的幾周,我們將研究更多的函式和應用程式。