玩轉redis-簡單消息隊列
- 2020 年 4 月 8 日
- 筆記
使用go
語言基於redis
寫了一個簡單的消息隊列
源碼地址
使用demo
redis的 list
非常的靈活,可以從左邊或者右邊添加元素,當然也以從任意一頭讀取數據
添加數據和獲取數據的操作也是非常簡單的
LPUSH
從左邊插入數據
RPUSH
大右邊插入數據
LPOP
從左邊取出一個數據
RPOP
從右邊取出一個數據
127.0.0.1:6379> LPUSH list1 a (integer) 1 127.0.0.1:6379> RPUSH list1 b (integer) 2 127.0.0.1:6379> LPOP list1 "a" 127.0.0.1:6379> RPOP list1 "b"
或者使用 BLPOP
BRPOP
來讀取數據,不同之處是取數據時,如果沒有數據會等待指定的時間,
如果這期間有數據寫入,則會讀取並返回,沒有數據則會返回空
在一個窗口1
讀取
127.0.0.1:6379> BLPOP list1 10 1) "list1" 2) "a"
在另一個窗口2
寫入
127.0.0.1:6379> RPUSH list1 a b c (integer) 3
再開一個窗口3
讀取,第二次讀取時,list
是空的,所以等待1秒後返回空。
127.0.0.1:6379> BRPOP list1 1 1) "list1" 2) "c" 127.0.0.1:6379> BRPOP list1 1 (nil) (1.04s)
簡單消息隊列的實現
如果我們只從一邊新增元素,向另一邊取出元素,這就不是一個消息隊列么。但我估計你會有一個疑問,在消費數據時,同一個消息會不會同時被多個consumer
消費掉?
當然不會,因為redis是單線程的,在從list
取數據時天然不會出現並發問題。但是這是一個簡單的消息隊列,消費不成功怎麼處理還是需要我們自己寫代碼來實現的
下面我說一下使用list
實現一個簡單的消息隊列的整體思路
comsumer的實現
consumer
主要做的就是從list里讀取數據,使用LPOP
或者BLPOP
都可以,
這裡做了一個開關 options
的UseBLopp
如果為true
時會使用BLPOP
。
type consumer struct { once sync.Once redisCmd redis.Cmdable ctx context.Context topicName string handler Handler rateLimitPeriod time.Duration options ConsumerOptions _ struct{} } type ConsumerOptions struct { RateLimitPeriod time.Duration UseBLPop bool }
看一下創建consumer
的代碼,最後面的opts
參數是可選的配置
type Consumer = *consumer func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer { consumer := &consumer{ redisCmd: redisCmd, ctx: ctx, topicName: topicName, } for _, o := range opts { o(&consumer.options) } if consumer.options.RateLimitPeriod == 0 { consumer.options.RateLimitPeriod = time.Microsecond * 200 } return consumer }
讀取數據後具體怎麼進行處理調用者可以根據自己的業務邏輯進行相應處理
有一個小的interface
調用者根據自己的邏輯去實現
type Handler interface { HandleMessage(msg *Message) }
讀取數據的邏輯使用一個gorouting實現
func (s *consumer) startGetMessage() { go func() { ticker := time.NewTicker(s.options.RateLimitPeriod) defer func() { log.Println("stop get message.") ticker.Stop() }() for { select { case <-s.ctx.Done(): log.Printf("context Done msg: %#v n", s.ctx.Err()) return case <-ticker.C: var revBody []byte var err error if !s.options.UseBLPop { revBody, err = s.redisCmd.LPop(s.topicName).Bytes() } else { revs := s.redisCmd.BLPop(time.Second, s.topicName) err = revs.Err() revValues := revs.Val() if len(revValues) >= 2 { revBody = []byte(revValues[1]) } } if err == redis.Nil { continue } if err != nil { log.Printf("LPOP error: %#v n", err) continue } if len(revBody) == 0 { continue } msg := &Message{} json.Unmarshal(revBody, msg) if s.handler != nil { s.handler.HandleMessage(msg) } } } }() }
Producer 的實現
Producer
還是很簡單的就是把數據推送到 reids
type Producer struct { redisCmd redis.Cmdable _ struct{} } func NewProducer(cmd redis.Cmdable) *Producer { return &Producer{redisCmd: cmd} } func (p *Producer) Publish(topicName string, body []byte) error { msg := NewMessage("", body) sendData, _ := json.Marshal(msg) return p.redisCmd.RPush(topicName, string(sendData)).Err() }