領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多今天,我們很高興地宣佈 Spring XD (下載) 的 1.0 M2 版本。Spring XD 是一個統一的、分散式的、可擴充套件的系統,用於資料攝取、即時分析、批處理和資料匯出。該專案的目標是簡化大資料應用程式的開發。
Spring XD 的第二個里程碑版本引入了多項新特性,使您能夠更輕鬆地攝取和處理即時資料流以及編排基於 Hadoop 的批處理作業。在這篇博文中,我們將涵蓋:
以單節點模式($XD_HOME/bin/xd-singlenode)啟動 Spring XD,然後在另一個視窗中啟動 shell。下面的示例演示瞭如何建立一個簡單的流,該流將透過 HTTP 釋出的資料寫入檔案。請注意,shell 提供了命令的 Tab 自動補全提示。
$bin>./xd-shell
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create --name httpStream --definition "http | file"
xd:>tap create --name httpTap --definition "tap httpStream | counter"
xd:>http post --target https://:9000 --data "helloworld"
您可以列出所有流和 Tap 來驗證它們是否已建立。
xd:>stream list
Stream Name Stream Definition
----------- -----------------
httpStream http | file
xd:>tap list
Tap Name Stream Name Tap Definition
-------- ----------- ------------------------
httpTap httpStream tap httpStream | counter
如果檢查位於目錄 /tmp/xd/output/httpStream.out 中的檔案,您將看到“hello world”訊息。
xd:>! cat /tmp/xd/output/httpStream.out
The httpTap is simply counting messages. To see the name of the counter created and its value, use the counter shell command
xd:>counter list
Counter name
------------
httpTap
xd:>counter display --name httpTap
1
在單節點模式下,計數器是記憶體中的,但它也支援 Redis,當不使用單節點模式時,Redis 是預設選項。您可以使用 --analytics redis 命令列引數啟用 Redis 支援。
要建立一個將資料儲存在 Hadoop 中並即時統計推文中標籤頻率的 Twitter 流,請執行以下命令。請注意,要獲取consumerKey還是consumerSecret您需要註冊一個 Twitter 應用程式。如果您還沒有設定好,可以在 Twitter Developers 站點上建立一個應用程式來獲取這些憑據。
xd:> stream create bieberStream --definition "twittersearch --consumerKey=<your-key> --consumerSecret=<your-secret> --query=bieber | hdfs"
xd:> tap create --name bieberHashTap --definition "tap bieberStream | field-value-counter --fieldName=entities.hashTags.text --counterName=bieberHashCount"
xd:> hadoop config fs --namenode hdfs://:8020
xd:> hadoop fs cat /xd/bieberStream/bieberStream-0.log
... see fun tweets here ...
xd:> fieldvaluecounter display --name bieberHashCount
FieldName=bieberHashCount
------------------------- - -----
VALUE - COUNT
mtvhottest | 57
MTVHottest | 31
MTVhottest | 10
mtvhottets | 3
MtvHottest | 2
MTVHott | 2
JustinBieber | 2
MTVH | 2
MTVHOTTEST | 2
KCAMEXICO | 1
BeliebersAreProudOfJustin | 1
MyBeliebers | 1
在談到計數器時,引入了一種新的 聚合計數器 型別,該型別將訊息中某個欄位的計數聚合到每年、每月、每天、每小時和每分鐘的時間桶中。
僅用幾行 shell 命令,您就完成了這麼多工作!有關所有 shell 命令的詳細資訊,請參閱 使用者指南。
到目前為止顯示的流處理管道是線性的,但通常需要支援更復雜的流。為了開始解決這種情況,M2 中引入了命名通道。您可以選擇命名通道而不是源或宿模組。為了保持 Unix 的主題,向特定通道的輸入/輸出資料使用 `>` 字元,通道名稱字首為 `:`。
這是一個示例,展示瞭如何使用命名通道來共享由不同輸入源驅動的資料管道。
xd:>stream create out --definition ":foo > file --name=demo"
xd:>stream create in1 --definition "http > :foo"
xd:>stream create in2 --definition "time > :foo"
xd:>http post --target https://:9000 --data "hello"
檢視輸出檔案
xd:>! cat /tmp/xd/output/demo.out
您將看到單詞“hello”與時間戳值交錯出現。將訊息廣播到多個流以及支援根據訊息內容將訊息路由到不同流的功能計劃在未來的里程碑版本中實現。
值得注意的是,我們還增加了對 4 個 Hadoop 版本的支援。
您可以在啟動 XDContainer 時透過傳遞命令列選項 --hadoopDistribution 來選擇要使用的特定發行版 jar。您也可以使用其他 Hadoop 發行版,例如 Hadoop 1.2.x。我們將在以後的版本中為其他發行版新增明確的選項。值得注意的是,在 samples repository 中有一個示例,演示瞭如何使用 Spring XD 和 Pivotal HD 的 HAWQ 功能。
M1 版本提供了本地和 Redis 佇列後備傳輸,用於模組之間的通訊,由 DSL 中的管道符號表示。M2 版本支援基於 Rabbit 的傳輸,讓您可以利用功能齊全的訊息代理進行流攝入。
可以使用 Spring XD 執行批處理作業,並設定觸發器來啟動這些作業。例如,我們可以重用 Hadoop 中經典的 wordcount 示例來提供一個簡單的包含兩個步驟的工作流編排。第一步是將檔案複製到 HDFS,第二步是執行 wordcount MapReduce 作業。
要執行該示例,請克隆 spring-xd-samples repository 並構建 sample batch-wordcount。然後,像下面一樣複製 jar、配置檔案和資料檔案。
$ cd batch-wordcount
$ mvn clean assembly:assembly
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/modules/job/* $XD_HOME/modules/job
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/lib/* $XD_HOME/lib
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/nietzsche-chapter-1.txt /tmp
現在停止並以單節點模式($XD_HOME/bin/xd-singlenode)重新啟動 Spring XD。然後在 shell 中執行以下命令。
xd:> job create --name wordCountJob --definition "wordcount"
或者,您也可以指定一個 cron 表示式來 排程作業 的執行。您可以透過檢視 map reduce 作業的輸出來驗證結果。
xd:> hadoop config fs --namenode hdfs://:8020
xd:> hadoop fs cat /count/out/part-r-00000
工作流中具有其他步驟,例如執行 Hive 或 Pig 指令碼,也得到支援。要編寫這些型別的工作流,請查閱 Spring for Apache Hadoop 參考指南。也支援非 Hadoop 基於的步驟。
下一個版本的一個主要主題是,透過整合 Spring Batch Admin 專案的元件,公開更多的批處理作業管理功能。您將能夠透過向命名通道傳送訊息來觸發批處理作業,以及透過命名通道接收作業狀態通知。這將使您能夠輕鬆地根據資料可用性設定批處理作業的觸發,例如
file --dir "/data/inbound" | jobParameterCreator > :wordCountJob
當檔案出現在目錄 /data/inbound 中時,透過向命名通道 :wordCountJob. 傳送訊息來啟動 wordcount 批處理作業。當批處理作業執行時,將有一系列資料可供您使用有關 JobExecution、StepExecution 等的資訊的訊息。
:wordCountJob.notifications > filter --expression "payload.status.equals('COMPLETED')" | email --address "[email protected]"
在流和作業之間使用通道交換資料是 Spring XD 正在採取步驟統一流和批處理這兩個領域的一個方面。敬請期待!