案例研究:使用 Spring Cloud Data Flow 進行遠端檔案攝取

工程 | David Turanski | 2020 年 9 月 29 日 | ...

本文是一個部落格系列的一部分,該系列探討了基於 Java 函式的重新設計的 Spring Cloud Stream 應用程式。在本章中,我們將探討如何使用 Spring Cloud Stream ApplicationsSpring Cloud Data Flow 實現一個非常常見的 ETL 用例:從遠端服務攝取檔案。具體來說,我們將介紹如何從 S3、SFTP 和 FTP 攝取檔案。

以下是該部落格系列迄今為止包含的內容

遠端檔案攝取架構

從宏觀角度來看,Spring Cloud Data Flow 多年來一直支援使用 SFTP 進行遠端檔案攝取。自我撰寫 此文章 以來,基本架構沒有改變,但正如我們將看到的那樣,新的流應用程式提供了更簡單、更靈活的解決方案。

fileingest

檔案攝取架構始於一個 遠端檔案源,它輪詢遠端目錄併為檢測到的每個檔案釋出一條訊息。術語 遠端檔案源 指代任何提供此功能的源應用程式。目前,這包括 Amazon S3 源SFTP 源FTP 源

每個源都可以配置為將遠端目錄中的檔案同步到本地目錄。在這種情況下,底層 Supplier 函式生成的訊息負載是本地檔案路徑。Supplier 的輸出在此過程中被轉換為任務啟動請求。我們稍後會解釋如何完成。該請求由一個 Task Launcher sink 接收,然後透過其 REST API 將其傳送到 Data Flow Server,以啟動一個批處理作業來攝取檔案內容。在下面所示的示例中,該作業將 CSV 檔案中的每一行插入到資料庫表中。

如果我們在雲平臺(例如 Kubernetes 或 Cloud Foundry)上執行,我們需要配置一個共享卷,例如使用 NFS,以便任務應用程式可以訪問源下載的檔案。

這是使用 Spring Cloud Data Flow 推薦的檔案攝取架構。以下特點使其具有很高的彈性

  • 檔案攝取作業是使用 Spring Batch 實現的。它非常適合處理大型檔案,其中臨時故障可能需要作業從上次中斷的地方重新啟動——Spring Batch 專門設計用於處理這種情況。

  • Task Launcher sink 使用 PollableMessageSource,以便在從輸入佇列中拉取任務啟動請求之前,它可以首先確認 Data Flow 可以接受任務請求。Data Flow 配置了允許的最大併發任務執行數量。該 sink 使用 Data Flow API 在接受下一個請求之前檢查是否未達到此限制。這種類似於背壓的流量控制,防止了當例如 100 個檔案被放入遠端目錄時容易發生的平臺資源飽和。

  • 如果需要,共享卷是必要的,以便批處理作業可以從上次提交的事務繼續處理。

可以在不使用 Spring Cloud Data Flow 或 Spring Batch 的情況下實現這種型別的工作負載。我們將此留作讀者的練習。

遠端檔案源

遠端檔案源是什麼意思?Amazon S3、SFTP 和 FTP 源應用程式在 Spring Integration 中具有共同的起源,因此行為基本相同。例如,擴充套件 AbstractInboundFileSynchronizer 的類用於將本地目錄與遠端目錄同步。基類包括配置一個 FileListFilter 來指定要包含的檔案。通常,這用於模式匹配檔名。此外,此元件使用一個 MetadataStore 來跟蹤本地目錄中已有的檔案以及上次修改時間,以便只同步新的或更改的檔案。預設情況下,元資料儲存是記憶體實現。這意味著當源重新啟動時,我們可能會收到已處理過的檔案的事件。為了解決這個問題,每個源都可以輕鬆定製以使用幾種可用的 持久化實現 之一。AbstractFileSynchronizer 還支援使用 SpEL 表示式建立本地檔名、自動刪除遠端檔案等。

除了檔案同步,每個源還包含 file.consumer.mode 屬性,其值可以是以下之一

  • contents - 負載是檔案內容的位元組陣列

  • ref - 負載是本地檔案路徑

  • lines - 每個負載是檔案中的一行

此外,每個源都提供一個 list-only 選項,其中負載包含有關遠端檔案的元資料,並且不執行同步。

SFTP 源

