import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import org.apache.hadoop.mapreduce.security.TokenCache;

import com.google.common.base.Charsets;

public class MyFileinput extends FileInputFormat<LongWritable, Text> {

    // Accept only paths whose names do not start with "_" or "."
    private static final PathFilter hiddenFileFilter = new PathFilter() {
        public boolean accept(Path p) {
            String name = p.getName();
            return !name.startsWith("_") && !name.startsWith(".");
        }
    };
    // Walk the input file list, filtering out files that start with "_" or "." (the filter can be customized)
    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        System.out.println("*********************");
        List<FileStatus> result = new ArrayList<FileStatus>();
        Path[] dirs = getInputPaths(job);
        System.out.println("dirs " + dirs);
        System.out.println("dirs length = " + dirs.length);
        for (Path p : dirs) {
            System.out.println("Path loop " + p);
        }

        if (dirs.length == 0) {
            throw new IOException("No input paths specified in job");
        }

        TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                job.getConfiguration());

        List<IOException> errors = new ArrayList<IOException>();
        List<PathFilter> filters = new ArrayList<PathFilter>();
        filters.add(hiddenFileFilter);
        PathFilter jobFilter = getInputPathFilter(job);
        if (jobFilter != null) {
            filters.add(jobFilter);
        }
        // Combined filter; can be extended with more filters
        PathFilter inputFilter = new MultiPathFilter(filters);

        for (int i = 0; i < dirs.length; ++i) {
            Path p = dirs[i];
            FileSystem fs = p.getFileSystem(job.getConfiguration());
            FileStatus[] matches = fs.globStatus(p, inputFilter);
            System.out.println("matches=" + matches);
            if (matches == null) {
                errors.add(new IOException("Input path does not exist: " + p));
            } else if (matches.length == 0) {
                errors.add(new IOException("Input Pattern " + p
                        + " matches 0 files"));
            } else {
                for (FileStatus match : matches) {
                    System.out.println("loop matches " + match.getPath());
                }
                for (FileStatus globStat : matches) {
                    System.out.println("globStat " + globStat);
                    if (globStat.isDirectory()) {
                        for (FileStatus stat : fs.listStatus(
                                globStat.getPath(), inputFilter)) {
                            result.add(stat);
                        }
                    } else {
                        result.add(globStat);
                    }
                }
            }
        }

        if (!errors.isEmpty()) {
            throw new InvalidInputException(errors);
        }
        // LOG.info("Total input paths to process : " + result.size());
        return result;
    }

    // Compute the split size and return the list of splits
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
        System.out.println("minSize " + minSize);
        System.out.println("maxSize " + maxSize);

        List<InputSplit> splits = new ArrayList<InputSplit>();
        // Get the (already filtered) list of files under the input directories
        List<FileStatus> files = listStatus(job);
        for (FileStatus file : files) {
            Path path = file.getPath();
            long length = file.getLen();
            System.out.println("path: " + path + " file len = " + length);
            if (length != 0L) {
                // Look up the block locations for this path
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
                        0L, length);
                if (isSplitable(job, path)) {
                    long blockSize = file.getBlockSize();
                    System.out.println("blockSize:" + blockSize);
                    long splitSize = computeSplitSize(blockSize, minSize,
                            maxSize);
                    System.out.println("splitSize :" + splitSize);

                    long bytesRemaining = length;
                    System.out.println("bytesRemaining :" + bytesRemaining);
                    System.out.println(bytesRemaining / splitSize);
                    // The slop factor is 1.1: to avoid starting a map for a tiny split,
                    // the final remainder stays in one split as long as it does not
                    // exceed 1.1 times the split size
                    while (((double) bytesRemaining) / splitSize > 1.1D) {
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
                        System.out.println("blkIndex :" + blkIndex);
                        // Append to the split list
                        splits.add(makeSplit(path, length - bytesRemaining,
                                splitSize, blkLocations[blkIndex].getHosts()));
                        bytesRemaining -= splitSize;
                    }
                    // Tail of the file
                    if (bytesRemaining != 0L) {
                        System.out.println("tail split size " + bytesRemaining);
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining,
                                bytesRemaining,
                                blkLocations[blkIndex].getHosts()));
                    }
                } else {
                    splits.add(makeSplit(path, 0L, length,
                            blkLocations[0].getHosts()));
                }
            } else {
                // A zero-length file still gets a split, so a map runs for it too
                splits.add(makeSplit(path, 0L, length, new String[0]));
            }
        }
        job.getConfiguration().setLong(
                "mapreduce.input.fileinputformat.numinputfiles", files.size());
        // LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

    // A composite filter: a path must pass every filter in the list
    private static class MultiPathFilter implements PathFilter {
        private List<PathFilter> filters;

        public MultiPathFilter(List<PathFilter> filters) {
            this.filters = filters;
        }

        public boolean accept(Path path) {
            for (PathFilter filter : this.filters) {
                if (!filter.accept(path)) {
                    return false;
                }
            }
            return true;
        }
    }

    // Reads file contents; by default one record per line
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        System.out.println("delimiter ==" + delimiter);
        // null by default, which makes LineRecordReader use standard line endings
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }
        return new LineRecordReader(recordDelimiterBytes);
    }
}
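
For completeness, here is a minimal sketch of a driver that wires MyFileinput into a job. This is an illustration under assumptions, not part of the original post: the driver class name and the input/output paths are placeholders, and the split-size override is optional, shown only to demonstrate how minSize/maxSize feed into computeSplitSize.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyFileinputDriver { // hypothetical driver for the example
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Optional: cap the split size; computeSplitSize(blockSize, minSize, maxSize)
        // returns max(minSize, min(maxSize, blockSize))
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024);

        Job job = Job.getInstance(conf, "MyFileinput demo");
        job.setJarByClass(MyFileinputDriver.class);
        job.setInputFormatClass(MyFileinput.class); // plug in the custom InputFormat

        // A map-only identity job is enough to exercise the input path
        job.setMapperClass(Mapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}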

The main responsibilities are computing the input splits and reading each split's content for the map tasks.

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext paramJobContext)
            throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(
            InputSplit paramInputSplit,
            TaskAttemptContext paramTaskAttemptContext)
            throws IOException, InterruptedException;
}

This is more or less evident from the interface the top-level class requires its subclasses to provide.

Here is the simplest InputFormat implementation; after this, all that remains is to implement a RecordReader (a sketch of one follows the class below).

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import com.google.common.base.Charsets;

public class MySimpleInformat<V> extends FileInputFormat<LongWritable, V> {

    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split the input files
        return false;
    }

    @Override
    public RecordReader<LongWritable, V> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        System.out.println("delimiter ==" + delimiter);
        // null by default
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        }
        return (RecordReader<LongWritable, V>) new LineRecordReader(recordDelimiterBytes);
    }
}
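
Since MySimpleInformat marks every file as unsplittable, a natural companion RecordReader returns each file as a single record. The sketch below is an illustration under assumptions, not code from the original post: the class name WholeFileRecordReader is made up, and it fixes the value type to Text rather than the generic V.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Emits exactly one record per (unsplittable) file: key = 0, value = the whole file
public class WholeFileRecordReader extends RecordReader<LongWritable, Text> {
    private FileSplit split;
    private Configuration conf;
    private final LongWritable key = new LongWritable(0);
    private final Text value = new Text();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
        this.split = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (processed) {
            return false; // the single record has already been returned
        }
        byte[] contents = new byte[(int) split.getLength()];
        Path file = split.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
        } finally {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }

    @Override
    public LongWritable getCurrentKey() { return key; }

    @Override
    public Text getCurrentValue() { return value; }

    @Override
    public float getProgress() { return processed ? 1.0F : 0.0F; }

    @Override
    public void close() { /* the stream is closed inside nextKeyValue() */ }
}

To use it, MySimpleInformat's createRecordReader would return new WholeFileRecordReader() (with the same unchecked cast as above) instead of a LineRecordReader.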
