領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多本文是探討基於 Java 函式重新設計的 Spring Cloud Stream 應用程式的系列部落格文章的一部分。在本章中,我們將探討如何使用 Spring Cloud Stream 應用程式 和 Spring Cloud Data Flow 來實現一個非常常見的 ETL 用例:從遠端服務攝取檔案。具體來說,我們將研究如何從 S3、SFTP 和 FTP 攝取檔案。
到目前為止,部落格系列中包含的內容如下:
要全面瞭解情況,Spring Cloud Data Flow 已經支援使用 SFTP 進行遠端檔案攝取幾年了。自撰寫這篇文章以來,基本架構沒有改變,但正如我們將看到的,新的流應用程式允許更簡單、更靈活的解決方案。

檔案攝取架構始於一個遠端檔案源,它輪詢遠端目錄併為檢測到的每個檔案釋出一條訊息。術語遠端檔案源指的是提供此功能的任何源應用程式。迄今為止,這包括Amazon S3 源、SFTP 源和FTP 源。
每個源都可以配置為將遠端目錄中的檔案同步到本地目錄。在這種情況下,底層Supplier函式產生的訊息負載是本地檔案路徑。供應商的輸出會在此過程中轉換為任務啟動請求。我們稍後會解釋這是如何完成的。請求由Task Launcher sink 接收,它透過其 REST API 將請求傳送到 Data Flow Server,以啟動批處理作業來攝取檔案內容。在下面顯示的示例中,該作業將 CSV 檔案的每一行插入到資料庫表中。
如果我們在 Kubernetes 或 Cloud Foundry 等雲平臺上執行,我們需要使用 NFS 等方式配置共享卷,以便任務應用程式可以訪問源下載的檔案。
這是使用 Spring Cloud Data Flow 推薦的檔案攝取架構。以下特點使其具有很強的彈性:
檔案攝取作業使用 Spring Batch 實現。它非常適合大型檔案處理,其中臨時故障可能需要作業從中斷處重新啟動——Spring Batch 專門設計用於處理這種情況。
Task Launcher sink 使用 PollableMessageSource,以便它可以在從輸入佇列中拉取任務啟動請求之前,首先確認 Data Flow 可以接受任務請求。Data Flow 配置了最大允許併發任務執行數量。該 sink 使用 Data Flow API 檢查此限制是否已達到,然後才接受下一個請求。這種流控制(類似於背壓)可以防止當(例如)100 個檔案被放入遠端目錄時,輕易發生的平臺資源飽和。
共享卷是必需的,以便批處理作業在必要時可以從上次提交的事務繼續處理。
如果沒有 Spring Cloud Data Flow 或 Spring Batch,也可以實現這種型別的工作負載。我們將此留給讀者自行練習。
遠端檔案源是什麼意思?Amazon S3、SFTP 和 FTP 源應用程式在 Spring Integration 中共享一個共同的血統,因此行為基本相同。例如,擴充套件AbstractInboundFileSynchronizer的類用於將本地目錄與遠端目錄同步。基類包括配置FileListFilter以指定要包含哪些檔案。通常,這用於模式匹配檔名。此外,此元件使用MetadataStore來跟蹤本地目錄中已有的檔案以及上次修改時間,以便只同步新檔案或已更改的檔案。預設情況下,元資料儲存是記憶體實現。這意味著當源重新啟動時,我們可能會收到已處理檔案的事件。為了解決這個問題,這些源中的每一個都可以輕鬆自定義以使用可用的幾種持久化實現之一。AbstractFileSynchronizer還支援使用 SpEL 表示式建立本地檔名、自動遠端檔案刪除等。
除了檔案同步,這些源中的每一個都包含file.consumer.mode屬性,其值可以是:
contents - 負載是檔案內容(位元組陣列)
ref - 負載是本地檔案路徑
lines - 每個負載是檔案中的一行
此外,每個源都提供了一個list-only選項,在這種情況下,負載包含遠端檔案的元資料,並且不執行同步。
SFTP Source從 SFTP 伺服器消費檔案。由於 SFTP 是最常用的遠端檔案服務,此元件具有最先進的功能。事實上,在上一代流應用程式中,SFTP 是我們為檔案攝取架構支援的唯一源。隨著它發展以支援任務啟動請求,我們最終專門為檔案攝取用例實現了一個特殊變體。旨在與tasklauncher-dataflow sink 配合使用的sftp-datalow源,嵌入了將有效負載轉換為任務啟動請求的程式碼。在當前版本中,我們已棄用此變體,轉而使用函式組合。此外,sftp 源可以設定為輪詢多個遠端目錄,在每個目錄之間輪流。在此配置中,輪詢演算法可以是fair(每個遠端目錄輪詢一次),也可以不是(每個遠端目錄持續輪詢直到沒有新檔案)。它還支援sftp.supplier.stream=true,這將直接流式傳輸內容而無需同步到本地目錄。
FTP 源與 SFTP 源非常相似,只是它使用 FTP 且在傳輸過程中不加密資料,因此安全性較低。它提供相同的核心功能,但目前不支援多個遠端目錄、list-only或stream模式。
Amazon S3 Source是根據其他源建模的,並支援相同的檔案消費者模式以及list-only模式。在這種情況下,s3.supplier.remote-dir指的是一個 S3 儲存桶。當使用list-only時,有效負載包含一個S3ObjectSummary,它提供有關 S3 物件的元資料。S3 本身提供了比 FTP/SFTP 更豐富的功能集。
除了 AWS S3,此源現在還可以與相容 S3 的實現(例如Minio)一起使用。
在之前的版本中,這被稱為tasklauncher-dataflow sink。最初,我們還有獨立的任務啟動器,每個受支援的平臺一個。為了易用性和彈性(如上所述),這些任務啟動器已被棄用,轉而使用 Data Flow 支援的實現。因此,我們從名稱中刪除了“Data Flow”。它現在簡單地是tasklauncher-sink。
該接收器是基於相應的tasklauncher-function構建的,該函式可在任何獨立應用程式中使用,以向 Data Flow 傳送任務啟動請求。這實現為Function<LaunchRequest, Optional<Long>>。LaunchRequest是一個簡單的值物件,至少包含要啟動的任務名稱。此任務必須在 Data Flow 中使用相同的名稱定義。可選地,啟動請求包括命令列引數和部署屬性。如果請求已提交,該函式返回唯一的任務 ID 作為長整型。如果 Data Flow 伺服器指示任務平臺已達到其最大執行任務數、無法聯絡到 Data Flow 伺服器或請求無效,則不會提交請求。
任務啟動器接收器從計劃任務中呼叫其基本函式,該任務由DynamicPeriodicTrigger觸發,該觸發器允許在執行時更新其週期。在這種情況下,我們使用它來實現指數退避。從初始的一秒週期開始,如果滿足以下條件,觸發器將退避,最終達到每 30 秒一次:
沒有排隊的啟動請求
平臺已執行最大任務數
如果這些條件中的任何一個發生變化,源會將週期重置為其初始值。當然,初始和最大觸發週期都是可配置的。
觸發的任務會檢查伺服器是否可以接受新的啟動請求,如果可以,它會使用PollableMessageSource輪詢輸入佇列。如果有請求,它會透過其 REST API 將請求釋出到 Data Flow。
新的基於函式架構為函式組合提供了第一類支援。作為此策略的一部分,某些常用函式可以與任何源組合。值得注意的是,這包括一個任務啟動請求函式。這意味著現在可以配置任何遠端檔案源來生成任務啟動請求。任務啟動請求函式可以評估 SpEL 表示式。例如,每個任務啟動請求都可以將不同的檔案路徑作為命令列引數提供。
讓我們深入研究一個示例,看看它是如何工作的。我們將使用 S3 源、任務啟動器接收器、Spring Cloud Data Flow、S3 相容服務和一個簡單的 Spring Batch 應用程式來處理檔案。
為了簡單起見,我們將使用 Docker Compose 在本地執行所有內容。
為本示例建立一個專案目錄,開啟終端會話,並切換到專案目錄。下載 SCDF docker-compose 檔案。
wget -O docker-compose.yml https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/v2.6.1/spring-cloud-dataflow-server/docker-compose.yml
設定 Data Flow 和 Skipper 版本,以及匯入最新流應用程式的 URI。然後執行docker-compose
export DATAFLOW_VERSION=2.6.1
export SKIPPER_VERSION=2.5.1
export STREAM_APPS_URI=https://repo.spring.io/libs-snapshot-local/org/springframework/cloud/stream/app/stream-applications-descriptor/2020.0.0-SNAPSHOT/stream-applications-descriptor-2020.0.0-SNAPSHOT.stream-apps-kafka-maven
docker-compose up
SCDF docker-compose.yml 掛載當前目錄,因此這些檔案可在掛載路徑 /root/scdf 下供容器訪問。
我們將使用 Minio 作為 S3 儲存,並將其執行在 Docker 容器中,以將其容器的/data路徑繫結到minio目錄。我們將在minio/mybucket中新增一個數據檔案。這將作為我們的遠端目錄。
我們再建立一個download目錄作為共享本地目錄。download目錄位於一個共享捲上,任何需要它的應用程式容器都可以訪問。在這種情況下,S3 源會從 S3 下載檔案,批處理應用程式將攝取資料並將其寫入資料庫表。在生產環境中,這將是一個外部持久卷,例如專用伺服器上掛載的 NFS 目錄。
mkdir -p minio/mybucket
mkdir download
在 S3 儲存桶位置建立一個數據檔案 name-list.csv。我們正好有一個您可以下載的。
wget -o minio/mybucket/name-list.csv https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-samples/master/batch/file-ingest/data/name-list.csv
這包含firstname,lastname行。批處理作業將為檔案中的每一行在people表中插入一行。
我們的專案目錄現在應該如下所示:
.
├── docker-compose.yml
├── download
└── minio
└── mybucket
└── name-list.csv
我們將執行 Minio,建立一個卷掛載以將其容器的/data路徑繫結到minio目錄。這將建立一個 S3 儲存桶mybucket,其中包含name-list.csv。
docker run --mount type=bind,source="$(pwd)"/minio,target=/data -p 9000:9000 -e "MINIO_ACCESS_KEY=minio" -e "MINIO_SECRET_KEY=minio123" minio/minio server /data
此時,如果您願意,可以開啟瀏覽器訪問https://:9000,使用上述憑據登入並檢視儲存桶。
現在我們已經設定好本地環境,我們可以編排檔案攝取管道。
我們恰好有需要的應用程式,釋出到repo.spring.ioMaven 倉庫。原始碼位於此處。
要註冊此應用程式,請開啟瀏覽器到https://:9393/dashboard並導航到“應用程式”頁面。單擊“新增應用程式”,然後使用 URI 註冊名為fileingest的Task應用程式。
maven://io.spring.cloud.dataflow.ingest:ingest:1.0.0.BUILD-SNAPSHOT
然後點選“註冊應用程式”。

