python中的rabbitmq

  • 2020 年 1 月 20 日
  • 筆記

介紹

RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。 MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過讀寫出入隊列的消息(針對應用程式的數據)來通訊,而無需專用連接來鏈接它們。消 息傳遞指的是程式之間通過在消息中發送數據進行通訊,而不是通過直接調用彼此來通訊,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程式通過 隊列來通訊。隊列的使用除去了接收和發送應用程式同時執行的要求。

RabbitMQ是一個消息代理:它接受和轉發消息。你可以把它想像成一個郵局:當你把你想要發布的郵件放在郵箱中時,你可以確定郵差先生最終將郵件發送給你的收件人。在這個比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。

RabbitMQ和郵局的主要區別在於它不處理紙張,而是接受,存儲和轉發二進位數據塊 — 消息。

請注意,生產者,消費者和消息代理不必駐留在同一主機上; 實際上在大多數應用程式中它們不是同一主機上。

Hello World!

(using the Pika Python client)

pip3 install pika

在本教程的這一部分,我們將使用Python編寫兩個小程式; 發送單個消息的生產者(發送者),以及接收消息並將其列印出來的消費者(接收者)。這是一個消息傳遞的「Hello World」。

在下圖中,「P」是我們的生產者,「C」是我們的消費者。中間的盒子是一個隊列 – RabbitMQ代表消費者保存的消息緩衝區。

我們的整體設計將如下所示:

生產者將消息發送到「hello」隊列,消費者接收來自該隊列的消息。

發送

我們的第一個程式 send.py 會向隊列發送一條消息。我們需要做的第一件事是與RabbitMQ伺服器建立連接。

#!/usr/bin/env python  import pika    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  channel = connection.channel()

我們現在連接到本地上的的代理 – 因此是 'localhost'。如果我們想連接到另一台機器上的代理,我們只需在此指定其名稱或IP地址。

接下來,在發送之前,我們需要確保收件人隊列存在。如果我們發送消息到不存在的位置,RabbitMQ將只刪除該消息。我們來創建一個將傳遞消息的 hello 隊列:

channel.queue_declare(queue='hello')

此時我們準備發送消息。我們的第一條消息將只包含一個字元串 "Hello World!"我們想把它發送給我們的 hello 隊列。

在RabbitMQ中,消息永遠不會直接發送到隊列,它總是需要經過交換。我們現在需要知道的是如何使用由空字元串標識的默認交換。這種交換是特殊的 – 它允許我們準確地指定消息應該到達哪個隊列。隊列名稱需要在routing_key參數中指定:

channel.basic_publish(exchange='',routing_key='hello',body='Hello World!')  print(" [x] Sent 'Hello World!'")

在退出程式之前,我們需要確保網路緩衝區被刷新,並且我們的消息被實際傳送到RabbitMQ。我們可以通過輕輕關閉連接來完成。

connection.close()

接收

我們的第二個程式 receive.py 將接收隊列中的消息並將它們列印在螢幕上。

再次,我們首先需要連接到RabbitMQ伺服器。負責連接到Rabbit的程式碼與以前相同。

下一步,就像以前一樣,要確保隊列存在。使用queue_declare創建一個隊列是冪等的 – 我們可以根據需要多次運行該命令,並且只會創建一個。

channel.queue_declare()

您可能會問為什麼我們再次聲明隊列 – 我們已經在之前的程式碼中聲明了它。如果我們確信隊列已經存在,我們可以避免這種情況。例如,如果 send.py 程式之前運行過。但我們還不確定首先運行哪個程式。在這種情況下,重複在兩個程式中重複聲明隊列是一種很好的做法。

列出隊列    您可能希望看到RabbitMQ有什麼隊列以及它們中有多少條消息。您可以使用rabbitmqctl工具(作為特權用戶)執行此操作:    > sudo rabbitmqctl list_queues    在Windows上,省略sudo:    > rabbitmqctl.bat list_queues

從隊列接收消息更為複雜。它通過向隊列訂閱 回調函數 來工作。每當我們收到一條消息,這個回調函數就被皮卡庫調用。在我們的例子中,這個函數會在螢幕上列印消息的內容。

def callback(ch, method, propertites, body):      print(" [x] Received {}".format(body))

接下來,我們需要告訴RabbitMQ這個特定的回調函數應該從我們的hello隊列接收消息:

channel.basic_consume(callable, queue='hello', no_ack=True)

為了讓這個命令成功,我們必須確保我們想要訂閱的隊列存在。幸運的是,我們對此有信心 – 我們已經使用queue_declare創建了一個隊列。

NO_ACK參數,後面(幾篇之後)會有解釋。

最後,我們進入一個永無止境的循環,等待數據並在必要時運行回調。

print(' [*] Waiting for messages. To exit press CTRL+C')  channel.start_consuming()

把它放在一起

send.py的完整程式碼:

#!/usr/bin/env python  import pika    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  channel = connection.channel()      channel.queue_declare(queue='hello')    channel.basic_publish(exchange='', routing_key='hello',body='Hello World!')  print(" [x] Sent 'Hello World!'")  connection.close()

receive.py的完整程式碼:

#!/usr/bin/env python  import pika    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  channel = connection.channel()    channel.queue_declare(queue='hello')      def callback(ch, method, propertites, body):      print(" [x] Received {}".format(body))      channel.basic_consume(callable,queue='hello',no_ack=True)    print(' [*] Waiting for messages. To exit press CTRL+C')  channel.start_consuming()

現在我們可以在終端上試用我們的程式。首先,讓我們開始一個消費者,它將持續運行等待交付:

python receive.py  # => [*] Waiting for messages. To exit press CTRL+C  # => [x] Received 'Hello World!'

現在開始製作。生產者計劃將在每次運行後停止:

python send.py  # => [x] Sent 'Hello World!'

歡呼!我們能夠通過RabbitMQ發送我們的第一條消息。正如您可能已經注意到的,receive.py 程式不會退出。它會隨時準備接收更多消息,並可能會被Ctrl-C中斷。

嘗試在新終端中再次運行 send.py