Training

入口

package org.wordCount;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; public class WordMain { // private static List<String> secondDir = new ArrayList<String>(); public static void main(String[] args) throws Exception { Configuration conf = new Configuration();
//下面两行很重要
conf.set("mapred.jar", "E://eclipse//jar-work//WordMain.jar");
conf.set("mapred.job.tracker", "192.168.190.128:9001"); //设置单词先验概率的保存路径
String priorProbality = "hdfs://192.168.190.128:9000/user/hadoop/output/priorP/priorProbability.txt";
conf.set("priorProbality", priorProbality); //单词总种类数的保存路径
String totalWordsPath = "hdfs://192.168.190.128:9000/user/hadoop/output/totalwords.txt";
conf.set("totalWordsPath", totalWordsPath); //每个类别中单词总数
String wordsInClassPath = "hdfs://192.168.190.128:9000/user/hadoop/mid/wordsFrequence/_wordsInClass/wordsInClass-r-00000";
conf.set("wordsInClassPath", wordsInClassPath); //设置输入 和 单词词频的输出路径
// "/user/hadoop/input/NBCorpus/Country"
String input = "hdfs://192.168.190.128:9000/user/hadoop/input/NBCorpus/Country";
String wordsOutput = "hdfs://192.168.190.128:9000/user/hadoop/mid/wordsFrequence";
conf.set("input", input);
conf.set("wordsOutput", wordsOutput); //每个类别单词概率保存路径,
//单词词频的输入路径也就是单词词频的输出路径 String freqOutput = "hdfs://192.168.190.128:9000/user/hadoop/output/probability/";
conf.set("freqOutput", freqOutput); FileCount.run(conf);
WordCount.run(conf);
Probability.run(conf);
/*
System.out.print("----------"); String[] otherArgs = new String[] { "hdfs://192.168.190.128:9000/user/hadoop/test/",
"hdfs://192.168.190.128:9000/user/hadoop/wordcount/output2/" };
conf.set("mapred.jar", "E://eclipse//jar-work//WordMain.jar"); Job job = new Job(conf, "word count");
job.setJarByClass(WordMain.class); job.setInputFormatClass(MyInputFormat.class); job.setMapperClass(WordMapper.class);
// job.setCombinerClass(WordReducer.class);
job.setReducerClass(WordReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// MyUtils.addInputPath(job, new Path(otherArgs[0]), conf); List<Path> inputPaths = getSecondDir(conf, otherArgs[0]);
for (Path path : inputPaths) {
System.out.println("path = " + path.toString());
MyInputFormat.addInputPath(job, path); }
System.out.println("addinputpath ok" );
// FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1);*/ } // 获取文件夹下面二级文件夹路径的方法
static List<Path> getSecondDir(Configuration conf, String folder) throws Exception {
Path path = new Path(folder);
FileSystem fs = path.getFileSystem(conf);
FileStatus[] stats = fs.listStatus(path);
List<Path> folderPath = new ArrayList<Path>();
for (FileStatus stat : stats) {
if (stat.isDir()) {
if (fs.listStatus(stat.getPath()).length > 10) { // 筛选出文件数大于10个的类别作为
// 输入路径
folderPath.add(stat.getPath());
}
}
}
return folderPath;
} }

统计各个类别文本数

package org.wordCount;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; /**
*
* 获取文件个数,并计算先验概率 先验概率保存在/user/hadoop/output/priorP/prior.txt
*
*/ public class FileCount { public static void run(Configuration conf) throws Exception { int sum = 0;
String in = conf.get("input"); Map<String, Integer> map = new HashMap<>();
Map<String, Double> priorMap = new HashMap<>(); // map传值(需要筛选测试集,有的类别文本数太少要删除)
map = FileCount.getFileNumber(in); //测试打印出每个类别和文件总数
Iterator<Map.Entry<String, Integer>> itrs = map.entrySet().iterator();
while (itrs.hasNext()) {
// System.out.println("ok");
Map.Entry<String, Integer> it = itrs.next();
if(it.getValue() <= 10){ //这两行代码可以不计算文本数少于10的类别
itrs.remove();
}else{
sum += it.getValue();
System.out.println(it.getKey() + "\t" + it.getValue());
}
} System.out.println("sum = " + sum); String output = conf.get("priorProbality"); Path outputPath = new Path(output);
FileSystem fs = outputPath.getFileSystem(conf);
FSDataOutputStream outputStream = fs.create(outputPath); //计算每个类别文本占总文本的比率,即先验概率
String ctx = "";
for (Map.Entry<String, Integer> entry : map.entrySet()) {
Double result = 0.0;
result = Double.parseDouble(entry.getValue().toString()) / sum;
priorMap.put(entry.getKey(), result);//保存在priorMap中
ctx += entry.getKey() + "\t" + result + "\n";
}
outputStream.writeBytes(ctx);
IOUtils.closeStream(outputStream); // 打印概率信息,同时可以写入文件中
// map的另外一种遍历方法
Iterator<Map.Entry<String, Double>> iterators = priorMap.entrySet().iterator();
while (iterators.hasNext()) {
Map.Entry<String, Double> iterator = iterators.next();
System.out.println(iterator.getKey() + "\t" + iterator.getValue());
} } // get 方法
public static Map<String, Integer> getFileNumber(String folderPath) throws Exception { Map<String, Integer> fileMap = new HashMap<>();
Configuration conf = new Configuration(); Path path = new Path(folderPath);
FileSystem hdfs = path.getFileSystem(conf);
FileStatus[] status = hdfs.listStatus(path);
// System.out.println(folderPath);
// System.out.println("status.length = " + status.length); for (FileStatus stat : status) {
if (stat.isDir()) {
int length = hdfs.listStatus(stat.getPath()).length;
String name = stat.getPath().getName();
fileMap.put(name, length);
}
} return fileMap;
} }

文本中单词计数

package org.wordCount;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; public class WordCount { private static MultipleOutputs<Text, IntWritable> mos;
// static String baseOutputPath = "/user/hadoop/test_out"; // 设计两个map分别计算每个类别的文本数//和每个类别的单词总数
// private static Map<String, List<String>> fileCountMap = new
// HashMap<String, List<String>>();
// private static Map<String, Integer> fileCount = new HashMap<String,
// Integer>();
// static Map<String, List<String>> wordsCountInClassMap = new
// HashMap<String, List<String>>(); static enum WordsNature {
CLSASS_NUMBER, CLASS_WORDS, TOTALWORDS
} // map
static class First_Mapper extends Mapper<Text, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1);
private final static IntWritable zero = new IntWritable(0); private Text countryName = new Text(); @Override
protected void map(Text key, Text value, Mapper<Text, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
String word = itr.nextToken();
if (!(MyUtils.hasDigit(word) || word.contains("."))) { // 去掉无意义词
countryName.set(key.toString() + "\t" + word); context.write(countryName, one); // 统计每个类别中的单词个数 ABL have 1
context.write(key, one); // 统计类别中的单词总数
context.write(new Text(word), zero); // 统计单词总数
}
} }
} // Reducer
static class First_Reducer extends Reducer<Text, IntWritable, Text, IntWritable> { // result 表示每个类别中每个单词的个数
IntWritable result = new IntWritable();
Map<String, List<String>> classMap = new HashMap<String, List<String>>();
Map<String, List<String>> fileMap = new HashMap<String, List<String>>(); @Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
} // sum为0,总得单词数加1,统计所有单词的种类
if (sum == 0) {
context.getCounter(WordsNature.TOTALWORDS).increment(1);
} else {// sum不为0时,通过key的长度来判断,
String[] temp = key.toString().split("\t");
if (temp.length == 2) { // 用tab分隔类别和单词
result.set(sum);
context.write(key, result);
// mos.write(new Text(temp[1]), result, temp[0]);
} else { // 类别中单词总数
result.set(sum);
mos.write(key, result, "_wordsInClass" + "\\" + "wordsInClass");
} } } @Override
protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException { mos.close();
} @Override
protected void setup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException { mos = new MultipleOutputs<Text, IntWritable>(context);
} } public static int run(Configuration conf) throws Exception {
// Configuration conf = new Configuration();
// System.out.print("---run-------");
// 设置不同文件的路径
// 文本数路径
// String priorProbality = "hdfs://192.168.190.128:9000/user/hadoop/output/priorP/priorProbality.txt";
// conf.set("priorProbality", priorProbality); Job job = new Job(conf, "file count"); job.setJarByClass(WordCount.class); job.setInputFormatClass(MyInputFormat.class); job.setMapperClass(WordCount.First_Mapper.class);
job.setReducerClass(WordCount.First_Reducer.class);
// System.out.println("---job-------");
// 过滤掉文本数少于10的类别 String input = conf.get("input"); List<Path> inputPaths = MyUtils.getSecondDir(conf, input);
for (Path path : inputPaths) {
System.out.println("path = " + path.toString());
MyInputFormat.addInputPath(job, path);
} String wordsOutput = conf.get("wordsOutput");
FileOutputFormat.setOutputPath(job, new Path(wordsOutput)); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); int exitCode = job.waitForCompletion(true) ? 0 : 1; // 调用计数器
Counters counters = job.getCounters();
Counter c1 = counters.findCounter(WordsNature.TOTALWORDS);
System.out.println("-------------->>>>: " + c1.getDisplayName() + ":" + c1.getName() + ": " + c1.getValue()); // 将单词种类数写入文件中
Path totalWordsPath = new Path("hdfs://192.168.190.128:9000/user/hadoop/output/totalwords.txt");
FileSystem fs = totalWordsPath.getFileSystem(conf);
FSDataOutputStream outputStream = fs.create(totalWordsPath);
outputStream.writeBytes(c1.getDisplayName() + ":" + c1.getValue());
IOUtils.closeStream(outputStream); // 下次求概率是尝试单词总种类数写到configuration中
//
// conf.set("TOTALWORDS", totalWords.toString()); return exitCode; } }

