在大数据MapReduce作业开发中,我们经常会遇到一些大小表的join,这是如果这个小表足够“小”的话,我们可以使用进行“map-join-side”,这要就可以有效的降低reduce端的压力,但是在常用的JDK的集合中的Map有些许鸡肋,因此,各路大神们针对这个问题开发出了不同的集合框架,用以替换原始集合,下面我们具体介绍几种常用的集合框架:
首先,我们设想了一个场景——计算不同事业部015、2016年老客,新客-转化,新客-新增的用户数量,这三种类型的用户的定义如下:
老客:前一年和当前年均购买过服百事业部商品
新客-转化:前一年购买过图书,当前年购买了服百事业部商品
新客-新增:前一年什么也没买,当前年购买了服百事业部商品
因此,根据上述定义,举例:2016年老客就是根据cust_id(用户ID)在服百分类(fubaiArrayList )和服百总和(fubaiAllArrayList )两个集合查看2016年和2015年均存在的用户。2016年新客-转化就是根据cust_id(用户ID)在图书(bookArrayList )存在2015年购买记录,在服百分类(fubaiArrayList )和服百总和(fubaiAllArrayList )两个集合查看2016年存在的用户。2016年新客-新增就是根据cust_id(用户ID)在所有用户(allArrayList )不存在2015年购买记录,但在服百分类(fubaiArrayList )和服百总和(fubaiAllArrayList )两个集合查看2016年存在的用户。
因此,根据上述解释,我们构造了原始实现代码为:

public static class Map extends Mapper<LongWritable, Text, Text, Text> {

public static ArrayList<String> bookArrayList = null;
public static ArrayList<String> fubaiAllArrayList = null;
public static ArrayList<String> fubaiArrayList = null;
public static ArrayList<String> allArrayList = null; @Override
protected void setup(Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
bookArrayList = new ArrayList<String>();
Configuration configuration = context.getConfiguration();
FileSystem fs = FileSystem.get(configuration);
InputStream in = null;
BufferedReader reader = null;
String tempString = null;
Path book_path = new Path("/personal/zhoujie/recommend/book.csv");//14 15年全年购买过书的用户名单
if (fs.exists(book_path)) {
in = fs.open(book_path);
reader = new BufferedReader(new InputStreamReader(in, "utf-8"));
while ((tempString = reader.readLine()) != null) {
//年份 cust_id 图书事业部
String parts[] = tempString.split(TAB, -1);
if(parts.length!=3)continue;
bookArrayList.add(parts[0]+TAB+parts[1]);
}
}
fubaiAllArrayList = new ArrayList<String>();
Path fubai_all_path = new Path("/personal/zhoujie/recommend/fubaiall.csv");//14 15年全年购买过服百的全部用户名单
if (fs.exists(fubai_all_path)) {
in = fs.open(fubai_all_path);
reader = new BufferedReader(new InputStreamReader(in, "utf-8"));
while ((tempString = reader.readLine()) != null) {
//年份 cust_id 服百事业部总和
String parts[] = tempString.split(TAB, -1);
if(parts.length!=3)continue;
fubaiAllArrayList.add(parts[0]+TAB+parts[1]);
}
}
fubaiArrayList = new ArrayList<String>();
Path fubai_path = new Path("/personal/zhoujie/recommend/fubaiall.csv");//14 15年全年购买过各服百事业部的全部用户名单
if (fs.exists(fubai_path)) {
in = fs.open(fubai_path);
reader = new BufferedReader(new InputStreamReader(in, "utf-8"));
while ((tempString = reader.readLine()) != null) {
//年份 cust_id 各服百事业部
String parts[] = tempString.split(TAB, -1);
if(parts.length!=3)continue;
fubaiArrayList.add(parts[0]+TAB+parts[1]);
}
}
allArrayList = new ArrayList<String>();
Path all_path = new Path("/personal/zhoujie/recommend/all_order.csv");//14 15年全年下单用户
if (fs.exists(all_path)) {
in = fs.open(all_path);
reader = new BufferedReader(new InputStreamReader(in, "utf-8"));
while ((tempString = reader.readLine()) != null) {
//年份 cust_id 事业部
String parts[] = tempString.split(TAB, -1);
if(parts.length!=3)continue;
allArrayList.add(parts[0]+TAB+parts[1]);
}
}
} @Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
InputSplit inputSplit = context.getInputSplit();
String fileName = ((FileSplit) inputSplit).getPath().toString(); if(fileName.contains("/personal/zhoujie/recommend/orderdetail/")){
//date+TAB+app_id+TAB+permanentid+TAB+toProductid "APP全站" "服百事业部" order_id 单价 个数 cust_id
String[] splited = value.toString().split(TAB, -1);
if(splited.length!=10)return;
String year = splited[0].substring(0, 4);
String cust_id = splited[9];
String department = splited[5];
if("2015".equals(year)){
if("服百事业部总和".equals(department)){//全部服百事业部
if (fubaiAllArrayList.contains("2014"+TAB+cust_id)) {//说明14年在服百事业部买过,作为老用户
context.write(new Text("2015"+TAB+"服百事业部总和"+TAB+"老用户"), new Text(cust_id));
}else if(bookArrayList.contains("2014"+TAB+cust_id)){//说明14年在图书事业部买过,作为新用户-转化用户
context.write(new Text("2015"+TAB+"服百事业部总和"+TAB+"新用户-转化用户"), new Text(cust_id));
}else if(!allArrayList.contains("2014"+TAB+cust_id)){//说明在14年没有买过任何东西
context.write(new Text("2015"+TAB+"服百事业部总和"+TAB+"新用户-新增用户"), new Text(cust_id));
}
}else {//各服百事业部
if (fubaiArrayList.contains("2014"+TAB+cust_id)) {//说明14年在子服百事业部买过,作为老用户
context.write(new Text("2015"+TAB+department+TAB+"老用户"), new Text(cust_id));
}else if(bookArrayList.contains("2014"+TAB+cust_id)){//说明14年在图书事业部买过,作为新用户-转化用户
context.write(new Text("2015"+TAB+department+TAB+"新用户-转化用户"), new Text(cust_id));
}else if(!allArrayList.contains("2014"+TAB+cust_id)){//说明在14年没有买过任何东西
context.write(new Text("2015"+TAB+department+TAB+"新用户-新增用户"), new Text(cust_id));
}
}
}else if ("2016".equals(year)) {
if("服百事业部总和".equals(department)){//全部服百事业部
if (fubaiAllArrayList.contains("2015"+TAB+cust_id)) {//说明15年在服百事业部买过,作为老用户
context.write(new Text("2016"+TAB+"服百事业部总和"+TAB+"老用户"), new Text(cust_id));
}else if(bookArrayList.contains("2015"+TAB+cust_id)){//说明15年在图书事业部买过,作为新用户-转化用户
context.write(new Text("2016"+TAB+"服百事业部总和"+TAB+"新用户-转化用户"), new Text(cust_id));
}else if(!allArrayList.contains("2015"+TAB+cust_id)){//说明在15年没有买过任何东西
context.write(new Text("2016"+TAB+"服百事业部总和"+TAB+"新用户-新增用户"), new Text(cust_id));
}
}else {//各服百事业部
if (fubaiArrayList.contains("2015"+TAB+cust_id)) {//说明15年在子服百事业部买过,作为老用户
context.write(new Text("2016"+TAB+department+TAB+"老用户"), new Text(cust_id));
}else if(bookArrayList.contains("2015"+TAB+cust_id)){//说明15年在图书事业部买过,作为新用户-转化用户
context.write(new Text("2016"+TAB+department+TAB+"新用户-转化用户"), new Text(cust_id));
}else if(!allArrayList.contains("2015"+TAB+cust_id)){//说明在15年没有买过任何东西
context.write(new Text("2016"+TAB+department+TAB+"新用户-新增用户"), new Text(cust_id));
}
}
}
}
}
}

