案例研究:從檔案讀取並寫入MongoDB

工程 | Soby Chacko | 2020年8月25日 | ...

本文是部落格系列的一部分,該系列探討了基於Java Functions的全新重新設計的Spring Cloud Stream應用程式。在本篇中,我們將深入瞭解檔案供應商及其Spring Cloud Stream檔案源對應部分。我們還將看到MongoDB消費者及其相應的Spring Cloud Stream sink。最後,我們將演示如何將檔案源和MongoDB sink在Spring Cloud Data Flow上作為管道進行編排。

以下是本部落格系列的所有先前部分。

檔案讀取和處理用例

檔案攝取和從檔案讀取資料是一個經典的企業用例。幾十年來,許多企業一直依賴不同級別的檔案設施來執行任務關鍵型系統。數太位元組的資料透過網際網路和企業內網以檔案的形式傳輸。例如,想象一個銀行資料中心,它每秒從所有分支機構、ATM和POS交易中接收資料檔案,然後需要處理這些資料並將其放入其他系統。這只是一個領域,但在成千上萬的例子中,檔案處理是許多業務的關鍵路徑。有許多遺留系統,其中編寫了許多自定義應用程式,每個應用程式都採用自己的方式來處理這些用例。Spring Integration多年來一直提供檔案支援作為通道介面卡。這些元件可以實現為函式,在從檔案讀取的情況下,我們可以提供一個通用的Supplier函式,該函式可重用並注入到終端使用者應用程式中。在以下部分中,我們將看到有關此功能抽象及其各種使用場景的更多詳細資訊。

檔案供應商

檔案供應商是一個實現為java.util.function.Supplier bean的元件,當被呼叫時,它將提供給定目錄中檔案的內容。檔案供應商具有以下簽名。

Supplier<Flux<Message<?>>>

供應商的使用者可以訂閱返回的Flux<Message<?>,它是目錄中訊息流或檔案物件本身。

為了呼叫檔案供應商,我們需要指定一個目錄來輪詢檔案。目錄資訊是必需的,並且必須透過配置屬性file.supplier.directory提供。預設情況下,供應商將以byte[]的形式生成資料,但它還透過配置屬性file.consumer.mode支援兩種額外的檔案消費模式。支援的附加值是linesref。檔案消費模式lines將一次消費一行檔案內容。這對於讀取CSV檔案和其他結構化文字資料等文字檔案很有用。ref模式將提供實際的File物件。預設情況下,檔案供應商還會阻止讀取它之前已經讀取過的相同檔案。這透過屬性file.supplier.preventDuplicates進行控制。

在自定義應用程式中重用檔案供應商

檔案供應商是一個可重用的Spring bean,我們可以將其注入到終端使用者的自定義應用程式中。一旦注入,它可以直接呼叫並與自定義資料處理相結合。這是一個例子。

@Autowired
Supplier<Flux<Message<?>>> fileSupplier;

public void consumeDataAndSendEmail() {
  Flux<Message<?> data = fileSupplier.get();
  messageFlux.subscribe(t -> {
      if (t == something)
         //send the email here.
      }
  }
}

在上面的虛擬碼中,我們正在注入檔案供應商bean,然後使用它來呼叫其get方法以獲取Flux。然後我們訂閱該Flux,並且每次透過Flux接收到任何資料時,都應用一些過濾,並根據該資料採取行動。這只是一個簡單的示例,旨在演示如何重用檔案供應商。當您在實際應用程式中嘗試此操作時,您可能需要在實現中進行更多調整,例如在進行條件檢查之前將接收到的資料的預設資料型別從byte[]轉換為其他型別,或者將預設檔案讀取模式從content更改為lines等。

執行使用檔案供應商的獨立檔案源

當檔案供應商與Spring Cloud Stream結合使用時,它變得更加強大,可以成為檔案源。正如我們在之前的部落格中看到的,此供應商已經預打包了Kafka和RabbitMQ繫結器在Spring Cloud Stream中,使它們成為可作為Spring Boot應用程式執行的uber jars。讓我們看看如何獲取這個uber jar並作為獨立應用程式執行。

作為第一步,繼續並獲取帶有 Apache Kafa 變體的此檔案源。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/file-source-kafka/3.0.0-SNAPSHOT/file-source-kafka-3.0.0-SNAPSHOT.jar

確保Kafka在預設埠執行。

現在是時候獨立執行檔案源了。

