经常阅读k8s代码的同学应该清楚,里面大量使用了wait这个包,本文将着重讲解wait包下的wait.BackoffUntil是如何实现指数级退避的

参考:https://zhuanlan.zhihu.com/p/574171867

什么是BackoffUntil

wait.BackoffUntil的作用是按照backoffManager返回的周期时间来进行function的重试,典型的如按照指数增加的方式返回period。其目的是为了处理Apiserver连接失败的场景

谁在使用wait.BackoffUntil

func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

如何实现指数级退避

首先我们先看一下这个函数

func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
	var t clock.Timer
	for {
		select {
        // 如果stopCh收到消息,退出
		case <-stopCh:
			return
		default:
		}

	   // 是否将函数执行的时间算上,如果为false,会先获取时间并执行函数,函数执行的时间会加到里面
		if !sliding {
			t = backoff.Backoff()
		}

		func() {
			defer runtime.HandleCrash()
             // 执行传入的函数
			f()
		}()

		if sliding {
			t = backoff.Backoff()
		}

		// NOTE: b/c there is no priority selection in golang
		// it is possible for this to race, meaning we could
		// trigger t.C and stopCh, and t.C select falls through.
		// In order to mitigate we re-check stopCh at the beginning
		// of every loop to prevent extra executions of f().
		select {
		case <-stopCh:
			return
            // 等待时间到期,继续执行传入函数
		case <-t.C():
		}
	}
}

重点其实是获取t = backoff.Backoff(),通过BackoffManager可以获取到时间周期,如果获取到的时间周期是指数级的,就会指数级的调用函数了

BackoffManager

可以看到我们的重点是在BackoffManager如何才能返回指数级的时间周期

举例指数级退避

会依次返回类似于这样的时间周期,但是是有最大值的 ,超过最大值就会重置为初始化的周期如下面所示

1 2 4 8 16 32 (大于cap,返回最大值) 32 32 (大于backoffResetDuration,执行reset)1 2 4 ...

接口定义

type BackoffManager interface {
	Backoff() clock.Timer
}

type exponentialBackoffManagerImpl struct {
	backoff      *Backoff
	backoffTimer clock.Timer
	// 上一次Backoff的时间
	lastBackoffStart time.Time
	// 初始化的时间周期
	initialBackoff time.Duration
	// reset 的支持
	backoffResetDuration time.Duration
	clock                clock.Clock
}

首先来看一下真正实现者 exponentialBackoffManagerImpl如何实现BackoffManager接口的

// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff.
// The returned timer must be drained before calling Backoff() the second time
func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
	// 如果没有被初始化,则初始化
	if b.backoffTimer == nil {
		b.backoffTimer = b.clock.NewTimer(b.getNextBackoff())
	} else {
		b.backoffTimer.Reset(b.getNextBackoff())
	}
	return b.backoffTimer
}

可以看到重点是b.getNextBackoff()函数

func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
	if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
		b.backoff.Steps = math.MaxInt32
		b.backoff.Duration = b.initialBackoff
	}
	b.lastBackoffStart = b.clock.Now()
	return b.backoff.Step()
}

func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
	// 判断上一次执行这个方法的时间与这一次执行方法的时间是否超过2分钟了,refactor传入的是2*time.Minute.
	if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
		// 将步长设置为最大值
		b.backoff.Steps = math.MaxInt32
		// 将Duration初始化为initialBackoff,refactor传入的是800*time.Millisecond
		b.backoff.Duration = b.initialBackoff
	}
	// 如果没有超过最大值,将lastBackoffStart设置为当前时间,前面会用到
	b.lastBackoffStart = b.clock.Now()
	return b.backoff.Step()
}

// Step (1) returns an amount of time to sleep determined by the
// original Duration and Jitter and (2) mutates the provided Backoff
// to update its Steps and Duration.
func (b *Backoff) Step() time.Duration {
	// 判断Steps是否小于1
	if b.Steps < 1 {
		if b.Jitter > 0 {
			// 使用种子进行随机加一下
			return Jitter(b.Duration, b.Jitter)
		}
		return b.Duration
	}
	// 每一次会减
	b.Steps--

	duration := b.Duration

	// calculate the next step
	if b.Factor != 0 {
		// 指数级增加,最重要的计算函数,refactor传入2.0可以实现 1 2 4 8的增加
		b.Duration = time.Duration(float64(b.Duration) * b.Factor)
		// cap refactor传入30*time.Second代表容量
		if b.Cap > 0 && b.Duration > b.Cap {
			// 如果大于30*time.Second,将周期改为30*time.Second,相当于最大值
			b.Duration = b.Cap
			// 将Steps改为0,下次Step()函数判断时会直接返回
			b.Steps = 0
		}
	}

	if b.Jitter > 0 {
		duration = Jitter(duration, b.Jitter)
	}
	return duration
}

通过这种方式,就可以实现指数级的返回1 2 4这样的时间周期,进而控制函数执行的延迟