MapReduce多文件输出的两种方法
 
package duogemap;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class OldMulOutput {
 
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, NullWritable, Text>{
private MultipleOutputs mos;
private OutputCollector<NullWritable, Text> collector;
 
 
public void Configured(JobConf conf){
mos=new MultipleOutputs(conf);
}
 
public void map(LongWritable key, Text value, OutputCollector<NullWritable, Text> output,Reporter reporter)
throws IOException{
String[] arr=value.toString().split(",", -1);
String chrono=arr[1]+","+arr[2];
String geo=arr[4]+","+arr[5];
collector=mos.getCollector("chrono", reporter);
collector.collect(NullWritable.get(),new Text(chrono));
collector=mos.getCollector("geo", reporter);
collector.collect(NullWritable.get(),new Text(geo));
}
 
public void close() throws IOException{
mos.close();
}
 
 
public static void main(String[] args) throws IOException {
Configuration conf=new Configuration();
String[] remainingArgs=new GenericOptionsParser(conf, args).getRemainingArgs();
 
if (remainingArgs.length !=2) {
System.err.println("Error!");
System.exit(1);
}
 
JobConf job=new JobConf(conf,OldMulOutput.class);
Path in=new Path(remainingArgs[0]);
Path out=new Path(remainingArgs[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
 
job.setJobName("Multifile");
job.setMapperClass(MapClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
 
job.setNumReduceTasks(0);
MultipleOutputs.addNamedOutput(job, "chrono", TextOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, "geo", TextOutputFormat.class, NullWritable.class, Text.class);
JobClient.runJob(job);
}
 
}
}
 
 
 
package duogemap;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.GenericOptionsParser;
 
import duogemap.OldMulOutput.MapClass;
 
public class MulOutput {
 
public static class MapClass extends Mapper<LongWritable, Text, NullWritable, Text>{
 
private MultipleOutputs mos;
 
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
super.setup(context);
mos=new MultipleOutputs(context);
}
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
mos.write(NullWritable.get(),value,generateFileName(value));
}
private String generateFileName(Text value) {
// TODO Auto-generated method stub
String[] split=value.toString().split(",", -1);
String country=split[4].substring(1, 3);
 
return country+"/";
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
super.cleanup(context);
mos.close();
}
 
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf, "Muloutput");
String[] remainingArgs=new GenericOptionsParser(conf, args).getRemainingArgs();
 
if (remainingArgs.length !=2) {
System.err.println("Error!");
System.exit(1);
}
 
Path in=new Path(remainingArgs[0]);
Path out=new Path(remainingArgs[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
 
job.setMapperClass(MapClass.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
 
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true)?0:1);
}
}
}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

最新文章

  1. ASP.NET MVC Bundle使用 合并压缩
  2. KnockoutJS 3.X API 第二章 数据监控(1)视图模型与监控
  3. Python对象初探
  4. poj 3440 Coin Toss 概率问题
  5. hdu 4640 Island and study-sister
  6. javascript创建自定义对象和prototype
  7. Sql Server 日期格式化
  8. Spring+Redis的部署与Redis缓存使用示例
  9. python并发编程之IO模型 (四十九)
  10. 【转】Code First 属性详解
  11. linux 管道符与通配符
  12. PYQT实现简单的浏览器功能
  13. selenium WebDriver处理文件下载
  14. plsql 只能识别32位的oracle解决办法
  15. java集合:常用集合的数据结构
  16. 编写第一个Go程序
  17. windowsclient开发--怎样測量一个字符串显示的物理长度
  18. 微信小程序分包加载
  19. POJ 3221 Diamond Puzzle(BFS)
  20. Django2.0里model外键和一对一的on_delete参数

热门文章

  1. HTML URL地址解析
  2. 【原】Github+Hexo+NextT搭建个人博客
  3. AbpZero--1.如何开始
  4. 2016/12/31_Python
  5. 高性能 TCP/UDP/HTTP 通信框架 HP-Socket v4.1.1
  6. RMS去除在线认证
  7. Android 中的mvvm
  8. MySQL ALTER命令
  9. webService
  10. ubuntu进行子域名爆破