記一次Redis MQ完成的分散式改造,從此放心安全迭代

  • 2019 年 10 月 3 日
  • 筆記

引言

  熟悉TPL Dataflow博文的朋友可能記得這是個單體程式,使用TPL Dataflow 處理工作流任務, 在使用Docker部署的過程中, 有一個問題一直無法迴避:

       在單體程式部署的瞬間(服務不可用)會有少量流量無法處理;更糟糕的情況下,迭代部署的這個版本有問題,上線後無法運作, 更多的流量沒有得到處理。

      背負神聖使命(巨大壓力)的程式猿心生一計, 為何不將單體程式改成分散式:服務A只接受數據,服務B只處理數據。

 

知識儲備

    消息隊列和訂閱發布作為老生常談的兩個知識點被反覆提及,按照JMS的規範, 官方稱為點對點(point to point, queue) 和 訂閱發布(publish/subscribe,topic ),

點對點

  消息生產者生產消息發送到queue中,然後消費者從queue中取出並且消費消息。

隊列保留著消息,直到他們被消費或超時;

每個消息只有一個消費者,MQ支援多消費者,但是一個消息被消費以後,queue中不再有存儲

消息發送者和消費者在時間上沒有依賴性,當消息發送者發送消息之後, 不管接受者有沒有在運行,都不會影響到 消息被發送到隊列

③消息消費者在消費之後 需要向隊列應答成功

如果你希望發送的每個消息都應該被成功處理,你應該使用p2p模型

 

適用場合: 想讓接受者執行 有且僅一次處理,組件之後同步通訊。

發布/訂閱

  消息生產者(發布)將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到topic的消息會被所有訂閱者消費。

①每個消息可以有多個消費者

②發布者和消費者 有時間上依賴性, 針對某topic的訂閱者,必須先創建相應訂閱,才能消費消息

將消息發布到通道中,而不用知道訂閱者是誰(不關注是否存在訂閱者);訂閱者可收聽自己感興趣的多個通道, 也不需要知道發布者是誰(不關注是哪個發布者)。

③ 故如果沒有消費者,發布的消息將得不到處理;

頭腦風暴 

Redis 原生支援發布/訂閱 模型,內置的List數據結構亦能形成輕量級MQ的效果。

如上所述, Pub/Sub 模型 在訂閱者宕機的時候,發布的消息得不到處理,故此模型不能用於 強業務的 數據接收和處理。

本次採用的消息隊列模型:

  •    解耦業務:  新建Receiver程式作為生產者,專註於接收並發送到隊列;原有的webapp作為消費者專註數據處理。
  •    起到削峰填谷的作用, 若建立多個消費者webapp容器,還能形成負載均衡的效果。 

    需要關注Redis 兩個命令( 左進右出,右進左出同理):

    LPUSH  &  RPOP/BRPOP

Brpop 中的B 表示 “Block”, 是一個rpop命令的阻塞版本:若指定List沒有新元素,在給定時間內,該命令會阻塞當前redis客戶端連接,直到超時返回nil

編程實踐

本次使用 ASPNetCore 完成RedisMQ的實踐,引入Redis國產第三方開源庫CSRedisCore.

不使用著名的StackExchange.Redis 組件庫的原因:

  • 之前一直使用StackExchange.Redis, 參考了很多資料,做了很多優化,並未完全解決RedisTimeoutException問題 

  • StackExchange.Redis基於其多路復用的連接機制,不支援阻塞式命令, 故採用了 CSRedisCore,該庫強調了API 與Redis官方命令一致,很容易上手

生產者Receiver

 生產者使用LPush 命令向Redis List數據結構寫入消息。

------------------截取自Startup.cs-------------------------  

public void ConfigureServices(IServiceCollection services)
{
  // Redis客戶端要定義成單例, 不然在大流量並發收數的時候, 會造成redis client來不及釋放。另一方面也確認api控制器不是單例模式,
  var csredis = new CSRedisClient(Configuration.GetConnectionString(“redis”)+”,name=receiver”);
  RedisHelper.Initialization(csredis);
  services.AddSingleton(csredis);

 services.AddMvc();

}

------------------截取自數據接收Controller-------------------  [Route("batch")]  [HttpPost]  public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs)  {    if (!ModelState.IsValid)    throw new ArgumentException("Http Body Payload Error.");    var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}";     eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);     if (eqidPairs != null && eqidPairs.Any())      RedisHelper.LPush(redisKey, eqidPairs.ToArray());      await Task.CompletedTask;   }

 消費者webapp

     根據以上RedisMQ思路,事件消費方式是拉取pull,故需要輪詢Redis  List數據結構,這裡使用ASPNetCore內置的BackgroundService後台服務類實現後台輪詢消費任務。

public class BackgroundJob : BackgroundService      {          private readonly IEqidPairHandler _eqidPairHandler;          private readonly CSRedisClient[] _cSRedisClients;          private readonly IConfiguration _conf;          private readonly ILogger _logger;          public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory)          {              _eqidPairHandler = eqidPairHandler;              _cSRedisClients = csRedisClients;              _conf = conf;              _logger = loggerFactory.CreateLogger(nameof(BackgroundJob));          }            protected override async Task ExecuteAsync(CancellationToken stoppingToken)          {              _logger.LogInformation("Service starting");              if (_cSRedisClients[0] == null)              {                  _cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + ",defaultDatabase=" + 0);              }              RedisHelper.Initialization(_cSRedisClients[0]);                while (!stoppingToken.IsCancellationRequested)              {                  var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}";                  var eqidpair = RedisHelper.BRPop(5, key);                  if (eqidpair != null)                      await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));                  // 強烈建議無論如何休眠一段時間,防止突發大流量導致webApp進程CPU滿載,自行根據場景設置合理休眠時間                  await Task.Delay(10, stoppingToken);              }              _logger.LogInformation("Service stopping");          }      }

 

最後依照引言中的部署原理圖,將Nginx,Receiver, WebApp使用docker-compose工具容器化

根據docker-compsoe up命令的用法,若容器正在運行且對應的Service Configuration或Image並未改變,該容器不會被ReCreate;

docker-compose  up  命令默認只會停止Service或Image變更的容器並重建。

If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation, docker-compose up picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the --no-recreate flag.

做一次上線測試驗證,修改docker-compose.yml文件Web app的容器服務,docker-compose up;

僅數據處理程式WebApp容器被重建:

 Nice,分散式改造上線,效果很明顯,現在可以放心安全的迭代Web App數據處理程式。

作者:JulianHuang

碼甲拙見,如有問題請下方留言大膽斧正;碼字+Visio製圖,均為原創,看官請不吝好評+關注,  ~。。~

本文歡迎轉載,請轉載頁面明顯位置註明原作者及原文鏈接