YARN底層基礎庫

  • 2019 年 10 月 3 日
  • 筆記
 
YARN基礎庫是其他一切模組的基礎,它的設計直接決定了YARN的穩定性和擴展性,YARN借用了MRV1的一些底層基礎庫,比如RPC庫等,但因為引入了很多新的軟體設計方式,所以它的基礎庫更多,包括直接使用了開源序列化框架Protocol Buffers和Apache Avro,自定義的服務庫、事件庫和狀態機等


目錄

 
一. 概述
二. Protocol Buffers
三. Apache Avro
四. 底層通訊庫
五. 服務庫與事件庫
六. 狀態機庫


一. 概述
 
Yarn基礎庫是其他一切模組的基礎,它的設計直接決定了Yarn的穩定性和擴展性
 
Yarn的基礎庫主要有 : 
  • Protocol Buffers : Protocol Buffers是Google開源的序列化庫,具有平台無關,高性能,兼容好等優點.Yarn將ProtocolBuffers用到RPC通訊中,默認情況下,Yarn RPC中所有參數採用Protocol Buffers進行序列化/反序列化
  • Apache Avro : Avro是Hadoop生態系統中的RPC框架,具有平台無關,支援動態模式等優點,Avro的最初設計動機是解決Yarn RPC兼容性和擴展性差等問題
  • RPC庫 : Yarn採用MR1中的RPC庫,但其中採用的默認序列化方法被替換成了Protocol Buffers
  • 服務庫和事件庫 : Yarn將所有的對象服務化,以便統一管理(創建,銷毀等),而服務之間則採用事件機制進行通訊
  • 狀態機庫 : 狀態機是一種表示有限個狀態以及在這些狀態之間的轉移和動作等行為的數學模型
二. Protocol Buffers
 
Protocol Buffers是一種輕便高效的結構化數據存儲格式,可以用於結構化數據序列化/反序列化
 
他適合做數據存儲或RPC的數據交換格式,常用作通訊協議,數據存儲等領域的與語言無關,平台無關,可擴展的序列化結構數據格式
 
相比XML格式,Protocol Buffers的優點 : 
  • 平台無關,語言無關
  • 高性能,解析速度是XML的20 ~ 100倍
  • 體積小,文件大小僅是XML的1/10 ~ 1/3
  • 使用簡單
  • 兼容性好
 
Yarn中,所有的RPC函數的參數均採用Protocol Buffers定義的,Yarn的RPC協議全是使用Protocol Buffers定義(RPC協議上一章有介紹)
 
三. Apache Avro
 
Apache Avro是Hadoop下的一個子項目。它本身既是一個序列化框架,同時也實現了RPC的功能
 
Avro的特性和功能 : 
  • 豐富的數據結構類型
  • 快速可壓縮的二進位數據形式
  • 存儲持久數據的文件容器
  • 提供遠程過程調用RPC
  • 簡單的動態語言結合功能
 
Avro具有以下特點 : 
  • 支援動態模式。Avro不需要生成程式碼,有利於搭建通用的數據處理系統,避免了程式碼入侵
  • 數據無需加標籤
  • 無需手工分配的域標識
 
Avro作為日誌序列化庫使用,在Yarn MapReduce中,所有事件的序列化/反序列化均採用Avro完成
 
四. 底層通訊庫
  
HadoopRPC的解析參考我的文章Hadoop RPC機制詳解
 
YARN提供的對外類是Yarn RPC,用戶只需使用該類便可以構建一個基於HadoopRPC且採用Protocol Buffers序列化框架的通訊協議
 
五. 服務庫與事件庫
 
服務庫
 
對於生命周期較長的對象,YARN採用了基於服務的對象管理模型對其進行管理,該模型主要有以下幾個特點 : 
  • 將每個被服務化的對象分為4個狀態 : NOTINITED(被創建),INITED(已初始化),STARTED(已啟動),STOPPED(已停止)
  • 任何服務狀態變化都可以觸發另外一些動作
  • 可通過組合的方式對任意服務進行組合,以便進行統一管理
 
YARN中所有的服務對象最終都實現了介面Service,它定義了最基本的服務初始化、啟動、停止等操作,而AbstractService類提供了一個最基本的Service實現。
 
