玩转redis-简单消息队列

使用go语言基于redis写了一个简单的消息队列
源码地址
使用demo

redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

添加数据和获取数据的操作也是非常简单的
LPUSH 从左边插入数据
RPUSH 大右边插入数据
LPOP 从左边取出一个数据
RPOP 从右边取出一个数据

127.0.0.1:6379> LPUSH list1 a  (integer) 1  127.0.0.1:6379> RPUSH list1 b  (integer) 2  127.0.0.1:6379> LPOP list1  "a"  127.0.0.1:6379> RPOP list1  "b"  

或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,
如果这期间有数据写入,则会读取并返回,没有数据则会返回空
在一个窗口1读取

127.0.0.1:6379> BLPOP list1 10  1) "list1"  2) "a"  

在另一个窗口2写入

127.0.0.1:6379> RPUSH list1 a b c  (integer) 3  

再开一个窗口3读取,第二次读取时,list是空的,所以等待1秒后返回空。

127.0.0.1:6379> BRPOP list1 1  1) "list1"  2) "c"    127.0.0.1:6379> BRPOP list1 1  (nil)  (1.04s)  

简单消息队列的实现

如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个consumer消费掉?

当然不会,因为redis是单线程的,在从list取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

下面我说一下使用list实现一个简单的消息队列的整体思路

comsumer的实现

consumer 主要做的就是从list里读取数据,使用LPOP或者BLPOP都可以,
这里做了一个开关 optionsUseBLopp如果为true时会使用BLPOP

type consumer struct {  	once            sync.Once  	redisCmd        redis.Cmdable  	ctx             context.Context  	topicName       string  	handler         Handler  	rateLimitPeriod time.Duration  	options         ConsumerOptions  	_               struct{}  }    type ConsumerOptions struct {  	RateLimitPeriod time.Duration  	UseBLPop        bool  }    

看一下创建consumer的代码,最后面的opts参数是可选的配置

type Consumer = *consumer    func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {  	consumer := &consumer{  		redisCmd:  redisCmd,  		ctx:       ctx,  		topicName: topicName,  	}  	for _, o := range opts {  		o(&consumer.options)  	}  	if consumer.options.RateLimitPeriod == 0 {  		consumer.options.RateLimitPeriod = time.Microsecond * 200  	}  	return consumer  }    

读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理
有一个小的interface调用者根据自己的逻辑去实现

type Handler interface {  	HandleMessage(msg *Message)  }  

读取数据的逻辑使用一个gorouting实现

func (s *consumer) startGetMessage() {  	go func() {  		ticker := time.NewTicker(s.options.RateLimitPeriod)  		defer func() {  			log.Println("stop get message.")  			ticker.Stop()  		}()  		for {  			select {  			case <-s.ctx.Done():  				log.Printf("context Done msg: %#v n", s.ctx.Err())  				return  			case <-ticker.C:  				var revBody []byte  				var err error  				if !s.options.UseBLPop {  					revBody, err = s.redisCmd.LPop(s.topicName).Bytes()  				} else {  					revs := s.redisCmd.BLPop(time.Second, s.topicName)  					err = revs.Err()  					revValues := revs.Val()  					if len(revValues) >= 2 {  						revBody = []byte(revValues[1])  					}  				}  				if err == redis.Nil {  					continue  				}  				if err != nil {  					log.Printf("LPOP error: %#v n", err)  					continue  				}    				if len(revBody) == 0 {  					continue  				}  				msg := &Message{}  				json.Unmarshal(revBody, msg)  				if s.handler != nil {  					s.handler.HandleMessage(msg)  				}  			}  		}  	}()  }    

Producer 的实现

Producer还是很简单的就是把数据推送到 reids

type Producer struct {  	redisCmd redis.Cmdable  	_        struct{}  }    func NewProducer(cmd redis.Cmdable) *Producer {  	return &Producer{redisCmd: cmd}  }    func (p *Producer) Publish(topicName string, body []byte) error {  	msg := NewMessage("", body)  	sendData, _ := json.Marshal(msg)  	return p.redisCmd.RPush(topicName, string(sendData)).Err()  }