應用程式註冊後,我們需要建立一個任務定義,該定義將在任務啟動請求中引用。我們將任務命名為fileingest,與應用程式名稱相同。

現在我們將建立一個流,以便在 S3 儲存桶中出現每個新檔案時啟動fileingest任務。由於 S3 儲存桶中已經有一個檔案,我們預計它會被下載到我們的共享download目錄。當這種情況發生時,任務啟動請求將被髮送到任務啟動器接收器,該接收器將啟動fileingest任務來處理它。
選擇左側選單欄上的Streams,然後單擊Create stream(s)。將下面的流定義剪下並貼上到文字區域中。
注意
用您主機的區域網 IP 地址替換 S3 端點 URL。由於localhost解析為容器自身的 IP,我們需要使用區域網 IP。有多種方法可以獲取此值。
ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'
在 OS/X 上對我有效。
另外
dig +short $(hostname)
在我公司將我的機器加入其域之前,它曾經有效。
這是流定義
s3 --spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction --file.consumer.mode=ref --s3.common.endpoint-url=http:<lan-ip-address>:9000 --s3.common.path-style-access=true --s3.supplier.remote-dir=mybucket --s3.supplier.local-dir=/root/scdf/download --cloud.aws.credentials.accessKey=minio --cloud.aws.credentials.secretKey=minio123 --cloud.aws.region.static=us-east-1 --cloud.aws.stack.auto=false --task.launch.request.taskName=fileingest --task.launch.request.argExpressions='localFilePath=payload' | tasklauncher --spring.cloud.dataflow.client.server-uri=http://dataflow-server:9393
流定義基本上是s3|tasklauncher,但 S3 源需要一些配置。分解如下:
spring.cloud.function.definition=s3Supplier,taskLaunchRequestFunction - 函式組合的關鍵。這裡,我們使用逗號作為組合分隔符,而不是標準的|。如果以這種方式使用|,DSL 解析器會混淆。我們將s3supplier(S3 源的主要函式)與taskLaunchRequestFunction(應用程式上下文中的函式 Bean,可供任何標準源使用,如果我們願意)進行組合。
file.consumer.mode=ref - 負載是下載檔案的路徑。
s3.common.endpoint-url - 我們的 Minio 例項的 S3 服務端點。如果您使用 AWS S3,則不需要此項。
s3.common.oath-style-access=true - Minio 需要此設定
s3.supplier.remote-dir=mybucket - 我們的源將監控的 S3 儲存桶
3.supplier.local-dir=/root/scdf/download - 從容器角度來看的本地目錄路徑
cloud.aws.credentials.accessKey=minio
cloud.aws.credentials.secretKey=minio123 - 憑據直接使用spring-cloud-aws屬性名稱
cloud.aws.region.static=us-east-1 - AWS s3 SDK 需要一個區域,Minio 忽略此項
cloud.aws.stack.auto=false - 對 AWS 不執行任何特殊操作。
task.launch.request.taskName=fileingest - 要啟動的任務名稱。此項為必填項,但可以透過 SpEL 表示式動態設定。
task.launch.request.argExpressions='localFilePath=payload' - 每次我們啟動任務時,我們都希望將檔案位置作為命令列引數傳遞。在這種情況下,我們的攝取任務正在尋找名為localFilePath的引數,其值是針對每條訊息評估的訊息有效負載。此路徑位於配置的本地目錄/root/scdf/download/<filename>中,因此批處理應用程式可以看到它。
在這種情況下,任務啟動器接收器只需要 Data Flow Server URI。對於在 skipper 容器中執行的接收器,主機名是dataflow-server。

