<返回更多

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

2020-07-08    
加入收藏

前言

我们基于Hadoop 1.2.1源码分析MapReduce V1的处理流程。这篇文章的内容,更多地主要是描述处理/交互流程性的东西,大部分流程图都是经过我梳理后画出来的(开始我打算使用序列图来描述流程,但是发现很多流程在单个对象内部都已经非常复杂,想要通过序列图表达有点担心描述不清,所以选择最基本的程序流程图),可能看起来比较枯燥,重点还是关注主要的处理流程要点,特别的地方我会刻意标示出来,便于理解。JobTracker与TaskTracker之间通过org.Apache.hadoop.mapred.InterTrackerProtocol协议来进行通信,TaskTracker通过该接口进行远程调用实现Heartbeat消息的发送,协议方法定义如下所示:

HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId) throws IOException;

通过该方法可以看出,最核心的Heartbeat报告数据都封装在TaskTrackerStatus对象中,JobTracker端会接收TaskTracker周期性地发送的心跳报告,根据这些心跳信息来更新整个Hadoop集群中计算资源的状态/数量,以及Task的运行状态。另外,在JobTracker端维护的对象的数据结构,主要包括如下3个:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

TaskTrackerStatus中各个数据项的含义,说明如下表所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

下面,主要对ResourceStatus、TaskStatus、TaskTrackerHealthStatus进行说明:

ResourceStatus封装了一个TaskTracker节点的资源信息,结构如下图所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

TaskStatus

TaskStatus封装了一个TaskTracker节点上运行的Task的状态信息,结构如下图所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

上图将TaskStatus的包含的数据结构全部展开,可以根据字段含义来了解它所描述的一些信息。

TaskTrackerHealthStatus封装了TaskTracker的健康状态信息,如下图所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

JobTracker处理Heartbeat流程JobTracker处理Heartbeat的流程,如果把每个处理细节都详细地展开,非常地复杂,可能从头到尾描述下来会感觉枯燥无味,所以这里我先概要地描述JobTracker处理Heartbeat的整体流程,然后再按照功能划分出一个个看似还算独立的子处理流程,单独地进行详细说明,这样能够更容易理解。整体处理流程,如下图所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析
大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析
大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

下面,我们根据上面的Heartbeat处理流程图,概要地说明Heartbeat是如何处理的,流程描述如下所示:

TaskTracker创建一个TaskTrackerStatus对象,TaskTrackerStatus内部封装的信息包括:TaskTracker所在节点的基本信息、运行在TaskTracker上的Task的状态信息、TaskTracker服务的健康状态信息、TaskTracker的资源信息,另外发送心跳的RPC方法还包括restarted(TaskTracker是否重启)、initialContact(TaskTracker是否初次连接JobTracker)、acceptNewTasks(TaskTracker是否能够运行新的Task)、responseId(心跳响应ID),通过InterTrackerProtocol协议的heartbeat方法发送给JobTracker。

JobTracker接收到TaskTracker发送的心跳数据。

JobTracker检查TaskTracker的host是否在黑名单中,如果TaskTracker在黑名单中,则直接抛出异常终止RPC调用,否则继续下一步流程。

检查TaskTracker RPC调用参数restarted的值,如果TaskTracker重启了,则标记TaskTracker状态为健康状态;如果TaskTracker没有重启,则检查是否可以指派任务在该TaskTracker上运行。

如果TaskTracker不是初次连接JobTracker,检查JobTracker是否存在上一次向该TaskTracker发送的Heartbeat响应数据,存在的话则说明TaskTracker因为失去了与JobTracker之间的RPC连接而没有接收到,JobTracker直接再给TaskTracker重新发送该响应数据;不存在的话,若JobTracker重启了,使TaskTracker重新加入集群,需要通知Recovery Manager从恢复列表中移除该TaskTracker,若JobTracker未重启,这种情况几乎是不可能存在的(既然TaskTracker不是初次连接,JobTracker也没有重启,JobTracker端不可能没有保存Heartbeat响应数据)。

处理JobTracker接收到的TaskTracker的Heartbeat信息,主要是TaskTrackerStatus封装的数据。

根据处理Heartbeat数据结果,如果TaskTracker需要重新初始化,则发送一个带有ReinitTrackerAction指令的Heartbeat响应数据,否则TaskTracker不需要重新初始化则继续下一步流程。

检查是否可以向该TaskTracker指派任务,如果可以可以向该TaskTracker指派任务,则直接使用TaskScheduler指定的调度策略,选择当前可以指派给TaskTracker的一组需要启动的Task(对应指令LaunchTaskAction)。

