client-go:5 Informer 使用

5 Informer 使用

前面我们在使用 Clientset 的时候了解到我们可以使用 Clientset 来获取所有的原生资源对象,那么如果我们想要去一直获取集群的资源对象数据呢?岂不是需要用一个轮询去不断执行 List() 操作?这显然是不合理的,实际上除了常用的 CRUD 操作之外,我们还可以进行 Watch 操作,可以监听资源对象的增、删、改、查操作,这样我们就可以根据自己的业务逻辑去处理这些数据了。

Watch 通过一个 event 接口监听对象的所有变化(添加、删除、更新):

// staging/src/k8s.io/apimachinery/pkg/watch/watch.go

// Interface 可以被任何知道如何 watch 和通知变化的对象实现
type Interface interface {
    // Stops watching. Will close the channel returned by ResultChan(). Releases
    // any resources used by the watch.
    Stop()

    // Returns a chan which will receive all the events. If an error occurs
    // or Stop() is called, this channel will be closed, in which case the
    // watch should be completely cleaned up.
    ResultChan() <-chan Event
}

watch 接口的 ResultChan 方法会返回如下几种事件:

// staging/src/k8s.io/apimachinery/pkg/watch/watch.go

// EventType 定义可能的事件类型
type EventType string

const (
    // 最重要得还是 Added、Modified、Deleted 增、删、改 这三种事件类型
    Added    EventType = "ADDED"
    Modified EventType = "MODIFIED"
    Deleted  EventType = "DELETED"
    Bookmark EventType = "BOOKMARK"
    Error    EventType = "ERROR"

    DefaultChanSize int32 = 100
)

// Event represents a single event to a watched resource.
// +k8s:deepcopy-gen=true
type Event struct {
    Type EventType

    // Object is:
    //  * If Type is Added or Modified: the new state of the object.
    //  * If Type is Deleted: the state of the object immediately before deletion.
    //  * If Type is Bookmark: the object (instance of a type being watched) where
    //    only ResourceVersion field is set. On successful restart of watch from a
    //    bookmark resourceVersion, client is guaranteed to not get repeat event
    //    nor miss any events.
    //  * If Type is Error: *api.Status is recommended; other types may make sense
    //    depending on context.
    Object runtime.Object
}

这个接口虽然我们可以直接去使用,但是实际上并不建议这样使用,因为往往由于集群中的资源较多,我们需要自己在客户端去维护一套缓存,而这个维护成本也是非常大的,为此 client-go 也提供了自己的实现机制,那就是 InformersInformers 是这个事件接口和带索引查找功能的内存缓存的组合,这样也是目前最常用的用法。Informers 第一次被调用的时候会首先在客户端调用 List 来获取全量的对象集合,然后通过 Watch 来获取增量的对象更新缓存。

也就是说当启动 informers 的时候就会将所有的对象全部拉取到我们本地的内存中。然后通过 watch 来检测集群的一个变化情况,比如说删除了一条记录就会在本地缓存中将该数据删除掉

5.1 informers 原理

一个控制器需要获取对象的时候都要访问 APISERVER ,这会给系统带来很高的负载,informers 的内存缓存就是用来解决这个问题的,此外 informers 还可以几乎实时的监控对象的变化,而不需要轮询的请求,这样就可以保证客户端的缓存数据和服务端的数据一致,并且也可以大大降低 APISERVER 的压力


也就是说在我们真正使用的时候并不会通过 clientSet 直接使用它的 Watch ,而实通过我们的 informers

如上图展示了 informer 的基本处理流程

  • 以 events 事件的方式从 APISERVER 获取数据
  • 然后提供一个类似于客户端的 lister 接口,从内存缓存中 get 和 list 对象,需要注意的是这里的 Lister 接口是从缓存中获取数据。如果我们的缓存没有去实力化 informer 的话,这个 Lister 调用接口是拿不到数据的
  • 然后就可以去 添加、删除、更新注册事件处理程序,也就说说我们可以在 informer 中直接对 K8S 对象进行处理了

此外 informers 也有错误处理方式,当长期运行的一个 watch 连接中断时,它会尝试使用另一个 watch 请求来实现恢复连接,在不丢失任何事件的情况下恢复事件流,如果中断的时间较长,而且 APISERVER 丢失了事件(etcd 在新的 watch 请求成功之前从数据库中清除这些事件),那么 informers 就会重新 list 全量数据,从而为了保证本地的缓存和服务端 ETCD 中的数据保持一致性,

