Yarn的Tool接口案例

Tool接口环境准备

之前写wordcount里通过命令行传入的参数来获取输入路径与输出路径。执行命令

[ranan@hadoop102 hadoop-3.1.3]$ hadoop jar wc.jar
com.ranan.mapreduce.wordcount2.WordCountDriver /input
/output1

期望可以动态传参,结果报错,被人误是第一个输出参数

[rana@hadoop102 hadoop-3.1.3]$ hadoop jar wc.jar
com.rana.mapreduce.wordcount2.WordCountDriver -
D mapreduce.job.queuename=root.test /input /output2

需求:自己写的程序也可以动态修改参数。编写Yarn的Tool接口

1 新建Maven项目YarnDemo

修改Maven设置

修改pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion
<groupId>com.ranan</groupId>
<artifactId>YarnDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!--打包使用的是JDK8-->
<maven.conpiler.source>8</maven.conpiler.source>
<maven.conpiler.target>8</maven.conpiler.target>
</properties> <!-- 新增依赖 -->
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
</project>

注意

打包用的JDK8,hadoop里面是JDK8,但是本地安装的是JDK11、Maven使用的是JDK11。

所以执行命令的时候会报错。

hadoop3.x目前只支持jdk1.8。修改版本到JDK8。

修改Java编译器

创建包名

编写代码

使用ToolRunner的方法运行

WordCount.class里编写Mapper,Reducer及部分驱动代码

driver驱动里面对conf进行设置

WordCount.class

package com.ranran.yarn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool; import java.io.IOException; /**
* @author ranan
* @create 2021-11-03 20:13
*/
public class WordCount implements Tool {
//配置
private Configuration conf;
//核心驱动(conf 需要传入,不能直接new)
public int run(String[] args) throws Exception {
Job job = Job.getInstance(conf);
//设置jar包驱动
job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WorCountReducer.class); job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1])); return job.waitForCompletion(true)?0:1;
}
//conf的set方法,用于driver驱动设置conf配置信息
public void setConf(Configuration configuration) {
this.conf = conf;
}
//get方法
public Configuration getConf() {
return conf;
}
//mapper
public static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
private Text outk = new Text();
private IntWritable outV = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行
String line = value.toString();
//切割
String[] words = line.split(" ");
//循环遍历写出
for (String word : words) {
outk.set(word);
context.write(outk,outV);
}
}
}
//reduce
public static class WorCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable value : values){
sum +=value.get();
} context.write(key,new IntWritable(sum));
}
}
}

WordCountDriver.class

起过滤效果

package com.ranran.yarn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import java.util.Arrays; /**
* @author ranan
* @create 2021-11-03 21:24
*/
public class WordCountDriver { private static Tool tool; //处理Tool接口相关
public static void main(String[] args) throws Exception {
//实例化配置信息
Configuration conf = new Configuration(); /*
命令行动态传过来的参数进行处理:hadoop jar wc.jar com.rana.mapreduce.wordcount2.WordCountDriver -D mapreduce.job.queuename=root.test /input /output2 --> hadoop jar wc.jar com.ranan.mapreduce.wordcount2.WordCountDriver /input /output1
*/ //合法性校验 args[0]:wordcount -D 这部分
switch (args[0]){
//执行wordcount程序
case "wordcount":
//WordCount实现了Tool接口
tool = new WordCount();
break;
default:
throw new RuntimeException("no such tool" + args[0]);
}
//执行程序,第一个参数配置信息,第二个tool接口,第三个接口传参
//只传输入输出路径到tool
int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
System.exit(run);
}
}

打包jar上传到集群

[ranan@hadoop102 hadoop-3.1.3]$ rz

在 HDFS 上准备输入文件,假设为/input 目录,向集群提交该 Jar 包,先看两个参数的是否能运行。

com.ranan.yarn.WordCountDriver驱动的全类名

[ranan@hadoop102 hadoop-3.1.3]$ yarn jar YarnDemo-1.0-SNAPSHOT.jar com.ranran.yarn.WordCountDriver wordcount /input /output

注意此时提交的 3 个参数,第一个用于生成特定的 Tool,第二个和第三个为输入输出目录。此时如果我们希望加入设置参数,可以在 wordcount 后面添加参数,例如:

[ranan@hadoop1022 hadoop-3.1.3]$ yarn jar YarnDemo-1.0-SNAPSHOT.jar wordcount -Dmapreduce.job.queuename=root.test /input /output1

注:以上操作全部做完过后,快照回去或者手动将配置文件修改成之前的状态,因为本身资源就不够,分成了这么多,不方便以后测试

最新文章

  1. 同一台电脑上多个myeclipse破解的问题
  2. js闭包演示
  3. 【UISegmentedControl】- &#160;分段控件
  4. AppCan JSSDK模块扩展
  5. WPF 基础面试题及答案(一)
  6. Windows7下U盘安装Ubuntu14.04双系统
  7. Slow HTTP Denial of Service Attack
  8. 项目中Service层的写法
  9. 百度之星资格赛,hdu 4825 XOR SUM
  10. 关于埃博拉(Ebola)基础研究病毒
  11. Ubuntu下安装Intellij IDEA和PyCharm
  12. viewpager翻页的窗帘效果动画
  13. [SQL]LeetCode577.员工奖金 | Employee Bonus
  14. selenium在scrapy中的使用、UA池、IP池的构建
  15. Spark 灰度发布在十万级节点上的成功实践 CI CD
  16. Web browse的发展演变
  17. HIVE配置错误信息
  18. Hibernate框架 主配置文件(Hibernate.cfg.xml)基本
  19. atexit函数和exit函数的理解
  20. informix中的时间计算

热门文章

  1. Jquery校验中国身份证号码是否正确
  2. hdu 2586 How far away? (LCA模板)
  3. C语言图书管理借阅系统——ncurses库的使用
  4. Fiddler抓包工具学习及使用
  5. PTA 7-3 Windows消息队列 (25分)
  6. django test, app aren&#39;t loaded yet
  7. Django 小实例S1 简易学生选课管理系统 4 实现登录页面
  8. 详解电子表格中的json数据:序列化与反序列化
  9. [loj3069]整点计数
  10. [atARC071F]Infinite Sequence