根据TaskScheduler调度策略选择的需要启动的Task,并根据TaskTracker发送的Task状态报告,继续选择一些已经完成/需要被清理的Task分配给TaskTracker:先检查在该TaskTracker上是否有完成的Job,计算属于这些Job的需要被Kill掉(对应指令KillTaskAction)的Task;再检查是否有完成的Job,并且对应在该TaskTracker上的Task需要被清理(对应指令KillTaskAction);最后检查是否有已经完成需要被提交的Task(以此来通知TaskTracker提交Task完成并更新状态,对应指令CommitTaskAction)。

构造一个包含可调度Task(LaunchTaskAction/KillTaskAction/CommitTaskAction)的HeartbeatResponse对象,更新JobTracker内部维护的trackerToHeartbeatResponseMap映射。根据TaskTracker的Heartbeat报告的Task状态信息,对标记为完成的Task,更新JobTracker内部维护的多个队列和Map:trackerToMarkedTasksMap、taskidToTrackerMap、trackerToTaskMap、taskidToTIPMap。最后,返回TaskTracker调用的结果:HeartbeatResponse对象。

上面流程图中,黑色虚线所表示的处理流程,我们说明一下:这种情况是不可能出现的,因为TaskTracker不是第一次连接JobTracker,而JobTracker端还没有上一次TaskTracker发送的Heartbeat对应的HeartbeatResponse,同时JobTracker又没有重启动过,所以这种条件是不存在的,那么该流程分支也不可能执行,故而用虚线描述,指向发送一个带有ReinitTrackerAction的HeartbeatResponse。下面,我们细化整个流程,将一些比较重要的流程详细分析说明:TaskTracker与JobTracker失去连接,更新状态JobTracker如果在给定超时时间范围之内没有收到TaskTracker的Heartbeat报告,会认为该TaskTracker已经无法执行/指派任务,那么在JobTracker端与该TaskTracker相关的数据结构都需要更新,受到影响的Job和Task的数据结构也需要更新,具体处理流程如下图所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

上述流程图描述的流程,如下所示:

从队列Map<String, Set<JobID>> trackerToJobsToCleanup中移除在该TaskTracker上已经完成且需要清理的所有Job。

从队列Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup中移除在TaskTracker上已经运行完成且需要清理的所有Task。

通知Recovery Manager从其维护的Set<String>类型的恢复列表JobTracker.RecoveryManager.recoveredTrackers中移除该TaskTracker。

从TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap中删除在该TaskTracker上运行的所有Task。

对在该TaskTracker上的运行的每一个Task(在队列trackerToTaskMap中),进行如下2步处理:

(1)从队列Map<TaskAttemptID, TaskInProgress> taskidToTIPMap中取出TaskAttemptID对应的TaskInProgress tip结构,再根据tip获取到JobInProgress:JobInProgress job = tip.getJob();;

(2)如果ReduceTask已经完成,以及具有0个ReduceTask的所有MapTask已经完成,则将这些Task放入到队列TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap中;如果tip标记Task没有完成,或者满足条件tip.isMapTask() && !tip.isJobSetupTask() && job.desiredReduces() != 0,检查Job运行状态,当job.getStatus().getRunState() == JobStatus.RUNNING || job.getStatus().getRunState() == JobStatus.PREP成立时,则该Task运行失败,并更新Task状态,同时收集这类Job,放入集合Set<JobInProgress> jobsWithFailures中,后续对这些Job进行处理;

由于该TaskTracker被JobTracker标记为lost状态,则对上面收集到的jobsWithFailures集合中的Job,只要存在属于该Job的Task被分配到该TaskTracker上运行,会通过累加计算在该TaskTracker上失败的Task计数,给该TaskTracker以惩罚,并释放所有在该TaskTracker上预留的Slot。

从队列TreeMap<String, Set<TaskAttemptID>> trackerToMarkedTasksMap中移除所有被标记完成的Task,同时更新JobTracker内部维护的如下3个队列:TreeMap<TaskAttemptID, String> taskidToTrackerMap、TreeMap<String, Set<TaskAttemptID>> trackerToTaskMap、Map<TaskAttemptID, TaskInProgress> taskidToTIPMap。

如果在该TaskTracker上的运行的Task还有没处理的,则转第6步进行处理;否则,流程结束。

检查是否可以向TaskTracker指派运行Task当TaskTracker发送Heartbeat标志其没有重启,那么会执行该子流程,如下图所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

