0. 说明

  编写 MapReduce 程序实现年度最高气温统计


1. 气温数据分析

  气温数据样例如下:

++023450FM-+000599999V0202701N015919999999N0000001N9-+99999102001ADDGF108991999999999999999999
++023450FM-+000599999V0202901N008219999999N0000001N9-+99999102001ADDGF104991999999999999999999
++023450FM-+000599999V0209991C000019999999N0000001N9-+99999102001ADDGF108991999999999999999999
++023450FM-+000599999V0201801N008219999999N0000001N9-+99999101831ADDGF108991999999999999999999
++023450FM-+000599999V0201801N009819999999N0000001N9-+99999101761ADDGF108991999999999999999999

  对气温数据进行分析可以得出以下的结论
    1. 年份的索引为 15-19 ,以此作为 Key
    2. 气温的索引为 87-92 ,以此作为 Value

  【思路】

  在 Map 阶段将原始数据映射成满足要求的 K-V 对,在 Reduce 阶段对相同 Key 的值进行比较,得到最大值


2. 代码编写

   [2.1 MaxTempMapper.java]

package hadoop.mr.maxtemp;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /**
* Mapper 类
*/
public class MaxTempMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 将 value 变为 String 格式
String line = value.toString();
// 获得年份
String year = line.substring(15, 19);
// 获得温度
int temp = Integer.parseInt(line.substring(87, 92)); // 存在脏数据 9999,所以要将其过滤
if (temp != 9999) {
// 输出年份与温度
context.write(new Text(year), new IntWritable(temp));
} }
}

  [2.2 MaxTempReducer.java]

package hadoop.mr.maxtemp;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /**
* Reducer 类
*/
public class MaxTempReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
Integer max = Integer.MIN_VALUE; // 得到最大值
for (IntWritable value : values) {
max = Math.max(max, value.get());
} // 输出年份与最大温度
context.write(key, new DoubleWritable(max / 10.0));
}
}

  [2.3 MaxTempApp.java]

package hadoop.mr.maxtemp;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /**
* max Temp APP
*/
public class MaxTempApp {
public static void main(String[] args) throws Exception {
// 初始化配置文件
Configuration conf = new Configuration(); // 仅在本地开发时使用
conf.set("fs.defaultFS", "file:///"); // 初始化文件系统
FileSystem fs = FileSystem.get(conf); // 通过配置文件初始化 job
Job job = Job.getInstance(conf); // 设置 job 名称
job.setJobName("max Temp"); // job 入口函数类
job.setJarByClass(MaxTempApp.class); // 设置 mapper 类
job.setMapperClass(MaxTempMapper.class); // 设置 reducer 类
job.setReducerClass(MaxTempReducer.class); // 设置 map 的输出 K-V 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class); // 设置 reduce 的输出 K-V 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class); // 新建输入输出路径
Path pin = new Path("E:/file/temp");
Path pout = new Path("E:/test/wc/out"); // 打包后自定义输入输出路径
// Path pin = new Path(args[0]);
// Path pout = new Path(args[1]); // 设置输入路径和输出路径
FileInputFormat.addInputPath(job, pin);
FileOutputFormat.setOutputPath(job, pout); // 判断输出路径是否已经存在,若存在则删除
if (fs.exists(pout)) {
fs.delete(pout, true);
} // 执行 job
job.waitForCompletion(true);
}
}

3. 测试

  本地模式下运行代码的结果如下

   


4. 部署到集群上

  【4.1 修改代码 MaxTempApp.java】

    

  【4.2 打包程序】

  

   

  【4.3 运行程序】

  开启 Hadoop 集群,然后将 temp 数据文件上传到 HDFS 中,过程略

  运行以下命令

hadoop jar myhadoop-1.0-SNAPSHOT.jar hadoop.mr.maxtemp.MaxTempApp /testdata/temp /testdata/out

  【查看结果】

  命令行下可以看到结果,Web UI 查看 http://s101:8088

  


最新文章

  1. SSH整合(struts2.3.24+hibernate3.6.10+spring4.3.2+mysql5.5+myeclipse8.5+tomcat6+jdk1.6)
  2. jquery 插件之 点赞“+1” 特效
  3. 【poj1080】 Human Gene Functions
  4. bzoj 2186: [Sdoi2008]沙拉公主的困惑
  5. 用php 查询显示新闻消息
  6. Jmeter使用
  7. 九度oj 1184 二叉树遍历
  8. angularJS之$apply()方法
  9. Linux的内存回收和交换
  10. Awesome Javascript(中文翻译版)
  11. java中获得jar包执行路径的方法
  12. Jquery.Validate验证CheckBoxList,RadioButtonList,DropDownList是否选中
  13. 服务器部署_nginx报错: [warn] conflicting server name &quot;www.test.com&quot; on 0.0.0.0:80, ignored
  14. dojo.io.script
  15. js---DOM元素节点
  16. flask 扩展之 -- flask-pagedown
  17. c oth
  18. Luogu P1257 平面上的最接近点对_暴力
  19. 数据可视化的开源方案: Superset vs Redash vs Metabase (二)
  20. ubuntu配置neuwork网络

热门文章

  1. 移动端真机调试终极利器-BrowserSync(使用方法)
  2. Redis学习系列七分布式锁
  3. 线程安全-005-synchronized其他概念
  4. 在Hadoop集群上的Hive配置
  5. .NET内存管理、垃圾回收
  6. Java_反射demo
  7. Atom编辑器中安装Emmet插件失败的问题
  8. 从零开始学 Web 之 JavaScript(二)变量
  9. Deep learning with Python 学习笔记(9)
  10. 基于Asp.Net Core 2.1的简单问答社区系统源代码分享