案例研究:Elasticsearch Sink

工程 | Soby Chacko | 2020 年 11 月 16 日 | ...

本文是探討基於 Java Functions 的重新設計的 Spring Cloud Stream 應用系列部落格的一部分。在本文中,我們將介紹 Elasticsearch sink,它允許我們在 Elasticsearch 中索引記錄,以及它對應的 Consumer 函式。

以下是本系列部落格的所有先前部分。

Elasticsearch Consumer

在我們瞭解 Elasticsearch sink 應用之前,先來看看驅動該 sink 的 consumer 函式。正如我們在之前的其他 sink 應用中看到的那樣,consumer 是一個標準的 java.util.function.Consumer,它接受一個 Message<?>。該 consumer 依賴於 Spring Boot 對 Elasticsearch 的支援,Spring Boot 會自動配置一個來自 Elasticsearch 的 RestHighLevelClient。該 consumer 支援以下載荷型別的訊息。

使用該 consumer 時,要使用的 Elasticsearch 索引由屬性 elasticsearch.consumer.index 指定

您可以透過設定 INDEX_ID 訊息頭來設定要用於每條訊息的 Elasticsearch ID。另外,您也可以設定 elasticsearch.consumer.id 屬性,該屬性接受一個 SpEL 表示式。如果兩者都沒有設定,Elasticsearch 將自動生成一個 ID。

透過將屬性 elasticsearch.consumer.async 設定為 true,我們可以使索引操作非同步化。

我們可以將 consumer 函式注入到應用中,並直接呼叫其 accept 方法將記錄索引到 ElasticSearch。

例如,假設我們在應用中注入 consumer bean,如下所示。

@Autowired
ElasticsearchConsumer elasticsearchConsumer

然後我們可以使用以下 java.util.Map 來索引一條記錄。

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("age", 100);
jsonMap.put("dateOfBirth", 1471466076564L);
jsonMap.put("fullName", "John Doe");
final Message<Map<String, Object>> message = MessageBuilder.withPayload(jsonMap).build();

elasticsearchConsumer.accept(message);

上面的 map 中的資訊可以作為純文字 JSON 提供,或者使用來自 Elasticsearch 的 XContentBuilder 提供。

Elasticsearch Sink

正如我們在之前的部落格中看到的那樣,當 consumer 函式與 Spring Cloud Stream 結合使其成為一個 sink 應用時,它會變得更加強大。它具有無縫地與中介軟體技術通訊的固有能力。該 sink 應用從 Apache Kafka 或 RabbitMQ 等中介軟體系統消費資料併發送到 Elasticsearch。我們已經為 Kafka 和 RabbitMQ 提供了開箱即用的 Elasticsearch 變體。

讓我們看看執行 Apache Kafka 獨立 Elasticsearch sink 應用的步驟。

首先,下載 sink 應用。由於該 sink 尚未正式釋出,我們使用最新的里程碑版本。

wget https://repo.spring.io/milestone/org/springframework/cloud/stream/app/elasticsearch-sink-kafka/3.0.0-M4/elasticsearch-sink-kafka-3.0.0-M4.jar

在執行應用之前,請確保您已執行 Easticsearch。這裡提供一個在 docker 容器中啟動單節點 Elasticsearch 叢集的快速方法。

docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2

我們還需要確保 Kafka 正在執行。

然後如下執行應用

java -jar elasticsearch-sink-kafka-3.0.0-M4.jar
 --spring.cloud.stream.bindings.input.destination=data-in --elasticsearch.consumer.index=testing

透過提供 input destination 屬性,我們要求該 sink 從 Kafka 主題 data-in 接收資料並將其傳送到 Elasticsearch 索引 testing

向 Kafka 主題傳送一些測試資料。例如,如果 Kafka 在本地埠 9092 執行,您可以使用 Kafka 控制檯生產者指令碼傳送資料,如下所示。

kafka-console-producer.sh --broker-list localhost:9092 --topic data-in

然後傳送以下 JSON 資料。

{"key1":"value1"}

我們可以透過呼叫以下端點來驗證資料是否已被索引。

curl localhost:9200/testing/_search

類似地,我們也可以下載 Elasticsearch sink 應用的 RabbitMQ 變體,並在 RabbitMQ 叢集上執行它。

進一步使用該 sink

正如我們在本系列之前多次看到的那樣,當這些 Spring Cloud Stream 應用作為資料管道的一部分在 Spring Cloud Data Flow 上執行時,它們會變得更加強大和有彈性。

上面我們看到的 Elasticsearch 可以與其他許多應用結合使用。例如,一個 TCP source 應用可以從一個源接收資料,然後將資料轉儲到中介軟體目標,Elasticsearch sink 再從那裡消費並索引資料。然後,分析應用可以使用這個索引來生成儀表盤。這只是一個例子,還有許多這樣的用例。Spring Cloud Data Flow 使這些管道的編排對使用者來說是無縫的。我們鼓勵您查閱我們之前部落格中關於如何在 Spring Cloud Data Flow 上部署應用的步驟。使用同樣的步驟,Elasticsearch sink 應用也可以部署。

結論

在本部落格中,我們瞭解了 Elasticsearch consumer 函式及其對應的 Spring Cloud Stream sink 的工作原理。consumer 函式可以注入到自定義應用中,與其他業務邏輯結合使用。該 sink 應用提供了開箱即用的 Kafka 和 RabbitMQ 中介軟體變體。

本系列部落格還將有幾篇後續文章。請繼續關注。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

先行一步

VMware 提供培訓和認證,助力您快速進步。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群的所有近期活動。

檢視全部