Green Beans:企業訊息傳遞和 Spring 入門

工程 | Josh Long | 2011 年 1 月 26 日 | ...

在本文中,我們將介紹訊息傳遞的核心概念,以及 Spring 框架及其姐妹專案提供的豐富訊息傳遞支援。

什麼是訊息傳遞?為了最好地解釋這一點,我將轉述 Gregor Hohpe 和 Bobby Woolf 的開創性著作《企業整合模式》(Addison Wesley, 2004)中提供的示例。當你打電話時,你會嘗試將資訊傳達給另一方。只有當對方在你打電話時能夠接聽,這才會起作用。由於並不總能接到電話,因此我們使用語音郵箱來排隊訊息。呼叫者將訊息留在語音郵箱中,然後被呼叫者可以稍後非同步地檢索訊息(或實際上,許多訊息)。

在這個例子中,語音郵箱位於雙方之間。它儲存訊息,然後在被呼叫者(接收者)檢索時將其傳遞。在企業訊息傳遞領域,情況非常相似:一方將訊息傳送到訊息代理(也稱為訊息導向中介軟體 - MOM),而另一方——當該方能夠——在稍後接收或顯式查詢訊息代理中的任何訊息。

這正是類比停止有用的地方。與語音郵箱相比,訊息代理具有許多選項。訊息代理非常適合提供附加服務,例如路由,並保證訊息傳遞。訊息代理可以針對不同的用例進行最佳化,例如,你可以權衡速度與永續性。訊息代理可能會將訊息持久化到外部儲存以確保永續性,儘管這通常是可以透過配置來切換以提高速度的選項。

在語音郵箱示例中,訊息由一方傳送,然後傳遞給另一方——通訊是*點對點*的。訊息代理支援這一點,以及另一種稱為*釋出/訂閱*的通訊型別,其中訊息會傳遞給多個客戶端。

訊息代理的一個常見用途是解決兩個不同系統之間的整合問題。傳送到訊息代理的資料通常是傳送方和接收方都通用的格式。兩個系統在使用訊息代理時需要就資料合同達成一致。訊息通常包含訊息正文,其中儲存了訊息本身的內容,以及訊息頭,它們是鍵/值對,提供了有關訊息正文的元資料,可用於幫助訊息的消費者處理訊息。訊息頭可以是任何你想要的內容,但它們通常與訊息本身或訊息的處理器相關。

Java Message Service

Java 訊息服務 (JMS) API 規定了與訊息代理互動的客戶端介面。每個訊息代理都提供自己的 API 實現,非常類似於 JDBC 驅動程式對 JDBC API 的作用。這意味著 JMS 客戶端通常應該使用與伺服器相同版本的客戶端。有許多優秀的 JMS 代理實現可供選擇。其中一個原因是訊息傳遞一直是應用程式開發的重要組成部分,並且今天仍然如此。自 1.1 版本以來,JMS 一直是 J2EE(現為 Java EE)規範的一部分。在過去十年的大部分時間裡,JMS 規範都停留在 1.1 版本。

在 JMS 中,客戶端使用 javax.jms.ConnectionFactory 建立 javax.jms.Connection。然後可以使用 Connection 建立 javax.jms.SessionSession 代表客戶端與代理的互動,並允許傳送和接收訊息以及其他不太明顯的。操作。

該介面上最有用的方法與建立 javax.jms.Destination 的訊息生產者和訊息消費者有關。Destination 映射了訊息代理上“地址”的 JMS 概念。它還映射了代理儲存訊息的位置。在 JMS 中,訊息從同一位置傳送、儲存和消費,所有這些都由 javax.jms.Destination 例項表示。

[caption id="attachment_7506" align="alignnone" width="573" caption="上方,藍色元素代表生產者和消費者。橙色元素代表代理中緩衝訊息的目標。在 JMS 中,這些是主題或佇列。"][/caption]

Destination 是一個介面,有兩個更具體的子介面 javax.jms.Queuejavax.jms.TopicQueue 代表標準的佇列,這是之前描述的點對點構造。Topic 提供釋出/訂閱訊息傳遞,並將一條訊息傳遞給多個接收者。

要將訊息傳送到 Destination,您必須建立一個 javax.jms.MessageProducer。然後可以使用 MessageProducer 傳送 javax.jms.Message

