使用 Apache Kafka 構建整合和資料處理管道,結合 Spring

工程 | Josh Long | 2015 年 4 月 15 日 | ...

應用程式生成的資料量比以往任何時候都多,而在分析之前,應對負載本身就是一個巨大的挑戰。Apache Kafka 應對了這一挑戰。它最初由 LinkedIn 設計,隨後於 2011 年開源。該專案旨在為處理即時資料流提供一個統一、高吞吐量、低延遲的平臺。其設計深受事務日誌的影響。它是一個訊息系統,類似於傳統的 RabbitMQ、ActiveMQ、MQSeries 等訊息系統,但它非常適合日誌聚合、持久訊息、快速(每秒**數百**兆位元組!)讀寫,並且可以容納眾多客戶端。當然,這使得它**非常適合**雲規模的架構!

Kafka 為許多大型生產系統提供支援。LinkedIn 將其用於活動資料和操作指標,為 LinkedIn 新聞源和 LinkedIn Today 以及進入 Hadoop 的離線分析提供動力。Twitter 將其用作流處理基礎設施的一部分。Kafka 為 Foursquare 的線上到線上和線上到離線訊息提供支援。它用於將 Foursquare 監控和生產系統與基於 Hadoop 的離線基礎設施整合。Square 使用 Kafka 作為匯流排,將所有系統事件透過 Square 的各種資料中心移動。這包括指標、日誌、自定義事件等。在消費者端,它輸出到 Splunk、Graphite 或類似 Esper 的即時警報系統。Netflix 每天使用它處理 300-600 億條訊息。Airbnb、Mozilla、Goldman Sachs、Tumblr、Yahoo、PayPal、Coursera、Urban Airship、Hotels.com 以及其他看似無窮無盡的網路巨頭也在使用它。顯然,它在一些強大的系統中發揮著重要作用!

安裝 Apache Kafka

安裝 Apache Kafka 有很多不同的方法。如果您使用 OSX 並安裝了 Homebrew,只需執行 brew install kafka 即可。您也可以從 Apache 下載最新的分發版本。我下載了 kafka_2.10-0.8.2.1.tgz,解壓後,您會發現其中包含 Apache Zookeeper 和 Kafka 的分發版本,因此不需要其他任何東西。我將 Apache Kafka 安裝在我的 $HOME 目錄下的 bin 子目錄中,然後建立了一個指向 $HOME/bin/kafka 的環境變數 KAFKA_HOME

首先啟動 Apache Zookeeper,指定其所需配置屬性檔案的位置:

$KAFKA_HOME/bin/zookeeper-server-start.sh  $KAFKA_HOME/config/zookeeper.properties

Apache Kafka 分發版本自帶 Zookeeper 和 Kafka 的預設配置檔案,這使得入門變得簡單。在更高階的用例中,您需要自定義這些檔案。

然後啟動 Apache Kafka。它同樣需要一個配置檔案,如下所示:

$KAFKA_HOME/bin/kafka-server-start.sh  $KAFKA_HOME/config/server.properties

server.properties 檔案包含(除其他內容外)連線到 Apache Zookeeper 的預設值 (zookeeper.connect)、應透過套接字傳送多少資料、預設有多少分割槽以及代理 ID (broker.id - 在叢集中必須唯一) 等資訊。

同一目錄中還有其他指令碼可用於傳送和接收虛擬資料,這對於確認一切正常執行非常方便!

現在 Apache Kafka 已經啟動並執行,讓我們看看如何在我們的應用程式中使用 Apache Kafka。

一些高階概念...

一個 Kafka **broker** 叢集由一個或多個伺服器組成,每個伺服器可以執行一個或多個 broker 程序。Apache Kafka 設計為高可用;沒有**主**節點。所有節點都可以互換。資料從一個節點複製到另一個節點,以確保在發生故障時資料仍然可用。

在 Kafka 中,一個**主題(topic)**是一個類別,類似於 JMS 目的地,或者同時類似於 AMQP 交換器和佇列。主題是分割槽的,訊息生產者決定訊息應傳送到主題的哪個分割槽。分割槽中的每條訊息都被分配一個唯一的序列 ID,即其**偏移量(offset)**。更多的分割槽允許消費者更大的並行性,但這也會導致 broker 上產生更多檔案。

**生產者(Producers)**將訊息傳送到 Apache Kafka broker 主題,併為他們生成的每條訊息指定要使用的分割槽。訊息生產可以是同步的或非同步的。生產者還指定他們想要什麼樣的複製保證。

