mapreduce编程练习(二)倒排索引 Combiner的使用以及练习
2024-08-30 13:39:56
问题一:请利用Combiner的方式,根据图示内容编写MapReduce程序
示例程序
package com.greate.learn;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Inverted-index job: for every word found under {@code /GetFile_Statistics/input}
 * it writes one output line {@code word <TAB> file1:count1;file2:count2;...}.
 *
 * <p>Data flow:
 * <ul>
 *   <li>{@link CountMapper} emits {@code (word, "file:1")} for every token.</li>
 *   <li>{@link Combiner} sums the counts per file and emits {@code (word, "file:n")}.</li>
 *   <li>{@link CountReducer} sums again per file and joins the postings with {@code ';'}.</li>
 * </ul>
 *
 * <p>The combiner keeps the key untouched and produces values in exactly the format it
 * consumes. Hadoop treats a combiner as an optional optimisation that may be applied
 * zero, one or several times, so the result must not depend on how often it runs.
 */
public class GetFile_Statistics extends Configured implements Tool {

    /** Separates the file name from its occurrence count inside one posting ("file:count"). */
    private static final String COUNT_SEPARATOR = ":";

    /** Separates the postings of one word in the final output. */
    private static final String POSTING_SEPARATOR = ";";

    /**
     * Tokenizes each input line on whitespace and emits {@code (word, "fileName:1")},
     * where {@code fileName} is the name of the file the current split belongs to.
     */
    public static class CountMapper extends Mapper<LongWritable, Text, Text, Text> {
        // Reused across records to avoid allocating two Text objects per token.
        private final Text word = new Text();
        private final Text posting = new Text();

