/** Splits files returned by {@link #listStatus(JobConf)} when
* they're too big.*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
//计时器,
StopWatch sw = new StopWatch().start();
//
FileStatus[] files = listStatus(job); // Save the number of input files for metrics/loadgen
//设置配置中文件个数mapreduce.input.fileinputformat.numinputfiles
job.setLong(NUM_INPUT_FILES, files.length);
// 计算所有文件的大小总和
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
// 每个split目标大小,用总的文件大小 / (max(设置的split个数,1)),
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
// 每个split大小的最小值,读取mapreduce.input.fileinputformat.split.minsize配置,如果没有配置的话那么
// 取minSplitSize =1
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); // 生成 splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
//遍历文件列表
for (FileStatus file: files) {
//获取一个文件路径
Path path = file.getPath();
//获取文件大小
long length = file.getLen();
if (length != 0) {
FileSystem fs = path.getFileSystem(job);
BlockLocation[] blkLocations;
//判断file是否包含file的location,也就是,是否包含BlockLocation等信息,
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
//去构造BlockLocation信息
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
//判断文件是否可以切分
if (isSplitable(fs, path)) {
//获取文件的BlockSize大小
long blockSize = file.getBlockSize();
//splitSize最终由 goalSize(设置的每个split大小的目标值),minSize(设置的每个split大小的最小值),blockSize(file的block数量)三个值所决定,逻辑关系如下:
// Math.max(minSize, Math.min(goalSize, blockSize))
// Math.max(minSize, Math.min((totalSize / (numSplits == 0 ? 1 : numSplits)), blockSize))
// numSplits这个设置,只有在totalSize/numSplits < blockSize才会生效
// minSize 只有在大于blockSize的时候才会生效
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
//文件为读取长度
long bytesRemaining = length;
//如果剩余的大小/split的大小大雨1.1,那么就商城生成一个split
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
splitHosts[0], splitHosts[1]));
bytesRemaining -= splitSize;
}
//剩余的一点点数据也要生成一个split,
if (bytesRemaining != 0) {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
- bytesRemaining, bytesRemaining, clusterMap);
splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
splitHosts[0], splitHosts[1]));
}
} else {
String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
}
return splits.toArray(new FileSplit[splits.size()]);
}

最新文章

  1. 第四天--html简易布局
  2. GJM :异步Socket [转载]
  3. 渗透编码转码转换工具:CodeFuns
  4. js 与ios 交互的三种方法
  5. centos chrome
  6. Brush、Color、String相互转换
  7. [Windows编程] 开发DLL必读《Best Practices for Creating DLLs》
  8. 一天一个类--ArrayList之一
  9. 深入理解C指针之三:指针和函数
  10. 一个IT人士的个人经历,给迷失方向的朋友(转)
  11. python注意事项
  12. 2017-5-18 Repeater 重复器的使用
  13. echarts常用方法,item小坑(二)
  14. [Hadoop] 启动HDFS缺少服务
  15. DApp demo之pet-shop
  16. Hadoop 系列文章(三) 配置部署启动YARN及在YARN上运行MapReduce程序
  17. SKINNY加密算法详解(无代码,仅加密)
  18. EFI环境下的Ubuntu&amp;Win10双系统安装
  19. 文本分类需要CNN?No!fastText完美解决你的需求(后篇)
  20. C#程序证书创建工具 (Makecert.exe)

热门文章

  1. node搭建文件服务器
  2. 排序(bzoj 4552)
  3. hive的体系架构及安装
  4. 如何优雅地使用minicom
  5. Linux内核同步机制之(四):spin lock【转】
  6. 阿里云OSS Web端直传 服务器签名C#版
  7. 热安装NGINX并支持多站点SSL
  8. 解决:centos7.3 tomcat7启动巨慢问题
  9. [ Mongodb ] 问题总汇
  10. Java 原子性引用 AtomicReference