使用 Spring Cloud Stream 構建簡單的事件驅動微服務

工程 | Ben Wilcock | 2019 年 10 月 15 日 | ...

事件驅動架構很棒。但是,如果沒有框架,編寫與流行的事件訊息平臺協作所需的腳手架可能會很混亂。在這篇文章中,我們將看看如何使用 Spring Cloud Stream 來簡化你的程式碼。

問題

你只想為你的事件驅動應用程式編寫業務邏輯,但那些樣板式的訊息程式碼可能會礙事。將你的應用程式連線到訊息服務很棘手,如果你是一名企業開發者,你可能需要使用多種訊息技術(無論是在本地還是在雲端)。

解決方案

讓靈活的訊息抽象來處理複雜的訊息平臺整合,這樣你就可以專注於編寫簡單清晰的業務邏輯。Spring Cloud Stream 是一個很好的選擇。它將許多流行的訊息平臺統一在一個易於使用的 API 之後,包括 RabbitMQ、Apache Kafka、Amazon Kinesis、Google PubSub、Solace PubSub+、Azure Event Hubs 和 Apache RocketMQ。它甚至消除了這些平臺之間在方法和特性上的任何細微差異(例如分割槽或交換機),讓你自由地建立創新的事件驅動解決方案。

在接下來的演示中,你將看到 Spring Cloud Stream 巧妙的抽象如何幫助使事件流程式碼更清晰、更容易使用。你還將看到使用 Spring Cloud Stream 的 binding 庫在兩種不同的訊息平臺(RabbitMQKafka)之間切換是多麼容易。

開始之前

這些事件驅動的微服務需要在你的電腦上安裝最新版本的以下應用程式1

  1. Java 8
  2. Docker(我們將在本地執行 RabbitMQ 和 Kafka)
  3. Git(可選)
  4. Bash(假設,雖然其他替代方案也可能奏效)

執行演示

首先,從 GitHub 克隆程式碼倉庫。要執行此操作(如果你安裝了 Git),請開啟新的終端視窗並執行以下命令。如果你沒有安裝 Git,請下載並解壓 此 zip 檔案

git clone https://github.com/benwilcock/spring-cloud-stream-demo.git

檢查程式碼後,你會注意到這個倉庫包含兩個微服務。

  1. Loansource 微服務(位於 /loansource 資料夾中)。此微服務充當事件訊息的來源。這些事件是 Loan 應用程式,類似於你在銀行和金融領域中會看到的。每筆貸款都有“名稱”、“金額”和“狀態”(最初設定為 PENDING,即待處理)。

  2. Loancheck 微服務(位於 /loancheck 資料夾中)。此微服務充當 Loan 處理器。它檢查哪些貸款是適合發放的,並將它們分類到 APPROVED(已批准)或 DECLINED(已拒絕)狀態。

要執行演示,請按照以下說明操作。

步驟 1:啟動訊息伺服器

在一個新的終端視窗中,進入專案的根資料夾並執行以下命令。

你需要安裝並執行 "Docker",以便此指令碼正常工作,因為它需要 docker-compose

./start-servers.sh

此指令碼將啟動 KafkaRabbitMQ,並將兩者的日誌輸出流式傳輸到終端視窗(除非你按 Ctrl-C 退出)。當你按 Ctrl-C 時,伺服器不會停止 - 它們會繼續在後臺執行。啟動後,這些伺服器都將可用於在你的計算機上執行的應用程式。

步驟 2:選擇 Kafka 或 RabbitMQ 模式

在接下來的步驟 3 和 4 中,我們必須將 -P<profile-choice> 替換為我們想要使用的訊息平臺的名稱。

  • 對於 Kafka,使用:-Pkafka
  • 對於 RabbitMQ,使用:-Prabbit

如果你完全省略 -P<profile-choice> 設定,則預設使用 Kafka。

注意:此演示並非旨在在 Kafka 和 RabbitMQ 之間“橋接”訊息,因此在編譯和執行這兩個應用程式時,請確保在每個應用程式中選擇相同的配置檔名。如果你的目標是橋接訊息系統,請參閱此處的文件

步驟 3:生成一些貸款事件

在一個新的終端視窗中,使用 cd 命令將當前目錄切換到 /loansource 目錄,然後執行以下命令,將 <profile-choice> 替換為你想要執行的模式(如上面步驟 2 中討論的 kafkarabbit 模式)。

./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>

一旦 loansource 應用程式啟動,在終端視窗中,你應該每秒看到一條訊息,告訴你一個新的 Loan 事件已以 PENDING(待處理)狀態釋出到訊息平臺。讓此微服務保持執行,然後進入下一步。

步驟 4:處理貸款事件

在另一個新的終端視窗中,將當前目錄切換到 /loancheck 目錄,然後執行以下命令,同樣將 <profile-choice> 替換為你想要執行的模式。

./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>

一旦 loancheck 應用程式啟動,在終端視窗中,你應該每秒看到一條訊息,告訴你已從訊息平臺讀取了一個新的 PENDING(待處理)Loan 應用程式,並且它被 APPROVED(已批准)或 DECLINED(已拒絕)。如果你想了解這些應用程式是如何構建的,請跳到“工作原理”部分。

步驟 5:停止演示

完成微服務後,在 /loansource/loancheck 微服務的每個終端視窗中按 Ctrl-C。應用程式將停止,事件處理也將停止。

如果你要在 Kafka 和 Rabbit 之間切換模式,只需返回到步驟 2 並重復該過程即可。

如果你已完全完成演示,並希望停止 Kafka 和 RabbitMQ 伺服器,請在專案根資料夾的終端視窗中執行 ./stop-servers.sh 指令碼。如果你只是在模式之間切換,則無需執行此操作。

工作原理

Maven Profiles(在每個專案的 pom.xml 中)控制你在構建時將哪些 Spring Cloud Stream 繫結器新增為依賴項。當你選擇 -Pkafka 時,[spring-cloud-stream-binder-kafka][kafka] 依賴項會新增到專案中。當你選擇 -Prabbit 時,[spring-cloud-stream-binder-rabbit][rabbit] 依賴項會新增進來。

<profiles>
    <profile>
        <id>kafka</id>
        <properties>
            <spring.profile.activated>kafka</spring.profile.activated>
        </properties>
        <activation>
            <activeByDefault>true</activeByDefault>
        </activation>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-kafka</artifactId>
                <version>${spring-cloud-stream.version}</version>
            </dependency>
        </dependencies>
    </profile>
    ...
<profiles>

你選擇的 Maven Profile 也會影響 src/main/resources/application.properties 檔案中的 spring.profiles.active 屬性,該屬性會切換你在啟動時看到的橫幅。

Loansource 微服務

對於 Loansource 微服務,我們使用了 Spring Cloud Stream v2.1 的一個新特性 - Spring Cloud Function 支援。藉助這個新特性,讓 LoansourceApplication 微服務充當 Loan 訊息源所需的只是宣告一個生成並返回 Supplier<>@Bean 方法。在這種情況下,它是一個型別為 LoanSupplier。函式方法程式碼如下所示...

@Bean
public Supplier<Loan> supplyLoan() {
  return () -> {
    Loan loan = new Loan(UUID.randomUUID().toString(), "Ben", 10000L);
    LOG.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
    return loan;
  };
}

Supplier<> 是一種 Java 函式資料型別。由於只有一個返回此型別的 @Bean 方法,Spring Cloud Stream 知道下一步該怎麼做。預設情況下,它會每秒觸發一次此函式,並將結果傳送到名為“output”的預設 MessageChannel。這個函式方法的優點在於它只包含業務邏輯,因此你可以使用常規的單元測試來測試它。

我們可以使用 application.properties 檔案中的 spring.cloud.function.definition 屬性來顯式宣告我們希望繫結到繫結目標(binding destinations)的函式 bean - 但對於只定義了一個 @Bean 的情況,這並非必要。

如果我們想使用不同的輪詢間隔,可以使用 application.properties 檔案中的 spring.integration.poller.fixed-delay 屬性。

Loancheck 微服務

loancheck 微服務需要多一點程式碼,但不多。它的工作是將 Loan 事件分類到不同的通道。為此,它訂閱來自源的 output 主題的事件,然後根據貸款的價值將它們傳送到 approveddeclined 主題,類似於欺詐檢查功能。

