5 Informer 使用
前面我们在使用 Clientset 的时候了解到我们可以使用 Clientset 来获取所有的原生资源对象,那么如果我们想要去一直获取集群的资源对象数据呢?岂不是需要用一个轮询去不断执行 List() 操作?这显然是不合理的,实际上除了常用的 CRUD 操作之外,我们还可以进行 Watch 操作,可以监听资源对象的增、删、改、查操作,这样我们就可以根据自己的业务逻辑去处理这些数据了。
Watch 通过一个 event 接口监听对象的所有变化(添加、删除、更新):
// staging/src/k8s.io/apimachinery/pkg/watch/watch.go
// Interface 可以被任何知道如何 watch 和通知变化的对象实现
type Interface interface {
// Stops watching. Will close the channel returned by ResultChan(). Releases
// any resources used by the watch.
Stop()
// Returns a chan which will receive all the events. If an error occurs
// or Stop() is called, this channel will be closed, in which case the
// watch should be completely cleaned up.
ResultChan() <-chan Event
}
watch 接口的 ResultChan 方法会返回如下几种事件:
// staging/src/k8s.io/apimachinery/pkg/watch/watch.go
// EventType 定义可能的事件类型
type EventType string
const (
// 最重要得还是 Added、Modified、Deleted 增、删、改 这三种事件类型
Added EventType = "ADDED"
Modified EventType = "MODIFIED"
Deleted EventType = "DELETED"
Bookmark EventType = "BOOKMARK"
Error EventType = "ERROR"
DefaultChanSize int32 = 100
)
// Event represents a single event to a watched resource.
// +k8s:deepcopy-gen=true
type Event struct {
Type EventType
// Object is:
// * If Type is Added or Modified: the new state of the object.
// * If Type is Deleted: the state of the object immediately before deletion.
// * If Type is Bookmark: the object (instance of a type being watched) where
// only ResourceVersion field is set. On successful restart of watch from a
// bookmark resourceVersion, client is guaranteed to not get repeat event
// nor miss any events.
// * If Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
Object runtime.Object
}
这个接口虽然我们可以直接去使用,但是实际上并不建议这样使用,因为往往由于集群中的资源较多,我们需要自己在客户端去维护一套缓存,而这个维护成本也是非常大的,为此 client-go 也提供了自己的实现机制,那就是 Informers
。Informers
是这个事件接口和带索引查找功能的内存缓存的组合,这样也是目前最常用的用法。Informers
第一次被调用的时候会首先在客户端调用 List 来获取全量的对象集合,然后通过 Watch 来获取增量的对象更新缓存。
也就是说当启动 informers 的时候就会将所有的对象全部拉取到我们本地的内存中。然后通过 watch 来检测集群的一个变化情况,比如说删除了一条记录就会在本地缓存中将该数据删除掉
5.1 informers 原理
一个控制器需要获取对象的时候都要访问 APISERVER ,这会给系统带来很高的负载,informers 的内存缓存就是用来解决这个问题的,此外 informers 还可以几乎实时的监控对象的变化,而不需要轮询的请求,这样就可以保证客户端的缓存数据和服务端的数据一致,并且也可以大大降低 APISERVER 的压力
也就是说在我们真正使用的时候并不会通过 clientSet 直接使用它的 Watch ,而实通过我们的 informers
如上图展示了 informer 的基本处理流程
- 以 events 事件的方式从 APISERVER 获取数据
- 然后提供一个类似于客户端的 lister 接口,从内存缓存中 get 和 list 对象,需要注意的是这里的 Lister 接口是从缓存中获取数据。如果我们的缓存没有去实力化 informer 的话,这个 Lister 调用接口是拿不到数据的
- 然后就可以去 添加、删除、更新注册事件处理程序,也就说说我们可以在 informer 中直接对 K8S 对象进行处理了
此外 informers 也有错误处理方式,当长期运行的一个 watch 连接中断时,它会尝试使用另一个 watch 请求来实现恢复连接,在不丢失任何事件的情况下恢复事件流,如果中断的时间较长,而且 APISERVER 丢失了事件(etcd 在新的 watch 请求成功之前从数据库中清除这些事件),那么 informers 就会重新 list 全量数据,从而为了保证本地的缓存和服务端 ETCD 中的数据保持一致性,
而且在重新 List 全量操作的时候还可以配置一个重新同步的周期参数,用于协调内存缓存数据和业务逻辑的数据一致性,每次过了该周期后,注册的事件处理程序就将被所有的对象调用,通常这个周期参数以分为单位,比如10分钟或者30分钟。
注意:重新同步是纯内存操作,不会触发对服务器的调用。
Informers 的这些高级特性以及超强的鲁棒性,都足以让我们不去直接使用客户端的 Watch() 方法来处理自己的业务逻辑,而且在 Kubernetes 中也有很多地方都有使用到 Informers。
但是在使用 Informers 的时候,通常每个 GroupVersionResource(GVR)只实例化一个 Informers,但是有时候我们在一个应用中往往有使用多种资源对象的需求,这个时候为了方便共享 Informers,我们可以通过使用共享 Informer 工厂来实例化一个 Informer。
共享 Informer 工厂允许我们在应用中为同一个资源共享 Informer,也就是说不同的控制器循环可以使用相同的 watch 连接到后台的 APIServer,例如,kube-controller-manager 中的控制器数据量就非常多,因为我们都知道 kube-controller-manager
需要监听所有的资源,但是对于每个资源(比如 Pod),在这个进程中只有一个 Informer。
5.2 示例
首先我们创建一个 Clientset 对象,然后使用 Clientset 来创建一个共享的 Informer 工厂,Informer 是通过 informer-gen 这个代码生成器工具自动生成的,位于 k8s.io/client-go/informers
中。
在下面代码流程:
- 首先获取到 clientSet 对象,因为 informer 也需要去访问 K8S APISERVER 对应的一个请求地址,和认证信息
- 然后利用z合格 clientSet 对象去初始化一个 informer 的初始化工厂
- 然后通过这个 informer 工厂拿到我们想要监听的一个 informer 资源类型如 deployment
- 然后启动 informer 之前就要执行 lister 和 watch 操作,因为在启动之后的话是无法获取到 lister 和 watch 操作,并且 lister 是直接在缓冲中获取数据
- 接着注册事件处理程序,在 informer中有
AddEventHandler
方法用来处理事件逻辑 - 调用
informerFactory.Start
方法去启动 informer 执行 list 和 watch 操作,这样就会将全量的数据拉取到缓存当中,等到执行informerFactory.WaitForCacheSync
方法就表示缓存中已经将数据获取下来 - 通过 lister 获取对应资源类型的本地缓存数据
1.编写代码
package main
import (
"flag"
"fmt"
"log"
"os"
"path/filepath"
"time"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // windows 获取用户家目录
}
func main() {
// 获取 clientset 对象
var err error
var config *rest.Config
var kubeConfig *string
// 如果是 K8S 集群外部通过获取本地用户家目录下的 k8s 配置文件实现授权
if home := homeDir(); home != "" {
kubeConfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可选) 输入 kubeconfig 的决定路径")
} else {
kubeConfig = flag.String("kubeconfig", "", "kubeconfig 文件绝对路径")
}
flag.Parse()
fmt.Println(*kubeConfig)
// 如果是 K8S 集群本地则通过 rest.Config 方法获取 clientset
if config, err = rest.InClusterConfig(); err != nil {
if config, err = clientcmd.BuildConfigFromFlags("", *kubeConfig); err != nil {
log.Panic(err)
}
}
// 初始化 clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Panic(err)
}
// 初始化 informers factory (为了测试方便这里设置每30s重新全量 List 一次)
informersFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
// 监听想要获取的资源对象 informers , 这里是对 deployment 监听, 因为我们都知道 deployment 的 group-version 是在 apps/v1 下
deploymentInformers := informersFactory.Apps().V1().Deployments()
// 注册 informer
informer := deploymentInformers.Informer()
// 创建 lister
deploymentLister := deploymentInformers.Lister()
// 注册处理事件(add,delete,update)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
// 空接口所以这里可以通过类型断言,并输出添加的 deployment 名称
AddFunc: func(obj interface{}) {
deploy := obj.(*v1.Deployment)
fmt.Printf("add a deployment: %s\n", deploy.Name)
},
// 空接口所以这里可以通过类型断言,并输出更新后的 deployment的 deployment 名称
UpdateFunc: func(oldObj, newObj interface{}) {
oldDeploy := oldObj.(*v1.Deployment)
newDeploy := newObj.(*v1.Deployment)
fmt.Printf("OldDeployment:%s update NewDeployment:%s\n", oldDeploy.Name, newDeploy.Name)
},
// 空接口所以这里可以通过类型断言,并将 delete 的 deployment 输出
DeleteFunc: func(obj interface{}) {
deleteDelpoy := obj.(*v1.Deployment)
fmt.Printf("delete deployment:%s\n", deleteDelpoy.Name)
},
})
stopChan := make(chan struct{})
// 函数退出关闭 stopchan 管道
defer close(stopChan)
// 启动 informersFactory 工厂(执行 list & watch 操作)
informersFactory.Start(stopChan)
// 等待所有启动的 informer 同步完成
informersFactory.WaitForCacheSync(stopChan)
// 通过 Lister 获取缓存中的 deployment 数据
deployments, err := deploymentLister.Deployments("").List(labels.Everything())
if err != nil {
log.Panic(err)
}
// 输出
for _, deployment := range deployments {
fmt.Println(deployment.Name)
}
<-stopChan
}
2.执行
[23:27:31 root@go testk8s22]#go run main.go
/root/.kube/config
add a deployment: kube-state-metrics
add a deployment: elastic-exporter
add a deployment: busy-box
add a deployment: istio-ingressgateway
add a deployment: ratings-v1
add a deployment: blackbox
add a deployment: coredns
add a deployment: reviews-v2
add a deployment: reviews-v3
add a deployment: calico-kube-controllers
add a deployment: istiod
add a deployment: productpage-v1
add a deployment: web-tomcat-app1-deployment
add a deployment: istio-egressgateway
add a deployment: details-v1
add a deployment: reviews-v1
busy-box
istio-ingressgateway
blackbox
coredns
istio-egressgateway
reviews-v2
calico-kube-controllers
web-tomcat-app1-deployment
details-v1
kube-state-metrics
elastic-exporter
reviews-v3
istiod
productpage-v1
ratings-v1
reviews-v1
OldDeployment:reviews-v3 update NewDeployment:reviews-v3
OldDeployment:istiod update NewDeployment:istiod
OldDeployment:productpage-v1 update NewDeployment:productpage-v1
OldDeployment:web-tomcat-app1-deployment update NewDeployment:web-tomcat-app1-deployment
OldDeployment:details-v1 update NewDeployment:details-v1
OldDeployment:kube-state-metrics update NewDeployment:kube-state-metrics
OldDeployment:elastic-exporter update NewDeployment:elastic-exporter
OldDeployment:ratings-v1 update NewDeployment:ratings-v1
OldDeployment:reviews-v1 update NewDeployment:reviews-v1
OldDeployment:blackbox update NewDeployment:blackbox
OldDeployment:coredns update NewDeployment:coredns
OldDeployment:istio-egressgateway update NewDeployment:istio-egressgateway
OldDeployment:busy-box update NewDeployment:busy-box
OldDeployment:istio-ingressgateway update NewDeployment:istio-ingressgateway
OldDeployment:reviews-v2 update NewDeployment:reviews-v2
OldDeployment:calico-kube-controllers update NewDeployment:calico-kube-controllers
# 这个时候一执行程序就能获取到 add 和 update 的 deployment 操作,肯定有的朋友就会有疑惑,为什么我们还没有对 K8S 的 deployment 进行操作就能输出这些内容呢?
# 因为我们都知道 informer 第一次是将数据全量添加至缓存中所以对于 informer 来说这就是一个 add 和 update 的操作
# 并且 update 的话是因为我们在代码层面写了每隔 30s 就会全量的将 deployment 的数据拉取下来,所以这时候针对我们的 informer 本地缓存来说就是更新操作了
3.到 K8S 终端创建 testnginx deployment
[23:34:32 root@k8s-master ~]#kubectl create deployment testnginx --image=nginx:1.18
4.informer 程序也能够动态获取
5.删除 testnginx deployment
[23:34:47 root@k8s-master ~]#kubectl delete deployment testnginx
6.程序动态获取