写点什么

nsq 源码阅读之 nsqd 总体流程

用户头像
werben
关注
发布于: 2021 年 03 月 23 日

看了一阵子 nsq 源码,出去细节大体的流程基本算是看明白了

下面来总结一下,看下面的图



个人觉得其实只要搞清楚几个 Loop,我称为“守护”协程,也就是一直在 for... select... 里面跑着不退出的几个关键函数。

看上面的图,有几个呢?


从底层依次往上看


  1. diskqueue.IoLoop(): 我觉得这个第一个要看懂,topic 和 channel 都会用到他来作为磁盘存储消息,关键消息在里面怎么进怎么出,这里也要搞清楚。


  1. Topic.MessagePump(): 不断的将 topci 收到的消息,分发给底下所有的 Channel。


  1. protocolV2.IOLoop(): 处理客户端通过 socket 传上来的指令,然后分发处理指令。比如收到 PUB 指令,将消息丢给 topic


  1. protocolV2.messagePump(): 不断的将 Channel.memoryMsgChan 或者 diskqueue 里面的消息通过 socket 推给客户端订阅者


  1. nsqd.queueScanLoop(): 其实看上面的协程都是成对出现的,也就是消息有进去协程,就应该有消息出来的协程。但是 Channel 有消息进去的协程,也就是 topic 的 MessagePump 这个协程负责将消息给 Channel,那么 Channel 消息出来的协程就是这个 nsqd.queueScanLoop()了。他负责将 Inflight 也就是处理中的消息,推到 channel.memoryMsgChan 或者 diskqueue 中。


一个完整的消息进出流程大概就是这样子


客户端生产者 -> protocolV2.IOLoop() 收到消息

-> Topic.PutMessage() 消息传到 Topic

-> Topic.MessagePump() 消息传到 Channel.InFlight

-> nsqd.queueScanLoop()

-> 消息传到 Channel.memoryMsgChan

-> protocolV2.messagePump()

-> 客户端订阅者


这里有一个很容易搞晕的地方,就是 Channel 中放消息其实有两块

type Channel struct {	... //这一块是存储和持久化,消息最终是从这里出去	backend BackendQueue	memoryMsgChan chan *Message		... //这一块可以理解成一个缓存,意思在处理中消息,消息首先到这里	deferredMessages map[MessageID]*pqueue.Item	deferredPQ       pqueue.PriorityQueue	deferredMutex    sync.Mutex	inFlightMessages map[MessageID]*Message	inFlightPQ       inFlightPqueue	inFlightMutex    sync.Mutex}
复制代码


发布于: 2021 年 03 月 23 日阅读数: 13
用户头像

werben

关注

还未添加个人签名 2018.01.08 加入

还未添加个人简介

评论

发布
暂无评论
nsq源码阅读之nsqd总体流程