Hadoop MapReduce教程.docx
- 文档编号:9388897
- 上传时间:2023-05-18
- 格式:DOCX
- 页数:31
- 大小:36.60KB
Hadoop MapReduce教程.docx
《Hadoop MapReduce教程.docx》由会员分享,可在线阅读,更多相关《Hadoop MapReduce教程.docx(31页珍藏版)》请在冰点文库上搜索。
HadoopMapReduce教程
HadoopMap/Reduce教程
目的
这篇教程从用户的角度出发,全面地介绍了HadoopMap/Reduce框架的各个方面。
先决条件
请先确认Hadoop被正确安装、配置和正常运行中。
更多信息见:
∙Hadoop快速入门对初次使用者。
∙Hadoop集群搭建对大规模分布式集群。
概述
HadoopMap/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。
一个Map/Reduce作业(job)通常会把输入的数据集切分为若干独立的数据块,由map任务(task)以完全并行的方式处理它们。
框架会对map的输出先进行排序,然后把结果输入给reduce任务。
通常作业的输入和输出都会被存储在文件系统中。
整个框架负责任务的调度和监控,以及重新执行已经失败的任务。
通常,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。
这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。
Map/Reduce框架由一个单独的masterJobTracker和每个集群节点一个slaveTaskTracker共同组成。
master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。
而slave仅负责执行由master指派的任务。
应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。
再加上其他作业的参数,就构成了作业配置(jobconfiguration)。
然后,Hadoop的jobclient提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。
虽然Hadoop框架是用JavaTM实现的,但Map/Reduce应用程序则不一定要用Java来写。
∙HadoopStreaming是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序(例如:
Shell工具)来做为mapper和reducer。
∙HadoopPipes是一个与SWIG兼容的C++API(没有基于JNITM技术),它也可用于实现Map/Reduce应用程序。
输入与输出
Map/Reduce框架运转在
框架需要对key和value的类(classes)进行序列化操作,因此,这些类需要实现Writable接口。
另外,为了方便框架执行排序操作,key类必须实现WritableComparable接口。
一个Map/Reduce作业的输入和输出类型如下所示:
(input)
例子:
WordCountv1.0
在深入细节之前,让我们先看一个Map/Reduce的应用示例,以便对它们的工作方式有一个初步的认识。
WordCount是一个简单的应用,它可以计算出指定数据集中每一个单词出现的次数。
这个应用适用于单机模式,伪分布式模式或完全分布式模式三种Hadoop安装方式。
源代码
WordCount.java
1.
packageorg.myorg;
2.
3.
importjava.io.IOException;
4.
importjava.util.*;
5.
6.
importorg.apache.hadoop.fs.Path;
7.
importorg.apache.hadoop.conf.*;
8.
importorg.apache.hadoop.io.*;
9.
importorg.apache.hadoop.mapred.*;
10.
importorg.apache.hadoop.util.*;
11.
12.
publicclassWordCount{
13.
14.
publicstaticclassMapextendsMapReduceBaseimplementsMapper
15.
privatefinalstaticIntWritableone=newIntWritable
(1);
16.
privateTextword=newText();
17.
18.
publicvoidmap(LongWritablekey,Textvalue,OutputCollector
19.
Stringline=value.toString();
20.
StringTokenizertokenizer=newStringTokenizer(line);
21.
while(tokenizer.hasMoreTokens()){
22.
word.set(tokenizer.nextToken());
23.
output.collect(word,one);
24.
}
25.
}
26.
}
27.
28.
publicstaticclassReduceextendsMapReduceBaseimplementsReducer
29.
publicvoidreduce(Textkey,Iterator
30.
intsum=0;
31.
while(values.hasNext()){
32.
sum+=values.next().get();
33.
}
34.
output.collect(key,newIntWritable(sum));
35.
}
36.
}
37.
38.
publicstaticvoidmain(String[]args)throwsException{
39.
JobConfconf=newJobConf(WordCount.class);
40.
conf.setJobName("wordcount");
41.
42.
conf.setOutputKeyClass(Text.class);
43.
conf.setOutputValueClass(IntWritable.class);
44.
45.
conf.setMapperClass(Map.class);
46.
conf.setCombinerClass(Reduce.class);
47.
conf.setReducerClass(Reduce.class);
48.
49.
conf.setInputFormat(TextInputFormat.class);
50.
conf.setOutputFormat(TextOutputFormat.class);
51.
52.
FileInputFormat.setInputPaths(conf,newPath(args[0]));
53.
FileOutputFormat.setOutputPath(conf,newPath(args[1]));
54.
55.
JobClient.runJob(conf);
57.
}
58.
}
59.
用法
假设环境变量HADOOP_HOME对应安装时的根目录,HADOOP_VERSION对应Hadoop的当前安装版本,编译WordCount.java来创建jar包,可如下操作:
$mkdirwordcount_classes
$javac-classpath${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar-dwordcount_classesWordCount.java
$jar-cvf/usr/joe/wordcount.jar-Cwordcount_classes/.
假设:
∙/usr/joe/wordcount/input-是HDFS中的输入路径
∙/usr/joe/wordcount/output-是HDFS中的输出路径
用示例文本文件做为输入:
$bin/hadoopdfs-ls/usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$bin/hadoopdfs-cat/usr/joe/wordcount/input/file01
HelloWorldByeWorld
$bin/hadoopdfs-cat/usr/joe/wordcount/input/file02
HelloHadoopGoodbyeHadoop
运行应用程序:
$bin/hadoopjar/usr/joe/wordcount.jarorg.myorg.WordCount/usr/joe/wordcount/input/usr/joe/wordcount/output
输出是:
$bin/hadoopdfs-cat/usr/joe/wordcount/output/part-00000
Bye1
Goodbye1
Hadoop2
Hello2
World2
应用程序能够使用-files选项来指定一个由逗号分隔的路径列表,这些路径是task的当前工作目录。
使用选项-libjars可以向map和reduce的classpath中添加jar包。
使用-archives选项程序可以传递档案文件做为参数,这些档案文件会被解压并且在task的当前工作目录下会创建一个指向解压生成的目录的符号链接(以压缩包的名字命名)。
有关命令行选项的更多细节请参考Commandsmanual。
使用-libjars和-files运行wordcount例子:
hadoopjarhadoop-examples.jarwordcount-filescachefile.txt-libjarsmylib.jarinputoutput
解释
WordCount应用程序非常直截了当。
Mapper(14-26行)中的map方法(18-25行)通过指定的TextInputFormat(49行)一次处理一行。
然后,它通过StringTokenizer以空格为分隔符将一行切分为若干tokens,之后,输出<
对于示例中的第一个输入,map输出是:
第二个输入,map输出是:
关于组成一个指定作业的map数目的确定,以及如何以更精细的方式去控制这些map,我们将在教程的后续部分学习到更多的内容。
WordCount还指定了一个combiner(46行)。
因此,每次map运行之后,会对输出按照key进行排序,然后把输出传递给本地的combiner(按照作业的配置与Reducer一样),进行本地聚合。
第一个map的输出是:
第二个map的输出是:
Reducer(28-36行)中的reduce方法(29-35行)仅是将每个key(本例中就是单词)出现的次数求和。
因此这个作业的输出就是:
代码中的run方法中指定了作业的几个方面,例如:
通过命令行传递过来的输入/输出路径、key/value的类型、输入/输出的格式等等JobConf中的配置信息。
随后程序调用了JobClient.runJob(55行)来提交作业并且监控它的执行。
我们将在本教程的后续部分学习更多的关于JobConf,JobClient,Tool和其他接口及类(class)。
Map/Reduce-用户界面
这部分文档为用户将会面临的Map/Reduce框架中的各个环节提供了适当的细节。
这应该会帮助用户更细粒度地去实现、配置和调优作业。
然而,请注意每个类/接口的javadoc文档提供最全面的文档;本文只是想起到指南的作用。
我们会先看看Mapper和Reducer接口。
应用程序通常会通过提供map和reduce方法来实现它们。
然后,我们会讨论其他的核心接口,其中包括:
JobConf,JobClient,Partitioner,OutputCollector,Reporter,InputFormat,OutputFormat等等。
最后,我们将通过讨论框架中一些有用的功能点(例如:
DistributedCache,IsolationRunner等等)来收尾。
核心功能描述
应用程序通常会通过提供map和reduce来实现Mapper和Reducer接口,它们组成作业的核心。
Mapper
Mapper将输入键值对(key/valuepair)映射到一组中间格式的键值对集合。
Map是一类将输入记录集转换为中间格式记录集的独立任务。
这种转换的中间格式记录集不需要与输入记录集的类型一致。
一个给定的输入键值对可以映射成0个或多个输出键值对。
HadoopMap/Reduce框架为每一个InputSplit产生一个map任务,而每个InputSplit是由该作业的InputFormat产生的。
概括地说,对Mapper的实现者需要重写JobConfigurable.configure(JobConf)方法,这个方法需要传递一个JobConf参数,目的是完成Mapper的初始化工作。
然后,框架为这个任务的InputSplit中每个键值对调用一次map(WritableComparable,Writable,OutputCollector,Reporter)操作。
应用程序可以通过重写Closeable.close()方法来执行相应的清理工作。
输出键值对不需要与输入键值对的类型一致。
一个给定的输入键值对可以映射成0个或多个输出键值对。
通过调用OutputCollector.collect(WritableComparable,Writable)可以收集输出的键值对。
应用程序可以使用Reporter报告进度,设定应用级别的状态消息,更新Counters(计数器),或者仅是表明自己运行正常。
框架随后会把与一个特定key关联的所有中间过程的值(value)分成组,然后把它们传给Reducer以产出最终的结果。
用户可以通过JobConf.setOutputKeyComparatorClass(Class)来指定具体负责分组的Comparator。
Mapper的输出被排序后,就被划分给每个Reducer。
分块的总数目和一个作业的reduce任务的数目是一样的。
用户可以通过实现自定义的Partitioner来控制哪个key被分配给哪个Reducer。
用户可选择通过JobConf.setCombinerClass(Class)指定一个combiner,它负责对中间过程的输出进行本地的聚集,这会有助于降低从Mapper到Reducer数据传输量。
这些被排好序的中间过程的输出结果保存的格式是(key-len,key,value-len,value),应用程序可以通过JobConf控制对这些中间结果是否进行压缩以及怎么压缩,使用哪种CompressionCodec。
需要多少个Map?
Map的数目通常是由输入数据的大小决定的,一般就是所有输入文件的总块(block)数。
Map正常的并行规模大致是每个节点(node)大约10到100个map,对于CPU消耗较小的map任务可以设到300个左右。
由于每个任务初始化需要一定的时间,因此,比较合理的情况是map执行的时间至少超过1分钟。
这样,如果你输入10TB的数据,每个块(block)的大小是128MB,你将需要大约82,000个map来完成任务,除非使用setNumMapTasks(int)(注意:
这里仅仅是对框架进行了一个提示(hint),实际决定因素见这里)将这个数值设置得更高。
Reducer
Reducer将与一个key关联的一组中间数值集归约(reduce)为一个更小的数值集。
用户可以通过JobConf.setNumReduceTasks(int)设定一个作业中reduce任务的数目。
概括地说,对Reducer的实现者需要重写JobConfigurable.configure(JobConf)方法,这个方法需要传递一个JobConf参数,目的是完成Reducer的初始化工作。
然后,框架为成组的输入数据中的每个
之后,应用程序可以通过重写Closeable.close()来执行相应的清理工作。
Reducer有3个主要阶段:
shuffle、sort和reduce。
Shuffle
Reducer的输入就是Mapper已经排好序的输出。
在这个阶段,框架通过HTTP为每个Reducer获得所有Mapper输出中与之相关的分块。
Sort
这个阶段,框架将按照key的值对Reducer的输入进行分组(因为不同mapper的输出中可能会有相同的key)。
Shuffle和Sort两个阶段是同时进行的;map的输出也是一边被取回一边被合并的。
SecondarySort
如果需要中间过程对key的分组规则和reduce前对key的分组规则不同,那么可以通过JobConf.setOutputValueGroupingComparator(Class)来指定一个Comparator。
再加上JobConf.setOutputKeyComparatorClass(Class)可用于控制中间过程的key如何被分组,所以结合两者可以实现按值的二次排序。
Reduce
在这个阶段,框架为已分组的输入数据中的每个
Reduce任务的输出通常是通过调用OutputCollector.collect(WritableComparable,Writable)写入文件系统的。
应用程序可以使用Reporter报告进度,设定应用程序级别的状态消息,更新Counters(计数器),或者仅是表明自己运行正常。
Reducer的输出是没有排序的。
需要多少个Reduce?
Reduce的数目建议是0.95或1.75乘以(
用0.95,所有reduce可以在maps一完成时就立刻启动,开始传输map的输出结果。
用1.75,速度快的节点可以在完成第一轮reduce任务后,可以开始第二轮,这样可以得到比较好的负载均衡的效果。
增加reduce的数目会增加整个框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。
上述比例因子比整体数目稍小一些是为了给框架中的推测性任务(speculative-tasks)或失败的任务预留一些reduce的资源。
无Reducer
如果没有归约要进行,那么设置reduce任务的数目为零是合法的。
这种情况下,map任务的输出会直接被写入由setOutputPath(Path)指定的输出路径。
框架在把它们写入FileSystem之前没有对它们进行排序。
Partitioner
Partitioner用于划分键值空间(keyspace)。
Partitioner负责控制map输出结果key的分割。
Key(或者一个key子集)被用于产生分区,通常使用的是Hash函数。
分区的数目与一个作业的reduce任务的数目是一样的。
因此,它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。
HashPartitioner是默认的Partitioner。
Reporter
Reporter是用于Map/Reduce应用程序报告进度,设定应用级别的状态消息,更新Counters(计数器)的机制。
Mapper和Reducer的实现可以利用Reporter来报告进度,或者仅是表明自己运行正常。
在那种应用程序需要花很长时间处理个别键值对的场景中,这种机制是很关键的,因为框架可能会以为这个任务超时了,从而将它强行杀死。
另一个避免这种情况发生的方式是,将配置参数mapred.task.timeout设置为一个足够高的值(或者干脆设置为零,则没有超时限制了)。
应用程序可以用Reporter来更新Counter(计数器)。
OutputCollector
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- Hadoop MapReduce教程 MapReduce 教程