前言
相信熟悉infomer的同学应该清楚,在informer中使用DeltaFIFO先进先出队列来消费refactor获取到的对象,不知道大家看到这里是否跟我有一样的疑问,就是在出队判断队列中是否有数据时,为什么使用for,而不是使用if,其实这里关系到一个并发编程问题,下面我会详细讲一下这个点
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 队列中是否有数据
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
// 如果队列关闭了这直接返回错误
if f.closed {
return nil, ErrFIFOClosed
}
// 没有数据就一直等待
f.cond.Wait()
}
// 取出第一个对象键
id := f.queue[0]
// 更新下queue,相当于把第一个元素弹出去了
f.queue = f.queue[1:]
// 对象计数减一,当减到0就说明外部已经全部同步完毕了
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
// 取出真正的对象,queue里面是对象键
item, ok := f.items[id]
if !ok {
// This should never happen
// Item 可能后来被删除了。
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
continue
}
// 删除对象
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
// 如果处理出错,那就重新入队列
// 在生产者消费者模型中,如果消费者消费失败,还是会吧消息放到queue中重新入队消费
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
我这里简单画了一张流程图来模拟队列出队的情况
- g1 g2协程不断从队列中取数据,队列为空会被阻塞
- 队列新增数据,发送广播消息,g1 g2协程被唤醒
- g1协程抢到了这次机会,消费了这一条数据
- g2协程开始消费,此时队列为空,但使用了if,不会进行第二次判断,会继续向下执行,代码出现问题
思考一下,如果使用了for循环
...
for len(f.queue) == 0 {
if f.closed {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
// 取出第一个对象键
id := f.queue[0]
...
当g2被唤醒时,会再一次判断队列是否为空,由于此时g1已经消费了数据,队列为空所以并不会向下执行,代码不会出现问题