写点什么

Go 程序如何实现优雅退出?来看看 K8s 是怎么做的——下篇

作者:江湖十年
  • 2024-08-28
    浙江
  • 本文字数:14826 字

    阅读完需:约 49 分钟

Go 程序如何实现优雅退出?来看看 K8s 是怎么做的——下篇

本文带大家一起来详细学习下 Go 中的优雅退出,由于文章过长,拆分成上下两篇,本文为下篇。

K8s 的优雅退出

现在,我们已经掌握了 Go 中 HTTP Server 程序如何实现优雅退出,是时候看一看 K8s 中提供的一种更为优雅的优雅退出退出方案了😄。


这要从 K8s API Server 启动入口说起:


https://github.com/kubernetes/kubernetes/blob/release-1.31/cmd/kube-apiserver/apiserver.go


func main() {  command := app.NewAPIServerCommand()  code := cli.Run(command)  os.Exit(code)}
复制代码


K8s API Server 启动入口代码非常简单,我们可以进入 app.NewAPIServerCommand() 查看更多细节:


https://github.com/kubernetes/kubernetes/blob/release-1.31/cmd/kube-apiserver/app/server.go#L122


// NewAPIServerCommand creates a *cobra.Command object with default parametersfunc NewAPIServerCommand() *cobra.Command {  ...  cmd := &cobra.Command{    ...    RunE: func(cmd *cobra.Command, args []string) error {      ...      return Run(cmd.Context(), completedOptions)    },    ...  }  cmd.SetContext(genericapiserver.SetupSignalContext())
... return cmd}
复制代码


NewAPIServerCommand 函数中,我们要关注的核心代码只有两行:


一行是 cmd.SetContext(genericapiserver.SetupSignalContext()),这是在为 cmd 对象设置 ctx 属性。


另一行是 RunE 属性中最后一行代码 Run(cmd.Context(), completedOptions),这里是启动程序,并使用了 cmd 对象的 ctx 属性。


很明显,K8s 使用了 Go 语言中流行的 Cobra 命令行框架作为程序的启动框架,Cobra 提供了如下两个方法可以设置和获取 Context


func (c *Command) Context() context.Context {  return c.ctx}
func (c *Command) SetContext(ctx context.Context) { c.ctx = ctx}
复制代码


NOTE:如果你对 Cobra 不太熟悉,可以参考我的另一篇文章《Go 语言现代命令行框架 Cobra 详解》


这里的 ctx 就是串联起 K8s 实现优雅退出的核心对象。


首先通过 genericapiserver.SetupSignalContext() 获取到一个 context.Context 对象,根据函数名称可以猜测到它可能跟信号有关。


对于 Run(cmd.Context(), completedOptions) 方法的调用,由于嵌套层级比较深,逻辑比较复杂,我就不把整个代码调用链都贴出来讲了。总之,这个启动过程最终可以定位到 preparedGenericAPIServer.RunWithContext 这个方法的执行。在 RunWithContext 方法内部的第一行代码 stopCh := ctx.Done() 是重点,它拿到了一个控制程序退出时机的 channel(这跟我们前文讲解的优雅退出示例中 quit := make(chan os.Signal, 1) 变量作用相同),而这个 ctx 实际上就是 genericapiserver.SetupSignalContext() 的返回值,如果你感兴趣可以详细研究下这个 stopCh 的使用过程。


我们直接去分析 genericapiserver.SetupSignalContext() 的实现:


https://github.com/kubernetes/apiserver/blob/release-1.31/pkg/server/signal.go


package server
import ( "context" "os" "os/signal")
var onlyOneSignalHandler = make(chan struct{})var shutdownHandler chan os.Signal
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned// which is closed on one of these signals. If a second signal is caught, the program// is terminated with exit code 1.// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can// be called once.func SetupSignalHandler() <-chan struct{} { return SetupSignalContext().Done()}
// SetupSignalContext is same as SetupSignalHandler, but a context.Context is returned.// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can// be called once.func SetupSignalContext() context.Context { close(onlyOneSignalHandler) // panics when called twice
shutdownHandler = make(chan os.Signal, 2)
ctx, cancel := context.WithCancel(context.Background()) signal.Notify(shutdownHandler, shutdownSignals...) go func() { <-shutdownHandler cancel() <-shutdownHandler os.Exit(1) // second signal. Exit directly. }()
return ctx}
// RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT)// This returns whether a handler was notifiedfunc RequestShutdown() bool { if shutdownHandler != nil { select { case shutdownHandler <- shutdownSignals[0]: return true default: } }
return false}
复制代码


