Event Hub小白入門指南

  • 2019 年 10 月 8 日
  • 筆記

Event Hub事件中心

本文的目的在於用最白的大白話,讓你從“完全不懂”開始,理解什麼是分佈式大數據流平台Event Hub,並且理解它的關鍵概念,並且初步理解其收發數據API。

定義,Event Hub是什麼、產生的目的

Event Hub是微軟雲服務Azure的一個產品,是分佈式大數據流平台。屬於PaaS。Event Hub:

  • 支持大規模、實時的流數據
  • 每秒能處理百萬級的事件
  • 簡單易用,託管式服務
  • 支持全球54個Azure地域

這裡指的大規模、實時的流數據是指什麼?Big Data Streaming 大數據流

很多應用需要從各處搜集數據來進行分析和處理,如網站收集用戶的使用數據,或者物聯網系統搜集所有連網設備的實時數據。這些數據從多個不同的終端產生,並且隨時都在產生。所以這些數據是流數據,即時性的,好像水流一樣,源源不斷,從一個地方流向下一個地方。

為什麼是“大數據”呢?因為這些數據可能從成千上萬的客戶端發出,並且發出的頻率很高,要彙集在一個地方進行處理,形成了很大的數據規模,所以是大數據。

Event Hub可以處理多大的數據?

每秒可以處理百萬級別的事件(event)。這裡的“事件”:就是你收發的數據。 – https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-about, 2019.8

 

什麼是託管式服務?Managed Service

Event Hub是全託管式的服務。什麼意思呢?就是如果不使用Event Hub這樣的託管式服務,那麼開發者需要自己對大數據流平台進行管理。比如Apache Kafka也是一個大數據流平台,但是它不是全託管的,這意味着開發者將需要自行搭建和管理大數據流的處理,比如搭建(購買和配置)虛擬機集群、安裝和管理Kafka、管理儲存,也就是說,開發者需要自行進行管理所有涉及的服務、更新、包、版本,或者需要再使用其他平台的服務代為完成這些步驟。而Event Hub為開發者全託管,開發者只需要創建Event Hub,然後就可以進行大數據的收發了,Event Hub保證中間的所有過程,提供穩定的服務。這樣開發者對中間過程的控制變弱了,但是可以更加關注自己的業務邏輯。

 

在大數據流的鏈條中,client客戶端產生數據,server服務器端接收數據和分析數據。Event Hub就像一個client和server中間的緩衝區域(buffer)。

為什麼要一個buffer:因為沒有buffer的話會造成依賴(dependency)和高耦合(tight coupling)。如果數據量大的話,就會出現問題。而buffer可以在這個數據生產線上起到控流的作用(flow control)。

 

Event Hub的典型用途是收集在遠端產生的遙測(telemetry)數據,包括從1)網絡應用的客戶端和2)遠端設備和門戶(如散落各地的共享單車)上獲取數據。

工作原理

本質上來講,Event Hub就是一個暫放數據的地方。

當數據從終端產生,發送數據給一個Event Hub的時候,Event Hub就把數據收集然後寫下來,寫在其內部的儲存里,然後我們就可以閱讀這些Event Hub為我們收集的數據,進行可視化、數據分析等等,做我們想做的事情。

 

Event Hub就好像是一個筆記本,我們從頭到尾寫,也從頭到尾讀。

||||||||||||||||||||||||||||||||||||||||||||||||||| –> 寫

讀–>

 

筆記本上寫下來的數據可以反覆閱讀,進入Event Hub的數據也可以被多次讀取。讀取數據的操作並不會將數據從Event Hub上刪除。

然而,Event Hub上的數據不是永久保存的。數據到達Event Hub後,會被保存一段時間(這段時間被稱作retention day,可以設置為1-7天)。就好比筆記本上,超過某段時間以後,老舊的筆記會被撕掉。

這就是Event Hub的基本工作原理。但是實際上還有一些更細節的概念,開發者們使用時必須要了解。其中最重要的包括Partition和Consumer Group。

 

什麼是Partition?

到達Event Hub的數據其實不是寫在同一個地方的,而是寫在幾個Partition上的。就好比其實Event Hub裏面不只有一個筆記本,而是有多個筆記本來記錄消息。

Partition好像是Event Hub儲存空間的扇區一般。數據在到達Event Hub時,會被挨個分配到某個Partition上,分配默認是輪流的方式(Round Robin)。也就是說,第一條消息到來的時候,會被寫到第一個筆記本上,而第二條消息到來的時候,會被寫到第二個筆記本上,以此類推。

筆記本 A:1  4

筆記本 B:2  5

筆記本 C:3   <- 6

  

這種分成多區的形式,目的是為了提供並行接收(讀取)的能力。

 

還是拿筆記本來打比方。比如,一開始的時候,你有一個Event Hub,裏面有2個筆記本來記錄數據,然後你請了一個人(讀取數據的應用)來從這兩個筆記本上讀取數據(應用可以開兩個線程來同時讀取數據)。由於剛開始消息並不多,所以一個人的腦子還是夠用的。

但是後面,你的業務越做越大,進入的數據流也越來越大,你請的人腦子不夠用了(CPU不夠,讀取速度不夠)。這時候,你就需要再請一個人(再開一個讀取應用實例),那麼這兩個人可以每個人管一個筆記本,讀取上面的數據。

 

然而你的業務越做越大,兩個人也讀不過來了,怎麼辦呢?這時候你可能要考慮再多加一個筆記本,這樣就可以再多請一個人了。不過因為目前partition數量不能在Event Hub創建之後修改,所以只能重新創建一個有更多Partition數量的Event Hub才能滿足要求。

  

當然,Partition並不是越多越好,因為每個Partition都要求有一個單獨的Receiver來讀取,而這意味着更多CPU資源和Socket連接的代價,所以要謹慎考慮增加Partition的數量,不要隨意耗費資源。

 

最多可以有多少個Partition?

一個Event Hub允許2-32個Partition,在創建Event Hub時設置。目前的話,Event Hub一旦創建就不能修改(只能創建一個新的Event Hub),所以要創建的時候合理預估並行讀取的數量。

 

什麼是Consumer Group

那Consumer Group是什麼呢?在Event Hub的概念中,Consumer Group相當於一個讀取時的視圖(View),我們可以保存一個Consumer Group下,讀取流的狀態(讀取的應用讀到了什麼位置,或者說偏移量)。這樣的話,如果一個應用的讀取連接因為某些原因而斷開,要重新建立讀取連接的話,我們就方便知道要從什麼位置繼續讀取。

 

我們可以在一個Event Hub中創建多個Consumer Group(最多20個),這樣不同的讀取應用就可以使用不同的Consumer Group來進行讀取。比如,一個Event Hub收集了所有的共享單車的狀態數據,而在分析數據的時候,我們有一個應用是用來監測當前單車分佈的位置是否需要派人調整,另一個應用則是需要進行用戶的使用習慣、行動路線來的分析。兩個應用目的不同,讀取的頻率也並不相同。這樣種情況下,就可以使用兩個Consumer Group,分別對應每個應用,這樣兩邊的讀取可以互不干擾。

 

如何使用

(*本文是一個介紹,並非教程。所以此處假設已經在Azure里創建了“事件中心命名空間 Event Hub Namespace”和“事件中心 Event Hub”,並且獲取了Event Hub的Connection String。)

 

那麼如何使用Event Hub API進行事件的收發呢?

發送接口 Sender API

有兩種方式都可以實現將數據發送到Event Hub上的操作。一種是用基於HTTPS協議的REST API來進行發送,也就是在header里設置授權信息,把要發送的數據POST到相應的URL。*

(* 詳細操作在這裡可以看到:https://docs.microsoft.com/en-us/rest/api/eventhub/event-hubs-runtime-rest。)

 

但是更推薦的是第二種發送方式,使用EventClient API。它背後是AMQP協議,更為高效。不過這裡我們並不需要理解AMQP的實現方式,只需要使用微軟提供的接口。

下面以C#為例,簡單介紹發送接口:

API接口在Microsoft.Azure.EventHubs的NuGet包里。

1. 創建 EventData,把要發送的消息放在EventData對象中。

 var eventData = new EventData(byteArray);

 

2. 創建 EventHubClient,這時要提供帶有授權信息(Connection String)好連接到指定的Event Hub。

 EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(connectionString);

 

3. 調用EventHubClient的發送API來發送數據

 eventHubClient.SendAsync(eventData);

 

接收接口 Receiver API

從Event Hub裏面獲取出數據的接口基於AMQP協議(並沒有基於HTTPS協議的API)。

其中,仍然有兩種方式可以實現數據的獲取:1)使用EventHubClient的PartitionReceiver;2)使用EventProcessorHost (EPH)API。下面分別介紹:

https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-dotnet-standard-api-overview

1)使用PartitionReceiver

這是Event Hub提供的一個讀取API,可以從一個指定的Partition上讀取數據。

API接口在Microsoft.Azure.EventHubs的NuGet包里。

1. 和發送數據一樣,創建EventHubClient連接到Event Hub。這個EventHubClient在發送的時候也有使用到。

EventHubClient eventHubClient = EventHubClient.CreateFromConnectionString(connectionString);

 

2. 用EventHubClient的CreateReceiver() API來創建PartitionReceiver:

PartitionReceiver partitionReceiver = eventHubClient.CreateReceiver(“$Default”, “0”, DateTime.Now); //從默認ConsumerGroup上,獲取“0”號Partition上,從當前時刻以後的所有數據。

在這裡,我們想要

 

3. 調用PartitionReceiver的接收API來接收消息

var eventDatas = await partitionReceiver.ReceiveAsync(10); // 執行接收,設定最多接收10條數據

 

4. 解析數據

foreach (var ehEvent in ehEvents)

{

var message = UnicodeEncoding.UTF8.GetString(ehEvent.Body.Array);

// 放入數據分析邏輯的代碼

}

 

值得說明的是,一個PartitionReceiver只能從一個partition上讀取信息。所以通常的做法是,對於一個Event Hub的每一個partition,都創建一個PartitionReceiver來進行讀取,一一對應。

我們可以用EventHubClient的GetRuntimeInfoAsync() API來得到runtime信息,這裏面我們可以知道所有的partition,從而一一創建PartitionReceiver:

var runTimeInformation = await eventHubClient.GetRuntimeInformationAsync();

foreach (var partitionId in runTimeInformation.PartitionIds) {

var receiver = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, Date.Time.Now);

 }

 

2EventProcessorHost

PartitionReceiver直接從一個指定的partition上讀取數據,簡單好用。然而,我們往往需要讀取多個partition上的數據,並且在大數據的場景下對可擴展性有着相當的需求,這樣我們既需要管“給每個partition創建PartitionReceiver”這個事情,還需要管“開了多個應用實例以後partition讀取工作的分配”的事情,還要考慮“如果一個應用實例掛了,要怎麼從之前的進度開始接着讀取”,這樣寫起來十分複雜。

有沒有更自動省力的方法呢?

當然有噠!如果不想要手寫一個一個partition的讀取、offset的記錄、規模的縮放,我們可以使用EventProcessorHostEPH來進行處理。

本質上講,EPH提供兩個功能:1. EPH會自動把一個Event Hub中的每一個partition都創建一個EventProcessor(相當於一個receiver),並且把這些Processor平均分配給現有的應用實例去處理,並且實時監控這些應用實例;2. EPH會把讀取的進度自動保存下來。這樣,不管你有多少個實例在健康運行,或者某個實例掛掉了,都可以保證你的讀取正常進行,提供高可用性(availability)。

簡單點說,現在你有若干個個筆記本上的數據要讀,如果使用PartitionReceiver的方法,就是你請了若干個人來讀這些筆記本(創建若干App實例),你自己安排哪個人讀哪個筆記本,那麼如何分配你就得自己操心。如果使用EPH,那麼相當於請了一個經理來管理,經理會根據你請了幾個人(開幾個App實例),自動分配每個人乾的活。如果某個人請假了(某個App掛了),經理也會自動安排他的工作給其他人。同時,經理會記錄每個筆記本讀到了哪裡,以便如果讀取工作被中斷(如讀取連接斷開),後面還可以繼續從上次的地方接着讀。

API接口在Microsoft.Azure.EventHubs.Processor的NuGet包里。**

 

首先,要實現一個IEventProcessor接口:

CloseAsync(), OpenAsync(), ProcessErrorAsync(), ProcessEventsAsync()

 

public class YourEventProcessor : IEventProcessor{

public Task CloseAsync(PartitionContext context, CloseReason reason){

// your implementation when close

}

public Task OpenAsync(PartitionContext context){

// your implementation when open

}

public Task ProcessErrorAsync(PartitionContext context, Exception error){

// your implementation to process error

}

public Task ProcessEventAsync(PartitionContext context, IEnumerable<EventData> eventDatas){

// your implementation to process event data

if(eventDatas != null){

foreach(var eventData in eventDatas){

// process data here

}

}

return context.CheckpointAsync(); // save offset

}

}

上面的代碼中,實際的數據分析邏輯就寫在ProcessEventsAsync()里,最後 “return context.CheckpointAsync();” 進行讀取進度的保存。

然後,在主程序中創建EventProcessorHost:

var yourEventProcessorHost = new EventProcessorHost(

eventHubPath,

consumerGroupName,

eventHubConnectionString,

storageConnectionString,

containerName);

其中,eventHubPath、consumerGroupName、eventHubConnectionString是你創建好的EventHub的驗證信息,storageConnectionString和containerName是Azure Storage Account的驗證信息,這是用來保存讀取進度的,它也需要提前創建好(這裡不介紹如何創建)*。

下一步,要把這個EPH和剛剛創建的EventProcessor連接起來:

await yourEventProcessorHost.RegisterEventProcessorAsync<YourEventProcessor>();

最後,主程序退出時,取關EventProcessor。

await yourEventProcessorHost.UnregisterEventProcessorAsync();

這樣,即使你有多個應用實例,EPH可以幫你管理,並且以你想要的方式處理。

 

*也可以選擇使用別的storage,需要使用ICheckpointManager,這裡不詳細介紹。

** EPH接口正在進行重新優化設計,在本文發文後,接口可能會有較大變動,讀者請以實時的官方文檔為主。

 

Event Hub支持的語言以及API包:

入門指導文檔 https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-programming-guide

 

總結

EventHub的本質是messaging queue的變種——messaging log,是大數據時代下的產物,作為大數據分析流的門戶,專門為大數據分析提供了一個緩衝、負載平衡,為大數據的收發提供可靠、易於操作的平台服務。希望這篇文章所介紹的Event Hub的定義、工作原理、使用方式能帶給大家啟發。也希望有說的不夠清楚、不夠嚴謹的地方,請大家多多指正 🙂

 

特別感謝:

Xin大神

Jun

Adam