­

RabbitMQ 安装延迟交换机delay-exchange

  • 2020 年 3 月 11 日
  • 筆記

安装RabbitMQ 延迟交换机 在三台节点上安装延迟交换机插件 1.进入RabbitMQ 插件目录

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins  

2.下载延迟插件安装包

wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip  

3.解压延迟插件安装包

unzip rabbitmq_delayed_message_exchange--3.6.x.zip  

将RAM 节点修改成DISK 节点 在 RAM 节点上,将节点修改成磁盘节点

rabbitmqctl stop_app    rabbitmqctl change_cluster_node_type disk    rabbitmqctl start_app  

启动延迟交换机插件 在三台节点上启动延迟交换机插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange  

测试代码

Producer:

import java.util.Date;  import java.util.HashMap;  import java.util.Map;    import com.rabbitmq.client.AMQP;  import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;      public class Producer {      public static void main(String[] args) throws Exception {          ConnectionFactory connectionFactory = new ConnectionFactory();          connectionFactory.setVirtualHost("/");          connectionFactory.setHost("172.31.1.135");          connectionFactory.setUsername("xx");          connectionFactory.setPassword("xx");          connectionFactory.setPort();          Connection connection = connectionFactory.newConnection();          Channel channel = connection.createChannel();          String exchangeName = "delay-exchange";          String routingkey = "delay.delay";            String queueName = "delay_queueName";          //x-delayed-message 声明          Map<String,Object> map =new HashMap<>();          map.put("x-delayed-type", "direct");          channel.exchangeDeclare(exchangeName, "x-delayed-message", true, false, map);          //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。          channel.queueDeclare(queueName, true, false, false, map);          channel.queueBind(queueName,exchangeName,routingkey);          for (int i = ; i < ; i++) {               // deliveryMode=2 持久化,expiration 消息有效时间              String msg = "delayed payload".getBytes("UTF-8") +" "+new Date().getTime();              byte[] messageBodyBytes = msg.getBytes();              Map<String, Object> headers = new HashMap<String, Object>();              headers.put("x-delay", );              AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);              channel.basicPublish(exchangeName, routingkey, props.build(), messageBodyBytes);          }      }  }  

Consumer:

import java.io.IOException;  import java.util.Date;    import com.rabbitmq.client.AMQP.BasicProperties;  import com.rabbitmq.client.Channel;  import com.rabbitmq.client.Connection;  import com.rabbitmq.client.ConnectionFactory;  import com.rabbitmq.client.DefaultConsumer;  import com.rabbitmq.client.Envelope;      public class Consumer {      public static void main(String[] args) throws Exception {          ConnectionFactory connectionFactory = new ConnectionFactory();          connectionFactory.setVirtualHost("/");          connectionFactory.setHost("172.31.1.135");          connectionFactory.setPort();          connectionFactory.setUsername("xxx");          connectionFactory.setPassword("xxx");          Connection connection = connectionFactory.newConnection();          Channel channel = connection.createChannel();          String queueName = "delay_queueName";          channel.queueDeclare(queueName,true,false,false,null);            channel.basicConsume(queueName, false, "myConsumer Tag", new DefaultConsumer(channel){              @Override              public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {                    String routingKey = envelope.getRoutingKey();                    String convernType = properties.getContentType();                    long deliveryTag = envelope.getDeliveryTag();                    System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body)+ " "+new Date().getTime());                    channel.basicAck(deliveryTag, false);              }            });      }  }  

执行结果:

routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c703b  routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@cbc42f  routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c2f  routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c703b  routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@cbc42f  routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c2f  

欢迎关注:【程序员开发者社区】