Mapper and Reducer Explained
Published: 2019-06-26


1. Mapper/Reducer in the Old API

Mapper/Reducer encapsulate the application's data-processing logic. To simplify the interface, MapReduce requires that all data stored on the underlying distributed file system be interpreted as key/value pairs and handed to the map/reduce functions of Mapper/Reducer, which in turn produce new key/value pairs. The class hierarchies of Mapper and Reducer are very similar, so we use Mapper as the example. As the class diagram shows, a Mapper consists of three parts: initialization, the map operation, and cleanup.


(1) Initialization
Mapper extends the JobConfigurable interface, whose configure method allows the Mapper to be initialized through a JobConf parameter.

(2) The map operation

The MapReduce framework uses the RecordReader of the InputFormat to fetch key/value pairs from the InputSplit one by one and passes them to the following map() function:

void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
    throws IOException;

Besides key and value, the function takes two further parameters, an OutputCollector and a Reporter, used respectively for emitting results and for updating Counter values.

(3) Cleanup

Mapper obtains a close method by extending the Closeable interface (which in turn extends the Closeable interface from Java IO); users can implement this method to perform cleanup for the Mapper.
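
To make the three phases concrete, here is a minimal sketch of an old-API Mapper, assuming a made-up class name (WordLengthMapper) and a made-up configuration key (example.tolowercase); it overrides configure() for initialization, map() for per-record processing, and close() for cleanup:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class WordLengthMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private boolean toLowerCase;

  @Override
  public void configure(JobConf job) {
    // Initialization: read a setting from the job configuration
    // (the key "example.tolowercase" is invented for illustration)
    toLowerCase = job.getBoolean("example.tolowercase", false);
  }

  @Override
  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    String line = toLowerCase ? value.toString().toLowerCase() : value.toString();
    for (String token : line.split("\\s+")) {
      if (!token.isEmpty()) {
        // Emit (word, length); the reporter could also be used to bump counters here
        output.collect(new Text(token), new IntWritable(token.length()));
      }
    }
  }

  @Override
  public void close() throws IOException {
    // Cleanup: release any resources opened in configure()
  }
}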
MapReduce ships with many Mapper/Reducer implementations, most of them fairly simple, as shown in the figure. Their functions are as follows (a usage sketch follows the list):

❑ ChainMapper/ChainReducer: support chained jobs.

❑ IdentityMapper/IdentityReducer: pass the input key/value through unchanged.

❑ InverseMapper: swaps the key and value.

❑ RegexMapper: regular-expression string matching.

❑ TokenCountMapper: splits a string into tokens (words); usable as the Mapper of WordCount.

❑ LongSumReducer: groups by key and computes the sum of the long-typed values.
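
As a usage sketch of the built-in classes above, the following old-API driver, assuming the lib classes org.apache.hadoop.mapred.lib.TokenCountMapper and LongSumReducer (the job name and the command-line paths are placeholders), wires them into a WordCount-style job without writing any Mapper/Reducer code:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.TokenCountMapper;

public class LibWordCount {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(LibWordCount.class);
    conf.setJobName("lib-wordcount");             // placeholder job name

    conf.setMapperClass(TokenCountMapper.class);  // splits each line into tokens, emits (token, 1)
    conf.setCombinerClass(LongSumReducer.class);  // local aggregation of the long counts
    conf.setReducerClass(LongSumReducer.class);   // sums the long values per key

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(LongWritable.class);

    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}

Because LongSumReducer simply sums long values per key, it can also serve as the combiner here, which reduces the amount of data shuffled from the map side to the reduce side.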


A MapReduce application does not necessarily need a Mapper. The framework provides an interface more general than Mapper, MapRunnable, as shown in the figure. Users can implement this interface to customize how the Mapper is invoked, or to implement the key/value processing logic themselves; for example, Hadoop Pipes implements its own MapRunnable and sends the data over a socket to another process for handling. Another benefit of this interface is that it allows multi-threaded Mappers.

As the figure shows, MapReduce provides two MapRunnable implementations, MapRunner and MultithreadedMapRunner, with MapRunner being the default. MultithreadedMapRunner is a multi-threaded MapRunnable; by default it starts 10 threads per Mapper and is typically used for non-CPU-bound jobs to improve throughput. A simplified sketch of a MapRunnable follows.
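
The following is not the actual Hadoop source but a simplified sketch of a MapRunnable implementation, roughly what the default MapRunner does: it pulls key/value pairs from the RecordReader and feeds them to the user's Mapper. This call loop is exactly what the interface lets you replace.

import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapRunnable;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.ReflectionUtils;

public class SimpleMapRunner<K1, V1, K2, V2> implements MapRunnable<K1, V1, K2, V2> {

  private Mapper<K1, V1, K2, V2> mapper;

  @SuppressWarnings("unchecked")
  @Override
  public void configure(JobConf job) {
    // Instantiate (and configure) the user's Mapper declared in the JobConf
    this.mapper = (Mapper<K1, V1, K2, V2>)
        ReflectionUtils.newInstance(job.getMapperClass(), job);
  }

  @Override
  public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                  Reporter reporter) throws IOException {
    try {
      K1 key = input.createKey();
      V1 value = input.createValue();
      while (input.next(key, value)) {   // read one key/value pair at a time
        mapper.map(key, value, output, reporter);
      }
    } finally {
      mapper.close();                    // cleanup even if map() throws
    }
  }
}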

2. Mapper/Reducer in the New API

As the figure shows, the new API changes the old one in the following ways:

❑ Mapper is now a class rather than an interface, and it no longer extends the JobConfigurable and Closeable interfaces; instead it defines the setup and cleanup methods directly for initialization and cleanup.

❑ The parameters are wrapped in a Context object, which gives the interface good extensibility.

❑ The MapRunnable interface is removed; instead, Mapper gains a run method so that users can customize how map() is invoked. The default run implementation is the same as the run of MapRunner in the old version.

❑ In the new API, the iterator type that Reducer uses to traverse the values becomes java.lang.Iterable, so users can iterate over all values with a "foreach" loop, as shown below:

void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
    throws IOException, InterruptedException {
  for (VALUEIN value : values) {   // note the iteration style
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
}
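
In practice a user Reducer follows the same iteration pattern. Here is a hedged sketch modeled on the classic WordCount reducer (the class name IntSumReducer is chosen only for illustration):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {   // java.lang.Iterable allows "foreach" traversal
      sum += value.get();
    }
    result.set(sum);
    context.write(key, result);          // emit (word, total count)
  }
}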

The complete code of the Mapper class is as follows:

package org.apache.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;

/**
 * Maps input key/value pairs to a set of intermediate key/value pairs.
 *
 * Maps are the individual tasks which transform input records into
 * intermediate records. The transformed intermediate records need not be of
 * the same type as the input records. A given input pair may map to zero or
 * many output pairs.
 *
 * The Hadoop Map-Reduce framework spawns one map task for each
 * {@link InputSplit} generated by the {@link InputFormat} for the job.
 * Mapper implementations can access the {@link Configuration} for
 * the job via the {@link JobContext#getConfiguration()}.
 *
 * The framework first calls
 * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
 * {@link #map(Object, Object, Context)}
 * for each key/value pair in the InputSplit. Finally
 * {@link #cleanup(Context)} is called.
 *
 * All intermediate values associated with a given output key are
 * subsequently grouped by the framework, and passed to a {@link Reducer} to
 * determine the final output. Users can control the sorting and grouping by
 * specifying two key {@link RawComparator} classes.
 *
 * The Mapper outputs are partitioned per Reducer. Users can control which
 * keys (and hence records) go to which Reducer by implementing a custom
 * {@link Partitioner}.
 *
 * Users can optionally specify a combiner, via
 * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the
 * intermediate outputs, which helps to cut down the amount of data transferred
 * from the Mapper to the Reducer.
 *
 * Applications can specify if and how the intermediate outputs are to be
 * compressed and which {@link CompressionCodec}s are to be used via the
 * Configuration.
 *
 * If the job has zero reduces then the output of the Mapper is directly
 * written to the {@link OutputFormat} without sorting by keys.
 *
 * Example:
 *
 * public class TokenCounterMapper
 *     extends Mapper<Object, Text, Text, IntWritable> {
 *
 *   private final static IntWritable one = new IntWritable(1);
 *   private Text word = new Text();
 *
 *   public void map(Object key, Text value, Context context) throws IOException {
 *     StringTokenizer itr = new StringTokenizer(value.toString());
 *     while (itr.hasMoreTokens()) {
 *       word.set(itr.nextToken());
 *       context.collect(word, one);
 *     }
 *   }
 * }
 *
 * Applications may override the {@link #run(Context)} method to exert
 * greater control on map processing e.g. multi-threaded Mappers etc.
 *
 * @see InputFormat
 * @see JobContext
 * @see Partitioner
 * @see Reducer
 */
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  public class Context
      extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RecordReader<KEYIN, VALUEIN> reader,
                   RecordWriter<KEYOUT, VALUEOUT> writer,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   InputSplit split) throws IOException, InterruptedException {
      super(conf, taskid, reader, writer, committer, reporter, split);
    }
  }

  /**
   * Called once at the beginning of the task.
   */
  protected void setup(Context context) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value,
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}

As the code shows, the Mapper class defines a new inner class, Context, which extends MapContext.
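
Before walking down the context hierarchy, here is a hedged sketch, not taken from the book, of how an application typically subclasses this Mapper class: setup() reads a flag from the Configuration (the key example.skip.empty is invented), map() does the per-record work, and run() is overridden only to show the hook that expert users customize, for example for multi-threaded mapping:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LineLengthMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  private boolean skipEmpty;

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // Initialization now happens here instead of configure()
    skipEmpty = context.getConfiguration().getBoolean("example.skip.empty", true);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    if (skipEmpty && line.trim().isEmpty()) {
      return;
    }
    context.write(new Text(line), new IntWritable(line.length()));
  }

  @Override
  public void run(Context context) throws IOException, InterruptedException {
    // Same structure as the default run(); shown only to mark the customization hook
    setup(context);
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }
}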

Let's look at the source code of the MapContext class:

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;

/**
 * The context that is given to the {@link Mapper}.
 * @param <KEYIN> the key input type to the Mapper
 * @param <VALUEIN> the value input type to the Mapper
 * @param <KEYOUT> the key output type from the Mapper
 * @param <VALUEOUT> the value output type from the Mapper
 */
public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  private RecordReader<KEYIN, VALUEIN> reader;
  private InputSplit split;

  public MapContext(Configuration conf, TaskAttemptID taskid,
                    RecordReader<KEYIN, VALUEIN> reader,
                    RecordWriter<KEYOUT, VALUEOUT> writer,
                    OutputCommitter committer,
                    StatusReporter reporter,
                    InputSplit split) {
    super(conf, taskid, writer, committer, reporter);
    this.reader = reader;
    this.split = split;
  }

  /**
   * Get the input split for this map.
   */
  public InputSplit getInputSplit() {
    return split;
  }

  @Override
  public KEYIN getCurrentKey() throws IOException, InterruptedException {
    return reader.getCurrentKey();
  }

  @Override
  public VALUEIN getCurrentValue() throws IOException, InterruptedException {
    return reader.getCurrentValue();
  }

  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    return reader.nextKeyValue();
  }
}

MapContext extends TaskInputOutputContext. Next, the code of the TaskInputOutputContext class:

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;

/**
 * A context object that allows input and output from the task. It is only
 * supplied to the {@link Mapper} or {@link Reducer}.
 * @param <KEYIN> the input key type for the task
 * @param <VALUEIN> the input value type for the task
 * @param <KEYOUT> the output key type for the task
 * @param <VALUEOUT> the output value type for the task
 */
public abstract class TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    extends TaskAttemptContext implements Progressable {

  private RecordWriter<KEYOUT, VALUEOUT> output;
  private StatusReporter reporter;
  private OutputCommitter committer;

  public TaskInputOutputContext(Configuration conf, TaskAttemptID taskid,
                                RecordWriter<KEYOUT, VALUEOUT> output,
                                OutputCommitter committer,
                                StatusReporter reporter) {
    super(conf, taskid);
    this.output = output;
    this.reporter = reporter;
    this.committer = committer;
  }

  /**
   * Advance to the next key, value pair, returning null if at end.
   * @return the key object that was read into, or null if no more
   */
  public abstract boolean nextKeyValue() throws IOException, InterruptedException;

  /**
   * Get the current key.
   * @return the current key object or null if there isn't one
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

  /**
   * Get the current value.
   * @return the value object that was read into
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

  /**
   * Generate an output key/value pair.
   */
  public void write(KEYOUT key, VALUEOUT value)
      throws IOException, InterruptedException {
    output.write(key, value);
  }

  public Counter getCounter(Enum<?> counterName) {
    return reporter.getCounter(counterName);
  }

  public Counter getCounter(String groupName, String counterName) {
    return reporter.getCounter(groupName, counterName);
  }

  @Override
  public void progress() {
    reporter.progress();
  }

  @Override
  public void setStatus(String status) {
    reporter.setStatus(status);
  }

  public OutputCommitter getOutputCommitter() {
    return committer;
  }
}
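
A hedged illustration of how these TaskInputOutputContext methods surface in user code: inside map(), context.write() goes to the RecordWriter and context.getCounter() goes to the StatusReporter. The enum ParseCounters and the tab-separated record format are invented for this example.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class CountingMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

  enum ParseCounters { GOOD_RECORDS, BAD_RECORDS }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] fields = value.toString().split("\t");
    if (fields.length < 2) {
      context.getCounter(ParseCounters.BAD_RECORDS).increment(1);   // reporter-backed counter
      return;
    }
    context.getCounter(ParseCounters.GOOD_RECORDS).increment(1);
    context.write(new Text(fields[0]), new LongWritable(1));        // RecordWriter-backed output
  }
}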

TaskInputOutputContext extends TaskAttemptContext and implements the Progressable interface. Let's look at the Progressable interface first:

package org.apache.hadoop.util;

/**
 * A facility for reporting progress.
 *
 * Clients and/or applications can use the provided Progressable
 * to explicitly report progress to the Hadoop framework. This is especially
 * important for operations which take an insignificant amount of time since,
 * in-lieu of the reported progress, the framework has to assume that an error
 * has occured and time-out the operation.
 */
public interface Progressable {
  /**
   * Report progress to the Hadoop framework.
   */
  public void progress();
}
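
A hedged sketch of why Progressable matters in practice: a map() that performs a long-running operation per record calls context.progress() (and optionally setStatus()) so the framework does not assume the task has hung and time it out. The slowExternalLookup() helper is a stand-in for such an operation.

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SlowLookupMapper extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.setStatus("looking up record at offset " + key);
    for (String part : value.toString().split(",")) {
      String result = slowExternalLookup(part);   // stand-in for a long-running call
      context.progress();                         // tell the framework we are still alive
      context.write(new Text(part), new Text(result));
    }
  }

  private String slowExternalLookup(String input) {
    // Placeholder for an operation that may exceed the task timeout interval
    return input.toUpperCase();
  }
}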

The code of the TaskAttemptContext class:

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;

/**
 * The context for task attempts.
 */
public class TaskAttemptContext extends JobContext implements Progressable {

  private final TaskAttemptID taskId;
  private String status = "";

  public TaskAttemptContext(Configuration conf,
                            TaskAttemptID taskId) {
    super(conf, taskId.getJobID());
    this.taskId = taskId;
  }

  /**
   * Get the unique name for this task attempt.
   */
  public TaskAttemptID getTaskAttemptID() {
    return taskId;
  }

  /**
   * Set the current status of the task to the given string.
   */
  public void setStatus(String msg) throws IOException {
    status = msg;
  }

  /**
   * Get the last set status message.
   * @return the current status message
   */
  public String getStatus() {
    return status;
  }

  /**
   * Report progress. The subtypes actually do work in this method.
   */
  public void progress() {
  }
}

TaskAttemptContext extends the JobContext class. Finally, the source code of JobContext:

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * A read-only view of the job that is provided to the tasks while they
 * are running.
 */
public class JobContext {
  // Put all of the attribute names in here so that Job and JobContext are
  // consistent.
  protected static final String INPUT_FORMAT_CLASS_ATTR =
    "mapreduce.inputformat.class";
  protected static final String MAP_CLASS_ATTR = "mapreduce.map.class";
  protected static final String COMBINE_CLASS_ATTR = "mapreduce.combine.class";
  protected static final String REDUCE_CLASS_ATTR = "mapreduce.reduce.class";
  protected static final String OUTPUT_FORMAT_CLASS_ATTR =
    "mapreduce.outputformat.class";
  protected static final String PARTITIONER_CLASS_ATTR =
    "mapreduce.partitioner.class";

  protected final org.apache.hadoop.mapred.JobConf conf;
  private final JobID jobId;

  public JobContext(Configuration conf, JobID jobId) {
    this.conf = new org.apache.hadoop.mapred.JobConf(conf);
    this.jobId = jobId;
  }

  /**
   * Return the configuration for the job.
   * @return the shared configuration object
   */
  public Configuration getConfiguration() {
    return conf;
  }

  /**
   * Get the unique ID for the job.
   * @return the object with the job id
   */
  public JobID getJobID() {
    return jobId;
  }

  /**
   * Get configured the number of reduce tasks for this job. Defaults to 1.
   * @return the number of reduce tasks for this job.
   */
  public int getNumReduceTasks() {
    return conf.getNumReduceTasks();
  }

  /**
   * Get the current working directory for the default file system.
   *
   * @return the directory name.
   */
  public Path getWorkingDirectory() throws IOException {
    return conf.getWorkingDirectory();
  }

  /**
   * Get the key class for the job output data.
   * @return the key class for the job output data.
   */
  public Class<?> getOutputKeyClass() {
    return conf.getOutputKeyClass();
  }

  /**
   * Get the value class for job outputs.
   * @return the value class for job outputs.
   */
  public Class<?> getOutputValueClass() {
    return conf.getOutputValueClass();
  }

  /**
   * Get the key class for the map output data. If it is not set, use the
   * (final) output key class. This allows the map output key class to be
   * different than the final output key class.
   * @return the map output key class.
   */
  public Class<?> getMapOutputKeyClass() {
    return conf.getMapOutputKeyClass();
  }

  /**
   * Get the value class for the map output data. If it is not set, use the
   * (final) output value class This allows the map output value class to be
   * different than the final output value class.
   *
   * @return the map output value class.
   */
  public Class<?> getMapOutputValueClass() {
    return conf.getMapOutputValueClass();
  }

  /**
   * Get the user-specified job name. This is only used to identify the
   * job to the user.
   *
   * @return the job's name, defaulting to "".
   */
  public String getJobName() {
    return conf.getJobName();
  }

  /**
   * Get the {@link InputFormat} class for the job.
   *
   * @return the {@link InputFormat} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends InputFormat<?, ?>> getInputFormatClass()
      throws ClassNotFoundException {
    return (Class<? extends InputFormat<?, ?>>)
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }

  /**
   * Get the {@link Mapper} class for the job.
   *
   * @return the {@link Mapper} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
      throws ClassNotFoundException {
    return (Class<? extends Mapper<?, ?, ?, ?>>)
      conf.getClass(MAP_CLASS_ATTR, Mapper.class);
  }

  /**
   * Get the combiner class for the job.
   *
   * @return the combiner class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
      throws ClassNotFoundException {
    return (Class<? extends Reducer<?, ?, ?, ?>>)
      conf.getClass(COMBINE_CLASS_ATTR, null);
  }

  /**
   * Get the {@link Reducer} class for the job.
   *
   * @return the {@link Reducer} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
      throws ClassNotFoundException {
    return (Class<? extends Reducer<?, ?, ?, ?>>)
      conf.getClass(REDUCE_CLASS_ATTR, Reducer.class);
  }

  /**
   * Get the {@link OutputFormat} class for the job.
   *
   * @return the {@link OutputFormat} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
      throws ClassNotFoundException {
    return (Class<? extends OutputFormat<?, ?>>)
      conf.getClass(OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
  }

  /**
   * Get the {@link Partitioner} class for the job.
   *
   * @return the {@link Partitioner} class for the job.
   */
  @SuppressWarnings("unchecked")
  public Class<? extends Partitioner<?, ?>> getPartitionerClass()
      throws ClassNotFoundException {
    return (Class<? extends Partitioner<?, ?>>)
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  }

  /**
   * Get the {@link RawComparator} comparator used to compare keys.
   *
   * @return the {@link RawComparator} comparator used to compare keys.
   */
  public RawComparator<?> getSortComparator() {
    return conf.getOutputKeyComparator();
  }

  /**
   * Get the pathname of the job's jar.
   * @return the pathname
   */
  public String getJar() {
    return conf.getJar();
  }

  /**
   * Get the user defined {@link RawComparator} comparator for
   * grouping keys of inputs to the reduce.
   *
   * @return comparator set by the user for grouping values.
   * @see Job#setGroupingComparatorClass(Class) for details.
   */
  public RawComparator<?> getGroupingComparator() {
    return conf.getOutputValueGroupingComparator();
  }
}
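
Since Mapper.Context ultimately extends JobContext (through MapContext, TaskInputOutputContext, and TaskAttemptContext), a task can read this job-level, read-only information directly. A small hedged sketch, with the logging included only for illustration:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class JobInfoMapper extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    // All of these come from the JobContext layer of the context hierarchy
    String jobName = context.getJobName();
    int reduces = context.getNumReduceTasks();
    String inputFormat = context.getConfiguration()
        .get("mapreduce.inputformat.class", "default (TextInputFormat)");
    System.err.println("job=" + jobName + ", reduces=" + reduces
        + ", inputFormat=" + inputFormat);
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // getTaskAttemptID() comes from the TaskAttemptContext layer
    context.write(new Text(context.getTaskAttemptID().toString()), value);
  }
}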

References

《Hadoop技术内幕:深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: In-Depth Study of MapReduce Architecture Design and Implementation Principles)

