redis之PubSub
- 2019 年 10 月 30 日
- 筆記
前面我們講了 Redis 消息隊列的使用方法,但是沒有提到 Redis 消息隊列的不足之處,那就是它不支持消息的多播機制。
消息多播
消息多播允許生產者生產一次消息,中間件負責將消息複製到多個消息隊列,每個消息隊列由相應的消費組進行消費。它是分佈式系統常用的一種解耦方式,用於將多個消費組的邏輯進行拆分。支持了消息多播,多個消費組的邏輯就可以放到不同的子系統中。
如果是普通的消息隊列,就得將多個不同的消費組邏輯串接起來放在一個子系統中,進
行連續消費。
PubSub
為了支持消息多播,Redis 不能再依賴於那 5 種基本數據類型了。它單獨使用了一個模塊來支持消息多播,這個模塊的名字叫着 PubSub,也就是 PublisherSubscriber,發佈者訂閱者模型。我們使用 Python 語言來演示一下 PubSub 如何使用。
# -*- coding: utf-8 -*-
import time
import redis
client = redis.StrictRedis()
p = client.pubsub()
p.subscribe(“codehole”)
time.sleep(1)
print p.get_message()
client.publish(“codehole”, “java comes”)
time.sleep(1)
print p.get_message()
client.publish(“codehole”, “python comes”)
time.sleep(1)
print p.get_message()
print p.get_message()
{‘pattern’: None, ‘type’: ‘subscribe’, ‘channel’: ‘codehole’, ‘data’: 1L}
{‘pattern’: None, ‘type’: ‘message’, ‘channel’: ‘codehole’, ‘data’: ‘java comes’}
{‘pattern’: None, ‘type’: ‘message’, ‘channel’: ‘codehole’, ‘data’: ‘python comes’}
None
客戶端發起訂閱命令後,Redis 會立即給予一個反饋消息通知訂閱成功。因為有網絡傳輸延遲,在 subscribe 命令發出後,需要休眠一會,再通過 get_message 才能拿到反饋消息。客戶端接下來執行發佈命令,發佈了一條消息。同樣因為網絡延遲,在 publish 命令發出後,需要休眠一會,再通過 get_message 才能拿到發佈的消息。如果當前沒有消息,get_message 會返回空,告知當前沒有消息,所以它不是阻塞的。Redis PubSub 的生產者和消費者是不同的連接,也就是上面這個例子實際上使用了兩個Redis 的連接。這是必須的,因為 Redis 不允許連接在 subscribe 等待消息時還要進行其它的操作。
在生產環境中,我們很少將生產者和消費者放在同一個線程里。如果它們真要在同一個線程里,何必通過中間件來流轉,直接使用函數調用就行。所以我們應該將生產者和消費者分離,接下來我們看看分離後的代碼要怎麼寫。
消費者
# -*- coding: utf-8 -*-
import time
import redis
client = redis.StrictRedis()
p = client.pubsub()
p.subscribe(“codehole”)
while True:
msg = p.get_message()
if not msg:
time.sleep(1)
continue
print msg
生產者
# -*- coding: utf-8 -*-
import redis
client = redis.StrictRedis()
client.publish(“codehole”, “python comes”)
client.publish(“codehole”, “java comes”)
client.publish(“codehole”, “golang comes”)
必須先啟動消費者,然後再執行生產者,消費者我們可以啟動多個,pubsub 會保證它們收到的是相同的消息序列。
{‘pattern’: None, ‘type’: ‘subscribe’, ‘channel’: ‘codehole’, ‘data’: 1L}
{‘pattern’: None, ‘type’: ‘message’, ‘channel’: ‘codehole’, ‘data’: ‘python comes’}
{‘pattern’: None, ‘type’: ‘message’, ‘channel’: ‘codehole’, ‘data’: ‘java comes’}
{‘pattern’: None, ‘type’: ‘message’, ‘channel’: ‘codehole’, ‘data’: ‘golang comes’}
我們從消費者的控制台窗口可以看到上面的輸出,每個消費者窗口都是同樣的輸出。第一行是訂閱成功消息,它很快就會輸出,後面的三行會在生產者進程執行的時候立即輸出。
上面的消費者是通過輪詢 get_message 來收取消息的,如果收取不到就休眠 1s。這讓我們想起了第 3 節的消息隊列模型,我們使用 blpop 來代替休眠來提高消息處理的及時性。PubSub 的消費者如果使用休眠的方式來輪詢消息,也會遭遇消息處理不及時的問題。不過我們可以使用 listen 來阻塞監聽消息來進行處理,這點同 blpop 原理是一樣的。下面我們改造一下消費者
阻塞消費者
# -*- coding: utf-8 -*-
import time
import redis
client = redis.StrictRedis()
p = client.pubsub()
p.subscribe(“codehole”)
for msg in p.listen():
print msg
代碼簡短了很多,不需要再休眠了,消息處理也及時了。
模式訂閱
上面提到的訂閱模式是基於名稱訂閱的,消費者訂閱一個主題是必須明確指定主題的名稱。如果我們想要訂閱多個主題,那就 subscribe 多個名稱。
> subscribe codehole.image codehole.text codehole.blog # 同時訂閱三個主題,會有三條訂閱成功反
饋信息
1) “subscribe”
2) “codehole.image”
3) (integer) 1
1) “subscribe”
2) “codehole.text”
3) (integer) 2
1) “subscribe”
2) “codehole.blog”
3) (integer) 3
這樣生產者向這三個主題發佈的消息,這個消費者都可以接收到。
> publish codehole.image https://www.google.com/dudo.png
(integer) 1
> publish codehole.text ” 你好,歡迎加入碼洞 “
(integer) 1
> publish codehole.blog ‘{“content”: “hello, everyone”, “title”: “welcome”}’
(integer) 1
如果現在要增加一個主題 codehole.group,客戶端必須也跟着增加一個訂閱指令才可以收到新開主題的消息推送。
為了簡化訂閱的繁瑣,redis 提供了模式訂閱功能 Pattern Subscribe,這樣就可以一次訂
閱多個主題,即使生產者新增加了同模式的主題,消費者也可以立即收到消息
> psubscribe codehole.* # 用模式匹配一次訂閱多個主題,主題以 codehole. 字符開頭的消息都可
以收到
1) “psubscribe”
2) “codehole.*”
3) (integer) 1
消息結構
前面的消費者消息輸出時都是下面的這樣一個字典形式
{‘pattern’: None, ‘type’: ‘subscribe’, ‘channel’: ‘codehole’, ‘data’: 1L}
{‘pattern’: None, ‘type’: ‘message’, ‘channel’: ‘codehole’, ‘data’: ‘python comes’}
{‘pattern’: None, ‘type’: ‘message’, ‘channel’: ‘codehole’, ‘data’: ‘java comes’}
{‘pattern’: None, ‘type’: ‘message’, ‘channel’: ‘codehole’, ‘data’: ‘golang comes’}
那這幾個字段是什麼含義呢?data 這個毫無疑問就是消息的內容,一個字符串。channel 這個也很明顯,它表示當前訂閱的主題名稱。type 它表示消息的類型,如果是一個普通的消息,那麼類型就是 message,如果是控制消息,比如訂閱指令的反饋,它的類型就是 subscribe,如果是模式訂閱的反饋,它的類型就是 psubscribe,還有取消訂閱指令的反饋 unsubscribe 和 punsubscribe。
pattern 它表示當前消息是使用哪種模式訂閱到的,如果是通過 subscribe 指令訂閱的,那麼這個字段就是空。
PubSub 缺點
PubSub 的生產者傳遞過來一個消息,Redis 會直接找到相應的消費者傳遞過去。如果一個消費者都沒有,那麼消息直接丟棄。如果開始有三個消費者,一個消費者突然掛掉了,生產者會繼續發送消息,另外兩個消費者可以持續收到消息。但是掛掉的消費者重新連上的時候,這斷連期間生產者發送的消息,對於這個消費者來說就是徹底丟失了。如果 Redis 停機重啟,PubSub 的消息是不會持久化的,畢竟 Redis 宕機就相當於一個消費者都沒有,所有的消息直接被丟棄。正是因為 PubSub 有這些缺點,它幾乎找不到合適的應用場景。所以 Redis 的作者單獨開啟了一個項目 Disque 專門用來做多播消息隊列。該項目目前沒有成熟,一直長期處於Beta 版本,但是相應的客戶端 sdk 已經非常豐富了,就待 Redis 作者臨門一腳發佈一個Release 版本。