package com.bank.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 使用MapReduce批量导入Hbase
 *     通过TableOutputFormat,该类内部传给指定的Put实例并调用table.put()方法。作业结束前会主动调用flushCommits()方法保存仍在写缓冲区的数据
 *
 * @author mengyao
 *
 */
public class CnyBatch extends Configured implements Tool {

static class CnyBatchMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(key, value);
        }
    }

static class CnyBatchReduce extends TableReducer<LongWritable, Text, NullWritable> {
        private final static String familyName = "info";
        private final static String[] qualifiers = {"gzh", "currency", "version", "valuta", "qfTime", "flag", "machineID"};
        @Override
        protected void reduce(LongWritable key,
                java.lang.Iterable<Text> value, Context context)
                throws java.io.IOException, InterruptedException {
            final String[] values = value.toString().split("\t");
            if (values.length == 7 && values.length == qualifiers.length) {
                 final String row = values[0]+"_"+values[1]+"_"+values[2]+"_"+values[3];
                 long timestamp = System.currentTimeMillis();
                 Put put = new Put(Bytes.toBytes(row));
                 for (int i = 0; i < values.length; i++) {
                     String qualifier = qualifiers[i];
                     String val = values[i];
                     put.add(Bytes.toBytes(familyName), Bytes.toBytes(qualifier), timestamp, Bytes.toBytes(val));
                 }
                 context.write(NullWritable.get(), put);
            } else {
                 System.err.println(" ERROR: value length must equale qualifier length ");
            }
        };
    }

@Override
    public int run(String[] arg0) throws Exception {
        Job job = Job.getInstance(getConf(), CnyBatch.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(CnyBatch.class);
        
        FileInputFormat.setInputPaths(job, arg0[0]);
        job.setMapperClass(CnyBatchMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setReducerClass(CnyBatchReduce.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        
        
        return job.waitForCompletion(true) ? 0 : 1;
    }

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("dfs.socket.timeout", "100000");
        String[] otherArgs = new GenericOptionsParser(args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println(" ERROR: <dataInputDir> <tableName>");
            System.exit(2);
        }
        conf.set(TableOutputFormat.OUTPUT_TABLE, args[1]);
        int status = ToolRunner.run(conf, new CnyBatch(), args);
        System.exit(status);
    }
}

最新文章

  1. UILabel顶端对齐
  2. Mac OS 下安装wget
  3. linux系统进程的内存布局
  4. android第一天错误
  5. 用非GUI模式执行测试,jp@gc - PerfMon Metrics Collector会出现无法获取正确数据的解决办法
  6. Python:认识变量和字符串
  7. 《高级软件测试》web测试实践--12月30日记录
  8. [USACO Jan09] 安全路径
  9. Cocos2D v3.x中关于重叠触摸层优先级的问题
  10. JavaScript夯实基础系列(二):闭包
  11. Django学习笔记(3)--模板
  12. JavaScript面向对象--继承 (超简单易懂,小白专属)
  13. 壁虎书1 The Machine Learning Landscape
  14. 【LOJ 3049】「十二省联考 2019」字符串问题
  15. n个整数中1出现的次数
  16. Installing PHP5 on Ubuntu Server
  17. JavaScript快速入门-实战(例子)
  18. DRUPAL8模版命名规则
  19. C# 把string字符导出到txt文档方法
  20. Redis used_cpu_sys used_cpu_user meaning (redis info中cpu信息的含义)

热门文章

  1. VS2008 动态库和静态库的生成和加载
  2. 27个Jupyter快捷键、技巧(原英文版)
  3. POJ2449
  4. 带左右箭头切换的自动滚动图片JS特效
  5. nat123 与微信公众号开发者测试账号配合调试
  6. 13 hbase连接
  7. 【C++学习之路】派生类的构造函数(二)
  8. [转]简述volatile
  9. mysql innerjoin left join right join 解析
  10. 关于 Delphi 中的Sender和易混淆的概念(转)