java -jar file-source-kafka-3.0.0-SNAPSHOT.jar --file.supplier.directory=/tmp/data-files --file.consumer.mode=lines --spring.cloud.stream.bindings.output.destination=file-data

預設情況下,Spring Cloud Stream 期望輸出繫結為 fileSupplier-out-0(因為 fileSupplier 是供應商 bean 名稱)。然而,當生成這些應用程式時,此輸出繫結被覆蓋為 output。這樣做是為了滿足在 Spring Cloud Data Flow 上執行此源應用程式時的一些要求。

我們還要求應用程式讀取落在/tmp/data-files目錄中的檔案,並將其作為一次一行(使用lines模式)進行消費。

觀看 Kafka 主題 `file-data`。使用 kafkacat 工具,你可以這樣做

kafkacat -b localhost:9092 -t file-data

現在,將一些檔案放入/tmp/data-files目錄。您將看到資料到達file-data Kafka主題,檔案中每一行代表一個Kafka記錄。

如果您想將檔案限制為特定模式,可以使用屬性file.supplier.filenamePattern使用簡單的命名模式,或者使用屬性file.supplier.filenameRegex使用更復雜的基於正則表示式的模式。

使用檔案源自動配置處理器

正如我們在此部落格系列的第二部分中看到的,所有開箱即用的Spring Cloud Stream源應用程式都已自動配置了幾個開箱即用的通用處理器。您可以將這些處理器作為檔案源的一部分啟用。這裡有一個示例,我們執行檔案源並接收資料,然後轉換消費的資料,再將其傳送到中介軟體上的目標。

java -jar file-source-kafka-3.0.0-SNAPSHOT.jar --file.supplier.directory=/tmp/data-files --file.consumer.mode=lines --spring.cloud.stream.bindings.output.destination=file-data --spring.cloud.function.definition=fileSupplier|spelFunction --spel.function.expression=payload.toUpperCase()

透過為spring.cloud.function.definition屬性提供值fileSupplier|spelFunction,我們激活了與檔案供應商組合的spel函式。然後我們提供了一個SpEL表示式,我們想用它來使用spel.function.expression轉換資料。

還有其他幾個功能可以這樣組合。更多詳情請參閱此處

MongoDB 消費者

MongoDB消費者提供了一個函式,允許使用者從外部系統接收資料,然後將該資料寫入MongoDB。我們可以在自定義應用程式中直接使用消費者bean將資料插入到MongoDB集合中。以下是MongoDB消費者bean的型別簽名。

Consumer<Message<?>> mongodbConsumer

一旦注入到自定義應用程式中,使用者可以直接呼叫消費者的accept方法,並提供一個Message<?>物件以將其有效載荷傳送到MongoDB集合。

使用MongoDB消費者時,集合是一個必需屬性,必須透過mongodb.consumer.collection進行配置。

使用消費者執行獨立Spring Cloud Stream MongoDB sink

與檔案源的情況一樣,Spring Cloud Stream開箱即用的應用程式已經使用MongoDB消費者提供了MongoDB sink。該sink適用於KafkaRabbitMQ繫結器變體。當用作Spring Cloud Stream sink時,MongoDB消費者會自動配置為接受來自相應中介軟體系統的資料,例如,來自Kafka主題或RabbitMQ交換機的資料。

讓我們花幾分鐘驗證我們可以獨立執行MongoDB sink。

使用Docker設定MongoDB以測試sink

在終端視窗中執行以下命令。

docker run -d --name my-mongo \
    -e MONGO_INITDB_ROOT_USERNAME=mongoadmin \
    -e MONGO_INITDB_ROOT_PASSWORD=secret \
    -p 27017:27017 \
    mongo

docker exec -it my-mongo /bin/sh

這將使我們進入正在執行的 docker 容器的 shell 會話。在 shell 中呼叫以下命令。

# mongo

> use admin

> db.auth('mongoadmin','secret')
1
> db.createCollection('test_collection’')
{ "ok" : 1 }>

現在我們已經設定好MongoDB,讓我們獨立執行sink。

wget https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/mongodb-sink-kafka/3.0.0-SNAPSHOT/mongodb-sink-kafka-3.0.0-SNAPSHOT.jar

java -jar mongodb-sink-kafka-3.0.0-SNAPSHOT.jar --mongodb.consumer.collection=test_collection --spring.data.mongodb.username=mongoadmin --spring.data.mongodb.password=secret --spring.data.mongodb.database=admin --spring.cloud.stream.bindings.input.destination=test-data-mongo

