浅析MapReduce(二)

Map -> Reduce

Map阶段五大步骤

http://ov1nop9io.bkt.clouddn.com/121403128869360.png

MapReduce其实是分治算法的一种实现,所谓分治算法就是“就是分而治之”,将大的问题分解为相同类型的子问题(最好具有相同的规模),对子问题进行求解,然后合并成大问题的解。MapReduce就是分治法的一种,将输入进行分片,然后交给不同的task进行处理,然后合并成最终的解。具体流程图如下:

http://ov1nop9io.bkt.clouddn.com/20161228113443840.png

MapReduce实际的处理过程可以理解为Input->Map->Sort->Combine->Partition->Reduce->Output。

(1)Input阶段 数据以一定的格式传递给Mapper,有TextInputFormat,DBInputFormat,SequenceFileFormat等可以使用,在Job.setInputFormat可以设置,也可以自定义分分片函数。

(2)Map阶段 对输入的key,value进行处理,即map(k1,v1) -> list(k2,v2),使用Job.setMapperClass进行设置。

(3) Sort阶段 对于Mapper的输出进行排序,使用Job.setOutputKeyComparatorClass进行设置,然后定义排序规则

(4) Combine阶段 这个阶段对于Sort之后有相同key的结果进行合并,使用Job.setCombinerClass进行设置,也可以自定义Combine Class类。

(5) Partition阶段 将Mapper的中间结果按照Key的范围划分为R份(Reduce作业的个数),默认使用HashPatitioner(key.hashCode() & Integer.MAX_VALUE) % numPartitions),也可以自定义划分的函数。使用Job.setPartitionClass进行设置。

(6) Reduce阶段 对于Mapper的结果进一步进行处理,Job.setReducerClass进行设置自定义的Reduce类。

(7) Output阶段 Reducer输出数据的格式。

MapReduce将作业的整个运行过程分为两个阶段:Map阶段和Reduce阶段
Map阶段由一定数量的Map Task组成
输入数据格式解析:InputFormat
输入数据处理:Mapper
数据分组:Partitioner
Reduce阶段由一定数量的Reduce Task组成
数据远程拷贝
数据按照key排序
数据处理:Reducer
数据输出格式:OutputFormat

InputFormat

InputFormat 负责处理MR的输入部分.
有三个作用:
1验证作业的输入是否规范.
2把输入文件切分成InputSplit. (处理跨行问题)
3提供RecordReader 的实现类,把InputSplit读到Mapper中进行处理.

TextInputFormat

1.TextInputformat是默认的处理类,处理普通文本文件。
2.文件中每一行作为一个记录,他将每一行在文件中的起始偏移量作为key,每一行的内容作为value。
3.默认以\n或回车键作为一行记录。
4.TextInputFormat继承了FileInputFormat。

combiner

combiner的出现-为什么需要Map规约操作

http://ov1nop9io.bkt.clouddn.com/%E5%9B%BE%E7%89%878.png

在上述过程中,我们看到至少两个性能瓶颈:

(1)如果我们有10亿个数据,Mapper会生成10亿个键值对在网络间进行传输,但如果我们只是对数据求最大值,那么很明显的Mapper只需要输出它所知道的最大值即可。这样做不仅可以减轻网络压力,同样也可以大幅度提高程序效率。

  总结:网络带宽严重被占降低程序效率;

在MapReduce编程模型中,在Mapper和Reducer之间有一个非常重要的组件,它解决了上述的性能瓶颈问题,它就是Combiner。

http://ov1nop9io.bkt.clouddn.com/20161227175120887.gif

①与mapper和reducer不同的是,combiner没有默认的实现,需要显式的设置在conf中才有作用。

②并不是所有的job都适用combiner,只有操作满足结合律的才可设置combiner。

combine操作类似于:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt为求和、求最大值的话,可以使用,但是如果是求中值的话,不适用。

每一个map都可能会产生大量的本地输出,Combiner的作用就是对map端的输出先做一次合并,以减少在map和reduce节点之间的数据传输量,以提高网络IO性能

Combiner的作用

(1)Combiner最基本是实现本地key的聚合,对map输出的key排序,value进行迭代。如下所示:

​ map: (K1, V1) → list(K2, V2)

​ combine: (K2, list(V2)) → list(K2, V2)

​ reduce: (K2, list(V2)) → list(K3, V3)

(2)Combiner还有本地reduce功能(其本质上就是一个reduce),例如Hadoop自带的wordcount的例子和找出value的最大值的程序,combiner和reduce完全一致,如下所示:

  map: (K1, V1) → list(K2, V2)
  combine: (K2, list(V2)) → list(K3, V3)
  reduce: (K3, list(V3)) → list(K4, V4)

使用combiner之后,先完成的map会在本地聚合,提升速度。对于hadoop自带的wordcount的例子,value就是一个叠加的数字,所以map一结束就可以进行reduce的value叠加,而不必要等到所有的map结束再去进行reduce的value叠加。

融合Combiner的MapReduce

http://ov1nop9io.bkt.clouddn.com/%E5%9B%BE%E7%89%8789.png

使用MyReducer作为Combiner

// 设置Map规约Combiner
job.setCombinerClass(MyReducer.class);

执行后看到map的输出和combine的输入统计是一致的,而combine的输出与reduce的输入统计是一样的。
由此可以看出规约操作成功,而且执行在map的最后,reduce之前。

