消息中間件-RabbitMQ
一、基礎知識
1. 什麼是RabbitMQ
RabbitMQ是2007年發布,是一個在AMQP(高級消息隊列協議)基礎上完成的,簡稱MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程式對應用程式的通訊方法,由Erlang(專門針對於大數據高並發的語言)語言開發,可復用的企業消息系統,是當前最主流的消息中間件之一,具有可靠性、靈活的路由、消息集群簡單、隊列高可用、多種協議的支援、管理介面、跟蹤機制以及插件機制。
2.什麼是消息和隊列
1.消息 就是數據,增刪改查的數據。例如在員工管理系統中增刪改查的數據
2.隊列 指的是一端進數據一端出數據,例如C#中(Queue數據結構)
3.什麼是消息隊列
1.消息隊列指:一端進消息,一端出消息
2.RabbitMQ就是實現了消息隊列概念的一個組件,以面向對象的思想去理解,消息隊列就是類,而RabbitMQ就是實例,當然不僅僅只有RabbitMQ,例如ActiveMQ,RocketMQ,Kafka,包括Redis也可以實現消息隊列。
4.什麼地方使用RabbitMQ
1.在常見的單體架構中,主要流程是用戶UI操作發起Http請求>伺服器處理>然後由伺服器直接和資料庫交互,最後同步回饋用戶結果
2.在微服務架構中,UI與微服務通訊,主要是通過Http或者gRPC同步通訊
問題分析
在上述2種情況下,我們發現在UI請求時都是同步操作 ,第2種架構雖然將整體服務按業務拆分成不同的微服務並且對應各自的資料庫,但是在用戶與微服務通訊時,存在的問題依然沒有解決,例如資料庫的承載能力只能處理10w個請求,如果遇到高並發情況下,UI發起50w請求,那資料庫是遠遠承載不了的,從而導致如下問題。
1.高並發請求導致系統性能下降響應慢,同時資料庫承載風險加大
2.擴展性不強UI操作的交互對業務的依賴較大,導致用戶體驗下降
3.瞬時流量湧入巨大的話,伺服器可能直接掛了
解決方案
- 為了解決性能瓶頸問題。我們需要將同步通訊換成非同步通訊方式。因此就使用消息隊列,用戶在UI中操作直接寫入RabbitMQ然後直接返回,剩下的業務操作由消息隊列和各自的微服務來完成
RabbitMQ的優勢
-
非同步處理,響應快,增加了資料庫(伺服器的承載能力)
-
削峰,可以把流量的高峰分解到不同的時間段來處理
-
解耦(擴展性就更強),讓UI和業務獨立演化
-
高可用,處理器如果發生故障了,對其他的處理器沒有影響
RabbitMQ的不足
-
增加了系統複雜性,不方便調試和開發,在使用RabbitMQ以前前端直接和服務交互,現在加了一層
-
即時性降低了,在某一程度上提升了用戶操作體驗,也降低了用戶體驗,但是避免不了,取長補短
-
更加依賴消息隊列了
5.RabbitMQ組成概念
1.ConnectionFactory 為Connection的製造工廠。
2.Connection是RabbitMQ的socket鏈接,它封裝了socket協議相關部分邏輯。
3.Channel是我們與RabbitMQ打交道的最重要的一個介面,我們大部分的業務操作是在Channel這個介面中完成的,包括定義Queue、定義Exchange、綁定Queue與Exchange、發布消息等。
4.Exchange(交換機) 我們通常認為生產者將消息投遞到Queue中,實際上實際的情況是,生產者將消息發送到Exchange,由Exchange將消息路由到一個或多個Queue中(或者丟棄),而在RabbitMQ中的Exchange一共有4種策略,分別為:fanout(扇形)、direct(直連)、topic(主題)、headers(頭部)
二、如何落地RabbitMQ
1.RabbitMQ環境安裝
3.安裝完成之後,載入RabbitMQ管理插件
rabbitmq-plugins enable rabbitmq_management
4.安裝成功訪問RabbitMQ管理後台//localhost:15672
2.創建系統業務
1.分別創建考勤服務,請假服務,計算薪酬服務,郵件服務,簡訊服務消費者角色
2.創建員工管理網站用於模擬前端調用,主要充當生產者角色
3.在員工管理網站和每一個模擬微服務中通過nuget引入RabbitMQ.Client
4.在員工管理網站中創建模擬添加考勤的控制器並加入生產者程式碼
//創建連接
using (var connection = factory.CreateConnection())
{
//創建通道
var channel = connection.CreateModel();
//定義隊列
channel.QueueDeclare("CreateAttendance", false, false, false, null);
string json = JsonConvert.SerializeObject(attendanceDto);
//創建內容對象
var properties = channel.CreateBasicProperties();
//發送消息
channel.BasicPublish(exchange: "",routingKey: "CreateAttendance",basicProperties: properties,body: Encoding.UTF8.GetBytes(json));
}
5.在考勤微服務中創建介面,並在介面中加入消費者程式碼
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//創建消費者事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
// 1、邏輯程式碼,添加到資料庫
var message = Encoding.UTF8.GetString(body.ToArray());
object json = JsonConvert.DeserializeObject(message);
Console.WriteLine(" [x] 創建考勤資訊 {0}", message);
};
//設置消費者屬性
//p1.監聽隊列p2.消息確認ACK p3.消費者實例賦值
channel.BasicConsume(queue: "CreateAttendance",autoAck: false,consumer:consumer);
三、Exchange交換機及實例分析
1.Fanout Exchange (扇形交換機)
fanout類型的Exchange路由規則非常簡單,工作方式類似於多播一對多,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中。
1.生產者一個Exchange對應多個Queue,或者不聲明Queue
2.消費者定義Exchange,如果生產者定義了Queue,那必須將exchange和queue綁定,如果沒有定義隊列,那消費者自己聲明一個隨機Queue用於接收消費消息
業務實例
當我們有員工需要請假,在員工管理系統提交請假,但是由於公司規定普通員工請假,需要發送簡訊到他的主管領導,針對此業務場景我們需要調用請假服務的同時去發送簡訊,這時需要兩個消費者(請假服務,簡訊服務)來消費同一條消息,其實本質就是往RabbitMQ寫入一個能被多個消費者接收的消息,所以可以使用 扇形交換機
,一個生產者,多個消費者.
生產者模擬使用調用控制器來實現
[HttpPost]
public IEnumerable<bool> CreateLeave(CreateLeaveDto createLeaveDto)
{
var factory = new ConnectionFactory()
{
HostName = "192.168.0.106",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//定義交換機
channel.ExchangeDeclare(exchange: "Leave_fanout", type: "fanout");
string productJson = JsonConvert.SerializeObject(createLeaveDto);
var body = Encoding.UTF8.GetBytes(productJson);
var properties = channel.CreateBasicProperties();
//設置消息持久化
properties.Persistent = true;
channel.BasicPublish(exchange: "Leave_fanout", routingKey: "", basicProperties: properties,body: body);
}
}
消費者實現IHostedService 介面創建一個監聽主機
public class RabbitmqHostService : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 1、定義交換機
channel.ExchangeDeclare(exchange: "Leave_fanout", type: ExchangeType.Fanout);
//定義隨機隊列
var queueName = channel.QueueDeclare().QueueName;
//隊列和交換機綁定
channel.QueueBind(queueName,"Leave_fanout",routingKey: "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、業務邏輯
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 創建請假 {0}", message);
// 1、自動確認機制缺陷,消息是否正常添加到資料庫當中,所以需要使用手工確認
channel.BasicAck(ea.DeliveryTag, true);
};
// Qos(防止多個消費者,能力不一致,導致的系統品質問題。
// 每一次一個消費者只成功消費一個)
channel.BasicQos(0, 1, false);
// 消息確認(防止消息消費失敗)
channel.BasicConsume(queue: queueName ,autoAck: false,consumer: consumer);
}
public Task StopAsync(CancellationToken cancellationToken)
{
// 1、關閉rabbitmq的連接
throw new NotImplementedException();
}
}
2.Direct Exchange (直連交換機)
直接交換器,工作方式類似於單播一對一,Exchange會將消息發送完全匹配ROUTING_KEY的Queue,缺陷是無法實現多生產者對一個消費者
1.生產者一個Exchange對應一個routingKey綁定,也可以聲明隊列並綁定,然後向指定的隊列發送消息。
2.消費者需要定義Exchange和routingKey,如果生產者聲明並綁定了隊列,那消費者必須綁定生產者指定的Queue來接收消息,如果沒有指定Queue,那消費者需要自己聲明一個隨機Queue然後綁定用於接收消息
當我們員工管理系統需要計算薪資並將結果以發送簡訊的方式告訴員工,這個時候我們就不太適合用「扇形交換機」了,因為換做是你,你也不想你的工資全公司都知道吧?這個時候就需要訂製了一對一的場景了,那就在生產消息時使用直連交換機
根據routingKey發送指定的消費者.
生產者模擬使用調用控制器來實現
public IEnumerable<bool> SendCalculateSalary(CalculateSalaryDto calculateSalaryDto)
{
var factory = new ConnectionFactory()
{
HostName = "192.168.0.106",
Port = 5672,
Password = "admin",
UserName = "admin",
VirtualHost = "/"
};
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定義交換機
channel.ExchangeDeclare(exchange: "CalculateSalary_direct", type: "direct");
string calculateSalaryDtoJson = JsonConvert.SerializeObject(calculateSalaryDto);
var body = Encoding.UTF8.GetBytes(calculateSalaryDtoJson);
//3、發送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 設置消息持久化
//p1 指定交換機
//p2 routingKey
channel.BasicPublish(exchange: "CalculateSalary_direct",routingKey: "product-sms",basicProperties: properties,body: body);
}
}
消費者實現IHostedService 介面創建一個監聽主機
public class RabbitmqHostService : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 1、定義交換機
channel.ExchangeDeclare(exchange: "CalculateSalary_direct", type: ExchangeType.Direct);
// 2、定義隨機隊列
var queueName = channel.QueueDeclare().QueueName;
// 3、隊列要和交換機綁定起來
channel.QueueBind(queueName,"CalculateSalary_direct",routingKey: "product-sms");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、業務邏輯
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 發送簡訊 {0}", message);
// 1、消息是否正常添加到資料庫當中,所以需要使用手工確認
channel.BasicAck(ea.DeliveryTag, true);
};
// 3、消費消息
channel.BasicQos(0, 1, false); // Qos(防止多個消費者,能力不一致,導致的系統品質問題。
// autoAck設為false 不進行自動確認
channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);
}
public Task StopAsync(CancellationToken cancellationToken)
{
// 1、關閉rabbitmq的連接
throw new NotImplementedException();
}
}
3.Topic Exchange (主題交換機)
Exchange綁定隊列需要制定Key; Key 可以有自己的規則;Key可以有佔位符;或者# ,匹配一個單詞、#匹配多個單詞,在Direct基礎上加上模糊匹配;多生產者一個消費者,可以多對對,也可以多對1, 真實項目當中,使用主題交換機。可以滿足所有場景
1.生產者定義Exchange,然後不同的routingKey綁定
2.消費者定義Exchange,如果生產者定義了Queue,那必須將exchange和queue以及routingKey綁定,如果沒有定義隊列,那消費者自己聲明一個隨機Queue用於接收消費消息,
3.消費者routingKey的模糊匹配,生產者發送消息時routingKey定義以sms.開頭, * 號只能匹配的routingKey為一級,例如(sms.A)或(sms.B)的發送的消息,# 能夠匹配的routingKey為一級及多級以上 ,例如 (sms.A)或者(sms.A.QWE.IOP)
在月底的時候我們需要把員工存在異常考勤資訊,薪資結算資訊,請假資訊分別以郵件的形式發送給我們的員工查閱,我們知道這是一個典型的多個生產者,一個消費者場景,異常考勤資訊,薪資結算資訊,請假資訊分別需要生產消息發送到RabbitMQ,然後供我們員工消費
分別模擬3個生產者:異常考勤資訊,薪資結算資訊,請假資訊
var factory = new ConnectionFactory()
{
HostName = "192.168.0.106",
Port = 5672,
Password = "admin",
UserName = "admin",
VirtualHost = "/"
};
//計算薪資生產者
public IEnumerable<bool> SendCalculateSalary(CalculateSalaryDto calculateSalaryDto)
{
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定義topic交換機
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string calculateSalaryDtoJson = JsonConvert.SerializeObject(calculateSalaryDto);
var body = Encoding.UTF8.GetBytes(calculateSalaryDtoJson);
//3、發送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 設置消息持久化
//p1 指定交換機
//p2 routingKey
channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateSalary",basicProperties: properties,body: body);
}
}
//考勤生產者
public IEnumerable<bool> SendCalculateAttendance(CalculateAttendanceDto calculateAttendance)
{
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定義topic交換機
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);
var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);
//3、發送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 設置消息持久化
//p1 指定交換機
//p2 routingKey
channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateAttendance",basicProperties: properties,body: body);
}
}
//請假資訊生產者
public IEnumerable<bool> SendCalculateLeave(CalculateLeaveDto calculateLeave)
{
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定義topic交換機
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string calculateLeaveJson = JsonConvert.SerializeObject(calculateLeave);
var body = Encoding.UTF8.GetBytes(calculateLeaveJson);
//3、發送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 設置消息持久化
//p1 指定交換機
//p2 routingKey
channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateAttendance",basicProperties: properties,body: body);
}
}
public class RabbitmqHostService : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
Password = "guest",
UserName = "guest",
VirtualHost = "/"
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
// 1、定義交換機
channel.ExchangeDeclare(exchange: "sms_topic", type: ExchangeType.Topic);
// 2、定義隨機隊列
var queueName = channel.QueueDeclare().QueueName;
// 3、隊列要和交換機綁定起來
// * 號的缺陷:只能匹配一級
// # 能夠匹配一級及多級以上
channel.QueueBind(queueName,"sms_topic",routingKey: "sms.#");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
Console.WriteLine($"model:{model}");
var body = ea.Body;
// 1、業務邏輯
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] 發送簡訊 {0}", message);
// 1、消息是否正常添加到資料庫當中,所以需要使用手工確認
channel.BasicAck(ea.DeliveryTag, true);
};
// 3、消費消息
channel.BasicQos(0, 1, false); // Qos(防止多個消費者,能力不一致,導致的系統品質問題。
// autoAck設為false 不進行自動確認
channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);
}
public Task StopAsync(CancellationToken cancellationToken)
{
// 1、關閉rabbitmq的連接
throw new NotImplementedException();
}
}
4.Header Exchange(頭部交換機)
headers類型的Exchange不依賴於routing key與binding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對以及x-match參數,x-match參數是字元串類型,可以設置為any或者all。如果設置為any,意思就是只要匹配到了headers表中的任何一對鍵值即可,all則代表需要全部匹配。
1.不需要依賴Key
2.更多的時候,像這種Key Value 的鍵值,可能會存儲在資料庫中,那麼我們就可以定義一個動態規則來拼裝這個Key value ,從而達到消息靈活轉發到不同的隊列中去
四、RabbitMQ消息確認
我們根據上面的業務和程式碼簡單實現了由生產者到消費者的一個業務流程,我們可以總結出知道,整個消息的收發過程包含有三個角色,生產者(員工管理網站)、RabbitMQ(Broker)、消費者(微服務),在理想狀態下,按照這樣實現,整個流程以及系統的穩定性,可能不會發生太大的問題,但是真正在實際應用中我們要去思考可能存在的問題,主要從三個大的方面去分析,然後發散。
1.生產端
2.存儲端
3.消費端
1.消息生產端
我們在給RabbitMQ發送消息時,如何去保證消息一定到達呢,我們可以使用RabbitMQ提供了2種生產端的消息確認機制
模式 | 描述 | 實現方式 |
---|---|---|
Confirm模式 | 應答模式,生產者發送一條消息之後,Rabbitmq伺服器做了個響應,表示消息確認收到 | 非同步模式,在應答之前,可以繼續發送消息,單條消息、批量消息 |
Tx事務模式 | 基於AMQP協議;可以把channel 設置成一個帶事務的通道道,分為三步:1.開啟事務,提交事務,回滾事務 | 同步模式,在事務提交之前不能繼續發送消息,事務模式效率差一些 |
1.Confirm 實現
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定義topic交換機
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);
var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);
//3、發送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 設置消息持久化
try
{
//開啟消息確認模式
channel.ConfirmSelect();
channel.BasicPublish(exchange: "sms_topic",
routingKey: "sms.CalculateAttendance", basicProperties: properties, body: body);
//如果一條消息或多消息都確認發送
if (channel.WaitForConfirms())
{
Console.WriteLine($"【{message}】發送到Broke成功!");
}
else
{
//可以記錄個日誌,重試一下;
}
//如果所有消息發送成功 就正常執行;如果有消息發送失敗;就拋出異常;
channel.WaitForConfirmsOrDie();
}
catch (Exception ex)
{
Console.WriteLine($"【{message}】發送到Broker失敗!");
}
}
2.Tx事務 實現
using (var connection = factory.CreateConnection())
{
var channel = connection.CreateModel();
//2、定義topic交換機
channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);
var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);
//3、發送消息
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // 設置消息持久化
try
{
//開啟事務機制,AMQP協議支援
channel.TxSelect(); //事務是協議支援的
channel.BasicPublish(exchange: "sms_topic",
routingKey: "sms.CalculateAttendance", basicProperties: properties, body: body);
//提交事務 只有事務提交了才會真正寫入隊列
channel.TxCommit();
}
catch (Exception ex)
{
//事務回滾
channel.TxRollback();
}
}
2.消息存儲端
我們生產端給RabbitMQ發送消息成功後,如果RabbitMQ宕機了,會導致RabbitMQ中消息丟失,如何解決消息丟失問題,針對RabbitMQ消息丟失,我們可以在生產者中使用
1.持久化消息
2.集群
3.消息消費端
-
1.消費者宕機,導致消息丟失
-
2.執行業務邏輯失敗,但是消息已經被消費
當生產者寫入消息到RabbitMQ後,消費服務接收消息期間,伺服器宕機,導致消息丟失了,這個時候我們就應該使用RabbitMQ的消費端消息確認機制
模式 | 描述 | 特點 |
---|---|---|
自動確認 autoAck | 自動確認,是消費消息的時候,只要收到消息,就直接回執給RabbitMQ,已經收到一切正常; 直接總覽所有了,如果有1w條消息,只是消費成功了一條消息,RabbitMQ也會認為你是全部成功了,會將所有消息從隊列中移除;這樣會導致消息的丟失 | 處理很快 |
手動確認 | 消費者消費一條,回執給RabbitMQ一條消息,RabbitMQ 只刪除當前這一條消息,相當於是一條消費了,刪除一條消息; | 性能稍微低一些 |
1.自動確認
// 消息自動確認機制
channel.BasicConsume(queue: "CreateAttendance",autoAck: true, consumer: consumer);
2.手動確認
消費者收到消息。消費者發送確認消息給rabbitmq期間。執行業務邏輯失敗了,但是消息已經確認被消費了,我們應該在我們的消費者接收消息回調執行業務邏輯後面,執行使用手動確認消息機制,保證消息不被丟失
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "sms_topic", type: ExchangeType.Topic);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName,"sms_topic",routingKey: "sms.#");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
//執行業務邏輯
//手工確認告訴borker可以刪除消息了
channel.BasicAck(ea.DeliveryTag, true);
//否定:告訴Broker,這個消息我沒有正常消費; requeue: true:重新寫入到隊列里去; false:你還是刪除掉;
//channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
};
// autoAck設為false 不進行自動確認
channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);
-
3.由於伺服器性能不一致導致消息堆積
生產者發送高並發消息,消費者來不及處理,導致消息堆積,如何解決消息堆積問題?可以使用消費服務集群,將壓力分散到不同的服務實例能解決這個問題,但是又產生了一個新的集群缺陷問題,假設集群伺服器的強弱不一致,比較弱的伺服器處理消息慢,就會導致大部分消息堆積在這台性能較差的伺服器,那又該如何解決呢?
我們可以採用RabbitMQ的QOS
功能,俗稱限流,他的意思就是消費者一次可以拉取指定數量的消息,在這些消息未處理完畢之前,不會再向隊列拉取消息。
// Qos(防止多個消費者,能力不一致,導致的系統品質問題。
// 每一次一個消費者只成功消費一個)
channel.BasicQos(0, 1, false);
-
4.如何保證消息不被重複消費(冪等性)
1.生產時消息重複
由於生產者發送消息給MQ,在MQ確認的時候出現了網路波動,生產者沒有收到確認,實際上MQ
已經接收到了消息。這時候生產者就會重新發送一遍這條消息。生產者中如果消息未被確認,或確
認失敗,我們可以使用定時任務+(redis/db)來進行消息重試。2.消費時消息重複
消費者消費成功後,再給MQ確認的時候出現了網路波動,MQ沒有接收到確認,為了保證消息被消費,MQ就會繼續給消費者投遞之前的消息。這時候消費者就接收到了兩條一樣的消息。
我們可以讓每個消息攜帶一個全局的唯一ID,即可保證消息的冪等性消費者獲取到消息後先根據id去查詢redis/db是否存在該消息。如果不存在,則正常消費,消費完畢後寫入redis/db。
如果存在,則證明消息被消費過,直接丟棄。