MR案例:链式ChainMapper
2024-10-18 18:58:59
类似于Linux管道重定向机制,前一个Map的输出直接作为下一个Map的输入,形成一个流水线。设想这样一个场景:在Map阶段,数据经过mapper01和mapper02处理;在Reduce阶段,数据经过sort和shuffle后,交给对应的reducer处理。reducer处理后并没有直接写入到Hdfs, 而是交给了另一个mapper03处理,它产生的最终结果写到hdfs输出目录中。
注意:对任意MR作业,Map和Reduce阶段可以有无限个Mapper,但reduer只能有一个。
package chain; import java.io.IOException; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Chain { /**
* 手机 5000 * 需求:
* 电脑 2000 * 在第一个Mapper1里面过滤大于10000的数据
* 衣服 300 * 第二个Mapper2里面过滤掉大于100-10000的数据
* 鞋子 1200 * Reduce里面进行分类汇总并输出
* 裙子 434 * Reduce后的Mapper3里过滤掉商品名长度大于3的数据
* 手套 12 *
* 图书 12510 *
* 小商品 5 * 结果:
* 小商品 3 * 手套 12
* 订餐 2 * 订餐 2
*/ public static void main(String[] args) throws Exception {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(Chain.class); /**
* 配置mapper1
* 注意此处带参数的构造函数:new Configuration(false)
*/
Configuration map1Conf = new Configuration(false);
ChainMapper.addMapper(job, //主作业
Mapper1.class, //待加入的map class
LongWritable.class, //待加入map class的输入key类型
Text.class, //待加入map class的输入value类型
Text.class, //待加入map class的输出key类型
VLongWritable.class, //待加入map class的输出value类型
map1Conf); //待加入map class的配置信息 //配置mapper2
ChainMapper.addMapper(job, Mapper2.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false)); /**
* 配置Reducer
* 注意此处使用的是setReducer()方法
*/
ChainReducer.setReducer(job, Reducer_Only.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false)); //配置mapper3
ChainReducer.addMapper(job, Mapper3.class, Text.class, VLongWritable.class, Text.class, VLongWritable.class, new Configuration(false)); FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true);
} //Mapper1
public static class Mapper1 extends Mapper<LongWritable, Text, Text, VLongWritable>{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException { /**
* Hadoop中默认的输入格式 TextOutputFormat 只支持UTF-8格式
* 所以解决GBK中文输出乱码问题的方法是:
* 1. 先将输入的Text类型的value转换为字节数组
* 2. 然后使用String的构造器String(byte[] bytes, int offset, int length, Charset charset)
* 3. 通过使用指定的charset解码指定的byte子数组,构造一个新的String
*/
String line=new String(value.getBytes(),0,value.getLength(),"GBK");
String[] splited = line.split(" "); //过滤大于10000的数据
if(Integer.parseInt(splited[1])<10000L){
context.write(new Text(splited[0]), new VLongWritable(Long.parseLong(splited[1])));
}
}
} //Mapper2
public static class Mapper2 extends Mapper<Text, VLongWritable, Text, VLongWritable>{
@Override
protected void map(Text key, VLongWritable value, Context context)
throws IOException, InterruptedException { //过滤100-10000间的数据
if(value.get()<100L){
context.write(key, value);
}
}
} //Reducer
public static class Reducer_Only extends Reducer<Text, VLongWritable, Text, VLongWritable>{
@Override
protected void reduce(Text key, Iterable<VLongWritable> v2s, Context context)
throws IOException, InterruptedException { long sumLong=0L; for(VLongWritable vLongWritable : v2s){
sumLong += vLongWritable.get(); context.write(key, new VLongWritable(sumLong));
}
}
} //Mapper3
public static class Mapper3 extends Mapper<Text, VLongWritable, Text, VLongWritable>{
@Override
protected void map(Text key, VLongWritable value, Context context)
throws IOException, InterruptedException { String line=new String(key.getBytes(),0,key.getLength(),"GBK"); //过滤商品名称长度大于3
if(line.length()<3){
context.write(key, value);
}
}
}
}
最新文章
- sql 注入问题
- 使用github page 页面建博客中遇到的几个小问题
- 基于腾讯手Q样式规范Frozen UI
- Linux系统Vsftp 传文件出现 553 Could Not Create File错误的解决方法
- CLLocationManagerDelegate不调用didUpdateLocations (地图)
- iOS中偏好设置的创建,数据写入与读取
- vs查看虚函数表和类内存布局
- Hibernate的查询语言之HQL(一)——快速入门
- 配置android开发环境eclipse获取ADT获取不到
- C语言初学 比较三个数中最大值的问题
- luci 随笔
- Python爬虫从入门到放弃(十八)之 Scrapy爬取所有知乎用户信息(上)
- PHP初入--添加内容到框框里并删除
- zoj1151 zoj1295 Word Reversal 字符串的简单处理
- 从0打卡leetcode之day 5 ---两个排序数组的中位数
- MongoDB的导入与导出
- Image Base64 Datasnap Image delphi与c#互相兼容识别
- Altium 拼板方法以及 注意的 地方
- Codeforces Round #131 Div1 B
- EventHandler中如何提升用户权限(模拟管理员权限)
热门文章
- 170419、Centos7下完美安装并配置mysql5.6
- 170310、Jenkins部署Maven多环境项目(dev、beta、prod)的参数设置
- Spring Data 增删改查事务的使用(七)
- python - 用户交互/数据类型/格式化输出/运算符/流程控制单双多分支
- python中读取json文件报错,TypeError:the Json object must be str, bytes or bytearray,not ‘TextIOWrapper’
- shell_02
- nodejs 异步编程 教程(推荐)
- 利用ssh的私钥登录Linux server
- 更改vim高亮括号匹配颜色
- 正确使用goto语句