ActiveMQ学习总结——实战操作(上)02

  • 2019 年 10 月 11 日
  • 筆記

相信大家通过上一篇博文已经对ActiveMQ有了一个大致的概念了,

那么本篇博文将带领大家一步一步去实战操作我们的ActiveMQ

本篇主要内容:

  1.ActiveMQ术语及API介绍

  2.ActiveMQ 文本消息处理

  3.ActiveMQ 对象消息处理


既然我们要学习如何实战操作了,那么久不可不知它的一些术语和API的用于

即使觉得枯燥,我们也要大致看一眼,过后相信你还会再来看的!因为很有用

 

 

一 ActiveMQ 的术语(这么碉堡的东西怎么会没有写让人觉得更碉堡的词汇呢?)

1 Destination

目的地,JMS Provider(消息中间件)负责维护,用于对 Message 进行管理的对象。
MessageProducer 需要指定 Destination 才能发送消息,MessageReceiver 需要指定 Destination
才能接收消息。

 

2 Producer

消息生成者,负责发送 Message 到目的地。

 

3 Consumer | Receiver

消息消费者,负责从目的地中消费【处理|监听|订阅】Message。

 

4 Message

消息,消息封装一次通信的内容。

 


 

 

二 ActiveMQ API简介

下述API都是接口类型,由定义在javax.jms包中

都是JMS(Java Message Service)标准接口定义

 

1.ConnectionFactory

  链接工厂,用于创建链接的工厂类型

2.Connection

  链接,用于建立访问ActiveMQ连接的类型,由链接工厂ConnectionFactory创建

3.Session

  会话,一次持久有效有状态的访问,由链接创建

4.Destination & Queue

  目的地。用于描述本次访问ActiveMQ的消息访问目的地,即ActiveMQ服务中的具体队列,由会话创建

  interface Queue extends Destination

5.MessageProducer

  消息生成者、在一次有效会话中,用于发送消息给ActiveMQ服务的工具,由会话创建

6.MessageConsumer

  消息消费者【消息订阅者|消息处理着】,在一次有效会话中,用于从ActiveMQ服务中获取消息的工具,由会话创建

7.Message

  消息。通过消息生成者向ActiveMQ服务发送消息时使用的数据载体对象或消费者从ActiveMQ服务中获取消息时使用的数据载体对象。是所有消息【文本消息|对象消息等】具体类型的顶级接口,可以通过会话创建或通过会话从ActiveMQ服务中获取

 

 

 


 

三 ActiveMQ处理文本消息

:*本案例将以producer(消息生产者)和consumer(消息发送者)连个模块为例

 

准备工作:

  确保你的ActiveMQ已经开启,并且所依赖的端口已经关闭(建议直接关闭防火墙 service iptables stop)

  Maven环境

 

1 消息生产者

1.1创建模块

 

 

1,2在POM文件添加ActiveMQ的依赖(根据个人的ActiveMQ版本自行查找)

 <dependencies>          <dependency>              <groupId>org.apache.activemq</groupId>              <artifactId>activemq-all</artifactId>              <version>5.9.0</version>          </dependency>      </dependencies>

 

1.3 编写消息的生产者

package cn.arebirth.mq;    import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQTextProducer {        public void sendTextActiveMQ(String msg) {          //定义链接工厂          ConnectionFactory connectionFactory = null;            //定义链接对象          Connection connection = null;            //定义会话          Session session = null;            //目的地          Destination destination = null;            //定义消息的发送者          MessageProducer producer = null;            //定义消息          Message message = null;              try {              /**               * userName:访问ActiveMQ服务的用户名,默认为admin。               * password:访问ActiveMQ服务的密码,默认为admin               * 用户名和用户密码都可以通过ActiveMQ安装目录的oonf目录下的jetty-ream.properties文件进行修改               *               * borkerURL:访问ActiveMQ服务的路径地址。               *  路径结构为:协议名://主机地址:端口号               *  在conf/activemq.xml文件中可以找到修改               *  在上一篇文章中都有介绍               */              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //创建链接对象              connection = connectionFactory.createConnection();                //启动连接              connection.start();                /**               * 参数一:transacted 是否使用事务 可选值为:true|false               *      ture:使用事务 设置第二个参数变量这为 Session.SESSION_TRANSACTION 交由session管理               *      false:不使用事务,则设置我们的参数即可               *               * acknowledgeMode:               * * Session.AUTO_ACKNOWLEDGE:自动消息确认机制               * * Session.CLIENT_ACKNOWLEDGE:客户端确认机制               * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制               这里设置 这两个参数的含义为:               不使用事务,并由Session自动确认提交               * 这里对此不作过多讲解,初学者跟着敲,其后在慢慢了解原理               *               */              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //创建目的地,目的地名称即队列的名称,也就是保存我们消息地方的名称。消息的消费者需要通过此名称访问对应的队列              //名称不固定              destination = session.createQueue("hello-mq");                //创建消息的生产者 需要指定目的地(也就是把Destination传入参数)              producer = session.createProducer(destination);                //创建消息对象 并传入我们需要放入队列的消息              message = session.createTextMessage(msg);                //发送消息              producer.send(message);            } catch (Exception e) {              e.printStackTrace();          } finally {              //回收消息发送者资源              if (producer != null) {                  try {                      producer.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (session != null) {                  try {                      session.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (connection != null) {                  try {                      connection.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          }          }  }

 

 

2. 消息消费者

2.1创建模块

 

 

 

2,2在POM文件添加ActiveMQ的依赖(根据个人的ActiveMQ版本自行查找)

 <dependency>              <groupId>org.apache.activemq</groupId>              <artifactId>activemq-all</artifactId>              <version>5.9.0</version>          </dependency>

 

2.3编写消息的消费者

package cn.arebirth.mq;    import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQTextConsumer {        public void receiveTextActiveMQ() {          //定义链接工厂          ConnectionFactory connectionFactory = null;            //定义链接对象          Connection connection = null;            //定义会话          Session session = null;            //目的地          Destination destination = null;            //定义消息的消费者(接收者)          MessageConsumer consumer = null;            //定义消息          Message message = null;              try {              /**               * userName:访问ActiveMQ服务的用户名,默认为admin。               * password:访问ActiveMQ服务的密码,默认为admin               * 用户名和用户密码都可以通过ActiveMQ安装目录的oonf目录下的jetty-ream.properties文件进行修改               *               * borkerURL:访问ActiveMQ服务的路径地址。               *  路径结构为:协议名://主机地址:端口号               *  在conf/activemq.xml文件中可以找到修改               *  在上一篇文章中都有介绍               */              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //创建链接对象              connection = connectionFactory.createConnection();                //启动连接              connection.start();                /**               * 参数一:transacted 是否使用事务 可选值为:true|false               *      ture:使用事务 设置第二个参数变量这为 Session.SESSION_TRANSACTION 交由session管理               *      false:不使用事务,则设置我们的参数即可               *               * acknowledgeMode:               * * Session.AUTO_ACKNOWLEDGE:自动消息确认机制               * * Session.CLIENT_ACKNOWLEDGE:客户端确认机制               * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制               这里设置 这两个参数的含义为:               不使用事务,并由Session自动确认提交               * 这里对此不作过多讲解,初学者跟着敲,其后在慢慢了解原理               *               */              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //创建目的地,目的地名称即队列的名称,也就是保存我们消息地方的名称。消息的消费者需要通过此名称访问对应的队列              //名称不固定              destination = session.createQueue("hello-mq");                //创建消息的消费者 需要指定目的地(也就是把Destination传入参数)              consumer = session.createConsumer(destination);                //创建消息对象 由消费者接收消息              message = consumer.receive();                //处理消息              String msg = ((TextMessage) message).getText();              System.out.println("ActiveMQ say:" + msg);          } catch (Exception e) {              e.printStackTrace();          } finally {              //回收消息接收者资源              if (consumer != null) {                  try {                      consumer.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (session != null) {                  try {                      session.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (connection != null) {                  try {                      connection.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          }        }  }

 

 

3 测试成果

定义测试类,来测试我们的成果!

 

3.1 定义producer测试类

package cn.arebirth.mq;    public class ProducerTest {      public static void main(String[] args) {          ActiveMQTextProducer producer = new ActiveMQTextProducer();          producer.sendTextActiveMQ("Hello,ActiveMQ!");      }  }

当我们执行完这段代码后,我们打开我们的ActiveMQ的控制面板(首先你要启动你的ActiveMQ,上一篇有介绍到!)

 

 

接下来,当我们看到此图片内容即说明producer成果运行!(hello-mq我们自定义的队列名称!)

 

 

 


 

3.2 定义 Consumer测试类

package cn.arebirth.mq;    public class ConsumerTest {      public static void main(String[] args) {          ActiveMQTextConsumer consumer = new ActiveMQTextConsumer();          consumer.receiveTextActiveMQ();      }  }

 

运行结果出现如下内容即成果!

 

 

 

 

 

当我们仔细观察ActiveMQ控制台的时候会发现,

  producer每产生一条消息对应的Number Of Pending Message和Message Enqueued就+1

  consumer每次启动消费消息的时候,Number Of pending Message就会-1  Message Dequeued +1 

为什么呢?哈哈  相信大家已经大概猜出来了或者看英文的意思  

介绍下:

  • Name:消息的名字
  • Number Of Pending Message:为消费的消息数量
  • Number Of Consumers:消费者的数量
  • Message Enqueued:队列共有过多少消息
  • Message Dequeued:消费者消费了多少条消息

 

鼓捣完文本消息,那么有的人会说,只能鼓捣文本??错了,接下来我们搞实用的对象!(不是你和我,是我们和电脑 -. -)


 

 

三 ActiveMQ处理对象消息

有了上面处理文本消息的基础,我们将很容易掌握处理对象消息的能力!这里面的步骤,我们将略省一部分,和上面及其相似

 

1 对象创建(处理对象消息,那么一定是要有个数据载体的对象啦,我们自己创建一个即可     生产者和消费者均需要此对象!)

package cn.arebirth.pojo;    import java.io.Serializable;    /**   * 一定要序列化!!   */  public class User implements Serializable {      private String username;      private String pwd;      private String content;        @Override      public String toString() {          return "User{" +                  "username='" + username + ''' +                  ", pwd='" + pwd + ''' +                  ", content='" + content + ''' +                  '}';      }        public User() {      }        public User(String username, String pwd, String content) {          this.username = username;          this.pwd = pwd;          this.content = content;      }        public String getUsername() {          return username;      }        public void setUsername(String username) {          this.username = username;      }        public String getPwd() {          return pwd;      }        public void setPwd(String pwd) {          this.pwd = pwd;      }        public String getContent() {          return content;      }        public void setContent(String content) {          this.content = content;      }  }

 

2 消息生产者

2.1 还是需要pom的jar包依赖的的,如果还在原来的环境上,则不需要添加啦!

<dependency>              <groupId>org.apache.activemq</groupId>              <artifactId>activemq-all</artifactId>              <version>5.9.0</version>          </dependency>

 

2.2 编写消息的生产者

package cn.arebirth.mq;    import cn.arebirth.pojo.User;  import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQObjectProducer {        public void sendObjectActiveMQ(User user){          //定义链接工厂          ConnectionFactory connectionFactory = null;            //定义链接对象          Connection connection = null;            //定义会话          Session session = null;            //目的地          Destination destination = null;            //定义消息的发送者          MessageProducer producer = null;            //定义消息          Message message = null;              try {              /**               * userName:访问ActiveMQ服务的用户名,默认为admin。               * password:访问ActiveMQ服务的密码,默认为admin               * 用户名和用户密码都可以通过ActiveMQ安装目录的oonf目录下的jetty-ream.properties文件进行修改               *               * borkerURL:访问ActiveMQ服务的路径地址。               *  路径结构为:协议名://主机地址:端口号               *  在conf/activemq.xml文件中可以找到修改               *  在上一篇文章中都有介绍               */              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //创建链接对象              connection = connectionFactory.createConnection();                //启动连接              connection.start();                /**               * 参数一:transacted 是否使用事务 可选值为:true|false               *      ture:使用事务 设置第二个参数变量这为 Session.SESSION_TRANSACTION 交由session管理               *      false:不使用事务,则设置我们的参数即可               *               * acknowledgeMode:               * * Session.AUTO_ACKNOWLEDGE:自动消息确认机制               * * Session.CLIENT_ACKNOWLEDGE:客户端确认机制               * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制               这里设置 这两个参数的含义为:               不使用事务,并由Session自动确认提交               * 这里对此不作过多讲解,初学者跟着敲,其后在慢慢了解原理               *               */              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //创建目的地,目的地名称即队列的名称,也就是保存我们消息地方的名称。消息的消费者需要通过此名称访问对应的队列              //名称不固定              destination = session.createQueue("hello-mq-obj");                //创建消息的生产者 需要指定目的地(也就是把Destination传入参数)              producer = session.createProducer(destination);                //创建消息对象 并传入我们需要放入队列的消息              message = session.createObjectMessage(user);                //发送消息              producer.send(message);            } catch (Exception e) {              e.printStackTrace();          } finally {              //回收消息发送者资源              if (producer != null) {                  try {                      producer.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (session != null) {                  try {                      session.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (connection != null) {                  try {                      connection.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          }      }  }

 

 

3 消息消费者

3.1 还是需要pom的jar包依赖的的,如果还在原来的环境上,则不需要添加啦!

<dependency>              <groupId>org.apache.activemq</groupId>              <artifactId>activemq-all</artifactId>              <version>5.9.0</version>          </dependency>

 

3.2  编写消息的消费者

package cn.arebirth.mq;    import cn.arebirth.pojo.User;  import org.apache.activemq.ActiveMQConnectionFactory;    import javax.jms.*;    public class ActiveMQObjectConsumer {      public void receiveObjectActiveMQ() {          //定义链接工厂          ConnectionFactory connectionFactory = null;            //定义链接对象          Connection connection = null;            //定义会话          Session session = null;            //目的地          Destination destination = null;            //定义消息的消费者(接收者)          MessageConsumer consumer = null;            //定义消息          Message message = null;              try {              /**               * userName:访问ActiveMQ服务的用户名,默认为admin。               * password:访问ActiveMQ服务的密码,默认为admin               * 用户名和用户密码都可以通过ActiveMQ安装目录的oonf目录下的jetty-ream.properties文件进行修改               *               * borkerURL:访问ActiveMQ服务的路径地址。               *  路径结构为:协议名://主机地址:端口号               *  在conf/activemq.xml文件中可以找到修改               *  在上一篇文章中都有介绍               */              connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://169.254.18.20:61616");                //创建链接对象              connection = connectionFactory.createConnection();                //启动连接              connection.start();                /**               * 参数一:transacted 是否使用事务 可选值为:true|false               *      ture:使用事务 设置第二个参数变量这为 Session.SESSION_TRANSACTION 交由session管理               *      false:不使用事务,则设置我们的参数即可               *               * acknowledgeMode:               * * Session.AUTO_ACKNOWLEDGE:自动消息确认机制               * * Session.CLIENT_ACKNOWLEDGE:客户端确认机制               * * Session.DUPS_OK_ACKNOWLEDGE:有副本的客户端确认消息机制               这里设置 这两个参数的含义为:               不使用事务,并由Session自动确认提交               * 这里对此不作过多讲解,初学者跟着敲,其后在慢慢了解原理               *               */              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);                //创建目的地,目的地名称即队列的名称,也就是保存我们消息地方的名称。消息的消费者需要通过此名称访问对应的队列              //名称不固定              destination = session.createQueue("hello-mq-obj");                //创建消息的消费者 需要指定目的地(也就是把Destination传入参数)              consumer = session.createConsumer(destination);                //创建消息对象 由消费者接收消息              message = consumer.receive();                //处理消息              ObjectMessage objectMessage = (ObjectMessage) message;              User user = (User) objectMessage.getObject();              System.out.println("ActiveMQ say:" + user);          } catch (Exception e) {              e.printStackTrace();          } finally {              //回收消息接收者资源              if (consumer != null) {                  try {                      consumer.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (session != null) {                  try {                      session.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }              if (connection != null) {                  try {                      connection.close();                  } catch (JMSException e) {                      e.printStackTrace();                  }              }          }        }  }

 

 

 

4 创建测试类

4.1 Produce生产者测试代码

package cn.arebirth.mq;    import cn.arebirth.pojo.User;    public class ProducerTest {      public static void main(String[] args) {          ActiveMQObjectProducer producer = new ActiveMQObjectProducer();          producer.sendObjectActiveMQ(new User("Arebirth","123","Hello,ActiveMQ!"));      }  }

 

当我们执行完这段代码后,我们打开我们的ActiveMQ的控制面板(首先你要启动你的ActiveMQ,上一篇有介绍到!)

 

 会看到 队列中增加了我们的消息!

 

4.2 Consumer消费者测试代码

package cn.arebirth.mq;    public class ConsumerTest {      public static void main(String[] args) {          ActiveMQObjectConsumer consumer = new ActiveMQObjectConsumer();          consumer.receiveObjectActiveMQ();      }  }

当我们执行后,出现下面的内容即可!成功啦!

 

 在看下,控制面板,

 

 

是不是已经和上一个列子一样啦!

 

 

ps

  本篇内容就到这里了,希望各位勤加敲代码,毕竟代码不是看出来的~