RabbitMQ 的效能和可伸縮性路由拓撲

工程 | Helena Edelson | 2011年4月1日 | ...

為高度可伸縮系統設計良好的路由拓撲可能像繪製圖表一樣。需要考慮很多事情,例如問題本身、環境的限制、訊息實現的限制以及效能策略。我們經常遇到的問題是,路由缺乏靈活性和表達能力來滿足我們的需求。這正是 RabbitMQ 的突出之處。

基本概念

任何熟悉通用訊息傳遞的人都知道將訊息從 A 路由到 B 的概念。路由可以是簡單的,也可以是相當複雜的,在為可伸縮、複雜系統設計路由拓撲時,它必須是優雅的。保持簡潔和解耦,元件可以在不同負載下良好地進行流量控制。這可以表示為簡單的對映或複雜的圖。在其最簡單的形式下,路由拓撲可以表示為節點,例如層級節點。

Hierarchical nodes in message routing topology

對於 RabbitMQ 或 AMQP 的新手(請注意,Rabbit 支援多種協議,包括 STOMP、HTTP、HTTPS、XMPP 和 SMTP),以下是一些基本元件描述:
  • 交換機 (Exchange) 伺服器內的實體,接收來自生產者應用的訊息,並可選擇將這些訊息路由到伺服器內的訊息佇列。
  • 交換機型別 (Exchange type) 某種特定交換機模型的演算法和實現。與“交換機例項”相對,後者是伺服器內接收和路由訊息的實體。
  • 訊息佇列 (Message queue) 一個命名實體,儲存訊息並將其轉發給消費者應用。
  • 繫結 (Binding) 建立訊息佇列和交換機之間關係的實體。
  • 路由鍵 (Routing key) 交換機可用於決定如何路由特定訊息的虛擬地址。
對於點對點路由,路由鍵通常是訊息佇列的名稱。對於主題釋出/訂閱路由,路由鍵通常是層級結構的。

api.agents.agent-{id}.operations.{operationName}

在更復雜的情況下,路由鍵可以與基於訊息頭欄位和/或其內容的路由結合使用。交換機檢查訊息的屬性、頭欄位、訊息體內容以及可能來自其他來源的資料,然後決定如何路由訊息。基於上述路由鍵思想的繫結模式可能看起來像 api.agents..operations.,其中我們將交換機 E1 與佇列 Q1 使用繫結模式 api.agents..operations. 進行繫結,以便傳送到 E1 的任何訊息如果其路由鍵匹配繫結模式,就會路由到 Q1

Rabbit Broker 的結構與 JMS Broker 不同。每個 RabbitMQ 伺服器至少包含一個節點 (broker),或者更典型的是叢集中的多個節點。每個節點都有一個預設的虛擬主機 "/",並且可以建立更多虛擬主機,例如 "/develoment"。Rabbit 虛擬主機類似於 Tomcat 的虛擬主機,將 Broker 資料劃分為子集。在這些虛擬主機內部是交換機和佇列。使用者使用其憑據連線時,是連線到 Rabbit 節點上的虛擬主機。

這裡我們連線到 Rabbit 節點,宣告要釋出到的交換機、要消費的佇列、繫結模式,然後釋出一些訊息,使用 RabbitMQ Java 客戶端 API

package org.demo.simple.rabbit;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public final class RocketSender {

 public void sendRockets() throws IOException {
     List<String> rocketsWithRoutings = new RocketRouter().build();

     Connection connection = new ConnectionFactory().newConnection();
     Channel channel = connection.createChannel();

     String rocketExchange = "rockets.launched";
     channel.exchangeDeclare(rocketExchange, "topic");
     String rocketQueue = channel.queueDeclare().getQueue();
     channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");

     for (String rocketTo : rocketsWithRoutings) {
         channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
     }

     channel.close();
     connection.close();
 }
}

