Introduction

  • PLEG stands for Pod Lifecycle Event Generator
  • PLEG periodically runs relist, which queries the container runtime for pod information
  • It walks the containers of each pod, compares them with the cached state, and generates state-change events that are handed to the syncLoop event loop for processing

Overall workflow
(figure: overall PLEG workflow diagram)

Function call chain
(figure: function call chain diagram)

What is PLEG

  • PLEG stands for Pod Lifecycle Event Generator
  • It is just one module inside the kubelet
  • Its main job is to observe container runtime state changes, reconcile them into pod-level events, and write the results into the pod cache so the cache always reflects the latest state

Why PLEG is needed

In Kubernetes, every node runs a daemon, the kubelet, which manages the containers on that node and adjusts their actual state to match the state defined in the spec. Concretely, the kubelet has to react promptly to changes in two places:

  1. The state declared in the Pod spec
  2. The state of the containers in the container runtime

For pods: the kubelet watches for Pod spec changes from multiple sources.
For containers: the kubelet periodically (e.g. every 10s) polls the container runtime for the latest state of all containers.

The problems this causes:

  • As the number of pods and containers grows, the polling produces non-negligible overhead, and the kubelet's parallelism makes it worse (one goroutine is spawned per pod to fetch container status)
  • The periodic bursts of concurrent requests cause high CPU usage spikes (even when nothing about the pod definitions or container states has changed) and degrade performance. Eventually the container runtime may be overwhelmed, which hurts the system's reliability and limits the kubelet's scalability.

Enter PLEG

To reduce the cost of managing pods and improve the kubelet's performance and scalability, PLEG was introduced. It improves on the previous approach by:

  • Reducing unnecessary work during idle periods (when the pod definitions and container states have not changed).
  • Reducing the number of concurrent requests for container status.

That's it for the theory. Talk is cheap, show me the code.

Reading the PLEG interface

pkg/kubelet/pleg/pleg.go
// PodLifecycleEventGenerator contains functions for generating pod life cycle events.
type PodLifecycleEventGenerator interface {
   Start()
   Watch() chan *PodLifecycleEvent
   Healthy() (bool, error)
}
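
For reference, the events flowing through the Watch() channel are defined in the same file. A trimmed copy (comments shortened) of the event type and its possible values:

type PodLifeCycleEventType string

const (
	// ContainerStarted: the container's new state is running.
	ContainerStarted PodLifeCycleEventType = "ContainerStarted"
	// ContainerDied: the container's new state is exited.
	ContainerDied PodLifeCycleEventType = "ContainerDied"
	// ContainerRemoved: the container's old state was exited and it is now gone.
	ContainerRemoved PodLifeCycleEventType = "ContainerRemoved"
	// PodSync: a pod-level change that no single container event captures.
	PodSync PodLifeCycleEventType = "PodSync"
	// ContainerChanged: the container's new state is unknown.
	ContainerChanged PodLifeCycleEventType = "ContainerChanged"
)

// PodLifecycleEvent is an event that reflects the change of the pod state.
type PodLifecycleEvent struct {
	// The pod ID.
	ID types.UID
	// The type of the event.
	Type PodLifeCycleEventType
	// The accompanied data which varies based on the event type.
	Data interface{}
}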

The concrete implementation of the PLEG interface

The PodLifecycleEventGenerator interface is implemented by GenericPLEG:

type GenericPLEG struct {
	// The period for relisting.
	relistPeriod time.Duration
	// The container runtime.
	runtime kubecontainer.Runtime
	// The channel from which the subscriber listens events.
	eventChannel chan *PodLifecycleEvent
	// The internal cache for pod/container information.
	podRecords podRecords
	// Time of the last relisting.
	relistTime atomic.Value
	// Cache for storing the runtime states required for syncing pods.
	cache kubecontainer.Cache
	// For testability.
	clock clock.Clock
	// Pods that failed to have their status retrieved during a relist. These pods will be
	// retried during the next relisting.
	podsToReinspect map[types.UID]*kubecontainer.Pod
}
pkg/kubelet/pleg/generic.go	
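
The podRecords field above is a small internal map in the same file: for each pod UID it keeps the pod as observed in the previous relist ("old") and in the current relist ("current"):

type podRecord struct {
	old     *kubecontainer.Pod
	current *kubecontainer.Pod
}

type podRecords map[types.UID]*podRecord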

GenericPLEG initialization

// NewGenericPLEG instantiates a new GenericPLEG object and return it.
func NewGenericPLEG(runtime kubecontainer.Runtime, channelCapacity int,
	relistPeriod time.Duration, cache kubecontainer.Cache, clock clock.Clock) PodLifecycleEventGenerator {
	return &GenericPLEG{
		relistPeriod: relistPeriod,
		runtime:      runtime,
		eventChannel: make(chan *PodLifecycleEvent, channelCapacity),
		podRecords:   make(podRecords),
		cache:        cache,
		clock:        clock,
	}
}

This is called while the kubelet is being constructed:

pkg/kubelet/kubelet.go

	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
    // register PLEG's health check; runtimeState will call Healthy() (bool, error) repeatedly
	klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
	if _, err := klet.updatePodCIDR(kubeCfg.PodCIDR); err != nil {
		klog.ErrorS(err, "Pod CIDR update failed")
	}
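
The channel capacity and relist period passed in here are package-level constants in pkg/kubelet/kubelet.go; the values below are taken from recent releases and may differ between versions:

const (
	// Capacity of the channel for receiving pod lifecycle events.
	plegChannelCapacity = 1000

	// Generic PLEG relies on relisting for discovering container events,
	// so it relists frequently.
	plegRelistPeriod = time.Second * 1
)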

Tracing further, the kubelet's syncLoop periodically calls runtimeState.runtimeErrors to run the health checks:

	for {
		if err := kl.runtimeState.runtimeErrors(); err != nil {
			klog.ErrorS(err, "Skipping pod synchronization")
			// exponential backoff
			time.Sleep(duration)
			duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
			continue
		}
		// reset backoff if we have a success
		duration = base

		kl.syncLoopMonitor.Store(kl.clock.Now())
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}

Inside runtimeState.runtimeErrors it simply iterates over healthChecks and invokes each check's fn:

	for _, hc := range s.healthChecks {
		if ok, err := hc.fn(); !ok {
			errs = append(errs, fmt.Errorf("%s is not healthy: %v", hc.name, err))
		}
	}
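
addHealthCheck itself only appends to that healthChecks slice; a simplified excerpt from pkg/kubelet/runtime.go:

type healthCheckFnType func() (bool, error)

type healthCheck struct {
	name string
	fn   healthCheckFnType
}

func (s *runtimeState) addHealthCheck(name string, f healthCheckFnType) {
	s.Lock()
	defer s.Unlock()
	s.healthChecks = append(s.healthChecks, &healthCheck{name: name, fn: f})
}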

PLEG's Healthy method is shown below:

  • It reads the time of the last relist.
  • It subtracts it from the current time and checks whether the gap exceeds relistThreshold = 3 * time.Minute (this works because relistTime is updated after every relist).
  • If more than 3 minutes have elapsed, relist is taking too long or has stopped, so the health check fails.
// Healthy check if PLEG work properly.
// relistThreshold is the maximum interval between two relist.
func (g *GenericPLEG) Healthy() (bool, error) {
	relistTime := g.getRelistTime()
	if relistTime.IsZero() {
		return false, fmt.Errorf("pleg has yet to be successful")
	}
	// Expose as metric so you can alert on `time()-pleg_last_seen_seconds > nn`
	metrics.PLEGLastSeen.Set(float64(relistTime.Unix()))
	elapsed := g.clock.Since(relistTime)
	if elapsed > relistThreshold {
		return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, relistThreshold)
	}
	return true, nil
}
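
The relistTime read here is the atomic.Value shown in the struct earlier; it is written at the end of every relist and read back by Healthy through two trivial accessors:

func (g *GenericPLEG) getRelistTime() time.Time {
	val := g.relistTime.Load()
	if val == nil {
		return time.Time{}
	}
	return val.(time.Time)
}

func (g *GenericPLEG) updateRelistTime(timestamp time.Time) {
	g.relistTime.Store(timestamp)
}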

Starting PLEG

During kubelet startup, pleg.Start() is invoked:

	// Start the pod lifecycle event generator.
	kl.pleg.Start()
  • The corresponding Start method is equally simple: it just runs relist periodically in a goroutine:
// Start spawns a goroutine to relist periodically.
func (g *GenericPLEG) Start() {
	go wait.Until(g.relist, g.relistPeriod, wait.NeverStop)
}

Walking through relist

  • It begins by recording timing metrics:

	if lastRelistTime := g.getRelistTime(); !lastRelistTime.IsZero() {
		metrics.PLEGRelistInterval.Observe(metrics.SinceInSeconds(lastRelistTime))
	}

	timestamp := g.clock.Now()
	defer func() {
		metrics.PLEGRelistDuration.Observe(metrics.SinceInSeconds(timestamp))
	}()

	// Get all the pods.
	podList, err := g.runtime.GetPods(true)
	if err != nil {
		klog.ErrorS(err, "GenericPLEG: Unable to retrieve pods")
		return
	}

	g.updateRelistTime(timestamp)

	pods := kubecontainer.Pods(podList)
	// update running pod and container count
	updateRunningPodAndContainerMetrics(pods)
  • The bookkeeping details are visible in the code:
    • First it builds a map keyed by container run state, whose values are counts
    • Then it iterates over the pods, and over each pod's containers
    • containerStateCount is incremented per state
    • The number of running pods is derived from the fact that every pod should have only one running sandbox
    • Finally the metrics are emitted
  • updateRunningPodAndContainerMetrics looks like this:
func updateRunningPodAndContainerMetrics(pods []*kubecontainer.Pod) {
	runningSandboxNum := 0
	// intermediate map to store the count of each "container_state"
	containerStateCount := make(map[string]int)

	for _, pod := range pods {
		containers := pod.Containers
		for _, container := range containers {
			// update the corresponding "container_state" in map to set value for the gaugeVec metrics
			containerStateCount[string(container.State)]++
		}

		sandboxes := pod.Sandboxes

		for _, sandbox := range sandboxes {
			if sandbox.State == kubecontainer.ContainerStateRunning {
				runningSandboxNum++
				// every pod should only have one running sandbox
				break
			}
		}
	}
	for key, value := range containerStateCount {
		metrics.RunningContainerCount.WithLabelValues(key).Set(float64(value))
	}

	// Set the number of running pods in the parameter
	metrics.RunningPodCount.Set(float64(runningSandboxNum))
}
  • Next, the current side of each pod record is refreshed:
	g.podRecords.setCurrent(pods)
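
setCurrent clears the current side of every existing record and then fills it in from the freshly listed pods; roughly (the exact signature uses the kubecontainer.Pods type):

func (pr podRecords) setCurrent(pods kubecontainer.Pods) {
	// Clear the current pods first; pods that disappeared keep only "old".
	for i := range pr {
		pr[i].current = nil
	}
	// Record the freshly listed pods as "current".
	for _, pod := range pods {
		if r, ok := pr[pod.ID]; ok {
			r.current = pod
		} else {
			pr[pod.ID] = &podRecord{current: pod}
		}
	}
}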

  • Then each pod record's old pod is compared with its current pod to generate events:
	for pid := range g.podRecords {
		oldPod := g.podRecords.getOld(pid)
		pod := g.podRecords.getCurrent(pid)
		// Get all containers in the old and the new pod.
		allContainers := getContainersFromPods(oldPod, pod)
  • getContainersFromPods walks the given pods and collects all of their containers (sandboxes are included as containers too):
func getContainersFromPods(pods ...*kubecontainer.Pod) []*kubecontainer.Container {
	cidSet := sets.NewString()
	var containers []*kubecontainer.Container
	for _, p := range pods {
		if p == nil {
			continue
		}
		for _, c := range p.Containers {
			cid := string(c.ID.ID)
			if cidSet.Has(cid) {
				continue
			}
			cidSet.Insert(cid)
			containers = append(containers, c)
		}
		// Update sandboxes as containers
		// TODO: keep track of sandboxes explicitly.
		for _, c := range p.Sandboxes {
			cid := string(c.ID.ID)
			if cidSet.Has(cid) {
				continue
			}
			cidSet.Insert(cid)
			containers = append(containers, c)
		}

	}
	return containers
}
  • All of those containers are then examined to determine the events:
		for _, container := range allContainers {
			events := computeEvents(oldPod, pod, &container.ID)
			for _, e := range events {
				updateEvents(eventsByPodID, e)
			}
		}
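
updateEvents just groups the generated events by pod UID, so that all events of one pod can be handled together later in relist:

func updateEvents(eventsByPodID map[types.UID][]*PodLifecycleEvent, e *PodLifecycleEvent) {
	if e == nil {
		return
	}
	eventsByPodID[e.ID] = append(eventsByPodID[e.ID], e)
}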

computeEvents

  • getContainerState is used to fetch oldState and newState, which are then compared:
func computeEvents(oldPod, newPod *kubecontainer.Pod, cid *kubecontainer.ContainerID) []*PodLifecycleEvent {
	var pid types.UID
	if oldPod != nil {
		pid = oldPod.ID
	} else if newPod != nil {
		pid = newPod.ID
	}
	oldState := getContainerState(oldPod, cid)
	newState := getContainerState(newPod, cid)
	return generateEvents(pid, cid.ID, oldState, newState)
}

getContainerState

  • The state defaults to plegContainerNonExistent (non-existent):
func getContainerState(pod *kubecontainer.Pod, cid *kubecontainer.ContainerID) plegContainerState {
	// Default to the non-existent state.
	state := plegContainerNonExistent
	if pod == nil {
		return state
	}
	c := pod.FindContainerByID(*cid)
	if c != nil {
		return convertState(c.State)
	}
	// Search through sandboxes too, since they are tracked like containers.
	c = pod.FindSandboxByID(*cid)
	if c != nil {
		return convertState(c.State)
	}
	return state
}
  • The runtime state is then converted by convertState:

convertState

  • It simply maps the container runtime state onto a plegContainer state:
    • ContainerStateCreated maps to plegContainerUnknown: the container has been created but not started (the kubelet does not use the "created" state yet)
    • ContainerStateRunning maps to plegContainerRunning: the container is running normally
    • ContainerStateExited maps to plegContainerExited: the container has finished running
    • ContainerStateUnknown maps to plegContainerUnknown: the container is restarting, paused, or dead
func convertState(state kubecontainer.State) plegContainerState {
	switch state {
	case kubecontainer.ContainerStateCreated:
		// kubelet doesn't use the "created" state yet, hence convert it to "unknown".
		return plegContainerUnknown
	case kubecontainer.ContainerStateRunning:
		return plegContainerRunning
	case kubecontainer.ContainerStateExited:
		return plegContainerExited
	case kubecontainer.ContainerStateUnknown:
		return plegContainerUnknown
	default:
		panic(fmt.Sprintf("unrecognized container state: %v", state))
	}
}
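
The plegContainerState values that the mapping targets are plain string constants:

type plegContainerState string

const (
	plegContainerRunning     plegContainerState = "running"
	plegContainerExited      plegContainerState = "exited"
	plegContainerUnknown     plegContainerState = "unknown"
	plegContainerNonExistent plegContainerState = "non-existent"
)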

computeEvents calls generateEvents to compare the old and new states

generateEvents

  • If the new state equals the old state, nothing is generated:
	if newState == oldState {
		return nil
	}

  • Otherwise the current state newState determines which event is generated (the full switch is sketched below):

    • plegContainerRunning produces a ContainerStarted event: the container has started
    • plegContainerExited produces a ContainerDied event: the container has died
    • plegContainerUnknown produces a ContainerChanged event: the container's state has changed to an unknown one
  • If newState is plegContainerNonExistent, the old state is consulted:

    • if oldState is plegContainerExited, a ContainerRemoved event is produced: an exited container has been removed
    • for any other oldState, a ContainerDied event and a ContainerRemoved event are both produced: the container died and has been removed
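
The whole mapping is a single switch over newState (logging elided):

func generateEvents(podID types.UID, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent {
	if newState == oldState {
		return nil
	}

	switch newState {
	case plegContainerRunning:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerStarted, Data: cid}}
	case plegContainerExited:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}}
	case plegContainerUnknown:
		return []*PodLifecycleEvent{{ID: podID, Type: ContainerChanged, Data: cid}}
	case plegContainerNonExistent:
		switch oldState {
		case plegContainerExited:
			// The container had already exited; now it has also been removed.
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerRemoved, Data: cid}}
		default:
			return []*PodLifecycleEvent{{ID: podID, Type: ContainerDied, Data: cid}, {ID: podID, Type: ContainerRemoved, Data: cid}}
		}
	default:
		panic(fmt.Sprintf("unrecognized container state: %v", newState))
	}
}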

Handling the generated events

  • ContainerChanged events are dropped; all other event types are sent to eventChannel:

		for i := range events {
			// Filter out events that are not reliable and no other components use yet.
			if events[i].Type == ContainerChanged {
				continue
			}
			select {
			case g.eventChannel <- events[i]:
			default:
				metrics.PLEGDiscardEvents.Inc()
				klog.ErrorS(nil, "Event channel is full, discard this relist() cycle event")
			}

On the consuming side, the kubelet's syncLoop subscribes to PLEG via Watch:

plegCh := kl.pleg.Watch()
  • The resulting plegCh channel is handed to syncLoopIteration for processing:
		if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
			break
		}
  • syncLoopIteration checks the event type; for ContainerStarted it records the time in lastContainerStartedTime, which is later used to avoid missing graceful termination of containers that were reported as started:
	case e := <-plegCh:
		if e.Type == pleg.ContainerStarted {
			// record the most recent time we observed a container start for this pod.
			// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
			// to make sure we don't miss handling graceful termination for containers we reported as having started.
			kl.lastContainerStartedTime.Add(e.ID, time.Now())
		}
  • If the event is not ContainerRemoved (the isSyncPodWorthy check, shown after this snippet), the pod is looked up in podManager:
		if isSyncPodWorthy(e) {
			// PLEG event for a pod; sync it.
			if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
				klog.V(2).InfoS("SyncLoop (PLEG): event for pod", "pod", klog.KObj(pod), "event", e)
				handler.HandlePodSyncs([]*v1.Pod{pod})
			} else {
				// If the pod no longer exists, ignore the event.
				klog.V(4).InfoS("SyncLoop (PLEG): pod does not exist, ignore irrelevant event", "event", e)
			}
		}
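
isSyncPodWorthy, used above, is a one-line filter: every event type except ContainerRemoved is worth a sync, since a removed container no longer affects pod state:

// isSyncPodWorthy filters out events that are not worthy of pod syncing
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
	// ContainerRemoved doesn't affect pod state
	return event.Type != pleg.ContainerRemoved
}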
  • If the pod exists, the HandlePodSyncs callback is invoked, which dispatches the pod to a pod worker via dispatchWork:
// HandlePodSyncs is the callback in the syncHandler interface for pods
// that should be dispatched to pod workers for sync.
func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
	}
}
  • If the pod no longer exists, only a log line is emitted.
  • In addition, if the event type is ContainerDied, cleanUpContainersInPod is called to clean up the dead container in that pod:
		if e.Type == pleg.ContainerDied {
			if containerID, ok := e.Data.(string); ok {
				kl.cleanUpContainersInPod(e.ID, containerID)
			}

cleanUpContainersInPod

  • It ends up in podContainerDeletor.deleteContainersInPod (the caller is sketched after this snippet), which collects the container IDs that should be deleted from the pod and pushes them onto p.worker:
func (p *podContainerDeletor) deleteContainersInPod(filterContainerID string, podStatus *kubecontainer.PodStatus, removeAll bool) {
	containersToKeep := p.containersToKeep
	if removeAll {
		containersToKeep = 0
		filterContainerID = ""
	}

	for _, candidate := range getContainersToDeleteInPod(filterContainerID, podStatus, containersToKeep) {
		select {
		case p.worker <- candidate.ID:
		default:
			klog.InfoS("Failed to issue the request to remove container", "containerID", candidate.ID)
		}
	}
}
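
For context, cleanUpContainersInPod itself is a thin bridge from the PLEG event to this deletor: it looks up the pod's status in the cache, decides whether everything can be removed, and delegates to deleteContainersInPod. A simplified sketch (based on pkg/kubelet/kubelet_pods.go; the exact removeAll condition differs between versions):

// cleanUpContainersInPod removes exited containers of a pod once its status is in the cache.
func (kl *Kubelet) cleanUpContainersInPod(podID types.UID, exitedContainerID string) {
	if podStatus, err := kl.podCache.Get(podID); err == nil {
		// When an evicted or deleted pod has already synced, all of its containers can be removed.
		removeAll := kl.podWorkers.ShouldPodContentBeRemoved(podID)
		kl.containerDeletor.deleteContainersInPod(exitedContainerID, podStatus, removeAll)
	}
}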
  • The consumer of the worker channel is started in newPodContainerDeletor:
	go wait.Until(func() {
		for {
			id := <-buffer
			if err := runtime.DeleteContainer(id); err != nil {
				klog.InfoS("DeleteContainer returned error", "containerID", id, "err", err)
			}
		}
	}, 0, wait.NeverStop)

  • At the bottom it is simply runtime.DeleteContainer that removes the container.

Key takeaways:

  • PLEG stands for Pod Lifecycle Event Generator
  • PLEG periodically runs relist, querying the container runtime for pod information
  • It walks the containers of each pod, compares them with the cached state, and generates state-change events that the syncLoop event loop then handles