client-go Informer 机制源码剖析(二)

client-go informer 机制源码剖析(二)

之 sharedIndexInformer 具体实现

本文档基于 v1.22 版本

1. Informer 机制回顾

我们在 client-go informer 机制源码剖析(一) 中已经简单介绍了 informer 的使用方式以及其相关结构。如下简单回顾一下 informer 的创建过程:

1
NewSharedInformerFactory() --> SharedInformerFactory(接口) --> PodInformer(具体的 Informer 接口) --> cache.SharedIndexInformer(接口)

其中 NewSharedInformerFactory() 函数用来构建 informerFactory 工厂函数,会得到包含 k8s 内部所有资源的 SharedInformerFactory 接口,再将具体的的 Informer(如 PodInformer)注册到 informerFactory 中完成 informer 的初始化操作,最后再启动 informerFactory,同步 cache 等。而注册具体 Informer 时其实就是初始化/构建 cache.SharedIndexInformer,完成 ListWatch反射器deltafifoindexercontrollerprocesser(回调函数) 等组件的初始化及启动过程。如下给出一张各个组件之间关系的图(注意:图中的 WorkQueueControl loop 先不讲解,后面会讲解自定义控制器时讲到):

custom_controller

2. sharedIndexInformer 详解

https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go

2.1. sharedIndexInformer 结构体

sharedIndexInformer 实现了 SharedIndexInformer 接口,而 SharedIndexInformer 封装了 SharedInformer 接口,并在其基础上添加了 AddIndexers/GetIndexer 的操作。sharedIndexInformer 包含几个非常重要的组件,如下结构体只显示重要组件:

1
2
3
4
5
6
7
8
// client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
indexer Indexer // 本地索引存储,基于 index + threadSafeStore 实现的本地 cache 存储
controller Controller // 用于管理 sharedIndexInformer 中的各个组件,并控制各个组件的正常运行
processor *sharedProcessor // 回调处理函数,从 fifo 队列中消费(Pop)消息进行处理
listerWatcher ListerWatcher // 反射器通过 listwatch 从 k8s apiserver 的 etcd 中获取数据,并将数据放入 fifo 中
......
}

上面 4个组件在 sharedIndexInformer 中至关重要,后面会专门讲解,这里只做简单介绍,下图简单描述了下 sharedIndexInformer 结构体内 4个重要组件的关心和流程图:
sharedindexinformer01

下面将根据流程及上图所示各个模块,从上到下逐一讲解其功能:

  • listerWatcher: 该组件实现 list-watch 机制,其初始化位置在每个具体的 Informer (如 PodInformer)中,在 controller 中会将其封装在 反射器 中,使反射器具体有 list-watch 的功能。list-watch 通过 k8s apiservice 从 etcd 中获取数据,并将数据放入 deltafifo 队列中。
  • controller: 会通过一个 Config 结构体将 deltafifolisterWatcherProcess 关联起来,并管理各个模块的功能实现对象获取、入队列、从队列消费的整个流程。其中 deltafifolisterWatcher 会用来构造 反射器,并启动一个 goruntine 来运行 反射器Processcontroller 的 processLoop() 函数调用,并通过一个死循环来实现从队里中 Pop 数据,并将数据传给回调函数 Process 来进行处理,注意:该 controller 和我们所说的k8s调谐控制器不一样
  • process: 由 sharedIndexInformer ( HandleDeltas) 来实现,并由 controller 管理调用,controller 通过一个 processLoop(死循环)来不断的从 fifo 队列中 Pop 数据,并将数据回调给 HandleDeltas 函数进行处理。
  • indexer: 本地索引存储,基于 index + threadSafeStore 实现的本地存储。deltafifo 的底层也是基于 indexer 实现的,HandleDeltas 函数从队列中得到的数据会保存到本地 indexer 中。

2.2. sharedIndexInformer 重要方法

sharedIndexInformer 结构实现了 SharedIndexInformer 接口,其中最为重要的是下面几个方法,其中AddEventHandlerHasSynced 函数我们在自定义控制器时会经常使用到:

  • Run 方法:会初始化 informer 相关组件,并启动 controller 来管理各个组件的正常运行,该方法在 informerFactory.Start 中被调用,当启动 informerFactory 时会调用该方法来启动具体的 informer。
  • HasSynced 方法:用来判断 informer 同步 cache 中数据是否成功,该函数在使用控制器时使用 informer 从cache 中同步数据是会被调用。
  • AddEventHandler 方法:资源事件处理函数,在初始化 informer 时需要指定 资源事件回调(处理)函数 ResourceEventHandler,该函数定义了从 deltafifo 中取出的 key 经 processorListener 函数封装后需要以何种方式添加到自定义控制器的 WorkQueue 队列中。
  • HandleDeltas 方法:是 Process 回调函数的实际执行函数,Process 从 DeltaFifo 队列中 Pop 出的数据最终会回调交给 HandleDeltas 函数进行处理。HandleDeltas 函数主要做两件事情:一是将 pop 出的数据保存到 indexer 中存储;二是将 pop 出的数据传给 processorListener 处理函数进行分发,最中会调用 AddEventHandler 中定义的函数将 key 添加到控制器的 WorkQueue 队列中。

下面详细讲解几个重要的方法。

2.2.1. sharedIndexInformer.Run 方法

https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go#L368

上面已讲到 sharedIndexInformer.Run 方法的调用在 informerFactory.Start 时会调用该方法来启动具体的 informer 机制。该方法的真正定义是在 client-go/tools/cache/shared_informer.go 文件下定义的。其具体定义实现如下(只展示部分重要代码逻辑):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()

// 1. 初始化 Deltafifo,fifo的底层实现是基于 indexer
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
})

// 2. 通过 Config 结构体将 fifo、listerwatcher、Process 三者传给 controller 进行管理
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
Process: s.HandleDeltas,
......
}

// 3. 通过 config 文件 New 了一个 controller
func() {
....
s.controller = New(cfg)
}()

.......
// 4. 通过一个协程来运行 processor 函数,该函数会对从 deltafifo 队列中取出来的数据进行分发到控制器的 workqueue 中
wg.StartWithChannel(processorStopCh, s.processor.run)
......
// 5. 启动 informer 中的 controller,该 controller 和我们所说的k8s调谐控制器不一样
s.controller.Run(stopCh)
}

该函数主要做 5 件事情:

  1. 初始化 Deltafifo,fifo的底层实现是基于 indexer。
  2. 通过 Config 结构体将 fifo、listerwatcher、Process 三者传给 controller 进行管理。
  3. 通过 config 文件 New 了一个 controller。
  4. 通过一个协程来运行 processor 函数,该函数会对从 deltafifo 队列中取出来的数据进行分发到控制器的 workqueue 中。
  5. 启动 informer 中的 controller,该 controller 和我们所说的k8s调谐控制器不一样。

2.2.2. sharedIndexInformer.HandleDeltas 方法

https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go#L527

上面已提到 HandleDeltas 方法是 Process 回调函数的实际执行函数,Process 从 DeltaFifo 队列中 Pop 出的数据最终会回调交给 HandleDeltas 函数进行处理。HandleDeltas 函数主要做两件事情:一是将 pop 出的数据保存到 indexer 中存储;二是将 pop 出的数据传给 processorListener 处理函数进行分发,最中会调用 AddEventHandler 中定义的函数将 key 添加到控制器的 WorkQueue 队列中。其具体定义如下所示(只显示重要代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}

isSync := false
switch {
case d.Type == Sync:
// Sync events are only propagated to listeners that requested resync
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
// Replaced events that didn't change resourceVersion are treated as resync events
// and only propagated to listeners that requested resync
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

根据上面 HandleDeltas 函数的具体实现可知,HandleDeltas 函数主要进行两个操作:

  1. 根据事件的类型对事件执行不同的 index 操作,如 Add/Update/Delete 操作。
  2. 根据事件的类型先对事件进行封装,然后再对封装后的事件执行 distribute 分发操作。

3. 总结

本文主要讲述了 sharedIndexInformer 结构体中 4个非常重要的组件并简单介绍了其功能,分别是 listerWatchercontrollerprocessindexer 4个主要成员。其中 controller 又通过 Config 组件来关联 fifolisterwatcherprocess,并通过 controller 来管理各组件的功能,使其相互协调完成 informer 机制的整体功能。同时也简单说明了下 反射器listerwatcherdeltafifo 两部分组成,这样使得 反射器 具有了 list-watch 的功能,并能不断往 deltafifo 队列中生产数据。随后简单讲述了 sharedIndexInformer 中几个重要的函数 RunHasSyncedHandleDeltasAddEventHandler,并对 RunHandleDeltas 函数进行详细了解。后期我将根据 informer 机制的整体工作流程来详细介绍各个组件的组成和具体功能。

  • listerwatcher 机制是怎么实现的?
  • reflector 反射器是怎么实现的?其主要功能是什么?
  • deltafifo 队列是如何实现的,其底层 store 是什么?deltafifo 与 indexer 的关系?
  • controller 是如何工作的?
  • process 回调函数如何处理从 deltafifo 中 pop 出的数据?processorListener 是什么?它是如何工作的?
  • workqueue 是如何工作的?

4. 参考

5. 推荐文章


client-go Informer 机制源码剖析(二)
https://qingwei8.github.io/2021/09/06/kubernetes-client-go-informer-informer-v1-22-02/
作者
qingwei
发布于
2021年9月6日
许可协议