client-go Informer 机制源码剖析(二)
client-go informer 机制源码剖析(二)
之 sharedIndexInformer 具体实现
本文档基于
v1.22
版本
1. Informer 机制回顾
我们在 client-go informer 机制源码剖析(一) 中已经简单介绍了 informer 的使用方式以及其相关结构。如下简单回顾一下 informer 的创建过程:
1 |
|
其中 NewSharedInformerFactory() 函数用来构建 informerFactory 工厂函数,会得到包含 k8s 内部所有资源的 SharedInformerFactory 接口,再将具体的的 Informer(如 PodInformer)注册到 informerFactory 中完成 informer 的初始化操作,最后再启动 informerFactory,同步 cache 等。而注册具体 Informer 时其实就是初始化/构建 cache.SharedIndexInformer,完成 ListWatch
、反射器
、deltafifo
、indexer
、controller
、processer(回调函数)
等组件的初始化及启动过程。如下给出一张各个组件之间关系的图(注意:图中的 WorkQueue
和 Control loop
先不讲解,后面会讲解自定义控制器时讲到):
2. sharedIndexInformer 详解
https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go
2.1. sharedIndexInformer 结构体
sharedIndexInformer
实现了 SharedIndexInformer 接口,而 SharedIndexInformer 封装了 SharedInformer 接口,并在其基础上添加了 AddIndexers/GetIndexer
的操作。sharedIndexInformer 包含几个非常重要的组件,如下结构体只显示重要组件:
1 |
|
上面 4个组件在 sharedIndexInformer 中至关重要,后面会专门讲解,这里只做简单介绍,下图简单描述了下 sharedIndexInformer 结构体内 4个重要组件的关心和流程图:
下面将根据流程及上图所示各个模块,从上到下逐一讲解其功能:
listerWatcher
: 该组件实现 list-watch 机制,其初始化
位置在每个具体的 Informer (如 PodInformer)中,在 controller 中会将其封装在反射器
中,使反射器具体有list-watch
的功能。list-watch 通过 k8s apiservice 从 etcd 中获取数据,并将数据放入 deltafifo 队列中。controller
: 会通过一个 Config 结构体将deltafifo
、listerWatcher
、Process
关联起来,并管理各个模块的功能实现对象获取、入队列、从队列消费的整个流程。其中deltafifo
和listerWatcher
会用来构造反射器
,并启动一个 goruntine 来运行反射器
。Process
由controller
的 processLoop() 函数调用,并通过一个死循环来实现从队里中Pop
数据,并将数据传给回调函数 Process 来进行处理,注意:该 controller 和我们所说的k8s调谐控制器不一样。process
: 由 sharedIndexInformer ( HandleDeltas) 来实现,并由 controller 管理调用,controller 通过一个 processLoop(死循环)来不断的从 fifo 队列中Pop
数据,并将数据回调给HandleDeltas
函数进行处理。indexer
: 本地索引存储,基于index + threadSafeStore
实现的本地存储。deltafifo 的底层也是基于 indexer 实现的,HandleDeltas 函数从队列中得到的数据会保存到本地 indexer 中。
2.2. sharedIndexInformer 重要方法
sharedIndexInformer 结构实现了 SharedIndexInformer 接口,其中最为重要的是下面几个方法,其中AddEventHandler
和 HasSynced
函数我们在自定义控制器时会经常使用到:
Run
方法:会初始化 informer 相关组件,并启动 controller 来管理各个组件的正常运行,该方法在 informerFactory.Start 中被调用,当启动 informerFactory 时会调用该方法来启动具体的 informer。HasSynced
方法:用来判断 informer 同步 cache 中数据是否成功,该函数在使用控制器时使用 informer 从cache 中同步数据是会被调用。AddEventHandler
方法:资源事件处理函数,在初始化 informer 时需要指定 资源事件回调(处理)函数 ResourceEventHandler,该函数定义了从 deltafifo 中取出的 key 经 processorListener 函数封装后需要以何种方式添加到自定义控制器的WorkQueue
队列中。HandleDeltas
方法:是Process
回调函数的实际执行函数,Process
从 DeltaFifo 队列中Pop
出的数据最终会回调交给 HandleDeltas 函数进行处理。HandleDeltas 函数主要做两件事情:一是将 pop 出的数据保存到 indexer 中存储;二是将 pop 出的数据传给 processorListener 处理函数进行分发,最中会调用AddEventHandler
中定义的函数将key
添加到控制器的WorkQueue
队列中。
下面详细讲解几个重要的方法。
2.2.1. sharedIndexInformer.Run 方法
https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go#L368
上面已讲到 sharedIndexInformer.Run 方法的调用在 informerFactory.Start 时会调用该方法来启动具体的 informer 机制。该方法的真正定义是在 client-go/tools/cache/shared_informer.go
文件下定义的。其具体定义实现如下(只展示部分重要代码逻辑):
1 |
|
该函数主要做 5 件事情:
- 初始化 Deltafifo,fifo的底层实现是基于 indexer。
- 通过 Config 结构体将 fifo、listerwatcher、Process 三者传给 controller 进行管理。
- 通过 config 文件 New 了一个 controller。
- 通过一个协程来运行 processor 函数,该函数会对从 deltafifo 队列中取出来的数据进行分发到控制器的 workqueue 中。
- 启动 informer 中的 controller,该 controller 和我们所说的k8s调谐控制器不一样。
2.2.2. sharedIndexInformer.HandleDeltas 方法
https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go#L527
上面已提到 HandleDeltas
方法是 Process
回调函数的实际执行函数,Process
从 DeltaFifo 队列中 Pop
出的数据最终会回调交给 HandleDeltas 函数进行处理。HandleDeltas 函数主要做两件事情:一是将 pop 出的数据保存到 indexer 中存储;二是将 pop 出的数据传给 processorListener 处理函数进行分发,最中会调用 AddEventHandler
中定义的函数将 key
添加到控制器的 WorkQueue
队列中。其具体定义如下所示(只显示重要代码):
1 |
|
根据上面 HandleDeltas 函数的具体实现可知,HandleDeltas 函数主要进行两个操作:
- 根据事件的类型对事件执行不同的 index 操作,如 Add/Update/Delete 操作。
- 根据事件的类型先对事件进行封装,然后再对封装后的事件执行
distribute
分发操作。
3. 总结
本文主要讲述了 sharedIndexInformer 结构体中 4个非常重要的组件并简单介绍了其功能,分别是 listerWatcher
、controller
、process
、indexer
4个主要成员。其中 controller
又通过 Config 组件来关联 fifo
、listerwatcher
、process
,并通过 controller 来管理各组件的功能,使其相互协调完成 informer 机制的整体功能。同时也简单说明了下 反射器
由 listerwatcher
和 deltafifo
两部分组成,这样使得 反射器
具有了 list-watch
的功能,并能不断往 deltafifo 队列中生产数据。随后简单讲述了 sharedIndexInformer 中几个重要的函数 Run
、HasSynced
、HandleDeltas
、AddEventHandler
,并对 Run
和 HandleDeltas
函数进行详细了解。后期我将根据 informer 机制的整体工作流程来详细介绍各个组件的组成和具体功能。
- listerwatcher 机制是怎么实现的?
- reflector 反射器是怎么实现的?其主要功能是什么?
- deltafifo 队列是如何实现的,其底层 store 是什么?deltafifo 与 indexer 的关系?
- controller 是如何工作的?
- process 回调函数如何处理从 deltafifo 中 pop 出的数据?processorListener 是什么?它是如何工作的?
- workqueue 是如何工作的?