这里代码不多,但却相当精妙,可以一窥 K8s 设计之优雅。


我们从 SetupSignalContext 函数开始分析。


SetupSignalContext 函数第一行代码,通过调用 close(onlyOneSignalHandler) 来确保在整个程序中只调用一次 SetupSignalContext 函数,调用多次则直接 panic。这能强制调用方写出正确的代码,避免出现意料之外的情况。


shutdownHandler 是一个包含了两个缓冲区的 channel,而不像我们定义的 quit := make(chan os.Signal, 1) 那样只有一个缓冲区大小。


我们前文讲过,通过 signal.Notify(c chan<- os.Signal, sig ...os.Signal) 函数注册所关注的信号后,signal 包在给 c 发送信号时不会阻塞。因为我们要接收两次退出信号,所以 shutdownHandler 缓冲区大小为 2


这也是 SetupSignalContext 函数的精髓所在,它实现了收到一次 SIGINT/SIGTERM 信号,程序优雅退出,收到两次 SIGINT/SIGTERM 信号,程序强制退出的功能。


代码片段如下:


ctx, cancel := context.WithCancel(context.Background())signal.Notify(shutdownHandler, shutdownSignals...)go func() {  <-shutdownHandler  cancel()  <-shutdownHandler  os.Exit(1) // second signal. Exit directly.}()
复制代码


这里使用一个带有取消功能的 Context,当第一次收到信号时,就调用 cancel() 取消这个 ctx。而这个 ctx 会作为函数返回值返给调用方,调用方拿到它,就可以在需要的地方调用 <-ctx.Done() 来等待退出信号了。这就是 preparedGenericAPIServer.RunWithContext 方法中调用 stopCh := ctx.Done() 拿到 channel,然后等待 <-stopCh 退出信号的逻辑了。


这里用到的 shutdownSignals 变量,定义在 signal_posix.go 文件中:


https://github.com/kubernetes/apiserver/blob/release-1.31/pkg/server/signal_posix.go


package server
import ( "os" "syscall")
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
复制代码


shutdownSignals 是一个保存了两个信号的切片对象。


os.Interrupt 实际上是一个变量,它的值等于 syscall.SIGINT


// The only signal values guaranteed to be present in the os package on all// systems are os.Interrupt (send the process an interrupt) and os.Kill (force// the process to exit). On Windows, sending os.Interrupt to a process with// os.Process.Signal is not implemented; it will return an error instead of// sending a signal.var (  Interrupt Signal = syscall.SIGINT  Kill      Signal = syscall.SIGKILL)
复制代码


这里为实现优雅退出,监控了两个信号 SIGINTSIGTERM,并没有监控 SIGQUIT 信号。不过这已经足够用了,根据我的经验,绝大多数情况下我们都会使用 Ctrl + C 终止程序,而非使用 Ctrl + \


SetupSignalHandler 函数内部调用了 SetupSignalContext 函数,它唯一的作用就是直接返回给调用方 ctx.Done() 所返回的 channel,以此来方便调用方。


RequestShutdown 函数可以主动触发退出事件信号(SIGTERM/SIGINT),返回值表示是否触发成功。


现在将 K8s 优雅退出方案集成进我们的 net/http 优雅退出示例程序中:


package main
import ( "context" "errors" "log" "net/http" "time"
genericapiserver "k8s.io/apiserver/pkg/server")
func main() { srv := &http.Server{ Addr: ":8000", }
http.HandleFunc("/sleep", func(w http.ResponseWriter, r *http.Request) { duration, err := time.ParseDuration(r.FormValue("duration")) if err != nil { http.Error(w, err.Error(), 400) return }
time.Sleep(duration) _, _ = w.Write([]byte("Welcome HTTP Server")) })
go func() { if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { log.Fatalf("HTTP server error: %v", err) } log.Println("Stopped serving new connections") }()
// NOTE: 只需要替换这 3 行代码,Gin 版本同理 // quit := make(chan os.Signal, 1) // signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) // <-quit
// 可以直接丢弃,context.Context.Done() 返回的就是普通空结构体 <-genericapiserver.SetupSignalHandler()
log.Println("Shutdown Server...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel()
// We received an SIGINT/SIGTERM signal, shut down. if err := srv.Shutdown(ctx); err != nil { // Error from closing listeners, or context timeout: log.Printf("HTTP server Shutdown: %v", err) } log.Println("HTTP server graceful shutdown completed")}
复制代码


我们只需要将如下 3 行代码:


quit := make(chan os.Signal, 1)signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)<-quit
复制代码


替换成 K8s 提供的 SetupSignalHandler 函数调用即可:


<-genericapiserver.SetupSignalHandler()
复制代码


其他代码都不用修改。


执行示例程序,按一次 Ctrl + C 测试优雅退出:


$ go build -o main main.go && ./main^C2024/08/22 09:24:46 Shutdown Server...2024/08/22 09:24:46 Stopped serving new connections2024/08/22 09:24:49 HTTP server graceful shutdown completed$ echo $?0
复制代码


$ curl "http://localhost:8000/sleep?duration=5s"Welcome HTTP Server
复制代码


执行示例程序,按两次 Ctrl + C 测试强制退出:


$ go build -o main main.go && ./main^C2024/08/22 09:25:28 Shutdown Server...2024/08/22 09:25:28 Stopped serving new connections^C$ echo $?                                       1
复制代码


$ curl "http://localhost:8000/sleep?duration=5s"curl: (52) Empty reply from server
复制代码


完美,K8s 为我们提供了优雅退出的新思路。这样在开发环境,为了方便调试,我们可以无需等待优雅退出,只要连续发送两次 SIGTERM/SIGINT 即可强制退出程序。在生产环境发送一次 SIGTERM/SIGINT 信号等待优雅退出。


使用 Gin 框架开发的 Web 程序也可以这样修改,你可以自行尝试。

gRPC 的优雅退出

gRPC Server 优雅退出

接下来我们一起看下 gRPC Server 程序如何实现优雅退出。


示例程序目录结构如下:


$ tree grpcgrpc├── Makefile├── client│   └── main.go├── pb│   ├── helloworld.pb.go│   ├── helloworld.proto│   └── helloworld_grpc.pb.go└── server    └── main.go
复制代码


helloworld.proto 中定义了 gRPC Server 支持的服务接口:


syntax = "proto3";
option go_package = ".;pb";
// The greeting service definition.service Greeter { // Sends a greeting rpc SayHello (HelloRequest) returns (HelloReply) {}}
// The request message containing the user's name.message HelloRequest { string name = 1; string duration = 2;}
// The response message containing the greetingsmessage HelloReply { string message = 1;}
复制代码


server/main.go 中 Server 端代码如下:


// Package main implements a server for Greeter service.package main
import ( "context" "flag" "fmt" "log" "net" "time"
"google.golang.org/grpc" genericapiserver "k8s.io/apiserver/pkg/server"
"github.com/jianghushinian/blog-go-example/gracefulstop/grpc/pb")
var ( port = flag.Int("port", 50051, "The server port"))
// server is used to implement helloworld.GreeterServer.type server struct { pb.UnimplementedGreeterServer}
// SayHello implements helloworld.GreeterServerfunc (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) { log.Printf("Received: %v", in.GetName())
duration, _ := time.ParseDuration(in.GetDuration()) time.Sleep(duration)
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil}
func main() { flag.Parse() lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) }
s := grpc.NewServer() pb.RegisterGreeterServer(s, &server{}) log.Printf("server listening at %v", lis.Addr())
go func() { if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) } }()
<-genericapiserver.SetupSignalHandler() log.Printf("Shutdown Server...") s.GracefulStop() log.Println("gRPC server graceful shutdown completed")}
复制代码


