1 . 旧版 API 的 Mapper/Reducer 解析
Mapper/Reducer 中封装了应用程序的数据处理逻辑。为了简化接口,MapReduce 要求所有存储在底层分布式文件系统上的数据均要解释成 key/value 的形式,并交给Mapper/Reducer 中的 map/reduce 函数处理,产生另外一些 key/value。Mapper 与 Reducer 的类体系非常类似,我们以 Mapper 为例进行讲解。Mapper 的类图如图所示,包括初始化、Map操作和清理三部分。
(1)初始化Mapper 继承了 JobConfigurable 接口。该接口中的 configure 方法允许通过 JobConf 参数对 Mapper 进行初始化。
(2)Map 操作
MapReduce 框架会通过 InputFormat 中 RecordReader 从 InputSplit 获取一个个 key/value 对, 并交给下面的 map() 函数处理:void map(K1 key, V1 value, OutputCollectoroutput, Reporter reporter) throws IOException;
该函数的参数除了 key 和 value 之外, 还包括 OutputCollector 和 Reporter 两个类型的参数, 分别用于输出结果和修改 Counter 值。
(3)清理
Mapper 通过继承 Closeable 接口(它又继承了 Java IO 中的 Closeable 接口)获得 close方法,用户可通过实现该方法对 Mapper 进行清理。MapReduce 提供了很多 Mapper/Reducer 实现,但大部分功能比较简单,具体如图所示。它们对应的功能分别是:❑ChainMapper/ChainReducer:用于支持链式作业。
❑IdentityMapper/IdentityReducer:对于输入 key/value 不进行任何处理, 直接输出。
❑InvertMapper:交换 key/value 位置。
❑ RegexMapper:正则表达式字符串匹配。
❑TokenMapper:将字符串分割成若干个 token(单词),可用作 WordCount 的 Mapper。
❑LongSumReducer:以 key 为组,对 long 类型的 value 求累加和。
对于一个 MapReduce 应用程序,不一定非要存在 Mapper。MapReduce 框架提供了比 Mapper 更通用的接口:MapRunnable,如图所示。用 户可以实现该接口以定制Mapper 的调用 方式或者自己实现 key/value 的处理逻辑,比如,Hadoop Pipes 自行实现了MapRunnable,直接将数据通过 Socket 发送给其他进程处理。提供该接口的另外一个好处是允许用户实现多线程 Mapper。
如图所示, MapReduce 提供了两个 MapRunnable 实现,分别是 MapRunner 和MultithreadedMapRunner,其中 MapRunner 为默认实现。 MultithreadedMapRunner 实现了一种多线程的 MapRunnable。 默认情况下,每个 Mapper 启动 10 个线程,通常用于非 CPU类型的作业以提供吞吐率。
2. 新版 API 的 Mapper/Reducer 解析
从图可知, 新 API 在旧 API 基础上发生了以下几个变化:
❑Mapper 由接口变为类,且不再继承 JobConfigurable 和 Closeable 两个接口,而是直接在类中添加了 setup 和 cleanup 两个方法进行初始化和清理工作。
❑将参数封装到 Context 对象中,这使得接口具有良好的扩展性。
❑去掉 MapRunnable 接口,在 Mapper 中添加 run 方法,以方便用户定制 map() 函数的调用方法,run 默认实现与旧版本中 MapRunner 的 run 实现一样。
❑新 API 中 Reducer 遍历 value 的迭代器类型变为 java.lang.Iterable,使得用户可以采用“ foreach” 形式遍历所有 value,如下所示:
void reduce(KEYIN key, Iterablevalues, Context context) throws IOException, InterruptedException { for(VALUEIN value: values) { // 注意遍历方式 context.write((KEYOUT) key, (VALUEOUT) value); }}
Mapper类的完整代码如下:
package org.apache.hadoop.mapreduce;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.io.compress.CompressionCodec;/** * Maps input key/value pairs to a set of intermediate key/value pairs. * *Maps are the individual tasks which transform input records into a * intermediate records. The transformed intermediate records need not be of * the same type as the input records. A given input pair may map to zero or * many output pairs.
* *The Hadoop Map-Reduce framework spawns one map task for each * {
@link InputSplit} generated by the { @link InputFormat} for the job. *Mapper
implementations can access the { @link Configuration} for * the job via the { @link JobContext#getConfiguration()}. * *The framework first calls * {
@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by * { @link #map(Object, Object, Context)} * for each key/value pair in theInputSplit
. Finally * { @link #cleanup(Context)} is called. * *All intermediate values associated with a given output key are * subsequently grouped by the framework, and passed to a {
@link Reducer} to * determine the final output. Users can control the sorting and grouping by * specifying two key { @link RawComparator} classes. * *The
@link Partitioner}. * *Mapper
outputs are partitioned per *Reducer
. Users can control which keys (and hence records) go to * whichReducer
by implementing a custom {Users can optionally specify a
@link Job#setCombinerClass(Class)}, to perform local aggregation of the * intermediate outputs, which helps to cut down the amount of data transferred * from thecombiner
, via * {Mapper
to theReducer
. * *Applications can specify if and how the intermediate * outputs are to be compressed and which {
@link CompressionCodec}s are to be * used via theConfiguration
. * *If the job has zero * reduces then the output of the
@link OutputFormat} without sorting by keys. * *Mapper
is directly written * to the {Example:
** ** public class TokenCounterMapper * extends MapperApplications may override the {
@link #run(Context)} method to exert * greater control on map processing e.g. multi-threadedMapper
s * etc. * * @see InputFormat * @see JobContext * @see Partitioner * @see Reducer */public class Mapper{ public class Context extends MapContext { public Context(Configuration conf, TaskAttemptID taskid, RecordReader reader, RecordWriter writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) throws IOException, InterruptedException { super(conf, taskid, reader, writer, committer, reporter, split); } } /** * Called once at the beginning of the task. */ protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * Called once for each key/value pair in the input split. Most applications * should override this, but the default is the identity function. */ @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } /** * Called once at the end of the task. */ protected void cleanup(Context context ) throws IOException, InterruptedException { // NOTHING } /** * Expert users can override this method for more complete control over the * execution of the Mapper. * @param context * @throws IOException */ public void run(Context context) throws IOException, InterruptedException { setup(context); while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } cleanup(context); }}
从代码中可以看到,Mapper类中定义了一个新的类Context,继承自MapContext
我们来看看MapContext类的源代码:
package org.apache.hadoop.mapreduce;import java.io.IOException;import org.apache.hadoop.conf.Configuration;/** * The context that is given to the { @link Mapper}. * @paramthe key input type to the Mapper * @param the value input type to the Mapper * @param the key output type from the Mapper * @param the value output type from the Mapper */public class MapContext extends TaskInputOutputContext { private RecordReader reader; private InputSplit split; public MapContext(Configuration conf, TaskAttemptID taskid, RecordReader reader, RecordWriter writer, OutputCommitter committer, StatusReporter reporter, InputSplit split) { super(conf, taskid, writer, committer, reporter); this.reader = reader; this.split = split; } /** * Get the input split for this map. */ public InputSplit getInputSplit() { return split; } @Override public KEYIN getCurrentKey() throws IOException, InterruptedException { return reader.getCurrentKey(); } @Override public VALUEIN getCurrentValue() throws IOException, InterruptedException { return reader.getCurrentValue(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { return reader.nextKeyValue(); }}
MapContext类继承自TaskInputOutputContext,再看看TaskInputOutputContext类的代码:
package org.apache.hadoop.mapreduce;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.util.Progressable;/** * A context object that allows input and output from the task. It is only * supplied to the { @link Mapper} or { @link Reducer}. * @paramthe input key type for the task * @param the input value type for the task * @param the output key type for the task * @param the output value type for the task */public abstract class TaskInputOutputContext extends TaskAttemptContext implements Progressable { private RecordWriter output; private StatusReporter reporter; private OutputCommitter committer; public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid, RecordWriter output, OutputCommitter committer, StatusReporter reporter) { super(conf, taskid); this.output = output; this.reporter = reporter; this.committer = committer; } /** * Advance to the next key, value pair, returning null if at end. * @return the key object that was read into, or null if no more */ public abstract boolean nextKeyValue() throws IOException, InterruptedException; /** * Get the current key. * @return the current key object or null if there isn't one * @throws IOException * @throws InterruptedException */ public abstract KEYIN getCurrentKey() throws IOException, InterruptedException; /** * Get the current value. * @return the value object that was read into * @throws IOException * @throws InterruptedException */ public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException; /** * Generate an output key/value pair. */ public void write(KEYOUT key, VALUEOUT value ) throws IOException, InterruptedException { output.write(key, value); } public Counter getCounter(Enum counterName) { return reporter.getCounter(counterName); } public Counter getCounter(String groupName, String counterName) { return reporter.getCounter(groupName, counterName); } @Override public void progress() { reporter.progress(); } @Override public void setStatus(String status) { reporter.setStatus(status); } public OutputCommitter getOutputCommitter() { return committer; }}
TaskInputOutputContext类继承自TaskAttemptContext,实现了Progressable接口,先看看Progressable接口的代码:
package org.apache.hadoop.util;/** * A facility for reporting progress. * *Clients and/or applications can use the provided
*/public interface Progressable { /** * Report progress to the Hadoop framework. */ public void progress();}Progressable
* to explicitly report progress to the Hadoop framework. This is especially * important for operations which take an insignificant amount of time since, * in-lieu of the reported progress, the framework has to assume that an error * has occured and time-out the operation.
TaskAttemptContext类的代码:
package org.apache.hadoop.mapreduce;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.util.Progressable;/** * The context for task attempts. */public class TaskAttemptContext extends JobContext implements Progressable { private final TaskAttemptID taskId; private String status = ""; public TaskAttemptContext(Configuration conf, TaskAttemptID taskId) { super(conf, taskId.getJobID()); this.taskId = taskId; } /** * Get the unique name for this task attempt. */ public TaskAttemptID getTaskAttemptID() { return taskId; } /** * Set the current status of the task to the given string. */ public void setStatus(String msg) throws IOException { status = msg; } /** * Get the last set status message. * @return the current status message */ public String getStatus() { return status; } /** * Report progress. The subtypes actually do work in this method. */ public void progress() { }}
TaskAttemptContext继承自类JobContext,最后来看看JobContext的源代码:
package org.apache.hadoop.mapreduce;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.RawComparator;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;/** * A read-only view of the job that is provided to the tasks while they * are running. */public class JobContext { // Put all of the attribute names in here so that Job and JobContext are // consistent. protected static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.inputformat.class"; protected static final String MAP_CLASS_ATTR = "mapreduce.map.class"; protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class"; protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class"; protected static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.outputformat.class"; protected static final String PARTITIONER_CLASS_ATTR = "mapreduce.partitioner.class"; protected final org.apache.hadoop.mapred.JobConf conf; private final JobID jobId; public JobContext(Configuration conf, JobID jobId) { this.conf = new org.apache.hadoop.mapred.JobConf(conf); this.jobId = jobId; } /** * Return the configuration for the job. * @return the shared configuration object */ public Configuration getConfiguration() { return conf; } /** * Get the unique ID for the job. * @return the object with the job id */ public JobID getJobID() { return jobId; } /** * Get configured the number of reduce tasks for this job. Defaults to * 1
. * @return the number of reduce tasks for this job. */ public int getNumReduceTasks() { return conf.getNumReduceTasks(); } /** * Get the current working directory for the default file system. * * @return the directory name. */ public Path getWorkingDirectory() throws IOException { return conf.getWorkingDirectory(); } /** * Get the key class for the job output data. * @return the key class for the job output data. */ public Class getOutputKeyClass() { return conf.getOutputKeyClass(); } /** * Get the value class for job outputs. * @return the value class for job outputs. */ public Class getOutputValueClass() { return conf.getOutputValueClass(); } /** * Get the key class for the map output data. If it is not set, use the * (final) output key class. This allows the map output key class to be * different than the final output key class. * @return the map output key class. */ public Class getMapOutputKeyClass() { return conf.getMapOutputKeyClass(); } /** * Get the value class for the map output data. If it is not set, use the * (final) output value class This allows the map output value class to be * different than the final output value class. * * @return the map output value class. */ public Class getMapOutputValueClass() { return conf.getMapOutputValueClass(); } /** * Get the user-specified job name. This is only used to identify the * job to the user. * * @return the job's name, defaulting to "". */ public String getJobName() { return conf.getJobName(); } /** * Get the { @link InputFormat} class for the job. * * @return the { @link InputFormat} class for the job. */ @SuppressWarnings("unchecked") public Class > getInputFormatClass() throws ClassNotFoundException { return (Class >) conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class); } /** * Get the { @link Mapper} class for the job. * * @return the { @link Mapper} class for the job. */ @SuppressWarnings("unchecked") public Class > getMapperClass() throws ClassNotFoundException { return (Class >) conf.getClass(MAP_CLASS_ATTR, Mapper.class); } /** * Get the combiner class for the job. * * @return the combiner class for the job. */ @SuppressWarnings("unchecked") public Class > getCombinerClass() throws ClassNotFoundException { return (Class >) conf.getClass(COMBINE_CLASS_ATTR, null); } /** * Get the { @link Reducer} class for the job. * * @return the { @link Reducer} class for the job. */ @SuppressWarnings("unchecked") public Class > getReducerClass() throws ClassNotFoundException { return (Class >) conf.getClass(REDUCE_CLASS_ATTR, Reducer.class); } /** * Get the { @link OutputFormat} class for the job. * * @return the { @link OutputFormat} class for the job. */ @SuppressWarnings("unchecked") public Class > getOutputFormatClass() throws ClassNotFoundException { return (Class >) conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class); } /** * Get the { @link Partitioner} class for the job. * * @return the { @link Partitioner} class for the job. */ @SuppressWarnings("unchecked") public Class > getPartitionerClass() throws ClassNotFoundException { return (Class >) conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class); } /** * Get the { @link RawComparator} comparator used to compare keys. * * @return the { @link RawComparator} comparator used to compare keys. */ public RawComparator getSortComparator() { return conf.getOutputKeyComparator(); } /** * Get the pathname of the job's jar. * @return the pathname */ public String getJar() { return conf.getJar(); } /** * Get the user defined { @link RawComparator} comparator for * grouping keys of inputs to the reduce. * * @return comparator set by the user for grouping values. * @see Job#setGroupingComparatorClass(Class) for details. */ public RawComparator getGroupingComparator() { return conf.getOutputValueGroupingComparator(); }}
参考资料
《Hadoop技术内幕 深入理解MapReduce架构设计与实现原理》