**消費者(Consumers)**監聽主題上的訊息並處理已釋出訊息流。正如您使用過其他訊息系統所預期的那樣,這通常(而且很有用!)是非同步的。

Spring XD 以及許多其他分散式系統一樣,Apache Kafka 使用 Apache Zookeeper 來協調叢集資訊。Apache Zookeeper 提供了一個共享的層次結構名稱空間(稱為**znodes**),節點可以共享該名稱空間以瞭解叢集拓撲和可用性(這也是 Spring Cloud 即將支援它的另一個原因...)。

Zookeeper 在您與 Apache Kafka 的互動中非常重要。例如,Apache Kafka 提供了兩種不同的 API 來充當消費者。更高級別的 API 入門更簡單,它可以處理分割槽等所有細節。它需要引用一個 Zookeeper 例項來維護協調狀態。

現在讓我們看看如何在 Spring 中使用 Apache Kafka。

結合 Spring Integration 使用 Apache Kafka

最近釋出的 Spring Integration for Apache Kafka 1.1 非常強大,它提供了入站介面卡,用於使用較低級別的 Apache Kafka API 以及較高級別的 API。

目前,該介面卡主要採用 XML 配置,儘管 Spring Integration Java 配置 DSL 的工作正在進行中,並且里程碑版本已經可用。我們現在將在這裡同時介紹這兩種方式。

為了使所有這些示例都能正常工作,我添加了 libs-milestone-local Maven 倉庫並使用了以下依賴項:

  • org.apache.kafka:kafka_2.10:0.8.1.1
  • org.springframework.boot:spring-boot-starter-integration:1.2.3.RELEASE
  • org.springframework.boot:spring-boot-starter:1.2.3.RELEASE
  • org.springframework.integration:spring-integration-kafka:1.1.1.RELEASE
  • org.springframework.integration:spring-integration-java-dsl:1.1.0.M1

結合 Spring Integration XML DSL 使用 Spring Integration Apache Kafka

首先,讓我們看看如何使用 Spring Integration 出站介面卡將 Spring Integration 流中的 Message<T> 例項傳送到外部 Apache Kafka 例項。該示例非常簡單:一個名為 inputToKafka 的 Spring Integration channel 充當管道,將 Message<T> 訊息轉發到出站介面卡 kafkaOutboundChannelAdapter。介面卡本身可以從 kafka:producer-context 元素中指定的預設配置獲取其配置,也可以從介面卡本地的配置覆蓋中獲取。給定的 kafka:producer-context 元素中可以有一個或多個配置。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>

    <int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter"
            kafka-producer-context-ref="kafkaProducerContext"
            channel="inputToKafka">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>

    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="localhost:9092"
                                              topic="event-stream"
                                              compression-codec="default"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

</beans>

以下是 Spring Boot 應用程式中的 Java 程式碼,透過將訊息傳送到傳入的 inputToKafka MessageChannel 來使用出站介面卡觸發訊息傳送。

package xml;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication
@EnableIntegration
@ImportResource("/xml/outbound-kafka-integration.xml")
public class DemoApplication {

    private Log log = LogFactory.getLog(getClass());

    @Bean
    @DependsOn("kafkaOutboundChannelAdapter")
    CommandLineRunner kickOff(@Qualifier("inputToKafka") MessageChannel in) {
        return args -> {
            for (int i = 0; i < 1000; i++) {
                in.send(new GenericMessage<>("#" + i));
                log.info("sending message #" + i);
            }
        };
    }

