nsq 源码阅读之消息接收和发送
发布于: 2021 年 03 月 23 日
一、TCP Handler
nsqd 里面的 Main 函数。
//nsqd.gofunc (n *NSQD) Main() error { //... n.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) }) //...}
//tcp_server.gofunc TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error { //... go func() { handler.Handle(clientConn) wg.Done() }() //...}复制代码
n.tcpServer 实现了 TCPHandler 接口
在 nsqd.New()里面创建的 tcpServer
//nsqd.gofunc New(opts *Options) (*NSQD, error) { //... n.tcpServer = &tcpServer{} //...}复制代码
看下 tcpServer 实现的 Handle 接口里做了什么
func (p *tcpServer) Handle(clientConn net.Conn) {
//... //从socket中读取数据 buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) if err != nil { p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err) clientConn.Close() return } protocolMagic := string(buf)
p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic)
var prot protocol.Protocol switch protocolMagic { case " V2": //这里是关键,创建了一个protocolV2对象 prot = &protocolV2{nsqd: p.nsqd} default: protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return }
p.conns.Store(clientConn.RemoteAddr(), clientConn)
//开启protocolV2的IOLoop,这是一个客户端连接的“守护”协程 //接收消息和发送消息给客户端,都在这里面处理了 err = prot.IOLoop(clientConn) if err != nil { p.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) }
p.conns.Delete(clientConn.RemoteAddr())}复制代码
//protocol_v2.go//因为这里是在Handler里启动的,所以这里其实是为每个客户端都启动了一个Loopfunc (p *protocolV2) IOLoop(conn net.Conn) error { ... clientID := atomic.AddInt64(&p.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.nsqd) p.nsqd.AddClient(client.ID, client)
... // messagePump负责从channel的memoryMsgChan和 //backend.ReadChan()中读取消息并将消息推送给client
messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) <-messagePumpStartedChan
//下面这个for循环负责接收客户端消息,比如消费订阅,以及生产消息等 //主要逻辑在p.Exec()里 for { ... //主要逻辑在这个Exec里面 response, err = p.Exec(client, params) ... }
//...}复制代码
二、接收消息
上面已经看到处理客户端消息主要在 protocolV2.Exec()里
这段代码我觉得很好理解了,直接去 protocolV2.PUB()
看客户端生产消息的逻辑,其他的指令先不看
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { if bytes.Equal(params[0], []byte("IDENTIFY")) { return p.IDENTIFY(client, params) } err := enforceTLSPolicy(client, p, params[0]) if err != nil { return nil, err } switch { ... //这里就是客户端生产消息的指令处理了 case bytes.Equal(params[0], []byte("PUB")): return p.PUB(client, params) ... case bytes.Equal(params[0], []byte("SUB")): return p.SUB(client, params) } ... return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))}
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { var err error ... bodyLen, err := readLen(client.Reader, client.lenSlice) ... //读取消息主体 messageBody := make([]byte, bodyLen) _, err = io.ReadFull(client.Reader, messageBody) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body") }
... //将消息丢到topic.PutMessage() //PutMessage直接将消息丢到msgChan或者diskqueue了 //关于topic的部分,参考https://www.cnblogs.com/werben/p/14518283.html topic := p.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) }
client.PublishedMessage(topicName, 1)
return okBytes, nil}复制代码
三、发送消息
现在来搞清楚,服务器端又在哪里发送消息给 consumer?
在这 protocolV2.messagePump()
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { var err error var memoryMsgChan chan *Message var backendMsgChan <-chan []byte var subChannel *Channel var flusherChan <-chan time.Time var sampleRate int32
//客户端执行Sub的时候,会将Channle丢到这个SubEventChan里 //可以去看protocolV2.SUB()函数 subEventChan := client.SubEventChan
//鉴权Identify对应的chan,只能鉴权一次 identifyEventChan := client.IdentifyEventChan
//flushChan赋值outputBufferTicker,默认是250ms时间间隔Flush一次数据 outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
heartbeatTicker := time.NewTicker(client.HeartbeatInterval) heartbeatChan := heartbeatTicker.C msgTimeout := client.MsgTimeout flushed := true close(startedChan)
for { // 检查订阅状态和消息是否可处理状态 if subChannel == nil || !client.IsReadyForMessages() { // the client is not ready to receive messages... memoryMsgChan = nil backendMsgChan = nil flusherChan = nil // force flush client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true } else if flushed { // last iteration we flushed... // do not select on the flusher ticker channel memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = nil } else { //这个memoryMsgChan是channel将消息存在内存中的地方 memoryMsgChan = subChannel.memoryMsgChan //这个backendMsgChan是channel将消息存在磁盘中的地方 backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C } fmt.Printf("werben subChannel nil: %t\n", subChannel == nil)
select { case <-flusherChan: //这个flusherChan就是outputBufferTicker //250ms时间间隔Flush一次数据 client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true case <-client.ReadyStateChan: case subChannel = <-subEventChan: //客户端Sub的时候,会将channel传到这个subEventChan通道, //参考protocolV2.SUB()函数 subEventChan = nil case identifyData := <-identifyEventChan: //客户端提交identify时出发,只能提交一次identify, //参考函数protocolV2.IDENTIFY()
//感觉这里就是在收到这个消息时 //重新启动心跳和flush同步的ticker identifyEventChan = nil
outputBufferTicker.Stop() if identifyData.OutputBufferTimeout > 0 { outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout) }
heartbeatTicker.Stop() heartbeatChan = nil if identifyData.HeartbeatInterval > 0 { heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval) heartbeatChan = heartbeatTicker.C }
if identifyData.SampleRate > 0 { sampleRate = identifyData.SampleRate }
msgTimeout = identifyData.MsgTimeout case <-heartbeatChan: //心跳处理 err = p.Send(client, frameTypeResponse, heartbeatBytes) if err != nil { goto exit } case b := <-backendMsgChan: ... //磁盘消息处理 client.SendingMessage() ... err = p.SendMessage(client, msg) ... flushed = false case msg := <-memoryMsgChan: //将内存消息发送给客户端 client.SendingMessage() ... err = p.SendMessage(client, msg) ... flushed = false case <-client.ExitChan: goto exit } } exit: ... //结束时候关闭心跳和flush同步的ticke heartbeatTicker.Stop() outputBufferTicker.Stop() ...}复制代码
划线
评论
复制
发布于: 2021 年 03 月 23 日阅读数: 15
版权声明: 本文为 InfoQ 作者【werben】的原创文章。
原文链接:【http://xie.infoq.cn/article/65ab71dc8fc11d3b49fe3f8ae】。文章转载请联系作者。
werben
关注
还未添加个人签名 2018.01.08 加入
还未添加个人简介











评论