写点什么

自定义 Hadoop 的输入格式

用户头像
小舰
关注
发布于: 2021 年 04 月 11 日

背景:这两天要把一个文件中的的多个 html 代码块进行解析,该文件特别大将近 1TB,所以想用 Hadoop 来处理。该文件内容格式如下所示:htmlds.txt


<html>

<title>title1</title>

<div>xxxx</div>....

</html>

<html>

<title>title2</title>

<div>xxxxxxx</div>...

</html>....

现在就有个问题,Hadoop 默认的 map 处理逻辑是对每一行进行同样的逻辑处理,而我现在需要对每一个<html>...</html>为一个单元进行处理,因此就需要进行 Hadoop 输入格式的自定义。


基础知识【1】:

1.什么是输入格式 MapReduce 中接口 InputFormat 定义了获取数据分片和获取记录读取器的方法,分别是 getSplits 和 createRecordReader 方法,分别用于获取数据分片和定义数据访问方式;


public abstract class InputFormat<K, V> {

public abstract List<InputSplit> getSplits(JobContext context)throws IOException, InterruptedException;

public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context)throws IOException,InterruptedException;

}


RecordReader 类是实际用来加载数据并把数据转换为适合 Mapper 读取的键值对,它会在输入块上被重复的调用直到整个输入块被处理完毕,每一次调用 RecordReader 都会调用 Mapper 的 map()方法。


2.有哪些输入格式抽象类 FileInputFormat 实现了 InputFormat 接口,是所有操作文件类型输入类的父类。InputFormat 常见的接口实现类包括 TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat、SequenceFileInputFormat、DBInputFormat 等。TextInputFormatTextInputFormat 是默认的 InputFormat。TextInputFormat 提供了一个 LineRecordReader,这个类会把输入文件的每一行作为值,每一行在文件中的字节偏移量为键。每条记录是一行输入,键是 LongWritable 类型,存储该行在整个文件中的字节偏移量,值是这行的内容,不包括任何行终止符(换行符和回车符)。KeyValueTextInputFormat 每一行均为一条记录,被分隔符分割为 key,value。可以通过在驱动类中设置 conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "");来设定分隔符。默认分隔符是 tab(\t)。此时的键是每行排在制表符之前的 Text 序列。实现方案:通过上面的介绍,我们知道了输入格式是什么东西,那怎么改呢,就是重写我们的 InputFormat 接口,由于我们的实现功能是一次读取多行数据,所以可以参照 LineRecordReader 来实现我们需要的功能。LineRecordReader 类是每次读取一行就结束,那我们要想每次读取一个<html>...</html>代码块,只需要修改逻辑,即我们只有读取到</html>才结束这一次读取过程。所以,我们可以自定义 HtmlLineRecordReader 类,继承 RecordReader 类,修改 nextKeyValue 里面的读取逻辑。HtmlLineRecordReader.java


  public boolean nextKeyValue() throws IOException {    if (key == null) {      key = new LongWritable();    }    key.set(pos);    if (value == null) {      value = new Text();    }    int newSize = 0;            // flag为是否清空的标记,保证每隔两条清空一次    boolean flag=true;    //curValue 为读取到的最新的一行,value为这次循环读取的所有行的不断积累    Text curValue = new Text();    int cycleCount = 0;    do{        if (curValue.getLength()==0) {        cycleCount++;    }else {      cycleCount = 0;    }        while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {          if (pos == 0) {            newSize = skipUtfByteOrderMark();          } else {            //重写readLine方法,传入curValue和flag            newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos),curValue,flag);            pos += newSize;          }
if ((newSize == 0) || (newSize < maxLineLength)) { break; }
// line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); }
复制代码


// System.out.println("For...Loop:value:"+value.toString());// System.out.println("For...Loop:curValue:"+curValue.toString());//如果未读取到</html>(''是因为我自己数据集的问题包含'')则不清空累计 append 的 value,if (!curValue.toString().equals("</html>\")) {flag = false;}//结束条件:遇到了'</html>'字符串,cycleCount 是为了防止我的文件出现连续空行,该异常不多见可以根据自己情况适当调整}while(!curValue.toString().equals("</html>\")&&cycleCount<20);//do while... end//


    if (newSize == 0) {      key = null;      value = null;      return false;    } else {      return true;    }  }
复制代码


