Kubernetes源码解读(四)--Lister&Watcher源码分析

雅泽1年前技术文章466


Lister&&Watcher是Reflector的一个主要能力提供者,我们来看看Lister&&Watcher是如何实现List()和watch()的过程的。Lister&&Watcher的代码在k8s.io/client-go/tools/cache包中。

1、Lister&&Watcher对象的初始化

ListWatch对象和其创建都在listwatch.go中,下面是对ListWatch对象的定义:

type ListerWatcher interface {
	Lister
	Watcher
}

type ListWatch struct {
	ListFunc  ListFunc
	WatchFunc WatchFunc
    //"DisableChunking 请求此列表监视器不进行分块。
	// DisableChunking requests no chunking for this list watcher.
	DisableChunking bool
}

可以看到这个结构体很简单,就是实现ListFunc 和 WatchFunc,下面是这个对象的初始化

//这里的Getter对应的类型的C对应一个RESTClient
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
	optionsModifier := func(options *metav1.ListOptions) {
        //序列化成字符串
		options.FieldSelector = fieldSelector.String()
	}
    //调用NewFilteredListWatchFromClient这个函数
	return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}

可以看到主要逻辑都在NewFilteredListWatchFromClient()函数中,list和watch能力都通过RESTClient提供的:

func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
    //list某个namespace下的某个resource
	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Do(context.TODO()).
			Get()
	}
    //监听某个namespace(命名空间)下的资源
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch(context.TODO())
	}
	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

上面涉及一个Getter接口,关于Getter的定义

type Getter interface {
	Get() *restclient.Request
}

这里需要一个能够获得*restclient.Request的方式,我们实际会使用rest.Interface接口类型的实例,这是一个相对底层的工具,封装的是Kubernetes REST APIS相应的动作,可以在client-go的rest包内的client.go源文件中看到:

type Interface interface {
	GetRateLimiter() flowcontrol.RateLimiter
	Verb(verb string) *Request
	Post() *Request
	Put() *Request
	Patch(pt types.PatchType) *Request
	Get() *Request
	Delete() *Request
	APIVersion() schema.GroupVersion
}

这个接口对应的实现也在client-go源文件中:

type RESTClient struct {
	base *url.URL
    versionedAPIPath string
	content ClientContentConfig
	createBackoffMgr func() BackoffManager
	rateLimiter flowcontrol.RateLimiter
	warningHandler WarningHandler
	Client *http.Client
}

这里的RESTClient和平时Operator代码中常用的ClientSet的关系,可以通过这个简单的例子了解下:

我们在用ClientSet去Get一个指定名字的DaemonSet的时候,调用过程类似如下:

r.AppsV1().DeamonSets("default").Get(ctx,"test-ds", getOpt)

这里的Get其实就是利用了RESTClient提供的能力,方法实现如下:

func (c *daemonSets) Get (ctx context.Context, name string, options v1.GetOptions) (result *v1betal.DaemonSet, err error) {
    result = & v1betal.DaemonSet{}
    //其实就是RESTClient.Get,返回的是*restclient.Request对象
    err = c.client.Get().
            Namespace(c.ns).
            Resource("daemonsets").
            Name(name).
            VersionedParams(&options, scheme.ParameterCodes).
            Do(ctx).
            Into(result)
    return 
}

2、ListerWatcher接口

上面关于ListWatch对象其实实现的是ListerWatcher接口,代码实现在listwatch.go中:

//这里面嵌套了两个接口分别是Lister和watcher
type ListerWatcher interface {
	Lister
	Watcher
}


type Lister interface {
	// List 的返回值应该是一个list类型对象,也就是其中有items字段,里面的ResourceVersion可以用来监听(watch)
	List(options metav1.ListOptions) (runtime.Object, error)
}


type Watcher interface {
	// 从指定的版本开始watch
	Watch(options metav1.ListOptions) (watch.Interface, error)
}

ListWatch对象的List和Watch的实现:

