案例研究:從檔案讀取並寫入 MongoDB

工程 | Soby Chacko | 2020年8月25日 | ...

本文是一個系列部落格的一部分,探討了基於 Java Function 的全新設計的 Spring Cloud Stream applications。在本篇中,我們將深入研究檔案供應者 (file supplier) 及其對應的 Spring Cloud Stream 檔案源 (file source)。我們還將介紹 MongoDB 消費者 (MongoDB consumer) 及其對應的 Spring Cloud Stream 接收器 (sink)。最後,我們將演示如何在 Spring Cloud Data Flow 上將檔案源和 MongoDB 接收器協同編排成一個管道 (pipeline)。

以下是本系列部落格的所有先前部分。

檔案讀取和處理用例

檔案攝取和從這些檔案讀取資料是一個經典的企業用例。幾十年來,許多企業一直依賴不同程度的檔案設施來執行關鍵任務系統。數 TB 的資料作為檔案在網際網路和企業內網中傳輸。例如,想象一下銀行資料中心,它每秒從所有分支機構、ATM 和 POS 交易接收資料檔案,然後需要處理這些檔案並將其放入其他系統中。這只是一個領域,但有成千上萬的例子表明檔案處理是許多業務的關鍵路徑。存在許多遺留系統,其中編寫了許多自定義應用程式,每個應用程式都採用自己的方式來處理這些用例。Spring Integration 多年來一直作為通道介面卡提供檔案支援。這些元件可以實現為 Function,而在從檔案讀取的情況下,我們可以提供一個可重用並注入到終端使用者應用程式中的通用 Supplier Function。在以下部分中,我們將看到有關此功能抽象及其各種使用場景的更多詳細資訊。

檔案 Supplier

檔案 Supplier 是一個元件,實現為 java.util.function.Supplier Bean,當被呼叫時,它將提供給定目錄中的檔案內容。檔案 supplier 具有以下簽名。

Supplier<Flux<Message<?>>>

supplier 的使用者可以訂閱返回的 Flux<Message<?>,這是一個訊息流或目錄中檔案本身的流。

為了呼叫檔案 supplier,我們需要指定一個目錄來輪詢檔案。目錄資訊是必需的,必須透過配置屬性 file.supplier.directory 提供。預設情況下,supplier 將生成 byte[] 型別的資料,但它也透過配置屬性 file.consumer.mode 支援另外兩種檔案消費模式。支援的附加值是 linesreflines 檔案消費模式將逐行消費檔案內容。這對於讀取文字檔案(例如 CSV 檔案和其他結構化文字資料)非常有用。ref 模式將提供實際的 File 物件。預設情況下,檔案 supplier 還會阻止重複讀取已讀過的檔案。這由屬性 file.supplier.preventDuplicates 控制。

在自定義應用中重用檔案 Supplier

檔案 supplier 是一個可重用的 Spring Bean,可以注入到終端使用者的自定義應用程式中。注入後,可以直接呼叫它並結合自定義資料處理。這是一個示例。

@Autowired
Supplier<Flux<Message<?>>> fileSupplier;

public void consumeDataAndSendEmail() {
  Flux<Message<?> data = fileSupplier.get();
  messageFlux.subscribe(t -> {
      if (t == something)
         //send the email here.
      }
  }
}

在上面的虛擬碼中,我們注入了檔案 supplier Bean,然後使用它呼叫其 get 方法來獲取一個 Flux。然後我們訂閱該 Flux,並透過 Flux 接收到資料時,應用一些過濾並根據過濾結果執行操作。這只是一個簡單的示例,展示瞭如何重用檔案 supplier。當您在實際應用程式中嘗試時,您可能需要在實現中進行更多調整,例如在執行條件檢查之前將接收到的資料的預設資料型別從 byte[] 轉換為其他型別,或者將預設的檔案讀取模式從 content 更改為 lines 等。

執行使用檔案 supplier 的獨立檔案源

當檔案 supplier 與 Spring Cloud Stream 結合使用時,它會變得更強大,成為一個檔案源。正如我們在之前的部落格中看到的,這個 supplier 已經預先打包了 Spring Cloud Stream 中的 KafkaRabbitMQ 繫結器,使它們成為可以作為 Spring Boot 應用程式執行的 uber jar。讓我們看看如何獲取這個 uber jar 並將其作為獨立應用程式執行。

第一步,獲取帶有 Apache Kafka 變體的檔案源。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/file-source-kafka/3.0.0-SNAPSHOT/file-source-kafka-3.0.0-SNAPSHOT.jar

確保您的 Kafka 正在預設埠上執行。

現在是時候獨立執行檔案源了。

