本文带大家一起来详细学习下 Go 中的优雅退出,由于文章过长,拆分成上下两篇,本文为下篇。
K8s 的优雅退出
现在,我们已经掌握了 Go 中 HTTP Server 程序如何实现优雅退出,是时候看一看 K8s 中提供的一种更为优雅的优雅退出退出方案了😄。
这要从 K8s API Server 启动入口说起:
https://github.com/kubernetes/kubernetes/blob/release-1.31/cmd/kube-apiserver/apiserver.go
func main() {
command := app.NewAPIServerCommand()
code := cli.Run(command)
os.Exit(code)
}
复制代码
K8s API Server 启动入口代码非常简单,我们可以进入 app.NewAPIServerCommand()
查看更多细节:
https://github.com/kubernetes/kubernetes/blob/release-1.31/cmd/kube-apiserver/app/server.go#L122
// NewAPIServerCommand creates a *cobra.Command object with default parameters
func NewAPIServerCommand() *cobra.Command {
...
cmd := &cobra.Command{
...
RunE: func(cmd *cobra.Command, args []string) error {
...
return Run(cmd.Context(), completedOptions)
},
...
}
cmd.SetContext(genericapiserver.SetupSignalContext())
...
return cmd
}
复制代码
在 NewAPIServerCommand
函数中,我们要关注的核心代码只有两行:
一行是 cmd.SetContext(genericapiserver.SetupSignalContext())
,这是在为 cmd
对象设置 ctx
属性。
另一行是 RunE
属性中最后一行代码 Run(cmd.Context(), completedOptions)
,这里是启动程序,并使用了 cmd
对象的 ctx
属性。
很明显,K8s 使用了 Go 语言中流行的 Cobra 命令行框架作为程序的启动框架,Cobra 提供了如下两个方法可以设置和获取 Context
:
func (c *Command) Context() context.Context {
return c.ctx
}
func (c *Command) SetContext(ctx context.Context) {
c.ctx = ctx
}
复制代码
NOTE:如果你对 Cobra 不太熟悉,可以参考我的另一篇文章《Go 语言现代命令行框架 Cobra 详解》。
这里的 ctx
就是串联起 K8s 实现优雅退出的核心对象。
首先通过 genericapiserver.SetupSignalContext()
获取到一个 context.Context
对象,根据函数名称可以猜测到它可能跟信号有关。
对于 Run(cmd.Context(), completedOptions)
方法的调用,由于嵌套层级比较深,逻辑比较复杂,我就不把整个代码调用链都贴出来讲了。总之,这个启动过程最终可以定位到 preparedGenericAPIServer.RunWithContext 这个方法的执行。在 RunWithContext
方法内部的第一行代码 stopCh := ctx.Done()
是重点,它拿到了一个控制程序退出时机的 channel
(这跟我们前文讲解的优雅退出示例中 quit := make(chan os.Signal, 1)
变量作用相同),而这个 ctx
实际上就是 genericapiserver.SetupSignalContext()
的返回值,如果你感兴趣可以详细研究下这个 stopCh
的使用过程。
我们直接去分析 genericapiserver.SetupSignalContext()
的实现:
https://github.com/kubernetes/apiserver/blob/release-1.31/pkg/server/signal.go
package server
import (
"context"
"os"
"os/signal"
)
var onlyOneSignalHandler = make(chan struct{})
var shutdownHandler chan os.Signal
// SetupSignalHandler registered for SIGTERM and SIGINT. A stop channel is returned
// which is closed on one of these signals. If a second signal is caught, the program
// is terminated with exit code 1.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalHandler() <-chan struct{} {
return SetupSignalContext().Done()
}
// SetupSignalContext is same as SetupSignalHandler, but a context.Context is returned.
// Only one of SetupSignalContext and SetupSignalHandler should be called, and only can
// be called once.
func SetupSignalContext() context.Context {
close(onlyOneSignalHandler) // panics when called twice
shutdownHandler = make(chan os.Signal, 2)
ctx, cancel := context.WithCancel(context.Background())
signal.Notify(shutdownHandler, shutdownSignals...)
go func() {
<-shutdownHandler
cancel()
<-shutdownHandler
os.Exit(1) // second signal. Exit directly.
}()
return ctx
}
// RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT)
// This returns whether a handler was notified
func RequestShutdown() bool {
if shutdownHandler != nil {
select {
case shutdownHandler <- shutdownSignals[0]:
return true
default:
}
}
return false
}
复制代码
这里代码不多,但却相当精妙,可以一窥 K8s 设计之优雅。
我们从 SetupSignalContext
函数开始分析。
SetupSignalContext
函数第一行代码,通过调用 close(onlyOneSignalHandler)
来确保在整个程序中只调用一次 SetupSignalContext
函数,调用多次则直接 panic
。这能强制调用方写出正确的代码,避免出现意料之外的情况。
shutdownHandler
是一个包含了两个缓冲区的 channel
,而不像我们定义的 quit := make(chan os.Signal, 1)
那样只有一个缓冲区大小。
我们前文讲过,通过 signal.Notify(c chan<- os.Signal, sig ...os.Signal)
函数注册所关注的信号后,signal
包在给 c
发送信号时不会阻塞。因为我们要接收两次退出信号,所以 shutdownHandler
缓冲区大小为 2
。
这也是 SetupSignalContext
函数的精髓所在,它实现了收到一次 SIGINT/SIGTERM
信号,程序优雅退出,收到两次 SIGINT/SIGTERM
信号,程序强制退出的功能。
代码片段如下:
ctx, cancel := context.WithCancel(context.Background())
signal.Notify(shutdownHandler, shutdownSignals...)
go func() {
<-shutdownHandler
cancel()
<-shutdownHandler
os.Exit(1) // second signal. Exit directly.
}()
复制代码
这里使用一个带有取消功能的 Context
,当第一次收到信号时,就调用 cancel()
取消这个 ctx
。而这个 ctx
会作为函数返回值返给调用方,调用方拿到它,就可以在需要的地方调用 <-ctx.Done()
来等待退出信号了。这就是 preparedGenericAPIServer.RunWithContext
方法中调用 stopCh := ctx.Done()
拿到 channel
,然后等待 <-stopCh
退出信号的逻辑了。
这里用到的 shutdownSignals
变量,定义在 signal_posix.go
文件中:
https://github.com/kubernetes/apiserver/blob/release-1.31/pkg/server/signal_posix.go
package server
import (
"os"
"syscall"
)
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
复制代码
shutdownSignals
是一个保存了两个信号的切片对象。
os.Interrupt
实际上是一个变量,它的值等于 syscall.SIGINT
。
// The only signal values guaranteed to be present in the os package on all
// systems are os.Interrupt (send the process an interrupt) and os.Kill (force
// the process to exit). On Windows, sending os.Interrupt to a process with
// os.Process.Signal is not implemented; it will return an error instead of
// sending a signal.
var (
Interrupt Signal = syscall.SIGINT
Kill Signal = syscall.SIGKILL
)
复制代码
这里为实现优雅退出,监控了两个信号 SIGINT
和 SIGTERM
,并没有监控 SIGQUIT
信号。不过这已经足够用了,根据我的经验,绝大多数情况下我们都会使用 Ctrl + C
终止程序,而非使用 Ctrl + \
。
SetupSignalHandler
函数内部调用了 SetupSignalContext
函数,它唯一的作用就是直接返回给调用方 ctx.Done()
所返回的 channel
,以此来方便调用方。
RequestShutdown
函数可以主动触发退出事件信号(SIGTERM/SIGINT
),返回值表示是否触发成功。
现在将 K8s 优雅退出方案集成进我们的 net/http
优雅退出示例程序中:
package main
import (
"context"
"errors"
"log"
"net/http"
"time"
genericapiserver "k8s.io/apiserver/pkg/server"
)
func main() {
srv := &http.Server{
Addr: ":8000",
}
http.HandleFunc("/sleep", func(w http.ResponseWriter, r *http.Request) {
duration, err := time.ParseDuration(r.FormValue("duration"))
if err != nil {
http.Error(w, err.Error(), 400)
return
}
time.Sleep(duration)
_, _ = w.Write([]byte("Welcome HTTP Server"))
})
go func() {
if err := srv.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("HTTP server error: %v", err)
}
log.Println("Stopped serving new connections")
}()
// NOTE: 只需要替换这 3 行代码,Gin 版本同理
// quit := make(chan os.Signal, 1)
// signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
// <-quit
// 可以直接丢弃,context.Context.Done() 返回的就是普通空结构体
<-genericapiserver.SetupSignalHandler()
log.Println("Shutdown Server...")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// We received an SIGINT/SIGTERM signal, shut down.
if err := srv.Shutdown(ctx); err != nil {
// Error from closing listeners, or context timeout:
log.Printf("HTTP server Shutdown: %v", err)
}
log.Println("HTTP server graceful shutdown completed")
}
复制代码
我们只需要将如下 3 行代码:
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
复制代码
替换成 K8s 提供的 SetupSignalHandler
函数调用即可:
<-genericapiserver.SetupSignalHandler()
复制代码
其他代码都不用修改。
执行示例程序,按一次 Ctrl + C
测试优雅退出:
$ go build -o main main.go && ./main
^C2024/08/22 09:24:46 Shutdown Server...
2024/08/22 09:24:46 Stopped serving new connections
2024/08/22 09:24:49 HTTP server graceful shutdown completed
$ echo $?
0
复制代码
$ curl "http://localhost:8000/sleep?duration=5s"
Welcome HTTP Server
复制代码
执行示例程序,按两次 Ctrl + C
测试强制退出:
$ go build -o main main.go && ./main
^C2024/08/22 09:25:28 Shutdown Server...
2024/08/22 09:25:28 Stopped serving new connections
^C
$ echo $?
1
复制代码
$ curl "http://localhost:8000/sleep?duration=5s"
curl: (52) Empty reply from server
复制代码
完美,K8s 为我们提供了优雅退出的新思路。这样在开发环境,为了方便调试,我们可以无需等待优雅退出,只要连续发送两次 SIGTERM/SIGINT
即可强制退出程序。在生产环境发送一次 SIGTERM/SIGINT
信号等待优雅退出。
使用 Gin 框架开发的 Web 程序也可以这样修改,你可以自行尝试。
gRPC 的优雅退出
gRPC Server 优雅退出
接下来我们一起看下 gRPC Server 程序如何实现优雅退出。
示例程序目录结构如下:
$ tree grpc
grpc
├── Makefile
├── client
│ └── main.go
├── pb
│ ├── helloworld.pb.go
│ ├── helloworld.proto
│ └── helloworld_grpc.pb.go
└── server
└── main.go
复制代码
helloworld.proto
中定义了 gRPC Server 支持的服务接口:
syntax = "proto3";
option go_package = ".;pb";
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
string duration = 2;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
复制代码
server/main.go
中 Server 端代码如下:
// Package main implements a server for Greeter service.
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"time"
"google.golang.org/grpc"
genericapiserver "k8s.io/apiserver/pkg/server"
"github.com/jianghushinian/blog-go-example/gracefulstop/grpc/pb"
)
var (
port = flag.Int("port", 50051, "The server port")
)
// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetName())
duration, _ := time.ParseDuration(in.GetDuration())
time.Sleep(duration)
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
<-genericapiserver.SetupSignalHandler()
log.Printf("Shutdown Server...")
s.GracefulStop()
log.Println("gRPC server graceful shutdown completed")
}
复制代码
这与 HTTP Server 的优雅退出逻辑基本相同,同样由 grpc
包提供了优雅退出方法 GracefulStop
。
在接收到退出信号以后,调用 s.GracefulStop()
方法即可实现优雅退出。可以发现,这其实是一个优雅退出的套路。
client/main.go
中 Client 端代码如下:
// Package main implements a client for Greeter service.
package main
import (
"context"
"flag"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/jianghushinian/blog-go-example/gracefulstop/grpc/pb"
)
const (
defaultName = "world"
)
var (
addr = flag.String("addr", "localhost:50051", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)
func main() {
flag.Parse()
// Set up a connection to the server.
conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)
// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*60)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name, Duration: "10s"})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}
复制代码
执行示例程序,测试优雅退出逻辑:
# 执行服务端代码
$ go build -o main main.go && ./main
2024/08/22 09:26:17 server listening at [::]:50051
2024/08/22 09:26:24 Received: world
^C2024/08/22 09:26:26 Shutdown Server...
2024/08/22 09:26:34 gRPC server graceful shutdown completed
$ echo $?
0
复制代码
# 执行客户端代码
$ go build -o main main.go && ./main
2024/08/22 09:26:34 Greeting: Hello world
复制代码
优雅退出生效。
既然 gRPC Server 中的优雅退出方案已经介绍完了,同讲解 HTTP Server 优雅退出一样,接下来我再带你一起深入了解一下 GracefulStop
的源码是如何实现的。
GracefulStop 源码
GracefulStop
方法源码如下:
https://github.com/grpc/grpc-go/blob/v1.65.0/server.go#L1882
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
s.stop(true)
}
func (s *Server) stop(graceful bool) {
s.quit.Fire()
defer s.done.Fire()
s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
s.mu.Lock()
s.closeListenersLocked()
// Wait for serving threads to be ready to exit. Only then can we be sure no
// new conns will be created.
s.mu.Unlock()
s.serveWG.Wait()
s.mu.Lock()
defer s.mu.Unlock()
if graceful {
s.drainAllServerTransportsLocked()
} else {
s.closeServerTransportsLocked()
}
for len(s.conns) != 0 {
s.cv.Wait()
}
s.conns = nil
if s.opts.numServerWorkers > 0 {
// Closing the channel (only once, via grpcsync.OnceFunc) after all the
// connections have been closed above ensures that there are no
// goroutines executing the callback passed to st.HandleStreams (where
// the channel is written to).
s.serverWorkerChannelClose()
}
if graceful || s.opts.waitForHandlers {
s.handlersWG.Wait()
}
if s.events != nil {
s.events.Finish()
s.events = nil
}
}
复制代码
GracefulStop
方法直接调用了 s.stop(true)
方法。
stop
方法的 graceful
参数用来决定是否启用优雅退出,传递 true
表示优雅退出,传递 false
表示强制退出。
stop
方法第一段代码逻辑如下:
s.quit.Fire()
defer s.done.Fire()
复制代码
Server
对象的 quit
和 done
属性类型都为 *grpcsync.Event
,前者用来标记 gRPC Server 正在执行退出流程,后者标记退出完成。
Event
定义如下:
https://github.com/grpc/grpc-go/blob/v1.65.0/internal/grpcsync/event.go
// Event represents a one-time event that may occur in the future.
type Event struct {
fired int32
c chan struct{}
o sync.Once
}
// Fire causes e to complete. It is safe to call multiple times, and
// concurrently. It returns true iff this call to Fire caused the signaling
// channel returned by Done to close.
func (e *Event) Fire() bool {
ret := false
e.o.Do(func() {
atomic.StoreInt32(&e.fired, 1)
close(e.c)
ret = true
})
return ret
}
// Done returns a channel that will be closed when Fire is called.
func (e *Event) Done() <-chan struct{} {
return e.c
}
// HasFired returns true if Fire has been called.
func (e *Event) HasFired() bool {
return atomic.LoadInt32(&e.fired) == 1
}
// NewEvent returns a new, ready-to-use Event.
func NewEvent() *Event {
return &Event{c: make(chan struct{})}
}
复制代码
可以发现,*Event
对象的 Fire
方法就是将 fired
字段值置为 1
,并且关闭类型为 channel
的字段 c
,所以其实只要调用了 Fire
,那么调用 Done
方法将立即返回,调用 HasFired
方法就是在判断 fired
字段值是否为 1
(即是否调用过 Fire
方法)。
s.quit.Fire()
代码被调用以后,Serve
方法就能够感知到当前服务正在退出,接下来就不会再接收新的请求进来了。
Serve
方法源码如下:
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Note: All supported releases of Go (as of December 2023) override the OS
// defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
// with OS defaults for keepalive time and interval, callers need to do the
// following two things:
// - pass a net.Listener created by calling the Listen method on a
// net.ListenConfig with the `KeepAlive` field set to a negative value. This
// will result in the Go standard library not overriding OS defaults for TCP
// keepalive interval and time. But this will also result in the Go standard
// library not enabling TCP keepalives by default.
// - override the Accept method on the passed in net.Listener and set the
// SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
s.serve = true
if s.lis == nil {
// Serve called after Stop or GracefulStop.
s.mu.Unlock()
lis.Close()
return ErrServerStopped
}
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
// 判断当前服务是否正在退出
if s.quit.HasFired() {
// Stop or GracefulStop called; block until done and return nil.
<-s.done.Done()
}
}()
ls := &listenSocket{
Listener: lis,
channelz: channelz.RegisterSocket(&channelz.Socket{
SocketType: channelz.SocketTypeListen,
Parent: s.channelz,
RefName: lis.Addr().String(),
LocalAddr: lis.Addr(),
SocketOptions: channelz.GetSocketOption(lis)},
),
}
s.lis[ls] = true
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()
s.mu.Unlock()
channelz.Info(logger, ls.channelz, "ListenSocket created")
var tempDelay time.Duration // how long to sleep on accept failure
for {
rawConn, err := lis.Accept()
if err != nil {
if ne, ok := err.(interface {
Temporary() bool
}); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
s.mu.Lock()
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
s.mu.Unlock()
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
// 判断当前服务是否正在退出
case <-s.quit.Done():
timer.Stop()
return nil
}
continue
}
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
// 判断当前服务是否正在退出
if s.quit.HasFired() {
return nil
}
return err
}
tempDelay = 0
// Start a new goroutine to deal with rawConn so we don't stall this Accept
// loop goroutine.
//
// Make sure we account for the goroutine so GracefulStop doesn't nil out
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
}
复制代码
我们可以直接跳到 for
循环部分的代码段,每次通过 rawConn, err := lis.Accept()
接收到一个新的请求进来,都会使用 if s.quit.HasFired()
来判断当前服务是否正在退出,如果返回结果为 true
,则 Serve
方法直接退出。
此时 Serve
方法的 defer
语句开始执行,这里会再次使用 if s.quit.HasFired()
判断当前服务是否正在退出(之所以判断两次,因为 Serve
方法也可能由于其他原因导致退出,进入 defer
逻辑),如果是,则调用 <-s.done.Done()
阻塞在这里,直到 GracefulStop
优雅退出逻辑执行完成。
此外,for
循环内部的 select
语句中,有一个 case 调用了 <-s.quit.Done()
,也是在判断当前服务是否正在退出,如果是,则调用 timer.Stop()
清理定时器后,Serve
方法直接退出。
我们接着往下看:
s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
s.mu.Lock()
s.closeListenersLocked()
// Wait for serving threads to be ready to exit. Only then can we be sure no
// new conns will be created.
s.mu.Unlock()
s.serveWG.Wait()
复制代码
这里的 channelz
是 gRPC 的一个监控工具,用于跟踪 gRPC 的内部状态,RemoveEntry
会移除该服务器的监控数据。
s.closeListenersLocked()
方法是不是很熟悉,这与 net/http
包中的 Shutdown
方法命名都一样,作用也就不言而喻了。
定义如下:
// s.mu must be held by the caller.
func (s *Server) closeListenersLocked() {
for lis := range s.lis {
lis.Close()
}
s.lis = nil
}
复制代码
对于 s.serveWG.Wait()
这行代码,根据这个操作的属性名和方法名可以猜到,serveWG
明显是 sync.WaitGroup
类型。
既然有 Wait()
,那就应该会有 Add(1)
操作。其正在前文中贴出 Serve
代码中。
刚进入 Serve
方法时,就会调用 s.serveWG.Add(1)
,Serve
方法退出时执行 s.serveWG.Done()
:
s.serveWG.Add(1)
defer func() {
s.serveWG.Done()
if s.quit.HasFired() {
// Stop or GracefulStop called; block until done and return nil.
<-s.done.Done()
}
}()
复制代码
并且,在 Serve
方法的 for
循环逻辑中,每次有新的请求进来,s.serveWG.Add(1)
和 s.serveWG.Done()
也会被调用一次:
s.serveWG.Add(1)
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
复制代码
所以,这里其实是在等待 Serve
方法执行完成并退出。
接下来的代码段是根据是否要进行优雅退出,执行不同的逻辑:
s.mu.Lock()
defer s.mu.Unlock()
if graceful {
s.drainAllServerTransportsLocked()
} else {
s.closeServerTransportsLocked()
}
复制代码
可以发现,接下来的全部操作都加锁处理。
优雅退出走 s.drainAllServerTransportsLocked()
逻辑:
// s.mu must be held by the caller.
func (s *Server) drainAllServerTransportsLocked() {
if !s.drain {
for _, conns := range s.conns {
for st := range conns {
st.Drain("graceful_stop")
}
}
s.drain = true
}
}
复制代码
它的主要作用是在服务器优雅停止的过程中,让所有的服务器传输层(ServerTransports)停止接收新的请求,但继续处理现有的请求,直到它们完成。
这里有一行注释:s.mu must be held by the caller
。
说明了在调用 drainAllServerTransportsLocked
方法之前,调用者必须已经持有 s.mu
锁。这是为了确保在执行方法体时,服务器的状态不会被并发修改。
对于 if !s.drain
这个条件判断,其用于确保 drainAllServerTransportsLocked
方法只会执行一次。
s.conns
属性保存了所有连接,是 map[string]map[transport.ServerTransport]bool
类型。
通过嵌套的 for
循环遍历每个连接中的 ServerTransport
实例。ServerTransport
是 gRPC 中的一个概念,它表示一个抽象的传输层实现,负责处理客户端和服务器之间的实际数据传输。
调用 st.Drain("graceful_stop")
方法的作用,是告诉传输层不要再接收新的请求或连接了,但允许继续处理现有的请求,直到它们完成。
这个方法会向客户端发送信号,表明服务器正在进行优雅关闭。它会给所有的客户端发送一个控制帧 GOAWAY
(因为 gRPC 是基于 HTTP/2 的,所以才会这样处理),告诉客户端关闭 TCP 连接。
Drain
方法实现如下:
func (t *http2Server) Drain(debugData string) {
t.mu.Lock()
defer t.mu.Unlock()
if t.drainEvent != nil {
return
}
t.drainEvent = grpcsync.NewEvent()
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
}
复制代码
在完成对所有连接的遍历和 Drain
操作后,将 s.drain
设置为 true
,表示服务器已经进入了 "drain" 状态,这样后续不会再次执行相同的操作。
结合之前持有锁的操作,这里不会重复执行。
相对来说,没有采用优雅退出的另一个方法 closeServerTransportsLocked
就要暴力一些:
// s.mu must be held by the caller.
func (s *Server) closeServerTransportsLocked() {
for _, conns := range s.conns {
for st := range conns {
st.Close(errors.New("Server.Stop called"))
}
}
}
复制代码
这里直接调用 Close
方法关闭连接,省略了控制帧 GOAWAY
的发送。
stop
函数接下来会等待所有现有的连接被安全关闭:
for len(s.conns) != 0 {
s.cv.Wait()
}
s.conns = nil
复制代码
继续往下执行:
if s.opts.numServerWorkers > 0 {
// Closing the channel (only once, via grpcsync.OnceFunc) after all the
// connections have been closed above ensures that there are no
// goroutines executing the callback passed to st.HandleStreams (where
// the channel is written to).
s.serverWorkerChannelClose()
}
复制代码
这段代码用于关闭工作线程的 channel
,确保所有处理程序都已经终止,不会再处理新的请求。
s.serverWorkerChannelClose
在初始化操作时被赋值:
// initServerWorkers creates worker goroutines and a channel to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
s.serverWorkerChannel = make(chan func())
s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
close(s.serverWorkerChannel)
})
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
go s.serverWorker()
}
}
复制代码
接下来,如果是优雅退出或者配置了 s.opts.waitForHandlers
选项,则代码会调用 s.handlersWG.Wait()
,等待所有的处理程序完成:
if graceful || s.opts.waitForHandlers {
s.handlersWG.Wait()
}
复制代码
不知你有没有注意到,Serve
方法内部,调用了 s.handleRawConn(lis.Addr().String(), rawConn)
来处理每一个请求。
而 handleRawConn
方法内部会调用 s.serveStreams(context.Background(), st, rawConn)
方法。
serveStreams
方法定义如下:
func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
ctx = transport.SetConnection(ctx, rawConn)
ctx = peer.NewContext(ctx, st.Peer())
for _, sh := range s.opts.statsHandlers {
ctx = sh.TagConn(ctx, &stats.ConnTagInfo{
RemoteAddr: st.Peer().Addr,
LocalAddr: st.Peer().LocalAddr,
})
sh.HandleConn(ctx, &stats.ConnBegin{})
}
defer func() {
st.Close(errors.New("finished serving streams for the server transport"))
for _, sh := range s.opts.statsHandlers {
sh.HandleConn(ctx, &stats.ConnEnd{})
}
}()
streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
st.HandleStreams(ctx, func(stream *transport.Stream) {
s.handlersWG.Add(1)
streamQuota.acquire()
f := func() {
defer streamQuota.release()
defer s.handlersWG.Done()
s.handleStream(st, stream)
}
if s.opts.numServerWorkers > 0 {
select {
case s.serverWorkerChannel <- f:
return
default:
// If all stream workers are busy, fallback to the default code path.
}
}
go f()
})
}
复制代码
serveStreams
方法内部会调用 s.handlersWG.Add(1)
和 s.handlersWG.Done()
操作。
serveStreams
执行完成后,stop
方法就可以继续执行了:
if s.events != nil {
s.events.Finish()
s.events = nil
}
复制代码
stop
方法最后,对事件进行了清理,检查 s.events
是否不为空,如果不为空,则调用 s.events.Finish()
,完成事件的清理工作。
至此,GracefulStop
的源码就分析完成了。
可以发现 grpc
的优雅退出跟 net/http
的优雅退出还是有很多相似之处的,这也很好理解,gRPC 是基于 HTTP/2 的,所以底层也是 HTTP 协议。
普通 Go 程序的优雅退出
在日常开发中,除了 HTTP Server 或者 gRPC Server 程序,我们也可能会开发一些其他需要优雅退出的程序。
在文章的最后一个小节,我再来介绍一种常见的周期性执行任务的 Go 程序如何实现优雅退出。
示例代码如下:
package main
import (
"log"
"time"
genericapiserver "k8s.io/apiserver/pkg/server"
)
type Syncer struct {
interval time.Duration
}
func (s *Syncer) Run(quit <-chan struct{}) error {
ticker := time.NewTicker(s.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// 业务逻辑
log.Println("do something")
case <-quit:
log.Println("Stop loop")
s.Stop()
return nil
}
}
}
func (s *Syncer) Stop() {
log.Println("Stop Syncer start")
time.Sleep(time.Second * 5)
log.Println("Stop Syncer done")
}
func main() {
s := Syncer{interval: time.Second}
quit := genericapiserver.SetupSignalHandler()
if err := s.Run(quit); err != nil {
log.Fatalf("Syncer run err: %s", err.Error())
}
}
复制代码
这里定义了一个 Syncer
结构体,用来实现周期性执行一段代码逻辑。
Run
方法中用到了 time.Ticker
,在 for
循环中周期执行某些业务逻辑,比如定时同步数据状态、生成数据报表等。
Run
方法中还用到了类似 http.Server.Shutdown
方法内部的 for
+ select
代码结构,来实现接收退出信号 <-quit
。当收到退出信号,就调用 s.Stop()
进行优雅退出逻辑。
执行示例代码,中途按 Ctrl + C
进行优雅退出操作:
$ go build -o main main.go && ./main
2024/08/22 09:27:34 do something
2024/08/22 09:27:35 do something
2024/08/22 09:27:36 do something
^C2024/08/22 09:27:37 Stop loop
2024/08/22 09:27:37 Stop Syncer start
2024/08/22 09:27:42 Stop Syncer done
复制代码
也可以尝试强制退出:
$ go build -o main main.go && ./main
2024/08/22 09:28:05 do something
2024/08/22 09:28:06 do something
2024/08/22 09:28:07 do something
^C2024/08/22 09:28:07 Stop loop
2024/08/22 09:28:07 Stop Syncer start
^C
复制代码
两种操作都没有问题。
可以将这个示例程序作为优雅退出的代码模板,集成进你的 Go 程序中。
至此,本文要讲解的内容就全部都写完了。
总结
所谓的优雅退出,其实就是在关闭进程的时候,不能“暴力”关闭,而是要等待进程中的逻辑(比如一次完整的 HTTP 请求)处理完成后,才关闭进程。
Go 为我们提供的 os/singal
包进行信号处理。默认情况下接收到退出信号 Go 程序会立即退出,使用 signal.Notify
注册关注的退出信号以后,我们可以实现自己的处理逻辑。
常见退出信号有 SIGINT
、SIGQUIT
、SIGTERM
和 SIGKILL
,SIGKILL
不能被 Go 程序捕获。
我们分别为 HTTP Server、gRPC Server 以及周期性执行任务的 Go 程序实现了优雅退出功能,并且对 net/http
包的 Shutdown
源码以及 grpc
包的 GracefulStop
源码都进行了分析和讲解。
本文还重点讲解了 K8s 为我们提供了一种更加优雅的方式,来实现优雅退出功能。我们可以实现收到一次 SIGINT/SIGTERM
信号,程序优雅退出,收到两次 SIGINT/SIGTERM
信号,程序强制退出。
切记,忽略优雅退出可能会导致数据的不一致问题,因此实现优雅退出功能是非常有必要的。
实现优雅退出最核心的两点:
接收退出信号。
如何等待正在处理的任务完成,还要考虑超时机制。
这两步做完,程序就可以退出了。
本文示例源码我都放在了 GitHub 中,欢迎点击查看。
希望此文能对你有所启发。
联系我
评论