Extending FileInputFormat to understand the FileInputFormat class
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat.MultiPathFilter;
import org.apache.hadoop.mapreduce.security.TokenCache;

import com.google.common.base.Charsets;

public class MyFileinput extends FileInputFormat<LongWritable, Text> {

    // Skip hidden files: anything whose name starts with "_" or "."
    private static final PathFilter hiddenFileFilter = new PathFilter() {
        public boolean accept(Path p) {
            String name = p.getName();
            return ((!(name.startsWith("_"))) && (!(name.startsWith("."))));
        }
    };

    // Walk the input paths and build the file list, filtering out files whose
    // names start with "_" or "." (the filtering rules can be customized).
    protected List<FileStatus> listStatus(JobContext job) throws IOException {
        System.out.println("*********************");
        List<FileStatus> result = new ArrayList<FileStatus>();
        Path[] dirs = getInputPaths(job);
        System.out.println("dirs" + dirs);
        System.out.println("dirs length = " + dirs.length);
        for (Path p : dirs) {
            System.out.println("Path loop " + p);
        }
        if (dirs.length == 0) {
            throw new IOException("No input paths specified in job");
        }
        TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
                job.getConfiguration());

        List<IOException> errors = new ArrayList<IOException>();
        List<PathFilter> filters = new ArrayList<PathFilter>();
        filters.add(hiddenFileFilter);
        PathFilter jobFilter = getInputPathFilter(job);
        if (jobFilter != null) {
            filters.add(jobFilter);
        }
        // The combined filter; further filters can be plugged in here.
        PathFilter inputFilter = new MultiPathFilter(filters);

        for (int i = 0; i < dirs.length; ++i) {
            Path p = dirs[i];
            FileSystem fs = p.getFileSystem(job.getConfiguration());
            FileStatus[] matches = fs.globStatus(p, inputFilter);
            System.out.println("matches=" + matches);
            if (matches != null) { // globStatus returns null for a missing path
                for (FileStatus match : matches) {
                    System.out.println("loop matches" + match.getPath());
                }
            }
            if (matches == null)
                errors.add(new IOException("Input path does not exist: " + p));
            else if (matches.length == 0)
                errors.add(new IOException("Input Pattern " + p
                        + " matches 0 files"));
            else {
                for (FileStatus globStat : matches) {
                    System.out.println("globStat " + globStat);
                    if (globStat.isDirectory()) {
                        for (FileStatus stat : fs.listStatus(
                                globStat.getPath(), inputFilter)) {
                            result.add(stat);
                        }
                    } else {
                        result.add(globStat);
                    }
                }
            }
        }
        if (!(errors.isEmpty())) {
            throw new InvalidInputException(errors);
        }
        // LOG.info("Total input paths to process : " + result.size());
        return result;
    }

    // Compute the split sizes and return the list of splits.
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
        System.out.print("minSize " + minSize);
        System.out.print("maxSize " + maxSize);

        List<InputSplit> splits = new ArrayList<InputSplit>();
        // File list under the input directories (already filtered by listStatus).
        List<FileStatus> files = listStatus(job);
        for (FileStatus file : files) {
            Path path = file.getPath();
            long length = file.getLen();
            System.out.println("path: " + path + " file len = " + length);
            if (length != 0L) {
                // Find the block locations for this file.
                FileSystem fs = path.getFileSystem(job.getConfiguration());
                BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
                        0L, length);
                if (isSplitable(job, path)) {
                    long blockSize = file.getBlockSize();
                    System.out.println("blockSize:" + blockSize);
                    long splitSize = computeSplitSize(blockSize, minSize,
                            maxSize);
                    System.out.println("splitSize :" + splitSize);
                    long bytesRemaining = length;
                    System.out.println("bytesRemaining :" + bytesRemaining);
                    System.out.println(bytesRemaining / splitSize);
                    // The slack factor is 1.1: to avoid launching an extra map
                    // task for a tiny leftover, the remaining bytes go into the
                    // last split as long as they do not exceed 1.1x the split size.
                    while (((double) bytesRemaining) / splitSize > 1.1D) {
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
                        System.out.println("blkIndex :" + blkIndex);
                        // Add a full-sized split to the list.
                        splits.add(makeSplit(path, length - bytesRemaining,
                                splitSize, blkLocations[blkIndex].getHosts()));
                        bytesRemaining -= splitSize;
                    }
                    // Tail of the file.
                    if (bytesRemaining != 0L) {
                        System.out.println("tail split size " + bytesRemaining);
                        int blkIndex = getBlockIndex(blkLocations, length
                                - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining,
                                bytesRemaining,
                                blkLocations[blkIndex].getHosts()));
                    }
                } else {
                    splits.add(makeSplit(path, 0L, length,
                            blkLocations[0].getHosts()));
                }
            } else {
                // Tested: a zero-byte file still produces a split, so a map task
                // is launched for it as well.
                splits.add(makeSplit(path, 0L, length, new String[0]));
            }
        }
        job.getConfiguration().setLong(
                "mapreduce.input.fileinputformat.numinputfiles", files.size());
        // LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

    // Chains several PathFilters; a path is accepted only if every filter accepts it.
    private static class MultiPathFilter implements PathFilter {
        private List<PathFilter> filters;

        public MultiPathFilter(List<PathFilter> filters) {
            this.filters = filters;
        }

        public boolean accept(Path path) {
            for (PathFilter filter : this.filters) {
                if (!(filter.accept(path))) {
                    return false;
                }
            }
            return true;
        }
    }

    // Reads the content of a split; by default one record per line.
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        System.out.println("delimiter ==" + delimiter);
        // null by default; LineRecordReader then falls back to newlines.
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        return new LineRecordReader(recordDelimiterBytes);
    }
}
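A quick aside on the arithmetic getSplits relies on: the split size is simply the block size clamped between the configured minimum and maximum, and the 1.1 slack factor lets the last split absorb a small tail instead of spawning an extra map task. The stand-alone sketch below mirrors that rule with made-up sample sizes; it re-implements the clamp rather than calling FileInputFormat.computeSplitSize, which is protected:

public class SplitSizeDemo {
    // Same clamp as FileInputFormat.computeSplitSize(): blockSize bounded by [minSize, maxSize].
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;            // 128 MB HDFS block (sample value)
        long minSize = 1L, maxSize = Long.MAX_VALUE;    // defaults: split size == block size
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        long length = 268L * 1024 * 1024;               // a hypothetical 268 MB input file
        long bytesRemaining = length;
        int splitCount = 0;
        // Same 1.1 slack rule as the while loop in getSplits above.
        while ((double) bytesRemaining / splitSize > 1.1D) {
            splitCount++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0L) {
            splitCount++;                               // the tail split
        }
        // Prints: splitSize=134217728, splits=2  (128 MB + 140 MB, since 140 MB < 1.1 * 128 MB)
        System.out.println("splitSize=" + splitSize + ", splits=" + splitCount);
    }
}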
So the main work of an InputFormat is to compute the splits and then, split by split, read the records that are handed to the map tasks. That is exactly the contract declared by the top-level abstract class:
public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext paramJobContext)
            throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(
            InputSplit paramInputSplit,
            TaskAttemptContext paramTaskAttemptContext) throws IOException,
            InterruptedException;
}
Those two responsibilities are more or less obvious just from the interface this top-level base class exposes.
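In a real job, either format is selected through Job.setInputFormatClass. The map-only driver below is a minimal sketch, not part of the original post; the EchoMapper class and the two argument paths are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyFileinputDriver {
    // Pass-through mapper, only here so the job has something to run.
    public static class EchoMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "myfileinput-demo");
        job.setJarByClass(MyFileinputDriver.class);
        job.setInputFormatClass(MyFileinput.class);  // plug in the custom InputFormat
        job.setMapperClass(EchoMapper.class);
        job.setNumReduceTasks(0);                    // map-only: mapper output is the job output
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // input dir (placeholder)
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output dir (placeholder)
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}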
Below is about the simplest InputFormat implementation possible; all we really have to supply is the RecordReader.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import com.google.common.base.Charsets;

public class MySimpleInformat<V> extends FileInputFormat<LongWritable, V> {

    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split: every file goes to a single map task as one split.
        return false;
    }

    @Override
    public RecordReader<LongWritable, V> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        System.out.println("delimiter ==" + delimiter);
        // null by default; LineRecordReader then falls back to newlines.
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
        return (RecordReader<LongWritable, V>) new LineRecordReader(recordDelimiterBytes);
    }
}
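Both subclasses can also be exercised without submitting a job at all, which makes it easy to watch all of the System.out.println debugging above. The harness below is only a sketch under assumptions: the local "input" directory, the choice of MySimpleInformat<Text>, and the blank TaskAttemptID are made up for illustration.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

public class InputFormatHarness {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();               // local filesystem by default
        Job job = Job.getInstance(conf);
        FileInputFormat.addInputPath(job, new Path("input"));   // hypothetical local directory

        MySimpleInformat<Text> format = new MySimpleInformat<Text>();
        // Job implements JobContext, so it can be passed to getSplits() directly.
        List<InputSplit> splits = format.getSplits(job);
        System.out.println("number of splits: " + splits.size());

        // Read every record of every split, the same way a map task would.
        for (InputSplit split : splits) {
            TaskAttemptContextImpl ctx =
                    new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
            RecordReader<LongWritable, Text> reader = format.createRecordReader(split, ctx);
            reader.initialize(split, ctx);
            while (reader.nextKeyValue()) {
                System.out.println(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
            }
            reader.close();
        }
    }
}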