RabbitMQ 安装延迟交换机delay-exchange
- 2020 年 3 月 11 日
- 筆記

安装RabbitMQ 延迟交换机 在三台节点上安装延迟交换机插件 1.进入RabbitMQ 插件目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins
2.下载延迟插件安装包
wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
3.解压延迟插件安装包
unzip rabbitmq_delayed_message_exchange--3.6.x.zip
将RAM 节点修改成DISK 节点 在 RAM 节点上,将节点修改成磁盘节点
rabbitmqctl stop_app rabbitmqctl change_cluster_node_type disk rabbitmqctl start_app
启动延迟交换机插件 在三台节点上启动延迟交换机插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
测试代码:
Producer:
import java.util.Date; import java.util.HashMap; import java.util.Map; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setHost("172.31.1.135"); connectionFactory.setUsername("xx"); connectionFactory.setPassword("xx"); connectionFactory.setPort(); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "delay-exchange"; String routingkey = "delay.delay"; String queueName = "delay_queueName"; //x-delayed-message 声明 Map<String,Object> map =new HashMap<>(); map.put("x-delayed-type", "direct"); channel.exchangeDeclare(exchangeName, "x-delayed-message", true, false, map); //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。 channel.queueDeclare(queueName, true, false, false, map); channel.queueBind(queueName,exchangeName,routingkey); for (int i = ; i < ; i++) { // deliveryMode=2 持久化,expiration 消息有效时间 String msg = "delayed payload".getBytes("UTF-8") +" "+new Date().getTime(); byte[] messageBodyBytes = msg.getBytes(); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", ); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish(exchangeName, routingkey, props.build(), messageBodyBytes); } } }
Consumer:
import java.io.IOException; import java.util.Date; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("/"); connectionFactory.setHost("172.31.1.135"); connectionFactory.setPort(); connectionFactory.setUsername("xxx"); connectionFactory.setPassword("xxx"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String queueName = "delay_queueName"; channel.queueDeclare(queueName,true,false,false,null); channel.basicConsume(queueName, false, "myConsumer Tag", new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String convernType = properties.getContentType(); long deliveryTag = envelope.getDeliveryTag(); System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body)+ " "+new Date().getTime()); channel.basicAck(deliveryTag, false); } }); } }
执行结果:
routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c703b routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@cbc42f routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c2f routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c703b routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@cbc42f routingKey:delay.delay,convernType:null,deliveryTag:,Msg body:[B@c2f
欢迎关注:【程序员开发者社区】