領先一步
VMware 提供培訓和認證,助您加速進步。
瞭解更多為一個高度可伸縮的系統設計一個良好的路由拓撲就像繪製一張圖。需要考慮許多因素,例如問題本身、環境的限制、訊息傳遞實現的限制以及效能策略。我們經常遇到的問題是,在根據我們的需求調整路由方面缺乏靈活性和表現力。這正是 RabbitMQ 的優勢所在。
api.agents.agent-{id}.operations.{operationName}
在更復雜的案例中,路由鍵可以與訊息頭欄位和/或其內容上的路由相結合。交換機檢查訊息的屬性、頭欄位、訊息體內容以及可能來自其他來源的資料,然後決定如何路由訊息。基於上述路由鍵思想的繫結模式可能看起來像 api.agents..operations.,我們將交換機 E1 繫結到佇列 Q1,繫結模式為 api.agents..operations.,這樣任何傳送到 E1 的訊息,如果其路由鍵與繫結模式匹配,都將路由到 Q1。
Rabbit 代理的結構與 JMS 代理不同。每個 RabbitMQ 伺服器都至少包含一個節點(代理),或者更典型地,是叢集中的多個節點。每個節點都有一個預設的虛擬主機“/”,並且可以建立更多虛擬主機,例如“/development”。Rabbit 虛擬主機類似於 Tomcat 虛擬主機,它們將代理資料劃分為子集。這些虛擬主機內部有交換機和佇列。當用戶使用憑據連線時,它連線到 Rabbit 節點上的虛擬主機。
在這裡,我們連線到 Rabbit 節點,宣告一個用於釋出的交換機,一個用於消費的佇列,一個繫結模式,然後使用 RabbitMQ Java 客戶端 API 釋出幾條訊息。
package org.demo.simple.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public final class RocketSender {
public void sendRockets() throws IOException {
List<String> rocketsWithRoutings = new RocketRouter().build();
Connection connection = new ConnectionFactory().newConnection();
Channel channel = connection.createChannel();
String rocketExchange = "rockets.launched";
channel.exchangeDeclare(rocketExchange, "topic");
String rocketQueue = channel.queueDeclare().getQueue();
channel.queueBind(rocketQueue, rocketExchange, "galaxies.*.planets.*");
for (String rocketTo : rocketsWithRoutings) {
channel.basicPublish(rocketExchange, "galaxies.*.planets." + rocketTo, null, rocketTo.getBytes());
}
channel.close();
connection.close();
}
}
一個簡單的“著陸”火箭的消費可能看起來像
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(rocketQueue, false, queueingConsumer);
int landed = 0;
while (landed < launched) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
String rocketLanded = new String(delivery.getBody());
if (rocketLanded.equalsIgnoreCase("Alderaan")) {
System.out.println("That's no moon, that's a space station.");
}
landed++;
}
為了簡單起見,我們考慮兩種策略
警惕繫結流失。在策略二中,如果您建立許多新佇列及其繫結,每當消費者連線時,您可能會遇到問題。例如,給定交換機 E1...En,許多訊息正在釋出到其中,每當消費者 Cm 連線時,它會從自己的佇列建立到所有 E1...En 的繫結,這可能會導致問題,具體取決於連線速率。
為了緩解繫結流失,請考慮使用自 2.3.1 版起新增的交換機到交換機繫結。每個消費者可以有自己的輔助交換機 Ym,該交換機不得自動刪除。然後將所有 E1...En 繫結到 Ym。這樣,這些繫結始終存在。在這種情況下,每當消費者 Cm 連線時,它只需宣告其佇列並將該佇列繫結到 Ym。如果 Ym 是一個扇出交換機,它將非常快,並將繫結流失率降低到每個連線 1 次,而不是每個連線可能 n 次。
現在考慮建立共享主題交換機:一個用於代理到伺服器路徑的交換機,另一個用於伺服器到代理路徑的交換機,第三個用於處理未經身份驗證的代理,該交換機僅路由到不需要安全性的佇列。現在我們使用繫結模式和訊息路由鍵進行分割槽,併為每個伺服器設定一組,供所有連線到它的代理共享。然後,以其最簡單的形式,當每個代理上線時,它會宣告一個私有交換機和佇列,並將其交換機繫結到共享主題交換機。
我們的關係現在透過交換機到交換機的對映來表達,這減少了流失率,並解耦了代理,使其不必“知道”伺服器佇列。使用這種模式,系統乾淨、解耦且可擴充套件。
經過身份驗證的客戶端交換機將訊息從代理路由到伺服器。它處理所有將訊息釋出到單一消費者佇列的操作,包括那些產生最高頻率訊息的操作。在當前拓撲結構下,對於 10,000 個客戶端,每分鐘大約 60,000 條訊息,或每天 86,400,000 條訊息,這是一個潛在的瓶頸。這很容易解決,RabbitMQ 每天可以處理超過 10 億條訊息,具體取決於您的配置,例如您是否持久化訊息。
我們的伺服器應用程式正在執行一個 RabbitMQ 叢集。請記住,在叢集中,宣告一個交換機會使其出現在所有節點上,而宣告一個佇列只會在其中一個節點上建立它,因此我們必須配置一個解決方案。
RabbitMQ 會動態地將訊息推送到磁碟以釋放 RAM,因此佇列的記憶體佔用不取決於其內容。當佇列空閒 10 秒或更長時間後,它將“休眠”,這將導致該佇列上的垃圾回收。因此,佇列所需的記憶體量會大幅縮小。例如,1000 個空閒的佇列可能佔用 10MB 的 RAM。當它們全部處於活動狀態時(即使是空的),它們當然可能會根據記憶體碎片化情況,消耗更多的記憶體。強制它們重新進入休眠狀態以測試行為很困難,因為 Erlang VM 不會立即將記憶體交還給作業系統。
然而,你可以觀察到一個巨大的程序,它處於休眠狀態且記憶體非常碎片化,因為回收的記憶體足以迫使虛擬機器將記憶體交還給作業系統。如果你執行一個持續增加 Rabbit 記憶體佔用的測試,你可以觀察到休眠對空閒程序的影響,因為它會降低記憶體使用量的增長速度。
Erlang 是一個多執行緒虛擬機器,它利用多核優勢。它向開發人員提供了稱為“程序”的綠色執行緒,因為與執行緒不同,它們在概念上不共享地址空間。這裡有關於 Erlang VM 和程序的一個有趣的自白。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>${rabbitmq.version}</version> <exclusions> <exclusion> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency>