RabbitMQ消息隊列

  • 2021 年 7 月 30 日
  • 筆記

一、消息隊列

消息隊列就是一種先進先出的數據機構

當在分散式系統中的時候,不同的機器需要做數據交互,所以涉及到不同機器之間的數據交互,這樣的話就需要藉助專業的消息隊列,常見的消息隊列有 RabbitMQ 、Kafka…他們都是開源且支援語言較多。

消息隊列解決的問題:

  1. 應用解耦

  2. 流量消峰:

    如果訂單系統一秒最多能處理一萬次訂單,這個處理能力在平時綽綽有餘,正常時段我們下單一秒後就能返回結果。但是在高峰期,如果有兩萬次下單作業系統是處理不了的,只能限制訂單超過一萬後不允許用戶下單。

    但是使用消息隊列,就可以取消這個限制,把這一秒內的訂單放入隊列中分散成一段時間來處理,這樣用戶就可能在下單幾十秒後才能收到下單成功的操作,但是比不能下單要好。

  3. 消息分發:

    當A發送一次消息,B對消息感興趣,就只需監聽消息,C感興趣,C也去監聽消息,而A完全不需要改動。

  4. 非同步消息(Celery 就是對消息隊列的分裝)

RabbitMQ 和 Kafka

RabbitMQ :吞吐量小,有消息確認(對消息可靠性有要求,就用它)

Kafka:吞吐量高,注重高吞吐量,不注重消息的可靠性,數據量特別大

二、按裝RabbitMQ

1、原生安裝:

# 安裝擴展epel源
wget -O /etc/yum.repos.d/epel.repo //mirrors.aliyun.com/repo/epel-7.repo
    
yum -y install erlang			# 因為RabbitMQ是erlang語言開發的,所以要按裝
yum -y install rabbitmq-server	# 安裝RabbitMQ
systemctl start rabbitmq-server	# 啟動

# 創建用戶
rabbitmqctl add_user 用戶名 密碼
# 分配許可權
rabbitmqctl set_user_tags 用戶名 administrator ——>(設置用戶為管理員角色)
rabbitmqctl set_permissions -p "/" 用戶名 ".*" ".*" ".*"	# 設置許可權
systemctl reatart rabbitmq-server	# 重啟

2、docker拉取

docker pull rabbitmq:3.8.3-management		# 自動開啟了web管理介面

# 啟動需要配置用戶名和密碼
docker run -di --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:3.8.3-management

5672:是RabbitMQ的默認埠
15672:web管理介面的埠

三、基本使用

生產者:

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))

# 拿到channel對象
channel = connection.channel()

# 聲明一個隊列
channel.queue_declare(queue='test')

# 生產者向隊列中放入一條消息
channel.basic_publish(exchange='',
                      routing_key='test',   # 指定向那個隊列放入
                      body='測試數據')        # 放入的內容
                      
# 關閉連接
connection.close()

消費者:

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))

# 拿到channel對象
channel = connection.channel()

# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.queue_declare(queue='test')


def callback(ch, method, properties, body):
    print(f'測試:{body}')


# 消費者從指定的隊列中拿消息消費,一旦有一條轉到 callback 里
channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=True)

# 阻塞主,一直等待拿消息消費
channel.start_consuming()

四、確認機制

消息確認機制其實就是消費者中 auto_ack 的設置

生產者不變

消費者:

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))

# 拿到channel對象
channel = connection.channel()

# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.queue_declare(queue='test')


def callback(ch, method, properties, body):
    print(f'測試:{body}')
    # 如果auto_ack=False這樣設置後
    # 也可以這樣設置當真正的消息處理完了,在發確認也是可以的
    ch.basic_ack(delivery_tag=method.delivery_tag)


# auto_ack=True,隊列收到確認,就會自動把消費過的消息刪除。
# auto_ack=False,那麼就不會給隊列發送確認消息了,隊列就不會刪除消息。不會自動回復確認消息,
channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=False)

# 阻塞主,一直等待拿消息消費
channel.start_consuming()

五、持久化

