如何利用.NETCore向Azure EventHubs准實時批量發送數據?

最近在做一個基於Azure雲的物聯網分析項目:

.netcore採集程序向Azure事件中心(EventHubs)發送數據,通過Azure EventHubs Capture轉儲到Azure BlogStorage,供數據科學團隊分析。

為什麼使用Azure事件中心?

Azure事件中心是一種Azure上完全託管的實時數據攝取服務, 每秒可流式傳輸來自website、app、device任何源的數百萬個事件。提供的統一流式處理平台和時間保留緩衝區,將事件生成者和事件使用者分開。

  • 事件生成者: 可使用https、AQMP協議發佈事件
  • 分區:事件中心通過分區使用者模式提供消息流式處理功能,提高可用性和並行化
  • 事件接收者:所有事件中心使用者通過AMQP 1.0會話進行連接,讀取數據

例如,如果事件中心具有四個分區,並且其中一個分區要在負載均衡操作中從一台服務器移動到另一台服務器,則仍可以通過其他三個分區進行發送和接收。 此外,具有更多分區可以讓更多並發讀取器處理數據,從而提高聚合吞吐量。 了解分佈式系統中分區和排序的意義是解決方案設計的重要方面。 為了幫助說明排序與可用性之間的權衡,請參閱 CAP 定理

最直觀的方式:請在portal.azure.cn門戶站點—->創建事件中心命名空間—> 創建事件中心

.NetCore 准實時批量發送數據到事件中心

.NET庫 (Azure.Messaging.EventHubs)

我們使用Asp.NetCore以Azure App Service形式部署,依賴Azure App Service的自動縮放能錄應對物聯網的潮汐大流量。

通常推薦批量發送到事件中心,能有效增加web服務的吞吐量和響應能力。
目前新版SDk: Azure.Messaging.EventHubs僅支持分批發送。

  1. nuget上引入Azure.Messaging.EventHubs庫
  2. EventHubProducerClient客戶端負責分批發送數據到事件中心,根據發送時指定的選項,事件數據可能會自動路由到可用分區或發送到特定請求的分區。

在以下情況下,建議允許自動路由分區:
1) 事件的發送必須高度可用
2) 事件數據應在所有可用分區之間平均分配。
自動路由分區的規則:
1)使用循環法將事件平均分配到所有可用分區中
2)如果某個分區不可用,事件中心將自動檢測到該分區並將消息轉發到另一個可用分區。

我們要注意,根據選定的 命令空間定價層, 每批次發給事件中心的最大消息大小也不一樣:

分段批量發送策略

這裡我們就需要思考: web程序收集數據是以個數為單位; 但是我們分批發送時要根據分批的位元組大小來切分。
我的方案是: 因引入TPL Dataflow 管道:

  1. web程序收到數據,立刻丟入TransformBlock<string, EventData>
  2. 轉換到EventData之後,使用BatchBlock<EventData>按照個數打包
  3. 利用ActionBlock<EventData[]>在包內 累積指定位元組大小批量發送
  • 最後我們設置一個定時器(5min),強制在BatchBlock的前置隊列未滿時打包,並發送。

核心的TPL Dataflow代碼如下:

public class MsgBatchSender
    {
        private readonly EventHubProducerClient Client;
        private readonly TransformBlock<string, EventData> _transformBlock;
        private readonly BatchBlock<EventData> _packer;
        private readonly ActionBlock<EventData[]> _batchSender;

        private readonly DataflowOption _dataflowOption;
        private readonly Timer _trigger;
        private readonly ILogger _logger;

        public MsgBatchSender(EventHubProducerClient client, IOptions<DataflowOption> option,ILoggerFactory loggerFactory)
        {
            Client = client;
            _dataflowOption = option.Value;
            var dfLinkoption = new DataflowLinkOptions { PropagateCompletion = true };

            _transformBlock = new TransformBlock<string, EventData>(
                text => new EventData(Encoding.UTF8.GetBytes(text)),
                   new ExecutionDataflowBlockOptions
                   {
                       MaxDegreeOfParallelism = _dataflowOption.MaxDegreeOfParallelism
                   });
            _packer = new BatchBlock<EventData>(_dataflowOption.BatchSize);
            _batchSender = new ActionBlock<EventData[]>(msgs=> BatchSendAsync(msgs));
            _packer.LinkTo(_batchSender, dfLinkoption);

            _transformBlock.LinkTo(_packer, dfLinkoption, x => x != null);

            _trigger = new Timer(_ => _packer.TriggerBatch(), null, TimeSpan.Zero, TimeSpan.FromSeconds(_dataflowOption.TriggerInterval));

            _logger = loggerFactory.CreateLogger<DataTrackerMiddleware>();
        }

        private async Task BatchSendAsync(EventData[] msgs)
        {
            try
            {
                if (msgs != null)
                {
                    var i = 0;
                    while (i < msgs.Length)
                    {
                        var batch = await Client.CreateBatchAsync();
                        while (i < msgs.Length)
                        {
                            if (batch.TryAdd(msgs[i++]) == false)
                            {
                                break;
                            }
                        }
                        if(batch!= null && batch.Count>0)
                        {
                            await Client.SendAsync(batch);
                            batch.Dispose();
                        }
                    }
                }
            }
             catch (Exception ex)
            {
                // ignore and log any exception
                _logger.LogError(ex, "SendEventsAsync: {error}", ex.Message);
            }

        }

        public  async Task<bool> PostMsgsync(string txt)
        {
            return await _transformBlock.SendAsync(txt);
        }

        public async Task CompleteAsync()
        {
            _transformBlock.Complete();
            await _transformBlock.Completion;
            await _batchSender.Completion;
            await _batchSender.Completion;
        }
    }

總結

  • Azure事件中心的基礎用法
  • .NET Core准實時分批向Azure事件中心發送數據,其中用到的TPL Dataflow是以actor模型:提供了粗粒度的數據流和流水線任務,提高了高並發程序的健壯性。