SFTP Source 從 SFTP 伺服器消費檔案。由於 SFTP 是最常用的遠端檔案服務,因此該元件具有最先進的功能。事實上,在上一代流應用程式中,SFTP 是我們支援檔案攝取架構的唯一源。隨著它發展到支援任務啟動請求,我們最終專門為檔案攝取用例實現了一個特殊變體。sftp-datalow 源被設計為與 tasklauncher-dataflow sink 一起工作,嵌入了將負載轉換為任務啟動請求的程式碼。在當前版本中,我們已棄用此變體,轉而支援函式組合。此外,SFTP 源可以配置為輪詢多個遠端目錄,在它們之間輪換。在此配置中,輪換演算法可以是 fair(每個遠端目錄輪詢一次)或非公平(每個遠端目錄持續輪詢直到沒有新檔案)。它還支援 sftp.supplier.stream=true,這將直接流式傳輸內容而無需同步到本地目錄。

FTP 源

FTP Source 與 SFTP 源非常相似,不同之處在於它使用 FTP 且在傳輸過程中不加密資料,因此安全性較低。它提供相同核心功能,但目前不支援多個遠端目錄、list-onlystream 模式。

Amazon S3 源

Amazon S3 Source 仿照其他源構建,支援相同的檔案消費模式以及 list-only 模式。在這種情況下,s3.supplier.remote-dir 指的是一個 S3 儲存桶。使用 list-only 時,負載包含一個 S3ObjectSummary,它提供有關 S3 物件的元資料。S3 本身提供了比 FTP/SFTP 更豐富的功能集。

除了 AWS S3,該源現在還可以用於相容 S3 的實現,例如 Minio

Task Launcher Sink

在之前的版本中,這被稱為 tasklauncher-dataflow sink。最初,我們還有獨立的任務啟動器,每個支援的平臺一個。出於易用性和彈性考慮,如上所述,這些已經棄用,轉而使用 Data Flow 支援的實現。因此,我們從名稱中去掉了 "Data Flow"。現在它就是簡單的 tasklauncher-sink

