
k8s endpoints controller analysis

Author: 良凯尔
  • November 21, 2021


Introduction to the endpoints controller

The endpoints controller is one of the many controllers in the kube-controller-manager component; it is the controller for the endpoints resource object. It watches two kinds of resources, service and pod, and when either of them changes it triggers the endpoints controller to reconcile the corresponding endpoints objects, creating, updating, or deleting them as needed.

endpoints controller architecture diagram

The rough composition and processing flow of the endpoints controller are shown in the diagram below. The endpoints controller registers event handlers for the pod and service objects; when an event is watched, the corresponding service object is put into a queue. The syncService method holds the core logic with which the endpoints controller reconciles endpoints objects: it takes a service object out of the queue, queries the list of matching pod objects, and then reconciles the endpoints object.

[Figure: endpoints controller architecture diagram]

Introduction to the service object

A Service is an abstraction over a group of Pods that provide the same functionality, giving them a single entry point. With a Service, applications can easily achieve service discovery and load balancing, as well as zero-downtime upgrades. A Service selects its backends through labels; the IPs and ports of the matching Pods make up the endpoints, and kube-proxy is responsible for load-balancing traffic sent to the service IP across those endpoints.


The four types of service are as follows.


(1) ClusterIP


A service of type ClusterIP has a Cluster IP, which is essentially a VIP. The underlying implementation relies on the kube-proxy component, via either iptables or ipvs. A service of this type can only be accessed from within the cluster.


A client accesses the Cluster IP, and iptables or ipvs rules forward the traffic to the Real Servers (the endpoints), thereby achieving load balancing.
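
For illustration, here is a minimal sketch of a ClusterIP Service; the name, namespace, labels, and ports are hypothetical, not taken from the article:

apiVersion: v1
kind: Service
metadata:
  name: demo-clusterip      # hypothetical name
  namespace: test
spec:
  type: ClusterIP           # the default type; may be omitted
  selector:
    app: demo               # pods carrying this label back the service
  ports:
  - port: 80                # port exposed on the cluster IP
    targetPort: 8080        # container port on the backend pods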


Headless Service


A Headless Service is a special kind of ClusterIP service, created by setting the Cluster IP (spec.clusterIP) to "None".


Use case 1: client-side choice. The client decides for itself which Real Server to use, and can obtain the Real Server information by querying DNS.


Use case 2: every Endpoints entry of a Headless Service, i.e. every Pod, gets its own DNS domain name, so Pods can reach each other by domain name (this usage is common with statefulsets). A sketch of such a service follows.
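
A minimal sketch of a Headless Service, again with hypothetical names and ports:

apiVersion: v1
kind: Service
metadata:
  name: demo-headless
  namespace: test
spec:
  clusterIP: None           # "None" is what makes the service headless
  selector:
    app: demo
  ports:
  - port: 80
    targetPort: 8080

A DNS lookup of such a service returns the Pod IPs directly instead of a single VIP.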


(2) NodePort


On top of ClusterIP, a NodePort service binds a port for the Service on every node, so the service can be reached via <NodeIP>:NodePort. Inside the cluster, a NodePort service is still accessed in the same way as a ClusterIP service.
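
A minimal NodePort sketch (hypothetical names; the nodePort value must fall within the cluster's node port range, 30000-32767 by default):

apiVersion: v1
kind: Service
metadata:
  name: demo-nodeport
  namespace: test
spec:
  type: NodePort
  selector:
    app: demo
  ports:
  - port: 80
    targetPort: 8080
    nodePort: 30080         # optional; auto-allocated when omitted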


(3) LoadBalancer


On top of NodePort, a LoadBalancer service uses the cloud provider to create an external load balancer and forwards requests to <NodeIP>:NodePort.
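
A minimal LoadBalancer sketch; only the type differs from the NodePort example above, and after creation the cloud provider fills in the external address under the object's status.loadBalancer:

apiVersion: v1
kind: Service
metadata:
  name: demo-lb
  namespace: test
spec:
  type: LoadBalancer
  selector:
    app: demo
  ports:
  - port: 80
    targetPort: 8080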


(4) ExternalName


Forwards the service to a specified domain name via a DNS CNAME record.


apiVersion: v1
kind: Service
metadata:
  name: baidu-service
  namespace: test
spec:
  type: ExternalName
  externalName: www.baidu.com

Introduction to the endpoints object

An endpoints object specifies the IPs and ports to connect to; you can think of endpoints as defining the backend of a service. When the service is accessed, the request is actually load-balanced across the IPs and ports defined in the endpoints.


In addition, an endpoints object is associated with the service object of the same name. An illustrative sketch follows.
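
To make the shape of the object concrete, here is a hedged sketch of the Endpoints that the controller would maintain for the hypothetical demo-clusterip Service above (the IPs are invented for illustration):

apiVersion: v1
kind: Endpoints
metadata:
  name: demo-clusterip      # same name as the associated Service
  namespace: test
subsets:
- addresses:                # ready pod addresses
  - ip: 10.244.1.5          # hypothetical pod IP
  - ip: 10.244.2.7
  ports:
  - port: 8080              # the pods' target port
    protocol: TCP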


The analysis of the endpoints controller is split into two parts:


(1) analysis of endpoints controller initialization and startup;


(2) analysis of the endpoints controller processing logic.

1. Analysis of endpoints controller initialization and startup

Based on tag v1.17.4

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4


Let's go straight to the startEndpointController function as the entry point for analyzing the startup of the endpoints controller.

startEndpointController

The startEndpointController function starts a goroutine that first calls endpointcontroller's NewEndpointController method to initialize the endpoints controller, and then calls the Run method to start it.


// cmd/kube-controller-manager/app/core.go
func startEndpointController(ctx ControllerContext) (http.Handler, bool, error) {
  go endpointcontroller.NewEndpointController(
    ctx.InformerFactory.Core().V1().Pods(),
    ctx.InformerFactory.Core().V1().Services(),
    ctx.InformerFactory.Core().V1().Endpoints(),
    ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
    ctx.ComponentConfig.EndpointController.EndpointUpdatesBatchPeriod.Duration,
  ).Run(int(ctx.ComponentConfig.EndpointController.ConcurrentEndpointSyncs), ctx.Stop)
  return nil, true, nil
}

1.1 NewEndpointController

Let's first look at NewEndpointController, the initialization method of the endpoints controller.


From the code of the NewEndpointController function you can see that the endpoints controller registers three informers, podInformer, serviceInformer, and endpointsInformer, and registers EventHandlers for the service and pod objects; that is, it listens for events on these two objects and puts each event into the event queue, to be handled by the endpoints controller's core processing method.


// pkg/controller/endpoint/endpoints_controller.go
func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
  endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *EndpointController {
  broadcaster := record.NewBroadcaster()
  broadcaster.StartLogging(klog.Infof)
  broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
  recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})

  if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
    ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.CoreV1().RESTClient().GetRateLimiter())
  }

  e := &EndpointController{
    client:           client,
    queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
    workerLoopPeriod: time.Second,
  }

  serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: e.onServiceUpdate,
    UpdateFunc: func(old, cur interface{}) {
      e.onServiceUpdate(cur)
    },
    DeleteFunc: e.onServiceDelete,
  })
  e.serviceLister = serviceInformer.Lister()
  e.servicesSynced = serviceInformer.Informer().HasSynced

  podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    e.addPod,
    UpdateFunc: e.updatePod,
    DeleteFunc: e.deletePod,
  })
  e.podLister = podInformer.Lister()
  e.podsSynced = podInformer.Informer().HasSynced

  e.endpointsLister = endpointsInformer.Lister()
  e.endpointsSynced = endpointsInformer.Informer().HasSynced

  e.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
  e.eventBroadcaster = broadcaster
  e.eventRecorder = recorder

  e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod

  e.serviceSelectorCache = endpointutil.NewServiceSelectorCache()

  return e
}

1.2 Run

The main part is the for loop: based on the value of workers (which comes from the kube-controller-manager startup parameter concurrent-endpoint-syncs), it starts the corresponding number of goroutines, each running the e.worker method.


// pkg/controller/endpoint/endpoints_controller.go
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()
  defer e.queue.ShutDown()

  klog.Infof("Starting endpoint controller")
  defer klog.Infof("Shutting down endpoint controller")

  if !cache.WaitForNamedCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {
    return
  }

  for i := 0; i < workers; i++ {
    go wait.Until(e.worker, e.workerLoopPeriod, stopCh)
  }

  go func() {
    defer utilruntime.HandleCrash()
    e.checkLeftoverEndpoints()
  }()

  <-stopCh
}

1.2.1 worker

Looking straight at the processNextWorkItem method: it takes a key from the queue and calls e.syncService to process it. e.syncService is the core processing method of the endpoints controller and is analyzed in detail below.


// pkg/controller/endpoint/endpoints_controller.go
func (e *EndpointController) worker() {
  for e.processNextWorkItem() {
  }
}

func (e *EndpointController) processNextWorkItem() bool {
  eKey, quit := e.queue.Get()
  if quit {
    return false
  }
  defer e.queue.Done(eKey)

  err := e.syncService(eKey.(string))
  e.handleErr(err, eKey)

  return true
}

2. Analysis of the endpoints controller core processing logic

Based on tag v1.17.4

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4


Let's go straight to the syncService method as the entry point for analyzing the core processing logic of the endpoints controller.

2.1 Core processing logic: syncService

Main logic:


(1) Get the service object; if the service object cannot be found, delete the endpoints object of the same name;


(2) If the service object's .Spec.Selector is empty, no corresponding endpoints object is managed for it, so return directly;


(3) Using the service object's .Spec.Selector, query the list of pods matching the service object;


(4) Check whether the service's annotations contain TolerateUnreadyEndpoints, which means endpoints are also created for pods that are not ready; this setting affects the computation of the endpoints object's subsets below (a sketch of such a service manifest follows the full method listing);


(5) Iterate over the list of pods matched by the service object, pick out the suitable pods, and compute the subsets of the endpoints object;


How are the subsets computed while iterating over the pod list?


(5.1) Filter out pods whose pod IP is empty;


(5.2) If TolerateUnreadyEndpoints is false and the pod's DeletionTimestamp is not nil, filter out the pod;


(5.3) If the service has no ports configured and its ClusterIP is None, it is a headless service: call the addEndpointSubset function to compute the subsets, in which case the ports of the computed subsets are empty;


(5.4) If the service has ports configured, iterate over them, calling the addEndpointSubset function for each to compute the subsets (addEndpointSubset is analyzed further below).


(6) Get the endpoints object;


(7) Check whether the subsets and labels of the existing endpoints object match those recomputed during this reconciliation; if they match, no update is needed, return directly;


(8) If the endpoints object does not exist, create it; if it does exist, update it.


func (e *EndpointController) syncService(key string) error {
  startTime := time.Now()
  defer func() {
    klog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Since(startTime))
  }()

  namespace, name, err := cache.SplitMetaNamespaceKey(key)
  if err != nil {
    return err
  }
  service, err := e.serviceLister.Services(namespace).Get(name)
  if err != nil {
    if !errors.IsNotFound(err) {
      return err
    }

    // Delete the corresponding endpoint, as the service has been deleted.
    // TODO: Please note that this will delete an endpoint when a
    // service is deleted. However, if we're down at the time when
    // the service is deleted, we will miss that deletion, so this
    // doesn't completely solve the problem. See #6877.
    err = e.client.CoreV1().Endpoints(namespace).Delete(name, nil)
    if err != nil && !errors.IsNotFound(err) {
      return err
    }
    e.triggerTimeTracker.DeleteService(namespace, name)
    return nil
  }

  if service.Spec.Selector == nil {
    // services without a selector receive no endpoints from this controller;
    // these services will receive the endpoints that are created out-of-band via the REST API.
    return nil
  }

  klog.V(5).Infof("About to update endpoints for service %q", key)
  pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
  if err != nil {
    // Since we're getting stuff from a local cache, it is
    // basically impossible to get this error.
    return err
  }

  // If the user specified the older (deprecated) annotation, we have to respect it.
  tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses
  if v, ok := service.Annotations[TolerateUnreadyEndpointsAnnotation]; ok {
    b, err := strconv.ParseBool(v)
    if err == nil {
      tolerateUnreadyEndpoints = b
    } else {
      utilruntime.HandleError(fmt.Errorf("Failed to parse annotation %v: %v", TolerateUnreadyEndpointsAnnotation, err))
    }
  }

  // We call ComputeEndpointLastChangeTriggerTime here to make sure that the
  // state of the trigger time tracker gets updated even if the sync turns out
  // to be no-op and we don't update the endpoints object.
  endpointsLastChangeTriggerTime := e.triggerTimeTracker.
    ComputeEndpointLastChangeTriggerTime(namespace, service, pods)

  subsets := []v1.EndpointSubset{}
  var totalReadyEps int
  var totalNotReadyEps int

  for _, pod := range pods {
    if len(pod.Status.PodIP) == 0 {
      klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
      continue
    }
    if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
      klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
      continue
    }

    ep, err := podToEndpointAddressForService(service, pod)
    if err != nil {
      // this will happen, if the cluster runs with some nodes configured as dual stack and some as not
      // such as the case of an upgrade..
      klog.V(2).Infof("failed to find endpoint for service:%v with ClusterIP:%v on pod:%v with error:%v", service.Name, service.Spec.ClusterIP, pod.Name, err)
      continue
    }

    epa := *ep
    if endpointutil.ShouldSetHostname(pod, service) {
      epa.Hostname = pod.Spec.Hostname
    }

    // Allow headless service not to have ports.
    if len(service.Spec.Ports) == 0 {
      if service.Spec.ClusterIP == api.ClusterIPNone {
        subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
        // No need to repack subsets for headless service without ports.
      }
    } else {
      for i := range service.Spec.Ports {
        servicePort := &service.Spec.Ports[i]

        portName := servicePort.Name
        portProto := servicePort.Protocol
        portNum, err := podutil.FindPort(pod, servicePort)
        if err != nil {
          klog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
          continue
        }

        var readyEps, notReadyEps int
        epp := &v1.EndpointPort{Name: portName, Port: int32(portNum), Protocol: portProto}
        subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
        totalReadyEps = totalReadyEps + readyEps
        totalNotReadyEps = totalNotReadyEps + notReadyEps
      }
    }
  }
  subsets = endpoints.RepackSubsets(subsets)

  // See if there's actually an update here.
  currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
  if err != nil {
    if errors.IsNotFound(err) {
      currentEndpoints = &v1.Endpoints{
        ObjectMeta: metav1.ObjectMeta{
          Name:   service.Name,
          Labels: service.Labels,
        },
      }
    } else {
      return err
    }
  }

  createEndpoints := len(currentEndpoints.ResourceVersion) == 0

  if !createEndpoints &&
    apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
    apiequality.Semantic.DeepEqual(currentEndpoints.Labels, service.Labels) {
    klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
    return nil
  }
  newEndpoints := currentEndpoints.DeepCopy()
  newEndpoints.Subsets = subsets
  newEndpoints.Labels = service.Labels
  if newEndpoints.Annotations == nil {
    newEndpoints.Annotations = make(map[string]string)
  }

  if !endpointsLastChangeTriggerTime.IsZero() {
    newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
      endpointsLastChangeTriggerTime.Format(time.RFC3339Nano)
  } else {
    // No new trigger time, clear the annotation.
    delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
  }

  if newEndpoints.Labels == nil {
    newEndpoints.Labels = make(map[string]string)
  }

  if !helper.IsServiceIPSet(service) {
    newEndpoints.Labels = utillabels.CloneAndAddLabel(newEndpoints.Labels, v1.IsHeadlessService, "")
  } else {
    newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
  }

  klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
  if createEndpoints {
    // No previous endpoints, create them
    _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(newEndpoints)
  } else {
    // Pre-existing
    _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(newEndpoints)
  }
  if err != nil {
    if createEndpoints && errors.IsForbidden(err) {
      // A request is forbidden primarily for two reasons:
      // 1. namespace is terminating, endpoint creation is not allowed by default.
      // 2. policy is misconfigured, in which case no service would function anywhere.
      // Given the frequency of 1, we log at a lower level.
      klog.V(5).Infof("Forbidden from creating endpoints: %v", err)

      // If the namespace is terminating, creates will continue to fail. Simply drop the item.
      if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
        return nil
      }
    }

    if createEndpoints {
      e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToCreateEndpoint", "Failed to create endpoint for service %v/%v: %v", service.Namespace, service.Name, err)
    } else {
      e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToUpdateEndpoint", "Failed to update endpoint %v/%v: %v", service.Namespace, service.Name, err)
    }

    return err
  }
  return nil
}
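As referenced in step (4) above, tolerance for unready endpoints can come either from spec.publishNotReadyAddresses or from the older annotation that the TolerateUnreadyEndpointsAnnotation constant refers to. Assuming that constant resolves to the annotation key service.alpha.kubernetes.io/tolerate-unready-endpoints (its value in this source tree, though worth verifying against your version), a service opting in might look like this sketch:

apiVersion: v1
kind: Service
metadata:
  name: demo-unready-tolerant   # hypothetical name
  namespace: test
  annotations:
    # deprecated annotation read by syncService; when set, it overrides the field below
    service.alpha.kubernetes.io/tolerate-unready-endpoints: "true"
spec:
  publishNotReadyAddresses: true   # the supported field
  selector:
    app: demo
  ports:
  - port: 80
    targetPort: 8080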

2.1.1 addEndpointSubset

Now let's expand on addEndpointSubset, the function that computes the subsets of the endpoints object. The computed subsets include both Addresses (the ready addresses) and NotReadyAddresses.


Main logic:


(1) When tolerateUnreadyEndpoints is configured and true, or when the pod is ready, the pod's address is counted into the subsets' Addresses;


(2) When tolerateUnreadyEndpoints is false or not configured, and the pod is not ready, the shouldPodBeInEndpoints function is called; when it returns true, the pod's address is counted into the subsets' NotReadyAddresses:


(2.1) when pod.Spec.RestartPolicy is Never and the Pod's Status.Phase is not Failed/Succeeded, the address is counted into the subsets' NotReadyAddresses;


(2.2) when pod.Spec.RestartPolicy is OnFailure and the Pod's Status.Phase is not Succeeded, the Pod's EndpointAddress is likewise added to NotReadyAddresses;


(2.3) in all other cases, the address is counted into the subsets' NotReadyAddresses.


// pkg/controller/endpoint/endpoints_controller.go
func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
  epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
  var readyEps int
  var notReadyEps int
  ports := []v1.EndpointPort{}
  if epp != nil {
    ports = append(ports, *epp)
  }
  if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
    subsets = append(subsets, v1.EndpointSubset{
      Addresses: []v1.EndpointAddress{epa},
      Ports:     ports,
    })
    readyEps++
  } else if shouldPodBeInEndpoints(pod) {
    klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
    subsets = append(subsets, v1.EndpointSubset{
      NotReadyAddresses: []v1.EndpointAddress{epa},
      Ports:             ports,
    })
    notReadyEps++
  }
  return subsets, readyEps, notReadyEps
}

func shouldPodBeInEndpoints(pod *v1.Pod) bool {
  switch pod.Spec.RestartPolicy {
  case v1.RestartPolicyNever:
    return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
  case v1.RestartPolicyOnFailure:
    return pod.Status.Phase != v1.PodSucceeded
  default:
    return true
  }
}


IsPodReady


IsPodReady returns true when, in the pod's .status.conditions, the condition with type Ready has a status value of True.
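
For reference, this is the shape of the pod status being checked (an illustrative fragment; the values are invented):

status:
  conditions:
  - type: Ready
    status: "True"                                # IsPodReady returns true only for this value
    lastTransitionTime: "2021-11-21T00:00:00Z"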


// pkg/api/v1/pod/util.go
// IsPodReady returns true if a pod is ready; false otherwise.
func IsPodReady(pod *v1.Pod) bool {
  return IsPodReadyConditionTrue(pod.Status)
}

// IsPodReadyConditionTrue returns true if a pod is ready; false otherwise.
// (Shown for completeness; it links IsPodReady to GetPodReadyCondition.)
func IsPodReadyConditionTrue(status v1.PodStatus) bool {
  condition := GetPodReadyCondition(status)
  return condition != nil && condition.Status == v1.ConditionTrue
}

// GetPodReadyCondition extracts the pod ready condition from the given status and returns that.
// Returns nil if the condition is not present.
func GetPodReadyCondition(status v1.PodStatus) *v1.PodCondition {
  _, condition := GetPodCondition(&status, v1.PodReady)
  return condition
}

Summary

endpoints controller architecture diagram

The rough composition and processing flow of the endpoints controller are shown in the diagram below. The endpoints controller registers event handlers for the pod and service objects; when an event is watched, the corresponding service object is put into a queue. The syncService method holds the core logic with which the endpoints controller reconciles endpoints objects: it takes a service object out of the queue, queries the list of matching pod objects, and then reconciles the endpoints object.

[Figure: endpoints controller architecture diagram]

endpoints controller core processing logic

The core processing logic of the endpoints controller is to fetch the service object: if the service no longer exists, delete the endpoints object of the same name; if it exists, compute the latest subsets of the endpoints object from the list of pods associated with the service object, and then create or update the endpoints object accordingly.