對“已著陸”火箭的簡單消費可能看起來像這樣:


 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
 channel.basicConsume(rocketQueue, false, queueingConsumer);

 int landed = 0;
 while (landed < launched) {
     QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     String rocketLanded = new String(delivery.getBody());

     if (rocketLanded.equalsIgnoreCase("Alderaan")) {
         System.out.println("That's no moon, that's a space station.");
     }
     landed++;
 }

問題

在考慮哪種路由策略在效能本身也可以改進的可伸縮環境中表現最佳時,有很多選項。訊息傳遞的一個優點是提供了各種配置,並能夠找出既能解決當前需求又能滿足增長需求的正確配置。

為了保持簡單,我們考慮兩種策略:

  1. 使用層級路由鍵的高度分割槽路由,較少的主題交換機。
  2. 數量更多的直接交換機和佇列,路由分割槽少得多。
每種場景都遵循這個用例:每個必須擴充套件的應用既是生產者又是消費者。

從何處開始

在深入研究一種能夠隨著時間推移清晰高效地擴充套件的路由解決方案之前,最好先評估你的環境及其元件。例如,什麼有助於擴充套件?一般來說,解耦、分散式、非同步性、並行性、抽象和間接性等等。然後考慮哪些元素是當前或潛在的瓶頸。一個基本原則是,高流量/大容量路徑需要更高效的吞吐量,否則你的分發會面臨瓶頸的風險。一個練習是按照流量或作為熱力圖來對它們進行排名。接下來,你能否對你的流量進行分類——是否存在總體模式、主題或類似訊息型別,以及它們之間的關係是什麼?現在開始考慮整合,如何以及在哪裡可以提高效率,並應用經過驗證的模式來解決這些熱點問題,為了擴充套件而解耦,並提高效能。

常規路由注意事項

所有交換機型別的行為都不同。以下是一些通用規則:
  • 如果應用圖中的路由鍵是有限域,那麼許多扇出交換機可能是合適的選擇(每個路由鍵與一個交換機 1:1 對映)。
  • 如果路由鍵數量可能無限,請考慮主題交換機。
  • 對於主題路由,效能隨著繫結數量的增加而降低。
  • 扇出交換機非常快,因為它們沒有路由需要處理,但如果繫結到大量佇列,情況就會改變。
  • 直接交換機是主題交換機的一種更快形式,前提是你不需要使用萬用字元。
  • 對超過 100,000 個佇列進行故障排除可能比擁有更多繫結、更少交換機和佇列的拓撲更繁瑣。
  • 數量非常多的交換機和佇列會佔用更多記憶體,這可能很重要,但這確實取決於具體情況。
自 RabbitMQ 2.4.0(2011年3月23日釋出)起,新的主題路由演算法最佳化可在峰值時比之前的主題演算法快 60 倍。因此,一個建議是使用更少的交換機和佇列,而使用更多的路由,因為現在時間增加是最小的。

效能

什麼是廉價的?

就記憶體成本而言,是交換機和繫結。RabbitMQ 構建於 Erlang 之上,其中每個節點 (broker) 都是一個程序,每個佇列也是一個程序。預設情況下,Erlang VM 程序限制設定為 1M,可以提高。然而,出於可伸縮性考慮,交換機不是一個程序,它只是 RabbitMQ 內建的 Mnesia 資料庫中的一行。在叢集中,宣告一個交換機會使其出現在叢集的所有節點上,而宣告一個佇列只會在其中一個節點上建立。這解釋了為什麼交換機在節點重啟或在叢集中建立節點後仍然存在,而佇列不會。

要警惕繫結流失。在策略二中,如果你建立許多新佇列及其繫結,每當消費者連線時,你可能會遇到問題。例如,給定有大量訊息釋出的交換機 E1...En,每當消費者 Cm 連線時,它就會從其自己的佇列建立到所有 E1...En 的繫結,這可能會導致問題,具體取決於連線速率。

