1. Overview

A reduce task pulls many small files from the map tasks. Each of those files is sorted internally, but the collection as a whole is not, so the reduce side merges the small files and runs a merge-sort pass over them, producing a single, globally ordered input.
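
To make that merge step concrete, here is a minimal k-way merge sketch in plain Java (my own illustration, not the Merger machinery ReduceTask actually uses): each run is already sorted, and a priority queue always surfaces the smallest current head, so the combined output comes out globally ordered.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Minimal k-way merge sketch: every "run" is already sorted; the heap always exposes
// the smallest current head, so the merged output is globally sorted. Illustration only.
public class KWayMergeSketch {
  public static List<Integer> merge(List<List<Integer>> sortedRuns) {
    // heap entries: {value, runIndex, offsetInRun}
    PriorityQueue<int[]> heap = new PriorityQueue<>(Comparator.comparingInt((int[] e) -> e[0]));
    for (int r = 0; r < sortedRuns.size(); r++) {
      if (!sortedRuns.get(r).isEmpty()) {
        heap.add(new int[]{sortedRuns.get(r).get(0), r, 0});
      }
    }
    List<Integer> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      int[] head = heap.poll();
      merged.add(head[0]);
      int next = head[2] + 1;
      List<Integer> run = sortedRuns.get(head[1]);
      if (next < run.size()) {
        heap.add(new int[]{run.get(next), head[1], next});
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    System.out.println(merge(Arrays.asList(
        Arrays.asList(1, 4, 9),
        Arrays.asList(2, 3, 8),
        Arrays.asList(5, 6, 7))));  // [1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}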

2. Code

The ReduceTask source (the run method):

  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      sortPhase = getProgress().addPhase("sort");
      reducePhase = getProgress().addPhase("reduce");
    }
    // start thread that will handle communication with parent
    TaskReporter reporter = startReporter(umbilical);

    boolean useNewApi = job.getUseNewReducer();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    // Initialize the codec
    codec = initCodec();
    RawKeyValueIterator rIter = null;
    ShuffleConsumerPlugin shuffleConsumerPlugin = null;

    Class combinerClass = conf.getCombinerClass();
    CombineOutputCollector combineCollector =
      (null != combinerClass) ?
        new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;

    Class<? extends ShuffleConsumerPlugin> clazz =
      job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
    shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
    LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

    ShuffleConsumerPlugin.Context shuffleContext =
      new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
                  super.lDirAlloc, reporter, codec,
                  combinerClass, combineCollector,
                  spilledRecordsCounter, reduceCombineInputCounter,
                  shuffledMapsCounter,
                  reduceShuffleBytes, failedShuffleCounter,
                  mergedMapOutputsCounter,
                  taskStatus, copyPhase, sortPhase, this,
                  mapOutputFile, localMapFiles);
    shuffleConsumerPlugin.init(shuffleContext);

    rIter = shuffleConsumerPlugin.run();  // the returned iterator walks the merged map outputs in sorted order

    // free up the data structures
    mapOutputFilesOnDisk.clear();
    sortPhase.complete();                 // sort is complete
    setPhase(TaskStatus.Phase.REDUCE);
    statusUpdate(umbilical);

    Class keyClass = job.getMapOutputKeyClass();
    Class valueClass = job.getMapOutputValueClass();
    RawComparator comparator = job.getOutputValueGroupingComparator();  // grouping comparator, see Source 1
    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator,        // see Source 2
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator,
                    keyClass, valueClass);
    }
    shuffleConsumerPlugin.close();
    done(umbilical, reporter);
  }

Source 1: the grouping comparator

  public RawComparator getOutputValueGroupingComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.GROUP_COMPARATOR_CLASS, null, RawComparator.class);  // when the user has not set a grouping comparator, fall back to the default
    if (theClass == null) {
      return getOutputKeyComparator();  // see Source 1.1
    }
    return ReflectionUtils.newInstance(theClass, this);
  }

Source 1.1: the sort comparator. When the user has not set a grouping comparator, the sort comparator is used in its place; and if the user has not configured a sort comparator either, the default comparator registered for the map output key class is used.

  public RawComparator getOutputKeyComparator() {
    Class<? extends RawComparator> theClass = getClass(
      JobContext.KEY_COMPARATOR, null, RawComparator.class);
    if (theClass != null)
      return ReflectionUtils.newInstance(theClass, this);
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }
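
For context, the two configuration keys read above (JobContext.KEY_COMPARATOR and JobContext.GROUP_COMPARATOR_CLASS) are what Job.setSortComparatorClass() and Job.setGroupingComparatorClass() write on the user side. A hedged driver-side sketch, with a made-up FirstFieldGroupingComparator as the example grouping comparator:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;

// Illustration only (not from the ReduceTask source): how a driver plugs custom comparators
// into the two config keys read above. FirstFieldGroupingComparator is a made-up example
// that groups Text keys of the form "field1#field2" by field1 only, so one reduce() call
// sees all values whose keys share the same first field.
public class GroupingComparatorDemo {

  public static class FirstFieldGroupingComparator extends WritableComparator {
    protected FirstFieldGroupingComparator() {
      super(Text.class, true);  // true => instantiate keys so the object-level compare below is used
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
      String left = a.toString().split("#", 2)[0];
      String right = b.toString().split("#", 2)[0];
      return left.compareTo(right);
    }
  }

  public static Job configure(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "grouping-demo");
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    // Sort comparator (JobContext.KEY_COMPARATOR): left unset here, so
    // getOutputKeyComparator() falls back to Text's registered WritableComparator.
    // Grouping comparator (JobContext.GROUP_COMPARATOR_CLASS): set explicitly, so
    // getOutputValueGroupingComparator() returns it instead of the sort comparator.
    job.setGroupingComparatorClass(FirstFieldGroupingComparator.class);
    return job;
  }
}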

Summary:

The key order is actually changed (adjusted) on the map side; the reduce side never re-sorts the data it pulls over. Reduce does no re-ordering of its own, so it depends entirely on the order of the map output.

Source 2: the runNewReducer implementation

  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException, InterruptedException,
                              ClassNotFoundException {
    // wrap value iterator to report progress.
    final RawKeyValueIterator rawIter = rIter;  // the real iterator
    rIter = new RawKeyValueIterator() {
      public void close() throws IOException {
        rawIter.close();
      }
      public DataInputBuffer getKey() throws IOException {
        return rawIter.getKey();
      }
      public Progress getProgress() {
        return rawIter.getProgress();
      }
      public DataInputBuffer getValue() throws IOException {
        return rawIter.getValue();
      }
      public boolean next() throws IOException {
        boolean ret = rawIter.next();
        reporter.setProgress(rawIter.getProgress().getProgress());
        return ret;
      }
    };
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
          getTaskID(), reporter);
    // make a reducer
    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
      (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
      new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
    job.setBoolean("mapred.skip.on", isSkipping());
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.Reducer.Context
      reducerContext = createReduceContext(reducer, job, getTaskID(),
                                           rIter,                  // the iterator is handed to the context as it is built
                                           reduceInputKeyCounter,
                                           reduceInputValueCounter,
                                           trackedRW,
                                           committer,
                                           reporter, comparator,   // the grouping comparator, see Source 2.1
                                           keyClass, valueClass);
    try {
      reducer.run(reducerContext);  // once the context is built, call the Reducer's run() method, see Source 2.2
    } finally {
      trackedRW.close(reducerContext);
    }
  }

Source 2.1: createReduceContext builds the context (the ReduceContextImpl constructor)

  public ReduceContextImpl(Configuration conf, TaskAttemptID taskid,
                           RawKeyValueIterator input,  // the raw iterator is handed in as the context's input
                           Counter inputKeyCounter,
                           Counter inputValueCounter,
                           RecordWriter<KEYOUT,VALUEOUT> output,
                           OutputCommitter committer,
                           StatusReporter reporter,
                           RawComparator<KEYIN> comparator,
                           Class<KEYIN> keyClass,
                           Class<VALUEIN> valueClass
                          ) throws InterruptedException, IOException {
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(buffer);
    this.valueDeserializer = serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(buffer);
    hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
  }

  /** Start processing next unique key. */
  public boolean nextKey() throws IOException, InterruptedException {
    // this is what context.nextKey() in Reducer.run() ends up calling
    while (hasMore && nextKeyIsSame) {  // false on the first call, so nothing is drained here
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }

  /**
   * Advance to the next key/value pair.
   */
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if (!hasMore) {
      key = null;
      value = null;
      return false;
    }
    firstValue = !nextKeyIsSame;
    DataInputBuffer nextKey = input.getKey();
    currentRawKey.set(nextKey.getData(), nextKey.getPosition(),
                      nextKey.getLength() - nextKey.getPosition());
    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
    key = keyDeserializer.deserialize(key);
    DataInputBuffer nextVal = input.getValue();
    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()
        - nextVal.getPosition());
    value = valueDeserializer.deserialize(value);

    currentKeyLength = nextKey.getLength() - nextKey.getPosition();
    currentValueLength = nextVal.getLength() - nextVal.getPosition();

    if (isMarked) {
      backupStore.write(nextKey, nextVal);
    }

    hasMore = input.next();
    if (hasMore) {
      nextKey = input.getKey();
      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
                                     currentRawKey.getLength(),
                                     nextKey.getData(),
                                     nextKey.getPosition(),
                                     nextKey.getLength() - nextKey.getPosition()
                                         ) == 0;  // is the current key equal to the next key?
    } else {
      nextKeyIsSame = false;
    }
    inputValueCounter.increment(1);
    return true;
  }

  public KEYIN getCurrentKey() {
    return key;
  }

  @Override
  public VALUEIN getCurrentValue() {
    return value;
  }
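
Stripped of the serialization plumbing, the nextKey()/nextKeyValue() pattern is easy to replay on plain Java objects. The sketch below is my own simplification (not Hadoop code): the lookahead flag nextKeyIsSame is the only state needed to cut an already-sorted stream into per-key groups.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Standalone simplification of ReduceContextImpl's grouping loop: the input is already
// sorted by key, and the single lookahead flag nextKeyIsSame is all that is needed to
// cut it into per-key groups (one "reduce() call" per group).
public class GroupingSketch {

  static class Pair {
    final String key;
    final int value;
    Pair(String key, int value) { this.key = key; this.value = value; }
  }

  public static void main(String[] args) {
    List<Pair> sorted = Arrays.asList(
        new Pair("a", 1), new Pair("a", 2), new Pair("b", 3), new Pair("c", 4), new Pair("c", 5));
    Comparator<String> grouping = Comparator.naturalOrder();  // stand-in for the grouping comparator

    int i = 0;
    while (i < sorted.size()) {                // corresponds to while (context.nextKey()) in Reducer.run()
      String currentKey = sorted.get(i).key;
      List<Integer> values = new ArrayList<>();
      boolean nextKeyIsSame = true;
      while (nextKeyIsSame) {                  // corresponds to iterating context.getValues()
        values.add(sorted.get(i).value);
        i++;
        nextKeyIsSame = i < sorted.size()
            && grouping.compare(currentKey, sorted.get(i).key) == 0;
      }
      System.out.println(currentKey + " -> " + values);  // prints: a -> [1, 2], b -> [3], c -> [4, 5]
    }
  }
}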
 

Source 2.2: the Reducer class

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

/**
 * Example reducer from the Reducer class javadoc:
 *
 * public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
 *                                                 Key,IntWritable> {
 *   private IntWritable result = new IntWritable();
 *
 *   public void reduce(Key key, Iterable<IntWritable> values,
 *                      Context context) throws IOException, InterruptedException {
 *     int sum = 0;
 *     for (IntWritable val : values) {
 *       sum += val.get();
 *     }
 *     result.set(sum);
 *     context.write(key, result);
 *   }
 * }
 *
 * @see Mapper
 * @see Partitioner
 */
@Checkpointable
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  /**
   * The <code>Context</code> passed on to the {@link Reducer} implementations.
   */
  public abstract class Context
    implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  }

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKey()) {  // this ends up calling nextKeyValue(), which updates hasMore, nextKeyIsSame, key and value
        reduce(context.getCurrentKey(), context.getValues(), context);  // see Source 2.2.1
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
        }
      }
    } finally {
      cleanup(context);
    }
  }
}
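
As the javadoc says, advanced users may override run() itself. A hedged sketch of such an override (hypothetical class and config key, not from this article) that keeps the setup/reduce/cleanup life cycle but stops after a configurable number of key groups:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical example of overriding Reducer.run(): same setup/reduce/cleanup life cycle,
// but the key loop bails out after maxKeys groups (e.g. for sampling or debugging).
// "demo.max.keys" is a made-up configuration property.
public class EarlyStopReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private int maxKeys;

  @Override
  protected void setup(Context context) {
    maxKeys = context.getConfiguration().getInt("demo.max.keys", Integer.MAX_VALUE);
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    context.write(key, new IntWritable(sum));
  }

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      int seen = 0;
      while (context.nextKey() && seen < maxKeys) {  // same driver loop as the default run()
        reduce(context.getCurrentKey(), context.getValues(), context);
        seen++;
        // (the default run() additionally resets the backup store after each key; omitted here)
      }
    } finally {
      cleanup(context);
    }
  }
}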

Source 2.2.1: context.getValues() is ultimately implemented by an iterator (ValueIterator)

  protected class ValueIterator implements ReduceContext.ValueIterator<VALUEIN> {

    private boolean inReset = false;
    private boolean clearMarkFlag = false;

    @Override
    public boolean hasNext() {
      try {
        if (inReset && backupStore.hasNext()) {
          return true;
        }
      } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException("hasNext failed", e);
      }
      return firstValue || nextKeyIsSame;
    }

    @Override
    public VALUEIN next() {
      if (inReset) {
        try {
          if (backupStore.hasNext()) {
            backupStore.next();
            DataInputBuffer next = backupStore.nextValue();
            buffer.reset(next.getData(), next.getPosition(), next.getLength()
                - next.getPosition());
            value = valueDeserializer.deserialize(value);
            return value;
          } else {
            inReset = false;
            backupStore.exitResetMode();
            if (clearMarkFlag) {
              clearMarkFlag = false;
              isMarked = false;
            }
          }
        } catch (IOException e) {
          e.printStackTrace();
          throw new RuntimeException("next value iterator failed", e);
        }
      }
      // if this is the first record, we don't need to advance
      if (firstValue) {
        firstValue = false;
        return value;
      }
      // if this isn't the first record and the next key is different, they
      // can't advance it here.
      if (!nextKeyIsSame) {
        throw new NoSuchElementException("iterate past last value");
      }
      // otherwise, go to the next key/value pair
      try {
        nextKeyValue();  // this iterator holds no data of its own; next() still goes through nextKeyValue(), which reads from the underlying input
        return value;
      } catch (IOException ie) {
        throw new RuntimeException("next value iterator failed", ie);
      } catch (InterruptedException ie) {
        // this is bad, but we can't modify the exception list of java.util
        throw new RuntimeException("next value iterator interrupted", ie);
      }
    }

    // ... (remaining methods of ValueIterator omitted)
  }

Summary: the above describes one complete flow. The reduce task pulls back a data set and wraps it in an iterator, the real iterator. ReduceContext then builds methods on top of that iterator for us, among them nextKeyValue(), which indirectly updates the current key and value. Reducer.run() has a while loop that calls nextKey(), which underneath still calls nextKeyValue(), and then invokes reduce(), passing in context.getCurrentKey() and context.getValues(). Iterating over the values goes through hasNext() and next(); next() ultimately delegates to the real iterator, so the data always comes out of the real iterator. The real iterator maintains one important flag, nextKeyIsSame, which hasNext() uses to decide whether the next record still has the same key, so iteration stops exactly at the end of one group.

PS: one additional point:

next() calls nextKeyValue(), which really does overwrite the key and value objects: what gets passed around is a reference, so the data in the same block of memory is changed in place.
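
A quick illustration of why this matters (a hypothetical reducer, standard Hadoop API): because nextKeyValue() deserializes into the same objects every time, any value you want to keep past the current iteration has to be copied.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrates the object-reuse pitfall described above: 'val' is the same Text instance
// on every iteration (nextKeyValue() deserializes into it in place), so buffering the
// reference itself would leave the list full of references to the *last* value.
public class BufferingReducer extends Reducer<Text, Text, Text, Text> {
  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    List<Text> buffered = new ArrayList<>();
    for (Text val : values) {
      // WRONG: buffered.add(val);   // every element would point at the reused object
      buffered.add(new Text(val));   // RIGHT: copy before the next nextKeyValue() overwrites it
    }
    for (Text v : buffered) {
      context.write(key, v);
    }
  }
}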

Still being updated... you are welcome to follow my WeChat official account LHWorld.
