消息隊列之RabbitMQ

  • 2021 年 1 月 17 日
  • 筆記

1.rabbitMQ介紹

rabbitMQ是由erlang語言開發的,基於AMQP協議實現的消息隊列。他是一種應用程式之間的通訊方法,在分散式系統開發中應用非常廣泛。

rabbitMq的有點:

  1. 使用簡單,功能強大
  2. 基於AMQP協議
  3. 社區活躍,文檔完善
  4. 高並發性能好,erlang語言是專門用於開發高並發程式的
  5. springBoot默認集成rabbitMq

AMQP(advanced Message Queuing Protocol),是一個提供統一消息服務的應用標準高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端中間件的產品不同和開發語言不同的限制。JMS和AMQP的區別在於:JMS是java語言專屬的消息服務標準,他是在api層定義標準,並且只能用於java應用,而AMQP是在協議層定義的標準,是可以跨語言的。

2.工作流程

發送消息:

  1. 生產者和broker建立TCP連接
  2. 生產者和broker建立通道
  3. 生產者通過通道消息發送給broker,由exchange將消息轉發
  4. exchange將消息轉發給指定的queue

接受消息:

  1. 消費者和broker建立TCP連接
  2. 消費者和broker建立通道
  3. 消費者監聽指定的queue
  4. 當有消息到達queue的時候broker默認將消息推送給消費者
  5. 消費者接受到消息並消費

3.安裝

如果不想自己下載,需要我這裡的軟體的,可以在下面評論郵箱,我私發給你。

1.安裝erlang的環境,雙擊otp的運行程式,然後一路點擊下一步(next)。

配置環境變數

在path中添加erlang的路徑

2.安裝rabbitMq,雙擊rabbitmq的運行程式

安裝完成之後在菜單頁面可以看到

安裝完RabbitMQ如果想要訪問管理頁面需要在rabbitmq的sbin目錄中使用cmd執行:rabbitmq-plugins.bat enable rabbitmq_management(管理員身份運行此命令)添加可視化插件。

點擊上圖中的start/stop來開啟/停止服務。然後在瀏覽器上輸入地址查看,rabbitMq的默認埠是15672。默認的用戶名和密碼都是guest

如果安裝失敗,需要卸載重裝的時候或者出現rabbitMq服務註冊失敗時,此時需要進入註冊表清理erlang(搜索rabbitMQ,erlsrv將對應的項刪除)

4.程式碼實現

1.添加依賴

<!--添加rabbitMq的依賴-->
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.7.3</version>
</dependency>

2.生產者程式碼實現

package rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.net.ConnectException;
import java.util.concurrent.TimeoutException;

/**
 * @className: producer
 * @description: rabbitmq的生產者程式碼實現
 * @author: charon
 * @create: 2021-01-03 23:10
 */
public class Producer {
    /**
     * 聲明隊列名
     */
    private static final String QUEUE = "hello charon";

    public static void main(String[] args) {
        // 創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設置ip,埠,因為是本機,所以直接設置為127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web埠默認為15672,通訊埠為5672
        connectionFactory.setPort(5672);
        // 設置用戶名和密碼
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置虛擬ip,默認為/,一個rabbitmq的服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            // 創建通道
            channel = connection.createChannel();
            // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數比如設置存活時間等)
            channel.queueDeclare(QUEUE, true, false, false, null);
            String message = "hello charon good evening";
            // 發布消息(交換機,RoutingKey即隊列名,額外的消息屬性,消息內容)
            channel.basicPublish("", QUEUE, null, message.getBytes());
            System.out.println("發送消息給mq:" + message);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 關閉資源
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

3.消費者程式碼實現

package rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: Consumer
 * @description: 消費者的程式碼實現
 * @author: charon
 * @create: 2021-01-05 08:28
 */
public class Consumer {
    /**
     * 聲明隊列名
     */
    private static final String QUEUE = "hello charon";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設置ip,埠,因為是本機,所以直接設置為127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web埠默認為15672,通訊埠為5672
        connectionFactory.setPort(5672);
        // 設置用戶名和密碼
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置虛擬ip,默認為/,一個rabbitmq的服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 創建通道
        Channel channel = connection.createChannel();
        // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數比如設置存活時間等)
        channel.queueDeclare(QUEUE, true, false, false, null);
        // 實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消費者標籤
             * @param envelope 信封,可以獲取交換機等資訊
             * @param properties 消息屬性
             * @param body 消費內容,位元組數組,可以轉成字元串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                String exchange = envelope.getExchange();
//                long deliveryTag = envelope.getDeliveryTag();
                String message = new String(body,"utf-8");
                System.out.println("收到的消息是:"+message);
            }
        };
        // 消費消息(隊列名,是否自動確認,消費方法)
        channel.basicConsume(QUEUE,true,defaultConsumer);
    }
}

