浅析MapReduce

MapReduce概念

Mapreduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop 的数据分析 应用”的核心框架
Mapreduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的 分布式运算程序,并发运行在一个 hadoop 集群上

map(映射)

“Map”:主结点读入输入数据,把它分成可以用相同方法解决的小数据块(这里是一个分而治之的思想),然后把这些小数据块分发到不同的工作节点上(worder nodes)上,每一个工作节点(worder node)循环做同样的事,这就行成了一个树行结构(分布式计算中的很多模型都和图论有关,pageRank也是),而每一个叶子节点有来处理每一个具体的小数据块,再把这些处理结果返回给父节点。

reduce(归约)

“Reduce”:主结节得到所有子节点的处理结果,然后把所有结果组合并且返回到输出。

MapReduce原理

MapReduce执行流程

(1) 一个 mr 程序启动的时候,最先启动的是 MRAppMaster, MRAppMaster 启动后根据本次 job 的描述信息,计算出需要的 maptask 实例数量,然后向集群申请机器启动相应数量的 maptask 进程
(2) maptask 进程启动之后,根据给定的数据切片(哪个文件的哪个偏移量范围)范围进行数 据处理,主体流程为:
​ A、 利用客户指定的 inputformat 来获取 RecordReader 读取数据,形成输入 KV 对
​ B、 将输入 KV 对传递给客户定义的 map()方法,做逻辑运算,并将 map()方法输出的 KV 对收 集到缓存
​ C、 将缓存中的 KV 对按照 K 分区排序后不断溢写到磁盘文件 (超过缓存内存写到磁盘临时文件,最后都写到该文件,ruduce 获取该文件后,删除 )
(3) MRAppMaster 监控到所有 maptask 进程任务完成之后(真实情况是,某些 maptask 进 程处理完成后,就会开始启动 reducetask 去已完成的 maptask 处 fetch 数据),会根据客户指 定的参数启动相应数量的 reducetask 进程,并告知 reducetask 进程要处理的数据范围(数据
分区)
(4) Reducetask 进程启动之后,根据 MRAppMaster 告知的待处理数据所在位置,从若干台 maptask 运行所在机器上获取到若干个 maptask 输出结果文件,并在本地进行重新归并排序, 然后按照相同 key 的 KV 为一个组,调用客户定义的 reduce()方法进行逻辑运算,并收集运
算输出的结果 KV,然后调用客户指定的 outputformat 将结果数据输出到外部存储

http://ov1nop9io.bkt.clouddn.com/%E5%9B%BE%E7%89%871.png

MRjob的运行分析

1、client提交mr job

2、rm协调资源分配

3、nm启动并监控container

4、appmaster协调task

5、appmaster和task均由rm调度、由nm管理

6、hdfs用于在其他entity间共享job文件

MapReduce原理图

http://ov1nop9io.bkt.clouddn.com/%E5%9B%BE%E7%89%872.png

http://ov1nop9io.bkt.clouddn.com/%E5%9B%BE%E7%89%873.png

job提交

submit()方法内部创建submitter并调用submitJobInternal()

submit过程如下:

请求rm获取appid,用做mr jobid

检查output的有效性

计算inputsplit

复制资源(jar、conf、inputsplit)到hdfs中,存放在以jobid命名的目录下

jar文件在集群上有更多的副本,以备nm使用

rm.submitApplication()

job初始化

rm收到submitApp()后将请求转给Yarnscheduler

scheduler分配container

rm在该container启动appmaster , 并交由nm管理。

appmaster创建多个记录对象跟踪job进度,它将接受task的进度或完成报告。

检索inputsplit

为每个split创建map任务和一定数量的reduce任务(setNumRed..()),此时分配jobid

appmaster判断如何运行task,如果是小job,appmaster会在同一jvm中运行

uber task就是指这一点,因为开启新容器分配和运行程序更耗费资源。

小job的衡量标准是map<10,只有reduce=1,而且inputsize < blocksize。这些值可以修改。

​ mapreduce.job.ubertask.maxmaps

​ mapreduce.job.ubertask.maxreduces

​ mapreduce.job.ubertask.enable

最后,Appmaster调用OutputCommitter的setupJob()方法,默认是FileOutputCommimter,主要是创建output目录和临时工作目录

task资源指派

如果job没有指定为uber task,app
master从rm处为所有m和r任务请求容器。先发起map请求,优先级高于reduce,因为所有map都要在reduce的sort阶段启动前完成。对reduce的请求只有在5%的map任务完成后才会发起。

reduce可运行在任何节点,而map请求有调度器努力维护的数据本地约束。因此会尽量保证maptask是datalocal,但是也存在racklocal和rackoff的情况。对于特定的job,可通过查看job的计数器决定本地级别运行的task数据。

请求也可指定对内存和CPU的要求,默认每个m&r分配1024m内存和1个虚拟core,但可通过以下属性进行修改。mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.cpu.vcores mapreduce.reduce.cpu.vcores.

一旦sler为task指定了特定node的资源,app master会联络nm来启动container。该任务就会由YarnChild的java程序执行,该类会对所需资源进行本地化,比如conf、jar、缓存等,最终运行m或者r。该类运行在专有的jvm中,因此不会影响nm。

每个task都会执行setup和commit动作。他们和task在同一jvm中。如果是File-based的job,commit动作会将out从临时目录移动到最终目录。commit协议会确保在启用了机智运行时,重复的task只要有一个提交,其他都会终止掉。

streaming

http://ov1nop9io.bkt.clouddn.com/%E5%9B%BE%E7%89%874.png

流方式会运行特定的mr任务目的在于启动用户提供的可执行程序并与之通信。

流task使用标准io流和进程通信。执行期间java程序传递input k-v对给外部进程,在外部进程中执行用户定义的mr函数,执行后将output kv对回传给java进程。从NM角度看,就如同运行mr程序的子进程。

进度状态更新

http://ov1nop9io.bkt.clouddn.com/%E5%9B%BE%E7%89%875.png

job和每个task都有status信息,其中包括state(running、successful complete、failure)、m&r的progress、job计数器、消息和描述等信息。

task执行时,会跟踪自己的progress。对于map任务来说,是input被处理的比例。对于reduce,稍复杂,但系统仍可估算处理的

input 比例,通过将总progress分成3个阶段(copy、sort、reduce),每个阶段各1/3。因此如果一个task运行了一半的reduce,那么他的progress计算如下:

​ 1/3(copy) + 1/3(sort) +1/3(reduce)x1/2=5/6

job的完成

App master在收到job最后一个task完成通知后,会修改job的状态为”successful”,job轮询时就会感知到job已成功完成,因此会打印消息给user并从waitForComplete()方法返回。此时job的统计信息和计数器会打印在console中。
可以让app master发送通知给client,让其进行回调处理。mapreduce.job.endnotification.url
最后,job完成时,app master和task container会删除状态信息。调用OutputCommitter.commitJob()方法。job信息有job historyserver归档供以后使用。