記一次Redis MQ完成的分散式改造,從此放心安全迭代
- 2019 年 10 月 3 日
- 筆記
引言
熟悉TPL Dataflow博文的朋友可能記得這是個單體程式,使用TPL Dataflow 處理工作流任務, 在使用Docker部署的過程中, 有一個問題一直無法迴避:
在單體程式部署的瞬間(服務不可用)會有少量流量無法處理;更糟糕的情況下,迭代部署的這個版本有問題,上線後無法運作, 更多的流量沒有得到處理。
背負神聖使命(巨大壓力)的程式猿心生一計, 為何不將單體程式改成分散式:服務A只接受數據,服務B只處理數據。
知識儲備
消息隊列和訂閱發布作為老生常談的兩個知識點被反覆提及,按照JMS的規範, 官方稱為點對點(point to point, queue) 和 訂閱發布(publish/subscribe,topic ),
點對點
消息生產者生產消息發送到queue中,然後消費者從queue中取出並且消費消息。
隊列保留著消息,直到他們被消費或超時;
① 每個消息只有一個消費者,MQ支援多消費者,但是一個消息被消費以後,queue中不再有存儲
② 消息發送者和消費者在時間上沒有依賴性,當消息發送者發送消息之後, 不管接受者有沒有在運行,都不會影響到 消息被發送到隊列
③消息消費者在消費之後 需要向隊列應答成功
如果你希望發送的每個消息都應該被成功處理,你應該使用p2p模型
適用場合: 想讓接受者執行 有且僅一次處理,組件之後同步通訊。
發布/訂閱
消息生產者(發布)將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到topic的消息會被所有訂閱者消費。
①每個消息可以有多個消費者
②發布者和消費者 有時間上依賴性, 針對某topic的訂閱者,必須先創建相應訂閱,才能消費消息
將消息發布到通道中,而不用知道訂閱者是誰(不關注是否存在訂閱者);訂閱者可收聽自己感興趣的多個通道, 也不需要知道發布者是誰(不關注是哪個發布者)。
③ 故如果沒有消費者,發布的消息將得不到處理;
頭腦風暴
Redis 原生支援發布/訂閱 模型,內置的List數據結構亦能形成輕量級MQ的效果。
如上所述, Pub/Sub 模型 在訂閱者宕機的時候,發布的消息得不到處理,故此模型不能用於 強業務的 數據接收和處理。
本次採用的消息隊列模型:
- 解耦業務: 新建Receiver程式作為生產者,專註於接收並發送到隊列;原有的webapp作為消費者專註數據處理。
- 起到削峰填谷的作用, 若建立多個消費者webapp容器,還能形成負載均衡的效果。
需要關注Redis 兩個命令( 左進右出,右進左出同理):
LPUSH & RPOP/BRPOP
Brpop 中的B 表示 “Block”, 是一個rpop命令的阻塞版本:若指定List沒有新元素,在給定時間內,該命令會阻塞當前redis客戶端連接,直到超時返回nil
編程實踐
本次使用 ASPNetCore 完成RedisMQ的實踐,引入Redis國產第三方開源庫CSRedisCore.
不使用著名的StackExchange.Redis 組件庫的原因:
之前一直使用StackExchange.Redis, 參考了很多資料,做了很多優化,並未完全解決RedisTimeoutException問題
StackExchange.Redis基於其多路復用的連接機制,不支援阻塞式命令, 故採用了 CSRedisCore,該庫強調了API 與Redis官方命令一致,很容易上手
生產者Receiver
生產者使用LPush 命令向Redis List數據結構寫入消息。
------------------截取自Startup.cs-------------------------
public void ConfigureServices(IServiceCollection services)
{
// Redis客戶端要定義成單例, 不然在大流量並發收數的時候, 會造成redis client來不及釋放。另一方面也確認api控制器不是單例模式,
var csredis = new CSRedisClient(Configuration.GetConnectionString(“redis”)+”,name=receiver”);
RedisHelper.Initialization(csredis);
services.AddSingleton(csredis);
services.AddMvc();
}
------------------截取自數據接收Controller------------------- [Route("batch")] [HttpPost] public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs) { if (!ModelState.IsValid) throw new ArgumentException("Http Body Payload Error."); var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}"; eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs); if (eqidPairs != null && eqidPairs.Any()) RedisHelper.LPush(redisKey, eqidPairs.ToArray()); await Task.CompletedTask; }
消費者webapp
根據以上RedisMQ思路,事件消費方式是拉取pull,故需要輪詢Redis List數據結構,這裡使用ASPNetCore內置的BackgroundService後台服務類實現後台輪詢消費任務。
public class BackgroundJob : BackgroundService { private readonly IEqidPairHandler _eqidPairHandler; private readonly CSRedisClient[] _cSRedisClients; private readonly IConfiguration _conf; private readonly ILogger _logger; public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory) { _eqidPairHandler = eqidPairHandler; _cSRedisClients = csRedisClients; _conf = conf; _logger = loggerFactory.CreateLogger(nameof(BackgroundJob)); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("Service starting"); if (_cSRedisClients[0] == null) { _cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + ",defaultDatabase=" + 0); } RedisHelper.Initialization(_cSRedisClients[0]); while (!stoppingToken.IsCancellationRequested) { var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}"; var eqidpair = RedisHelper.BRPop(5, key); if (eqidpair != null) await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair)); // 強烈建議無論如何休眠一段時間,防止突發大流量導致webApp進程CPU滿載,自行根據場景設置合理休眠時間 await Task.Delay(10, stoppingToken); } _logger.LogInformation("Service stopping"); } }
最後依照引言中的部署原理圖,將Nginx,Receiver, WebApp使用docker-compose工具容器化
根據docker-compsoe up命令的用法,若容器正在運行且對應的Service Configuration或Image並未改變,該容器不會被ReCreate;
docker-compose up 命令默認只會停止Service或Image變更的容器並重建。
If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation,
docker-compose up
picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the--no-recreate
flag.
做一次上線測試驗證,修改docker-compose.yml文件Web app的容器服務,docker-compose up;
僅數據處理程式WebApp容器被重建:
Nice,分散式改造上線,效果很明顯,現在可以放心安全的迭代Web App數據處理程式。
碼甲拙見,如有問題請下方留言大膽斧正;碼字+Visio製圖,均為原創,看官請不吝好評+關注, ~。。~
本文歡迎轉載,請轉載頁面明顯位置註明原作者及原文鏈接。