消息隊列助你成為高薪 Node.js 工程師
- 2019 年 12 月 6 日
- 筆記
為什麼寫這篇文章
- 現在的面試要求越來越高了,打開看了看幾個 BOSS 招聘 Node.js 全棧開發的,其中都有一條「了解 消息隊列,並在項目中應用過」,嗚嗚嗚
- 後端開發者應該都知道消息隊列,但是一些前端開發者可能知道的並不多,但是你們可能好奇
搶票,商品秒殺
等功能是如何實現的,其實沒有多麼高大上,看了消息隊列就知道了。
文章導圖(你能學到)

什麼是消息隊列
「消息隊列」是在消息的傳輸過程中保存消息的容器。
個人理解:我把它分成兩個詞消息
和隊列
。當一大批客戶端同時產生大量的網路請求(消息
)時候,伺服器的承受能力肯定是有一個限制的。這時候要是有個容器,先讓這些消息排隊就好了,還好有個叫隊列
的數據結構,通過有隊列屬性的容器
排隊(先進先出),把消息再傳到我們的伺服器,壓力減小了好多,這個很棒的容器
就是消息隊列
這段理解中還包含這個兩個概念: 客戶端->生產者
伺服器->消費者
當有消息隊列
出現,生產者
和消費者
是必不可少的兩個概念,上面的理解是多個生產者
對應一個消費者
,當然現實開發中還有許多消費者
的情況哦。接下來的文章也會多次提到生產-消費模型
。
消息隊列優勢
- 應用解耦 消息隊列可以使消費者和生產者直接互不干涉,互不影響,只需要把消息發送到隊列即可,而且可獨立的擴展或修改兩邊的處理過程,只要能確保它們遵守同樣的介面約定,可以生產者用Node.js實現,消費者用python實現。
- 靈活性和峰值處理能力 當客戶端訪問量突然劇增,對伺服器的訪問已經超過服務所能處理的最大峰值,甚至導致伺服器超時負載崩潰,使用消息隊列可以解決這個問題,可以通過
控制消費者的處理速度
和生產者可進入消息隊列的數量
等來避免峰值問題 - 排序保證 消息隊列可以控制數據處理的順序,因為消息隊列本身使用的是隊列這個數據結構,
FIFO
(先進選出),在一些場景數據處理的順序很重要,比如商品下單順序等。 - 非同步通訊 消息隊列中的有些消息,並不需要立即處理,消息隊列提供了非同步處理機制,可以把消息放在隊列中並不立即處理,需要的時候處理,或者非同步慢慢處理,一些不重要的發送簡訊和郵箱功能可以使用。
- 可擴展性 前面提到了消息隊列可以做到
解耦
,如果我們想增強消息入隊和出隊的處理頻率,很簡單,並不需要改變程式碼中任何內容,可以直接對消息隊列修改一些配置即可,比如我們想限制每次發送給消費者的消息條數等。
有優勢定有它現實的應用場景,文章後面會針對優勢講它們對應的應用場景。
消息隊列的類型介紹
介紹幾款目前市場上主流的消息隊列(課外知識,可忽略)
- Kafka:是由 Apache 軟體基金會開發的一個開源流處理平台,由 Scala 和 Java 編寫,是一種高吞吐量的分散式發布訂閱消息系統,支援單機每秒百萬並發。另外,Kafka 的定位主要在日誌等方面, 因為Kafka 設計的初衷就是
處理日誌
的,可以看做是一個日誌(消息)系統
一個重要組件,針對性很強。0.8 版本開始支援複製,不支援事務,因此對消息的重複、丟失、錯誤沒有嚴格的要求。 - RocketMQ:阿里開源的消息中間件,是一款低延遲、高可靠、可伸縮、易於使用的消息中間件,思路起源於 Kafka。最大的問題商業版收費,有些功能不開放。
- RabbitMQ:由 Erlang(有著和原生 Socket 一樣低的延遲)語言開發基於 AMQP 協議的開源消息隊列系統。能保證消息的可靠性、穩定性、安全性。
高並發
的特性,毋庸置疑,RabbitMQ 最高,原因是它的實現語言是天生具備高並發高可用的erlang 語言,天生的分散式
優勢。
說明:本文主要以RabbitMQ講解,較為常見。個人認為這幾種消息隊列中間件能實現的功能,通過 redis 也都能實現,思想。
初識消息隊列(消息隊列在node.js中的簡單應用)
Rabbitmq基本安裝
Mac版安裝
直接通過 HomeBrew 安裝,執行以下命令
brew install rabbitmq
啟動 rabbitmq
進入安裝目錄 $ /usr/local/Cellar/rabbitmq/3.7.8 啟動 $ sbin/rabbitmq-server
瀏覽器輸入 http://localhost:15672/#/ 默認用戶名密碼 guest
安裝後的基本示意圖