為了緩解繫結流失,考慮使用交換機到交換機繫結,這是 2.3.1 版本的新功能。每個消費者可以擁有自己的二級交換機 Ym,該交換機必須是非自動刪除的。然後將所有 E1...En 繫結到 Ym。這樣,這些繫結始終存在。在此場景中,每當消費者 Cm 連線時,它只需宣告其佇列並將該佇列繫結到 Ym。如果 Ym 是一個扇出交換機,它將非常快,並將繫結流失率降低到每次連線 1 個繫結,而不是可能每次連線 n 個繫結。

Exchange-to-Exchange Binding

用例

交換機到交換機可伸縮用例

考慮一個具有自主代理的伺服器應用。每個代理都在一個虛擬機器上,該虛擬機器是彈性擴充套件系統的一部分。當每個代理啟動時,它會向伺服器傳送一條線上訊息,隨後是許多其他訊息,例如認證和資料傳輸。如果我們有 1,000 個代理,每個代理宣告 50 個直接交換機、佇列和繫結,那麼每個代理必須知道伺服器的佇列,以便履行佇列宣告 (queue.declare) 操作上的繫結契約。這不是一個可伸縮的解決方案。

現在考慮建立共享主題交換機:一個用於代理到伺服器的路徑,另一個用於伺服器到代理的路徑,第三個用於處理未認證的代理,僅路由到不需要安全認證的佇列。現在我們使用繫結模式、訊息路由鍵進行分割槽,併為每個伺服器設定一組供所有連線到它的代理共享。然後,以最簡單的形式,當每個代理上線時,它會宣告一個私有交換機和佇列,並將其交換機繫結到共享主題交換機。

我們的關係現在透過交換機到交換機的對映來表達,這減少了流失率並解耦了代理與“知道”伺服器佇列的依賴關係。使用此模式,系統變得清晰、解耦且可伸縮。

彈性擴充套件用例

讓我們將前一個場景進一步推進。我們已經在場景二(許多直接路由)的基礎上使用了主題釋出/訂閱路由。現在假設系統要求增加到資料中心中擴充套件我們的伺服器應用叢集,擁有 50,000 或更多代理。我們如何應對不同的負載?

認證客戶端交換機將訊息從代理路由到伺服器。它處理所有釋出訊息到單消費者佇列的操作,包括那些產生訊息頻率最高的佇列。根據當前拓撲,對於 10,000 個客戶端,這可能是一個潛在的瓶頸,每分鐘大約產生 60,000 條訊息,即每天 86,400,000 條訊息。這很容易解決,根據你的配置(例如是否持久化訊息),RabbitMQ 每天可以處理超過 10 億條訊息。

我們的伺服器應用正在執行一個 RabbitMQ 叢集。記住,在叢集中,宣告一個交換機會使其出現在所有節點上,而宣告一個佇列只會在其中一個節點上建立,因此我們必須配置一個解決方案。

生產者和消費者之間的負載均衡

為了有效處理隨著更多客戶端應用(代理)上線而可能產生的極高負載,我們可以通過幾種方式修改此拓撲。首先,對上述配置進行最佳化,以在 Rabbit 叢集中實現訊息的負載均衡。我們可以為 Rabbit 叢集中的每個節點建立一個佇列。如果我們有四個節點,對於每個高流量佇列,我們可以為該操作建立 hfq.{0,1,2,3}。現在每個代理可以隨機選擇一個節點(透過 0 到 3 之間的一個數字,或者更復雜的輪詢實現)進行釋出。使用 RabbitMQ 有 RPC 呼叫,或者你可以使用 Rabbit 管理外掛 獲取節點數量,這可以在你的輪詢演算法中使用。

帶輪詢分發的 Worker 佇列

Worker 佇列(或任務佇列)通常用於有效地在多個 worker 之間分發耗時的任務,並輕鬆實現工作並行化。此外,此拓撲適用於無需執行資源密集型任務並等待其完成的情況。執行多個 worker 佇列可以將這些任務分發到它們之間。

對於 Worker 佇列,Rabbit 預設使用輪詢分發方法,將每條訊息按順序傳送給下一個消費者。每個消費者接收到的訊息數量大致相同。如果你宣告一個佇列並啟動 3 個競爭性消費者,將它們繫結到交換機,併發送 20,000 條訊息,那麼訊息 0 將路由到第一個消費者,訊息 1 到第二個,訊息 2 到第三個,依此類推。如果我們開始積壓任務,我們可以簡單地增加更多 worker,從而使系統輕鬆擴充套件。