java -jar file-source-kafka-3.0.0-SNAPSHOT.jar --file.supplier.directory=/tmp/data-files --file.consumer.mode=lines --spring.cloud.stream.bindings.output.destination=file-data

預設情況下,Spring Cloud Stream 期望輸出繫結是 fileSupplier-out-0(因為 fileSupplier 是 supplier Bean 的名稱)。然而,當這些應用程式生成時,這個輸出繫結會被覆蓋為 output。這樣做是為了適應在 Spring Cloud Data Flow 上執行此源應用程式時的一些要求。

我們還要求應用程式讀取落在 /tmp/data-files 目錄中的檔案,並將其逐行消費(使用 lines 模式)。

觀察 Kafka 主題 file-data。使用 kafkacat 工具,您可以執行此操作:

kafkacat -b localhost:9092 -t file-data

現在,在 /tmp/data-files 目錄中放置一些檔案。您將看到資料到達 file-data Kafka 主題,檔案中每一行代表一個 Kafka 記錄。

如果您想將檔案限制為特定模式,可以使用屬性 file.supplier.filenamePattern 的簡單命名模式,或者使用屬性 file.supplier.filenameRegex 的更復雜的基於正則表示式的模式。

檔案源的自動配置處理器

正如我們在本系列部落格的第二部分中所看到的,所有開箱即用的 Spring Cloud Stream 源應用程式都已自動配置了幾個開箱即用的通用處理器。您可以將這些處理器作為檔案源的一部分啟用。這裡有一個示例,我們執行檔案源並接收資料,然後將消費的資料進行轉換,再發送到中介軟體上的目的地。

java -jar file-source-kafka-3.0.0-SNAPSHOT.jar --file.supplier.directory=/tmp/data-files --file.consumer.mode=lines --spring.cloud.stream.bindings.output.destination=file-data --spring.cloud.function.definition=fileSupplier|spelFunction --spel.function.expression=payload.toUpperCase()

透過為 spring.cloud.function.definition 屬性提供值 fileSupplier|spelFunction,我們激活了與檔案 supplier 組合的 SpEL Function。然後,我們提供一個 SpEL 表示式,用於使用 spel.function.expression 轉換資料。

還有其他幾種 Function 可以透過這種方式組合。請在此處檢視更多詳細資訊。

MongoDB Consumer

MongoDB consumer 提供了一個 Function,允許從外部系統接收資料,然後將這些資料寫入 MongoDB。我們可以直接在自定義應用程式中使用 consumer Bean 將資料插入到 MongoDB 集合中。以下是 MongoDB consumer Bean 的型別簽名。

Consumer<Message<?>> mongodbConsumer

一旦注入到自定義應用程式中,使用者可以直接呼叫 consumer 的 accept 方法,並提供一個 Message<?> 物件,將其有效載荷傳送到 MongoDB 集合。

使用 MongoDB consumer 時,collection 是一個必需屬性,必須透過 mongodb.consumer.collection 進行配置。

執行使用 consumer 的獨立 Spring Cloud Stream MongoDB Sink

與檔案源的情況一樣,開箱即用的 Spring Cloud Stream 應用程式已經提供了使用 MongoDB consumer 的 MongoDB sink。該 sink 支援 KafkaRabbitMQ 繫結器變體。當用作 Spring Cloud Stream sink 時,MongoDB consumer 會自動配置為接受來自相應中介軟體系統的資料,例如,來自 Kafka 主題或 RabbitMQ 交換器的資料。

讓我們花幾分鐘驗證我們可以獨立執行 MongoDB sink。

使用 Docker 設定 MongoDB 以測試 Sink

在終端視窗中執行以下命令。

docker run -d --name my-mongo \
    -e MONGO_INITDB_ROOT_USERNAME=mongoadmin \
    -e MONGO_INITDB_ROOT_PASSWORD=secret \
    -p 27017:27017 \
    mongo

docker exec -it my-mongo /bin/sh

這將使我們進入執行中的 Docker 容器的 shell 會話。在 shell 中呼叫以下命令。

# mongo

> use admin

> db.auth('mongoadmin','secret')
1
> db.createCollection('test_collection’')
{ "ok" : 1 }>

現在我們已經設定好 MongoDB,讓我們獨立執行 sink。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/mongodb-sink-kafka/3.0.0-SNAPSHOT/mongodb-sink-kafka-3.0.0-SNAPSHOT.jar

java -jar mongodb-sink-kafka-3.0.0-SNAPSHOT.jar --mongodb.consumer.collection=test_collection --spring.data.mongodb.username=mongoadmin --spring.data.mongodb.password=secret --spring.data.mongodb.database=admin --spring.cloud.stream.bindings.input.destination=test-data-mongo

