Hadoop_27_MapReduce_运营商原始日志增强(自定义OutputFormat)
1.需求:
现有一些原始日志需要做增强解析处理,流程:
1、 从原始日志文件中读取数据(日志文件:https://pan.baidu.com/s/12hbDvP7jMu9yE-oLZXvM_g)
2、 根据日志中的一个URL字段到外部知识库中获取信息增强到原始日志
3、 如果成功增强,则输出到增强结果目录;如果增强失败,则抽取原始数据中URL字段输出到待爬清单目录
2.需求分析:
程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可
以通过自定义outputformat来实现
3.需求实现:
技术实现要点:
1、 在mapreduce中访问外部资源(知识数据库)
2、 自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()
代码实现:
1.数据库获取数据的工具类:
1.首先启动数据库服务(192.168.232.201):service mysql start
2.使用远程客户端连接数据库工具Navicat操作数据库:导入创建url_rule表语句和导入该表数据
3.创建表语句及其数据下载:https://pan.baidu.com/s/1k74-o8wbFp5QC8Ee4Fu1YQ
package cn.bigdata.hdfs.LogEnhance;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Map; public class DBLoader {
public static void dbLoader(Map<String, String> ruleMap) throws Exception { Connection conn = null;
Statement st = null;
ResultSet res = null; try {
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://192.168.232.201:3306/mysql", "root", "root");
st = conn.createStatement();
res = st.executeQuery("select url,content from url_rule");
while (res.next()) {
ruleMap.put(res.getString(1), res.getString(2));
}
} finally {
try{
if(res!=null){
res.close();
}
if(st!=null){
st.close();
}
if(conn!=null){
conn.close();
}
}catch(Exception e){
e.printStackTrace();
}
}
}
}
2.自定义一个outputformat继承自FileOutputFormat,实现getRecordWriter方法:
package cn.bigdata.hdfs.LogEnhance;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter
* 然后再调用RecordWriter的write(k,v)方法将数据写出
*/
public class LogEnhanceOutputFormat extends FileOutputFormat<Text, NullWritable> { @Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
///拿到一个文件系统操作的客户端实例对象
FileSystem fs = FileSystem.get(context.getConfiguration()); Path enhancePath = new Path("F:/temp/en/log.dat");
Path tocrawlPath = new Path("F:/temp/crw/url.dat");
//流式创建文件
FSDataOutputStream enhancedOs = fs.create(enhancePath);
FSDataOutputStream tocrawlOs = fs.create(tocrawlPath); return new EnhanceRecordWriter(enhancedOs, tocrawlOs);
} /**
* 构造一个自己的recordwriter
*/
static class EnhanceRecordWriter extends RecordWriter<Text, NullWritable> { FSDataOutputStream enhancedOs = null;
FSDataOutputStream tocrawlOs = null; public EnhanceRecordWriter(FSDataOutputStream enhancedOs, FSDataOutputStream tocrawlOs) {
super();
this.enhancedOs = enhancedOs;
this.tocrawlOs = tocrawlOs;
}
//实现抽象方法write
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String result = key.toString();
// 如果要写出的数据是待爬的url,则写入待爬清单文件 /logenhance/tocrawl/url.dat
if (result.contains("tocrawl")) {
tocrawlOs.write(result.getBytes());
} else {
// 如果要写出的数据是增强日志,则写入增强日志文件 /logenhance/enhancedlog/log.dat
enhancedOs.write(result.getBytes());
}
}
//实现抽象方法close
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if (tocrawlOs != null) {
tocrawlOs.close();
}
if (enhancedOs != null) {
enhancedOs.close();
}
}
}
}
3.开发mapreduce处理流程:
这个程序是对每个小时不断产生的用户上网记录日志进行增强(将日志中的url所指向的网页内容分析结果信息追加到每一行
原始日志后面
maptask在初始化时会先调用setup方法一次 利用这个机制,将外部的知识库加载到maptask执行的机器内存中
package cn.bigdata.hdfs.LogEnhance; import java.io.IOException;
import java.util.HashMap;
import java.util.Map; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class LogEnhance { static class LogEnhanceMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> ruleMap = new HashMap<String, String>();
Text k = new Text();
NullWritable v = NullWritable.get(); // 从数据库中加载规则信息倒ruleMap中
@Override
protected void setup(Context context) throws IOException, InterruptedException { try {
DBLoader.dbLoader(ruleMap);
} catch (Exception e) {
e.printStackTrace();
}
} @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称
Counter counter = context.getCounter("malformed", "malformedline");
String line = value.toString();
String[] fields = StringUtils.split(line, "\t");
try {
String url = fields[26];
String content_tag = ruleMap.get(url);
// 判断内容标签是否为空,如果为空,则只输出url到待爬清单;如果有值,则输出到增强日志
if (content_tag == null) {
k.set(url + "\t" + "tocrawl" + "\n");
context.write(k, v);
} else {
k.set(line + "\t" + content_tag + "\n");
context.write(k, v);
}
} catch (Exception exception) {
counter.increment(1);
}
}
} public static void main(String[] args) throws Exception { Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(LogEnhance.class);
job.setMapperClass(LogEnhanceMapper.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class); // 要控制不同的内容写往不同的目标路径,可以采用自定义outputformat的方法
job.setOutputFormatClass(LogEnhanceOutputFormat.class);
//本地原始日志文件存放目录
FileInputFormat.setInputPaths(job, new Path("F:/webloginput/"));
// 尽管我们用的是自定义outputformat,但是它是继承制fileoutputformat
// 在fileoutputformat中,必须输出一个_success文件,所以在此还需要设置输出path
FileOutputFormat.setOutputPath(job, new Path("F:/weblogoutput/")); // 不需要reducer
job.setNumReduceTasks(0); job.waitForCompletion(true);
System.exit(0);
}
}
总结:
1.其中在mapreduce程序中用到了计数器;获取一个计数器用来记录不合法的日志行数, 组名, 计数器名称
2.拷贝mysql的驱动包到工程的lib目录下,这里使用本地运行模式;https://pan.baidu.com/s/1ldzQ0i5qdvvJ3Yw08LvF5Q
3.maptask或者reducetask在最终输出时,先调用OutputFormat的getRecordWriter方法拿到一个RecordWriter,然后再调用
RecordWriter的write(k,v)方法将数据写出
4.在setup方法中完成知识库的加载,写入到Map中
5.Map端在Context,write时,如果后面没有Reduce,将没有整个的shuffe过程,将直接调用outPutFormat进行输出,进来什
么顺序,则出去什么顺序(总之:没有reduce就没有shuffle)
最新文章
- Gulp探究折腾之路(I)
- commons configuration管理项目的配置文件
- TIME_WAIT过多
- fcntl()功能 详解
- (转)java:快速文件分割及合并
- c#面向对象小结
- <;Win32_20>;纯c语言版的打飞机游戏出炉了^_^
- 日志收集之kafka
- iOS基础 - Copy
- uiautomator+cucumber实现自动化测试
- 设计模式二: 工厂方法(Factory Method)
- 安装与配置apache WEB服务器(Linux环境)
- 打造SharePoint之在线开发神器SPOnlineDevelopTool(一)——概述
- Hbase获取流程
- linux对4T硬盘进行分区
- Event事件2
- CentOS6.5安装sqlite3
- flask学习视频
- Trusted Cloud Summit(2018.08.14)
- sass和scss相关知识
热门文章
- 使用STM32F103ZET霸道主板实现SD卡的读写(非文件系统)
- 申请 Let&#39;s Encrypt 通配符 HTTPS 证书
- 【VS开发】MFC中调用C函数模块的解决方案
- [Agc029B]Powers of two_贪心_树形dp
- java中讲讲PrintWriter的用法,举例?
- IDEA插件之CheckStyle
- Spring 加载项目外部配置文件
- 数据结构 -- 队列Queue
- S02_CH04_User_IP实验Enter a post title
- 第十一章 ZYNQ-MIZ702 DDR3 PS读写操作方案