在JobTracker端,既然TaskTracker汇报状态表明其没有重启,那么就需要检查该TaskTracker对应的黑名单和灰名单情况,如果TaskTracker状态一切正常,则恢复其正常被指派任务并运行Task的能力。标记TaskTracker为Health状态当TaskTracker重启了,然后再次连接JobTracker时,发送Heartbeat的过程中,会执行该流程。重启的TaskTracker,JobTracker会将一个TaskTracker标记为Health状态,说明该TaskTracker对应的资源信息(内存/CPU)应该在JobTracker端做记录,表示这些资源是可用的,更新JobTracker端的几个可用资源的变量计数。但是,很有可能TaskTracker重启之前,其上运行Task失败了很多次,在JobTracker端记录该失败计数,当满足一定条件后,会将TaskTracker加入灰名单,如果TaskTracker重启了,应该将其从灰名单中移除,以便不影响任务分派,具体处理流程如下图所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

上述流程图比较简单不再累述。更新TaskTracker状态如果TaskTracker不是第一次连接JobTracker,那么在JobTracker端的队列HashMap<String, TaskTracker> taskTrackers中会保存上一次TaskTracker向JobTracker汇报的状态TaskTrackerStatus,如果该TaskTrackerStatus不存在,则直接处理当前汇报的TaskTracker的状态报告,使得JobTracker端维护的该TaskTracker的状态是最新的,具体的处理流程,如下图所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析
大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

上图中处理流程,描述如下所示:

检查是否JobTracker端存在该TaskTracker上一次汇报的状态报告,如果不存在,则直接处理当前发送的状态报告;否则,会更新JobTracker端维护的如下4个全局计数器:totalMaps(MapTask总数)、totalReduces(ReduceTask总数)、occupiedMapSlots(占用的Map Slot总数)、occupiedReduceSlots(占用的Reduce Slot总数),在当前计数值的基础上,减去上次汇报的报告中的数量(实际上是假定上次汇报的全部指标都已完成,如果没完成,再通过本次汇报的状态报告再加回去);如果TaskTracker没有被加入到黑名单中,还需要更新下面2个JobTracker端全局计数器:totalMapTaskCapacity(该TaskTracker上最大Map Slot总数)、totalReduceTaskCapacity(该TaskTracker上最大Reduce Slot总数)。

处理TaskTracker当前汇报的状态报告,更新JobTracker内部维护的6个全局计数器:totalMaps、totalReduces、occupiedMapSlots、occupiedReduceSlots、totalMapTaskCapacity、totalReduceTaskCapacity,各个计数器具体含义见上一步说明。

如果TaskTracker是第一次汇报状态报告,则需要在JobTracker内部注册,构造一个org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker对象(该TaskTracker对象是在JobTracker的视角看到的结构),加入到队列HashMap<String, TaskTracker> taskTrackers中,同时还要计算该TaskTracker所在的host节点上TaskTracker进程的个数,更新队列Map<String, Integer> uniqueHostsMap。

更新TaskTracker上所有Task状态在JobTracker处理TaskTracker发送的Heartbeat的过程中,首先会更新JobTracker维护的TaskTracker的状态信息,因为一个TaskTracker上可能运行着很多Task,那么需要更新这些Task的状态,可以通过上面介绍的TaskTrackerStatus的结构看出,对应着一个TaskStatus的状态报告集合,所以这里有一个批量更新TaskStatus状态的操作,实际上会对每一个Task的状态分别进行更新,整体处理流程如下图所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析
大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

具体处理流程,描述如下所示:

从TaskTracker发送的TaskTrackerStatus对象可以提取Task状态报告集合,然后对每一个状态报告进行处理,直到所有的Task的状态都已经被更新到JobTracker内部维护的状态对象上,下面描述每一个TaskStatus的处理过程

(1)如果一个Task的运行状态不为TaskStatus.State.UNASSIGNED,说明该Task还没有在TaskTracker上获得运行机会,则并不让该Task失败(当一个Task指派给一个TaskTracker运行时,会首先在JobTracker端加入到一个超时列表中,由一个独立的线程JobTracker.ExpireLaunchingTasks去检测,该Task是否在给定的时间内(默认是10分钟 )是否在TaskTracker上启动而且一直没有报告状态,如果没有报告,则会将该Task标记为失败),等待下一次被调度分配给TaskTracker去运行。

(2)根据Task的ID,获取到它对应的JobInProgress信息,如果没有获取到则将该Task对应的JobInProgress对象加入到cleanup列表Map<String, Set<JobID>> trackerToJobsToCleanup中,直接返回继续处理下一个TaskStatus报告;如果能够获取到对应的JobInProgress信息,则检查该JobInProgress中包含的Job是否设置初始化完成状态,如果没有设置,则直接将该Task加入到队列Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup中,等待JobTracker调度Kill掉该Task,直接返回继续处理下一个TaskStatus报告。

