使用Redis Stream來做消息隊列和在Asp.Net Core中的實現

Redis - Wikipedia

寫在前面

我一直以來使用redis的時候,很多低烈度需求(並發要求不是很高)需要用到消息隊列的時候,在項目本身已經使用了Redis的情況下都想直接用Redis來做消息隊列,而不想引入新的服務,kafka和RabbitMQ等;

奈何這兄弟一直不給力;

雖然 Redis 的Pub/Sub 是實現了發布/訂閱的,但這傢伙最坑的是:丟數據

由於Pub/Sub 只是簡單的實現了發布訂閱模式,簡單的溝通起生產者和消費者,當接收生產者的數據後並立即推送或者說轉發給訂閱消費者,並不會做任何的持久化、存儲操作。由此:

  1. ​ 消費者(客戶端)掉線;
  2. ​ 消費者未訂閱(所以使用的時候一定記得先訂閱再生產);
  3. ​ 服務端宕機;
  4. ​ 消費者消費不過來,消息堆積(生產數據受數據緩衝區限制);

以上情況都會導致生產數據的丟失,基於上坑,據我所知大家很少使用Pub/Sub ;

不過官方的哨兵集群通訊的時候就是用的Pub/Sub;

然後,各路大佬結合隊列、阻塞等等實現了各種各樣的方案,主要是使用:BLPOP+LPUSH 的實現

這裡就不一一展開了,有興趣請看葉老闆文章

可能是各種實現都會帶來各種的問題,redis的官方也看到了社區的掙扎。終於,到了Redis5.0,官方帶來了消息隊列的實現:Stream

Redis Stream介紹

簡單來說Redis Stream 就是想用Redis 做消息隊列的最佳推薦;

XADD–發布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 #再發一條
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631628884174-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631628890025-0"

其中的’*’表示讓 Redis 自動生成唯一的消息 ID,格式是 「時間戳-自增序號」

XREAD–訂閱消息

訂閱消息

XREAD COUNT 5 STREAMS stream1 0-0 
127.0.0.1:6379> XREAD COUNT 5 STREAMS stream1 0-0 
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"

‘0-0’ 表示從開頭讀取

如果需繼續拉取下一條,需傳入上一條消息的id

阻塞等待消息

XREAD COUNT 5 BLOCK 50000 STREAMS stream1 1631628890025-0

阻塞等待消息id 『1631628890025-0』 後的消息

50000 阻塞時間(毫秒) 『0』 表示無限期阻塞

從到這裡就可以看出 Pub/Sub多端訂閱的最大優點,Stream也是支援的。有的同學很快就發現問題了,這裡多端訂閱後,沒有消息確認ACK機制。

沒錯,因為現在所有的消費者都是訂閱共同的消息,多端訂閱,如果某個客戶端ACK某條消息後,其他端消費不了,就實現不了多端消費了。

由此,引出 分組:GROUP

GROUP–訂閱分組消息(多端訂閱)

同樣先發布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631629080208-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631629084083-0"

XGROUP CREATE 創建分組

創建分組1

XGROUP CREATE stream1 group1 0-0  
127.0.0.1:6379> XGROUP CREATE stream1 group1 0-0  
OK

『0-0』 表示從開頭讀取

‘>’ 表示讀取最新,未被消費過的消息

XREADGROUP–分組讀取

分組 group1

XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  

consumer1 消費者名稱, redis伺服器會記住第一次使用的消費者名稱;


127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
(nil)

同樣

『0-0』 表示從開頭讀取

‘>’ 表示讀取最新,未被消費過的消息 (可以看到命令執行第二遍已經讀不到新消息了)

分組 group2

127.0.0.1:6379> XGROUP CREATE stream1 group2 0-0  
OK
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19

可以看到可以讀到同樣的消息,多端訂閱沒有問題;

當然分組也支援阻塞讀取:

#和XREAD一樣
XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0 

#分組阻塞
XREADGROUP GROUP group2 consumer1 COUNT 5 BLOCK 0 STREAMS stream1 > 

『0』 表示無限期阻塞,單位(毫秒)

XPENDING–待處理消息

消息使用XREADGROUP 讀取後會進入待處理條目列表(PEL);

我們看看:

 XPENDING stream1 group2
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 4
2) "1631628884174-0"
3) "1631629084083-0"
4) 1) 1) "consumer1"
      2) "4"

表示:

  1. (integer) 4 //表示當前消費者組的待處理消息的數量
  2. “1631628884174-0” //消息最大id
  3. “1631629084083-0” //最小id
      1. “consumer1” // 消費者名稱
      2. “4” //消費者待處理消息數量

XACK–刪除已處理消息(消息確認機制)

我們已經知道group2待處理消息有4條,我們從頭讀取看看:

XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"

