分布式计算 DAG1- 画猫

发布于: 2020 年 06 月 30 日

做分布式计算引擎的目的,主要还是为了达成分布式时序数据库silverDB的整体设计目标。期望siliverDB除了提供分布式写、时间范围标签组合或者指标组合的查询能力之外,可以支持更多的计算场景。因为,对于时序数据库来说,在计算能力方面,还需要支持时序窗口计算、标签组合或者指标组合的聚合计算、关联计算和持续计算。

对于当前众多的计算引擎,比如spark、storm等深入了解的不多,主要还是基于在前期调研siddhi cep 复杂计算引擎、flink流式计算引擎的核心设计思想和思路。来初探一下分布式计算引擎在算子编排方面的核心DAG,因为只有有了这个基础,才能生成可执行的分布式物理执行计划。顺便说一下,这里我不知道MPP的数据库是如何做分布式计算的,如果有了解的朋友,还请不吝赐教。

在进行具体介绍之前,先暂且命名该分布式计算引擎为RoseFinch吧。本篇文章还是主要介绍,DAG相关的部分。不了解DAG的话,可以参考以下文章。我这里主要还是从分布式计算的角度,来说DAG的构建。

1、执行环境和算子编排

type StreamExecutionEnvironment struct {
DefaultJobName string
TimeCharacteristic TimeCharacteristic //这个时间标示有3个,processTime、ingestionTime、EventTime。分别表示数据处理时间、数据获取时间、数据本身的时间,也就是说在进行时间窗口计算的时候,有三种时间标示可以配置和选择,分别应对不同的处理场景。
DefaultParallelism int
Config *ExecutionConfig
Transformations []*StreamTransformation //记录了用户所编排的算子,使用List来维护。
isChainingEnabled bool
HostName string
Port int
}

在构建DAG的过程中,这里的核心就是transformation这个list,因为整个算子的编排最开始就是通过这些transformation来记录的,也就是说excutionEnvironment 中记录了用户所编排的数据处理逻辑,记录了构建DAG所需要的基本信息。

既然说了streamTransformation这个结构很重要,那就具体看一下这个结构的定义:

type StreamTransformation struct {
Id int
Name string
OutPutType interface{}
Parallelism int // 这里有个并行度设置,默认是本节点cpu cores,但是最终会根据不同的算子类型设置
MinResources *operator.ResourceSpec //默认配置资源,主要包括cpu cores和mem size
PreferredResources *operator.ResourceSpec //自定义配置
uid string
slotSharingGroup string //这个主要用于表示哪些算子是属于同一个组的
coLocationGroupKey string
input interface{} //该transform的上游输入算子
operator interface{} //该transform的具体算子
}

注意:以上算子函数,我这里主要分了4个类型:包括source、map、reduce和sink。也就是说,这里的FromData属于source类型、FlatMap属于map类型、PrintData 属于 sink类型。

接下来,来看一段示例代码,主要从这段代码中,去看看这里的逻辑关系,这个代码只是为了调试DAG所写的,所以对于不必纠结逻辑正确性,主要关注是如何一步步构建DAG的。

func main() {
hostName,_:=os.Hostname()
port:=12345
env:= dataStream.NewStreamExecutionEnvironment(hostName,port) //初始化可执行环境
env.Config.ParallelismDefault=runtime.NumCPU() //这里配置了默认的并行读为cpu cores
data:="dsdsadadasdsdsda"
dataSource:=env.FromData(data,newReadData()) //算子编排从这里开始,传入了一个测试数据data和自定义的处理函数
dataSource.FlatMap(newMapData()).KeyBy(newKeySelector()).FlatMap(newMapData()).PrintData(newWriteData())
env.Execute("first job !")
}

注意:以上算子函数,我这里主要分了4个类型:包括source、map、reduce和sink。也就是说,这里的FromData属于source类型、FlatMap属于map类型、PrintData 属于 sink类型。

那在这个算子编排的链式调用中,具体做了哪些事呢?首先来看,FromData

