前言

本章主要讲述的是对于hadoop生态系统中,MapReduce写的ChainMapper的学习。MapReduce是hadoop集群数据处理的默认框架。而对于数据集中所有的数据必然有一些不友好的数据,我们需要将其丢弃。我们称之为数据的预处理。所以我们需要将预处理模块与数据处理逻辑分开,以便以后可以复用数据预处理模块。以下是一个mapper的通用模式:

  • 丢弃无用的已损坏的数据
  • 处理有效数据,提取感兴趣的字段
  • 针对这些字段,输出我们感兴趣的数据

准备工作

数据集:ufo-60000条记录,这个数据集有一系列包含下列字段的UFO目击事件记录组成,每条记录的字段都是以tab键分割,文件名为ufo.tsv,这里就不提供下载连接了

  • sighting date:UFO目击事件发生时间
  • Recorded date:报告目击事件的时间
  • Location:目击事件发生的地点
  • Shape:UFO形状
  • Duration:目击事件持续时间
  • Dexcription:目击事件的大致描述

例子:

19950915 19950915 Redmond, WA 6 min. Young man w/ 2 co-workers witness tiny, distinctly white round disc drifting slowly toward NE. Flew in dir. 90 deg. to winds.

ChainMapper介绍

全限定名: org.apache.hadoop.mapred.lib.ChainMapper

作用:顺序的执行多个mapper,并且最后一个mapper的输出会传递给reducer。

ChainMapper的使用

题目:通过使用 ChainMapper 类验证数据集的记录是否有效,即判断每条记录是否都可以划分为6个字符串

  • 上传ufo.tsv到hadoop
hadoop dfs -put ufo.tsv ufo.tsv
  • 编写 UFORecordValidationMapper.java
import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*; public class UFORecordValidationMapper extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException {
String line = value.toString();
if(validate(line)) {
output.collect(key, value);
}
} private boolean validate(String str) {
String[] parts = str.split("\t");
if(parts.length != 6) {
return false;
}
return true;
}
}
  • 编写 UFOLocation.java
import java.io.IOException;
import java.util.Iterator;
import java.util.regex.*; import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*; public class UFOLocation {
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
private final static LongWritable one = new LongWritable(1);
private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$"); public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
String[] fields = line.split("\t");
String location = fields[2].trim();
if(location.length() >= 2) {
Matcher matcher = locationPattern.matcher(location);
if(matcher.find()) {
int start = matcher.start();
String state = location.substring(start, start + 2);
output.collect(new Text(state.toUpperCase()), one);
}
}
}
} public static void main(String...args) throws Exception {
Configuration config = new Configuration();
JobConf conf = new JobConf(config, UFOLocation.class);
conf.setJobName("UFOLocation");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class); JobConf mapconf1 = new JobConf(false);
ChainMapper.addMapper(conf, UFORecordValidationMapper.class, LongWritable.class, Text.class, LongWritable.class, Text.class, true, mapconf1);
JobConf mapconf2 = new JobConf(false);
ChainMapper.addMapper(conf, MapClass.class, LongWritable.class, Text.class, Text.class, LongWritable.class, true, mapconf2);
conf.setMapperClass(ChainMapper.class);
conf.setCombinerClass(LongSumReducer.class);
conf.setReducerClass(LongSumReducer.class); FileInputFormat.setInputPaths(conf, args[0]);
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
  • 编译上述两个文件
javac UFORecordValidationMapper.java UFOLocation.java
  • 将编译好的文件打包成jar
jar cvf ufo.jar UFO*class
  • 提交打包好的jar包到hadoop上运行
hadoop jar ufo.jar UFOLocation ufo.tsv output
  • 从hadoop上获取结果到本地
hadoop dfs -get output/part-00000 ufo_result.txt
  • 查看结果
more ufo_result.txt

最新文章

  1. 企业IT管理员IE11升级指南【16】—— 使用Compat Inspector快速定位IE兼容性问题
  2. ABP文档 :Overall - Module System
  3. 关于entityframework 自动生成实体类中加验证的属性重新生成后属性被覆盖解决办法
  4. clientTarget与用户代理别名
  5. ansible-安装与使用
  6. Django项目流程(摘抄整理)
  7. Java多线程系列--“JUC集合”02之 CopyOnWriteArrayList
  8. 洛谷U2641 木板面积(area)——S.B.S.
  9. SurfaceOutput
  10. IOS 实现 AAC格式 录音 录音后自动播放
  11. 找不到所需要的ndbm.h头文件
  12. 为什么老师不喜欢RelativeLayout
  13. JS 操作JSON字符串
  14. TensorFlow安装与测试
  15. js获取url参数值的两种方式
  16. 【转】elasticsearch的查询器query与过滤器filter的区别
  17. 谈一谈jQuery核心架构设计(转)
  18. 【并查集缩点+tarjan无向图求桥】Where are you @牛客练习赛32 D
  19. redis学习(七)redis主从复制
  20. binary search tree study

热门文章

  1. 关于Tomcat的浅谈
  2. Learn Python the hard way, ex42 物以类聚
  3. charles_01_打断点修改接口请求&amp;返回数据
  4. 【ABAP系列】SAP ABAP ALV里日期类型的F4帮助
  5. 【ABAP系列】SAP 读取生产订单 记入文档的货物移动明细
  6. Junit 3.8源码分析
  7. 《jmeter:菜鸟入门到进阶系列》
  8. dp(最大分段和)
  9. 小白学Python(11)——pyecharts,绘制饼图 Pie
  10. (四:NIO系列) Java NIO Selector