这与 HTTP Server 的优雅退出逻辑基本相同,同样由 grpc 包提供了优雅退出方法 GracefulStop


在接收到退出信号以后,调用 s.GracefulStop() 方法即可实现优雅退出。可以发现,这其实是一个优雅退出的套路。


client/main.go 中 Client 端代码如下:


// Package main implements a client for Greeter service.package main
import ( "context" "flag" "log" "time"
"google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure"
"github.com/jianghushinian/blog-go-example/gracefulstop/grpc/pb")
const ( defaultName = "world")
var ( addr = flag.String("addr", "localhost:50051", "the address to connect to") name = flag.String("name", defaultName, "Name to greet"))
func main() { flag.Parse() // Set up a connection to the server. conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { log.Fatalf("did not connect: %v", err) } defer conn.Close() c := pb.NewGreeterClient(conn)
// Contact the server and print out its response. ctx, cancel := context.WithTimeout(context.Background(), time.Second*60) defer cancel() r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name, Duration: "10s"}) if err != nil { log.Fatalf("could not greet: %v", err) } log.Printf("Greeting: %s", r.GetMessage())}
复制代码


执行示例程序,测试优雅退出逻辑:


# 执行服务端代码$ go build -o main main.go && ./main2024/08/22 09:26:17 server listening at [::]:500512024/08/22 09:26:24 Received: world^C2024/08/22 09:26:26 Shutdown Server...2024/08/22 09:26:34 gRPC server graceful shutdown completed$ echo $?0
复制代码


# 执行客户端代码$ go build -o main main.go && ./main2024/08/22 09:26:34 Greeting: Hello world
复制代码


优雅退出生效。


既然 gRPC Server 中的优雅退出方案已经介绍完了,同讲解 HTTP Server 优雅退出一样,接下来我再带你一起深入了解一下 GracefulStop 的源码是如何实现的。

GracefulStop 源码

GracefulStop 方法源码如下:


https://github.com/grpc/grpc-go/blob/v1.65.0/server.go#L1882


// GracefulStop stops the gRPC server gracefully. It stops the server from// accepting new connections and RPCs and blocks until all the pending RPCs are// finished.func (s *Server) GracefulStop() {  s.stop(true)}
func (s *Server) stop(graceful bool) { s.quit.Fire() defer s.done.Fire()
s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) }) s.mu.Lock() s.closeListenersLocked() // Wait for serving threads to be ready to exit. Only then can we be sure no // new conns will be created. s.mu.Unlock() s.serveWG.Wait()
s.mu.Lock() defer s.mu.Unlock()
if graceful { s.drainAllServerTransportsLocked() } else { s.closeServerTransportsLocked() }
for len(s.conns) != 0 { s.cv.Wait() } s.conns = nil
if s.opts.numServerWorkers > 0 { // Closing the channel (only once, via grpcsync.OnceFunc) after all the // connections have been closed above ensures that there are no // goroutines executing the callback passed to st.HandleStreams (where // the channel is written to). s.serverWorkerChannelClose() }
if graceful || s.opts.waitForHandlers { s.handlersWG.Wait() }
if s.events != nil { s.events.Finish() s.events = nil }}
复制代码


GracefulStop 方法直接调用了 s.stop(true) 方法。


stop 方法的 graceful 参数用来决定是否启用优雅退出,传递 true 表示优雅退出,传递 false 表示强制退出。


stop 方法第一段代码逻辑如下:


s.quit.Fire()defer s.done.Fire()
复制代码


Server 对象的 quitdone 属性类型都为 *grpcsync.Event,前者用来标记 gRPC Server 正在执行退出流程,后者标记退出完成。


Event 定义如下:


https://github.com/grpc/grpc-go/blob/v1.65.0/internal/grpcsync/event.go


// Event represents a one-time event that may occur in the future.type Event struct {  fired int32  c     chan struct{}  o     sync.Once}
// Fire causes e to complete. It is safe to call multiple times, and// concurrently. It returns true iff this call to Fire caused the signaling// channel returned by Done to close.func (e *Event) Fire() bool { ret := false e.o.Do(func() { atomic.StoreInt32(&e.fired, 1) close(e.c) ret = true }) return ret}
// Done returns a channel that will be closed when Fire is called.func (e *Event) Done() <-chan struct{} { return e.c}
// HasFired returns true if Fire has been called.func (e *Event) HasFired() bool { return atomic.LoadInt32(&e.fired) == 1}
// NewEvent returns a new, ready-to-use Event.func NewEvent() *Event { return &Event{c: make(chan struct{})}}
复制代码