自己定义Combiner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static class MyCombiner extends
Reducer<Text, LongWritable, Text, LongWritable> {
protected void reduce(
Text key,
java.lang.Iterable<LongWritable> values,
org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws java.io.IOException, InterruptedException {
// 显示次数表示规约函数被调用了多少次,表示k2有多少个分组
System.out.println("Combiner输入分组<" + key.toString() + ",N(N>=1)>");
long count = 0L;
for (LongWritable value : values) {
count += value.get();
// 显示次数表示输入的k2,v2的键值对数量
System.out.println("Combiner输入键值对<" + key.toString() + ","
+ value.get() + ">");
}
context.write(key, new LongWritable(count));
// 显示次数表示输出的k2,v2的键值对数量
System.out.println("Combiner输出键值对<" + key.toString() + "," + count
+ ">");
};
}

shuffle

Reduce阶段三个步骤

http://ov1nop9io.bkt.clouddn.com/%E5%9B%BE%E7%89%87562.png

Step2.1就是一个Shuffle[随机、洗牌]操作

http://ov1nop9io.bkt.clouddn.com/20161228102308185.png

系统执行排序的过程(即将map输出作为输入传给reduce),称为shuffle.即这张图是官方对Shuffle过程的描述,hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心(心脏)。shuffle的主要工作是从Map结束到Reduce开始之间的过程。首先看下这张图,就能了解shuffle所处的位置。图中的partitions、copy phase、sort phase所代表的就是shuffle的不同阶段(大致范围)。也可以这样理解, Shuffle描述着数据从map task输出到reduce task输入的这段过程。

map 端的Shuffle细节:
\1. 在map task执行时,它的输入数据来源于HDFS的block。(map函数产生输出时,利用缓冲的方式写入内存,并出于效率考虑的方式就行预排序。如上图。此处默认的内存大小为100M,可通过io.sort.mr 来设置,当此缓冲区编程 %80的时候 ,一个后台线程就会将内容写入磁盘。在写入磁盘之前,线程首先根据最终要传的reduce把这些数据划分成相应的分区(partition),在每个分区中,后台线程进行内排序,如果有combine,就会在排序后的分区内执行。)

  1. 在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对:相同的key 到底应该交由哪个reduce去做,是现在决定的,也就是partition 作用。MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。

  2. 接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。key/value对以及Partition的结果都会被写入缓冲区,当然写入之前,key与value值都会被序列化成字节数组。

  3. reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge(合并),也最终形成一个文件作为reduce task的输入文件

    reduce 端的Shuffle细节:Copy过程,简单地拉取数据。reduce 通过http的方式得到输出文件的分区。Merge阶段reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge(合并),也最终形成一个文件作为reduce task的输入文件。

partitioner

Hadoop内置Partitioner

MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。

用户在中间key上使用分区函数来对数据进行分区,之后在输入到后续任务执行进程。一个默认的分区函数式使用hash方法(比如常见的:hash(key) mod R)进行分区。hash方法能够产生非常平衡的分区。Hadoop中自带了一个默认的分区类HashPartitioner,

它继承了Partitioner类,提供了一个getPartition的方法

1
2
3
4
5
6
7
8
9
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

}

Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出.partition是分割map每个节点的结果,按照key分别映射给不同的reduce,也是可以自定义的。这里其实可以理解归类。

Mapper的结果,可能送到Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。Mapper最终处理的键值对

作用

  1. Partitioner决定了Map Task输出的每条数据交给哪个Reduce Task处理
  2. 默认实现:HashPartitioner是mapreduce的默认partitioner。计算方法是 reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。(hash(key) mod R 其中R是Reduce Task数目)
  3. 允许用户自定义 很多情况需自定义Partitioner比如“hash(hostname(URL)) mod R”确保相同域名的网页交给同一个Reduce Task处理

自定义partitioner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/*
* 自定义Partitioner类
*/
public static class KpiPartitioner extends Partitioner<Text, KpiWritable> {
@Override
public int getPartition(Text key, KpiWritable value, int numPartitions) {
// 实现不同的长度不同的号码分配到不同的reduce task中
int numLength = key.toString().length();
if (numLength == 11) {
return 0;
} else {
return 1;
}
}

}
/**
设置为打包运行,设置Partitioner为LiuPartitioner设置ReducerTask的个数为2
注意:分区的例子必须要设置为打成jar包运行!*/
public int run(String[] args) throws Exception {

// 定义一个作业
Job job = new Job(getConf(), "MyJob");
// 分区需要设置为打包运行
job.setJarByClass(MyLiuJob.class);
// 设置输入目录
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
// 设置自定义Mapper类
job.setMapperClass(MyMapper.class);
// 指定<k2,v2>的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(KpiWritable.class);
// 设置Partitioner
job.setPartitionerClass(LiuPartitioner.class);
job.setNumReduceTasks(2);
// 设置自定义Reducer类
job.setReducerClass(MyReducer.class);
// 指定<k3,v3>的类型
job.setOutputKeyClass(Text.class);
job.setOutputKeyClass(KpiWritable.class);
// 设置输出目录
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
// 提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
return 0;
}

MapReduce排序分组

http://ov1nop9io.bkt.clouddn.com/121403128869360.png

Step1.4第四步中需要对不同分区中的数据进行排序和分组,默认情况按照key进行排序和分组