領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多本文是部落格系列的一部分,該系列探討了基於Java Functions新設計的Spring Cloud Stream應用程式。在本文中,我們將介紹允許我們將記錄索引到Elasticsearch的Elasticsearch sink及其相應的Consumer函式。
以下是本部落格系列的所有先前部分。
在研究Elasticsearch sink應用程式之前,讓我們先看一下為sink提供動力的consumer函式。如我們在其他sink應用程式中已經看到的那樣,consumer是標準的java.util.function.Consumer,它接受一個Message<?>。該consumer依賴於Spring Boot對Elasticsearch的支援,該支援會自動配置Elasticsearch的RestHighLevelClient。該consumer支援具有以下型別載荷的訊息。
String
java.util.Map
來自Elasticsearch的XContentBuilder。
使用consumer時,要使用的Elasticsearch索引由屬性elasticsearch.consumer.index指定。
您可以透過設定INDEX_ID訊息頭來為每條訊息設定要使用的Elasticsearch ID。或者,您可以設定elasticsearch.consumer.id屬性,該屬性接受SpEL表示式。如果兩者都未設定,Elasticsearch將自動生成ID。
透過將屬性elasticsearch.consumer.async設定為true,我們可以使索引操作非同步進行。
我們可以將consumer函式注入到應用程式中,並直接呼叫其accept方法以將記錄索引到ElasticSearch。
例如,假設我們將consumer bean注入應用程式,如下所示。
@Autowired
ElasticsearchConsumer elasticsearchConsumer
然後,我們可以使用以下java.util.Map來索引一條記錄。
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("age", 100);
jsonMap.put("dateOfBirth", 1471466076564L);
jsonMap.put("fullName", "John Doe");
final Message<Map<String, Object>> message = MessageBuilder.withPayload(jsonMap).build();
elasticsearchConsumer.accept(message);
上述map中的相同資訊可以以純JSON格式提供,或使用來自Elasticsearch的XContentBuilder提供。
正如我們在之前的部落格中看到的,當與Spring Cloud Stream結合使用時,consumer函式會變得更加強大,併成為一個sink應用程式。它具有與中介軟體技術無縫通訊的內在能力。sink應用程式從Apache Kafka或RabbitMQ等中介軟體系統消費資料,然後傳送到Elasticsearch。我們已經提供了開箱即用的Kafka和RabbitMQ的Elasticsearch變體。
讓我們來看一下執行獨立Elasticsearch sink應用程式(用於Apache Kafka)的步驟。
首先,請下載sink應用程式。由於sink尚未普遍可用,讓我們使用最新的里程碑版本。
wget https://repo.spring.io/milestone/org/springframework/cloud/stream/app/elasticsearch-sink-kafka/3.0.0-M4/elasticsearch-sink-kafka-3.0.0-M4.jar
在執行應用程式之前,請確保您已執行Elasticsearch。這是在docker容器中啟動單節點Elasticsearch叢集的快速方法。
docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2
我們還需要確保Kafka正在執行。
然後,按以下方式執行應用程式:
java -jar elasticsearch-sink-kafka-3.0.0-M4.jar
--spring.cloud.stream.bindings.input.destination=data-in --elasticsearch.consumer.index=testing
透過提供輸入目標屬性,我們要求sink從Kafka主題data-in接收資料,並將資料傳送到Elasticsearch索引testing。
將一些測試資料傳送到Kafka主題。例如,如果您在埠9092上本地執行Kafka,可以使用Kafka控制檯生產者指令碼傳送:
kafka-console-producer.sh --broker-list localhost:9092 --topic data-in
然後傳送以下JSON資料:
{"key1":"value1"}
我們可以透過呼叫以下端點來驗證資料是否已索引。
curl localhost:9200/testing/_search
同樣,我們也可以下載RabbitMQ版本的Elasticsearch sink應用程式,並將其針對RabbitMQ叢集執行。
正如我們在本系列中多次見到的,當這些Spring Cloud Stream應用程式作為Spring Cloud Data Flow上的資料管道的一部分執行時,它們會變得更加強大和有彈性。
上面看到的Elasticsearch可以與其他許多應用程式結合使用。例如,TCP source應用程式可以從源接收資料,然後將資料轉儲到中介軟體目標,然後Elasticsearch sink從中消耗並索引資料。然後,分析應用程式可以使用此索引來生成儀表板。這只是一個例子,還有許多類似的用例。Spring Cloud Data Flow使使用者能夠無縫地編排這些管道。我們鼓勵您檢視我們在前幾篇部落格中概述的在Spring Cloud Data Flow上部署應用程式的步驟。使用相同的步驟,也可以部署Elasticsearch sink應用程式。
在本部落格中,我們瞭解了Elasticsearch consumer函式及其相應的Spring Cloud Stream sink的工作原理。consumer函式可以注入到自定義應用程式中,與其他業務邏輯結合使用。sink應用程式是開箱即用的,可用於Kafka和RabbitMQ中介軟體變體。
這個部落格系列還有幾集即將到來。請繼續關注。