可以发现,*Event 对象的 Fire 方法就是将 fired 字段值置为 1,并且关闭类型为 channel 的字段 c,所以其实只要调用了 Fire,那么调用 Done 方法将立即返回,调用 HasFired 方法就是在判断 fired 字段值是否为 1(即是否调用过 Fire 方法)。


s.quit.Fire() 代码被调用以后,Serve 方法就能够感知到当前服务正在退出,接下来就不会再接收新的请求进来了。


Serve 方法源码如下:


// Serve accepts incoming connections on the listener lis, creating a new// ServerTransport and service goroutine for each. The service goroutines// read gRPC requests and then call the registered handlers to reply to them.// Serve returns when lis.Accept fails with fatal errors.  lis will be closed when// this method returns.// Serve will return a non-nil error unless Stop or GracefulStop is called.//// Note: All supported releases of Go (as of December 2023) override the OS// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive// with OS defaults for keepalive time and interval, callers need to do the// following two things://   - pass a net.Listener created by calling the Listen method on a//     net.ListenConfig with the `KeepAlive` field set to a negative value. This//     will result in the Go standard library not overriding OS defaults for TCP//     keepalive interval and time. But this will also result in the Go standard//     library not enabling TCP keepalives by default.//   - override the Accept method on the passed in net.Listener and set the//     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.func (s *Server) Serve(lis net.Listener) error {  s.mu.Lock()  s.printf("serving")  s.serve = true  if s.lis == nil {    // Serve called after Stop or GracefulStop.    s.mu.Unlock()    lis.Close()    return ErrServerStopped  }
s.serveWG.Add(1) defer func() { s.serveWG.Done() // 判断当前服务是否正在退出 if s.quit.HasFired() { // Stop or GracefulStop called; block until done and return nil. <-s.done.Done() } }()
ls := &listenSocket{ Listener: lis, channelz: channelz.RegisterSocket(&channelz.Socket{ SocketType: channelz.SocketTypeListen, Parent: s.channelz, RefName: lis.Addr().String(), LocalAddr: lis.Addr(), SocketOptions: channelz.GetSocketOption(lis)}, ), } s.lis[ls] = true
defer func() { s.mu.Lock() if s.lis != nil && s.lis[ls] { ls.Close() delete(s.lis, ls) } s.mu.Unlock() }()
s.mu.Unlock() channelz.Info(logger, ls.channelz, "ListenSocket created")
var tempDelay time.Duration // how long to sleep on accept failure for { rawConn, err := lis.Accept() if err != nil { if ne, ok := err.(interface { Temporary() bool }); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } s.mu.Lock() s.printf("Accept error: %v; retrying in %v", err, tempDelay) s.mu.Unlock() timer := time.NewTimer(tempDelay) select { case <-timer.C: // 判断当前服务是否正在退出 case <-s.quit.Done(): timer.Stop() return nil } continue } s.mu.Lock() s.printf("done serving; Accept = %v", err) s.mu.Unlock()
// 判断当前服务是否正在退出 if s.quit.HasFired() { return nil } return err } tempDelay = 0 // Start a new goroutine to deal with rawConn so we don't stall this Accept // loop goroutine. // // Make sure we account for the goroutine so GracefulStop doesn't nil out // s.conns before this conn can be added. s.serveWG.Add(1) go func() { s.handleRawConn(lis.Addr().String(), rawConn) s.serveWG.Done() }() }}
复制代码


我们可以直接跳到 for 循环部分的代码段,每次通过 rawConn, err := lis.Accept() 接收到一个新的请求进来,都会使用 if s.quit.HasFired() 来判断当前服务是否正在退出,如果返回结果为 true,则 Serve 方法直接退出。


此时 Serve 方法的 defer 语句开始执行,这里会再次使用 if s.quit.HasFired() 判断当前服务是否正在退出(之所以判断两次,因为 Serve 方法也可能由于其他原因导致退出,进入 defer 逻辑),如果是,则调用 <-s.done.Done() 阻塞在这里,直到 GracefulStop 优雅退出逻辑执行完成。


此外,for 循环内部的 select 语句中,有一个 case 调用了 <-s.quit.Done(),也是在判断当前服务是否正在退出,如果是,则调用 timer.Stop() 清理定时器后,Serve 方法直接退出。


我们接着往下看:


s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })s.mu.Lock()s.closeListenersLocked()// Wait for serving threads to be ready to exit.  Only then can we be sure no// new conns will be created.s.mu.Unlock()s.serveWG.Wait()
复制代码


