package com.libc;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser; public class Process { public static class TokenizerMapper extends
Mapper<Object, Text, Text, Text> {
private Text word = new Text(); public void map(Object key, Text value, Context context)
throws IOException, InterruptedException { // TODO Auto-generated method stub
String datas = "";
try {
datas = new String(value.getBytes(), 0, value.getLength(),
"GBK");
} catch (UnsupportedEncodingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
// datas = value.toString();
try { String[] split = datas.split(" time="); // 处理头中包含空格的字段
Pattern p = Pattern.compile("phonemodel=\"(.*?)\"");
String pm = getIndex(split[0], p);
split[0] = split[0].replaceAll(pm, pm.replace(" ", ""));
Pattern p1 = Pattern.compile("networktype=\"(.*?)\"");
String nt = getIndex(split[0], p1);
split[0] = split[0].replaceAll(nt, nt.replace(" ", ""));
for (int i = 1; i < split.length; i++) {
String[] codes = split[i].split(" ", 4);
int headLen = split[0].split(" ").length;
if (headLen != 20) {
// 丢掉错误日志
continue;
}
// 处理旧版本日志判别标准:|
if (codes[2].equals("code=\"100\"")){
if(codes[3].indexOf("contact_name")>-1){
codes[3] = process100(codes[3]);
}
codes[3] = codes[3].replace(' ', '#'); }else if(codes[2].equals("code=\"101\"") ){
if(codes[3].indexOf("message_to_")>-1){
codes[3] = process101(codes[3]);
}
codes[3] = codes[3].replace(' ', '#');
}
else if(codes[2].equals("code=\"102\"")){
if(codes[3].indexOf("caller_n")>-1||codes[3].indexOf("caller_d")>-1){
codes[3] = process102(codes[3]);
}
codes[3] = codes[3].replace(' ', '#'); }else{
codes[3] = codes[3].replace(" ", " ");
} String collect = split[0] + " time=" + codes[0] + " "
+ codes[1] + " " + codes[2] + " " + codes[3];
word.set(collect); context.write(word, new Text(""));
} } catch (Exception e) {
// TODO Auto-generated catch block
}
}
} public static String process100(String code) throws Exception{
String[] codes = code.split(" ");
HashMap<String, Contact> hs = new HashMap<String, Process.Contact>();
Pattern p0 = Pattern.compile("_(\\d*)=");
Pattern p1 = Pattern.compile("\"(.*)\"");
for (int i = 0; i < codes.length; i++) {
if (codes[i].equals(""))
continue;
String index = getIndex(codes[i], p0);
if (index == null)
continue;
String value = getIndex(codes[i], p1);
Contact contact = null;
if (hs.containsKey(index)) {
contact = hs.get(index);
} else {
contact = new Contact();
}
if (codes[i].startsWith("contact_name_")) {
contact.contactName = value;
} else if (codes[i].startsWith("contact_num_")) {
contact.contactNum = value;
}
contact.index = index;
hs.put(index, contact);
} return printToString(hs);
} public static String process101(String code) throws Exception{
String[] codes = code.split("\" ");
HashMap<String, Message> hs = new HashMap<String, Process.Message>();
Pattern p = Pattern.compile("_(\\d*)=");
Pattern p1 = Pattern.compile("\"(.*)");
for (int i = 0; i < codes.length; i++) {
String index = getIndex(codes[i], p);
String value = getIndex(codes[i], p1);
if (index == null)
continue;
Message message = null;
if (hs.containsKey(index)) {
message = hs.get(index);
} else {
message = new Message();
}
if (codes[i].startsWith("message_time_")) {
message.messageTime = value;
} else if (codes[i].startsWith("message_to_")) {
message.messageTo = value;
}
message.index = index;
hs.put(index, message);
} return printToString(hs);
} public static String process102(String code) throws Exception{
String[] codes = code.split("\" ");
HashMap<String, CallLog> hs = new HashMap<String, Process.CallLog>();
Pattern p = Pattern.compile("_(\\d*)=");
Pattern p1 = Pattern.compile("\"(.*)");
for (int i = 0; i < codes.length; i++) {
String index = getIndex(codes[i], p);
if (index == null)
continue;
String value = getIndex(codes[i], p1);
CallLog callLog = null;
if (hs.containsKey(index)) {
callLog = hs.get(index);
} else {
callLog = new CallLog();
}
if (codes[i].startsWith("caller_date_")) {
callLog.callerDate = value;
} else if (codes[i].startsWith("caller_duration_")) {
callLog.callerDuration = value;
} else if (codes[i].startsWith("caller_name_")) {
callLog.callerName = value;
} else if (codes[i].startsWith("caller_num_")) {
callLog.callerNum = value;
}
callLog.index = index;
hs.put(index, callLog);
} return printToString(hs);
} public static String printToString(Map hs) {
Set set = hs.keySet();
Iterator<String> it = set.iterator();
String result = "";
while (it.hasNext()) {
result = result + hs.get(it.next()).toString() + "|";
}
return result;
} public static String getIndex(String code, Pattern p) {
String index = null; Matcher matcher = p.matcher(code);
if (matcher.find()) {
index = matcher.group(1);
}
return index;
} public static class IntSumReducer extends Reducer<Text, Text, Text, Text> { public void reduce(Text key, Text rr, Context context)
throws IOException, InterruptedException {
context.write(key, new Text(""));
}
} public static class Contact { public String index;
public String contactName;
public String contactNum; @Override
public String toString() {
// TODO Auto-generated method stub
return "contact_" + index + "=" + this.contactName + ";"
+ this.contactNum;
}
} public static class Message {
public String index;
public String messageTime;
public String messageTo; @Override
public String toString() {
// TODO Auto-generated method stub
return "message_" + this.index + "=" + this.messageTo + ";"
+ this.messageTime;
}
} public static class CallLog {
public String index;
public String callerDuration;
public String callerNum;
public String callerName;
public String callerDate; @Override
public String toString() {
// TODO Auto-generated method stub
return "callLog_" + this.index + "=" + this.callerName + ";"
+ this.callerNum + ";" + this.callerDate + ";"
+ this.callerDuration;
}
} public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: process <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "process");
job.setJarByClass(Process.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

  此版本为第一版。运行几天后服务器日志量暴增,任务因 JVM 内存不足而报内存溢出错误(原文作"堆栈溢出";从下文调大 -Xmx 即可解决来看,应为堆内存不足)。

因此第二版改为可以自定义配置子任务的 JVM 内存,有以下两种方案。

方案一:

/opt/aimcpro/mapred/bin/hadoop jar libc_process.jar com.libc.Process -D mapred.child.java.opts=-Xmx2048m hdfs://mycluster/libc/input  hdfs://mycluster/libc/output

方案二:

Configuration cc = job.getConfiguration();
String mem = cc.get("mapred.child.java.opts");
System.out.println(mem);

即在代码中更改设置:上面的代码只是读取并打印当前值,要真正修改,需要在提交 job 之前调用 cc.set("mapred.child.java.opts", "-Xmx2048m")。

将子任务 JVM 的最大堆内存(-Xmx)从 1G 调整为 2G 后,job 顺利运行完成。

数据一直在增长啊:

20140801 6058177
20140802 7490572
20140803 8114244
20140804 7278280
20140805 7673678
20140806 8213066
20140807 9192677
20140808 9362143
20140809 10989437
20140810 11396093
20140811 10229799
20140812 10346527
20140813 10064709
20140814 11017971
20140815 11634611
20140818 10422815
20140819 12874181
20140820 13478590
20140821 12530974
20140822 11590312
20140823 15705258

最新文章

  1. C#集合类型大盘点
  2. DevExpress TreeList使用心得
  3. android editText 监听事件
  4. JS数组的concat、push等方法,操作的是地址指针,而非内存操作
  5. CoreLocation 定位
  6. 20160815_设置静态IP
  7. 如何在大量jar包中搜索特定字符
  8. PLSQL_解析过程及硬解析和软解析的区别(案例)
  9. 制作Windows的ico图标
  10. MIME(Multipurpose Internet Mail Extensions)多用途互联网邮件扩展类
  11. 如何设置eclipse 右键new的菜单
  12. [置顶]生鲜配送管理系统_升鲜宝V2.0 销售订单汇总_采购任务分配功能_操作说明
  13. jcrop2.X 取消选框
  14. C# 类如何声明索引器以提供对类的类似数组的访问的代码
  15. 为什么想起开通blog?
  16. 【BZOJ4873】[六省联考2017]寿司餐厅(网络流)
  17. 全景3d
  18. C++ 读取文本文件内容到结构体数组中并排序
  19. 深入了解MyBatis二级缓存
  20. GitHub 新手教程 四,Git GUI 新手教程(1),OpenSSH Public Key

热门文章

  1. VS2013启动项目调试的时候会启动本地IIS
  2. Controller中获取输入参数注解使用总结
  3. Design Pattern Iterator 迭代器设计模式
  4. CM_RESOURCE_LIST structure
  5. 《think in python》学习-5
  6. android文件和图片的处理工具类(一)
  7. Sql Service存储过程分页
  8. Ubuntu引导修复问题
  9. [STL源码剖析]RB-tree的插入操作
  10. frame,iframe,frameset用法和区别