写点什么

nsq 源码阅读之消息接收和发送

用户头像
werben
关注
发布于: 2021 年 03 月 23 日

一、TCP Handler

nsqd 里面的 Main 函数。

//nsqd.gofunc (n *NSQD) Main() error {	//...	n.waitGroup.Wrap(func() {			exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))		})	//...}
//tcp_server.gofunc TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error { //... go func() { handler.Handle(clientConn) wg.Done() }() //...}
复制代码


n.tcpServer 实现了 TCPHandler 接口

在 nsqd.New()里面创建的 tcpServer

//nsqd.gofunc New(opts *Options) (*NSQD, error) {	//...	n.tcpServer = &tcpServer{}	//...}
复制代码


看下 tcpServer 实现的 Handle 接口里做了什么

func (p *tcpServer) Handle(clientConn net.Conn) {
//... //从socket中读取数据 buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) if err != nil { p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err) clientConn.Close() return } protocolMagic := string(buf)
p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic)
var prot protocol.Protocol switch protocolMagic { case " V2": //这里是关键,创建了一个protocolV2对象 prot = &protocolV2{nsqd: p.nsqd} default: protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return }
p.conns.Store(clientConn.RemoteAddr(), clientConn)
//开启protocolV2的IOLoop,这是一个客户端连接的“守护”协程 //接收消息和发送消息给客户端,都在这里面处理了 err = prot.IOLoop(clientConn) if err != nil { p.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) }
p.conns.Delete(clientConn.RemoteAddr())}
复制代码


//protocol_v2.go//因为这里是在Handler里启动的,所以这里其实是为每个客户端都启动了一个Loopfunc (p *protocolV2) IOLoop(conn net.Conn) error {	...	clientID := atomic.AddInt64(&p.nsqd.clientIDSequence, 1)	client := newClientV2(clientID, conn, p.nsqd)	p.nsqd.AddClient(client.ID, client)
... // messagePump负责从channel的memoryMsgChan和 //backend.ReadChan()中读取消息并将消息推送给client
messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) <-messagePumpStartedChan
//下面这个for循环负责接收客户端消息,比如消费订阅,以及生产消息等 //主要逻辑在p.Exec()里 for { ... //主要逻辑在这个Exec里面 response, err = p.Exec(client, params) ... }
//...}
复制代码


二、接收消息

上面已经看到处理客户端消息主要在 protocolV2.Exec()里

这段代码我觉得很好理解了,直接去 protocolV2.PUB()

看客户端生产消息的逻辑,其他的指令先不看

func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {	if bytes.Equal(params[0], []byte("IDENTIFY")) {		return p.IDENTIFY(client, params)	}	err := enforceTLSPolicy(client, p, params[0])	if err != nil {		return nil, err	}	switch {	...        //这里就是客户端生产消息的指令处理了	case bytes.Equal(params[0], []byte("PUB")):		return p.PUB(client, params)	...	case bytes.Equal(params[0], []byte("SUB")):		return p.SUB(client, params)	}	...	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))}
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { var err error ... bodyLen, err := readLen(client.Reader, client.lenSlice) ... //读取消息主体 messageBody := make([]byte, bodyLen) _, err = io.ReadFull(client.Reader, messageBody) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body") }
... //将消息丢到topic.PutMessage() //PutMessage直接将消息丢到msgChan或者diskqueue了 //关于topic的部分,参考https://www.cnblogs.com/werben/p/14518283.html topic := p.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) }
client.PublishedMessage(topicName, 1)
return okBytes, nil}
复制代码


三、发送消息


现在来搞清楚,服务器端又在哪里发送消息给 consumer?

在这 protocolV2.messagePump()


func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {	var err error	var memoryMsgChan chan *Message	var backendMsgChan <-chan []byte	var subChannel *Channel	var flusherChan <-chan time.Time	var sampleRate int32
//客户端执行Sub的时候,会将Channle丢到这个SubEventChan里 //可以去看protocolV2.SUB()函数 subEventChan := client.SubEventChan
//鉴权Identify对应的chan,只能鉴权一次 identifyEventChan := client.IdentifyEventChan
//flushChan赋值outputBufferTicker,默认是250ms时间间隔Flush一次数据 outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
heartbeatTicker := time.NewTicker(client.HeartbeatInterval) heartbeatChan := heartbeatTicker.C msgTimeout := client.MsgTimeout flushed := true close(startedChan)
for { // 检查订阅状态和消息是否可处理状态 if subChannel == nil || !client.IsReadyForMessages() { // the client is not ready to receive messages... memoryMsgChan = nil backendMsgChan = nil flusherChan = nil // force flush client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true } else if flushed { // last iteration we flushed... // do not select on the flusher ticker channel memoryMsgChan = subChannel.memoryMsgChan backendMsgChan = subChannel.backend.ReadChan() flusherChan = nil } else { //这个memoryMsgChan是channel将消息存在内存中的地方 memoryMsgChan = subChannel.memoryMsgChan //这个backendMsgChan是channel将消息存在磁盘中的地方 backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C } fmt.Printf("werben subChannel nil: %t\n", subChannel == nil)
select { case <-flusherChan: //这个flusherChan就是outputBufferTicker //250ms时间间隔Flush一次数据 client.writeLock.Lock() err = client.Flush() client.writeLock.Unlock() if err != nil { goto exit } flushed = true case <-client.ReadyStateChan: case subChannel = <-subEventChan: //客户端Sub的时候,会将channel传到这个subEventChan通道, //参考protocolV2.SUB()函数 subEventChan = nil case identifyData := <-identifyEventChan: //客户端提交identify时出发,只能提交一次identify, //参考函数protocolV2.IDENTIFY()
//感觉这里就是在收到这个消息时 //重新启动心跳和flush同步的ticker identifyEventChan = nil
outputBufferTicker.Stop() if identifyData.OutputBufferTimeout > 0 { outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout) }
heartbeatTicker.Stop() heartbeatChan = nil if identifyData.HeartbeatInterval > 0 { heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval) heartbeatChan = heartbeatTicker.C }
if identifyData.SampleRate > 0 { sampleRate = identifyData.SampleRate }
msgTimeout = identifyData.MsgTimeout case <-heartbeatChan: //心跳处理 err = p.Send(client, frameTypeResponse, heartbeatBytes) if err != nil { goto exit } case b := <-backendMsgChan: ... //磁盘消息处理 client.SendingMessage() ... err = p.SendMessage(client, msg) ... flushed = false case msg := <-memoryMsgChan: //将内存消息发送给客户端 client.SendingMessage() ... err = p.SendMessage(client, msg) ... flushed = false case <-client.ExitChan: goto exit } } exit: ... //结束时候关闭心跳和flush同步的ticke heartbeatTicker.Stop() outputBufferTicker.Stop() ...}
复制代码


发布于: 2021 年 03 月 23 日阅读数: 15
用户头像

werben

关注

还未添加个人签名 2018.01.08 加入

还未添加个人简介

评论

发布
暂无评论
nsq源码阅读之消息接收和发送