一、JDK集合类
不用说,这个不是我们今天介绍的重点。正是由于原始集合的效率低下才有了这篇文章的存在。即上述代码就是JDK集合类的实现代码,经过多次测试,作业消耗时间大概在三个小时作业。
二、FastUtil集合框架
经过测试,FastUtil的集合类替换原始集合的时候,用时两小时:
bookArrayList = new ObjectBigArrayBigList<String>()
三、HPPC集合框架
经过测试,FastUtil的集合类替换原始集合的时候,用时三分钟:
bookArrayList = new ObjectHashSet<String>()
好快!
经过这三个集合类的测试,发现HPPC集合框架的查询效率是最高的。

最新文章

  1. UWP中新加的数据绑定方式x:Bind分析总结
  2. chose.jquery 联动
  3. 转载《 LayoutInflater 的inflate函数用法详解》
  4. C++访问托管类(C#类库)
  5. asp.net对cookie的操作
  6. Laxcus大数据管理系统2.0(5)- 第三章 数据存取
  7. 课务IOS概述_1
  8. http://www.myexception.cn/program/767123.html
  9. javascript表格的添加和删除
  10. 如何配置SSH Keys登录
  11. correlated subquery and non-correlated subquery
  12. docpad建站记录
  13. Spring总结_02_Spring概述
  14. C++中printf和scanf的用法
  15. Java并发编程实战 之 线程安全性
  16. window.location的方法属性详解
  17. Docker网络(五)--技术流ken
  18. java.util.ConcurrentModificationException异常原因及解决方法
  19. centos7编译安装Python3所需要的库(模块)依赖
  20. Linux内核及分析 第七周 可执行程序的装载

热门文章

  1. jquery为表格行添加上下移动画效果
  2. [转载]使用VS2015搭建Lua开发环境
  3. python 模块之-re
  4. BZOJ1283 序列(费用流)
  5. MT【228】整数解的个数
  6. 【ARC102E】Stop. Otherwise...(容斥原理,动态规划)
  7. 【BZOJ1879】[SDOI2009]Bill的挑战(动态规划)
  8. luogu3645 [Apio2015]雅加达的摩天大楼 (分块+dijkstra)
  9. 2018 ACM 网络选拔赛 焦作赛区
  10. LVS+Keepalived实现MySQL从库读操作负载均衡配置