作业从JobClient端的submitJobInternal()方法提交作业的同时,调用InputFormat接口的getSplits()方法来创建split。默认是使用InputFormat的子类FileInputFormat来计算分片,而split的默认实现为FileSplit(其父接口为InputSplit)。这里要注意,split只是逻辑上的概念,并不对文件做实际的切分。一个split记录了一个Map Task要处理的文件区间,所以分片要记录其对应的文件偏移量以及长度等。每个split由一个Map Task来处理,所以有多少split,就有多少Map Task。下面着重分析这个方法:

 public List<InputSplit> getSplits(JobContext job
) throws IOException {
//getFormatMinSplitSize():始终返回1
//getMinSplitSize(job):获取” mapred.min.split.size”的值,默认为1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //getMaxSplitSize(job):获取"mapred.max.split.size"的值,
//默认配置文件中并没有这一项,所以其默认值为” Long.MAX_VALUE”,即2^63 – 1
long maxSize = getMaxSplitSize(job); // generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus>files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job.getConfiguration());
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
//计算split大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize); //计算split个数
long bytesRemaining = length; //bytesRemaining表示剩余字节数
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { //SPLIT_SLOP=1.1
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts()));
bytesRemaining -= splitSize;
} if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
} // Save the number of input files in the job-conf
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); LOG.debug("Total # of splits: " + splits.size());
return splits;
}

  首先计算分片的下限和上限:minSize和maxSize,具体的过程在注释中已经说清楚了。接下来用这两个值再加上blockSize来计算实际的split大小,过程也很简单,具体代码如下:

 protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}

  接下来就是计算实际的分片个数了。针对每个输入文件,计算input split的个数。while循环的含义如下:

  a)  文件剩余字节数/splitSize>1.1,创建一个split,这个split的字节数=splitSize,文件剩余字节数=文件大小 - splitSize

  b)  文件剩余字节数/splitSize<1.1,剩余的部分全都作为一个split(这主要是考虑到,不用为剩余的很少的字节数一些启动一个Map Task)

  

  我们发现,在默认配置下,split大小和block大小是相同的。这是不是为了防止这种情况:

一个split如果对应的多个block,若这些block大多不在本地,则会降低Map Task的本地性,降低效率。

  到这里split的划分就介绍完了,但是有两个问题需要考虑:

1、如果一个record跨越了两个block该怎么办?

  这个可以看到,在Map Task读取block的时候,每次是读取一行的,如果发现块的开头不是上一个文件的结束,那么抛弃第一条record,因为这个record会被上一个block对应的Map Task来处理。那么,第二个问题来了:

2、上一个block对应的Map Task并没有最后一条完整的record,它又该怎么办?

  一般来说,Map Task在读block的时候都会多读后续的几个block,以处理上面的这种情况。不过这部分的代码我还没有看到,等看到了再补充吧。

  本文基于hadoop1.2.1

  如有错误,还请指正

  参考文章:《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 董西成

  转载请注明出处:http://www.cnblogs.com/gwgyk/p/4113929.html

最新文章

  1. MongoDB replica set IDs do not match
  2. Mysql 主从热备份
  3. java(搜索不区分大小写)
  4. 【PHP面向对象(OOP)编程入门教程】7.特殊的引用”$this“的使用
  5. Java eclipse下 Ant build.xml实例详解
  6. 关于left join、right join和inner join
  7. mybatis 参数问题
  8. c# json转Dictionary字典
  9. Preventing Web Attacks with Apache
  10. 关于MSHTML
  11. 引用类中的enum
  12. zen cart global $db 这噶哒
  13. 开源视频平台:ViMP
  14. memge和saveOrUpdate的区别
  15. 认识Ajax
  16. VS开发程序用户防范安全问题
  17. js获取、修改url中参数
  18. js异常处理
  19. Java集合set的并、交、差操作
  20. WDS使用捕获映像制作企业自定义映像

热门文章

  1. mysql GROUP_CONCAT+ GROUP BY + substring_index获取分组的前几名
  2. 阅读笔记 火球——UML大战需求分析 2
  3. JAVA程序操作hbase的Maven配置pom.xml文件
  4. iOS 使用Block进行逆传值
  5. 蚁群算法简介(part3: 蚁群算法之更新信息素)
  6. Example For maven-compiler-plugin
  7. ASP.NET MVC Jquery Validate 表单验证的多种方式
  8. cocos2dx解决苹果正版ipv6的问题
  9. css清除浮动float的三种方法总结,为什么清浮动?浮动会有那些影响?一起来$(&#39;.float&#39;)
  10. SQL基础--序列