        /**
         * @param key     byte offset of the line within the file (unused)
         * @param value   one line of input text
         * @param context task context used to look up the input split and emit records
         */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            // The posting is identical for every token of this line.
            posting.set(fileName + COUNT_SEPARATOR + 1);

            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, posting);
            }
        }
    }

    /**
     * Map-side pre-aggregation: collapses all {@code "file:count"} postings of one word
     * into a single posting per file. Input and output share the same key and value
     * format, so running it any number of times yields the same final result.
     */
    public static class Combiner extends Reducer<Text, Text, Text, Text> {
        private final Text posting = new Text();

        /**
         * @param key     the word
         * @param values  postings of the form {@code "file:count"}
         * @param context task context used to emit one summed posting per file
         * @throws IOException if a posting is not of the form {@code "file:count"}
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            for (Map.Entry<String, Integer> entry : sumPerFile(values).entrySet()) {
                posting.set(entry.getKey() + COUNT_SEPARATOR + entry.getValue());
                context.write(key, posting);
            }
        }
    }

    /**
     * Final aggregation: sums the counts per file for one word and writes a single
     * record {@code (word, "file1:n1;file2:n2;...")}, with files in sorted order.
     */
    public static class CountReducer extends Reducer<Text, Text, Text, Text> {

        /**
         * @param key     the word
         * @param values  postings of the form {@code "file:count"} (raw or pre-combined)
         * @param context task context used to emit the joined posting list
         * @throws IOException if a posting is not of the form {@code "file:count"}
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values,
                Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            // Local accumulator: no state is carried over between keys or tasks.
            StringBuilder postings = new StringBuilder();
            for (Map.Entry<String, Integer> entry : sumPerFile(values).entrySet()) {
                if (postings.length() > 0) {
                    postings.append(POSTING_SEPARATOR);
                }
                postings.append(entry.getKey()).append(COUNT_SEPARATOR).append(entry.getValue());
            }
            context.write(key, new Text(postings.toString()));
        }
    }

    /**
     * Sums the counts of {@code "file:count"} postings per file name.
     *
     * @param values postings to aggregate
     * @return file name to total count, ordered by file name for deterministic output
     * @throws IOException if a posting has no separator or a non-numeric count
     */
    private static Map<String, Integer> sumPerFile(Iterable<Text> values) throws IOException {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (Text value : values) {
            String posting = value.toString();
            // Split on the LAST separator so a separator inside the file name is tolerated.
            int split = posting.lastIndexOf(COUNT_SEPARATOR);
            if (split < 0) {
                throw new IOException("Malformed posting, expected file:count but got: " + posting);
            }
            String file = posting.substring(0, split);
            int count;
            try {
                count = Integer.parseInt(posting.substring(split + 1));
            } catch (NumberFormatException e) {
                throw new IOException("Malformed count in posting: " + posting, e);
            }
            Integer previous = counts.get(file);
            counts.put(file, previous == null ? count : previous + count);
        }
        return counts;
    }

    /** File system handle created by {@link #init()}; null until then. */
    static FileSystem fs = null;

    /** Configuration created by {@link #init()}; null until then. */
    static Configuration conf = null;

    /**
     * Creates the shared configuration pointing at {@code hdfs://localhost:9000/} and
     * opens that file system as user {@code hadoop}.
     *
     * @throws Exception if the URI is invalid or the file system cannot be opened
     */
    public static void init() throws Exception {
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000/");
        fs = FileSystem.get(new URI("hdfs://localhost:9000/"), conf, "hadoop");
    }

    /**
     * Configures and runs the job.
     *
     * @param args command-line arguments (unused; paths are fixed)
     * @return 0 if the job succeeded, 1 if it failed or the input directory is missing
     * @throws Exception if the file system or the job submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration jobConf = getConf();
        Job job = Job.getInstance(jobConf, "WordCount");
        job.setJarByClass(GetFile_Statistics.class);
        job.setMapperClass(CountMapper.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(CountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Fall back to the job's own file system when init() has not been called,
        // e.g. when the tool is launched through ToolRunner by another driver.
        FileSystem fileSystem = (fs != null) ? fs : FileSystem.get(jobConf);

        Path in = new Path("/GetFile_Statistics/input");
        if (!fileSystem.exists(in)) {
            // Submitting without an input path would only fail later with a less
            // helpful error, so stop here.
            System.out.println("文件夹不存在,需要创建!");
            return 1;
        }
        FileInputFormat.addInputPath(job, in);

        Path os = new Path("/GetFile_Statistics/output");
        if (fileSystem.exists(os)) {
            // The job refuses to start if the output directory already exists.
            System.out.println("文件夹存在!不再创建!");
            fileSystem.delete(os, true);
        }
        FileOutputFormat.setOutputPath(job, os);

        return job.waitForCompletion(false) ? 0 : 1;
    }

    /**
     * Entry point: initialises the HDFS configuration, runs the job with that same
     * configuration and exits with the job's status code.
     */
    public static void main(String[] args) throws Exception {
        init();
        // Pass the prepared configuration so the job and the existence checks
        // in run() talk to the same file system.
        int res = ToolRunner.run(conf, new GetFile_Statistics(), args);
        System.exit(res);
    }
}
问题二:现有一批电话通信清单,记录了用户A拨打某些特殊号码(如120,10086,13800138000等)的记录。需要做一个统计结果,记录拨打给用户B的所有用户A。
示例程序
package com.greate.learn;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class PhoneNumber_Statistic extends Configured implements Tool{
public static void main (String[] args) throws Exception{
ToolRunner.run(new PhoneNumber_Statistic(), args);
}
public int run(String[] arg0) throws Exception{
Configuration conf = getConf();
Job job = new Job(conf);
job.setJarByClass(getClass());
FileSystem fs = FileSystem.get(conf);
FileInputFormat.setInputPaths(job, new Path("/PhoneNumber_Statistics/input/"));
FileOutputFormat.setOutputPath(job, new Path("/PhoneNumber_Statistics/output/"));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(numberMap.class);
job.setReducerClass(numberReduce.class);
job.waitForCompletion(true);
return 0;
}
}
class numberMap extends Mapper<LongWritable, Text, Text, Text>{
protected void map(LongWritable key, Text value, Context context)
throws IOException,InterruptedException{
String[] list = value.toString().split(" ");
String keyy = list[1];
String valuee = list[0];
context.write(new Text(keyy), new Text(valuee));
}
}
class numberReduce extends Reducer<Text, Text, Text, Text>{ //��������
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException,InterruptedException{
String valuee;
String out = "";
for(Text value:values){
valuee = value.toString() + " | ";
out +=valuee;
}
context.write(key,new Text(out));
}
}
最新文章
- jsContext全局函数调用与对象函数调用、evaluateScript
- NOIP模拟赛 行走
- [翻译] java NIO Channel
- ios NSURLSession(iOS7后,取代NSURLConnection)使用说明及后台工作流程分析
- css规范大全
- dom 冒泡事件
- 关于this 的一个问题
- 如何在github上传自己的项目
- PHP学习笔记三十【final】
- HBase集群安装
- 华为OJ之尼科彻斯定理
- C++三种野指针及应对/内存泄露
- unity图片后期处理
- 开blog
- C语言第七次作业
- markdown操作手册
- springboot项目发布到独立的tomcat中运行&打成jar包运行
- 为什么要用 Node.js
- 算法笔记(C++)
- Spark Shuffle Write阶段磁盘文件分析