領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多我們很高興地宣佈釋出 Reactor Kafka 1.0.0 的第一個里程碑版本。
Reactor Kafka 是一個基於 Project Reactor 的 Apache Kafka 響應式 API。Reactor Kafka API 能夠使用具有非阻塞背壓和極低開銷的函式式 API 將訊息釋出到 Kafka 主題並從 Kafka 主題消費訊息。這使得使用 Reactor 的應用程式能夠將 Kafka 用作訊息匯流排或流平臺,並與其他系統整合以提供端到端的響應式管道。
Reactor Kafka 的價值主張是在具有多個外部互動(其中 Kafka 是外部系統之一)的應用程式中高效利用資源。端到端響應式管道受益於非阻塞背壓和高效的執行緒使用,從而能夠高效處理大量併發請求。Project Reactor 提供的最佳化使響應式應用程式的開發具有極低的開銷和可預測的容量規劃,從而提供低延遲、高吞吐量的管道。
要開始執行響應式 Kafka 生產者和消費者示例,請遵循參考指南中“入門”部分的說明。
Reactor Kafka API 基於 Apache Kafka 生產者/消費者 API,由兩個主要類組成
Sender 用於向 Kafka 主題釋出訊息
Receiver 用於從 Kafka 主題消費訊息
這些響應式介面提供了底層 Kafka Producer 和 Consumer 的完整功能。
Sender<Integer, String> sender =
Sender.create(SenderOptions.create(producerProps)); (1)
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = (2)
Flux.range(1, 10)
.map(i -> SenderRecord.create(producerRecord(topic, i), i));
sender.send(outboundFlux, false) (3)
.doOnNext(r -> log.debug("Message #{} result: {}",
r.correlationMetadata(), r.recordMetadata())) (4)
.subscribe(); (5)
建立 Sender
要傳送到 Kafka 的出站訊息的 Flux
響應式傳送
記錄每次傳送的結果
訂閱以啟動訊息流到 Kafka
ReceiverOptions<Integer, String> receiverOptions = (1)
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(topic));
Receiver.create(receiverOptions) (2)
.receive() (3)
.subscribe(r -> {
log.info("Received message {} ", r.record()); (4)
r.offset().acknowledge(); (5)
});
建立 ReceiverOptions 並配置對 Kafka 主題的訂閱
建立 Receiver
響應式接收
記錄每條傳入訊息
處理訊息後確認,以便可以提交偏移量
Reactor Kafka 原始碼和示例可在 github 上獲取。
有關更多資訊和其他資源,請參閱 Reactor Kafka 參考指南和 Javadocs。