写点什么

nsqlookupd:高性能消息中间件 NSQ 解析

发布于: 2021 年 04 月 07 日

​​摘要: 本篇将会结合源码介绍 nsqlookupd 的实现细节。


本文分享自华为云社区《高性能消息中间件NSQ 解析-nsqlookupd 实现细节介绍》,原文作者:aoho 。


本篇将会结合源码介绍 nsqlookupd 的实现细节。nsqlookupd 主要流程与 nsqd 执行逻辑相似,区别在于具体运行的任务不同。


nsqlookupd 是 nsq 管理集群拓扑信息以及用于注册和发现 nsqd 服务。所以,也可以把 nsqlookupd 理解为注册发现服务。当 nsq 集群中有多个 nsqlookupd 服务时,因为每个 nsqd 都会向所有的 nsqlookupd 上报本地信息,因此 nsqlookupd 具有最终一致性。

入口函数


在 nsq/apps/nsqlookupd/main.go 可以找到执行入口文件。


// 位于apps/nsqlookupd/main.go:45func main() {  prg := &program{}  if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {    logFatal("%s", err)  }}
func (p *program) Init(env svc.Environment) error { if env.IsWindowsService() { dir := filepath.Dir(os.Args[0]) return os.Chdir(dir) } return nil}
func (p *program) Start() error { opts := nsqlookupd.NewOptions()
flagSet := nsqlookupdFlagSet(opts) ...}
复制代码


​同样,通过第三方 svc 包进行优雅的后台进程管理,svc.Run() -> svc.Init() -> svc.Start(),启动 nsqlookupd 实例。


// 位于 apps/nsqlookupd/main.go:80options.Resolve(opts, flagSet, cfg)  nsqlookupd, err := nsqlookupd.New(opts)  if err != nil {    logFatal("failed to instantiate nsqlookupd", err)  }  p.nsqlookupd = nsqlookupd
go func() { err := p.nsqlookupd.Main() if err != nil { p.Stop() os.Exit(1) } }()
复制代码


初始化配置参数(优先级:flagSet-命令行参数 > cfg-配置文件 > opts-默认值),开启协程,进入 nsqlookupd.Main() 主函数。

监听请求


我们来看下 nsqlookupd 是如何监听请求的,代码实现如下:


// 位于 nsqlookupd/nsqlookupd.go:53func (l *NSQLookupd) Main() error {  ctx := &Context{l}
exitCh := make(chan error) var once sync.Once exitFunc := func(err error) { once.Do(func() { if err != nil { l.logf(LOG_FATAL, "%s", err) } exitCh <- err }) }
tcpServer := &tcpServer{ctx: ctx} l.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(l.tcpListener, tcpServer, l.logf)) }) httpServer := newHTTPServer(ctx) l.waitGroup.Wrap(func() { exitFunc(http_api.Serve(l.httpListener, httpServer, "HTTP", l.logf)) })
err := <-exitCh return err}
复制代码


​开启 goroutine 执行 tcpServer, httpServer,分别监听 nsqd, nsqadmin 的客户端请求。

处理请求


// 位于 internal/protocol/tcp_server.go:17func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {  logf(lg.INFO, "TCP: listening on %s", listener.Addr())
for { clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { logf(lg.WARN, "temporary Accept() failure - %s", err) runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { return fmt.Errorf("listener.Accept() error - %s", err) } break } go handler.Handle(clientConn) }
logf(lg.INFO, "TCP: closing %s", listener.Addr())
return nil}
复制代码


​TCPServer 循环监听客户端请求,建立长连接进行通信,并开启 handler 处理每一个客户端 conn。

装饰 http 路由


httpServer 通过 http_api.Decorate 装饰器实现对各 http 路由进行 handler 装饰,如加 log 日志、V1 协议版本号的统一格式输出等;


func newHTTPServer(ctx *Context) *httpServer {  log := http_api.Log(ctx.nsqlookupd.logf)
router := httprouter.New() router.HandleMethodNotAllowed = true router.PanicHandler = http_api.LogPanicHandler(ctx.nsqlookupd.logf) router.NotFound = http_api.LogNotFoundHandler(ctx.nsqlookupd.logf) router.MethodNotAllowed = http_api.LogMethodNotAllowedHandler(ctx.nsqlookupd.logf) s := &httpServer{ ctx: ctx, router: router, }
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText)) router.Handle("GET", "/info", http_api.Decorate(s.doInfo, log, http_api.V1))
// v1 negotiate router.Handle("GET", "/debug", http_api.Decorate(s.doDebug, log, http_api.V1)) router.Handle("GET", "/lookup", http_api.Decorate(s.doLookup, log, http_api.V1)) router.Handle("GET", "/topics", http_api.Decorate(s.doTopics, log, http_api.V1)) router.Handle("GET", "/channels", http_api.Decorate(s.doChannels, log, http_api.V1))}
复制代码

处理客户端命令


tcp 解析 V1 协议,内部协议封装的 prot.IOLoop(conn) 进行循环处理客户端命令,直到客户端命令全部解析处理完毕才关闭连接。


var prot protocol.Protocol  switch protocolMagic {  case "  V1":    prot = &LookupProtocolV1{ctx: p.ctx}  default:    protocol.SendResponse(clientConn, []byte("E_BAD_PROTOCOL"))    clientConn.Close()    p.ctx.nsqlookupd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",      clientConn.RemoteAddr(), protocolMagic)    return  }
err = prot.IOLoop(clientConn)
复制代码

执行命令


通过内部协议进行 p.Exec(执行命令)、p.SendResponse(返回结果),保证每个 nsqd 节点都能正确的进行服务注册(register)与注销(unregister),并进行心跳检测(ping)节点的可用性,确保客户端取到的 nsqd 节点列表都是最新可用的。


for {    line, err = reader.ReadString('\n')    if err != nil {      break    }
line = strings.TrimSpace(line) params := strings.Split(line, " ")
var response []byte response, err = p.Exec(client, reader, params) if err != nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } _, sendErr := protocol.SendResponse(client, []byte(err.Error())) if sendErr != nil { p.ctx.nsqlookupd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) break } continue }
if response != nil { _, err = protocol.SendResponse(client, response) if err != nil { break } } }
conn.Close()
复制代码


​nsqlookupd 服务同时开启 tcp 和 http 两个监听服务,nsqd 会作为客户端,连上 nsqlookupd 的 tcp 服务,并上报自己的 topic 和 channel 信息,以及通过心跳机制判断 nsqd 状态;还有个 http 服务提供给 nsqadmin 获取集群信息。

小结


本文主要介绍 nsqlookupd 的实现,nsqlookupd 同样是一个守护进程,负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题( topic )的生产者,并且 nsqd 节点广播话题(topic)和通道( channel )信息。有两个接口: TCP 接口, nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。


下一篇文章,将会继续介绍 nsq 中其他模块实现的细节。

推荐阅读


高性能消息中间件 NSQ 解析-nsqd 实现细节介绍

高性能消息中间件 NSQ 解析-整体介绍

高性能消息中间件 NSQ 解析-应用实践

微服务架构中使用 ELK 进行日志采集以及统一处理

没有 try-catch,该如何处理 Go 错误异常?


点击关注,第一时间了解华为云新鲜技术~

发布于: 2021 年 04 月 07 日阅读数: 16
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
nsqlookupd:高性能消息中间件 NSQ 解析