(3)检查该TaskStatus报告中对应的TaskAttemptID,是否在JobTracker端存在对应的TaskInProgress对象,很有可能JobTracker重启,内存中维护的Map<TaskAttemptID, TaskInProgress> taskidToTIPMap队列中没有TaskInProgress对象,这时JobInProgress对象一定存在,可以通过JobInProgress对象获取到该Task对应的TaskInProgress对象(因为在JobTracker端创建Job的时候,会分别创建4类TIP:map、reduce、cleanup、setup),再将其加入到Map<TaskAttemptID, TaskInProgress> taskidToTIPMap队列中,同时触发已知的一组JobInProgressListener的jobUpdated方法,去更新Job状态。

(4)根据TaskStatus能够获取到所有Fetch失败的Task,查询该Task对应的TaskInProgress对象,从而进一步通知JobInProgress对象,根据设定的允许Task Fetch失败的最大次数限制,确定是否要让该Task失败,并更新TaskInProgress状态。

更新Task状态当Task的状态发生变化的情况下,可能需要更新Task的状态,我们根据JobTracker定义的updateTaskStatus方法,方法声明如下所示:1public synchronized void updateTaskStatus(TaskInProgress tip, TaskStatus status)其中,tip是当前在JobTracker端维护的Task的状态,status是TaskTracker汇报的Task状态,更新JobTracker端Task状态主要是根据心跳汇报的status来更新tip数据结构。更新Task状态的具体流程,如下图所示:

大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析
大数据专家分享MapReduce V1:JobTracker处理Heartbeat流程分析

更新Task状态,主要是更新每个Task对应的在JobTracker端维护的TaskInProgress结构,处理流程描述如下:

如果心跳汇报的status中,Task运行状态为SUCCEEDED,当tip标识已经完成或标识被Kill掉,则统一修改status的运行状态为KILLED;如果心跳汇报的status对应的TaskAttemptID不是cleanup task,当该TaskAttemptID 对应的JobInProgress表示Job已经完成,或失败,或被Kill掉,那么status运行状态为FAILED_UNCLEAN则修改为FAILED,运行状态为KILLED_UNCLEAN则修改为KILLED。

调用TaskInProgress的updateStatus方法,传入当前TaskTracker汇报的status状态对象,更新tip的状态。TaskInProgress会维护每个Task对应的TaskStatus对象oldStatus,并根据汇报的status对更新替换oldStatus。有3种情况不需要更新:第一种是当status的运行状态不等于RUNNING/COMMIT_PENDING/FAILED_UNCLEAN/KILLED_UNCLEAN/UNASSIGNED中的任何一种状态;第二种是status的运行状态为RUNNING、UNASSIGNED中的任意一种状态,并且oldStatus的运行状态为FAILED/KILLED/FAILED_UNCLEAN/KILLED_UNCLEAN/SUCCEEDED/COMMIT_PENDING中任意一种状态;第三种是oldStatus的运行状态为FAILED/KILLED中的任意一种状态,这种情况会把该TaskAttemptID加入到队列TreeMap<TaskAttemptID, Boolean> tasksToKill中标识需要Kill掉该Task。

如果status的运行状态为FAILED状态,并且JobTracker在Safe模式下,则设置status的运行状态为KILLED。

此时,如果oldStatus与status不相等,即TaskAttemptID的状态已经发生变化,则会根据status的运行状态创建不同的TaskCompletionEvent事件(SUCCEEDED/FAILED/KILLED),这些 TaskCompletionEvent事件会被加入到JobInProgress的taskCompletionEvents列表中,供JobClient查询或供JobTracker检索;或者执行相应的操作:如果运行状态为FAILED_UNCLEAN/KILLED_UNCLEAN,则tip中该TaskAttemptID标记为失败并更新相关结构,然后加入到mapCleanupTasks/reduceCleanupTasks列表中等待被清理,同时将该TaskAttemptID对应的数据从JobTracker的taskidToTIPMap、taskidToTrackerMap、trackerToTaskMap这3个队列中删除。

根据构造的TaskCompletionEvent对象,并且如果status的运行状态为SUCCEEDED,则更新其对应的JobInProgress的状态为成功。(原创:时延军(包含链接:http://shiyanjun.cn))

 

声明:本站部分内容来自互联网,如有版权侵犯或其他问题请与我们联系,我们将立即删除或处理。
▍相关推荐
更多资讯 >>>