YARN中所有對象,如果是非組合服務,直接繼承AbstractService類即可,否則需CompositeService。比如,對於RM而言,它是一個組合服務,它組合了各種服務對象,包括ClientRMService、ApplicationMasterLauncher、ApplicationMasterService等
 
 
                     YARN中服務模型的類圖
 
YARN中,RM和NM屬於組合服務,它們內部包含多個單一服務和組合服務,以實現對內部多種服務的統一管理
 
事件庫
 
YARN採用了基於事件驅動的並發模型,該模型能夠大大增強並發性,從而提高系統整體性能。為了構建該模型,YARN將各種處理邏輯抽象成事件和對應事件調度器,並將每類事件的處理過程分割成多個步驟,用有限狀態機表示
 
 
                    YARN的事件處理模型
 
整個處理過程大致為 :處理請求會作為事件進入系統,由中央非同步調度器(AsyncDispatcher)負責傳遞給相應事件調度器(Event Handler)。該事件調度器可能將該事件轉發給另外一個事件調度器,也可能交給一個帶有有限狀態機的事件處理器,其處理結果也以事件的形式輸出給中央非同步調度器。而新的事件會再次被中央非同步調度器轉發給下一個事件調度器,直至處理完成(達到終止條件)
 
YARN中,所有核心服務實際上都是一個中央非同步調度器,包括RM、NM和AppMaster等,它們維護了事先註冊的事件與事件處理器,並根據接收的事件類型驅動服務的運行
 
使用YARN事件庫時,通常先定義一個中央非同步調度器,負責事件的轉發與處理,然後根據實際業務需求定義一系列的事件與事件處理器,並註冊到中央非同步調度器實現事件統一管理和調度。以MRAppMaster為例,它內部包含一個中央非同步調度器,並註冊了TaskAttemptEvent/TaskAttemptImpl、TaskEvent/TaskImpl、JobEvent/JobImpl等一系列事件/事件處理器,由中央非同步調度器統一管理和調度
 
服務化和事件驅動軟體設計思想的引入,使得YARN具有低耦合、高內聚的特點,各個模組只需完成各自功能,而模組之間則採用事件聯繫起來,系統設計簡單且維護方便
 
YARN服務庫和事件庫的使用
 
YARN服務庫和事件庫的使用方法,介紹一個簡單的實例,該例子涉及任務和作業兩種對象的事件以及一個中央非同步調度器
 
(1) 定義Task事件
 
public class TaskEvent extends AbstractEvent<TaskEventType> {      private String taskID;      public TaskEvent (String taskID, TaskEventType type) {          super(type);          this.taskID = taskID;      }      public String getTaskID() {          return taskID;      }  }  // Task事件類型定義  public enum TaskEventType {      T_KILL,      T_SCHEDULE  }

 
(2) 定義Job事件
 
public class JobEvent extends AbstractEvent<JobEventType> {      private String jobID;      public JobEvent (String jobID, JobEventType type) {          super(type);          this.jobID = jobID;      }  }  //Job事件類型定義  public enum JobEventType {      JOB_KILL,      JOB_INIT,      JOB_START  }

 
(3) 事件調度器
 
