转载请注明出处:http://blog.csdn.net/xiaojimanman/article/details/40184581

近期几天一直在看hadoop相关的书籍,眼下略微有点感觉,自己就仿照着WordCount程序自己编写了一个统计关联商品。

需求描写叙述:

依据超市的销售清单,计算商品之间的关联程度(即统计同一时候买A商品和B商品的次数)。

数据格式:

超市销售清单简化为例如以下格式:一行表示一个清单,每一个商品採用 "," 切割,例如以下图所看到的:

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQveGlhb2ppbWFubWFu/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">

需求分析:

採用hadoop中的mapreduce对该需求进行计算。

map函数主要拆分出关联的商品,输出结果为 key为商品A,value为商品B,对于第一条三条结果拆分结果例如以下图所看到的:

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQveGlhb2ppbWFubWFu/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">

这里为了统计出和A、B两件商品想关联的商品。所以商品A、B之间的关系输出两条结果即 A-B、B-A。

reduce函数分别对和商品A相关的商品进行分组统计,即分别求value中的各个商品出现的次数,输出结果为key为商品A|商品B。value为该组合出现的次数。针对上面提到的5条记录,对map输出中key值为R的做下分析:

通过map函数的处理,得到例如以下图所看到的的记录:

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQveGlhb2ppbWFubWFu/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">

reduce中对map输出的value值进行分组计数,得到的结果例如以下图所看到的

watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQveGlhb2ppbWFubWFu/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/Center" alt="">

将商品A B作为key,组合个数作为value输出,输出结果例如以下图所看到的:

对于需求的实现过程的分析到眼下就结束了。以下就看下详细的代码实现

代码实现:

关于代码就不做具体的介绍。具体參照代码之中的凝视吧。

package com;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; public class Test extends Configured implements Tool{ /**
* map类,实现数据的预处理
* 输出结果key为商品A value为关联商品B
* @author lulei
*/
public static class MapT extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
String line = value.toString();
if (!(line == null || "".equals(line))) {
//切割商品
String []vs = line.split(",");
//两两组合。构成一条记录
for (int i = 0; i < (vs.length - 1); i++) {
if ("".equals(vs[i])) {//排除空记录
continue;
}
for (int j = i+1; j < vs.length; j++) {
if ("".equals(vs[j])) {
continue;
}
//输出结果
context.write(new Text(vs[i]), new Text(vs[j]));
context.write(new Text(vs[j]), new Text(vs[i]));
}
}
}
}
} /**
* reduce类,实现数据的计数
* 输出结果key 为商品A|B value为该关联次数
* @author lulei
*/
public static class ReduceT extends Reducer<Text, Text, Text, IntWritable> {
private int count; /**
* 初始化
*/
public void setup(Context context) {
//从參数中获取最小记录个数
String countStr = context.getConfiguration().get("count");
try {
this.count = Integer.parseInt(countStr);
} catch (Exception e) {
this.count = 0;
}
}
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{
String keyStr = key.toString();
HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
//利用hash统计B商品的次数
for (Text value : values) {
String valueStr = value.toString();
if (hashMap.containsKey(valueStr)) {
hashMap.put(valueStr, hashMap.get(valueStr) + 1);
} else {
hashMap.put(valueStr, 1);
}
}
//将结果输出
for (Entry<String, Integer> entry : hashMap.entrySet()) {
if (entry.getValue() >= this.count) {//仅仅输出次数不小于最小值的
context.write(new Text(keyStr + "|" + entry.getKey()), new IntWritable(entry.getValue()));
}
}
}
} @Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
Configuration conf = getConf();
conf.set("count", arg0[2]); Job job = new Job(conf);
job.setJobName("jobtest"); job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class); job.setMapperClass(MapT.class);
job.setReducerClass(ReduceT.class); FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[1])); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } /**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
if (args.length != 3) {
System.exit(-1);
}
try {
int res = ToolRunner.run(new Configuration(), new Test(), args);
System.exit(res);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} }

上传执行:

将程序打包成jar文件,上传到机群之中。

将測试数据也上传到HDFS分布式文件系统中。

命令执行截图例如以下图所看到的:

执行结束后查看对应的HDFS文件系统,例如以下图所看到的:

到此一个完整的mapreduce程序就完毕了,关于hadoop的学习。自己还将继续~

最新文章

  1. 【JavaScript】ArtTemplate个人的使用体验。
  2. CSS垂直居中
  3. AndroidStudio导入项目出现Your project path contains non-ASCII characters错误
  4. 【Python大系】Python快速教程
  5. Endnote专题之--output style相关问题
  6. Codeforces Round #353 (Div. 2)
  7. 第一个java程序hello world
  8. BZOJ2061 : Country
  9. IIS 7.0、IIS 7.5 和 IIS 8.0 中的 HTTP 状态代码 转
  10. CosCos2D-android 代码总结
  11. iOS之NSURLSessionDownloadTask下载
  12. Attach file to database
  13. Spring Session - Spring Boot
  14. switch与java,c#的异同
  15. EF中用Newtonsoft.Json引发的循环引用问题
  16. java学习书籍推荐
  17. 对Numpy广播操作的理解
  18. Python 简单的输出
  19. 命令行登陆mysql提示&#39;mysql&#39; 不是内部或外部命令
  20. 用virtualenv建立多个Python独立开发环境

热门文章

  1. appium---【Mac】Appium-Doctor提示WARN:“ opencv4nodejs cannot be found”解决方案
  2. Jmeter-----保存到响应文件
  3. django rest_framework中将json输出字符强制为utf-8编码
  4. Rule Compilation error xxx cannot be resolved
  5. Hadoop案例(一)之日志清洗
  6. xpath语法规则
  7. python部分内容存档
  8. Java学习笔记之:Java Servlet环境配置
  9. JAVA编程思想读书笔记(二)--容器
  10. 洛谷P2507 [SCOI2008]配对 [DP,贪心]