1. Background

Recently, while using MultipleInputs to add multiple input files in a Hadoop project, I found that when the same path is added more than once it is only loaded once, which badly skewed the downstream statistics. This post records how the problem was tracked down and fixed.

2. Reproducing the problem

(1) Prepare a minimal input file named test containing the line "i am ws", and place it on HDFS at /work/justTest/test. A sketch of one way to create it is shown below.
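The following is only an illustrative sketch of creating that file from Java; uploading a local file with hdfs dfs -put achieves the same thing. The class name is hypothetical; the path and content are the ones used throughout this post.

    // Illustrative sketch: write the small test input directly to HDFS.
    // Assumes the Hadoop configuration on the classpath points at the target cluster.
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PrepareTestInput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path path = new Path("/work/justTest/test");
            try (FileSystem fs = FileSystem.get(conf);
                 FSDataOutputStream out = fs.create(path, true)) { // overwrite if present
                out.write("i am ws".getBytes(StandardCharsets.UTF_8));
            }
        }
    }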

(2) The source code is shown below. It is essentially a word-count job in which /work/justTest/test is passed in twice as an input path:

package com.ws.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MutilInputTest {

    public static void main(String[] args) {
        testMultiInputs();
    }

    /**
     * Test method: adds the same input path twice via MultipleInputs.
     */
    public static void testMultiInputs() {
        Configuration conf = new Configuration();
        conf.set("mapreduce.job.queuename", "default");
        conf.setBoolean("mapreduce.map.output.compress", true);
        conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
        conf.setInt("mapreduce.task.timeout", 0);
        conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.40f);

        String input = "/work/justTest/test";
        try {
            createMultiInputsTestJob(conf,
                    input, Test1Mapper.class,
                    input, Test2Mapper.class,
                    "/work/justTest/temp", 2, TestReduce.class)
                .waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the job.
     *
     * @param conf      job configuration
     * @param input1    first input path
     * @param mapper1   mapper for the first input
     * @param input2    second input path
     * @param mapper2   mapper for the second input
     * @param outputDir output directory (deleted first if it already exists)
     * @param reduceNum number of reduce tasks
     * @param reducer   reducer class
     * @return the configured Job, or null if setup fails
     */
    static Job createMultiInputsTestJob(Configuration conf,
            String input1, Class<? extends Mapper> mapper1,
            String input2, Class<? extends Mapper> mapper2,
            String outputDir,
            int reduceNum, Class<? extends Reducer> reducer) {
        try {
            Job job = new Job(conf);
            job.setJobName("MultiInputsTest");
            job.setJarByClass(MutilInputTest.class);
            job.setNumReduceTasks(reduceNum);
            job.setReducerClass(reducer);

            job.setInputFormatClass(TextInputFormat.class);
            MultipleInputs.addInputPath(job, new Path(input1), TextInputFormat.class, mapper1);
            MultipleInputs.addInputPath(job, new Path(input2), TextInputFormat.class, mapper2);

            Path outputPath = new Path(outputDir);
            outputPath.getFileSystem(conf).delete(outputPath, true);
            job.setOutputFormatClass(TextOutputFormat.class);
            TextOutputFormat.setOutputPath(job, outputPath);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            return job;
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * Mapper class: counts its input words under a per-type counter group.
     */
    static class Test1Mapper extends Mapper<LongWritable, Text, Text, Text> {
        Context context;
        String type;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            this.context = context;
            this.type = getDataType();
            super.setup(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            for (String word : words) {
                context.getCounter(this.type + "_map_total", "input").increment(1);
                context.write(new Text(word), new Text("1"));
            }
        }

        protected String getDataType() {
            return "test1";
        }
    }

    /**
     * Mapper subclass: identical logic, different counter group name.
     */
    static class Test2Mapper extends Test1Mapper {
        @Override
        protected String getDataType() {
            return "test2";
        }
    }

    /**
     * Reducer class: sums the per-word counts into a counter per word.
     */
    static class TestReduce extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int total = 0;
            for (Text value : values) {
                total += Integer.parseInt(value.toString());
            }
            context.getCounter("reduce_total", key.toString() + "_" + total).increment(1);
        }
    }
}

(3) The job execution log is as follows:

 18/08/12 21:33:57 INFO client.RMProxy: Connecting to ResourceManager at bd-001/192.168.86.41:8032
18/08/12 21:33:58 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/08/12 21:33:59 INFO input.FileInputFormat: Total input paths to process : 1
18/08/12 21:33:59 INFO mapreduce.JobSubmitter: number of splits:1
18/08/12 21:34:00 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527582903778_39623
18/08/12 21:34:00 INFO impl.YarnClientImpl: Submitted application application_1527582903778_39623
18/08/12 21:34:00 INFO mapreduce.Job: The url to track the job: http://bd-001:8088/proxy/application_1527582903778_39623/
18/08/12 21:34:00 INFO mapreduce.Job: Running job: job_1527582903778_39623
18/08/12 21:34:06 INFO mapreduce.Job: Job job_1527582903778_39623 running in uber mode : false
18/08/12 21:34:06 INFO mapreduce.Job: map 0% reduce 0%
18/08/12 21:34:12 INFO mapreduce.Job: map 100% reduce 0%
18/08/12 21:34:17 INFO mapreduce.Job: map 100% reduce 50%
18/08/12 21:34:22 INFO mapreduce.Job: map 100% reduce 100%
18/08/12 21:34:22 INFO mapreduce.Job: Job job_1527582903778_39623 completed successfully
18/08/12 21:34:22 INFO mapreduce.Job: Counters: 53
File System Counters
FILE: Number of bytes read=64
FILE: Number of bytes written=271730
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=263
HDFS: Number of bytes written=0
HDFS: Number of read operations=9
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Job Counters
Launched map tasks=1
Launched reduce tasks=2
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=14760
Total time spent by all reduces in occupied slots (ms)=49344
Total time spent by all map tasks (ms)=3690
Total time spent by all reduce tasks (ms)=6168
Total vcore-seconds taken by all map tasks=3690
Total vcore-seconds taken by all reduce tasks=6168
Total megabyte-seconds taken by all map tasks=15114240
Total megabyte-seconds taken by all reduce tasks=50528256
Map-Reduce Framework
Map input records=1
Map output records=3
Map output bytes=14
Map output materialized bytes=48
Input split bytes=255
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=48
Reduce input records=3
Reduce output records=0
Spilled Records=6
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=183
CPU time spent (ms)=3150
Physical memory (bytes) snapshot=1009094656
Virtual memory (bytes) snapshot=24295927808
Total committed heap usage (bytes)=2306867200
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
reduce_total
am_1=1
i_1=1
ws_1=1
test2_map_total
input=3

Two things stand out in the log: 1) the third line reports "Total input paths to process : 1", i.e. only one input path was registered; 2) the map-phase counters show only 3 input words in total and only the test2 counter group is present, while the reduce-phase counters show every word with a count of 1.

This raises the question: the same file was clearly passed in as input twice, so why does Hadoop detect only one input file?

3. Root cause analysis

Since only one file is left by the time the map and reduce phases run, the problem has to be introduced while the job is being created, so the next step was to read the MultipleInputs source that handles the input paths:

@SuppressWarnings("unchecked")
public static void addInputPath(Job job, Path path,
    Class<? extends InputFormat> inputFormatClass,
    Class<? extends Mapper> mapperClass) {

  addInputPath(job, path, inputFormatClass);
  Configuration conf = job.getConfiguration();
  String mapperMapping = path.toString() + ";" + mapperClass.getName();
  String mappers = conf.get(DIR_MAPPERS);
  conf.set(DIR_MAPPERS, mappers == null ? mapperMapping
      : mappers + "," + mapperMapping);

  job.setMapperClass(DelegatingMapper.class);
}

public static void addInputPath(Job job, Path path,
    Class<? extends InputFormat> inputFormatClass) {
  String inputFormatMapping = path.toString() + ";"
      + inputFormatClass.getName();
  Configuration conf = job.getConfiguration();
  String inputFormats = conf.get(DIR_FORMATS);
  conf.set(DIR_FORMATS,
      inputFormats == null ? inputFormatMapping : inputFormats + ","
          + inputFormatMapping);

  job.setInputFormatClass(DelegatingInputFormat.class);
}

The source shows that both DIR_FORMATS and DIR_MAPPERS are built as comma-separated strings of "input path;InputFormat class name" and "input path;Mapper class name" entries. Before the job actually runs, these strings are turned back into per-path mappings, so when the same path has been added more than once only one entry survives, namely the one from the last MultipleInputs.addInputPath call for that path. The solution is therefore to pass in distinct paths. The sketch below illustrates the collapsing behaviour.
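To make the collapse concrete, here is a minimal, illustrative sketch (not the Hadoop source itself; class name and literals are made up for the demo) of what happens when a "path;class" list containing a duplicated path is parsed back into a map keyed by path: the later entry simply overwrites the earlier one.

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DirFormatsCollapseDemo {
        public static void main(String[] args) {
            // Roughly what two addInputPath calls with the same path produce in DIR_FORMATS:
            String dirFormats =
                "/work/justTest/test;org.apache.hadoop.mapreduce.lib.input.TextInputFormat,"
              + "/work/justTest/test;org.apache.hadoop.mapreduce.lib.input.TextInputFormat";

            Map<String, String> formatByPath = new LinkedHashMap<>();
            for (String mapping : dirFormats.split(",")) {
                String[] parts = mapping.split(";");
                formatByPath.put(parts[0], parts[1]); // same key: previous entry is overwritten
            }

            System.out.println(formatByPath.size()); // prints 1, not 2
        }
    }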

4. Solution

Provide the same file content under two different input paths.
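If the second copy does not exist on HDFS yet, it has to be created before the job is submitted. A minimal sketch, assuming the test1/test2 paths used in the code below; copying with hdfs dfs -cp from the shell works just as well.

    // Illustrative sketch: stage the same content under two distinct HDFS paths
    // so that MultipleInputs sees two separate inputs.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class StageDuplicateInput {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);
            Path src = new Path("/work/justTest/test1");
            Path dst = new Path("/work/justTest/test2");
            // Copy test1 to test2 without deleting the source.
            FileUtil.copy(fs, src, fs, dst, false, conf);
        }
    }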

(1) The only change is in testMultiInputs():

public static void testMultiInputs() {
    Configuration conf = new Configuration();
    conf.set("mapreduce.job.queuename", "default");
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 0.995f);
    conf.setInt("mapreduce.task.timeout", 0);
    conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.40f);

    String input = "/work/justTest/";
    try {
        createMultiInputsTestJob(conf,
                input + "test1", Test1Mapper.class,
                input + "test2", Test2Mapper.class,
                input + "/temp", 2, TestReduce.class)
            .waitForCompletion(true);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

(2) The run log is shown below:

 18/08/12 21:58:15 INFO client.RMProxy: Connecting to ResourceManager at bd-001/192.168.86.41:8032
18/08/12 21:58:15 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/08/12 21:58:16 INFO input.FileInputFormat: Total input paths to process : 1
18/08/12 21:58:16 INFO input.FileInputFormat: Total input paths to process : 1
18/08/12 21:58:16 INFO mapreduce.JobSubmitter: number of splits:2
18/08/12 21:58:17 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1527582903778_39628
18/08/12 21:58:17 INFO impl.YarnClientImpl: Submitted application application_1527582903778_39628
18/08/12 21:58:17 INFO mapreduce.Job: The url to track the job: http://bd-001:8088/proxy/application_1527582903778_39628/
18/08/12 21:58:17 INFO mapreduce.Job: Running job: job_1527582903778_39628
18/08/12 21:58:22 INFO mapreduce.Job: Job job_1527582903778_39628 running in uber mode : false
18/08/12 21:58:22 INFO mapreduce.Job: map 0% reduce 0%
18/08/12 21:58:28 INFO mapreduce.Job: map 100% reduce 0%
18/08/12 21:58:34 INFO mapreduce.Job: map 100% reduce 100%
18/08/12 21:58:35 INFO mapreduce.Job: Job job_1527582903778_39628 completed successfully
18/08/12 21:58:35 INFO mapreduce.Job: Counters: 55
File System Counters
FILE: Number of bytes read=66
FILE: Number of bytes written=362388
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=528
HDFS: Number of bytes written=0
HDFS: Number of read operations=12
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Job Counters
Launched map tasks=2
Launched reduce tasks=2
Data-local map tasks=1
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=27332
Total time spent by all reduces in occupied slots (ms)=59792
Total time spent by all map tasks (ms)=6833
Total time spent by all reduce tasks (ms)=7474
Total vcore-seconds taken by all map tasks=6833
Total vcore-seconds taken by all reduce tasks=7474
Total megabyte-seconds taken by all map tasks=27987968
Total megabyte-seconds taken by all reduce tasks=61227008
Map-Reduce Framework
Map input records=2
Map output records=6
Map output bytes=28
Map output materialized bytes=96
Input split bytes=512
Combine input records=0
Combine output records=0
Reduce input groups=3
Reduce shuffle bytes=96
Reduce input records=6
Reduce output records=0
Spilled Records=12
Shuffled Maps =4
Failed Shuffles=0
Merged Map outputs=4
GC time elapsed (ms)=272
CPU time spent (ms)=4440
Physical memory (bytes) snapshot=1346195456
Virtual memory (bytes) snapshot=29357146112
Total committed heap usage (bytes)=3084910592
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
reduce_total
am_2=1
i_2=1
ws_2=1
test1_map_total
input=3
test2_map_total
input=3

(3) The log shows that the job now behaves as originally intended: "Total input paths to process : 1" appears once per input path and two splits are created, both the test1_map_total and test2_map_total counter groups are present, and each word is counted twice.