將一些JSON資料插入Kafka主題test-data-mongo。例如,您可以使用Kafka附帶的控制檯生產者指令碼,如下所示。

/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test-data-mongo

然後像這樣生成資料

{"hello":"mongo"}

轉到上面我們在 Docker shell 中啟動的終端上的 MongoDB CLI。

db.test_collection.find()

透過 Kafka 主題輸入的資料應作為輸出顯示。

在 Spring Cloud Data Flow 上執行

獨立執行檔案源和MongoDB很好,但是Spring Cloud Data Flow使其非常容易將它們作為管道執行。基本上,我們想要編排一個相當於檔案源 | 過濾器 | MongoDB的流。

本系列中的一篇部落格專門介紹瞭如何執行Spring Cloud Data Flow並部署應用程式作為流的所有細節。如果您不熟悉執行Spring Cloud Data Flow,請回顧該部落格。下面,我們簡要提供了設定Spring Cloud Data Flow所涉及的步驟。

首先,我們需要獲取用於執行Spring Cloud Data Flow的docker-compose檔案。

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.0/spring-cloud-dataflow-server/docker-compose.yml

另外,獲取用於執行MongoDB的此附加docker-compose檔案。

wget -O mongodb.yml https://raw.githubusercontent.com/spring-cloud/stream-applications/gh-pages/files/mongodb.yml

我們需要設定一些環境變數才能正確執行 Spring Cloud Data Flow。

export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven

現在我們萬事俱備,是時候啟動 Spring Cloud Data Flow 和所有其他輔助元件了。

docker-compose -f docker-compose.yml -f mongo.yml up

SCDF 啟動並執行後,請訪問 https://:9393/dashboard。然後轉到左側的 Streams 並選擇 Create Stream。從源應用程式中選擇 File,從處理器中選擇 Filter,從 sink 應用程式中選擇 MongoDB。單擊選項並選擇以下屬性。這是選擇所有屬性後應該顯示的樣子。

File to MongoDB SCDF Blog Pipeline

將流命名為file-source-filter-mongo,然後點選Create the Stream。建立後,點選“Deploy the Stream”。接受所有預設選項,然後點選螢幕底部的“Deploy Stream”。

流部署完成後,在呼叫 Spring Cloud Data Flow docker-compose 指令碼的同一目錄中建立一個名為 source-files 的目錄。此目錄已由執行 Spring Cloud Data Flow 元件(Skipper)之一的 docker 容器掛載並被容器看到。確保此 source-files 目錄具有正確的訪問級別,尤其是在 docker 容器將以 root 使用者身份執行應用程式而您很可能以非 root 使用者身份在本地機器上執行時。在 UI 上檢視檔案源應用程式的日誌,以查詢任何許可權錯誤。如果看到任何錯誤,請解決這些問題。

使用 mongo CLI 工具準備一個新的終端會話。

docker exec -it dataflow-mongo /bin/sh

# mongo

> use admin

> db.auth(‘mongoadmin’,'secret')
1
> db.createCollection(‘mongo_data’')
{ "ok" : 1 }>

source-files目錄中放置一些檔案,內容如下。

{"non-sql":"mongo"}
{"sql":"mysql"}
{"document":"mongo"}
{"log":"kafka"}
{"sink":"mongo"}

轉到mongo命令列終端會話。

db.mongo_data.find()

您會看到我們新增到管道中的過濾元件過濾掉了檔案中所有不包含“mongo”一詞的條目。您應該會看到類似於以下內容的輸出。

{ "_id" : ObjectId("5f4551c470e0373080fcd0b8"), "non-sql" : "mongo" }
{ "_id" : ObjectId("5f4551c470e0373080fcd0b2"), "sink" : "mongo" }
{ "_id" : ObjectId("5f4551c470e0373080fcd0b5"), "document" : "mongo" }

總結

在這篇部落格中,我們對檔案供應商及其Spring Cloud Stream源對應部分進行了旋風式的考察。我們還了解了MongoDB消費者和相應的Spring Cloud Stream sink應用程式。我們研究瞭如何將功能元件注入自定義應用程式。之後,我們看到了如何獨立執行檔案源和MongoDB sink的Spring Cloud Stream應用程式。最後,我們深入研究了Spring Cloud Data Flow,並編排了一個從檔案源到MongoDB sink的管道,並在途中過濾掉了一些資料。

敬請期待

這個部落格系列將會繼續。在接下來的幾周裡,我們將探討更多類似於我們在這篇部落格中描述的場景,但會使用不同的函式和應用程式。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有