而且在重新 List 全量操作的时候还可以配置一个重新同步的周期参数,用于协调内存缓存数据和业务逻辑的数据一致性,每次过了该周期后,注册的事件处理程序就将被所有的对象调用,通常这个周期参数以分为单位,比如10分钟或者30分钟。

注意:重新同步是纯内存操作,不会触发对服务器的调用。

Informers 的这些高级特性以及超强的鲁棒性,都足以让我们不去直接使用客户端的 Watch() 方法来处理自己的业务逻辑,而且在 Kubernetes 中也有很多地方都有使用到 Informers。

但是在使用 Informers 的时候,通常每个 GroupVersionResource(GVR)只实例化一个 Informers,但是有时候我们在一个应用中往往有使用多种资源对象的需求,这个时候为了方便共享 Informers,我们可以通过使用共享 Informer 工厂来实例化一个 Informer。

共享 Informer 工厂允许我们在应用中为同一个资源共享 Informer,也就是说不同的控制器循环可以使用相同的 watch 连接到后台的 APIServer,例如,kube-controller-manager 中的控制器数据量就非常多,因为我们都知道 kube-controller-manager 需要监听所有的资源,但是对于每个资源(比如 Pod),在这个进程中只有一个 Informer。

5.2 示例

首先我们创建一个 Clientset 对象,然后使用 Clientset 来创建一个共享的 Informer 工厂,Informer 是通过 informer-gen 这个代码生成器工具自动生成的,位于 k8s.io/client-go/informers 中。

在下面代码流程:

  • 首先获取到 clientSet 对象,因为 informer 也需要去访问 K8S APISERVER 对应的一个请求地址,和认证信息
  • 然后利用z合格 clientSet 对象去初始化一个 informer 的初始化工厂
  • 然后通过这个 informer 工厂拿到我们想要监听的一个 informer 资源类型如 deployment
  • 然后启动 informer 之前就要执行 lister 和 watch 操作,因为在启动之后的话是无法获取到 lister 和 watch 操作,并且 lister 是直接在缓冲中获取数据
  • 接着注册事件处理程序,在 informer中有 AddEventHandler 方法用来处理事件逻辑
  • 调用 informerFactory.Start 方法去启动 informer 执行 list 和 watch 操作,这样就会将全量的数据拉取到缓存当中,等到执行 informerFactory.WaitForCacheSync 方法就表示缓存中已经将数据获取下来
  • 通过 lister 获取对应资源类型的本地缓存数据

1.编写代码

package main

import (
    "flag"
    "fmt"
    "log"
    "os"
    "path/filepath"
    "time"

    v1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func homeDir() string {
    if h := os.Getenv("HOME"); h != "" {
        return h
    }
    return os.Getenv("USERPROFILE") // windows 获取用户家目录
}

func main() {
    // 获取 clientset 对象
    var err error
    var config *rest.Config
    var kubeConfig *string

    // 如果是 K8S 集群外部通过获取本地用户家目录下的 k8s 配置文件实现授权
    if home := homeDir(); home != "" {
        kubeConfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(可选) 输入 kubeconfig 的决定路径")
    } else {
        kubeConfig = flag.String("kubeconfig", "", "kubeconfig 文件绝对路径")
    }
    flag.Parse()
    fmt.Println(*kubeConfig)

    // 如果是 K8S 集群本地则通过 rest.Config 方法获取 clientset
    if config, err = rest.InClusterConfig(); err != nil {
        if config, err = clientcmd.BuildConfigFromFlags("", *kubeConfig); err != nil {
            log.Panic(err)
        }
    }

    // 初始化 clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Panic(err)
    }

    // 初始化 informers factory (为了测试方便这里设置每30s重新全量 List 一次)
    informersFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)

    // 监听想要获取的资源对象 informers , 这里是对 deployment 监听, 因为我们都知道 deployment 的 group-version 是在 apps/v1 下
    deploymentInformers := informersFactory.Apps().V1().Deployments()

    // 注册 informer
    informer := deploymentInformers.Informer()

    // 创建 lister
    deploymentLister := deploymentInformers.Lister()

    // 注册处理事件(add,delete,update)
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{

        // 空接口所以这里可以通过类型断言,并输出添加的 deployment 名称
        AddFunc: func(obj interface{}) {
            deploy := obj.(*v1.Deployment)
            fmt.Printf("add a deployment: %s\n", deploy.Name)
        },

        // 空接口所以这里可以通过类型断言,并输出更新后的 deployment的 deployment 名称
        UpdateFunc: func(oldObj, newObj interface{}) {
            oldDeploy := oldObj.(*v1.Deployment)
            newDeploy := newObj.(*v1.Deployment)
            fmt.Printf("OldDeployment:%s update NewDeployment:%s\n", oldDeploy.Name, newDeploy.Name)
        },

        // 空接口所以这里可以通过类型断言,并将 delete 的 deployment 输出
        DeleteFunc: func(obj interface{}) {
            deleteDelpoy := obj.(*v1.Deployment)
            fmt.Printf("delete deployment:%s\n", deleteDelpoy.Name)
        },
    })

    stopChan := make(chan struct{})

    // 函数退出关闭 stopchan 管道
    defer close(stopChan)
    // 启动 informersFactory 工厂(执行 list & watch 操作)
    informersFactory.Start(stopChan)

    // 等待所有启动的 informer 同步完成
    informersFactory.WaitForCacheSync(stopChan)

    // 通过 Lister 获取缓存中的 deployment 数据
    deployments, err := deploymentLister.Deployments("").List(labels.Everything())
    if err != nil {
        log.Panic(err)
    }

    // 输出
    for _, deployment := range deployments {
        fmt.Println(deployment.Name)
    }

    <-stopChan
}

