先行一步
VMware 提供培訓和認證,助力您快速進步。
瞭解更多本文是探討基於 Java Functions 的重新設計的 Spring Cloud Stream 應用系列部落格的一部分。在本文中,我們將介紹 Elasticsearch sink,它允許我們在 Elasticsearch 中索引記錄,以及它對應的 Consumer 函式。
以下是本系列部落格的所有先前部分。
在我們瞭解 Elasticsearch sink 應用之前,先來看看驅動該 sink 的 consumer 函式。正如我們在之前的其他 sink 應用中看到的那樣,consumer 是一個標準的 java.util.function.Consumer
,它接受一個 Message<?>
。該 consumer 依賴於 Spring Boot 對 Elasticsearch 的支援,Spring Boot 會自動配置一個來自 Elasticsearch 的 RestHighLevelClient
。該 consumer 支援以下載荷型別的訊息。
String
java.util.Map
來自 Elasticsearch 的 XContentBuilder。
使用該 consumer 時,要使用的 Elasticsearch 索引由屬性 elasticsearch.consumer.index
指定
您可以透過設定 INDEX_ID
訊息頭來設定要用於每條訊息的 Elasticsearch ID。另外,您也可以設定 elasticsearch.consumer.id
屬性,該屬性接受一個 SpEL 表示式。如果兩者都沒有設定,Elasticsearch 將自動生成一個 ID。
透過將屬性 elasticsearch.consumer.async
設定為 true
,我們可以使索引操作非同步化。
我們可以將 consumer 函式注入到應用中,並直接呼叫其 accept
方法將記錄索引到 ElasticSearch。
例如,假設我們在應用中注入 consumer bean,如下所示。
@Autowired
ElasticsearchConsumer elasticsearchConsumer
然後我們可以使用以下 java.util.Map
來索引一條記錄。
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("age", 100);
jsonMap.put("dateOfBirth", 1471466076564L);
jsonMap.put("fullName", "John Doe");
final Message<Map<String, Object>> message = MessageBuilder.withPayload(jsonMap).build();
elasticsearchConsumer.accept(message);
上面的 map 中的資訊可以作為純文字 JSON
提供,或者使用來自 Elasticsearch 的 XContentBuilder 提供。
正如我們在之前的部落格中看到的那樣,當 consumer 函式與 Spring Cloud Stream 結合使其成為一個 sink 應用時,它會變得更加強大。它具有無縫地與中介軟體技術通訊的固有能力。該 sink 應用從 Apache Kafka 或 RabbitMQ 等中介軟體系統消費資料併發送到 Elasticsearch。我們已經為 Kafka 和 RabbitMQ 提供了開箱即用的 Elasticsearch 變體。
讓我們看看執行 Apache Kafka 獨立 Elasticsearch sink 應用的步驟。
首先,下載 sink 應用。由於該 sink 尚未正式釋出,我們使用最新的里程碑版本。
wget https://repo.spring.io/milestone/org/springframework/cloud/stream/app/elasticsearch-sink-kafka/3.0.0-M4/elasticsearch-sink-kafka-3.0.0-M4.jar
在執行應用之前,請確保您已執行 Easticsearch。這裡提供一個在 docker 容器中啟動單節點 Elasticsearch 叢集的快速方法。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
我們還需要確保 Kafka 正在執行。
然後如下執行應用
java -jar elasticsearch-sink-kafka-3.0.0-M4.jar
--spring.cloud.stream.bindings.input.destination=data-in --elasticsearch.consumer.index=testing
透過提供 input destination 屬性,我們要求該 sink 從 Kafka 主題 data-in
接收資料並將其傳送到 Elasticsearch 索引 testing
。
向 Kafka 主題傳送一些測試資料。例如,如果 Kafka 在本地埠 9092
執行,您可以使用 Kafka 控制檯生產者指令碼傳送資料,如下所示。
kafka-console-producer.sh --broker-list localhost:9092 --topic data-in
然後傳送以下 JSON
資料。
{"key1":"value1"}
我們可以透過呼叫以下端點來驗證資料是否已被索引。
curl localhost:9200/testing/_search
類似地,我們也可以下載 Elasticsearch sink 應用的 RabbitMQ 變體,並在 RabbitMQ 叢集上執行它。
正如我們在本系列之前多次看到的那樣,當這些 Spring Cloud Stream 應用作為資料管道的一部分在 Spring Cloud Data Flow 上執行時,它們會變得更加強大和有彈性。
上面我們看到的 Elasticsearch 可以與其他許多應用結合使用。例如,一個 TCP source 應用可以從一個源接收資料,然後將資料轉儲到中介軟體目標,Elasticsearch sink 再從那裡消費並索引資料。然後,分析應用可以使用這個索引來生成儀表盤。這只是一個例子,還有許多這樣的用例。Spring Cloud Data Flow 使這些管道的編排對使用者來說是無縫的。我們鼓勵您查閱我們之前部落格中關於如何在 Spring Cloud Data Flow 上部署應用的步驟。使用同樣的步驟,Elasticsearch sink 應用也可以部署。
在本部落格中,我們瞭解了 Elasticsearch consumer 函式及其對應的 Spring Cloud Stream sink 的工作原理。consumer 函式可以注入到自定義應用中,與其他業務邏輯結合使用。該 sink 應用提供了開箱即用的 Kafka 和 RabbitMQ 中介軟體變體。
本系列部落格還將有幾篇後續文章。請繼續關注。