RabbitMQ的web頁面介紹(三)
一、Virtual Hosts
每一個 RabbitMQ 伺服器都能創建虛擬的消息伺服器,我們稱之為虛擬主機 (virtual host) ,簡稱為vhost。每一個 vhost 本質上是一個獨立的小型 RabbitMQ 伺服器,擁有自己獨立的隊列、交換器及綁定關係等,井且它擁有自己獨立的許可權。vhost 就像是虛擬機與物理伺服器一樣,它們在各個實例間提供邏輯上的分離,為不同程式安全保密地運行數據,它既能將同一個RabbitMQ 中的眾多客戶區分開,又可以避免隊列和交換器等命名衝突。vhost 之間是絕對隔離的,無法將 vhostl 中的交換器與 vhost2 中的隊列進行綁定,這樣既保證了安全性,又可以確保可移植性。如果在使用 RabbitMQ 達到一定規模的時候,建議用戶對業務功能、場景進行歸類區分,並為之分配獨立的 vhost。
1.1、Virtual Hosts 的功能說明
vhost可以限制最大連接數和最大隊列數,並且可以設置vhost下的用戶資源許可權和Topic許可權,具體許可權見下方說明。
- 在 Admin -> Limits 頁面可以設置vhost的最大連接數和最大隊列數,達到限制後,繼續創建,將會報錯。
- 用戶資源許可權是指RabbitMQ 用戶在客戶端執行AMQP操作命令時,擁有對資源的操作和使用許可權。許可權分為三個部分: configure、write、read ,見下方表格說明。參考://www.rabbitmq.com/access-control.html#permissions
AMQP 0-9-1 Operation | configure | write | read | |
---|---|---|---|---|
exchange.declare | (passive=false) | exchange | ||
exchange.declare | (passive=true) | |||
exchange.declare | (with [AE](ae.html)) | exchange | exchange (AE) | exchange |
exchange.delete | exchange | |||
queue.declare | (passive=false) | queue | ||
queue.declare | (passive=true) | |||
queue.declare | (with [DLX](dlx.html)) | queue | exchange (DLX) | queue |
queue.delete | queue | |||
exchange.bind | exchange (destination) | exchange (source) | ||
exchange.unbind | exchange (destination) | exchange (source) | ||
queue.bind | queue | exchange | ||
queue.unbind | queue | exchange | ||
basic.publish | exchange | |||
basic.get | queue | |||
basic.consume | queue | |||
queue.purge | queue |
舉例說明:
-
- 比如創建隊列時,會調用 queue.declare 方法,此時會使用到 configure 許可權,會校驗隊列名是否與 configure 的表達式匹配。
- 比如隊列綁定交換器時,會調用 queue.bind 方法,此時會用到 write 和 read 許可權,會檢驗隊列名是否與 write 的表達式匹配,交換器名是否與 read 的表達式匹配。
-
- Topic許可權是RabbitMQ 針對STOMP和MQTT等協議實現的一種許可權。由於這類協議都是基於Topic消費的,而AMQP是基於Queue消費,所以AMQP的標準資源許可權不適合用在這類協議中,而Topic許可權也不適用於AMQP協議。所以,我們一般不會去使用它,只用在使用了MQTT這類的協議時才可能會用到。
2.2、vhost使用示例
1. 使用管理員用戶登錄Web管理介面。
2.頁面添加一個名為 v1 的Virtual Hosts。(此時還需要為此vhost分配用戶,添加一個新用戶)

3.在 Admin -> Users 頁面添加一個名為 order-user 的用戶,並設置為 management 角色。
4. 從 Admin 進入 order-user 的用戶設置介面,在 Permissions 中,為用戶分配vhost為/v1,並為每種許可權設置需要匹配的目標名稱的正則表達式。