MyInputFormat

package org.wordCount;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; public class Probability { private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
public static int total = 0;
private static MultipleOutputs<Text, DoubleWritable> mos; // Client
public static void run(Configuration conf) throws Exception { // 读取单词总数,设置到congfiguration中
String totalWordsPath = conf.get("totalWordsPath");
// String wordsInClassPath = conf.get("wordsInClassPath"); // 先读取单词总类别数
FileSystem fs = FileSystem.get(URI.create(totalWordsPath), conf);
FSDataInputStream inputStream = fs.open(new Path(totalWordsPath));
BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
String strLine = buffer.readLine();
String[] temp = strLine.split(":");
if (temp.length == 2) {
// temp[0] = TOTALWORDS
conf.set(temp[0], temp[1]);// 设置两个String
} total = Integer.parseInt(conf.get("TOTALWORDS"));
LOG.info("------>total = " + total); System.out.println("total ==== " + total); Job job = new Job(conf, "file count"); job.setJarByClass(Probability.class); job.setMapperClass(WordsOfClassCountMapper.class);
job.setReducerClass(WordsOfClassCountReducer.class); String input = conf.get("wordsOutput");
String output = conf.get("freqOutput"); FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output)); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } // Mapper
static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> { private static DoubleWritable number = new DoubleWritable();
private static Text className = new Text(); // 保存类别中单词总数
private static Map<String, Integer> filemap = new HashMap<String, Integer>(); protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
int tot = Integer.parseInt(conf.get("TOTALWORDS")); System.out.println("total = " + total);
System.out.println("tot = " + tot); // 输入的格式如下:
// ALB weekend 1
// ALB weeks 3
Map<String, Map<String, Integer>> baseMap = new HashMap<String, Map<String, Integer>>(); // 保存基础数据
// Map<String, Map<String, Double>> priorMap = new HashMap<String,
// Map<String, Double>>(); // 保存每个单词出现的概率 String[] temp = value.toString().split("\t");
// 先将数据存到baseMap中
if (temp.length == 3) {
// 文件夹名类别名
if (baseMap.containsKey(temp[0])) {
baseMap.get(temp[0]).put(temp[1], Integer.parseInt(temp[2]));
} else {
Map<String, Integer> oneMap = new HashMap<String, Integer>();
oneMap.put(temp[1], Integer.parseInt(temp[2]));
baseMap.put(temp[0], oneMap);
} } // 读取数据完毕,全部保存在baseMap中 int allWordsInClass = 0; for (Map.Entry<String, Map<String, Integer>> entries : baseMap.entrySet()) { // 遍历类别
allWordsInClass = filemap.get(entries.getKey());
for (Map.Entry<String, Integer> entry : entries.getValue().entrySet()) { // 遍历类别中的单词词频求概率
double p = (entry.getValue() + 1.0) / (allWordsInClass + tot); className.set(entries.getKey() + "\t" + entry.getKey());
number.set(p);
LOG.info("------>p = " + p);
mos.write(new Text(entry.getKey()), number, entries.getKey() /*+ "\\" + entries.getKey()*/);//最后一个参数是为了生成文件夹对应的文件 // context.write(className, number);
}
} } //最后计算类别中不存在单词的概率,每个类别都是一个常数
protected void cleanup(Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException { Configuration conf = context.getConfiguration();
int tot = Integer.parseInt(conf.get("TOTALWORDS"));
for (Map.Entry<String, Integer> entry : filemap.entrySet()) { // 遍历类别 double notFind = (1.0) / (entry.getValue() + tot);
number.set(notFind);
mos.write(new Text(entry.getKey()), number, "_notFound" + "\\" +"notFound"); }
mos.close();
} protected void setup(Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = context.getConfiguration();
mos = new MultipleOutputs<Text, DoubleWritable>(context);
String filePath = conf.get("wordsInClassPath");
FileSystem fs = FileSystem.get(URI.create(filePath), conf);
FSDataInputStream inputStream = fs.open(new Path(filePath));
BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
String strLine = null;
while ((strLine = buffer.readLine()) != null) {
String[] temp = strLine.split("\t");
filemap.put(temp[0], Integer.parseInt(temp[1]));
}
} } // Reducer
static class WordsOfClassCountReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { // result 表示每个文件里面单词个数
DoubleWritable result = new DoubleWritable();
// Configuration conf = new Configuration();
// int total = conf.getInt("TOTALWORDS", 1); protected void reduce(Text key, Iterable<DoubleWritable> values,
Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException { double sum = 0L;
for (DoubleWritable value : values) {
sum += value.get();
}
result.set(sum); context.write(key, result);
} } }

两个小工具

package org.wordCount;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; public class MyUtils { // 获取文件夹下面二级文件夹路径的方法
static List<Path> getSecondDir(Configuration conf, String folder) throws Exception {
// System.out.println("----getSencondDir----" + folder);
Path path = new Path(folder); FileSystem fs = path.getFileSystem(conf);
FileStatus[] stats = fs.listStatus(path);
System.out.println("stats.length = " + stats.length);
List<Path> folderPath = new ArrayList<Path>();
for (FileStatus stat : stats) {
if (stat.isDir()) {
// System.out.println("----stat----" + stat.getPath());
if (fs.listStatus(stat.getPath()).length > 10) { // 筛选出文件数大于10个的类别作为
// 输入路径
folderPath.add(stat.getPath());
}
}
}
// System.out.println("----folderPath----" + folderPath.size());
return folderPath;
} // 判断一个字符串是否含有数字
static boolean hasDigit(String content) { boolean flag = false; Pattern p = Pattern.compile(".*\\d+.*"); Matcher m = p.matcher(content); if (m.matches()) flag = true; return flag; } }

计算概率

package org.wordCount;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; public class Probability { private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
public static int total = 0;
private static MultipleOutputs<Text, DoubleWritable> mos; // Client
public static void run(Configuration conf) throws Exception { // 读取单词总数,设置到congfiguration中
String totalWordsPath = conf.get("totalWordsPath");
// String wordsInClassPath = conf.get("wordsInClassPath"); // 先读取单词总类别数
FileSystem fs = FileSystem.get(URI.create(totalWordsPath), conf);
FSDataInputStream inputStream = fs.open(new Path(totalWordsPath));
BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
String strLine = buffer.readLine();
String[] temp = strLine.split(":");
if (temp.length == 2) {
// temp[0] = TOTALWORDS
conf.set(temp[0], temp[1]);// 设置两个String
} total = Integer.parseInt(conf.get("TOTALWORDS"));
LOG.info("------>total = " + total); System.out.println("total ==== " + total); Job job = new Job(conf, "file count"); job.setJarByClass(Probability.class); job.setMapperClass(WordsOfClassCountMapper.class);
job.setReducerClass(WordsOfClassCountReducer.class); String input = conf.get("wordsOutput");
String output = conf.get("freqOutput"); FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output)); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } // Mapper
static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> { private static DoubleWritable number = new DoubleWritable();
private static Text className = new Text(); // 保存类别中单词总数
private static Map<String, Integer> filemap = new HashMap<String, Integer>(); protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
int tot = Integer.parseInt(conf.get("TOTALWORDS")); System.out.println("total = " + total);
System.out.println("tot = " + tot); // 输入的格式如下:
// ALB weekend 1
// ALB weeks 3
Map<String, Map<String, Integer>> baseMap = new HashMap<String, Map<String, Integer>>(); // 保存基础数据
// Map<String, Map<String, Double>> priorMap = new HashMap<String,
// Map<String, Double>>(); // 保存每个单词出现的概率 String[] temp = value.toString().split("\t");
// 先将数据存到baseMap中
if (temp.length == 3) {
// 文件夹名类别名
if (baseMap.containsKey(temp[0])) {
baseMap.get(temp[0]).put(temp[1], Integer.parseInt(temp[2]));
} else {
Map<String, Integer> oneMap = new HashMap<String, Integer>();
oneMap.put(temp[1], Integer.parseInt(temp[2]));
baseMap.put(temp[0], oneMap);
} } // 读取数据完毕,全部保存在baseMap中 int allWordsInClass = 0; for (Map.Entry<String, Map<String, Integer>> entries : baseMap.entrySet()) { // 遍历类别
allWordsInClass = filemap.get(entries.getKey());
for (Map.Entry<String, Integer> entry : entries.getValue().entrySet()) { // 遍历类别中的单词词频求概率
double p = (entry.getValue() + 1.0) / (allWordsInClass + tot); className.set(entries.getKey() + "\t" + entry.getKey());
number.set(p);
LOG.info("------>p = " + p);
mos.write(new Text(entry.getKey()), number, entries.getKey() /*+ "\\" + entries.getKey()*/);//最后一个参数是为了生成文件夹对应的文件 // context.write(className, number);
}
} } //最后计算类别中不存在单词的概率,每个类别都是一个常数
protected void cleanup(Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException { Configuration conf = context.getConfiguration();
int tot = Integer.parseInt(conf.get("TOTALWORDS"));
for (Map.Entry<String, Integer> entry : filemap.entrySet()) { // 遍历类别 double notFind = (1.0) / (entry.getValue() + tot);
number.set(notFind);
mos.write(new Text(entry.getKey()), number, "_notFound" + "\\" +"notFound"); }
mos.close();
} protected void setup(Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = context.getConfiguration();
mos = new MultipleOutputs<Text, DoubleWritable>(context);
String filePath = conf.get("wordsInClassPath");
FileSystem fs = FileSystem.get(URI.create(filePath), conf);
FSDataInputStream inputStream = fs.open(new Path(filePath));
BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
String strLine = null;
while ((strLine = buffer.readLine()) != null) {
String[] temp = strLine.split("\t");
filemap.put(temp[0], Integer.parseInt(temp[1]));
}
} } // Reducer
static class WordsOfClassCountReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { // result 表示每个文件里面单词个数
DoubleWritable result = new DoubleWritable();
// Configuration conf = new Configuration();
// int total = conf.getInt("TOTALWORDS", 1); protected void reduce(Text key, Iterable<DoubleWritable> values,
Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException { double sum = 0L;
for (DoubleWritable value : values) {
sum += value.get();
}
result.set(sum); context.write(key, result);
} } }

