一、 为什么javaBean要继承Writable和WritableComparable接口?

1. 如果一个javaBean想要作为MapReduce的key或者value,就一定要实现序列化,因为在Map到Reduce阶段的时候,只能是传输二进制数据,不可能将字符流直接进行RPC传输,

只要一个javabean实现了序列化和反序列化,就可以做为key或者value

最简单的序列化和反序列化就是实现Writable接口

ps:javaBean在作为key的时候有点不同,除了要继承Writable接口还需要实现Comparable接口

因为在shuffle到Reduce阶段的合并阶段,需要根据key对数据进行排序,合并,如果不实现这个接口,运行时会出错

WritableComparable就是Writable接口和java.lang.Comparable<T>的一个子接口,所以将要作为key的javaBean直接继承WritableComparable就可以了

2. java序列化与Writable序列化的比较

2.1 java序列化不够灵活,为了更好的控制序列化的整个流程所以使用Writable

2.2 java序列化不符合序列化的标准,没有做一定的压缩,java序列化首先写类名,然后再是整个类的数据,而且成员对象在序列化中只存引用,成员对象的可以出现的位置很随机,既可以在序列化的对象前,也可以在其后面,这样就对随机访问造成影响,一旦出错,整个后面的序列化就会全部错误

2.3 Java序列化每次序列化都要重新创建对象,内存消耗大,而Writable是可以重用的

二、 实现Writable和WritableComparable的UserBean

代码如下:

package com.qjx.serialize_8_2;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class UserBean implements WritableComparable<UserBean> { private int id;
private String name ;
private String age; public UserBean() {
} public UserBean(int id,String name , String age) {
this.id = id;
this.name = name;
this.age = age;
} @Override
public String toString() {
return this.id + this.name + this.age;
} //反序列化,将输入二进制反序列化为字符流
@Override
public void readFields(DataInput in) throws IOException {
id = in.readInt();
name = in.readUTF();
age = in.readUTF();
} //序列化,将字节转化为二进制输出
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(id);
out.writeUTF(name);
out.writeUTF(age);
} @Override
public int compareTo(UserBean o) {
int thisValue = this.id;
int thatValue = o.id;
return (thisValue < thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
} public int getId() {
return id;
} public void setId(int id) {
this.id = id;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public String getAge() {
return age;
} public void setAge(String age) {
this.age = age;
}
}

三、 MapReduce传递UserBean的一个简单例子

我们已经实现了可序列化的UserBean类,现在就做一个简单的例子,在MapReduce中传递UserBean

1. 准备一个文件user.txt,内容如下:

1 'tom' '22',2 'tom2' '22',3 'tom3' '22',4 'tom4' '22',5 'tom5' '22',6 'tom6' '22',7 'tom7' '22',8 'tom8' '22',9 'tom9' '22',10 'tom10' '22',11 'tom11' '22',12 'tom12' '22',13 'tom13' '22',1 'tom' '22',1 'tom' '22',2 'tom2' '22',2 'tom2' '22',

这个文件中有多个UserBean,我们的MapReduce就是要实现统计这些UserBean出现的次数

2. WCMapper.java的实现代码:

package com.qjx.serialize_8_2;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper; /*
* Writable接口是一个实现了序列化协议的序列化对象。
* 在Hadoop中定义一个结构化对象都要实现Writable接口,使得该结构化对象可以序列化为字节流,字节流也可以反序列化为结构化对象。
* LongWritable类型:Hadoop.io对Long类型的封装类型
*/ public class WCMapper extends Mapper<LongWritable, Text, UserBean, LongWritable>{ @Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, UserBean, LongWritable>.Context context)
throws IOException, InterruptedException { // 获得每行文档内容,并且进行折分
String[] users = value.toString().split(","); // 遍历折份的内容
System.out.println(users.length);
for (String u1 : users) {
//根据空格划分为三个属性
String[] u = u1.toString().split(" ");
System.out.println(u.length);
if(u!=null && u.length== 3) {
UserBean u2 = new UserBean(Integer.parseInt(u[0]),u[1],u[2]);
context.write(u2, new LongWritable(1));
}
else {
System.out.println("user split false !");
}
}
}
}

