SpringBoot之ActiveMQ实现延迟消息

  • 2019 年 11 月 12 日
  • 筆記

一、安装activeMQ

​ 安装步骤参照网上教程,本文不做介绍

二、修改activeMQ配置文件

​ broker新增配置信息 schedulerSupport="true"

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >            <destinationPolicy>              <policyMap>                <policyEntries>                  <policyEntry topic=">" >                      <!-- The constantPendingMessageLimitStrategy is used to prevent                           slow topic consumers to block producers and affect other consumers                           by limiting the number of messages that are retained                           For more information, see:                             http://activemq.apache.org/slow-consumer-handling.html                        -->                    <pendingMessageLimitStrategy>                      <constantPendingMessageLimitStrategy limit="1000"/>                    </pendingMessageLimitStrategy>                  </policyEntry>                </policyEntries>              </policyMap>          </destinationPolicy>

三、创建SpringBoot工程

file

  1. 配置ActiveMQ工厂信息,信任包必须配置否则会报错
package com.example.demoactivemq.config;    import org.apache.activemq.ActiveMQConnectionFactory;  import org.apache.activemq.RedeliveryPolicy;  import org.springframework.beans.factory.annotation.Value;  import org.springframework.context.annotation.Bean;  import org.springframework.context.annotation.Configuration;    import java.util.ArrayList;  import java.util.List;    /**   * @author shanks on 2019-11-12   */  @Configuration  public class ActiveMqConfig {        @Bean      public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url){          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);          // 设置信任序列化包集合          List<String> models = new ArrayList<>();          models.add("com.example.demoactivemq.domain");          factory.setTrustedPackages(models);            return factory;      }    }  
  1. 消息实体类
package com.example.demoactivemq.domain;    import lombok.Builder;  import lombok.Data;    import java.io.Serializable;    /**   * @author shanks on 2019-11-12   */    @Builder  @Data  public class MessageModel implements Serializable {      private String titile;      private String message;  }  
  1. 生产者
package com.example.demoactivemq.producer;      import lombok.extern.slf4j.Slf4j;  import org.apache.activemq.ScheduledMessage;  import org.apache.activemq.command.ActiveMQQueue;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.boot.autoconfigure.jms.JmsProperties;  import org.springframework.jms.core.JmsMessagingTemplate;  import org.springframework.stereotype.Service;    import javax.jms.*;  import java.io.Serializable;      /**   * 消息生产者   *   * @author shanks   */  @Service  @Slf4j  public class Producer {        public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");        @Autowired      private JmsMessagingTemplate template;        /**       * 发送消息       *       * @param destination destination是发送到的队列       * @param message     message是待发送的消息       */      public <T extends Serializable> void send(Destination destination, T message) {          template.convertAndSend(destination, message);      }        /**       * 延时发送       *       * @param destination 发送的队列       * @param data        发送的消息       * @param time        延迟时间       */      public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {          Connection connection = null;          Session session = null;          MessageProducer producer = null;          // 获取连接工厂          ConnectionFactory connectionFactory = template.getConnectionFactory();          try {              // 获取连接              connection = connectionFactory.createConnection();              connection.start();              // 获取session,true开启事务,false关闭事务              session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);              // 创建一个消息队列              producer = session.createProducer(destination);              producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());              ObjectMessage message = session.createObjectMessage(data);              //设置延迟时间              message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);              // 发送消息              producer.send(message);              log.info("发送消息:{}", data);              session.commit();          } catch (Exception e) {              e.printStackTrace();          } finally {              try {                  if (producer != null) {                      producer.close();                  }                  if (session != null) {                      session.close();                  }                  if (connection != null) {                      connection.close();                  }              } catch (Exception e) {                  e.printStackTrace();              }          }      }  }  
  1. 消费者
package com.example.demoactivemq.producer;      import com.example.demoactivemq.domain.MessageModel;  import lombok.extern.slf4j.Slf4j;  import org.springframework.jms.annotation.JmsListener;  import org.springframework.stereotype.Component;    /**   * 消费者   */  @Component  @Slf4j  public class Consumer {          @JmsListener(destination = "delay.queue")      public void receiveQueue(MessageModel message) {          log.info("收到消息:{}", message);      }  }  
  1. application.yml
spring:    activemq:      broker-url: tcp://localhost:61616
  1. 测试类
package com.example.demoactivemq;    import com.example.demoactivemq.domain.MessageModel;  import com.example.demoactivemq.producer.Producer;  import org.junit.jupiter.api.Test;  import org.junit.runner.RunWith;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.boot.test.context.SpringBootTest;  import org.springframework.test.context.junit4.SpringRunner;    @SpringBootTest(classes = DemoActivemqApplication.class)  @RunWith(SpringRunner.class)  class DemoActivemqApplicationTests {        /**       * 消息生产者       */      @Autowired      private Producer producer;        /**       * 及时消息队列测试       */      @Test      public void test() {          MessageModel messageModel = MessageModel.builder()                  .message("测试消息")                  .titile("消息000")                  .build();          // 发送消息          producer.send(Producer.DEFAULT_QUEUE, messageModel);      }        /**       * 延时消息队列测试       */      @Test      public void test2() {          for (int i = 0; i < 5; i++) {              MessageModel messageModel = MessageModel.builder()                      .titile("延迟10秒执行")                      .message("测试消息" + i)                      .build();              // 发送延迟消息              producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L);          }          try {              // 休眠100秒,等等消息执行              Thread.currentThread().sleep(100000L);          } catch (InterruptedException e) {              e.printStackTrace();          }      }    }  

执行结果

2019-11-12 22:18:52.939  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息0)  2019-11-12 22:18:52.953  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息1)  2019-11-12 22:18:52.958  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息2)  2019-11-12 22:18:52.964  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息3)  2019-11-12 22:18:52.970  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 发送消息:MessageModel(titile=延迟10秒执行, message=测试消息4)  2019-11-12 22:19:03.012  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息0)  2019-11-12 22:19:03.017  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息1)  2019-11-12 22:19:03.019  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息2)  2019-11-12 22:19:03.020  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息3)  2019-11-12 22:19:03.021  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延迟10秒执行, message=测试消息4)

比你优秀的人比你还努力,你有什么资格不去奋斗!!!