JMS 支援兩種不同的接收訊息的機制。第一種方式是使用 javax.jmx.MessageConsumer#receive() 方法來請求訊息,該方法以*同步*方式返回 Destination 中的單個訊息;預設情況下,該方法會阻塞直到收到訊息。而不是使用 MessageConsumer,客戶端可以透過呼叫 javax.jms.Session#setMessageListener(MessageListener) 來安裝 javax.jms.MessageListenerMessageListener 是一個介面,只有一個方法 public void onMessage(javax.jms.Message),每當 Destination 上有可供消費的 javax.jms.Message 時,就會呼叫該方法。MessageListener 提供*非同步*訊息處理:當訊息到達時,它們會被處理。

JMS API 還有很多內容需要學習,但這些類和概念將在我們討論 Spring 對 JMS 訊息傳遞的支援時對您最有幫助。第一層支援是 org.springframework.jms.core.JmsTemplate,它提供了簡化方法,將我們剛剛討論的內容壓縮成單行程式碼。JmsTemplate 需要一個 javax.jms.ConnectionFactory 例項來執行其工作。JmsTemplate 可以為您完成許多工作。例如,要傳送訊息,JmsTemplate 會建立一個 javax.jms.Session,設定一個 javax.jms.MessageConsumerjavax.jms.MessageProducer,設定所有事務的機制,併為您提供當前 javax.jms.Session 的引用,以便您可以建立您選擇的訊息併發送它。所有這些錯誤處理和構造邏輯,輕鬆就能節省幾十行程式碼。一旦訊息傳送完畢,它會銷燬或關閉大部分這些物件。這是應用程式伺服器(如 Java EE 伺服器)中的標準做法,因為 ConnectionFactory 例項由伺服器建立、由伺服器管理並進行池化。它們在使用後會快取這些例項。在這些環境中關閉資源只是將它們返回到池中。因此,JmsTemplate 在標準情況下會做正確的事情,假設 ConnectionFactory 快取或池化例項。

在應用程式伺服器等託管環境中,您通常需要從 JNDI 獲取 javax.jms.ConnectionFactory。您可以使用 Spring 來為您查詢該引用並配置 JmsTemplate。在我們的示例中,我們希望更寬鬆地操作,因此我們將使用獨立的 ActiveMQ 訊息代理。ActiveMQ 是一個流行的、開源訊息代理。要使用它,請下載它,然後在 bin 資料夾中執行適合您作業系統的啟動指令碼。在您的應用程式中,您需要客戶端庫來連線到相應版本的 ActiveMQ。在撰寫本文時,ActiveMQ 的最新版本是 5.4.2。如果您使用 Maven,請將以下依賴項新增到您的 Maven pom 檔案中



            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>${activemq.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-optional</artifactId>
                <version>${activemq.version}</version>
            </dependency>

請確保為 ${activemq.version} 建立一個 Maven 屬性,或者手動將字串替換為相應的版本。還有一個 activemq-all 依賴項,但它會拉下許多可能不必要的 jar。對於我們的應用程式,上述兩個依賴項就足夠了。

將 Spring 與 JMS 結合使用

讓我們檢查一個基本 JMS 應用程式的配置。首先,讓我們檢查基本的 Spring XML 配置



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
       ">

    <context:property-placeholder location="jms.properties"/>
    <context:component-scan base-package="org.springsource.greenbeans.examples.jms.core"/>
    <context:component-scan base-package="org.springsource.greenbeans.examples.jms.jmstemplate"/>
    <tx:annotation-driven transaction-manager="jmsTransactionManager"/>

</beans>

您可以看到 XML 主要設定了屬性佔位符解析並啟用了類路徑掃描。最有趣的部分是元素,它告訴 Spring 啟用所有帶有 @Transactional 註釋的方法上的事務。該元素引用了 Spring 上下文中的另一個 bean,jmsTransactionManager,它是在以下 Java 配置類中定義的。



package org.springsource.greenbeans.examples.jms.core;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*
import org.springframework.jms.connection.*
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;

@Configuration
public class JmsConfiguration {

  @Value("${broker.url}")
  private String brokerUrl;

  @Bean
  public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL(this.brokerUrl);
    return new CachingConnectionFactory(activeMQConnectionFactory);
  }

  @Bean
  public JmsTransactionManager jmsTransactionManager() {
    return new JmsTransactionManager(this.connectionFactory());
  }

  @Bean
  public JmsTemplate jmsTemplate() {
    return new JmsTemplate(this.connectionFactory());
  }
}

