需求:

编写MapReduce程序算出高峰时间段(如9-10点)哪张表被访问的最频繁的表,以及这段时间访问这张表最多的用户,以及这个用户访问这张表的总时间开销。

测试数据:


TableName(表名),Time(时间),User(用户),TimeSpan(时间开销)

*t003 6:00 u002 180

*t003 7:00 u002 180

*t003 7:08 u002 180

*t003 7:25 u002 180

*t002 8:00 u002 180

*t001 8:00 u001 240

*t001 9:00 u002 300

*t001 9:11 u001 240

*t003 9:26 u001 180

*t001 9:39 u001 300

*t001 10:00 u001 200


代码

方法一:

package com.table.main;

import java.io.IOException;
import java.util.HashMap; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TableUsed { public static class MRMapper extends Mapper<LongWritable, Text, Text, Text> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().substring(1).split("\\s+");
Long time = Long.parseLong(split[1].charAt(0) + "");
// 筛选9-10点使用过的表
if (time == 9 || time == 10) {
context.write(new Text(split[0]), new Text(split[2] + ":" + split[3]));
}
}
} public static class MRReducer extends Reducer<Text, Text, Text, Text> {
// 存放使用量最大的表的表名及用户
public static HashMap<String, HashMap<String, Integer>> map = new HashMap<String, HashMap<String, Integer>>();
// 最大用使用量
public static int max_used_num = 0;
// 使用量最大的表
public static String table = ""; protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { HashMap<String, Integer> user_map = new HashMap<String, Integer>(); int table_used_num = 0;
for (Text t : values) {
table_used_num++;
String[] split = t.toString().split(":"); // 如map中已经存在的用户则把使用时间叠加 不存在则添加该用户
if (user_map.get(split[0]) == null) {
user_map.put(split[0], Integer.parseInt(split[1]));
} else {
Integer use_time = user_map.get(split[0]);
use_time += Integer.parseInt(split[1]);
user_map.put(split[0], use_time);
}
}
if (table_used_num > max_used_num) {
map.put(key.toString(), user_map);
table = key.toString();
max_used_num = table_used_num;
}
} protected void cleanup(Context context) throws IOException, InterruptedException { // 循环map,查出使用时间最长的用户信息
HashMap<String, Integer> map2 = map.get(table); int max = 0;
String max_used_user = "";
for (HashMap.Entry<String, Integer> m : map2.entrySet()) {
if (m.getValue() > max) {
max = m.getValue();
max_used_user = m.getKey();
}
}
context.write(new Text(table), new Text("\t" + max_used_user + "\t" + map2.get(max_used_user)));
}
} public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf);
job.setJarByClass(TableUsed.class); job.setMapperClass(MRMapper.class);
job.setReducerClass(MRReducer.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop5:9000/input/table_time.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop5:9000/output/put2"));
System.out.println(job.waitForCompletion(true) ? 1 : 0);
}
}
缺点:只算出使用时间最长的用户,没有判断该用户是否是使用次数最多的

方法二:

package com.table.main;

import java.io.IOException;
import java.util.HashMap; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class TableUsed { public static class MRMapper extends Mapper<LongWritable, Text, Text, Text> {
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] split = value.toString().substring(1).split("\\s+");
Long time = Long.parseLong(split[1].charAt(0) + "");
// 筛选9-10点使用过的表
if (time == 9 || time == 10) {
context.write(new Text(split[0]), new Text(split[2] + ":" + split[3]));
}
}
} public static class MRReducer extends Reducer<Text, Text, Text, Text> {
// 表的最大使用次数 使用该表最多的用户
public static int max_used_num = 0, max_user_used = 0;
// 使用量最大的表 使用该表最多的用户名
public static String max_used_table = "", user_name = "";
// 使用次数最多的用户的 使用时间
public static Integer user_used_time = 0; protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException { HashMap<String, Integer> user_map = new HashMap<String, Integer>();
HashMap<String, Integer> user_used_map = new HashMap<String, Integer>(); int table_used_num = 0;// 表的使用次数
Integer use_num = 0;// 用户使用次数
Integer use_time = 0;//使用时间
String username = "";//用户名 for (Text t : values) {
table_used_num++;
String[] split = t.toString().split(":"); // 如map中已经存在的用户则把使用时间叠加 不存在则添加该用户
if (user_map.get(split[0]) == null) { user_map.put(split[0], Integer.parseInt(split[1]));
user_used_map.put(split[0], 1);
} else {
use_time = user_map.get(split[0]);
use_time += Integer.parseInt(split[1]);
user_map.put(split[0], use_time); use_num = user_used_map.get(split[0]);
use_num ++;
user_used_map.put(split[0], use_num);
} /**
* 判断该用户是否为此表使用次数最多的,
* 是则存进user_map和user_used_map,否则不存;
* 由于只需要求使用量最多的用户,因此使用量不是最多用户没有必要存在于map中
*/
if (use_num > max_user_used) {
username = split[0];
max_user_used = use_num;
user_used_time = use_time;
//此处也可以不remove()
user_used_map.remove(split[0]);
user_map.remove(split[0]);
}
} if (table_used_num > max_used_num) {
max_used_table = key.toString();
max_used_num = table_used_num;
user_name = username;
}
} protected void cleanup(Context context) throws IOException, InterruptedException { context.write(new Text(max_used_table), new Text(max_user_used + "\t" + user_name + "\t" + user_used_time));
}
} public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf);
job.setJarByClass(TableUsed.class); job.setMapperClass(MRMapper.class);
job.setReducerClass(MRReducer.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop5:9000/input/table_time.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop5:9000/output/put6"));
System.out.println(job.waitForCompletion(true) ? 1 : 0);
}
}

最新文章

  1. jQuery.grep()
  2. Codeforces Round #369 (Div. 2)---C - Coloring Trees (很妙的DP题)
  3. 使用 IntraWeb (39) - THttpRequest、THttpReply
  4. 在Ubuntu 14.04 上安装网易云音乐
  5. android在程序中打开另一个程序
  6. 基于css3的文字3D翻转特效
  7. CFNetwork学习总结
  8. Wbemtest查询
  9. SqlSugar简单工模式数据访问简单Demo
  10. C#中的一些复习。
  11. MySQL索引初探
  12. Python 数据分析1
  13. vue路由异步组件案例
  14. Eclipse 插件Maven在使用 add dependency,找不到包,解决办法
  15. centos6配置网络常见问题
  16. Python入门学习指南--内附学习框架
  17. Swagger 路径过滤 -PreSerializeFilters
  18. [echats] - EChats图表的使用
  19. android-eclips中logcat不显示信息的问题解决
  20. AtCoder Grand Contest 029 Solution

热门文章

  1. JMETER 不同线程组 变量值 的参数传递(转)
  2. 怎么查看mac系统是32位还是64位的操作系统
  3. java动态编译 (java在线执行代码后端实现原理)
  4. DHTML 简介
  5. Linux入门之运维(1) 系统监控 vmstat top
  6. kubernetes,Docker网络相关资料链接
  7. Linux命令之pip
  8. PHP引用符&amp;的用法详细解析
  9. Xamrin开发安卓笔记(一)
  10. CNN学习笔记:正则化缓解过拟合