写点什么

「Go 实战」一文带你搞懂从单队列到优先级队列的实现

作者:Go学堂
  • 2022-11-17
    北京
  • 本文字数:7515 字

    阅读完需:约 25 分钟

「Go实战」一文带你搞懂从单队列到优先级队列的实现

一、 优先级队列概述

队列,是数据结构中实现先进先出策略的一种数据结构。而优先队列则是带有优先级的队列,即先按优先级分类,然后相同优先级的再 进行排队。优先级高的队列中的元素会优先被消费。如下图所示:



在 Go 中,可以定义一个切片,切片的每个元素代表一种优先级队列,切片的索引顺序代表优先级顺序,后面代码实现部分我们会详细讲解。

二、 为什么需要优先级队列

先来看现实生活中的例子。银行的办事窗口,有普通窗口和 vip 窗口,vip 窗口因为排队人数少,等待的时间就短,比普通窗口就会优先处理。同样,在登机口,就有贵宾通道和普通,同样贵宾通道优先登机。


在互联网中,当然就是请求和响应。使用优先级队列的作用是将请求按特定的属性划分出优先级,然后按优先级的高低进行优先处理。在研发服务的时候这里有个隐含的约束条件就是服务器资源(CPU、内存、带宽等)是有限的。如果服务器资源是无限的,那么也就不需要队列进行排队了,来一个请求就立即处理一个请求就好了。所以,为了在最大限度的利用服务器资源的前提下,将更重要的任务(优先级高的请求)优先处理,以更好的服务用户。


对于请求优先级的划分可以根据业务的特点根据价值高的优先原则来进行划分即可。例如可以根据是否是否是会员、是否是 VIP 会员等属性进行划分优先级。也可以根据是否是付费用户进行划分。在博客的业务中,也可以根据是否是大 V 的属性进行优先级划分。在互联网广告业务中,可以根据广告位资源价值高低来划分优先级。

三、优先级队列实现原理

01 四个角色

在完整的优先级队列中有四个角色,分别是优先级队列、工作单元、消费者 worker、通知 channel


  • 工作单元 Job:队列里的元素。我们把每一次业务处理都封装成一个工作单元,该工作单元会进入对应的优先级队列进行排队,然后等待消费者 worker 来消费执行。

  • 优先级队列:按优先级划分的队列,用来暂存对应优先级的工作单元 Job,相同优先级的工作单元会在同一个队列里。

  • noticeChan 通道:当有工作单元进入优先级队列排队后,会在通道里发送一个消息,以通知消费者 worker 从队列中获取元素(工作单元)进行消费。

  • 消费者 worker:监听 noticeChan,当监听到 noticeChan 有消息时,说明队列中有工作单元需要被处理,优先从高优先级队列中获取元素进行消费。

02 队列-消费者模式

根据队列个数和消费者个数,我们可以将队列-消费者模式分为单队列-单消费者模式多队列(优先级队列)- 单消费者模式多队列(优先级队列)- 多消费者模式


我们先从最简单的单队列-单消费者模式实现,然后一步步演化成多队列(优先级队列)-多消费者模式。

03 单队列-单消费者模式实现

3.1 队列的实现

我们先来看下队列的实现。这里我们用 Golang 中的 List 数据结果来实现,List 数据结构是一个双向链表,包含了将元素放到链表尾部、将头部元素弹出的操作,符合队列先进先出的特性。


我们看下具体的队列的数据结构:


type JobQueue struct {    mu sync.Mutex //队列的操作需要并发安全    jobList *list.List //List是golang库的双向队列实现,每个元素都是一个job    noticeChan chan struct{} //入队一个job就往该channel中放入一个消息,以供消费者消费}
复制代码


  • 入队操作


/*** 队列的Push操作*/func (queue *JobQueue) PushJob(job Job) {    queue.jobList.PushBack(job) //将job加到队尾    queue.noticeChan <- struct{}{}}
复制代码


到这里有同学就会问了,为什么不直接将 job 推送到 Channel 中,然后让消费者依次消费不就行了么?是的,单队列这样是可以的,因为我们最终目标是为了实现优先级的多队列,所以这里即使是单队列,我们也使用 List 数据结构,以便后续的演变


还有一点,大家注意到了,这里入队操作时有一个 这样的操作:


queue.noticeChan <- struct{}{}
复制代码


消费者监听的实际上不是队列本身,而是通道 noticeChan。当有一个元素入队时,就往 noticeChan 通道中输入一条消息,这里是一个空结构体,主要作用就是通知消费者 worker,队列里有要处理的元素了,可以从队列中获取了。 这个在后面演化成多队列以及多消费者模式时会很有用。


  • 出队操作


根据队列的先进先出原则,是要获取队列的最先进入的元素。Golang 中 List 结构体的 Front()函数是获取链表的第一个元素,然后通过 Remove 函数将该元素从链表中移出,即得到了队列中的第一个元素。这里的 Job 结构体先不用关心,我们后面实现工作单元 Job 时,会详细讲解。


/*** 弹出队列的第一个元素*/func (queue *JobQueue) PopJob() Job {    queue.mu.Lock()    defer queue.mu.Unlock()
/*** 说明在队列中没有元素了*/ if queue.jobList.Len() == 0 { return nil }
elements := queue.jobList.Front() //获取队里的第一个元素 return queue.jobList.Remove(elements).(Job) //将元素从队列中移除并返回}
复制代码


  • 等待通知操作


上面我们提到,消费者监听的是 noticeChan 通道。当有元素入队时,会往 noticeChan 中输入一条消息,以便通知消费者进行消费。如果队列中没有要消费的元素,那么消费者就会阻塞在该通道上。


func (queue *JobQueue) WaitJob() <-chan struct{} {  return queue.noticeChan}
复制代码
3.2 工作单元--Job 的实现

一个工作单元就是一个要执行的任务。在系统中往往需要执行不同的任务,就是需要有不同类型的工作单元,但这些工作单元都有一组共同的执行流程。我们看下工作单元的类图。



我们看下类图中的几个角色:


  • Job 接口:定义了所有 Job 要实现的方法。

  • BaseJob 类(结构体):定义了具体 Job 的基类。因为具体 Job 类中的有共同的属性和方法。所以抽象出一个基类,避免重复实现。但该基类对 Execute 方法没有实现,因为不同的工作单元有具体的执行逻辑。

  • SquareJob 和 AreaJob 类(结构体):是我们要具体实现的业务工作 Job。主要是实现 Execute 的具体执行逻辑。根据业务的需要定义自己的工作 Job 和对应的 Execute 方法即可。


接下来,我们以计算一个 int 类型数字的平方的 SquareJob 为例来看下具体的实现。


  • BaseJob 结构体


首先看下该结构体的定义


type BaseJob struct {    Err error    DoneChan chan struct{} //当作业完成时,或者作业被取消时,通知调用者    Ctx context.Context    cancelFunc context.CancelFunc}
复制代码


在该结构体中,我们主要关注 DoneChan 字段就行,该字段是当具体的 Job 的 Execute 执行完成后,来通知调用者的。


再来看 Done 函数,该函数就是在 Execute 函数完成后,要关闭 DoneChan 通道,以解除 Job 的阻塞而继续执行其他逻辑。


/*** 作业执行完毕,关闭DoneChan,所有监听DoneChan的接收者都能收到关闭的信号*/func (job *BaseJob) Done() {    close(job.DoneChan)}
复制代码


再来看 WaitDone 函数,该函数是当 Job 执行后,要等待 Job 执行完成,在未完成之前,DoneChan 里没有消息,通过该函数就能将 job 阻塞,直到 Execute 中调用了 Done(),以便解除阻塞。


/*** 等待job执行完成*/func (job *BaseJob) WaitDone()  {    select {        case <-job.DoneChan:        return    }}
复制代码


  • SquareJob 结构体


type SquareJob struct {    *BaseJob    x int}
复制代码


从结构体的定义中可知,SquareJob 嵌套了 BaseJob,所以该结构体拥有 BaseJob 的所有字段和方法。在该结构体主要实现了 Execute 的逻辑:对 x 求平方。


func (s *SquareJob) Execute() error {    result := s.x * s.x    fmt.Println("the result is ", result)    return nil}
复制代码
3.3 消费者 Worker 的实现

Worker 主要功能是通过监听队列里的 noticeChan 是否有需要处理的元素,如果有元素的话从队列里获取到要处理的元素 job,然后执行 job 的 Execute 方法。


我们将该结构体定位为 WorkerManager,因为在后面我们讲解多 Worker 模式时,会需要一个 Worker 的管理者,因此定义成了 WorkerManager。


type WorkerManager struct {    queue *JobQueue    closeChan chan struct{}}
复制代码


StartWorker 函数,只有一个 for 循环,不断的从队列中获取 Job。获取到 Job 后,进行消费 Job,即 ConsumeJob。



func (m *WorkerManager) StartWork() error { fmt.Println("Start to Work") for { select { case <-m.closeChan: return nil
case <-m.queue.noticeChan: job := m.queue.PopJob() m.ConsumeJob(job) } }
return nil}
func (m *WorkerManager) ConsumeJob(job Job) { defer func() { job.Done() }()
job.Execute()}
复制代码


到这里,单队列-单消费者模式中各角色的实现就讲解完了。我们通过 main 函数将其关联起来。


func main() {    //初始化一个队列    queue := &JobQueue{        jobList: list.New(),        noticeChan: make(chan struct{}, 10),    }
//初始化一个消费worker workerManger := NewWorkerManager(queue)
// worker开始监听队列 go workerManger.StartWork()
// 构造SquareJob job := &SquareJob{ BaseJob: &BaseJob{ DoneChan: make(chan struct{}, 1), }, x: 5, }
//压入队列尾部 queue.PushJob(job)
//等待job执行完成 job.WaitDone() print("The End")}
复制代码

04 多队列-单消费者模式

有了单队列-单消费者的基础,我们如何实现多队列-单消费者模式。也就是优先级队列。



优先级的队列,实质上就是根据工作单元 Job 的优先级属性,将其放到对应的优先级队列中,以便 worker 可以根据优先级进行消费。我们要在 Job 结构体中增加一个 Priority 属性。因为该属性是所有 Job 都共有的,因此定义在 BaseJob 上更合适.


type BaseJob struct {    Err error    DoneChan chan struct{} //当作业完成时,或者作业被取消时,通知调用者    Ctx context.Context    cancelFunc context.CancelFunc    priority int //工作单元的优先级}
复制代码


我们再来看看多队列如何实现。实际上就是用一个切片来存储各个队列,切片的每个元素存储一个 JobQueue 队列元素即可。


var queues = make([]*JobQueue, 10, 100)
复制代码


那各优先级的队列在切片中是如何存储的呢?切片索引顺序只代表优先级的高于低,不代表具体是哪个优先级。


什么意思呢?假设我们现在对目前的工作单元定义了 1、4、7 三个优先级。这 3 个优先级在切片中是按优先级从小到到依次存储在 queues 切片中的,如下图:



那为什么不让切片的索引就代表优先级,让优先级为 1 的队列存储在索引 1 处,优先级 4 的队列存储在索引 4 处,优先级 7 的队列存储在索引 7 处呢?如果这样存储的话,就会变成如下这样:



可见如果我们设定的优先级不是连续的,那么就会造成空间的浪费。所以,我们是将队列按优先级高低依次存放到了切片中。


那既然这样,当一个优先级的 job 来了之后,我该怎么知道该优先级的队列是存储在哪个索引中呢?我们用一个 map 来映射优先级和切片索引之间的关系。这样当一个工作单元 Job 入队的时候,以优先级为 key,就可以查找到对应优先级的队列存储在切片的哪个位置了。如下图所示:



代码定义:


var priorityIdx map[int][int]//该map的key是优先级,value代表的是queues切片的索引
复制代码


好了,我们重新定义一下队列的结构体:


type PriorityQueue struct {    mu sync.Mutex    noticeChan chan struct{}    queues []*JobQueue    priorityIdx map[int]int}
//原来的JobQueue会变成如下这样:type JobQueue struct { priority int //代表该队列是哪种优先级的队列 jobList *list.List //List是golang库的双向队列实现,每个元素都是一个job}
复制代码


这里我们注意到有以下几个变化:


  • JobQueue 里多了一个 Priority 属性,代表该队列是哪个优先级别。

  • noticeChan 属性从 JobQueue 中移动到了 PriorityQueue 中。因为现在有多个队列,只要任意一个队列里有元素就需要通知消费者 worker 进行消费,因此消费者 worker 监听的是 PriorityQueue 中是否有元素,而在监听阶段不关心具体哪个优先级队列中有元素。


好了,数据结构定义完了,我们看看将工作单元 Job 推入队列和从队列中弹出 Job 又有什么变化。


  • 优先级队列的入队操作


优先级队列的入队操作,就需要根据入队 Job 的优先级属性放到对应的优先级队列中,入队流程图如下:



当一个 Job 加入队列的时候,有两种场景,一种是该优先级的队列已经存在,则直接 Push 到队尾即可。一种是该优先级的队列还不存在,则需要先创建该优先级的队列,然后再将该工作单元 Push 到队尾。如下是两种场景。


队列已经存在的场景这种场景会比较简单。假设我们要插入优先级为 7 的工作单元,首先从映射表中查找 7 是否存在,发现对应关系是 2,则直接找到切片中索引 2 的元素,即优先级为 7 的队列,将 job 加入即可。如下图。



队列不存在的场景这种场景稍微复杂些,在映射表中找不到要插入优先级的队列的话,则需要在切片中插入一个优先级队列,而为了优先级队列在切片中也保持有序(保持有序就可以知道队列的优先级的高低了),则需要移动相关的元素。我们以插入优先级为 6 的工作单元为例来讲解。


1、首先,我们的队列有一个初始化的状态,存储了优先级 1、4、7 的队列。如下图。



2、当插入优先级为 6 的工作单元时,发现在映射表中没有优先级 6 的映射关系,说明在切片中还没有优先级为 6 的队列的元素。所以需要在切片中依次查找到优先级 6 应该插入的位置在 4 和 7 之间,也就是需要存储在切片 2 的位置。



3、将原来索引 2 位置的优先级为 7 的队列往后移动到 3,同时更新映射表中的对应关系。



4、将优先级为 6 的工作单元插入到索引 2 的队列中,同时更新映射表中的优先级和索引的关系。



我们看下代码实现:


func (priorityQueue *PriorityQueue) Push(job Job) {    priorityQueue.mu.Lock()    defer priorityQueue.mu.Unlock()
//先根据job的优先级找要入队的队列 var idx int var ok bool //从优先级-切片索引的map中查找该优先级的队列是否存在 if idx, ok = priorityQueue.priorityIdx[job.Priority()]; !ok { //如果不存在该优先级的队列,则需要初始化一个队列,并返回该队列在切片中的索引位置 idx = priorityQueue.addPriorityQueue(job.Priority) }
//根据获取到的切片索引idx,找到具体的队列 queue := priority.queues[idx] //将job推送到队列的队尾 queue.JobList.PushBack(job)
//队列job个数+1 priorityQueue.Size++
//如果队列job个数超过队列的最大容量,则从优先级最低的队列中移除工作单元 if priorityQueue.size > priorityQueue.capacity { priorityQueue.RemoveLeastPriorityJob() }else { //通知新进来一个job priorityQueue.noticeChan <- struct{}{} }}
复制代码


代码中大部分也都做了注释,不难理解。这里我们来看下 addPriorityQueue 的具体实现:


func (priorityQueue *PriorityQueue) addPriorityQueue(priority int) int {    n := len(priorityQueue.queues)    //通过二分查找找到priority应插入的切片索引    pos := sort.Search(n, func(i int) bool {        return priority < priorityQueue.priority    })
//更新映射表中优先级和切片索引的对应关系 for i := pos; i < n; i++ { priorityQueue.priorityIdx[priorityQueue.queues[i].priority] = i + 1 }
tail := make([]*jobQueue, n-pos) copy(tail, priorityQueue.queues[pos:])
//初始化一个新的优先级队列,并将该元素放到切片的pos位置中 priorityQueue.queues = append(priorityQueue.queues[0:pos], newJobQueue(priority))
//将高于priority优先级的元素也拼接到切片后面 priorityQueue.queues = append(priorityQueue.queues, tail...)
return pos}
复制代码


最后,我们再来看一个实际的调用例子:


func main() {    //初始化一个队列    queue := &PriorityQueue{        noticeChan: make(chan struct{}, cap),        capacity: cap,        priorityIdx: make(map[int]int),        size: 0,    }
//初始化一个消费worker workerManger := NewWorkerManager(queue)
// worker开始监听队列 go workerManger.StartWork()
// 构造SquareJob job := &SquareJob{ BaseJob: &BaseJob{ DoneChan: make(chan struct{}, 1), }, x: 5, priority: 10, }
//压入队列尾部 queue.PushJob(job)
//等待job执行完成 job.WaitDone() print("The End")}
复制代码

05 多队列-多消费者模式


我们在多队列-单消费者的基础上,再来看看多消费者模式。也就是增加 worker 的数量,提高 Job 的处理速度。


我们再来看下 worker 的定义:


type WorkerManager struct {    queue *PriorityQueue    closeChans []chan struct{}}
复制代码


这里需要注意,closeChans 变成了切片数组。因为我们每启动一个 worker,就需要有一个关闭通道。


然后看 StartWorker 函数的实现:



func (m *WorkerManager) StartWork(n int) error { fmt.Println("Start to Work") for i := 0; i < n; i++ { m.createWorker(); }
return nil}
func (m *WorkerManager) createWorker() { closeChan := make(chan struct{}) //每个协程,就是一个worker go func(closeChan chan struct{}) { var job Job for { select { case <-m.closeChan: return nil
case <-m.queue.noticeChan: job := m.queue.PopJob() m.ConsumeJob(job) } } }(closeChan)
m.closeChanMu.Lock() defer m.closeChanMu.Unlock() m.closeChans = append(m.closeChans, closeChan) return nil}
func (m *WorkerManager) ConsumeJob(job Job) { defer func() { job.Done() }()
job.Execute()}
复制代码


这里需要注意的是,所有的 worker 都需要监听队列的 noticeChan 通道。测试的例子就留给读者自己了。


另外如下图的单队列-多消费者模式是多队列-多消费者模式的一个特例,这里就不再进行实现了。


四、总结

队列的作用可以用来控制流量,而优先级队列在兼顾流量控制的同时,还能将流量按优先级高低来进行处理。 本文中一些细节的并发加锁操作做了忽略,大家在实际应用中根据需要进行完善即可。


---特别推荐---

特别推荐:一个专注 go 项目实战、项目中踩坑经验及避坑指南、各种好玩的 go 工具的公众号。「Go 学堂」,专注实用性,非常值得大家关注。点击下方公众号卡片,直接关注。关注送《100 个 go 常见的错误》pdf 文档。

发布于: 刚刚阅读数: 4
用户头像

Go学堂

关注

关注「Go学堂」,学习更多编程知识 2019-08-06 加入

专注Go编程知识、案例、常见错误及原理分析。意在通过阅读更多优秀的代码,提高编程技能。同名公众号「Go学堂」期待你的关注

评论

发布
暂无评论
「Go实战」一文带你搞懂从单队列到优先级队列的实现_Go_Go学堂_InfoQ写作社区