Hadoop File Input and File Output (学步园)
This article shows how to control the way Hadoop reads input files and writes output files. It implements the following:
1. A custom input format that never splits a file: each input file is handed to one map task as a whole, with the file path as the key and the entire file content as the value.
2. A custom output format under which each input file produces one output file, and the output file carries the same name as the input file.
The code follows.
coAuInputFormat

package an.hadoop.code.audit;

/**
 * The function of this class is to revise the input format so that
 * one whole file goes to one map.
 */
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class coAuInputFormat extends FileInputFormat<Text, Text> {

    private CompressionCodecFactory compressionCodecs = null;

    public void configure(Configuration conf) {
        compressionCodecs = new CompressionCodecFactory(conf);
    }

    /**
     * @brief isSplitable: never split a file; every file must be processed as a whole.
     * (Note: the hook called by the new mapreduce API is isSplitable(JobContext, Path);
     * this overload mirrors the old-API signature.)
     * @param fs
     * @param file
     * @return false
     */
    protected boolean isSplitable(FileSystem fs, Path file) {
        CompressionCodec codec = compressionCodecs.getCodec(file);
        // One split per file: even a file larger than one HDFS block (64 MB by default) is not split.
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {
        return new coAuRecordReader(context, split);
    }
}

coAuRecordReader

package an.hadoop.code.audit;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class coAuRecordReader extends RecordReader<Text, Text> {
    private static final Log LOG = LogFactory.getLog(coAuRecordReader.class.getName());

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private byte[] buffer;
    private String keyName;
    private FSDataInputStream fileIn;
    private Text key = null;
    private Text value = null;

    public coAuRecordReader(TaskAttemptContext context, InputSplit genericSplit) throws IOException {
        Configuration job = context.getConfiguration();
        FileSplit split = (FileSplit) genericSplit;
        start = split.getStart();          // this shows that each file is handled as one split
        end = split.getLength() + start;
        final Path path = split.getPath();
        // keyName = path.toString();      // the key is the file path
        LOG.info("file name in hdfs is : " + keyName); // written to the task log (where can the log be viewed?)
        final FileSystem fs = path.getFileSystem(job);
        fileIn = fs.open(path);
        fileIn.seek(start);
        buffer = new byte[(int) (end - start)];
        this.pos = start;
        /*
        if (key == null) {
            key = new Text();
            key.set(keyName);
        }
        if (value == null) {
            value = new Text();
            value.set(utf8);
        }
        */
    }
    @Override
    public void initialize(InputSplit genericSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) genericSplit;
        Configuration job = context.getConfiguration();
        // this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        keyName = file.toString();         // the key is the file path
        LOG.info("file name in hdfs is : " + keyName); // written to the task log (where can the log be viewed?)
        final FileSystem fs = file.getFileSystem(job);
        fileIn = fs.open(file);
        fileIn.seek(start);
        buffer = new byte[(int) (end - start)];
        this.pos = start;
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        // This is where the actual work is done.
        if (key == null) {
            key = new Text();
            key.set(keyName);
        }
        if (value == null) {
            value = new Text();
        }
        key.clear();
        key.set(keyName);                  // set the key
        value.clear();                     // clear the value
        while (pos < end) {
            fileIn.readFully(pos, buffer); // read the whole file into the buffer
            value.set(buffer);
            pos += buffer.length;
            LOG.info("end is : " + end + " pos is : " + pos);
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    @Override
    public void close() throws IOException {
        if (fileIn != null) {
            fileIn.close();
        }
    }
}

coAuOutputFormat

package an.hadoop.code.audit;

/**
 * Determines the name of the output file.
 */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class coAuOutputFormat extends MultipleOutputFormat<Text, Text> {

    private final static String suffix = "_its4";

    @Override
    protected String generateFileNameForKeyValue(Text key, Text value, Configuration conf) {
        String path = key.toString();      // the key holds the path and name of the input file
        String[] dir = path.split("/");
        int length = dir.length;
        String filename = dir[length - 1];
        return filename + suffix;          // the name of the output file
    }
}

MultipleOutputFormat

package an.hadoop.code.audit;

/**
 * Writes to multiple output files, one per generated file name.
 */
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public abstract class MultipleOutputFormat<K extends WritableComparable<?>, V extends Writable>
        extends FileOutputFormat<K, V> { // the default FileOutputFormat implementation is TextOutputFormat

    private MultiRecordWriter writer = null;

    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        if (writer == null) {
            writer = new MultiRecordWriter(job, getTaskOutputPath(job)); // job, output path
        }
        return writer;
    }

    private Path getTaskOutputPath(TaskAttemptContext conf) throws IOException { // get the output path
        Path workPath = null;
        OutputCommitter committer = super.getOutputCommitter(conf);
        if (committer instanceof FileOutputCommitter) {
            // if the committer is a FileOutputCommitter, use its work path
            workPath = ((FileOutputCommitter) committer).getWorkPath();
        } else {
            Path outputPath = super.getOutputPath(conf); // the configured output path
            if (outputPath == null) {
                throw new IOException("Undefined job output-path");
            }
            workPath = outputPath;
        }
        return workPath;
    }

    /** Determine the output file name (including extension) from the key, value and conf. */
    protected abstract String generateFileNameForKeyValue(K key, V value, Configuration conf); // abstract; overridden by subclasses such as coAuOutputFormat

    public class MultiRecordWriter extends RecordWriter<K, V> {

        /** Cache of RecordWriters, one per output file name. */
        private HashMap<String, RecordWriter<K, V>> recordWriters = null;
        private TaskAttemptContext job = null;
        /** Output directory. */
        private Path workPath = null;

        public MultiRecordWriter(TaskAttemptContext job, Path workPath) { // constructor
            super();
            this.job = job;
            this.workPath = workPath;
            this.recordWriters = new HashMap<String, RecordWriter<K, V>>();
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // every cached writer must be closed
            Iterator<RecordWriter<K, V>> values = this.recordWriters.values().iterator();
            while (values.hasNext()) {
                values.next().close(context);
            }
            this.recordWriters.clear();
        }

        @Override
        public void write(K key, V value) throws IOException,
                InterruptedException {
            // get the output file name
            String baseName = generateFileNameForKeyValue(key, value, job.getConfiguration()); // generate the output file name
            RecordWriter<K, V> rw = this.recordWriters.get(baseName);
            if (rw == null) {
                rw = getBaseRecordWriter(job, baseName);
                this.recordWriters.put(baseName, rw);
            }
            rw.write(key, value);
        }

        // ${mapred.out.dir}/_temporary/_${taskid}/${nameWithExtension}
        private RecordWriter<K, V> getBaseRecordWriter(TaskAttemptContext job, String baseName)
                throws IOException, InterruptedException {
            Configuration conf = job.getConfiguration();
            boolean isCompressed = getCompressOutput(job);
            String keyValueSeparator = ",";
            RecordWriter<K, V> recordWriter = null;
            if (isCompressed) {
                Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
                CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
                Path file = new Path(workPath, baseName + codec.getDefaultExtension());
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false);
                recordWriter = new LineRecordWriter<K, V>(
                        new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
            } else {
                Path file = new Path(workPath, baseName); // baseName is the output file name here
                FSDataOutputStream fileOut = file.getFileSystem(conf).create(file, false); // file name of the output file
                recordWriter = new LineRecordWriter<K, V>(fileOut, keyValueSeparator); // this uses the LineRecordWriter below
            }
            return recordWriter;
        }
    }
}

LineRecordWriter

package an.hadoop.code.audit;

/*
 * An implementation of RecordWriter that writes key/value pairs as lines of text.
 * It exists as a nested class of TextOutputFormat with protected access, so ordinary
 * code cannot use it directly; here the LineRecordWriter is simply copied out into
 * this package.
 */
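A minimal sketch of such a copied-out LineRecordWriter, modeled on the protected writer nested inside Hadoop's TextOutputFormat rather than taken from the original listing, might look like this:

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Sketch of a line-oriented RecordWriter in the same package, modeled on
 * TextOutputFormat.LineRecordWriter. It writes "key<separator>value\n" in UTF-8.
 */
public class LineRecordWriter<K, V> extends RecordWriter<K, V> {

    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
        this.out = out;
        this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
    }

    /** Write one object either as raw Text bytes or via toString(). */
    private void writeObject(Object o) throws IOException {
        if (o instanceof Text) {
            Text t = (Text) o;
            out.write(t.getBytes(), 0, t.getLength());
        } else {
            out.write(o.toString().getBytes(StandardCharsets.UTF_8));
        }
    }

    @Override
    public synchronized void write(K key, V value) throws IOException {
        boolean nullKey = key == null || key instanceof NullWritable;
        boolean nullValue = value == null || value instanceof NullWritable;
        if (nullKey && nullValue) {
            return;                  // nothing to write
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!nullKey && !nullValue) {
            out.write(keyValueSeparator);
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write(NEWLINE);
    }

    @Override
    public synchronized void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}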
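For completeness, here is a rough sketch of a driver that wires these classes into a job. It is an illustration only: the AuditJob class name, the pass-through FileMapper, the map-only setup, and the Hadoop 2.x style Job.getInstance call are assumptions rather than part of the original article.

package an.hadoop.code.audit;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/** Hypothetical driver showing where the custom input and output formats are plugged in. */
public class AuditJob {

    /** Pass-through mapper: key = input file path, value = whole file content. */
    public static class FileMapper extends Mapper<Text, Text, Text, Text> {
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value); // emit the (file path, file content) pair unchanged
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "one output file per input file");
        job.setJarByClass(AuditJob.class);

        job.setMapperClass(FileMapper.class);
        job.setNumReduceTasks(0);                         // map-only, so each input file yields one output file

        job.setInputFormatClass(coAuInputFormat.class);   // whole file read as a single record
        job.setOutputFormatClass(coAuOutputFormat.class); // output file named after the input file (plus "_its4")

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}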