文件如下:

file1:

Beijing Red Star
Shenzhen Thunder
Guangzhou Honda
Beijing Rising
Guangzhou Development Bank
Tencent
Back of Beijing

file2:

    Beijing
Guangzhou
Shenzhen
Xian

代码如下(由于水平有限,不保证完全正确,如果发现错误欢迎指正):

package com;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Test {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration config = new Configuration();
config.set("fs.defaultFS", "hdfs://192.168.0.100:9000");
config.set("yarn.resourcemanager.hostname", "192.168.0.100"); FileSystem fs = FileSystem.get(config); Job job = Job.getInstance(config); job.setJarByClass(Test.class); //设置所用到的map类
job.setMapperClass(myMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class); //设置用到的reducer类
job.setReducerClass(myReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); //设置s输入输出地址
FileInputFormat.addInputPath(job, new Path("/FactoryName/")); Path path = new Path("/output2/"); if(fs.exists(path)){
fs.delete(path, true);
} //指定文件的输出地址
FileOutputFormat.setOutputPath(job, path); //启动处理任务job
boolean completion = job.waitForCompletion(true);
if(completion){
System.out.println("Job Success!");
}
} public static class myMapper extends Mapper<Object, Text, Text, Text> { // 实现map函数
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String temp=new String();// 左右表标识
String values=value.toString();
String words[]=values.split("\t"); String mapkey = new String();
String mapvalue = new String(); //右表:1 Beijing
if (words[].charAt() >= '' && words[].charAt() <= '') {
mapkey = words[];
mapvalue =words[];
temp = ""; }else{
//左表:Beijing Red Star 1
mapkey = words[];
mapvalue =words[];
temp = "";
} // 输出左右表
//左表:(1,1+Beijing Red Star)
//右表:(1,2+Beijing)
context.write(new Text(mapkey), new Text(temp + "+"+ mapvalue));
}
} //reduce解析map输出,将value中数据按照左右表分别保存
public static class myReducer extends Reducer<Text, Text, Text, Text> {
// 实现reduce函数
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { List<String> factory = new ArrayList<String>();
List<String> address = new ArrayList<String>(); for (Text value : values) {
// 取得左右表标识
char temp=(char) value.charAt();
String words[]=value.toString().split("[+]");//1,Beijing Red Star if(temp==''){
factory.add(words[]);// 左表
} if(temp==''){
address.add(words[]);// 右表
}
} //求出笛卡尔积,并输出
for (String f : factory) {
for (String a : address) {
context.write(new Text(f), new Text(a));
}
}
}
}
}

输出结果如下:

如果您认为这篇文章还不错或者有所收获,您可以通过右边的“打赏”功能 打赏我一杯咖啡【物质支持】,也可以点击下方的【好文要顶】按钮【精神支持】,因为这两种支持都是使我继续写作、分享的最大动力!

最新文章

  1. HTML5本地存储——Web SQL Database
  2. jquery——滚动条插件jscroll.js
  3. nmap脚本扫描使用总结
  4. OpenJDK和JDK区别(转)
  5. .htaccess 文件的使用
  6. (Android Studio)添加按钮以及权重问题
  7. as3+java+mysql(mybatis) 数据自动工具(一)
  8. DOCTYPE与浏览器模式详解(标准模式&amp;混杂模式)
  9. Linq to XML 读取XML 备忘笔记
  10. Android Studio错误
  11. python 学习资料
  12. PHP错误与异常
  13. hdu 1542 线段树扫描(面积)
  14. MOOS学习笔记4——独立线程不同回调
  15. Scala的类层级讲解
  16. XXE攻防
  17. xshell中出现的绿色背景的文件夹
  18. lua --- unpack
  19. vue 非父子组件传值
  20. $Mayan$游戏

热门文章

  1. SmartStoreNet解图
  2. socket.io 中文api
  3. select循环读取数据
  4. 智力大冲浪(洛谷P1230)
  5. windows安装oracle11g
  6. mysql查询某天是本年第几周
  7. 网络编程3 网络编程之缓冲区&amp;subprocess&amp;粘包&amp;粘包解决方案
  8. 常用的SQLalchemy 字段类型
  9. rest_framework之url控制器详解
  10. django--mysql设置