containerManager的作用
-
containerManager 管理容器的各种资源,比如 CGroups、QoS、cpuset、device 等
-
内置了很多资源管理器,总结起来就是其他manager的管家
D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\container_manager.go
// Manages the containers running on a machine.
type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start(*v1.Node, ActivePodsFunc, config.SourcesReady, status.PodStatusProvider, internalapi.RuntimeService) error
// SystemCgroupsLimit returns resources allocated to system cgroups in the machine.
// These cgroups include the system and Kubernetes services.
SystemCgroupsLimit() v1.ResourceList
// GetNodeConfig 返回节点配置
GetNodeConfig() NodeConfig
// Status 返回内部错误信息
Status() Status
// NewPodContainerManager 工厂函数返回podContainerManager对象
NewPodContainerManager() PodContainerManager
// GetMountedSubsystems 返回节点上挂载的cgroup subsystems
GetMountedSubsystems() *CgroupSubsystems
// GetQOSContainersInfo 返回顶级qos 容器名
GetQOSContainersInfo() QOSContainersInfo
// GetNodeAllocatableReservation 返回节点预留的资源
GetNodeAllocatableReservation() v1.ResourceList
// GetCapacity 返回节点上可用的资源
GetCapacity() v1.ResourceList
// GetDevicePluginResourceCapacity 返回节点上插件资源总量,可用总量和不活跃的插件资源
GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string)
// UpdateQOSCgroups 确保顶级qos容器在期望的状态中
UpdateQOSCgroups() error
// GetResources returns RunContainerOptions with devices, mounts, and env fields populated for
// extended resources required by container.
GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error)
// UpdatePluginResources calls Allocate of device plugin handler for potential
// requests for device plugin resources, and returns an error if fails.
// Otherwise, it updates allocatableResource in nodeInfo if necessary,
// to make sure it is at least equal to the pod's requested capacity for
// any registered device plugin resource
UpdatePluginResources(*schedulerframework.NodeInfo, *lifecycle.PodAdmitAttributes) error
InternalContainerLifecycle() InternalContainerLifecycle
// GetPodCgroupRoot 返回 cgroup的root
GetPodCgroupRoot() string
// GetPluginRegistrationHandler 插件注册
GetPluginRegistrationHandler() cache.PluginHandler
// ShouldResetExtendedResourceCapacity 决定扩展资源是否清理
ShouldResetExtendedResourceCapacity() bool
// GetAllocateResourcesPodAdmitHandler pod准入控制器
GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler
// GetNodeAllocatableAbsolute returns the absolute value of Node Allocatable which is primarily useful for enforcement.
GetNodeAllocatableAbsolute() v1.ResourceList
// Implements the podresources Provider API for CPUs, Memory and Devices
podresources.CPUsProvider
podresources.DevicesProvider
podresources.MemoryProvider
}
具体实现
go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\container_manager_linux.go
type containerManagerImpl struct {
sync.RWMutex
cadvisorInterface cadvisor.Interface
mountUtil mount.Interface
NodeConfig
status Status
// External containers being managed.
systemContainers []*systemContainer
// 周期运行的任务
periodicTasks []func()
// 所有的挂载 cgroup subsystem
subsystems *CgroupSubsystems
// 节点信息
nodeInfo *v1.Node
// cgroupManager
cgroupManager CgroupManager
// Capacity of this node.
capacity v1.ResourceList
// Capacity of this node, including internal resources.
internalCapacity v1.ResourceList
// Absolute cgroupfs path to a cgroup that Kubelet needs to place all pods under.
// This path include a top level container for enforcing Node Allocatable.
cgroupRoot CgroupName
// Event recorder interface.
recorder record.EventRecorder
// Interface for QoS cgroup management
qosContainerManager QOSContainerManager
// Interface for exporting and allocating devices reported by device plugins.
deviceManager devicemanager.Manager
// cpu管理器
cpuManager cpumanager.Manager
// 内存管理器
memoryManager memorymanager.Manager
// numa 拓扑管理器
topologyManager topologymanager.Manager
}
NewContainerManager 初始化
- 位置 D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\cm\container_manager_linux.go
- 根据cgroup v1 v2版本获取cgroups subsystems
subsystems, err := GetCgroupSubsystems()
if err != nil {
return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
}
- swap开启时,kubelet不能运行,检查一下
if failSwapOn {
// Check whether swap is enabled. The Kubelet does not support running with swap enabled.
swapFile := "/proc/swaps"
swapData, err := ioutil.ReadFile(swapFile)
if err != nil {
if os.IsNotExist(err) {
klog.InfoS("File does not exist, assuming that swap is disabled", "path", swapFile)
} else {
return nil, err
}
} else {
swapData = bytes.TrimSpace(swapData) // extra trailing \n
swapLines := strings.Split(string(swapData), "\n")
// If there is more than one line (table headers) in /proc/swaps, swap is enabled and we should
// error out unless --fail-swap-on is set to false.
if len(swapLines) > 1 {
return nil, fmt.Errorf("running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false. /proc/swaps contained: %v", swapLines)
}
}
}
- 通过cadvisorInterface提供的获取节点信息的方法获取machineInfo,从中获取资源的容量信息,遍历后取值
var internalCapacity = v1.ResourceList{}
// It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
// machine info is computed and cached once as part of cAdvisor object creation.
// But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts
machineInfo, err := cadvisorInterface.MachineInfo()
if err != nil {
return nil, err
}
capacity := cadvisor.CapacityFromMachineInfo(machineInfo)
for k, v := range capacity {
internalCapacity[k] = v
}
pidlimits, err := pidlimit.Stats()
if err == nil && pidlimits != nil && pidlimits.MaxPID != nil {
internalCapacity[pidlimit.PIDs] = *resource.NewQuantity(
int64(*pidlimits.MaxPID),
resource.DecimalSI)
}
- 初始化cgroupManager
// Turn CgroupRoot from a string (in cgroupfs path format) to internal CgroupName
cgroupRoot := ParseCgroupfsToCgroupName(nodeConfig.CgroupRoot)
cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
// Check if Cgroup-root actually exists on the node
if nodeConfig.CgroupsPerQOS {
// this does default to / when enabled, but this tests against regressions.
if nodeConfig.CgroupRoot == "" {
return nil, fmt.Errorf("invalid configuration: cgroups-per-qos was specified and cgroup-root was not specified. To enable the QoS cgroup hierarchy you need to specify a valid cgroup-root")
}
// we need to check that the cgroup root actually exists for each subsystem
// of note, we always use the cgroupfs driver when performing this check since
// the input is provided in that format.
// this is important because we do not want any name conversion to occur.
if !cgroupManager.Exists(cgroupRoot) {
return nil, fmt.Errorf("invalid configuration: cgroup-root %q doesn't exist", cgroupRoot)
}
klog.InfoS("Container manager verified user specified cgroup-root exists", "cgroupRoot", cgroupRoot)
// Include the top level cgroup for enforcing node allocatable into cgroup-root.
// This way, all sub modules can avoid having to understand the concept of node allocatable.
cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName)
}
klog.InfoS("Creating Container Manager object based on Node Config", "nodeConfig", nodeConfig)
- 初始化qosManager
qosContainerManager, err := NewQOSContainerManager(subsystems, cgroupRoot, nodeConfig, cgroupManager)
if err != nil {
return nil, err
}
- 以众多字段初始化containerManagerImpl
cm := &containerManagerImpl{
cadvisorInterface: cadvisorInterface,
mountUtil: mountUtil,
NodeConfig: nodeConfig,
subsystems: subsystems,
cgroupManager: cgroupManager,
capacity: capacity,
internalCapacity: internalCapacity,
cgroupRoot: cgroupRoot,
recorder: recorder,
qosContainerManager: qosContainerManager,
}
- 初始化topologymanager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
cm.topologyManager, err = topologymanager.NewManager(
machineInfo.Topology,
nodeConfig.ExperimentalTopologyManagerPolicy,
nodeConfig.ExperimentalTopologyManagerScope,
)
if err != nil {
return nil, err
}
} else {
cm.topologyManager = topologymanager.NewFakeManager()
}
- 初始化deviceManager
klog.InfoS("Creating device plugin manager", "devicePluginEnabled", devicePluginEnabled)
if devicePluginEnabled {
cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
cm.topologyManager.AddHintProvider(cm.deviceManager)
} else {
cm.deviceManager, err = devicemanager.NewManagerStub()
}
if err != nil {
return nil, err
}
- 初始化cpuManager
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.ExperimentalCPUManagerPolicy,
nodeConfig.ExperimentalCPUManagerPolicyOptions,
nodeConfig.ExperimentalCPUManagerReconcilePeriod,
machineInfo,
nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
cm.GetNodeAllocatableReservation(),
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize cpu manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.cpuManager)
}
- 初始化memoryManager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
cm.memoryManager, err = memorymanager.NewManager(
nodeConfig.ExperimentalMemoryManagerPolicy,
machineInfo,
cm.GetNodeAllocatableReservation(),
nodeConfig.ExperimentalMemoryManagerReservedMemory,
nodeConfig.KubeletRootDir,
cm.topologyManager,
)
if err != nil {
klog.ErrorS(err, "Failed to initialize memory manager")
return nil, err
}
cm.topologyManager.AddHintProvider(cm.memoryManager)
}
总结可以看到
- containerManager的初始化过程就是初始化一众其他资源管理器
kubelet new中调用的containerManager初始化
- 将ContainerManager赋值给kubeDeps的相关字段
kubeDeps.ContainerManager, err = cm.NewContainerManager(
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
ContainerRuntime: s.ContainerRuntime,
CgroupsPerQOS: s.CgroupsPerQOS,
CgroupRoot: s.CgroupRoot,
CgroupDriver: s.CgroupDriver,
KubeletRootDir: s.RootDirectory,
ProtectKernelDefaults: s.ProtectKernelDefaults,
NodeAllocatableConfig: cm.NodeAllocatableConfig{
KubeReservedCgroupName: s.KubeReservedCgroup,
SystemReservedCgroupName: s.SystemReservedCgroup,
EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...),
KubeReserved: kubeReserved,
SystemReserved: systemReserved,
ReservedSystemCPUs: reservedSystemCPUs,
HardEvictionThresholds: hardEvictionThresholds,
},
QOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
ExperimentalCPUManagerPolicyOptions: cpuManagerPolicyOptions,
ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
ExperimentalMemoryManagerPolicy: s.MemoryManagerPolicy,
ExperimentalMemoryManagerReservedMemory: s.ReservedMemory,
ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
ExperimentalTopologyManagerScope: s.TopologyManagerScope,
},
s.FailSwapOn,
devicePluginEnabled,
kubeDeps.Recorder)
if err != nil {
return err
}
kubeDeps.ContainerManager最终用到了kubelet的containerManager
klet := &Kubelet{
containerManager: kubeDeps.ContainerManager,
}
追踪kubelet containerManager的启动
- 注释说明containerManager要在cAdvisor后启动,因为需要cAdvisor提供的节点文件系统容量信息
// containerManager must start after cAdvisor because it needs filesystem capacity information
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.sourcesReady, kl.statusManager, kl.runtimeService); err != nil {
// Fail kubelet and rely on the babysitter to retry starting kubelet.
klog.ErrorS(err, "Failed to start ContainerManager")
os.Exit(1)
}
底层是 containerManagerImpl的start
func (cm *containerManagerImpl) Start(){}
- 启动cpuManager
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
containerMap, err := buildContainerMapFromRuntime(runtimeService)
if err != nil {
return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
}
err = cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start cpu manager error: %v", err)
}
}
- 启动memoryManager
// Initialize memory manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
containerMap, err := buildContainerMapFromRuntime(runtimeService)
if err != nil {
return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
}
err = cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
if err != nil {
return fmt.Errorf("start memory manager error: %v", err)
}
}
限制kubelet本地临时存储的容量
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.LocalStorageCapacityIsolation) {
rootfs, err := cm.cadvisorInterface.RootFsInfo()
if err != nil {
return fmt.Errorf("failed to get rootfs info: %v", err)
}
for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) {
cm.capacity[rName] = rCap
}
}
- 源码中的流程为如果开启了LocalStorageCapacityIsolation特性就通过 cadvisor接口获取 rootfsinfo
- 然后将类型为ephemeral-storage的存储的容量设置为rootfs获取到的
// EphemeralStorageCapacityFromFsInfo returns the capacity of the ephemeral storage from the FsInfo.
func EphemeralStorageCapacityFromFsInfo(info cadvisorapi2.FsInfo) v1.ResourceList {
c := v1.ResourceList{
v1.ResourceEphemeralStorage: *resource.NewQuantity(
int64(info.Capacity),
resource.BinarySI),
}
return c
}
为什么要限制本地临时存储呢
- 早期kubernetes版本并没有限制container的rootfs的容量
- 由于默认容器使用的log存储空间是在 /var/lib/kubelet/ 下
- rootfs在/var/lib/docker下,而这两个目录默认就在宿主机node的根分区
- 如果应用恶意攻击,可以通过在容器内大量dd从而迅速造成宿主机node根分区文件系统满
- 我们知道,当linux根分区使用达到100%的时候,通常会很危险。
local ephemeral storage 类型的资源
- kubernetes在1.8版本引入了一种新的resource:local ephemeral storage(临时存储),用来管理本地临时存储
- 对应特性 LocalStorageCapacityIsolation。从1.10开始该特性转为beta状态,默认开启。
- 临时存储,如 emptyDir volumes, container logs, image layers and container writable layers
- 默认它们使用的是 /var/lib/kubelet ,通过限制临时存储容量,也就可以保护node的root分区了。
回到containerManagerImpl的start中
- 校验node 可用资源的配置
// Ensure that node allocatable configuration is valid.
if err := cm.validateNodeAllocatable(); err != nil {
return err
}
做node的一些初始化工作
// Setup the node
if err := cm.setupNode(activePods); err != nil {
return err
}
- validateSystemRequirements会校验 cpu memory cgroup是否已经挂载上了
f, err := validateSystemRequirements(cm.mountUtil)
if err != nil {
return err
}
- 创建顶级qos cgroup
// Setup top level qos containers only if CgroupsPerQOS flag is specified as true
if cm.NodeConfig.CgroupsPerQOS {
if err := cm.createNodeAllocatableCgroups(); err != nil {
return err
}
err = cm.qosContainerManager.Start(cm.GetNodeAllocatableAbsolute, activePods)
if err != nil {
return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
}
}
- 如果容器运行时是 docker,那么检查 docker 的cgroup
if cm.ContainerRuntime == "docker" {
// With the docker-CRI integration, dockershim manages the cgroups
// and oom score for the docker processes.
// Check the cgroup for docker periodically, so kubelet can serve stats for the docker runtime.
// TODO(KEP#866): remove special processing for CRI "docker" enablement
cm.periodicTasks = append(cm.periodicTasks, func() {
klog.V(4).InfoS("Adding periodic tasks for docker CRI integration")
cont, err := getContainerNameForProcess(dockerProcessName, dockerPidFile)
if err != nil {
klog.ErrorS(err, "Failed to get container name for process")
return
}
klog.V(2).InfoS("Discovered runtime cgroup name", "cgroupName", cont)
cm.Lock()
defer cm.Unlock()
cm.RuntimeCgroupsName = cont
})
}