从通信开始聊聊消息中间件

一、系统间通信方式

我们可以想到的方式:

  • 基于文件
  • 基于共享内存
  • 基于IPC
  • 基于Socket
  • 基于数据库
  • 基于RPC

各个模式的缺点:

  • 文件:使用不方便,不及时
  • Socket:使用麻烦,多数情况下不如RPC
  • 数据库:不实时,但是经常有人拿数据库模拟消息队列
  • RPC:会导致调用关系复杂,同步处理,压力大的时候无法缓冲

我们期望有一种通信方式:

  • 可以实现异步的消息通信
  • 可以简化参与各方的复杂依赖关系
  • 可以在请求量很大的时候缓冲一下
  • 某些情况下能保障消息的可靠性、顺序性

这就是MQ(Message Queue),MQ满足了以上的所有特性。另外对比其他通信模式,MQ的优势还有:

  • 异步通信:异步通信,减少线程等待
  • 系统解耦:系统不直接调用,降低依赖
  • 削峰平谷:压力大的时候,缓冲部分请求消息
  • 可靠通信:提供多种消息模式、保证消息尽可能不丢失

二、MQ的简单介绍

从队列到消息服务

一个队列:

多个队列:

常见的两种消息模式

  • 点对点模式:一个生产者对应一个消费者
  • 发布订阅模式:一个生产者对应多个消费者

消息处理的保障

三种QoS

  • At most once,至多一次,消息可能丢失但是不会重复发送
  • At least once,至少一次,消息不会丢失,但是可能会重复
  • Exactly once,精确一次,每天消息被传输有且仅有一次

消息的有序性

同一个Topic或Queue的消息,保障消息顺序投递

注意:如果做了消息分区或者批量预取之类的操作,可能就没有顺序了

消息协议

  1. STOMP :面向流文本的消息传输协议,是WebSocket通信标准。
  • 协议简单、易于实现。
  1. JMS:面向Java平台的标准消息传递API
  • 在JVM语言(如Scala)上具又互用性
  • 支持事务
  • 有queue和topic两种传递模型
  • 能够定义消息格式(消息头、属性、内容)
  • 无须担心底层协议
  1. AMQP

JMS可以在任何的底层协议上运行,但是API是与编程语言绑定的。而AMQP能够支持两种不同的编程语言使用它来传递消息。

  • 独立于平台的底层消息传递协议
  • 跨语言和平台
  • 它是底层协议的
  • 支持事务、分布式事务
  • 有derect、fanout、topic、headers、system五种交换类型
  • 支持长周期消息传递
  • 使用SASL和TLS确保安全性
  1. MQTT : 专为小设备设计的,是物联网生态中主要成分之一。
  • 面向流,内存占用低
  • 低带宽下能发送消息。
  • 不允许分段消息(很难发送长消息)
  • 主持主题发布-订阅
  • 不支持事务
  • 不支持安全连接
  1. XMPP:一个开源形式组织产生的网络即时通信协议
  • 基于XML的协议
  • 简单的客户端,将大多数工作都放在服务端进行
  1. Open Messaging:由阿里、雅虎、滴滴等公司共同参与创立的分布式消息中间件开发标准
  • 结构简单
  • 解析速度快
  • 支持事务和持久化设计

为什么消息中间件不使用http协议呢?

  1. 因为http请求报文和响应报文是比较复杂的,包含了cookie、状态码、加解密等等附加的功能,但是对于消息来说不需要这么复杂。
  2. 大部分情况下http是短连接,在实际的交互中,如果请求到响应的过程中中断了,中断之后就不会持久化,就会造成消息丢失。

三、消息中间件

三代消息中间件:

  1. ActiveMQ、RabbitMQ
  2. Kafka、RocketMQ
  3. Apache Pulsar

1. ActiveMQ介绍

  • 高可靠、事务性的消息队列
  • 当前应用最广泛的开源消息中间件
  • 功能最全的开源消息队列

主要功能:

  • 多种语言和协议编写客户端

语言:Java、C、C++、C#、Ruby、Perl、Python、PHP
协议:OpenWire、Stomp Rest、WS Notification、XMPP、AMQP、MQTT

  • 完全支持JMS1.1和J2EE 1.4规范
  • 与Spring很好的集成
  • 支持多种传送协议:TCP、SSL、NIO、UDP、JGroups、JXTA
  • 支持通过JDBC和journal提供高速的消息持久化
  • 实现了高性能的集群模式

使用场景:

  1. 所有需要使用消息队列的地方
  2. 订单处理、消息通知、服务降级等
  3. 纯java实现,可以支持嵌入到应用系统

使用教程

  1. activemq下载链接;//archive.apache.org/dist/activemq/

  2. 下载最新版解压,然后进入apache-activemq-5.16.3-bin\apache-activemq-5.16.3\bin\win64目录,点击activemq.bat启动。

如果出现端口占用,可以conf/activemq.xml修改端口。

  1. 引入依赖:
<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
 </dependency>
  1. 点对点
package com.mmc.springbootstudy.activemq.p2p;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Producer {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("test-queue");
        //生产者
        MessageProducer producer = session.createProducer(queue);
        //设置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //发送消息
        for (int i = 0; i < 1; i++) {
            sendMsg(session,producer,i);
        }
        session.commit();
        connection.close();
    }

    private static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        TextMessage textMessage = session.createTextMessage("Hello ActiveMQ " + i);
        producer.send(textMessage);
    }
}

package com.mmc.springbootstudy.activemq.p2p;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 接收者
 */
public class JmsReceiver {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("test-queue");
        MessageConsumer consumer = session.createConsumer(queue);
        while (true){
            TextMessage receive = (TextMessage) consumer.receive();
            System.out.println(receive.getText());
            receive.acknowledge();
        }

    }
}

  1. 发布-订阅
public class TopSend {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        TextMessage textMessage = session.createTextMessage("hello,topic");
        Topic topic = session.createTopic("test-topic");
        producer.send(topic,textMessage);
    }
}
public class TopReceiver {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("test-topic");
        MessageConsumer consumer = session.createConsumer(topic);
        while (true){
            TextMessage receive = (TextMessage) consumer.receive();
            System.out.println(receive.getText());
        }

    }
}

或者接受消息用监听的方式。

public class TopReceiver {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("test-topic");
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println(((TextMessage)message).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

    }
}

未完。。。下篇继续讲kafka