假設最後一條消息 『1631629084083-0』 我已處理完成

127.0.0.1:6379> XACK stream1 group2 1631629084083-0
(integer) 1

再看:

127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 3
2) "1631628884174-0"
3) "1631629080208-0"
4) 1) 1) "consumer1"
      2) "3"

可以清楚看到goroup2 待處理消息剩下3條;

這時 Redis 已經把這條消息標記為「處理完成」不再追蹤;

Stream在Asp.net Core中的使用

private static string _connstr = "172.16.3.119:6379";
private static string _keyStream = "stream1";
private static string _nameGrourp = "group1";
private static string _nameConsumer = "consumer1";

發布:

csRedis.XAdd(_keyStream, "*", ("name", "message1"));

訂閱:

static async Task CsRedisStreamConsumer()
{
    Console.WriteLine("CsRedis StreamConsumer start!");

    var csRedis = new CSRedis.CSRedisClient(_connstr);
    csRedis.XAdd(_keyStream, "*", ("name", "message1"));

    try
    {
        csRedis.XGroupCreate(_keyStream, _nameGrourp);
    }
    catch { }

    (string key, (string id, string[] items)[] data)[] product;
    (string Pid, string Platform, string Time) data = (null, null, null);

    while (true)
    {
        try
        {
            product = csRedis.XReadGroup(_nameGrourp, _nameConsumer, 1, 10000, (_keyStream, ">"));
            if (product?.Length > 0 == true && product[0].data?.Length > 0 == true)
            {
                Console.WriteLine($"message-id:{product.FirstOrDefault().data.FirstOrDefault().id}");

                product.FirstOrDefault().data.FirstOrDefault().items.ToList().ForEach(value =>
                {
                    Console.WriteLine($"    {value}");
                });

                //csRedis.XAck(_keyStream, _nameGrourp, product[0].data[0].id);
            }
        }
        catch (Exception)
        {
            //throw;
        }
    }
}

CSRedisCore

動畫2

這裡的超時報錯可通過修改連接參數:syncTimeout 解決

CSRedisCore支援阻塞讀取;

StackExchange.Redis

發布:

db.StreamAdd(_keyStream, "name", "message1", "*");

訂閱:

static async Task StackExchangeRedisStreamConsumer()
{
    Console.WriteLine("StackExchangeRedis StreamConsumer start!");

    var redis = ConnectionMultiplexer.Connect(_connstr);
    var db = redis.GetDatabase();

    try
    {
        ///初始化方式1
        //db.StreamAdd(_keyStream, "name", "message1", "*");
        //db.StreamCreateConsumerGroup(_keyStream, _nameGrourp);

        //方式2
        db.StreamCreateConsumerGroup(_keyStream, _nameGrourp, StreamPosition.NewMessages);
    }
    catch { }

    StreamEntry[] data = null;

    while (true)
    {
        data = db.StreamReadGroup(_keyStream, _nameGrourp, _nameConsumer, ">", count: 1, noAck: true);

        if (data?.Length > 0 == true)
        {
            Console.WriteLine($"message-id:{data.FirstOrDefault().Id}");

            data.FirstOrDefault().Values.ToList().ForEach(c =>
            {
                Console.WriteLine($"    {c.Name}:{c.Value}");
            });

            db.StreamAcknowledge(_keyStream, _nameGrourp, data.FirstOrDefault().Id);
        }
    }
}

動畫

StackExchange.Redis 有點比較坑的是不存在阻塞讀取;理由://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing

QA

Q:Stream是否支援AOF、RDB持久化?

A:支援,其它數據類型一樣,每個寫操作,也都會寫入到 RDB 和 AOF 中。

Q:Stream是否還是會丟數據?若是,何種情況下?;

A:會;1、AOF是定時寫盤的,如果數據還在記憶體中時redis服務宕機就會;2、主從切換時(從庫還未同步完成主庫發來的數據,就被提成主庫)

總結

技術中有的時候沒有「銀彈」,只有更適合的技術,汝之蜜糖彼之砒霜;

很多時候的技術選型都是個比較麻煩的東西,對選型人的要求很高;你可能不是只需要熟悉其中的一種路線,而是要踩過各種各樣的坑,再根據當前受限的環境,選擇比較適合目前需求/團隊的;

回到Stream上,我認為目前Stream能滿足挺大部分隊列需求;

特別是「在項目本身已經使用了Redis的情況下都想直接用Redis來做消息隊列,而不想引入新的更專業的mq,比如kafka和RabbitMQ的時候」

當然,最終決定需要用更專業的mq與否的,還是需求;

引用

//www.redis.cn/

//database.51cto.com/art/202104/659208.htm

//github.com/2881099/csredis/

//stackexchange.github.io/StackExchange.Redis/Streams.html