5.rabbitMq的工作模式

  1. Work queues 工作隊列(資源競爭)

​ 生產者將消息放入到隊列中,消費者可以有多個,同時監聽同一個隊列。如上圖,消費者c1,c2共同爭搶當前消息隊列的內容,誰先拿到誰負責消費消息,缺點是在高並發的情況下,默認會產品一個消息被多個消費者共同使用,可以設置一個鎖開關,保證一條消息只能被一個消費者使用。

上面的程式碼,可以再添加一個消費者,這樣就可以實現工作隊列的工作模式。

2.Publish/Subscribe 發布訂閱(共享資源)

X代表rabbitMq內部組件交換機,生產者將消息放入交換機,交換機發布訂閱把消息發送到所有消息隊列中,對應的消費者拿到消息進行消費,對比工作隊列而言,發布訂閱可以實現工作隊列的功能,但是比工作隊列更強大。

特點:
1.每個消費者監聽自己的隊列
2.生產者將消息發送給Broker,由交換機將消息轉發到綁定的此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息;

生產者:

package rabbitmq.publish;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: Producer
 * @description: 發布訂閱的生產者
 * @author: charon
 * @create: 2021-01-07 22:02
 */
public class Producer {

    /**郵件的隊列*/
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

    /**簡訊的隊列*/
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    /**交換機*/
    public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) {
        // 創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設置ip,埠,因為是本機,所以直接設置為127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web埠默認為15672,通訊埠為5672
        connectionFactory.setPort(5672);
        // 設置用戶名和密碼
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置虛擬ip,默認為/,一個rabbitmq的服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            // 創建通道
            channel = connection.createChannel();
            // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數比如設置存活時間等)
            channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
            // 交換機(交換機名稱,交換機類型(fanout:發布訂閱,direct:routing,topic:主題,headers:header模式))
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
            // 綁定交換機(隊列名稱,交換機名稱,routingKey(發布訂閱設置為空))
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");
            // 發送多條消息
            for (int i = 0; i < 5; i++) {
                String message = "hello charon good evening by publish";
                // 指定交換機(交換機,RoutingKey即隊列名,額外的消息屬性,消息內容)
                channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
                System.out.println("發送消息給mq:" + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 關閉資源
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消費email的消費者:

package rabbitmq.publish;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: EmailConsumer
 * @description: 郵件的消息消費者
 * @author: charon
 * @create: 2021-01-07 22:14
 */
public class EmailConsumer {

    /**郵件的隊列*/
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

    /**簡訊的隊列*/
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    /**交換機*/
    public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設置ip,埠,因為是本機,所以直接設置為127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web埠默認為15672,通訊埠為5672
        connectionFactory.setPort(5672);
        // 設置用戶名和密碼
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置虛擬ip,默認為/,一個rabbitmq的服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 創建通道
        Channel channel = connection.createChannel();
        // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數比如設置存活時間等)
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
        // 實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消費者標籤
             * @param envelope 信封,可以獲取交換機等資訊
             * @param properties 消息屬性
             * @param body 消費內容,位元組數組,可以轉成字元串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                String exchange = envelope.getExchange();
//                long deliveryTag = envelope.getDeliveryTag();
                String message = new String(body,"utf-8");
                System.out.println("收到的email消息是:"+message);
            }
        };
        // 消費消息(隊列名,是否自動確認,消費方法)
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);
    }
}

