channel 有下面几个重要的成员,其实跟 topic 还有点像,都有一个 memoryMsgChan 和 diskqueue
memoryMsgChan: 这是存放消息的内存,就是一个通道,通道的大小 MemQueueSize,
默认配置是 10000,也就是如果堆积的消息超过 10000 就会使用磁盘了
backend :就是 diskqueue,这个就是磁盘存储消息的地方了,关于这个 diskqueue,请参考:https://www.cnblogs.com/werben/p/14517781.html
clients : Consumer 这里关联的是客户端的订阅者
deferredMessages 和 deferredPQ : 延迟消息存放的地方,其中 deferredPQ,是一个优先级管理的队列,直接丢在内存
inFlightMessages 和 inFlightPQ: 这个标识正在执行中的消息,也是直接丢在内存
topic 在 messagePump()中处理消息的时候,通过下面两个函数,将消息投递到 channel。
//延时消息
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
//非延时消息
channel.PutMessage(chanMsg)
复制代码
在这里我们看不到 Channel 的“守护”协程,也就是我们只看到 topic 将 msg 投递到 channel,channel 将延时消息丢到队列中,但是找不到 channel 从队列中取出数据发送给 client consumer 的地方
找到 channel 的两个函数, 看了下就是这两个函数将 msg 发送给 client consumer 的
func (c *Channel) processDeferredQueue(t int64) bool
func (c *Channel) processInFlightQueue(t int64) bool
复制代码
但是 channel 本身没有“守护”协程,一直运行来调用这两个函数,找这两个函数调用的地方,一直往上找调用者。最终找到“守护”协程在哪里了。
原来在 nsqd 里面运行了一个 queueScanLoop 的“守护”协程。
一、nsqd.queueScanLoop
//nsqd.go
func (n *NSQD) queueScanLoop() {
workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
closeCh := make(chan int)
//定时执行loop的间隔时间,默认100ms
workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
//刷新channel数量,重新调整协程池,默认时间是5s刷新一次
refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
channels := n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
for {
select {
case <-workTicker.C:
if len(channels) == 0 {
continue
}
case <-refreshTicker.C:
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan:
goto exit
}
num := n.getOpts().QueueScanSelectionCount
if num > len(channels) {
num = len(channels)
}
loop:
//从 channels 中随机选择 num 个 channel
for _, i := range util.UniqRands(num, len(channels)) {
workCh <- channels[i]
}
//等待处理响应,记录失败次数
numDirty := 0
for i := 0; i < num; i++ {
if <-responseCh {
numDirty++
}
}
//queueScanLoop的处理方法模仿了Redis的概率到期算法
//(probabilistic expiration algorithm),
//每过一个QueueScanInterval(默认100ms)间隔,进行一次概率选择,
//从所有的channel缓存中随机选择QueueScanSelectionCount(默认20)个channel,
//如果某个被选中channel存在InFlighting消息或者Deferred消息,
//则认为该channel为“脏”channel。
//如果被选中channel中“脏”channel的比例大于QueueScanDirtyPercent(默认25%),
//则不投入睡眠,直接进行下一次概率选择
if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
goto loop
}
}
exit:
n.logf(LOG_INFO, "QUEUESCAN: closing")
close(closeCh)
workTicker.Stop()
refreshTicker.Stop()
}
//协程池调整
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
// 协程池大小 = 总channel数 / 4
idealPoolSize := int(float64(num) * 0.25)
if idealPoolSize < 1 {
idealPoolSize = 1
} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
//idealPoolSize 协程池大小,最大默认是4个
idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
}
for {
if idealPoolSize == n.poolSize {
break
} else if idealPoolSize < n.poolSize {
// contract
// 协程池协程容量 < 当前已经开启的协程数量
// 说明开启的协程过多,需要关闭协程
// closeCh queueScanWorker会中断“守护”协程
// 关闭后,将当前开启的协程数量-1
closeCh <- 1
n.poolSize--
} else {
// expand
// 协程池协程容量 > 当前已经开启的协程数量
// 开启新的协程
n.waitGroup.Wrap(func() {
n.queueScanWorker(workCh, responseCh, closeCh)
})
n.poolSize++
}
}
}
// 真正调用channel.processInFlightQueue的地方
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
for {
select {
case c := <-workCh:
//处理一次某个channel的消息
now := time.Now().UnixNano()
dirty := false
if c.processInFlightQueue(now) {
dirty = true
}
if c.processDeferredQueue(now) {
dirty = true
}
//如果这个channel有消息,则设置为dirty=true
responseCh <- dirty
case <-closeCh:
return
}
}
}
复制代码
二、channel.processInFlightQueue
现在搞清楚了 processInFlightQueue 调用的地方,那这个 processInFlightQueue 到底做了什么
func (c *Channel) processInFlightQueue(t int64) bool {
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()
if c.Exiting() {
return false
}
dirty := false
for {
c.inFlightMutex.Lock()
msg, _ := c.inFlightPQ.PeekAndShift(t)
c.inFlightMutex.Unlock()
if msg == nil {
goto exit
}
//只要有消息就标识这个通道是脏的
dirty = true
_, err := c.popInFlightMessage(msg.clientID, msg.ID)
if err != nil {
goto exit
}
atomic.AddUint64(&c.timeoutCount, 1)
c.RLock()
client, ok := c.clients[msg.clientID]
c.RUnlock()
if ok {
client.TimedOutMessage()
}
//循环将channel中存在inFlightPQ中消息put出去
//put到memoryMsgChan或者磁盘
c.put(msg)
}
exit:
return dirty
}
func (c *Channel) put(m *Message) error {
select {
//将消息写入到memoryMsgChan中去
case c.memoryMsgChan <- m:
//如果memoryMsgChan满了则将消息写到磁盘中区
default:
err := writeMessageToBackend(m, c.backend)
c.nsqd.SetHealth(err)
if err != nil {
c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
c.name, err)
return err
}
}
return nil
}
复制代码
三、channel.processDeferredQueue
接下来看看延时消息是怎么处理的
延时消息看起来跟非延时消息没什么区别
其实是有区别的,区别就在 c.deferredPQ.PeekAndShift(t)这里
func (c *Channel) processDeferredQueue(t int64) bool {
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()
if c.Exiting() {
return false
}
dirty := false
for {
c.deferredMutex.Lock()
//区别在这个PeekAndShift里面
item, _ := c.deferredPQ.PeekAndShift(t)
c.deferredMutex.Unlock()
if item == nil {
goto exit
}
dirty = true
msg := item.Value.(*Message)
_, err := c.popDeferredMessage(msg.ID)
if err != nil {
goto exit
}
//put到memoryMsgChan或者磁盘
c.put(msg)
}
exit:
return dirty
}
//pqueue.go
//max实参传入的是当前时间
func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) {
if pq.Len() == 0 {
return nil, 0
}
item := (*pq)[0]
//存入延时消息时,Priority优先级就是消息的到期时间
//这里只有当前时间大于了消息的到期时间才回返回
if item.Priority > max {
return nil, item.Priority - max
}
heap.Remove(pq, 0)
return item, 0
}
复制代码
评论