    public static void main(String args[]) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

使用新的 Apache Kafka Spring Integration Java 配置 DSL

Spring Integration 1.1 版本釋出後不久,Spring Integration 的明星人物 Artem Bilan 就開始 著手新增 Spring Integration Java 配置 DSL 的類似功能,結果非常出色!它尚未正式釋出(您目前需要新增 libs-milestone 倉庫),但我鼓勵您嘗試一下。對我來說它工作得很好,而且 Spring Integration 團隊總是非常樂於在可能的情況下獲得早期反饋!這裡有一個示例,演示了從兩個不同的 IntegrationFlow 傳送和消費訊息。生產者與上面的 XML 示例類似。

此示例中的新功能是輪詢消費者。它是面向批處理的,將在固定間隔拉取其看到的所有訊息。在我們的程式碼中,收到的訊息將是一個 Map,其鍵是主題,值是另一個 Map,其中包含分割槽 ID 和讀取記錄的批次(在此例中為 10 條記錄)。還有一種基於 MessageListenerContainer 的替代方案,它可以處理傳入的訊息。

package jc;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec;
import org.springframework.integration.dsl.kafka.Kafka;
import org.springframework.integration.dsl.kafka.KafkaHighLevelConsumerMessageSourceSpec;
import org.springframework.integration.dsl.kafka.KafkaProducerMessageHandlerSpec;
import org.springframework.integration.dsl.support.Consumer;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

/**
 * Demonstrates using the Spring Integration Apache Kafka Java Configuration DSL.
 * Thanks to Spring Integration ninja <a href="https://springframework.tw/team/artembilan">Artem Bilan</a>
 * for getting the Java Configuration DSL working so quickly!
 *
 * @author Josh Long
 */
@EnableIntegration
@SpringBootApplication
public class DemoApplication {

  public static final String TEST_TOPIC_ID = "event-stream";

  @Component
  public static class KafkaConfig {

    @Value("${kafka.topic:" + TEST_TOPIC_ID + "}")
    private String topic;

    @Value("${kafka.address:localhost:9092}")
    private String brokerAddress;

    @Value("${zookeeper.address:localhost:2181}")
    private String zookeeperAddress;

    KafkaConfig() {
    }

    public KafkaConfig(String t, String b, String zk) {
        this.topic = t;
        this.brokerAddress = b;
        this.zookeeperAddress = zk;
    }

    public String getTopic() {
        return topic;
    }

    public String getBrokerAddress() {
        return brokerAddress;
    }

    public String getZookeeperAddress() {
        return zookeeperAddress;
    }
  }

  @Configuration
  public static class ProducerConfiguration {

    @Autowired
    private KafkaConfig kafkaConfig;

    private static final String OUTBOUND_ID = "outbound";

    private Log log = LogFactory.getLog(getClass());

    @Bean
    @DependsOn(OUTBOUND_ID)
    CommandLineRunner kickOff( 
           @Qualifier(OUTBOUND_ID + ".input") MessageChannel in) {
        return args -> {
            for (int i = 0; i < 1000; i++) {
                in.send(new GenericMessage<>("#" + i));
                log.info("sending message #" + i);
            }
        };
    }

    @Bean(name = OUTBOUND_ID)
    IntegrationFlow producer() {

      log.info("starting producer flow..");
      return flowDefinition -> {

        Consumer<KafkaProducerMessageHandlerSpec.ProducerMetadataSpec> spec =
          (KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata)->
            metadata.async(true)
              .batchNumMessages(10)
              .valueClassType(String.class)
              .<String>valueEncoder(String::getBytes);

        KafkaProducerMessageHandlerSpec messageHandlerSpec =
          Kafka.outboundChannelAdapter(
               props -> props.put("queue.buffering.max.ms", "15000"))
            .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .addProducer(this.kafkaConfig.getTopic(), 
                this.kafkaConfig.getBrokerAddress(), spec);
        flowDefinition
            .handle(messageHandlerSpec);
      };
    }
  }

  @Configuration
  public static class ConsumerConfiguration {

    @Autowired
    private KafkaConfig kafkaConfig;

    private Log log = LogFactory.getLog(getClass());

    @Bean
    IntegrationFlow consumer() {

      log.info("starting consumer..");

      KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka.inboundChannelAdapter(
          new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress()))
            .consumerProperties(props ->
                props.put("auto.offset.reset", "smallest")
                     .put("auto.commit.interval.ms", "100"))
            .addConsumer("myGroup", metadata -> metadata.consumerTimeout(100)
              .topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1))
              .maxMessages(10)
              .valueDecoder(String::new));

      Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100));

      return IntegrationFlows
        .from(messageSourceSpec, endpointConfigurer)
        .<Map<String, List<String>>>handle((payload, headers) -> {
            payload.entrySet().forEach(e -> log.info(e.getKey() + '=' + e.getValue()));
            return null;
        })
        .get();
    }
  }

  public static void main(String[] args) {
      SpringApplication.run(DemoApplication.class, args);
  }
}

該示例大量使用了 Java 8 的 lambda 表示式。