预测

package org.wordCount;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; public class Probability { private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
public static int total = 0;
private static MultipleOutputs<Text, DoubleWritable> mos; // Client
public static void run(Configuration conf) throws Exception { // 读取单词总数,设置到congfiguration中
String totalWordsPath = conf.get("totalWordsPath");
// String wordsInClassPath = conf.get("wordsInClassPath"); // 先读取单词总类别数
FileSystem fs = FileSystem.get(URI.create(totalWordsPath), conf);
FSDataInputStream inputStream = fs.open(new Path(totalWordsPath));
BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
String strLine = buffer.readLine();
String[] temp = strLine.split(":");
if (temp.length == 2) {
// temp[0] = TOTALWORDS
conf.set(temp[0], temp[1]);// 设置两个String
} total = Integer.parseInt(conf.get("TOTALWORDS"));
LOG.info("------>total = " + total); System.out.println("total ==== " + total); Job job = new Job(conf, "file count"); job.setJarByClass(Probability.class); job.setMapperClass(WordsOfClassCountMapper.class);
job.setReducerClass(WordsOfClassCountReducer.class); String input = conf.get("wordsOutput");
String output = conf.get("freqOutput"); FileInputFormat.addInputPath(job, new Path(input));
FileOutputFormat.setOutputPath(job, new Path(output)); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } // Mapper
static class WordsOfClassCountMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> { private static DoubleWritable number = new DoubleWritable();
private static Text className = new Text(); // 保存类别中单词总数
private static Map<String, Integer> filemap = new HashMap<String, Integer>(); protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
int tot = Integer.parseInt(conf.get("TOTALWORDS")); System.out.println("total = " + total);
System.out.println("tot = " + tot); // 输入的格式如下:
// ALB weekend 1
// ALB weeks 3
Map<String, Map<String, Integer>> baseMap = new HashMap<String, Map<String, Integer>>(); // 保存基础数据
// Map<String, Map<String, Double>> priorMap = new HashMap<String,
// Map<String, Double>>(); // 保存每个单词出现的概率 String[] temp = value.toString().split("\t");
// 先将数据存到baseMap中
if (temp.length == 3) {
// 文件夹名类别名
if (baseMap.containsKey(temp[0])) {
baseMap.get(temp[0]).put(temp[1], Integer.parseInt(temp[2]));
} else {
Map<String, Integer> oneMap = new HashMap<String, Integer>();
oneMap.put(temp[1], Integer.parseInt(temp[2]));
baseMap.put(temp[0], oneMap);
} } // 读取数据完毕,全部保存在baseMap中 int allWordsInClass = 0; for (Map.Entry<String, Map<String, Integer>> entries : baseMap.entrySet()) { // 遍历类别
allWordsInClass = filemap.get(entries.getKey());
for (Map.Entry<String, Integer> entry : entries.getValue().entrySet()) { // 遍历类别中的单词词频求概率
double p = (entry.getValue() + 1.0) / (allWordsInClass + tot); className.set(entries.getKey() + "\t" + entry.getKey());
number.set(p);
LOG.info("------>p = " + p);
mos.write(new Text(entry.getKey()), number, entries.getKey() /*+ "\\" + entries.getKey()*/);//最后一个参数是为了生成文件夹对应的文件 // context.write(className, number);
}
} } //最后计算类别中不存在单词的概率,每个类别都是一个常数
protected void cleanup(Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException { Configuration conf = context.getConfiguration();
int tot = Integer.parseInt(conf.get("TOTALWORDS"));
for (Map.Entry<String, Integer> entry : filemap.entrySet()) { // 遍历类别 double notFind = (1.0) / (entry.getValue() + tot);
number.set(notFind);
mos.write(new Text(entry.getKey()), number, "_notFound" + "\\" +"notFound"); }
mos.close();
} protected void setup(Mapper<LongWritable, Text, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = context.getConfiguration();
mos = new MultipleOutputs<Text, DoubleWritable>(context);
String filePath = conf.get("wordsInClassPath");
FileSystem fs = FileSystem.get(URI.create(filePath), conf);
FSDataInputStream inputStream = fs.open(new Path(filePath));
BufferedReader buffer = new BufferedReader(new InputStreamReader(inputStream));
String strLine = null;
while ((strLine = buffer.readLine()) != null) {
String[] temp = strLine.split("\t");
filemap.put(temp[0], Integer.parseInt(temp[1]));
}
} } // Reducer
static class WordsOfClassCountReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { // result 表示每个文件里面单词个数
DoubleWritable result = new DoubleWritable();
// Configuration conf = new Configuration();
// int total = conf.getInt("TOTALWORDS", 1); protected void reduce(Text key, Iterable<DoubleWritable> values,
Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException { double sum = 0L;
for (DoubleWritable value : values) {
sum += value.get();
}
result.set(sum); context.write(key, result);
} } }

