MapReduce编程练习(三),按要求不同文件名输出结果
2024-08-23 16:56:24
问题:按要求文件名输出结果,比如这里我要求对一个输入文件中的WARN,INFO,ERROR,的信息项进行分析,并分别输入到对应的以WARN,INFO。ERROR和OTHER开头的结果文件中,其中结果文件包含对应的相关信息。
输入文件:
输入文件为hadoop的一些logs日志信息文件,比如:
示例程序:
package com.map.splitFile;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SplitFilesToResult extends Configured{
@SuppressWarnings("deprecation")
public static void main(String[] args) {
String in = "/SplitFilesToResult/input";
String out = "/SplitFilesToResult/output";
Job job;
try {
//删除hdfs目录
SplitFilesToResult wc2 = new SplitFilesToResult();
wc2.removeDir(out);
job = new Job(new Configuration(), "wordcount Job");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(mapperString.class);
job.setReducerClass(reduceStatistics.class);
//定义附加的输出文件
MultipleOutputs.addNamedOutput(job,"INFO",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"ERROR",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"WARN",TextOutputFormat.class,Text.class,Text.class);
MultipleOutputs.addNamedOutput(job,"OTHER",TextOutputFormat.class,Text.class,Text.class);
FileInputFormat.addInputPath(job, new Path(in));
FileOutputFormat.setOutputPath(job, new Path(out));
job.waitForCompletion(true);
FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration());
fs.delete(new Path("/SplitFilesToResult/output/part-r-00000"));
} catch (IOException e) {
e.printStackTrace();
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@SuppressWarnings("deprecation")
public void removeDir(String filePath) throws IOException, URISyntaxException{
String url = "hdfs://localhost:9000";
FileSystem fs = FileSystem.get(new URI(url), new Configuration());
fs.delete(new Path(filePath));
}
}
/**
* 重写maptask使用的map方法
* @author nange
*
*/
class mapperString extends Mapper<LongWritable, Text, Text, Text>{
//设置正则表达式的编译表达形式
public static Pattern PATTERN = Pattern.compile(" ");
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] words = PATTERN.split(value.toString());
System.out.println("********" + value.toString());
if(words.length >= 2){
if(words.length == 2){
context.write(new Text("ERROR"), new Text(value.toString()));
}else if(words[0].equals("at")){
context.write(new Text("ERROR"), new Text(value.toString()));
}else{
context.write(new Text(words[2]), new Text(value.toString()));
}
}else
context.write(new Text("OTHER"), new Text(value.toString()));
}
}
/**
* 对单词做统计
* @author nange
*
*/
class reduceStatistics extends Reducer<Text, Text, Text, Text>{
//将结果输出到多个文件或多个文件夹
private MultipleOutputs<Text,Text> mos;
//创建MultipleOutputs对象
protected void setup(Context context) throws IOException,InterruptedException {
mos = new MultipleOutputs<Text, Text>(context);
}
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for(Text t: values){
//使用MultipleOutputs对象输出数据
if(key.toString().equals("INFO")){
mos.write("INFO", "", t);
}else if(key.toString().equals("ERROR")){
mos.write("ERROR", "", t);
}else if(key.toString().equals("WARN")){
//输出到hadoop/hadoopfile-r-00000文件
mos.write("WARN", "", t, "WARN");
}else{
mos.write("OTHER", "", t);
}
}
}
//关闭MultipleOutputs对象
protected void cleanup(Context context) throws IOException,InterruptedException {
mos.close();
}
}
最新文章
- zzuli-小火山的跳子游戏
- Jquery遍历选中的input标签
- 8.11 CSS知识点4
- IOS开发-UITextField代理常用的方法总结
- C#winform导出数据到Excel的类
- mysql查询语句(mysql学习笔记七)
- yii2.0的gii生成代码bug
- 查找进程对应的PID和对应的端口号
- Android 应用自动更新功能的代码
- 武汉科技大学ACM :1007: A+B for Input-Output Practice (VII)
- 解决C/C++程序执行一闪而过的方法(三种办法)
- linux-cp
- NanUI 0.4.4发布
- APIO 2014
- Python爬虫入门教程 64-100 反爬教科书级别的网站-汽车之家,字体反爬之二
- Storm入门(十一)Twitter Storm源代码分析之CoordinatedBolt
- Python全栈开发-Day8-Socket网络编程
- Cesium添加水面
- MySQL规约(阿里巴巴)
- 使用jq获取文字的宽度