[源碼解析] 深度學習分散式訓練框架 horovod (13) — 彈性訓練之 Driver

[源碼解析] 深度學習分散式訓練框架 horovod (13) — 彈性訓練之 Driver

0x00 摘要

Horovod 是Uber於2017年發布的一個易於使用的高性能的分散式訓練框架,在業界得到了廣泛應用。

本系列將通過源碼分析來帶領大家了解 Horovod。本文是系列第十三篇,看看 horovod 彈性實現中 的 Driver 角色。

本部分對應架構圖中的 Driver main 部分,因為這部分和 Host discovery 強相關,所以一起展示出來。因為彈性訓練的主體是 Driver,所以本文是把 調用程式碼 和 Driver 一起分析。

本系列其他文章鏈接如下:

[源碼解析] 深度學習分散式訓練框架 Horovod (1) — 基礎知識

[源碼解析] 深度學習分散式訓練框架 horovod (2) — 從使用者角度切入

[源碼解析] 深度學習分散式訓練框架 horovod (3) — Horovodrun背後做了什麼

[源碼解析] 深度學習分散式訓練框架 horovod (4) — 網路基礎 & Driver

[源碼解析] 深度學習分散式訓練框架 horovod (5) — 融合框架

[源碼解析] 深度學習分散式訓練框架 horovod (6) — 後台執行緒架構

[源碼解析] 深度學習分散式訓練框架 horovod (7) — DistributedOptimizer

[源碼解析] 深度學習分散式訓練框架 horovod (8) — on spark

[源碼解析] 深度學習分散式訓練框架 horovod (9) — 啟動 on spark

[源碼解析] 深度學習分散式訓練框架 horovod (10) — run on spark

[源碼解析] 深度學習分散式訓練框架 horovod (11) — on spark — GLOO 方案

[源碼解析] 深度學習分散式訓練框架 horovod (12) — 彈性訓練總體架構

0x01 角色

我們首先要回憶一下彈性訓練中的角色設定。

1.1 角色設定

Horovod 的彈性訓練包含兩個角色,driver 進程和 worker 進程。driver 進程運行在 CPU 節點上,worker 進程可運行在 CPU 或者 GPU 節點上

這兩個角色和 Spark 的 Driver — Executor 依然很類似。Driver 進程就可以認為是 Spark 的 Driver,或者說是 master 節點。Worker 就類似於 Spark 的 Executor。

具體如圖:

                 +------------------------------+
                 |                              |
                 |            Driver            |
                 |                              |
                 |                              |
                 +-----+-------+--------+-------+
                       ^       ^        ^
                       |       |        |
                       |       |        |
         +-------------+       |        +--------------+
         |                     |                       |
         |                     |                       |
         |                     |                       |
         v                     v                       v
+--------+----+        +-------+------+           +----+--------+
|  Worker     |        |  Worker      |           |  Worker     |
|             |        |              |           |             |
|      host1  |        |      host2   |           |     host3   |
+-------------+        +--------------+           +-------------+

1.2 職責

角色的職責如下:

master(控制節點)職責:

  • 負責實時檢測現有 worker(工作節點)是否有變化,掉線情況;
  • 負責通過腳本來實時監控 host 是否有變化;
  • 負責分配任務到存活的worker(工作節點);
  • 在有AllReduce 調用失敗導致進程失敗的情況下,master 通過 blacklist 機制 組織剩下的活著的進程構造一個新的環。
  • 如果有新 host 加入,則在新host之上生成新的 worker,新 worker 和 舊 worker 一起構造成一個新的通訊環。

worker(工作節點)職責:

  • 負責彙報(其實是被動的,沒有主動機制)自己的狀態(就是訓練完成情況);

  • 負責在該worker(工作節點)負責的數據上執行訓練。

0x02 調用部分

我們首先分析調用部分,彈性調用具體是從普適到特殊,一點點深入。

2.1 _run

前文介紹了 horovod 程式的入口是 _run 函數。可以看到,會依據是否是彈性訓練來選擇不同的路徑。我們本文開始介紹 _run_elastic。

def _run(args):
    # if hosts are not specified, either parse from hostfile, or default as
    # localhost
    if not args.hosts and not args.host_discovery_script:
        if args.hostfile:
            args.hosts = hosts.parse_host_files(args.hostfile)
        else:
            # Set hosts to localhost if not specified
            args.hosts = 'localhost:{np}'.format(np=args.np)

    # Convert nics into set
    args.nics = set(args.nics.split(',')) if args.nics else None

    if _is_elastic(args):
        return _run_elastic(args) # 本文在這裡
    else:
        return _run_static(args)

2.2 _run_elastic

此部分邏輯如下:

  • 首先,如果參數配置了「獲取參數的腳本」,則調用 discovery.HostDiscoveryScript 得到一個object(目前只是一個object,未來在構建 ElasticDriver 的時候會獲取 host 資訊)。否則就直接讀取固定 host 配置;
  • 其次,使用 host 配置以及其他資訊來配置 ElasticSettings;
  • 最後,調用 gloo_run_elastic 來進行彈性訓練;

程式碼如下:

def _run_elastic(args):
    # construct host discovery component
    if args.host_discovery_script:
        discover_hosts = discovery.HostDiscoveryScript(args.host_discovery_script, args.slots)
    elif args.hosts:
        _, available_host_slots = hosts.parse_hosts_and_slots(args.hosts)
        discover_hosts = discovery.FixedHosts(available_host_slots)
    ......

    # horovodrun has to finish all the checks before this timeout runs out.
    settings = elastic_settings.ElasticSettings(discovery=discover_hosts,
                                                min_np=args.min_np or args.np,
                                                max_np=args.max_np,
                                                elastic_timeout=args.elastic_timeout,
                                                reset_limit=args.reset_limit,
                                                num_proc=args.np,
                                                verbose=2 if args.verbose else 0,
                                                ssh_port=args.ssh_port,
                                                ssh_identity_file=args.ssh_identity_file,
                                                extra_mpi_args=args.mpi_args,
                                                key=secret.make_secret_key(),
                                                start_timeout=tmout,
                                                output_filename=args.output_filename,
                                                run_func_mode=args.run_func is not None,
                                                nics=args.nics,...)

    env = os.environ.copy()
    config_parser.set_env_from_args(env, args)
    gloo_run_elastic(settings, env, args.command)

2.3 gloo_run_elastic

這部分開始,就是架構圖中第一部分:

主要做了如下:

  • 定義了 get_common_interfaces,這是一個可以獲取網路路由資訊以及host能力的函數;需要注意的一點是:會等待所需的最小節點數目,然後才會開始獲取網路路由。
  • _exec_command_fn 我們在之前介紹過,就是提供了一種運行命令的能力,或者說是運行環境;
  • 建立了一個 RendezvousServer,用來保存各種host資訊;
  • 使用以上這些參數和 command 參數來運行 launch_gloo_elastic,command參數就是類似 python train.py
def gloo_run_elastic(settings, env, command):

    def get_common_interfaces(driver):
        # Host-to-host common interface detection requires at least 2 hosts in an elastic job.
        min_hosts = _get_min_start_hosts(settings)
        current_hosts = driver.wait_for_available_slots(settings.num_proc, min_hosts=min_hosts)
        return driver_service.get_common_interfaces(settings, current_hosts.host_assignment_order)

    exec_command = _exec_command_fn(settings)
    rendezvous = RendezvousServer(settings.verbose)
    launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous)

2.4 get_common_interfaces

get_common_interfaces 可以獲取網路路由資訊以及host。

  • 如果配置了遠端host,則會分散式執行,在各個 host 之上執行,獲取每個 host 的網卡和路由資訊。
  • 否則就獲取本地網卡等資訊。

具體函數在:runner/driver/driver_service.py

