保持領先
VMware 提供培訓和認證,助您快速進步。
瞭解更多今天,我們很高興宣佈 Spring XD 1.0 M2 版本釋出 (下載) Spring XD 是一個用於資料提取、即時分析、批處理和資料匯出的統一、分散式、可擴充套件的系統。該專案的目標是簡化大資料應用的開發。
Spring XD 的第二個里程碑版本引入了一些新特性,使得資料提取和處理即時流以及協調基於 Hadoop 的批處理作業變得更加容易。在這篇博文中,我們將介紹
在單節點模式下啟動 Spring XD ($XD_HOME/bin/xd-singlenode),並在一個單獨的視窗中啟動 shell。下面的示例展示瞭如何建立一個簡單的流,將透過 http 傳送的資料寫入檔案。注意,shell 為命令提供了 tab 補全提示。
$bin>./xd-shell
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create --name httpStream --definition "http | file"
xd:>tap create --name httpTap --definition "tap httpStream | counter"
xd:>http post --target https://:9000 --data "helloworld"
您可以列出所有流和 taps,以驗證它們是否已建立
xd:>stream list
Stream Name Stream Definition
----------- -----------------
httpStream http | file
xd:>tap list
Tap Name Stream Name Tap Definition
-------- ----------- ------------------------
httpTap httpStream tap httpStream | counter
如果您檢查位於目錄 /tmp/xd/output/httpStream.out
中的檔案,您將看到 hello world 訊息。
xd:>! cat /tmp/xd/output/httpStream.out
The httpTap is simply counting messages. To see the name of the counter created and its value, use the counter shell command
xd:>counter list
Counter name
------------
httpTap
xd:>counter display --name httpTap
1
在單節點模式下,計數器儲存在記憶體中,但也支援 Redis,這是非單節點模式下的預設設定。您可以使用命令列引數 –analytics redis
啟用 Redis 支援。
要建立一個將資料儲存在 Hadoop 中的 twitter 流以及一個即時計算推文中話題標籤頻率的計數器,請執行以下命令。請注意,要獲得一個consumerKey和consumerSecret您需要註冊一個 twitter 應用。如果您還沒有設定,您可以在Twitter 開發者網站上建立一個應用來獲取這些憑據。
xd:> stream create bieberStream --definition "twittersearch --consumerKey=<your-key> --consumerSecret=<your-secret> --query=bieber | hdfs"
xd:> tap create --name bieberHashTap --definition "tap bieberStream | field-value-counter --fieldName=entities.hashTags.text --counterName=bieberHashCount"
xd:> hadoop config fs --namenode hdfs://:8020
xd:> hadoop fs cat /xd/bieberStream/bieberStream-0.log
... see fun tweets here ...
xd:> fieldvaluecounter display --name bieberHashCount
FieldName=bieberHashCount
------------------------- - -----
VALUE - COUNT
mtvhottest | 57
MTVHottest | 31
MTVhottest | 10
mtvhottets | 3
MtvHottest | 2
MTVHott | 2
JustinBieber | 2
MTVH | 2
MTVHOTTEST | 2
KCAMEXICO | 1
BeliebersAreProudOfJustin | 1
MyBeliebers | 1
在計數器話題上,引入了一種新的聚合計數器型別,它可以將訊息中某個欄位的計數按年、月、日、小時和分鐘的時間桶進行聚合。
只需幾行 shell 命令,您就完成了不少工作!請檢視使用者指南以瞭解所有 shell 命令的詳細資訊。
到目前為止,所示的流處理管道都是線性的,但通常需要支援更復雜的流程。為了解決這種情況,在 M2 版本中引入了命名通道。您可以不使用源或目標模組,而是使用命名通道。為了與 unix 主題保持一致,從/向特定通道提取/傳送資料使用 `>’ 字元,並且名稱字首帶有 `:`
下面是一個示例,展示瞭如何使用命名通道來共享由不同輸入源驅動的資料管道。
xd:>stream create out --definition ":foo > file --name=demo"
xd:>stream create in1 --definition "http > :foo"
xd:>stream create in2 --definition "time > :foo"
xd:>http post --target https://:9000 --data "hello"
檢視輸出檔案
xd:>! cat /tmp/xd/output/demo.out
您會看到單詞“hello”與時間戳值混合在一起。將訊息扇出到多個流以及根據訊息內容將訊息路由到不同流的支援計劃在未來的里程碑版本中實現。
另外值得注意的是,我們增加了對 4 個 Hadoop 版本的支援
在啟動 XDContainer 時,可以透過傳遞命令列選項 –hadoopDistribution
來選擇要使用的特定發行版 jar。您也應該能夠使用其他 Hadoop 發行版,例如 Hadoop 1.2.x。我們將在後續版本中新增針對其他發行版的明確選項。值得注意的是, 在示例倉庫中有一個示例展示瞭如何將 Spring XD 與 Pivotal HD 的 HAWQ 功能一起使用。
M1 版本提供了基於本地和 Redis 佇列的傳輸方式,用於模組之間的通訊(在 DSL 中用管道符號表示)。M2 版本提供了基於 Rabbit 的傳輸方式支援,允許您利用功能齊全的訊息代理進行流提取。
可以使用 Spring XD 執行批處理作業,並設定觸發器來啟動這些作業。例如,我們可以重用 Hadoop 中的經典 wordcount 示例,提供一個簡單的包含兩個步驟的工作流編排。第一步是將檔案複製到 HDFS,第二步是執行 wordcount MapReduce 作業。
要執行示例,請克隆spring-xd-samples 倉庫並構建示例 batch-wordcount。然後按如下所示覆制 jar、config 和資料檔案。
$ cd batch-wordcount
$ mvn clean assembly:assembly
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/modules/job/* $XD_HOME/modules/job
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/lib/* $XD_HOME/lib
$ cp target/batch-wordcount-1.0.0.BUILD-SNAPSHOT-bin/nietzsche-chapter-1.txt /tmp
現在停止並在單節點模式下重新啟動 Spring XD ($XD_HOME/bin/xd-singlenode)。然後在 shell 中執行以下命令
xd:> job create --name wordCountJob --definition "wordcount"
或者,您也可以指定一個 cron 表示式來安排作業執行。您可以透過檢視 map reduce 作業的輸出來驗證結果。
xd:> hadoop config fs --namenode hdfs://:8020
xd:> hadoop fs cat /count/out/part-r-00000
工作流中也支援其他步驟,例如執行 Hive 或 Pig 指令碼。要編寫這些型別的工作流,請查閱 Spring for Apache Hadoop 參考指南。也支援非 Hadoop 的步驟。
下一個版本的一個主要主題是透過整合Spring Batch Admin專案的元件來提供更多批處理作業的管理功能。您將能夠透過向命名通道傳送訊息來觸發批處理作業,並從命名通道接收作業狀態通知。這將使您能夠輕鬆設定基於資料可用性的批處理作業觸發,例如
file --dir "/data/inbound" | jobParameterCreator > :wordCountJob
當檔案在目錄 /data/inbound
中可用時,透過向命名通道 :wordCountJob. 傳送訊息來啟動 wordcount 批處理作業。當批處理作業執行時,會有一系列資料流供您消費,這些訊息包含關於JobExecution、StepExecution 等的資料。
:wordCountJob.notifications > filter --expression "payload.status.equals('COMPLETED')" | email --address "[email protected]"
使用通道在流和作業之間交換資料是您可以開始瞭解 Spring XD 如何採取措施統一流處理和批處理這兩個領域的一個方面。敬請關注!