使用 RabbitMQ 實現高效能和可擴充套件性的路由拓撲

工程 | Helena Edelson | 2011年4月1日 | ...

為一個高度可伸縮的系統設計一個良好的路由拓撲就像繪製一張圖。需要考慮許多因素,例如問題本身、環境的限制、訊息傳遞實現的限制以及效能策略。我們經常遇到的問題是,在根據我們的需求調整路由方面缺乏靈活性和表現力。這正是 RabbitMQ 的優勢所在。

基本概念

任何熟悉通用訊息傳遞的人都瞭解從 A 到 B 路由訊息的概念。路由可以是簡單的,也可以是非常複雜的,當為一個可擴充套件的複雜系統設計路由拓撲時,它必須是優雅的。保持乾淨和解耦,元件可以很好地處理不同的負載。這可以表示為簡單的對映或複雜的圖。在其最簡單的形式中,路由拓撲可以表示為節點,例如分層節點

Hierarchical nodes in message routing topology

對於 RabbitMQ 或 AMQP 的新手(請注意,Rabbit 相容多種協議,包括 STOMP、HTTP、HTTPS、XMPP 和 SMTP),以下是一些基本元件的描述
  • 交換機(Exchange) 伺服器內的實體,從生產者應用程式接收訊息,並可選地將這些訊息路由到伺服器內的訊息佇列
  • 交換機型別(Exchange type) 交換機特定模型的演算法和實現。與“交換機例項”相對,後者是伺服器內接收和路由訊息的實體
  • 訊息佇列(Message queue) 一個命名的實體,用於儲存訊息並將其轉發給消費者應用程式
  • 繫結(Binding) 一個在訊息佇列和交換機之間建立關係的實體
  • 路由鍵(Routing key) 交換機可能用於決定如何路由特定訊息的虛擬地址
對於點對點路由,路由鍵通常是訊息佇列的名稱。對於主題釋出-訂閱路由,路由鍵通常是分層的

api.agents.agent-{id}.operations.{operationName}

在更復雜的案例中,路由鍵可以與訊息頭欄位和/或其內容上的路由相結合。交換機檢查訊息的屬性、頭欄位、訊息體內容以及可能來自其他來源的資料,然後決定如何路由訊息。基於上述路由鍵思想的繫結模式可能看起來像 api.agents..operations.,我們將交換機 E1 繫結到佇列 Q1,繫結模式為 api.agents..operations.,這樣任何傳送到 E1 的訊息,如果其路由鍵與繫結模式匹配,都將路由到 Q1

Rabbit 代理的結構與 JMS 代理不同。每個 RabbitMQ 伺服器都至少包含一個節點(代理),或者更典型地,是叢集中的多個節點。每個節點都有一個預設的虛擬主機“/”,並且可以建立更多虛擬主機,例如“/development”。Rabbit 虛擬主機類似於 Tomcat 虛擬主機,它們將代理資料劃分為子集。這些虛擬主機內部有交換機和佇列。當用戶使用憑據連線時,它連線到 Rabbit 節點上的虛擬主機。

在這裡,我們連線到 Rabbit 節點,宣告一個用於釋出的交換機,一個用於消費的佇列,一個繫結模式,然後使用 RabbitMQ Java 客戶端 API 釋出幾條訊息。

package org.demo.simple.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public final class RocketSender {

 public void sendRockets() throws IOException {
     List<String> rocketsWithRoutings = new RocketRouter().build();

     Connection connection = new ConnectionFactory().newConnection();
     Channel channel = connection.createChannel();

     String rocketExchange = "rockets.launched";
     channel.exchangeDeclare(rocketExchange, "topic");
     String rocketQueue = channel.queueDeclare().getQueue();
     channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");

     for (String rocketTo : rocketsWithRoutings) {
         channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
     }

     channel.close();
     connection.close();
 }
}

一個簡單的“著陸”火箭的消費可能看起來像


 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
 channel.basicConsume(rocketQueue, false, queueingConsumer);

 int landed = 0;
 while (landed < launched) {
     QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     String rocketLanded = new String(delivery.getBody());

     if (rocketLanded.equalsIgnoreCase("Alderaan")) {
         System.out.println("That's no moon, that's a space station.");
     }
     landed++;
 }

問題

在考慮哪種路由策略在可擴充套件環境中表現最佳,並且效能本身也可以提高時,有許多選項。通常,訊息傳遞的一大優點是可用的各種配置,以及找出能夠解決當前和不斷增長的需求的正確配置。

為了簡單起見,我們考慮兩種策略

  1. 高度分割槽路由,帶有分層路由鍵,較少的主題交換機
  2. 數量更多的直接交換機和佇列,路由分割槽少得多
每個場景都遵循此用例:每個必須擴充套件的應用程式既是生產者又是消費者

從何開始

在深入研究一個能隨著時間乾淨高效地擴充套件的路由解決方案之前,最好先評估一下您的環境及其元件。例如,什麼有助於擴充套件?通常,解耦、分散式、非同步、並行性、抽象層次和間接性等等。然後考慮哪些元素是當前或潛在的瓶頸。一個基本原則是,高流量/高容量的路徑需要更高效的吞吐量,否則您會面臨分散式瓶頸的風險。一個練習是根據流量或作為熱圖來對這些進行排名。接下來,您能否對您的流量進行分類——是否有總體模式、主題或類似的訊息型別,以及它們之間的關係是什麼?現在開始考慮整合,如何以及在哪裡可以提高效率,並應用經過測試的模式來解決這些熱點,為擴充套件解耦,並提高效能。

一般路由注意事項

所有交換機型別行為不同。以下是一些通用規則
  • 如果應用程式圖中路由鍵的域是有限的,那麼許多扇出交換機可能更適合(每個路由鍵與一個交換機一對一對映)
  • 如果路由鍵的數量可能無限,請考慮主題交換機
  • 對於主題路由,效能隨著繫結數量的增加而降低
  • 扇出交換機非常快,因為它們沒有路由要處理,但如果繫結到大量佇列,情況就會改變
  • 直接交換機是主題交換機的一種更快形式,前提是您不需要萬用字元
  • 排查超過 100,000 個佇列的問題可能很繁瑣,相比之下,具有更多繫結、更少交換機和佇列的拓撲結構則更容易
  • 非常多的交換機和佇列會佔用更多的記憶體,這可能很重要,但這確實取決於具體情況
自 2011 年 3 月 23 日釋出的 RabbitMQ 2.4.0 起,提供了一種新的主題路由演算法最佳化,在高峰期比以前的主題演算法快 60 倍。因此,一個建議是減少交換機和佇列,增加路由,因為現在時間的增加是最小的。

效能

什麼便宜?

就記憶體成本而言,交換機和繫結。在 RabbitMQ 構建所基於的 Erlang 中,每個節點(代理)都是一個程序,每個佇列也是一個程序。預設情況下,Erlang VM 程序限制設定為 1M,可以提高。然而,交換機出於可伸縮性原因不是程序,它只是 RabbitMQ 內建 Mnesia 資料庫中的一行。在叢集中,宣告一個交換機會使其出現在叢集的所有節點上,而宣告一個佇列只會在其中一個節點上建立它。這解釋了為什麼交換機在節點重啟或在叢集中建立節點後仍然存在,而佇列則不會。

警惕繫結流失。在策略二中,如果您建立許多新佇列及其繫結,每當消費者連線時,您可能會遇到問題。例如,給定交換機 E1...En,許多訊息正在釋出到其中,每當消費者 Cm 連線時,它會從自己的佇列建立到所有 E1...En 的繫結,這可能會導致問題,具體取決於連線速率。