readLine 方法在 HtmlLineReader.java 里 public int readLine(Text str, int maxLineLength,int maxBytesToConsume,Text curValue,boolean flag) throws IOException {if (this.recordDelimiterBytes != null) {return readCustomLine(str, maxLineLength, maxBytesToConsume,curValue,flag);} else {return readDefaultLine(str, maxLineLength, maxBytesToConsume,curValue,flag);}}然后重载 readCustomLine 和 readDefaultLine 两个方法/*** 重载 readCustomerLine 方法*/private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume,Text curValue,boolean flag)throws IOException {


  if(flag){    str.clear();  }  curValue.clear();  int txtLength = 0; // tracks str.getLength(), as an optimization  long bytesConsumed = 0;  int delPosn = 0;  int ambiguousByteCount=0; // To capture the ambiguous characters count  do {    int startPosn = bufferPosn; // Start from previous end position    if (bufferPosn >= bufferLength) {      startPosn = bufferPosn = 0;      bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);      if (bufferLength <= 0) {        if (ambiguousByteCount > 0) {          str.append(recordDelimiterBytes, 0, ambiguousByteCount);          curValue.append(recordDelimiterBytes, 0, ambiguousByteCount);          bytesConsumed += ambiguousByteCount;        }        break; // EOF      }    }    for (; bufferPosn < bufferLength; ++bufferPosn) {      if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {        delPosn++;        if (delPosn >= recordDelimiterBytes.length) {          bufferPosn++;          break;        }      } else if (delPosn != 0) {        bufferPosn--;        delPosn = 0;      }    }    int readLength = bufferPosn - startPosn;    bytesConsumed += readLength;    int appendLength = readLength - delPosn;    if (appendLength > maxLineLength - txtLength) {      appendLength = maxLineLength - txtLength;    }    bytesConsumed += ambiguousByteCount;    if (appendLength >= 0 && ambiguousByteCount > 0) {      //appending the ambiguous characters (refer case 2.2)      str.append(recordDelimiterBytes, 0, ambiguousByteCount);      curValue.append(recordDelimiterBytes, 0, ambiguousByteCount);      ambiguousByteCount = 0;      // since it is now certain that the split did not split a delimiter we      // should not read the next record: clear the flag otherwise duplicate      // records could be generated      unsetNeedAdditionalRecordAfterSplit();    }    if (appendLength > 0) {      str.append(buffer, startPosn, appendLength);      curValue.append(buffer, startPosn, appendLength);      txtLength += appendLength;    }    if (bufferPosn >= bufferLength) {      if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {        ambiguousByteCount = delPosn;        bytesConsumed -= ambiguousByteCount; //to be consumed in next      }    }  } while (delPosn < recordDelimiterBytes.length       && bytesConsumed < maxBytesToConsume);  if (bytesConsumed > Integer.MAX_VALUE) {    throw new IOException("Too many bytes before delimiter: " + bytesConsumed);  }  return (int) bytesConsumed; }

/** * 重载readDefaultLine方法 */private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume,Text curValue,boolean flag) throws IOException { if(flag){ str.clear(); } curValue.clear(); int txtLength = 0; //tracks str.getLength(), as an optimization int newlineLength = 0; //length of terminating newline boolean prevCharCR = false; //true of prev char was CR long bytesConsumed = 0; do { int startPosn = bufferPosn; //starting from where we left off the last time if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0; if (prevCharCR) { ++bytesConsumed; //account for CR from previous read } bufferLength = fillBuffer(in, buffer, prevCharCR); if (bufferLength <= 0) { break; // EOF } } for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline if (buffer[bufferPosn] == LF) { newlineLength = (prevCharCR) ? 2 : 1; ++bufferPosn; // at next invocation proceed from following byte break; } if (prevCharCR) { //CR + notLF, we are at notLF newlineLength = 1; break; } prevCharCR = (buffer[bufferPosn] == CR); } int readLength = bufferPosn - startPosn; if (prevCharCR && newlineLength == 0) { --readLength; //CR at the end of the buffer } bytesConsumed += readLength; int appendLength = readLength - newlineLength; if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } if (appendLength > 0) { str.append(buffer, startPosn, appendLength); curValue.append(buffer, startPosn, appendLength); txtLength += appendLength; } } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
if (bytesConsumed > Integer.MAX_VALUE) { throw new IOException("Too many bytes before newline: " + bytesConsumed); } return (int)bytesConsumed;}
复制代码


以上就完成了 Hadoop 输入格式的自定义工作,我们再总结一下:重写新实现 InputFormat,其中比较重要的两个类:HtmlLineRecordReader 类,该类里面的 nextKeyValue 方法,主要完成设置清空 value 值的条件;HtmlLineReader 类,该类中的 readCustomLine 和 readDefaultLine 方法,根据清空指示进行清空操作看懂了以上之后你是不是发现 so easy,只需要自己根据需求修改逻辑即可,其他的一些文件都是改改类名即可,比如压缩功能之类的。现在你是不是迫不及待的想运行一下试试,我把项目放到开源平台上(DoubleDue/htmlmr​gitee.com)了,可以自行下载运行(包含测试数据集)。


参考文献:【1】CSDN 博主「机器熊技术大杂烩」的原创文章https://blog.csdn.net/majianxiong_lzu/article/details/89206198

发布于: 2021 年 04 月 11 日阅读数: 15
用户头像

小舰

关注

公众号:DLab数据实验室 2020.11.12 加入

中国人民大学硕士

评论

发布
暂无评论
自定义Hadoop的输入格式