1. Developing the wordcount example

1.1. Overall execution flow of the wordcount program

Map phase: turn each line of text into key-value pairs of the form <word, 1>.

Reduce phase: aggregate the group of kv pairs that share the same word by summing up all the values. For example, the line "hello world hello" is mapped to <hello,1> <world,1> <hello,1>, and the reduce phase then produces <hello,2> and <world,1>.

Note: in a MapReduce program, the input and output data of the map phase and the input and output data of the reduce phase must all use types that implement Hadoop's serialization framework, for example:

String corresponds to Text

Integer corresponds to IntWritable

Long corresponds to LongWritable
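
For illustration, wrapping plain Java values into these Writable types and reading them back looks like this (WritableTypesDemo is just a throwaway example class, not part of the project):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableTypesDemo {
    public static void main(String[] args) {
        Text word = new Text("hello");              // String  -> Text
        IntWritable one = new IntWritable(1);       // Integer -> IntWritable
        LongWritable offset = new LongWritable(0L); // Long    -> LongWritable

        // Unwrapping back to plain Java values
        System.out.println(word.toString() + " " + one.get() + " " + offset.get());
    }
}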

1.2. Coding the implementation

Develop the WordcountMapper class

Develop the WordcountReducer class

Develop the JobSubmitter client class

(See the code for details.)
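
The WordcountMapper and WordcountReducer classes themselves are not reproduced in this extract; the following is a minimal sketch of what they typically look like for the flow described in 1.1 (the package name, the space-based tokenization, and putting both classes in one listing are illustrative assumptions, not the project's exact code):

package cn.edu360.mr.wordcount; // assumed package; in a real project each class goes in its own file

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map phase: turn each input line into <word, 1> pairs
class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split(" "); // split on spaces (assumed delimiter)
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

// Reduce phase: for each word, sum up all of its 1s
class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        context.write(key, new IntWritable(count));
    }
}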

1.3. Running the MR program

1) Package the whole project into a single jar and upload it to the Linux machine.

2) Place the data files to be processed into the designated HDFS directory.

3) Use a command to launch the JobSubmitter inside the jar, so that it submits the jar to YARN and YARN runs the MapReduce program contained in it: hadoop jar wc.jar cn.edu360.mr.wordcount.JobSubmitter ..... (example commands follow this list).

4) Check the results in the HDFS output directory.
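
As an illustration, steps 2) to 4) might look like the commands below; the HDFS directory names and the input file name are placeholders, and any arguments after the class name depend on how JobSubmitter reads its paths:

# 2) upload the input data to HDFS
hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put words.txt /wordcount/input

# 3) submit the jar to YARN through the JobSubmitter client
hadoop jar wc.jar cn.edu360.mr.wordcount.JobSubmitter ...

# 4) check the result files written by the reduce tasks
hdfs dfs -ls /wordcount/output
hdfs dfs -cat /wordcount/output/part-r-00000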

1.4. MR program run modes

An MR program can run in either of two modes:

1. on YARN

2. locally (on Windows or Linux)

Which mode the program runs in is determined by a configuration parameter (mapreduce.framework.name); the key points are summarized after the code below.

The code that follows shows a flow-count (traffic statistics) example built the same way: the custom Writable bean FlowBean, its FlowCountMapper and FlowCountReducer, and the JobSubmitMain submitter class, followed by the wordcount submitter class WordCountMain.

package mr.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable, Comparable<FlowBean> {

    private int upFlow;
    private int dFlow;
    private String phone;
    private int amountFlow;

    public FlowBean() {
    }

    public FlowBean(String phone, int upFlow, int dFlow) {
        this.phone = phone;
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.amountFlow = upFlow + dFlow;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public int getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(int upFlow) {
        this.upFlow = upFlow;
    }

    public int getdFlow() {
        return dFlow;
    }

    public void setdFlow(int dFlow) {
        this.dFlow = dFlow;
    }

    public int getAmountFlow() {
        return amountFlow;
    }

    public void setAmountFlow(int amountFlow) {
        this.amountFlow = amountFlow;
    }

    /**
     * Called by the Hadoop framework when it serializes an object of this class.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeUTF(phone);
        out.writeInt(dFlow);
        out.writeInt(amountFlow);
    }

    /**
     * Called by the Hadoop framework when it deserializes an object of this class.
     * The fields must be read back in exactly the order they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readInt();
        this.phone = in.readUTF();
        this.dFlow = in.readInt();
        this.amountFlow = in.readInt();
    }

    @Override
    public String toString() {
        return this.phone + "," + this.upFlow + "," + this.dFlow + "," + this.amountFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // Sort by total flow in descending order
        return Integer.compare(o.amountFlow, this.amountFlow);
    }
}
package mr.flow;

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");
        String phone = fields[1];
        int upFlow = Integer.parseInt(fields[fields.length - 3]);
        int dFlow = Integer.parseInt(fields[fields.length - 2]);
        context.write(new Text(phone), new FlowBean(phone, upFlow, dFlow));
    }
}
package mr.flow;

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    /**
     * key: a phone number
     * values: the flow data from all access records produced by that phone number,
     * e.g. <135,flowBean1><135,flowBean2><135,flowBean3><135,flowBean4>
     */
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        int upSum = 0;
        int dSum = 0;
        for (FlowBean value : values) {
            upSum += value.getUpFlow();
            dSum += value.getdFlow();
        }
        context.write(key, new FlowBean(key.toString(), upSum, dSum));
    }
}
package mr.flow;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobSubmitMain {

    public static final String HADOOP_INPUT_PATH = "hdfs://hadoop1:9000/InputFlow";
    public static final String HADOOP_OUTPUT_PATH = "hdfs://hadoop1:9000/OutputFlow";
    public static final String HADOOP_ROOT_PATH = "hdfs://hadoop1:9000";

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException, URISyntaxException {
        Configuration conf = new Configuration();
        // Set where the job is submitted to run
        // conf.set("fs.defaultFS", HADOOP_ROOT_PATH);
        // conf.set("mapreduce.framework.name", "yarn");

        Job job = Job.getInstance(conf);
        job.setJarByClass(JobSubmitMain.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // The output directory must not already exist, so delete it if it does
        Path output = new Path(HADOOP_OUTPUT_PATH);
        FileSystem fs = FileSystem.get(new URI(HADOOP_ROOT_PATH), conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        FileInputFormat.setInputPaths(job, new Path(HADOOP_INPUT_PATH));
        FileOutputFormat.setOutputPath(job, output);

        job.setNumReduceTasks(1);

        // job.submit();
        job.waitForCompletion(true);
        System.out.println("OK");
    }
}


WordCount main class (note the Windows-specific settings)

package WordCount;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountMain {

    public static final String HADOOP_ROOT_PATH = "hdfs://hadoop1:9000";
    public static final String HADOOP_INPUT_PATH = "hdfs://hadoop1:9000/Input";
    public static final String HADOOP_OUTPUT_PATH = "hdfs://hadoop1:9000/Output";

    public static void main(String[] args) throws IOException,
            URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // 1. Set the default file system the job accesses at run time
        // conf.set("fs.defaultFS", HADOOP_ROOT_PATH);
        // 2. Set where the job is submitted to run
        conf.set("mapreduce.framework.name", "yarn");
        // conf.set("yarn.resourcemanager.hostname", "hadoop1");
        // 3. If this submitting client is run from Windows, the cross-platform submission parameter is also needed
        // conf.set("mapreduce.app-submission.cross-platform", "true");

        Job job = Job.getInstance(conf);

        // 1. Parameter: where the jar containing the MR classes is located
        job.setJar("/home/hadoop/wordcount.jar");
        // job.setJarByClass(WordCountMain.class);

        // 2. Parameters: the Mapper and Reducer implementation classes this job calls
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 3. Parameters: the key/value types of the data produced by this job's Mapper and Reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 4. Parameters: the input data path and the output path for the final results
        // Note: the output path must not already exist
        Path output = new Path(HADOOP_OUTPUT_PATH);
        FileSystem fs = FileSystem.get(new URI(HADOOP_ROOT_PATH), conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileInputFormat.setInputPaths(job, new Path(HADOOP_INPUT_PATH));
        FileOutputFormat.setOutputPath(job, output);

        // 5. Parameter: the number of reduce tasks to launch
        job.setNumReduceTasks(2);

        // 6. Submit the job to YARN and wait for it to finish
        boolean res = job.waitForCompletion(true);
        System.out.println("OK");
        System.exit(res ? 0 : -1);
    }
}


The key point is:

the parameter mapreduce.framework.name = yarn | local

In addition, if the program is to run on YARN, the following two parameters must also be configured:

the parameter yarn.resourcemanager.hostname = ....

the parameter fs.defaultFS = ....
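
A minimal sketch of how a job-submitter client might switch between the two modes with these parameters, reusing the hadoop1 host from the code above (the RunModeDemo helper class and the onYarn flag are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class RunModeDemo {

    public static Job buildJob(boolean onYarn) throws Exception {
        Configuration conf = new Configuration();
        if (onYarn) {
            // Run on the cluster: the resource manager host and default FS must also be set
            conf.set("mapreduce.framework.name", "yarn");
            conf.set("yarn.resourcemanager.hostname", "hadoop1");
            conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
        } else {
            // Run locally (Windows or Linux): mapper and reducer execute in this JVM
            conf.set("mapreduce.framework.name", "local");
        }
        return Job.getInstance(conf);
    }
}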
