重点
- containerRuntime 需要实现3类接口
- 管理容器的接口
- 管理镜像的接口
- Streaming API 用于客户端与容器进行交互
// KubeGenericRuntime is an interface that groups the three interfaces a
// container runtime has to provide, by embedding each of them.
type KubeGenericRuntime interface {
// Container (and image) management.
kubecontainer.Runtime
// Streaming API used by clients to interact with containers.
kubecontainer.StreamingRuntime
// Running a command inside a container.
kubecontainer.CommandRunner
}
- containerRuntime 的初始化
containerRuntime 接口解析
- 在上节课我们知道containerRuntime需要实现3类接口
- 管理容器的接口
- 管理镜像的接口
- Streaming API 用于客户端与容器进行交互
- 那么发现 KubeGenericRuntime 接口通过内嵌(embed)组合了这些接口
- 位置 D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\kuberuntime\kuberuntime_manager.go
// KubeGenericRuntime is an interface that contains the interfaces for the
// container runtime and for running commands in containers.
type KubeGenericRuntime interface {
// Container (and image) management.
kubecontainer.Runtime
// Streaming API (exec/attach/port-forward).
kubecontainer.StreamingRuntime
// Running a command inside a container.
kubecontainer.CommandRunner
}
-
在KubeGenericRuntime接口中定义了3种接口
- kubecontainer.Runtime 代表管理容器的接口,包括镜像的管理
- kubecontainer.StreamingRuntime 代表Streaming API
- kubecontainer.CommandRunner 代表在容器中执行命令的接口,供 prober(探针)使用
-
下面就来具体分析一下
01 kubecontainer.Runtime
- 接口定义 ,位置 D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\container\runtime.go
// Runtime defines the interface that a container runtime has to implement:
// runtime metadata, pod/container lifecycle, logs, garbage collection and,
// through the embedded ImageService, image management.
type Runtime interface {
// Type returns the type of the container runtime.
Type() string
// SupportsSingleFileMapping returns whether the container runtime supports single file mappings or not.
SupportsSingleFileMapping() bool
// Version returns the version information of the container runtime.
Version() (Version, error)
// APIVersion returns the cached API version information of the container
// runtime. Implementation is expected to update this cache periodically.
// This may be different from the runtime engine's version.
// TODO(random-liu): We should fold this into Version()
APIVersion() (Version, error)
// Status returns the status of the runtime. An error is returned if the Status
// function itself fails, nil otherwise.
Status() (*RuntimeStatus, error)
// GetPods returns a list of containers grouped by pods. The boolean parameter
// specifies whether the runtime returns all containers including those already
// exited and dead containers (used for garbage collection).
GetPods(all bool) ([]*Pod, error)
// GarbageCollect removes dead containers using the specified container gc policy.
// If allSourcesReady is not true, it means that kubelet doesn't have the
// complete list of pods from all available sources (e.g., apiserver, http,
// file). In this case, garbage collector should refrain itself from aggressive
// behavior such as removing all containers of unrecognized pods (yet).
// If evictNonDeletedPods is set to true, containers and sandboxes belonging to pods
// that are terminated, but not deleted will be evicted. Otherwise, only deleted pods
// will be GC'd.
// TODO: Revisit this method and make it cleaner.
GarbageCollect(gcPolicy GCPolicy, allSourcesReady bool, evictNonDeletedPods bool) error
// SyncPod syncs the running pod into the desired pod.
SyncPod(pod *v1.Pod, podStatus *PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) PodSyncResult
// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
// TODO(random-liu): Return PodSyncResult in KillPod.
// gracePeriodOverride, if specified, allows the caller to override the pod default grace period.
// Only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data.
// It is useful when doing SIGKILL for hard eviction scenarios, or max grace period during soft eviction scenarios.
KillPod(pod *v1.Pod, runningPod Pod, gracePeriodOverride *int64) error
// GetPodStatus retrieves the status of the pod, including the
// information of all containers in the pod that are visible in Runtime.
GetPodStatus(uid types.UID, name, namespace string) (*PodStatus, error)
// TODO(vmarmol): Unify pod and containerID args.
// GetContainerLogs returns logs of a specific container. By
// default, it returns a snapshot of the container log. Set 'follow' to true to
// stream the log. Set 'follow' to false and specify the number of lines (e.g.
// "100" or "all") to tail the log.
GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error)
// DeleteContainer deletes a container. If the container is still running, an error is returned.
DeleteContainer(containerID ContainerID) error
// ImageService provides the image-related methods (embedded interface).
ImageService
// UpdatePodCIDR sends a new podCIDR to the runtime.
// This method just proxies a new runtimeConfig with the updated
// CIDR value down to the runtime shim.
UpdatePodCIDR(podCIDR string) error
}
- 其中重要的方法如
- SyncPod 创建容器时所用到的
- KillPod 删除容器时所用到的
- GetPods 获取pod
- 同时可以观察到 Runtime 接口内部内嵌(embed)了 ImageService 接口(Go 没有继承,这里是接口组合)
ImageService 的接口
- 在同一个文件中可以看到ImageService 的接口 全都是围绕镜像服务的
// ImageService is the interface for working with the image service:
// pulling, looking up, listing and removing images, and reporting image statistics.
type ImageService interface {
// PullImage pulls an image from the network to local storage using the supplied
// secrets if necessary. It returns a reference (digest or ID) to the pulled image.
PullImage(image ImageSpec, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
// GetImageRef gets the reference (digest or ID) of the image which has already been in
// the local storage. It returns ("", nil) if the image isn't in the local storage.
GetImageRef(image ImageSpec) (string, error)
// ListImages gets all images currently on the machine.
ListImages() ([]Image, error)
// RemoveImage removes the specified image.
RemoveImage(image ImageSpec) error
// ImageStats returns Image statistics.
ImageStats() (*ImageStats, error)
}
02 kubecontainer.StreamingRuntime
- 追踪可以得知都是 和容器交互的 stream api
- 比如 GetExec 就是 kubectl exec 所用到的
// StreamingRuntime is the interface implemented by runtimes that handle the serving of the
// streaming calls (exec/attach/port-forward) themselves. In this case, Kubelet should redirect to
// the runtime server.
type StreamingRuntime interface {
// GetExec returns a URL for an exec request against the given container
// (presumably the runtime server endpoint to redirect to — confirm).
GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
// GetAttach returns a URL for an attach request against the given container
// (presumably the runtime server endpoint to redirect to — confirm).
GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)
// GetPortForward returns a URL for a port-forward request against the given pod
// (presumably the runtime server endpoint to redirect to — confirm).
GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)
}
03 kubecontainer.CommandRunner
- 其中就一个RunInContainer方法,代表在指定容器中执行cmd,超时时间为timeout
// CommandRunner is the interface for running a command in a container.
type CommandRunner interface {
// RunInContainer synchronously executes the command in the container, and returns the output.
// If the command completes with a non-0 exit code, a k8s.io/utils/exec.ExitError will be returned.
RunInContainer(id ContainerID, cmd []string, timeout time.Duration) ([]byte, error)
}
containerRuntime 的初始化
- 对应的字段在kubelet中就是containerRuntime
// Container runtime.
containerRuntime kubecontainer.Runtime
- 追踪这个containerRuntime的赋值,发现对应的初始化方法为NewKubeGenericRuntimeManager
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
klet.readinessManager,
klet.startupManager,
rootDirectory,
machineInfo,
klet.podWorkers,
kubeDeps.OSInterface,
klet,
httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
imageCredentialProviderConfigFile,
imageCredentialProviderBinDir,
kubeCfg.CPUCFSQuota,
kubeCfg.CPUCFSQuotaPeriod,
kubeDeps.RemoteRuntimeService,
kubeDeps.RemoteImageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(),
kubeDeps.dockerLegacyService,
klet.containerLogManager,
klet.runtimeClassManager,
seccompDefault,
kubeCfg.MemorySwap.SwapBehavior,
kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
*kubeCfg.MemoryThrottlingFactor,
)
kubeGenericRuntimeManager字段解析
// kubeGenericRuntimeManager holds the state of the generic runtime manager:
// the gRPC clients for the runtime and image services plus the helpers
// (GC, image puller, probe result managers, log manager, ...) built around them.
type kubeGenericRuntimeManager struct {
runtimeName string // name of the runtime
recorder record.EventRecorder // event recorder
osInterface kubecontainer.OSInterface // interface for node-level (OS) operations
// Machine (node) information provided by cadvisor.
machineInfo *cadvisorapi.MachineInfo
// Container GC manager
containerGC *containerGC
// Keyring holding the credentials used when pulling images.
keyring credentialprovider.DockerKeyring
// Runner for lifecycle event handlers.
runner kubecontainer.HandlerRunner
// RuntimeHelper that wraps kubelet to generate runtime container options.
runtimeHelper kubecontainer.RuntimeHelper
// Managers for the health-check (liveness/readiness/startup probe) results.
livenessManager proberesults.Manager
readinessManager proberesults.Manager
startupManager proberesults.Manager
// CPU CFS quota flag (presumably toggles CFS quota enforcement — confirm).
cpuCFSQuota bool
// cpu.cfs_period_us, defaults to 100ms
cpuCFSQuotaPeriod metav1.Duration
// Wrapper around the image puller.
imagePuller images.ImageManager
// gRPC service clients
runtimeService internalapi.RuntimeService
imageService internalapi.ImageManagerService
// The version cache of runtime daemon.
versionCache *cache.ObjectCache
// The directory path for seccomp profiles.
seccompProfileRoot string
// Internal lifecycle event handlers for container resource management.
internalLifecycle cm.InternalContainerLifecycle
// A shim to legacy functions for backward compatibility.
legacyLogProvider LegacyLogProvider
// Manage container logs.
logManager logs.ContainerLogManager
// Manage RuntimeClass resources.
runtimeClassManager *runtimeclass.Manager
// Cache last per-container error message to reduce log spam
logReduction *logreduction.LogReduction
// PodState provider instance
podStateProvider podStateProvider
// Use RuntimeDefault as the default seccomp profile for all workloads.
seccompDefault bool
// MemorySwapBehavior defines how swap is used
memorySwapBehavior string
// Function to get node allocatable resources
getNodeAllocatable func() v1.ResourceList
// Memory throttling factor for MemoryQoS
memoryThrottlingFactor float64
}
NewKubeGenericRuntimeManager解析
- 首先就是填充相关字段
kubeRuntimeManager := &kubeGenericRuntimeManager{
recorder: recorder,
cpuCFSQuota: cpuCFSQuota,
cpuCFSQuotaPeriod: cpuCFSQuotaPeriod,
seccompProfileRoot: filepath.Join(rootDirectory, "seccomp"),
livenessManager: livenessManager,
readinessManager: readinessManager,
startupManager: startupManager,
machineInfo: machineInfo,
osInterface: osInterface,
runtimeHelper: runtimeHelper,
runtimeService: newInstrumentedRuntimeService(runtimeService),
imageService: newInstrumentedImageManagerService(imageService),
internalLifecycle: internalLifecycle,
legacyLogProvider: legacyLogProvider,
logManager: logManager,
runtimeClassManager: runtimeClassManager,
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
seccompDefault: seccompDefault,
memorySwapBehavior: memorySwapBehavior,
getNodeAllocatable: getNodeAllocatable,
memoryThrottlingFactor: memoryThrottlingFactor,
}
- 获取runtime 的版本
typedVersion, err := kubeRuntimeManager.getTypedVersion()
if err != nil {
klog.ErrorS(err, "Get runtime version failed")
return nil, err
}
- 要求 runtime 返回的 API 版本(typedVersion.Version)与 kubeRuntimeAPIVersion 一致,否则返回 ErrVersionNotSupported
// Only matching kubeRuntimeAPIVersion is supported now
// TODO: Runtime API machinery is under discussion at https://github.com/kubernetes/kubernetes/issues/28642
if typedVersion.Version != kubeRuntimeAPIVersion {
klog.ErrorS(err, "This runtime api version is not supported",
"apiVersion", typedVersion.Version,
"supportedAPIVersion", kubeRuntimeAPIVersion)
return nil, ErrVersionNotSupported
}
- 使用返回的runtime Name字段
kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
klog.InfoS("Container runtime initialized",
"containerRuntime", typedVersion.RuntimeName,
"version", typedVersion.RuntimeVersion,
"apiVersion", typedVersion.RuntimeApiVersion)
- 确保pod log目录存在
// If the container logs directory does not exist, create it.
// TODO: create podLogsRootDirectory at kubelet.go when kubelet is refactored to
// new runtime interface
if _, err := osInterface.Stat(podLogsRootDirectory); os.IsNotExist(err) {
if err := osInterface.MkdirAll(podLogsRootDirectory, 0755); err != nil {
klog.ErrorS(err, "Failed to create pod log directory", "path", podLogsRootDirectory)
}
}
if !utilfeature.DefaultFeatureGate.Enabled(features.KubeletCredentialProviders) && (imageCredentialProviderConfigFile != "" || imageCredentialProviderBinDir != "") {
klog.InfoS("Flags --image-credential-provider-config or --image-credential-provider-bin-dir were set but the feature gate was disabled, these flags will be ignored",
"featureGate", features.KubeletCredentialProviders)
}
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletCredentialProviders) && (imageCredentialProviderConfigFile != "" || imageCredentialProviderBinDir != "") {
if err := plugin.RegisterCredentialProviderPlugins(imageCredentialProviderConfigFile, imageCredentialProviderBinDir); err != nil {
klog.ErrorS(err, "Failed to register CRI auth plugins")
os.Exit(1)
}
}
- 创建imagePuller 做镜像拉取
kubeRuntimeManager.keyring = credentialprovider.NewDockerKeyring()
kubeRuntimeManager.imagePuller = images.NewImageManager(
kubecontainer.FilterEventRecorder(recorder),
kubeRuntimeManager,
imageBackOff,
serializeImagePulls,
imagePullQPS,
imagePullBurst)
- 创建runner 、containerGC、podStateProvider
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.containerGC = newContainerGC(runtimeService, podStateProvider, kubeRuntimeManager)
kubeRuntimeManager.podStateProvider = podStateProvider
runtime对象的赋值
- 可以看到kubelet把新建的runtime对象赋值给了 3个字段
- containerRuntime
- streamingRuntime
- runner
klet.containerRuntime = runtime
klet.streamingRuntime = runtime
klet.runner = runtime