mport java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Driver {

public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

public static class DependenceMapper extends
            Mapper<Object, Text, Text, Text> {
        private Text word = new Text();
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String []sep=value.toString().split("\t");
            word.set(sep[1]+"\t"+sep[0]);
            System.out.println(value.toString());
            context.write(word,new Text(""));
        }
    }

public static class DependenceReducer extends
            Reducer<Text,Text,Text,Text> {
        public void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
            String[] sep = key.toString().split("\t");
            System.out.println( sep[0]+"++++++++="+ sep[1]);
            context.write(key,new Text(""));
        }
    }

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "word count");
        //加入控制容器
        ControlledJob ctrljob1=new  ControlledJob(conf);
        ctrljob1.setJob(job);
        job.setJarByClass(Driver.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
//        job.waitForCompletion(true);

Configuration conf2 = new Configuration();
        Job job2 = new Job(conf2, "word count1");
         ControlledJob ctrljob2=new ControlledJob(conf);
            ctrljob2.setJob(job2);
            ctrljob2.addDependingJob(ctrljob1);
        job2.setJarByClass(Driver.class);
        job2.setMapperClass(DependenceMapper.class);
        job2.setReducerClass(DependenceReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job2, new Path(otherArgs[2]));
    //    job2.waitForCompletion(true);
          JobControl jobCtrl=new JobControl("myctrl");
          
            //添加到总的JobControl里,进行控制
            jobCtrl.addJob(ctrljob1);
            jobCtrl.addJob(ctrljob2);
            jobCtrl.run();
            
    }
}

最新文章

  1. equals == 比较
  2. 【BZOJ-3712】Fiolki LCA + 倍增 (idea题)
  3. Windows Phone &amp; Windows App应用程序崩溃crash信息抓取方法
  4. 使用buildroot编译bind DNS服务器
  5. (转)Engineering Productivity
  6. HDU 1695 (莫比乌斯反演) GCD
  7. Android(java)学习笔记112:局部位置的内部类的介绍
  8. citrix xen server 虚拟机无法关闭的问题
  9. HDU 1548 A strange lift(dij+邻接矩阵)
  10. [NOI2005] 维护数列
  11. Redis笔记1-redis的搭建和使用
  12. mysql分区方案的研究
  13. Unity之流光效果
  14. C从源码到运行发生了哪些事
  15. 三种方式控制GPIO
  16. POJ 2296 Map Labeler (2-Sat)
  17. python json (loads(),load(),jump(),jumps())
  18. UI(一)
  19. Hadoop本地安装
  20. 【算法笔记】A1039 Course List for Student

热门文章

  1. android打包大小笔录
  2. 请教下关于CKEditor富文本编辑框设置字体颜色的问题
  3. 【报错】java.lang.RuntimeException: Invalid action class configuration that references an unknown class named [xxxAction]
  4. C/C++宏定义中#与##区别 .
  5. 程序开发:MVC设计模式与应用
  6. IIS6 伪静态
  7. how to download image from any web page in java 下载图片
  8. 面向方面编程(AOP)
  9. LeetCode OJ 107. Binary Tree Level Order Traversal II
  10. 解决Xcode 9.2系统真机测试时出现 could not find developer disk image问题