containerManager的作用

  • containerManager 管理容器的各种资源,比如 CGroups、QoS、cpuset、device 等

  • 内置了很多资源管理器,总结起来就是其他manager的管家

    D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\container_manager.go

// Manages the containers running on a machine.
type ContainerManager interface {
	// Runs the container manager's housekeeping.
	// - Ensures that the Docker daemon is in a container.
	// - Creates the system container where all non-containerized processes run.
	Start(*v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService) error

	// SystemCgroupsLimit returns resources allocated to system cgroups in the machine.
	// These cgroups include the system and Kubernetes services.
	SystemCgroupsLimit() v1.ResourceList

	// GetNodeConfig 返回节点配置
	GetNodeConfig() NodeConfig

	// Status 返回内部错误信息
	Status() Status

	// NewPodContainerManager 工厂函数返回podContainerManager对象
	NewPodContainerManager() PodContainerManager

	// GetMountedSubsystems 返回节点上挂载的cgroup subsystems 
	GetMountedSubsystems() *CgroupSubsystems

	// GetQOSContainersInfo 返回顶级qos 容器名
	GetQOSContainersInfo() QOSContainersInfo

	// GetNodeAllocatableReservation 返回节点预留的资源
	GetNodeAllocatableReservation() v1.ResourceList

	// GetCapacity 返回节点上可用的资源
	GetCapacity() v1.ResourceList

	// GetDevicePluginResourceCapacity  返回节点上插件资源总量,可用总量和不活跃的插件资源
	GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string)

	// UpdateQOSCgroups 确保顶级qos容器在期望的状态中
	UpdateQOSCgroups() error

	// GetResources returns RunContainerOptions with devices, mounts, and env fields populated for
	// extended resources required by container.
	GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)

	// UpdatePluginResources calls Allocate of device plugin handler for potential
	// requests for device plugin resources, and returns an error if fails.
	// Otherwise, it updates allocatableResource in nodeInfo if necessary,
	// to make sure it is at least equal to the pod's requested capacity for
	// any registered device plugin resource
	UpdatePluginResources(*schedulerframework.NodeInfo, *lifecycle.PodAdmitAttributes) error

	InternalContainerLifecycle() InternalContainerLifecycle

	// GetPodCgroupRoot 返回 cgroup的root
	GetPodCgroupRoot() string

	// GetPluginRegistrationHandler 插件注册
	GetPluginRegistrationHandler() cache.PluginHandler

	// ShouldResetExtendedResourceCapacity 决定扩展资源是否清理
	ShouldResetExtendedResourceCapacity() bool

	// GetAllocateResourcesPodAdmitHandler pod准入控制器
	GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler

	// GetNodeAllocatableAbsolute returns the absolute value of Node Allocatable which is primarily useful for enforcement.
	GetNodeAllocatableAbsolute() v1.ResourceList

	// Implements the podresources Provider API for CPUs, Memory and Devices
	podresources.CPUsProvider
	podresources.DevicesProvider
	podresources.MemoryProvider
}

具体实现

go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\container_manager_linux.go

type containerManagerImpl struct {
	sync.RWMutex
	cadvisorInterface cadvisor.Interface
	mountUtil         mount.Interface
	NodeConfig
	status Status
	// External containers being managed.
	systemContainers []*systemContainer
	// 周期运行的任务
	periodicTasks []func()
	// 所有的挂载 cgroup subsystem
	subsystems *CgroupSubsystems
	// 节点信息
	nodeInfo   *v1.Node
	// cgroupManager
	cgroupManager CgroupManager
	// Capacity of this node.
	capacity v1.ResourceList
	// Capacity of this node, including internal resources.
	internalCapacity v1.ResourceList
	// Absolute cgroupfs path to a cgroup that Kubelet needs to place all pods under.
	// This path include a top level container for enforcing Node Allocatable.
	cgroupRoot CgroupName
	// Event recorder interface.
	recorder record.EventRecorder
	// Interface for QoS cgroup management
	qosContainerManager QOSContainerManager
	// Interface for exporting and allocating devices reported by device plugins.
	deviceManager devicemanager.Manager
	// cpu管理器
	cpuManager cpumanager.Manager
	// 内存管理器 
	memoryManager memorymanager.Manager
	//  numa 拓扑管理器
	topologyManager topologymanager.Manager
}

