前言

相信熟悉infomer的同学应该清楚,在informer中使用DeltaFIFO先进先出队列来消费refactor获取到的对象,不知道大家看到这里是否跟我有一样的疑问,就是在出队判断队列中是否有数据时,为什么使用for,而不是使用if,其实这里关系到一个并发编程问题,下面我会详细讲一下这个点

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		// 队列中是否有数据
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			// 如果队列关闭了这直接返回错误
			if f.closed {
				return nil, ErrFIFOClosed
			}
			// 没有数据就一直等待
			f.cond.Wait()
		}
		// 取出第一个对象键
		id := f.queue[0]
		// 更新下queue,相当于把第一个元素弹出去了
		f.queue = f.queue[1:]
		// 对象计数减一,当减到0就说明外部已经全部同步完毕了
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		// 取出真正的对象,queue里面是对象键
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			// Item 可能后来被删除了。
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		// 删除对象
		delete(f.items, id)

		err := process(item)
		if e, ok := err.(ErrRequeue); ok {
			// 如果处理出错,那就重新入队列
			// 在生产者消费者模型中,如果消费者消费失败,还是会吧消息放到queue中重新入队消费
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

我这里简单画了一张流程图来模拟队列出队的情况
image-1682426080063

  1. g1 g2协程不断从队列中取数据,队列为空会被阻塞
  2. 队列新增数据,发送广播消息,g1 g2协程被唤醒
  3. g1协程抢到了这次机会,消费了这一条数据
  4. g2协程开始消费,此时队列为空,但使用了if,不会进行第二次判断,会继续向下执行,代码出现问题

思考一下,如果使用了for循环

       ...
	for len(f.queue) == 0 {
		if f.closed {
			return nil, ErrFIFOClosed
		}
		f.cond.Wait()
	}
	// 取出第一个对象键
	id := f.queue[0]
        ...

当g2被唤醒时,会再一次判断队列是否为空,由于此时g1已经消费了数据,队列为空所以并不会向下执行,代码不会出现问题