这里的 channelz 是 gRPC 的一个监控工具,用于跟踪 gRPC 的内部状态,RemoveEntry 会移除该服务器的监控数据。


s.closeListenersLocked() 方法是不是很熟悉,这与 net/http 包中的 Shutdown 方法命名都一样,作用也就不言而喻了。


定义如下:


// s.mu must be held by the caller.func (s *Server) closeListenersLocked() {  for lis := range s.lis {    lis.Close()  }  s.lis = nil}
复制代码


对于 s.serveWG.Wait() 这行代码,根据这个操作的属性名和方法名可以猜到,serveWG 明显是 sync.WaitGroup 类型。


既然有 Wait(),那就应该会有 Add(1) 操作。其正在前文中贴出 Serve 代码中。


刚进入 Serve 方法时,就会调用 s.serveWG.Add(1)Serve 方法退出时执行 s.serveWG.Done()


s.serveWG.Add(1)defer func() {  s.serveWG.Done()  if s.quit.HasFired() {    // Stop or GracefulStop called; block until done and return nil.    <-s.done.Done()  }}()
复制代码


并且,在 Serve 方法的 for 循环逻辑中,每次有新的请求进来,s.serveWG.Add(1)s.serveWG.Done() 也会被调用一次:


s.serveWG.Add(1)go func() {  s.handleRawConn(lis.Addr().String(), rawConn)  s.serveWG.Done()}()
复制代码


所以,这里其实是在等待 Serve 方法执行完成并退出。


接下来的代码段是根据是否要进行优雅退出,执行不同的逻辑:


s.mu.Lock()defer s.mu.Unlock()
if graceful { s.drainAllServerTransportsLocked()} else { s.closeServerTransportsLocked()}
复制代码


可以发现,接下来的全部操作都加锁处理。


优雅退出走 s.drainAllServerTransportsLocked() 逻辑:


// s.mu must be held by the caller.func (s *Server) drainAllServerTransportsLocked() {  if !s.drain {    for _, conns := range s.conns {      for st := range conns {        st.Drain("graceful_stop")      }    }    s.drain = true  }}
复制代码


它的主要作用是在服务器优雅停止的过程中,让所有的服务器传输层(ServerTransports)停止接收新的请求,但继续处理现有的请求,直到它们完成。


这里有一行注释:s.mu must be held by the caller


说明了在调用 drainAllServerTransportsLocked 方法之前,调用者必须已经持有 s.mu 锁。这是为了确保在执行方法体时,服务器的状态不会被并发修改。


对于 if !s.drain 这个条件判断,其用于确保 drainAllServerTransportsLocked 方法只会执行一次。


s.conns 属性保存了所有连接,是 map[string]map[transport.ServerTransport]bool 类型。


通过嵌套的 for 循环遍历每个连接中的 ServerTransport 实例。ServerTransport 是 gRPC 中的一个概念,它表示一个抽象的传输层实现,负责处理客户端和服务器之间的实际数据传输。


调用 st.Drain("graceful_stop") 方法的作用,是告诉传输层不要再接收新的请求或连接了,但允许继续处理现有的请求,直到它们完成。


这个方法会向客户端发送信号,表明服务器正在进行优雅关闭。它会给所有的客户端发送一个控制帧 GOAWAY(因为 gRPC 是基于 HTTP/2 的,所以才会这样处理),告诉客户端关闭 TCP 连接。


Drain 方法实现如下:


func (t *http2Server) Drain(debugData string) {  t.mu.Lock()  defer t.mu.Unlock()  if t.drainEvent != nil {    return  }  t.drainEvent = grpcsync.NewEvent()  t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})}
复制代码


