HBase自定义MapReduce
2024-09-04 09:23:03
HBase表数据的转移
在Hadoop阶段,我们编写的MR任务分别进程了Mapper和Reducer两个类,而在HBase中我们需要继承的是TableMapper和TableReducer两个类。
目标:将fruit表中的一部分数据,通过MR迁入到fruit_mr表中
Step1、构建ReadFruitMapper类,用于读取fruit表中的数据
package com.z.hbase_mr; import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes; public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> { @Override protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { //将fruit的name和color提取出来,相当于将每一行数据读取出来放入到Put对象中。
Put put = new Put(key.get()); //遍历添加column行
for(Cell cell: value.rawCells()){ //添加/克隆列族:info
if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){ //添加/克隆列:name
if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ //将该列cell加入到put对象中
put.add(cell); //添加/克隆列:color
}else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){ //向该列cell加入到put对象中
put.add(cell);
}
}
} //将从fruit读取到的每行数据写入到context中作为map的输出
context.write(key, put);
} }
Step2、构建WriteFruitMRReducer类,用于将读取到的fruit表中的数据写入到fruit_mr表中
package com.z.hbase_mr; import java.io.IOException; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable; public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> { @Override
protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
throws IOException, InterruptedException {
//读出来的每一行数据写入到fruit_mr表中 for(Put put: values){ context.write(NullWritable.get(), put); }
} }
Step3、构建Fruit2FruitMRJob extends Configured implements Tool,用于组装运行Job任务
//组装Job
public int run(String[] args) throws Exception { //得到Configuration
Configuration conf = this.getConf(); //创建Job任务
Job job = Job.getInstance(conf, this.getClass().getSimpleName()); job.setJarByClass(Fruit2FruitMRJob.class); //配置Job
Scan scan = new Scan(); scan.setCacheBlocks(false); scan.setCaching(500); //设置Mapper,注意导入的是mapreduce包下的,不是mapred包下的,后者是老版本
TableMapReduceUtil.initTableMapperJob( "fruit", //数据源的表名 scan, //scan扫描控制器 ReadFruitMapper.class,//设置Mapper类 ImmutableBytesWritable.class,//设置Mapper输出key类型 Put.class,//设置Mapper输出value值类型 job//设置给哪个JOB ); //设置Reducer TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job); //设置Reduce数量,最少1个
job.setNumReduceTasks(1); boolean isSuccess = job.waitForCompletion(true); if(!isSuccess){ throw new IOException("Job running with error"); } return isSuccess ? 0 : 1; }
Step4、主函数中调用运行该Job任务
public static void main( String[] args ) throws Exception{ Configuration conf = HBaseConfiguration.create(); int status = ToolRunner.run(conf, new Fruit2FruitMRJob(), args); System.exit(status); }
最新文章
- java基础学习02(简单的java程序)
- sql执行循序
- JDBC连接数据库(PreparedStatement)
- Error:Protocol family unavailable
- Gson解析复杂的json数据
- 关于python的__name__理解
- MvvmCross for WPF File Plugin
- Mapped Statements collection does not contain value for ResearcherMapper.方法名
- 虚拟机下linux上网
- hdu 2106 decimal system
- C# 之 托付
- 20170822xlVBA ExportCellPhone
- 使用BAPI批量修改采购信息记录的税率
- ZT 用gdb调试core dump文件
- JS 实战1(添加、删除)
- Tomcat *的下载(绿色版和安装版都适用)
- python 异常知识点
- 【nynu】 妹妹的工资怎么算(二分)
- linux mysql命令行查看显示中文
- LeetCode: 53. Maximum Subarray(Easy)