
cri-o Internals, Part 2

xumc
Published: May 12, 2021

1. How resourceStore is used in CRI-O

As we know, when the kubelet finds that the current system state does not match the desired spec, it takes the corresponding actions to reconcile the two. Take creating a new pod as an example: if the pod does not exist yet, the kubelet calls the CRI RunPodSandbox interface to create a new pod sandbox. But a pod does not come up instantly; in some situations it takes a while to finish. That means that by the time RunPodSandbox would return the pod_id (sandbox.id), the pod may not have been fully created yet. The kubelet may then trigger RunPodSandbox again, and when the code reaches ReservePodName, the pod name already exists, so ReservePodName fails (a failure we know is expected). Before resourceStore was introduced, CRI-O would print a flood of logs saying "Kubelet may be retrying requests that are timing out in CRI-O due to system load" until the first RunPodSandbox finally succeeded. With resourceStore, we can simply wait until the pod has been created, which is what the getResourceOrWait function implements.


Why is the pod only put into the resourceStore on context.Canceled or context.DeadlineExceeded? When the kubelet calls RunPodSandbox, in some situations (e.g. very high system load) the first call may time out, triggering DeadlineExceeded. At that point CRI-O does not abandon the RunPodSandbox work; it keeps going, and puts the pod it creates into the resourceStore. The next time the kubelet calls RunPodSandbox, the newly created pod is found directly in the resourceStore. This effectively absorbs the kubelet's repeated RunPodSandbox calls.


The container create flow uses the resourceStore to solve the same problem in the same way, so it is not repeated here.


At the end of runPodSandbox we can see sb.SetCreated() being called to mark the pod as fully created. At that point the watcher set up inside getResourceOrWait notices that the pod is done, and the waiting RPC request returns pod.id (cachedID) to the kubelet.


```go
func (s *Server) runPodSandbox(ctx context.Context, req *types.RunPodSandboxRequest) (resp *types.RunPodSandboxResponse, retErr error) {
	...
	if _, err := s.ReservePodName(sbox.ID(), sbox.Name()); err != nil {
		cachedID, resourceErr := s.getResourceOrWait(ctx, sbox.Name(), "sandbox")
		if resourceErr == nil {
			return &types.RunPodSandboxResponse{PodSandboxID: cachedID}, nil
		}
		return nil, errors.Wrapf(err, resourceErr.Error())
	}
	...
	if isContextError(ctx.Err()) {
		if err := s.resourceStore.Put(sbox.Name(), sb, resourceCleaner); err != nil {
			log.Errorf(ctx, "runSandbox: failed to save progress of sandbox %s: %v", sbox.ID(), err)
		}
		log.Infof(ctx, "runSandbox: context was either canceled or the deadline was exceeded: %v", ctx.Err())
		return nil, ctx.Err()
	}
	sb.SetCreated()
	...
}
```
```go
func (s *Server) getResourceOrWait(ctx context.Context, name, resourceType string) (string, error) {
	const resourceCreationWaitTime = time.Minute * 4

	if cachedID := s.resourceStore.Get(name); cachedID != "" {
		log.Infof(ctx, "Found %s %s with ID %s in resource cache; using it", resourceType, name, cachedID)
		return cachedID, nil
	}
	watcher := s.resourceStore.WatcherForResource(name)
	if watcher == nil {
		return "", errors.Errorf("error attempting to watch for %s %s: no longer found", resourceType, name)
	}
	log.Infof(ctx, "Creation of %s %s not yet finished. Waiting up to %v for it to finish", resourceType, name, resourceCreationWaitTime)
	var err error
	select {
	// We should wait as long as we can (within reason), thus stalling the kubelet's sync loop.
	// This will prevent "name is reserved" errors popping up every two seconds.
	case <-ctx.Done():
		err = ctx.Err()
	// This is probably overly cautious, but it doesn't hurt to have a way to terminate
	// independent of the kubelet's signal.
	case <-time.After(resourceCreationWaitTime):
		err = errors.Errorf("waited too long for request to timeout or %s %s to be created", resourceType, name)
	// If the resource becomes available while we're watching for it, we still need to error on this request.
	// When we pull the resource from the cache after waiting, we won't run the cleanup funcs.
	// However, we don't know how long we've been making the kubelet wait for the request, and the request could time out
	// after we stop paying attention. This would cause CRI-O to attempt to send back a resource that the kubelet
	// will not receive, causing a resource leak.
	case <-watcher:
		err = errors.Errorf("the requested %s %s is now ready and will be provided to the kubelet on next retry", resourceType, name)
	}

	return "", errors.Wrap(err, "Kubelet may be retrying requests that are timing out in CRI-O due to system load")
}
```


Now let's look at how resourceStore itself is implemented.


```go
type ResourceStore struct {
	resources map[string]*Resource
	timeout   time.Duration
	closeChan chan struct{}
	closed    bool
	sync.Mutex
}

type Resource struct {
	resource IdentifiableCreatable
	cleaner  *ResourceCleaner
	watchers []chan struct{}
	stale    bool
	name     string
}

type IdentifiableCreatable interface {
	ID() string
	SetCreated()
}

func (rc *ResourceStore) cleanupStaleResources() {...}
func (rc *ResourceStore) Get(name string) string {...}
func (rc *ResourceStore) Put(name string, resource IdentifiableCreatable, cleaner *ResourceCleaner) error {...}
func (rc *ResourceStore) WatcherForResource(name string) chan struct{} {...}
```


The resources map stores all resources; in CRI-O there are two kinds, containers and pods. timeout sets the resource expiration time, enforced by running cleanupStaleResources on a dedicated goroutine. The three key functions are Put, WatcherForResource, and Get, and note that WatcherForResource may be called before Put. A Resource must satisfy the IdentifiableCreatable interface.
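To make the Put → WatcherForResource → Get interplay concrete, here is a stripped-down sketch of such a store. Field and method names follow the article, but the timeout/stale cleanup, the cleaner, and error handling are omitted, and the Put signature is simplified; this is an illustration under those assumptions, not the upstream implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// IdentifiableCreatable mirrors the interface shown above.
type IdentifiableCreatable interface {
	ID() string
	SetCreated()
}

type Resource struct {
	resource IdentifiableCreatable
	watchers []chan struct{}
}

type ResourceStore struct {
	sync.Mutex
	resources map[string]*Resource
}

func NewResourceStore() *ResourceStore {
	return &ResourceStore{resources: map[string]*Resource{}}
}

// Put records a finished resource and wakes every watcher registered for it.
func (rc *ResourceStore) Put(name string, r IdentifiableCreatable) {
	rc.Lock()
	defer rc.Unlock()
	entry, ok := rc.resources[name]
	if !ok {
		entry = &Resource{}
		rc.resources[name] = entry
	}
	entry.resource = r
	for _, w := range entry.watchers {
		close(w) // closing the channel releases every waiting getResourceOrWait
	}
	entry.watchers = nil
}

// Get returns the cached ID and removes the entry, or "" if creation
// has not finished yet.
func (rc *ResourceStore) Get(name string) string {
	rc.Lock()
	defer rc.Unlock()
	entry, ok := rc.resources[name]
	if !ok || entry.resource == nil {
		return ""
	}
	delete(rc.resources, name)
	return entry.resource.ID()
}

// WatcherForResource registers a channel that fires once Put runs.
// It may be called before Put, so it creates a placeholder entry.
func (rc *ResourceStore) WatcherForResource(name string) chan struct{} {
	rc.Lock()
	defer rc.Unlock()
	entry, ok := rc.resources[name]
	if !ok {
		entry = &Resource{}
		rc.resources[name] = entry
	}
	w := make(chan struct{}, 1)
	entry.watchers = append(entry.watchers, w)
	return w
}

// sandbox is a stand-in for CRI-O's sandbox type.
type sandbox struct{ id string }

func (s *sandbox) ID() string  { return s.id }
func (s *sandbox) SetCreated() {}

func main() {
	store := NewResourceStore()
	w := store.WatcherForResource("mypod")     // the retry arrives first and waits
	store.Put("mypod", &sandbox{id: "sb-123"}) // the original request finishes
	<-w                                        // watcher fires
	fmt.Println(store.Get("mypod"))            // sb-123
}
```

The ordering in main mirrors the RunPodSandbox retry scenario: the watcher is registered before the resource exists, and the Put from the still-running first request wakes it up.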

2. wipe

wipe handles the case where a node reboots unexpectedly and CRI-O did not get a chance to finish its cleanup work, which in general means removing containers and images. If these leftover directories and files are not wiped, problems will occur on the next startup, so the wipe operation exists to avoid them.


Let's first look at the wipe-related configuration.


```toml
# Location for CRI-O to lay down the temporary version file.
# It is used to check if crio wipe should wipe containers, which should
# always happen on a node reboot
version_file = "/var/run/crio/version"

# Location for CRI-O to lay down the persistent version file.
# It is used to check if crio wipe should wipe images, which should
# only happen when CRI-O has been upgraded
version_file_persist = "/var/lib/crio/version"

# Location for CRI-O to lay down the clean shutdown file.
# It is used to check whether crio had time to sync before shutting down.
# If not found, crio wipe will clear the storage directory.
clean_shutdown_file = "/var/lib/crio/clean.shutdown"
```


First, there is a complete mechanism for determining two things: whether the node has rebooted, and whether CRI-O has been upgraded to a new version.


  1. How do we determine whether a node has rebooted? In CRI-O this is done with the version_file. On startup, CRI-O writes the version_file in a specific location (by default /var/run/crio/version), containing its version information. Because /var/run is a tmpfs, everything CRI-O wrote there is cleared on reboot, so if the file is missing at startup, the node must have rebooted.

  2. How do we determine whether CRI-O has been upgraded? This is done with the version_file_persist file. On startup, CRI-O writes version_file_persist in a specific location (by default /var/lib/crio/version), containing its version information. That directory is persisted to disk, so if the content found at startup does not match the running version, the CRI-O starting now must be a different version from the one that ran last.


How do we handle cleanup work left unfinished by an unexpected shutdown? This is done via the clean_shutdown_file configuration. On startup, if a clean_shutdown_file path is configured, CRI-O removes the /var/lib/crio/clean.shutdown file and creates a clean.shutdown.supported file in the same directory (by default /var/lib/crio/). Then, during shutdown, if the clean_shutdown_file path is configured, CRI-O creates /var/lib/crio/clean.shutdown, indicating that the shutdown completed cleanly.


```go
// cmd/main.go:244
if config.CleanShutdownFile != "" {
	// clear out the shutdown file
	if err := os.Remove(config.CleanShutdownFile); err != nil {
		// not a fatal error, as it could have been cleaned up
		logrus.Error(err)
	}

	// Write "$CleanShutdownFile".supported to show crio-wipe that
	// we should be wiping if the CleanShutdownFile wasn't found.
	// This protects us from wiping after an upgrade from a version that doesn't support
	// CleanShutdownFile.
	f, err := os.Create(config.CleanShutdownSupportedFileName())
	if err != nil {
		logrus.Errorf("Writing clean shutdown supported file: %v", err)
	}
	f.Close()

	// and sync the changes to disk
	if err := utils.SyncParent(config.CleanShutdownFile); err != nil {
		logrus.Errorf("failed to sync parent directory of clean shutdown file: %v", err)
	}
}

// server/server.go:277
// Shutdown attempts to shut down the server's storage cleanly
func (s *Server) Shutdown(ctx context.Context) error {
	...
	if s.config.CleanShutdownFile != "" {
		// then, we write the CleanShutdownFile
		// we do this after the sync, to ensure ordering.
		// Otherwise, we may run into situations where the CleanShutdownFile
		// is written before storage, causing us to think a corrupted storage
		// is not so.
		f, err := os.Create(s.config.CleanShutdownFile)
		if err != nil {
			return errors.Wrapf(err, "failed to write file to indicate a clean shutdown")
		}
		f.Close()

		// finally, attempt to sync the newly created file to disk.
		// It's still possible we crash after Create but before this Sync,
		// which will lead us to think storage wasn't synced.
		// However, that's much less likely than if we don't have a second Sync,
		// and less risky than if we don't Sync after the Create
		if err := utils.SyncParent(s.config.CleanShutdownFile); err != nil {
			return errors.Wrapf(err, "failed to sync clean shutdown file")
		}
	}
	...
}
```


So after an unexpected shutdown and node reboot, when the crio wipe command runs, if clean.shutdown.supported exists but /var/lib/crio/clean.shutdown does not, we know the previous shutdown may not have completed in time (shutdownWasUnclean), and the unfinished shutdown work needs to be carried out. If the version recorded in version_file_persist has changed, we know the CRI-O version changed, and the leftover containers and images may be incompatible with the new version, so the historical leftover files are removed completely (handleCleanShutdown).


If the internal-wipe config is set to true and the wipe is not forced, then when CRI-O starts again after the node reboots, it decides for itself whether cleanup is needed. The logic lives in the wipeIfAppropriate function.


If it is a forced wipe, or internal-wipe is not enabled, then the crio wipe command deletes the containers and images itself.


Inside wipeIfAppropriate, images are deleted only when the version in version_file_persist has changed. If images are to be deleted, all containers are necessarily deleted first. If the node merely rebooted, only the containers are deleted.


```go
// server/server.go:495
func (s *Server) wipeIfAppropriate(ctx context.Context) {
	if !s.config.InternalWipe {
		return
	}
	// First, check if the node was rebooted.
	// We know this happened because the VersionFile (which lives in a tmpfs)
	// will not be there.
	shouldWipeContainers, err := version.ShouldCrioWipe(s.config.VersionFile)
	if err != nil {
		log.Warnf(ctx, "error encountered when checking whether cri-o should wipe containers: %v", err)
	}

	// there are two locations we check before wiping:
	// one in a temporary directory. This is to check whether the node has rebooted.
	// if so, we should remove containers
	// another is needed in a persistent directory. This is to check whether we've upgraded
	// if we've upgraded, we should wipe images
	shouldWipeImages, err := version.ShouldCrioWipe(s.config.VersionFilePersist)
	if err != nil {
		log.Warnf(ctx, "error encountered when checking whether cri-o should wipe images: %v", err)
	}

	shouldWipeContainers = shouldWipeContainers || shouldWipeImages

	// First, save the images we should be wiping
	// We won't remember if we wipe all the containers first
	var imagesToWipe []string
	if shouldWipeImages {
		containers, err := s.ContainerServer.ListContainers()
		if err != nil {
			log.Warnf(ctx, "Failed to list containers: %v", err)
		}
		for _, c := range containers {
			imagesToWipe = append(imagesToWipe, c.ImageRef())
		}
	}

	wipeResourceCleaner := resourcestore.NewResourceCleaner()
	if shouldWipeContainers {
		for _, sb := range s.ContainerServer.ListSandboxes() {
			sb := sb
			cleanupFunc := func() error {
				if err := s.stopPodSandbox(ctx, sb); err != nil {
					return err
				}
				return s.removePodSandbox(ctx, sb)
			}
			if err := cleanupFunc(); err != nil {
				log.Warnf(ctx, "Failed to cleanup pod %s (will retry): %v", sb.ID(), err)
				wipeResourceCleaner.Add(ctx, "stop and remove pod sandbox", cleanupFunc)
			}
		}
	}

	go func() {
		if err := wipeResourceCleaner.Cleanup(); err != nil {
			log.Errorf(ctx, "Cleanup during server startup failed: %v", err)
		}
	}()

	// Note: some of these will fail if some aspect of the pod cleanup failed as well,
	// but this is best-effort anyway, as the Kubelet will eventually cleanup images when
	// disk usage gets too high.
	if shouldWipeImages {
		for _, img := range imagesToWipe {
			if err := s.removeImage(ctx, img); err != nil {
				log.Warnf(ctx, "failed to remove image %s: %v", img, err)
			}
		}
	}
}
```

