背景:这两天要把一个文件中的的多个 html 代码块进行解析,该文件特别大将近 1TB,所以想用 Hadoop 来处理。该文件内容格式如下所示:htmlds.txt
<html>
<title>title1</title>
<div>xxxx</div>....
</html>
<html>
<title>title2</title>
<div>xxxxxxx</div>...
</html>....
现在就有个问题,Hadoop 默认的 map 处理逻辑是对每一行进行同样的逻辑处理,而我现在需要对每一个<html>...</html>为一个单元进行处理,因此就需要进行 Hadoop 输入格式的自定义。
基础知识【1】:
1.什么是输入格式 MapReduce 中接口 InputFormat 定义了获取数据分片和获取记录读取器的方法,分别是 getSplits 和 createRecordReader 方法,分别用于获取数据分片和定义数据访问方式;
public abstract class InputFormat<K, V> {
public abstract List<InputSplit> getSplits(JobContext context)throws IOException, InterruptedException;
public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context)throws IOException,InterruptedException;
}
RecordReader 类是实际用来加载数据并把数据转换为适合 Mapper 读取的键值对,它会在输入块上被重复的调用直到整个输入块被处理完毕,每一次调用 RecordReader 都会调用 Mapper 的 map()方法。
2.有哪些输入格式抽象类 FileInputFormat 实现了 InputFormat 接口,是所有操作文件类型输入类的父类。InputFormat 常见的接口实现类包括 TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat、SequenceFileInputFormat、DBInputFormat 等。TextInputFormatTextInputFormat 是默认的 InputFormat。TextInputFormat 提供了一个 LineRecordReader,这个类会把输入文件的每一行作为值,每一行在文件中的字节偏移量为键。每条记录是一行输入,键是 LongWritable 类型,存储该行在整个文件中的字节偏移量,值是这行的内容,不包括任何行终止符(换行符和回车符)。KeyValueTextInputFormat 每一行均为一条记录,被分隔符分割为 key,value。可以通过在驱动类中设置 conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "");来设定分隔符。默认分隔符是 tab(\t)。此时的键是每行排在制表符之前的 Text 序列。实现方案:通过上面的介绍,我们知道了输入格式是什么东西,那怎么改呢,就是重写我们的 InputFormat 接口,由于我们的实现功能是一次读取多行数据,所以可以参照 LineRecordReader 来实现我们需要的功能。LineRecordReader 类是每次读取一行就结束,那我们要想每次读取一个<html>...</html>代码块,只需要修改逻辑,即我们只有读取到</html>才结束这一次读取过程。所以,我们可以自定义 HtmlLineRecordReader 类,继承 RecordReader 类,修改 nextKeyValue 里面的读取逻辑。HtmlLineRecordReader.java
public boolean nextKeyValue() throws IOException { if (key == null) { key = new LongWritable(); } key.set(pos); if (value == null) { value = new Text(); } int newSize = 0; // flag为是否清空的标记,保证每隔两条清空一次 boolean flag=true; //curValue 为读取到的最新的一行,value为这次循环读取的所有行的不断积累 Text curValue = new Text(); int cycleCount = 0; do{ if (curValue.getLength()==0) { cycleCount++; }else { cycleCount = 0; } while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { if (pos == 0) { newSize = skipUtfByteOrderMark(); } else { //重写readLine方法,传入curValue和flag newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos),curValue,flag); pos += newSize; }
if ((newSize == 0) || (newSize < maxLineLength)) { break; }
// line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); }
复制代码
// System.out.println("For...Loop:value:"+value.toString());// System.out.println("For...Loop:curValue:"+curValue.toString());//如果未读取到</html>(''是因为我自己数据集的问题包含'')则不清空累计 append 的 value,if (!curValue.toString().equals("</html>\")) {flag = false;}//结束条件:遇到了'</html>'字符串,cycleCount 是为了防止我的文件出现连续空行,该异常不多见可以根据自己情况适当调整}while(!curValue.toString().equals("</html>\")&&cycleCount<20);//do while... end//
if (newSize == 0) { key = null; value = null; return false; } else { return true; } }
复制代码
readLine 方法在 HtmlLineReader.java 里 public int readLine(Text str, int maxLineLength,int maxBytesToConsume,Text curValue,boolean flag) throws IOException {if (this.recordDelimiterBytes != null) {return readCustomLine(str, maxLineLength, maxBytesToConsume,curValue,flag);} else {return readDefaultLine(str, maxLineLength, maxBytesToConsume,curValue,flag);}}然后重载 readCustomLine 和 readDefaultLine 两个方法/*** 重载 readCustomerLine 方法*/private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume,Text curValue,boolean flag)throws IOException {
if(flag){ str.clear(); } curValue.clear(); int txtLength = 0; // tracks str.getLength(), as an optimization long bytesConsumed = 0; int delPosn = 0; int ambiguousByteCount=0; // To capture the ambiguous characters count do { int startPosn = bufferPosn; // Start from previous end position if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0; bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0); if (bufferLength <= 0) { if (ambiguousByteCount > 0) { str.append(recordDelimiterBytes, 0, ambiguousByteCount); curValue.append(recordDelimiterBytes, 0, ambiguousByteCount); bytesConsumed += ambiguousByteCount; } break; // EOF } } for (; bufferPosn < bufferLength; ++bufferPosn) { if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) { delPosn++; if (delPosn >= recordDelimiterBytes.length) { bufferPosn++; break; } } else if (delPosn != 0) { bufferPosn--; delPosn = 0; } } int readLength = bufferPosn - startPosn; bytesConsumed += readLength; int appendLength = readLength - delPosn; if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } bytesConsumed += ambiguousByteCount; if (appendLength >= 0 && ambiguousByteCount > 0) { //appending the ambiguous characters (refer case 2.2) str.append(recordDelimiterBytes, 0, ambiguousByteCount); curValue.append(recordDelimiterBytes, 0, ambiguousByteCount); ambiguousByteCount = 0; // since it is now certain that the split did not split a delimiter we // should not read the next record: clear the flag otherwise duplicate // records could be generated unsetNeedAdditionalRecordAfterSplit(); } if (appendLength > 0) { str.append(buffer, startPosn, appendLength); curValue.append(buffer, startPosn, appendLength); txtLength += appendLength; } if (bufferPosn >= bufferLength) { if (delPosn > 0 && delPosn < recordDelimiterBytes.length) { ambiguousByteCount = delPosn; bytesConsumed -= ambiguousByteCount; //to be consumed in next } } } while (delPosn < recordDelimiterBytes.length && bytesConsumed < maxBytesToConsume); if (bytesConsumed > Integer.MAX_VALUE) { throw new IOException("Too many bytes before delimiter: " + bytesConsumed); } return (int) bytesConsumed; }
/** * 重载readDefaultLine方法 */private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume,Text curValue,boolean flag) throws IOException { if(flag){ str.clear(); } curValue.clear(); int txtLength = 0; //tracks str.getLength(), as an optimization int newlineLength = 0; //length of terminating newline boolean prevCharCR = false; //true of prev char was CR long bytesConsumed = 0; do { int startPosn = bufferPosn; //starting from where we left off the last time if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0; if (prevCharCR) { ++bytesConsumed; //account for CR from previous read } bufferLength = fillBuffer(in, buffer, prevCharCR); if (bufferLength <= 0) { break; // EOF } } for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline if (buffer[bufferPosn] == LF) { newlineLength = (prevCharCR) ? 2 : 1; ++bufferPosn; // at next invocation proceed from following byte break; } if (prevCharCR) { //CR + notLF, we are at notLF newlineLength = 1; break; } prevCharCR = (buffer[bufferPosn] == CR); } int readLength = bufferPosn - startPosn; if (prevCharCR && newlineLength == 0) { --readLength; //CR at the end of the buffer } bytesConsumed += readLength; int appendLength = readLength - newlineLength; if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } if (appendLength > 0) { str.append(buffer, startPosn, appendLength); curValue.append(buffer, startPosn, appendLength); txtLength += appendLength; } } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) { throw new IOException("Too many bytes before newline: " + bytesConsumed); } return (int)bytesConsumed;}
复制代码
以上就完成了 Hadoop 输入格式的自定义工作,我们再总结一下:重写新实现 InputFormat,其中比较重要的两个类:HtmlLineRecordReader 类,该类里面的 nextKeyValue 方法,主要完成设置清空 value 值的条件;HtmlLineReader 类,该类中的 readCustomLine 和 readDefaultLine 方法,根据清空指示进行清空操作看懂了以上之后你是不是发现 so easy,只需要自己根据需求修改逻辑即可,其他的一些文件都是改改类名即可,比如压缩功能之类的。现在你是不是迫不及待的想运行一下试试,我把项目放到开源平台上(DoubleDue/htmlmrgitee.com)了,可以自行下载运行(包含测试数据集)。
参考文献:【1】CSDN 博主「机器熊技术大杂烩」的原创文章https://blog.csdn.net/majianxiong_lzu/article/details/89206198
评论