1.概述
MapReduce的思想核心是“分而治之,先分再合”,适用于大量复杂任务处理场景(大规模数据处理场景)。
MapReduce分两个阶段:
- map阶段(分):如果任何可以拆分并且没有依赖,那么就把复杂的任务拆分成小任务,拆分成小任务之后,可以并行计算,提高处理效率。
- reduce阶段(合):把map阶段的各个局部结果进行全局汇总,得到最终的结果
生活中的MapReduce案例:统计图书馆的书籍总数
“Map”:你数1号书架,我数2号书架。我们人越多,数书就更快。
“Reduce”:我们到一起,把所有人的统计数加在一起。
2.MapReduce架构
和HDFS一样,MapReduce也是采用Master/Slave的架构,其架构图如下所示 :
客户端(Client)
每一个job都会在用户通过Client类将应用程序以及配置参数 Configuration 打包成 JAR 文件存储在HDFS,并把路径提交到 JobTracker 的 master 服务,然后由 master 创建每一个 Task (即MapTask和 ReduceTask) 将他们分发到各个 TaskTracker 服务中去执行。
JobTracker
JobTracker 负责资源监控和作业调度。JobTracker 监控所有TaskTracker 与 job 的健康状况,一旦发现失败,就将相应的任务转移到其他节点;同时,JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。
TaskTracker
TaskTracker 会周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker 使用"slot"等量划分本节点上的资源量。"slot"代表计算资源(CPU、内存等)。一个Task 获取到一个slot 后才有机会运行,而Hadoop 调度器的作用就是将各个TaskTracker 上的空闲slot分配给Task 使用。slot分为Map slot 和Reduce slot 两种,分别供Map Task 和Reduce Task 使用。TaskTracker 通过slot 数目(可配置参数)限定Task 的并发度。
Task
Task 分为Map Task 和Reduce Task 两种,均由TaskTracker 启动。HDFS 以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。但需要注意的是,split 的多少决定了Map Task 的数目,因为每个split 只会交给一个Map Task 处理。Split 和 Block的关系如下图所示 :
3.MapReduce运行流程
一个mapreduce作业的执行流程是:作业提交->作业初始化->任务分配->任务执行->更新任务执行进度和状态->作业完成。
1、提交作业
- JobClient使用runjob方法创建一个JobClient实例,调用submitJob()方法进行作业的提交 。
2、作业初始化
- 当JobTracker收到Job提交的请求后,将Job保存在一个内部队列,并让Job Scheduler(作业调度器)处理并初始化。
- 初始化涉及到创建一个封装了其tasks的job对象,并保持对task的状态和进度的跟踪(步骤5)。
- 当创建要运行的一系列task对象后,Job Scheduler首先开始从文件系统中获取由JobClient计算的input splits(步骤6),然后再为每个split创建map task。
3、任务分配
- TaskTracker 和 JobTracker之间的通信和任务分配是通过心跳机制完成的。
- TaskTracker作为一个单独的 JVM,它执行一个简单的循环,主要实现每隔一段时间向JobTracker发送心跳,告诉JobTracker此TaskTracker是否存活,是否准备执行新的任务。如果有待分配的任务,它就会为TaskTracker分配一个任务。
4、任务的执行
- TaskTracker申请到新的任务之后,就要在本地运行了。首先将任务本地化(包括运行任务所需的数据、配置信息、代码等),即从HDFS复制到本地,调用localizeJob()完成的。
- 对于使用Streaming和Pipes创建Map或者Reduce程序的任务,Java会把key/value传递给外部进程,然后通过用户自定义的Map或者Reduce进行处理,然后把key/value传回到java中。其中就好像是TaskTracker的子进程在处理Map和Reduce代码一样。
5、更新任务的执行进度和状态
- 任务的进度和状态是通过heartbeat(心跳机制)来更新和维护的。
- 对于Map Task,进度就是已处理数据和所有输入数据的比例。
- 对于Reduce Task,情况就有点复杂,包括3部分,拷贝中间结果文件、排序、reduce调用,每部分占1/3。
6、作业完成
- 当 Job 完成后,JobTracker会收一个Job Complete的通知,并将当前的Job状态更新为successful。同时JobClient也会轮询获知提交的Job已经完成,将信息显示给用户。最后,JobTracker会清理和回收该Job的相关资源,并通知TaskTracker进行相同的操作(比如删除中间结果文件)
4.MapReduce处理过程分析
以单词计数为例,分析MR的整个计算过程
Q1 在MapReduce任务中,需要启动多少个MapTask任务去处理数据?
例如:/wordcount/input 1.txt 200M 2.txt 100M,FileInputFormat会进行逻辑切片。逐个遍历待处理目录针对待处理目录下的文件以及切片大小对文件形成规划。
split size=block size=128M 1个切片对应一个maptask
split-1 1.txt --> 0- 128M
split-2 1.txt --> 128- 200M
split-3 2.txt --> 0- 100M
Q2:数据何时写入缓冲区?
context.write(k2,v2)往外写数据,数据写入到磁盘里(disk),此时要考虑性能问题。如果读取一行数据,就调用map方法,就写入一次;如果有很多行(假设100行),程序就会执行很多次(100次),对应很多次的IO操作(100次)。此时就需要内存缓冲区(3个byte数组)称为环形缓冲区。默认缓冲区的大小为100M,当内存缓冲区的大小为80M时,就开始向磁盘写入数据;内存写到磁盘称为spill(溢出/溢写)。假设文件大小不足128M,至少会发生一次溢出,且在发生溢出行为时,会发生排序(字典序),溢出的次数=文件个数。
4.1 Mapper任务执行过程详解
- 1、输入目录下的文件按照一定的标准逐个逻辑切片,形成规划;默认:split size = block size(默认128M);每一个切片由一个MapTask处理。
- 2、默认读取数据组件TextInputFormat对切片中的数据(每一行文本内容)按照一定规则解析成一个<k,v>对。k:光标起始偏移量,v:此行内容。
- 3、每一行内容解析出的每一个<k,v>对调用一次map方法,每次调用就会输出零个或多个键值对。(多少行调用多少次map方法)数据在缓冲。
- 4、按照一定的规则对阶段3的键值对进行分区,默认只有一个区。当 reducetask=2 时,就会涉及数据分区;决定map输出的kv去到哪一个reducetask。分区的数量=reducetask的个数(正常情况) 异常情况:分区个数多 直接报错 非法分区 分区个数少 产生空文件。
- 5、对每个分区中的键值对进行排序(溢写的同时–k的字典序);对溢出的临时文件合并为最终局部结果文件(分区且排序的文件)。
- 6、数据局部聚合处理(combiner处理),键值相等的键值对会调用一次reduce方法。(本阶段默认是没有的)。
4.2 Reducer任务执行过程详解
- 1、Reducer任务会主动从Mapper任务复制其输出的键值对。(开启线程(fetcher)主动拉取Map阶段输出的键值对)
- 2、 1.**合并(merge)**复制到Reducer本地数据;2.排序(sort):对合并后的数据进行排序;3.分组(grouping) 对合并后的数进行分组(k,Iterable([1,1,1]))。
- 3、键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,将键值对写入到HDFS中。
5.MapTask工作机制
Map阶段流程大体如下图所示 :
1).input File通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理。
2).数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区)。
3).写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘。
4).当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
详细步骤:
1. 读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。
2. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。key:每行首字符偏移值,value:此行文本内容
3.读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader每读取一行内容就调用一次map方法。
4. map执行完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认为:key.hash % reduce task数量。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job.setPartitionerClass上。
5.将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
注:环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。
缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
6.当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序
如果job设置过Combiner(局部聚合),那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
7. 每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
6.Shuffle机制
map阶段处理的数据如何传递给reduce阶段,是MapReduce框架中最关键的一个流程,这个流程就叫shuffle。
shuffle: 洗牌、发牌——(核心机制:数据分区,排序,合并)。
shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段,描述了两个阶段之间数据处理的特性。
一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle。
shuffle过程:
Map阶段:
- Collect阶段:将MapTask的结果输出到默认大小为100M的环形缓冲区,保存的是key/value,Partition分区信息等。
- Spill阶段:当内存中的数据量达到一定的阀值(spill percent=0.8)的时候,就会将数据写入本地磁盘; 数据写入磁盘之前需要对数据进行一次排序的操作; 如果配置了combiner(默认是没有的),maptask的结果进行局部聚合。
- Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。
Reduce阶段:
- Copy阶段 :ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据 数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。
- Merge阶段 :在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。
- Sort阶段 :在对数据进行合并的同时,会进行排序操作;MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。
shuffle是Mapreduce的诟病所在;原因是在shuffle过程中涉及到了大量的内存到磁盘 磁盘到内存 内存再到磁盘的过程,执行效率大大降低。
当涉及多个mr程序之间的串联执行的时候,shuffle的弊端 就会被无限放大,故mr程序不是合做迭代计算。
Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。
7.ReduceTask工作机制
Reduce阶段流程大体如下图所示 :
Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。
详细步骤:
1.Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
2. Merge阶段。 这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。
merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。
当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
3.把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
4. 对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。