1、解析输入数据的基本类型

2、将newReadData () (这个函数会生成一个实现了sourceFunction接口的结构体,也就是说当你可以实现该接口,自定义去处理数据读取的逻辑),并将这个函数封装为streamSource,在这里会配置这个streamSource的链接策略。

const (
ALWAYS = iota 上下游算子都可以链接该算子
NEVER 上下游算子不可以链接该算子
HEAD 标示该算子是DAG的头节点,也就是说只能允许其下游节点进行链接
)

3、针对封装的streamSource,接下来就是真正的封装一个sourceTransformation了,其实sourceTransformation是继承了streamTransformation的,在封装sourceTransformation的过程中,最终还是要生成streamTransformation,这里会为该transformation分配一个递增的id,用于表示该操作。

4、接下来就到了这里,一个非常重要的数据结构,dataStream。表示一个算子的基本运行环境和代表算子逻辑的Transformation,这对source类型的算子,这里会配置默认并行读为1。

type DataStream struct {
Environment *StreamExecutionEnvironment
Transformation *StreamTransformation
}

5、然后就将封装好的transformation,添加到可执行环境的transformation的list中。

6、这里还封装了source和 dataStream,其中source是继承dataStream的。在整个source操作一系列复杂的封装中,为什么最终还是要返回source呢?其实,仔细思考一下,这里其实核心只做了三件事。

a、封装source类型的操作函数称为transform,并分配了一个递增id。这个递增Id,代表了算子调用的顺序,非常关键。

同时,还为该算子分配了默认的链接策略,这个也非常关键,在后面dag优化中,会用到。

b、配置了该transform操作的执行环境,比如并行度、资源使用等。

c、将封装好的transform添加到可执行环境的list里面。

所以,不管后面的算子如何编排,基本上都是要实现以上三个核心,最终整个算子使用list的来维护,只有调用excute执行方法时候,才会去真正去进行图的构建、优化和提交。

2、图定义

好吧,终于到了图定义,因为我们已经使用list和递增id维护了编排好的算子,那么接下来就是生成图了,是吧。

streamGraph:=env.getStreamGraph(jobName) //这里首先会生成原始图,也就是streamGraph
jobGraphGenerator:=NewJobGraphGenerator(streamGraph,"") //然后在原始图的基础上,会进行图优化、配置算子中间结果数据集和算子通信的通道。
jobGraph:=jobGraphGenerator.CreateJobGraph() //生成jobGraph
jobGraph.JobName=jobName

具体来看一下,图定义吧。

func (env *StreamExecutionEnvironment) getStreamGraph(jobName string) *StreamGraph {
return env.getStreamGraphGenerator().Generate()
}

在这里,又一个图生成器,也就是会维护执行环境配置和生成图的一些基本信息。

type StreamGraphGenerator struct {
JobName string
Transformations []*StreamTransformation //其实这里的核心关键就是,生成图所需的transformation
ExecutionConfig *ExecutionConfig
Chaining bool
IsSlotSharingEnabled bool
AlreadyTransformed map[*StreamTransformation][]int //另外一个,维护在遍历过程中,维护已经访问过的transformation
StreamGraph *StreamGraph //还有就是图
TimeCharacteristic TimeCharacteristic
}
type StreamGraph struct {
JobName string
ExecutionConfig *ExecutionConfig
chaining bool
timeCharacteristic TimeCharacteristic
StreamNodes map[int]*StreamNode 对于图这里会维护图的节点信息,而每个节点中会维护该节点的出边和入边
Sources map[int]bool 当然,还有根据算子类型判断出来的,source和sink的transformation id
Sinks map[int]bool
virtualPartitionNodes map[int]*StreamPartitioner
}

然后,就开始transform list了,核心方法如下:

transformIds,ok:=generator.AlreadyTransformed[trans] //这里首先就会判断,这个transformation是不是遍历过了,如果没有就返回了。
if ok {
return transformIds
}
if trans.Parallelism <= 0 { //又再次判断是不是配置了并行度
trans.Parallelism=generator.ExecutionConfig.ParallelismDefault
}
_,ok=reflect.TypeOf(trans.operator).MethodByName("SourceRead") 判断这个transform的类型,这里将transform根据算子类型分了几类,这里我门看一下transformSource就可以了,因为都是大同小异的。
if ok {
transformIds=generator.transformSource(trans)
}
_,ok=reflect.TypeOf(trans.operator).MethodByName("SinkWrite")
if ok {
transformIds=generator.transformSink(trans)
}
_,ok=reflect.TypeOf(trans.operator).MethodByName("Reduce")
if ok {
transformIds=generator.transformReduce(trans)
}
_,ok=reflect.TypeOf(trans.operator).MethodByName("Map")
if ok {
transformIds=generator.transformMap(trans)
}
_,ok=reflect.TypeOf(trans.operator).MethodByName("GetKey")
if ok {
transformIds=generator.transformPartition(trans)
}
_,ok=generator.AlreadyTransformed[trans]
if !ok {
generator.AlreadyTransformed[trans]=make([]int,0) //将遍历过的transform添加到该map list中。
generator.AlreadyTransformed[trans]=append(generator.AlreadyTransformed[trans],transformIds...)
}
if trans.uid !="" {
generator.StreamGraph.SetTransformationUID(trans.Id,trans.uid)
}
if trans.MinResources !=nil && trans.PreferredResources !=nil {
generator.StreamGraph.SetResources(trans.Id,trans.MinResources,trans.PreferredResources)
}
return transformIds

那transformSource具体做了哪些事呢?

1、首先将该souce类型的transform添加到图的source list中

2、基于该transform,配置图的node,并添加到 sg.StreamNodes[vertexId]=streamNode 。

注意:也就是说,在streamNode中,是通过一个map结构来维护图中的节点的。注意这个时候在source类型的图节点中,并没有配置节点边。

3、配置节点边是在类似partition、map、reduce、sink这样的transform中。那么,边是如何配置的呢?这里以map类型的transform举例来说明一下:

a、因为每个transform中,都维护其上游的输入算子。所以,根据该算子可以得到其上游输入算子,所以,这个时候再次遍历图,获得该上游算子id

b、然后调用该方法generator.StreamGraph.AddEdge(id,mapOperator.Id),为该上游算子 和 该算子 添加边,核心就是下面的代码:

//根据节点id,获取上下游节点
upStreamNode:=sg.GetStreamNode(upStreamVertexId)
downStreamNode:=sg.GetStreamNode(downStreamVertexId)
//创建上下游节点的边
edge:=NewStreamEdge(upStreamNode,downStreamNode,outputNames,partitioner)
//为下游节点添加入边
sg.GetStreamNode(edge.TargetId).InEdges=append(sg.GetStreamNode(edge.TargetId).InEdges,edge)
//为上游节点添加出边
sg.GetStreamNode(edge.SourceId).OutEdges=append(sg.GetStreamNode(edge.SourceId).OutEdges,edge)

当遍历完所有的transform,也就图中每个节点都添加了对应的出边和入边。这个时候,整个最原始的DAG图就构建完成。

好吧!今天就写到这里,当构建完成整个DAG的时候,接下来就可以根据并行度、链接策略、输出/输出 进行图的优化,将相关的算子可以合并,以提升系统资源的利用率了。

总结,这个就作为silverDB分布式计算引擎调研和模拟的第一篇吧,后面一篇在说整个图的优化。

如果之前不了解分布式计算DAG和分布式时序数据库的话,那么可以关注以下专栏:

https://www.zhihu.com/people/hervor-94/columns

发布于: 2020 年 06 月 30 日 阅读数: 8
用户头像

Hervor。

关注

路漫漫其修远兮 吾将上下而求索 2019.04.21 加入

架构师 (大数据相关方向)

评论

发布
暂无评论
分布式计算DAG1-画猫