可視化介面可模組功能介紹:
其他系統安裝請自行網上搜索
幾個埠區別說明
5672:通訊默認埠號 15672:管理控制台默認埠號 25672:集群通訊埠號 注意: 阿里雲 ECS 伺服器如果出現 RabbitMQ 安裝成功,外網不能訪問是因為安全組的問題沒有開放埠 解決方案
Rabbitmq安裝後的基本命令
以下列舉一些在終端常用的操作命令
- whereis rabbitmq:查看 rabbitmq 安裝位置
- rabbitmqctl start_app:啟動應用
- whereis erlang:查看erlang安裝位置
- rabbitmqctl start_app:啟動應用
- rabbitmqctl stop_app:關閉應用
- rabbitmqctl status:節點狀態
- rabbitmqctl add_user username password:添加用戶
- rabbitmqctl list_users:列出所有用戶
- rabbitmqctl delete_user username:刪除用戶
- rabbitmqctl add_vhost vhostpath:創建虛擬主機
- rabbitmqctl list_vhosts:列出所有虛擬主機
- rabbitmqctl list_queues:查看所有隊列
- rabbitmqctl -p vhostpath purge_queue blue:清除隊列里消息
注意:以上終端所有命令,需要進入到rabbitmqctl的sbin目錄下執行rabbitmqctl命令才有用,否則會報錯:

