Hadoop【MR开发规范、序列化】

一、MapReduce编程规范

用户编写的程序分成三个部分:Mapper、Reducer和Driver

1.Mapper阶段

(1)用户自定义Mapper要继承Mapper父类

(2)Mapper的输入时K-V对的形式(K-V可自定义)

(3)Mapper的业务逻辑写在map()方法中,要重写父类的map()方法

(4)MapTask进程会对每个输入的K-V调用一次map()方法

2.Reducer阶段

(1)用户自定义Reducer要继承Reducer父类

(2)Reducer的输入数据类型对应Mapper的输出的数据类型,也是K-V

(3)Reducer的业务逻辑写在reduce()方法中,要重写父类的reduce()方法

(4)ReduceTask进程会对一组相同K的K-V调用一次reduce()方法

3.Driver阶段

​ 相当于Yarn集群的客户端,用于提交整个job程序到Yarn集群,提交了封装了mapreduce程序的和相关运行参数的job对象。

二、WordCount案例开发

开发前提 要配置好window本地的开发环境,详情可见:HDFS【hadoop3.1.3 windows开发环境搭建】

需求

求出给定的wc.txt文本文件中统计输出每一个单词出现的总次数

wc.txt文本】

is you am
i have you node
is my love

1. 创建maven工程

2.在pom.xml添加依赖

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>

3.项目的src/main/resources目录下,添加log4j2.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="error" strict="true" name="XMLConfig">
<Appenders>
<!-- 类型名为Console,名称为必须属性 -->
<Appender type="Console" name="STDOUT">
<!-- 布局为PatternLayout的方式,
输出样式为[INFO] [2018-01-22 17:34:01][org.test.Console]I'm here -->
<Layout type="PatternLayout"
pattern="[%p] [%d{yyyy-MM-dd HH:mm:ss}][%c{10}]%m%n" />
</Appender> </Appenders> <Loggers>
<!-- 可加性为false -->
<Logger name="test" level="info" additivity="false">
<AppenderRef ref="STDOUT" />
</Logger> <!-- root loggerConfig设置 -->
<Root level="info">
<AppenderRef ref="STDOUT" />
</Root>
</Loggers> </Configuration>

4.编写Mapper、Reducer、Driver类

  • Mapper类

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import java.io.IOException; /**
    * 1.自定义的类需要继承Mapper
    * 2.Mapper的四个泛型KEYIN, VALUEIN, KEYOUT, VALUEOUT
    * 这四个泛型是两对(K,V)。
    * 第一对 :输入的数据类型
    * KEYIN : 数据的偏移量(一行一行的读取数据用来记录数据读到哪里)
    * VALUEIN :实际读取的具体的一行数据
    * 第二对 :输出的数据类型
    * KEYOUT : 单词
    * VALUEOUT :单词出现的数量(1)
    */
    public class CountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    //输出的key
    private Text outKey = new Text();
    //输出的value
    private IntWritable outValue = new IntWritable(1);
    /**
    * 该方法用来处理具体的业务逻辑
    * @param key 输入数据的KEYIN ,数据的偏移量
    * @param value 输入数据的VALUEIN,实际读取的具体的一行数据
    * @param context 上下文 (在这里用来将数据写出去)
    * @throws IOException
    * @throws InterruptedException
    */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    //1.先将读进来的数据转换成String便于操作
    String line = value.toString();
    //2.切割数据(按照空格切数据)
    String[] words = line.split(" ");
    //3.遍历所有的单词并进行封装(K,V)
    for (String word : words) {
    //给outKey赋值
    outKey.set(word);
    //写数据
    context.write(outKey,outValue);
    }
    }
    }
  • Reducer类

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import java.io.IOException; /**
    * 1.自定义的类需要继承Reducer
    * 2.4个泛型 : KEYIN,VALUEIN,KEYOUT,VALUEOUT
    * 4个泛型实际为两对
    * 第一对 输入的类型 :
    * KEYIN :mapper中输出的key的类型
    * VALUEIN :mapper中输出的value的类型
    * 第二对 输出的类型 :
    * KEYOUT :实际要写出去的数据的key的类型
    * VALUEOUT :实际要写出的数据的value的类型
    */
    public class CountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
    private IntWritable outValue = new IntWritable();//输出的value的类型
    /**
    * 该方法就是具体操作业务逻辑的方法
    * 注意 :一组一组的读取数据。key相同则为一组
    * @param key :单词
    * @param values :相同单词的一组value
    * @param context : 上下文(在这用来将数据写出去)
    * @throws IOException
    * @throws InterruptedException
    */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0;//用来累加value值
    //遍历所有的value
    for (IntWritable value : values) {
    //value.get() : 将IntWritable转成基本数据类型
    sum += value.get();
    }
    //封装(K,V)
    outValue.set(sum);
    //将数据写出去
    context.write(key,outValue);
    }
    }
  • Driver类

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import java.io.IOException; /**
    * 本地运行MR任务
    *
    * 驱动类 :1.作为程序的入口 2.进行相关的一些关联 3.一些参数的设置
    */
    public class CountDriver {
    /*
    1.获取配置信息、封装job对象
    2.关联jar,Driver类
    3.关联mapper和reducer
    4.设置mapper的输出的key和value类型
    5.设置最终(reducer)输出的key和value的类型
    6.设置输入输出路径
    7.提交job任务
    */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //1.获取配置信息、封装job对象
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);
    //2.关联jar,Driver类
    job.setJarByClass(CountDriver.class);
    //3.关联mapper和reducer
    job.setMapperClass(CountMapper.class);
    job.setReducerClass(CountReducer.class);
    //4.设置mapper的输出的key和value类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    //5.设置最终(reducer)输出的key和value的类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    //6.设置输入输出路径
    //注意 :FileInputFormat导入org.apache.hadoop.mapreduce.lib包
    FileInputFormat.setInputPaths(job,new Path(args[0]));
    //注意 :输出目录必须不存在
    FileOutputFormat.setOutputPath(job,new Path(args[1]));
    //7.提交job任务
    //boolean verbose是否打印进度
    boolean isSuccess = job.waitForCompletion(true);
    //虚拟机退出的状态 :0是正常退出,1非正常退出
    System.exit(isSuccess ? 0 : 1);
    }
    }

5.本地测试

在idea中配置输入参数-args[0]、输出参数-args[1]

跑任务,运行Driver类,查看结果

6.集群测试

1.maven打jar包,需要添加的打包插件依赖

注意:标记红颜色的部分需要替换为自己工程主类

    <build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin </artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 工程主类 -->
<mainClass>com.haowu.WCDriverYarn</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

2.打jar包

将target包下不带依赖的jar-->mapreduce-demo-1.0.SNAPSHOT.jar重命名为wc.jar-->拷贝到hadoop集群

3.启动集群,执行wc.jar

[haowu@hadoop102 ~]$ hadoop jar wc.jar com.haowu.WCDriverYarn /wcinput /wcoutput

运行前

运行后

三、Hadoop序列化

1.序列化概述

2.java、hadoop序列化数据类型对比

Java类型 Hadoop Writable类型
Boolean BooleanWritable
Byte ByteWritable
Int IntWritable
Float FloatWritable
Long LongWritable
Double DoubleWritable
String   Text
Map MapWritable
Array ArrayWritable

3.自定义bean对象实现序列化(Writable)

基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口

具体实现bean对象序列化步骤如下7步:

(1).实现Writable接口

(2).必须有空参构造

public FlowBean() {
super();
}

(3).重写序列化、反序列化方法

注意反序列化的顺序和序列化的顺序完全一致

序列化

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

反序列化

@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

(5).想把结果显示在文件中,需要重写toString(),可用”\t”分开

(6).如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序

@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

4.序列化案例实操

需求:统计每一个手机号耗费的总上行流量、下行流量、总流量

输入数据:phone.txt

数据格式

id	   手机号码		网络ip			  上行流量 下行流量  网络状态码
7 13560436666 120.196.100.99 1116 954 200
15 13682846555 192.168.100.12 1938 2910 200
16 13992314666 192.168.100.13 3008 3720 200
17 13509468723 192.168.100.14 7335 110349 404
18 18390173782 192.168.100.15 9531 2412 200

需求分析:

编写程序

