1. Mapper/Reducer in the Old API

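Mapper/Reducer encapsulate an application's data-processing logic. To keep the interface simple, MapReduce requires that all data stored in the underlying distributed file system be interpreted as key/value pairs and handed to the map/reduce functions of the Mapper/Reducer, which produce further key/value pairs. The Mapper and Reducer class hierarchies are very similar, so we use Mapper as the example. As the Mapper class diagram in the figure shows, it consists of three parts: initialization, the map operation, and cleanup.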

(1) Initialization
Mapper extends the JobConfigurable interface. The interface's configure method allows a Mapper to be initialized from a JobConf parameter.

(2) Map operation
The MapReduce framework uses the RecordReader of the InputFormat to read key/value pairs one at a time from an InputSplit and passes each pair to the following map() function:

void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter) throws IOException;

Besides key and value, the function takes two further parameters of types OutputCollector and Reporter, used to emit results and to update Counter values, respectively.

(3) Cleanup
Mapper obtains its close method by extending the Closeable interface (which in turn extends Closeable from Java IO). Users can implement this method to clean up the Mapper.
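
The three phases above can be sketched without Hadoop on the classpath. In the sketch below, SimpleCollector and SimpleOldMapper are hypothetical stand-ins for OutputCollector and the old-API Mapper interface, a plain Map plays the role of JobConf, and the "demo.upper" option is made up. It only illustrates the call order: configure once, map once per record, close once.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for OutputCollector<K2, V2>.
interface SimpleCollector<K, V> {
    void collect(K key, V value);
}

// Hypothetical stand-in for the old-API Mapper: configure / map / close.
interface SimpleOldMapper<K1, V1, K2, V2> {
    void configure(Map<String, String> conf); // plays the role of configure(JobConf)
    void map(K1 key, V1 value, SimpleCollector<K2, V2> output);
    void close();
}

// Upper-cases values when the made-up "demo.upper" option is "true".
class UpperCaseMapper implements SimpleOldMapper<Long, String, Long, String> {
    private boolean upper;
    final List<String> events = new ArrayList<>(); // records the lifecycle calls

    public void configure(Map<String, String> conf) {
        upper = "true".equals(conf.get("demo.upper"));
        events.add("configure");
    }

    public void map(Long key, String value, SimpleCollector<Long, String> output) {
        events.add("map");
        output.collect(key, upper ? value.toUpperCase(Locale.ROOT) : value);
    }

    public void close() {
        events.add("close");
    }
}

class OldApiLifecycleDemo {
    // Drives the mapper the way a framework would.
    static Map<Long, String> run(UpperCaseMapper mapper, Map<Long, String> input) {
        Map<Long, String> out = new LinkedHashMap<>();
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("demo.upper", "true");
        mapper.configure(conf);                       // (1) initialization
        for (Map.Entry<Long, String> e : input.entrySet()) {
            mapper.map(e.getKey(), e.getValue(), out::put); // (2) map per record
        }
        mapper.close();                               // (3) cleanup
        return out;
    }

    public static void main(String[] args) {
        Map<Long, String> input = new LinkedHashMap<>();
        input.put(0L, "hello");
        input.put(6L, "world");
        UpperCaseMapper mapper = new UpperCaseMapper();
        System.out.println(run(mapper, input));
        System.out.println(mapper.events);
    }
}
```

In real Hadoop code the framework, not user code, performs these calls; the driver method here exists only to make the order visible.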
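
MapReduce ships with many Mapper/Reducer implementations, most of them fairly simple, as shown in the figure. Their functions are:

❑ ChainMapper/ChainReducer: support chained jobs.

❑ IdentityMapper/IdentityReducer: emit the input key/value unchanged, with no processing.

❑ InverseMapper: swaps the positions of key and value.

❑ RegexMapper: matches strings against a regular expression.

❑ TokenCountMapper: splits a string into tokens (words); it can serve as the Mapper for WordCount.

❑ LongSumReducer: groups by key and sums the long values.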
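
To make two of these concrete, the following plain-Java sketch mimics the behavior of the key/value-swapping mapper and of LongSumReducer using ordinary collections. The class and method names are illustrative assumptions, not Hadoop code, and the grouping that the framework performs between map and reduce is folded into a single method here.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BuiltInSemanticsSketch {
    // Mimics the swapping mapper: emit (value, key) for every input (key, value).
    static <K, V> List<Map.Entry<V, K>> inverse(List<Map.Entry<K, V>> records) {
        List<Map.Entry<V, K>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            out.add(new SimpleEntry<>(r.getValue(), r.getKey()));
        }
        return out;
    }

    // Mimics LongSumReducer: group by key and sum the long values.
    static <K extends Comparable<K>> Map<K, Long> longSum(List<Map.Entry<K, Long>> records) {
        Map<K, Long> sums = new TreeMap<>();
        for (Map.Entry<K, Long> r : records) {
            sums.merge(r.getKey(), r.getValue(), Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> counts = new ArrayList<>();
        counts.add(new SimpleEntry<>("a", 1L));
        counts.add(new SimpleEntry<>("b", 2L));
        counts.add(new SimpleEntry<>("a", 3L));
        System.out.println(longSum(counts));
        System.out.println(inverse(counts));
    }
}
```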

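A MapReduce application does not necessarily need a Mapper. The framework provides a more general interface than Mapper: MapRunnable, as shown in the figure. Users can implement it to customize how the Mapper is invoked or to implement the key/value processing logic themselves. Hadoop Pipes, for example, provides its own MapRunnable implementation that sends the data over a socket directly to another process for handling. Another benefit of this interface is that it lets users implement multithreaded Mappers.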

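As the figure shows, MapReduce provides two MapRunnable implementations, MapRunner and MultithreadedMapRunner, with MapRunner being the default. MultithreadedMapRunner is a multithreaded MapRunnable. By default each Mapper starts 10 threads; it is typically used for jobs that are not CPU-bound, in order to improve throughput.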
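
The idea behind a multithreaded runner can be sketched in plain Java: the runner pulls records and hands each map call to a thread pool, which pays off when map() mostly waits (for example on I/O). The class below is a hypothetical illustration, not the actual MultithreadedMapRunner; it assumes the map function is thread-safe and makes no promise about output order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class MultithreadedRunnerSketch {
    // Applies mapFn to every record on a pool of numThreads workers.
    static <I, O> Queue<O> run(List<I> records, Function<I, O> mapFn, int numThreads) {
        Queue<O> output = new ConcurrentLinkedQueue<>(); // thread-safe collector
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (I record : records) {
            pool.execute(() -> output.add(mapFn.apply(record)));
        }
        pool.shutdown(); // accept no new work, let queued work finish
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("map tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for map tasks", e);
        }
        return output;
    }

    public static void main(String[] args) {
        // 10 threads mirrors the default thread count mentioned in the text.
        Queue<Integer> lengths = run(Arrays.asList("a", "bb", "ccc"), String::length, 10);
        System.out.println(lengths.size());
    }
}
```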

2. Mapper/Reducer in the New API

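As the figure shows, the new API differs from the old API in the following ways:

❑ Mapper changes from an interface to a class. It no longer extends the JobConfigurable and Closeable interfaces; instead, the class itself gains setup and cleanup methods for initialization and cleanup.

❑ Parameters are wrapped in a Context object, which makes the interface easy to extend.

❑ The MapRunnable interface is removed and a run method is added to Mapper, so that users can customize how map() is invoked. The default run implementation matches the run implementation of MapRunner in the old version.

❑ In the new API, the values passed to Reducer are exposed as java.lang.Iterable, so users can traverse all values with a "foreach" loop, as shown below: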

void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
  for (VALUEIN value : values) { // note the for-each traversal
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
}
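
The difference between the two traversal styles can be seen side by side in a small self-contained sketch. The old API passes reduce() a java.util.Iterator, which must be walked by hand, while an Iterable works directly with for-each. The class and method names below are illustrative only.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ValueIterationSketch {
    // Old-API style: the values arrive as an Iterator and are walked manually.
    static long sumOldStyle(Iterator<Long> values) {
        long sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    // New-API style: the values arrive as an Iterable, so for-each works.
    static long sumNewStyle(Iterable<Long> values) {
        long sum = 0;
        for (long v : values) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 2L, 3L);
        System.out.println(sumOldStyle(values.iterator()));
        System.out.println(sumNewStyle(values));
    }
}
```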

The complete code of the Mapper class is as follows:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
/**
* Maps input key/value pairs to a set of intermediate key/value pairs.
*
* <p>Maps are the individual tasks which transform input records into a
* intermediate records. The transformed intermediate records need not be of
* the same type as the input records. A given input pair may map to zero or
* many output pairs.</p>
*
* <p>The Hadoop Map-Reduce framework spawns one map task for each
* {@link InputSplit} generated by the {@link InputFormat} for the job.
* <code>Mapper</code> implementations can access the {@link Configuration} for
* the job via the {@link JobContext#getConfiguration()}.
*
* <p>The framework first calls
* {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
* {@link #map(Object, Object, Context)}
* for each key/value pair in the <code>InputSplit</code>. Finally
* {@link #cleanup(Context)} is called.</p>
*
* <p>All intermediate values associated with a given output key are
* subsequently grouped by the framework, and passed to a {@link Reducer} to
* determine the final output. Users can control the sorting and grouping by
* specifying two key {@link RawComparator} classes.</p>
*
* <p>The <code>Mapper</code> outputs are partitioned per
* <code>Reducer</code>. Users can control which keys (and hence records) go to
* which <code>Reducer</code> by implementing a custom {@link Partitioner}.
*
* <p>Users can optionally specify a <code>combiner</code>, via
* {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
* intermediate outputs, which helps to cut down the amount of data transferred
* from the <code>Mapper</code> to the <code>Reducer</code>.
*
* <p>Applications can specify if and how the intermediate
* outputs are to be compressed and which {@link CompressionCodec}s are to be
* used via the <code>Configuration</code>.</p>
*
* <p>If the job has zero
* reduces then the output of the <code>Mapper</code> is directly written
* to the {@link OutputFormat} without sorting by keys.</p>
*
* <p>Example:</p>
* <p><blockquote><pre>
* public class TokenCounterMapper
* extends Mapper<Object, Text, Text, IntWritable>{
*
* private final static IntWritable one = new IntWritable(1);
* private Text word = new Text();
*
* public void map(Object key, Text value, Context context) throws IOException {
* StringTokenizer itr = new StringTokenizer(value.toString());
* while (itr.hasMoreTokens()) {
* word.set(itr.nextToken());
* context.collect(word, one);
* }
* }
* }
* </pre></blockquote></p>
*
* <p>Applications may override the {@link #run(Context)} method to exert
* greater control on map processing e.g. multi-threaded <code>Mapper</code>s
* etc.</p>
*
* @see InputFormat
* @see JobContext
* @see Partitioner
* @see Reducer
*/
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public class Context
extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
public Context(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) throws IOException, InterruptedException {
super(conf, taskid, reader, writer, committer, reporter, split);
}
}
/**
* Called once at the beginning of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Called once for each key/value pair in the input split. Most applications
* should override this, but the default is the identity function.
*/
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
}
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* Expert users can override this method for more complete control over the
* execution of the Mapper.
* @param context
* @throws IOException
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
}
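
As a hedged illustration of why an overridable run() is useful, the sketch below rebuilds the setup / map / cleanup / run shape with simplified stand-in classes (MiniContext and MiniMapper are hypothetical and carry values only, no keys) and overrides run() to stop after a fixed number of records. The "first N records" policy is a made-up example, not something Hadoop provides.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, heavily simplified stand-in for Mapper.Context.
class MiniContext {
    private final Iterator<String> input;
    private String current;
    final List<String> written = new ArrayList<>();

    MiniContext(List<String> records) { this.input = records.iterator(); }

    boolean nextKeyValue() {
        if (!input.hasNext()) return false;
        current = input.next();
        return true;
    }
    String getCurrentValue() { return current; }
    void write(String value) { written.add(value); }
}

// Mirrors the shape of the new-API Mapper: setup / map / cleanup / run.
class MiniMapper {
    protected void setup(MiniContext context) { }
    protected void map(String value, MiniContext context) { context.write(value); }
    protected void cleanup(MiniContext context) { }

    public void run(MiniContext context) {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentValue(), context);
        }
        cleanup(context);
    }
}

// Overrides run() to process at most `limit` records.
class FirstNMapper extends MiniMapper {
    private final int limit;
    FirstNMapper(int limit) { this.limit = limit; }

    @Override
    public void run(MiniContext context) {
        setup(context);
        int seen = 0;
        while (seen < limit && context.nextKeyValue()) {
            map(context.getCurrentValue(), context);
            seen++;
        }
        cleanup(context); // cleanup still runs exactly once
    }
}

class RunOverrideDemo {
    public static void main(String[] args) {
        MiniContext ctx = new MiniContext(Arrays.asList("a", "b", "c"));
        new FirstNMapper(2).run(ctx);
        System.out.println(ctx.written);
    }
}
```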

As the code shows, the Mapper class defines a nested class Context, which extends MapContext.

Let us look at the source code of the MapContext class:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
/**
* The context that is given to the {@link Mapper}.
* @param <KEYIN> the key input type to the Mapper
* @param <VALUEIN> the value input type to the Mapper
* @param <KEYOUT> the key output type from the Mapper
* @param <VALUEOUT> the value output type from the Mapper
*/
public class MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
private RecordReader<KEYIN,VALUEIN> reader;
private InputSplit split;
public MapContext(Configuration conf, TaskAttemptID taskid,
RecordReader<KEYIN,VALUEIN> reader,
RecordWriter<KEYOUT,VALUEOUT> writer,
OutputCommitter committer,
StatusReporter reporter,
InputSplit split) {
super(conf, taskid, writer, committer, reporter);
this.reader = reader;
this.split = split;
}
/**
* Get the input split for this map.
*/
public InputSplit getInputSplit() {
return split;
}
@Override
public KEYIN getCurrentKey() throws IOException, InterruptedException {
return reader.getCurrentKey();
}
@Override
public VALUEIN getCurrentValue() throws IOException, InterruptedException {
return reader.getCurrentValue();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return reader.nextKeyValue();
}
}
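
The delegation pattern in MapContext (every input call is forwarded to the RecordReader) can be tried out with stand-in classes. In this sketch LineReaderSketch is a hypothetical reader over an in-memory list of lines that uses the character offset of each line as the key; it is an assumption made for illustration and does not reproduce any Hadoop reader.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical reader: key = character offset of the line, value = the line.
class LineReaderSketch {
    private final List<String> lines;
    private int index = -1;
    private long offset = 0;

    LineReaderSketch(List<String> lines) { this.lines = lines; }

    boolean nextKeyValue() {
        if (index + 1 >= lines.size()) return false;
        if (index >= 0) {
            offset += lines.get(index).length() + 1; // +1 for the newline
        }
        index++;
        return true;
    }
    Long getCurrentKey() { return offset; }
    String getCurrentValue() { return lines.get(index); }
}

// Stand-in for MapContext: it owns no input logic and only forwards to the reader.
class DelegatingContextSketch {
    private final LineReaderSketch reader;
    DelegatingContextSketch(LineReaderSketch reader) { this.reader = reader; }

    boolean nextKeyValue() { return reader.nextKeyValue(); }
    Long getCurrentKey() { return reader.getCurrentKey(); }
    String getCurrentValue() { return reader.getCurrentValue(); }
}

class DelegationDemo {
    public static void main(String[] args) {
        DelegatingContextSketch ctx =
            new DelegatingContextSketch(new LineReaderSketch(Arrays.asList("hello", "world")));
        while (ctx.nextKeyValue()) {
            System.out.println(ctx.getCurrentKey() + "\t" + ctx.getCurrentValue());
        }
    }
}
```

Because the context only forwards calls, swapping in a different reader changes what the mapper sees without touching the mapper itself.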

MapContext extends TaskInputOutputContext. Here is the code of the TaskInputOutputContext class:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
/**
* A context object that allows input and output from the task. It is only
* supplied to the {@link Mapper} or {@link Reducer}.
* @param <KEYIN> the input key type for the task
* @param <VALUEIN> the input value type for the task
* @param <KEYOUT> the output key type for the task
* @param <VALUEOUT> the output value type for the task
*/
public abstract class TaskInputOutputContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
extends TaskAttemptContext implements Progressable {
private RecordWriter<KEYOUT,VALUEOUT> output;
private StatusReporter reporter;
private OutputCommitter committer;
public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid,
RecordWriter<KEYOUT,VALUEOUT> output,
OutputCommitter committer,
StatusReporter reporter) {
super(conf, taskid);
this.output = output;
this.reporter = reporter;
this.committer = committer;
}
/**
* Advance to the next key, value pair, returning null if at end.
* @return the key object that was read into, or null if no more
*/
public abstract
boolean nextKeyValue() throws IOException, InterruptedException;
/**
* Get the current key.
* @return the current key object or null if there isn't one
* @throws IOException
* @throws InterruptedException
*/
public abstract
KEYIN getCurrentKey() throws IOException, InterruptedException;
/**
* Get the current value.
* @return the value object that was read into
* @throws IOException
* @throws InterruptedException
*/
public abstract VALUEIN getCurrentValue() throws IOException,
InterruptedException;
/**
* Generate an output key/value pair.
*/
public void write(KEYOUT key, VALUEOUT value
) throws IOException, InterruptedException {
output.write(key, value);
}
public Counter getCounter(Enum<?> counterName) {
return reporter.getCounter(counterName);
}
public Counter getCounter(String groupName, String counterName) {
return reporter.getCounter(groupName, counterName);
}
@Override
public void progress() {
reporter.progress();
}
@Override
public void setStatus(String status) {
reporter.setStatus(status);
}
public OutputCommitter getOutputCommitter() {
return committer;
}
}

TaskInputOutputContext extends TaskAttemptContext and implements the Progressable interface. First, the code of the Progressable interface:

package org.apache.hadoop.util;
/**
* A facility for reporting progress.
*
* <p>Clients and/or applications can use the provided <code>Progressable</code>
* to explicitly report progress to the Hadoop framework. This is especially
* important for operations which take an insignificant amount of time since,
* in-lieu of the reported progress, the framework has to assume that an error
* has occured and time-out the operation.</p>
*/
public interface Progressable {
/**
* Report progress to the Hadoop framework.
*/
public void progress();
}
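
A typical use of such a one-method progress interface is to call it periodically from inside long-running work. The sketch below uses a local, hypothetical ProgressCallback interface with the same shape as Progressable so that it compiles without Hadoop; the reporting interval and the "work" are placeholders.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical local copy of the one-method Progressable shape.
interface ProgressCallback {
    void progress();
}

class ProgressReportingSketch {
    // Processes `items` units of work and reports progress every `interval` units,
    // so that a watching framework would not mistake slow work for a hang.
    static long process(int items, int interval, ProgressCallback reporter) {
        long checksum = 0;
        for (int i = 1; i <= items; i++) {
            checksum += i; // placeholder for real work
            if (i % interval == 0) {
                reporter.progress();
            }
        }
        return checksum;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        long result = process(1000, 100, calls::incrementAndGet);
        System.out.println(result + " " + calls.get());
    }
}
```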

The code of the TaskAttemptContext class:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;
/**
* The context for task attempts.
*/
public class TaskAttemptContext extends JobContext implements Progressable {
private final TaskAttemptID taskId;
private String status = "";
public TaskAttemptContext(Configuration conf,
TaskAttemptID taskId) {
super(conf, taskId.getJobID());
this.taskId = taskId;
}
/**
* Get the unique name for this task attempt.
*/
public TaskAttemptID getTaskAttemptID() {
return taskId;
}
/**
* Set the current status of the task to the given string.
*/
public void setStatus(String msg) throws IOException {
status = msg;
}
/**
* Get the last set status message.
* @return the current status message
*/
public String getStatus() {
return status;
}
/**
* Report progress. The subtypes actually do work in this method.
*/
public void progress() {
}
}

TaskAttemptContext extends JobContext. Finally, the source code of JobContext:

package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
* A read-only view of the job that is provided to the tasks while they
* are running.
*/
public class JobContext {
// Put all of the attribute names in here so that Job and JobContext are
// consistent.
protected static final String INPUT_FORMAT_CLASS_ATTR =
"mapreduce.inputformat.class";
protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
protected static final String OUTPUT_FORMAT_CLASS_ATTR =
"mapreduce.outputformat.class";
protected static final String PARTITIONER_CLASS_ATTR =
"mapreduce.partitioner.class";
protected final org.apache.hadoop.mapred.JobConf conf;
private final JobID jobId;
public JobContext(Configuration conf, JobID jobId) {
this.conf = new org.apache.hadoop.mapred.JobConf(conf);
this.jobId = jobId;
}
/**
* Return the configuration for the job.
* @return the shared configuration object
*/
public Configuration getConfiguration() {
return conf;
}
/**
* Get the unique ID for the job.
* @return the object with the job id
*/
public JobID getJobID() {
return jobId;
}
/**
* Get configured the number of reduce tasks for this job. Defaults to
* <code>1</code>.
* @return the number of reduce tasks for this job.
*/
public int getNumReduceTasks() {
return conf.getNumReduceTasks();
}
/**
* Get the current working directory for the default file system.
*
* @return the directory name.
*/
public Path getWorkingDirectory() throws IOException {
return conf.getWorkingDirectory();
}
/**
* Get the key class for the job output data.
* @return the key class for the job output data.
*/
public Class<?> getOutputKeyClass() {
return conf.getOutputKeyClass();
}
/**
* Get the value class for job outputs.
* @return the value class for job outputs.
*/
public Class<?> getOutputValueClass() {
return conf.getOutputValueClass();
}
/**
* Get the key class for the map output data. If it is not set, use the
* (final) output key class. This allows the map output key class to be
* different than the final output key class.
* @return the map output key class.
*/
public Class<?> getMapOutputKeyClass() {
return conf.getMapOutputKeyClass();
}
/**
* Get the value class for the map output data. If it is not set, use the
* (final) output value class This allows the map output value class to be
* different than the final output value class.
*
* @return the map output value class.
*/
public Class<?> getMapOutputValueClass() {
return conf.getMapOutputValueClass();
}
/**
* Get the user-specified job name. This is only used to identify the
* job to the user.
*
* @return the job's name, defaulting to "".
*/
public String getJobName() {
return conf.getJobName();
}
/**
* Get the {@link InputFormat} class for the job.
*
* @return the {@link InputFormat} class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends InputFormat<?,?>> getInputFormatClass()
throws ClassNotFoundException {
return (Class<? extends InputFormat<?,?>>)
conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
}
/**
* Get the {@link Mapper} class for the job.
*
* @return the {@link Mapper} class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends Mapper<?,?,?,?>> getMapperClass()
throws ClassNotFoundException {
return (Class<? extends Mapper<?,?,?,?>>)
conf.getClass(MAP_CLASS_ATTR, Mapper.class);
}
/**
* Get the combiner class for the job.
*
* @return the combiner class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getCombinerClass()
throws ClassNotFoundException {
return (Class<? extends Reducer<?,?,?,?>>)
conf.getClass(COMBINE_CLASS_ATTR, null);
}
/**
* Get the {@link Reducer} class for the job.
*
* @return the {@link Reducer} class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends Reducer<?,?,?,?>> getReducerClass()
throws ClassNotFoundException {
return (Class<? extends Reducer<?,?,?,?>>)
conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
}
/**
* Get the {@link OutputFormat} class for the job.
*
* @return the {@link OutputFormat} class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends OutputFormat<?,?>> getOutputFormatClass()
throws ClassNotFoundException {
return (Class<? extends OutputFormat<?,?>>)
conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
}
/**
* Get the {@link Partitioner} class for the job.
*
* @return the {@link Partitioner} class for the job.
*/
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
throws ClassNotFoundException {
return (Class<? extends Partitioner<?,?>>)
conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
/**
* Get the {@link RawComparator} comparator used to compare keys.
*
* @return the {@link RawComparator} comparator used to compare keys.
*/
public RawComparator<?> getSortComparator() {
return conf.getOutputKeyComparator();
}
/**
* Get the pathname of the job's jar.
* @return the pathname
*/
public String getJar() {
return conf.getJar();
}
/**
* Get the user defined {@link RawComparator} comparator for
* grouping keys of inputs to the reduce.
*
* @return comparator set by the user for grouping values.
* @see Job#setGroupingComparatorClass(Class) for details.
*/
public RawComparator<?> getGroupingComparator() {
return conf.getOutputValueGroupingComparator();
}
}
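
Several getters in the listing above follow the same pattern: look up a class under a configuration key and fall back to a default when the key is unset (for example Mapper.class for the mapper and TextInputFormat.class for the input format). A minimal sketch of that lookup-with-default pattern follows; ClassLookupSketch, its method names, and the "demo.list.class" key are hypothetical and not Hadoop's Configuration API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of a string-keyed configuration.
class ClassLookupSketch {
    private final Map<String, String> props = new HashMap<>();

    void setClass(String name, Class<?> cls) {
        props.put(name, cls.getName()); // store the class by name, as a config file would
    }

    // Returns the class configured under `name`, or `defaultValue` when nothing is set.
    Class<?> getClassOrDefault(String name, Class<?> defaultValue) {
        String className = props.get(name);
        if (className == null) {
            return defaultValue;
        }
        try {
            return Class.forName(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("configured class not found: " + className, e);
        }
    }

    public static void main(String[] args) {
        ClassLookupSketch conf = new ClassLookupSketch();
        System.out.println(conf.getClassOrDefault("demo.list.class", java.util.ArrayList.class));
        conf.setClass("demo.list.class", java.util.LinkedList.class);
        System.out.println(conf.getClassOrDefault("demo.list.class", java.util.ArrayList.class));
    }
}
```

The defaults are what make an otherwise empty job configuration usable: a job that sets nothing still gets a working identity mapper, text input, and hash partitioning.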

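References

《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: An In-Depth Look at MapReduce Architecture Design and Implementation Principles)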
