package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.examples.BinRecordReader; class BinInputFormat extends FileInputFormat<LongWritable, IntWritable> { private static final double SPLIT_SLOP=1.1; /*
* 查询推断当前文件能否够分块?"true"为能够分块,"false"表示不进行分块
protected boolean isSplitable(Configuration conf, Path path) {
return true;
} /*
* MapReduce的client调用此方法得到全部的分块,然后将分块发送给MapReduce服务端。
* 注意,分块中不包括实际的信息,而仅仅是对实际信息的分块信息。详细的说,每一个分块中
* 包括当前分块相应的文件路径,当前分块在该文件里起始位置,当前分块的长度以及相应的
* 实际数据所在的机器列表。在实现这个函数时,将这些信息填上就可以。
* */
public List<InputSplit> getSplits(Configuration conf) throws IOException {
List<InputSplit> splits = new ArrayList<InputSplit>();
long minSplitSize = conf.getLong("mapred.min.split.size",1);
long maxSplitSize = conf.getLong("mapred.max.split.size", 1);
long blockSize = conf.getLong("dfs.block.size",1);
long splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
FileSystem fs = FileSystem.get(conf);
String path = conf.get(INPUT_DIR);
FileStatus[] files = fs.listStatus(new Path(path)); for (int fileIndex = 0; fileIndex < files.length; fileIndex++) {
FileStatus file = files[fileIndex];
System.out.println("input file: " + file.getPath().toString());
long length = file.getLen();
FileSystem fsin = file.getPath().getFileSystem(conf);
BlockLocation[] blkLocations = fsin.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(conf, file.getPath())) {
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(new FileSplit(file.getPath(), length-bytesRemaining, splitSize,
bytesRemaining -= splitSize;
} if (bytesRemaining != 0) {
splits.add(new FileSplit(file.getPath(), length-bytesRemaining, bytesRemaining,
} else if (length != 0) {
splits.add(new FileSplit(file.getPath(), 0, length, blkLocations[0].getHosts()));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(file.getPath(), 0, length, new String[0]));
return splits;
} /*
* 类RecordReader是用来创建传给map函数的Key-Value序列,传给此类的參数有两个:一个分块(split)和作业的配置信息(context).
* 在Mapper的run函数中能够看到MapReduce框架运行Map的逻辑:
* public void run(Context context) throws IOException, InterruptedException {
* setup(context);
* 调用RecordReader方法的nextKeyValue,生成新的键值对。假设当前分块(Split)中已经处理完成了,则nextKeyValue会返回false.退出run函数
* while (context.nextKeyValue()) {
* map(context.getCurrentKey(), context.getCurrentValue(), context);
* }
* cleanup(context);
* }
public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
BinRecordReader reader = new BinRecordReader();
return reader;


package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.RecordReader; /**
* Return a single record (filename, "") where the filename is taken from
* the file split.
public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
private FSDataInputStream inputStream = null;
private long start,end,pos;
private Configuration conf = null;
private FileSplit fileSplit = null;
private LongWritable key = new LongWritable();
private IntWritable value = new IntWritable();
private boolean processed = false;
public BinRecordReader() throws IOException {
} /*关闭文件流
* */
public void close() {
try {
if(inputStream != null)
} catch (IOException e) {
// TODO Auto-generated catch block
} /*
* 获取处理进度
public float getProgress() {
return ((processed == true)? 1.0f : 0.0f);
} /*
* 获取当前的Key
* */
public LongWritable getCurrentKey() throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return key;
} /* 获取当前的Value
* */
public IntWritable getCurrentValue() throws IOException,InterruptedException {
// TODO Auto-generated method stub
return value;
} /*
* 进行初始化工作,打开文件流,依据分块信息设置起始位置和长度等等
* */
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
fileSplit = (FileSplit)inputSplit;
conf = context.getConfiguration(); this.start = fileSplit.getStart();
this.end = this.start + fileSplit.getLength(); try{
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
this.inputStream = fs.open(path);
this.pos = this.start;
} catch(IOException e) {
} /*生成下一个键值对
public boolean nextKeyValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
if(this.pos < this.end) {
this.pos = inputStream.getPos();
return true;
} else {
processed = true;
return false;


package org.apache.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.examples.BinInputFormat; public class IntCount {
public static class TokenizerMapper extends Mapper<LongWritable, IntWritable, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1);
private Text intNum = new Text(); public void map(LongWritable key, IntWritable value, Context context
) throws IOException, InterruptedException {
context.write(intNum, one);
} public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get(); }
context.write(key, result);
} public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] newArgs = new String[]{"hdfs://localhost:9000/read","hdfs://localhost:9000/data/wc_output21"};
String[] otherArgs = new GenericOptionsParser(conf, newArgs).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
Job job = new Job(conf, "IntCount");
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);


#include <stdio.h>

/*
 * Writes the low 32 bits of v to fp, most significant byte first.
 * Returns 1 on success and 0 if the write failed.
 *
 * Big-endian order matches java.io.DataInputStream.readInt. A plain fwrite of an int would
 * use the host byte order, which is little-endian on x86.
 */
static int write_be32(FILE *fp, unsigned long v)
{
    unsigned char buf[4];

    buf[0] = (unsigned char)((v >> 24) & 0xFFu);
    buf[1] = (unsigned char)((v >> 16) & 0xFFu);
    buf[2] = (unsigned char)((v >> 8) & 0xFFu);
    buf[3] = (unsigned char)(v & 0xFFu);
    return fwrite(buf, 1, sizeof buf, fp) == sizeof buf;
}

/*
 * Creates the binary file "tmpfile" in the current directory.
 * Returns 0 on success and 1 on any I/O error, which is reported via perror.
 */
int main(void)
{
    FILE *fp = fopen("tmpfile", "wb");
    int i;

    if (fp == NULL) {
        perror("tmpfile");
        return 1;
    }
    /*
     * NOTE(review): the original loop body was lost. This version writes the values 0..9 as
     * 4-byte big-endian ints; confirm the intended payload.
     */
    for (i = 0; i < 10; i++) {
        if (!write_be32(fp, (unsigned long)i)) {
            perror("tmpfile");
            fclose(fp);
            return 1;
        }
    }
    if (fclose(fp) != 0) {
        perror("tmpfile");
        return 1;
    }
    return 0;
}



