­

消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)

  • 2019 年 11 月 1 日
  • 笔记

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

本文链接:https://blog.csdn.net/eguid_1/article/details/79300799

1、实现功能

希望使用一套API,实现两种模式下的消息发送和接收功能,方便业务程序调用

1、发送Topic

2、发送Queue

3、接收Topic

4、接收Queue

2、接口设计

根据功能设计公共调用接口

/**
 * Message distribution contract used to publish data to, and receive data from,
 * a message broker in both topic (publish/subscribe) and queue (point-to-point) mode.
 *
 * @author eguid
 */
public interface MsgDistributeInterface {

    /**
     * Publishes the whole byte array to a topic.
     *
     * @param topicName name of the topic
     * @param data      payload bytes
     * @return success flag reported by the implementation
     *         (presumably {@code true} on success; confirm against the implementation)
     */
    boolean sendTopic(String topicName, byte[] data);

    /**
     * Publishes a slice of the byte array to a topic.
     *
     * @param topicName name of the topic
     * @param data      payload bytes
     * @param offset    offset of the slice within {@code data}
     * @param length    length of the slice
     * @return success flag reported by the implementation
     *         (presumably {@code true} on success; confirm against the implementation)
     */
    boolean sendTopic(String topicName, byte[] data, int offset, int length);

    /**
     * Sends the whole byte array to a queue.
     *
     * @param queueName name of the queue
     * @param data      payload bytes
     * @return success flag reported by the implementation
     *         (presumably {@code true} on success; confirm against the implementation)
     */
    boolean sendQueue(String queueName, byte[] data);

    /**
     * Sends a slice of the byte array to a queue.
     *
     * @param queueName name of the queue
     * @param data      payload bytes
     * @param offset    offset of the slice within {@code data}
     * @param length    length of the slice
     * @return success flag reported by the implementation
     *         (presumably {@code true} on success; confirm against the implementation)
     */
    boolean sendQueue(String queueName, byte[] data, int offset, int length);

    /**
     * Registers a listener for messages arriving on a queue.
     *
     * @param queueName name of the queue
     * @param listener  callback invoked for each received message
     * @throws JMSException if the consumer cannot be set up
     */
    void receiveQueue(String queueName, MessageListener listener) throws JMSException;

    /**
     * Subscribes a listener to a topic.
     *
     * @param topicName name of the topic
     * @param listener  callback invoked for each received message
     * @throws JMSException if the subscription cannot be set up
     */
    void receiveTopic(String topicName, MessageListener listener) throws JMSException;
}

3、基于ActiveMQ的接口实现

