從通訊開始聊聊消息中間件

一、系統間通訊方式

我們可以想到的方式:

  • 基於文件
  • 基於共享記憶體
  • 基於IPC
  • 基於Socket
  • 基於資料庫
  • 基於RPC

各個模式的缺點:

  • 文件:使用不方便,不及時
  • Socket:使用麻煩,多數情況下不如RPC
  • 資料庫:不實時,但是經常有人拿資料庫模擬消息隊列
  • RPC:會導致調用關係複雜,同步處理,壓力大的時候無法緩衝

我們期望有一種通訊方式:

  • 可以實現非同步的消息通訊
  • 可以簡化參與各方的複雜依賴關係
  • 可以在請求量很大的時候緩衝一下
  • 某些情況下能保障消息的可靠性、順序性

這就是MQ(Message Queue),MQ滿足了以上的所有特性。另外對比其他通訊模式,MQ的優勢還有:

  • 非同步通訊:非同步通訊,減少執行緒等待
  • 系統解耦:系統不直接調用,降低依賴
  • 削峰平谷:壓力大的時候,緩衝部分請求消息
  • 可靠通訊:提供多種消息模式、保證消息儘可能不丟失

二、MQ的簡單介紹

從隊列到消息服務

一個隊列:

多個隊列:

常見的兩種消息模式

  • 點對點模式:一個生產者對應一個消費者
  • 發布訂閱模式:一個生產者對應多個消費者

消息處理的保障

三種QoS

  • At most once,至多一次,消息可能丟失但是不會重複發送
  • At least once,至少一次,消息不會丟失,但是可能會重複
  • Exactly once,精確一次,每天消息被傳輸有且僅有一次

消息的有序性

同一個Topic或Queue的消息,保障消息順序投遞

注意:如果做了消息分區或者批量預取之類的操作,可能就沒有順序了

消息協議

  1. STOMP :面向流文本的消息傳輸協議,是WebSocket通訊標準。
  • 協議簡單、易於實現。
  1. JMS:面向Java平台的標準消息傳遞API
  • 在JVM語言(如Scala)上具又互用性
  • 支援事務
  • 有queue和topic兩種傳遞模型
  • 能夠定義消息格式(消息頭、屬性、內容)
  • 無須擔心底層協議
  1. AMQP

JMS可以在任何的底層協議上運行,但是API是與程式語言綁定的。而AMQP能夠支援兩種不同的程式語言使用它來傳遞消息。

  • 獨立於平台的底層消息傳遞協議
  • 跨語言和平台
  • 它是底層協議的
  • 支援事務、分散式事務
  • 有derect、fanout、topic、headers、system五種交換類型
  • 支援長周期消息傳遞
  • 使用SASL和TLS確保安全性
  1. MQTT : 專為小設備設計的,是物聯網生態中主要成分之一。
  • 面向流,記憶體佔用低
  • 低頻寬下能發送消息。
  • 不允許分段消息(很難發送長消息)
  • 主持主題發布-訂閱
  • 不支援事務
  • 不支援安全連接
  1. XMPP:一個開源形式組織產生的網路即時通訊協議
  • 基於XML的協議
  • 簡單的客戶端,將大多數工作都放在服務端進行
  1. Open Messaging:由阿里、雅虎、滴滴等公司共同參與創立的分散式消息中間件開發標準
  • 結構簡單
  • 解析速度快
  • 支援事務和持久化設計

為什麼消息中間件不使用http協議呢?

  1. 因為http請求報文和響應報文是比較複雜的,包含了cookie、狀態碼、加解密等等附加的功能,但是對於消息來說不需要這麼複雜。
  2. 大部分情況下http是短連接,在實際的交互中,如果請求到響應的過程中中斷了,中斷之後就不會持久化,就會造成消息丟失。

三、消息中間件

三代消息中間件:

  1. ActiveMQ、RabbitMQ
  2. Kafka、RocketMQ
  3. Apache Pulsar

1. ActiveMQ介紹

  • 高可靠、事務性的消息隊列
  • 當前應用最廣泛的開源消息中間件
  • 功能最全的開源消息隊列

主要功能:

  • 多種語言和協議編寫客戶端

語言:Java、C、C++、C#、Ruby、Perl、Python、PHP
協議:OpenWire、Stomp Rest、WS Notification、XMPP、AMQP、MQTT

  • 完全支援JMS1.1和J2EE 1.4規範
  • 與Spring很好的集成
  • 支援多種傳送協議:TCP、SSL、NIO、UDP、JGroups、JXTA
  • 支援通過JDBC和journal提供高速的消息持久化
  • 實現了高性能的集群模式

使用場景:

  1. 所有需要使用消息隊列的地方
  2. 訂單處理、消息通知、服務降級等
  3. 純java實現,可以支援嵌入到應用系統

使用教程

  1. activemq下載鏈接;//archive.apache.org/dist/activemq/

  2. 下載最新版解壓,然後進入apache-activemq-5.16.3-bin\apache-activemq-5.16.3\bin\win64目錄,點擊activemq.bat啟動。

如果出現埠佔用,可以conf/activemq.xml修改埠。

  1. 引入依賴:
<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
 </dependency>
  1. 點對點
package com.mmc.springbootstudy.activemq.p2p;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Producer {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("test-queue");
        //生產者
        MessageProducer producer = session.createProducer(queue);
        //設置不持久化
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        //發送消息
        for (int i = 0; i < 1; i++) {
            sendMsg(session,producer,i);
        }
        session.commit();
        connection.close();
    }

    private static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
        TextMessage textMessage = session.createTextMessage("Hello ActiveMQ " + i);
        producer.send(textMessage);
    }
}

package com.mmc.springbootstudy.activemq.p2p;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 接收者
 */
public class JmsReceiver {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("test-queue");
        MessageConsumer consumer = session.createConsumer(queue);
        while (true){
            TextMessage receive = (TextMessage) consumer.receive();
            System.out.println(receive.getText());
            receive.acknowledge();
        }

    }
}

  1. 發布-訂閱
public class TopSend {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(null);
        TextMessage textMessage = session.createTextMessage("hello,topic");
        Topic topic = session.createTopic("test-topic");
        producer.send(topic,textMessage);
    }
}
public class TopReceiver {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("test-topic");
        MessageConsumer consumer = session.createConsumer(topic);
        while (true){
            TextMessage receive = (TextMessage) consumer.receive();
            System.out.println(receive.getText());
        }

    }
}

或者接受消息用監聽的方式。

public class TopReceiver {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("test-topic");
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    System.out.println(((TextMessage)message).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

    }
}

未完。。。下篇繼續講kafka