效能

記憶體

上述兩種選項都不會必然導致 RabbitMQ 的高負載。建立的交換機和佇列數量沒有硬性限制,在一個 Broker 上執行 100,000 個佇列是沒問題的。透過適當的調優和足夠的記憶體,你可以執行超過一百萬個佇列。

RabbitMQ 會動態地將訊息推送到磁碟以釋放 RAM,因此佇列的記憶體佔用不依賴於其內容。佇列空閒 10 秒或更長時間後,它會“休眠”,這會觸發該佇列的 GC。因此,佇列所需的記憶體量可以顯著縮小。例如,1000 個空的、空閒的佇列可能佔用 10MB 的 RAM。當它們全部活躍時(即使為空),當然,根據記憶體碎片情況,它們可能會佔用更多記憶體。強迫它們再次進入休眠狀態以測試行為很困難,因為 Erlang VM 不會立即將記憶體返還給作業系統。

但是,你可以觀察到一個休眠且記憶體非常碎片化的大程序,因為回收的記憶體量足以迫使 VM 將記憶體返還給 OS。如果你執行一個測試,該測試穩步增加 Rabbit 的記憶體佔用,你可以觀察到休眠對空閒程序的影響,因為它降低了記憶體使用量的增長速度。

Erlang 是一個多執行緒 VM,它利用多核優勢。它向開發者提供了“綠色執行緒”,被稱為“程序”,因為與執行緒不同,它們概念上不共享地址空間。這裡有一篇關於 Erlang VM 和程序 的有趣自述。

事務

釋出 10,000 條訊息的事務可能需要長達四分鐘。RabbitMQ 的一個新功能叫做 釋出者確認 (Publisher Confirms),比相同但採用事務的程式碼快 100 倍以上。如果你沒有明確要求實現事務,但確實需要驗證,可以考慮這個選項。

要點總結

以下是一些最終要點,可幫助你從實現中獲得最大的效能收益:
  • 新的主題路由演算法最佳化在峰值時快 60 倍。
  • 使用萬用字元的繫結模式,其中 '*' 匹配單個詞,比 '#' 匹配零個或多個詞要快得多。萬用字元 '#' 在路由表中處理所需的時間比 '*' 長。
  • 交換機到交換機繫結提高了解耦性,增加了拓撲靈活性,減少了繫結流失,並有助於提高效能。
  • 交換機和繫結非常輕量級。
  • RabbitMQ 釋出者確認比 AMQP 事務快 100 倍以上。
  • 佇列空閒 >=10 秒後會“休眠”,觸發佇列的 GC,從而顯著減少該佇列所需的記憶體。
  • Worker 佇列有助於並行化和分發工作負載。
  • 在 Rabbit 叢集中分發 Worker 佇列有助於擴充套件。
  • 對你的拓撲進行負載均衡。
這絕不是關於這個主題的論文,確實還有許多其他模式、拓撲和效能細節需要考慮。策略,一如既往,取決於許多因素,但我希望這些內容足夠有幫助,或至少能引導讀者朝著正確的方向思考。

獲取

RabbitMQ GitHub 原始碼 RabbitMQ 二進位制下載和外掛 Erlang 下載 適用於 Java 和 .NET 的 Spring AMQP API 用於監控 RabbitMQ 的 Hyperic Maven
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency>

訂閱 Spring 新聞通訊

透過 Spring 新聞通訊保持聯絡

訂閱

保持領先

VMware 提供培訓和認證,助力你快速進步。

瞭解更多

獲取支援

Tanzu Spring 透過一個簡單的訂閱提供 OpenJDK™、Spring 和 Apache Tomcat® 的支援和二進位制檔案。

瞭解更多

即將到來的活動

檢視 Spring 社群所有即將到來的活動。

檢視全部