An attempt at optimizing YARN RM FairScheduler scheduling performance on hadoop 2.8.4

  • November 12, 2019
  • Notes

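For a typical small company, YARN's scheduling capacity is probably sufficient. On large clusters of 1000 or 2000+ nodes, however, YARN's scheduling performance becomes a bottleneck.

I happened to come across a very good article on the subject: https://tech.meituan.com/2019/08/01/hadoop-yarn-scheduling-performance-optimization-practice.html

Following its reference to YARN-5969, I found that hadoop 2.9.0 already fixes this issue, and my own measurements confirmed that the fix improves scheduling performance.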

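FairScheduler has two scheduling modes.

Heartbeat scheduling: each YARN NodeManager periodically reports its state to the ResourceManager through a heartbeat. That RPC request triggers the ResourceManager's nodeUpdate() method, which runs one round of resource scheduling for the reporting node.

Continuous scheduling: a dedicated daemon thread runs a scheduling pass at short fixed intervals, so resources are allocated in near real time. It runs asynchronously to, and in parallel with, the scheduling triggered by NodeManager heartbeats.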

Heartbeat scheduling runs as a single thread, and every nodeUpdate call goes through the same logic:

    // If the node is decommissioning, send an update to have the total
    // resource equal to the used resource, so no available resource to
    // schedule.
    if (nm.getState() == NodeState.DECOMMISSIONING) {
      this.rmContext
          .getDispatcher()
          .getEventHandler()
          .handle(
              new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
                  .newInstance(getSchedulerNode(nm.getNodeID())
                      .getUsedResource(), 0)));
    }

    if (continuousSchedulingEnabled) {
      // Continuous scheduling is on: a heartbeat only triggers scheduling
      // when containers have completed on this node.
      if (!completedContainers.isEmpty()) {
        attemptScheduling(node);
      }
    } else {
      // Continuous scheduling is off: every heartbeat triggers scheduling.
      attemptScheduling(node);
    }

    // Updating node resource utilization
    node.setAggregatedContainersUtilization(
        nm.getAggregatedContainersUtilization());
    node.setNodeUtilization(nm.getNodeUtilization());
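The branch on continuousSchedulingEnabled is easy to misread, so here is a minimal standalone sketch of just that decision. The class and method names are hypothetical and are not part of Hadoop; only the flag and the completed-containers check are taken from the excerpt above.

```java
final class HeartbeatDecisionSketch {
    /**
     * Returns true when a node heartbeat should trigger a scheduling attempt.
     * Hypothetical helper that mirrors the if/else in the nodeUpdate excerpt.
     */
    static boolean schedulesOnHeartbeat(boolean continuousSchedulingEnabled,
                                        int completedContainers) {
        if (continuousSchedulingEnabled) {
            // The continuous thread does the regular work; the heartbeat
            // only reacts to resources that were just freed.
            return completedContainers > 0;
        }
        // Without the continuous thread, every heartbeat schedules.
        return true;
    }

    public static void main(String[] args) {
        System.out.println(schedulesOnHeartbeat(false, 0)); // true
        System.out.println(schedulesOnHeartbeat(true, 0));  // false
        System.out.println(schedulesOnHeartbeat(true, 2));  // true
    }
}
```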

 

 

 

In continuous mode, a dedicated thread calls continuousSchedulingAttempt in a loop:

    /**
     * Thread which attempts scheduling resources continuously,
     * asynchronous to the node heartbeats.
     */
    private class ContinuousSchedulingThread extends Thread {

      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            continuousSchedulingAttempt();
            Thread.sleep(getContinuousSchedulingSleepMs());
          } catch (InterruptedException e) {
            LOG.warn("Continuous scheduling thread interrupted. Exiting.", e);
            return;
          }
        }
      }
    }

Each attempt first sorts the nodes by how much free resource they have:

    void continuousSchedulingAttempt() throws InterruptedException {
      long start = getClock().getTime();
      List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
      // Sort the nodes by space available on them, so that we offer
      // containers on emptier nodes first, facilitating an even spread. This
      // requires holding the scheduler lock, so that the space available on a
      // node doesn't change during the sort.
      synchronized (this) {
        Collections.sort(nodeIdList, nodeAvailableResourceComparator);
      }

      // iterate all nodes
      for (NodeId nodeId : nodeIdList) {
        FSSchedulerNode node = getFSSchedulerNode(nodeId);
        try {
          if (node != null && Resources.fitsIn(minimumAllocation,
              node.getAvailableResource())) {
            attemptScheduling(node);
          }
        } catch (Throwable ex) {
          LOG.error("Error while attempting scheduling for node " + node +
              ": " + ex.toString(), ex);
          if ((ex instanceof YarnRuntimeException) &&
              (ex.getCause() instanceof InterruptedException)) {
            // AsyncDispatcher translates InterruptedException to
            // YarnRuntimeException with cause InterruptedException.
            // Need to throw InterruptedException to stop schedulingThread.
            throw (InterruptedException)ex.getCause();
          }
        }
      }
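The sort-then-filter step can be sketched in isolation. This is only an illustration under simplifying assumptions: the Node class is a hypothetical stand-in that models free memory only, whereas the excerpt compares full resource objects through nodeAvailableResourceComparator and checks Resources.fitsIn against minimumAllocation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class NodeSortSketch {
    /** Hypothetical stand-in for a scheduler node; only free memory is modelled. */
    static final class Node {
        final String id;
        final long availableMb;

        Node(String id, long availableMb) {
            this.id = id;
            this.availableMb = availableMb;
        }
    }

    /**
     * Returns node ids in the order a continuous pass would visit them:
     * emptiest node first, skipping nodes that cannot fit the minimum allocation.
     */
    static List<String> visitOrder(List<Node> nodes, long minAllocationMb) {
        List<Node> sorted = new ArrayList<>(nodes);
        sorted.sort(Comparator.comparingLong((Node n) -> n.availableMb).reversed());
        List<String> order = new ArrayList<>();
        for (Node n : sorted) {
            if (n.availableMb >= minAllocationMb) {
                order.add(n.id);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node("a", 2048));
        nodes.add(new Node("b", 8192));
        nodes.add(new Node("c", 512));
        System.out.println(visitOrder(nodes, 1024)); // [b, a]
    }
}
```

Note that the whole node list is re-sorted on every pass, so the cost of this step grows with cluster size.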

It then walks the nodes in that order and tries to assign containers on each one.

queueMgr.getRootQueue().assignContainer(node) walks the queue tree from the root, traversing the abstract schedulable entities (queues and apps):

    boolean validReservation = false;
    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
    if (reservedAppSchedulable != null) {
      validReservation = reservedAppSchedulable.assignReservedContainer(node);
    }
    if (!validReservation) {
      // No reservation, schedule at queue which is farthest below fair share
      int assignedContainers = 0;
      Resource assignedResource = Resources.clone(Resources.none());
      Resource maxResourcesToAssign =
          Resources.multiply(node.getAvailableResource(), 0.5f);
      while (node.getReservedContainer() == null) {
        boolean assignedContainer = false;
        Resource assignment = queueMgr.getRootQueue().assignContainer(node);
        if (!assignment.equals(Resources.none())) { // was a container assigned?
          assignedContainers++;
          assignedContainer = true;
          Resources.addTo(assignedResource, assignment);
        }
        if (!assignedContainer) { break; }
        if (!shouldContinueAssigning(assignedContainers,
            maxResourcesToAssign, assignedResource)) {
          break;
        }
      }

Next, inside assignContainer, the child queues are sorted with the comparator of the queue's policy, which here is the fair share policy:

    @Override
    public Resource assignContainer(FSSchedulerNode node) {
      // For each node, this performs one recursive search of the queue tree.
      Resource assigned = Resources.none();

      // If this queue is over its limit, reject
      if (!assignContainerPreCheck(node)) {
        return assigned;
      }

      // Hold the write lock when sorting childQueues
      writeLock.lock();
      try {
        Collections.sort(childQueues, policy.getComparator());
      } finally {
        writeLock.unlock();
      }

The apps under a queue are sorted as well. The parent queue then iterates over its sorted children:

    /*
     * We are releasing the lock between the sort and iteration of the
     * "sorted" list. There could be changes to the list here:
     * 1. Add a child queue to the end of the list, this doesn't affect
     * container assignment.
     * 2. Remove a child queue, this is probably good to take care of so we
     * don't assign to a queue that is going to be removed shortly.
     */
    readLock.lock();
    try {
      for (FSQueue child : childQueues) {
        assigned = child.assignContainer(node);
        if (!Resources.equals(assigned, Resources.none())) {
          break;
        }
      }
    } finally {
      readLock.unlock();
    }
    return assigned;

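assignContainer may be invoked on an app or on a queue. On a queue it recurses until it reaches an app: the root node (an FSParentQueue) calls assignContainer() recursively, and allocation only really starts once the call reaches assignContainer() on a leaf node.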
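The recursion can be sketched as a small composite tree. Everything below is a hypothetical simplification: the Schedulable interface, the App and Queue classes, and the "least usage first" comparator are stand-ins for illustration, not the Hadoop classes or the real fair share ordering. It does show the shape of the cost, though: every call sorts the children at every level it visits before descending.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class QueueTreeSketch {
    /** Hypothetical common interface for queues and apps. */
    interface Schedulable {
        String name();
        long usage();
        /** Returns the name of the app that got a container, or null if none. */
        String assignContainer();
    }

    /** Simplified ordering: least usage first, name as tie-breaker. */
    static final Comparator<Schedulable> LEAST_USAGE_FIRST =
        Comparator.comparingLong(Schedulable::usage).thenComparing(Schedulable::name);

    static final class App implements Schedulable {
        private final String name;
        private long usage;
        private long demand;

        App(String name, long usage, long demand) {
            this.name = name;
            this.usage = usage;
            this.demand = demand;
        }

        public String name() { return name; }
        public long usage() { return usage; }

        public String assignContainer() {
            if (demand <= 0) {
                return null; // nothing left to ask for
            }
            demand--;
            usage++;
            return name;
        }
    }

    static final class Queue implements Schedulable {
        private final String name;
        final List<Schedulable> children = new ArrayList<>();

        Queue(String name) { this.name = name; }

        public String name() { return name; }

        /** Usage is recomputed from the children on every call. */
        public long usage() {
            long total = 0;
            for (Schedulable child : children) {
                total += child.usage();
            }
            return total;
        }

        public String assignContainer() {
            children.sort(LEAST_USAGE_FIRST);
            for (Schedulable child : children) {
                String assigned = child.assignContainer(); // recurse
                if (assigned != null) {
                    return assigned;
                }
            }
            return null;
        }
    }

    /** Builds root -> {qa -> a1, qb -> {b1, b2}} for the demo. */
    static Queue sampleTree() {
        Queue qa = new Queue("qa");
        qa.children.add(new App("a1", 5, 1));
        Queue qb = new Queue("qb");
        qb.children.add(new App("b1", 1, 1));
        qb.children.add(new App("b2", 0, 0));
        Queue root = new Queue("root");
        root.children.add(qa);
        root.children.add(qb);
        return root;
    }

    public static void main(String[] args) {
        Queue root = sampleTree();
        System.out.println(root.assignContainer()); // b1
        System.out.println(root.assignContainer()); // a1
        System.out.println(root.assignContainer()); // null
    }
}
```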

 

What we care about here is the sorting.

In hadoop 2.8.4 the comparator in FairSharePolicy orders schedulables by weight, by demanded resources and by memory usage ratio. Along the way it calls getResourceUsage() several times for each operand, which causes a lot of duplicate computation, because that method computes its result dynamically and is expensive:

    @Override
    public int compare(Schedulable s1, Schedulable s2) {
      double minShareRatio1, minShareRatio2;
      double useToWeightRatio1, useToWeightRatio2;
      Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
          s1.getMinShare(), s1.getDemand());
      Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
          s2.getMinShare(), s2.getDemand());
      boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
          s1.getResourceUsage(), minShare1);
      boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
          s2.getResourceUsage(), minShare2);
      minShareRatio1 = (double) s1.getResourceUsage().getMemorySize()
          / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize();
      minShareRatio2 = (double) s2.getResourceUsage().getMemorySize()
          / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize();
      useToWeightRatio1 = s1.getResourceUsage().getMemorySize() /
          s1.getWeights().getWeight(ResourceType.MEMORY);
      useToWeightRatio2 = s2.getResourceUsage().getMemorySize() /
          s2.getWeights().getWeight(ResourceType.MEMORY);
      int res = 0;
      if (s1Needy && !s2Needy)
        res = -1;
      else if (s2Needy && !s1Needy)
        res = 1;
      else if (s1Needy && s2Needy)
        res = (int) Math.signum(minShareRatio1 - minShareRatio2);
      else
        // Neither schedulable is needy
        res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
      if (res == 0) {
        // Apps are tied in fairness ratio. Break the tie by submit time and job
        // name to get a deterministic ordering, which is useful for unit tests.
        res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
        if (res == 0)
          res = s1.getName().compareTo(s2.getName());
      }
      return res;
    }
    }

The optimized comparator in the newer version looks like this:

    @Override
    public int compare(Schedulable s1, Schedulable s2) {
      int res = compareDemand(s1, s2);

      // Pre-compute resource usages to avoid duplicate calculation
      Resource resourceUsage1 = s1.getResourceUsage();
      Resource resourceUsage2 = s2.getResourceUsage();

      if (res == 0) {
        res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
      }

      if (res == 0) {
        res = compareFairShareUsage(s1, s2, resourceUsage1, resourceUsage2);
      }

      // Break the tie by submit time
      if (res == 0) {
        res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
      }

      // Break the tie by job name
      if (res == 0) {
        res = s1.getName().compareTo(s2.getName());
      }

      return res;
    }

    private int compareDemand(Schedulable s1, Schedulable s2) {
      int res = 0;
      Resource demand1 = s1.getDemand();
      Resource demand2 = s2.getDemand();
      if (demand1.equals(Resources.none()) && Resources.greaterThan(
          RESOURCE_CALCULATOR, null, demand2, Resources.none())) {
        res = 1;
      } else if (demand2.equals(Resources.none()) && Resources.greaterThan(
          RESOURCE_CALCULATOR, null, demand1, Resources.none())) {
        res = -1;
      }
      return res;
    }

    private int compareMinShareUsage(Schedulable s1, Schedulable s2,
        Resource resourceUsage1, Resource resourceUsage2) {
      int res;
      Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
          s1.getMinShare(), s1.getDemand());
      Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
          s2.getMinShare(), s2.getDemand());
      boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
          resourceUsage1, minShare1);
      boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
          resourceUsage2, minShare2);

      if (s1Needy && !s2Needy) {
        res = -1;
      } else if (s2Needy && !s1Needy) {
        res = 1;
      } else if (s1Needy && s2Needy) {
        double minShareRatio1 = (double) resourceUsage1.getMemorySize() /
            Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE)
                .getMemorySize();
        double minShareRatio2 = (double) resourceUsage2.getMemorySize() /
            Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE)
                .getMemorySize();
        res = (int) Math.signum(minShareRatio1 - minShareRatio2);
      } else {
        res = 0;
      }

      return res;
    }

    /**
     * To simplify computation, use weights instead of fair shares to calculate
     * fair share usage.
     */
    private int compareFairShareUsage(Schedulable s1, Schedulable s2,
        Resource resourceUsage1, Resource resourceUsage2) {
      double weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
      double weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
      double useToWeightRatio1;
      double useToWeightRatio2;
      if (weight1 > 0.0 && weight2 > 0.0) {
        useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1;
        useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2;
      } else { // Either weight1 or weight2 equals to 0
        if (weight1 == weight2) {
          // If they have same weight, just compare usage
          useToWeightRatio1 = resourceUsage1.getMemorySize();
          useToWeightRatio2 = resourceUsage2.getMemorySize();
        } else {
          // By setting useToWeightRatios to negative weights, we give the
          // zero-weight one less priority, so the non-zero weight one will
          // be given slots.
          useToWeightRatio1 = -weight1;
          useToWeightRatio2 = -weight2;
        }
      }

      return (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
    }
    }
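To see how much the pre-computation saves, the sketch below counts calls to getResourceUsage() during one sort, using two comparators that differ only in how often they read the usage. It is a simplified, memory-only model under stated assumptions: Sched, compareOld, compareNew and the sample data are hypothetical, weights are assumed positive, and the demand check and start-time tie-break of the real comparator are left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class UsageCallCountSketch {
    /** Counts every getResourceUsage() call across all Sched instances. */
    static final AtomicLong CALLS = new AtomicLong();

    /** Hypothetical, memory-only stand-in for a Schedulable. */
    static final class Sched {
        final String name;
        final long usageMb;
        final long minShareMb;
        final double weight; // assumed > 0 in this sketch

        Sched(String name, long usageMb, long minShareMb, double weight) {
            this.name = name;
            this.usageMb = usageMb;
            this.minShareMb = minShareMb;
            this.weight = weight;
        }

        long getResourceUsage() {
            CALLS.incrementAndGet(); // stands in for an expensive computation
            return usageMb;
        }
    }

    /** Old style: reads the usage three times per operand. */
    static int compareOld(Sched s1, Sched s2) {
        boolean needy1 = s1.getResourceUsage() < s1.minShareMb;
        boolean needy2 = s2.getResourceUsage() < s2.minShareMb;
        double minRatio1 = (double) s1.getResourceUsage() / Math.max(s1.minShareMb, 1);
        double minRatio2 = (double) s2.getResourceUsage() / Math.max(s2.minShareMb, 1);
        double weightRatio1 = s1.getResourceUsage() / s1.weight;
        double weightRatio2 = s2.getResourceUsage() / s2.weight;
        return rank(s1, s2, needy1, needy2,
            minRatio1 - minRatio2, weightRatio1 - weightRatio2);
    }

    /** New style: reads the usage once per operand and reuses the value. */
    static int compareNew(Sched s1, Sched s2) {
        long usage1 = s1.getResourceUsage();
        long usage2 = s2.getResourceUsage();
        boolean needy1 = usage1 < s1.minShareMb;
        boolean needy2 = usage2 < s2.minShareMb;
        double minRatio1 = (double) usage1 / Math.max(s1.minShareMb, 1);
        double minRatio2 = (double) usage2 / Math.max(s2.minShareMb, 1);
        return rank(s1, s2, needy1, needy2,
            minRatio1 - minRatio2, usage1 / s1.weight - usage2 / s2.weight);
    }

    private static int rank(Sched s1, Sched s2, boolean needy1, boolean needy2,
                            double minRatioDiff, double weightRatioDiff) {
        int res;
        if (needy1 && !needy2) {
            res = -1;
        } else if (needy2 && !needy1) {
            res = 1;
        } else if (needy1 && needy2) {
            res = (int) Math.signum(minRatioDiff);
        } else {
            res = (int) Math.signum(weightRatioDiff);
        }
        return res != 0 ? res : s1.name.compareTo(s2.name);
    }

    /** Sorts a copy of the input and returns how many usage reads that took. */
    static long countCalls(List<Sched> input, Comparator<Sched> comparator) {
        List<Sched> copy = new ArrayList<>(input);
        CALLS.set(0);
        copy.sort(comparator);
        return CALLS.get();
    }

    /** Deterministic sample data, no randomness. */
    static List<Sched> sampleData(int n) {
        List<Sched> data = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            data.add(new Sched("s" + i, ((i * 37) % 101) * 10L,
                (i % 3) * 100L, 1 + (i % 4)));
        }
        return data;
    }

    public static void main(String[] args) {
        List<Sched> data = sampleData(50);
        System.out.println("old: " + countCalls(data, UsageCallCountSketch::compareOld));
        System.out.println("new: " + countCalls(data, UsageCallCountSketch::compareNew));
    }
}
```

Because both comparators return the same result for every pair, the sort performs the same comparisons in both runs, and the old style reads the usage three times as often. In this model each read is cheap; the saving matters when each read is itself a computation.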

On a test-environment cluster, I compared how long the queue sort takes before and after the change.

 

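The upper red box shows the new version and the lower red box the old version. I did not run a stress test, but the two measurements were taken under the same scheduling workload, so the comparison is still convincing. On a large cluster, where this method is invoked tens of millions or even hundreds of millions of times per second, the effect of the optimization becomes clearly visible.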
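A comparison of this kind can be reproduced outside the ResourceManager with a small timing harness. The sketch below is an assumption about how one might measure it, not the instrumentation used for the figures above: all names are hypothetical, and a rigorous benchmark would need warm-up and a tool such as JMH.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SortTimingSketch {
    /** Sorts a copy of the input and returns the elapsed time in nanoseconds. */
    static <T> long timeSortNanos(List<T> input, Comparator<? super T> comparator) {
        List<T> copy = new ArrayList<>(input);
        long start = System.nanoTime();
        copy.sort(comparator);
        return System.nanoTime() - start;
    }

    /** Averages several runs to smooth out noise from JIT and GC. */
    static <T> long averageSortNanos(List<T> input, Comparator<? super T> comparator,
                                     int rounds) {
        if (rounds <= 0) {
            throw new IllegalArgumentException("rounds must be positive");
        }
        long total = 0;
        for (int i = 0; i < rounds; i++) {
            total += timeSortNanos(input, comparator);
        }
        return total / rounds;
    }

    /** Artificially slow key lookup, standing in for a costly usage computation. */
    static int slowKey(int value) {
        int acc = value;
        for (int i = 0; i < 50; i++) {
            acc = acc * 31 + i;
        }
        return acc == Integer.MIN_VALUE ? value - 1 : value; // acc is read so the loop is used
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 10_000; i++) {
            data.add((i * 7919) % 10_007);
        }
        Comparator<Integer> cheap = Comparator.naturalOrder();
        Comparator<Integer> costly = (a, b) -> Integer.compare(slowKey(a), slowKey(b));
        System.out.println("cheap  avg ns: " + averageSortNanos(data, cheap, 20));
        System.out.println("costly avg ns: " + averageSortNanos(data, costly, 20));
    }
}
```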