將一些 JSON 資料插入到 Kafka 主題 test-data-mongo 中。例如,您可以使用 Kafka 附帶的控制檯生產者指令碼,如下所示。

/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test-data-mongo

然後像這樣生成資料:

{"hello":"mongo"}

轉到我們在上面 Docker shell 中啟動的終端上的 MongoDB CLI。

db.test_collection.find()

透過 Kafka 主題輸入的資料應顯示為輸出。

在 Spring Cloud Data Flow 上執行

獨立執行檔案源和 MongoDB 都很順利,但 Spring Cloud Data Flow 使得將它們作為管道執行變得非常容易。基本上,我們想要編排一個等同於 File Source | Filter | MongoDB 的流程。

本系列部落格中的一篇專門詳細介紹瞭如何執行 Spring Cloud Data Flow 並將應用程式部署為流。如果您不熟悉執行 Spring Cloud Data Flow,請回顧該部落格。下面,我們將簡要介紹設定 Spring Cloud Data Flow 的步驟。

首先,我們需要獲取用於執行 Spring Cloud Data Flow 的 docker-compose 檔案。

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml

另外,獲取這個用於執行 MongoDB 的附加 docker-compose 檔案。

wget -O mongodb.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/mongodb.yml

我們需要設定一些環境變數才能正確執行 Spring Cloud Data Flow。

export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven

現在一切準備就緒,是時候開始執行 Spring Cloud Data Flow 和所有其他輔助元件了。

docker-compose -f docker-compose.yml -f mongo.yml up

一旦 SCDF 啟動並執行,請訪問 https://:9393/dashboard。然後轉到左側的 Streams 並選擇 Create Stream。從源應用程式中選擇 File,從處理器中選擇 Filter,從 sink 應用程式中選擇 MongoDB。點選選項並選擇以下屬性。以下是選擇所有屬性後的截圖。

File to MongoDB SCDF Blog Pipeline

將流命名為 file-source-filter-mongo,然後點選 Create the Stream。建立後,點選“Deploy the Stream”。接受所有預設選項,然後點選螢幕底部的“Deploy Stream”。

流部署完成後,請在呼叫 Spring Cloud Data Flow docker-compose 指令碼的同一目錄下建立一個名為 source-files 的目錄。該目錄已經由執行 Spring Cloud Data Flow 元件(Skipper)之一的 Docker 容器掛載,並可由容器看到。確保此 source-files 目錄具有正確的訪問級別,特別是由於 Docker 容器將以 root 身份執行應用程式,而您很可能在本地機器上以非 root 使用者身份執行。在 UI 上檢視檔案源應用程式的日誌以檢查是否有許可權錯誤。如果看到任何錯誤,請解決這些問題。

準備一個新的終端會話,並使用 mongo CLI 工具。

docker exec -it dataflow-mongo /bin/sh

# mongo

> use admin

> db.auth(‘mongoadmin’,'secret')
1
> db.createCollection(‘mongo_data’')
{ "ok" : 1 }>

source-files 目錄中放置一些檔案,內容如下。

{"non-sql":"mongo"}
{"sql":"mysql"}
{"document":"mongo"}
{"log":"kafka"}
{"sink":"mongo"}

轉到 mongo CLI 終端會話。

db.mongo_data.find()

您將看到我們在管道中新增的過濾元件過濾掉了檔案中所有不包含單詞 mongo 的條目。您應該看到類似於以下的輸出。

{ "_id" : ObjectId("5f4551c470e0373080fcd0b8"), "non-sql" : "mongo" }
{ "_id" : ObjectId("5f4551c470e0373080fcd0b2"), "sink" : "mongo" }
{ "_id" : ObjectId("5f4551c470e0373080fcd0b5"), "document" : "mongo" }

總結

在本篇部落格中,我們快速回顧了檔案 Supplier 及其對應的 Spring Cloud Stream 源。我們還了解了 MongoDB consumer 和相應的 Spring Cloud Stream sink 應用程式。我們探討了如何將 Function 元件注入到自定義應用程式中。之後,我們看到了如何獨立執行檔案源和 MongoDB sink 的 Spring Cloud Stream 應用程式。最後,我們深入研究了 Spring Cloud Data Flow,並編排了一個從檔案源到 MongoDB sink 的管道,並在途中過濾掉了一些資料。

敬請關注

本系列部落格將繼續更新。在接下來的幾周裡,我們將探討更多類似於本部落格中描述的場景,但會涉及不同的 Function 和應用程式。

訂閱 Spring 電子報

透過 Spring 電子報保持聯絡

訂閱

搶佔先機

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將舉辦的活動

檢視 Spring 社群所有即將舉辦的活動。

檢視全部