建立流併為其命名。
使用play按鈕部署流。這將開啟一個頁面,讓您可以檢視配置並進行任何更改。點選頁面底部的Deploy stream。
流部署後,轉到Tasks頁面,最終(30 秒內)您應該會看到fileingest任務已完成。

您還可以看到,檔案已複製到download目錄
.
├── docker-compose.yml
├── download
│ └── name-list.csv
└── minio
└── mybucket
└── name-list.csv
在Executions選項卡中,您可以獲取任務執行的更多詳細資訊。
由於這也是一個 Spring Batch 應用程式,您可以轉到“作業”頁面,轉到ingestJob,然後單擊“資訊”圖示以顯示作業執行詳細資訊。

作業詳細報告顯示它執行了 5494 次寫入。Data Flow Server 配置所有任務應用程式的 DataSource 使用其資料庫來記錄任務和作業執行狀態。對於此演示,我們使用相同的 DataSource 來寫入應用程式資料。我們可以連線到dataflow-mysql容器來查詢表
docker exec -it dataflow-mysql mysql -u root -p
使用密碼rootpw登入並查詢表格

如果您讀到這裡,感謝您的時間和關注。如果您運行了演示,恭喜您!
即使以最簡單的形式,這也是 Data Flow 的一個相當高階的用例。在這裡,我們提供了 Spring Batch 應用程式。通常,您會編寫自己的(儘管 Spring Cloud Task 的下一個版本據說將包含一個可配置的批處理應用程式)。除此之外,我們不需要編寫任何程式碼就可以擁有一個功能齊全、雲原生、事件驅動的 ETL 管道,用於將資料從 S3 攝取到關係資料庫。事件驅動意味著新資料一旦到達就會被攝取並供使用者使用,而不是在夜間計劃執行,資料在第二天才能使用。檔案可以併發處理,根據需要執行多個作業例項。由於 Data Flow 限制了平臺上執行的併發任務數量,因此此架構可以處理非常高的負載而不會耗盡平臺資源。
從 S3 攝取檔案的一個可行替代方案消除了將檔案複製到共享檔案系統的需要。在這種情況下,S3 源可以配置為list only=true,以便它將提供遠端 S3 路徑。然後批處理作業連線到 S3 並直接處理遠端檔案。這篇Stack Overflow 帖子提供了一些關於如何實現此目的的提示。
然而,當使用 S/FTP 時,這種方法不太理想,因為這些是檔案傳輸協議,直接流式傳輸受到限制。如果您不使用持久卷,並且作業因某種原因失敗,您必須從頭開始,可能需要手動從部分完成的狀態倒回。使用 S/FTP 實現此管道與我們在此處所示的非常相似。
這篇文章是關於新的基於函式 Spring Cloud Stream 應用程式系列文章的一部分。未來幾周將有更多內容釋出。