定義一個中央非同步調度器,它接收Job和Task兩種類型事件,並交給對應的事件處理器處理
@SuppressWarnings("unchecked")  public class SimpleMRAppMaster extends CompositeService {      private Dispatcher dispatcher; //中央非同步調度器      private String jobID;      private int taskNumber; //該作業包含的任務數目      private String[] taskIDs; //該作業內部包含的所有任務        public SimpleMRAppMaster (String name, String jobID, int taskNumber) {          super(name);          this.jobID = jobID;          this.taskNumber = taskNumber;          taskIDs = new String[taskNumber];          for (int i = 0; i < taskNumber; i++) {              taskIDs[i] = new String (jobID + "_task_" + i);          }      }        public void serviceInit (final Configuration conf) throws Exception {          dispatcher = new AsyncDispatcher(); //定義一個中央非同步調度器          //分別註冊Job和Task事件調度器          dispatcher.register(JobEventType.class, new JobEventDispatcher());          dispatcher.register(TaskEventType.class, new TaskEventDispatcher());          addService((Service)dispatcher);          super.serviceInit(conf);      }        public Dispatcher getDispatcher() {          return dispatcher;      }        private class JobEventDispatcher implements EventHandler<JobEvent> {          @Override          public void handle (JobEvent event) {              if (event.getType() == JobEventType.JOB_KILL) {                  System.out.println("Receive JOB_KILL event, killing all the tasks");                  for (int i = 0; i < taskNumber; i++) {                      dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_KILL));                  }              } else if (event.getType() == JobEventType.JOB_INIT) {                  System.out.println("Receive JOB_INIT event, scheduling tasks");                  for (int i = 0; i < taskNumber; i++) {                      dispatcher.getEventHandler().handle(new TaskEvent(taskIDs[i], TaskEventType.T_SCHEDULE));                  }              }          }      }        private class TaskEventDispatcher implements EventHandler<TaskEvent> {          @Override          public void handler (TaskEvent event) {              if (event.getType() == TaskEventType.T_KILL) {                  System.out.println("Receive T_KILL event of task" + event.getTaskID());              } else if (event.getType() == TaskEventType.T_SCHEDULE) {                  System.out.println("Receive T_SCHEDULE of task" + event.getTaskID());              }          }      }  }

 
(4). 測試程式
 
@SuppressWarnings("unchecked")  public class SimpleMRAppMasterTest {      public static void main (String[] args) throws Exception {          String jobID = "job_20131215_12";          SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);          YarnConfiguration conf = new YarnConfiguration(new Configuration());          appMaster.serviceInit(conf);          appMaster.serviceStart();          appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_KILL));          appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID, JobEventType.JOB_INIT));      }  }

 
事件驅動帶來的變化
 
MRV1中,對象之間的作用關係是基於函數調用實現的,當一個對象向另外一個對象傳遞消息時,會直接採用函數調用的方式,且整個過程是串列的
 
基於函數調用的編程模型時低效的,它隱含著整個過程是串列、同步進行的。MRV2引入了事件驅動編程模型是一種更加高效的方式。
 
在基於事件驅動的編程模型中,所有對象被抽象成了事件處理器,而事件處理器之間通過事件相互關聯。 每種事件處理器處理一種類型的事件,同時根據需要觸發另外一種事件
 
相比於基於函數調用的編程模型,這種編程模型具有非同步、並發等特點,更加高效,因此更適合大型分散式系統
 
六. 狀態機庫
 
狀態機由一組狀態組成,這些狀態分為三類:初始狀態、中間狀態和最終狀態。狀態機從初始狀態開始運行,經過一系列中間狀態後,到達最終狀態並退出。在一個狀態機中,每個狀態都可以接收一組特定事件,並根據具體的事件類型轉換到另一個狀態。當狀態機轉換到最終狀態時,則退出
 
YARN狀態轉換方式
 
YARN中,每種狀態轉換由一個四元組表示,分別是轉換前狀態(preState)、轉換後狀態(postState)、事件(event)和回調函數(hook)
 
YARN定義了三種狀態轉換方式 : 
 
(1) 一個初始狀態、一個最終狀態、一種事件。該方式表示狀態機在preState狀態下,接收到Event事件後,執行函數狀態轉移函數Hook,並在執行完成後將當前狀態轉換為postState
 
              初始狀態:最終狀態:事件=1:1:1
 
(2) 一個初始狀態、多個最終狀態、一種事件。該方式表示狀態機在preState狀態下,接收到Event事件後,執行函數狀態轉移函數Hook,並將當前狀態轉移為Hook的返回值所表示的狀態
 
               初始狀態:最終狀態:事件=1:N:1
 
(3) 一個初始狀態、一個最終狀態、多個事件。該方式表示狀態機在preState狀態下,接收到Event1、Event2和Event3中的任何一個事件,將執行函數狀態轉移函數Hook,並在執行完成後將當前狀態轉換成postState
 
             初始狀態:最終狀態:事件=1:1:N
 
狀態機類
 
YARN自己實現了一個非常簡單的狀態機庫(位於包org.apache.hadcop.yarn.state中)。YARN對外提供了一個狀態機工廠StatemachineFactory,它提供多種addTransition方法供用戶添加各種狀態轉移,一旦狀態機添加完畢後,可通過調用installTopology完成一個狀態機的構建

 
我每天會寫文章記錄大數據技術學習之路,另外我自己整理了些大數據的學習資料,目前全部放在我的公眾號”SmallBird技術分享”,加入我們一起學習交流,並且回復’分享’會有大數據資源驚喜等著你~