NewContainerManager 初始化

  • 位置 D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\container_manager_linux.go
  • 根据cgroup v1 v2版本获取cgroups subsystems
	subsystems, err := GetCgroupSubsystems()
	if err != nil {
		return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
	}
  • swap开启时,kubelet不能运行,检查一下
	if failSwapOn {
		// Check whether swap is enabled. The Kubelet does not support running with swap enabled.
		swapFile := "/proc/swaps"
		swapData, err := ioutil.ReadFile(swapFile)
		if err != nil {
			if os.IsNotExist(err) {
				klog.InfoS("File does not exist, assuming that swap is disabled", "path", swapFile)
			} else {
				return nil, err
			}
		} else {
			swapData = bytes.TrimSpace(swapData) // extra trailing \n
			swapLines := strings.Split(string(swapData), "\n")

			// If there is more than one line (table headers) in /proc/swaps, swap is enabled and we should
			// error out unless --fail-swap-on is set to false.
			if len(swapLines) > 1 {
				return nil, fmt.Errorf("running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false. /proc/swaps contained: %v", swapLines)
			}
		}
	}

  • 通过cadvisorInterface提供的获取节点信息的方法获取machineInfo,从中获取资源的容量信息,遍历后取值
	var internalCapacity = v1.ResourceList{}
	// It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
	// machine info is computed and cached once as part of cAdvisor object creation.
	// But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts
	machineInfo, err := cadvisorInterface.MachineInfo()
	if err != nil {
		return nil, err
	}
	capacity := cadvisor.CapacityFromMachineInfo(machineInfo)
	for k, v := range capacity {
		internalCapacity[k] = v
	}
	pidlimits, err := pidlimit.Stats()
	if err == nil && pidlimits != nil && pidlimits.MaxPID != nil {
		internalCapacity[pidlimit.PIDs] = *resource.NewQuantity(
			int64(*pidlimits.MaxPID),
			resource.DecimalSI)
	}

  • 初始化cgroupManager
	// Turn CgroupRoot from a string (in cgroupfs path format) to internal CgroupName
	cgroupRoot := ParseCgroupfsToCgroupName(nodeConfig.CgroupRoot)
	cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
	// Check if Cgroup-root actually exists on the node
	if nodeConfig.CgroupsPerQOS {
		// this does default to / when enabled, but this tests against regressions.
		if nodeConfig.CgroupRoot == "" {
			return nil, fmt.Errorf("invalid configuration: cgroups-per-qos was specified and cgroup-root was not specified. To enable the QoS cgroup hierarchy you need to specify a valid cgroup-root")
		}

		// we need to check that the cgroup root actually exists for each subsystem
		// of note, we always use the cgroupfs driver when performing this check since
		// the input is provided in that format.
		// this is important because we do not want any name conversion to occur.
		if !cgroupManager.Exists(cgroupRoot) {
			return nil, fmt.Errorf("invalid configuration: cgroup-root %q doesn't exist", cgroupRoot)
		}
		klog.InfoS("Container manager verified user specified cgroup-root exists", "cgroupRoot", cgroupRoot)
		// Include the top level cgroup for enforcing node allocatable into cgroup-root.
		// This way, all sub modules can avoid having to understand the concept of node allocatable.
		cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName)
	}
	klog.InfoS("Creating Container Manager object based on Node Config", "nodeConfig", nodeConfig)

  • 初始化qosManager
	qosContainerManager, err := NewQOSContainerManager(subsystems, cgroupRoot, nodeConfig, cgroupManager)
	if err != nil {
		return nil, err
	}

  • 以众多字段初始化containerManagerImpl
	cm := &containerManagerImpl{
		cadvisorInterface:   cadvisorInterface,
		mountUtil:           mountUtil,
		NodeConfig:          nodeConfig,
		subsystems:          subsystems,
		cgroupManager:       cgroupManager,
		capacity:            capacity,
		internalCapacity:    internalCapacity,
		cgroupRoot:          cgroupRoot,
		recorder:            recorder,
		qosContainerManager: qosContainerManager,
	}
  • 初始化topologymanager
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
		cm.topologyManager, err = topologymanager.NewManager(
			machineInfo.Topology,
			nodeConfig.ExperimentalTopologyManagerPolicy,
			nodeConfig.ExperimentalTopologyManagerScope,
		)

		if err != nil {
			return nil, err
		}

	} else {
		cm.topologyManager = topologymanager.NewFakeManager()
	}

  • 初始化deviceManager
	klog.InfoS("Creating device plugin manager", "devicePluginEnabled", devicePluginEnabled)
	if devicePluginEnabled {
		cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
		cm.topologyManager.AddHintProvider(cm.deviceManager)
	} else {
		cm.deviceManager, err = devicemanager.NewManagerStub()
	}
	if err != nil {
		return nil, err
	}

  • 初始化cpuManager
	// Initialize CPU manager
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		cm.cpuManager, err = cpumanager.NewManager(
			nodeConfig.ExperimentalCPUManagerPolicy,
			nodeConfig.ExperimentalCPUManagerPolicyOptions,
			nodeConfig.ExperimentalCPUManagerReconcilePeriod,
			machineInfo,
			nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
			cm.GetNodeAllocatableReservation(),
			nodeConfig.KubeletRootDir,
			cm.topologyManager,
		)
		if err != nil {
			klog.ErrorS(err, "Failed to initialize cpu manager")
			return nil, err
		}
		cm.topologyManager.AddHintProvider(cm.cpuManager)
	}

  • 初始化memoryManager
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
		cm.memoryManager, err = memorymanager.NewManager(
			nodeConfig.ExperimentalMemoryManagerPolicy,
			machineInfo,
			cm.GetNodeAllocatableReservation(),
			nodeConfig.ExperimentalMemoryManagerReservedMemory,
			nodeConfig.KubeletRootDir,
			cm.topologyManager,
		)
		if err != nil {
			klog.ErrorS(err, "Failed to initialize memory manager")
			return nil, err
		}
		cm.topologyManager.AddHintProvider(cm.memoryManager)
	}

