经常阅读k8s代码的同学应该清楚,里面大量使用了wait这个包,本文将着重讲解wait包下的wait.BackoffUntil
是如何实现指数级退避的
参考:https://zhuanlan.zhihu.com/p/574171867
什么是BackoffUntil
wait.BackoffUntil的作用是按照backoffManager返回的周期时间来进行function的重试,典型的如按照指数增加的方式返回period。其目的是为了处理Apiserver连接失败的场景
谁在使用wait.BackoffUntil
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
如何实现指数级退避
首先我们先看一下这个函数
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
var t clock.Timer
for {
select {
// 如果stopCh收到消息,退出
case <-stopCh:
return
default:
}
// 是否将函数执行的时间算上,如果为false,会先获取时间并执行函数,函数执行的时间会加到里面
if !sliding {
t = backoff.Backoff()
}
func() {
defer runtime.HandleCrash()
// 执行传入的函数
f()
}()
if sliding {
t = backoff.Backoff()
}
// NOTE: b/c there is no priority selection in golang
// it is possible for this to race, meaning we could
// trigger t.C and stopCh, and t.C select falls through.
// In order to mitigate we re-check stopCh at the beginning
// of every loop to prevent extra executions of f().
select {
case <-stopCh:
return
// 等待时间到期,继续执行传入函数
case <-t.C():
}
}
}
重点其实是获取t = backoff.Backoff()
,通过BackoffManager可以获取到时间周期,如果获取到的时间周期是指数级的,就会指数级的调用函数了
BackoffManager
可以看到我们的重点是在BackoffManager如何才能返回指数级的时间周期
举例指数级退避
会依次返回类似于这样的时间周期,但是是有最大值的 ,超过最大值就会重置为初始化的周期如下面所示
1 2 4 8 16 32 (大于cap,返回最大值) 32 32 (大于backoffResetDuration,执行reset)1 2 4 ...
接口定义
type BackoffManager interface {
Backoff() clock.Timer
}
type exponentialBackoffManagerImpl struct {
backoff *Backoff
backoffTimer clock.Timer
// 上一次Backoff的时间
lastBackoffStart time.Time
// 初始化的时间周期
initialBackoff time.Duration
// reset 的支持
backoffResetDuration time.Duration
clock clock.Clock
}
首先来看一下真正实现者 exponentialBackoffManagerImpl
如何实现BackoffManager接口的
// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff.
// The returned timer must be drained before calling Backoff() the second time
func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer {
// 如果没有被初始化,则初始化
if b.backoffTimer == nil {
b.backoffTimer = b.clock.NewTimer(b.getNextBackoff())
} else {
b.backoffTimer.Reset(b.getNextBackoff())
}
return b.backoffTimer
}
可以看到重点是b.getNextBackoff()
函数
func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
b.backoff.Steps = math.MaxInt32
b.backoff.Duration = b.initialBackoff
}
b.lastBackoffStart = b.clock.Now()
return b.backoff.Step()
}
func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration {
// 判断上一次执行这个方法的时间与这一次执行方法的时间是否超过2分钟了,refactor传入的是2*time.Minute.
if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration {
// 将步长设置为最大值
b.backoff.Steps = math.MaxInt32
// 将Duration初始化为initialBackoff,refactor传入的是800*time.Millisecond
b.backoff.Duration = b.initialBackoff
}
// 如果没有超过最大值,将lastBackoffStart设置为当前时间,前面会用到
b.lastBackoffStart = b.clock.Now()
return b.backoff.Step()
}
// Step (1) returns an amount of time to sleep determined by the
// original Duration and Jitter and (2) mutates the provided Backoff
// to update its Steps and Duration.
func (b *Backoff) Step() time.Duration {
// 判断Steps是否小于1
if b.Steps < 1 {
if b.Jitter > 0 {
// 使用种子进行随机加一下
return Jitter(b.Duration, b.Jitter)
}
return b.Duration
}
// 每一次会减
b.Steps--
duration := b.Duration
// calculate the next step
if b.Factor != 0 {
// 指数级增加,最重要的计算函数,refactor传入2.0可以实现 1 2 4 8的增加
b.Duration = time.Duration(float64(b.Duration) * b.Factor)
// cap refactor传入30*time.Second代表容量
if b.Cap > 0 && b.Duration > b.Cap {
// 如果大于30*time.Second,将周期改为30*time.Second,相当于最大值
b.Duration = b.Cap
// 将Steps改为0,下次Step()函数判断时会直接返回
b.Steps = 0
}
}
if b.Jitter > 0 {
duration = Jitter(duration, b.Jitter)
}
return duration
}
通过这种方式,就可以实现指数级的返回1 2 4这样的时间周期,进而控制函数执行的延迟