Example input data:

Source1-0001
Source2-0002
Source1-0003
Source2-0004
Source1-0005
Source2-0006
Source3-0007
Source3-0008

Description:

  • Data prefixed with Source1 belongs to set A.
  • Data prefixed with Source2 belongs to set B.
  • Data prefixed with Source3 belongs to both set A and set B.

Output requirements:

  • Keep all of set A's data (records prefixed with Source1 and Source3).
  • Keep all of set B's data (records prefixed with Source2 and Source3).
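
The routing rules above can be sketched as a standalone Java method before involving Hadoop at all (`RouteSketch` is an illustrative name, not part of the original code):

```java
import java.util.List;

/** Standalone sketch of the routing rules above; no Hadoop required. */
public class RouteSketch {
    /** Returns which sets ("A", "B") a record belongs to, based on its prefix. */
    public static List<String> route(String line) {
        String prefix = line.split("-")[0];
        switch (prefix) {
            case "Source1": return List.of("A");
            case "Source2": return List.of("B");
            case "Source3": return List.of("A", "B"); // belongs to both sets
            default:        return List.of();
        }
    }

    public static void main(String[] args) {
        for (String line : List.of("Source1-0001", "Source2-0002", "Source3-0007")) {
            System.out.println(line + " -> " + route(line)); // e.g. Source1-0001 -> [A]
        }
    }
}
```

The MapReduce job below implements exactly this routing, with one named output per set.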

Implementation:

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.mahout.common.AbstractJob;

import com.yhd.common.util.HadoopUtil;

/**
 * AbstractJob is Mahout's job template; using it is optional here.
 * The core of this example is the MultipleOutputs usage.
 *
 * @author ouyangyewei
 */
public class TestMultipleOutputsJob extends AbstractJob {
    @Override
    public int run(String[] args) throws Exception {
        addInputOption();
        addOutputOption();

        Map<String, List<String>> parseArgs = parseArguments(args);
        if (parseArgs == null) {
            return -1;
        }

        HadoopUtil.delete(getConf(), getOutputPath());

        Configuration conf = new Configuration();
        conf.setInt("mapred.reduce.tasks", 4);
        conf.set("mapred.job.queue.name", "pms");
        conf.set("mapred.child.java.opts", "-Xmx3072m");
        conf.set("mapreduce.reduce.shuffle.memory.limit.percent", "0.05");

        Job job = new Job(new Configuration(conf));
        job.setJobName("TestMultipleOutputsJob");
        job.setJarByClass(TestMultipleOutputsJob.class);
        job.setMapperClass(MultipleMapper.class);
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, this.getInputPath());
        FileOutputFormat.setOutputPath(job, this.getOutputPath());

        /** Output files will be named Source1-m-**** */
        MultipleOutputs.addNamedOutput(job, "Source1", TextOutputFormat.class, Text.class, Text.class);
        /** Output files will be named Source2-m-**** */
        MultipleOutputs.addNamedOutput(job, "Source2", TextOutputFormat.class, Text.class, Text.class);

        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            return -1;
        }
        return 0;
    }

    /**
     * @author ouyangyewei
     */
    public static class MultipleMapper extends Mapper<LongWritable, Text, Text, Text> {
        private MultipleOutputs<Text, Text> mos = null;

        @Override
        protected void setup(Context context
        ) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, Text>(context);
        }

        public void map(LongWritable key, Text value, Context context
        ) throws IOException, InterruptedException {
            String line = value.toString();
            String[] tokenizer = line.split("-");
            if (tokenizer[0].equals("Source1")) {
                /** Data belonging to set A */
                mos.write("Source1", new Text(tokenizer[0]), new Text(tokenizer[1]));
            } else if (tokenizer[0].equals("Source2")) {
                /** Data belonging to set B */
                mos.write("Source2", new Text(tokenizer[0]), new Text(tokenizer[1]));
            }
            /** Data belonging to both set A and set B */
            if (tokenizer[0].equals("Source3")) {
                mos.write("Source1", new Text(tokenizer[0]), new Text(tokenizer[1]));
                mos.write("Source2", new Text(tokenizer[0]), new Text(tokenizer[1]));
            }
        }

        protected void cleanup(Context context
        ) throws IOException, InterruptedException {
            mos.close();
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        System.setProperty("javax.xml.parsers.DocumentBuilderFactory",
            "com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
        System.setProperty("javax.xml.parsers.SAXParserFactory",
            "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl");

        TestMultipleOutputsJob instance = new TestMultipleOutputsJob();
        try {
            instance.run(args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Run the packaged jar with the hadoop jar command:

hadoop jar bigdata-datamining-1.0-user-trace-jar-with-dependencies.jar com.yhd.datamining.data.usertrack.offline.job.mapred.TestMultipleOutputsJob \
--input /user/pms/workspace/ouyangyewei/testMultipleOutputs \
--output /user/pms/workspace/ouyangyewei/testMultipleOutputs/output

Output after the job runs:

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testMultipleOutputs/output
Found 4 items
-rw-r--r-- 3 pms pms 65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000
-rw-r--r-- 3 pms pms 65 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000
-rw-r--r-- 3 pms pms 0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/_SUCCESS
-rw-r--r-- 3 pms pms 0 2014-12-16 09:18 /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/part-m-00000

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
$hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source1-m-00000
Source1 0001
Source1 0003
Source1 0005
Source3 0007
Source3 0008

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei]
$hadoop fs -cat /user/pms/workspace/ouyangyewei/testMultipleOutputs/output/Source2-m-00000
Source2 0002
Source2 0004
Source2 0006
Source3 0007
Source3 0008
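
Note the empty part-m-00000 in the listing above: it is the job's default output, which this mapper never writes to. If such empty files are unwanted, Hadoop ships a `LazyOutputFormat` wrapper that creates the default output only on first write. A sketch of the extra configuration line (assumes the `Job` object from `run()` above):

```java
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Create the default part-* files lazily, i.e. only if something
// is actually written to the default output.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
```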

Addendum, 2014-12-18:

A drawback of this approach is that it litters the output directory with many files prefixed Source1 or Source2. A cleaner option is to pass a baseOutputPath to mos.write(), so that, for example, all Source1 files are grouped under their own subdirectory.

The map method above can be rewritten to manage the output by directory:

public void map(LongWritable key, Text value, Context context
) throws IOException, InterruptedException {
    String line = value.toString();
    String[] tokenizer = line.split("-");
    if (tokenizer[0].equals("Source1")) {
        /** Data belonging to set A */
        mos.write("Source1",
                  new Text(tokenizer[0]),
                  new Text(tokenizer[1]),
                  "Source1/part");
    } else if (tokenizer[0].equals("Source2")) {
        /** Data belonging to set B */
        mos.write("Source2",
                  new Text(tokenizer[0]),
                  new Text(tokenizer[1]),
                  "Source2/part");
    }
    /** Data belonging to both set A and set B */
    if (tokenizer[0].equals("Source3")) {
        mos.write("Source1",
                  new Text(tokenizer[0]),
                  new Text(tokenizer[1]),
                  "Source1/part");
        mos.write("Source2",
                  new Text(tokenizer[0]),
                  new Text(tokenizer[1]),
                  "Source2/part");
    }
}

Output after the job runs:

$hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output
Found 4 items
-rw-r--r-- 3 pms pms 0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/_SUCCESS
-rw-r--r-- 3 pms pms 0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/part-r-00000
drwxr-xr-x - pms pms 0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1
drwxr-xr-x - pms pms 0 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei/testUsertrack]
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1
Found 1 items
-rw-r--r-- 3 pms pms 65 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1/part-r-00000

[pms@yhd-jqhadoop39 /home/pms/workspace/ouyangyewei/testUsertrack]
$hadoop fs -ls /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2
Found 1 items
-rw-r--r-- 3 pms pms 65 2014-12-18 14:11 /user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source2/part-r-00000
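
One payoff of the per-directory layout is that a downstream job can consume just one subset by pointing its input at the subdirectory. A sketch, where `job2` is a hypothetical follow-up job:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Feed only set A (Source1 + Source3 records) into a follow-up job.
FileInputFormat.addInputPath(job2,
    new Path("/user/pms/workspace/ouyangyewei/testUsertrack/job1Output/Source1"));
```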

See also: http://dirlt.com/mapred.html
