  • Step 6 of kubeGenericRuntimeManager's SyncPod creates the init containers
  • Init containers are started one at a time, in the order they are declared
  • They are started through the same generic start helper that app containers use
  • pull the image
    • It first checks whether the image already exists on the node
    • Then it pulls (or not) according to the container's image pull policy
    • Under the hood the pull is a gRPC call to the runtime's PullImage method
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult)

Reference code: pkg/kubelet/kuberuntime/kuberuntime_manager.go

Creating the init containers

	// Step 6: start the init container.
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
		// Start the next init container.
		if err := start("init container", metrics.InitContainer, containerStartSpec(container)); err != nil {
			return
		}

		// Successfully started the container; clear the entry in the failure
		klog.V(4).InfoS("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
	}

  • Here the next init container to start comes from podContainerChanges.NextInitContainerToStart. So where is NextInitContainerToStart assigned? In computePodActions, which SyncPod calls at the very beginning.
  • Tracing into computePodActions, NextInitContainerToStart is initially set to the first init container:
		if len(pod.Spec.InitContainers) != 0 {
			// Pod has init containers, return the first one.
			changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
			return changes
		}
  • After that, the next init container is obtained from findNextInitContainerToRun:
	// Check initialization progress.
	initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)
	if !done {
		if next != nil {
			initFailed := initLastStatus != nil && isInitContainerFailed(initLastStatus)
			if initFailed && !shouldRestartOnFailure(pod) {
				changes.KillPod = true
			} else {
				// Always try to stop containers in unknown state first.
				if initLastStatus != nil && initLastStatus.State == kubecontainer.ContainerStateUnknown {
					changes.ContainersToKill[initLastStatus.ID] = containerToKillInfo{
						name:      next.Name,
						container: next,
						message: fmt.Sprintf("Init container is in %q state, try killing it before restart",
							initLastStatus.State),
						reason: reasonUnknown,
					}
				}
				changes.NextInitContainerToStart = next
			}
		}
		// Initialization failed or still in progress. Skip inspecting non-init
		// containers.
		return changes
	}

findNextInitContainerToRun walkthrough

The guiding principle: init containers start before the app containers, and they run and exit one after another, in order.
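For orientation while reading the walkthrough below, the function's shape can be reconstructed from its call site (initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)) and from its return statements; it looks roughly like this (a sketch, not copied verbatim from the source):

// status: status of the most relevant (e.g. last failed) init container, may be nil
// next:   the init container that should be started next, may be nil
// done:   true once every init container has run to completion
func findNextInitContainerToRun(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (status *kubecontainer.Status, next *v1.Container, done bool)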

  • First, check whether the user configured any init containers at all
	if len(pod.Spec.InitContainers) == 0 {
		return nil, nil, true
	}
  • Iterate over the app containers; if any app container is already running, the init containers must have finished, so return done
	for i := range pod.Spec.Containers {
		container := &pod.Spec.Containers[i]
		status := podStatus.FindContainerStatusByName(container.Name)
		if status != nil && status.State == kubecontainer.ContainerStateRunning {
			return nil, nil, true
		}
	}

  • Iterate over the init containers from last to first; if one has failed, return it (together with its status) so it can be retried
	for i := len(pod.Spec.InitContainers) - 1; i >= 0; i-- {
		container := &pod.Spec.InitContainers[i]
		status := podStatus.FindContainerStatusByName(container.Name)
		if status != nil && isInitContainerFailed(status) {
			return status, container, false
		}
	}
  • Iterate over the init containers again (from last to first); if a container has no status yet, skip it with continue
	// There are no failed containers now.
	for i := len(pod.Spec.InitContainers) - 1; i >= 0; i-- {
		container := &pod.Spec.InitContainers[i]
		status := podStatus.FindContainerStatusByName(container.Name)
		if status == nil {
			continue
		}

  • If this init container is still running, return (nil, nil, false): initialization is not done yet
		// container is still running, return not done.
		if status.State == kubecontainer.ContainerStateRunning {
			return nil, nil, false
		}
  • If this init container has exited and it is the last one, all init containers have completed; otherwise return the next one (index i+1) as the container to start
		if status.State == kubecontainer.ContainerStateExited {
			// all init containers successful
			if i == (len(pod.Spec.InitContainers) - 1) {
				return nil, nil, true
			}

			// all containers up to i successful, go to i+1
			return nil, &pod.Spec.InitContainers[i+1], false
		}
  • Fallback: return the first init container
return nil, &pod.Spec.InitContainers[0], false

That completes the logic of findNextInitContainerToRun; what follows is the start call.

  • The init container is started by calling start
  • It first builds the sync-result struct for this start operation
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
		result.AddSyncResult(startContainerResult)

  • Check whether the container is in backoff, i.e. waiting between restart attempts
		isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
			return err
		}

  • Then m.startContainer is called to start the container, and the returned error is classified
    • ErrImagePullBackOff means the image pull is in backoff
    • Every other error falls into the default case
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
			// startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
			// useful to cluster administrators to distinguish "server errors" from "user errors".
			metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			startContainerResult.Fail(err, msg)
			// known errors that are logged in other places are logged at higher levels here to avoid
			// repetitive log spam
			switch {
			case err == images.ErrImagePullBackOff:
				klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
			default:
				utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
			}
			return err
		}

startContainer walkthrough

The container is started in four steps:

  • pull the image
  • create the container
  • start the container
  • run the post start lifecycle hooks (if applicable)

01 Pull the image

	// Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)
	if err != nil {
		s, _ := grpcstatus.FromError(err)
		m.recordContainerEvent(pod, container, "", v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", s.Message())
		return msg, err
	}
  • The work is done by imagePuller.EnsureImageExists, which pulls the image and returns the image's ref
  • The implementation lives in the image manager: pkg/kubelet/images/image_manager.go
  • It first builds an object reference to the container (used for event recording):
	logPrefix := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, container.Image)
	ref, err := kubecontainer.GenerateContainerRef(pod, container)
	if err != nil {
		klog.ErrorS(err, "Couldn't make a ref to pod", "pod", klog.KObj(pod), "containerName", container.Name)
	}

Check the image tag; if there is no tag, apply the default latest tag

  • The argument is the container's image field
	// If the image contains no tag or digest, a default tag should be applied.
	image, err := applyDefaultImageTag(container.Image)
	if err != nil {
		msg := fmt.Sprintf("Failed to apply default image tag %q: %v", container.Image, err)
		m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
		return "", msg, ErrInvalidImageName
	}
  • applyDefaultImageTag looks like this:
// applyDefaultImageTag parses a docker image string, if it doesn't contain any tag or digest,
// a default tag will be applied.
func applyDefaultImageTag(image string) (string, error) {
	named, err := dockerref.ParseNormalizedNamed(image)
	if err != nil {
		return "", fmt.Errorf("couldn't parse image reference %q: %v", image, err)
	}
	_, isTagged := named.(dockerref.Tagged)
	_, isDigested := named.(dockerref.Digested)
	if !isTagged && !isDigested {
		// we just concatenate the image name with the default tag here instead
		// of using dockerref.WithTag(named, ...) because that would cause the
		// image to be fully qualified as docker.io/$name if it's a short name
		// (e.g. just busybox). We don't want that to happen to keep the CRI
		// agnostic wrt image names and default hostnames.
		image = image + ":latest"
	}
	return image, nil
}

Build the image spec, attaching the pod annotations


	var podAnnotations []kubecontainer.Annotation
	for k, v := range pod.GetAnnotations() {
		podAnnotations = append(podAnnotations, kubecontainer.Annotation{
			Name:  k,
			Value: v,
		})
	}

	spec := kubecontainer.ImageSpec{
		Image:       image,
		Annotations: podAnnotations,
	}

Call imageService.GetImageRef to get the image ID

  • If the returned imageRef is not empty, the image is already in local storage; otherwise it is not
	imageRef, err := m.imageService.GetImageRef(spec)
	if err != nil {
		msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err)
		m.logIt(ref, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
		return "", msg, ErrImageInspect
	}
  • Under the hood this is a gRPC call to the runtime's ImageStatus method (a sketch of that gRPC layer follows the function below):
// GetImageRef gets the ID of the image which has already been in
// the local storage. It returns ("", nil) if the image isn't in the local storage.
func (m *kubeGenericRuntimeManager) GetImageRef(image kubecontainer.ImageSpec) (string, error) {
	status, err := m.imageService.ImageStatus(toRuntimeAPIImageSpec(image))
	if err != nil {
		klog.ErrorS(err, "Failed to get image status", "image", image.Image)
		return "", err
	}
	if status == nil {
		return "", nil
	}
	return status.Id, nil
}
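For comparison with the PullImage gRPC call shown at the end of this section, the ImageStatus path looks roughly like the sketch below; it is paraphrased from the kubelet's remote image service, with r.imageClient and the error handling simplified:

	// Sketch: the CRI-level call behind GetImageRef. resp.Image is nil when
	// the image is not in local storage, which is why GetImageRef can
	// return ("", nil) in that case.
	resp, err := r.imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
		Image: image,
	})
	if err != nil {
		return nil, err
	}
	return resp.Image, nil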

Decide whether to pull based on the container's image pull policy

  • present := imageRef != "" indicates whether the image is present locally
  • shouldPullImage evaluates the container's image pull policy
    • PullNever: do not pull
    • PullAlways: pull
    • PullIfNotPresent with imagePresent false: pull
// shouldPullImage returns whether we should pull an image according to
// the presence and pull policy of the image.
func shouldPullImage(container *v1.Container, imagePresent bool) bool {
	if container.ImagePullPolicy == v1.PullNever {
		return false
	}

	if container.ImagePullPolicy == v1.PullAlways ||
		(container.ImagePullPolicy == v1.PullIfNotPresent && (!imagePresent)) {
		return true
	}

	return false
}

  • If no pull is needed, emit an event and log accordingly
	if !shouldPullImage(container, present) {
		if present {
			msg := fmt.Sprintf("Container image %q already present on machine", container.Image)
			m.logIt(ref, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)
			return imageRef, "", nil
		}
		msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image)
		m.logIt(ref, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, klog.Warning)
		return "", msg, ErrImageNeverPull
	}

Check whether the image pull is in backoff (being retried)

  • The mechanism compares the time since the last failed attempt against the current backoff window; a standalone sketch of this behavior follows the snippet below
	backOffKey := fmt.Sprintf("%s_%s", pod.UID, container.Image)
	if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) {
		msg := fmt.Sprintf("Back-off pulling image %q", container.Image)
		m.logIt(ref, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, klog.Info)
		return "", msg, ErrImagePullBackOff
	}
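The backOff object here is a k8s.io/client-go/util/flowcontrol.Backoff. A minimal, self-contained sketch of how that window mechanism behaves (the 10s/5m durations and the key are made up for illustration; the kubelet derives the real values from its configuration):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	// Per-key exponential backoff: start at 10s, cap at 5m.
	backOff := flowcontrol.NewBackOff(10*time.Second, 5*time.Minute)
	// The kubelet keys image-pull backoff by fmt.Sprintf("%s_%s", pod.UID, container.Image).
	key := "pod-uid_busybox:latest"

	// No failure recorded yet, so we are not in backoff.
	fmt.Println(backOff.IsInBackOffSinceUpdate(key, backOff.Clock.Now())) // false

	// Record a failed attempt; checks inside the window now report backoff.
	backOff.Next(key, backOff.Clock.Now())
	fmt.Println(backOff.IsInBackOffSinceUpdate(key, backOff.Clock.Now())) // true
}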

Start pulling the image

  • Call puller.pullImage
	m.logIt(ref, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", container.Image), klog.Info)
	startTime := time.Now()
	pullChan := make(chan pullResult)
	m.puller.pullImage(spec, pullSecrets, pullChan, podSandboxConfig)

pullImage walkthrough

  • The kubelet's serialize-image-pulls setting decides whether pulls are serial or parallel
  • Serial pulling: only one image is pulled at a time; requests are queued on a channel
func (sip *serialImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
	sip.pullRequests <- &imagePullRequest{
		spec:             spec,
		pullSecrets:      pullSecrets,
		pullChan:         pullChan,
		podSandboxConfig: podSandboxConfig,
	}
}
func (sip *serialImagePuller) processImagePullRequests() {
	for pullRequest := range sip.pullRequests {
		imageRef, err := sip.imageService.PullImage(pullRequest.spec, pullRequest.pullSecrets, pullRequest.podSandboxConfig)
		pullRequest.pullChan <- pullResult{
			imageRef: imageRef,
			err:      err,
		}
	}
}

  • Parallel pulling: each request is handled asynchronously in its own goroutine
func (pip *parallelImagePuller) pullImage(spec kubecontainer.ImageSpec, pullSecrets []v1.Secret, pullChan chan<- pullResult, podSandboxConfig *runtimeapi.PodSandboxConfig) {
	go func() {
		imageRef, err := pip.imageService.PullImage(spec, pullSecrets, podSandboxConfig)
		pullChan <- pullResult{
			imageRef: imageRef,
			err:      err,
		}
	}()
}
  • Either way, the real work is a call to imageService.PullImage; a simplified sketch of the channel pattern follows below
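A stripped-down, self-contained illustration of the pattern above (the types, names, and delays are invented for the sketch; they are not the kubelet's): each request carries its own result channel, the serial worker drains a shared queue one request at a time, and the parallel variant spawns a goroutine per request. In the real code, EnsureImageExists then blocks on <-pullChan to receive the pullResult.

package main

import (
	"fmt"
	"time"
)

// pullReq models the request: the caller receives its answer on its own channel.
type pullReq struct {
	image    string
	resultCh chan string
}

// serialWorker: one goroutine drains the queue, so at most one "pull" runs
// at a time (the behavior of serialize-image-pulls=true).
func serialWorker(queue <-chan pullReq) {
	for req := range queue {
		time.Sleep(50 * time.Millisecond) // pretend to pull
		req.resultCh <- "ref-of-" + req.image
	}
}

// parallelPull: every request gets its own goroutine
// (the behavior of serialize-image-pulls=false).
func parallelPull(req pullReq) {
	go func() {
		time.Sleep(50 * time.Millisecond) // pretend to pull
		req.resultCh <- "ref-of-" + req.image
	}()
}

func main() {
	queue := make(chan pullReq, 10)
	go serialWorker(queue)

	a := pullReq{image: "busybox:latest", resultCh: make(chan string)}
	queue <- a
	fmt.Println("serial:", <-a.resultCh)

	b := pullReq{image: "nginx:latest", resultCh: make(chan string)}
	parallelPull(b)
	fmt.Println("parallel:", <-b.resultCh)
}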

kubeGenericRuntimeManager.PullImage walkthrough

  • ParseImageName first splits the image reference into (see the parsing sketch after the snippet below):
    • the name
    • the tag
    • the digest, a hash of the image; it exists because a name+tag can stay the same while the image content (its layers) changes, which would otherwise look like the "same" image
      • the digest is the sha256 of the manifest: when a layer changes, that layer's sha256 changes, so the manifest and therefore the digest change; a digest (unlike name+tag) identifies exactly one image
	img := image.Image
	repoToPull, _, _, err := parsers.ParseImageName(img)
	if err != nil {
		return "", err
	}
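A small, runnable sketch of what name/tag/digest parsing looks like, using the same github.com/docker/distribution/reference package that applyDefaultImageTag imports as dockerref (ParseImageName itself lives in k8s.io/kubernetes/pkg/util/parsers; the describe helper below is made up for illustration):

package main

import (
	"fmt"
	"strings"

	dockerref "github.com/docker/distribution/reference"
)

// describe is a hypothetical helper: it reports whether a reference carries
// a tag and/or a digest after normalization.
func describe(image string) {
	named, err := dockerref.ParseNormalizedNamed(image)
	if err != nil {
		fmt.Printf("%s -> parse error: %v\n", image, err)
		return
	}
	_, isTagged := named.(dockerref.Tagged)
	_, isDigested := named.(dockerref.Digested)
	fmt.Printf("%-30s -> %s tagged=%v digested=%v\n", image, named.String(), isTagged, isDigested)
}

func main() {
	describe("busybox")      // no tag, no digest: the kubelet appends ":latest"
	describe("busybox:1.34") // tag only: the content behind it can still change
	// a digest pins the exact manifest (and therefore the exact layers)
	describe("busybox@sha256:" + strings.Repeat("a", 64))
}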
  • Build the keyring used to authenticate the pull
	keyring, err := credentialprovidersecrets.MakeDockerKeyring(pullSecrets, m.keyring)
	if err != nil {
		return "", err
	}

  • Depending on whether the keyring yields matching credentials, imageService.PullImage is called with or without an auth config; the snippet below is the no-credentials path (auth is nil), and the with-credentials branch is sketched after it
		imageRef, err := m.imageService.PullImage(imgSpec, nil, podSandboxConfig)
		if err != nil {
			klog.ErrorS(err, "Failed to pull image", "image", img)
			return "", err
		}
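When the keyring does return matching credentials, the manager tries each of them in turn until one pull succeeds. Roughly, as a paraphrased sketch (field names and the error aggregation may differ slightly between versions):

	creds, withCredentials := keyring.Lookup(repoToPull)
	if !withCredentials {
		// no matching registry credentials: the anonymous (nil auth) pull shown above
	}

	var pullErrs []error
	for _, currentCreds := range creds {
		auth := &runtimeapi.AuthConfig{
			Username:      currentCreds.Username,
			Password:      currentCreds.Password,
			Auth:          currentCreds.Auth,
			ServerAddress: currentCreds.ServerAddress,
			IdentityToken: currentCreds.IdentityToken,
			RegistryToken: currentCreds.RegistryToken,
		}
		imageRef, err := m.imageService.PullImage(imgSpec, auth, podSandboxConfig)
		if err == nil {
			return imageRef, nil // the first credential that works wins
		}
		pullErrs = append(pullErrs, err)
	}
	return "", utilerrors.NewAggregate(pullErrs)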

  • At the lowest level this is a gRPC call to the runtime's PullImage method:
	resp, err := r.imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
		Image:         image,
		Auth:          auth,
		SandboxConfig: podSandboxConfig,
	})
	if err != nil {
		klog.ErrorS(err, "PullImage from image service failed", "image", image.Image)
		return "", err
	}

Key points of this section:

  • Step 6 of kubeGenericRuntimeManager's SyncPod creates the init containers
  • Init containers are started one at a time, in the order they are declared
  • They are started through the same generic start helper that app containers use
  • pull the image
    • It first checks whether the image already exists on the node
    • Then it pulls (or not) according to the container's image pull policy
    • Under the hood the pull is a gRPC call to the runtime's PullImage method