使用Redis Stream來做消息隊列和在Asp.Net Core中的實現
- 2021 年 9 月 15 日
- 筆記
- .NET Core, C#/.net/.netcore, Redis, Stream
寫在前面
我一直以來使用redis的時候,很多低烈度需求(並發要求不是很高)需要用到消息隊列的時候,在項目本身已經使用了Redis的情況下都想直接用Redis來做消息隊列,而不想引入新的服務,kafka和RabbitMQ等;
奈何這兄弟一直不給力;
雖然 Redis 的Pub/Sub 是實現了發布/訂閱的,但這傢伙最坑的是:丟數據
由於Pub/Sub 只是簡單的實現了發布訂閱模式,簡單的溝通起生產者和消費者,當接收生產者的數據後並立即推送或者說轉發給訂閱消費者,並不會做任何的持久化、存儲操作。由此:
- 消費者(客戶端)掉線;
- 消費者未訂閱(所以使用的時候一定記得先訂閱再生產);
- 服務端宕機;
- 消費者消費不過來,消息堆積(生產數據受數據緩衝區限制);
以上情況都會導致生產數據的丟失,基於上坑,據我所知大家很少使用Pub/Sub ;
不過官方的哨兵集群通訊的時候就是用的Pub/Sub;
然後,各路大佬結合隊列、阻塞等等實現了各種各樣的方案,主要是使用:BLPOP+LPUSH 的實現
這裡就不一一展開了,有興趣請看葉老闆文章;
可能是各種實現都會帶來各種的問題,redis的官方也看到了社區的掙扎。終於,到了Redis5.0,官方帶來了消息隊列的實現:Stream。
Redis Stream介紹
簡單來說Redis Stream 就是想用Redis 做消息隊列的最佳推薦;
XADD–發布消息
XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 #再發一條
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631628884174-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19
"1631628890025-0"
其中的’*’表示讓 Redis 自動生成唯一的消息 ID,格式是 「時間戳-自增序號」
XREAD–訂閱消息
訂閱消息
XREAD COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREAD COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
‘0-0’ 表示從開頭讀取
如果需繼續拉取下一條,需傳入上一條消息的id
阻塞等待消息
XREAD COUNT 5 BLOCK 50000 STREAMS stream1 1631628890025-0
阻塞等待消息id 『1631628890025-0』 後的消息
50000 阻塞時間(毫秒) 『0』 表示無限期阻塞
從到這裡就可以看出 Pub/Sub多端訂閱的最大優點,Stream也是支援的。有的同學很快就發現問題了,這裡多端訂閱後,沒有消息確認ACK機制。
沒錯,因為現在所有的消費者都是訂閱共同的消息,多端訂閱,如果某個客戶端ACK某條消息後,其他端消費不了,就實現不了多端消費了。
由此,引出 分組:GROUP
GROUP–訂閱分組消息(多端訂閱)
同樣先發布消息
XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631629080208-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19
"1631629084083-0"
XGROUP CREATE 創建分組
創建分組1
XGROUP CREATE stream1 group1 0-0
127.0.0.1:6379> XGROUP CREATE stream1 group1 0-0
OK
『0-0』 表示從開頭讀取
‘>’ 表示讀取最新,未被消費過的消息
XREADGROUP–分組讀取
分組 group1
XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >
consumer1 消費者名稱, redis伺服器會記住第一次使用的消費者名稱;
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
4) 1) "1631629084083-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >
(nil)
同樣
『0-0』 表示從開頭讀取
‘>’ 表示讀取最新,未被消費過的消息 (可以看到命令執行第二遍已經讀不到新消息了)
分組 group2
127.0.0.1:6379> XGROUP CREATE stream1 group2 0-0
OK
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 >
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
4) 1) "1631629084083-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19
可以看到可以讀到同樣的消息,多端訂閱沒有問題;
當然分組也支援阻塞讀取:
#和XREAD一樣
XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
#分組阻塞
XREADGROUP GROUP group2 consumer1 COUNT 5 BLOCK 0 STREAMS stream1 >
『0』 表示無限期阻塞,單位(毫秒)
XPENDING–待處理消息
消息使用XREADGROUP 讀取後會進入待處理條目列表(PEL);
我們看看:
XPENDING stream1 group2
127.0.0.1:6379> XPENDING stream1 group2
1) (integer) 4
2) "1631628884174-0"
3) "1631629084083-0"
4) 1) 1) "consumer1"
2) "4"
表示:
- (integer) 4 //表示當前消費者組的待處理消息的數量
- “1631628884174-0” //消息最大id
- “1631629084083-0” //最小id
-
-
- “consumer1” // 消費者名稱
- “4” //消費者待處理消息數量
-
XACK–刪除已處理消息(消息確認機制)
我們已經知道group2待處理消息有4條,我們從頭讀取看看:
XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
4) 1) "1631629084083-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
假設最後一條消息 『1631629084083-0』 我已處理完成
127.0.0.1:6379> XACK stream1 group2 1631629084083-0
(integer) 1
再看:
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
127.0.0.1:6379> XPENDING stream1 group2
1) (integer) 3
2) "1631628884174-0"
3) "1631629080208-0"
4) 1) 1) "consumer1"
2) "3"
可以清楚看到goroup2 待處理消息剩下3條;
這時 Redis 已經把這條消息標記為「處理完成」不再追蹤;
Stream在Asp.net Core中的使用
private static string _connstr = "172.16.3.119:6379";
private static string _keyStream = "stream1";
private static string _nameGrourp = "group1";
private static string _nameConsumer = "consumer1";
發布:
csRedis.XAdd(_keyStream, "*", ("name", "message1"));
訂閱:
static async Task CsRedisStreamConsumer()
{
Console.WriteLine("CsRedis StreamConsumer start!");
var csRedis = new CSRedis.CSRedisClient(_connstr);
csRedis.XAdd(_keyStream, "*", ("name", "message1"));
try
{
csRedis.XGroupCreate(_keyStream, _nameGrourp);
}
catch { }
(string key, (string id, string[] items)[] data)[] product;
(string Pid, string Platform, string Time) data = (null, null, null);
while (true)
{
try
{
product = csRedis.XReadGroup(_nameGrourp, _nameConsumer, 1, 10000, (_keyStream, ">"));
if (product?.Length > 0 == true && product[0].data?.Length > 0 == true)
{
Console.WriteLine($"message-id:{product.FirstOrDefault().data.FirstOrDefault().id}");
product.FirstOrDefault().data.FirstOrDefault().items.ToList().ForEach(value =>
{
Console.WriteLine($" {value}");
});
//csRedis.XAck(_keyStream, _nameGrourp, product[0].data[0].id);
}
}
catch (Exception)
{
//throw;
}
}
}
CSRedisCore
這裡的超時報錯可通過修改連接參數:syncTimeout 解決
CSRedisCore支援阻塞讀取;
StackExchange.Redis
發布:
db.StreamAdd(_keyStream, "name", "message1", "*");
訂閱:
static async Task StackExchangeRedisStreamConsumer()
{
Console.WriteLine("StackExchangeRedis StreamConsumer start!");
var redis = ConnectionMultiplexer.Connect(_connstr);
var db = redis.GetDatabase();
try
{
///初始化方式1
//db.StreamAdd(_keyStream, "name", "message1", "*");
//db.StreamCreateConsumerGroup(_keyStream, _nameGrourp);
//方式2
db.StreamCreateConsumerGroup(_keyStream, _nameGrourp, StreamPosition.NewMessages);
}
catch { }
StreamEntry[] data = null;
while (true)
{
data = db.StreamReadGroup(_keyStream, _nameGrourp, _nameConsumer, ">", count: 1, noAck: true);
if (data?.Length > 0 == true)
{
Console.WriteLine($"message-id:{data.FirstOrDefault().Id}");
data.FirstOrDefault().Values.ToList().ForEach(c =>
{
Console.WriteLine($" {c.Name}:{c.Value}");
});
db.StreamAcknowledge(_keyStream, _nameGrourp, data.FirstOrDefault().Id);
}
}
}
StackExchange.Redis 有點比較坑的是不存在阻塞讀取;理由://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing
QA
Q:Stream是否支援AOF、RDB持久化?
A:支援,其它數據類型一樣,每個寫操作,也都會寫入到 RDB 和 AOF 中。
Q:Stream是否還是會丟數據?若是,何種情況下?;
A:會;1、AOF是定時寫盤的,如果數據還在記憶體中時redis服務宕機就會;2、主從切換時(從庫還未同步完成主庫發來的數據,就被提成主庫)
總結
技術中有的時候沒有「銀彈」,只有更適合的技術,汝之蜜糖彼之砒霜;
很多時候的技術選型都是個比較麻煩的東西,對選型人的要求很高;你可能不是只需要熟悉其中的一種路線,而是要踩過各種各樣的坑,再根據當前受限的環境,選擇比較適合目前需求/團隊的;
回到Stream上,我認為目前Stream能滿足挺大部分隊列需求;
特別是「在項目本身已經使用了Redis的情況下都想直接用Redis來做消息隊列,而不想引入新的更專業的mq,比如kafka和RabbitMQ的時候」
當然,最終決定需要用更專業的mq與否的,還是需求;
引用
//database.51cto.com/art/202104/659208.htm
//stackexchange.github.io/StackExchange.Redis/Streams.html