因為我們使用了 3 個訊息通道(一個入站,兩個出站),所以使用一個簡單的 LoanProcessor 介面來明確輸入和輸出。目前,它看起來像這樣

@Component
public interface LoanProcessor {

  String APPLICATIONS_IN = "output"; // Topic where the new loans appear
  String APPROVED_OUT = "approved"; // Topic where the approved loans are sent
  String DECLINED_OUT = "declined"; // Topic where the declined loans are sent

  @Input(APPLICATIONS_IN)
  SubscribableChannel sourceOfLoanApplications();

  @Output(APPROVED_OUT)
  MessageChannel approved();

  @Output(DECLINED_OUT)
  MessageChannel declined();
}

這個 LoanProcessor 介面首先在 @SpringBootApplication 類(LoanCheckApplication.java)中作為 @EnableBinding() 註解的引數被引用,如下所示。

@SpringBootApplication
@EnableBinding(LoanProcessor.class)
public class LoanCheckApplication {

  public static void main(String[] args) {
    SpringApplication.run(LoanCheckApplication.class, args);
  }
}

此外,一個名為 LoanChecker.java 的 Spring @Component 在執行時用這個 LoanProcessor 構建。而且,只要有新的 Loan 事件到達,這個元件的 checkAndSortLoans(Loan) 方法就會自動呼叫,因為它被標記為 LoanProcessor.APPLICATIONS_IN 通道的 @StreamListener()。你可以在以下程式碼示例中看到這個註解的使用。

  @StreamListener(LoanProcessor.APPLICATIONS_IN)
  public void checkAndSortLoans(Loan loan) {

    if (loan.getAmount() > MAX_AMOUNT) {
      loan.setStatus(Statuses.DECLINED.name());
      processor.declined().send(message(loan));
    } else {
      loan.setStatus(Statuses.APPROVED.name());
      processor.approved().send(message(loan));
    }
  }

然後,這個方法使用簡單的業務邏輯對 Loan 物件進行分類。根據分類結果,它將它們傳送到 processor.approved() 通道或 processor.declined() 通道(相應地設定其貸款狀態之後)。

總結

正如你所看到的,使用 Spring Cloud Stream 時獲得關注點分離確實非常有利。這兩個微服務中完全沒有任何 Kafka 或 RabbitMQ 特定的程式碼。這使我們可以專注於業務邏輯,而無需關心訊息平臺,你只需更改專案 pom.xml 中的“binder”依賴項,就可以輕鬆切換訊息平臺。

更多內容...

你可以按如下方式檢視事件流經訊息平臺的情況

  • 對於 Kafka,可以使用 KafDrop 工具在 localhost:9000 上觀察主題和事件訊息。無需登入。

  • 對於 RabbitMQ,可以在 localhost:15672 上找到 Rabbit Management Console,用於觀察交換機和事件訊息。登入時使用者名稱為 guest,密碼也為 guest。要檢視實際的訊息內容,你可能需要手動建立一個佇列,並使用 # 作為你的 routing key 將其繫結到所需的主題。

要了解 Spring Cloud Stream 的最新資訊,請訪問 Spring 網站上該專案的專屬專案頁面

要從頭開始建立自己的 Spring 專案,請使用 start.spring.io 上的專案配置器。

如果你想深入瞭解 Spring 和純粹的 Kafka,請檢視這些精彩的部落格文章

  1. Gary Russell:深入探討 Spring for Apache Kafka:錯誤處理、訊息轉換和事務支援

  2. Soby Chacko:深入探討 Spring for Apache Kafka:Apache Kafka 和 Spring Cloud Stream


腳註


  1. 此倉庫中的微服務程式碼使用 MavenSpring BootSpring Cloud Stream 編寫和打包。執行時,程式碼依賴於 KafkaZookeeperRabbitMQ 以及 KafDrop(一個由 Obsidian Dynamics 提供的 Docker 映象)。此列表中的所有內容均已為你提供 - 你無需安裝它們。

獲取 Spring 新聞通訊

訂閱 Spring 新聞通訊以保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助你加速前進。

瞭解更多

獲取支援

Tanzu Spring 透過一項簡單的訂閱提供對 OpenJDK™、Spring 和 Apache Tomcat® 的支援及二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群的所有即將到來的活動。

檢視全部