Kubernetes源码解读(四)--Lister&Watcher源码分析
Lister&&Watcher是Reflector的一个主要能力提供者,我们来看看Lister&&Watcher是如何实现List()和watch()的过程的。Lister&&Watcher的代码在k8s.io/client-go/tools/cache包中。
1、Lister&&Watcher对象的初始化
ListWatch对象和其创建都在listwatch.go中,下面是对ListWatch对象的定义:
type ListerWatcher interface { Lister Watcher } type ListWatch struct { ListFunc ListFunc WatchFunc WatchFunc //"DisableChunking 请求此列表监视器不进行分块。 // DisableChunking requests no chunking for this list watcher. DisableChunking bool }
可以看到这个结构体很简单,就是实现ListFunc 和 WatchFunc,下面是这个对象的初始化
//这里的Getter对应的类型的C对应一个RESTClient func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch { optionsModifier := func(options *metav1.ListOptions) { //序列化成字符串 options.FieldSelector = fieldSelector.String() } //调用NewFilteredListWatchFromClient这个函数 return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier) }
可以看到主要逻辑都在NewFilteredListWatchFromClient()函数中,list和watch能力都通过RESTClient提供的:
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch { //list某个namespace下的某个resource listFunc := func(options metav1.ListOptions) (runtime.Object, error) { optionsModifier(&options) return c.Get(). Namespace(namespace). Resource(resource). VersionedParams(&options, metav1.ParameterCodec). Do(context.TODO()). Get() } //监听某个namespace(命名空间)下的资源 watchFunc := func(options metav1.ListOptions) (watch.Interface, error) { options.Watch = true optionsModifier(&options) return c.Get(). Namespace(namespace). Resource(resource). VersionedParams(&options, metav1.ParameterCodec). Watch(context.TODO()) } return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} }
上面涉及一个Getter接口,关于Getter的定义
type Getter interface { Get() *restclient.Request }
这里需要一个能够获得*restclient.Request的方式,我们实际会使用rest.Interface接口类型的实例,这是一个相对底层的工具,封装的是Kubernetes REST APIS相应的动作,可以在client-go的rest包内的client.go源文件中看到:
type Interface interface { GetRateLimiter() flowcontrol.RateLimiter Verb(verb string) *Request Post() *Request Put() *Request Patch(pt types.PatchType) *Request Get() *Request Delete() *Request APIVersion() schema.GroupVersion }
这个接口对应的实现也在client-go源文件中:
type RESTClient struct { base *url.URL versionedAPIPath string content ClientContentConfig createBackoffMgr func() BackoffManager rateLimiter flowcontrol.RateLimiter warningHandler WarningHandler Client *http.Client }
这里的RESTClient和平时Operator代码中常用的ClientSet的关系,可以通过这个简单的例子了解下:
我们在用ClientSet去Get一个指定名字的DaemonSet的时候,调用过程类似如下:
r.AppsV1().DeamonSets("default").Get(ctx,"test-ds", getOpt)
这里的Get其实就是利用了RESTClient提供的能力,方法实现如下:
func (c *daemonSets) Get (ctx context.Context, name string, options v1.GetOptions) (result *v1betal.DaemonSet, err error) { result = & v1betal.DaemonSet{} //其实就是RESTClient.Get,返回的是*restclient.Request对象 err = c.client.Get(). Namespace(c.ns). Resource("daemonsets"). Name(name). VersionedParams(&options, scheme.ParameterCodes). Do(ctx). Into(result) return }
2、ListerWatcher接口
上面关于ListWatch对象其实实现的是ListerWatcher接口,代码实现在listwatch.go中:
//这里面嵌套了两个接口分别是Lister和watcher type ListerWatcher interface { Lister Watcher } type Lister interface { // List 的返回值应该是一个list类型对象,也就是其中有items字段,里面的ResourceVersion可以用来监听(watch) List(options metav1.ListOptions) (runtime.Object, error) } type Watcher interface { // 从指定的版本开始watch Watch(options metav1.ListOptions) (watch.Interface, error) }
ListWatch对象的List和Watch的实现:
// type ListFunc func(options metav1.ListOptions) (runtime.Object, error) type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) //这里的实现就是做相关的类型转换没有特殊实现 func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { return lw.ListFunc(options) } // Watch a set of apiserver resources func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) { return lw.WatchFunc(options) }
3、List-Watch与HTTP chunked
3.1、HTTP中的chunked
Kubernetes 中主要通过 List-Watch 机制实现组件间的异步消息通信。这里继续从HTTP层面来分析watch的实现机制,抓包试一下调用watch接口时数据包流向是怎么样的。
以下案例因为我没有部署k8s和相关的抓包工具,引用胡涛老师文章数据:
Kubernetes 里的 watch 长链接是通过 http 协议 chunked 机制实现的,在响应头里加一个 Transfer-Encoding: chunked 就可以实现分段响应。我们用 golang 来模拟一下这个过程,从而先理解 chunked 是什么。
写个 demo 程序,server 端代码如下:
func Server() { http.HandleFunc("/name", func(w http.ResponseWriter, r *http.Request) { flusher := w.(http.Flusher) for i := 0; i < 2; i++ { fmt.Fprintf(w, "Daniel Hu\n") flusher.Flush() <-time.Tick(1 * time.Second) } }) log.Fatal(http.ListenAndServe(":8080", nil)) }
这里的逻辑是当客户端请求 localhost:8080/name 的时候,服务器端响应两段:“Daniel Hu\n”,然后直接运行,再随便用什么工具访问一下,比如 curl localhost:8080/name,抓个包可以看到如下响应体:
chunked 类型的 response 由一个个 chunk 组成,每个 chunk 都是格式都是 Chunk size + Chunk data + Chunk boundary,也就是块大小+数据+边界标识。chunk 的结尾是一个大小为0的 chunk,也就是"0\r\n"。串在一起整体格式类似这样:
[Chunk size][Chunk data][Chunk boundary][Chunk size][Chunk data][Chunk boundary][Chunk size=0][Chunk boundary]
在上图的例子中,服务器端响应的内容是两个相同的字符串 “Daniel Hu\n”,客户端拿到的也就是是 “10Daniel Hu\n\r\n10Daniel Hu\n\r\n0\r\n”
这种类型的数据怎么接收呢?可以这样玩:
func Client() { resp, err := http.Get("http://127.0.0.1:8080/name") if err != nil { log.Fatal(err) } defer resp.Body.Close() fmt.Println(resp.TransferEncoding) reader := bufio.NewReader(resp.Body) for { line, err := reader.ReadString('\n') if len(line) > 0 { fmt.Print(line) } if err == io.EOF { break } if err != nil { log.Fatal(err) } } }
输出内容如下(两个字符串中间会间隔1s):
[chunked] Daniel Hu Daniel Hu
http 协议的 chunked 类型响应数据方式大概就是这个玩法,接下来我们看下调用 Kubernetes api 的时候,能不能找到里面的 chunked 痕迹。
3.2、watch API中的chunked
现在多数k8s集群都是 https 暴露 api 了,而且是双向 TLS,所以我们需要先通过 kubectl 来代理 kube-apiserver 提供HTTP的API,从而方便调用和抓包。
kubectl proxy
# kubectl proxy Starting to serve on 127.0.0.1:8001
然后开始 watch 一个资源,比如我这里选择 coredns 的 configmap:
curl localhost:8001/api/v1/watch/namespaces/kube-system/configmaps/coredns
# curl localhost:8001/api/v1/watch/namespaces/kube-system/configmaps/coredns {"type":"ADDED","object":{"kind":"ConfigMap","apiVersion":"v1","metadata":{"name":"coredns","
这时候可以马上拿到一个响应,然后我们通过 kubectl 命令去编辑一下这个 configmap,可以看到 watch 端继续收到一条消息:
{"type":"MODIFIED","object":{"kind":"ConfigMap","apiVersion":"v1","metadata":{"name":"cor
所以apiserver 是这样将资源变更通知到 watcher 的。
这时候如果我们去抓包,依旧可以看到这两个响应信息的具体数据包格式,第一个响应体如下(截取了中间关键信息):
// …… 0030 fe d7 b6 90 af fc 85 ac 48 54 54 50 2f 31 2e 31 ........HTTP/1.1 0040 20 32 30 30 20 4f 4b 0d 0a 41 75 64 69 74 2d 49 200 OK..Audit-I // …… 0160 2d 62 66 38 38 66 37 66 39 66 33 66 61 0d 0a 54 -bf88f7f9f3fa..T 0170 72 61 6e 73 66 65 72 2d 45 6e 63 6f 64 69 6e 67 ransfer-Encoding 0180 3a 20 63 68 75 6e 6b 65 64 0d 0a 0d 0a 33 61 38 : chunked....3a8 0190 0d 0a 7b 22 74 79 70 65 22 3a 22 41 44 44 45 44 ..{"type":"ADDED 01a0 22 2c 22 6f 62 6a 65 63 74 22 3a 7b 22 6b 69 6e ","object":{"kin 01b0 64 22 3a 22 43 6f 6e 66 69 67 4d 61 70 22 2c 22 d":"ConfigMap"," // ……
可以看到这里的 http 头有一个 Transfer-Encoding: chunked,下面的内容是 {“type”:“ADDED…
继续看第二个包,第二个简单很多,少了 http 头信息,只是简单的第二个 chunk,长这样:
// …… 0030 fe d7 fb 0b af fc c0 4a 33 61 62 0d 0a 7b 22 74 .......J3ab..{"t 0040 79 70 65 22 3a 22 4d 4f 44 49 46 49 45 44 22 2c ype":"MODIFIED", 0050 22 6f 62 6a 65 63 74 22 3a 7b 22 6b 69 6e 64 22 "object":{"kind" 0060 3a 22 43 6f 6e 66 69 67 4d 61 70 22 2c 22 61 70 :"ConfigMap","ap // ……
这里可以看到 0d 0a,出于程序员的职业敏感,得想到这个就是 \r\n,至于前面的 3ab,猜到了吗?转十进制就是 939,对应这个 chunk 的长度,这里和前面我们自己写的 http server 请求-响应格式就一致了。