总结可以看到

  • containerManager的初始化过程就是初始化一众其他资源管理器

kubelet new中调用的containerManager初始化

  • 将ContainerManager赋值给kubeDeps的相关字段
		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
			cm.NodeConfig{
				RuntimeCgroupsName:    s.RuntimeCgroups,
				SystemCgroupsName:     s.SystemCgroups,
				KubeletCgroupsName:    s.KubeletCgroups,
				ContainerRuntime:      s.ContainerRuntime,
				CgroupsPerQOS:         s.CgroupsPerQOS,
				CgroupRoot:            s.CgroupRoot,
				CgroupDriver:          s.CgroupDriver,
				KubeletRootDir:        s.RootDirectory,
				ProtectKernelDefaults: s.ProtectKernelDefaults,
				NodeAllocatableConfig: cm.NodeAllocatableConfig{
					KubeReservedCgroupName:   s.KubeReservedCgroup,
					SystemReservedCgroupName: s.SystemReservedCgroup,
					EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
					KubeReserved:             kubeReserved,
					SystemReserved:           systemReserved,
					ReservedSystemCPUs:       reservedSystemCPUs,
					HardEvictionThresholds:   hardEvictionThresholds,
				},
				QOSReserved:                             *experimentalQOSReserved,
				ExperimentalCPUManagerPolicy:            s.CPUManagerPolicy,
				ExperimentalCPUManagerPolicyOptions:     cpuManagerPolicyOptions,
				ExperimentalCPUManagerReconcilePeriod:   s.CPUManagerReconcilePeriod.Duration,
				ExperimentalMemoryManagerPolicy:         s.MemoryManagerPolicy,
				ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
				ExperimentalPodPidsLimit:                s.PodPidsLimit,
				EnforceCPULimits:                        s.CPUCFSQuota,
				CPUCFSQuotaPeriod:                       s.CPUCFSQuotaPeriod.Duration,
				ExperimentalTopologyManagerPolicy:       s.TopologyManagerPolicy,
				ExperimentalTopologyManagerScope:        s.TopologyManagerScope,
			},
			s.FailSwapOn,
			devicePluginEnabled,
			kubeDeps.Recorder)

		if err != nil {
			return err
		}

kubeDeps.ContainerManager最终用到了kubelet的containerManager

klet := &Kubelet{
    containerManager:                        kubeDeps.ContainerManager,
}

追踪kubelet containerManager的启动

  • 注释说明containerManager要在cAdvisor后启动,因为需要cAdvisor提供的节点文件系统容量信息
	// containerManager must start after cAdvisor because it needs filesystem capacity information
	if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
		// Fail kubelet and rely on the babysitter to retry starting kubelet.
		klog.ErrorS(err, "Failed to start ContainerManager")
		os.Exit(1)
	}

