Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。
一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。
框架会对map的输出先进行排序, 然后把结果输入给reduce任务。
通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
通常,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。
这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用

输入与输出流程主要分为 Map阶段,Shuffle阶段,Reduce阶段

Map阶段

分片(Split):
    map阶段的输入通常是HDFS上文件,在运行Mapper前,FileInputFormat会将输入文件分割成多个split
    1个split至少包含1个HDFS的Block(默认为128M)然后每一个分片运行一个map进行处理。

执行(Map):
    对输入分片中的每个键值对调用map()函数进行运算,然后输出一个结果键值对。

Partitioner:
    对map()的输出进行partition,即根据key或value及reduce的数量来决定当前的这对键值对最终应该交由哪个reduce处理。
    默认是对key哈希后再以reduce task数量取模,默认的取模方式只是为了避免数据倾斜。然后该key/value对以及partitionIdx的结果都会被写入环形缓冲区。

溢写(Spill):
    map输出写在内存中的环形缓冲区,默认当缓冲区满80%,启动溢写线程,将缓冲的数据写出到磁盘。

Sort:
    在溢写到磁盘之前,使用快排对缓冲区数据按照partitionIdx, key排序。(每个partitionIdx表示一个分区,一个分区对应一个reduce)

Combiner:
    如果设置了Combiner,那么在Sort之后,还会对具有相同key的键值对进行合并,减少溢写到磁盘的数据量。

合并(Merge):
    溢写可能会生成多个文件,这时需要将多个文件合并成一个文件。合并的过程中会不断地进行 sort & combine 操作,最后合并成了一个已分区且已排序的文件。

Shuffle阶段

Shuffle    广义上Shuffle阶段横跨Map端和Reduce端,在Map端包括Spill过程,在Reduce端包括copy和merge/sort过程。  
  通常认为Shuffle阶段就是将map的输出作为reduce的输入的过程

Copy过程  Reduce端启动一些copy线程,通过HTTP方式将map端输出文件中属于自己的部分拉取到本地。
  Reduce会从多个map端拉取数据,并且每个map的数据都是有序的。

Merge过程:
  Copy过来的数据会先放入内存缓冲区中,这里的缓冲区比较大;当缓冲区数据量达到一定阈值时,
  将数据溢写到磁盘(与map端类似,溢写过程会执行 sort & combine)。如果生成了多个溢写文件,它们会被merge成一个有序的最终文件。
  这个过程也会不停地执行 sort & combine 操作。

Reduce阶段

Shuffle阶段最终生成了一个有序的文件作为Reduce的输入,对于该文件中的每一个键值对调用reduce()方法,并将结果写到HDFS。

WordCountMapper

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    /**
     * 每读取一行  就会调用一次map方法
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 进来的value是一行文本,我们就将这一行文本切分单词
        String[] words = value.toString().split(" ");
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }

    }

}

WordCountReducer

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        // 看起来是for循环,其实是调用next这种迭代机制
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }

    // 整个Reduce执行结束之后  会调用这个方法
    protected void cleanup(Context context) throws IOException ,InterruptedException {

    }
}

WordCount

public class WordCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        //操作HDFS上的文件    默认操作本地文件
        //conf.set("fs.defaultFS", "hdfs://hadoop-01:9000");  

        //创建一个用来描述本次数据处理工作的job对象
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class); 

        //设置这个job所用的业务mapper和reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
//        job.setCombinerClass(WordCountReducer.class);

        //设置mapper和reducer的输出数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //设置job要处理的数据在哪里,输出到哪里
        FileInputFormat.setInputPaths(job, new Path("./wordcount/words.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\wordcount\\output"));

        // 操作HDFS上的文件
//        FileInputFormat.setInputPaths(job, new Path("/2016.5.12/words.txt"));
//        FileOutputFormat.setOutputPath(job, new Path("/2016.5.12/output"));

        //向yarn集群发出提交job的请求
        job.waitForCompletion(true);

    }

}

最新文章

  1. BroadcastReceiver几种常见监听
  2. Ansible-playbook批量部署,更新war脚本,可以再完善----后续再update
  3. python不用加号实现加法
  4. AccessViolationException: 尝试读取或写入受保护的内存。这通常指示其他内存已损坏。
  5. mysql一次添加多条记录
  6. 【反射】——Autofac 类型注册
  7. Sold out
  8. MFC去掉win7玻璃效果
  9. iOS 数据持久化(2):SQLite3
  10. WindowManager
  11. STM32电源管理
  12. kbhit()的三个测试
  13. 开源免费的.NET图像即时处理的组件ImageProcessor
  14. C语言_第一讲_C语言入门
  15. pat1051-1060
  16. js中的 Date对象 在 IOS 手机中的兼容性问题
  17. [Swift]LeetCode837. 新21点 | New 21 Game
  18. Git让你从入门到精通,看这一篇就够了
  19. 洛谷P2468 SDOI 2010 粟粟的书架
  20. Rancher 容器管理平台-免费视频培训-链接及内容-第三季

热门文章

  1. element-ui 表格标题换行
  2. 【NOIP2016A组模拟7.13】亚瑟王之宫
  3. 详解HASH(字符串哈希)
  4. Leetcode 3. Longest Substring Without Repeating Characters(string 用法 水题)
  5. 170905-MyBatis中的关系映射
  6. 语法检查程序LanguageTool学习和使用笔记
  7. JavaSE知识点:finalize,treeMap
  8. 电脑配置Java环境变量之后,在cmd中仍然无法识别
  9. 2019年CCPC网络赛 HDU 6703 array【权值线段树】
  10. SQL Server DACPAC数据库部署错误