def get_common_interfaces(settings, all_host_names, remote_host_names=None, fn_cache=None):
    if remote_host_names is None:
        remote_host_names = network.filter_local_addresses(all_host_names)

    if len(remote_host_names) > 0:
        if settings.nics:
            # If args.nics is provided, we will use those interfaces. All the workers
            # must have at least one of those interfaces available.
            nics = settings.nics
        else:
            # Find the set of common, routed interfaces on all the hosts (remote
            # and local) and specify it in the args to be used by NCCL. It is
            # expected that the following function will find at least one interface
            # otherwise, it will raise an exception.
            local_host_names = set(all_host_names) - set(remote_host_names)
            nics = _driver_fn(all_host_names, local_host_names, settings, fn_cache=fn_cache)

    else:
        nics = get_local_interfaces(settings)
    return nics

get_local_interfaces 是獲取本地 host 的網卡資訊。

def get_local_interfaces(settings):
    if settings.verbose >= 2:
        print('All hosts are local, finding the interfaces '
              'with address 127.0.0.1')
    # If all the given hosts are local, find the interfaces with address
    # 127.0.0.1
    nics = set()
    for iface, addrs in net_if_addrs().items():
        if settings.nics and iface not in settings.nics:
            continue
        for addr in addrs:
            if addr.family == AF_INET and addr.address == '127.0.0.1':
                nics.add(iface)
                break

    return nics

2.5 獲取異地網卡資訊

此處資訊在前文中已經講述,我們精簡如下:

_driver_fn 的作用是分散式執行 探尋函數,作用是:

  • 啟動 service 服務;
  • 使用 driver.addresses() 獲取 Driver 服務的地址(使用self._addresses = self._get_local_addresses()完成);
  • 使用 _launch_task_servers(利用 Driver 服務的地址)在每個 worker 之中啟動 task 服務,然後 task 服務會在 service 服務中註冊;
  • 因為是一個環形,每個 worker 會探測 worker index + 1 的所有網路介面;
  • 最後 _run_probe 返回一個所有 workers 上的所有路由介面的交集;
@cache.use_cache()
def _driver_fn(all_host_names, local_host_names, settings):
    """
    launches the service service, launches the task service on each worker and
    have them register with the service service. Each worker probes all the
    interfaces of the worker index + 1 (in a ring manner) and only keeps the
    routed interfaces. Function returns the intersection of the set of all the
    routed interfaces on all the workers.
    :param all_host_names: list of addresses. for example,
        ['worker-0','worker-1']
        ['10.11.11.11', '10.11.11.12']
    :type all_host_names: list(string)
    :param local_host_names: host names that resolve into a local addresses.
    :type local_host_names: set
    :param settings: the object that contains the setting for running horovod
    :type settings: horovod.runner.common.util.settings.Settings
    :return: example: ['eth0', 'eth1']
    :rtype: list[string]
    """
    # Launch a TCP server called service service on the host running horovod
    num_hosts = len(all_host_names)
    driver = HorovodRunDriverService(num_hosts, settings.key, settings.nics)

    # Have all the workers register themselves with the service service.
    _launch_task_servers(all_host_names, local_host_names,
                         driver.addresses(), settings)
    try:
        return _run_probe(driver, settings, num_hosts)
    finally:
        driver.shutdown()

2.6 launch_gloo_elastic

到了這裡,才是正式調用起 gloo 彈性系統,就是生成 Driver 相關部分 & 建立 彈性訓練的 worker。

運行之中,只有一個 RendezvousServer,launch_gloo_elastic 也只運行一次。

邏輯如下:

  • 如果需要配置輸出文件,則創建;
  • 使用”發現腳本”等作為參數 建立 ElasticDriver;
  • 使用 create_rendezvous_handler 作為 handler 來啟動 RendezvousServer;
  • 使用 driver.wait_for_available_slots 來等待所需的最小數目 slots;
  • 如果等到了,就調用 get_common_interfaces 獲取網路路由等,從而得到 server ip;
  • 註冊 shutdown event;
  • 利用 get_run_command 得到運行的命令;
  • 利用 _create_elastic_worker_fn 建立 彈性訓練的 worker;

簡單程式碼如下:

