美高梅网址注册-澳门mgm4858集团登录网址
做最好的网站
来自 澳门mgm4858集团登录网址 2019-10-08 07:07 的文章
当前位置: 美高梅网址注册 > 澳门mgm4858集团登录网址 > 正文

MapReduce的执行过程主要包含是三个阶段,map负责

[TOC]

MapReduce的执行过程主要包含是三个阶段:Map阶段、Shuffle阶段、Reduce阶段

MapReduce采用”分而治之”的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果。简单地说,MapReduce就是”任务的分解与结果的汇总”。

图片 1

在Hadoop中,用于执行MapReduce任务的机器角色有两个:

MapReuce 过程分解

  • JobTracker用于调度工作的,一个Hadoop集群中只有一个JobTracker,位于master。
  • TaskTracker用于执行工作,位于各slave上。

Map 阶段

  • split: 会将输入的大文件 split 成一个 HDFS 的 block,每个 map 处理一个 block 的数据
  • map:对输入分片中的每个键值对调用map()函数进行运算,然后输出一个结果键值对

Partitioner:对 map 的输出进行partition,即根据key或value及reduce的数量来决定当前的这对键值对最终应该交由哪个reduce处理。默认是对key哈希后再以reduce task数量取模,默认的取模方式只是为了避免数据倾斜。这个 partition 过程可以通过指定 partitioner 自定义

  • sort:在溢写到磁盘之前,使用快排对缓冲区数据按照partitionIdx, key排序。(每个partitionIdx表示一个分区,一个分区对应一个reduce)

Combiner:如果设置了Combiner,那么在Sort之后,还会对具有相同key的键值对进行合并,减少溢写到磁盘的数据量。

  • spill: map输出写在内存中的环形缓冲区,默认当缓冲区满80%,启动溢写线程,以 round-robin的方式将缓冲的数据写出到 mapreduce.cluster.local.dir 指定的目录磁盘
  • merge:溢写可能会生成多个文件,这时需要将多个文件合并成一个文件。合并的过程中会不断地进行 sort & combine 操作,最后合并成了一个已分区且已排序的文件

在分布式计算中,MapReduce框架负责处理了并行编程中分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等复杂问题,把处理过程高度抽象为两个函数:map和reduce,map负责把任务分解成多个任务,reduce负责把分解后多任务处理的结果汇总起来。

Shuffle阶段

广义上Shuffle阶段横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和merge/sort过程。通常认为Shuffle阶段就是将map的输出作为reduce的输入的过程

  • Copy:Reduce端启动一些copy线程,通过HTTP方式将map端输出文件中属于自己的部分拉取到本地。Reduce会从多个map端拉取数据,并且每个map的数据都是有序的。
  • Merge:Copy过来的数据会先放入内存缓冲区中,这里的缓冲区比较大;当缓冲区数据量达到一定阈值时,将数据溢写到磁盘(与map端类似,溢写过程会执行 sort & combine)。如果生成了多个溢写文件,它们会被merge成一个有序的最终文件。这个过程也会不停地执行 sort & combine 操作。

需要注意的是,用MapReduce来处理的数据集必须具备这样的特点:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。

Reduce阶段

  • reduce:Shuffle阶段最终生成了一个有序的文件作为Reduce的输入,对于该文件中的每一个 <key, [value1,value2...]>调用reduce()方法,并将结果写到HDFS

对于一个MR任务,它的输入、输出以及中间结果都是<key, value>键值对:

  • Map:<k1, v1> ——> list(<k2, v2>)
  • Reduce:<k2, list> ——> list(<k3, v3>)

MR程序的执行过程主要分为三步:Map阶段、Shuffle阶段、Reduce阶段,如下图:

