1 概述

该瞅瞅MapReduce的内部运行原理了,以前只知道个皮毛,再不搞搞,不然怎么死的都不晓得。下文会以2.4版本中的WordCount这个经典例子作为分析的切入点,一步步来看里面到底是个什么情况。

2 为什么要使用MapReduce

Map/Reduce,是一种模式,适合解决并行计算的问题,比如TopN、贝叶斯分类等。注意,是并行计算,而非迭代计算,像涉及到层次聚类的问题就不太适合了。

从名字可以看出,这种模式有两个步骤,Map和Reduce。Map即数据的映射,用于把一组键值对映射成另一组新的键值对,而Reduce这个东东,以Map阶段的输出结果作为输入,对数据做化简、合并等操作。

而MapReduce是Hadoop生态系统中基于底层HDFS的一个计算框架,它的上层又可以是Hive、Pig等数据仓库框架,也可以是Mahout这样的数据挖掘工具。由于MapReduce依赖于HDFS,其运算过程中的数据等会保存到HDFS上,把对数据集的计算分发给各个节点,并将结果进行汇总,再加上各种状态汇报、心跳汇报等,其只适合做离线计算。和实时计算框架Storm、Spark等相比,速度上没有优势。旧的Hadoop生态几乎是以MapReduce为核心的,但是慢慢的发展,其扩展性差、资源利用率低、可靠性等问题都越来越让人觉得不爽,于是才产生了Yarn这个新的东东,并且二代版的Hadoop生态都是以Yarn为核心。Storm、Spark等都可以基于Yarn使用。

3 怎么运行MapReduce

明白了哪些地方可以使用这个牛叉的MapReduce框架,那该怎么用呢?Hadoop的MapReduce源码给我们提供了范例,在其hadoop-mapreduce-examples子工程中包含了MapReduce的Java版例子。在写完类似的代码后,打包成jar,在HDFS的客户端运行:

bin/hadoop jar mapreduce_examples.jar mainClass args

即可。当然,也可以在IDE(如Eclipse)中,进行远程运行、调试程序。

至于,HadoopStreaming方式,网上有很多。我们这里只讨论Java的实现。

4 如何编写MapReduce程序

如前文所说,MapReduce中有Map和Reduce,在实现MapReduce的过程中,主要分为这两个阶段,分别以两类函数进行展现,一个是map函数,一个是reduce函数。map函数的参数是一个<key,value>键值对,其输出结果也是键值对,reduce函数以map的输出作为输入进行处理。

4.1 代码构成

实际的代码中,需要三个元素,分别是Map、Reduce、运行任务的代码。这里的Map类是继承了org.apache.hadoop.mapreduce.Mapper,并实现其中的map方法;而Reduce类是继承了org.apache.hadoop.mapreduce.Reducer,实现其中的reduce方法。至于运行任务的代码,就是我们程序的入口。