def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous):
    # Make the output directory if it does not exist
    if settings.output_filename:
        _mkdir_p(settings.output_filename)

    # 使用"發現腳本"等作為參數 建立 ElasticDriver
    driver = ElasticDriver(rendezvous, settings.discovery,
                           settings.min_np, settings.max_np,
                           timeout=settings.elastic_timeout,
                           reset_limit=settings.reset_limit,
                           verbose=settings.verbose)

    handler = create_rendezvous_handler(driver)
    global_rendezv_port = rendezvous.start(handler) # 啟動 RendezvousServer
    driver.wait_for_available_slots(settings.num_proc)

    nics = get_common_interfaces(driver) # 獲取網路路由等
    server_ip = network.get_driver_ip(nics)

    event = register_shutdown_event()
    run_command = get_run_command(command, server_ip, nics, global_rendezv_port, elastic=True)

    # 建立 彈性訓練的 worker
    create_worker = _create_elastic_worker_fn(exec_command, run_command, env, event)

    driver.start(settings.num_proc, create_worker)
    res = driver.get_results()
    driver.stop()

    for name, value in sorted(res.worker_results.items(), key=lambda item: item[1][1]):
        exit_code, timestamp = value

這裡很複雜,我們需要逐一分析。

首先,我們看看 get_run_command,這個在前文spark gloo之中介紹過,這裡再說一下。

它會調用 create_run_env_vars 得到gloo需要資訊,並據此構建 run_command,其格式如下:

HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python 

可以看到,elastic 和 spark gloo 版本很類似,都是使用 RendezvousServer 來完成一些master的控制功能。

其次,我們看看Driver主體。

0x03 Driver Main

3.1 ElasticDriver

定義如下,基本成員是:

  • _rendezvous :driver 會根據當前正在運行的節點重新執行一個 RendezvousServer,這個 rendezvous 會存儲每個 worker 的地址和給其在邏輯通訊環分配的序號 rank;

  • _host_manager :HostManager 負責發現,管理各種 host;

  • _worker_registry :WorkerStateRegistry

  • _discovery_thread :負責後台定期探尋 host,具體會調用 _host_manager 完成功能;

  • _worker_clients :WorkerNotificationClient,每一個 worker 對應一個;

  • _host_assignments :host 分配資訊;

  • _rank_assignments :rank 分配資訊。rank 可以認為是代表分散式任務里的一個執行訓練的進程。Rank 0 在Horovod中通常具有特殊的意義:它是負責此同步的設備。

  • _world_size :進程總數量,會等到所有world_size個進程就緒之後才會開始訓練;

  • _wait_hosts_cond :類型是 threading.Condition,目的是等待 訓練所需最小的 host 數目;

具體定義如下:

class ElasticDriver(object):
    def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, verbose=0):
        self._rendezvous = rendezvous
        self._host_manager = HostManager(discovery)

        self._host_assignments = {}
        self._rank_assignments = {}
        self._world_size = 0

        self._wait_hosts_cond = threading.Condition()

        self._create_worker_fn = None
        self._worker_clients = {}

        self._worker_registry = WorkerStateRegistry(self, self._host_manager, reset_limit=reset_limit)
        self._results = ResultsRecorder()
        self._shutdown = threading.Event()

        self._discovery_thread = threading.Thread(target=self._discover_hosts)
        self._discovery_thread.daemon = True
        self._discovery_thread.start()

創建Driver之後,接下來的主要動作是:

  • 等待最小數目 host。
  • 配置 worker。
  • 啟動 driver,其內部會啟動worker。
  • Driver 等待 worker 的運行結果。

我們逐步分析。

3.2 等待最小數目 host

啟動之後,會調用 driver.wait_for_available_slots(settings.num_proc) 等待最小數目host。

可以看到,這裡就是無限循環等待,如果 avail_slots >= min_np and avail_hosts >= min_hosts 才會返回。其實,就是看 self._host_manager.current_hosts 的數目是否已經達到了 所需最小的 host 數目,而且 slot 也達到了所需最小數目。