在完成对所有连接的遍历和 Drain 操作后,将 s.drain 设置为 true,表示服务器已经进入了 "drain" 状态,这样后续不会再次执行相同的操作。


结合之前持有锁的操作,这里不会重复执行。


相对来说,没有采用优雅退出的另一个方法 closeServerTransportsLocked 就要暴力一些:


// s.mu must be held by the caller.func (s *Server) closeServerTransportsLocked() {  for _, conns := range s.conns {    for st := range conns {      st.Close(errors.New("Server.Stop called"))    }  }}
复制代码


这里直接调用 Close 方法关闭连接,省略了控制帧 GOAWAY 的发送。


stop 函数接下来会等待所有现有的连接被安全关闭:


for len(s.conns) != 0 {    s.cv.Wait()}s.conns = nil
复制代码


继续往下执行:


if s.opts.numServerWorkers > 0 {    // Closing the channel (only once, via grpcsync.OnceFunc) after all the    // connections have been closed above ensures that there are no    // goroutines executing the callback passed to st.HandleStreams (where    // the channel is written to).    s.serverWorkerChannelClose()}
复制代码


这段代码用于关闭工作线程的 channel,确保所有处理程序都已经终止,不会再处理新的请求。


s.serverWorkerChannelClose 在初始化操作时被赋值:


// initServerWorkers creates worker goroutines and a channel to process incoming// connections to reduce the time spent overall on runtime.morestack.func (s *Server) initServerWorkers() {  s.serverWorkerChannel = make(chan func())  s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {    close(s.serverWorkerChannel)  })  for i := uint32(0); i < s.opts.numServerWorkers; i++ {    go s.serverWorker()  }}
复制代码


接下来,如果是优雅退出或者配置了 s.opts.waitForHandlers 选项,则代码会调用 s.handlersWG.Wait(),等待所有的处理程序完成:


if graceful || s.opts.waitForHandlers {  s.handlersWG.Wait()}
复制代码


不知你有没有注意到,Serve 方法内部,调用了 s.handleRawConn(lis.Addr().String(), rawConn) 来处理每一个请求。


handleRawConn 方法内部会调用 s.serveStreams(context.Background(), st, rawConn) 方法。


serveStreams 方法定义如下:


func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {  ctx = transport.SetConnection(ctx, rawConn)  ctx = peer.NewContext(ctx, st.Peer())  for _, sh := range s.opts.statsHandlers {    ctx = sh.TagConn(ctx, &stats.ConnTagInfo{      RemoteAddr: st.Peer().Addr,      LocalAddr:  st.Peer().LocalAddr,    })    sh.HandleConn(ctx, &stats.ConnBegin{})  }
defer func() { st.Close(errors.New("finished serving streams for the server transport")) for _, sh := range s.opts.statsHandlers { sh.HandleConn(ctx, &stats.ConnEnd{}) } }()
streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams) st.HandleStreams(ctx, func(stream *transport.Stream) { s.handlersWG.Add(1) streamQuota.acquire() f := func() { defer streamQuota.release() defer s.handlersWG.Done() s.handleStream(st, stream) }
if s.opts.numServerWorkers > 0 { select { case s.serverWorkerChannel <- f: return default: // If all stream workers are busy, fallback to the default code path. } } go f() })}
复制代码


serveStreams 方法内部会调用 s.handlersWG.Add(1)s.handlersWG.Done() 操作。


serveStreams 执行完成后,stop 方法就可以继续执行了:


if s.events != nil {  s.events.Finish()  s.events = nil}
复制代码


stop 方法最后,对事件进行了清理,检查 s.events 是否不为空,如果不为空,则调用 s.events.Finish(),完成事件的清理工作。


至此,GracefulStop 的源码就分析完成了。


可以发现 grpc 的优雅退出跟 net/http 的优雅退出还是有很多相似之处的,这也很好理解,gRPC 是基于 HTTP/2 的,所以底层也是 HTTP 协议。

普通 Go 程序的优雅退出

在日常开发中,除了 HTTP Server 或者 gRPC Server 程序,我们也可能会开发一些其他需要优雅退出的程序。


在文章的最后一个小节,我再来介绍一种常见的周期性执行任务的 Go 程序如何实现优雅退出。


示例代码如下:


package main
import ( "log" "time"
genericapiserver "k8s.io/apiserver/pkg/server")
type Syncer struct { interval time.Duration}
func (s *Syncer) Run(quit <-chan struct{}) error { ticker := time.NewTicker(s.interval) defer ticker.Stop()
for { select { case <-ticker.C: // 业务逻辑 log.Println("do something") case <-quit: log.Println("Stop loop") s.Stop() return nil } }}
func (s *Syncer) Stop() { log.Println("Stop Syncer start") time.Sleep(time.Second * 5) log.Println("Stop Syncer done")}
func main() { s := Syncer{interval: time.Second} quit := genericapiserver.SetupSignalHandler()
if err := s.Run(quit); err != nil { log.Fatalf("Syncer run err: %s", err.Error()) }}
复制代码


这里定义了一个 Syncer 结构体,用来实现周期性执行一段代码逻辑。


Run 方法中用到了 time.Ticker,在 for 循环中周期执行某些业务逻辑,比如定时同步数据状态、生成数据报表等。


Run 方法中还用到了类似 http.Server.Shutdown 方法内部的 for + select 代码结构,来实现接收退出信号 <-quit。当收到退出信号,就调用 s.Stop() 进行优雅退出逻辑。


执行示例代码,中途按 Ctrl + C 进行优雅退出操作:


$ go build -o main main.go && ./main2024/08/22 09:27:34 do something2024/08/22 09:27:35 do something2024/08/22 09:27:36 do something^C2024/08/22 09:27:37 Stop loop2024/08/22 09:27:37 Stop Syncer start2024/08/22 09:27:42 Stop Syncer done
复制代码


也可以尝试强制退出:


$ go build -o main main.go && ./main2024/08/22 09:28:05 do something2024/08/22 09:28:06 do something2024/08/22 09:28:07 do something^C2024/08/22 09:28:07 Stop loop2024/08/22 09:28:07 Stop Syncer start^C
复制代码


两种操作都没有问题。


可以将这个示例程序作为优雅退出的代码模板,集成进你的 Go 程序中。


至此,本文要讲解的内容就全部都写完了。

总结

所谓的优雅退出,其实就是在关闭进程的时候,不能“暴力”关闭,而是要等待进程中的逻辑(比如一次完整的 HTTP 请求)处理完成后,才关闭进程。


Go 为我们提供的 os/singal 包进行信号处理。默认情况下接收到退出信号 Go 程序会立即退出,使用 signal.Notify 注册关注的退出信号以后,我们可以实现自己的处理逻辑。


常见退出信号有 SIGINTSIGQUITSIGTERMSIGKILLSIGKILL 不能被 Go 程序捕获。


我们分别为 HTTP Server、gRPC Server 以及周期性执行任务的 Go 程序实现了优雅退出功能,并且对 net/http 包的 Shutdown 源码以及 grpc 包的 GracefulStop 源码都进行了分析和讲解。


本文还重点讲解了 K8s 为我们提供了一种更加优雅的方式,来实现优雅退出功能。我们可以实现收到一次 SIGINT/SIGTERM 信号,程序优雅退出,收到两次 SIGINT/SIGTERM 信号,程序强制退出。


切记,忽略优雅退出可能会导致数据的不一致问题,因此实现优雅退出功能是非常有必要的。


实现优雅退出最核心的两点:


  1. 接收退出信号。

  2. 如何等待正在处理的任务完成,还要考虑超时机制。


这两步做完,程序就可以退出了。


本文示例源码我都放在了 GitHub 中,欢迎点击查看。


希望此文能对你有所启发。


联系我


发布于: 刚刚阅读数: 5
用户头像

江湖十年

关注

野生程序员|公众号:Go编程世界 2018-11-10 加入

资深容器云开发工程师,分享不限于 Go、Python、Docker、K8s 技术。

评论

发布
暂无评论
Go 程序如何实现优雅退出?来看看 K8s 是怎么做的——下篇_k8s_江湖十年_InfoQ写作社区