3. WCReducer.java实现代码:

package com.qjx.serialize_8_2;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; import com.qjx.serialize_8_2.UserBean; public class WCReducer extends Reducer<UserBean, LongWritable, UserBean, LongWritable>{ @Override
protected void reduce(UserBean key, Iterable<LongWritable> values,
Reducer<UserBean, LongWritable, UserBean, LongWritable>.Context context) throws IOException, InterruptedException { long sum = 0;
for (LongWritable i : values) {
// i.get转换成long类型
sum += i.get();
}
// 输出总计结果
context.write(key, new LongWritable(sum));
}
}

4. UserCount.java 的实现代码:

package com.qjx.serialize_8_2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class UserCount { public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
// 创建job对象
Job job = Job.getInstance(new Configuration());
// 指定程序的入口
job.setJarByClass(UserCount.class); // 指定自定义的Mapper阶段的任务处理类
job.setMapperClass(WCMapper.class);
job.setMapOutputKeyClass(UserBean.class);
job.setMapOutputValueClass(LongWritable.class);
// 本地数据的输入路径
FileInputFormat.setInputPaths(job, new Path("E:/trainingPack/serialize/input")); // 指定自定义的Reducer阶段的任务处理类
job.setReducerClass(WCReducer.class);
// 设置最后输出结果的Key和Value的类型 x
job.setOutputKeyClass(UserBean.class);
job.setOutputValueClass(LongWritable.class);
// 将计算的结果存到本地
FileOutputFormat.setOutputPath(job, new Path("E:/trainingPack/serialize/output")); // 执行提交job方法,直到完成,参数true打印进度和详情
job.waitForCompletion(true);
System.out.println("Finished");
}
}

5. 执行结果,生成的output内容如下:

1'tom''22'    3
2'tom2''22' 3
3'tom3''22' 1
4'tom4''22' 1
5'tom5''22' 1
6'tom6''22' 1
7'tom7''22' 1
8'tom8''22' 1
9'tom9''22' 1
10'tom10''22' 1
11'tom11''22' 1
12'tom12''22' 1
13'tom13''22' 1

最新文章

  1. 浅谈WebLogic和Tomcat
  2. plain framework 1 参考手册 入门指引之 简明教程
  3. How to create QTP Shared Object Repository
  4. Linux重启inotify配置max_user_watches无效被恢复默认值8192的正确修改方法
  5. 【WEB】jsp向servlet传参中文乱码问题解决
  6. 通过存储过程进行分页查询的SQL示例
  7. 使用微软 AppFabric 遇到问题
  8. psp个人软件过程需求文档
  9. Android 屏幕适配方案
  10. jQuery中间each实施例的方法
  11. java实现类似qq的窗口对聊
  12. PHP检测获取内存信息
  13. XJOI1571爱心蜗牛【树形动规】
  14. 06 Activity的启动模式 Intent的七大属性的总结
  15. SpringBoot的学习【5.Spring Boot 的配置文件】
  16. tomcat部署应用时设置context path为空的上下文路径问题
  17. MySql中的事务、JDBC事务、事务隔离级别
  18. 【T11】提防对等实体的不友好动作
  19. Harbor使用 -- 修改80端口
  20. MFC中添加了一个dialog,并创建了相应的类,初始化函数没有怎么办?

热门文章

  1. SVN Client API的.net 接口 SharpSvn介紹 Checkout操作实例
  2. 通过虚拟驱动vivi分析摄像头驱动
  3. HDU5374 Tetris (2015年多校比赛第7场)大模拟
  4. 2. Add Two Numbers【medium】
  5. proxy_redirect参数的作用
  6. Mockito - Wanted but not invoked: Actually, there were zero interactions with this mock
  7. mock实例方法
  8. grails email 发送邮件插件
  9. php windows 扩展redis
  10. OpenCV中Camshitf算法学习