領先一步
VMware 提供培訓和認證,助您飛速前進。
瞭解更多今天我們很高興地宣佈 Spring XD 1.0 M1 版本釋出 (下載)。Spring XD 是一個統一、分散式、可擴充套件的系統,用於資料攝取、即時分析、批處理和資料匯出。 該專案的目標是簡化大資料應用的開發。
從宏觀角度看,大資料應用與企業整合和批處理應用有許多共同特徵。 透過 Spring Integration 和 Spring Batch 專案,Spring 已經為構建整合和批處理應用提供了超過 6 年的成熟解決方案。 Spring XD 在此基礎上構建,提供了一個輕量級的執行時環境,可以透過簡單的 DSL 輕鬆配置和組裝。
在這篇部落格中,我們將介紹 Spring XD 的關鍵元件,即 Streams、Jobs、Taps、Analytics 以及用於宣告它們的 DSL,還有執行時架構。 更多詳細資訊可以在XD 指南中找到。http | file
你透過向預設執行在埠 8080 上的 XD Admin Server 傳送 HTTP 請求來告訴 Spring XD 建立一個 stream。 在 M2 版本中,我們將提供一個互動式 shell 與 XD 通訊,但在 M1 中,最簡單的方式是使用 'curl' 與 XD 互動。
curl -d "http | file" https://:8080/streams/httptest
Stream 的名稱是 httptest
,預設監聽的 HTTP 埠是 9000
,預設的檔案位置是 /tmp/xd/output/${streamname}
。
curl -d "hello world" https://:9000
你會在檔案 /tmp/xd/output/httptest
中看到字串 'hello world'
要更改預設值,你可以傳入選項引數
http --port=9090 | file --dir=/var/streams --name=data.txt
M1 版本中支援的源包括 file、time、HTTP、Tail、Twitter Search、Gemfire (Continuous Queries)、Gemfire (Cache Event)、Syslog 和 TCP。 支援的 Sink 包括 Log、File、HDFS、 Gemfire 分散式資料網格和 TCP。 要將 syslog 資料捕獲到 HDFS,DSL 如下
syslog | hdfs --namenode="http://192.168.1.100:9000"
你也可以新增自定義的源和 Sink。 透過遵循簡單步驟,可以新增 Spring Integration 中現有的 Inbound 和 Outbound Channel Adapters。 未來的版本將增加對 MQTT、RabbitMQ、JMS 和 Kafka 的支援。 我們歡迎pull request來貢獻你偏好的源和 Sink 模組。
Stream 的程式設計模型基於 Spring Integration。 輸入源將外部資料轉換為包含頭部(包含鍵值對)和載荷(可以是任何 Java 型別)的 Message。訊息透過 Message Channels 流經 stream。下圖顯示了一個包含輸入源、處理步驟和輸出 Sink 的 stream。
在 DSL 中,管道符號對應於將資料從一個處理步驟傳遞到下一個步驟的 channel。 Spring XD 中的 channel 可以是記憶體中的,也可以由 Redis、JMS、RabbitMQ 等中介軟體支援。 這實現了一個簡單的分散式處理模型,我們將在稍後討論。
表示帶有處理步驟的 stream 的 DSL 表示式形式如下
source | filter | transform | sink
M1 版本中支援的處理器包括 filter、transformer、json-field-extractor、json-field-value-filter 和 script。 filter 和 transformer 處理器支援使用 Spring Expression Language (SpEL) 和 Groovy。 在前面的例子中,要使用 SpEL 將 HTTP 請求的載荷轉換為大寫,
http | transform --expression=payload.toUpperCase() | file
script 處理器也允許你執行自定義的 Groovy 程式碼。
Tap 允許你“監聽”來自另一個 stream 的資料並在一個單獨的 stream 中處理這些資料。原始 stream 不受 Tap 影響,也感知不到其存在,類似於電話線上的竊聽器。 WireTap是 EAI 模式標準目錄的一部分,並且是 Spring XD 使用的 Spring Integration 框架的一部分。
Tap 可以從目標 stream 處理管道中的任何點消費資料。例如,如果你有一個名為 mystream 的 stream,定義如下source | filter | transform | sink
你可以使用 DSL 建立一個 tap
tap mystream.filter | sink2
這將在應用 filter 之後但在 transformer 之前竊聽 stream 的資料。因此,未轉換的資料將被髮送到 sink2。
例如,如果你使用命令建立一個名為 httpstream
的 stream
curl -d "http --port=9898 | filter --expression='payload.length() > 5'
| transform --expression=payload.toUpperCase()
| file" https://:8080/streams/httpstream
然後要在名為 httptap
的 stream 上建立一個將過濾後的資料 stream 寫入單獨檔案的 tap,使用以下命令
curl -d "tap httpstream.filter | file --dir=/tmp --name=filtered.txt" https://:8080/streams/httptap
釋出資料,例如
curl -d "hello world" https://:9898
curl -d "he" https://:9898
curl -d "hello world 2" https://:9898
結果會在檔案 /tmp/xd/output/httpstream
中看到 HELLO WORLD 和 HELLO WORLD 2,在 /tmp/filtered.txt
中看到小寫版本。文字 'he'
將不會出現在任一檔案中。
一個主要用例是在透過其主 stream 攝取資料的同時執行即時分析。例如,考慮一個消費 Twitter 搜尋結果並將其寫入 HDFS 的資料 stream。可以在資料寫入 HDFS 之前建立一個 tap,並將來自 tap 的資料透過管道傳遞給一個計數器,該計數器對應於推文中特定話題標籤被提及的次數。
問 10 個開發者什麼是‘即時分析’,你會得到 20 個答案。 答案範圍從非常簡單(但極其有用)的計數器,到移動平均,聚合計數器,直方圖,時間序列,機器學習演算法,再到嵌入式 CEP 引擎。 Spring XD 旨在作為一個通用類庫,支援廣泛的這些指標和分析資料結構,並與多種後端儲存技術配合使用。 它們也作為一種 Sink 型別暴露給 XD,用於 DSL 表示式中。
在 M1 版本中,支援 Counter、Field Value Counter、Gauge 和 Rich Gauge。這些指標可以儲存在記憶體中或 Redis 中。 更多詳情以及未來版本將實現的列表,請參閱JavaDocs和 使用者指南的分析部分。
舉個例子,考慮收集 stream 中推文話題標籤即時頻率計數的情況。 要使用 SpringXD 執行此操作,建立一個新的 stream 定義,使用 twitter search source 模組並命名為 ‘spring’
curl -d "twittersearch --query='spring' --consumerKey=<consumer-key> --consumerSecret=<consumer-secret>
| file" https://:8080/streams/spring
這會將推文儲存在本地檔案系統中。 注意,要獲取consumerKey和consumerSecret你需要註冊一個 Twitter 應用程式。如果你還沒有設定,可以在Twitter 開發者網站上建立一個應用以獲取這些憑據。
接下來,在 twittersearch 源的輸出上建立一個名為 ‘springtap’ 的 tap,用於統計推文中的話題標籤頻率。
curl -d "tap spring.twittersearch | field-value-counter
--fieldName=entities.hashTags.text
--counterName=hashTagFrequency" https://:8080/streams/springtap
欄位 entities.hashTags.text
是用於底層實現的 Spring Social Tweet 物件 JSON 表示中話題標籤的路徑。 要檢視前 5 個話題標籤,請使用 redis-cli 檢視名為 fieldvaluecounters.hashTagFrequency.
的有序集合內容。注意,通常需要幾分鐘才能收集到足夠包含話題標籤實體的推文。
> redis-cli
redis 127.0.0.1:6379>ZREVRANGEBYSCORE fieldvaluecounters.hashTagFrequency +inf -inf WITHSCORES LIMIT 0 5
1] "spring"
2] "6"
3] "Turkey"
4] "6"
5] "Arab"
6] "6"
7] "summer"
8] "3"
9] "fashion"
10] "3"
Spring XD 有兩種執行模式:單節點模式和分散式模式。第一種是單個程序處理所有處理和管理任務。這種模式有助於你輕鬆入門,並簡化應用的開發和測試。分散式模式允許處理任務分佈在一組機器上,一個管理伺服器傳送命令來控制在叢集上執行的處理任務。
M1 版本中的分散式架構很簡單。 stream 的每個部分(稱為 module)可以在自己的 container 例項中執行。 模組之間的資料透過 Redis 佇列傳遞。 更多詳情請參閱架構部分。 此版本的主要重點是正確設計抽象,例如讓 DSL 中的管道符號可以在各種 transport 中即插即用。 未來的版本將提供其他 transport 和效能改進,以及在 Hadoop 叢集內部執行的支援。本文未涵蓋的其他主題包括介紹Tuple 資料結構以及如何建立自定義處理器。 下一版本的一個重要部分將是支援 XDContainer 執行 Spring Batch 作業。 這些作業可用於幫助將資料從 HDFS 匯出到關係型資料庫,以及編排 Hadoop 作業(MapReduce、Pig、Hive 或 Cascading 作業)在叢集上的執行。 我們還將為指標(如聚合計數器、基於 HTTP/JMX 的管理)提供額外的庫,以及基於Reactor專案的某些高效能源,敬請關注!
在我們繼續努力向最終的 Spring XD 1.0.0 版本邁進之際,我們很想聽取您的反饋。如果您有任何問題,請使用Stackoverflow(標籤:springxd),要報告任何錯誤或改進,請使用Jira Issue Tracker或提交GitHub issue。