保持領先
VMware 提供培訓和認證,助力你快速進步。
瞭解更多為高度可伸縮系統設計良好的路由拓撲可能像繪製圖表一樣。需要考慮很多事情,例如問題本身、環境的限制、訊息實現的限制以及效能策略。我們經常遇到的問題是,路由缺乏靈活性和表達能力來滿足我們的需求。這正是 RabbitMQ 的突出之處。
api.agents.agent-{id}.operations.{operationName}
在更復雜的情況下,路由鍵可以與基於訊息頭欄位和/或其內容的路由結合使用。交換機檢查訊息的屬性、頭欄位、訊息體內容以及可能來自其他來源的資料,然後決定如何路由訊息。基於上述路由鍵思想的繫結模式可能看起來像 api.agents..operations.
,其中我們將交換機 E1
與佇列 Q1
使用繫結模式 api.agents..operations.
進行繫結,以便傳送到 E1
的任何訊息如果其路由鍵匹配繫結模式,就會路由到 Q1
。
Rabbit Broker 的結構與 JMS Broker 不同。每個 RabbitMQ 伺服器至少包含一個節點 (broker),或者更典型的是叢集中的多個節點。每個節點都有一個預設的虛擬主機 "/",並且可以建立更多虛擬主機,例如 "/develoment"。Rabbit 虛擬主機類似於 Tomcat 的虛擬主機,將 Broker 資料劃分為子集。在這些虛擬主機內部是交換機和佇列。使用者使用其憑據連線時,是連線到 Rabbit 節點上的虛擬主機。
這裡我們連線到 Rabbit 節點,宣告要釋出到的交換機、要消費的佇列、繫結模式,然後釋出一些訊息,使用 RabbitMQ Java 客戶端 API。
package org.demo.simple.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public final class RocketSender {
public void sendRockets() throws IOException {
List<String> rocketsWithRoutings = new RocketRouter().build();
Connection connection = new ConnectionFactory().newConnection();
Channel channel = connection.createChannel();
String rocketExchange = "rockets.launched";
channel.exchangeDeclare(rocketExchange, "topic");
String rocketQueue = channel.queueDeclare().getQueue();
channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");
for (String rocketTo : rocketsWithRoutings) {
channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
}
channel.close();
connection.close();
}
}
對“已著陸”火箭的簡單消費可能看起來像這樣:
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(rocketQueue, false, queueingConsumer);
int landed = 0;
while (landed < launched) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
String rocketLanded = new String(delivery.getBody());
if (rocketLanded.equalsIgnoreCase("Alderaan")) {
System.out.println("That's no moon, that's a space station.");
}
landed++;
}
為了保持簡單,我們考慮兩種策略:
要警惕繫結流失。在策略二中,如果你建立許多新佇列及其繫結,每當消費者連線時,你可能會遇到問題。例如,給定有大量訊息釋出的交換機 E1...En
,每當消費者 Cm
連線時,它就會從其自己的佇列建立到所有 E1...En
的繫結,這可能會導致問題,具體取決於連線速率。
為了緩解繫結流失,考慮使用交換機到交換機繫結,這是 2.3.1 版本的新功能。每個消費者可以擁有自己的二級交換機 Ym
,該交換機必須是非自動刪除的。然後將所有 E1...En
繫結到 Ym
。這樣,這些繫結始終存在。在此場景中,每當消費者 Cm
連線時,它只需宣告其佇列並將該佇列繫結到 Ym
。如果 Ym 是一個扇出交換機,它將非常快,並將繫結流失率降低到每次連線 1 個繫結,而不是可能每次連線 n 個繫結。
現在考慮建立共享主題交換機:一個用於代理到伺服器的路徑,另一個用於伺服器到代理的路徑,第三個用於處理未認證的代理,僅路由到不需要安全認證的佇列。現在我們使用繫結模式、訊息路由鍵進行分割槽,併為每個伺服器設定一組供所有連線到它的代理共享。然後,以最簡單的形式,當每個代理上線時,它會宣告一個私有交換機和佇列,並將其交換機繫結到共享主題交換機。
我們的關係現在透過交換機到交換機的對映來表達,這減少了流失率並解耦了代理與“知道”伺服器佇列的依賴關係。使用此模式,系統變得清晰、解耦且可伸縮。
認證客戶端交換機將訊息從代理路由到伺服器。它處理所有釋出訊息到單消費者佇列的操作,包括那些產生訊息頻率最高的佇列。根據當前拓撲,對於 10,000 個客戶端,這可能是一個潛在的瓶頸,每分鐘大約產生 60,000 條訊息,即每天 86,400,000 條訊息。這很容易解決,根據你的配置(例如是否持久化訊息),RabbitMQ 每天可以處理超過 10 億條訊息。
我們的伺服器應用正在執行一個 RabbitMQ 叢集。記住,在叢集中,宣告一個交換機會使其出現在所有節點上,而宣告一個佇列只會在其中一個節點上建立,因此我們必須配置一個解決方案。
RabbitMQ 會動態地將訊息推送到磁碟以釋放 RAM,因此佇列的記憶體佔用不依賴於其內容。佇列空閒 10 秒或更長時間後,它會“休眠”,這會觸發該佇列的 GC。因此,佇列所需的記憶體量可以顯著縮小。例如,1000 個空的、空閒的佇列可能佔用 10MB 的 RAM。當它們全部活躍時(即使為空),當然,根據記憶體碎片情況,它們可能會佔用更多記憶體。強迫它們再次進入休眠狀態以測試行為很困難,因為 Erlang VM 不會立即將記憶體返還給作業系統。
但是,你可以觀察到一個休眠且記憶體非常碎片化的大程序,因為回收的記憶體量足以迫使 VM 將記憶體返還給 OS。如果你執行一個測試,該測試穩步增加 Rabbit 的記憶體佔用,你可以觀察到休眠對空閒程序的影響,因為它降低了記憶體使用量的增長速度。
Erlang 是一個多執行緒 VM,它利用多核優勢。它向開發者提供了“綠色執行緒”,被稱為“程序”,因為與執行緒不同,它們概念上不共享地址空間。這裡有一篇關於 Erlang VM 和程序 的有趣自述。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency>