informer is all you need
在前面Controller小节中,我们以优化控制器为线索正式引入了client-go中的Informer组件。
Informer本质上是一种本地资源缓存机制,旨在与集群的etcd同步。因此,它被定义在k8s.io/client-go/tools/cache包中。
由于Informer组件众多,在正式介绍Informer之前,我们会先从介绍构成Informer的组件开始讲起。
Reflector组件
Reflector是构成Informer框架的一部分,它最主要作用是监听资源的变更。

Reflector组件的历史Reflector的出现甚至要早于Informer本身,Reflector的概念在 Pull Request #758时引入。
官方开发者这样描述Reflector:
Reflector watches a specified resource and causes all changes to be reflected in the given store.
意思是说,Reflector监听指定的资源类型并使所有的变更事件反映到指定的存储(缓存)中。这就是Reflector名称的由来。
关于"指定的存储(缓存)"我们在接下来我们还会详细介绍。在这里,我们只想告诉你Reflector的由来。
不过,Reflector本身并不含有任何用于监听的组件,我们需要在创建Reflector时,由调用者传入"监听逻辑",Reflector帮助我们执行监听。
为了便于说明,我们以Reflector的初始化函数作为对照:
func NewReflector(
lw ListerWatcher,
expectedType interface{},
store Store,
resyncPeriod time.Duration) *Reflector {
// ...
}
我们先忽略创建Reflector的其他三个参数,仅关注lw这个参数,它就是需要由调用者提供的"监听逻辑"。它的类型是ListerWatcher:
type ListerWatcher interface {
List(options api.ListOptions) (runtime.Object, error)
Watch(options api.ListOptions) (watch.Interface, error)
}
我们可以看到ListerWatcher包含一个Watch()方法——这就是我们所说的调用者需要提供的"监听逻辑"。
除此以外,ListerWatcher还包括一个List()方法用于获取资源集合——在这里我们先暂且不说明需要它的原因,此刻我们还没有介绍Reflector组件整体的运行逻辑,贸然的解释或许只会让你更加疑惑。
client-go中任何一个资源客户端其实都"满足"这个接口。例如,pods资源客户端:
type PodInterface interface {
Create(*v1.Pod) (*v1.Pod, error)
Update(*v1.Pod) (*v1.Pod, error)
UpdateStatus(*v1.Pod) (*v1.Pod, error)
Delete(name string, options *v1.DeleteOptions) error
DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error
Get(name string) (*v1.Pod, error)
List(opts v1.ListOptions) (*v1.PodList, error)
Watch(opts v1.ListOptions) (watch.Interface, error)
Patch(name string, pt api.PatchType, data []byte, subresources ...string) (result *v1.Pod, err error)
// ...
}
其实,站在Go语法的角度,资源客户端并没有真正意义上实现ListerWatcher接口。我们以上述Pod资源客户端为例,请注意它的List()方法的返回类型上与ListerWatcher的细微差别:Pod资源客户端的List()返回类型是*v1.PodList。
在Go语言中,List(opts v1.ListOptions) (*v1.PodList, error)并不等同于实现了ListerWatcher的List(options api.ListOptions) (runtime.Object, error)
方法,尽管*v1.PodList类实现了runtime.Object接口。
// ❌ can not compile
var _ ListerWatcher = clientset.Core().Pods("default")
当然,这涉及到Go语法本身的内容,不在我们的讨论范围之内。
我们通常在创建ListerWatcher类时,其实我们只需要借助client-go为我们预先准备好的cache.ListWatch2类型再稍加结合资源客户端本身就可以快捷地创建出一个ListerWatcher对象。例如:
lw := cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return clientset.Core().Pods("default").List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Pods("default").Watch(options)
},
}
// ✅ can compile
var _ ListerWatcher = lw
除了"监听逻辑"以外,Reflector中还包括两个缓存组件:DeltaFIFO以及Indexer。其中:
Indexer缓存也需要在创建Reflector时由调用者传入;DeltaFIFO缓存则是在Reflector内被创建。
DeltaFIFO
DeltaFIFO中的"Delta"可以理解为"变更",它用于保存被监听资源的每一个变更事件(例如Added,Updated,Deleted)。
"FIFO" 表示同时它也是一个先入先出队列。保存其实就是"入队"的过程。
DeltaFIFO中的基本单元"变更"Delta的定义为:
type Delta struct {
Type DeltaType
Object interface{}
}
它包括具体的变更类型(Added,Updated,Deleted)以及变更后的资源对象本身。

注意,队列本身的粒度是资源,而非事件。每个资源对应一个保存本资源变更事件的切片([]Delta)。
意思是说,当我们从这个队列pop出一项时,我们得到的是某个资源对应的当前所有变更事件的序列。
下图大致总结了DeltaFIFO的内部结构:

其中queue结构是队列的本体,而items结构则用于辅助保存每个资源对应变更事件切片。
DeltaFIFO的历史在最初版本的Reflector中,其实并不包含DeltaFIFO组件,Reflector会把监听到的资源变更事件直接"反映"到指定资源缓存。
DeltaFIFO的历史最早可以追溯到 Pull Request #5437。
它作为作者将要引入的Informer框架的一部分而被引入。
在该PR中,根据作者对DeltaFIFO的注释:
DeltaFIFO is a producer-consumer queue, where a Reflector is indended to be the producer, and the consumer is whatever calls the Pop() method.
DeltaFIFO solves this use case:
- You want to process every object change (delta) at most once.
- When you process an object, you want to see everything that's happened to it since you last processed it.
- You want to process the deletion of objects.
- You might want to periodically reprocess objects.
可以看出DeltaFIFO设计之初就有意识地以资源作为队列"粒度"——以便在处理一个资源时(从队列中pop出一项),可以看到自从上一次处理它时所有的变更事件。
另外,一个值得注意的细节是,作者在设计DeltaFIFO伊始就有意识地让这个结构可以阶段性地重新处理所有事件。这个"阶段性地重新处理"我们还会在后续Reflector的resync机制中详细介绍。
Indexer
Indexer则是真正的本地资源缓存——它保存的并不是资源变更事件,而是资源对象本身。
另外我们从它的名字——索引器可以看出它更侧重于对本地缓存资源的索引能力。
索引器支持自定义添加索引逻辑(函数),以让索引器可以根据不同条件检索资源。
如何向索引器添加索引逻辑不在本书的讨论范围之内。Indexer已经内置了一个名为MetaNamespaceIndexFunc的索引函数,因此,Indexer默认可以按照命名为空间来检索资源。
下图大致总结了Indexer的内部结构:

其中indexes结构用于保存索引函数,indices结构保存则是索引函数建立的索引,items结构保存的才是资源对象。
Reflector的运转
至此,我们已经把构成Reflector的模块介绍完毕。接下来,我们将介绍Reflector这些模块之间是如何产生联系的。
Reflector组件通过Run()方法开启运转,Run()函数的主体是执行ListAndWatch()方法:
func (r *Reflector) Run(stopCh <-chan struct{}) {
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
}
而ListAndWatch()方法中则会使用传入的ListerWatcher结构lw。
具体来说,在ListAndWatch()方法中:
lw的List()会被首先执行并且它仅被执行一次,用于先一次性获取所有资源集合,得到的结果将存入Indexer缓存中;- 之后通过调用
lw的Watch()函数持续获取资源的变更事件并将结果压入DeltaFIFO缓存中。
我们也可以把Reflector的ListAndWatch()方法理解成DeltaFIFO的"生产者"。
需要注意的是,Reflector组件通过Run()函数启动之后,DeltaFIFO缓存开始持续不断加入变更事件。而另一个Indexer缓存在一次性获得资源集合后就没有再变动了。
Indexer缓存之后的增、删、改则依赖另一个叫做Controller的组件驱动。

Controller组件
注意本小节的Controller并不是我们上一节所说的Kubernetes控制器,本节的Controller仅是client-go中的一个数据结构,也是构成Informer的直接组件。
在结构上,上文介绍的Reflector其实是Controller组件内的一个模块。并且Controller组件就是DeltaFIFO的"消费者"。
下面我们将通过Controller的运转逻辑来介绍Controller是如何消费`DeltaFIFO队列的。
Controller通过Run()方法开启运转:
func (c *controller) Run(stopCh <-chan struct{}) {
// ...
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
Run()函数的主体是processLoop()函数,它直接负责持续驱动组件的运转。
Controller的processLoop()方法如下所示,逻辑非常简单:
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
它会持续不断(for循环)从Reflector子组件的DeltaFIFO队列(c.config.Queue)中pop出一项(即某资源的完整变更事件切片Deltas),同时交由Controller的Process函数(上述代码的c.config.Process)处理,Process函数如下所示:
func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
// ...
switch d.Type {
case Synced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
h.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
h.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
h.OnDelete(obj)
}
}
return nil
}
Process函数以一个资源完整的变更事件切片(Deltas)为输入:
- 针对资源的每个变更事件(
for循环),根据类型(witch语句),触发相应的"回调函数":OnUpdate(),OnUpdate(),OnDelete()——这些"回调函数"在创建Controller组件时传入。 它们实际可以认为是Controller对外提供的资源变更事件的通知及处理机制;注请暂时忽略代码中出现的
Synced事件类型,从Kubernetes API Server中返回的资源变更事件仅有Added,Updated,Deleted这三种。我们在后续Reflector的resync机制会介绍Synced事件类型的产生。 - 针对资源的每个变更事件,根据类型,调用
Reflector的Indexer索引缓存子组件(在上述代码中对应clientState)的相应方法更新(Update())、新增(Add())、删除(Delete())相应资源——因此,我们也可以认为在Controller的驱动下,Indexer缓存才开始真正成为"反映"etcd的本地缓存。

至此,我们发现Controller组件似乎已经完全契合我们之前watch机制到Informer小节中对Kubernetes控制器的两个要求。
- 资源变更事件的通知;
- 资源的本地缓存。
Controller组件的历史Controller的出现要早于Informer。
Controller最早可以追溯到 Issue #4877,作者提到:
There are pitfalls when using the watch + list pattern. Many of our controllers use this pattern, and we expect many future pieces of code to use this pattern. Therefore, we'd like to provide a framework/example which lets you fill out three functions (list, watch, and process) to get a shiny new bug-free (at least the list+watch part) controller.
可见Controller的提出是为了解决使用"watch+list模式"编写控制器时容易产生bug的问题。官方开发者希望提供一个控制器框架让编写控制器变得简单并且不易发生错误。
最终,Controller的PR Pull Request #5270被合入Kubernetes v0.15.0中。
那么我们所说的Informer组件又是什么呢?
Informer
Informer包含了一个Controller组件。除去Controller本身,Informer本身并不含有任何实质性的组件或者驱动组件运行的模块。
Informer的作用像是一个专门给调用者封装的简单易用的交互"壳",它负责:
- 传入
Controller组件所需要的:- "回调函数"——在代码中对应的封装类型为
ResourceEventHandlerFuncs Reflector子组件的ListerWatcher结构;
- "回调函数"——在代码中对应的封装类型为
- 初始化
Controller组件; - 并且向调用者返回:
Controller组件的引用以便调用者可以控制它的启停;Reflector子组件的Indexer缓存的引用以便调用者可以用它检索资源。
在本节中,我们没有给出Informer的创建函数的源码作为参照。原因是由于所含组件众多,Informer初始化函数本身逻辑较为混乱。感兴趣的读者可以根据上述说明自行找源码对照理解。
Informer的整体结构如下图所示:

Informer的历史Informer的历史最早可以追溯到 Pull Request #6546。
引入Informer结构的动机根据作者commit信息(880f922):
Add easy setup for simple controller
可见Informer就是作为辅助便于创建Controller结构而存在的。
Reflector的resync机制
在最后,为了让本节所有知识形成闭环,我们还需要再介绍一下Reflector组件的resync机制。
我们并没有把此小节并入Reflector组件中主要有两个原因。一方面是不想加重读者的阅读负担,
另一方面,在该小节中,我们仅仅知道了DeltaFIFO的"生产者"——ListAndWatch()函数,彼时还没有介绍DeltaFIFO的"消费者"——Controller组件中的"回调函数"。在没有知道完整的背景之前,不利于我们对resync机制的理解。
resync机制是指定时将Indexer组件中的数据重新同步回DeltaFIFO队列中。
在很多文章或者技术博客中,它们称之为Informer的resync机制。其实定时的resync是在Reflector的ListAndWatch()方法中启动的。
另外,resync动作是Reflector中两个缓存组件DeltaFIFO和Indexer之间的数据同步,
同时也正如我们在Informer小节中所强调的那样,Informer是一个给调用者封装的一个"壳"。
因此,Reflector的resync机制是一种更为严谨的表述。
那么为什么要引入resync机制呢?
官方开发者考虑到回调函数在处理(消费)从DeltaFIFO队列中pop出的事件时,可能会存在处理失败的情况,引入定时的resync机制让这些处理失败的事件有了重新被处理的机会。
那么经过resync重新放入DeltaFIFO队列的事件,和直接从kube-apiserver 中监听到的事件有什么不一样呢?
首先,不同于kube-apiserver中监听到的三种事件类型(Added,Updated,Deleted),它的类型为Sync。
除此以外,Sync类型的事件在压入DeltaFIFO时,会检查DeltaFIFO中该资源的事件切片([]Delta)此刻是否已经含有事件了(事件切片长度大于0),如果有,那么则放弃resync。
原因是DeltaFIFO中此刻已经存在的资源的事件(从kube-apiserver中"新鲜出炉")一定比resync机制中"已经出炉"但准备"回炉重造"的资源更"新"。这种压入前的检查会减少重复消费发生的机率。
另外,Reflector组件也支持通过参数指定resync动作的发生频率。现在我们再回过头来看看Reflector的初始化函数:
func NewReflector(
lw ListerWatcher,
expectedType interface{},
store Store,
resyncPeriod time.Duration) *Reflector {
// ...
}
这四个参数分别是:
lw——某资源类型的监听函数expectedType——所监听资源类型的kindstore——资源本地缓存IndexerresyncPeriod——用于指定resync动作的发生频率(0表示不执行resync)
resync机制的历史resync机制的引入最早可以追溯到Pull Request #4923。正如作者起的PR标题一样:
Allow reflector to do full resync periodically
意思是说允许Reflector可以周期性地做全同步。
不过最初版本的resync其实并不是指从Indexer中同步回DeltaFIFO中,在此PR中,作者所谓的全同步就是指周期性地重新List()一次。
直到 Issue #23394,社区开发者才决定将List()从resync中分离。
最终,将List()从resync机制中分离的 Pull Request #24142被合并入Kubernetes v1.3.0-alpha.5。
使用Informer🎈
可以说本小节之前的所有内容都在为如何使用Informer组件做铺垫。
我们首先来看如何初始化一个Informer:
初始化Informer(基于client-go v1.5.2)
import (
"fmt"
"k8s.io/client-go/1.5/kubernetes"
"k8s.io/client-go/1.5/pkg/api"
v1 "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/pkg/runtime"
"k8s.io/client-go/1.5/pkg/watch"
"k8s.io/client-go/1.5/rest"
"k8s.io/client-go/1.5/tools/cache"
)
func main() {
// creates the in-cluster config
config, _ := rest.InClusterConfig()
// creates the clientset
clientset, _ := kubernetes.NewForConfig(config)
cache.NewInformer(&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return clientset.Core().Pods("default").List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Pods("default").Watch(options)
},
},
&v1.Pod{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
p := obj.(*v1.Pod)
fmt.Printf("pod %s is added", p.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
o := oldObj.(*v1.Pod)
n := newObj.(*v1.Pod)
fmt.Printf("pod %s is updated from %v to %v", o.Name, o.Spec, n.Spec)
},
DeleteFunc: func(obj interface{}) {
p := obj.(*v1.Pod)
fmt.Printf("pod %s is deleted", p.Name)
},
})
}
初始化Informer一共需要四个参数:
- 某资源类型的监听函数
- 所监听资源类型的kind
- resync发生频率
- 三种事件对应的回调函数(
AddFunc,UpdateFunc,DeleteFunc)
这四个参数已经在之前相应的各个组件中介绍过了,在这里我们不再赘述。
接下来我们看看如何使用Informer。正如我们在前面所说,Informer组件本身并不具有任何实质性的组件,我们使用Informer本质上在使用它所代为创建的两个组件Controller和Indexer。
NewInformer函数将代为创建的这两个组件返回以供调者使用:
localCache, controller := cache.NewInformer(...)
之后我们可以通过调用Controller的Run()函数让整个Informer运转起来。
controller.Run()
而Indexer作为一个本地缓存结构,client-go为它提供了丰富的方法以供检索资源:
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
// ...
}
小结
我们从构成Informer的基本组件说起,最终描绘了Informer的整体结构。
简单来说,你可以把Informer理解成一个由某一资源类型的List()和Watch()函数驱动的本地资源缓存,同时它也提供了针对资源变更事件的通知及处理机制;
不同于其他文章或者博客介绍Informer的思路,在本节中:
- 我们没有基于
client-go中更被广泛使用的SharedIndexInformer。我们认为介绍更为基础的Informer结构更容易让读者接受。另外,为了让实现的控制器尽可能保持精简,我们使用的也是Informer而非SharedIndexInformer。因此,我们不再介绍SharedIndexInformer的有关内容。 - 我们并没有在介绍
Informer的同时引入client-go中的workqueue。 根据Informer的源码,它本身运行并不需要workqueue组件。client-go中提供的workqueue仅仅是一种增强。 它可以被使用在本节所说的"回调函数"中,相比于在"回调函数"中单独直接处理各个事件, 我们可以在各个"回调函数"里把事件先暂时放入到workqueue中,我们再统一从workqueue中"捞取"并处理这些事件。client-go中已经为我们提供了多种队列类型:通用队列、限速队列、延时队列等,我们可以借助这些官方开发者实现高效且可靠安全的队列来帮助我们编写高质量的程序。 为了降低读者阅读的负担,并且由于本书实现的控制器极为精简,它没有再利用workqueue增强,我们在本小节也不再赘述workqueue组件。
请注意,informer框架本身并不是在Kubernetes v1.5.0时才提出。在Kubernetes v1.5.0之前,informer框架耦合在
kubernetes库内(k8s.io/kubernetes/pkg/controller/framework/informers)。 在Kubernetes v1.4.0-alpha.3,Kubernetes社区才开始有了单独的客户端库client-go。在Kubernetes v1.5.0时完成了informer框架到client-go的迁移。关于
client-go库的历史可以参考Kubernetes的Issue #28559。关于informer框架迁入
client-go的历史可以参考client-go的 Issue #4以及Kubernetes的 Pull Request #32718和 Pull Request #34989。↩在本节中,
cache指的是client-go的tools包中的cache子包。↩