為了緩解繫結流失,請考慮使用自 2.3.1 版起新增的交換機到交換機繫結。每個消費者可以有自己的輔助交換機 Ym,該交換機不得自動刪除。然後將所有 E1...En 繫結到 Ym。這樣,這些繫結始終存在。在這種情況下,每當消費者 Cm 連線時,它只需宣告其佇列並將該佇列繫結到 Ym。如果 Ym 是一個扇出交換機,它將非常快,並將繫結流失率降低到每個連線 1 次,而不是每個連線可能 n 次。

Exchange-to-Exchange Binding

用例

交換機到交換機可擴充套件用例

考慮一個帶有自主代理的伺服器應用程式。每個代理都在一個虛擬機器上,該虛擬機器是一個彈性擴充套件系統的一部分。每個代理啟動時,它都會向伺服器傳送一條訊息,表示它線上,然後是許多其他訊息,例如身份驗證和資料傳輸。如果我們有 1,000 個代理,每個代理宣告 50 個直接交換機、佇列和繫結,那麼每個代理都必須知道伺服器的佇列才能在 queue.declare 操作中履行繫結契約。這不是一個可擴充套件的解決方案。

現在考慮建立共享主題交換機:一個用於代理到伺服器路徑的交換機,另一個用於伺服器到代理路徑的交換機,第三個用於處理未經身份驗證的代理,該交換機僅路由到不需要安全性的佇列。現在我們使用繫結模式和訊息路由鍵進行分割槽,併為每個伺服器設定一組,供所有連線到它的代理共享。然後,以其最簡單的形式,當每個代理上線時,它會宣告一個私有交換機和佇列,並將其交換機繫結到共享主題交換機。

我們的關係現在透過交換機到交換機的對映來表達,這減少了流失率,並解耦了代理,使其不必“知道”伺服器佇列。使用這種模式,系統乾淨、解耦且可擴充套件。

彈性擴充套件用例

讓我們把之前的場景更進一步。我們已經在使用場景 2 的主題釋出-訂閱路由:許多直接路由。現在假設系統要求在資料中心中擴充套件我們的伺服器應用程式叢集,擁有 50,000 個或更多代理。我們如何應對不同的負載?

經過身份驗證的客戶端交換機將訊息從代理路由到伺服器。它處理所有將訊息釋出到單一消費者佇列的操作,包括那些產生最高頻率訊息的操作。在當前拓撲結構下,對於 10,000 個客戶端,每分鐘大約 60,000 條訊息,或每天 86,400,000 條訊息,這是一個潛在的瓶頸。這很容易解決,RabbitMQ 每天可以處理超過 10 億條訊息,具體取決於您的配置,例如您是否持久化訊息。

我們的伺服器應用程式正在執行一個 RabbitMQ 叢集。請記住,在叢集中,宣告一個交換機會使其出現在所有節點上,而宣告一個佇列只會在其中一個節點上建立它,因此我們必須配置一個解決方案。

生產者和消費者之間的負載均衡

為了有效處理隨著更多客戶端應用程式(代理)上線而可能產生的非常高的負載,我們可以通過幾種方式修改此拓撲。首先,對上述配置進行最佳化,以在 Rabbit 叢集中進行訊息負載均衡。我們可以為 Rabbit 叢集中的每個節點建立一個佇列。如果我們有四個節點,對於每個高流量佇列,我們為該操作建立 hfq.{0,1,2,3}。現在每個代理可以透過一個 0 到 3 之間的數字隨機選擇一個節點,或者使用更復雜的輪詢實現來發布訊息。使用 RabbitMQ,有 RPC 呼叫,或者您可以使用 Rabbit 管理外掛來獲取節點數量,您可以在您的輪詢演算法中使用它。

帶輪詢排程的工人佇列

工人佇列,或稱任務佇列,通常用於在多個工人之間高效地分配耗時任務,並輕鬆實現工作並行化。此外,這種拓撲結構適用於消除執行資源密集型任務以及必須阻塞直到它們完成的需求。執行多個工人佇列允許在它們之間分配這些任務。