配置相當簡單。首先,我們定義一個 ActiveMQConnectionFactory 例項,然後將其交給 Spring 框架的 CachingConnectionFactory 例項。有些代理提供自己的快取 ConnectionFactory 實現。但是,如果您自己的代理沒有,那麼您總是可以使用 Spring Caching ConnectionFactory 實現來提高速度。

接下來,我們有一個 JmsTransactionManager,它提供 JMS 本地事務。在 JMS 中,事務回滾只有兩種結果:傳送操作失敗時,訊息未被髮送;接收操作失敗時,訊息會被重新排隊到訊息代理。後一種情況可能很複雜。

如果您收到一條訊息,然後在處理過程中遇到錯誤,並且假設您保持事務開啟,那麼事務將被回滾,訊息將被返回到代理。一旦進入代理,會發生什麼取決於代理和您的配置。通常,訊息將被立即重新傳遞。然而,這並非總是期望的行為。因此,大多數(如果不是全部)代理都支援某種形式的死信佇列,無法傳遞的訊息將被髮送到其中。您可以按需處理此佇列中的訊息——也許當這種錯誤情況發生時,某個監控工具會叫醒某人。然而,大多數代理提供更多的控制。可能可以設定關於錯誤訊息路由的規則。例如,代理可能會嘗試立即重新傳遞訊息,然後,如果再次失敗,它可能會等待一段時間再試,如果仍然失敗,則等待更長時間。這通常稱為退避期。也許在達到某個閾值後,訊息將被髮送到死信佇列,或被完全丟棄。無論如何,請檢查您的代理文件。

最後,我們透過提供對 ConnectionFactory 的引用來建立一個 JmsTemplate

讓我們看看 JmsTemplate 的實際應用。為了保持示例簡單,我們首先討論在一個名為 Producer 的類中如何傳送訊息。訊息傳遞的一個常見用途是向一個(或多個)不同系統傳送通知,作為同步機制,以便感興趣的系統擁有某個資料項的最新版本。在此示例中,我們假設有一個簡單的 Customer POJO,其中包含標準欄位:firstNamelastNameemailid


package org.springsource.greenbeans.examples.jms.core;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*
import org.springframework.beans.factory.annotation.*
import org.springframework.jms.core.*;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
import javax.jms.*;


@Component
public class Producer {

  @Value("${jms.customer.destination}")
  private String customerDestination;

  @Autowired
  private JmsTemplate jmsTemplate;

  private Log log = LogFactory.getLog(getClass());

  @Transactional
  public void sendCustomerUpdate(final Customer customer) throws Exception {
    this.jmsTemplate.send(this.customerDestination, new MessageCreator() {
      @Override
      public Message createMessage(Session session) throws JMSException {
           log.info("Sending customer data " + ToStringBuilder.reflectionToString(customer));
           MapMessage mapMessage = session.createMapMessage();
           mapMessage.setLong("id", customer.getId());
           mapMessage.setString("firstName", customer.getFirstName());
           mapMessage.setString("lastName", customer.getLastName());
           mapMessage.setString("email", customer.getEmail());
      }
    });
  }
}

在類中,我們看到一個 sendCustomerUpdate 方法,它以 Customer 引用作為其引數。使用 JmsTemplate 的 send 方法——它接受兩個引數:第一個是目標名稱("customers")的字串,第二個是 Spring 框架類 MessageCreator 的引用——我們使用傳遞到我們實現的 createMessage(javax.jms.Session) 方法的 javax.jms.Session 引用來構建一個 JMS 訊息。JMS 中有許多型別的訊息可以建立:javax.jms.TextMessagejavax.jms.ObjectMessagejavax.jms.MapMessage 等。ObjectMessage 的作用正如您所期望的那樣——它允許您將序列化的物件作為 JMS 訊息的有效負載進行傳輸。通常,應避免這樣做。序列化的資料型別將訊息的生產者和消費者與相同的 API 合約耦合在一起,這在並非總是可行的。即使能夠保證在訊息交換的雙方都可用且類版本相同的型別,與更靈活的其他選項相比,這樣做通常效率低下。相反,優先選擇分解——您可以使用 javax.jms.TextMessage 將物件封送為 XML 或 JSON 字串。或者,使用 javax.jms.MapMessage 傳送物件的構成部分,而不是物件本身,它只是一個具有已知鍵/值對的訊息,就像 java.util.Map 一樣。這就是我們在這裡採用的方法。所有 JVM 都擁有 intslongsStrings 等,並且可以反序列化以這種方式傳輸的資料。

現在我們來看一下 JMS 中接收訊息。第一種方法是同步地請求它們,一次一個。



