rocketmq廣播消息的(五)

一、簡介

廣播消費指的是:一條消息被多個consumer消費,即使這些consumer屬於同一個ConsumerGroup,消息也會被ConsumerGroup中的每個Consumer都消費一次,廣播消費中ConsumerGroup概念可以認為在消息劃分方面無意義。

二、程式碼

/**
 * 發布訂閱消息生產者
 */
public class BroadcastProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException {
        // 1. 創建生產者對象
        DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");

        // 2. 設置NameServer的地址,如果設置了環境變數NAMESRV_ADDR,可以省略此步
        producer.setNamesrvAddr("192.168.32.128:9876");

        // 3. 啟動生產者
        producer.start();

        // 4. 生產者發送消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message("TopicTest", "TagA", "OrderID_" + i, ("Hello Broadcast:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult result = producer.send(message);

            System.out.printf("發送結果:%s%n", result);
        }

        // 5. 停止生產者
        producer.shutdown();
    }
}

 

/**
 * 發布訂閱消息生產者
 */
public class BroadcastProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException, UnsupportedEncodingException {
        // 1. 創建生產者對象
        DefaultMQProducer producer = new DefaultMQProducer("GROUP_TEST");

        // 2. 設置NameServer的地址,如果設置了環境變數NAMESRV_ADDR,可以省略此步
        producer.setNamesrvAddr("192.168.32.128:9876");

        // 3. 啟動生產者
        producer.start();

        // 4. 生產者發送消息
        for (int i = 0; i < 10; i++) {
            Message message = new Message("TopicTest", "TagA", "OrderID_" + i, ("Hello Broadcast:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

            SendResult result = producer.send(message);

            System.out.printf("發送結果:%s%n", result);
        }

        // 5. 停止生產者
        producer.shutdown();
    }
}

 

Tags: