SpringBoot之ActiveMQ實現延遲消息

  • 2019 年 11 月 12 日
  • 筆記

一、安裝activeMQ

安裝步驟請參照網上教程,本文不做介紹

二、修改activeMQ配置文件

在 conf/activemq.xml 的 broker 節點上新增屬性 schedulerSupport="true",以啟用延遲(定時)投遞功能

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true" >            <destinationPolicy>              <policyMap>                <policyEntries>                  <policyEntry topic=">" >                      <!-- The constantPendingMessageLimitStrategy is used to prevent                           slow topic consumers to block producers and affect other consumers                           by limiting the number of messages that are retained                           For more information, see:                             http://activemq.apache.org/slow-consumer-handling.html                        -->                    <pendingMessageLimitStrategy>                      <constantPendingMessageLimitStrategy limit="1000"/>                    </pendingMessageLimitStrategy>                  </policyEntry>                </policyEntries>              </policyMap>          </destinationPolicy>

三、創建SpringBoot工程

file

  1. 配置ActiveMQ連接工廠資訊;由於消息是以序列化對象(ObjectMessage)發送,必須配置信任包(trusted packages),否則消費者反序列化消息時會報錯
package com.example.demoactivemq.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

/**
 * Spring configuration that exposes the ActiveMQ connection factory as a bean.
 *
 * @author shanks on 2019-11-12
 */
@Configuration
public class ActiveMqConfig {

    /**
     * Builds the connection factory for the configured broker and registers the
     * package whose classes may be carried as serialized message bodies.
     *
     * @param url broker URL, resolved from the {@code spring.activemq.broker-url} property
     * @return the connection factory with its trusted-package list set
     */
    @Bean
    public ActiveMQConnectionFactory factory(@Value("${spring.activemq.broker-url}") String url) {
        // Packages allowed for object-message (de)serialization.
        List<String> trustedPackages = new ArrayList<>();
        trustedPackages.add("com.example.demoactivemq.domain");

        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        connectionFactory.setTrustedPackages(trustedPackages);
        return connectionFactory;
    }
}
  1. 消息實體類
package com.example.demoactivemq.domain;

import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * Message payload exchanged between the producer and the consumer.
 *
 * <p>Instances are created through the Lombok-generated builder and implement
 * {@link Serializable} so they can be used as the body of a JMS object message.
 *
 * @author shanks on 2019-11-12
 */
@Builder
@Data
public class MessageModel implements Serializable {

    /**
     * Explicit serialization version. Without it the JVM derives a value from the
     * class shape, so any recompilation that changes the class could make the
     * producer and consumer sides reject each other's messages.
     */
    private static final long serialVersionUID = 1L;

    /**
     * Title of the message. The spelling is kept as-is because the generated
     * builder method and accessors are named after this field.
     */
    private String titile;

    /** Text content of the message. */
    private String message;
}
  1. 生產者
package com.example.demoactivemq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jms.JmsProperties;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.*;
import java.io.Serializable;

/**
 * Message producer supporting both immediate and delayed delivery.
 *
 * @author shanks
 */
@Service
@Slf4j
public class Producer {

    /** Queue used by the demo for both immediate and delayed messages. */
    public static final Destination DEFAULT_QUEUE = new ActiveMQQueue("delay.queue");

    @Autowired
    private JmsMessagingTemplate template;

    /**
     * Sends a message for immediate delivery.
     *
     * @param destination queue the message is sent to
     * @param message     payload to send
     */
    public <T extends Serializable> void send(Destination destination, T message) {
        template.convertAndSend(destination, message);
    }

    /**
     * Sends a message whose delivery is postponed by the broker.
     *
     * <p>The payload is wrapped in an {@link ObjectMessage} carrying the
     * {@link ScheduledMessage#AMQ_SCHEDULED_DELAY} property and is sent inside a
     * transacted session. Failures are logged and not rethrown, so callers never
     * see an exception from this method.
     *
     * @param destination queue the message is sent to
     * @param data        payload to send
     * @param time        delay before delivery, in milliseconds
     */
    public <T extends Serializable> void delaySend(Destination destination, T data, Long time) {
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;
        ConnectionFactory connectionFactory = template.getConnectionFactory();
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Transacted session: the acknowledge mode argument is ignored in this case,
            // so SESSION_TRANSACTED is passed to state the intent explicitly.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            producer = session.createProducer(destination);
            producer.setDeliveryMode(JmsProperties.DeliveryMode.PERSISTENT.getValue());

            ObjectMessage message = session.createObjectMessage(data);
            // Ask the broker to hold the message for the requested delay.
            message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);

