領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多事件驅動架構非常棒。但是如果沒有框架,編寫與流行事件訊息平臺配合使用的腳手架可能會很混亂。在這篇文章中,我們將探討如何使用 Spring Cloud Stream 來簡化您的程式碼。
您只想為事件驅動應用程式編寫邏輯,但樣板訊息程式碼可能會阻礙您。將您的應用程式連線到訊息服務很棘手,如果您是企業開發人員,您可能需要使用多種訊息技術(無論是本地的還是雲端的)。
讓靈活的訊息抽象來處理複雜的 मैसेज平臺整合,這樣您就可以專注於編寫簡單清晰的業務邏輯。Spring Cloud Stream 是一個很好的選擇。它將許多流行的訊息平臺統一到一個易於使用的 API 後面,包括 RabbitMQ、Apache Kafka、Amazon Kinesis、Google PubSub、Solace PubSub+、Azure Event Hubs 和 Apache RocketMQ。它甚至可以消除這些平臺之間(例如分割槽或交換)方法和功能上的細微差異,讓您可以自由地建立創新的事件驅動解決方案。
在接下來的演示中,您將看到 Spring Cloud Stream 巧妙的抽象如何幫助使事件流程式碼更簡潔、更易於使用。您還將看到使用 Spring Cloud Stream 的 binding 庫在兩個不同的訊息平臺(RabbitMQ 或 Kafka)之間切換是多麼簡單。
這些事件驅動微服務需要在您的 PC 上安裝以下最新應用程式1
首先,從 GitHub 克隆程式碼倉庫。為此(如果已安裝 Git),請開啟新的終端視窗併發出以下命令。如果未安裝 Git,請下載並解壓 此 zip 檔案。
git clone https://github.com/benwilcock/spring-cloud-stream-demo.git
檢查程式碼後,您會發現此倉庫包含兩個微服務。
Loansource 微服務(位於 /loansource 資料夾中)。此微服務充當事件訊息的源。這些事件是 Loan 應用程式,類似於您在銀行和金融領域會看到的。每筆貸款都有一個“名稱”、“金額”和“狀態”(最初設定為 PENDING)。
Loancheck 微服務(位於 /loancheck 資料夾中)。此微服務充當 Loan 處理器。它檢查哪些貸款是可行的,並將其分類為 APPROVED 或 DECLINED 狀態。
要執行演示,請按照以下說明操作。
在一個新的終端視窗中,進入專案的根資料夾併發出以下命令。
您需要安裝並執行 "Docker",此指令碼才能正常工作,因為它需要
docker-compose。
./start-servers.sh
此指令碼將啟動 Kafka 和 RabbitMQ 並將兩者的日誌輸出流式傳輸到終端視窗(除非您使用 Ctrl-C 退出)。當您按下 Ctrl-C 時,伺服器不會停止 - 它們將在後臺繼續執行。一旦啟動,這些伺服器將全部可供您的計算機上執行的應用程式使用。
在接下來的步驟 3 和 4 中,我們必須將 -P<profile-choice> 替換為我們想要使用的訊息平臺的名稱。
-Pkafka-Prabbit如果您完全省略 -P<profile-choice> 設定,則使用 Kafka。
注意:此演示並非旨在在 Kafka 和 RabbitMQ 之間“橋接”訊息,因此請確保在編譯和執行這兩個應用程式時選擇相同的配置檔名。如果您的目標是橋接訊息系統,請參見此處文件。
在一個新的終端視窗中,使用 cd 命令將 /loansource 目錄設定為當前目錄,然後發出以下命令,將 <profile-choice> 替換為您想要執行的模式(如上面步驟 2 中所述,可以是 kafka 或 rabbit 模式)。
./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>
一旦 loansource 應用程式啟動,在終端視窗中,您應該每秒看到一條訊息,告訴您一個新的貸款事件已以 PENDING 狀態釋出到訊息平臺。讓此微服務繼續執行,然後進入下一步。
在另一個新的終端視窗中,將 /loancheck 目錄設定為當前目錄,然後發出以下命令,同樣將 <profile-choice> 替換為您想要執行的模式。
./mvnw clean package spring-boot:run -DskipTests=true -P<profile-choice>
一旦 loancheck 應用程式啟動,在終端視窗中,您應該每秒看到一條訊息,告訴您一個新的 PENDING 貸款申請已從訊息平臺讀取並被 APPROVED 或 DECLINED。如果您想了解這些應用程式是如何構建的,請跳到“工作原理”部分。
完成微服務後,在 /loansource 和 /loancheck 微服務的每個終端視窗中按下 Ctrl-C。應用程式將停止,事件處理也將停止。
如果您要在 Kafka 和 Rabbit 之間切換模式,只需返回到步驟 2 並重復該過程。
如果您已完全完成演示並希望停止 Kafka 和 RabbitMQ 伺服器,請在專案根資料夾的終端視窗中執行
./stop-servers.sh指令碼。如果您只是在模式之間切換,則沒有必要。
Maven 配置檔案(在每個專案的 pom.xml 中)控制在構建時新增哪些 Spring Cloud Stream 繫結作為依賴項。當您選擇 -Pkafka 時,[spring-cloud-stream-binder-kafka][kafka] 依賴項將新增到專案中。當您選擇 -Prabbit 時,[spring-cloud-stream-binder-rabbit][rabbit] 依賴項將新增。
<profiles>
<profile>
<id>kafka</id>
<properties>
<spring.profile.activated>kafka</spring.profile.activated>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
</dependencies>
</profile>
...
<profiles>
您選擇的 Maven 配置檔案還會影響 src/main/resources/application.properties 檔案中的 spring.profiles.active 屬性,該屬性會切換您在啟動時看到的橫幅。
對於 Loansource 微服務,我們使用了 Spring Cloud Stream v2.1 的一項新功能 - Spring Cloud Function 支援。有了這項新功能,讓 LoansourceApplication 微服務充當 Loan 訊息的來源所需的全部是宣告一個生成並返回 Supplier<> 的 @Bean 方法。在這種情況下,它是一個 Loan 型別的 Supplier。函式方法程式碼看起來像這樣...
@Bean
public Supplier<Loan> supplyLoan() {
return () -> {
Loan loan = new Loan(UUID.randomUUID().toString(), "Ben", 10000L);
LOG.info("{} {} for ${} for {}", loan.getStatus(), loan.getUuid(), loan.getAmount(), loan.getName());
return loan;
};
}
Supplier<> 是 Java 函式資料型別。由於只有一個返回此型別的 @Bean 方法,Spring Cloud Stream 確切地知道接下來該怎麼做。預設情況下,它將每秒觸發一次此函式並將結果傳送到名為“output”的預設 MessageChannel。此函式方法的優點是它只包含業務邏輯,因此您可以使用常規單元測試對其進行測試。
我們可以使用
application.properties檔案中的spring.cloud.function.definition屬性來明確宣告我們希望將哪個函式 bean 繫結到繫結目的地 - 但對於只定義一個@Bean的情況,這沒有必要。
如果我們想使用不同的輪詢間隔,我們可以使用
application.properties檔案中的spring.integration.poller.fixed-delay屬性。
loancheck 微服務需要更多的程式碼,但不多。它的工作是將 Loan 事件分類到不同的通道中。為此,它正在訂閱來自源的 output 主題的事件,然後根據貸款的價值將其傳送到 approved 或 declined 主題,類似於欺詐檢查設施。
因為我們使用了 3 個訊息通道(一個入站和兩個出站),所以使用了一個簡單的 LoanProcessor 介面來闡明輸入和輸出。目前,它看起來像這樣
@Component
public interface LoanProcessor {
String APPLICATIONS_IN = "output"; // Topic where the new loans appear
String APPROVED_OUT = "approved"; // Topic where the approved loans are sent
String DECLINED_OUT = "declined"; // Topic where the declined loans are sent
@Input(APPLICATIONS_IN)
SubscribableChannel sourceOfLoanApplications();
@Output(APPROVED_OUT)
MessageChannel approved();
@Output(DECLINED_OUT)
MessageChannel declined();
}
此 LoanProcessor 介面首先在 @SpringBootApplication 類 (LoanCheckApplication.java) 中作為 @EnableBinding() 註解的引數引用,如下所示。
@SpringBootApplication
@EnableBinding(LoanProcessor.class)
public class LoanCheckApplication {
public static void main(String[] args) {
SpringApplication.run(LoanCheckApplication.class, args);
}
}
此外,一個名為 LoanChecker.java 的 Spring @Component 在執行時使用此 LoanProcessor 構建。此外,此元件的 checkAndSortLoans(Loan) 方法在每次新 Loan 事件到達時都會自動呼叫,因為它已被註釋為 LoanProcessor.APPLICATIONS_IN 通道的 @StreamListener()。您可以在以下程式碼示例中看到此註解的使用。
@StreamListener(LoanProcessor.APPLICATIONS_IN)
public void checkAndSortLoans(Loan loan) {
if (loan.getAmount() > MAX_AMOUNT) {
loan.setStatus(Statuses.DECLINED.name());
processor.declined().send(message(loan));
} else {
loan.setStatus(Statuses.APPROVED.name());
processor.approved().send(message(loan));
}
}
此方法隨後使用簡單的業務邏輯對 Loan 物件進行排序。根據排序結果,它將它們傳送到 processor.approved() 通道或 processor.declined() 通道(在相應設定其貸款狀態之後)。
正如您所看到的,使用 Spring Cloud Streams 獲得的關注點分離確實非常健康。在任何一個微服務中,都沒有任何特定於 Kafka 或 RabbitMQ 的程式碼。這使我們能夠專注於業務邏輯,而不管訊息平臺是什麼,並且您可以透過簡單地更改專案 pom.xml 中的“繫結器”依賴項來輕鬆切換訊息平臺。
您可以透過以下方式檢視流經訊息平臺的事件
對於 Kafka,可以使用 KafDrop 工具在 localhost:9000 觀察主題和事件訊息。無需登入。
對於 RabbitMQ,可以在 localhost:15672 找到 Rabbit 管理控制檯,用於觀察交換機和事件訊息。登入使用者名稱為 guest,密碼也為 guest。要觀察實際訊息內容,您可能需要手動建立一個佇列並使用 # 作為 routing key 將其繫結到所需主題。
要及時瞭解 Spring Cloud Stream 的最新資訊,請訪問 Spring 網站上該專案專門的 專案頁面。
要從頭開始建立您自己的 Spring 專案,請使用 start.spring.io 上的專案配置器。
如果您想更深入地瞭解 Spring 和純 Kafka,請檢視這些精彩的部落格文章