RabbitMQ的web頁面介紹(三)

一、Virtual Hosts

每一個 RabbitMQ 伺服器都能創建虛擬的消息伺服器,我們稱之為虛擬主機 (virtual host) ,簡稱為vhost。每一個 vhost 本質上是一個獨立的小型 RabbitMQ 伺服器,擁有自己獨立的隊列、交換器及綁定關係等,井且它擁有自己獨立的許可權。vhost 就像是虛擬機與物理伺服器一樣,它們在各個實例間提供邏輯上的分離,為不同程式安全保密地運行數據,它既能將同一個RabbitMQ 中的眾多客戶區分開,又可以避免隊列和交換器等命名衝突。vhost 之間是絕對隔離的,無法將 vhostl 中的交換器與 vhost2 中的隊列進行綁定,這樣既保證了安全性,又可以確保可移植性。如果在使用 RabbitMQ 達到一定規模的時候,建議用戶對業務功能、場景進行歸類區分,並為之分配獨立的 vhost。

1.1、Virtual Hosts 的功能說明

vhost可以限制最大連接數和最大隊列數,並且可以設置vhost下的用戶資源許可權和Topic許可權,具體許可權見下方說明。
  • 在 Admin -> Limits 頁面可以設置vhost的最大連接數和最大隊列數,達到限制後,繼續創建,將會報錯。
  • 用戶資源許可權是指RabbitMQ 用戶在客戶端執行AMQP操作命令時,擁有對資源的操作和使用許可權。許可權分為三個部分: configure、write、read ,見下方表格說明。參考://www.rabbitmq.com/access-control.html#permissions
AMQP 0-9-1 Operation   configure write read
exchange.declare (passive=false) exchange    
exchange.declare (passive=true)      
exchange.declare (with [AE](ae.html)) exchange exchange (AE) exchange
exchange.delete   exchange    
queue.declare (passive=false) queue    
queue.declare (passive=true)      
queue.declare (with [DLX](dlx.html)) queue exchange (DLX) queue
queue.delete   queue    
exchange.bind     exchange (destination) exchange (source)
exchange.unbind     exchange (destination) exchange (source)
queue.bind     queue exchange
queue.unbind     queue exchange
basic.publish     exchange  
basic.get       queue
basic.consume       queue
queue.purge       queue
 
舉例說明:
    • 比如創建隊列時,會調用 queue.declare 方法,此時會使用到 configure 許可權,會校驗隊列名是否與 configure 的表達式匹配。
    • 比如隊列綁定交換器時,會調用 queue.bind 方法,此時會用到 write 和 read 許可權,會檢驗隊列名是否與 write 的表達式匹配,交換器名是否與 read 的表達式匹配。
    • Topic許可權是RabbitMQ 針對STOMP和MQTT等協議實現的一種許可權。由於這類協議都是基於Topic消費的,而AMQP是基於Queue消費,所以AMQP的標準資源許可權不適合用在這類協議中,而Topic許可權也不適用於AMQP協議。所以,我們一般不會去使用它,只用在使用了MQTT這類的協議時才可能會用到。

2.2、vhost使用示例

1. 使用管理員用戶登錄Web管理介面。
2.頁面添加一個名為 v1 的Virtual Hosts。(此時還需要為此vhost分配用戶,添加一個新用戶)

3.在 Admin -> Users 頁面添加一個名為 order-user 的用戶,並設置為 management 角色。

 

4. 從 Admin 進入 order-user 的用戶設置介面,在 Permissions 中,為用戶分配vhost為/v1,並為每種許可權設置需要匹配的目標名稱的正則表達式。
 

 

 

欄位名
值 
說明 
Virtual Host 
/v1 
指定用戶的vhost,以下許可權都只限於 /v1 vhost中
Configure regexp
eq-.*
只能操作名稱以eq-開頭的exchange或queue;為空則不能操作任何exchange和queue 
Write regexp 
.*
能夠發送消息到任意名稱的exchange,並且能綁定到任意名稱的隊列和任意名稱的目標交
換器(指交換器綁定到交換器),為空表示沒有許可權 
Read regexp 
^test$ 
只能消費名為test隊列上的消息,並且只能綁定到名為test的交換器
5.程式碼演示
public class Producer {
    public static void main(String[] args) {
        // 1、創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2、設置連接屬性
        factory.setUsername("order-user");
        factory.setPassword("order-user");
        factory.setVirtualHost("v1");

        Connection connection = null;
        Channel channel = null;

        // 3、設置每個節點的鏈接地址和埠
        Address[] addresses = new Address[]{
                new Address("192.168.0.1", 5672),
                new Address("192.168.0.2", 5672)
        };

        try {
            // 開啟/關閉連接自動恢復,默認是開啟狀態
            factory.setAutomaticRecoveryEnabled(true);

            // 設置每100毫秒嘗試恢復一次,默認是5秒:com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL
            factory.setNetworkRecoveryInterval(100);

            factory.setTopologyRecoveryEnabled(false);

            // 4、使用連接集合裡面的地址獲取連接
            connection = factory.newConnection(addresses, "生產者");

            // 添加重連監聽器
            ((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
                /**
                 * 重連成功後的回調
                 * @param recoverable
                 */
                public void handleRecovery(Recoverable recoverable) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 已重新建立連接!");
                }

                /**
                 * 開始重連時的回調
                 * @param recoverable
                 */
                public void handleRecoveryStarted(Recoverable recoverable) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 開始嘗試重連!");
                }
            });