底层是 containerManagerImpl的start

func (cm *containerManagerImpl) Start(){}
  • 启动cpuManager
	// Initialize CPU manager
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		containerMap, err := buildContainerMapFromRuntime(runtimeService)
		if err != nil {
			return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
		}
		err = cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
		if err != nil {
			return fmt.Errorf("start cpu manager error: %v", err)
		}
	}
  • 启动memoryManager
	// Initialize memory manager
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
		containerMap, err := buildContainerMapFromRuntime(runtimeService)
		if err != nil {
			return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
		}
		err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
		if err != nil {
			return fmt.Errorf("start memory manager error: %v", err)
		}
	}

限制kubelet本地临时存储的容量

	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LocalStorageCapacityIsolation) {
		rootfs, err := cm.cadvisorInterface.RootFsInfo()
		if err != nil {
			return fmt.Errorf("failed to get rootfs info: %v", err)
		}
		for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) {
			cm.capacity[rName] = rCap
		}
	}
  • 源码中的流程为如果开启了LocalStorageCapacityIsolation特性就通过 cadvisor接口获取 rootfsinfo
    • 然后将类型为ephemeral-storage的存储的容量设置为rootfs获取到的
// EphemeralStorageCapacityFromFsInfo returns the capacity of the ephemeral storage from the FsInfo.
func EphemeralStorageCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceList {
	c := v1.ResourceList{
		v1.ResourceEphemeralStorage: *resource.NewQuantity(
			int64(info.Capacity),
			resource.BinarySI),
	}
	return c
}

为什么要限制本地临时存储呢

  • 早期kubernetes版本并没有限制container的rootfs的容量
  • 由于默认容器使用的log存储空间是在 /var/lib/kubelet/ 下
  • rootfs在/var/lib/docker下,而这两个目录默认就在宿主机node的根分区
  • 如果应用恶意攻击,可以通过在容器内大量dd从而迅速造成宿主机node根分区文件系统满
  • 我们知道,当linux根分区使用达到100%的时候,通常会很危险。

local ephemeral storage 类型的资源

  • kubernetes在1.8版本引入了一种新的resource:local ephemeral storage(临时存储),用来管理本地临时存储
  • 对应特性 LocalStorageCapacityIsolation。从1.10开始该特性转为beta状态,默认开启。
  • 临时存储,如 emptyDir volumes, container logs, image layers and container writable layers
  • 默认它们使用的是 /var/lib/kubelet ,通过限制临时存储容量,也就可以保护node的root分区了。

回到containerManagerImpl的start中

  • 校验node 可用资源的配置
	// Ensure that node allocatable configuration is valid.
	if err := cm.validateNodeAllocatable(); err != nil {
		return err
	}

做node的一些初始化工作

	// Setup the node
	if err := cm.setupNode(activePods); err != nil {
		return err
	}

  • validateSystemRequirements会校验 cpu memory cgroup是否已经挂载上了
	f, err := validateSystemRequirements(cm.mountUtil)
	if err != nil {
		return err
	}
  • 创建顶级qos cgroup
	// Setup top level qos containers only if CgroupsPerQOS flag is specified as true
	if cm.NodeConfig.CgroupsPerQOS {
		if err := cm.createNodeAllocatableCgroups(); err != nil {
			return err
		}
		err = cm.qosContainerManager.Start(cm.GetNodeAllocatableAbsolute, activePods)
		if err != nil {
			return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
		}
	}

  • 如果容器运行时是 docker,那么检查 docker 的cgroup
	if cm.ContainerRuntime == "docker" {
		// With the docker-CRI integration, dockershim manages the cgroups
		// and oom score for the docker processes.
		// Check the cgroup for docker periodically, so kubelet can serve stats for the docker runtime.
		// TODO(KEP#866): remove special processing for CRI "docker" enablement
		cm.periodicTasks = append(cm.periodicTasks, func() {
			klog.V(4).InfoS("Adding periodic tasks for docker CRI integration")
			cont, err := getContainerNameForProcess(dockerProcessName, dockerPidFile)
			if err != nil {
				klog.ErrorS(err, "Failed to get container name for process")
				return
			}
			klog.V(2).InfoS("Discovered runtime cgroup name", "cgroupName", cont)
			cm.Lock()
			defer cm.Unlock()
			cm.RuntimeCgroupsName = cont
		})
	}