下面是Hadoop提供的WordCount源码。

 /**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.examples; import java.io.IOException;
import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1);
private Text word = new Text(); public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
} public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
} public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

4.2 入口类

4.2.1 参数获取

首先定义配置文件类Configuration,此类是Hadoop各个模块的公共使用类,用于加载类路径下的各种配置文件,读写其中的配置选项。

第二步中,用到了GenericOptionsParser类,其目的是将命令行中参数自动设置到变量conf中。

GenericOptionsParser的构造方法进去之后,会进行到parseGeneralOptions,对传入的参数进行解析:

 private void parseGeneralOptions(Options opts, Configuration conf,

       String[] args) throws IOException {

     opts = buildGeneralOptions(opts);

     CommandLineParser parser = new GnuParser();

     try {

       commandLine = parser.parse(opts, preProcessForWindows(args), true);

       processGeneralOptions(conf, commandLine);

     } catch(ParseException e) {

       LOG.warn("options parsing failed: "+e.getMessage());

       HelpFormatter formatter = new HelpFormatter();

       formatter.printHelp("general options are: ", opts);

     }

   }

而getRemainingArgs方法会获得传入的参数,接着在main方法中会进行判断参数的个数,由于此处是WordCount计算,只需要传入文件的输入路径和输出路径即可,因此参数的个数为2,否则将退出:

 if (otherArgs.length != 2) {

       System.err.println("Usage: wordcount <in> <out>");

       System.exit(2);

 }

如果在代码运行的时候传入其他的参数,比如指定reduce的个数,可以根据GenericOptionsParser的命令行格式这么写:

bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5

其规则是-D加MapReduce的配置选项,当然还支持-fs等其他参数传入。当然,默认情况下Reduce的数目为1,Map的数目也为1。

4.2.2 Job定义

定义Job对象,其构造方法为:

 public Job(Configuration conf, String jobName) throws IOException {

     this(conf);

     setJobName(jobName);

   }

可见,传入的"word count"就是Job的名字。而conf被传递给了JobConf进行环境变量的获取:

 public JobConf(Configuration conf) {

     super(conf);    

     if (conf instanceof JobConf) {

       JobConf that = (JobConf)conf;

       credentials = that.credentials;

     }
checkAndWarnDeprecation();
}

Job已经实例化了,下面就得给这个Job加点佐料才能让它按照我们的要求运行。于是依次给Job添加启动Jar包、设置Mapper类、设置合并类、设置Reducer类、设置输出键类型、设置输出值的类型。

这里有必要说下设置Jar包的这个方法setJarByClass:

 public void setJarByClass(Class<?> cls) {

     ensureState(JobState.DEFINE);

     conf.setJarByClass(cls);

   }

它会首先判断当前Job的状态是否是运行中,接着通过class找到其所属的jar文件,将jar路径赋值给mapreduce.job.jar属性。至于寻找jar文件的方法,则是通过classloader获取类路径下的资源文件,进行循环遍历。具体实现见ClassUtil类中的findContainingJar方法。

搞完了上面的东西,紧接着就会给mapreduce.input.fileinputformat.inputdir参数赋值,这是Job的输入路径,还有mapreduce.input.fileinputformat.inputdir,这是Job的输出路径。具体的位置,就是我们前面main中传入的Args。

4.2.3 Job提交

万事俱备,那就运行吧。

这里调用的方法如下:

 public boolean waitForCompletion(boolean verbose

                                    ) throws IOException, InterruptedException,

                                             ClassNotFoundException {

     if (state == JobState.DEFINE) {

       submit();

     }

     if (verbose) {

       monitorAndPrintJob();

     } else {

       // get the completion poll interval from the client.

       int completionPollIntervalMillis =

         Job.getCompletionPollInterval(cluster.getConf());

       while (!isComplete()) {

         try {

           Thread.sleep(completionPollIntervalMillis);

         } catch (InterruptedException ie) {

         }

       }

     }

     return isSuccessful();

   }

至于方法的参数verbose,如果想在控制台打印当前的进度,则设置为true。

至于submit方法,如果当前在HDFS的配置文件中配置了mapreduce.framework.name属性为“yarn”的话,会创建一个YARNRunner对象来进行任务的提交。其构造方法如下:

 public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,

       ClientCache clientCache) {

     this.conf = conf;

     try {

       this.resMgrDelegate = resMgrDelegate;

       this.clientCache = clientCache;

       this.defaultFileContext = FileContext.getFileContext(this.conf);

     } catch (UnsupportedFileSystemException ufe) {

       throw new RuntimeException("Error in instantiating YarnClient", ufe);

     }

   }

其中,ResourceMgrDelegate实际上ResourceManager的代理类,其实现了YarnClient接口,通过ApplicationClientProtocol代理直接向RM提交Job,杀死Job,查看Job运行状态等操作。同时,在ResourceMgrDelegate类中会通过YarnConfiguration来读取yarn-site.xml、core-site.xml等配置文件中的配置属性。

下面就到了客户端最关键的时刻了,提交Job到集群运行。具体实现类是JobSubmitter类中的submitJobInternal方法。这个牛气哄哄的方法写了100多行,还不算其几十行的注释。我们看它干了点啥。

Step1:

检查job的输出路径是否存在,如果存在则抛出异常。

Step2:

初始化用于存放Job相关资源的路径。注意此路径的构造方式为:

 conf.get(MRJobConfig.MR_AM_STAGING_DIR,

         MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)

         + Path.SEPARATOR + user

 + Path.SEPARATOR + STAGING_CONSTANT

其中,MRJobConfig.DEFAULT_MR_AM_STAGING_DIR为“/tmp/hadoop-yarn/staging”,STAGING_CONSTANT为".staging"。

Step3:

设置客户端的host属性:mapreduce.job.submithostname和mapreduce.job.submithostaddress。

Step4:

通过RPC,向Yarn的ResourceManager申请JobID对象。

Step5:

从HDFS的NameNode获取验证用的Token,并将其放入缓存。

Step6:

将作业文件上传到HDFS,这里如果我们前面没有对Job命名的话,默认的名称就会在这里设置成jar的名字。并且,作业默认的副本数是10,如果属性mapreduce.client.submit.file.replication没有被设置的话。

Step7:

文件上传到HDFS之后,还要被DistributedCache进行缓存起来。这是因为计算节点收到该作业的第一个任务后,就会有DistributedCache自动将作业文件Cache到节点本地目录下,并且会对压缩文件进行解压,如:.zip,.jar,.tar等等,然后开始任务。

最后,对于同一个计算节点接下来收到的任务,DistributedCache不会重复去下载作业文件,而是直接运行任务。如果一个作业的任务数很多,这种设计避免了在同一个节点上对用一个job的文件会下载多次,大大提高了任务运行的效率。

Step8:

对每个输入文件进行split划分。注意这只是个逻辑的划分,不是物理的。因为此处是输入文件,因此执行的是FileInputFormat类中的getSplits方法。只有非压缩的文件和几种特定压缩方式压缩后的文件才分片。分片的大小由如下几个参数决定:mapreduce.input.fileinputformat.split.maxsize、mapreduce.input.fileinputformat.split.minsize、文件的块大小。

具体计算方式为:

Math.max(minSize, Math.min(maxSize, blockSize))

分片的大小有可能比默认块大小64M要大,当然也有可能小于它,默认情况下分片大小为当前HDFS的块大小,64M。

接下来就该正儿八经的获取分片详情了。代码如下:

           long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts())); bytesRemaining -= splitSize;
} if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts())); }

Step8.1

将bytesRemaining(剩余未分片字节数)设置为整个文件的长度。

Step8.2

如果bytesRemaining超过分片大小splitSize一定量才会将文件分成多个InputSplit,SPLIT_SLOP(默认1.1)。接着就会执行如下方法获取block的索引,其中第二个参数是这个block在整个文件中的偏移量,在循环中会从0越来越大:

 protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
for (int i = 0 ; i < blkLocations.length; i++) {
// is the offset inside this block?
if ((blkLocations[i].getOffset() <= offset) &&
(offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
return i;
}
} BlockLocation last = blkLocations[blkLocations.length -1];
long fileLength = last.getOffset() + last.getLength() -1;
throw new IllegalArgumentException("Offset " + offset + " is outside of file (0.." + fileLength + ")");
}

将符合条件的块的索引对应的block信息的主机节点以及文件的路径名、开始的偏移量、分片大小splitSize封装到一个InputSplit中加入List<InputSplit> splits。

Step8.3

bytesRemaining -= splitSize修改剩余字节大小。剩余如果bytesRemaining还不为0,表示还有未分配的数据,将剩余的数据及最后一个block加入splits。

Step8.4

如果不允许分割isSplitable==false,则将第一个block、文件目录、开始位置为0,长度为整个文件的长度封装到一个InputSplit,加入splits中;如果文件的长度==0,则splits.add(new FileSplit(path, 0, length, new String[0]))没有block,并且初始和长度都为0;

Step8.5

将输入目录下文件的个数赋值给 "mapreduce.input.num.files",方便以后校对,返回分片信息splits。

  这就是getSplits获取分片的过程。当使用基于FileInputFormat实现InputFormat时,为了提高MapTask的数据本地性,应尽量使InputSplit大小与block大小相同。

 如果分片大小超过bolck大小,但是InputSplit中的封装了单个block的所在主机信息啊,这样能读取多个bolck数据吗?

比如当前文件很大,1G,我们设置的最小分片是100M,最大是200M,当前块大小为64M,经过计算后的实际分片大小是100M,这个时候第二个分片中存放的也只是一个block的host信息。需要注意的是split是逻辑分片,不是物理分片,当Map任务需要的数据本地性发挥作用时,会从本机的block开始读取,超过这个block的部分可能不在本机,这就需要从别的DataNode拉数据过来,因为实际获取数据是一个输入流,这个输入流面向的是整个文件,不受split的影响,split的大小越大可能需要从别的节点拉的数据越多,从从而效率也会越慢,拉数据的多少是由getSplits方法中的splitSize决定的。所以为了更有效率,分片的大小尽量保持在一个block大小吧。

Step9:

将split信息和SplitMetaInfo都写入HDFS中。使用方法:

 JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array);

Step10:

对Map数目设置,上面获得到的split的个数就是实际的Map任务的数目。

Step11:

相关配置写入到job.xml中:

 jobCopy.writeXml(out);

Step12:

通过如下代码正式提交Job到Yarn:

 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

这里就涉及到YarnClient和RresourceManager的RPC通信了。包括获取applicationId、进行状态检查、网络通信等。

Step13:

上面通过RPC的调用,最后会返回一个JobStatus对象,它的toString方法可以在JobClient端打印运行的相关日志信息。

4.2.4 另一种运行方式

提交MapReduce任务的方式除了上述源码中给出的之外,还可以使用ToolRunner方式。具体方式为:

 ToolRunner.run(new Configuration(),new WordCount(), args); 

至此,我们的MapReduce的启动类要做的事情已经分析完了。

-------------------------------------------------------------------------------

如果您看了本篇博客,觉得对您有所收获,请点击右下角的 [推荐]

如果您想转载本博客,请注明出处

如果您对本文有意见或者建议,欢迎留言

感谢您的阅读,请关注我的后续博客

最新文章

  1. GJM : AlloyTouch实战--60行代码搞定QQ看点资料卡
  2. GoLang之方法与接口
  3. Android 坐标系统
  4. DIV+CSS规范命名大全集合
  5. java去除重复的字符串和移除不想要的字符串
  6. Java 如何连接 SQL 2008 R2
  7. MFC TreeCtrl 控件(一):简单使用
  8. http://www.ibm.com/developerworks/cn/opensource/os-cn-cas/
  9. silverlight 生产图表(动态图表类型,Y轴数量) .xaml.cs文件
  10. jdbc连接数据库工具类
  11. ThinkPHP - 配置项目结构
  12. Laravel5中使用阿里大于(鱼)发送短信验证码
  13. Flink-Kafka-Connector Flink结合Kafka实战
  14. 一些Linq方法,come on !!
  15. L1-057 PTA使我精神焕发
  16. Codeforces Round #468 (Div. 2, based on Technocup 2018 Final Round)
  17. EmEditor的一个好用的正则替换功能
  18. JVM堆内存调优
  19. Ejb in action(一)——开篇介绍
  20. BZOJ3509 [CodeChef] COUNTARI 【分块 + fft】

热门文章

  1. 日期选择插件clndr的使用
  2. AWS 之Load Balance篇
  3. LA 3902 Network
  4. Npoi Web 项目中(XSSFWorkbook) 导出出现无法访问已关闭的流的解决方法
  5. SQL语句方法语法总结(二)
  6. Android telnet RPi 2B
  7. 基于jquery框架的ajax搜索显示
  8. 【Sass初级】开始使用Sass和Compass
  9. ipad iphone 开发的应用,加一个启动预览图片
  10. 嵌入式 uboot以及kernel添加看门狗临时记录(个人记录未整理乱)