RabbitMQ消息隊列
- 2021 年 7 月 30 日
- 筆記
一、消息隊列
消息隊列就是一種先進先出
的數據機構
當在分散式系統中的時候,不同的機器需要做數據交互,所以涉及到不同機器之間的數據交互,這樣的話就需要藉助專業的消息隊列,常見的消息隊列有 RabbitMQ 、Kafka…他們都是開源且支援語言較多。
消息隊列解決的問題:
-
應用解耦
-
流量消峰:
如果訂單系統一秒最多能處理一萬次訂單,這個處理能力在平時綽綽有餘,正常時段我們下單一秒後就能返回結果。但是在高峰期,如果有兩萬次下單作業系統是處理不了的,只能限制訂單超過一萬後不允許用戶下單。
但是使用消息隊列,就可以取消這個限制,把這一秒內的訂單放入隊列中分散成一段時間來處理,這樣用戶就可能在下單幾十秒後才能收到下單成功的操作,但是比不能下單要好。
-
消息分發:
當A發送一次消息,B對消息感興趣,就只需監聽消息,C感興趣,C也去監聽消息,而A完全不需要改動。
-
非同步消息(Celery 就是對消息隊列的分裝)
RabbitMQ 和 Kafka
RabbitMQ :吞吐量小,有消息確認(對消息可靠性有要求,就用它)
Kafka:吞吐量高,注重高吞吐量,不注重消息的可靠性,數據量特別大
二、按裝RabbitMQ
1、原生安裝:
# 安裝擴展epel源
wget -O /etc/yum.repos.d/epel.repo //mirrors.aliyun.com/repo/epel-7.repo
yum -y install erlang # 因為RabbitMQ是erlang語言開發的,所以要按裝
yum -y install rabbitmq-server # 安裝RabbitMQ
systemctl start rabbitmq-server # 啟動
# 創建用戶
rabbitmqctl add_user 用戶名 密碼
# 分配許可權
rabbitmqctl set_user_tags 用戶名 administrator ——>(設置用戶為管理員角色)
rabbitmqctl set_permissions -p "/" 用戶名 ".*" ".*" ".*" # 設置許可權
systemctl reatart rabbitmq-server # 重啟
2、docker拉取
docker pull rabbitmq:3.8.3-management # 自動開啟了web管理介面
# 啟動需要配置用戶名和密碼
docker run -di --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:3.8.3-management
5672:是RabbitMQ的默認埠
15672:web管理介面的埠
三、基本使用
生產者:
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 聲明一個隊列
channel.queue_declare(queue='test')
# 生產者向隊列中放入一條消息
channel.basic_publish(exchange='',
routing_key='test', # 指定向那個隊列放入
body='測試數據') # 放入的內容
# 關閉連接
connection.close()
消費者:
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
print(f'測試:{body}')
# 消費者從指定的隊列中拿消息消費,一旦有一條轉到 callback 里
channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=True)
# 阻塞主,一直等待拿消息消費
channel.start_consuming()
四、確認機制
消息確認機制其實就是消費者中 auto_ack 的設置
生產者不變
消費者:
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
print(f'測試:{body}')
# 如果auto_ack=False這樣設置後
# 也可以這樣設置當真正的消息處理完了,在發確認也是可以的
ch.basic_ack(delivery_tag=method.delivery_tag)
# auto_ack=True,隊列收到確認,就會自動把消費過的消息刪除。
# auto_ack=False,那麼就不會給隊列發送確認消息了,隊列就不會刪除消息。不會自動回復確認消息,
channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=False)
# 阻塞主,一直等待拿消息消費
channel.start_consuming()
五、持久化
隊列持久化:就是在聲明隊列的時候,指定持久化durable=True
,隊列必須是新的才可以
channel.queue_declare(queue='test', durable=True) # test 隊列持久化
消息持久化:就是在發布消息的時候添加
# 生產者向隊列中放入一條消息
channel.basic_publish(exchange='',
routing_key='test', # 指定向那個隊列放入
body='hello', # 放入的內容
properties=pika.BasicProperties(delivery_mode=2) # 消息持久化
)
六、閑置消費
當正常情況下如果有多個消費者,那麼就會按照順序第一個消息給 第一個消費者,第二個消息給第二個消費者
但是當第一個消息的消費者處理資訊很耗時,一直沒有結束,那麼就可以讓第二個消費者優先獲取閑置消息。
消費者:
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.queue_declare(queue='test')
def callback(ch, method, properties, body):
print(f'測試:{body}')
# 就只有這一句話,誰閑置誰獲取,沒必要按照順序一個一個來
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=True)
# 阻塞主,一直等待拿消息消費
channel.start_consuming()
七、發布訂閱
發布訂閱就是:我可以有多個訂閱者來訂閱你的消息,這樣發布者只需要發布一條, 我的所有隻要訂閱你的人都可以消費你的消息
模型:當我的訂閱者起來了之後,就會創建一個隊列,多個訂閱者就會創建多個隊列,當發布者生產了消息之後,會傳給 exchange ,然後 exchange 會把消息複製分別分發到訂閱者創建的隊列中,這樣就實現了只要監聽你,那就能收到你發的消息。
基本使用
發布者:
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 不指定隊列,指定了 exchange 複製分發消息
channel.exchange_declare(exchange='conn', exchange_type='fanout')
# 生產者向隊列中放入一條消息
channel.basic_publish(exchange='conn', # 指定複製分發消息的 exchange
routing_key='', # 不設置指定向那個隊列放入
body='Hello Word', # 放入的內容
)
# 關閉連接
connection.close()
訂閱者:啟動多次,都綁定到了同一個 exchange,所以就會都收到同一個 exchange 分發的消息
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.exchange_declare(exchange='conn', exchange_type='fanout')
# queue 不能制定名字,因為它們的名字都是不一樣的
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue # 生成一個隨機的 queue 名字
# 把隨機生成的隊列綁定到exchange上
channel.queue_bind(exchange='conn', queue=queue_name)
def callback(ch, method, properties, body):
print(f'測試:{body}')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
# 阻塞主,一直等待拿消息消費
channel.start_consuming()
關鍵字
需要設置 exchange_type 的類型為 direct
並且在發布消息的時候設置多個關鍵字:routing_key
在訂閱者中也需要設置 exchange_type 的類型 direct
並且當訂閱者綁定 exchange 的時候也需要設置 routing_key,
這樣的話在發布者發布消息後,exchange 會根據發布者和訂閱者設置的 routing_key 進行匹配,當訂閱者的 routing_key 匹配上了發布者的 routing_key 的話,那麼訂閱者就可以接收到發布者發布的消息,反之收不到消息。
發布者:
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 不指定隊列,指定了 exchange 複製分發消息,exchange_type='direct'
channel.exchange_declare(exchange='conn1', exchange_type='direct')
# 生產者向隊列中放入一條消息
channel.basic_publish(exchange='conn1', # 指定複製分發消息的 exchange
routing_key='abc', # 指定關鍵字
body='Hello Word', # 放入的內容
)
# 關閉連接
connection.close()
消費者1:
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.exchange_declare(exchange='conn', exchange_type='direct')
# queue 不能制定名字,因為它的名字都是不一樣的
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue # 生成一個隨機的 queue 名字
print(queue_name)
# 把隨機生成的隊列綁定到exchange上,
# 並設置routing_key='abc',也就是說只有發布者的routing_key中包含有'abc',此訂閱者才會收到消息
channel.queue_bind(exchange='conn1', queue=queue_name, routing_key='abc')
def callback(ch, method, properties, body):
print(f'測試:{body}')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
# 阻塞主,一直等待拿消息消費
channel.start_consuming()
消費者2:
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.exchange_declare(exchange='conn', exchange_type='direct')
# queue 不能制定名字,因為它的名字都是不一樣的
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue # 生成一個隨機的 queue 名字
print(queue_name)
# 把隨機生成的隊列綁定到exchange上,
# 並設置了多個routing_key,也就是說只有發布者的routing_key中包含有入下兩個之一,此訂閱者都會收到消息
channel.queue_bind(exchange='conn1', queue=queue_name, routing_key='abc')
channel.queue_bind(exchange='conn1', queue=queue_name, routing_key='abcd')
def callback(ch, method, properties, body):
print(f'測試:{body}')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
# 阻塞主,一直等待拿消息消費
channel.start_consuming()
模糊匹配
在訂閱者綁定匹配的時候可以進行模糊匹配發布者的 routing_key ,匹配上了就能接收到發布者發布的消息
# 表示後面可以跟任意字元
* 表示後面只能跟一個單詞
發布者:
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 不指定隊列,指定了 exchange 複製分發消息,exchange_type='topic'
channel.exchange_declare(exchange='conn1', exchange_type='topic')
# 生產者向隊列中放入一條消息
channel.basic_publish(exchange='conn2', # 指定複製分發消息的 exchange
routing_key='abcdefg', # 指定關鍵字
body='Hello Word', # 放入的內容
)
# 關閉連接
connection.close()
訂閱者:
import pika
# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
# 拿到channel對象
channel = connection.channel()
# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.exchange_declare(exchange='conn', exchange_type='direct')
# queue 不能制定名字,因為它的名字都是不一樣的
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue # 生成一個隨機的 queue 名字
print(queue_name)
# 把隨機生成的隊列綁定到exchange上,
# 並設置routing_key='abc#',也就是說只有發布者的routing_key中包含有'abc'開頭,此訂閱者才會收到消息
channel.queue_bind(exchange='conn2', queue=queue_name, routing_key='abc#')
def callback(ch, method, properties, body):
print(f'測試:{body}')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
# 阻塞主,一直等待拿消息消費
channel.start_consuming()
八、python中的RPC框架
RPC :遠程過程調用
例如:兩個服務調用,服務1通過網路調用服務2的方法。
SimpleXMLRPCServer
自帶的:數據包大,速度慢
服務端:
from xmlrpc.server import SimpleXMLRPCServer
class RPCServer(object):
def getObj(self):
return 'get obj'
def sendObj(self, data):
return 'send obj'
# SimpleXMLRPCServer
server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)
server.register_introspection_functions()
server.register_instance(RPCServer())
server.serve_forever()
客戶端:
from xmlrpc.client import ServerProxy
client = ServerProxy('//localhost:4242')
ret = client.getObj()
print(ret)
ZeroRPC
第三方的:底層使用 ZeroMQ 和 MessagePack ,速度快,響應時間短,並發高。
服務端:
import zerorpc
class RPCServer(object):
def getObj(self):
return 'get obj'
def sendObj(self, data):
return 'send obj'
server = zerorpc.Server(RPCServer())
server.bind('tcp://0.0.0.0:4243') # 允許連接的
server.run()
客戶端:
import zerorpc
client = zerorpc.Client()
client.connect('tcp://127.0.0.1:4243') # 連接
ret = client.getObj()
print(ret)