
Golang 优雅关闭 gRPC 实践

  • 2024-09-27
  • 本文字数:4791 字

    阅读完需:约 16 分钟

本文主要讨论了在 Go 语言中实现 gRPC 服务优雅关闭的技术和方法,从而确保所有连接都得到正确处理,防止数据丢失或损坏。原文: Go Concurrency — Graceful Shutdown


我在上次做技术支持的时候,遇到了一个有趣的错误。我们的服务在 Kubernetes 上运行,有一个容器在重启时不断出现以下错误信息--"Error bind: address already in use"。对于大多数程序员来说,这是一个非常熟悉的错误信息,表明一个进程正试图绑定到另一个进程正在使用的端口上。


我的团队维护一个 Go 服务,启动时会在各自的 goroutine 中生成大量不同的 gRPC 服务。

Goroutine - Go 运行时管理的轻量级线程,运行时只需要几 KB 内存,是 Go 并发性的基础。


package main
type GrpcServerInterface interface{ Run(stopChan chan <-struct{})}
type Server struct { ServerA GrpcServerIface ServerB GrpcServerIface}
func NewServer() *Server { return &NewServer{ ServerA: NewServerA, ServerB: NewServerB, }}
// Start runs each of the grpc serversfunc (s *Server) Start(stopChan <-chan struct{}){ go ServerA.Run(stopChan) go ServerB.Run(stopChan) <- stopChan}
func main() { stopChan := make(chan struct{}) server := NewServer() server.Start(stopChan) // Wait for program to terminate and then signal servers to stop ch := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) <-ch close(stopChan)}

package internal
type ServerA struct { stopChan <-chan struct{}}
// Start runs each of the grpc serversfunc (s *ServerA) Run(stopChan <-chan struct{}){ grpcServer := grpc.NewServer() var listener net.Listener ln, err := net.Listen("tcp", ":8080") if err != nil { // handle error } for { err := grpcServer.Serve(listener) if err != nil { return } } <- stopChan grpcServer.Stop() // Gracefully terminate connections and close listener}

我首先想到这可能是 Docker 或 Kubernetes 运行时的某种偶发性错误。这个错误让我觉得很奇怪,原因如下:1.)查看代码,我们似乎确实在主程序退出时关闭了所有监听,端口怎么可能在重启时仍在使用?2.)错误信息持续出现了几个小时,以至于需要人工干预。我原以为在最坏情况下,操作系统会在尝试重启容器之前为我们清理资源。或许是清理速度不够快?





通道用于在程序之间发送信号,通常以一对一的方式使用,当一个值被发送到某个通道时,只能从该通道读取一次。在我们的代码中,使用的是一对多模式。我们将在 main 中创建的通道传递给多个不同的 goroutine,每个 goroutine 都在等待 main 关闭通道,以便知道何时运行清理函数。

从 Go 1.7 开始,上下文被认为是向多个 goroutine 广播信号的标准方式。虽然这可能不是我们遇到问题的根本原因(我们是在等待通道关闭,而不是试图让每个 goroutine 从通道中读取相同的值),但考虑到这是最佳实践,还是希望采用这种模式。