package org.springsource.greenbeans.examples.jms.jmstemplate;

import org.apache.commons.logging.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;
import javax.jms.*

@Component
public class RawJmsTemplatePollingMessageConsumer {

  @Autowired
  protected JmsTemplate jmsTemplate;

  @Value("${jms.customer.destination}")
  private String destination;

  private Log log = LogFactory.getLog(getClass());

  @Transactional
  public void receiveAndProcessCustomerUpdates() throws Exception {
    Message message = this.jmsTemplate.receive(this.destination);
    if (message instanceof MapMessage) {

      MapMessage mapMessage = (MapMessage) message ;
      String firstName = mapMessage.getString("firstName");
      String lastName = mapMessage.getString("lastName");
      String email = mapMessage.getString("email");
      Long id = mapMessage.getLong("id");

      Customer customer = new Customer(id, firstName, lastName, email );

      log.info("receiving customer message: " + customer);

    }
  }
}

此示例使用 JmsTemplate 例項在有可用訊息時接收一條新訊息,然後將其轉換為 Customer 物件,步驟與傳送訊息時相反,並將轉換後的物件有益地寫入日誌。如果不得不重複多次,這種 JMS 訊息的打包和解包會變得很繁瑣。通常,將此類邏輯提取到單獨的類中是有價值的。Spring JMS 層次結構支援使用 MessageConverter 層次結構的例項,讓您覆蓋物件的序列化方式。預設的 SimpleMessageConverter 在未另行指定時生效,並且在大多數情況下都做得很好,所以我們在這裡不覆蓋它。但是,如果我們決定要將物件作為 XML 傳輸,我們可以利用 MarshallingMessageConverter,它利用 Spring 框架的 OXM(物件-XML 封送)支援。最後,請注意 receiveAndProcessCustomerUpdates 方法裝飾了 @Transactional 註釋。如果在接收訊息時發生任何錯誤,並且丟擲了 Exception,Spring 將回滾接收操作並將訊息返回給代理。

監聽使其更簡單

這個例子足夠簡單,但有一些限制。首先,我們的程式碼與 JMS 和 Spring API 緊密耦合。其次,這隻處理一條訊息,並且僅在呼叫方法時才處理。實現者有責任確保呼叫該方法。通常,實現者希望訊息在到達後儘快處理,非同步處理。一個自然的下一步可能是從一個無限迴圈中連續呼叫 receive 方法,以確保佇列中的所有訊息都儘快得到處理。在此之後,為了實現更高的吞吐量,特別是對於長時間執行的任務,並確保佇列始終被清空,您可以新增執行緒,以便始終執行多個迴圈。這些都是邏輯上的下一步,但僅僅為了接收和處理訊息,它們也需要大量的工作。實際上,這裡的唯一業務邏輯是處理訊息有效負載並對其執行某些操作的程式碼。

Spring 框架提供了現成的解決方案,並且使用起來很簡單!Spring 框架中有兩種實現可以提供此功能,適用於不同的情況。它們都基於 AbstractJmsListeningContainer 類。如果您願意,您可以直接使用此層次結構,但碰巧的是,使用 Spring 的 JMS 名稱空間配置它還有一種更簡單的方式。

讓我們回顧一下我們之前的 Spring XML 配置,新增 http://www.springframework.org/schema/jms 名稱空間,然後進行相應的配置。



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:jms="http://www.springframework.org/schema/jms"
       ...
       xsi:schemaLocation="…  http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">
       ….

    <jms:listener-container  connection-factory="connectionFactory" acknowledge="auto" transaction-manager="jmsTransactionManager">
        <jms:listener destination="${jms.customer.destination}" ref="messageListenerContainerConsumer" method="receiveMessage" />
    </jms:listener-container>

</beans>

我們只摘錄了已新增到配置檔案中的部分。<jms:listener-container> 元素需要對連線工廠和正在使用的事務管理器進行引用。請注意,Spring 訊息監聽器提供自己的快取,因此您應該在此處使用普通的 ConnectionFactoryCachingConnectionFactory 在這裡是多餘的,不應該使用。在 元素中,您可以配置任意多的 <jms:listener> 元素,每個元素指定目標(javax.jms.Destination)例項的名稱以及將接收新訊息的 Spring Bean 的引用。可選地,您可以配置在引用的 Spring Bean 中應呼叫哪個方法。如果 Spring Bean 實現 javax.jms.MessageListener 或 Spring 自己的 SessionAwareMessageListener 介面之一,則會呼叫每個介面上的唯一方法,並將 javax.jms.Message 傳遞給它,無需指定方法。如果配置了方法,則該方法應以與 javax.jms.Message 的有效負載相同型別的物件作為其引數。對於我們的示例,由於我們期望接收 javax.jms.MapMessage 例項,這將是一個 java.util.Map 例項。

