背景:这两天要把一个文件中的的多个 html 代码块进行解析,该文件特别大将近 1TB,所以想用 Hadoop 来处理。该文件内容格式如下所示:htmlds.txt
<html>
<title>title1</title>
<div>xxxx</div>....
</html>
<html>
<title>title2</title>
<div>xxxxxxx</div>...
</html>....
现在就有个问题,Hadoop 默认的 map 处理逻辑是对每一行进行同样的逻辑处理,而我现在需要对每一个<html>...</html>为一个单元进行处理,因此就需要进行 Hadoop 输入格式的自定义。
基础知识【1】:
1.什么是输入格式 MapReduce 中接口 InputFormat 定义了获取数据分片和获取记录读取器的方法,分别是 getSplits 和 createRecordReader 方法,分别用于获取数据分片和定义数据访问方式;
public abstract class InputFormat<K, V> {
public abstract List<InputSplit> getSplits(JobContext context)throws IOException, InterruptedException;
public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context)throws IOException,InterruptedException;
}
RecordReader 类是实际用来加载数据并把数据转换为适合 Mapper 读取的键值对,它会在输入块上被重复的调用直到整个输入块被处理完毕,每一次调用 RecordReader 都会调用 Mapper 的 map()方法。
2.有哪些输入格式抽象类 FileInputFormat 实现了 InputFormat 接口,是所有操作文件类型输入类的父类。InputFormat 常见的接口实现类包括 TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat、SequenceFileInputFormat、DBInputFormat 等。TextInputFormatTextInputFormat 是默认的 InputFormat。TextInputFormat 提供了一个 LineRecordReader,这个类会把输入文件的每一行作为值,每一行在文件中的字节偏移量为键。每条记录是一行输入,键是 LongWritable 类型,存储该行在整个文件中的字节偏移量,值是这行的内容,不包括任何行终止符(换行符和回车符)。KeyValueTextInputFormat 每一行均为一条记录,被分隔符分割为 key,value。可以通过在驱动类中设置 conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "");来设定分隔符。默认分隔符是 tab(\t)。此时的键是每行排在制表符之前的 Text 序列。实现方案:通过上面的介绍,我们知道了输入格式是什么东西,那怎么改呢,就是重写我们的 InputFormat 接口,由于我们的实现功能是一次读取多行数据,所以可以参照 LineRecordReader 来实现我们需要的功能。LineRecordReader 类是每次读取一行就结束,那我们要想每次读取一个<html>...</html>代码块,只需要修改逻辑,即我们只有读取到</html>才结束这一次读取过程。所以,我们可以自定义 HtmlLineRecordReader 类,继承 RecordReader 类,修改 nextKeyValue 里面的读取逻辑。HtmlLineRecordReader.java
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// flag为是否清空的标记,保证每隔两条清空一次
boolean flag=true;
//curValue 为读取到的最新的一行,value为这次循环读取的所有行的不断积累
Text curValue = new Text();
int cycleCount = 0;
do{
if (curValue.getLength()==0) {
cycleCount++;
}else {
cycleCount = 0;
}
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
//重写readLine方法,传入curValue和flag
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos),curValue,flag);
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
复制代码
// System.out.println("For...Loop:value:"+value.toString());// System.out.println("For...Loop:curValue:"+curValue.toString());//如果未读取到</html>(''是因为我自己数据集的问题包含'')则不清空累计 append 的 value,if (!curValue.toString().equals("</html>\")) {flag = false;}//结束条件:遇到了'</html>'字符串,cycleCount 是为了防止我的文件出现连续空行,该异常不多见可以根据自己情况适当调整}while(!curValue.toString().equals("</html>\")&&cycleCount<20);//do while... end//
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
复制代码
readLine 方法在 HtmlLineReader.java 里 public int readLine(Text str, int maxLineLength,int maxBytesToConsume,Text curValue,boolean flag) throws IOException {if (this.recordDelimiterBytes != null) {return readCustomLine(str, maxLineLength, maxBytesToConsume,curValue,flag);} else {return readDefaultLine(str, maxLineLength, maxBytesToConsume,curValue,flag);}}然后重载 readCustomLine 和 readDefaultLine 两个方法/*** 重载 readCustomerLine 方法*/private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume,Text curValue,boolean flag)throws IOException {
if(flag){
str.clear();
}
curValue.clear();
int txtLength = 0; // tracks str.getLength(), as an optimization
long bytesConsumed = 0;
int delPosn = 0;
int ambiguousByteCount=0; // To capture the ambiguous characters count
do {
int startPosn = bufferPosn; // Start from previous end position
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
if (bufferLength <= 0) {
if (ambiguousByteCount > 0) {
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
curValue.append(recordDelimiterBytes, 0, ambiguousByteCount);
bytesConsumed += ambiguousByteCount;
}
break; // EOF
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) {
if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
delPosn++;
if (delPosn >= recordDelimiterBytes.length) {
bufferPosn++;
break;
}
} else if (delPosn != 0) {
bufferPosn--;
delPosn = 0;
}
}
int readLength = bufferPosn - startPosn;
bytesConsumed += readLength;
int appendLength = readLength - delPosn;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
bytesConsumed += ambiguousByteCount;
if (appendLength >= 0 && ambiguousByteCount > 0) {
//appending the ambiguous characters (refer case 2.2)
str.append(recordDelimiterBytes, 0, ambiguousByteCount);
curValue.append(recordDelimiterBytes, 0, ambiguousByteCount);
ambiguousByteCount = 0;
// since it is now certain that the split did not split a delimiter we
// should not read the next record: clear the flag otherwise duplicate
// records could be generated
unsetNeedAdditionalRecordAfterSplit();
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
curValue.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
if (bufferPosn >= bufferLength) {
if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
ambiguousByteCount = delPosn;
bytesConsumed -= ambiguousByteCount; //to be consumed in next
}
}
} while (delPosn < recordDelimiterBytes.length
&& bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) {
throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
}
return (int) bytesConsumed;
}
/**
* 重载readDefaultLine方法
*/
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume,Text curValue,boolean flag)
throws IOException {
if(flag){
str.clear();
}
curValue.clear();
int txtLength = 0; //tracks str.getLength(), as an optimization
int newlineLength = 0; //length of terminating newline
boolean prevCharCR = false; //true of prev char was CR
long bytesConsumed = 0;
do {
int startPosn = bufferPosn; //starting from where we left off the last time
if (bufferPosn >= bufferLength) {
startPosn = bufferPosn = 0;
if (prevCharCR) {
++bytesConsumed; //account for CR from previous read
}
bufferLength = fillBuffer(in, buffer, prevCharCR);
if (bufferLength <= 0) {
break; // EOF
}
}
for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
if (buffer[bufferPosn] == LF) {
newlineLength = (prevCharCR) ? 2 : 1;
++bufferPosn; // at next invocation proceed from following byte
break;
}
if (prevCharCR) { //CR + notLF, we are at notLF
newlineLength = 1;
break;
}
prevCharCR = (buffer[bufferPosn] == CR);
}
int readLength = bufferPosn - startPosn;
if (prevCharCR && newlineLength == 0) {
--readLength; //CR at the end of the buffer
}
bytesConsumed += readLength;
int appendLength = readLength - newlineLength;
if (appendLength > maxLineLength - txtLength) {
appendLength = maxLineLength - txtLength;
}
if (appendLength > 0) {
str.append(buffer, startPosn, appendLength);
curValue.append(buffer, startPosn, appendLength);
txtLength += appendLength;
}
} while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) {
throw new IOException("Too many bytes before newline: " + bytesConsumed);
}
return (int)bytesConsumed;
}
复制代码
以上就完成了 Hadoop 输入格式的自定义工作,我们再总结一下:重写新实现 InputFormat,其中比较重要的两个类:HtmlLineRecordReader 类,该类里面的 nextKeyValue 方法,主要完成设置清空 value 值的条件;HtmlLineReader 类,该类中的 readCustomLine 和 readDefaultLine 方法,根据清空指示进行清空操作看懂了以上之后你是不是发现 so easy,只需要自己根据需求修改逻辑即可,其他的一些文件都是改改类名即可,比如压缩功能之类的。现在你是不是迫不及待的想运行一下试试,我把项目放到开源平台上(DoubleDue/htmlmrgitee.com)了,可以自行下载运行(包含测试数据集)。
参考文献:【1】CSDN 博主「机器熊技术大杂烩」的原创文章https://blog.csdn.net/majianxiong_lzu/article/details/89206198
评论