玩轉redis-簡單消息隊列

使用go語言基於redis寫了一個簡單的消息隊列
源碼地址
使用demo

redis的 list 非常的靈活,可以從左邊或者右邊添加元素,當然也以從任意一頭讀取數據

添加數據和獲取數據的操作也是非常簡單的
LPUSH 從左邊插入數據
RPUSH 大右邊插入數據
LPOP 從左邊取出一個數據
RPOP 從右邊取出一個數據

127.0.0.1:6379> LPUSH list1 a  (integer) 1  127.0.0.1:6379> RPUSH list1 b  (integer) 2  127.0.0.1:6379> LPOP list1  "a"  127.0.0.1:6379> RPOP list1  "b"  

或者使用 BLPOP BRPOP 來讀取數據,不同之處是取數據時,如果沒有數據會等待指定的時間,
如果這期間有數據寫入,則會讀取並返回,沒有數據則會返回空
在一個窗口1讀取

127.0.0.1:6379> BLPOP list1 10  1) "list1"  2) "a"  

在另一個窗口2寫入

127.0.0.1:6379> RPUSH list1 a b c  (integer) 3  

再開一個窗口3讀取,第二次讀取時,list是空的,所以等待1秒後返回空。

127.0.0.1:6379> BRPOP list1 1  1) "list1"  2) "c"    127.0.0.1:6379> BRPOP list1 1  (nil)  (1.04s)  

簡單消息隊列的實現

如果我們只從一邊新增元素,向另一邊取出元素,這就不是一個消息隊列么。但我估計你會有一個疑問,在消費數據時,同一個消息會不會同時被多個consumer消費掉?

當然不會,因為redis是單線程的,在從list取數據時天然不會出現並發問題。但是這是一個簡單的消息隊列,消費不成功怎麼處理還是需要我們自己寫代碼來實現的

下面我說一下使用list實現一個簡單的消息隊列的整體思路

comsumer的實現

consumer 主要做的就是從list里讀取數據,使用LPOP或者BLPOP都可以,
這裡做了一個開關 optionsUseBLopp如果為true時會使用BLPOP

type consumer struct {  	once            sync.Once  	redisCmd        redis.Cmdable  	ctx             context.Context  	topicName       string  	handler         Handler  	rateLimitPeriod time.Duration  	options         ConsumerOptions  	_               struct{}  }    type ConsumerOptions struct {  	RateLimitPeriod time.Duration  	UseBLPop        bool  }    

看一下創建consumer的代碼,最後面的opts參數是可選的配置

type Consumer = *consumer    func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {  	consumer := &consumer{  		redisCmd:  redisCmd,  		ctx:       ctx,  		topicName: topicName,  	}  	for _, o := range opts {  		o(&consumer.options)  	}  	if consumer.options.RateLimitPeriod == 0 {  		consumer.options.RateLimitPeriod = time.Microsecond * 200  	}  	return consumer  }    

讀取數據後具體怎麼進行處理調用者可以根據自己的業務邏輯進行相應處理
有一個小的interface調用者根據自己的邏輯去實現

type Handler interface {  	HandleMessage(msg *Message)  }  

讀取數據的邏輯使用一個gorouting實現

func (s *consumer) startGetMessage() {  	go func() {  		ticker := time.NewTicker(s.options.RateLimitPeriod)  		defer func() {  			log.Println("stop get message.")  			ticker.Stop()  		}()  		for {  			select {  			case <-s.ctx.Done():  				log.Printf("context Done msg: %#v n", s.ctx.Err())  				return  			case <-ticker.C:  				var revBody []byte  				var err error  				if !s.options.UseBLPop {  					revBody, err = s.redisCmd.LPop(s.topicName).Bytes()  				} else {  					revs := s.redisCmd.BLPop(time.Second, s.topicName)  					err = revs.Err()  					revValues := revs.Val()  					if len(revValues) >= 2 {  						revBody = []byte(revValues[1])  					}  				}  				if err == redis.Nil {  					continue  				}  				if err != nil {  					log.Printf("LPOP error: %#v n", err)  					continue  				}    				if len(revBody) == 0 {  					continue  				}  				msg := &Message{}  				json.Unmarshal(revBody, msg)  				if s.handler != nil {  					s.handler.HandleMessage(msg)  				}  			}  		}  	}()  }    

Producer 的實現

Producer還是很簡單的就是把數據推送到 reids

type Producer struct {  	redisCmd redis.Cmdable  	_        struct{}  }    func NewProducer(cmd redis.Cmdable) *Producer {  	return &Producer{redisCmd: cmd}  }    func (p *Producer) Publish(topicName string, body []byte) error {  	msg := NewMessage("", body)  	sendData, _ := json.Marshal(msg)  	return p.redisCmd.RPush(topicName, string(sendData)).Err()  }