浅析MapReduce(二)
浅析MapReduce(二)
Map -> Reduce
Map阶段五大步骤
MapReduce其实是分治算法的一种实现,所谓分治算法就是“就是分而治之”,将大的问题分解为相同类型的子问题(最好具有相同的规模),对子问题进行求解,然后合并成大问题的解。MapReduce就是分治法的一种,将输入进行分片,然后交给不同的task进行处理,然后合并成最终的解。具体流程图如下:
MapReduce实际的处理过程可以理解为Input->Map->Sort->Combine->Partition->Reduce->Output。
(1)Input阶段 数据以一定的格式传递给Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可以使用,在Job.setInputFormat可以设置,也可以自定义分分片函数。
(2)Map阶段 对输入的key,value进行处理,即map(k1,v1) -> list(k2,v2),使用Job.setMapperClass进行设置。
(3) Sort阶段 对于Mapper的输出进行排序,使用Job.setOutputKeyComparatorClass进行设置,然后定义排序规则
(4) Combine阶段 这个阶段对于Sort之后有相同key的结果进行合并,使用Job.setCombinerClass进行设置,也可以自定义Combine Class类。
(5) Partition阶段 将Mapper的中间结果按照Key的范围划分为R份(Reduce作业的个数),默认使用HashPatitioner(key.hashCode() & Integer.MAX_VALUE) % numPartitions),也可以自定义划分的函数。使用Job.setPartitionClass进行设置。
(6) Reduce阶段 对于Mapper的结果进一步进行处理,Job.setReducerClass进行设置自定义的Reduce类。
(7) Output阶段 Reducer输出数据的格式。
MapReduce将作业的整个运行过程分为两个阶段:Map阶段和Reduce阶段
Map阶段由一定数量的Map Task组成
输入数据格式解析:InputFormat
输入数据处理:Mapper
数据分组:Partitioner
Reduce阶段由一定数量的Reduce Task组成
数据远程拷贝
数据按照key排序
数据处理:Reducer
数据输出格式:OutputFormat
InputFormat
InputFormat 负责处理MR的输入部分.
有三个作用:
1验证作业的输入是否规范.
2把输入文件切分成InputSplit. (处理跨行问题)
3提供RecordReader 的实现类,把InputSplit读到Mapper中进行处理.
TextInputFormat
1.TextInputformat是默认的处理类,处理普通文本文件。
2.文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value。
3.默认以\n或回车键作为一行记录。
4.TextInputFormat继承了FileInputFormat。
combiner
combiner的出现-为什么需要Map规约操作
在上述过程中,我们看到至少两个性能瓶颈:
(1)如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。
总结:网络带宽严重被占降低程序效率;
在MapReduce编程模型中,在Mapper和Reducer之间有一个非常重要的组件,它解决了上述的性能瓶颈问题,它就是Combiner。
①与mapper和reducer不同的是,combiner没有默认的实现,需要显式的设置在conf中才有作用。
②并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner。
combine操作类似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt为求和、求最大值的话,可以使用,但是如果是求中值的话,不适用。
每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能
Combiner的作用
(1)Combiner最基本是实现本地key的聚合,对map输出的key排序,value进行迭代。如下所示:
map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
(2)Combiner还有本地reduce功能(其本质上就是一个reduce),例如Hadoop自带的wordcount的例子和找出value的最大值的程序,combiner和reduce完全一致,如下所示:
map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K3, V3)
reduce: (K3, list(V3)) → list(K4, V4)使用combiner之后,先完成的map会在本地聚合,提升速度。对于hadoop自带的wordcount的例子,value就是一个叠加的数字,所以map一结束就可以进行reduce的value叠加,而不必要等到所有的map结束再去进行reduce的value叠加。
融合Combiner的MapReduce
使用MyReducer作为Combiner
// 设置Map规约Combiner
job.setCombinerClass(MyReducer.class);
执行后看到map的输出和combine的输入统计是一致的,而combine的输出与reduce的输入统计是一样的。
由此可以看出规约操作成功,而且执行在map的最后,reduce之前。
自己定义Combiner
1 | public static class MyCombiner extends |
shuffle
Reduce阶段三个步骤
Step2.1就是一个Shuffle[随机、洗牌]操作
系统执行排序的过程(即将map输出作为输入传给reduce),称为shuffle.即这张图是官方对Shuffle过程的描述,hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心(心脏)。shuffle的主要工作是从Map结束到Reduce开始之间的过程。首先看下这张图,就能了解shuffle所处的位置。图中的partitions、copy phase、sort phase所代表的就是shuffle的不同阶段(大致范围)。也可以这样理解, Shuffle描述着数据从map task输出到reduce task输入的这段过程。
map 端的Shuffle细节:
\1. 在map task执行时,它的输入数据来源于HDFS的block。(map函数产生输出时,利用缓冲的方式写入内存,并出于效率考虑的方式就行预排序。如上图。此处默认的内存大小为100M,可通过io.sort.mr 来设置,当此缓冲区编程 %80的时候 ,一个后台线程就会将内容写入磁盘。在写入磁盘之前,线程首先根据最终要传的reduce把这些数据划分成相应的分区(partition),在每个分区中,后台线程进行内排序,如果有combine,就会在排序后的分区内执行。)
在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对:相同的key 到底应该交由哪个reduce去做,是现在决定的,也就是partition 作用。MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。key/value对以及Partition的结果都会被写入缓冲区,当然写入之前,key与value值都会被序列化成字节数组。
reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge(合并),也最终形成一个文件作为reduce task的输入文件
reduce 端的Shuffle细节:Copy过程,简单地拉取数据。reduce 通过http的方式得到输出文件的分区。Merge阶段reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge(合并),也最终形成一个文件作为reduce task的输入文件。
partitioner
Hadoop内置Partitioner
MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。
用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。一个默认的分区函数式使用hash方法(比如常见的:hash(key) mod R)进行分区。hash方法能够产生非常平衡的分区。Hadoop中自带了一个默认的分区类HashPartitioner,
它继承了Partitioner类,提供了一个getPartition的方法
1 | /** Partition keys by their {@link Object#hashCode()}. */ |
Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出.partition是分割map每个节点的结果,按照key分别映射给不同的reduce,也是可以自定义的。这里其实可以理解归类。
Mapper的结果,可能送到Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。Mapper最终处理的键值对
作用
- Partitioner决定了Map Task输出的每条数据交给哪个Reduce Task处理
- 默认实现:HashPartitioner是mapreduce的默认partitioner。计算方法是 reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。(hash(key) mod R 其中R是Reduce Task数目)
- 允许用户自定义 很多情况需自定义Partitioner比如“hash(hostname(URL)) mod R”确保相同域名的网页交给同一个Reduce Task处理
自定义partitioner
1 | /* |
MapReduce排序分组
Step1.4第四步中需要对不同分区中的数据进行排序和分组,默认情况按照key进行排序和分组