【遇见 Doris】Spark Doris Sink 的设计和实现
6 月 29 日,Doris 有幸得到中国信通院云大所、大数据技术标准推进委员会的支持,在中国信通院举行了 0.11.0 新版本预览线下沙龙。各位嘉宾都带来了干货满满的分享。关注 Doris 官方公众号,后台回复“0629”即可获取各位嘉宾分享 PPT 及现场录像。
今天是朱良昌同学代表百度智能云流式计算团队带来 Spark Streaming 对接 Doris 设计与实现的分享。
业务场景
Spark Streaming(主要是 Structured Streaming)在百度内部被广泛应用于实时计算,日志分析,ETL 等业务场景。其中有很多业务方希望可以使用 structured streaming 读取上游数据源(例如:kafka、 hdfs、 database 等),然后对数据进行处理后实时导入 Doris 以供查询分析。
为此流式计算团队专门开发了 Doris sink 的组件来适配 Doris。Doris sink 支持 exactly-once 语义,封装并对用户屏蔽了与 Doris 的交互细节,用户只需要关注用户逻辑和计算本身,经过简单配置即可非常方便的将流式数据导入到 Doris 中。
Structured Streaming 介绍
Structured Streaming 是 Spark 2.3 版本之后提出的新的流式计算引擎,具有以下特点:
1. 具备良好的扩展性和高容错性
2. 基于 Spark-SQL 引擎,使用 DataFrame API,使用简单
3. 相比于 Spark 2.2 版本之前使用 DStream API 的 spark Streaming 模型,Structured Streaming 支持更加丰富的流式语义。例如:基于 eventTime 的 window、聚合计算,watermark,流和 static DataFrame 的 join,流和流的简单 join 等
4. 端到端的 exactly-once 语义。非常适用于要求数据不重不丢的业务
Spark 工作模型
Spark 作业整体架构如上图所示:
Driver:作为程序的入口,执行用户代码,生成 DAG(有限无环图),对整个 app 的资源进行协调和管理。
Executor: 执行用户逻辑,dataset 计算(transformation 和 action)。一个 executor 中可以并行执行多个 task,task 的并发量由启动 executor 时指定的 core 数决定,而每个 task 负责对一个 partition 进行计算。
Cluster Manager: 百度内部使用的主要是 Yarn。
Structured Streaming 编程模型
Structured Streaming 与传统意义上的流式计算系统不同,它是一个微批次(micro-batch)的流式计算系统。其主要原理是将源源不断的数据,切分成一个一个的小数据块,其中每一小数据块称之为一个 batch。当每次触发计算时,系统会处理一个 batch 中的数据,而 batch 和 batch 之间则是串行执行的。一个 batch 的计算,可以看做是一个 Spark-SQL job 的一次执行,故一个 Structured Streaming 作业可以看做是无穷多个 job 组成的一个不会停止作业。
WordCount Demo
上图是一个 Structured Streaming 使用 Complete output mode 执行 Wordcount 的示例。
nc 是指 linux 的 netcat 指令,input 数据通过 socket 传入。在时间点 1,系统接收到 4 条数据作为一个 batch 进行计算,产生了结果(cat, 1)(dog, 3)。在时间点 2 的时候又来了两条数据,这两条数据会成为一个新的 batch 进行计算,新的计算结果会加到最终的结果里(cat, 2)(dog,3)(owl,1)。以此类推,每来一批新数据,将这批数据作为一个 batch 进行计算,然后对结果进行更新。Structured Streaming 就这样源源不断的将输入数据切分为一个一个小的 batch,然后执行计算。
端到端 Exactly-once 语
Structured Streaming 的 exactly-once 语义要求数据从读取->计算->写出的过程,实现端到端的不重不丢。因此对各模块有如下要求:
1. Source 可回溯且可回放。简单来说就是可以重复消费,常用 Source 主要是 kafka,bigpipe(百度内部以 c++实现的类似 kafka 的消息队列)
2. Execution engine 记录 checkpoint。引擎会在处理每个 batch 之前,先写 WAL 来记录当前 batch 要读哪些数据。如果发生 failover,可以利用 checkpoint 对 batch 的数据进行重新计算。故 Source + Execution engine 做到了 at-least once 语义,即数据的不丢
3. 支持幂等写的 sink,对重复数据去重,从而保证了任何情况 failover 的 exactly-once 语义
Checkpoint & WAL:
上图是一个 Batch 的计算流程, 以此来讲解 Execution engine 如果做到数据不丢:
1. batch 刚开始时,调用 getOffset, 获取该 batch 要处理的数据范围,即 offsetRange
2. 将 offsetRange [startOffset, endOffset)存储在 OffsetLog 中
3. 调用 getBatch,利用 offsetRange 构建 Dataset, 提交 batch
4. Executor 对 batch 进行计算,并将结果 sink 到下游系统
5. batch 运行结束,由 driver 写 commit log,标识该 batch 运行完成
因此一个 batch 从执行开始到结束会写两个 log,一个 offsetlog,一个 commitlog。通过这个两个 log 可以保证任何 Failover 场景下的数据不丢。
Failover case 分析:
Case 1 中,因为 OffsetLog 中记录的最新 batchId 和 CommitLog 中记录的最新 batchId 相等,所以 Failover 后,引擎发现第 75 个 batch 已经成功运行结束,且没有 batch 需要重放,则从第 76 个 batch 开始继续执行。
Case 2 中,OffsetLog 中最新的 batchId 是 85, 而 CommitLog 中记录的最新的 batchId 是 84,两者不相等,说明作业 failover 发生在 batch 85 执行过程中,此时需要重新执行 batch 85。
Sink 幂等写入:
Source + Execution-engine 保证了数据的不丢,如果在此基础上希望实现端到端的 exactly once,就需要 Sink 支持幂等写入以支持数据的去重。
Doris Sink 的设计与实现
由上文介绍可知,要实现端到端的 exacly-once 语义,需要下游系统支持对数据的去重,所以在设计 Doris Sink 时,就要考虑 Doris 对数据的去重功能。Doris 有一个很明显的特点:它的写入是唯一的,即对同一 Database,对同一个 label 的导入是唯一的,同一个 label 只能被导入一次,不可以被多次导入,即一个 label 对应唯一一批数据。因此我们可以利用该特性来进行 Doris sink 的去重设计。
Label 的生成逻辑
1. 每个 structured streaming 作业启动时都必须指定一个 checkpointlocation,且每个作业的 checkpoint 必须是唯一的,不能混用。
2. batch 是顺序执行的,因此每个 batch 的 id 是顺序递增且唯一的。
3. 每个 batch 实际上是一个普通的 spark job,其中的每个数据分片,可以通过 paritionId 的来标识。
因此,由 3 元组(checkpointLocation + batchId + paritionId)组成的 label 可以唯一的确定一个 structured streaming 作业中的一段数据。那么只要确保在 failover 前后同一段数据对应的 label 相同,即可以此来去重以实现 exactly-once 语义。
val replace = tmp.replaceAll("[-|/|:|.]", "_") +
s"_${batchId}_${TaskContext.getPartitionId}"
// shrink multiple underscores to one
// e.g: label___test => label_test
val builder = new StringBuilder
for (index <- 0 to replace.length - 1) {
if (index == 0) {
builder += replace(0)
} else if (replace(index - 1) != '_' || replace(index) != '_') {
builder += replace(index)
}
}
val resultStr = builder.toString
val length = resultStr.length
if (length > 128) {
logWarning("palo label size larger than 128!, we truncate it!")
resultStr.substring(length - 128, length)
} else {
resultStr
}
以上是生成 label 代码的部分实现,我们会对特殊字符进行一些处理,且如果生成的 label 超过 128 个字符会截断,因为 Doris 的 label 最多只支持 128 个字符。
Doris sink 结构图
Spark 本身提供了 Sink 接口,我们通过继承 Sink 接口来实现 Doris sink 组件。
这里重点关注 DorisWriterTask,该 task 是 executor 中实际进行计算的 task 逻辑,它有 3 种实现,分布对应了 Doris 的 3 种 Load 模式(Bulk load, Broker load, Streaming load)。下面主要介绍 Bulk load 和 Broker load。Streaming load 将在近期实现。
Doris Bulk Load Task
1. 开始执行后,每个 task 会首先先将数据写入本地磁盘形成文件,文件则以上文提到的 label 来命名。
2. Task 发送 http 请求,以 bulk load 的方式,向 Doris 发起 load 请求
3. Task 轮询 Doris,查询步骤 2 中的 load 是否结束
4. Finish load 之后,删除步骤 1 中生成的本地文件
注意点:
1. 容错处理:每个 task 执行过程中会对部分异常进行处理并重试,重试 4 次后(可配置),如果仍旧失败,则整个 batch 重算
2. 上述过程中步骤 3 的轮询的意义在于我们需要确保一个 batch 的数据成功导入后才能开始执行下一个 batch,所以我们一定要通过 query load info 的方法,确保 label 对应的数据被成功 load。
3. bulk load 的模式,适用于单个 partition 数据不超过 1GB 的导入。
Doris DFS Load Task
1. 开始执行后,每个 task 会首先先将数据写入 hdfs,文件则以上文提到的 label 来命名。
2. Task 想 Doris 发送 broker load 请求
3. Doris broker 去 hdfs load 文件
4. Task 轮询 Doris,查询步骤 2 中的 load 是否结束。
5. Finish load 之后,删除步骤 1 中生成的 hdfs 文件
注意点:
1. 容错处理:每个 task 执行过程中会对部分异常进行处理并重试,重试 4 次后(可配置),如果仍旧失败,则整个 batch 重算
2. 适用于单个 partition 数据超过 1GB 的导入。
Doris 社区 Pull Request
https://github.com/apache/incubator-doris/pull/1332
此次沙龙我们有幸邀请到了来自一点资讯、京东、搜狐、百度智能云的技术大牛带来他们的应用实践和开发分享。
其他嘉宾的分享会在近日放出,欢迎关注 Apache Doris(incubating)官方公众号,后台回复“0629”即可获取各位嘉宾分享 PPT 及现场录像。
欢迎扫码关注:
Apache Doris(incubating)官方公众号
相关链接:
Apache Doris 官方网站:
http://doris.incubator.apache.org
Apache Doris Github:
https://github.com/apache/incubator-doris
Apache Doris Wiki:
https://github.com/apache/incubator-doris/wiki
Apache Doris 开发者邮件组:
dev@doris.apache.org
版权声明: 本文为 InfoQ 作者【ApacheDoris】的原创文章。
原文链接:【http://xie.infoq.cn/article/cd9e3c5ac570da3e4a369b9ad】。文章转载请联系作者。
评论