def wait_for_available_slots(self, min_np, min_hosts=1):
    tmout = timeout.Timeout(self._timeout,  message='')
    self._wait_hosts_cond.acquire()

    try:
        while True: # 無限循環等待
            current_hosts = self._host_manager.current_hosts
            
            avail_slots = current_hosts.count_available_slots()
            avail_hosts = len(current_hosts.available_hosts)
            
            if avail_slots >= min_np and avail_hosts >= min_hosts:
                return current_hosts
            if self._shutdown.is_set():
                raise RuntimeError('Job has been shutdown, see above error messages for details.')
            self._wait_hosts_cond.wait(tmout.remaining())
            tmout.check_time_out_for('minimum number of slots to become available')
    finally:
        self._wait_hosts_cond.release()

邏輯如下:

      launch_gloo_elastic
               +
               |
               |
               |
               |                                +----------------+
               v                                |   HostManager  |
+--------------+------------------+   wait      |                |
| driver.wait_for_available_slots | +---------> |                |
+---------------------------------+             |  current_hosts |
                                                +----------------+

3.3 配置 worker

配置過程 是由 _create_elastic_worker_fn 完成。

_create_elastic_worker_fn 分為兩部分:

  • _slot_info_to_command_fn 會建立 slot_info_to_command,套路和之前文章中類似,就是把各種環境變數和運行命令 run_command 糅合起來,得到一個可以在 「某個 host and slot」 之上運行的命令文本;
  • 返回 create_worker。
    • create_worker 是利用 exec_command 和 命令文本 構建的函數。
    • exec_command 我們在之前介紹過,就是提供了一種運行命令的能力,或者說是運行環境;
    • 所以 create_worker 就是提供一個在某個環境下運行某個命令的能力;

這幾個概念關係具體如下:

3.4 啟動 driver

driver.start(settings.num_proc, create_worker)

具體啟動經歷了以下幾個步驟。

3.4.1 start

def start(self, np, create_worker_fn):
    self._create_worker_fn = create_worker_fn
    self._activate_workers(np)

3.4.2 _activate_workers

ElasticDriver 的 resume / start 函數會調用到 _activate_workers,其定義如下,可以看到,如果此時 discovery 腳本已經發現了新節點,進而返回了 pending_slotspending_slots 就是可以在這些 slot 之上啟動新 worker 的,於是 就會 調用 _start_worker_processes

def _activate_workers(self, min_np):
    current_hosts = self.wait_for_available_slots(min_np)
    pending_slots = self._update_host_assignments(current_hosts)
    self._worker_registry.reset(self.world_size())
    self._start_worker_processes(pending_slots)

3.4.3 _start_worker_processes

啟動之後,在一個執行緒中通過run_worker啟動worker,然後使用 self._results.expect(thread) 向 ResultsRecorder 放入 worker 執行緒。這是等待結果的關鍵。

def _start_worker_processes(self, pending_slots):
    for slot_info in pending_slots:
        self._start_worker_process(slot_info)

def _start_worker_process(self, slot_info):
    create_worker_fn = self._create_worker_fn
    shutdown_event = self._shutdown
    host_event = self._host_manager.get_host_event(slot_info.hostname)

    def run_worker():
        res = create_worker_fn(slot_info, [shutdown_event, host_event])
        exit_code, timestamp = res
        self._handle_worker_exit(slot_info, exit_code, timestamp)

    thread = threading.Thread(target=run_worker) # 啟動訓練執行緒
    thread.daemon = True
    thread.start()
    self._results.expect(thread) # 等待運行結果

3.5 等待運行結果

Driver 使用 如下得到結果。

def get_results(self):
    return self._results.get_results()

_results是 ResultsRecorder 類型,所以我們需要看看其實現。

3.5.1 ResultsRecorder

幾個功能如下:

  • expect 等待 thread:採用 expect 來 self._worker_threads.put(worker_thread),這樣就知道應該等待哪些 thread。
  • add_result 添加結果:_handle_worker_exit 會 在 record 之後,調用 self._results.add_result(name, (exit_code, timestamp)) 紀錄結果;
  • get_results 獲取結果:driver 就是調用此函數,獲取結果,利用了 join。