2.执行

[23:27:31 root@go testk8s22]#go run main.go 
/root/.kube/config
add a deployment: kube-state-metrics
add a deployment: elastic-exporter
add a deployment: busy-box
add a deployment: istio-ingressgateway
add a deployment: ratings-v1
add a deployment: blackbox
add a deployment: coredns
add a deployment: reviews-v2
add a deployment: reviews-v3
add a deployment: calico-kube-controllers
add a deployment: istiod
add a deployment: productpage-v1
add a deployment: web-tomcat-app1-deployment
add a deployment: istio-egressgateway
add a deployment: details-v1
add a deployment: reviews-v1
busy-box
istio-ingressgateway
blackbox
coredns
istio-egressgateway
reviews-v2
calico-kube-controllers
web-tomcat-app1-deployment
details-v1
kube-state-metrics
elastic-exporter
reviews-v3
istiod
productpage-v1
ratings-v1
reviews-v1
OldDeployment:reviews-v3 update NewDeployment:reviews-v3
OldDeployment:istiod update NewDeployment:istiod
OldDeployment:productpage-v1 update NewDeployment:productpage-v1
OldDeployment:web-tomcat-app1-deployment update NewDeployment:web-tomcat-app1-deployment
OldDeployment:details-v1 update NewDeployment:details-v1
OldDeployment:kube-state-metrics update NewDeployment:kube-state-metrics
OldDeployment:elastic-exporter update NewDeployment:elastic-exporter
OldDeployment:ratings-v1 update NewDeployment:ratings-v1
OldDeployment:reviews-v1 update NewDeployment:reviews-v1
OldDeployment:blackbox update NewDeployment:blackbox
OldDeployment:coredns update NewDeployment:coredns
OldDeployment:istio-egressgateway update NewDeployment:istio-egressgateway
OldDeployment:busy-box update NewDeployment:busy-box
OldDeployment:istio-ingressgateway update NewDeployment:istio-ingressgateway
OldDeployment:reviews-v2 update NewDeployment:reviews-v2
OldDeployment:calico-kube-controllers update NewDeployment:calico-kube-controllers

# 这个时候一执行程序就能获取到 add 和 update 的 deployment 操作,肯定有的朋友就会有疑惑,为什么我们还没有对 K8S 的 deployment 进行操作就能输出这些内容呢?
# 因为我们都知道 informer 第一次是将数据全量添加至缓存中所以对于 informer 来说这就是一个 add 和 update 的操作
# 并且 update 的话是因为我们在代码层面写了每隔 30s 就会全量的将 deployment 的数据拉取下来,所以这时候针对我们的 informer 本地缓存来说就是更新操作了

3.到 K8S 终端创建 testnginx deployment

[23:34:32 root@k8s-master ~]#kubectl create deployment testnginx --image=nginx:1.18

4.informer 程序也能够动态获取

5.删除 testnginx deployment

[23:34:47 root@k8s-master ~]#kubectl delete deployment testnginx 

6.程序动态获取

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