修改後的程式碼是


package org.springsource.greenbeans.examples.jms.messagelistenercontainer;

import org.apache.commons.logging.*;
import org.springframework.stereotype.Component;
import org.springsource.greenbeans.examples.Customer;

import java.util.Map;

@Component
public class MessageListenerContainerConsumer {

  private Log log = LogFactory.getLog(getClass());

  public void receiveMessage(Map<String, Object> message) throws Exception {
    String firstName = (String) message.get("firstName");
    String lastName = (String) message.get("lastName");
    String email = (String) message.get("email");
    Long id = (Long) message.get("id");
    Customer customer = new Customer(id, firstName, lastName, email);
    log.info("receiving customer message: " + customer);
  }
}

不算差吧?您的程式碼不瞭解 JMS,甚至幾乎不瞭解 Spring(除了 @Component 註釋)。當然,您也可以使用 XML 或 Java 配置來配置此 bean,這樣可以避免此依賴關係。)此外,您的程式碼更容易理解。所有相同的規則都適用——例如,接收過程中丟擲的異常將觸發回滾。您可以透過在 XML <jms:listener-container > 元素中指定所需的監聽器數量來提高併發性。您還可以控制使用的是哪種事務管理型別。

AMQP

雖然 JMS 是一個非常強大的選項,但它並非沒有侷限性。客戶端與代理版本耦合,並且隨著部署的系統和代理的標誌日升級,很快就會變得很麻煩。JMS 本質上是 Java 中心化的。客戶端使用 Java 語言驅動程式連線到給定的代理。訊息傳遞的本質是整合,我們不能總是假定我們正在與其他 Java 客戶端整合,尤其是在這樣一個擁有眾多不同平臺的時代。雖然一些 JMS 訊息代理(甚至是開源的)可以擴充套件到極高的吞吐量,但確實存在更快的訊息傳遞選項,如果您的場景需要,那麼至少值得研究替代方案。JMS 是一個不錯的 API,但沒有人會稱之為最好的 API。因此,雖然許多訊息代理支援 JMS,但它們也支援其專有 API 或更強大、更具表現力的替代 API。例如,JMS 在訊息傳送後缺乏路由功能。

一個符合這些挑戰的流行選項是 AMQP 標準。AMQP(高階訊息佇列協議)是一個標準,最初是為了應對摩根大通銀行在關鍵任務應用程式中面臨的挑戰而誕生的。從他們工作的開端出現了一個規範,圍繞這個規範最終形成了一個工作組,今天包括許多公司,如高盛、Progress Software、微軟、Novell、Red Hat、WS02、高盛、美國銀行、巴克萊、思科、瑞士信貸、德國證券交易所繫統,當然還有 VMware 的 SpringSource 部門。SpringSource 特別開發了最流行的基於 AMQP 的訊息代理實現 RabbitMQ。

RabbitMQ 是一個 開源訊息代理。它易於安裝,特別是如果您執行的是許多擁有現成 RabbitMQ 包管理器的系統。RabbitMQ 用 Erlang 語言編寫。通常,實現細節不應很重要,但這個細節特別重要,因為它關乎 RabbitMQ 的速度。您知道,Erlang 是一種輕量級語言,最初部署在關鍵任務電話系統中。Erlang 具有非常輕量級、直觀的執行緒模型,使得 Erlang 程式能夠實現比 JVM 目前能夠實現的高得多的併發性。此外,Erlang 的執行緒模型與其網路模型無縫融合。這意味著擴充套件到多個執行緒或多臺機器基本上是按相同的方式完成的。所有這些都意味著 RabbitMQ 速度很快。非常快,而且它對錯誤具有彈性,這也是 Ericsson 等公司享有九個九(99.9999999%)可用性的原因之一。

AMQP 是一個線路協議(類似於 HTTP),而不是一個 API。這使得它與語言無關(事實上,有數十種針對不同語言和平臺的已知客戶端),並且意味著 RabbitMQ 在您通常不認為會關心訊息代理的各種工具中都得到了支援,例如 WireShark,一個網路流量監控工具。從概念上講,任何 AMQP 客戶端都應該能夠與任何其他 AMQP 實現進行通訊。

