写点什么

nsq 源码阅读之 Channel

用户头像
werben
关注
发布于: 2021 年 03 月 23 日

channel 有下面几个重要的成员,其实跟 topic 还有点像,都有一个 memoryMsgChan 和 diskqueue


  1. memoryMsgChan: 这是存放消息的内存,就是一个通道,通道的大小 MemQueueSize,

默认配置是 10000,也就是如果堆积的消息超过 10000 就会使用磁盘了


  1. backend :就是 diskqueue,这个就是磁盘存储消息的地方了,关于这个 diskqueue,请参考:https://www.cnblogs.com/werben/p/14517781.html


  1. clients : Consumer 这里关联的是客户端的订阅者


  1. deferredMessages 和 deferredPQ : 延迟消息存放的地方,其中 deferredPQ,是一个优先级管理的队列,直接丢在内存


  1. inFlightMessages 和 inFlightPQ: 这个标识正在执行中的消息,也是直接丢在内存


topic 在 messagePump()中处理消息的时候,通过下面两个函数,将消息投递到 channel。


//延时消息channel.PutMessageDeferred(chanMsg, chanMsg.deferred)//非延时消息channel.PutMessage(chanMsg)
复制代码

在这里我们看不到 Channel 的“守护”协程,也就是我们只看到 topic 将 msg 投递到 channel,channel 将延时消息丢到队列中,但是找不到 channel 从队列中取出数据发送给 client consumer 的地方


找到 channel 的两个函数, 看了下就是这两个函数将 msg 发送给 client consumer 的

func (c *Channel) processDeferredQueue(t int64) boolfunc (c *Channel) processInFlightQueue(t int64) bool
复制代码


但是 channel 本身没有“守护”协程,一直运行来调用这两个函数,找这两个函数调用的地方,一直往上找调用者。最终找到“守护”协程在哪里了。


原来在 nsqd 里面运行了一个 queueScanLoop 的“守护”协程。


一、nsqd.queueScanLoop

//nsqd.go
func (n *NSQD) queueScanLoop() { workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount) responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount) closeCh := make(chan int)
//定时执行loop的间隔时间,默认100ms workTicker := time.NewTicker(n.getOpts().QueueScanInterval) //刷新channel数量,重新调整协程池,默认时间是5s刷新一次 refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
channels := n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh)
for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit }
num := n.getOpts().QueueScanSelectionCount if num > len(channels) { num = len(channels) }
loop: //从 channels 中随机选择 num 个 channel for _, i := range util.UniqRands(num, len(channels)) { workCh <- channels[i] }
//等待处理响应,记录失败次数 numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } }
//queueScanLoop的处理方法模仿了Redis的概率到期算法 //(probabilistic expiration algorithm), //每过一个QueueScanInterval(默认100ms)间隔,进行一次概率选择, //从所有的channel缓存中随机选择QueueScanSelectionCount(默认20)个channel, //如果某个被选中channel存在InFlighting消息或者Deferred消息, //则认为该channel为“脏”channel。 //如果被选中channel中“脏”channel的比例大于QueueScanDirtyPercent(默认25%), //则不投入睡眠,直接进行下一次概率选择 if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } }
exit: n.logf(LOG_INFO, "QUEUESCAN: closing") close(closeCh) workTicker.Stop() refreshTicker.Stop()}
//协程池调整func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
// 协程池大小 = 总channel数 / 4 idealPoolSize := int(float64(num) * 0.25) if idealPoolSize < 1 { idealPoolSize = 1 } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax { //idealPoolSize 协程池大小,最大默认是4个 idealPoolSize = n.getOpts().QueueScanWorkerPoolMax } for { if idealPoolSize == n.poolSize { break } else if idealPoolSize < n.poolSize { // contract // 协程池协程容量 < 当前已经开启的协程数量 // 说明开启的协程过多,需要关闭协程 // closeCh queueScanWorker会中断“守护”协程 // 关闭后,将当前开启的协程数量-1 closeCh <- 1 n.poolSize-- } else { // expand // 协程池协程容量 > 当前已经开启的协程数量 // 开启新的协程 n.waitGroup.Wrap(func() { n.queueScanWorker(workCh, responseCh, closeCh) }) n.poolSize++ } }}
// 真正调用channel.processInFlightQueue的地方func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) { for { select { case c := <-workCh: //处理一次某个channel的消息 now := time.Now().UnixNano() dirty := false if c.processInFlightQueue(now) { dirty = true } if c.processDeferredQueue(now) { dirty = true } //如果这个channel有消息,则设置为dirty=true responseCh <- dirty case <-closeCh: return } }}
复制代码


二、channel.processInFlightQueue


现在搞清楚了 processInFlightQueue 调用的地方,那这个 processInFlightQueue 到底做了什么


func (c *Channel) processInFlightQueue(t int64) bool {	c.exitMutex.RLock()	defer c.exitMutex.RUnlock()
if c.Exiting() { return false }
dirty := false for { c.inFlightMutex.Lock() msg, _ := c.inFlightPQ.PeekAndShift(t) c.inFlightMutex.Unlock()
if msg == nil { goto exit }
//只要有消息就标识这个通道是脏的 dirty = true
_, err := c.popInFlightMessage(msg.clientID, msg.ID) if err != nil { goto exit } atomic.AddUint64(&c.timeoutCount, 1) c.RLock() client, ok := c.clients[msg.clientID] c.RUnlock() if ok { client.TimedOutMessage() } //循环将channel中存在inFlightPQ中消息put出去 //put到memoryMsgChan或者磁盘 c.put(msg) }
exit: return dirty}
func (c *Channel) put(m *Message) error { select { //将消息写入到memoryMsgChan中去 case c.memoryMsgChan <- m: //如果memoryMsgChan满了则将消息写到磁盘中区 default: err := writeMessageToBackend(m, c.backend) c.nsqd.SetHealth(err) if err != nil { c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", c.name, err) return err } } return nil}
复制代码


三、channel.processDeferredQueue

接下来看看延时消息是怎么处理的

延时消息看起来跟非延时消息没什么区别

其实是有区别的,区别就在 c.deferredPQ.PeekAndShift(t)这里

func (c *Channel) processDeferredQueue(t int64) bool {	c.exitMutex.RLock()	defer c.exitMutex.RUnlock()
if c.Exiting() { return false }
dirty := false for { c.deferredMutex.Lock() //区别在这个PeekAndShift里面 item, _ := c.deferredPQ.PeekAndShift(t) c.deferredMutex.Unlock()
if item == nil { goto exit } dirty = true
msg := item.Value.(*Message) _, err := c.popDeferredMessage(msg.ID) if err != nil { goto exit } //put到memoryMsgChan或者磁盘 c.put(msg) }
exit: return dirty}
//pqueue.go //max实参传入的是当前时间func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) { if pq.Len() == 0 { return nil, 0 }
item := (*pq)[0] //存入延时消息时,Priority优先级就是消息的到期时间 //这里只有当前时间大于了消息的到期时间才回返回 if item.Priority > max { return nil, item.Priority - max } heap.Remove(pq, 0)
return item, 0}
复制代码


发布于: 2021 年 03 月 23 日阅读数: 13
用户头像

werben

关注

还未添加个人签名 2018.01.08 加入

还未添加个人简介

评论

发布
暂无评论
nsq源码阅读之Channel