領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多在我們的 Road to GA 系列中,本週我們將探討 Apache Kafka 4.0.0 中的共享組及其在 Spring for Apache Kafka 4.0.0 中的整合——這一功能從根本上擴充套件了我們從 Kafka 主題消費訊息的方式。
當我們首次使用Kafka時,其心智模型通常很簡單:主題儲存訊息,消費者讀取它們,並且處理在分割槽內按順序進行。這種基於分割槽的模型為無數應用程式提供了良好的服務,提供了帶有強保證的有序處理。然而,某些用例涉及建立數百個分割槽的主題,其主要目的是為了實現更高的並行性,而不是因為任何排序要求。分割槽數量和消費者並行性之間的關係在我們確實需要排序保證時非常有效,但當處理不需要保留序列的獨立事件時,它就成為一個限制。
Apache Kafka 4.0.0引入了共享組(也稱為“Kafka佇列”)作為一種補充的消費模型。這一新增功能不會取代傳統的消費者組,但為那些記錄級別分發比分割槽級別分配更有意義的場景提供了替代方案。Spring for Apache Kafka 4.0.0全面支援共享組,本文將探討它們的工作原理以及何時適合我們的應用程式需求。
請注意,共享組目前在Kafka 4.1.0中處於預覽狀態,預計將在Kafka 4.2.0中達到生產就緒狀態。
傳統消費者組將整個分割槽分配給消費者。在任何給定時間,每個分割槽都只屬於組中的一個消費者,這為我們提供了該分割槽內的有序處理。
共享組透過分發單個記錄而不是整個分割槽來採取不同的方法。代理協調共享組中可用消費者之間的記錄分發,允許任何消費者接收任何記錄,無論它來自哪個分割槽。
關鍵的權衡是:傳統消費者組透過分割槽分配提供排序保證,而共享組透過記錄級別分發提供擴充套件靈活性。
當滿足以下條件時選擇共享組:
在以下情況下繼續使用傳統消費者組:
讓我們簡要地瞭解共享組的機制,以理解幕後發生了什麼變化。
當我們在共享組中建立一個消費者時,它會連線到代理並請求記錄。代理透過一個名為共享協調器(Share Coordinator)的元件來維護協調,該元件跟蹤哪些記錄已分配給哪些消費者。當消費者請求記錄時,代理會從主題的分割槽中選擇未分配的記錄並將其傳送給該消費者。這些記錄現在處於“已獲取”狀態——已分配給該特定消費者進行處理。
已獲取狀態帶有基於時間的鎖(預設為30秒,可透過group.share.record.lock.duration.ms配置)。如果消費者在此超時時間內未確認記錄,代理會自動將其返回到可用池中,供其他消費者處理。這種獲取鎖提供了自動故障恢復,無需在消費者崩潰或無響應時進行手動干預。
消費者處理記錄併發送回確認。有三種可能的確認型別:ACCEPT(成功處理)、RELEASE(返回池中重試)和REJECT(標記為永久失敗)。根據確認,代理更新記錄的狀態並繼續。
這種協調發生在代理級別,這與傳統消費者組的工作方式不同,傳統消費者組中消費者直接跟蹤它們的偏移量。
代理還提供內建的重試語義。每次將記錄交付給消費者時,代理都會增加內部交付計數。預設情況下,在5次交付嘗試(可透過group.share.delivery.attempt.limit配置)後,代理會將記錄移動到存檔狀態。這為我們提供了“毒藥訊息”保護,而無需應用程式級別的重試邏輯,儘管我們仍然可以在需要更多控制時實現自己的重試策略。
Spring for Apache Kafka中共享組的程式設計模型與我們已知的模型保持一致。我們有兩種主要方法來設定共享消費者:程式設計容器建立和註解驅動的監聽器(使用@KafkaListener)。
我們首先配置一個ShareConsumerFactory而不是常規的ConsumerFactory
@Configuration
public class ShareConsumerConfig {
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
@Bean
public ShareKafkaListenerContainerFactory<String, String>
shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
}
此配置遵循我們用於傳統消費者的相同工廠模式。我們定義一個建立共享消費者的工廠和一個管理監聽器生命週期的容器工廠。Spring for Apache Kafka的抽象在兩種消費模型中保持一致。
我們可以透過程式設計方式建立一個容器並設定訊息監聽器
@Bean
public ShareKafkaMessageListenerContainer<String, String> imageProcessingContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("image-processing");
containerProps.setGroupId("image-processors");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
imageService.process(record.value());
// Implicit ACCEPT when method completes successfully
}
});
return container;
}
這使我們能夠對容器建立和配置進行細粒度控制。我們使用主題和組ID建立一個ContainerProperties例項,使用工廠和屬性例項化容器,並附加我們的訊息監聽器。
對於大多數用例,使用@KafkaListener的註解驅動方法提供了更清晰的程式設計模型
@KafkaListener(
topics = "image-processing",
groupId = "image-processors",
containerFactory = "shareKafkaListenerContainerFactory"
)
public void processImage(String imageUrl) {
// Process the image
imageService.process(imageUrl);
// Implicit ACCEPT when method completes successfully
}
containerFactory屬性告訴Spring使用我們的ShareKafkaListenerContainerFactory,它建立一個共享消費者而不是傳統消費者。groupId現在指的是共享組而不是消費者組,但註解結構保持不變。
當此方法成功完成時,Spring for Apache Kafka會自動向代理傳送ACCEPT確認。如果丟擲異常,它會發送REJECT,這會將記錄標記為永久失敗並阻止進一步的交付嘗試。這種隱式確認模式適用於簡單處理場景,其中成功或失敗清晰地對映到方法完成或異常。如果您需要臨時故障來觸發重試(使用RELEASE),您將需要使用顯式確認模式以進行更細粒度的控制。
有時我們需要對記錄的確認方式進行更多控制。我們可能希望明確拒絕我們已知無效的“毒藥訊息”,或者我們可能需要在處理邏輯中的特定點進行確認,而不是在方法完成時進行。
我們可以在不同級別啟用顯式確認。最常見的方法是在工廠級別進行配置
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit");
return new DefaultShareConsumerFactory<>(props);
}
@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> explicitShareConsumerFactory) {
// The factory will detect the explicit acknowledgment mode from the consumer factory configuration
return new ShareKafkaListenerContainerFactory<>(explicitShareConsumerFactory);
}
啟用顯式確認後,我們的監聽器方法會接收一個ShareAcknowledgment引數,該引數賦予我們直接控制權
@KafkaListener(
topics = "payment-processing",
groupId = "payment-processors",
containerFactory = "shareKafkaListenerContainerFactory"
)
public void processPayment(PaymentEvent event,
ShareAcknowledgment acknowledgment) {
try {
if (!isValid(event)) {
// Permanently reject invalid events
acknowledgment.reject();
return;
}
paymentService.process(event);
acknowledgment.acknowledge();
} catch (TransientException e) {
// Release for retry with another consumer
acknowledgment.release();
} catch (PermanentException e) {
// Don't retry unrecoverable errors
acknowledgment.reject();
}
}
三種確認型別使我們能夠精確控制記錄結果。呼叫acknowledge()會告訴代理記錄已成功處理並可以存檔。呼叫release()會將記錄返回到池中供其他消費者處理,這對於臨時故障(如臨時網路問題或資源不可用)非常有用。呼叫reject()會將記錄標記為永久失敗並阻止進一步的交付嘗試。
顯式確認模式的一個重要限制是:在所有先前交付的記錄都已確認之前,消費者無法輪詢新記錄。這可以防止未處理的記錄壓垮消費者,但也意味著我們必須確保每條記錄都收到確認(接受、釋放或拒絕),以避免阻塞消費者執行緒。Spring for Apache Kafka透過在記錄未確認30秒後(可透過shareAcknowledgmentTimeout配置)記錄警告來幫助除錯。
請記住,每次release()都會增加代理的內部交付計數,因此即使消費者不斷呼叫release(),代理最終也會在達到配置限制後存檔記錄。
傳統Kafka消費者按順序處理記錄——每個消費者例項從其分配的分割槽中輪詢記錄並一次處理一個。當我們需要更高的並行性時,我們通常會新增更多的消費者例項,這通常意味著更多的應用程式例項或程序。
共享組啟用了一種不同的擴充套件方法,因為記錄級別分發消除了分割槽分配的限制。Spring for Apache Kafka利用這一點,直接向ShareKafkaMessageListenerContainer添加了併發支援。
我們可以在單個容器中配置多個消費者執行緒
@Bean
public ShareKafkaListenerContainerFactory<String, String>
shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
factory.setConcurrency(10); // 10 concurrent consumer threads
return factory;
}
這會建立一個包含10個執行緒的容器,每個執行緒執行自己的共享消費者輪詢迴圈。所有10個執行緒都從同一個共享組中拉取記錄,並在同一個JVM中併發處理它們。如果我們在5個應用程式例項上執行此操作,那就是50個併發消費者處理記錄流。
這種併發模型為我們提供了擴充套件的靈活性。我們可以透過增加併發性(每個例項更多執行緒)進行垂直擴充套件,或者透過新增更多例項進行水平擴充套件,或者兩者兼而有之。對於需求可變的工作負載,我們可以調整併發設定或例項計數,而無需更改主題配置或擔心分割槽重新平衡。
共享組在Kafka 4.0.0中作為早期訪問功能引入,在Kafka 4.1.0中進入預覽階段,並預計在Kafka 4.2.0中達到生產就緒狀態。Spring for Apache Kafka 4.0.0(隨Spring Boot 4.0.0釋出)支援Kafka 4.1.0版本中實現的共享組。
有一個重要的相容性考慮:Kafka 4.0.0和4.1.0對於共享組不相容。這些版本之間的協議有所演變,因此在使用共享組時,客戶端和代理需要處於相同的次要版本。這在代理和客戶端庫可能在不同時間升級的環境中尤其重要。
共享組透過新增記錄級別分發作為分割槽級別分配的替代方案,擴充套件了Kafka的消費模型。這兩種模型都服務於重要的目的——具有排序保證的傳統消費者組對於有狀態處理和事件排序仍然至關重要,而共享組則為獨立事件的高吞吐量處理提供了優勢。
關鍵在於將消費模型與我們的應用程式要求相匹配。當順序很重要時,分割槽分配提供了我們所需的保證。當吞吐量和擴充套件靈活性比順序更重要時,記錄級別分發可以簡化我們的架構和資源管理。
Spring for Apache Kafka實現支援KIP-932並增加了Spring特有的增強。@KafkaListener整合與我們用於傳統消費者的程式設計模型保持一致。內建的併發支援提供了在單個應用程式例項中進行擴充套件的選項。超時檢測和優雅關閉等功能有助於生產部署處理操作問題。
Spring for Apache Kafka 4.0.0透過與現有@KafkaListener模型保持一致,使共享組的使用感覺自然。我們可以增量地採用共享組,將其用於特定用例,同時繼續將傳統消費者組用於其他用例。這兩種模型可以在同一個Spring for Apache Kafka應用程式中並存而不會發生衝突。
隨著共享組在Kafka 4.2.0中趨向生產就緒,值得評估它們是否適合我們當前或計劃中的任何用例。如果我們一直主要為了並行性而不是排序而預配大量分割槽,或者如果我們正在處理使容量規劃變得困難的可變工作負載,那麼共享組可能會提供一種更簡單的方法。
有關Spring for Apache Kafka中共享組的更多詳細資訊,請檢視參考文件。
我們歡迎您在應用程式中探索共享組時提供反饋。如果您遇到問題或有改進建議,請在Spring for Apache Kafka GitHub儲存庫上提出問題。您的意見有助於我們在GA版本釋出及以後改進框架。