該 sink 構建在一個相應的 tasklauncher-function 上,該函式可在任何獨立應用程式中使用,向 Data Flow 傳送任務啟動請求。這實現為 Function<LaunchRequest, Optional<Long>>。[LaunchRequest](https://github.com/spring-cloud/stream-applications/blob/master/functions/function/tasklauncher-function/src/main/java/org/springframework/cloud/fn/tasklauncher/LaunchRequest.java) 是一個簡單的值物件,至少包含要啟動的任務名稱。此任務必須在 Data Flow 中使用相同的名稱定義。可選地,啟動請求包括命令列引數和部署屬性。如果請求已提交,該函式將返回唯一的任務 ID 作為 long 值。如果 Data Flow server 指示任務平臺已達到最大執行任務數、無法訪問 Data Flow server 或請求無效,則請求不會提交。

Task Launcher sink 從一個定時任務中呼叫其基礎函式,該任務由一個 DynamicPeriodicTrigger 觸發,該觸發器允許在執行時更新其週期。在這種情況下,我們使用它來實現指數退避。從最初的一秒週期開始,如果滿足以下條件,觸發器將退避,最終達到每 30 秒一次的週期:

  • 沒有排隊的啟動請求

  • 平臺已執行最大任務數

如果這些條件中的任何一個發生變化,源會將週期重置為其初始值。當然,初始和最大觸發週期是可配置的。

被觸發的任務檢查伺服器是否可以接受新的啟動請求,如果可以,它使用 PollableMessageSource 輪詢輸入佇列。如果存在請求,它將透過其 REST API 將請求傳送到 Data Flow。

建立任務啟動請求

新的基於函式的架構為 函式組合 提供了一流的支援。作為此策略的一部分,某些通用函式可以與任何源組合。值得注意的是,這包括 task-launch-request-function。這意味著現在可以將任何遠端檔案源配置為生成任務啟動請求。任務啟動請求函式可以評估 SpEL 表示式。例如,每個任務啟動請求可以提供不同的檔案路徑作為命令列引數。

綜合起來

讓我們深入研究一個示例,看看它是如何工作的。我們將使用 S3 Source、Task Launcher Sink、Spring Cloud Data Flow、一個相容 S3 的服務以及一個簡單的 Spring Batch 應用程式來處理檔案。

為了簡單起見,我們將使用 Docker Compose 在本地執行所有內容。

安裝 Spring Cloud Data Flow

為此示例建立一個專案目錄,開啟一個終端會話,並切換到專案目錄。下載 SCDF 的 docker-compose 檔案。

wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.1/spring-cloud-dataflow-server/docker-compose.yml

啟動 Spring Cloud Data Flow

設定 Data Flow 和 Skipper 版本,以及匯入最新流應用程式的 URI。然後執行 docker-compose

export DATAFLOW_VERSION=2.6.1
export SKIPPER_VERSION=2.5.1
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven
docker-compose up

建立一些資料

SCDF 的 docker-compose.yml 掛載當前目錄,因此這些檔案可以在掛載路徑 /root/scdf 下供容器訪問。

我們將使用 Minio 進行 S3 儲存,並將其執行在 Docker 容器中,繫結到 minio 目錄。我們將向 minio/mybucket 新增一個數據檔案。這將充當我們的遠端目錄。

我們還將建立一個 download 目錄作為我們的共享本地目錄。download 目錄位於一個共享捲上,任何需要它的應用程式容器都可以訪問。在本例中,包括從 S3 下載檔案的 S3 source,以及將資料攝取並寫入資料庫表的批處理應用程式。在生產環境中,這將是一個外部持久卷,例如專用伺服器上掛載的 NFS 目錄。

mkdir -p minio/mybucket
mkdir download

在 S3 儲存桶位置建立一個數據檔案,name-list.csv。我們恰好有一個您可以下載的

wget -o minio/mybucket/name-list.csv https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/batch/file-ingest/data/name-list.csv

它包含 firstname,lastname 的行。批處理作業將為檔案中的每一行向 people 表插入一條記錄。

我們的專案目錄現在應該看起來像這樣

.
├── docker-compose.yml
├── download
└── minio
    └── mybucket
        └── name-list.csv

啟動 Minio

我們將執行 Minio,建立一個卷掛載將其容器的 /data 路徑繫結到 minio 目錄。這將建立一個 S3 儲存桶 mybucket,其中包含 name-list.csv

docker run --mount type=bind,source="$(pwd)"/minio,target=/data -p 9000:9000 -e "MINIO_ACCESS_KEY=minio" -e "MINIO_SECRET_KEY=minio123" minio/minio server /data

此時,如果願意,您可以開啟瀏覽器訪問 https://:9000,使用上述憑據登入並檢視儲存桶。

使用 Data Flow 建立任務和流

現在我們已經搭建好了本地環境,我們可以編排我們的檔案攝取管道了。

註冊批處理應用程式

我們恰好擁有所需的應用程式,已釋出到 repo.spring.io Maven 倉庫。原始碼在這裡

要註冊此應用程式,請在瀏覽器中開啟 https://:9393/dashboard 並導航到 Apps 頁面。點選 Add Application(s) 並使用 URI 註冊一個名為 fileingestTask 應用程式。

maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT

然後點選 Register the application(s)

RegisterTaskApplication

建立任務定義

應用程式註冊完成後,我們需要建立一個任務定義,該定義將在任務啟動請求中引用。我們將任務命名為 fileingest,與應用程式同名。

CreateFileIngestTask

建立流

現在我們將建立一個流,為 S3 儲存桶中的每個新檔案啟動 fileingest 任務。由於 S3 儲存桶中已經有一個檔案,我們期望它會被下載到我們的共享 download 目錄。發生這種情況時,將向 Task Launcher sink 傳送任務啟動請求,該 sink 將啟動 fileingest 任務來處理它。

在左側選單欄選擇 Streams 並點選 Create stream(s)。將下面的流定義剪下並貼上到文字區域。

注意

在 S3 endpoint URL 中替換您主機的區域網 IP 地址。由於 localhost 解析為容器自身的 IP,我們需要使用區域網 IP。獲取此值的方法有很多種。

ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'

在 OS/X 上對我有用。

另外

dig +short $(hostname)

以前有效,直到我的僱主將我的機器加入他們的域。

這是流定義

s3 --spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction --file.consumer.mode=ref --s3.common.endpoint-url=http:<lan-ip-address>:9000 --s3.common.path-style-access=true --s3.supplier.remote-dir=mybucket --s3.supplier.local-dir=/root/scdf/download --cloud.aws.credentials.accessKey=minio --cloud.aws.credentials.secretKey=minio123 --cloud.aws.region.static=us-east-1 --cloud.aws.stack.auto=false --task.launch.request.taskName=fileingest --task.launch.request.argExpressions='localFilePath=payload' | tasklauncher --spring.cloud.dataflow.client.server-uri=http://dataflow-server:9393

流定義基本上是 s3|tasklauncher,但 S3 source 需要一些配置。分解如下:

  • spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction - 函式組合的秘訣。在這裡,我們使用逗號作為組合分隔符,而不是標準的 |。如果以這種方式使用 |,DSL 解析器會感到困惑。我們將 S3 source 的主要函式 s3suppliertaskLaunchRequestFunction(應用程式上下文中的一個 Function Bean,如果願意,可用於任何標準源)進行組合。

  • file.consumer.mode=ref - 負載是下載檔案的路徑。

  • s3.common.endpoint-url - 用於我們的 Minio 例項的 S3 服務 endpoint。如果您使用 AWS S3,則不需要此項。

  • s3.common.oath-style-access=true - Minio 需要此項。

  • s3.supplier.remote-dir=mybucket - 我們的源將監控的 S3 儲存桶。

  • 3.supplier.local-dir=/root/scdf/download - 從容器角度來看的本地目錄路徑。

  • cloud.aws.credentials.accessKey=minio

  • cloud.aws.credentials.secretKey=minio123 - 憑證直接使用 spring-cloud-aws 屬性名稱。

  • cloud.aws.region.static=us-east-1 - AWS s3 SDK 需要一個區域,Minio 忽略此項。

  • cloud.aws.stack.auto=false - 不為 AWS 做任何特殊操作。

  • task.launch.request.taskName=fileingest - 要啟動的任務名稱。這是必需的,但可以透過 SpEL 表示式動態設定。

  • task.launch.request.argExpressions='localFilePath=payload' - 每次啟動任務時,我們都希望將檔案位置作為命令列引數傳遞。在本例中,我們的攝取任務正在尋找名為 localFilePath 的引數,其值是針對每條訊息評估的訊息負載。此路徑位於配置的本地目錄 /root/scdf/download/<filename> 中。因此批處理應用程式可以看到它。

在這種情況下,task launcher sink 只需要 Data Flow Server 的 URI。對於在 skipper 容器中執行的 sink,主機名是 dataflow-server

CreateFileIngestStream

建立流並給它命名。

部署流

使用 play 按鈕部署流。這將開啟一個頁面,允許您檢視配置並進行任何更改。點選頁面底部的 Deploy stream

驗證任務已啟動

流部署完成後,轉到 Tasks 頁面,最終(30 秒內)您應該看到 fileingest 任務已完成。

TaskView

您還可以看到,檔案已複製到 download 目錄

.
├── docker-compose.yml
├── download
│   └── name-list.csv
└── minio
    └── mybucket
        └── name-list.csv

Executions 選項卡可以獲取更多工執行詳情。

由於這也是一個 Spring Batch 應用程式,您可以轉到 Jobs 頁面,找到 ingestJob,然後點選 info 圖示顯示 Job 執行詳情。

JobDetails

驗證資料庫中的資料

作業詳情報告執行了 5494 次寫入。Data Flow Server 配置所有任務應用程式的 DataSource 使用其資料庫來記錄任務和作業執行狀態。對於本演示,我們使用相同的 DataSource 寫入應用程式資料。我們可以連線到 dataflow-mysql 容器來查詢表。

docker exec -it dataflow-mysql mysql -u root -p

使用密碼 rootpw 登入並查詢表。

VerifyData

結論

如果您讀到這裡,感謝您的時間和關注。如果您運行了演示,祝賀您!這是您應得的!

即使在最簡單的形式下,這也是 Data Flow 的一個相當高階的用例。在這裡,我們提供了 Spring Batch 應用程式。通常,您需要自己編寫(儘管有傳言稱 Spring Cloud Task 的下一個版本將包含一個可配置的批處理應用程式)。除此之外,我們無需編寫任何程式碼即可擁有一個功能齊全、雲原生、事件驅動的 ETL 管道,將資料從 S3 攝取到關係資料庫。事件驅動意味著新資料一旦到達即可被攝取並供使用者使用,而不是按夜間計劃執行,次日才提供資料。檔案可以併發處理,根據需要執行多個作業例項。由於 Data Flow 限制了平臺上併發執行的任務數量,因此該架構可以處理非常高的負載而不會耗盡平臺資源。

從 S3 攝取檔案的一個可行替代方案是無需將檔案複製到共享檔案系統。在這種情況下,可以將 S3 source 配置為 list only=true,以便它提供遠端 S3 路徑。然後批處理作業連線到 S3 並直接處理遠端檔案。此 Stack Overflow 文章 提供了一些關於如何執行此操作的提示。

然而,在使用 S/FTP 時,這種方法不太理想,因為它們是檔案傳輸協議,直接流式傳輸受到限制。如果您不使用持久卷,並且作業由於某種原因失敗,您必須從頭開始,很可能需要從部分完成的狀態手動回退。使用 S/FTP 實現此管道與我們在此展示的非常相似。

敬請關注

本文是關於新的基於函式的 Spring Cloud Stream 應用程式相關主題系列文章的一部分。未來幾周敬請期待更多內容。

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊,保持聯絡

訂閱

快人一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲取支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一個簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視全部