深入瞭解 AMQP 代理

AMQP 規範在客戶端和伺服器端都指定了所有構造,以及例行的管理選項。在 AMQP 中,客戶端與伺服器建立連線。客戶端可以向交換機發送訊息。交換機將訊息路由到代理內的佇列,或完全阻止它們。交換機是無狀態的閘道器,而佇列實際佇列化並存儲訊息。

客戶端可以從佇列中消費訊息。交換機和佇列之間沒有關係:您可以建立任意多的佇列,並將其中一個或多個繫結到交換機。交換機和佇列之間的關係稱為繫結。如果訊息中的路由鍵與繫結匹配,交換機會將最多一個訊息副本傳遞給佇列。這一點很重要,因為我之前說過,可以為單個佇列指定多個交換機和繫結。多個匹配不會產生多個訊息。交換機決定什麼構成匹配。有幾個眾所周知的交換機,它們指定不同的匹配演算法。

  • 扇形交換機:扇形交換機將收到的所有訊息路由到繫結到該交換機的每個佇列(這與 javax.jms.Topic 最相似,用於釋出-訂閱式訊息傳遞)
  • 直連交換機:當路由鍵(訊息的常見頭)與繫結鍵相同時進行匹配(這與 javax.jms.Queue 最相似,用於點對點訊息傳遞)
  • 主題交換機:主題交換機在 JMS 中沒有 API 特定等價物。它最類似於某些訊息代理中的分層主題。主題交換機將路由鍵頭與使用特殊語法來允許萬用字元的交換機繫結進行匹配。例如,繫結鍵可能指定以下內容:“.years.#”。此萬用字元將匹配任意一個詞,後跟一個點(“.”),然後是“years”,再後跟一個點(“.”),最後是零個或多個詞。因此,“taxes.years.2011”會匹配,而“taxes.years,”也會匹配,但“years.2322”不會匹配。
  • 頭交換機:匹配頭鍵或頭鍵值對的存在。
[caption id="attachment_7484" align="alignnone" width="698" caption="上面,藍色的圓圈是生產者和消費者,綠色的元素是交換機,橙色的元素是儲存訊息的 AMQP 佇列。它們只是訊息的命名儲存,與 JMS 意義上的佇列沒有關係。"][/caption]

規範還允許新增特殊交換機。例如,RabbitMQ 添加了外掛交換機,這基本上是第三方(或 RabbitMQ 本身)提供額外功能的擴充套件點。這催生了越來越多的可安裝外掛,它們可以處理從傳送 XMPP 訊息、處理複製到顯示管理 Web UI 等各種事情。

將 Spring 與 AMQP 結合使用

我們將研究 Spring AMQP 的使用,它是 Spring 產品組合專案,允許您使用 RabbitMQ 來完成規範要求的所有工作,以及更多高階的 RabbitMQ 特定操作。

讓我們開始構建我們的示例——設計基本上與我們的 JMS 示例相同——使用 RabbitMQ 和 Spring AMQP 客戶端。您首先需要的是適當的依賴項。如果您使用 Maven,請將以下依賴項新增到您的 pom.xml 檔案中


            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>${com.rabbitmq.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>${spring.amqp.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-amqp</artifactId>
                <version>${spring.amqp.version}</version>
            </dependency>

請務必為 ${spring.amqp.version}${com.rabbitmq.version} 屬性佔位符建立 Maven 屬性,或直接將它們替換為適當的版本。在撰寫本文時,${spring.amqp.version}1.0.0.M2${com.rabbitmq.version}2.1.0。就像我們在上一個示例中所做的那樣,我們安裝了一個簡單的 Spring XML 配置檔案來引導所有其他內容。唯一不同的是從 <tx:annotation-driven> 元素中引用的事務管理器實現的名稱、掃描的包以及載入的屬性檔案的名稱!所以,讓我們不要在這裡浪費太多時間,直接進入我們基於 AMQP 的示例的配置。


package org.springsource.greenbeans.examples.amqp.core;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.*;
import org.springframework.amqp.rabbit.core.*
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.*
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.*;

@Configuration
@SuppressWarnings("unused")
public class AmqpConfiguration {

  @Value("${broker.url}")
  private String brokerUrl;

  @Value("${broker.username}")
  private String username;

  @Value("${broker.password}")
  private String password;

  @Value("${amqp.customer.queue}")
  private String customerQueueName;

  @Bean
  public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(singleConnectionFactory());
    rabbitTemplate.setMessageConverter(jsonMessageConverter());
    return rabbitTemplate;
  }

  @Bean
  public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(this.singleConnectionFactory());
  }

  @Bean
  public MessageConverter jsonMessageConverter() {
    return new JsonMessageConverter();
  }

  @Bean
  public ConnectionFactory singleConnectionFactory() {
    SingleConnectionFactory connectionFactory = new SingleConnectionFactory(this.brokerUrl);
    connectionFactory.setUsername(this.username);
    connectionFactory.setPassword(this.password);
    return connectionFactory;
  }

  @Bean
  public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(this.rabbitTemplate());
  }

  @Bean
  public Queue customerQueue() {
    Queue q = new Queue(this.customerQueueName);
    amqpAdmin().declareQueue(q);
    return q;
  }

  @Bean
  public DirectExchange customerExchange() {
    DirectExchange directExchange = new DirectExchange(customerQueueName);
    this.amqpAdmin().declareExchange(directExchange);
    return directExchange ;
  }

  @Bean
  public Binding marketDataBinding() {
    return BindingBuilder.from(customerQueue()).to(customerExchange()).with(this.customerQueueName);
  }
}

