消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)
- 2019 年 11 月 1 日
- 笔记
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/eguid_1/article/details/79300799
1、实现功能
希望使用一套API,实现两种模式下的消息发送和接收功能,方便业务程序调用
1、发送Topic
2、发送Queue
3、接收Topic
4、接收Queue
2、接口设计
根据功能设计公共调用接口
/** * 数据分发接口(用于发送、接收消息队列数据) * * @author eguid * */ public interface MsgDistributeInterface { /** * 发送到主题 * * @param topicName -主题 * @param data -数据 * @return */ public boolean sendTopic(String topicName, byte[] data); /** * 发送到主题 * @param topicName -主题 * @param data-数据 * @param offset -偏移量 * @param length -长度 * @return */ boolean sendTopic(String topicName, byte[] data, int offset, int length); /** * 发送到队列 * * @param queueName -队列名称 * @param data -数据 * @return */ public boolean sendQueue(String queueName, byte[] data); /** * 发送到队列 * @param queueName -队列名称 * @param data -数据 * @param offset * @param length * @return */ public boolean sendQueue(String queueName, byte[] data,int offset, int length); /** * 接收队列消息 * @param queueName 队列名称 * @param listener * @throws JMSException */ void receiveQueue(String queueName, MessageListener listener) throws JMSException; /** * 订阅主题 * @param topicName -主题名称 * @param listener * @throws JMSException */ void receiveTopic(String topicName, MessageListener listener) throws JMSException; }
3、基于ActiveMQ的接口实现
/** * 基于activeMQ的消息生产者/消费者实现(初始化该对象时即初始化连接消息队列,如果无法连接到消息队列,立即抛出异常) * * @author eguid * */ public class ActiveMQImpl implements MsgDistributeInterface { private String userName; private String password; private String brokerURL; private boolean persistentMode;//持久化模式 //连接工厂 ConnectionFactory connectionFactory; //发送消息的线程 Connection connection; // 事务管理 Session session; //存放各个线程订阅模式生产者 ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>(); //存放各个线程队列模式生产者 ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>(); public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException { this(userName, password, brokerURL, true); } public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException { this.userName = userName; this.password = password; this.brokerURL = brokerURL; this.persistentMode=persistentMode; init(); } public void init() throws JMSException { try { // 创建一个链接工厂 connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL); // 从工厂中创建一个链接 connection = connectionFactory.createConnection(); // 开启链接 connection.start(); // 创建一个事务(订阅模式,事务采用自动确认方式) session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { throw e; } } @Override public boolean sendTopic(String topicName, byte[] data) { return sendTopic(topicName, data, 0, data.length); } @Override public boolean sendTopic(String topicName, byte[] data, int offset, int length) { return send(true, topicName, data, offset, length); } @Override public boolean sendQueue(String queueName, byte[] data) { return sendQueue(queueName, data, 0, data.length); } @Override public boolean sendQueue(String queueName, byte[] data, int offset, int length) { return send(false, queueName, data, offset, length); } /** * 发送数据 * * @param name * @param data * @param offset * @param length * @param type * -类型 * @return */ private boolean send(boolean type, String name, byte[] data, int offset, int length) { try { MessageProducer messageProducer = getMessageProducer(name, type); BytesMessage msg = createBytesMsg(data, offset, length); System.err.println(Thread.currentThread().getName()+"发送消息"); // 发送消息 messageProducer.send(msg); } catch (JMSException e) { return false; } return false; } public void receive(String topicName) throws JMSException { final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic =session.createTopic(topicName); MessageConsumer consumer=session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { BytesMessage msg=(BytesMessage) message; System.err.println(Thread.currentThread().getName()+"收到消息:"+msg.toString()); } }); } /** * 创建字节数组消息 * * @param data * @param offset * @param length * @return * @throws JMSException */ private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException { BytesMessage msg = session.createBytesMessage(); msg.writeBytes(data, offset, length); return msg; } /** * 创建对象序列化消息 * @param obj * @return * @throws JMSException */ private ObjectMessage createMapMsg(Serializable obj) throws JMSException { // MapMessage msg = session.createMapMessage();//key-value形式的消息 ObjectMessage msg = session.createObjectMessage(obj); return msg; } /** * 创建字符串消息 * @param text * @return * @throws JMSException */ private TextMessage createTextMsg(String text) throws JMSException { TextMessage msg = session.createTextMessage(text); return msg; } /** * 获取创建者 * * @param name -名称(主题名称和队列名称) * @param type -类型(true:topic,false:queue) * @return * @throws JMSException */ private MessageProducer getMessageProducer(String name, boolean type) throws JMSException { return type?getTopicProducer(name):getQueueProducer(name); } /** * 创建或获取队列 * @param queueName * @return * @throws JMSException */ private MessageProducer getQueueProducer(String queueName) throws JMSException { MessageProducer messageProducer = null; if ((messageProducer = queueThreadLocal.get()) == null) { Queue queue = session.createQueue(queueName); messageProducer = session.createProducer(queue); //是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费)) messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT); queueThreadLocal.set(messageProducer); } return messageProducer; } /** * 创建或获取主题 * @param topicName * @return * @throws JMSException */ private MessageProducer getTopicProducer(String topicName) throws JMSException { MessageProducer messageProducer = null; if ((messageProducer = topicThreadLocal.get()) == null) { Topic topic = session.createTopic(topicName); messageProducer = session.createProducer(topic); //是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费)) messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT); topicThreadLocal.set(messageProducer); } return messageProducer; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } @Override public void receiveQueue(String queueName,MessageListener listener) throws JMSException { final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Queue topic =session.createQueue(queueName); MessageConsumer consumer=session.createConsumer(topic); consumer.setMessageListener(listener); } @Override public void receiveTopic(String topicName,MessageListener listener) throws JMSException { final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Topic topic =session.createTopic(topicName); MessageConsumer consumer=session.createConsumer(topic); consumer.setMessageListener(listener); }
4、测试一下Topic和Queue
public static void main(String[] args) throws JMSException{ //如果创建失败会立即抛出异常 MsgDistributeInterface producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616"); Test testMq = new Test(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //Thread 1 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 2 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 3 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 4 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 5 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 6 new Thread(testMq.new ProductorMq(producter)).start(); //订阅接收线程Thread 1 new Thread(new Runnable() { @Override public void run() { try { producter.receiveTopic("eguid-topic",new MessageListener() { @Override public void onMessage(Message message) { BytesMessage msg=(BytesMessage) message; System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString()); } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); //订阅接收线程Thread 2 new Thread(new Runnable() { @Override public void run() { try { producter.receiveTopic("eguid-topic",new MessageListener() { @Override public void onMessage(Message message) { BytesMessage msg=(BytesMessage) message; System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString()); } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); //队列消息生产线程Thread-1 new Thread(testMq.new QueueProductor(producter)).start(); //队列消息生产线程Thread-2 new Thread(testMq.new QueueProductor(producter)).start(); //队列接收线程Thread 1 new Thread(new Runnable() { @Override public void run() { try { producter.receiveQueue("eguid-queue",new MessageListener() { @Override public void onMessage(Message message) { BytesMessage msg=(BytesMessage) message; System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString()); } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); //队列接收线程Thread2 new Thread(new Runnable() { @Override public void run() { try { producter.receiveQueue("eguid-queue",new MessageListener() { @Override public void onMessage(Message message) { BytesMessage msg=(BytesMessage) message; System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString()); } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }).start(); } private class ProductorMq implements Runnable{ Jtt809MsgProducter producter; public ProductorMq(Jtt809MsgProducter producter){ this.producter = producter; } @Override public void run() { while(true){ try { String wang=Thread.currentThread().getName()+"Hello eguid! This is topic."; producter.sendTopic("eguid-topic",wang.getBytes()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } } private class QueueProductor implements Runnable{ Jtt809MsgProducter producter; public QueueProductor(Jtt809MsgProducter producter){ this.producter = producter; } @Override public void run() { while(true){ try { String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue."; producter.sendQueue("eguid-queue",eguid.getBytes()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } } }