设为首页 加入收藏

TOP

【深入浅出 Yarn 架构与实现】4-6 RM 行为探究 - 申请与分配 Container(一)
2023-07-25 21:40:33 】 浏览:45
Tags:Yarn 4-6 申请与 Container

本小节介绍应用程序的 ApplicationMaster 在 NodeManager 成功启动并向 ResourceManager 注册后,向 ResourceManager 请求资源(Container)到获取到资源的整个过程,以及 ResourceManager 内部涉及的主要工作流程。

一、整体流程

整个过程可看做以下两个阶段的送代循环:

  • 阶段1 ApplicationMaster 汇报资源需求并领取已经分配到的资源;
  • 阶段2 NodeManager 向 ResourceManager 汇报各个 Container 运行状态,如果 ResourceManager 发现它上面有空闲的资源,则进行一次资源分配,并将分配的资源保存到对应的 应用程序数据结构中,等待下次 ApplicationMaster 发送心跳信息时获取(即阶段1)。

image.png

一)AM 汇报心跳

1、ApplicationMaster 通过 RPC 函数 ApplicationMasterProtocol#allocate 向 ResourceManager 汇报资源需求(由于该函数被周期性调用,我们通常也称之为“心跳”),包括新的资源需求描述、待释放的 Container 列表、请求加入黑名单的节点列表、请求移除黑名单的节点列表等。

public AllocateResponse allocate(AllocateRequest request) {
	// Send the status update to the appAttempt.
    // 发送 RMAppAttemptEventType.STATUS_UPDATE 事件
	this.rmContext.getDispatcher().getEventHandler().handle(
	    new RMAppAttemptStatusupdateEvent(appAttemptId, request.getProgress()));
    
    // 从 am 心跳 AllocateRequest 中取出新的资源需求描述、待释放的 Container 列表、黑名单列表
    List<ResourceRequest> ask = request.getAskList();
    List<ContainerId> release = request.getReleaseList();
    ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();

	// 接下来会做一些检查(资源申请量、label、blacklist 等)

	// 将资源申请分割(动态调整 container 资源量)
    // Split Update Resource Requests into increase and decrease.
    // No Exceptions are thrown here. All update errors are aggregated
    // and returned to the AM.
    List<UpdateContainerRequest> increaseResourceReqs = new ArrayList<>();
    List<UpdateContainerRequest> decreaseResourceReqs = new ArrayList<>();
    List<UpdateContainerError> updateContainerErrors =
        RMServerUtils.validateAndSplitUpdateResourceRequests(rmContext,
            request, maximumCapacity, increaseResourceReqs,
            decreaseResourceReqs);

	// 调用 ResourceScheduler#allocate 函数,将该 AM 资源需求汇报给 ResourceScheduler
    // (实际是 Capacity、Fair、Fifo 等实际指定的 Scheduler 处理)
    allocation =
        this.rScheduler.allocate(appAttemptId, ask, release,
            blacklistAdditions, blacklistRemovals,
            increaseResourceReqs, decreaseResourceReqs);
}

2、ResourceManager 中的 ApplicationMasterService#allocate 负责处理来自 AM 的心跳请求,收到该请求后,会发送一个 RMAppAttemptEventType.STATUS_UPDATE 事件,RMAppAttemptImpl 收到该事件后,将更新应用程序执行进度和 AMLivenessMonitor 中记录的应用程序最近更新时间。
3、调用 ResourceScheduler#allocate 函数,将该 AM 资源需求汇报给 ResourceScheduler,实际是 Capacity、Fair、Fifo 等实际指定的 Scheduler 处理。
CapacityScheduler#allocate 实现为例:

// CapacityScheduler#allocate
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release,
    List<String> blacklistAdditions, List<String> blacklistRemovals,
    List<UpdateContainerRequest> increaseRequests,
    List<UpdateContainerRequest> decreaseRequests) {

    // Release containers
	// 发送 RMContainerEventType.RELEASED
    releaseContainers(release, application);

    // update increase requests
    LeafQueue updateDemandForQueue =
        updateIncreaseRequests(increaseRequests, application);

    // Decrease containers
    decreaseContainers(decreaseRequests, application);

    // Sanity check for new allocation requests
    // 会将资源请求进行规范化,限制到最小和最大区间内,并且规范到最小增长量上
    SchedulerUtils.normalizeRequests(
        ask, getResourceCalculator(), getClusterResource(),
        getMinimumResourceCapability(), getMaximumResourceCapability()
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇JVM面试大总结 下一篇转码服务serverless探索

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目