隊列持久化:就是在聲明隊列的時候,指定持久化durable=True,隊列必須是新的才可以

channel.queue_declare(queue='test', durable=True)	# test 隊列持久化

消息持久化:就是在發布消息的時候添加

# 生產者向隊列中放入一條消息
channel.basic_publish(exchange='',
                      routing_key='test',   # 指定向那個隊列放入
                      body='hello',         # 放入的內容
                      properties=pika.BasicProperties(delivery_mode=2) # 消息持久化
                      )

六、閑置消費

當正常情況下如果有多個消費者,那麼就會按照順序第一個消息給 第一個消費者,第二個消息給第二個消費者

但是當第一個消息的消費者處理資訊很耗時,一直沒有結束,那麼就可以讓第二個消費者優先獲取閑置消息。

消費者:

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))

# 拿到channel對象
channel = connection.channel()

# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.queue_declare(queue='test')


def callback(ch, method, properties, body):
    print(f'測試:{body}')


# 就只有這一句話,誰閑置誰獲取,沒必要按照順序一個一個來
channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=True)

# 阻塞主,一直等待拿消息消費
channel.start_consuming()

七、發布訂閱

發布訂閱就是:我可以有多個訂閱者來訂閱你的消息,這樣發布者只需要發布一條, 我的所有隻要訂閱你的人都可以消費你的消息

模型:當我的訂閱者起來了之後,就會創建一個隊列,多個訂閱者就會創建多個隊列,當發布者生產了消息之後,會傳給 exchange ,然後 exchange 會把消息複製分別分發到訂閱者創建的隊列中,這樣就實現了只要監聽你,那就能收到你發的消息。

基本使用

發布者:

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))

# 拿到channel對象
channel = connection.channel()

# 不指定隊列,指定了 exchange 複製分發消息
channel.exchange_declare(exchange='conn', exchange_type='fanout')

# 生產者向隊列中放入一條消息
channel.basic_publish(exchange='conn',      # 指定複製分發消息的 exchange
                      routing_key='',   	# 不設置指定向那個隊列放入
                      body='Hello Word',    # 放入的內容
                      )
# 關閉連接
connection.close()

訂閱者:啟動多次,都綁定到了同一個 exchange,所以就會都收到同一個 exchange 分發的消息

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))


# 拿到channel對象
channel = connection.channel()


# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.exchange_declare(exchange='conn', exchange_type='fanout')


# queue 不能制定名字,因為它們的名字都是不一樣的
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue    # 生成一個隨機的 queue 名字



# 把隨機生成的隊列綁定到exchange上
channel.queue_bind(exchange='conn', queue=queue_name)


def callback(ch, method, properties, body):
    print(f'測試:{body}')


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

# 阻塞主,一直等待拿消息消費
channel.start_consuming()

關鍵字

需要設置 exchange_type 的類型為 direct

並且在發布消息的時候設置多個關鍵字:routing_key

在訂閱者中也需要設置 exchange_type 的類型 direct

並且當訂閱者綁定 exchange 的時候也需要設置 routing_key,

這樣的話在發布者發布消息後,exchange 會根據發布者和訂閱者設置的 routing_key 進行匹配,當訂閱者的 routing_key 匹配上了發布者的 routing_key 的話,那麼訂閱者就可以接收到發布者發布的消息,反之收不到消息。

發布者:

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))

# 拿到channel對象
channel = connection.channel()

# 不指定隊列,指定了 exchange 複製分發消息,exchange_type='direct'
channel.exchange_declare(exchange='conn1', exchange_type='direct')

# 生產者向隊列中放入一條消息
channel.basic_publish(exchange='conn1',     # 指定複製分發消息的 exchange
                      routing_key='abc',   # 指定關鍵字
                      body='Hello Word',    # 放入的內容
                      )
# 關閉連接
connection.close()

消費者1:

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))


# 拿到channel對象
channel = connection.channel()


# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.exchange_declare(exchange='conn', exchange_type='direct')


# queue 不能制定名字,因為它的名字都是不一樣的
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue    # 生成一個隨機的 queue 名字
print(queue_name)