1.编写Bean对象

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException; /**
* 使用hadoop序列化框架
* 1.自定义类并实现Writable接口
* 2.重写write和readFields方法
* 3.读时数据的顺序必须和写时数据的顺序相同
*/
public class FlowBean implements Writable {
//上行流量
private long upFlow;
//下行流量
private long downFlow;
//总流量
private long sumFlow; public FlowBean() {
} public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
} public long getUpFlow() {
return upFlow;
} public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
} public long getDownFlow() {
return downFlow;
} public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
} public long getSumFlow() {
return sumFlow;
} public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
} /**
* 当我们通过reducer向外写数据时(对象)实际上是调用toString方法写出toString方法中的字符串
* @return
*/
@Override
public String toString() {
return upFlow + " " + downFlow + " " + sumFlow;
} /**
* 序列化:写
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
//顺序随意,类型不能错
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
} /**
* 反序列化 :读
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
//注意:读取数据的顺序必须和写的顺序相同
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
}

2.编写Mapper类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /**
* outkey : 手机号
* outvalue : FlowBean对象
*/
public class FlowMapper extends Mapper<LongWritable,Text, Text,FlowBean> {
private Text outkey = new Text(); @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//分割数据
String[] phoneInfo = value.toString().split("\t");
//封装K,V
outkey.set(phoneInfo[1]);
//从数组中取出对应的数据,并转成long类型
long upFlow = Long.parseLong(phoneInfo[phoneInfo.length - 3]);
long downFlow = Long.parseLong(phoneInfo[phoneInfo.length - 2]);
//封装value
FlowBean flowBean = new FlowBean(upFlow, downFlow);
//写数据
context.write(outkey,flowBean);
}
}

3.编写Reducer类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context)
throws IOException, InterruptedException { long upFlow = 0; //累加相同手机号的upflow
long downFlow = 0; //累加相同手机号的downflow
//遍历一组一组的数据
for (FlowBean value : values) {
//取出每一条数据的upflow,downflow并将upflow和downflow分别累加
upFlow += value.getUpFlow();
downFlow += value.getDownFlow();
}
//封装K,V
FlowBean outValue = new FlowBean(upFlow, downFlow);
//写出数据
context.write(key,outValue);
}
}

4.编写driver类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FlowDriver {
/*
1.获取job对象
2.关联jar
3.关联mapper和reducer
4.设置mapper的输出的key和value类型
5.设置最终(reducer)输出的key和value的类型
6.设置输入输出路径
7.提交job任务
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1.获取job对象
Job job = Job.getInstance(new Configuration());
//2.关联jar
job.setJarByClass(FlowDriver.class);
//3.关联mapper和reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4.设置mapper的输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//5.设置最终(reducer)输出的key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//6.设置输入输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//7.提交job任务
job.waitForCompletion(true); }
}

5.本地测试

在idea中配置输入参数-args[0]、输出参数-args[1]

6.测试结果

13560436666	1116	954	2070
13682846555 1938 2910 4828
11399231466 3008 3720 6728
13509468723 7335 1 7336
18390173782 2412 20 2432

最新文章

  1. c# .net获取随机字符串!
  2. [PCB制作] 1、记录一个简单的电路板的制作过程——四线二项步进电机驱动模块(L6219)
  3. Python自动化之常用模块
  4. Codeforces Beta Round #17 A - Noldbach problem 暴力
  5. js 自带的 filter()方法
  6. tomcat已 .war 包的形式发布项目
  7. Web App时代的缓存机制新思路
  8. 导出多级表头表格到Excel
  9. linux下如何不编译opencv的某些模块
  10. VS 调试Window Server方法
  11. hdoj 1878 欧拉回路(无向图欧拉回路+并查集)
  12. 使用 Redis 统计在线用户人数
  13. python爬取大众点评
  14. Java 非递归实现 二叉树的前中后遍历以及层级遍历
  15. Swift5 语言指南(十八) 可选链接
  16. window下上传文件至linux(windows下如何访问linux)
  17. shell_mysql_ alias 快速启动
  18. a标签打开设置
  19. webGL之three.js入门4--ThreeJS Editor入门篇
  20. Android 使用tomcat搭建HTTP文件下载服务器

热门文章

  1. configure: error: invalid variable name: `&#39;
  2. AtCoder Grand Contest 055题解
  3. 通用 Makefile(及makefile中的notdir,wildcard和patsubst)
  4. linux 内核源代码情景分析——linux 内存管理的基本框架
  5. 深入剖析Redis客户端Jedis的特性和原理
  6. Win powershell执行策略配置
  7. oracle修改CHARACTERSET
  8. 【数据结构&amp;算法】05-线性表之数组
  9. Discovery直播 | 3D“模”术师,还原立体世界——探秘3D建模服务
  10. 有关于ONVIF