生產者花了一些時間確定單次傳送操作將傳送多少訊息,如何編碼鍵和值(畢竟 Kafka 只知道 byte[] 陣列),以及訊息是同步傳送還是非同步傳送。在下一行,我們配置出站介面卡本身,然後定義一個 IntegrationFlow,以便所有訊息都透過 Kafka 出站介面卡傳送出去。

消費者花了一些時間確定要連線哪個 Zookeeper 例項,在一個批次中接收多少條訊息(10 條)等。一旦收到訊息批次,它們就會被傳遞到 handle 方法,我在其中傳遞了一個 lambda 表示式,它將遍歷 payload 的 body 並打印出來。沒什麼特別的。

結合 Spring XD 使用 Apache Kafka

Apache Kafka 是一個訊息匯流排,當用作整合匯流排時,它非常強大。然而,它的真正價值在於它足夠快且具有足夠的伸縮性,可以用於在處理管道中路由大資料。如果您正在進行資料處理,那麼您絕對需要 Spring XD!Spring XD 使在複雜的流處理管道中使用 Apache Kafka 變得非常簡單(因為其支援是基於 Apache Kafka Spring Integration 介面卡構建的!)。Apache Kafka 被暴露為 Spring XD 的一個**源(source)**——資料來源地,以及一個**匯(sink)**——資料目的地。

Spring XD 提供了一個非常方便的 DSL,用於建立類似於 bash 的管道-過濾器流。Spring XD 是一個集中的執行時,用於管理、伸縮和監控資料處理作業。它構建於 Spring Integration、Spring Batch、Spring Data 和 Spring for Hadoop 之上,是一個一站式資料處理平臺。Spring XD 作業從**源(sources)**讀取資料,透過可能計數、過濾、豐富或轉換資料的處理元件執行它們,然後將它們寫入匯(sinks)。

Spring Integration 和 Spring XD 的大師級人物 Marius Bogoevici,他在 Spring Integration 和 Spring XD 實現 Apache Kafka 的最近工作中貢獻良多,整理了一個非常好的示例,演示了 如何建立一個完整可用的 Spring XD 和 Kafka 流README 檔案將引導您完成設定 Apache Kafka、Spring XD 和所需主題的整個過程。然而,關鍵在於使用 Spring XD shell 和 shell DSL 來組合流。Spring XD 元件是預配置的命名元件,但有許多引數可以透過 XD shell 和 DSL 的 --.. 引數進行覆蓋。(順便說一句,這個 DSL 是由著名的 Spring Expression language 作者,傑出的 Andy Clement 編寫的!)這裡有一個示例,配置了一個流,用於從 Apache Kafka 源讀取資料,然後將訊息寫入一個名為 log 的元件,這是一個匯(sink)。在這種情況下,log 可以是 syslogd、Splunk、HDFS 等。

xd> stream create kafka-source-test --definition "kafka --zkconnect=localhost:2181 --topic=event-stream | log" --deploy

就是這樣!當然,這只是 Spring XD 的一個初步體驗,但希望您會同意它的可能性是誘人的。

使用 Lattice 和 Docker 部署 Kafka 伺服器

使用 Lattice(一個支援流行 Docker 映象格式等多種容器格式的分散式執行時)可以輕鬆設定一個示例 Kafka 安裝。Spotify 提供了一個 Docker 映象,它設定了一個共存的 Zookeeper 和 Kafka 映象。您可以輕鬆地將其部署到 Lattice 叢集,如下所示:

ltc create --run-as-root m-kafka spotify/kafka

從那裡,您可以輕鬆地伸縮 Apache Kafka 例項,並且更輕鬆地從基於雲的服務中消費 Apache Kafka。

後續步驟

您可以在我的 GitHub 賬戶上找到此部落格的程式碼

我們只觸及了皮毛!

如果您想了解更多(為什麼不呢?),那麼務必檢視 Marius Bogoevici 和 Mark Pollack 博士即將舉行的關於使用 Spring XD 和 Apache Kafka 構建響應式資料管道的網路研討會,他們將在其中演示使用 RxJava、Spring XD 和 Apache Kafka 是多麼容易!

獲取 Spring 時事通訊

透過 Spring 時事通訊保持聯絡

訂閱

搶先一步

VMware 提供培訓和認證,助您加速前進。

瞭解更多

獲取支援

Tanzu Spring 透過一項簡單訂閱,為 OpenJDK™、Spring 和 Apache Tomcat® 提供支援和二進位制檔案。

瞭解更多

近期活動

檢視 Spring 社群的所有近期活動。

檢視全部