/**   * 基于activeMQ的消息生产者/消费者实现(初始化该对象时即初始化连接消息队列,如果无法连接到消息队列,立即抛出异常)   *   * @author eguid   *   */  public class ActiveMQImpl implements MsgDistributeInterface {    	private String userName;  	private String password;  	private String brokerURL;  	private boolean persistentMode;//持久化模式  	//连接工厂  	ConnectionFactory connectionFactory;  	//发送消息的线程  	Connection connection;  	// 事务管理  	Session session;    	//存放各个线程订阅模式生产者  	ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();  	//存放各个线程队列模式生产者  	ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();    	public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {  		this(userName, password, brokerURL, true);  	}    	public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException {  		this.userName = userName;  		this.password = password;  		this.brokerURL = brokerURL;  		this.persistentMode=persistentMode;  		init();  	}    	public void init() throws JMSException {  		try {  			// 创建一个链接工厂  			connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);  			// 从工厂中创建一个链接  			connection = connectionFactory.createConnection();  			// 开启链接  			connection.start();  			// 创建一个事务(订阅模式,事务采用自动确认方式)  			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  		} catch (JMSException e) {  			throw e;  		}  	}    	@Override  	public boolean sendTopic(String topicName, byte[] data) {  		return sendTopic(topicName, data, 0, data.length);  	}    	@Override  	public boolean sendTopic(String topicName, byte[] data, int offset, int length) {  		return send(true, topicName, data, offset, length);  	}    	@Override  	public boolean sendQueue(String queueName, byte[] data) {  		return sendQueue(queueName, data, 0, data.length);  	}    	@Override  	public boolean sendQueue(String queueName, byte[] data, int offset, int length) {  		return 
send(false, queueName, data, offset, length);  	}    	/**  	 * 发送数据  	 *  	 * @param name  	 * @param data  	 * @param offset  	 * @param length  	 * @param type  	 *            -类型  	 * @return  	 */  	private boolean send(boolean type, String name, byte[] data, int offset, int length) {  		try {  			MessageProducer messageProducer = getMessageProducer(name, type);    			BytesMessage msg = createBytesMsg(data, offset, length);  			  System.err.println(Thread.currentThread().getName()+"发送消息");  			// 发送消息  			messageProducer.send(msg);  		} catch (JMSException e) {  			return false;  		}  		return false;  	}    	public void receive(String topicName) throws JMSException {  		final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  		Topic topic =session.createTopic(topicName);  		MessageConsumer consumer=session.createConsumer(topic);  		consumer.setMessageListener(new MessageListener() {  			@Override  			public void onMessage(Message message) {  				BytesMessage msg=(BytesMessage) message;  				System.err.println(Thread.currentThread().getName()+"收到消息:"+msg.toString());  			}  		});    	}  	/**  	 * 创建字节数组消息  	 *  	 * @param data  	 * @param offset  	 * @param length  	 * @return  	 * @throws JMSException  	 */  	private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {  		BytesMessage msg = session.createBytesMessage();  		msg.writeBytes(data, offset, length);  		return msg;  	}    	/**  	 * 创建对象序列化消息  	 * @param obj  	 * @return  	 * @throws JMSException  	 */  	private ObjectMessage createMapMsg(Serializable obj) throws JMSException {  //		MapMessage msg = session.createMapMessage();//key-value形式的消息  		ObjectMessage msg = session.createObjectMessage(obj);  		return msg;  	}    	/**  	 * 创建字符串消息  	 * @param text  	 * @return  	 * @throws JMSException  	 */  	private TextMessage createTextMsg(String text) throws JMSException {  		TextMessage msg = session.createTextMessage(text);  		return msg;  	}    
  	/**  	 * 获取创建者  	 *  	 * @param name -名称(主题名称和队列名称)  	 * @param type -类型(true:topic,false:queue)  	 * @return  	 * @throws JMSException  	 */  	private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {  		return type?getTopicProducer(name):getQueueProducer(name);  	}    	/**  	 * 创建或获取队列  	 * @param queueName  	 * @return  	 * @throws JMSException  	 */  	private MessageProducer getQueueProducer(String queueName) throws JMSException {  		MessageProducer messageProducer = null;  		if ((messageProducer = queueThreadLocal.get()) == null) {  			Queue queue = session.createQueue(queueName);  			messageProducer = session.createProducer(queue);  			//是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))  			messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);  			queueThreadLocal.set(messageProducer);  		}  		return messageProducer;  	}    	/**  	 * 创建或获取主题  	 * @param topicName  	 * @return  	 * @throws JMSException  	 */  	private MessageProducer getTopicProducer(String topicName) throws JMSException {  		MessageProducer messageProducer = null;  		if ((messageProducer = topicThreadLocal.get()) == null) {  			Topic topic = session.createTopic(topicName);  			messageProducer = session.createProducer(topic);  			//是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))  			messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);  			topicThreadLocal.set(messageProducer);  		}  		return  messageProducer;  	}    	public String getPassword() {  		return password;  	}    	public void setPassword(String password) {  		this.password = password;  	}    	@Override  	public void receiveQueue(String queueName,MessageListener listener) throws JMSException {  		final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  		Queue topic =session.createQueue(queueName);  		MessageConsumer 
consumer=session.createConsumer(topic);  		consumer.setMessageListener(listener);    	}    	@Override  	public void receiveTopic(String topicName,MessageListener listener) throws JMSException {  		final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  		Topic topic =session.createTopic(topicName);  		MessageConsumer consumer=session.createConsumer(topic);  		consumer.setMessageListener(listener);  	}

4、测试一下Topic和Queue

public static void main(String[] args) throws JMSException{  		//如果创建失败会立即抛出异常  		MsgDistributeInterface  producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");          Test testMq = new Test();          try {              Thread.sleep(1000);          } catch (InterruptedException e) {              e.printStackTrace();          }          //Thread 1          new Thread(testMq.new ProductorMq(producter)).start();          //Thread 2          new Thread(testMq.new ProductorMq(producter)).start();          //Thread 3          new Thread(testMq.new ProductorMq(producter)).start();          //Thread 4          new Thread(testMq.new ProductorMq(producter)).start();          //Thread 5          new Thread(testMq.new ProductorMq(producter)).start();          //Thread 6          new Thread(testMq.new ProductorMq(producter)).start();            //订阅接收线程Thread 1          new Thread(new Runnable() {  			@Override  			public void run() {  				try {  					producter.receiveTopic("eguid-topic",new MessageListener() {  						@Override  						public void onMessage(Message message) {  							BytesMessage msg=(BytesMessage) message;  							System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());  						}  					});  				} catch (JMSException e) {  					// TODO Auto-generated catch block  					e.printStackTrace();  				}  			}  		}).start();          //订阅接收线程Thread 2          new Thread(new Runnable() {  			@Override  			public void run() {  				try {  					producter.receiveTopic("eguid-topic",new MessageListener() {  						@Override  						public void onMessage(Message message) {  							BytesMessage msg=(BytesMessage) message;  							System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());  						}  					});  				} catch (JMSException e) {  					// TODO Auto-generated catch block  					e.printStackTrace();  				}  			}  		}).start();          //队列消息生产线程Thread-1          new Thread(testMq.new  
QueueProductor(producter)).start();          //队列消息生产线程Thread-2          new Thread(testMq.new  QueueProductor(producter)).start();          //队列接收线程Thread 1          new Thread(new Runnable() {  			@Override  			public void run() {  				try {  					producter.receiveQueue("eguid-queue",new MessageListener() {  						@Override  						public void onMessage(Message message) {  							BytesMessage msg=(BytesMessage) message;  							System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());  						}  					});  				} catch (JMSException e) {  					// TODO Auto-generated catch block  					e.printStackTrace();  				}  			}  		}).start();        //队列接收线程Thread2          new Thread(new Runnable() {  			@Override  			public void run() {  				try {  					producter.receiveQueue("eguid-queue",new MessageListener() {  						@Override  						public void onMessage(Message message) {  							BytesMessage msg=(BytesMessage) message;  							System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());  						}  					});  				} catch (JMSException e) {  					// TODO Auto-generated catch block  					e.printStackTrace();  				}  			}  		}).start();      }        private class ProductorMq implements Runnable{      	Jtt809MsgProducter producter;          public ProductorMq(Jtt809MsgProducter producter){              this.producter = producter;          }            @Override          public void run() {              while(true){                  try {                  	String wang=Thread.currentThread().getName()+"Hello eguid! 
This is topic.";                      producter.sendTopic("eguid-topic",wang.getBytes());                        Thread.sleep(2000);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }          }      }        private class QueueProductor implements Runnable{      	Jtt809MsgProducter producter;          public QueueProductor(Jtt809MsgProducter producter){              this.producter = producter;          }            @Override          public void run() {              while(true){                  try {                  	String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue.";                      producter.sendQueue("eguid-queue",eguid.getBytes());                      Thread.sleep(2000);                  } catch (InterruptedException e) {                      e.printStackTrace();                  }              }          }      }