- GenericRuntimeManager 的syncPod方法 步骤6代表创建init容器
- 按照init容器要求的按顺序启动
- 调用的通用的start 启动容器,和app容器一致
- pull the image 拉取镜像
- 会先检查容器是否已经在本节点上
- 然后根据容器配置的镜像拉取策略拉取
- 底层就是通过grpc调用runtime 的PullImage方法
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
// 1. Compute sandbox and container changes.
// 2. Kill pod sandbox if necessary.
// 3. Kill any containers that should not be running.
// 4. Create sandbox if necessary.
// 5. Create ephemeral containers.
// 6. Create init containers.
// 7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult)
相关参考代码: pkg\kubelet\kuberuntime\kuberuntime_manager.go
init容器的创建
// Step 6: start the init container.
if container := podContainerChanges.NextInitContainerToStart; container != nil {
// Start the next init container.
if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
return
}
// Successfully started the container; clear the entry in the failure
klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}
- 在这里可以看到是 根据podContainerChanges.NextInitContainerToStart 获取下一个待启动的init容器,NextInitContainerToStart 是在哪里被赋值的呢?其实是在syncPod最开始的函数computePodActions中
- 首先会追溯到computePodActions中,NextInitContainerToStart 被赋值成init容器的第一个
if len(pod.Spec.InitContainers) != 0 {
// Pod has init containers, return the first one.
changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
return changes
}
- 后续就是在findNextInitContainerToRun 获取的下一个init 容器
// Check initialization progress.
initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)
if !done {
if next != nil {
initFailed := initLastStatus != nil && isInitContainerFailed(initLastStatus)
if initFailed && !shouldRestartOnFailure(pod) {
changes.KillPod = true
} else {
// Always try to stop containers in unknown state first.
if initLastStatus != nil && initLastStatus.State == kubecontainer.ContainerStateUnknown {
changes.ContainersToKill[initLastStatus.ID] = containerToKillInfo{
name: next.Name,
container: next,
message: fmt.Sprintf("Init container is in %q state, try killing it before restart",
initLastStatus.State),
reason: reasonUnknown,
}
}
changes.NextInitContainerToStart = next
}
}
// Initialization failed or still in progress. Skip inspecting non-init
// containers.
return changes
}
findNextInitContainerToRun 解析
按照init容器先于业务容器启动,并且按顺序执行退出的原则
- 有限检查用户是否配置了init容器
if len(pod.Spec.InitContainers) == 0 {
return nil, nil, true
}
- 遍历app容器,如果有app容器已经处于running了,那么init容器肯定已经执行完毕了
for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]
status := podStatus.FindContainerStatusByName(container.Name)
if status != nil && status.State == kubecontainer.ContainerStateRunning {
return nil, nil, true
}
}
- 遍历init容器,找到最后failed的那个返回,做retry
for i := len(pod.Spec.InitContainers) - 1; i >= 0; i-- {
container := &pod.Spec.InitContainers[i]
status := podStatus.FindContainerStatusByName(container.Name)
if status != nil && isInitContainerFailed(status) {
return status, container, false
}
}
- 遍历init容器,没找到容器的status,continue
// There are no failed containers now.
for i := len(pod.Spec.InitContainers) - 1; i >= 0; i-- {
container := &pod.Spec.InitContainers[i]
status := podStatus.FindContainerStatusByName(container.Name)
if status == nil {
continue
}
- 如果这个init容器还处于running状态,那么返回nil
// container is still running, return not done.
if status.State == kubecontainer.ContainerStateRunning {
return nil, nil, false
}
- 如果最后一个init容器处于exited状态,那么所有的 init容器已经执行完毕
if status.State == kubecontainer.ContainerStateExited {
// all init containers successful
if i == (len(pod.Spec.InitContainers) - 1) {
return nil, nil, true
}
// all containers up to i successful, go to i+1
return nil, &pod.Spec.InitContainers[i+1], false
}
- 兜底返回 init容器的第一个
return nil, &pod.Spec.InitContainers[0], false
findNextInitContainerToRun 的逻辑分析完,后面就是执行start
- 可以看到调用start启动init容器
- 首先构造 执行结果结构体
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
result.AddSyncResult(startContainerResult)
- 检查容器是否在backoff状态,代表重试中
isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
return err
}
- 然后就是调用 m.startContainer 启动容器,判断返回的错误
- 这里ErrImagePullBackOff 代表拉取镜像错误
- 其余错误归为另一类
if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
// startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
// useful to cluster administrators to distinguish "server errors" from "user errors".
metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
startContainerResult.Fail(err, msg)
// known errors that are logged in other places are logged at higher levels here to avoid
// repetitive log spam
switch {
case err == images.ErrImagePullBackOff:
klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
default:
utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
}
return err
}
startContainer 解析
4个步骤启动容器
- pull the image 拉取镜像
- create the container 创建容器
- start the container 启动容器
- run the post start lifecycle hooks (if applicable) 执行poststart钩子
01 拉取镜像
// Step 1: pull the image.
imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
if err != nil {
s, _ := grpcstatus.FromError(err)
m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
return msg, err
}
- 过程就是调用imagePuller.EnsureImageExists 拉取镜像,返回镜像的ref标识
- 对应的实现者在 image_manager中 ,位置 D:\go_path\src\github.com\kubernetes\kubernetes\pkg\kubelet\images\image_manager.go
- 首先获取 容器对象
logPrefix := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, container.Image)
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod), "containerName", container.Name)
}
检查镜像的tag ,没有tag加上默认的latest
- 传入的参数就是 容器的image字段
// If the image contains no tag or digest, a default tag should be applied.
image, err := applyDefaultImageTag(container.Image)
if err != nil {
msg := fmt.Sprintf("Failed to apply default image tag %q: %v", container.Image, err)
m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
return "", msg, ErrInvalidImageName
}
- applyDefaultImageTag 如下
// applyDefaultImageTag parses a docker image string, if it doesn't contain any tag or digest,
// a default tag will be applied.
func applyDefaultImageTag(image string) (string, error) {
named, err := dockerref.ParseNormalizedNamed(image)
if err != nil {
return "", fmt.Errorf("couldn't parse image reference %q: %v", image, err)
}
_, isTagged := named.(dockerref.Tagged)
_, isDigested := named.(dockerref.Digested)
if !isTagged && !isDigested {
// we just concatenate the image name with the default tag here instead
// of using dockerref.WithTag(named, ...) because that would cause the
// image to be fully qualified as docker.io/$name if it's a short name
// (e.g. just busybox). We don't want that to happen to keep the CRI
// agnostic wrt image names and default hostnames.
image = image + ":latest"
}
return image, nil
}
构造镜像的spec ,把Annotation添加进去
var podAnnotations []kubecontainer.Annotation
for k, v := range pod.GetAnnotations() {
podAnnotations = append(podAnnotations, kubecontainer.Annotation{
Name: k,
Value: v,
})
}
spec := kubecontainer.ImageSpec{
Image: image,
Annotations: podAnnotations,
}
调用 imageService的GetImageRef 获取镜像id
- 如果返回的imageRef不为空代表该镜像已经在本地存储中了,否则不在
imageRef, err := m.imageService.GetImageRef(spec)
if err != nil {
msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
return "", msg, ErrImageInspect
}
- 底层通过grpc 调用runtime 的ImageStatus方法 获取
// GetImageRef gets the ID of the image which has already been in
// the local storage. It returns ("", nil) if the image isn't in the local storage.
func (m *kubeGenericRuntimeManager) GetImageRef(image kubecontainer.ImageSpec) (string, error) {
status, err := m.imageService.ImageStatus(toRuntimeAPIImageSpec(image))
if err != nil {
klog.ErrorS(err, "Failed to get image status", "image", image.Image)
return "", err
}
if status == nil {
return "", nil
}
return status.Id, nil
}
根据pod中配置的容器镜像拉取策略判断是否要拉取
- present := imageRef != “” 代表镜像是否在本地存在
- shouldPullImage判断容器的镜像拉取策略
- 如果策略等于PullNever 返回不需要
- PullAlways 返回需要
- PullIfNotPresent 并且 imagePresent为false 返回需要
// shouldPullImage returns whether we should pull an image according to
// the presence and pull policy of the image.
func shouldPullImage(container *v1.Container, imagePresent bool) bool {
if container.ImagePullPolicy == v1.PullNever {
return false
}
if container.ImagePullPolicy == v1.PullAlways ||
(container.ImagePullPolicy == v1.PullIfNotPresent && (!imagePresent)) {
return true
}
return false
}
- 如果判断不需要拉取镜像则产生event并打印日志
if !shouldPullImage(container, present) {
if present {
msg := fmt.Sprintf("Container image %q already present on machine", container.Image)
m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)
return imageRef, "", nil
}
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
m.logIt(ref, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, klog.Warning)
return "", msg, ErrImageNeverPull
}
检查镜像是否处在 重试中
- 机制就是检查重试的窗口和上次重试的时间
backOffKey := fmt.Sprintf("%s_%s", pod.UID, container.Image)
if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) {
msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
m.logIt(ref, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, klog.Info)
return "", msg, ErrImagePullBackOff
}
开始拉取镜像
- 调用puller.pullImage
m.logIt(ref, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", container.Image), klog.Info)
startTime := time.Now()
pullChan := make(chan pullResult)
m.puller.pullImage(spec, pullSecrets, pullChan, podSandboxConfig)
pullImage解析
- 根据传入的 serialize-image-pulls 决定串行拉取还是并行拉取
- 串行拉取,一次只能拉取一个
func (sip *serialImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
sip.pullRequests <- &imagePullRequest{
spec: spec,
pullSecrets: pullSecrets,
pullChan: pullChan,
podSandboxConfig: podSandboxConfig,
}
}
func (sip *serialImagePuller) processImagePullRequests() {
for pullRequest := range sip.pullRequests {
imageRef, err := sip.imageService.PullImage(pullRequest.spec, pullRequest.pullSecrets, pullRequest.podSandboxConfig)
pullRequest.pullChan <- pullResult{
imageRef: imageRef,
err: err,
}
}
}
- 并行拉取,异步执行
func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
go func() {
imageRef, err := pip.imageService.PullImage(spec, pullSecrets, podSandboxConfig)
pullChan <- pullResult{
imageRef: imageRef,
err: err,
}
}()
}
- 无论并行串行 都需要调用imageService.PullImage
kubeGenericRuntimeManager的PullImage解析
- 首先通过ParseImageName 解析出镜像的
- 名字
- 标签
- digest 摘要 代表镜像的哈希值,是为了防止 镜像名和tag没有变,但是镜像的内容变了(layer变了),这就会造成相同的镜像
- digest是对manifest文件的sha256,当镜像的内容变化,即layer变化,相应的layer的sha256变化,以至manifest变化,从而保证了一个digest(不是镜像名+tag)对应一个镜像
img := image.Image
repoToPull, _, _, err := parsers.ParseImageName(img)
if err != nil {
return "", err
}
- 拉取镜像鉴权相关
keyring, err := credentialprovidersecrets.MakeDockerKeyring(pullSecrets, m.keyring)
if err != nil {
return "", err
}
- 根据鉴权 传入imageService.PullImage拉取镜像
imageRef, err := m.imageService.PullImage(imgSpec, nil, podSandboxConfig)
if err != nil {
klog.ErrorS(err, "Failed to pull image", "image", img)
return "", err
}
- 底层就是通过grpc调用runtime 的PullImage方法
resp, err := r.imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
Image: image,
Auth: auth,
SandboxConfig: podSandboxConfig,
})
if err != nil {
klog.ErrorS(err, "PullImage from image service failed", "image", image.Image)
return "", err
}
本节重点总结 :
- GenericRuntimeManager 的syncPod方法 步骤6代表创建init容器
- 按照init容器要求的按顺序启动
- 调用的通用的start 启动容器,和app容器一致
- pull the image 拉取镜像
- 会先检查容器是否已经在本节点上
- 然后根据容器配置的镜像拉取策略拉取
- 底层就是通过grpc调用runtime 的PullImage方法