正如您所見,這裡的處理過程比我們的 JmsTemplate 要多一些,但不要擔心,主要部分在形式和功能上與其 JMS 對應項相同。其餘的只是細節。首先,我們配置了通常的元件——TransactionManagerRabbitTransactionManager)、ConnectionFactory 例項和 RabbitTemplate。其中大部分應該是相當容易理解的。

讓我們深入研究那些不匹配的領域。第一個細微之處是我們為 RabbitTemplate 配置了對 JsonMessageConverter 的引用。請記住:AMQP 是語言和平臺無關的。從 Java 傳送到 AMQP 代理的訊息很可能被 .NET、Python 或 PHP 上的客戶端消費。當訊息被打包並透過網路傳送時,有效負載以位元組流的形式傳輸。訊息的接收者需要能夠將這些位元組還原成接收者平臺可讀的內容。如果訊息使用了 Java 物件,那麼這些位元組將是序列化的 Java 物件,並且只有另一端具有相同類的 Java 客戶端才能反序列化它。因此,正如在 Spring JMS 支援中一樣,Spring AMQP 提供了一個 MessageConverter 層次結構。Spring AMQP 層次結構有一個 MarshallingMessageConverter,還有一個 SimpleMessageConverter,此外,它還有一個 JsonMessageConverter(目前是 Spring AMQP 專案特有的),它將物件轉換為 JSON(JavaScript 物件表示法,可被所有主流語言和平臺解析,並且比 XML 更簡潔/不易出錯)或從 JSON 轉換回來。在 JMS 中,智慧序列化是效率和設計的問題,但在 AMQP 中,這是一個更緊迫的問題,所以請注意配置的 MessageConverter

您會在配置中發現四個在 JMS 示例中沒有對應項的物件。第一個是 AmqpAdmin。AMQP 在協議級別定義了用於建立應用程式所需的所有內容(包括交換機、佇列和繫結)的命令。在 Spring AMQP API 中,AmqpAdmin 是訪問這些命令的關鍵介面。

customerQueue 方法中,我們配置了一個 AMQP 佇列,在 customerExchange 方法中配置了一個 DirectExchange。最後,我們使用 Spring AMQP 的流式 BindingBuilder API 將佇列連線到我們的交換機。在我們這個特定的示例中——我們傳送一個路由鍵為 "customers" 的訊息到一個名為 "customers" 的佇列。在我們的特定示例中,除了佇列之外,我們不需要宣告任何東西就可以使其工作,因為預設的無名交換機會被啟用,並僅根據路由鍵路由訊息。然而,即使它有點多餘,瞭解它是如何完成的也很有用。我們使用 AmqpAdmin 例項來 "宣告" 這些構造。這些物件是冪等的。您可以宣告它們一百萬次,如果它們已經存在,那麼除了其中一次宣告之外,任何其他宣告都不會發生什麼,所以重複呼叫應用程式啟動時的呼叫是無害的。此外,如果這些構造被設定為持久化,那麼每次都不需要宣告它們。

讓我們看看如何傳送訊息。



package org.springsource.greenbeans.examples.amqp.core;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.*
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;

@Component
public class Producer {

  @Value("${amqp.customer.exchange}")
  private String exchange;
  