图片 2img

  1. Map阶段
    • 分片:map阶段的输入通常是HDFS上文件,在运行Mapper前,FileInputFormat会将输入文件分割成多个split ——1个split至少包含1个HDFS的Block;然后每一个分片运行一个map进行处理。
    • 执行:对输入分片中的每个键值对调用map()函数进行运算,然后输出一个结果键值对。
      • Partitioner:对map()的输出进行partition,即根据key或value及reduce的数量来决定当前的这对键值对最终应该交由哪个reduce处理。默认是对key哈希后再以reduce task数量取模,默认的取模方式只是为了避免数据倾斜。然后该key/value对以及partitionIdx的结果都会被写入环形缓冲区。
    • 溢写:map输出写在内存中的环形缓冲区,默认当缓冲区满80%,启动溢写线程,将缓冲的数据写出到磁盘。
      • Sort:在溢写到磁盘之前,使用快排对缓冲区数据按照partitionIdx, key排序。(每个partitionIdx表示一个分区,一个分区对应一个reduce)
      • Combiner:如果设置了Combiner,那么在Sort之后,还会对具有相同key的键值对进行合并,减少溢写到磁盘的数据量。
    • 合并:溢写可能会生成多个文件,这时需要将多个文件合并成一个文件。合并的过程中会不断地进行 sort & combine 操作,最后合并成了一个已分区且已排序的文件。
  2. Shuffle阶段:广义上Shuffle阶段横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和merge/sort过程。通常认为Shuffle阶段就是将map的输出作为reduce的输入的过程
    • Copy过程:Reduce端启动一些copy线程,通过HTTP方式将map端输出文件中属于自己的部分拉取到本地。Reduce会从多个map端拉取数据,并且每个map的数据都是有序的。
    • Merge过程:Copy过来的数据会先放入内存缓冲区中,这里的缓冲区比较大;当缓冲区数据量达到一定阈值时,将数据溢写到磁盘(与map端类似,溢写过程会执行 sort & combine)。如果生成了多个溢写文件,它们会被merge成一个有序的最终文件。这个过程也会不停地执行 sort & combine 操作。
  3. Reduce阶段:Shuffle阶段最终生成了一个有序的文件作为Reduce的输入,对于该文件中的每一个键值对调用reduce()方法,并将结果写到HDFS。

在运行程序之前,需要先搭建好Hadoop集群环境,参考《Hadoop+HBase+ZooKeeper分布式集群环境搭建》。

WordCount可以说是最简单的MapReduce程序了,只包含三个文件:一个 Map 的 Java 文件,一个 Reduce 的 Java 文件,一个负责调用的主程序 Java 文件。

我们在当前用户的主文件夹下创建wordcount_01/目录,在该目录下再创建src/classes/。 src 目录存放 Java 的源代码,classes 目录存放编译结果。

TokenizerMapper.java

package com.lisong.hadoop;import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { IntWritable one = new IntWritable; Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException,InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString; while(itr.hasMoreTokens { word.set(itr.nextToken; context.write(word, one); } }}

IntSumReducer.java

package com.lisong.hadoop;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,InterruptedException { int sum = 0; for(IntWritable val:values) { sum += val.get(); } result.set; context.write(key,result); }}

WordCount.java

package com.lisong.hadoop;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit; } Job job = new Job(conf, "wordcount"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion?0:1); } }

以上三个.java源文件均置于 src 目录下。

Hadoop 2.x 版本中jar不再集中在一个 hadoop-core-*.jar 中,而是分成多个 jar。编译WordCount程序需要如下三个 jar:

$HADOOP_HOME/share/hadoop/common/hadoop-common-2.4.1.jar$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.4.1.jar$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar

执行hadoop程序的时候,输入文件必须先放入hdfs文件系统中,不能是本地文件。

1 . 先查看hdfs文件系统的根目录:

2 . 然后利用put将输入文件(多个输入文件位于input文件夹下)复制到hdfs文件系统中:

3 . 运行wordcount程序

4 . 查看运行结果

Hadoop MapReduce操作的是键值对,但这些键值对并不是Integer、String等标准的Java类型。为了让键值对可以在集群上移动,Hadoop提供了一些实现了WritableComparable接口的基本数据类型,以便用这些类型定义的数据可以被序列化进行网络传输、文件存储与大小比较。

  • 值:仅会被简单的传递,必须实现WritableWritableComparable接口。
  • 键:在Reduce阶段排序时需要进行比较,故只能实现WritableComparable接口。

本文由美高梅网址注册发布于澳门mgm4858集团登录网址,转载请注明出处:MapReduce的执行过程主要包含是三个阶段,map负责

关键词: