領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多我們很高興宣佈 Reactor Kafka 1.0.0 的首個里程碑版本釋出。
Reactor Kafka 是一個基於 Project Reactor 的 Apache Kafka 響應式 API。Reactor Kafka API 支援使用函式式 API 以非阻塞背壓和極低開銷的方式將訊息釋出到 Kafka 主題以及從 Kafka 主題消費訊息。這使得使用 Reactor 的應用程式能夠將 Kafka 用作訊息匯流排或流平臺,並與其他系統整合以提供端到端的響應式管道。
Reactor Kafka 的價值主張在於在包含多個外部互動的應用中高效利用資源,其中 Kafka 是外部系統之一。端到端的響應式管道受益於非阻塞背壓和高效的執行緒使用,從而能夠高效處理大量併發請求。Project Reactor 提供的最佳化使得響應式應用的開發開銷極低,容量規劃可預測,從而交付低延遲、高吞吐的管道。
要開始使用並執行示例響應式 Kafka 生產者和消費者,請遵循參考指南中“入門”部分的說明。
Reactor Kafka API 基於 Apache Kafka 生產者/消費者 API,包含兩個主要類:
用於將訊息釋出到 Kafka 主題的 Sender
用於從 Kafka 主題消費訊息的 Receiver
這些響應式介面提供了底層 Kafka Producer
和 Consumer
的全部功能。
Sender<Integer, String> sender =
Sender.create(SenderOptions.create(producerProps)); (1)
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = (2)
Flux.range(1, 10)
.map(i -> SenderRecord.create(producerRecord(topic, i), i));
sender.send(outboundFlux, false) (3)
.doOnNext(r -> log.debug("Message #{} result: {}",
r.correlationMetadata(), r.recordMetadata())) (4)
.subscribe(); (5)
建立 Sender
要傳送到 Kafka 的出站訊息 Flux
響應式傳送
記錄每次傳送的結果
訂閱以啟動訊息流向 Kafka
ReceiverOptions<Integer, String> receiverOptions = (1)
ReceiverOptions.<Integer, String>create(consumerProps)
.subscription(Collections.singleton(topic));
Receiver.create(receiverOptions) (2)
.receive() (3)
.subscribe(r -> {
log.info("Received message {} ", r.record()); (4)
r.offset().acknowledge(); (5)
});
建立 ReceiverOptions
並配置對 Kafka 主題的訂閱
建立 Receiver
響應式接收
記錄每條接收到的訊息
處理訊息後確認,以便提交偏移量
Reactor Kafka 原始碼和示例可在 github 上獲取。
更多資訊和額外資源,請參閱Reactor Kafka 參考指南和Javadocs。