很多视频网站都有电视剧热度排名,一般是依据用户在自己站的行为数据所体现出的受欢迎程度来排名。这里有一份来自优酷、爱奇艺、搜索视频等五大视频网站的一份视频播放数据,我们利用这份数据做些有意义的事情。

金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金婚第一部 3 9851 0 0 0 0
金子,轻松出来吧 1 4715 0 5 0 0
金子,轻松出来吧 1 2685 0 3 0 0
金枝欲孽 1 52307 9 14 0 4
金枝欲孽 3 8174 0 0 0 0
金枝欲孽 1 50709 3 28 0 6
金枝欲孽 3 8710 0 0 0 4
金枝欲孽 1 55621 0 10 0 2

注意:1-5数字和5大视频的关系:1优酷2搜狐3土豆4爱奇艺5迅雷看看

一、项目需求

自定义输入格式 完成统计任务 输出多个文件

输入数据:5个网站的 每天电视剧的 播放量 收藏数 评论数 踩数 赞数

输出数据:按网站类别 统计每个电视剧的每个指标的总量

任务目标:自定义输入格式 完成统计任务 输出多个文件

二、项目实现

第一步:既然要输出电视剧的一系列数据,那我们就需要定义一个电视剧热度数据的类,直接上代码。

package com.hadoop.mapreduce.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
* 电视剧数据读写类
* 数据格式参考:继承者们 1 4105447 202 844 48 671
* @author Sparks.Li
* 需要自定义一个 VideoWritable 类实现 WritableComparable 接口,将每天电视剧的 播放量 收藏数 评论数 踩数 赞数 封装起来。
*/ //注意: Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较)
//Writable接口提供两个方法(write和readFields)。 public class VideoWritable implements WritableComparable< Object > { private Long PlayNum;
private Long FavoriteNum;
private Long CommentNum;
private Long DownNum;
private Long LikeNum; // 问:这里我们自己编程时,是一定要创建一个带有参的构造方法,为什么还要显式的写出来一个带无参的构造方法呢?
// 答:构造器其实就是构造对象实例的方法,无参数的构造方法是默认的,但是如果你创造了一个带有参数的构造方法,那么无参的构造方法必须显式的写出来,否则会编译失败。 public VideoWritable(){} public VideoWritable(Long PlayNum,Long FavoriteNum,Long CommentNum,Long DownNum,Long LikeNum){//java����вι��캯��,�������ڴ�������ʱ��ʼ������
this.PlayNum = PlayNum;
this.FavoriteNum = FavoriteNum;
this.CommentNum = CommentNum;
this.DownNum = DownNum;
this.LikeNum = LikeNum;
} public void set(Long PlayNum,Long FavoriteNum,Long CommentNum,Long DownNum,Long LikeNum){
this.PlayNum = PlayNum;
this.FavoriteNum = FavoriteNum;
this.CommentNum = CommentNum;
this.DownNum = DownNum;
this.LikeNum = LikeNum;
} public Long getPlayNum() {
return PlayNum;
}
public void setPlayNum(Long PlayNum){
this.PlayNum = PlayNum;
}
public Long getFavoriteNum() {
return FavoriteNum;
}
public void setFavoriteNum(Long FavoriteNum){
this.FavoriteNum = FavoriteNum;
}
public Long getCommentNum() {
return CommentNum;
}
public void setCommentNum(Long CommentNum){
this.CommentNum = CommentNum;
}
public Long getDownNum() {
return DownNum;
}
public void setDownNum(Long DownNum){
this.DownNum = DownNum;
}
public Long getLikeNum() {
return LikeNum;
}
public void setLikeNum(Long LikeNum){
this.LikeNum = LikeNum;
} // 实现WritableComparable的readFields()方法
// 对象不能传输的,需要转化成字节流!
// 将对象转换为字节流并写入到输出流out中是序列化,write 的过程
// 从输入流in中读取字节流反序列化为对象 是反序列化,readFields的过程 public void readFields(DataInput in) throws IOException {
PlayNum = in.readLong();
FavoriteNum = in.readLong();
CommentNum = in.readLong();
DownNum = in.readLong();
LikeNum = in.readLong();
// in.readByte()
// in.readChar()
// in.readDouble()
// in.readLine()
// in.readFloat()
// in.readLong()
// in.readShort() } public void write(DataOutput out) throws IOException {
out.writeLong(PlayNum);
out.writeLong(FavoriteNum);
out.writeLong(CommentNum);
out.writeLong(DownNum);
out.writeLong(LikeNum);
// out.writeByte()
// out.writeChar()
// out.writeDouble()
// out.writeFloat()
// out.writeLong()
// out.writeShort()
// out.writeUTF()
} public int compareTo(Object o) {
return 0;
} }

VideoWritable

第二步:通过查看数据集可知,hadoop自带的数据输入格式已经不满足我们的需求了,这时候我们就需要来实现自己的InputFormat类啦。

package com.hadoop.mapreduce.test;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
/**
* 自定义电视剧数据读写类
* 数据格式参考:继承者们 1 4105447 202 844 48 671
* @author Sparks.Li
* 需要自定义一个 VideoWritable 类实现 WritableComparable 接口,将每天电视剧的 播放量 收藏数 评论数 踩数 赞数 封装起来。
*/ //自定义输入格式 VideoInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。 public class VideoInputFormat extends FileInputFormat<Text,VideoWritable > {//自定义数据输入格式,其实这都是模仿源码的!可以去看 // 线路是: boolean isSplitable() -> RecordReader<Text,VideoWritable> createRecordReader() -> VideoRecordReader extends RecordReader<Text, VideoWritable > @Override
protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法
// isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit
// isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit
return false; //整个文件封装到一个InputSplit } @Override
public RecordReader<Text,VideoWritable> createRecordReader(InputSplit inputsplit,TaskAttemptContext context) throws IOException, InterruptedException {
// RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装
// createRecordReader是方法,在这里是,VideoInputFormat.createRecordReader。VideoInputFormat是InputFormat类的实例 //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类VideoRecordReader。 return new VideoRecordReader();//新建一个VideoRecordReader实例,所有才有了上面RecordReader<Text,VideoWritable>,所以才如下VideoRecordReader,写我们自己的
} //RecordReader中的两个参数分别填写我们期望返回的key/value类型,我们期望key为Text类型,value为VideoWritable类型
public static class VideoRecordReader extends RecordReader<Text, VideoWritable > {//RecordReader<k1, v1>是一个整体
public LineReader in;//行读取器
public Text line;//每行数据类型
public Text lineKey;//自定义key类型,即k1
public VideoWritable lineValue;//自定义value类型,即v1 @Override
public void close() throws IOException {//关闭输入流
if(in !=null){
in.close();
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey
return lineKey;//返回类型是Text,即Text lineKey
}
@Override
public VideoWritable getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue
return lineValue;//返回类型是VideoWritable,即VideoWritable lineValue
}
@Override
public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress
return 0;//返回类型是float,即float 0
}
@Override
public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {//初始化,都是模板
FileSplit split=(FileSplit)input;
Configuration job=context.getConfiguration();
Path file=split.getPath();
FileSystem fs=file.getFileSystem(job); FSDataInputStream filein=fs.open(file);
in=new LineReader(filein,job);//输入流in
line=new Text();//每行数据类型
lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key
lineValue = new VideoWritable();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value
} //此方法读取每行数据,完成自定义的key和value
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {//这里面,才是篡改的重点
int linesize=in.readLine(line);//line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。 // 是SplitLineReader.readLine -> SplitLineReader extends LineReader -> org.apache.hadoop.util.LineReader // in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾
// in.readLine(str, maxLineLength)//只读到maxLineLength行
// in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值 if(linesize==0) return false; String[] pieces = line.toString().split("\\t");//解析每行数据
//因为,有些视频名中含有空格,所以这里使用\t来切割!!!注意 if(pieces.length != 7){
throw new IOException("Invalid record received");
}
//将学生的每门成绩转换为 float 类型
Long a,b,c,d,e;
try{
a = Long.parseLong(pieces[2].trim());//将String类型,如pieces[2]转换成,float类型,给a
b = Long.parseLong(pieces[3].trim());
c = Long.parseLong(pieces[4].trim());
d = Long.parseLong(pieces[5].trim());
e = Long.parseLong(pieces[6].trim());
}catch(NumberFormatException nfe){
throw new IOException("Error parsing floating poing value in record");
}
lineKey.set(pieces[0]+"\t"+pieces[1]);//完成自定义key数据
lineValue.set(a, b, c, d, e);//封装自定义value数据 return true;
}
}
}

VideoInputFormat

第三步:这时候我们就可以写MapReduce统计程序来得出我们想要的结果啦。

