.net core Redis消息隊列中間件【InitQ】
前言
這是一篇拖更很久的博客,不知不覺InitQ在nuget下載量已經過15K了,奈何胸無點墨也不曉得怎麼寫(懶),隨便在github上掛了個md,現在好好嘮嘮如何在redis里使用隊列
隊列緩存分佈式 異步調優堆配置 ——(來自某位不知名碼友)
誕生背景
redis在項目中使用的越來越頻繁,通常我們是用來做緩存,使用較多的就是String,Hash這兩種類型,以及分佈式鎖,redis的List類型,就可以用於消息隊列,使用起來更加簡單,且速度更快,非常適合子服務內部之間的消息流轉,創造靈感來自於楊老闆的CAP(地址://www.cnblogs.com/tibos/p/11858095.html),採用註解的方式消費隊列,讓業務邏輯更加的清晰,方便維護
安裝環境
- .net core版本:2.1
- redis版本:3.0以上
特點
1.通過註解的方式,訂閱隊列
2.可以設置消費消息的頻次
3.支持消息廣播
4.支持延遲隊列
使用介紹
-
1.獲取initQ包
方案A. install-package InitQ
方案B. nuget包管理工具搜索 InitQ -
2.添加中間件(該中間件依賴 StackExchange.Redis)
services.AddInitQ(m=> { m.SuspendTime = 1000; m.IntervalTime = 1000; m.ConnectionString = "127.0.0.1,connectTimeout=15000,syncTimeout=5000,password=123456"; m.ListSubscribe = new List<Type>() { typeof(RedisSubscribeA), typeof(RedisSubscribeB) }; m.ShowLog = false; });
-
3.配置說明
public class InitQOptions { /// <summary> /// redis連接字符串 /// </summary> public string ConnectionString { get; set; } /// <summary> /// 沒消息時掛起時長(毫秒) /// </summary> public int SuspendTime { get; set; } /// <summary> /// 每次消費消息間隔時間(毫秒) /// </summary> public int IntervalTime { get; set; } /// <summary> /// 是否顯示日誌 /// </summary> public bool ShowLog { get; set; } /// <summary> /// 需要注入的類型 /// </summary> public IList<Type> ListSubscribe { get; set; } public InitQOptions() { ConnectionString = ""; IntervalTime = 0; SuspendTime = 1000; ShowLog = false; } }
消息發佈/訂閱
消息的發佈/訂閱是最基礎的功能,這裡做了幾個優化
- 採用的是長輪詢模式,可以控制消息消費的頻次,以及輪詢空消息的間隔,避免資源浪費
- 支持多個類訂閱消息,可以很方便的根據業務進行分類,前提是這些類 必須註冊
- 支持多線程消費消息(在執行耗時任務的時候,非常有用)
示例如下(Thread.Sleep):
public class RedisSubscribeA: IRedisSubscribe
{
[Subscribe("tibos_test_1")]
private async Task SubRedisTest(string msg)
{
Console.WriteLine($"A類--->當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費消息:{msg}");
Thread.Sleep(3000); //使用堵塞線程模式,同步延時
Console.WriteLine($"A類<---當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費消息:{msg} 完成");
}
}
public class RedisSubscribeA: IRedisSubscribe
{
[Subscribe("tibos_test_1")]
private async Task SubRedisTest(string msg)
{
Console.WriteLine($"A類--->當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費消息:{msg}");
Thread.Sleep(3000); //使用堵塞線程模式,同步延時
Console.WriteLine($"A類<---當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費消息:{msg} 完成");
}
[Subscribe("tibos_test_1")]
private async Task SubRedisTest2(string msg)
{
Console.WriteLine($"A類--->當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費消息:{msg}");
Thread.Sleep(3000); //使用堵塞線程模式,同步延時
Console.WriteLine($"A類<---當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費消息:{msg} 完成");
}
}
示例如下(Task.Delay):
[Subscribe("tibos_test_1")]
private async Task SubRedisTest(string msg)
{
Console.WriteLine($"A類--->當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費消息:{msg}");
await Task.Delay(3000); //使用非堵塞線程模式,異步延時
Console.WriteLine($"A類<---當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者A消費消息:{msg} 完成");
}
根據業務情況,合理的選擇堵塞模式
- 1.訂閱發佈者
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis對象 var _redis = scope.ServiceProvider.GetService<ICacheService>(); //循環向 tibos_test_1 隊列發送消息 for (int i = 0; i < 1000; i++) { await _redis.ListRightPushAsync("tibos_test_1", $"我是消息{i + 1}號"); } }
- 2.定義消費者類 RedisSubscribeA
public class RedisSubscribeA: IRedisSubscribe { [Subscribe("tibos_test_1")] private async Task SubRedisTest(string msg) { Console.WriteLine($"A類--->訂閱者A消息消息:{msg}"); } [Subscribe("tibos_test_1")] private async Task SubRedisTest1(string msg) { Console.WriteLine($"A類--->訂閱者A1消息消息:{msg}"); } [Subscribe("tibos_test_1")] private async Task SubRedisTest2(string msg) { Console.WriteLine($"A類--->訂閱者A2消息消息:{msg}"); } [Subscribe("tibos_test_1")] private async Task SubRedisTest3(string msg) { Console.WriteLine($"A類--->訂閱者A3消息消息:{msg}"); } }
- 3.定義消費者類 RedisSubscribeB
public class RedisSubscribeB : IRedisSubscribe { /// <summary> /// 測試 /// </summary> /// <param name="msg"></param> /// <returns></returns> [Subscribe("tibos_test_1")] private async Task SubRedisTest(string msg) { Console.WriteLine($"B類--->訂閱者B消費消息:{msg}"); } }
消息廣播/訂閱
消息廣播是StackExchange.Redis已經封裝好的,我們只用起個線程監聽即可,只要監聽了這個key的線程,都會收到消息
- 1.訂閱消息通道,訂閱者需要在程序初始化的時候啟動一個線程偵聽通道,這裡使用HostedService來實現,並註冊到容器
public class ChannelSubscribeA : IHostedService, IDisposable { private readonly IServiceProvider _provider; private readonly ILogger _logger; public ChannelSubscribeA(ILogger<TestMain> logger, IServiceProvider provider) { _logger = logger; _provider = provider; } public void Dispose() { _logger.LogInformation("退出"); } public Task StartAsync(CancellationToken cancellationToken) { _logger.LogInformation("程序啟動"); Task.Run(async () => { using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis對象 var _redis = scope.ServiceProvider.GetService<ICacheService>(); await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) => { Console.WriteLine("test_channel" + " 訂閱服務A收到消息:" + message); })); } }); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("結束"); return Task.CompletedTask; } }
public class ChannelSubscribeB : IHostedService, IDisposable { private readonly IServiceProvider _provider; private readonly ILogger _logger; public ChannelSubscribeB(ILogger<TestMain> logger, IServiceProvider provider) { _logger = logger; _provider = provider; } public void Dispose() { _logger.LogInformation("退出"); } public Task StartAsync(CancellationToken cancellationToken) { _logger.LogInformation("程序啟動"); Task.Run(async () => { using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis對象 var _redis = scope.ServiceProvider.GetService<ICacheService>(); await _redis.SubscribeAsync("test_channel", new Action<RedisChannel, RedisValue>((channel, message) => { Console.WriteLine("test_channel" + " 訂閱服務B收到消息:" + message); })); } }); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _logger.LogInformation("結束"); return Task.CompletedTask; } }
- 2.將HostedService類注入到容器
services.AddHostedService<ChannelSubscribeA>(); services.AddHostedService<ChannelSubscribeB>();
- 3.廣播消息
using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis對象 var _redis = scope.ServiceProvider.GetService<ICacheService>(); for (int i = 0; i < 1000; i++) { await _redis.PublishAsync("test_channel", $"往通道發送第{i}條消息"); } }
延遲消息
延遲消息非常適用處理一些定時任務的場景,如訂單15分鐘未付款,自動取消, xxx天后,自動續費…… 這裡使用zset+redis鎖來實現,這裡的操作方式,跟發佈/訂閱非常類似
寫入延遲消息:SortedSetAddAsync
註解使用:SubscribeDelay
-
1.定義發佈者
Task.Run(async () => { using (var scope = _provider.GetRequiredService<IServiceScopeFactory>().CreateScope()) { //redis對象 var _redis = scope.ServiceProvider.GetService<ICacheService>(); for (int i = 0; i < 100; i++) { var dt = DateTime.Now.AddSeconds(3 * (i + 1)); //key:redis里的key,唯一 //msg:任務 //time:延時執行的時間 await _redis.SortedSetAddAsync("test_0625", $"延遲任務,第{i + 1}個元素,執行時間:{dt.ToString("yyyy-MM-dd HH:mm:ss")}", dt); } } });
-
2.定義消費者
//延遲隊列 [SubscribeDelay("test_0625")] private async Task SubRedisTest1(string msg) { Console.WriteLine($"A類--->當前時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")} 訂閱者延遲隊列消息開始--->{msg}"); //模擬任務執行耗時 await Task.Delay(TimeSpan.FromSeconds(3)); Console.WriteLine($"A類--->{msg} 結束<---"); }
版本
- V1.0 更新時間:2019-12-30
版本庫:
- Git獲取://github.com/wmowm/InitQ
作者:提伯斯