informer is all you need
在前面Controller小节中,我们以优化控制器为线索正式引入了client-go
中的Informer
组件。
Informer
本质上是一种本地资源缓存机制,旨在与集群的etcd
同步。因此,它被定义在k8s.io/client-go/tools/cache
包中。
由于Informer
组件众多,在正式介绍Informer
之前,我们会先从介绍构成Informer
的组件开始讲起。
Reflector组件
Reflector
是构成Informer
框架的一部分,它最主要作用是监听资源的变更。

Reflector
组件的历史Reflector
的出现甚至要早于Informer
本身,Reflector
的概念在 Pull Request #758时引入。
官方开发者这样描述Reflector
:
Reflector watches a specified resource and causes all changes to be reflected in the given store.
意思是说,Reflector
监听指定的资源类型并使所有的变更事件反映到指定的存储(缓存)中。这就是Reflector
名称的由来。
关于"指定的存储(缓存)"我们在接下来我们还会详细介绍。在这里,我们只想告诉你Reflector
的由来。
不过,Reflector
本身并不含有任何用于监听的组件,我们需要在创建Reflector
时,由调用者传入"监听逻辑",Reflector
帮助我们执行监听。
为了便于说明,我们以Reflector
的初始化函数作为对照:
func NewReflector(
lw ListerWatcher,
expectedType interface{},
store Store,
resyncPeriod time.Duration) *Reflector {
// ...
}
我们先忽略创建Reflector
的其他三个参数,仅关注lw
这个参数,它就是需要由调用者提供的"监听逻辑"。它的类型是ListerWatcher
:
type ListerWatcher interface {
List(options api.ListOptions) (runtime.Object, error)
Watch(options api.ListOptions) (watch.Interface, error)
}
我们可以看到ListerWatcher
包含一个Watch()
方法——这就是我们所说的调用者需要提供的"监听逻辑"。
除此以外,ListerWatcher
还包括一个List()
方法用于获取资源集合——在这里我们先暂且不说明需要它的原因,此刻我们还没有介绍Reflector
组件整体的运行逻辑,贸然的解释或许只会让你更加疑惑。
client-go
中任何一个资源客户端其实都"满足"这个接口。例如,pods
资源客户端:
type PodInterface interface {
Create(*v1.Pod) (*v1.Pod, error)
Update(*v1.Pod) (*v1.Pod, error)
UpdateStatus(*v1.Pod) (*v1.Pod, error)
Delete(name string, options *v1.DeleteOptions) error
DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error
Get(name string) (*v1.Pod, error)
List(opts v1.ListOptions) (*v1.PodList, error)
Watch(opts v1.ListOptions) (watch.Interface, error)
Patch(name string, pt api.PatchType, data []byte, subresources ...string) (result *v1.Pod, err error)
// ...
}
其实,站在Go语法的角度,资源客户端并没有真正意义上实现ListerWatcher
接口。我们以上述Pod资源客户端为例,请注意它的List()
方法的返回类型上与ListerWatcher
的细微差别:Pod资源客户端的List()
返回类型是*v1.PodList
。
在Go语言中,List(opts v1.ListOptions) (*v1.PodList, error)
并不等同于实现了ListerWatcher
的List(options api.ListOptions) (runtime.Object, error)
方法,尽管*v1.PodList
类实现了runtime.Object
接口。
// ❌ can not compile
var _ ListerWatcher = clientset.Core().Pods("default")
当然,这涉及到Go语法本身的内容,不在我们的讨论范围之内。
我们通常在创建ListerWatcher
类时,其实我们只需要借助client-go
为我们预先准备好的cache.ListWatch
2类型再稍加结合资源客户端本身就可以快捷地创建出一个ListerWatcher
对象。例如:
lw := cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return clientset.Core().Pods("default").List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Pods("default").Watch(options)
},
}
// ✅ can compile
var _ ListerWatcher = lw
除了"监听逻辑"以外,Reflector
中还包括两个缓存组件:DeltaFIFO
以及Indexer
。其中:
Indexer
缓存也需要在创建Reflector
时由调用者传入;DeltaFIFO
缓存则是在Reflector
内被创建。
DeltaFIFO
DeltaFIFO
中的"Delta"可以理解为"变更",它用于保存被监听资源的每一个变更事件(例如Added
,Updated
,Deleted
)。
"FIFO" 表示同时它也是一个先入先出队列。保存其实就是"入队"的过程。
DeltaFIFO
中的基本单元"变更"Delta
的定义为:
type Delta struct {
Type DeltaType
Object interface{}
}
它包括具体的变更类型(Added
,Updated
,Deleted
)以及变更后的资源对象本身。

注意,队列本身的粒度是资源,而非事件。每个资源对应一个保存本资源变更事件的切片([]Delta
)。
意思是说,当我们从这个队列pop出一项时,我们得到的是某个资源对应的当前所有变更事件的序列。
下图大致总结了DeltaFIFO
的内部结构:

其中queue
结构是队列的本体,而items
结构则用于辅助保存每个资源对应变更事件切片。
DeltaFIFO
的历史在最初版本的Reflector
中,其实并不包含DeltaFIFO
组件,Reflector
会把监听到的资源变更事件直接"反映"到指定资源缓存。
DeltaFIFO
的历史最早可以追溯到 Pull Request #5437。
它作为作者将要引入的Informer
框架的一部分而被引入。
在该PR中,根据作者对DeltaFIFO
的注释:
DeltaFIFO is a producer-consumer queue, where a Reflector is indended to be the producer, and the consumer is whatever calls the Pop() method.
DeltaFIFO solves this use case:
- You want to process every object change (delta) at most once.
- When you process an object, you want to see everything that's happened to it since you last processed it.
- You want to process the deletion of objects.
- You might want to periodically reprocess objects.
可以看出DeltaFIFO
设计之初就有意识地以资源作为队列"粒度"——以便在处理一个资源时(从队列中pop出一项),可以看到自从上一次处理它时所有的变更事件。
另外,一个值得注意的细节是,作者在设计DeltaFIFO
伊始就有意识地让这个结构可以阶段性地重新处理所有事件。这个"阶段性地重新处理"我们还会在后续Reflector的resync机制中详细介绍。
Indexer
Indexer
则是真正的本地资源缓存——它保存的并不是资源变更事件,而是资源对象本身。
另外我们从它的名字——索引器可以看出它更侧重于对本地缓存资源的索引能力。
索引器支持自定义添加索引逻辑(函数),以让索引器可以根据不同条件检索资源。
如何向索引器添加索引逻辑不在本书的讨论范围之内。Indexer
已经内置了一个名为MetaNamespaceIndexFunc
的索引函数,因此,Indexer
默认可以按照命名为空间来检索资源。
下图大致总结了Indexer
的内部结构:

其中indexes
结构用于保存索引函数,indices
结构保存则是索引函数建立的索引,items
结构保存的才是资源对象。
Reflector的运转
至此,我们已经把构成Reflector
的模块介绍完毕。接下来,我们将介绍Reflector
这些模块之间是如何产生联系的。
Reflector
组件通过Run()
方法开启运转,Run()
函数的主体是执行ListAndWatch()
方法:
func (r *Reflector) Run(stopCh <-chan struct{}) {
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
}
而ListAndWatch()
方法中则会使用传入的ListerWatcher
结构lw
。
具体来说,在ListAndWatch()
方法中:
lw
的List()
会被首先执行并且它仅被执行一次,用于先一次性获取所有资源集合,得到的结果将存入Indexer
缓存中;- 之后通过调用
lw
的Watch()
函数持续获取资源的变更事件并将结果压入DeltaFIFO
缓存中。
我们也可以把Reflector
的ListAndWatch()
方法理解成DeltaFIFO
的"生产者"。
需要注意的是,Reflector
组件通过Run()
函数启动之后,DeltaFIFO
缓存开始持续不断加入变更事件。而另一个Indexer
缓存在一次性获得资源集合后就没有再变动了。
Indexer
缓存之后的增、删、改则依赖另一个叫做Controller
的组件驱动。

Controller组件
注意本小节的Controller
并不是我们上一节所说的Kubernetes控制器,本节的Controller
仅是client-go
中的一个数据结构,也是构成Informer
的直接组件。
在结构上,上文介绍的Reflector
其实是Controller
组件内的一个模块。并且Controller
组件就是DeltaFIFO
的"消费者"。
下面我们将通过Controller
的运转逻辑来介绍Controller
是如何消费`DeltaFIFO
队列的。
Controller
通过Run()
方法开启运转:
func (c *controller) Run(stopCh <-chan struct{}) {
// ...
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
Run()
函数的主体是processLoop()
函数,它直接负责持续驱动组件的运转。
Controller
的processLoop()
方法如下所示,逻辑非常简单:
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
它会持续不断(for
循环)从Reflector
子组件的DeltaFIFO
队列(c.config.Queue
)中pop出一项(即某资源的完整变更事件切片Deltas
),同时交由Controller
的Process
函数(上述代码的c.config.Process
)处理,Process
函数如下所示:
func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
// ...
switch d.Type {
case Synced, Added, Updated:
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
}
h.OnUpdate(old, obj)
} else {
if err := clientState.Add(obj); err != nil {
return err
}
h.OnAdd(obj)
}
case Deleted:
if err := clientState.Delete(obj); err != nil {
return err
}
h.OnDelete(obj)
}
}
return nil
}
Process
函数以一个资源完整的变更事件切片(Deltas
)为输入:
- 针对资源的每个变更事件(
for
循环),根据类型(witch
语句),触发相应的"回调函数":OnUpdate()
,OnUpdate()
,OnDelete()
——这些"回调函数"在创建Controller
组件时传入。 它们实际可以认为是Controller
对外提供的资源变更事件的通知及处理机制;注请暂时忽略代码中出现的
Synced
事件类型,从Kubernetes API Server中返回的资源变更事件仅有Added
,Updated
,Deleted
这三种。我们在后续Reflector的resync机制会介绍Synced
事件类型的产生。 - 针对资源的每个变更事件,根据类型,调用
Reflector
的Indexer
索引缓存子组件(在上述代码中对应clientState
)的相应方法更新(Update()
)、新增(Add()
)、删除(Delete()
)相应资源——因此,我们也可以认为在Controller
的驱动下,Indexer
缓存才开始真正成为"反映"etcd
的本地缓存。

至此,我们发现Controller
组件似乎已经完全契合我们之前watch机制到Informer小节中对Kubernetes控制器的两个要求。
- 资源变更事件的通知;
- 资源的本地缓存。
Controller
组件的历史Controller
的出现要早于Informer
。
Controller
最早可以追溯到 Issue #4877,作者提到:
There are pitfalls when using the watch + list pattern. Many of our controllers use this pattern, and we expect many future pieces of code to use this pattern. Therefore, we'd like to provide a framework/example which lets you fill out three functions (list, watch, and process) to get a shiny new bug-free (at least the list+watch part) controller.
可见Controller
的提出是为了解决使用"watch+list
模式"编写控制器时容易产生bug的问题。官方开发者希望提供一个控制器框架让编写控制器变得简单并且不易发生错误。
最终,Controller
的PR Pull Request #5270被合入Kubernetes v0.15.0中。
那么我们所说的Informer
组件又是什么呢?
Informer
Informer
包含了一个Controller
组件。除去Controller
本身,Informer
本身并不含有任何实质性的组件或者驱动组件运行的模块。
Informer
的作用像是一个专门给调用者封装的简单易用的交互"壳",它负责:
- 传入
Controller
组件所需要的:- "回调函数"——在代码中对应的封装类型为
ResourceEventHandlerFuncs
Reflector
子组件的ListerWatcher
结构;
- "回调函数"——在代码中对应的封装类型为
- 初始化
Controller
组件; - 并且向调用者返回:
Controller
组件的引用以便调用者可以控制它的启停;Reflector
子组件的Indexer
缓存的引用以便调用者可以用它检索资源。
在本节中,我们没有给出Informer
的创建函数的源码作为参照。原因是由于所含组件众多,Informer
初始化函数本身逻辑较为混乱。感兴趣的读者可以根据上述说明自行找源码对照理解。
Informer
的整体结构如下图所示:

Informer
的历史Informer
的历史最早可以追溯到 Pull Request #6546。
引入Informer
结构的动机根据作者commit信息(880f922):
Add easy setup for simple controller
可见Informer
就是作为辅助便于创建Controller
结构而存在的。
Reflector的resync机制
在最后,为了让本节所有知识形成闭环,我们还需要再介绍一下Reflector
组件的resync机制。
我们并没有把此小节并入Reflector组件中主要有两个原因。一方面是不想加重读者的阅读负担,
另一方面,在该小节中,我们仅仅知道了DeltaFIFO
的"生产者"——ListAndWatch()
函数,彼时还没有介绍DeltaFIFO
的"消费者"——Controller组件中的"回调函数"。在没有知道完整的背景之前,不利于我们对resync机制的理解。
resync机制是指定时将Indexer
组件中的数据重新同步回DeltaFIFO
队列中。
在很多文章或者技术博客中,它们称之为Informer
的resync机制。其实定时的resync
是在Reflector
的ListAndWatch()
方法中启动的。
另外,resync动作是Reflector
中两个缓存组件DeltaFIFO
和Indexer
之间的数据同步,
同时也正如我们在Informer小节中所强调的那样,Informer
是一个给调用者封装的一个"壳"。
因此,Reflector
的resync机制是一种更为严谨的表述。
那么为什么要引入resync机制呢?
官方开发者考虑到回调函数在处理(消费)从DeltaFIFO
队列中pop出的事件时,可能会存在处理失败的情况,引入定时的resync机制让这些处理失败的事件有了重新被处理的机会。
那么经过resync重新放入DeltaFIFO
队列的事件,和直接从kube-apiserver
中监听到的事件有什么不一样呢?
首先,不同于kube-apiserver
中监听到的三种事件类型(Added
,Updated
,Deleted
),它的类型为Sync
。
除此以外,Sync
类型的事件在压入DeltaFIFO
时,会检查DeltaFIFO
中该资源的事件切片([]Delta
)此刻是否已经含有事件了(事件切片长度大于0),如果有,那么则放弃resync。
原因是DeltaFIFO
中此刻已经存在的资源的事件(从kube-apiserver
中"新鲜出炉")一定比resync机制中"已经出炉"但准备"回炉重造"的资源更"新"。这种压入前的检查会减少重复消费发生的机率。
另外,Reflector
组件也支持通过参数指定resync动作的发生频率。现在我们再回过头来看看Reflector
的初始化函数:
func NewReflector(
lw ListerWatcher,
expectedType interface{},
store Store,
resyncPeriod time.Duration) *Reflector {
// ...
}
这四个参数分别是:
lw
——某资源类型的监听函数expectedType
——所监听资源类型的kindstore
——资源本地缓存Indexer
resyncPeriod
——用于指定resync动作的发生频率(0
表示不执行resync)
resync
机制的历史resync机制的引入最早可以追溯到Pull Request #4923。正如作者起的PR标题一样:
Allow reflector to do full resync periodically
意思是说允许Reflector
可以周期性地做全同步。
不过最初版本的resync其实并不是指从Indexer
中同步回DeltaFIFO
中,在此PR中,作者所谓的全同步就是指周期性地重新List()
一次。
直到 Issue #23394,社区开发者才决定将List()
从resync中分离。
最终,将List()
从resync机制中分离的 Pull Request #24142被合并入Kubernetes v1.3.0-alpha.5。
使用Informer🎈
可以说本小节之前的所有内容都在为如何使用Informer
组件做铺垫。
我们首先来看如何初始化一个Informer
:
初始化Informer(基于client-go v1.5.2)
import (
"fmt"
"k8s.io/client-go/1.5/kubernetes"
"k8s.io/client-go/1.5/pkg/api"
v1 "k8s.io/client-go/1.5/pkg/api/v1"
"k8s.io/client-go/1.5/pkg/runtime"
"k8s.io/client-go/1.5/pkg/watch"
"k8s.io/client-go/1.5/rest"
"k8s.io/client-go/1.5/tools/cache"
)
func main() {
// creates the in-cluster config
config, _ := rest.InClusterConfig()
// creates the clientset
clientset, _ := kubernetes.NewForConfig(config)
cache.NewInformer(&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return clientset.Core().Pods("default").List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return clientset.Core().Pods("default").Watch(options)
},
},
&v1.Pod{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
p := obj.(*v1.Pod)
fmt.Printf("pod %s is added", p.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
o := oldObj.(*v1.Pod)
n := newObj.(*v1.Pod)
fmt.Printf("pod %s is updated from %v to %v", o.Name, o.Spec, n.Spec)
},
DeleteFunc: func(obj interface{}) {
p := obj.(*v1.Pod)
fmt.Printf("pod %s is deleted", p.Name)
},
})
}
初始化Informer
一共需要四个参数:
- 某资源类型的监听函数
- 所监听资源类型的kind
- resync发生频率
- 三种事件对应的回调函数(
AddFunc
,UpdateFunc
,DeleteFunc
)
这四个参数已经在之前相应的各个组件中介绍过了,在这里我们不再赘述。
接下来我们看看如何使用Informer
。正如我们在前面所说,Informer
组件本身并不具有任何实质性的组件,我们使用Informer
本质上在使用它所代为创建的两个组件Controller
和Indexer
。
NewInformer
函数将代为创建的这两个组件返回以供调者使用:
localCache, controller := cache.NewInformer(...)
之后我们可以通过调用Controller
的Run()
函数让整个Informer
运转起来。
controller.Run()
而Indexer
作为一个本地缓存结构,client-go
为它提供了丰富的方法以供检索资源:
type Store interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
GetByKey(key string) (item interface{}, exists bool, err error)
// ...
}
小结
我们从构成Informer
的基本组件说起,最终描绘了Informer
的整体结构。
简单来说,你可以把Informer
理解成一个由某一资源类型的List()
和Watch()
函数驱动的本地资源缓存,同时它也提供了针对资源变更事件的通知及处理机制;
不同于其他文章或者博客介绍Informer
的思路,在本节中:
- 我们没有基于
client-go
中更被广泛使用的SharedIndexInformer
。我们认为介绍更为基础的Informer
结构更容易让读者接受。另外,为了让实现的控制器尽可能保持精简,我们使用的也是Informer
而非SharedIndexInformer
。因此,我们不再介绍SharedIndexInformer
的有关内容。 - 我们并没有在介绍
Informer
的同时引入client-go
中的workqueue
。 根据Informer
的源码,它本身运行并不需要workqueue
组件。client-go
中提供的workqueue
仅仅是一种增强。 它可以被使用在本节所说的"回调函数"中,相比于在"回调函数"中单独直接处理各个事件, 我们可以在各个"回调函数"里把事件先暂时放入到workqueue
中,我们再统一从workqueue
中"捞取"并处理这些事件。client-go
中已经为我们提供了多种队列类型:通用队列、限速队列、延时队列等,我们可以借助这些官方开发者实现高效且可靠安全的队列来帮助我们编写高质量的程序。 为了降低读者阅读的负担,并且由于本书实现的控制器极为精简,它没有再利用workqueue
增强,我们在本小节也不再赘述workqueue
组件。
请注意,informer框架本身并不是在Kubernetes v1.5.0时才提出。在Kubernetes v1.5.0之前,informer框架耦合在
kubernetes
库内(k8s.io/kubernetes/pkg/controller/framework/informers
)。 在Kubernetes v1.4.0-alpha.3,Kubernetes社区才开始有了单独的客户端库client-go
。在Kubernetes v1.5.0时完成了informer框架到client-go
的迁移。关于
client-go
库的历史可以参考Kubernetes的Issue #28559。关于informer框架迁入
client-go
的历史可以参考client-go
的 Issue #4以及Kubernetes的 Pull Request #32718和 Pull Request #34989。↩在本节中,
cache
指的是client-go
的tools
包中的cache
子包。↩