Node.js實現一個簡單的 HelloWorld 消息隊列
畫一張基本的圖,HelloWorld 消息隊列的圖片,把下面幾個概念都畫進去。
看這段程式碼前先說幾個概念
- 生產者 :生產消息的
- 消費者 :接收消息的
- 通道 channel:建立連接後,會獲取一個 channel 通道
- exchange :交換機,消息需要先發送到 exchange 交換機,也可以說是第一步存儲消息的地方(交換機會有很多類型,後面會詳細說)。
- 消息隊列 : 到達消費者前一刻存儲消息的地方,exchange 交換機會把消息傳遞到此
- ack回執:收到消息後確認消息已經消費的應答
amqplib模組
推薦一個 npm 模組amqplib
。
Github: https://github.com/squaremo/amqp.node
$ npm install amqplib
生產者程式碼 product.js
const amqp =require('amqplib'); async function product(params) { // 1. 創建鏈接對象 const connection = await amqp.connect('amqp://localhost:5672'); // 2. 獲取通道 const channel = await connection.createChannel(); // 3. 聲明參數 const routingKey = 'helloKoalaQueue'; const msg = 'hello koala'; for (let i=0; i<10000; i++) { // 4. 發送消息 await channel.publish('', routingKey, Buffer.from(`${msg} 第${i}條消息`)); } // 5. 關閉通道 await channel.close(); // 6. 關閉連接 await connect.close(); } product();
生產者程式碼解釋與運行結果
執行 node product.js
程式碼注釋中已經把基本的流程講解了,但是我剛開始看的時候還有疑問,我想很多小夥伴也會有疑問,說明下:
- 疑問1 前面提到過交換機這個名詞,
生產者
發消息的時候必須要指定一個 exchange,若不指定 exchange(為空)會默認指向 AMQP default 交換機,AMQP default 路由規則是根據 routingKey 和 mq 上有沒有相同名字的隊列進行匹配路由。上面這段程式碼就是默認指定的交換機。不同類型交換機詳細講解請往下看。 - 疑問2 生產者發送消息後,消息是發送到交換機exchange,但是這時候會創建隊列嗎? 答案:程式碼中我們聲明的是路由是routingKey,但是它並沒有創建helloKoalaQueue 消息隊列,消息只會發送到交exchange交換機。 運行程式碼後看隊列截圖可以證明這一點:
- 說明1 生產者發送消息後,注意關閉通道和連接,只要消息發送成功後,連接就可以關閉了,消費者用任何語言去獲取消息都可以,這也證明了消息隊列優秀
解耦
的特性 - 說明2 可以多次執行
node product.js
生產者程式碼,消息會堆積到交換機exchange
中,並不會覆蓋,如果已執行過消費者並且確認了對應的消息隊列
,消息會從exchange交換機
發送到消息隊列
,並存入到消息隊列
,等待消費者
消費

消費者程式碼 consumer.js
// 構建消費者 const amqp = require('amqplib'); async function consumer() { // 1. 創建鏈接對象 const connection = await amqp.connect('amqp://localhost:5672'); // 2. 獲取通道 const channel = await connection.createChannel(); // 3. 聲明參數 const queueName = 'helloKoalaQueue'; // 4. 聲明隊列,交換機默認為 AMQP default await channel.assertQueue(queueName); // 5. 消費 await channel.consume(queueName, msg => { console.log('Consumer:', msg.content.toString()); channel.ack(msg); }); } consumer();
生產者程式碼解釋與運行結果
執行 node consumer.js
- 運行後的執行結果

- 說明1 這時候我改變程式碼中的隊列名稱為
helloKoalaQueueHaHa
,這時候去看Rabbitmq可視化介面中,隊列模組,創建了這個隊列

看到這裡再次證明了消息隊列優秀的解耦特性
,消費者和生產者模型
之間沒有任何聯繫,再次創建這個helloKoalaQueueHaHa
路由名稱的生產者,消費者
也會正常消費,並且會列印消息,大家可以實際操作試一下。
- 說明2 這時候我改變程式碼中的隊列名稱為
helloKoalaQueueHaHa
,這時候去看Rabbitmq可視化介面中,隊列模組,創建了這個隊列

看到這裡又再次證明了消息隊列優秀的解耦特性
,消費者和生產者模型
之間沒有任何聯繫,再次創建這個helloKoalaQueueHaHa
路由名稱的生產者,消費者
也會正常消費,並且會列印消息,大家可以實際操作試一下。
如何釋放掉消息隊列
可視化介面中直接刪除掉消息隊列
- 訪問http://{rabbitmq安裝IP}:15672,登錄。
- 點擊queues,這裡可以看到你創建的所有的Queue,
- 選中某一個Queue,然後會進入一個列表介面,下方有個Delete按鈕,確認 Queue刪除隊列/Purge Message清除消息即可。
弊端:這樣只能一個隊列一個隊列的刪除,如果隊列中的消息過多就會特別慢。
通過程式碼實現消息隊列釋放(刪除)
消息隊列交換機講解
先記住一句話
生產者發消息的時候必須指定一個 exchange,否則消息無法直接到達消息隊列,Exchange將消息路由到一個或多個Queue中(或者丟棄)
然後開始本章節交換機的講解
若不指定 exchange(為空)會默認指向 AMQP default 交換機,AMQP default 路由規則是根據 routingKey 和 mq 上有沒有相同名字的隊列進行匹配路由。
交換機的種類
常用的四種類型
- fanout
- direct
- topic
- headers
不管是哪一種類型的交換機,都有一個綁定binding的操作,只不過根據不同的交換機類型有不同的路由綁定策略。不同類型做的下圖紅色框框中的事。

fanout(中文翻譯 廣播)
fanout類型的Exchange路由規則非常簡單,它會把所有發送到該Exchange的消息路由到所有與它綁定的Queue中,不需要設置路由鍵。

上圖中,上圖中,生產者(Producter)發送到Exchange(X)的所有消息都會路由到圖中的兩個Queue,並最終被兩個消費者(consumer1與consumer2)消費。
說明:所有消息都會路由到兩個Queue中,是兩個消費者都可以收到全部的完全相同的消息嗎? 答案是的,兩個消費者收到的隊列消息正常應該是完全相同的。這種類型常用於廣播類型的需求,或者也可以消費者1記錄日誌 ,消費者2列印日誌
對應程式碼實現:
生產者:
const amqp = require('amqplib'); async function producer() { // 創建鏈接對象 const connection = await amqp.connect('amqp://localhost:5672'); // 獲取通道 const channel = await connection.createChannel(); // 聲明參數 const exchangeName = 'fanout_koala_exchange'; const routingKey = ''; const msg = 'hello koala'; // 交換機 await channel.assertExchange(exchangeName, 'fanout', { durable: true, }); // 發送消息 await channel.publish(exchangeName, routingKey, Buffer.from(msg)); // 關閉鏈接 await channel.close(); await connection.close(); } producer();
消費者:
const amqp = require('amqplib'); async function consumer() { // 創建鏈接對象 const connection = await amqp.connect('amqp://localhost:5672'); // 獲取通道 const channel = await connection.createChannel(); // 聲明參數 const exchangeName = 'fanout_koala_exchange'; const queueName = 'fanout_kaola_queue'; const routingKey = ''; // 聲明一個交換機 await channel.assertExchange(exchangeName, 'fanout', { durable: true }); // 聲明一個隊列 await channel.assertQueue(queueName); // 綁定關係(隊列、交換機、路由鍵) await channel.bindQueue(queueName, exchangeName, routingKey); // 消費 await channel.consume(queueName, msg => { console.log('Consumer:', msg.content.toString()); channel.ack(msg); }); console.log('消費端啟動成功!'); } consumer();
注意:其他類型程式碼已經放到 github,地址:https://github.com/koala-coding/simple_rabbitmq 歡迎 star 交流。
direct
direct 把消息路由到那些 binding key與 routing key 完全匹配的 Queue中。

以上圖的配置為例,我們以 routingKey=」error」 發送消息到Exchange,則消息會路由到 amq1 和 amq2;如果我們以 routingKey=」info」 或 routingKey=」warning」 來發送消息,則消息只會路由到 Queue2。如果我們以其他 routingKey 發送消息,則消息不會路由到這兩個 Queue 中。
topic
生產者指定 RoutingKey 消息根據消費端指定的隊列通過模糊匹配的方式進行相應轉發,兩種通配符模式: #:可匹配一個或多個關鍵字 *:只能匹配一個關鍵字

headers
header exchange(頭交換機)和主題交換機有點相似,但是不同於主題交換機的路由是基於路由鍵,頭交換機的路由值基於消息的 header 數據。 主題交換機路由鍵只有是字元串,而頭交換機可以是整型和哈希值 header Exchange 類型用的比較少,可以自行 google 了解。
消息隊列的思考與深入探索
消息隊列實現rpc
(本小段內容來源網上,參考文章說明)

RPC 遠程調用服務端的方法,使用 MQ 可以實現 RPC 的非同步調用,基於 Direct 交換機實現
- 客戶端即是生產者又是消費者,向 RPC 請求隊列發送 RPC 調用消息,同時監聽 RPC 響應隊列
- 服務端監聽RPC請求隊列,收到消息後執行服務端的方法
- 服務端將方法執行後的結果發送到RPC響應隊列
(注意,這裡只是提一下 RPC 這個知識,因為單單一個RPC一篇文章都不一定說說完,有興趣的可以用隊列嘗試一下RPC)
是否有消息持久化的必要?
消息隊列
是存在記憶體中的,如果出現問題掛掉,消息隊列中的消息會丟失。所以對於一些需求非常有持久化的必要!RabbitMQ 可以開啟持久化。不同開發語言都可以設置持久化參數。
這裡以Node.js為例子,其他語言可以自行搜索
await channel.assertExchange(exchangeName, 'direct', { durable: true }); // 注意其中的{ durable: true },這事對交換機持久化,還有其他的幾種持久化方式
同時推薦一篇不錯的寫持久化的文章: https://juejin.im/post/5d6f6b0ae51d45621512add0
消費者完成後是否有消息應答的必要?
消息應答簡單的解釋就是消費者
完成了消費後,通知一下消息隊列。
我覺得這個配置是有必要打開的,消費者完成消息隊列中的任務,消費者可能中途失敗或者掛掉,一旦 RabbitMQ 發送一個消息給消費者然後便迅速將該消息從消息隊列記憶體
中移除,這種情況下,消費者對應工作進程失敗或者掛掉後,那該進程正在處理的消息也將丟失。而且,也將丟失所有發送給該進程的未被處理的消息。
為了確保消息永不丟失,RabbitMQ 支援消息應答機制。當消息被接受,處理之後一條應答便會從消費者回傳至發送方,然後RabbitMQ將其刪除。
如果某個消費者掛掉(信道、鏈接關閉或者 tcp 鏈接丟失)且沒有發送 ack 應答,RabbitMQ 會認為該消息沒有被處理完全然後會將其重新放置到隊列中。通過這種方式你就可以確保消息永不丟失,甚至某個工作進程偶然掛掉的情況。
默認情況下消息應答是關閉的。是時候使用 false(auto-ack配置項)參數將其開啟了
這裡以 Node.js 為例子,其他語言可以自行搜索
// 消費者消費時候的程式碼 await channel.consume(queueName, msg => { console.log('koala:', msg.content.toString()); //... 這裡可以放業務邏輯處理的程式碼,消費者完成後發送回執應答 channel.ack(msg);// 消息應答 }, { noAck: false });
如何實現公平調度?
可以將prefetch count
項的值配置為1,這將會指示 RabbitMQ 在同一時間不要發送超過一條消息給每個消費者。換句話說,直到消息被處理和應答之前都不會發送給該消費者任何消息。取而代之的是,它將會發送消息至下一個比較閑的消費者或工作進程。
這裡以 Node.js 為例子,amqplib 庫對於限流實現提供的介面方法 prefetch。
prefetch 參數說明:
- count:每次推送給消費端 N 條消息數目,如果這 N 條消息沒有被ack,生產端將不會再次推送直到這 N 條消息被消費。
- global:在哪個級別上做限制,ture 為 channel 上做限制,false 為消費端上做限制,默認為 false。
// 創建消費者的時候 限流參數設置 await channel.prefetch(1, false);
如何實現一個交換機給多個消費者依次發送消息,選擇那種交換機?
如果一個生產者,兩個消費者,發放消息,我想要的隊列先給消費者1發,發完消費者1發消費者2,這樣有順序的交互發送,應該現在哪一種交換機呢?注意是交互,看完之後想一下?還有消費者完成後有沒有手動回調消息隊列完成的必要?消息持久化有必要沒,持久化有什麼好處?
(看完消息隊列的消息傳遞,你會有疑問管道中的消息(生產者)是怎麼被消費者消費的 放入隊列,然後從隊列被取出)
消息隊列應用場景
- 雙十一商品秒殺/搶票功能實現 我們在雙11的時候,當我們凌晨大量的秒殺和搶購商品,然後去結算的時候,就會發現,介面會提醒我們,讓我們稍等,以及一些友好的圖片文字提醒。而不是像前幾年的時代,動不動就頁面卡死,報錯等來呈現給用戶。 用一張圖來解釋消息隊列在秒殺搶票等場景的使用:(說明:往下看之前,如果你做過電商類秒殺,可以想想你是怎麼實現的,我們可以一起討論哦。這裡只是想說下消息隊列的作用,並不是最終優化的結果,比如用redis控制總快取等)

這裡在生成訂單時候,不需要直接操作資料庫 IO ,預扣庫存。先扣除了庫存,保證不超賣,然後非同步生成用戶訂單,這裡用到一次即時消費隊列
,這樣響應給用戶的速度就會快很多;而且還要保證不少賣,用戶拿到了訂單,不支付怎麼辦?我們都知道現在訂單都有有效期,再使用一個消息隊列
,用於判斷訂單支付超時,比如說用戶五分鐘內不支付,訂單就失效了,訂單一旦失效,就會加入新的庫存。這也是現在很多網上零售企業保證商品不少賣採用的方案。訂單量比較少的情況下,生成訂單非常快,用戶幾乎不用排隊。
- 積分兌換(積分可用於多平台) 積分兌換模組,有一個公司多個部門都要用到這個模組,這時候就可以通過消息隊列解耦這個特性來實現。 各部門系統做各部門的事,但是他們都可以用這個積分系統進行商品的兌換等。其他模組與積分模組完全解耦。
- 發送郵件,用戶大數據分析等 同步變非同步功能實現 這個功能要說的比較多,從一個平台的用戶註冊開始。 正常情況註冊,不出現高並發: 對於用戶來說,他就是想註冊用一下這個軟體,只要服務端將他的賬戶資訊存到資料庫中他便可以登錄上去做他想做的事情了。用戶並不care這些事,服務端就可以把其他的操作放入對應的
消息隊列
中然後馬上返回用戶結果,由消息隊列非同步
的進行這些操作。 假如有大量的用戶註冊,發生了高並發: 郵件介面承受不住,或是分析資訊時的大量計算使 cpu 滿載,這將會出現雖然用戶數據記錄很快的添加到資料庫中了,但是卻卡在發郵件或分析資訊時的情況,導致請求的響應時間大幅增長,甚至出現超時,這就有點不划算了。面對這種情況一般也是將這些操作放入消息隊列(生產者消費者模型),消息隊列慢慢的進行處理,同時可以很快的完成註冊請求,不會影響用戶使用其他功能。- 用戶註冊
- 用戶註冊選擇幾個興趣標籤,這時候需要根據用戶的屬性,用戶分析,計算出推薦內容
- 註冊後可能需要發送郵件給用戶
- 發送給用戶一個包含操作指南的系統通知
- 等等
- 基於RabbitMQ的Node.js與Python或其他語言實現通訊 這裡也是利用了 RabbitMQ 的解耦特性,不僅僅可以與 Python,還可以與其他很多語言通訊,就不具體說了。
總結
親,別只看,你試試呀!直接開啟服務,裝個 RabbitMQ,挺有意思的,就算一個 HelloWorld 也能嘗試出很多內容。而且本文說的很多內容都可以用 redis 來實現,也可以去看下我的 redis 文章。順便說一句設計模式和數據結構是兩個好東西,越來越能感覺到。文章程式碼地址:https://github.com/koala-coding/simple_rabbitmq
參考文章
https://www.cnblogs.com/baidawei/p/9172433.html
https://www.sojson.com/blog/48.html
http://www.imooc.com/article/293742
https://www.zhihu.com/question/34243607/answer/58314162
https://bbs.csdn.net/topics/392169691?page=1
https://mp.weixin.qq.com/s/wTkwJXlNr5CaI7uRntJ42A