消費簡訊的消費者:

package rabbitmq.publish;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: SmsConsumer
 * @description:
 * @author: charon
 * @create: 2021-01-07 22:17
 */
public class SmsConsumer {


    /**郵件的隊列*/
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

    /**簡訊的隊列*/
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

    /**交換機*/
    public static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設置ip,埠,因為是本機,所以直接設置為127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web埠默認為15672,通訊埠為5672
        connectionFactory.setPort(5672);
        // 設置用戶名和密碼
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置虛擬ip,默認為/,一個rabbitmq的服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 創建通道
        Channel channel = connection.createChannel();
        // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數比如設置存活時間等)
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
        // 實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消費者標籤
             * @param envelope 信封,可以獲取交換機等資訊
             * @param properties 消息屬性
             * @param body 消費內容,位元組數組,可以轉成字元串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("收到的簡訊消息是:"+message);
            }
        };
        // 消費消息(隊列名,是否自動確認,消費方法)
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);
    }
}

3.Routing 路由模式

生產者將消息發送給交換機按照路由判斷,交換機根據路由的key,只能匹配上路由key的對應的消息隊列,對應的消費者才能消費消息。

如上圖,rabbitMq根據對應的key,將消息發送到對應的隊列中,error通知將發送到amqp.gen-S9b上,由消費者c1消費。error,info,warning通知將發送到amqp.gen-Ag1上,由消費者c2消費。

特點:
1.每個消費者監聽自己的隊列,並且設置路由key
2.生產者將消息發送給交換機,由交換機根據路由key來轉發消息到指定的隊列

生產者:

package rabbitmq.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: Producer
 * @description: 路由模式下的生成者
 * @author: charon
 * @create: 2021-01-07 22:34
 */
public class Producer {

    /**郵件的隊列*/
    public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";

    /**簡訊的隊列*/
    public static final String QUEUE_ROUTING_SMS = "queue_routing_sms";

    /**交換機*/
    public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    /** 設置email的路由key */
    public static final String ROUTING_EMAIL = "routing_email";

    /** 設置sms的路由key */
    public static final String ROUTING_SMS = "routing_sms";

    public static void main(String[] args) {
        // 創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設置ip,埠,因為是本機,所以直接設置為127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web埠默認為15672,通訊埠為5672
        connectionFactory.setPort(5672);
        // 設置用戶名和密碼
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置虛擬ip,默認為/,一個rabbitmq的服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            // 創建通道
            channel = connection.createChannel();
            // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數比如設置存活時間等)
            channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null);
            channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null);
            // 交換機(交換機名稱,交換機類型(fanout:發布訂閱,direct:routing,topic:主題,headers:header模式))
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
            // 綁定交換機(隊列名稱,交換機名稱,routingKey)
            channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
            channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
            // 發送多條消息
            for (int i = 0; i < 5; i++) {
                String message = "hello charon good evening by routing --email";
                // 指定交換機(交換機,RoutingKey,額外的消息屬性,消息內容)
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_EMAIL, null, message.getBytes());
                System.out.println("發送消息給mq:" + message);
            }
            // 發送多條消息
            for (int i = 0; i < 5; i++) {
                String message = "hello charon good evening by routing --sms";
                // 指定交換機(交換機,RoutingKey,額外的消息屬性,消息內容)
                channel.basicPublish(EXCHANGE_ROUTING_INFORM, ROUTING_SMS, null, message.getBytes());
                System.out.println("發送消息給mq:" + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 關閉資源
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

消費email的消費者:

package rabbitmq.routing;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: EmailConsumer
 * @description: 路由模式下的email消費者
 * @author: charon
 * @create: 2021-01-07 22:40
 */
public class EmailConsumer {
    /**郵件的隊列*/
    public static final String QUEUE_ROUTING_EMAIL = "queue_routing_email";

    /**交換機*/
    public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    /** 設置email的路由key */
    public static final String ROUTING_EMAIL = "routing_email";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設置ip,埠,因為是本機,所以直接設置為127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web埠默認為15672,通訊埠為5672
        connectionFactory.setPort(5672);
        // 設置用戶名和密碼
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置虛擬ip,默認為/,一個rabbitmq的服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 創建通道
        Channel channel = connection.createChannel();
        // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數比如設置存活時間等)
        channel.queueDeclare(QUEUE_ROUTING_EMAIL, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        // 綁定隊列並指明路由key
        channel.queueBind(QUEUE_ROUTING_EMAIL,EXCHANGE_ROUTING_INFORM,ROUTING_EMAIL);
        // 實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消費者標籤
             * @param envelope 信封,可以獲取交換機等資訊
             * @param properties 消息屬性
             * @param body 消費內容,位元組數組,可以轉成字元串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("收到的email消息是:"+message);
            }
        };
        // 消費消息(隊列名,是否自動確認,消費方法)
        channel.basicConsume(QUEUE_ROUTING_EMAIL,true,defaultConsumer);
    }
}

