问题:按要求的文件名输出结果。比如这里要求对一个输入文件中的WARN、INFO、ERROR信息项进行分析,并分别输出到对应的以WARN、INFO、ERROR和OTHER开头的结果文件中,其中每个结果文件包含对应的相关信息。

输入文件:

    输入文件为hadoop的一些logs日志信息文件,比如:

示例程序:

package com.map.splitFile;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class SplitFilesToResult extends Configured{ @SuppressWarnings("deprecation")
public static void main(String[] args) {
String in = "/SplitFilesToResult/input";
String out = "/SplitFilesToResult/output"; Job job;
try {
//删除hdfs目录
SplitFilesToResult wc2 = new SplitFilesToResult();
wc2.removeDir(out); job = new Job(new Configuration(), "wordcount Job");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(mapperString.class);
job.setReducerClass(reduceStatistics.class); //定义附加的输出文件
MultipleOutputs.addNamedOutput(job,"INFO",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"ERROR",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"WARN",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"OTHER",TextOutputFormat.class,Text.class,Text.class); FileInputFormat.addInputPath(job, new Path(in));
FileOutputFormat.setOutputPath(job, new Path(out));
job.waitForCompletion(true); FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration());
fs.delete(new Path("/SplitFilesToResult/output/part-r-00000")); } catch (IOException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
} @SuppressWarnings("deprecation")
public void removeDir(String filePath) throws IOException, URISyntaxException{
String url = "hdfs://localhost:9000";
FileSystem fs = FileSystem.get(new URI(url), new Configuration());
fs.delete(new Path(filePath));
}
} /**
* 重写maptask使用的map方法
* @author nange
*
*/
class mapperString extends Mapper<LongWritable, Text, Text, Text> {
    // Compiled form of the regular expression used to split a line: a single space.
    public static final Pattern PATTERN = Pattern.compile(" ");

    // Reused output key, so no new Text is allocated per record.
    private final Text category = new Text();

    /**
     * Emits the input line unchanged, keyed by the category computed from its content.
     *
     * @param key     input key supplied by the input format (unused)
     * @param value   one line of the log file
     * @param context task context receiving the (category, line) pair
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        category.set(categoryOf(value.toString()));
        context.write(category, value);
    }

    /**
     * Determines the category key for one log line.
     *
     * <ul>
     *   <li>fewer than two space-separated tokens: {@code "OTHER"}</li>
     *   <li>exactly two tokens: {@code "ERROR"}</li>
     *   <li>a stack-trace frame ({@code at ...}, with or without indentation):
     *       {@code "ERROR"}</li>
     *   <li>otherwise: the third token of the line</li>
     * </ul>
     */
    private static String categoryOf(String line) {
        String[] words = PATTERN.split(line);
        if (words.length < 2) {
            return "OTHER";
        }
        if (words.length == 2) {
            return "ERROR";
        }
        // Stack-trace frames are normally indented, so test the trimmed line rather than
        // the first raw token; otherwise a frame containing a space (e.g. "(Unknown Source)")
        // would be keyed by an arbitrary token.
        if (line.trim().startsWith("at ")) {
            return "ERROR";
        }
        return words[2];
    }
}

/**
 * Reducer that writes every value to the INFO, ERROR, WARN or OTHER named output
 * according to its key.
 * @author nange
 *
 */
class reduceStatistics extends Reducer<Text, Text, Text, Text> {
    // Writes results to several named outputs instead of the default part file.
    private MultipleOutputs<Text, Text> mos;

    /** Creates the MultipleOutputs helper for this task. */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, Text>(context);
    }

    /**
     * Writes every value of the group to the named output matching its key; keys other
     * than INFO, ERROR and WARN go to the OTHER output.
     *
     * @param key     category key of the group
     * @param values  the lines belonging to that category
     * @param context task context (unused; all output goes through MultipleOutputs)
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // The target output depends only on the key, so resolve it once per group.
        String namedOutput = namedOutputFor(key.toString());
        for (Text line : values) {
            // A NullWritable key makes TextOutputFormat write only the value; an empty
            // String key would prefix every line with the key/value separator.
            mos.write(namedOutput, NullWritable.get(), line);
        }
    }

    /** Closes the MultipleOutputs helper. */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }

    /** Maps a category key to one of the named outputs: INFO, ERROR, WARN or OTHER. */
    private static String namedOutputFor(String category) {
        if ("INFO".equals(category) || "ERROR".equals(category) || "WARN".equals(category)) {
            return category;
        }
        return "OTHER";
    }
}

最新文章

  1. zzuli-小火山的跳子游戏
  2. Jquery遍历选中的input标签
  3. 8.11 CSS知识点4
  4. IOS开发-UITextField代理常用的方法总结
  5. C#winform导出数据到Excel的类
  6. mysql查询语句(mysql学习笔记七)
  7. yii2.0的gii生成代码bug
  8. 查找进程对应的PID和对应的端口号
  9. Android 应用自动更新功能的代码
  10. 武汉科技大学ACM :1007: A+B for Input-Output Practice (VII)
  11. 解决C/C++程序执行一闪而过的方法(三种办法)
  12. linux-cp
  13. NanUI 0.4.4发布
  14. APIO 2014
  15. Python爬虫入门教程 64-100 反爬教科书级别的网站-汽车之家,字体反爬之二
  16. Storm入门(十一)Twitter Storm源代码分析之CoordinatedBolt
  17. Python全栈开发-Day8-Socket网络编程
  18. Cesium添加水面
  19. MySQL规约(阿里巴巴)
  20. 使用jq获取文字的宽度

热门文章

  1. eclipse的相关操作和使用快捷键
  2. C# 9 新特性 —— 补充篇
  3. 记一次Hadoop安装部署过程
  4. volatile 关键字精讲
  5. pthread 读写锁
  6. tf.argmax(vector,axis)函数的使用
  7. 【Java基础】基本语法-程序流程控制
  8. 剑指offer 面试题6:从尾到头打印链表
  9. LRU缓存的实现
  10. Java内存模型与线程(一)