Spring Integration Kafka 擴充套件 1.0.GA 版本已釋出

釋出 | Artem Bilan | 2015 年 2 月 9 日 | ...

親愛的 Spring 社群,

我們很高興地宣佈 Spring Integration Kafka 1.0 GA 擴充套件現已釋出,它為 Apache Kafka 提供了熟悉的 Spring Integration 端點。 像往常一樣,使用 Release Repository 以及 Maven 或 Gradle

compile "org.springframework.integration:spring-integration-kafka:1.0.0.RELEASE"

或下載 分發歸檔,體驗一下。

首先感謝所有為該專案做出貢獻的人——特別感謝該專案的創始人 Soby Chacko,他實現了基礎設施,以及基於高階消費者的訊息源和生產者,以及 Marius Bogoevici,他學習了用於訊息驅動消費者的簡單消費者 API 的複雜性。

概述

毫不奇怪,這個專案完全基於 Apache Kafka(0.8.1.1 版本)和 Spring Integration 基礎(4.0.5.RELEASE 版本)。 我們提供了一些抽象,例如 ConfigurationConnectionFactoryKafkaMessageListenerContainerKafkaConsumerContext/KafkaProducerContextKafkaMessage 等,以遵循 Spring 的解耦和易用性原則。 基於這些抽象,我們提供了高階 API,例如 KafkaMessageDrivenChannelAdapterKafkaHighLevelConsumerMessageSourceKafkaProducerMessageHandler,這些是 Spring Integration 方面的介面卡。 還提供了 XML 配置支援。

KafkaHighLevelConsumerMessageSource

Kafka 高階消費者 透過 <int-kafka:inbound-channel-adapter><int-kafka:consumer-context> 呈現,以使用 KafkaStream 從 Kafka 主題 poll 訊息。 它的主要優點是使用簡單,並且如果訊息源的多個例項並行執行,則能夠在消費者之間平衡分割槽。

典型的配置可能如下所示

<int-kafka:inbound-channel-adapter kafka-consumer-context-ref="consumerContext"
                                    channel="inputFromKafka">
       <int:poller fixed-delay="10"/>
</int-kafka:inbound-channel-adapter>

<int-kafka:consumer-context id="consumerContext"
                            consumer-timeout="4000"
                            zookeeper-connect="zookeeperConnect">
     <int-kafka:consumer-configurations>
           <int-kafka:consumer-configuration group-id="default"
                                  value-decoder="valueDecoder"
                                  key-decoder="valueDecoder"
                                  max-messages="5000">
                  <int-kafka:topic id="test1" streams="4"/>
                  <int-kafka:topic id="test2" streams="4"/>
           </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

正如您所看到的,除了 <int-kafka:consumer-context> 之外,還需要引用 zookeeperConnect。 這是一個簡單的 bean,它表示與 Zookeeper 叢集的連線

<int-kafka:zookeeper-connect id="zookeeperConnect" 
                          zk-connect="host1,host2,host3:2182"
                          zk-connection-timeout="6000"
                          zk-session-timeout="6000"
                          zk-sync-time="2000"/>

KafkaHighLevelConsumerMessageSource 生成帶有 Map<String, Map<Integer, List<Object>>> payloadMessage,它類似於“每個主題的按分區劃分的 Kafka 訊息”。

KafkaMessageDrivenChannelAdapter

Kafka 簡單消費者 透過 <int-kafka:message-driven-adapter> 呈現,並且基於眾所周知的 ListenerContainer 抽象 - KafkaMessageListenerContainer(類似於 Spring AMQP SimpleMessageListenerContainer 或 Spring JMS DefaultMessageListenerContainer

@Bean
public Configuration zkConfiguration() {
   return new ZookeeperConfiguration(new ZookeeperConnect());
}

@Bean
public ConnectionFactory kafkaConnectionFactory() {
   return new DefaultConnectionFactory(zkConfiguration());
}

@Bean
public MessageProducer kafkaMessageDrivenChannelAdapter() {
   KafkaMessageDrivenChannelAdapter adapter = 
            new KafkaMessageDrivenChannelAdapter(
   		new KafkaMessageListenerContainer(kafkaConnectionFactory(),    
                                       "topic1", "topic2"));
   adapter.setOutputChannel(inputChannel());
   return adapter;
}

該元件的主要優點是可以更好地控制組件偵聽的分割槽(這些分割槽是可配置的),以及起始偏移量(例如,在需要重放主題的情況下)。 此外,還有更豐富的偏移量管理和錯誤處理策略。

監聽任務的結果是單個 Message,其 payload 基於 Kafka 訊息和帶有來自 KafkaHeaders 的鍵的其他 headers。 排序在分割槽內保留。

可以為 Kafka 訊息以及 Kafka 訊息鍵使用 kafka.serializer.Decoder 配置這兩種介面卡。 Spring Integration Kafka 開箱即用地提供了 Avro Encoder/Decoder 實現。

此外,Spring Integration Kafka 引入了 OffsetManager 抽象來處理 Kafka 主題偏移量,這在使用 High Level Consumer 時不可用。 提供了 MetadataStoreOffsetManagerKafkaTopicOffsetManagerOffsetManager 必須注入到 KafkaMessageListenerContainer。 預設情況下,使用 MetadataStoreOffsetManager,它由 Spring Integration Core 中的 SimpleMetadataStore 支援。

KafkaProducerMessageHandler

Kafka 生產者 透過 <int-kafka:outbound-channel-adapter><int-kafka:producer-context> 對呈現。 後者利用目標 Kafka Producer 的配置,該配置由 MessageHeaders 中的 KafkaHeaders.TOPIC 選擇,或者由 <int-kafka:outbound-channel-adapter> 上的 topic-expression 選擇,並與 <int-kafka:producer-configuration> 子元素上配置的 topic 選項匹配

<int-kafka:producer-context id="kafkaProducerContext">
	<int-kafka:producer-configurations>
		<int-kafka:producer-configuration broker-list="localhost:9092"
					key-class-type="java.lang.String"
					value-class-type="java.lang.String"
					topic="test1"
					value-encoder="kafkaEncoder"
					key-encoder="kafkaEncoder"
					compression-codec="default"/>
		<int-kafka:producer-configuration broker-list="localhost:9092"
					topic="test2"
					compression-codec="default"/>
	</int-kafka:producer-configurations>
</int-kafka:producer-context>

正如您所看到的,有足夠的選項來調整目標 Producer,並且每個 Producer 都可以由特定的 broker-list 支援。 如果只存在一個 <int-kafka:producer-configuration>,您可以將訊息傳送到任何 topic,例如,基於 Message 上下文標頭。

Spring XD 使用這些介面卡作為 Kafka 源和接收器。 此外,它還提供了 KafkaMessageBus。 這些功能將很快在 Spring XD 1.1 RELEASE 中提供。

此外,在我們完成此版本時,Apache Kafka 0.8.2 也收到了最終版本。 我們很高興祝賀該團隊,我們將在不久的將來合併新功能 - 這僅僅是在 Spring 應用程式中提供對 Kafka 的一流支援的開始!

有關更多資訊,請參見專案主頁

與往常一樣,我們期待您的評論和反饋(StackOverflow (spring-integration 標籤), Spring JIRA, GitHub),並且我們非常歡迎貢獻

獲取 Spring 新聞資訊

訂閱 Spring 新聞資訊,保持聯絡

訂閱

遙遙領先

VMware 提供培訓和認證,以加速您的進步。

瞭解更多

獲取支援

Tanzu Spring 在一個簡單的訂閱中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群中所有即將舉行的活動。

檢視所有