class ResultsRecorder(object):
    def __init__(self):
        self._error_message = None
        self._worker_results = {}
        self._worker_threads = queue.Queue()

    def expect(self, worker_thread):
        self._worker_threads.put(worker_thread)

    def add_result(self, key, value):
        if key in self._worker_results:
            return
        self._worker_results[key] = value

    def get_results(self):
        while not self._worker_threads.empty():
            worker_thread = self._worker_threads.get()
            worker_thread.join()
        return Results(self._error_message, self._worker_results)

3.5.2 worker 結束

Driver 使用 _handle_worker_exit 來等待具體 worker結束。根據 worker 的返回來決定如何處理。

_handle_worker_exit 是運行在 worker thread 之中,運行時候,會通過self._results.add_result 往 ResultsRecorder 註冊資訊。

def _handle_worker_exit(self, slot_info, exit_code, timestamp):
    if not self.has_rank_assignment(slot_info.hostname, slot_info.local_rank):
        # Ignore hosts that are not assigned a rank
        return

    if exit_code == 0: # 順利完成記錄
        rendezvous_id = self._worker_registry.record_success(slot_info.hostname, slot_info.local_rank)
    else: # 否則記錄失敗
        rendezvous_id = self._worker_registry.record_failure(slot_info.hostname, slot_info.local_rank)

    if self.finished() and self._worker_registry.last_rendezvous() == rendezvous_id:
        name = '{}[{}]'.format(slot_info.hostname, slot_info.local_rank)
        self._results.add_result(name, (exit_code, timestamp)) # 往ResultsRecorder註冊資訊

具體如下:

+-----------------------------+
| ElasticDriver               |
|                             |
|        start                |
|          +                  |
|          | 1                |
|          |                  |
|          v                  |
|  _activate_workers          |
|          +                  |                 +-------------------------+
|          |                  |                 | Thread                  |
|          | 2                |                 |                         |
|          v                  |                 |        run_worker       |
| _start_worker_processes     |                 |            +            |
|          +                  |                 |            |            |
|          |                  |                 |            | 7          |
|          | 3                |                 |            v            |
|          v                  |                 |     create_worker_fn    |
|  _start_worker_process      |                 |            +            |
|          +                  |                 |            |            |
|          |                  |                 |            | 8          |
|          | 4                |                 |            v            |  results.add_result
|          v                  | thread.start()  |   _handle_worker_exit +---------------------------------+
|      run_worker +---------------------------> |                         |                9              |
|          +                  |      5          +-------------------------+                               |
|          |                  |                                                                           |
|          |                  |                 +------------------------------------------------------+  |
|          v                  | expect(thread)  | ResultsRecorder                                      |  |
|     self._results  +------------------------> |                                                      |  |
|                             |      6          | _worker_results = [thread]                           |  |
|                             |                 |                                                      |  |
|                             |                 | _worker_threads = [name : (exit_code, timestamp)] <-----+
+-----------------------------+                 |                                                      |
                                                +------------------------------------------------------+

或者手機如下:

我們接下來會具體看看彈性訓練的其他部分。

因為 Driver 是彈性訓練主要框架,所以不可避免的在其他文章中也會出現 本文部分文字,敬請諒解。

0xEE 個人資訊

★★★★★★關於生活和技術的思考★★★★★★

微信公眾帳號:羅西的思考

如果您想及時得到個人撰寫文章的消息推送,或者想看看個人推薦的技術資料,敬請關注。

在這裡插入圖片描述

0xFF 參考

ElasticDL調用 Horovod 在Kubernetes上實現彈性 AllReduce(一)

kubernetes 培訓_在Kubernetes上使用horovod進行分散式深度學習培訓

在 Kubernetes 上彈性深度學習訓練利器 — Elastic Training Operator

ElasticHorovod – 彈性、容錯的分散式訓練 (嘗鮮版)

Horovod 彈性訓練