一.准备两张表以及对应的数据

(1)m_ys_lab_jointest_a(以下简称表A)

建表语句:

create table if not exists m_ys_lab_jointest_a (
id bigint,
name string
)
row format delimited
fields terminated by ''
lines terminated by ''
stored as textfile;  

具体数据如下:

id    name
1 北京
2 天津
3 河北
4 山西
5 内蒙古
6 辽宁
7 吉林
8 黑龙江
 
 
 
 
 
 
 
 
 
 
(2)m_ys_lab_jointest_b(以下简称表B)
建表语句为:
create table if not exists m_ys_lab_jointest_b (
id bigint,
statyear bigint,
num bigint
)
row format delimited
fields terminated by ''
lines terminated by ''
stored as textfile;
id   statyear  num
1 2010 1962
1 2011 2019
2 2010 1299
2 2011 1355
4 2010 3574
4 2011 3593
9 2010 2303
9 2011 2347

我们的目的是,以id为key做join操作,得到以下表:m_ys_lab_jointest_ab

id     name    statyear     num
1       北京    2011    2019
1       北京    2010    1962
2       天津    2011    1355
2       天津    2010    1299
4       山西    2011    3593
4       山西    2010    3574

二.计算模型

整个计算过程是:

(1)在map阶段,把所有记录标记成<key, value>的形式,其中key是id,value则根据来源不同取不同的形式:来源于表A的记录,value的值为"a#"+name;来源于表B的记录,value的值为"b#"+score。
(2)在reduce阶段,先把每个key下的value列表拆分为分别来自表A和表B的两部分,分别放入两个向量中。然后遍历两个向量做笛卡尔积,形成一条条最终结果。
 如下图所示:

上代码:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter; /**
* MapReduce实现Join操作
*/
public class MapRedJoin {
public static final String DELIMITER = "\u0009"; // 字段分隔符 // map过程
public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
public void configure(JobConf job) {
super.configure(job);
} public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException, ClassCastException {
// 获取输入文件的全路径和名称
String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
// 获取记录字符串
String line = value.toString();
// 抛弃空记录
if (line == null || line.equals("")){
return;
}
// 处理来自表A的记录
if (filePath.contains("m_ys_lab_jointest_a")) {
String[] values = line.split(DELIMITER); // 按分隔符分割出字段
if (values.length < 2){
return;
}
String id = values[0]; // id
String name = values[1]; // name
output.collect(new Text(id), new Text("a#"+name));
} else if (filePath.contains("m_ys_lab_jointest_b")) {// 处理来自表B的记录
String[] values = line.split(DELIMITER); // 按分隔符分割出字段
if (values.length < 3){
return;
}
String id = values[0]; // id
String statyear = values[1]; // statyear
String num = values[2]; //num
output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));
}
}
} // reduce过程
public static class Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
List<String> listA = new ArrayList<String>(); // 存放来自表A的值
List<String> listB = new ArrayList<String>(); // 存放来自表B的值
while (values.hasNext()) {
String value = values.next().toString();
if (value.startsWith("a#")) {
listA.add(value.substring(2));
} else if (value.startsWith("b#")) {
listB.add(value.substring(2));
}
}
int sizeA = listA.size();
int sizeB = listB.size();
// 遍历两个向量
int i, j;
for (i = 0; i < sizeA; i ++) {
for (j = 0; j < sizeB; j ++) {
output.collect(key, new Text(listA.get(i) + DELIMITER +listB.get(j)));
}
}
}
} protected void configJob(JobConf conf) {
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(Text.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setOutputFormat(ReportOutFormat.class);
}
}

三.技术细节

下面说一下其中的若干技术细节:
(1)由于输入数据涉及两张表,我们需要判断当前处理的记录是来自表A还是来自表B。Reporter类getInputSplit()方法可以获取输入数据的路径,具体代码如下:
String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
(2)map的输出的结果,同id的所有记录(不管来自表A还是表B)都在同一个key下保存在同一个列表中,在reduce阶段需要将其拆开,保存为相当于笛卡尔积的m x n条记录。由于事先不知道m、n是多少,这里使用了两个向量(可增长数组)来分别保存来自表A和表B的记录,再用一个两层嵌套循环组织出我们需要的最终结果。
(3)在MapReduce中可以使用System.out.println()方法输出,以方便调试。不过System.out.println()的内容不会在终端显示,而是输出到了stdout和stderr这两个文件中,这两个文件位于logs/userlogs/attempt_xxx目录下。可以通过web端的历史job查看中的“Analyse This Job”来查看stdout和stderr的内容。

最新文章

  1. ORACLE 查看RMAN的备份信息总结
  2. ASP.NET Web API路由规则(二)
  3. POJ2226 Muddy Fields(二分图最小点覆盖集)
  4. python_redis之篇
  5. [MySQL] 两个优化数据库表的简单方法--18.3
  6. 网络I/O 工作机制
  7. VueJs 源码分析 ---(一) 整体对 vuejs 框架的理解
  8. Android Studio Flavors的妙用(转)
  9. SAP PP顾问面试题及资料
  10. Java虚拟机详解----常用JVM配置参数
  11. Inno Setup入门(三)——指定压缩方式
  12. Unity3D-实现连续点击两次返回键退出游戏(安卓/IOS)
  13. .Linode服务器的使用 网站迁移
  14. 【费用流】【网络流24题】【P1251】 餐巾计划问题
  15. BZOJ4372: 烁烁的游戏【动态点分治】
  16. uva 725 DIVISION (暴力枚举)
  17. scala(9) Monad
  18. linux 8 -- 管道组合Shell命令进行系统管理
  19. PHP数据库链接类(PDO+Access)实例分享
  20. centos 安装 python3 分类链接

热门文章

  1. Quartz.Net实现作业定时调度详解
  2. MCtalk对话抱抱星英语:从Diss在线英语教学乱象到回归教育本原
  3. Codeforces Round #564 (Div. 2)B
  4. Node中的cookie的使用
  5. 前端学习【第一篇】: HTML内容
  6. CentOS Linux 重启详解
  7. Python:字典的高级知识
  8. MySQL之基础操作
  9. 那些有实力进入 BAT 的本科生,都做对了什么事?
  10. RDBMS与数据库之间的关系