欄位名
|
值
|
說明
|
Virtual Host
|
/v1
|
指定用戶的vhost,以下許可權都只限於 /v1 vhost中
|
Configure regexp
|
eq-.*
|
只能操作名稱以eq-開頭的exchange或queue;為空則不能操作任何exchange和queue
|
Write regexp
|
.*
|
能夠發送消息到任意名稱的exchange,並且能綁定到任意名稱的隊列和任意名稱的目標交
換器(指交換器綁定到交換器),為空表示沒有許可權
|
Read regexp
|
^test$
|
只能消費名為test隊列上的消息,並且只能綁定到名為test的交換器
|
5.程式碼演示
public class Producer { public static void main(String[] args) { // 1、創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 2、設置連接屬性 factory.setUsername("order-user"); factory.setPassword("order-user"); factory.setVirtualHost("v1"); Connection connection = null; Channel channel = null; // 3、設置每個節點的鏈接地址和埠 Address[] addresses = new Address[]{ new Address("192.168.0.1", 5672), new Address("192.168.0.2", 5672) }; try { // 開啟/關閉連接自動恢復,默認是開啟狀態 factory.setAutomaticRecoveryEnabled(true); // 設置每100毫秒嘗試恢復一次,默認是5秒:com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL factory.setNetworkRecoveryInterval(100); factory.setTopologyRecoveryEnabled(false); // 4、使用連接集合裡面的地址獲取連接 connection = factory.newConnection(addresses, "生產者"); // 添加重連監聽器 ((Recoverable) connection).addRecoveryListener(new RecoveryListener() { /** * 重連成功後的回調 * @param recoverable */ public void handleRecovery(Recoverable recoverable) { System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 已重新建立連接!"); } /** * 開始重連時的回調 * @param recoverable */ public void handleRecoveryStarted(Recoverable recoverable) { System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 開始嘗試重連!"); } }); // 5、從鏈接中創建通道 channel = connection.createChannel(); /** * 6、聲明(創建)隊列 * 如果隊列不存在,才會創建 * RabbitMQ 不允許聲明兩個隊列名相同,屬性不同的隊列,否則會報錯 * * queueDeclare參數說明: * @param queue 隊列名稱 * @param durable 隊列是否持久化 * @param exclusive 是否排他,即是否為私有的,如果為true,會對當前隊列加鎖,其它通道不能訪問,並且在連接關閉時會自動刪除,不受持久化和自動刪除的屬性控制 * @param autoDelete 是否自動刪除,當最後一個消費者斷開連接之後是否自動刪除 * @param arguments 隊列參數,設置隊列的有效期、消息最大長度、隊列中所有消息的生命周期等等 */ channel.exchangeDeclare("test-exchange", "fanout"); channel.queueDeclare("queue1", false, false, false, null); channel.queueBind("queue1", "test-exchange", "xxoo"); for (int i = 0; i < 100; i++) { // 消息內容 String message = "Hello World " + i; try { // 7、發送消息 channel.basicPublish("test-exchange", "queue1", null, message.getBytes()); } catch (AlreadyClosedException e) { // 可能連接已關閉,等待重連 System.out.println("消息 " + message + " 發送失敗!"); i--; TimeUnit.SECONDS.sleep(2); continue; } System.out.println("消息 " + i + " 已發送!"); TimeUnit.SECONDS.sleep(2); } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 8、關閉通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 9、關閉連接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
public class VirtualHosts { public static void main(String[] args) { // 1、創建連接工廠 ConnectionFactory factory = new ConnectionFactory(); // 2、設置連接屬性 factory.setUsername("order-user"); factory.setPassword("order-user"); factory.setVirtualHost("v1"); Connection connection = null; Channel prducerChannel = null; Channel consumerChannel = null; // 3、設置每個節點的鏈接地址和埠 Address[] addresses = new Address[]{ new Address("192.168.0.1", 5672), new Address("192.168.0.2", 5672) }; try { // 4、從連接工廠獲取連接 connection = factory.newConnection(addresses, "消費者"); // 5、從鏈接中創建通道 prducerChannel = connection.createChannel(); prducerChannel.exchangeDeclare("test-exchange", "fanout"); prducerChannel.queueDeclare("queue1", false, false, true, null); prducerChannel.queueBind("queue1", "test-exchange", "xxoo"); // 消息內容 String message = "Hello A"; prducerChannel.basicPublish("test-exchange", "c1", null, message.getBytes()); consumerChannel = connection.createChannel(); // 創建一個消費者對象 Consumer consumer = new DefaultConsumer(consumerChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("收到消息:" + new String(body, "UTF-8")); } }; consumerChannel.basicConsume("queue1", true, consumer); System.out.println("等待接收消息"); System.in.read(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 9、關閉通道 if (prducerChannel != null && prducerChannel.isOpen()) { try { prducerChannel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } // 10、關閉連接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
2.3、集群連接恢復
官方資料://www.rabbitmq.com/api-guide.html#connection-recovery;根據官方文檔說明可知:
- 通過 factory.setAutomaticRecoveryEnabled(true); 可以設置連接自動恢復的開關,默認已開啟
- 通過 factory.setNetworkRecoveryInterval(10000); 可以設置間隔多長時間嘗試恢復一次,默認是5秒: com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL
- 如果啟用了自動連接恢復,將由以下事件觸發:
-
- 連接的I/O循環中拋出IOExceiption
- 讀取Socket套接字超時
- 檢測不到伺服器心跳
- 在連接的I/O循環中引發任何其他異常
- 如果客戶端第一次連接失敗,不會自動恢復連接。需要我們自己負責重試連接、記錄失敗的嘗試、實現重試次數的限制等等。
ConnectionFactory factory = new ConnectionFactory(); // configure various connection settings try { Connection conn = factory.newConnection(); } catch (java.net.ConnectException e) { Thread.sleep(5000); // apply retry logic }
-
- 如果程式中調用了 Connection.Close ,也不會自動恢復連接。
- 如果是 Channel-level 的異常,也不會自動恢復連接,因為這些異常通常是應用程式中存在語義問題(例如試圖從不存在的隊列消費)。
- 在Connection和Channel上,可以設置重新連接的監聽器,開始重連和重連成功時,會觸發監聽器。添加和移除監聽,需要將Connection或Channel強制轉換成Recoverable介面。
((Recoverable) connection).addRecoveryListener()
((Recoverable) connection).removeRecoveryListener()