// messagePump is the topic's long-running delivery loop.
//
// It first waits for a receive on startChan (so nothing is delivered before
// the topic has been started), then repeatedly takes one message from either
// the in-memory message channel or the backend read channel and hands it to
// every channel currently present in channelMap. The loop ends when exitChan
// becomes readable.
func (t *Topic) messagePump() {
	var (
		msg        *Message
		raw        []byte
		err        error
		subs       []*Channel
		memSrc     chan *Message
		backendSrc <-chan []byte
	)

	// Both helpers are declared before the first goto so that jumping to the
	// exit label never brings a new variable into scope.

	// reloadSubs rebuilds the snapshot of the topic's channels under the read
	// lock, reusing the slice's existing backing array.
	reloadSubs := func() {
		subs = subs[:0]
		t.RLock()
		for _, c := range t.channelMap {
			subs = append(subs, c)
		}
		t.RUnlock()
	}

	// selectSources enables the two message sources only when there is at
	// least one channel and the topic is not paused. Otherwise both are set to
	// nil; a receive from a nil channel never becomes ready, so the main
	// select simply stops seeing messages until the next update.
	selectSources := func() {
		if len(subs) > 0 && !t.IsPaused() {
			memSrc = t.memoryMsgChan
			backendSrc = t.backend.ReadChan()
			return
		}
		memSrc = nil
		backendSrc = nil
	}

	// do not pass messages before Start(), but avoid blocking Pause() or GetChannel():
	// update and pause notifications are drained and ignored here, because the
	// state they announce is read fresh right after the loop.
waitStart:
	for {
		select {
		case <-t.channelUpdateChan:
		case <-t.pauseChan:
		case <-t.exitChan:
			goto exit
		case <-t.startChan:
			break waitStart
		}
	}

	reloadSubs()
	selectSources()

	// main message loop
	for {
		select {
		case msg = <-memSrc:
			// Message arrived on the in-memory channel; it is already decoded.
		case raw = <-backendSrc:
			// Bytes read from the backend queue must be decoded into a
			// Message before delivery; undecodable entries are logged and
			// dropped.
			msg, err = decodeMessage(raw)
			if err != nil {
				t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
				continue
			}
		case <-t.channelUpdateChan:
			// The set of channels changed: refresh the snapshot and re-gate
			// the sources.
			reloadSubs()
			selectSources()
			continue
		case <-t.pauseChan:
			// The topic's pause state changed: only the gating needs to be
			// re-evaluated.
			selectSources()
			continue
		case <-t.exitChan:
			goto exit
		}

		// Fan the message out to every channel in the snapshot.
		for idx, ch := range subs {
			// Each channel needs its own Message instance. The first channel
			// takes the instance we already hold; every later one gets a
			// fresh copy carrying the same ID, body, timestamp and deferral.
			out := msg
			if idx != 0 {
				out = NewMessage(msg.ID, msg.Body)
				out.Timestamp = msg.Timestamp
				out.deferred = msg.deferred
			}

			if out.deferred == 0 {
				// Not deferred: put the message on the channel immediately.
				if putErr := ch.PutMessage(out); putErr != nil {
					t.nsqd.logf(LOG_ERROR,
						"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
						t.name, msg.ID, ch.name, putErr)
				}
				continue
			}

			// Deferred message: hand it to the channel together with its
			// deferral value.
			ch.PutMessageDeferred(out, out.deferred)
		}
	}

exit:
	t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}
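// (A stray "Comments" heading from the source web page followed here; it is not part of the program.)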