  @Value("${amqp.customer.queue}")
  private String routingKey;

  @Autowired
  private RabbitTemplate rabbitTemplate;

  private Log log = LogFactory.getLog(getClass());

  @Transactional
  public void sendCustomerUpdate(Customer customer) {
    log.info("sending customer update " + ToStringBuilder.reflectionToString(customer));
    this.rabbitTemplate.convertAndSend(this.exchange , this.routingKey, customer);
  }
}

在這個類中,我們使用 RabbitTemplate 傳送訊息並將其轉換為 JSON。我們指定了我們想要使用的 routingKey 以及應該使用哪個交換機(兩者都是 "customers",與我們在配置中設定的繫結型別一致)。我們已將該類配置為使用 @Transactional,因此任何傳送訊息的失敗都會與使用 JMS 發生失敗時的行為相同。

現在,讓我們看看使用 AMQP 接收訊息的選項。



package org.springsource.greenbeans.examples.amqp.amqptemplate;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.*;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springsource.greenbeans.examples.Customer;

@Component
public class RawAmqpTemplatePollingMessageConsumer {

  @Autowired
  protected RabbitTemplate amqpTemplate;

  @Value("${amqp.customer.queue}")
  private String queue;

  private Log log = LogFactory.getLog(getClass());

  @Transactional
  public void receiveAndProcessCustomerUpdates() throws Exception {
    Customer msg = (Customer)this.amqpTemplate.receiveAndConvert(this.queue);
    log.info("receiving customer message: " + ToStringBuilder.reflectionToString(  msg));
  }
}

不出所料,這看起來幾乎(除了 RabbitTemplate)與第一個同步 JMS 示例相同。我們省去了一些在第一個示例中需要處理的轉換邏輯,但除此之外,基本上是相同的。如果訊息接收時發生事務回滾,訊息將返回到佇列末尾,最終會被重新傳遞。

Spring AMQP 也支援非同步訊息接收,就像 Spring JMS 支援一樣。然而,由於 Spring AMQP 專案仍然是一個初生的專案,因此沒有等效的名稱空間支援。所以,我們需要自己配置物件。在您的配置中新增以下內容。



  @Autowired
  private MessageListenerContainerConsumer messageListenerContainerConsumer;

  @Bean
  public SimpleMessageListenerContainer listenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setTransactionManager(this.rabbitTransactionManager());
    container.setConnectionFactory(singleConnectionFactory());
    container.setQueueName(this.customerQueueName);

    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(
           this.messageListenerContainerConsumer, this.jsonMessageConverter());
    container.setMessageListener(messageListenerAdapter);
    return container;
  }

此配置注入了將執行處理的元件(見下文,messageListenerContainerConsumer 例項透過元件掃描被拾取並自動註冊到 Spring,這就是為什麼我們在這裡自動注入它的原因),然後配置一個 SimpleMessageListenerContainer 例項,該例項將負責接收訊息、管理事務以及在將訊息傳遞給 POJO 之前進行轉換。

POJO 本身看起來像這樣



package org.springsource.greenbeans.examples.amqp.messagelistenercontainer;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.logging.*;
import org.springframework.stereotype.Component;
import org.springsource.greenbeans.examples.Customer;

@Component
public class MessageListenerContainerConsumer {
  
  private Log log = LogFactory.getLog(getClass() );
  
  public void handleMessage(Customer cu){
    log.info("Received customer " + ToStringBuilder.reflectionToString(cu)) ;
  }
}

這個類比其他類更能受益於訊息轉換器。在這裡,我們可以宣告一個接受 Customer 型別引數的方法,而 MessageListenerContainer 知道如何將其轉換並傳遞給 handleMessage 方法。然而,所有相同的規則都適用。異常將觸發回滾等。

總結

在本文中,我們探討了開發人員今天希望在應用程式中整合企業訊息傳遞的兩個選項,使用了 Spring 框架。我們介紹了 Java 訊息服務 (JMS) API 和高階訊息佇列協議 (AMQP) 來處理企業訊息。我們使用核心 Spring 框架和 Spring AMQP 專案提供了同步和非同步示例。我們討論了訊息傳遞如何幫助擴充套件應用程式以及它如何成為整合應用程式的便捷方式。我希望這將有助於您更輕鬆地理解使用企業訊息傳遞時可能做出的選擇,以及 Spring 如何讓您更容易為您的應用程式做出正確的選擇。一如既往,本文的程式碼可以在我們的 Spring Samples 儲存庫中找到。

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有