package internal
type ServerA struct {}
func (s *ServerA) Run(ctx context.Context){ grpcServer := grpc.NewServer() var listener net.Listener ln, err := net.Listen("tcp", ":8080") if err != nil { log.Fatal("ServerA - Failed to create listener") } for { err := grpcServer.Serve(listener) if err != nil { log.Fatal("ServerA - Failed to start server") } } <- ctx.Done() // Clean up logic grpcServer.Stop() // Gracefully terminate connections and close listener}

package main
type GrpcServerInterface interface{ Run(stopChan chan <-struct{})}
type Server struct { ServerA GrpcServerIface ServerB GrpcServerIface stopServer context.CancelFunc serverCtx context.Context}
func NewServer() *Server { return &NewServer{ ServerA: NewServerA, ServerB: NewServerB, }}
// Start runs each of the grpc serversfunc (s *Server) Start(ctx context.Context){ // create new context from parent context s.serverCtx, stopServer := context.WithCancel(ctx) go ServerA.Run(s.serverCtx) go ServerB.Run(s.serverCtx)}
func (s *Server) Stop() { s.stopServer() // close server context to signal spawned goroutines to stop}
func main() { ctx, cancel := context.withCancel() server := NewServer() server.Start(ctx) // Wait for program to terminate and then signal servers to stop ch := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) <-ch cancel() // close main context on terminate signal server.Stop() // clean up server resources}

虽然我们通过取消主上下文向 goroutine 发出了退出信号,但并没有等待它们完成工作。当主程序收到退出信号时,即使我们发送了取消信号,也不能保证它会等待生成的 goroutine 完成工作。因此我们必须明确等待每个 goroutine 完成工作,以避免任何泄漏,为此我们使用了 WaitGroup。

WaitGroup 是一种计数器,用于阻止函数(或者说是 goroutine)的执行,直到其内部计数器变为 0。

package internal
type ServerA struct {}
func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){ wg.Add(1) // Add the current function to the parent's wait group defer wg.Done() // Send "done" signal upon function exit grpcServer := grpc.NewServer() var listener net.Listener ln, err := net.Listen("tcp", ":8080") if err != nil { log.Fatal("ServerA - Failed to create listener") } for { err := grpcServer.Serve(listener) if err != nil { log.Fatal("ServerA - Failed to start server") } } <- ctx.Done() // Clean up logic grpcServer.Stop() // Gracefully terminate connections and close listener fmt.Println("ServerA has stopped")}

package main
type GrpcServerInterface interface{ Run(stopChan chan <-struct{})}
type Server struct { ServerA GrpcServerIface ServerB GrpcServerIface wg sync.WaitGroup stopServer context.CancelFunc serverCtx context.Context}
func NewServer() *Server { return &NewServer{ ServerA: NewServerA, ServerB: NewServerB, }}
// Start runs each of the grpc serversfunc (s *Server) Start(ctx context.Context){ s.serverCtx, stopServer := context.WithCancel(ctx) go ServerA.Run(s.serverCtx, &s.wg) go ServerB.Run(s.serverCtx, &s.wg)}
func (s *Server) Stop() { s.stopServer() // close server context to signal spawned goroutines to stop s.wg.Wait() // wait for all goroutines to exit before returning fmt.Println("Main Server has stopped")}
func main() { ctx, cancel := context.withCancel() server := NewServer() server.Start(ctx) // Wait for program to terminate and then signal servers to stop ch := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) <-ch cancel() // close main context on terminate signal server.Stop() // clean up server resources}


package internal
type ServerA struct { startChan }
func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){ wg.Add(1) // Add the current function to the parent's wait group defer wg.Done() // Send "done" signal upon function exit go func(){ grpcServer := grpc.NewServer() var listener net.Listener ln, err := net.Listen("tcp", ":8080") if err != nil { log.Fatal("ServerA - Failed to create listener") } for { err := grpcServer.Serve(listener) if err != nil { log.Fatal("ServerA - Failed to start server") } } close(s.startChan) // Signal that we are done starting server to exit function // Wait in the background for mina program to exit <- ctx.Done() // Clean up logic grpcServer.Stop() // Gracefully terminate connections and close listener fmt.Println("ServerA has stopped") }() <- s.StartChan // Wait for signal before exiting function fmt.Println("ServerA has started")}

package main
type GrpcServerInterface interface{ Run(stopChan chan <-struct{})}
type Server struct { ServerA GrpcServerIface ServerB GrpcServerIface wg sync.WaitGroup stopServer context.CancelFunc serverCtx context.Context startChan chan <-struct{}}
func NewServer() *Server { return &NewServer{ ServerA: NewServerA, ServerB: NewServerB, startChan: make(chan <-struct{}), }}
// Start runs each of the grpc serversfunc (s *Server) Start(ctx context.Context){ s.serverCtx, stopServer := context.WithCancel(ctx) ServerA.Run(s.serverCtx, &s.wg) ServerB.Run(s.serverCtx, &s.wg) close(s.startChan) <- s.startChan // wait for each server to Start before returning fmt.Println("Main Server has started")}
func (s *Server) Stop() { s.stopServer() // close server context to signal spawned goroutines to stop s.wg.Wait() // wait for all goroutines to exit before returning fmt.Println("Main Server has stopped")}
func main() { ctx, cancel := context.withCancel() server := NewServer() server.Start(ctx) // Wait for program to terminate and then signal servers to stop ch := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, syscall.SIGTERM) <-ch cancel() // close main context on terminate signal server.Stop() // clean up server resources}

不瞒你说,刚开始学习 Go 时,并发会让你头疼不已。调试这个问题让我有机会看到这些概念的实际用途,并强化了之前不确定的主题,建议你自己尝试简单的示例!

你好,我是俞凡,在 Motorola 做过研发,现在在 Mavenir 做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI 等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。为了方便大家以后能第一时间看到文章,请朋友们关注公众号"DeepNoMind",并设个星标吧,如果能一键三连(转发、点赞、在看),则能给我带来更多的支持和动力,激励我持续写下去,和大家共同成长进步!

发布于: 12 分钟前阅读数: 5



公众号:DeepNoMind 2017-10-18 加入

俞凡,Mavenir Systems研发总监,关注高可用架构、高性能服务、5G、人工智能、区块链、DevOps、Agile等。公众号:DeepNoMind

