写点什么

听说 90% 的人都没搞定手撕协程池这道面试题!

作者:王中阳Go
  • 2023-11-22
    北京
  • 本文字数:2801 字

    阅读完需:约 9 分钟

听说90%的人都没搞定手撕协程池这道面试题!

特别的缘分

听说 90%的人都没搞定手撕协程池这道面试题!


能看到这篇文章一定是特殊的缘分,请务必珍惜,请详细看看吧,哈哈。



不止上图,最近 Go就业训练营 中不少小伙伴说,面试中碰到了好几次手撕协程池的问题。

解题思路:

  1. 定义协程池结构体:首先,我们需要定义一个协程池的结构体,包含协程池的属性和方法。结构体中需要包含一个任务队列、协程池的大小、当前运行的协程数量等属性。

  2. 初始化协程池:在初始化函数中,我们需要创建一个指定大小的任务队列,并初始化协程池的属性。

  3. 添加任务到协程池:当有任务需要执行时,我们将任务添加到任务队列中。

  4. 启动协程池:在启动函数中,我们需要根据协程池的大小创建对应数量的协程,并从任务队列中获取任务进行执行。每个协程会不断从任务队列中获取任务并执行,直到任务队列为空。

  5. 控制协程数量:在协程池中,我们需要控制同时运行的协程数量,以防止过多的协程导致资源浪费。可以使用信号量或者计数器来控制协程的数量。


通过以上的解题思路,我们可以实现一个基本的协程池。


在实际应用中,可能还需要考虑一些其他的因素,如任务优先级、任务超时处理等。根据具体的需求,可以对协程池进行进一步的扩展和优化。


说完了解题思路,再给大家一个可参考,可运行的示例代码:

示例代码:

package main
import ( "fmt" "sync")
type Job struct { ID int}
type Worker struct { ID int JobChannel chan Job Quit chan bool}
type Pool struct { WorkerNum int JobChannel chan Job WorkerQueue chan chan Job Quit chan bool wg sync.WaitGroup}
// NewWorker 创建一个新的工作者func NewWorker(id int, workerQueue chan chan Job) Worker { return Worker{ ID: id, JobChannel: make(chan Job), Quit: make(chan bool), }}
// Start 启动工作者func (w Worker) Start(workerQueue chan chan Job) { go func() { for { workerQueue <- w.JobChannel select { case job := <-w.JobChannel: fmt.Printf("Worker %d started job %d\n", w.ID, job.ID) // 执行任务 fmt.Printf("Worker %d finished job %d\n", w.ID, job.ID) case <-w.Quit: return } } }()}
// Start 启动工作者池func (p *Pool) Start() { for i := 0; i < p.WorkerNum; i++ { worker := NewWorker(i, p.WorkerQueue) worker.Start(p.WorkerQueue) }
go func() { for { select { case job := <-p.JobChannel: worker := <-p.WorkerQueue worker <- job case <-p.Quit: for i := 0; i < p.WorkerNum; i++ { worker := <-p.WorkerQueue worker <- Job{} // 发送空任务,通知协程退出 } p.wg.Done() return } } }()}
// AddJob 添加作业到作业通道func (p *Pool) AddJob(job Job) { p.JobChannel <- job}
// Stop 停止工作者池func (p *Pool) Stop() { p.Quit <- true p.wg.Wait()}
func main() { pool := Pool{ WorkerNum: 5, JobChannel: make(chan Job), WorkerQueue: make(chan chan Job, 5), Quit: make(chan bool), }
pool.Start()
for i := 0; i < 10; i++ { job := Job{ID: i} pool.AddJob(job) }
pool.Stop()}
复制代码


以下是对代码的注释:


  • Job 结构体定义了作业的 ID。

  • Worker 结构体定义了工作者的 ID、作业通道和退出通道。

  • Pool 结构体定义了工作者数量、作业通道、工作者队列和退出通道,以及一个等待组(WaitGroup)。

  • NewWorker 函数创建一个新的工作者,并返回一个工作者实例。

  • Start 方法启动一个工作者,它从工作者队列中获取自己的作业通道,并在循环中等待作业的到来,执行作业并在退出通道接收到信号时返回。

  • Pool 的 Start 方法启动工作者池,创建指定数量的工作者,并将它们启动。同时,它还启动一个循环,等待作业的到来,并将作业分发给可用的工作者。

  • AddJob 方法将作业添加到作业通道中,供工作者池处理。

  • Stop 方法停止工作者池,向退出通道发送信号,并等待所有工作者完成当前作业后返回。

  • main 函数创建一个工作者池实例,启动工作者池,并添加一些作业。最后,调用 Stop 方法停止工作者池。

答疑

有小伙伴提出了疑问:


WorkerQueue chan chan Job 的作用是什么?为什么要这么定义?

解答一下:

WorkerQueue chan chan Job 的作用是用于传递工作者(Worker)的作业通道(JobChannel)。它是一个通道(channel),其中每个元素都是一个作业通道。


为什么要这么定义呢?这是因为在工作者池模式中,每个工作者需要一个独立的作业通道来接收作业。通过将每个工作者的作业通道放入一个通道中,可以实现对工作者的动态分配和管理。


具体来说,WorkerQueue 通道用于存储每个工作者的作业通道。当有新的作业到达时,工作者池会从 WorkerQueue 中取出一个可用的工作者的作业通道,并将作业发送到该通道中,由相应的工作者进行处理。


这种设计可以有效地控制并发任务的分配和调度。通过将工作者的作业通道放入 WorkerQueue 中,可以实现对工作者的复用和动态管理,避免了频繁地创建和销毁工作者协程的开销。


总结起来,WorkerQueue 的定义允许工作者池动态地管理工作者的作业通道,实现对并发任务的高效分配和调度。

运行结果

总结

以上代码实现了一个简单的工作池(Worker Pool)模式。工作池由一组固定数量的工作者(Worker)协程组成,它们从作业通道(JobChannel)中获取作业(Job)并执行。


欢迎在评论区交流讨论。

一起学习

欢迎大家关注我的账号,你的支持,是我更文的最大动力!


也欢迎关注我的公众号: 程序员升职加薪之旅,领取更多 Go 学习和面试资料。


微信号:wangzhongyang1993

发布于: 刚刚阅读数: 5
用户头像

王中阳Go

关注

靠敲代码在北京买房的程序员 2022-10-09 加入

【微信】wangzhongyang1993【公众号】程序员升职加薪之旅【成就】InfoQ专家博主👍掘金签约作者👍B站&掘金&CSDN&思否等全平台账号:王中阳Go

评论 (1 条评论)

发布
用户头像
学Go碰到问题,欢迎加我微信:wangzhongyang1993
刚刚 · 北京
回复
没有更多了
听说90%的人都没搞定手撕协程池这道面试题!_Go_王中阳Go_InfoQ写作社区