mapreduce wordcount处理什么任务

没有更多推荐了,
不良信息举报
举报内容:
Hadoop旧mapreduce的map任务切分原理
举报原因:
原文地址:
原因补充:
最多只允许输入30个字
加入CSDN,享受更精准的内容推荐,与500万程序员共同成长!并行数据处理技术mapreduce适用于处理哪些任务_百度知道
并行数据处理技术mapreduce适用于处理哪些任务
我有更好的答案
摘要:MapReduce是Hadoop的又一核心模块,从MapReduce是什么,MapReduce能做什么以及MapReduce的工作机制三方面认识MapReduce。
采纳率:90%
来自团队:
17、全球尺度看,不同生物类群受威胁严重程度排序为( C)。(本题分数:2 分)存疑A、兽类&两栖类&珊瑚B、兽类&珊瑚&两栖类C、珊瑚&两栖类&兽类
17、全球尺度看,不同生物类群受威胁严重程度排序为(
)。(本题分数:2 分)存疑A、兽类&两栖类&珊瑚B、兽类&珊瑚&两栖类C、珊瑚&两栖类&兽类
可以进行海量数据的处理,比如最简单的词频统计,比如对日志中的访问内容进行分析统计,对相似图片的处理统计,凡是需要处理的数据量比较大的情况下,就可以用MapReduce进行处理和实现。
1条折叠回答
为您推荐:
其他类似问题
mapreduce的相关知识
换一换
回答问题,赢新手礼包
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。没有更多推荐了,
不良信息举报
举报内容:
MapReduce学习笔记之Reduce任务(四)
举报原因:
原文地址:
原因补充:
最多只允许输入30个字
加入CSDN,享受更精准的内容推荐,与500万程序员共同成长!概述/MapReduce
MapReduce是开发的编程工具,用于大规模数据集(大于1TB)的并行运算。概念"(映射)"和"Reduce(化简)",和他们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。当前的软件实现是指定一个Map(映射),用来把一组键值对映射成一组新的键值对,指定并发的Reduce(化简)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
映射和化简/MapReduce
简单说来,一个映射函数就是对一些独立元素组成的概念上的列表(例如,一个测试成绩的列表)的每一个元素进行指定的操作(比如前面的例子里,有人发现所有学生的成绩都被高估了一分,他可以定义一个“减一”的映射函数,用来修正这个错误。)。事实上,每个元素都是被独立操作的,而原始列表没有被更改,因为这里创建了一个新的列表来保存新的答案。这就是说,Map操作是可以高度并行的,这对高性能要求的应用以及并行计算领域的需求非常有用。而化简操作指的是对一个列表的元素进行适当的合并(继续看前面的例子,如果有人想知道班级的平均分该怎么做?他可以定义一个化简函数,通过让列表中的元素跟自己的相邻的元素相加的方式把列表减半,如此递归运算直到列表只剩下一个元素,然后用这个元素除以人数,就得到了平均分。)。虽然他不如映射函数那么并行,但是因为化简总是有一个简单的答案,大规模的运算相对独立,所以化简函数在高度并行环境下也很有用。
分布和可靠性/MapReduce
MapReduce通过把对数据集的大规模操作分发给网络上的每个节点实现可靠性;每个节点会周期性的把完成的工作和状态的更新报告回来。如果一个节点保持沉默超过一个预设的时间间隔,主节点(类同Google File System中的主服务器)记录下这个节点状态为死亡,并把分配给这个节点的数据发到别的节点。每个操作使用命名文件的原子操作以确保不会发生并行线程间的冲突;当文件被改名的时候,系统可能会把他们复制到任务名以外的另一个名字上去。(避免副作用)。化简操作工作方式很类似,但是由于化简操作在并行能力较差,主节点会尽量把化简操作调度在一个节点上,或者离需要操作的数据尽可能进的节点上了;这个特性可以满足Google的需求,因为他们有足够的带宽,他们的内部网络没有那么多的机器。
用途/MapReduce
在Google,MapReduce用在非常广泛的应用程序中,包括“分布grep,分布排序,web连接图反转,每台机器的词矢量,web访问分析,反向索引构建,,,基于统计的机器翻译...”值得注意的是,MapReduce实现以后,它被用来重新生成Google的整个索引,并取代老的ad hoc程序去更新索引。MapReduce会生成大量的临时文件,为了提高效率,它利用系统来管理和访问这些文件。
主要功能/MapReduce
MapReduce提供了以下的主要功能:
1)数据划分和计算任务调度:
系统自动将一个作业(Job)待处理的大数据划分为很多个数据块,每个数据块对应于一个计算任务(Task),并自动调度计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并负责Map节点执行的同步控制。
2)数据/代码互定位:
为了减少数据通信,一个基本原则是本地化数据处理,即一个计算节点尽可能处理其本地磁盘上所分布存储的数据,这实现了代码向数据的迁移;当无法进行这种本地化数据处理时,再寻找其他可用节点并将数据从网络上传送给该节点(数据向代码迁移),但将尽可能从数据所在的本地机架上寻找可用节点以减少通信延迟。
3)系统优化:
为了减少数据通信开销,中间结果数据进入Reduce节点前会进行一定的合并处理;一个Reduce节点所处理的数据可能会来自多个Map节点,为了避免Reduce计算阶段发生数据相关性,Map节点输出的中间结果需使用一定的策略进行适当的划分处理,保证相关性数据发送到同一个Reduce节点;此外,系统还进行一些计算性能优化处理,如对最慢的计算任务采用多备份执行、选最快完成者作为结果。
4)出错检测和恢复:
以低端商用服务器构成的大规模MapReduce计算集群中,节点硬件(主机、磁盘、内存等)出错和软件出错是常态,因此MapReduce需要能检测并隔离出错节点,并调度分配新的节点接管出错节点的计算任务。同时,系统还将维护数据存储的可靠性,用多备份冗余存储机制提高数据存储的可靠性,并能及时检测和恢复出错的数据。
主要技术特征/MapReduce
MapReduce设计上具有以下主要的技术特征:
1)向“外”横向扩展,而非向“上”纵向扩展
即MapReduce集群的构建完全选用价格便宜、易于扩展的低端商用服务器,而非价格昂贵、不易扩展的高端服务器。
对于大规模数据处理,由于有大量数据存储需要,显而易见,基于低端服务器的集群远比基于高端服务器的集群优越,这就是为什么MapReduce并行计算集群会基于低端服务器实现的原因。
2)失效被认为是常态
MapReduce集群中使用大量的低端服务器,因此,节点硬件失效和软件出错是常态,因而一个良好设计、具有高容错性的并行计算系统不能因为节点失效而影响计算服务的质量,任何节点失效都不应当导致结果的不一致或不确定性;任何一个节点失效时,其他节点要能够无缝接管失效节点的计算任务;当失效节点恢复后应能自动无缝加入集群,而不需要管理员人工进行系统配置。
MapReduce并行计算软件框架使用了多种有效的错误检测和恢复机制,如节点自动重启技术,使集群和计算框架具有对付节点失效的健壮性,能有效处理失效节点的检测和恢复。
3)把处理向数据迁移
传统高性能计算系统通常有很多处理器节点与一些外存储器节点相连,如用存储区域网络(Storage&Area,SAN&Network)连接的磁盘阵列,因此,大规模数据处理时外存文件数据I/O访问会成为一个制约系统性能的瓶颈。
为了减少大规模数据并行计算系统中的数据通信开销,代之以把数据传送到处理节点(数据向处理器或代码迁移),应当考虑将处理向数据靠拢和迁移。MapReduce采用了数据/代码互定位的技术方法,计算节点将首先尽量负责计算其本地存储的数据,以发挥数据本地化特点,仅当节点无法处理本地数据时,再采用就近原则寻找其他可用计算节点,并把数据传送到该可用计算节点。
4)顺序处理数据、避免随机访问数据
大规模数据处理的特点决定了大量的数据记录难以全部存放在内存,而通常只能放在外存中进行处理。由于磁盘的顺序访问要远比随机访问快得多,因此MapReduce主要设计为面向顺序式大规模数据的磁盘访问处理。
为了实现面向大数据集批处理的高吞吐量的并行处理,MapReduce可以利用集群中的大量数据存储节点同时访问数据,以此利用分布集群中大量节点上的磁盘集合提供高带宽的数据访问和传输。
5)为应用开发者隐藏系统层细节
软件工程实践指南中,专业程序员认为之所以写程序困难,是因为程序员需要记住太多的编程细节(从变量名到复杂算法的边界情况处理),这对大脑记忆是一个巨大的认知负担,需要高度集中注意力;而并行程序编写有更多困难,如需要考虑多线程中诸如同步等复杂繁琐的细节。由于并发执行中的不可预测性,程序的调试查错也十分困难;而且,大规模数据处理时程序员需要考虑诸如数据分布存储管理、数据分发、数据通信和同步、计算结果收集等诸多细节问题。
MapReduce提供了一种抽象机制将程序员与系统层细节隔离开来,程序员仅需描述需要计算什么(What&to&compute),而具体怎么去计算(How&to&compute)就交由系统的执行框架处理,这样程序员可从系统层细节中解放出来,而致力于其应用本身计算问题的算法设计。
6)平滑无缝的可扩展性
这里指出的可扩展性主要包括两层意义上的扩展性:数据扩展和系统规模扩展性。
理想的软件算法应当能随着数据规模的扩大而表现出持续的有效性,性能上的下降程度应与数据规模扩大的倍数相当;在集群规模上,要求算法的计算性能应能随着节点数的增加保持接近线性程度的增长。绝大多数现有的单机算法都达不到以上理想的要求;把中间结果数据维护在内存中的单机算法在大规模数据处理时很快失效;从单机到基于大规模集群的并行计算从根本上需要完全不同的算法设计。奇妙的是,MapReduce在很多情形下能实现以上理想的扩展性特征。
多项研究发现,对于很多计算问题,基于MapReduce的计算性能可随节点数目增长保持近似于线性的增长。
案例:统计词频/MapReduce
MapReduce伪代码
实现Map和Reduce两个函数
Map函数和Reduce函数是交给用户实现的,这两个函数定义了任务本身。Map函数接受一个键值对(key-value&pair),产生一组中间键值对。MapReduce框架会将map函数产生的中间键值对里键相同的值传递给一个reduce函数。
ClassMapper
methodmap(String&input_key,&String&input_value):
//&input_key:&text&document&name
//&input_value:&document&contents
for&eachword&w&ininput_value:
EmitIntermediate(w,&"1");Reduce函数接受一个键,以及相关的一组值,将这组值进行合并产生一组规模更小的值(通常只有一个或零个值)。
ClassReducer
method&reduce(String&output_key,Iteratorintermediate_values):
//&output_key:&a&word
//&output_values:&a&list&of&counts
intresult&=&0;
for&eachv&inintermediate_values:
result&+=&ParseInt(v);
Emit(AsString(result));
其他实现/MapReduce
Nutch项目开发了一个实验性的MapReduce的实现【2】。
参考/MapReduce
Dean, Jeffrey & Ghemawat, Sanjay (2004). "MapReduce: Simplified Data Processing on Large Clusters". Retrieved Apr. 6, 2005 "Our abstraction is inspired by the map and reduce primitives present in Lisp and many other functional languages." -"MapReduce: Simplified Data Processing on Large Clusters" 外部链接Interpreting the Data: Parallel Analysis with - a paper on an internal tool at Google, Sawzall, which acts as an interface to MapReduce, intended to make MapReduce much easier to use. Discussion on Lambda the Ultimate. 微尘程序员网站资料库 -& Google研究 -& MapReduce
&|&相关影像
互动百科的词条(含所附图片)系由网友上传,如果涉嫌侵权,请与客服联系,我们将按照法律之相关规定及时进行处理。未经许可,禁止商业网站等复制、抓取本站内容;合理使用者,请注明来源于www.baike.com。
登录后使用互动百科的服务,将会得到个性化的提示和帮助,还有机会和专业认证智愿者沟通。
此词条还可添加&
编辑次数:12次
参与编辑人数:12位
最近更新时间: 13:15:42
贡献光荣榜
猜你想了解
扫码下载APP在上一节我们分析了TaskTracker如何对JobTracker分配过来的任务进行初始化,并创建各类JVM启动所需的信息,最终创建JVM的整个过程,本节我们继续来看,JVM启动后,执行的是Child类中的Main方法,这个方法是如何执行的。
&1,从命令参数中解析相应参数,获取JVMID、建立RPC连接、启动日志线程等初始化操作:
父进程(即TaskTracker)在启动子进程时,会加入一些参数,如本机的IP、端口、TaskAttemptID等等,通过解析可以得到JVMID。
String host = args[0];
int port = Integer.parseInt(args[1]);
final InetSocketAddress address = NetUtils.makeSocketAddr(host, port);
final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);
final String logLocation = args[3];
final int SLEEP_LONGER_COUNT = 5;
int jvmIdInt = Integer.parseInt(args[4]);
JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
另外,获取证书,与父进程建立RPC连接,此处使用的RPC接口就是上节分析过的TaskUmbilicalProtocol(TaskTracker实现了这个接口):
final TaskUmbilicalProtocol umbilical =
taskOwner.doAs(new PrivilegedExceptionAction&TaskUmbilicalProtocol&() {
public TaskUmbilicalProtocol run() throws Exception {
return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID,
defaultConf);
RPC是Hadoop里面的类,用于实现远程过程调用,前面我们已经分析过了,其RPC机制是通过JAVA中的动态代理&java.lang.reflect.InvocationHandler和java.lang.reflect.Proxy实现的,当客户端调用TaskUmbilicalProtocol的某个方法时,将转入实现了接口InvocationHandler的invoke方法,执行下面的代码:
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
client是org.apache.hadoop.ipc.Client类,其call方法调用以下代码将RPC所需参数传递至服务端:
Connection connection = getConnection(remoteId, call);
connection.sendParam(call);
// send the parameter
参数打包操作在sendParam方法中实现。
这里,Child子进程利用这一原理与TaskTracker父进程建立RPC关系,他们之间的调用接口即TaskUmbilicalProtocol中声明的方法。
之后,Child子进程会为JVM建立一个钩子:
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
if (taskid != null) {
TaskLog.syncLogs(logLocation, taskid, isCleanup, currentJobSegmented);
} catch (Throwable throwable) {
钩子的意义在于当JVM关闭了,会执行钩子对象中定义的方法,即上面的run方法,只有执行完了这些方法后,JVM才会真正关闭。从代码可以看出,主要是写日志。
除了钩子中会写日志外,Child子进程会创建一个专门写日志的线程。每过5秒会同步日志一次:
Thread t = new Thread() {
public void run() {
//every so often wake up and syncLogs so that we can track
//logs of the currently running task
while (true) {
Thread.sleep(5000);
if (taskid != null) {
TaskLog.syncLogs(logLocation, taskid, isCleanup, currentJobSegmented);
} catch (InterruptedException ie) {
} catch (IOException iee) {
LOG.error("Error in syncLogs: " + iee);
System.exit(-1);
t.setName("Thread for syncLogs");
t.setDaemon(true);
t.start();
之后,获得JVM的进程ID,获得进程ID的代码:
String pid = "";
if (!Shell.WINDOWS) {
pid = System.getenv().get("JVM_PID");
判断是否是Windows操作系统的代码:
/** Set to true on Windows platforms */
public static final boolean WINDOWS /* borrowed from Path.WINDOWS */
= System.getProperty("os.name").startsWith("Windows");
根据获得的PID,创建JvmContext对象,该对象实际上主要记录的就是进程ID和JVMID。
2,在短暂的初始化过程之后,接下来就是一个死循环。在死循环之外,是一个异常处理代码,主要结构为:
while (true) {
。。。。。。。。
}catch (FSError e) {
LOG.fatal("FSError from child", e);
umbilical.fsError(taskid, e.getMessage(), jvmContext);
catch (Throwable throwable) {
。。。。。。。。。
umbilical.fatalError(taskid, cause, jvmContext);
} finally {
RPC.stopProxy(umbilical);
shutdownMetrics();
// Shutting down log4j of the child-vm...
// This assumes that on return from Task.run()
// there is no more logging done.
LogManager.shutdown();
即出现了异常后,利用RPC机制向TaskTracker汇报,最终停止RPC代理,并关闭日志管理。到这里,整个Main方法也结束。因此,Child子进程的主要功能就在死循环中。我们接下来对循环中的代码进行分析。
3,首先执行的是获取JvmTask:
JvmTask myTask = umbilical.getTask(context);
umbilical是代理接口,向TaskTracker请求任务。他们之间传递的只有context对象,前面提过,该对象只包含进程ID和JVMID两个参数。
父进程首先对其进行鉴权,并记录其进程ID:
authorizeJVM(context.jvmId.getJobId());jvmManager.setPidToJvm(jvmId, context.pid);
之后,查看JVM传递过来的Job ID对应的任务是否处于运行当中,RunningJob在上一节分析过,在初始化过程中创建出来的:
RunningJob rjob = runningJobs.get(jvmId.getJobId());
如果该Job ID对应的任务没有处于运行中,可能是因为超期的Job等等,该Job实际上早已结束,总之出现了异常,试图关闭JVM,返回空的任务。
如果正处于运行中,则获取对应的任务对象:
TaskInProgress tip = jvmManager.getTaskForJvm(jvmId);& public TaskInProgress getTaskForJvm(JVMId jvmId)&&&&& throws IOException {&&& if (jvmId.isMapJVM()) {&&&&& return mapJvmManager.getTaskForJvm(jvmId);&&& } else {&&&&& return reduceJvmManager.getTaskForJvm(jvmId);&&& }& }
mapJvmManager、reduceJvmManager都是JvmManagerForType对象,其getTaskForJvm方法为:
synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
throws IOException {
if (jvmToRunningTask.containsKey(jvmId)) {
//Incase of JVM reuse, tasks are returned to previously launched
//JVM via this method. However when a new task is launched
//the task being returned has to be initialized.
TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
Task task = taskRunner.getTaskInProgress().getTask();
jvmRunner.taskGiven(task);
return taskRunner.getTaskInProgress();
return null;
可见,从jvmToRunningTask(Map &JVMId,TaskRunner&对象)中取出对应的TaskRunner、JvmRunner、Task。
检测Map&TaskAttemptID, TaskInProgress& tasks中是否包含该任务(有可能已经执行完了),如果存在,则构造一个JvmTask对象返回,该对象主要封装了MapTask或ReduceTask对象。
4,获取MapReduce运行的目录信息:
// setup the child's mapred-local-dir. The child is now sandboxed and
// can only see files down and under attemtdir only.
TaskRunner.setupChildMapredLocalDirs(task, job);
// setup the child's attempt directories
localizeTask(task, job, logLocation);
//setupWorkDir actually sets up the symlinks for the distributed
//cache. After a task exits we wipe the workdir clean, and hence
//the symlinks have to be rebuilt.
TaskRunner.setupWorkDir(job, new File(cwd));
//create the index file so that the log files
//are viewable immediately
TaskLog.syncLogs
(logLocation, taskid, isCleanup, logIsSegmented(job));
在setupChildMapredLocalDirs中,获取"mapred.local.dir"本地目录;
static void setupChildMapredLocalDirs(Task t, JobConf conf) {
String[] localDirs = conf.getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
String jobId = t.getJobID().toString();
String taskId = t.getTaskID().toString();
boolean isCleanup = t.isTaskCleanupTask();
String user = t.getUser();
StringBuffer childMapredLocalDir =
new StringBuffer(localDirs[0] + Path.SEPARATOR
+ TaskTracker.getLocalTaskDir(user, jobId, taskId, isCleanup));
for (int i = 1; i & localDirs. i++) {
childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+ TaskTracker.getLocalTaskDir(user, jobId, taskId, isCleanup));
LOG.debug("mapred.local.dir for child : " + childMapredLocalDir);
conf.set("mapred.local.dir", childMapredLocalDir.toString());
该目录即childMapredLocalDir,是根据本地目录JobConf.MAPRED_LOCAL_DIR_PROPERTY,加上jobid、TaskID等等组成的。
localizeTask将Job配置信息写成本地XML文件。
TaskRunner.setupWorkDir创建了Child子进程工作的临时目录。
之后,获取每个JVM可以运行的任务数量,默认是1:
numTasksToExecute = job.getNumTasksToExecutePerJvm();& /**&& * Get the number of tasks that a spawned JVM should execute&& */& public int getNumTasksToExecutePerJvm() {&&& return getInt("mapred.job.reuse.jvm.num.tasks", 1);& }
这个参数与上一节分析的JVM重用机制直接相关。比如一个Job需要10个Map任务,那么如果设定为1,则需要启动10个JVM。如果任务运行时间比较短,那么每次都要启动JVM,开销较大,因此可以将这个值修改为更大的值,比如如果为3,那么,可以允许同一个Job的任务顺序执行3次。这并不是指任务可以同时运行,而是顺序运行。如果修改为-1,则可以无限制的顺序运行,当然,前提是这些任务必须是一个Job内的任务。上一节在判断是否创建新JVM的时候曾经分析过类似的问题,在JvmRunner这个类中有两个变量numTasksRan和numTasksToRun,前一个变量表示已经顺序执行了几个任务,后一个变量表示最多可以顺序执行几个任务,numTasksToRun的值就决定于this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm()。
因此,上一节在分析是否可以新创建JVM的时候曾经遇到过下面的代码:
if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){
setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM
LOG.info("No new JVM spawned for jobId/taskid: " +
jobId+"/"+t.getTask().getTaskID() +
". Attempting to reuse: " + jvmRunner.jvmId);
即属于同一个Job的任务,如果JVMRunner处于空闲状态,并且利用ranAll方法判断是否已经到达最大重用次数,如果还没有到达,则可以进行重用,而不用新启动JVM。
之后,创建管理相关的测量信息:
// Initiate Java VM metrics
initMetrics(prefix, jvmId.toString(), job.getSessionId());
5,执行用户实现的相关代码。
主要涉及到下面的代码:
// use job-specified working directory
FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
taskFinal.run(job, umbilical);
// run the task
首先设置其工作目录,之后调用Task的run方法执行任务。Task有MapTask和ReduceTask两类。
以MapTask为例,在run方法中,首先会启动一个向父进程报告的线程:
// start thread that will handle communication with parent
TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
jvmContext);
reporter.startCommunicationThread();
通过startCommunicationThread方法,进一步执行TaskReporter的run方法。
在该方法中,会向父进程报告进展,或者以ping的形式保持心跳,statusUpdate是一个RPC方法:
if (sendProgress) {
// we need to send progress update
updateCounters();
taskStatus.statusUpdate(taskProgress.get(),
taskProgress.toString(),
counters);
taskFound = umbilical.statusUpdate(taskId, taskStatus, jvmContext);
taskStatus.clearStatus();
// send ping
taskFound = umbilical.ping(taskId, jvmContext);
接下来,判断是否使用MapReduce新的API:
boolean useNewApi = job.getUseNewMapper();
* Should the framework use the new context-object code for running
* the mapper?
* @return true, if the new api should be used
public boolean getUseNewMapper() {
return getBoolean("mapred.mapper.new-api", false);
Hadoop目前提供了两套API,这个标志用于判断是否使用新API,默认不使用。
旧的一套API位于org.apache.hadoop.mapred中,主要用接口实现:
public interface Mapper&K1, V1, K2, V2& extends JobConfigurable, Closeable {
void map(K1 key, V1 value, OutputCollector&K2, V2& output, Reporter reporter)
throws IOE
public interface Reducer&K2, V2, K3, V3& extends JobConfigurable, Closeable {
void reduce(K2 key, Iterator&V2& values,
OutputCollector&K3, V3& output, Reporter reporter)
throws IOE
。。。。。。。
另外一套API位于org.apache.hadoop.mapreduce中,用类实现:
public class Mapper&KEYIN, VALUEIN, KEYOUT, VALUEOUT& {
protected void map(KEYIN key, VALUEIN value,
Context context) throws IOException, InterruptedException {
context.write((KEYOUT) key, (VALUEOUT) value);
public class Reducer&KEYIN,VALUEIN,KEYOUT,VALUEOUT& {
protected void reduce(KEYIN key, Iterable&VALUEIN& values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
。。。。。。。。
用户如果使用第一套API,那么就相当于实现Mapper和Reducer这两个接口中的map和reduce方法。如果使用第二套API,那么就可以继承这两个类,其方法也可以继承。应该说是一种改进。因为第二种方法里面将map、cleanup这些方法用Context这种概念串起来,其中Context存在两种:
public class MapContext&KEYIN,VALUEIN,KEYOUT,VALUEOUT&
extends TaskInputOutputContext&KEYIN,VALUEIN,KEYOUT,VALUEOUT& {
private RecordReader&KEYIN,VALUEIN&
private InputS
。。。。。。
public class ReduceContext&KEYIN,VALUEIN,KEYOUT,VALUEOUT&
比如,MapContext包含了OutputCollector和Reporter的功能,并且还容易扩展,这样的话扩展性更好。不过,其KV处理的基本思想仍然保持一致。
之后进入Task的initialize(job, getJobID(), reporter, useNewApi)方法。用于设置任务运行状态和输出目录等。并创建输出类:
committer = conf.getOutputCommitter();
public OutputCommitter getOutputCommitter() {
return (OutputCommitter)ReflectionUtils.newInstance(
getClass("mapred.output.committer.class", FileOutputCommitter.class,
OutputCommitter.class), this);
接着判断是否是特殊的任务,主要有Job清理任务、创建任务等。
// check if it is a cleanupJobTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
如果不是,则是MapTask或ReduceTask,假如是MapTask,则执行:
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
runOldMapper(job, splitMetaInfo, umbilical, reporter);
根据不同的API调用不同的方法。此处假定仍然使用默认的旧API。其方法声明为:
private &INKEY,INVALUE,OUTKEY,OUTVALUE&
void runOldMapper(final JobConf job,
final TaskSplitIndex splitIndex,
final TaskUmbilicalProtocol umbilical,
TaskReporter reporter
6,首先调用getSplitDetails方法获得Split信息:
InputSplit inputSplit = getSplitDetails(new Path(splitIndex.getSplitLocation()),
splitIndex.getStartOffset());
SplitIndex即传进来的参数,表示描述Split的信息,调用getSplitLocation获得其位置(即文件路径)以及在文件中的偏移(从文件起始位置的字节偏移量,是long型整数)。
首先获得该文件,移动偏移量:
FileSystem fs = file.getFileSystem(conf);
FSDataInputStream inFile = fs.open(file);
inFile.seek(offset);
String className = Text.readString(inFile);
Split的类名即className,利用反射原理,创建这个类:
cls = (Class&T&) conf.getClassByName(className);
} catch (ClassNotFoundException ce) {
。。。。。
SerializationFactory factory = new SerializationFactory(conf);
Deserializer&T& deserializer = (Deserializer&T&) factory.getDeserializer(cls);
deserializer.open(inFile);
T split = deserializer.deserialize(null);
即获得类相关信息后,利用反序列化得到InputSplit对象,InputSplit本身是一个接口,FileSplit等等是其实现,在JobClient提交任务的时候,通过将FileSplit进行序列化存入split文件,在此处进行反序列化,则可以获得一样的FileSplit对象。获得该对象之后,则得到了输入文件、Split块的偏移位置、长度等信息,写入配置信息:
private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
if (inputSplit instanceof FileSplit) {
FileSplit fileSplit = (FileSplit) inputS
job.set("map.input.file", fileSplit.getPath().toString());
job.setLong("map.input.start", fileSplit.getStart());
job.setLong("map.input.length", fileSplit.getLength());
LOG.info("Processing split: " + inputSplit);
这样,就获得了该任务的Split对象。
7,创建记录读取类:
RecordReader&INKEY,INVALUE& in = isSkipping() ?
new SkippingRecordReader&INKEY,INVALUE&(inputSplit, umbilical, reporter) :
new TrackedRecordReader&INKEY,INVALUE&(inputSplit, job, reporter);
job.setBoolean("mapred.skip.on", isSkipping());
有两类,表示是否跳过失败记录,前面已经分析过,如果已经在同一个地方失败两次,则应该跳过。假如不跳过的话,则进入TrackedRecordReader的构造方法,TrackedRecordReader封装了用户自己编写的记录读取类,用变量RecordReader&K,V& rawIn表示,在构造方法中,如何获得rawIn是最核心的部分。其中有代码:
rawIn = job.getInputFormat().getRecordReader(split, job, reporter);
getInputFormat方法将创建输入数据的格式描述类:
* Get the {@link InputFormat} implementation for the map-reduce job,
* defaults to {@link TextInputFormat} if not specified explicity.
* @return the {@link InputFormat} implementation for the map-reduce job.
public InputFormat getInputFormat() {
return ReflectionUtils.newInstance(getClass("mapred.input.format.class",
TextInputFormat.class,
InputFormat.class),
可见,也是采用反射的方法创建的。默认是TextInputFormat。获得该格式对象后,调用其getRecordReader方法即可得到相应的记录读取对象。用户如果要自定义实现某种格式,则需要继承InputFormat,并且实现getRecordReader方法,自然,也必须实现一个RecordReader&K,V&类,该记录对输入数据进行读取,每次输出一个Key和Value对。Hadoop里默认已经有不少格式类和对应的读取类,比如以换行作为分隔符的文本格式和读取类;输入数据来自数据库中的表的读取类等等。
public interface InputFormat&K, V& {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOE
RecordReader&K, V& getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOE
总之,通过对配置文件进行解析,使用反射的方法,创建了相应的读取类,此时输入数据已经创建,读取类也创建,剩下的就是创建Map类等等。
8,创建MapOutputCollector对象。
Map输出可能有两种,当一个作业没有Reduce任务的时候,Map的输出数据可以直接写到HDFS为最终结果,否则应该写到本地文件,等着Reduce来取。有以下代码:
int numReduceTasks = conf.getNumReduceTasks();
LOG.info("numReduceTasks: " + numReduceTasks);
MapOutputCollector collector = null;
if (numReduceTasks & 0) {
collector = new MapOutputBuffer(umbilical, job, reporter);
collector = new DirectMapOutputCollector(umbilical, job, reporter);
即判断numReduceTasks是否等于0,如果为0,则创建DirectMapOutputCollector对象,否则创建MapOutputBuffer对象。
MapOutputBuffer是一个较为复杂的类,且与MapReduce性能关系较大,留作下一节深入分析,本节暂不涉及其细节,主要从流程上进行整体分析。
9,创建MapRunnable对象。
MapRunnable是一个接口,内部包含一个run方法:
public interface MapRunnable&K1, V1, K2, V2&
void run(RecordReader&K1, V1& input, OutputCollector&K2, V2& output,
Reporter reporter)
throws IOE
首先该接口的类主要是MapRunner,该类中包含一个对象:
private Mapper&K1, V1, K2, V2&
该对象即Mapper接口,用户实现的Mapper类将通过configure方法构造出来,同样也是通过反射的方法:
public void configure(JobConf job) {
this.mapper = ReflectionUtils.newInstance(job.getMapperClass(), job);
//increment processed counter only if skipping feature is enabled
this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)&0 &&
SkipBadRecords.getAutoIncrMapperProcCount(job);
10,执行MapRunnable的run方法:
runner.run(in, new OldOutputCollector(collector, conf), reporter);
collector.flush();
in.close();
in = null;
collector.close();
collector = null;
} finally {
closeQuietly(in);
closeQuietly(collector);
可以看出,Map任务最终即通过执行run方法达到目的,进入run方法中:
public void run(RecordReader&K1, V1& input, OutputCollector&K2, V2& output,
Reporter reporter)
throws IOException {
// allocate key & value instances that are re-used for all entries
K1 key = input.createKey();
V1 value = input.createValue();
while (input.next(key, value)) {
// map pair to output
mapper.map(key, value, output, reporter);
if(incrProcCount) {
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
} finally {
mapper.close();
通过调用记录读取类逐一读取记录,得到K1、V1,即map的输入;然后调用Mapper实现类的map方法,执行用户编写的Map方法,并计数。这种map方法将一直对每一个记录进行循环读取和处理,直到记录读取类无法读到记录为止。到此为止,Map的调用流程基本清楚了。
11,看完了Map的主要执行过程后,再来看Reduce的执行过程。
同样,ReduceTask的run方法是第一步。同样有判断新旧API的代码。假定使用旧API。
首先获取解码器,因为Map的输出结果可能会被压缩(减小Shuffle压力),因此这里利用反射方法进行判断:
private CompressionCodec initCodec() {
// check if map-outputs are to be compressed
if (conf.getCompressMapOutput()) {
Class&? extends CompressionCodec& codecClass =
conf.getMapOutputCompressorClass(DefaultCodec.class);
return ReflectionUtils.newInstance(codecClass, conf);
return null;
接着判断是否是本地模式,即JobTracker也与Reduce位于同一服务器,如果不是本地,需要创建一个ReduceCopier对象,到远端去获取Map的输出:
boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
if (!isLocal) {
reduceCopier = new ReduceCopier(umbilical, job, reporter);
if (!reduceCopier.fetchOutputs()) {
if(reduceCopier.mergeThrowable instanceof FSError) {
throw (FSError)reduceCopier.mergeT
throw new IOException("Task: " + getTaskID() +
" - The reduce copier failed", reduceCopier.mergeThrowable);
关于ReduceCopier如何从远端获取到Map的输出,我们留作后面分析。上面的代码结束后,拷贝过程结束了,接下来进行排序。
针对本地或分布式,采用不同的方法:
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
RawKeyValueIterator rIter = isLocal
? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
!conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
new Path(getTaskID().toString()), job.getOutputKeyComparator(),
reporter, spilledRecordsCounter, null)
: reduceCopier.createKVIterator(job, rfs, reporter);
如果是本地,采用Merger.merge方法,否则采用reduceCopier.createKVIterator方法,后面这个方法最终也是调用了Merger的方法,Merger内部较为复杂,与MapOutputBuffer、ReduceCopier一样,我们留作以后分析,这几个类涉及到Map的输出的处理(缓存写本地文件)、Reduce的输入的处理(拷贝、归并)。实际上,Map和Reduce由用户实现,实现较为简单,但其输出、输入的处理较为复杂。
12,排序完毕,接下来进入Reduce阶段。
首先获得Map的输出key,value的类信息:
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();&&& if (useNewApi) {&&&&& runNewReducer(job, umbilical, reporter, rIter, comparator, &&&&&&&&&&&&&&&&&&& keyClass, valueClass);&&& } else {&&&&& runOldReducer(job, umbilical, reporter, rIter, comparator, &&&&&&&&&&&&&&&&&&& keyClass, valueClass);&&& }
假定使用旧API,进入runOldReducer方法。
首先直接创建Reducer的实现类,即用户编写的类,与Mapper不同(使用MapRunner类封装用户编写的Mapper类)。
Reducer&INKEY,INVALUE,OUTKEY,OUTVALUE& reducer =
ReflectionUtils.newInstance(job.getReducerClass(), job);
获取Reduce输出文件名:
String finalName = getOutputName(getPartition());
static synchronized String getOutputName(int partition) {
return "part-" + NUMBER_FORMAT.format(partition);
不同的Reduce对应于不同的partition,输出文件名加上分区号。
13,创建输出记录写入类:
RecordWriter&OUTKEY, OUTVALUE& out =
new OldTrackingRecordWriter&OUTKEY, OUTVALUE&(
reduceOutputCounter, job, reporter, finalName);
final RecordWriter&OUTKEY, OUTVALUE& finalOut =
在OldTrackingRecordWriter类中,封装了一个真正的记录写入类RecordWriter&K, V& real。其创建方法和记录读取类类似:
this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName,
reporter);
public OutputFormat getOutputFormat() {
return ReflectionUtils.newInstance(getClass("mapred.output.format.class",
TextOutputFormat.class,
OutputFormat.class),
默认为TextOutputFormat。
在此基础上,创建输出对象:
OutputCollector&OUTKEY,OUTVALUE& collector =
new OutputCollector&OUTKEY,OUTVALUE&() {
public void collect(OUTKEY key, OUTVALUE value)
throws IOException {
finalOut.write(key, value);
// indicate that progress update needs to be sent
reporter.progress();
该对象即对key,value进行逐一写入。
14,创建读取记录迭代器
首先创建读取Reduce输入记录值的对象:
ReduceValuesIterator&INKEY,INVALUE& values = isSkipping() ?
new SkippingReduceValuesIterator&INKEY,INVALUE&(rIter,
comparator, keyClass, valueClass,
job, reporter, umbilical) :
new ReduceValuesIterator&INKEY,INVALUE&(rIter,
job.getOutputValueGroupingComparator(), keyClass, valueClass,
job, reporter);
注意,这里涉及几个迭代器:
RawKeyValueIterator用于对Map的输出中的KV进行迭代,适用于本地文件。从源代码的解释来看,其含义是:RawKeyValueIterator is an iterator used to iterate over the raw keys and values during sort/merge of intermediate data,即在Sort和归并中间数据的时候使用的迭代器。
ReduceValuesIterator也是一个迭代器,适用于对合并后的Reduce的值进行迭代。这两个迭代器有什么区别?
从代码来看,ReduceValuesIterator继承于ValuesIterator(CombineValuesIterator也继承于此类),ValuesIterator内部本身就有一个RawKeyValueIterator in,在读取下一个Key和Value的时候,其实现为:
private void readNextKey() throws IOException {
more = in.next();
if (more) {
DataInputBuffer nextKeyBytes = in.getKey();
keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(),
nextKeyBytes.getLength() - nextKeyBytes.getPosition());
nextKey = keyDeserializer.deserialize(nextKey);
hasNext = key != null && (comparator.compare(key, nextKey) == 0);
hasNext = false;
private void readNextValue() throws IOException {
DataInputBuffer nextValueBytes = in.getValue();
valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(),
nextValueBytes.getLength() - nextValueBytes.getPosition());
value = valDeserializer.deserialize(value);
可以看出,RawKeyValueIterator是在字节这个层次工作,其排序、归并过程本身不关心字节代表什么含义,完全是一种字节层面的排序和归并。而要真正进行Reduce计算的时候,这个时候需要按照其实际格式进行解析,比如解析为ASCII字符串还是Unicode字符串等等,因此,ValuesIterator里面才需要有两个反序列化的对象:
private Deserializer&KEY& keyD
private Deserializer&VALUE& valD
private DataInputBuffer keyIn = new DataInputBuffer();
private DataInputBuffer valueIn = new DataInputBuffer();
从其构造方法来看,这两个序列化对象实际上正是用户配置的Reduce的输入KV类:
this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
this.keyDeserializer.open(keyIn);
this.valDeserializer = serializationFactory.getDeserializer(valClass);
this.valDeserializer.open(this.valueIn);
其中的keyClass、valClass上面分析过,即:
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
即"mapred.mapoutput.key.class"和"mapred.mapoutput.value.class"指定的类。比较来自于"mapred.output.value.groupfn.class"指定的类。
注意,创建这些类之前实际上已经完成了归并排序工作,这说明Reduce之前的排序工作并不需要知道是什么格式的KV,完全按照字节进行。
15,循环调用reduce方法,执行reduce功能:
while (values.more()) {
reduceInputKeyCounter.increment(1);
reducer.reduce(values.getKey(), values, collector, reporter);
if(incrProcCount) {
reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
values.nextKey();
values.informReduceProgress();
其中values就是前面创建的ReduceValuesIterator。逐一取出其中的key和values,执行reduce方法,我们回顾一下reduce方法的定义就明白ReduceValuesIterator的重要性:
void reduce(K2 key, Iterator&V2& values,OutputCollector&K3, V3& output, Reporter reporter)
throws IOE
以上分析了Child子进程的处理流程,主要包含了与TaskTracker进程建立RPC通道,读取Split信息,通过反序列化创建Split对象,以及记录读取对象,对输入数据进行逐一调用用户编写的map方法的过程,另外,关于Reduce任务,分析了创建MapOutputCollector对象、ReduceCopier对象、Merger对象,以及反序列化创建Reduce输入数据的记录读取对象、结果输出对象等等。大体上,这就是MapReduce粗略的执行过程。目前还有几个关键的类没有分析,这几个类对MapReduce的性能有较大影响,主要有:
MapOutputBuffer,该类包含了如何缓存Map输出,以及对Map输出进行KV序列化、压缩、Combiner、写本地文件等复杂操作;
ReduceCopier,该类包含了如何从远端Map任务所在机器去取数据的功能,即实现Shuffle;
Merger,该类包含了如何对Reduce的输入数据进行归并排序的功能。关于这几个类的详细分析留作后面各节。
阅读(...) 评论()}

我要回帖

更多关于 mapreduce是什么 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信