package com.hadoop.mapreduce.test;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* 学生成绩统计Hadoop程序
* 数据格式参考:继承者们 1 4105447 202 844 48 671
* @author sparks.li
*/
public class VideoCount extends Configured implements Tool{
public static class VideoMapper extends Mapper<Text,VideoWritable,Text,VideoWritable>{
@Override
protected void map(Text key, VideoWritable value, Context context)throws IOException, InterruptedException{
context.write(key, value); //写入key是k2,value是v2
// context.write(new Text(key), new ScoreWritable(value));等价
}
} public static class VideoReducer extends Reducer<Text,VideoWritable,Text,Text>{
private Text result = new Text();
private Text reduceKey = new Text();
private MultipleOutputs< Text, Text> multipleOutputs; @Override
protected void setup(Context context) throws IOException ,InterruptedException{
multipleOutputs = new MultipleOutputs< Text, Text>(context);
}
protected void reduce(Text Key, Iterable< VideoWritable > Values, Context context)throws IOException, InterruptedException{
Long PlayNum= new Long(0);
Long FavoriteNum= new Long(0);
Long CommentNum= new Long(0);
Long DownNum= new Long(0);
Long LikeNum= new Long(0);
for(VideoWritable ss:Values){
PlayNum += ss.getPlayNum();
FavoriteNum += ss.getFavoriteNum();
CommentNum += ss.getCommentNum();
DownNum += ss.getDownNum();
LikeNum += ss.getLikeNum();
}
result.set(PlayNum + "\t" + FavoriteNum + "\t" + CommentNum + "\t" + DownNum
+ "\t" + LikeNum); String[] webs = {"nothing","youku","souhu","tudou","aiqiyi","xunlei"}; String[] pieces = Key.toString().split("\\t+");//解析 key 数据
//line是Text类型。pieces是String[],即String数组。 if(pieces.length != 2){
throw new IOException("Invalid reduce key received");
}
// 设置key为视频名字
reduceKey.set(pieces[0] + "\t");
// 设置输出路径为视频网站名
String filename = webs[Integer.parseInt(pieces[1])]; multipleOutputs.write(reduceKey, result, filename);
}
@Override
protected void cleanup(Context context) throws IOException ,InterruptedException{
multipleOutputs.close();
}
} public int run(String[] args) throws Exception{
Configuration conf = new Configuration();//读取配置文件 Path mypath = new Path(args[1]);
FileSystem hdfs = mypath.getFileSystem(conf);//创建输出路径
if (hdfs.isDirectory(mypath))
{
hdfs.delete(mypath, true);
} Job job = new Job(conf, "Video Count");//新建任务
job.setJarByClass(VideoCount.class);//设置主类 FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径 job.setMapperClass(VideoMapper.class);// Mapper
job.setReducerClass(VideoReducer.class);// Reducer job.setMapOutputKeyClass(Text.class);// Mapper key输出类型
job.setMapOutputValueClass(VideoWritable.class);// Mapper value输出类型 job.setInputFormatClass(VideoInputFormat.class);//设置自定义输入格式 job.waitForCompletion(true);
return 0;
} public static void main(String[] args) throws Exception{
String[] args0 =
{
"hdfs://sparks:9000/middle/video/tvplay.txt",
"hdfs://sparks:9000/middle/video/out"
}; int ec = ToolRunner.run(new Configuration(), new VideoCount(), args0);
System.exit(ec);
}
}

VideoCount

第四步:万事俱备只欠数据啦,我们把本地数据上传到HDFS上,并运行程序

hadoop fs -put 数据集文件 hdfs路径

  

三、项目结果

搞定!

最新文章

  1. 如何从零基础学习VR
  2. MS SQLSERVER中如何快速获取表的记录总数
  3. 从零开始学习jQuery (二) 万能的选择器
  4. Python for loop and while loop
  5. 编写运行R脚本
  6. WebDriver 随笔
  7. Sina App Engine(SAE)教程(11)- Yaf使用
  8. oracle数据库 PSU,SPU(CPU),Bundle Patches 和 Patchsets 补丁号码快速参考 (文档 ID 1922396.1)
  9. JQuery 之 Ajax 异步和同步浅谈
  10. Code First:如何实现一个主类中包含多个复类
  11. JavaScript高级程序设计40.pdf
  12. openui5中的RESTful实现odata协议
  13. mdadm命令详解及实验过程
  14. python基础操作_集合_三元运算
  15. (二十二)java小练习三
  16. 谈谈Mysql主从同步延迟分析及解决方案
  17. javascript 使用数组+循环+条件实现数字转换为汉字的简单方法。
  18. 事务回滚 SET XACT_ABORT ON
  19. linux安装redis ,mariadb
  20. Python PIL 库的应用

热门文章

  1. 4.如何实现用MTQQ通过服务器实现订阅者和发布者的通讯
  2. (转载)VB 查询Oracle中blob类型字段,并且把blob中的图片以流的方式显示在Image上
  3. Spring中获取对象
  4. Java自学手记——注解
  5. Unity-Shader-镜面高光Phong&amp;BlinnPhong-油腻的师姐在哪里
  6. [USACO09OCT]热浪Heat Wave
  7. Unity3D文件读取
  8. Android性能优化:ViewStub
  9. DataReader To List
  10. Python基础之模块、数据类型及数据类型转换