對於工人佇列,預設情況下,Rabbit 使用輪詢分發方法,按順序將每條訊息傳送給下一個消費者。每個消費者接收大致相同數量的訊息。如果您宣告一個佇列並啟動 3 個競爭消費者,將它們繫結到交換機,然後傳送 20,000 條訊息,訊息零將路由到第一個消費者,訊息一路由到第二個,訊息二路由到第三個,依此類推。如果任務積壓開始增加,我們可以簡單地新增更多工人,從而使系統易於擴充套件。

效能

記憶體

以上兩種選項都不會必然在 RabbitMQ 中引起高負載。交換機和佇列的數量沒有硬性限制,可以建立,在一個代理上執行 100,000 個佇列是沒問題的。經過適當的調整和足夠的 RAM,您可以執行超過一百萬個佇列。

RabbitMQ 會動態地將訊息推送到磁碟以釋放 RAM,因此佇列的記憶體佔用不取決於其內容。當佇列空閒 10 秒或更長時間後,它將“休眠”,這將導致該佇列上的垃圾回收。因此,佇列所需的記憶體量會大幅縮小。例如,1000 個空閒的佇列可能佔用 10MB 的 RAM。當它們全部處於活動狀態時(即使是空的),它們當然可能會根據記憶體碎片化情況,消耗更多的記憶體。強制它們重新進入休眠狀態以測試行為很困難,因為 Erlang VM 不會立即將記憶體交還給作業系統。

然而,你可以觀察到一個巨大的程序,它處於休眠狀態且記憶體非常碎片化,因為回收的記憶體足以迫使虛擬機器將記憶體交還給作業系統。如果你執行一個持續增加 Rabbit 記憶體佔用的測試,你可以觀察到休眠對空閒程序的影響,因為它會降低記憶體使用量的增長速度。

Erlang 是一個多執行緒虛擬機器,它利用多核優勢。它向開發人員提供了稱為“程序”的綠色執行緒,因為與執行緒不同,它們在概念上不共享地址空間。這裡有關於 Erlang VM 和程序的一個有趣的自白。

事務

10,000 條訊息的事務可能需要長達四分鐘才能釋出。RabbitMQ 的一個新功能,稱為 Publisher Confirms,比相同但事務性的程式碼快 100 倍以上。如果您沒有明確要求實現事務但確實需要驗證,您可以考慮此選項。

總結

以下是一些最終的要點,可幫助您從實現中榨取最大的效能提升
  • 新的主題路由演算法最佳化在高峰期速度提高 60 倍
  • 使用萬用字元 '*'(匹配單個單詞)的主題繫結模式比使用 '#'(匹配零個或多個單詞)快得多。萬用字元 '#' 在路由表中處理所需的時間比 '*' 長
  • 交換機到交換機繫結提高了解耦性,增加了拓撲靈活性,減少了繫結流失,並有助於提高效能
  • 交換機和繫結非常輕量級
  • RabbitMQ 釋出者確認比 AMQP 事務快 100 倍以上
  • 佇列空閒 >=10 秒後,它將“休眠”,導致佇列進行 GC,從而大大減少該佇列所需的記憶體
  • 工作佇列有助於並行化和分散式工作負載
  • 在 Rabbit 叢集中分發工作佇列有助於擴充套件
  • 對您的拓撲進行負載均衡
這絕不是關於這個主題的論文,確實還有更多模式、拓撲和效能細節需要考慮。策略一如既往地取決於許多因素,但我希望這足以幫助或至少讓人朝著正確的方向思考。

獲取它

GitHub 上的 RabbitMQ 原始碼 RabbitMQ 二進位制下載和外掛 Erlang 下載 Spring AMQP Java 和 .NET 中 RabbitMQ 的 API 用於監控 RabbitMQ 的 Hyperic Maven
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency>

獲取 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

領先一步

VMware 提供培訓和認證,助您加速進步。

瞭解更多

獲得支援

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案,只需一份簡單的訂閱。

瞭解更多

即將舉行的活動

檢視 Spring 社群所有即將舉行的活動。

檢視所有