写点什么

nsq 源码阅读之 Topic

用户头像
werben
关注
发布于: 2021 年 03 月 23 日

Topic 的入口在哪里:GetTopic()


GetTopic 如果存在则直接返回,不存在则 NewTopic()


个人觉得 Topic 里面有两个重要的变量和一个函数,搞清楚这三个东西就差不多了


  1. memoryMsgChan: 这是存放消息的内存,就是一个通道,通道的大小 MemQueueSize,

默认配置是 10000,也就是如果堆积的消息超过 10000 就会使用磁盘了


  1. backend :就是 diskqueue,这个就是磁盘存储消息的地方了,这个 diskqueue 一定要搞懂,因为后面 channel 也会用到这个 queue,关于这个 diskqueue,请参考:https://www.cnblogs.com/werben/p/14517781.html


  1. messagePump : 这是 topic 的一个“守护”协程,看源码里的英文注释, messagePump selects over the in-memory and backend queue and writes messages to every channel for this topic,这个协程不断的将 Topic 收到的消息,分发给每个 channel。


func (t *Topic) messagePump() {	var msg *Message	var buf []byte	var err error	var chans []*Channel	var memoryMsgChan chan *Message	var backendChan <-chan []byte
// do not pass messages before Start(), but avoid blocking Pause() or GetChannel() //这里就是要等到startChan完成后才能往下走, for { select { case <-t.channelUpdateChan: continue case <-t.pauseChan: continue case <-t.exitChan: goto exit case <-t.startChan: //也就是要等到topic执行完GetChannel()之后才会接着往下走 } break } t.RLock() //将所有channel通道放在chans中 for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) > 0 && !t.IsPaused() { memoryMsgChan = t.memoryMsgChan //backendChan就是backend暴露给外部的readChan //参考: https://www.cnblogs.com/werben/p/14517781.html backendChan = t.backend.ReadChan() }
// main message loop //这里是守护协程的主体了,也就是这个for会一直跑 for { select { case msg = <-memoryMsgChan: //如果topic有收到新消息 case buf = <-backendChan: //如果消息是从diskqueue里来的,还要解码反序列化成msg msg, err = decodeMessage(buf) if err != nil { t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } case <-t.channelUpdateChan: //如果有新的channel通道 chans = chans[:0] t.RLock() for _, c := range t.channelMap { chans = append(chans, c) } t.RUnlock() if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } continue case <-t.pauseChan: //如果channel通道暂停 if len(chans) == 0 || t.IsPaused() { memoryMsgChan = nil backendChan = nil } else { memoryMsgChan = t.memoryMsgChan backendChan = t.backend.ReadChan() } continue case <-t.exitChan: goto exit }
//遍历每一个channel通道,将消息投递过去 for i, channel := range chans { chanMsg := msg // copy the message because each channel // needs a unique instance but... // fastpath to avoid copy if its the first channel // (the topic already created the first copy) if i > 0 { chanMsg = NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp chanMsg.deferred = msg.deferred } if chanMsg.deferred != 0 { //如果是延时消息则将延时消息丢给channel channel.PutMessageDeferred(chanMsg, chanMsg.deferred) continue } //将消息则将延时消息丢给channel err := channel.PutMessage(chanMsg) if err != nil { t.nsqd.logf(LOG_ERROR, "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.ID, channel.name, err) } } }
exit: t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)}
复制代码


发布于: 2021 年 03 月 23 日阅读数: 11
用户头像

werben

关注

还未添加个人签名 2018.01.08 加入

还未添加个人简介

评论

发布
暂无评论
nsq源码阅读之Topic