            // 5、從鏈接中創建通道
            channel = connection.createChannel();

            /**
             * 6、聲明(創建)隊列
             * 如果隊列不存在,才會創建
             * RabbitMQ 不允許聲明兩個隊列名相同,屬性不同的隊列,否則會報錯
             *
             * queueDeclare參數說明:
             * @param queue 隊列名稱
             * @param durable 隊列是否持久化
             * @param exclusive 是否排他,即是否為私有的,如果為true,會對當前隊列加鎖,其它通道不能訪問,並且在連接關閉時會自動刪除,不受持久化和自動刪除的屬性控制
             * @param autoDelete 是否自動刪除,當最後一個消費者斷開連接之後是否自動刪除
             * @param arguments 隊列參數,設置隊列的有效期、消息最大長度、隊列中所有消息的生命周期等等
             */
            channel.exchangeDeclare("test-exchange", "fanout");
            channel.queueDeclare("queue1", false, false, false, null);
            channel.queueBind("queue1", "test-exchange", "xxoo");
            for (int i = 0; i < 100; i++) {
                // 消息內容
                String message = "Hello World " + i;
                try {
                    // 7、發送消息
                    channel.basicPublish("test-exchange", "queue1", null, message.getBytes());
                } catch (AlreadyClosedException e) {
                    // 可能連接已關閉,等待重連
                    System.out.println("消息 " + message + " 發送失敗!");
                    i--;
                    TimeUnit.SECONDS.sleep(2);
                    continue;
                }
                System.out.println("消息 " + i + " 已發送!");
                TimeUnit.SECONDS.sleep(2);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 8、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 9、關閉連接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
public class VirtualHosts {
    public static void main(String[] args) {
        // 1、創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2、設置連接屬性
        factory.setUsername("order-user");
        factory.setPassword("order-user");
        factory.setVirtualHost("v1");

        Connection connection = null;
        Channel prducerChannel = null;
        Channel consumerChannel = null;

        // 3、設置每個節點的鏈接地址和埠
        Address[] addresses = new Address[]{
                new Address("192.168.0.1", 5672),
                new Address("192.168.0.2", 5672)
        };

        try {
            // 4、從連接工廠獲取連接
            connection = factory.newConnection(addresses, "消費者");

            // 5、從鏈接中創建通道
            prducerChannel = connection.createChannel();

            prducerChannel.exchangeDeclare("test-exchange", "fanout");
            prducerChannel.queueDeclare("queue1", false, false, true, null);
            prducerChannel.queueBind("queue1", "test-exchange", "xxoo");
            // 消息內容
            String message = "Hello A";
            prducerChannel.basicPublish("test-exchange", "c1", null, message.getBytes());

            consumerChannel = connection.createChannel();
            // 創建一個消費者對象
            Consumer consumer = new DefaultConsumer(consumerChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("收到消息:" + new String(body, "UTF-8"));
                }
            };
            consumerChannel.basicConsume("queue1", true, consumer);

            System.out.println("等待接收消息");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 9、關閉通道
            if (prducerChannel != null && prducerChannel.isOpen()) {
                try {
                    prducerChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 10、關閉連接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

 

 

 2.3、集群連接恢復

官方資料://www.rabbitmq.com/api-guide.html#connection-recovery;根據官方文檔說明可知:
  • 通過 factory.setAutomaticRecoveryEnabled(true); 可以設置連接自動恢復的開關,默認已開啟
  • 通過 factory.setNetworkRecoveryInterval(10000); 可以設置間隔多長時間嘗試恢復一次,默認是5秒: com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL 
什麼時候會觸發連接恢復?//www.rabbitmq.com/api-guide.html#recovery-triggers
  • 如果啟用了自動連接恢復,將由以下事件觸發:
    • 連接的I/O循環中拋出IOExceiption
    • 讀取Socket套接字超時
    • 檢測不到伺服器心跳
    • 在連接的I/O循環中引發任何其他異常
  • 如果客戶端第一次連接失敗,不會自動恢復連接。需要我們自己負責重試連接、記錄失敗的嘗試、實現重試次數的限制等等。
ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings

try {
  Connection conn = factory.newConnection();
} catch (java.net.ConnectException e) {
  Thread.sleep(5000);
  // apply retry logic
}
    • 如果程式中調用了 Connection.Close ,也不會自動恢復連接。
    • 如果是 Channel-level 的異常,也不會自動恢復連接,因為這些異常通常是應用程式中存在語義問題(例如試圖從不存在的隊列消費)。
  • 在Connection和Channel上,可以設置重新連接的監聽器,開始重連和重連成功時,會觸發監聽器。添加和移除監聽,需要將Connection或Channel強制轉換成Recoverable介面。
((Recoverable) connection).addRecoveryListener() 
((Recoverable) connection).removeRecoveryListener()

  git源碼://gitee.com/TongHuaShuShuoWoDeJieJu/rabbit.git

Tags: