使用Redis做消息队列

基于内存的单线程数据库,使Redis的线程安全性与性能极高。而Redis的双向链表数据类型(List)天生就可作为消息队列存储消息.

在这里就不说消息队列的等等一些优点。但是补充一下Redis的List类型的几个命令,你可以指定将一个元素投送到列表的头部(左边)或者尾部(右边),当然也可以指定从列表的头部或尾部取出数据.

LPush:添加元素至列表的头部

 

 RPush:添加元素至列表的尾部

 

LPop:移除并获取列表的头部的第一个元素

 RPop:移除并获取列表的尾部的第一个元素

 

BLpop:移出并获取列表头部的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。命令格式:blpop key timeout,当timeout=0时,表示一直阻塞等待,直到有其他客户端执行rpush或者lpush命令,插入数据后,阻塞才解除.

BRpop:与BLpop相同,不同的是它是移除列表尾部的第一个元素.

如下,开启两个客户端,一个客户端先使用BLpop阻塞读取数据,另一个客户端写入数据.

OK,到此我想你已经明白了,List的作用已经显而易见。生产者投入消息,消费者拿到消息。而且双向链表的数据类型,投入和拿取数据都特别灵活。是不是感觉很不错?接着往下看😄

下面在代码中实现消息队列的数据投递与拾取.

引入NuGet包:StackExchange.Redis

 生产者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    for (int i = 0; i < 10; i++)
    {
        redis.db.ListRightPush("datalist", $"data{i}");//向列表的尾部投递消息
        Console.WriteLine($"{DateTime.Now} 已投递消息data{i}!");
        Thread.Sleep(3000);
    }
    Console.ReadKey();
}

消费者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    while (true)
    {
        string result = redis.db.ListRightPop("datalist");//从列表的尾部拾取消息

        if (string.IsNullOrEmpty(result)) { }
        else
        {
            Console.WriteLine($"{DateTime.Now} 已接收消息,Message={result.ToString()}");
            Thread.Sleep(1000);
        }
    }
}

为了让大家看到效果,我故意让线程等待了几秒钟.

先进先出和先进后出的实现方式都比较灵活,如果要想实现先进先出的规则的话,要将上面的消费者代码改为redis.db.ListLeftPop("datalist")=>从头部开始读取消息

但是上面的代码有一个很大的弊端,虽然消息已经消费完了,但是仍然在不停的lpop,所以造成很大的浪费.就算是这里使用了Sleep,一定程度上减少了CPU的占用率,但是消息处理的时效性就削弱了.

不用担心,对此肯定有解决的方法😀,我们上面提到了Redis有两个阻塞命令BRpop与BLpop,这两个命令可以解决上述问题.有消息的话它就会帮你拿出来,而且不用while(true)的方式也会减少CPU的开销.因为列表没有消息的话,它就会一直阻塞,可以理解为保持了一个长连接(就相当于你问你女朋友为什么生气,然后她就说因为什么什么…,晚上你问她想吃点什么,然后她说想吃点什么什么…,你每次都要去问她,时间久了她就觉得很烦,会觉得你不懂她。所以你就住进她心里面,她心里面想什么你就能第一时间知道,用这个做比喻我相信你们都能懂😂)。

但是StackExchange.Redis并没有提供BLpop与BRpop的API,我们可以使用使用pub/sub的方式.代码如下:

生产者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    var sub = RedisContent.redis.GetSubscriber();

    for (int i = 0; i < 10; i++)
    {
        sub.PublishAsync("datalist", $"data{i}").GetAwaiter();
        Console.WriteLine($"{DateTime.Now} 已投递消息data{i}!");
        Thread.Sleep(3000);
    }
    Console.ReadKey();
}

消费者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    var sub = RedisContent.redis.GetSubscriber();
    sub.Subscribe("datalist", (channel, message) =>
    {
        Console.WriteLine($"{DateTime.Now} 已接收消息,Message={message}");
        Thread.Sleep(1000);
    });
    Console.WriteLine("消费者0已启动成功!");
    Console.ReadKey();
}

分别启动两个消费者客户端

这种为广播模式,每一个订阅者都会收到消息。但是该消息不保证是否被接收,生产者投递完消息如果没有消费者接收的话,消息会丢失.

还有一种方式消息不会丢失,将消息存在列表里面。首先生产者向列表投入数据,紧接着去通知订阅者,让订阅者从列表中取出数据。但是有一个弊端,如果有多个消费者订阅时,只有一个消费者能取到数据。代码如下:

生产者:

var redis = new RedisContent();
var sub = RedisContent.redis.GetSubscriber();

for (int i = 0; i < 10; i++)
{
    redis.db.ListLeftPush("datalist", $"data{i}", flags: CommandFlags.FireAndForget);
    sub.PublishAsync("channel1", "").GetAwaiter();
    Console.WriteLine($"{DateTime.Now} 已投递消息data{i}!");
    Thread.Sleep(3000);
}
Console.ReadKey();

消费者:

static void Main(string[] args)
{
    var redis = new RedisContent();
    var sub = RedisContent.redis.GetSubscriber();
//如果消费者后启动,或者宕机重启,要先查询列表中是否有数据,如果有数据要消费掉
var len = redis.db.ListRange("datalist").Length; if (len > 0) { Task.Run(() => { for (int i = 0; i < len; i++) { string result = redis.db.ListRightPop("datalist"); //业务操作... } }); } sub.Subscribe("channel1", (channel, message) => { string result = redis.db.ListRightPop("datalist"); Console.WriteLine($"{DateTime.Now} 已接收消息,Message={result}"); Thread.Sleep(1000); }); Console.WriteLine("消费者0已启动成功!"); Console.ReadKey();

如有不足,请见谅!今天是十月二号,昨天是中秋佳节又是国庆,在这里祝大家双节快乐。本来昨天晚上写完这篇的,但是八九点的时候太困了,就😴… …

昨晚坐在窗边,偶然间向外面瞄了一眼,月亮的光芒太耀眼了,也太漂亮了,赶紧拍了一张🌕

Tags: