设为首页 加入收藏

TOP

Hadoop作业运行机制
2018-12-07 00:40:16 】 浏览:24
Tags:Hadoop 作业 运行 机制
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lb812913059/article/details/79897863

hadoop会为每个分片构建一个map任务,map和reduce每个阶段都以键值对作为输入和输出。键是某一行起始位置相对于文件起始位置的偏移量(行偏移量)。

1.为什么要将MapReduce计算转移到存储有部分数据的各台机器上
这样可以获得最佳性能,因为它无需使用宝贵的集群【网络带宽资源】,这就是所谓的.数据本地化优化(移动计算而不移动数据)。

2.为什么最佳分片的大小应该与块大小相同
因为它是确保可以存储在单个节点上的最大输入块的大小,如果分片跨越两个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储有两个数据块,因此必然会有部分数据需要通过网络传输到map任务运行的节点。大量占用网络带宽资源,这样的方法效率太低。

3.为什么Map任务将其输出写入本地硬盘而非HDFS
因为map的输出是中间结果,该中间结果由reduce任务处理后才产生最终输出结果,而且一旦作业完成,map的输出结果就可以删除。因此如果把它存储在HDFS上,则必然会实现数据的备份措施(资源浪费)。如果运行map任务的节点在将map中间结果传送给reduce任务之前失败,Hadoop将在另一个节点上重新运行这个map任务以再次构建map中间结果。

MapReduce作业运行机制

MapReduce1.0版本

这里写图片描述

1.在客户端启动作业
    通过submit或者waitForCompletion提交作业,waitForCompletion()方法通过每秒循环轮转作业进度
    如果发现与上次报告有改变,则将进度报告发送到控制台。
    其实waitForComplection()方法中还是调用submit()方法

2.向JobTracker申请Job ID
    submit()方法中创建一个JobSubmitter实例,然后向JobTracker申请一个JobID
        确定输出目录是否存在,如果存在那么JobTracker会抛出错误给客户端
        检查输入目录是否存在,如果不存在同样抛出错误
        根据输入计算输入分片(Input Split),如果分片计算不出来也会抛出错误

3.复制作业的资源文件
    将运行作业所需要的资源复制到HDFS上,存放到JobTracker为该作业创建的一个以Job ID命名的文件夹中
    包括MapReduce程序打包的JAR文件,配置文件和客户端计算所得的输入分片信息。

4.客户端提交作业,JobTracker接收到作业请求

5.作业初始化,创建作业对象
    当jobTracker接收到Client提交的submitJob()方法之后,会将这个调用放到一个内部序列中
    等待作业调度器进行调度,并对其初始化
    初始化包括一个表示正在运行的作业的对象,用来封装任务和记录信息,以便跟踪任务的转台和进程。

6.获取输入分片数
    为了创建任务列表,JobTracker需要从HDFS文件系统中获取已经计算好的输入分片
    当作业调度器根据自己的调度算法调度到该作业时,会根据输入分片信息为每个分片创建一个map任务
    并将map任务分配给TaskTracker执行(reduce的个数是通过mapred.reduce.tasks属性决定)

7.心跳HeartBeat
    TaskTracker通过运行一个简单的循环来定期向JobTracker发送心跳信息(心跳间隔是5秒)
    告知JobTracker节点是否存活,并且通过心跳机制,来充当二者之间的信息通道

8. 获取作业资源文件
    TaskTracker开始运行作业任务,首先从HDFS中将Jar包复制到本地文件系统来进行计算,从而达到本地化
    在本地为这个任务创建一个工作目录并把jar解压在此处,然后创建一个TaskRunner来运行该任务
    对于map和reduce任务,TaskTracker根据CPU内核的数量和内存的大小有固定数量的map槽和reduce槽

    数据本地化:
    将map任务分配给含有该map处理的数据块的TaskTracker上,同时将程序JAR包复制到该TaskTracker上来运行
    而分配reduce任务时并不考虑数据本地化。

9、TaskRunner启动一个JVM来运行每个任务(图中10步骤)
    以便map任务和reduce任务出现任何问题,都不会影响到tasktracker,但是可以重用JVM。

10.当JobTracker收到作业的最后一个任务完成信息时,便把该作业设置成“成功”
    当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户

YARN(MapReduce2)的引入

这里写图片描述

1.通过submit或者waitForCompletion提交作业,waitForCompletion()方法通过每秒循环轮转作业进度
  如果发现与上次报告有改变,则将进度报告发送到控制台

2.向ResourceManager申请Application ID,RM检查输入输出说明、计算输入分片

3.复制作业的资源文件,将作业信息(jar、配置文件、分片信息)复制到HDFS上用户的应用缓存目录中

4.通过submitApplication()方法提交作业到资源管理器

5.资源管理器在收到submitApplication()消息后,将请求传递给调度器(Scheduler)
  调度器为其分配一个容器Container,然后RM在NM的管理下在container中启动程序的ApplicationMaster进程

6.ApplicationMaster对作业进行初始化,创建过个薄记对象以跟踪作业进度
  是一个java应用程序,他的主类是MRAppmaster

7.ApplicationMaster接受来自HDFS在客户端计算的输入分片
  对每一个分片创建一个map任务,任务对象,由mapreduce.job.reduces属性设置reduce个数

  uber模式:当任务小的时候就会启动一个JVM运行MapReduce作业,这在MapReduce1中是不允许的
            这样的作业在YARN中成为uber作业,通过设置mapreduce.job.ubertask.enable设置为false使用
            那什么是小任务呢当小于10个mapper且只有1个reducer且输入大小小于一个HDFS块的任务

8.如果作业不适合uber任务运行,ApplicationMaster就会为所有的map任务和reduce任务向资源管理器申请容器
  请求为任务指定内存需求,map任务和reduce任务的默认都会申请1024MB的内存

9.资源管理器为任务分配了容器,ApplicationMaster就通过节点管理器启动容器。
  该任务由主类YarnChild的java应用程序执行。

10.运行任务之前,首先将资源本地化,包括作业配置、jar文件和所有来自分布式缓存的文件

11.最后执行map任务和reduce任务

JobControl
当MapReduce工作流中的作业不止一个时,如果管理这些作业按顺序执行?有几种方法,其中主要考虑是否有一个线性的作业连或一个更复杂的作业有向无环图(DAG)。对于线性链表,最简单的方法就是一个接一个的巡行作业,等前一个作业运行结果后再运行下一个:
JobClient.runJob(conf1);
JobClient.runjob(conf2);
如果一个作业失败,runjob()方法就会抛出一个IOException,这样一来,管道中后面的作业就无法执行。通常使用Oozie或azkaban等MapReduce工作流系统

作业提交
可以通过一个简单的方法调用来运行MapReduce作业:
Job对象的submit()方法,该方法封装了大量的处理细节。也可以调用waitForCompletion()方法,它用于提交以前没有提交过的作业,并等待它的完成。WaitForCompletion()内部还是调用了submit()方法。

JobClient的runjob()方法是用于新建JobClient实例并调用其submitjob()方法的便捷方式(见图步骤1)。job的submit()方法创建一个内部的JobSummiter实例,并且调用submitJobInternal()方法。

提交作业后,waitForCompletion()每秒轮询任务进度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成功,就显示作业计数器;如果失败,则会导致作业失败的错误记录被输出到控制台。

JobClient的JobSummiter()方法所实现的作业提交过程如下

  1. 向资源管理器请求一个新应用ID,用于MapReduce作业ID。(步骤2)
  2. 检查作业的输出说明,例如:如果没有指定输出目录或输出目录已存在,就不提交且错误抛回给MapReduce
  3. 计算作业的输入分片。如果分片无法计算,比如因为输入路径不存在,作业就不提交,错误返回给MapReduce
  4. 将运行作业需要的资源(包括作业JAR文件,配置文件和计算所得的输入分片)复制到一个以作业ID命名的目录下的【共享文件系统】中(步骤3)。作业JAR的副本较多(由Mapreduce.client.submit.file.replication属性控制,默认为10),因此在运行作业的任务时,集群中有很多个副本可供NodeManager访问
  5. 通过调用ResourceManager的submitApplication()方法提交作业(步骤4)

作业ID

hadoop2中,MapReduce作业ID由YARN资源管理器创建的YARN应用ID生成,一个应用ID的格式包含两部分:
1、资源管理器(不是应用)开始时间
2、唯一标识此应用的由资源管理器维护的增量计数器
这里写图片描述

作业初始化
资源管理器收到调用它的submitApplication()消息后,会把此调用放入一个内部队列中,将请求传递给YARN调度器(scheduler)并对其进行初始化。调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动application master的进程(步骤5a,5b)

MapReduce作业的application master是一个java应用程序,它的主类是MRAppMaster。它对作业进行初始化操作:通过创建多个簿记对象以保持对作业进度的跟踪,因为它将接受来自任务的进度和完成报告(步骤6)。

接下来,它接受来自共享文件系统的、在客户端计算的输入分片(步骤7)。然后对每一个分片创建一个map任务对象以及确定reduce任务对象数量(由mapreduce.job.reduces属性决定,通过作业的setNumReduceTasks()方法设置)。再由schedule创建响应数量的reduce任务。任务在此时被指定ID。

除了map和reduce任务,还有setupJob和cleanupJob需要建立:由TaskTrackers在所有map开始前和所有reduce结束后分别执行,这两个方法在OutputCommitter中(默认是FileOutputCommitter)。setupJob()创建输出目录和任务的临时工作目录,cleanupJob()删除临时工作目录。

接下来,Application master必须决定如何运行构成MapReduce作业的各个任务。如果作业很小,就选择和自己在同一个JVM上运行任务。与在一个节点上顺序运行这些任务相比,当application master判断在新容器中分配和运行任务的开销大于并行运行他们的开销时,就会发生这一情况。这样的作业称为uberized,或者作为uber任务运行。而mapreduce1.x从不在单个TaskTracker上运行小作业。

哪些作业是小作业默认情况下,小作业就是少于10个mapper且只有一个reducer且输入大小小于一个HDFS块的任务。必须明确启用Uber任务(对于单个作业,或者是对整个集群),具体方法是将mapreduce.job.ubertask.enable设置为true。

最后,在任何任务运行之前,application master调用setupJob()方法设置OutputCommitter来建立作业的最终输出目录及任务输出的临时工作空间。在mapreduce1.x中,它在一个由taskTracker运行的特殊任务中被调用,而在YARN执行框架中,该方法由application master直接调用

任务分配
如果作业不适合作为Uber任务执行,那么application master就会为该作业中的所有map任务和reduce任务向资源管理器请求容器(步骤8)。首先为Map任务发出请求。该请求优先级要高于reduce任务的请求,这是因为所有的map任务必须在reduce的排序阶段能够启动前完成。直到有5%的map任务已经完成时,为reduce任务的请求才会发出。

NodeManager运行着一个简单的循环来定期发送心跳(heartbeat)给ResourceManager。传递节点是否存活的信息,同时也充当了两者之间的消息通道。请求信息附着在心跳信息上

reduce任务能够在集群中的任意位置运行,但是map任务的请求有着数据本地化局限。这也是调度器所关注的。因此附在心跳信息上的请求包括每个map任务的数据本地化信息,特别是输入分片所在的主机和相应机架信息。调度器使用这些信息来调度决策(像jobtracker的调度器一样)。

在理想情况下,任务是数据本地化的,即任务在分片所在的同一节点上运行。可选的情况是,任务可能是机架本地化的,即和分片在同一机架而非同一节点上运行。有一些任务既不是数据本地化,也不是机架本地化,他们会从别的机架上获取自己的数据。对于一个特定的作业运行,可以通过查看作业的计数器来确定在每个本地化层次上运行的任务的数量。

请求也为任务指定了内存需求和CPU个数
默认情况下map任务和reduce任务都分配到1024MB内存和一个虚拟的内核,这些值可以在每个作业的基础上进行配置
设置内存:mapreduce.map.memory.mb,mapreduce.redcue.memory.mb
设置CPU内核:mapreduce.map.cpu.vcores和mapreduce.reduce.cpu.vcoreesp.memory.mb
这里写图片描述

任务执行
一旦资源管理器的调度器为任务分配了容器,application master就通过与节点管理器通信来启动容器(步骤9a,9b)。该任务由主类为YarnChild的Java应用程序执行。在它运行任务之前,首先将任务需要的资源本地化,包括做的配置,JAR文件和所有来自分布式缓存的文件(步骤10)。最后,运行map运行或reduce任务(步骤11)。

YarnChild在指定的JVM中运行,因此用户定义的map或reduce函数(甚至是YarnChild)中的任何缺陷不会影响到节点管理器,例如导致崩溃或挂起。
每个任务都能够执行搭建(setup)和提交(commit)动作,他们和任务本身在同一个JVM中运行,并由作业的OutputCommitter确定。对于基于文件的作业,提交动作将任务输出由临时位置搬移到最终位置。提交协议确保党推测执行被启动时。只有一个任务副本被提交,其他的都被取消。

进度和状态更新
这里写图片描述

任务也有一组计数器,负责对任务运行过程中各个事件进行计数,这些计数器要么内置于框架中,例如已写入的map输出记录数,要么由用户自己定义。当map任务或reduce任务运行时,子进程和自己的父application master进行通信,每隔3秒钟任务向application master报告进度和状态(包括计数器),application master会形成一个作业的汇聚视图

资源管理器的界面显示了所有运行中的应用程序,并且分别有连接只想这些应用各自的application master的界面。这些界面展示了mapreduce作业的更多细节,包括进度。在作业期间,客户端每秒轮询一次application master以接收最新状态。客户端可以使用Job的getStatus()方法得到一个JobStatus实例,后者包含作业的所有状态信息。

map进度标准是处理输入所占比例,reduce是copy\merge\reduce(与shuffle的三个阶段相对应)整个进度的比例。

Child JVM有独立的线程每隔3秒检查任务更新标志,如果有更新就会报告给此tasktracker;
tasktracker每隔5秒给jobtracker发心跳;
job tracker合并这些更新,产生一个表明所有运行作业及其任务状态的全局试图。
JobClient通过每秒查询Jobtracker来接收最新状态。

作业完成

当Application Master收到作业最后一个任务已完成的通知后,便把作业的状态设置为“成功”。然后,在Job轮询状态时,便知道任务已经完成,于是Job打印一条消息告知用户,然后从waitForCompletion()方法返回。Job的统计信息和计数值也在这时输出到控制台。

如果Application master有相应的设置,也会发送一个HTTP作业通知。希望收到回调指令的客户端可以通过mapreduce.job.end-notification.url 属性来进行这项设置。最后作业完成时,application master和任务容器清理其工作状态(这样中间输出将被删除)。OutputCommitter的commitJob()方法会被调用。作业信息由作业历史服务器存档,以便日后用户需要时可以查询。

作业调优
这里写图片描述

作业调优检查表
这里写图片描述


编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇hadoop2.7.1不重启,动态删除节点.. 下一篇Hadoop官方文档翻译——YARN Arch..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }