MapReduce Cross 示例

package com.bsr.cross;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 第一次mr--目的是获取某一人是哪些人的好友
*
*
*/
public class Cross { //输入:A:B,C,D,F,E,O
//输出:B->A C->A D->A F->A E->A O->A
public static class Map extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String[] value1=value.toString().split(":");
String[] value2=value1[1].split(",");
for (String string : value2) {
context.write(new Text(string), new Text(value1[0]));
}
} }
public static class Reduce extends Reducer<Text, Text, Text, Text>{
// 输入<B->A><B->E><B->F>....
// 输出 B A,E,F,J
@Override
protected void reduce(Text key, Iterable<Text> value,Context context)
throws IOException, InterruptedException {
StringBuffer sb=new StringBuffer();
for (Text text : value) {
sb.append(text+",");
}
context.write(key, new Text(sb.toString()));
} } public static void main(String[] args) throws Exception {
//读取classpath下的所有xxx-site.xml配置文件,并进行解析
Configuration conf=new Configuration();
FileSystem fs = FileSystem.get(configuration);
String s = "/wc/output3";
Path path = new Path(s);
fs.delete(path, true); Job job=Job.getInstance(conf); //通过主类的类加载器机制获取到本job的所有代码所在的jar包
job.setJarByClass(Cross.class); //指定本job使用的mapper类
job.setMapperClass(Map.class); //指定本job使用的reducer类
job.setReducerClass(Reduce.class); //指定mapper输出的kv数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class); //指定reducer输出的kv数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); //指定本job要处理的文件所在的路径
FileInputFormat.setInputPaths(job, new Path("/wc/data/"));
FileOutputFormat.setOutputPath(job, new Path("/wc/output3")); //将本job向hadoop集群提交执行
boolean flag=job.waitForCompletion(true);
System.exit(flag?0:1); } }

进行了逻辑的转换;

最新文章

  1. 初学seaJs模块化开发,利用grunt打包,减少http请求
  2. STM32F412应用开发笔记之一:初识NUCLEO-F412ZG
  3. MySQL知识总结
  4. Kali linux渗透测试常用工具汇总2-渗透攻击
  5. C++库(Thrift)
  6. python3+任务计划实现的人人影视网站自动签到
  7. lazyload懒加载的使用
  8. ubuntu12.04samba服务器配置,亲测可用(转)
  9. Error:No marked region found along edge. - Found along top edge.
  10. jQuery 遍历过滤
  11. OC中的一个特性:延展
  12. 微软IE11浏览器的7大变化
  13. sqlserver2008用bat脚本备份时报错因为库名有中横杠【原创】
  14. SQL_SERVER_2008升级SQL_SERVER_2008_R2的方法
  15. 详解EBS接口开发之更新供应商付款方法
  16. sql语言不经常用,复习
  17. 【Python3爬虫】自动查询天气并实现语音播报
  18. django 通过ajax完成登录
  19. 【PMP】变更流程图与说明
  20. Flask内置URL变量转换器

热门文章

  1. laravel学习:php写一个简单的ioc服务管理容器
  2. IIS HTTP 错误 401.3的解决办法
  3. 01Oracle Database
  4. sklearn.metrics.roc_curve
  5. seq2seq(1)- EncoderDecoder架构
  6. 20181228 模拟赛 T3 字符串游戏 strGame 博弈论 字符串
  7. http请求体笔记
  8. Linux学习笔记(一) 文件系统
  9. 微信小程序 传值取值的方法总结
  10. 多.h项目出现的问题:使用了预编译头依然出现error LNK2005:***obj已在***obj中定义与c++ error C2011: “xxx”:“class”类型重定义解决办法