消費簡訊的消費者:

package rabbitmq.routing;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @className: EmailConsumer
 * @description: 路由模式下的email消費者
 * @author: charon
 * @create: 2021-01-07 22:40
 */
public class SmsConsumer {
    /**郵件的隊列*/
    public static final String QUEUE_ROUTING_SMS = "queue_routing_sms";

    /**交換機*/
    public static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";

    /** 設置email的路由key */
    public static final String ROUTING_SMS = "routing_sms";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 創建連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 設置ip,埠,因為是本機,所以直接設置為127.0.0.1
        connectionFactory.setHost("127.0.0.1");
        // web埠默認為15672,通訊埠為5672
        connectionFactory.setPort(5672);
        // 設置用戶名和密碼
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 設置虛擬ip,默認為/,一個rabbitmq的服務可以設置多個虛擬機,每個虛擬機就相當於一個獨立的mq
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        // 創建通道
        Channel channel = connection.createChannel();
        // 聲明隊列(隊列名稱,是否持久化,是否排它,是否自動刪除,隊列的擴展參數比如設置存活時間等)
        channel.queueDeclare(QUEUE_ROUTING_SMS, true, false, false, null);
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        // 綁定隊列並指明路由key
        channel.queueBind(QUEUE_ROUTING_SMS,EXCHANGE_ROUTING_INFORM,ROUTING_SMS);
        // 實現消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             *
             * @param consumerTag 消費者標籤
             * @param envelope 信封,可以獲取交換機等資訊
             * @param properties 消息屬性
             * @param body 消費內容,位元組數組,可以轉成字元串
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body,"utf-8");
                System.out.println("收到的簡訊消息是:"+message);
            }
        };
        // 消費消息(隊列名,是否自動確認,消費方法)
        channel.basicConsume(QUEUE_ROUTING_SMS,true,defaultConsumer);
    }
}

4.Topic 主題模式

  1. 星號井號代表通配符
  2. 星號代表一個單詞,井號代表一個或多個單詞
  3. 路由功能添加模糊匹配
  4. 消息產生者產生消息,把消息交給交換機
  5. 交換機根據key的規則模糊匹配到對應的隊列,由隊列的監聽消費者接收消息消費

特點:
1.每個消費者監聽自己的隊列,並且設置帶通配符的routingkey
2.生產者將消息發送給broker,由交換機及根據路由key來轉發消息到指定的隊列

5.Header 轉發器

取消了路由key,使用header中的key/value(鍵值對)來匹配隊列。

6.RPC 遠程調用

基於direct類型交換機實現。生產者將消息遠程發送給rpc隊列,消費者監聽rpc消息隊列的消息並消息,然後將返回結果放入到響應隊列中,生產者監聽響應隊列中的消息,拿到消費者的處理結果,實現遠程RPC遠程調用。

參考文件:

//www.cnblogs.com/Jeely/p/10784013.html
//lovnx.blog.csdn.net/article/details/70991021