            producer.send(message);
            session.commit();
            // Logged only after the commit so the log never reports an uncommitted send.
            log.info("發送消息:{}", data);
        } catch (JMSException | RuntimeException e) {
            // Best effort: the uncommitted transaction is discarded when the session closes.
            log.error("Failed to send delayed message to {}", destination, e);
        } finally {
            closeQuietly(producer, session, connection);
        }
    }

    /**
     * Closes the given JMS resources independently, so a failure while closing one
     * of them does not prevent the remaining ones from being released.
     */
    private static void closeQuietly(MessageProducer producer, Session session, Connection connection) {
        if (producer != null) {
            try {
                producer.close();
            } catch (JMSException | RuntimeException e) {
                log.warn("Failed to close JMS producer", e);
            }
        }
        if (session != null) {
            try {
                session.close();
            } catch (JMSException | RuntimeException e) {
                log.warn("Failed to close JMS session", e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException | RuntimeException e) {
                log.warn("Failed to close JMS connection", e);
            }
        }
    }
}
  1. 消費者
package com.example.demoactivemq.producer;

import com.example.demoactivemq.domain.MessageModel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Message consumer: listens on the demo queue and logs each message it receives.
 */
@Slf4j
@Component
public class Consumer {

    /** Name of the queue this listener is bound to. */
    private static final String QUEUE_NAME = "delay.queue";

    /**
     * Invoked for every message arriving on the queue; writes it to the log.
     *
     * @param message the received payload
     */
    @JmsListener(destination = QUEUE_NAME)
    public void receiveQueue(MessageModel message) {
        log.info("收到消息:{}", message);
    }
}
  1. application.yml
# Address of the ActiveMQ broker; ActiveMqConfig reads it via ${spring.activemq.broker-url}.
spring:
  activemq:
    broker-url: tcp://localhost:61616
  1. 測試類
package com.example.demoactivemq;    import com.example.demoactivemq.domain.MessageModel;  import com.example.demoactivemq.producer.Producer;  import org.junit.jupiter.api.Test;  import org.junit.runner.RunWith;  import org.springframework.beans.factory.annotation.Autowired;  import org.springframework.boot.test.context.SpringBootTest;  import org.springframework.test.context.junit4.SpringRunner;    @SpringBootTest(classes = DemoActivemqApplication.class)  @RunWith(SpringRunner.class)  class DemoActivemqApplicationTests {        /**       * 消息生產者       */      @Autowired      private Producer producer;        /**       * 及時消息隊列測試       */      @Test      public void test() {          MessageModel messageModel = MessageModel.builder()                  .message("測試消息")                  .titile("消息000")                  .build();          // 發送消息          producer.send(Producer.DEFAULT_QUEUE, messageModel);      }        /**       * 延時消息隊列測試       */      @Test      public void test2() {          for (int i = 0; i < 5; i++) {              MessageModel messageModel = MessageModel.builder()                      .titile("延遲10秒執行")                      .message("測試消息" + i)                      .build();              // 發送延遲消息              producer.delaySend(Producer.DEFAULT_QUEUE, messageModel, 10000L);          }          try {              // 休眠100秒,等等消息執行              Thread.currentThread().sleep(100000L);          } catch (InterruptedException e) {              e.printStackTrace();          }      }    }  

執行結果

2019-11-12 22:18:52.939  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息0)  2019-11-12 22:18:52.953  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息1)  2019-11-12 22:18:52.958  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息2)  2019-11-12 22:18:52.964  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息3)  2019-11-12 22:18:52.970  INFO 17263 --- [           main] c.e.demoactivemq.producer.Producer       : 發送消息:MessageModel(titile=延遲10秒執行, message=測試消息4)  2019-11-12 22:19:03.012  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息0)  2019-11-12 22:19:03.017  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息1)  2019-11-12 22:19:03.019  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息2)  2019-11-12 22:19:03.020  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息3)  2019-11-12 22:19:03.021  INFO 17263 --- [enerContainer-1] c.e.demoactivemq.producer.Consumer       : 收到消息:MessageModel(titile=延遲10秒執行, message=測試消息4)

比你優秀的人比你還努力,你有什麼資格不去奮鬥!!!