Reduce侧连接
1、reduce side join
在reduce端进行表的连接,该方法的特点就是操作简单,缺点是map端shffule后传递给reduce端的数据量过大,极大的降低了性能
连接方法:
(1)map端读入输入数据,以连接键为Key,待连接的内容为value,但是value需要添加特别的标识,表示的内容为表的表示,即若value来自于表1,则标识位设置为1,若来自表2,则设置为2,然后将map的内容输出到reduce
(2)reduce端接收来自map端shuffle后的结果,即<key, values>内容,然后遍历values,对每一个value进行处理,主要的处理过程是:判断每一个标志位,如果来自1表,则将value放置在特地为1表创建的数组之中,若来自2表,则将value放置在为2表创建的数组中,最后对两个数组进行求笛卡儿积,然后输出结果,即为最终表的连接结果。
2、map side join
在map端进行表的连接,对表的大小有要求,首先有一个表必须足够小,可以读入内存,另外的一个表很大,与reduce端连接比较,map端的连接,不会产生大量数据的传递,而是在map端连接完毕之后就进行输出,效率极大的提高
连接方法:
(1)首先要重写Mapper类下面的setup方法,因为这个方法是先于map方法执行的,将较小的表先读入到一个HashMap中。
(2)重写map函数,一行行读入大表的内容,逐一的与HashMap中的内容进行比较,若Key相同,则对数据进行格式化处理,然后直接输出。
实例与map侧连接一样,思路也与map侧连接一样,输出结果也一样。。。。。
package mapreduce01;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.LineReader;
public class Reduceduan {
static String INPUT_PATH = "hdfs://master:9000/qq/123";
static String OUTPUT_PATH="hdfs://master:9000/output";
static class MyMapper extends Mapper<Object,Object,Text,Text>{
Text output_key = new Text();
Text output_value = new Text();
protected void map(Object key,Object value,Context context) throws IOException,InterruptedException{
String[] tokens = value.toString().split(",");
if(tokens!=null&&tokens.length==2){
output_key.set(tokens[0]);
output_value.set(tokens[1]);
context.write(output_key,output_value);
}
}
}
static class MyReduce extends Reducer<Text,Text,Text,Text> {
Text output_key=new Text();
Text output_value=new Text();
Map<String,String> addMap = new HashMap<String,String>(); //image yingshe
protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException{
URI uri=context.getCacheFiles()[0];
Path path = new Path(uri);
FileSystem fs = path.getFileSystem(context.getConfiguration());
LineReader lineReader = new LineReader(fs.open(path));
Text line=new Text();
while(lineReader.readLine(line)>0){
String tokens[] = line.toString().split(",");
if(tokens!=null && tokens.length==2)
addMap.put(tokens[0], tokens[1]);
}
}
protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
if(values==null)
return
String addrName = addMap.get(values.iterator().next().toString());
output_value.set(addrName);
context.write(key,output_value);
}
}
public static void main(String[] args) throws Exception{
Path outputpath = new Path(OUTPUT_PATH);
Path cacheFile = new Path("hdfs://master:9000/qq/a");
Configuration conf = new Configuration();
FileSystem fs = outputpath.getFileSystem(conf);
if(fs.exists(outputpath)){
fs.delete(outputpath,true);
}
Job job=Job.getInstance(conf);
FileInputFormat.setInputPaths(job,INPUT_PATH);
FileOutputFormat.setOutputPath(job, outputpath);
URI uri =cacheFile.toUri();
job.setCacheFiles(new URI[]{uri}); //set cache address
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
}
}
最新文章
- 记一次jdk升级引起的 Unsupported major.minor version 51.0
- Nodejs学习(三)-安装nodejs supervisor,提高点效率吧。
- jquery bind、delegate、live、on的区别及联系
- C#四则运算之策略模式
- X230 安装win7 sp1
- ZOJ 3367 Counterfeit Money(最大相同子矩阵)
- WINCE6.0+ILI9806E休眠唤醒显示异常问题
- 04_HttpClient发送Https请求
- 批处理文件的@echo off是什么意思?
- redis的管理工具
- Swift - 访问通讯录联系人(使用系统提供的通讯录交互界面)
- Python内置函数(3)——max
- python3 re模块正则匹配字符串中的时间信息
- [POI2015]PUS
- Convolutional Pose Machines(理解)
- jupyter4.4.0自定义目录
- java web----URL
- Chrome 调试技巧
- Centos7安装xenserver tools
- EntityFramework使用总结(与MVC4.0实现CURD操作)
热门文章
- 9.CVE-2016-5195(脏牛)内核提权漏洞分析
- 面试题18(一):在O(1)时间删除链表结点
- 痞子衡嵌入式:恩智浦i.MX RTxxx系列MCU特性介绍(2)- RT685EVKA性能实测(Dhrystone)
- gulp使用文档
- 莫队算法-小Z的袜子
- 【NOIP模拟赛】收银员(一道差分约束好题)
- 洛谷P3147 [USACO16OPEN]262144
- nginx丢弃http包体处理
- 高效法则 之 你还在用这么low的方法打开软件吗?
- vue-cli3.0 脚手架搭建项目