client-go Informer 机制源码剖析(一)

client-go informer 机制源码剖析(一)

之 Informer 机制简介

本文档基于 v1.22 版本

1. 什么是 informer

informer 是 client-go 中重要的组件,在 kubernetes 的各个控制器中以及自定义 CRD 的控制器中有着广泛的应用,其主要用途是通过 ListWatch 机制从 K8S APIService 中获取资源信息,并将资源信息实时的同步到本地 cache 中,以减少 k8s 集群对 APIService(及 etcd) 的访问压力。k8s 控制器和 CRD 自定义控制器通过 informer 从本地 cache 中获取资源信息,实现对资源的调谐最终达到用户的期望状态。可以说 informer 机制在 k8s 中以及 crd 控制器中起着举足轻重的地位。

2. 如何使用 informer

在了解了 informer 的基本概念及在 k8s 控制器中的地位后,我们要如何来使用 informer 机制呢?

  • 在 k8s 的控制器中 informer 的使用方式可以参考 cmd/kube-controller-manager/app/controllermanager.go。先通过在 CreateControllerContext 中初始化 InformerFactory,再将具体的 informer 注册到 InformerFactory 中 (如 DeploymentInformer),再通过 controllerContext.InformerFactory.Start(controllerContext.Stop) 启动所有已注册的 informer。

  • 如在外面编写自定义的 CRD 控制器时,需要获取 k8s 资源(如 pod)或者自定义的资源时,可以通过如下方式启动 informer。代码参考例子

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
       ......

    // 创建 informerFactory
    informerFactory := informers.NewSharedInformerFactory(clientset, 30*time.Second)

    // 初始化具体的 informer,将具体的 informer 注册到 informerFactory 中
    // factory已经为所有k8s的内置资源对象提供了创建对应informer实例的方法,调用具体informer实例的Lister或Informer方法
    // 就完成了将informer注册到factory的过程
    podInformer := informerFactory.Core().V1().Pods()
    pInformer := podInformer.Informer() // 会创建 NewSharedIndexInformer
    pLister := podInformer.Lister() // 会提供 list 和 get 方法,供外部从 cache 中获取数据

    // 最终需要使用 informerFactory 来启动所有已经注册的 informer
    informerFactory.Start(stopCh)

    // informerFactory 会调用所有已注册的 informer 的 WaitForCacheSync 方法进行同步
    // 先启动,再同步
    informerFactory.WaitForCacheSync(stopCh)
    ......

综上所述,要使用 informer 机制来实现资源的实时获取,需要通过下面三步来实现:

    1. 初始化 informerFactory。
    1. 将所需要监听的资源所对应的 Informer 注册到 informerFactory 中去。
    1. 通过启动 informerFactory 来启动所有已经注册的 Informer。

3. informer 机制详解

3.1. SharedInformerFactory 接口

https://github.com/kubernetes/client-go/blob/master/informers/factory.go

根据上面 informer 的使用方式,可知道 informer 的入口函数为 NewSharedInformerFactory,首先我们来看看这个函数的具体定义。

1
2
3
4
5
// 代码位于: client-go/informers/factory.go
// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces.
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}

该函数会返回一个 SharedInformerFactory接口。该接口封装了 k8s 内部所有资源的 informer 接口。实现该接口的具体实例为位于同一文件下 sharedInformerFactory 结构体,其中有两个非常关键的方法:StartWaitForCacheSync 函数。

  • Start 方法:会通过 goroutine 的方式来启动已注册到 informerFactory 中的 informer。每个 informer 都会启动一个 goroutine 来运行 informer 机制,已启动的 informer 不会再启动。
  • WaitForCacheSync 方法: 会对已注册到 informerFactory 中的且 start 为正常运行状态的 informer 进行同步数据到本地 Cache 中。并设定 informer 的状态为是否已同步完成。

接下来,再来看看实现 SharedInformerFactory 接口的结构体实例 sharedInformerFactory。该结构体的定义如下:

1
2
3
4
5
6
7
8
// 代码位于:client-go/informers/factory.go
type sharedInformerFactory struct {
......
informers map[reflect.Type]cache.SharedIndexInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[reflect.Type]bool
}

该结构体中最关键的字段为最后两个字段,由定义可知及上述使用时可知,往 informerFactory 中注册 informer 时其实是将具体的 informer 通过 key-value 的方式写入 informers map 字典中。上述 Start 方法即是将 map 中的 informers 逐一启动。而 WaitForCacheSync 方法是将 startedInformers map 字典中值为 true 的 informers 进行 cache 同步,也即只有 start 正常的 informer 才能进行 cache 同步,且可同步多次。

3.2. 注册 informer

https://github.com/kubernetes/client-go/tree/master/informers

在定义完 informerFactory 后需要往 factory 中注册具体的 informer(如 PodInformer),通过上一节 sharedInformerFactory 结构体可知,往 factory 中注册 informer 其实就是往 informers map 中添加数据,而 informers 中定义了 k8s 内部所有资源的具体定义,所有资源按照 group + version 的方式组织文件结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// client-go/informers
$ tree -L 2
.
......
├── apps
│   ├── interface.go
│   ├── v1
│   ├── v1beta1
│   └── v1beta2
├── autoscaling
│   ├── interface.go
│   ├── v1
│   ├── v2beta1
│   └── v2beta2
├── batch
│   ├── interface.go
│   ├── v1
│   └── v1beta1
......

PodInformer 定义在 client-go/informers/core/v1/pod.go 中,其接口定义如下:

1
2
3
4
5
// client-go/informers/core/v1/pod.go
type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}

PodInformer 接口中定义了两个方法,这两个方法在自定义控制器时会经常用到,是 informer 机制的实现者和使用者。

  • Informer 方法:该方法法类型为 cache.SharedIndexInformer,是实现 informer 机制的主体,informer 所有相关的逻辑实现都在这里实现,后面会详细介绍该接口的具体实现。
  • Lister 方法:该方法为用户提供从 cache 中访问数据的方法,Informer 通过 ListWath 会将数据同步到 cache 中,Lister 从 cache 中 get/list 数据。

3.3. SharedInformer/SharedIndexInformer 接口

https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go

在定义完 informerFactory 后需要往 factory 中注册具体的 informer(如 PodInformer),通过上一节 sharedInformerFactory 结构体可知,往 factory 中注册 informer 其实就是往 informers map 中添加数据,其类型为 cache.SharedIndexInformer 接口。SharedIndexInformer 接口封装了 SharedInformer 接口,在其基础上添加了 index 索引的相关功能,使查找/增加数据更加快速方便。
注意: SharedInformer 接口中有两个方法非常重要,我们在写 控制器 的时候会经常用到。

  • AddEventHandler 方法:自定义CRD的控制器从 informer 中获取的数据会通过该函数将资源对象的 key 添加到控制器的工作队列 Workqueue 中,后续控制器会从 Workqueue 中获取 key 对对象进行调谐处理。
  • HasSynced 方法:定义控制器时,每个具体的 informer start 后,需要对数据进行同步 cache 操作,该方法会以参数的形式传入到每个 informer 的 WaitForCacheSync 中,用来跟踪确保每个 informer 是否完成同步操作。

SharedIndexInformer 接口的具体实现由 sharedIndexInformer 结构体来实现。其结构体定义以及方法的实现都相对比较复杂,我将留在下一节进行详细剖析,简单说就是 sharedIndexInformer 结构体中包含了 listWatch反射器deltafifocontrollerindexer 等逻辑的处理。

4. 总结

本文主要介绍了 informer 机制的简单使用,从外部使用者的角度来介绍涉及到的组件,用户只需要通过如下三步即能完美使用 informer 机制,从 cache 中获取 k8s 中的所有资源信息。

    1. 初始化 informerFactory。
    1. 将所需要监听的资源所对应的 Informer 注册到 informerFactory 中去。
    1. 通过启动 informerFactory 来启动所有已经注册的 Informer。

5. 参考


client-go Informer 机制源码剖析(一)
https://qingwei8.github.io/2021/09/01/kubernetes-client-go-informer-informer-v1-22-01/
作者
qingwei
发布于
2021年9月1日
许可协议