# 把隨機生成的隊列綁定到exchange上,
# 並設置routing_key='abc',也就是說只有發布者的routing_key中包含有'abc',此訂閱者才會收到消息
channel.queue_bind(exchange='conn1', queue=queue_name, routing_key='abc')


def callback(ch, method, properties, body):
    print(f'測試:{body}')


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

# 阻塞主,一直等待拿消息消費
channel.start_consuming()

消費者2:

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))


# 拿到channel對象
channel = connection.channel()


# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.exchange_declare(exchange='conn', exchange_type='direct')


# queue 不能制定名字,因為它的名字都是不一樣的
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue    # 生成一個隨機的 queue 名字
print(queue_name)


# 把隨機生成的隊列綁定到exchange上,
# 並設置了多個routing_key,也就是說只有發布者的routing_key中包含有入下兩個之一,此訂閱者都會收到消息
channel.queue_bind(exchange='conn1', queue=queue_name, routing_key='abc')
channel.queue_bind(exchange='conn1', queue=queue_name, routing_key='abcd')


def callback(ch, method, properties, body):
    print(f'測試:{body}')


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

# 阻塞主,一直等待拿消息消費
channel.start_consuming()

模糊匹配

在訂閱者綁定匹配的時候可以進行模糊匹配發布者的 routing_key ,匹配上了就能接收到發布者發布的消息

# 表示後面可以跟任意字元
* 表示後面只能跟一個單詞

發布者:

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))

# 拿到channel對象
channel = connection.channel()

# 不指定隊列,指定了 exchange 複製分發消息,exchange_type='topic'
channel.exchange_declare(exchange='conn1', exchange_type='topic')

# 生產者向隊列中放入一條消息
channel.basic_publish(exchange='conn2',     # 指定複製分發消息的 exchange
                      routing_key='abcdefg',    # 指定關鍵字
                      body='Hello Word',    # 放入的內容
                      )
# 關閉連接
connection.close()

訂閱者:

import pika

# 有用戶名密碼
credentials = pika.PlainCredentials('admin', 'admin')
# 拿到連接對象
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))


# 拿到channel對象
channel = connection.channel()


# 聲明一個隊列,如果消費者先起來,那麼就先聲明一個隊列
channel.exchange_declare(exchange='conn', exchange_type='direct')


# queue 不能制定名字,因為它的名字都是不一樣的
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue    # 生成一個隨機的 queue 名字
print(queue_name)


# 把隨機生成的隊列綁定到exchange上,
# 並設置routing_key='abc#',也就是說只有發布者的routing_key中包含有'abc'開頭,此訂閱者才會收到消息
channel.queue_bind(exchange='conn2', queue=queue_name, routing_key='abc#')


def callback(ch, method, properties, body):
    print(f'測試:{body}')


channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

# 阻塞主,一直等待拿消息消費
channel.start_consuming()

八、python中的RPC框架

RPC :遠程過程調用

例如:兩個服務調用,服務1通過網路調用服務2的方法。

SimpleXMLRPCServer

自帶的:數據包大,速度慢

服務端:

from xmlrpc.server import SimpleXMLRPCServer


class RPCServer(object):

    def getObj(self):
        return 'get obj'

    def sendObj(self, data):
        return 'send obj'


# SimpleXMLRPCServer
server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)
server.register_introspection_functions()
server.register_instance(RPCServer())
server.serve_forever()

客戶端:

from xmlrpc.client import ServerProxy

client = ServerProxy('//localhost:4242')
ret = client.getObj()
print(ret)

ZeroRPC

第三方的:底層使用 ZeroMQ 和 MessagePack ,速度快,響應時間短,並發高。

服務端:

import zerorpc

class RPCServer(object):

    def getObj(self):
        return 'get obj'

    def sendObj(self, data):
        return 'send obj'


server = zerorpc.Server(RPCServer())
server.bind('tcp://0.0.0.0:4243')   # 允許連接的
server.run()

客戶端:

import zerorpc

client = zerorpc.Client()
client.connect('tcp://127.0.0.1:4243')  # 連接
ret = client.getObj()
print(ret)