预测的inputformat

package org.wordCount.predict;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class WholeFileInputFormat extends FileInputFormat<LongWritable, Text>{ @Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
} @Override
protected boolean isSplitable(JobContext context, Path filename) { return false;
} } class WholeFileRecordReader extends RecordReader<LongWritable, Text>{ private FileSplit fileSplit; //保存输入的分片,他将被转换成一条<key, value>记录
private Configuration conf; //配置对象
private Text value = new Text();//
private LongWritable key = new LongWritable(); //key对象,为空
private boolean processed = false; //布尔变量记录记录是否被处理过 @Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.fileSplit = (FileSplit)split; //将输入分片强制转换成fileSplit
this.conf = context.getConfiguration(); } @Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if(!processed){
byte[] contents = new byte[(int)fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try{
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
}finally{
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
} @Override
public LongWritable getCurrentKey() throws IOException, InterruptedException { return key;
} @Override
public Text getCurrentValue() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return value;
} @Override
public float getProgress() throws IOException, InterruptedException { return processed ? 1.0f : 0.0f;
} @Override
public void close() throws IOException {
// TODO Auto-generated method stub } }

最新文章

  1. 关于分开编写多个LaTeX文件的一点微小的总结
  2. PV、UV、IP之间的区别与联系
  3. Subset---poj3977(折半枚举+二分查找)
  4. IOS使用Asyncsocket进行socket编程
  5. toB的产品经理和toc产品经理区别
  6. hadoop集群默认配置和常用配置【转】
  7. Codeforces Round #313 (Div. 2) B. Gerald is into Art 水题
  8. jenkis编译报错:需要class,interface或enum
  9. C语言-07其它相关
  10. vb串口通信界面
  11. BZOJ 2875 随机数生成器
  12. jQuery实现拖动布局并将排序结果保存到数据库
  13. 转 excel表怎么自动分列
  14. Windows下memcached的安装配置
  15. 节点操作,节点属性的操作及DOM event事件
  16. JAVA_SE基础——18.方法的递归
  17. Docker系列教程01-Centos7安装新版Docker教程(10步)
  18. java web 项目打包(war 包)并部署
  19. C#中Request.ServerVariables详细说明及代理
  20. [LeetCode&amp;Python] Problem 860. Lemonade Change

热门文章

  1. vb asp.net的一些属性值
  2. classpath获取--getResource()
  3. JS总结之一:字符串的调用方法
  4. Lambda表达式介绍
  5. ECOS高可用集群
  6. and的用法(&amp;)
  7. java memcache应用
  8. 1.4 测试各阶段(单元、集成、系统 、Alpha、Beta、验收)
  9. 【dp】 比较经典的dp poj 1160
  10. servlet中路径的获取