//
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)

type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

//这里的实现就是做相关的类型转换没有特殊实现
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
	return lw.ListFunc(options)
}

// Watch a set of apiserver resources
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
	return lw.WatchFunc(options)
}

3、List-Watch与HTTP chunked

3.1、HTTP中的chunked

Kubernetes 中主要通过 List-Watch 机制实现组件间的异步消息通信。这里继续从HTTP层面来分析watch的实现机制,抓包试一下调用watch接口时数据包流向是怎么样的。

以下案例因为我没有部署k8s和相关的抓包工具,引用胡涛老师文章数据:

Kubernetes 里的 watch 长链接是通过 http 协议 chunked 机制实现的,在响应头里加一个 Transfer-Encoding: chunked 就可以实现分段响应。我们用 golang 来模拟一下这个过程,从而先理解 chunked 是什么。

写个 demo 程序,server 端代码如下:

func Server() {
	http.HandleFunc("/name", func(w http.ResponseWriter, r *http.Request) {
		flusher := w.(http.Flusher)
		for i := 0; i < 2; i++ {
			fmt.Fprintf(w, "Daniel Hu\n")
			flusher.Flush()
			<-time.Tick(1 * time.Second)
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}

这里的逻辑是当客户端请求 localhost:8080/name 的时候,服务器端响应两段:“Daniel Hu\n”,然后直接运行,再随便用什么工具访问一下,比如 curl localhost:8080/name,抓个包可以看到如下响应体:

Lister&Watcher源码分析.png

chunked 类型的 response 由一个个 chunk 组成,每个 chunk 都是格式都是 Chunk size + Chunk data + Chunk boundary,也就是块大小+数据+边界标识。chunk 的结尾是一个大小为0的 chunk,也就是"0\r\n"。串在一起整体格式类似这样:

  • [Chunk size][Chunk data][Chunk boundary][Chunk size][Chunk data][Chunk boundary][Chunk size=0][Chunk boundary]

在上图的例子中,服务器端响应的内容是两个相同的字符串 “Daniel Hu\n”,客户端拿到的也就是是 “10Daniel Hu\n\r\n10Daniel Hu\n\r\n0\r\n”

这种类型的数据怎么接收呢?可以这样玩:

func Client() {
	resp, err := http.Get("http://127.0.0.1:8080/name")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	fmt.Println(resp.TransferEncoding)

	reader := bufio.NewReader(resp.Body)
	for {
		line, err := reader.ReadString('\n')
		if len(line) > 0 {
			fmt.Print(line)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
	}
}

输出内容如下(两个字符串中间会间隔1s):

[chunked]
Daniel Hu
Daniel Hu

http 协议的 chunked 类型响应数据方式大概就是这个玩法,接下来我们看下调用 Kubernetes api 的时候,能不能找到里面的 chunked 痕迹。

3.2、watch API中的chunked

现在多数k8s集群都是 https 暴露 api 了,而且是双向 TLS,所以我们需要先通过 kubectl 来代理 kube-apiserver 提供HTTP的API,从而方便调用和抓包。

  • kubectl proxy

# kubectl proxy
Starting to serve on 127.0.0.1:8001

然后开始 watch 一个资源,比如我这里选择 coredns 的 configmap:

  • curl localhost:8001/api/v1/watch/namespaces/kube-system/configmaps/coredns

# curl localhost:8001/api/v1/watch/namespaces/kube-system/configmaps/coredns
{"type":"ADDED","object":{"kind":"ConfigMap","apiVersion":"v1","metadata":{"name":"coredns","

这时候可以马上拿到一个响应,然后我们通过 kubectl 命令去编辑一下这个 configmap,可以看到 watch 端继续收到一条消息:

{"type":"MODIFIED","object":{"kind":"ConfigMap","apiVersion":"v1","metadata":{"name":"cor

所以apiserver 是这样将资源变更通知到 watcher 的。

这时候如果我们去抓包,依旧可以看到这两个响应信息的具体数据包格式,第一个响应体如下(截取了中间关键信息):

// ……
0030   fe d7 b6 90 af fc 85 ac 48 54 54 50 2f 31 2e 31   ........HTTP/1.1
0040   20 32 30 30 20 4f 4b 0d 0a 41 75 64 69 74 2d 49    200 OK..Audit-I
// ……
0160   2d 62 66 38 38 66 37 66 39 66 33 66 61 0d 0a 54   -bf88f7f9f3fa..T
0170   72 61 6e 73 66 65 72 2d 45 6e 63 6f 64 69 6e 67   ransfer-Encoding
0180   3a 20 63 68 75 6e 6b 65 64 0d 0a 0d 0a 33 61 38   : chunked....3a8
0190   0d 0a 7b 22 74 79 70 65 22 3a 22 41 44 44 45 44   ..{"type":"ADDED
01a0   22 2c 22 6f 62 6a 65 63 74 22 3a 7b 22 6b 69 6e   ","object":{"kin
01b0   64 22 3a 22 43 6f 6e 66 69 67 4d 61 70 22 2c 22   d":"ConfigMap","
// ……

可以看到这里的 http 头有一个 Transfer-Encoding: chunked,下面的内容是 {“type”:“ADDED…

继续看第二个包,第二个简单很多,少了 http 头信息,只是简单的第二个 chunk,长这样:

// ……
0030   fe d7 fb 0b af fc c0 4a 33 61 62 0d 0a 7b 22 74   .......J3ab..{"t
0040   79 70 65 22 3a 22 4d 4f 44 49 46 49 45 44 22 2c   ype":"MODIFIED",
0050   22 6f 62 6a 65 63 74 22 3a 7b 22 6b 69 6e 64 22   "object":{"kind"
0060   3a 22 43 6f 6e 66 69 67 4d 61 70 22 2c 22 61 70   :"ConfigMap","ap
// ……

这里可以看到 0d 0a,出于程序员的职业敏感,得想到这个就是 \r\n,至于前面的 3ab,猜到了吗?转十进制就是 939,对应这个 chunk 的长度,这里和前面我们自己写的 http server 请求-响应格式就一致了。



相关文章

云原生之网络篇

云原生之网络篇

前言:在云原生如火如荼的今天,作为云原生的基石:kubernetes(简称k8s)是不得不掌握的技术。而k8s的网络插件是大家绕不开的技术,但是由于k8s的开源包容性,以及网络的复杂性,导致网络插件出...

shell脚本--sed

sed后面接的动作,务必以 '........'两个单引号包住1.擅长对文件进行操作处理2.对文件信息进行内容的信息修改-i 参数代表直接修改原文件a :往后新增一行, a 的后面可以接字串,而这些字...

Prometheus集成pushgateway监控k8s集群

Prometheus集成pushgateway监控k8s集群

Prometheus部署环境介绍本文的k8s环境是通过二进制方式搭建的v1.20.13版本清单准备注意集群版本的坑,自己先到Github上下载对应的版本。注意: 集群版本在v1.21.x之前需要注意下...

Python Web 自动化测试工具 — Selenium

Selenium 是一个 Web 自动化测试工具,Selenium 通过非常简洁方便的 API,使用 Selenium WebDrivers(Selenium web 驱动器)像使用 Firefox,...

oracle PUS.SPU.CPU.BP.RU.RUR概念简介

PUS.SPU.CPU.BP.RU.RUR概念介绍PSU(Patch Set Updates):Oracle 选取在每个季度用户下载数量最多,并且得到验证具有较低风险的补丁放入到每个季度的PSU中,修...

MapReduce工作机制解析

MapReduce工作机制解析

一、MapTask工作机制主要可以分为Read阶段,Map阶段,Collect阶段,Spill阶段(1)Read阶段:MapTask通过InputFormat获得的RecordReader,从输入In...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。