1、继承FileOutputFormat,复写getRecordWriter方法

/**
* @Description:自定义outputFormat,输出数据到不同的文件
*/
public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
return new FRecordWriter(job);
}
}

2、实现RecordWriter

/**
* @Description: 继承RecordWriter,实现数据输出到不同目录文件
*/
public class FRecordWriter extends RecordWriter<Text, NullWritable> {
FSDataOutputStream out1 = null;
FSDataOutputStream out2 = null; @Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
// 判断是否包含“baidu”和"alibaba"字符串,输出到不同文件
if (key.toString().contains("baidu") || key.toString().contains("alibaba")) {
out1.write(key.toString().getBytes());
} else {
out2.write(key.toString().getBytes());
} } @Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(out1);
IOUtils.closeStream(out2);
} public FRecordWriter(TaskAttemptContext job) {
FileSystem fs;
try {
Path path1 = new Path("output1/a.log");
Path path2 = new Path("output2/b.log");
System.out.println(path1.getName());
System.out.println(path2.getName());
fs = FileSystem.get(job.getConfiguration());
out1 = fs.create(path1);
out2 = fs.create(path2);
}catch (Exception e){
e.printStackTrace();
} }
}

3、map

/**
* @Description: 按行读取,按行写入
*/
public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}

4、reducer

public class FilterReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
private Text newLine = new Text();
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { //循环null值的values是防止key里有重复的数据没有被取出
//Iterable<NullWritable> values迭代器里存储了key和value(虽然本例中value都是null值)
//通过循环迭代器,迭代器里的key值也会被不断取出赋值到Text key中(公用内存地址)
for (NullWritable value : values) {
newLine.set(key.toString()+"\r\n");
context.write(newLine,value);
}
}
}

5、driver

/**
* @Description: 自定义输出
* 实现对样本按行分割,判断是否包含baidu或alibaba字符串,
* 包含则写入目录1,不包含写入目录2,
*/
public class FilterDriver { public static void main(String args[]) throws Exception{
if(args.length!=2)
{
System.err.println("使用格式:FilterDriver <input path> <output path>");
System.exit(-1);
} Configuration conf = new Configuration();
Job job = Job.getInstance(conf); job.setJarByClass(FilterDriver.class);
job.setMapperClass(FilterMapper.class);
job.setReducerClass(FilterReducer.class); job.setMapOutputKeyClass(Text .class);
job.setMapOutputValueClass(NullWritable .class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class); // 要将自定义的输出格式组件设置到job中
job.setOutputFormatClass(FilterOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(args[0])); // 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
// 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
FileOutputFormat.setOutputPath(job, new Path(args[1])); Path outPath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if(fs.exists(outPath)){
fs.delete(outPath,true);
} boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
} }
 

最新文章

  1. Java jacob调用打印机打印word文档
  2. kali安装java1.8
  3. About Flash
  4. android ProgressDialog 正在载...Loading...
  5. JS运动从入门到精髓!哈哈
  6. HDU 1255 覆盖的面积 (线段树+扫描线+离散化)
  7. 求一列的和,awk和perl哪个快?
  8. Linux之定时任务
  9. 文本离散表示(三):TF-IDF结合n-gram进行关键词提取和文本相似度分析
  10. 实战ELK(3) Kibana安装与简单实用
  11. linux 下端口close_wait 过多问题
  12. java面试题大全-基础方面 答案自己写
  13. MySQL导出用户权限
  14. linux命令之----sort命令用于将文本文件内容加以排序
  15. 大数据开发实战:Storm流计算开发
  16. vue-progressbar 知识点
  17. 0.3 CMD常用命令!以及用CMD显得自己高大上
  18. 算法笔记_026:折半查找(Java)
  19. display:table合并表格
  20. killer驱动

热门文章

  1. 平时代码中用不到设计模式?Are you kidding me?
  2. 关于mybtis 使用过程中发生There is no getter for property named &#39;id&#39; in class &#39;java.lang.String&#39; 错误
  3. CSS 让div,span等块级、非快级元素排列在同一行
  4. tomcat7控制台日志中文乱码
  5. MySQL 优化 (四)
  6. Octave-CostFunction
  7. This compilation unit is not on the build path of java project (此编译单元不在java项目的生成路径上)
  8. Python 列表生成式 &amp; 字典生成式
  9. 第四章 返回结果的HTTP状态码
  10. HTML5新属性在Google浏览器中不能显示的问题