從通訊開始聊聊消息中間件
一、系統間通訊方式
我們可以想到的方式:
- 基於文件
- 基於共享記憶體
- 基於IPC
- 基於Socket
- 基於資料庫
- 基於RPC
各個模式的缺點:
- 文件:使用不方便,不及時
- Socket:使用麻煩,多數情況下不如RPC
- 資料庫:不實時,但是經常有人拿資料庫模擬消息隊列
- RPC:會導致調用關係複雜,同步處理,壓力大的時候無法緩衝
我們期望有一種通訊方式:
- 可以實現非同步的消息通訊
- 可以簡化參與各方的複雜依賴關係
- 可以在請求量很大的時候緩衝一下
- 某些情況下能保障消息的可靠性、順序性
這就是MQ(Message Queue),MQ滿足了以上的所有特性。另外對比其他通訊模式,MQ的優勢還有:
- 非同步通訊:非同步通訊,減少執行緒等待
- 系統解耦:系統不直接調用,降低依賴
- 削峰平谷:壓力大的時候,緩衝部分請求消息
- 可靠通訊:提供多種消息模式、保證消息儘可能不丟失
二、MQ的簡單介紹
從隊列到消息服務
一個隊列:
多個隊列:
常見的兩種消息模式
- 點對點模式:一個生產者對應一個消費者
- 發布訂閱模式:一個生產者對應多個消費者
消息處理的保障
三種QoS
- At most once,至多一次,消息可能丟失但是不會重複發送
- At least once,至少一次,消息不會丟失,但是可能會重複
- Exactly once,精確一次,每天消息被傳輸有且僅有一次
消息的有序性
同一個Topic或Queue的消息,保障消息順序投遞
注意:如果做了消息分區或者批量預取之類的操作,可能就沒有順序了
消息協議
- STOMP :面向流文本的消息傳輸協議,是WebSocket通訊標準。
- 協議簡單、易於實現。
- JMS:面向Java平台的標準消息傳遞API
- 在JVM語言(如Scala)上具又互用性
- 支援事務
- 有queue和topic兩種傳遞模型
- 能夠定義消息格式(消息頭、屬性、內容)
- 無須擔心底層協議
- AMQP
JMS可以在任何的底層協議上運行,但是API是與程式語言綁定的。而AMQP能夠支援兩種不同的程式語言使用它來傳遞消息。
- 獨立於平台的底層消息傳遞協議
- 跨語言和平台
- 它是底層協議的
- 支援事務、分散式事務
- 有derect、fanout、topic、headers、system五種交換類型
- 支援長周期消息傳遞
- 使用SASL和TLS確保安全性
- MQTT : 專為小設備設計的,是物聯網生態中主要成分之一。
- 面向流,記憶體佔用低
- 低頻寬下能發送消息。
- 不允許分段消息(很難發送長消息)
- 主持主題發布-訂閱
- 不支援事務
- 不支援安全連接
- XMPP:一個開源形式組織產生的網路即時通訊協議
- 基於XML的協議
- 簡單的客戶端,將大多數工作都放在服務端進行
- Open Messaging:由阿里、雅虎、滴滴等公司共同參與創立的分散式消息中間件開發標準
- 結構簡單
- 解析速度快
- 支援事務和持久化設計
為什麼消息中間件不使用http協議呢?
- 因為http請求報文和響應報文是比較複雜的,包含了cookie、狀態碼、加解密等等附加的功能,但是對於消息來說不需要這麼複雜。
- 大部分情況下http是短連接,在實際的交互中,如果請求到響應的過程中中斷了,中斷之後就不會持久化,就會造成消息丟失。
三、消息中間件
三代消息中間件:
- ActiveMQ、RabbitMQ
- Kafka、RocketMQ
- Apache Pulsar
1. ActiveMQ介紹
- 高可靠、事務性的消息隊列
- 當前應用最廣泛的開源消息中間件
- 功能最全的開源消息隊列
主要功能:
- 多種語言和協議編寫客戶端
語言:Java、C、C++、C#、Ruby、Perl、Python、PHP
協議:OpenWire、Stomp Rest、WS Notification、XMPP、AMQP、MQTT
- 完全支援JMS1.1和J2EE 1.4規範
- 與Spring很好的集成
- 支援多種傳送協議:TCP、SSL、NIO、UDP、JGroups、JXTA
- 支援通過JDBC和journal提供高速的消息持久化
- 實現了高性能的集群模式
使用場景:
- 所有需要使用消息隊列的地方
- 訂單處理、消息通知、服務降級等
- 純java實現,可以支援嵌入到應用系統
使用教程
-
activemq下載鏈接;//archive.apache.org/dist/activemq/
-
下載最新版解壓,然後進入apache-activemq-5.16.3-bin\apache-activemq-5.16.3\bin\win64目錄,點擊activemq.bat啟動。
如果出現埠佔用,可以conf/activemq.xml修改埠。
- 引入依賴:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
- 點對點
package com.mmc.springbootstudy.activemq.p2p;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Producer {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("test-queue");
//生產者
MessageProducer producer = session.createProducer(queue);
//設置不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//發送消息
for (int i = 0; i < 1; i++) {
sendMsg(session,producer,i);
}
session.commit();
connection.close();
}
private static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
TextMessage textMessage = session.createTextMessage("Hello ActiveMQ " + i);
producer.send(textMessage);
}
}
package com.mmc.springbootstudy.activemq.p2p;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 接收者
*/
public class JmsReceiver {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("test-queue");
MessageConsumer consumer = session.createConsumer(queue);
while (true){
TextMessage receive = (TextMessage) consumer.receive();
System.out.println(receive.getText());
receive.acknowledge();
}
}
}
- 發布-訂閱
public class TopSend {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
TextMessage textMessage = session.createTextMessage("hello,topic");
Topic topic = session.createTopic("test-topic");
producer.send(topic,textMessage);
}
}
public class TopReceiver {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Topic topic = session.createTopic("test-topic");
MessageConsumer consumer = session.createConsumer(topic);
while (true){
TextMessage receive = (TextMessage) consumer.receive();
System.out.println(receive.getText());
}
}
}
或者接受消息用監聽的方式。
public class TopReceiver {
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Topic topic = session.createTopic("test-topic");
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println(((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
未完。。。下篇繼續講kafka