写点什么

Go 定时任务源码 - robfig/cron

作者:人生如梦
  • 2022-11-21
    福建
  • 本文字数:1367 字

    阅读完需:约 4 分钟

介绍

robfig/cron是 Go 语言实现的开源定时任务调度框架,核心代码是巧妙的使用 chan + select + for 实现了一个轻量级调度协程,不但语法简洁,而且具有很好的性能。

设计

任务抽象(业务隔离):任务抽象成一个 Job 接口,业务逻辑类只需实现该接口


type Job interface {  Run()}
复制代码


计划接口:通过当前时间计算任务的下次执行执行时间,具体实现类可以根据实际需求实现


type Schedule interface {  Next(time.Time) time.Time}
复制代码


定时任务对象:保存执行的任务 Job、计算执行时间


type Entry struct {  ID       EntryID   // id  Schedule Schedule  // 计划  Next     time.Time // 下次执行时间  Job      Job       // 任务}
复制代码


任务调度管理:保存定时任务对象(Entry),调度任务执行,提供新增、删除接口(涉及关联资源竞争)


// 任务管理类type Cron struct {  nextID  int64        // 生成entry自增ID  entries []*Entry     // 保存Entry  add     chan *Entry  // 添加  remove  chan EntryID // 删除}// 删除func (c *Cron) Remove(id EntryID) {   c.remove <- id}// 新增func (c *Cron) Add(spec string, cmd Job) EntryID  {   entry := &Entry{    ID:         EntryID(atomic.AddInt64(&c.nextID, 1)),    Schedule:   ParseStandard(spec),    Job:        cmd,  }  c.add <- entry  return entry.ID}
复制代码


核心调度:计算下次执行时间 -> 排序 -> 取最早执行数据 -> timer 等待,因为只有一个协程在执行这个 run 的调度,所以不存在资源竞争,不需要加锁,另外考虑到执行任务可能涉及阻塞,例如:IO 操作,所以一般 startJob 方法会开启协程执行


func (c *Cron) run() {  now := time.Now()  for _, entry := range c.entries {    entry.Next = entry.Schedule.Next(now) // 计算下次执行时间  }  for {    sort.Sort(byTime(c.entries)) // 时间排序    timer := time.NewTimer(c.entries[0].Next.Sub(now))    select {    case now = <-timer.C:      for _, e := range c.entries {        if e.Next.After(now) || e.Next.IsZero() {          break        }        c.startJob(e.Job) // 开协程执行        e.Next = e.Schedule.Next(now) // 计算下次执行时间      }    case newEntry := <-c.add: // 新增      timer.Stop()      newEntry.Next = newEntry.Schedule.Next(now)      c.entries = append(c.entries, newEntry)    }    ...  }}// 执行任务func (c *Cron) startJob(j Job) {  go func() {    j.Run()  }()}
复制代码


启动时会开启唯一协程执行 run 方法,计算任务执行时间,执行,任务管理等


func New() *Cron {  c := &Cron{    entries: nil,    add:     make(chan *Entry),    remove:  make(chan EntryID),  }  return c}func (c *Cron) Start() {  go c.run()}
复制代码

总结

  1. 共享资源(定时任务)的管理和调度由唯一协程管理

  2. 通过 for + select + channel 来循环计算执行时间,监听任务到期、增删事件

  3. 执行任务会新启协程执行,不阻塞调度

  4. 采用扇入/扇出原理,多协程添加、增删任务调度协程(Fan In),调度启动新协程执行任务(Fan Out)

  5. 调度协程使用的是 CSP 并发模型思想


我的博客:https://itart.cn

原文地址:https://itart.cn/blogs/2022/explore/cron-source-code.html

发布于: 刚刚阅读数: 4
用户头像

人生如梦

关注

热爱编程,追求极致,IT是门技术-itart.cn 2021-11-23 加入

从事软件开发,对编程有浓厚兴趣,始终把IT当成一门艺术,最求代码优化极致 - 个人博客:https://itart.cn

评论

发布
暂无评论
Go定时任务源码 - robfig/cron_Go_人生如梦_InfoQ写作社区