写点什么

【遇见 Doris】Apache Doris Parquet 文件读取的设计与实现

用户头像
ApacheDoris
关注
发布于: 2021 年 03 月 24 日

6 月 29 日,Doris 有幸得到中国信通院云大所、大数据技术标准推进委员会的支持,在中国信通院举行了 0.11.0 新版本预览线下沙龙。各位嘉宾都带来了干货满满的分享。关注 Doris 官方公众号,后台回复“0629”即可获取各位嘉宾分享 PPT 及现场录像。




今天是 Doris 的 Contributor 徐小冰同学代表搜狐带来的关于 Apache Doris (incubating)Parquet 文件读取的设计与实现。





所有需求的推动都基于真实的业务痛点。搜狐在 Hadoop 上的文件主要存储为 Parquet。


Parquet 有如下优势:


  • 列式存储,压缩比高(RLE、字段编码等),查询效率高(列 pruning,block filter)

  • Spark/Impala/Hive 都支持(ORC Impala 最新版本才支持)


而 Doris 只支持 CSV 格式。因此 Parquet 文件的读取流程就需要两步:


  • 通过相关命令行或者工具将数据表导出到 csv 文件中

  • 通过 Doris load 命令进行导入


这种方式的问题在于一方面 CSV 默认换行符是\n,如果数据中包含该\n 会导致 load 失败。并且整体效率不高。因此,Doris 支持 Parquet 读取,势在必行。


Load 流程概览


 首先来了解一下 load 流程。Doris 的整体架构如图所示。



首先通过 MySQL Client 和 MySQL 协议,将 load 请求下发到 FE master 上。MySQL 协议是异步的,执行完成后就会直接返回给 MySQL client。


如果导入多个文件,可能会需要在 FE 进行负载均衡,即分发给其他的 FE(FE follower),FE 会从 BE 中选择来处理,BE 会分别通过 broker 读取 HDFS,由 BE 进行处理。


Load 流程



Load-Prepare 流程



Load-Open 流程



Load-Scanner 流程



使用方式


 导入指定文件 


显示指定文件格式为:parquet,指定语句为:


FORMAT AS "parquet"


如若缺省则按照导入文件后缀名判断。


具体示例:


LOAD LABEL mydatabase.load_test1(DATA INFILE("hdfs://127.0.0.1:8020/tmp/xuxb/000444abc_0")


INTO TABLE parquet_table


FORMAT AS "parquet"


(col_a, col_b, col_c))


WITH BROKER broker_parquet;


 导入指定目录


同时也支持指定目录,指定语句同样为:


FORMAT AS "parquet"


通过 DATA INFILE 后的/*来指定目录文件


DATA INFILE("hdfs://127.0.0.1:8020/tmp/xuxb/abc/*")


具体示例:


LOAD LABEL mydatabase.load_test1


(


DATA INFILE("hdfs://127.0.0.1:8020/tmp/xuxb/abc/*")


INTO TABLE parquet_table


FORMAT AS "parquet"


(col_a, col_b, col_c)


)


WITH BROKER broker_parquet;

 

 存在分区字段 


对于分区字段,在 Parquet 文件中是不存在的,在这种情况下应该如何应用?


例如 id,type 是两个分区字段,在 Parquet 文件中不存在该列信息,首先创建 Doris 表如下,将 id,type 存在 Doris 表中


CREATE TABLE demo_mem_infos


(


id INT, #分区字段 存在Doris表中


type INT, #分区字段 存在Doris表中


col_a VARCHAR(128),


col_b VARCHAR(128),


col_c VARCHAR(128)


)


ENGINE=olap


AGGREGATE KEY(`col_a`,`col_b`,`col_c`)


DISTRIBUTED BY HASH(id) BUCKETS 25


PROPERTIES ("storage_type"="column","replication_num" = "1");


Load 的写法如下:


LOAD LABEL mydatabase.load_test1


(


DATA INFILE("hdfs://127.0.0.1:8020/tmp/id=100/type=10/*")


INTO TABLE demo_mem_infos


FORMAT AS "parquet"


(col_a,col_b,col_c)


set (


id=default_value("100"),


type=default_value("10")


)


)


WITH BROKER broker_parquet; 


其中,在 DATA INFILE 中进行说明:


DATA INFILE("hdfs://127.0.0.1:8020/tmp/id=100/type=10/*")


并指定导入方式:


FORMAT AS "parquet"


并用 set 语句对这两个分区字段进行默认值的设置


set (


    id=default_value("100"),


    type=default_value("10")


)


即可完成导入。


 导入状态 


在导入过程中,通过 show load 查看状态, 因为 Parquet 压缩比比较高,可能会出现内存溢出问题:


type:ETL_RUN_FAIL; msg:Broker etl failed: Memory limit exceeded 


可在执行 load 之前执行如下命令:


set exec_mem_limit = 64424509440; #单位为字节, 可根据实际情况进行设置。


可参考:FAQ


https://github.com/apache/incubator-doris/wiki/Doris-FAQ


Arrow 库的引入


在实现读取 Parquet 文件的过程中引入了第三方库 Arrow:


代码地址: https://github.com/apache/arrow


引入 Arrow 库的原因有几点:


  • Arrow 接口抽象度非常高,只需要写少量代码就可读取 parquet 文件

  • 开发周期短,不用自己维护相关数据结构以及内存


具体的代码展示如下:


对现有的 scanner 对象进行抽象,定义抽象类 BaseScanner。




读取 RowGroup 效率提升的实现



在实现上,先去调取文件含有多少个 RowGroup,然后循环遍历,一次只读一个,提高了效率。



测试结果


导入 27G 的一张表,测试结果如下:



  • 内存过高


  • varchar 类型最大支持 65535。如果存储一篇文章内容可能无法满足。


性能优化


  • Arrow 接口不熟悉,第一版功能导入 2G 数据花费 2 个小时没有完成。通过 BE log 中 profile 信息发现网络 io 性能不佳,重新阅读 Arrow 源码+Arrow ut, 将实现方式改为每次读取一个完整 RowGroup(主要优化点)


  • 第二版完成后,性能任然不达标,通过 ut+perf+火焰图,定位发现 map find 为性能瓶颈点,优化代码后显著提升。


代码贡献

  • 将 libarrow 等相关第三方库编译加入到 thirdparty 中:


  • 将相关实现提交 be/fe/fs_broker 中


  • 解决 Doris 两个小问题




此次沙龙我们有幸邀请到了来自一点资讯、京东、搜狐、百度智能云的技术大牛带来他们的应用实践和开发分享。


其他嘉宾的分享会在近日放出,欢迎关注 Apache Doris(incubating)官方公众号,后台回复“0629”即可获取各位嘉宾分享 PPT 及现场录像。




欢迎扫码关注:



Apache Doris(incubating)官方公众号


相关链接:


Apache Doris 官方网站:


http://doris.incubator.apache.org


Apache Doris Github:

https://github.com/apache/incubator-doris

Apache Doris Wiki:

https://github.com/apache/incubator-doris/wiki

Apache Doris 开发者邮件组:

dev@doris.apache.org


发布于: 2021 年 03 月 24 日阅读数: 16
用户头像

ApacheDoris

关注

还未添加个人签名 2021.03.17 加入

Doris(原百度Palo https://cloud.baidu.com/product/palo.html )是一款基于大规模并行处理技术的分布式 SQL 数据仓库,由百度在2017年开源,2018年进入 Apache 孵化器

评论

发布
暂无评论
【遇见Doris】Apache Doris Parquet文件读取的设计与实现