The Kubernetes Scheduler
Part 1: Pod Scheduling
In Kubernetes, controllers such as Deployment and StatefulSet ultimately create Pods, and every Pod that is created has to be scheduled onto a Node in the cluster. This part looks at the factors that influence which node a Pod lands on.
1. Direct scheduling
Set spec.nodeName in the Pod template to a specific node
Use nodeSelector to match node labels
2. Affinity-based scheduling
Pod affinity and anti-affinity
Node affinity
3. Taints and tolerations
Taints
Tolerations
4. Scheduler scoring
How the scheduler filters and scores nodes
5. Custom scheduling
A home-grown scheduler or controller
nodeName in the Pod spec
Looking at the Pods shows that web-0 is running on kube02, and its spec contains nodeName: kube02. Let's try setting nodeName explicitly and see whether the Pod ends up on the node we pick.
[root@kube01 pki]# kubectl get pod -o wide -l app=nginx
NAME    READY   STATUS    RESTARTS   AGE    IP             NODE     NOMINATED NODE   READINESS GATES
web-0   1/1     Running   0          3d6h   172.30.1.198   kube02   <none>           <none>
web-1   1/1     Running   0          3d5h   172.30.1.205   kube02   <none>           <none>
[root@kube01 pki]# kubectl get pod web-0 -o yaml
...
spec:
  containers:
  - image: nginx:1.7.9
    imagePullPolicy: IfNotPresent
    name: nginx
    ports:
    - containerPort: 80
      name: web
      protocol: TCP
    resources: {}
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /usr/share/nginx/html
      name: www
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: kube-api-access-b2m4j
      readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  hostname: web-0
  nodeName: kube02
  preemptionPolicy: PreemptLowerPriority
  priority: 0
  restartPolicy: Always
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: default
  serviceAccountName: default
...
Next we write a Pod manifest meant to land on kube01. It turns out that setting nodeName under spec is all it takes, so nodeName alone determines which node a Pod runs on. The reason is that when the scheduler watches Pods, any Pod that already has nodeName set skips the scheduling process entirely.
# cat nodeName-Pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-nodename
  labels:
    app: jixingxing
spec:
  containers:
  - name: nodename
    image: busybox
    command: ["sleep", "36000"]
  nodeName: kube01

[root@kube01 ~]# kubectl apply -f stu/job/nodeName-Job.yaml
pod/test-nodename created
[root@kube01 ~]# kubectl get pod -o wide -l app=jixingxing
NAME            READY   STATUS              RESTARTS   AGE   IP       NODE     NOMINATED NODE   READINESS GATES
test-nodename   0/1     ContainerCreating   0          9s    <none>   kube01   <none>           <none>
The nodeSelector field
Listing the node labels shows that kube02 carries the label status=health, so we create a Pod whose nodeSelector targets that label.
[root@kube01 ~]# kubectl get nodes --show-labels
NAME     STATUS   ROLES                  AGE    VERSION   LABELS
kube01   Ready    control-plane,master   133d   v1.23.6   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/arch=amd64,kubernetes.io/hostname=kube01,kubernetes.io/os=linux,node-role.kubernetes.io/control-plane=,node-role.kubernetes.io/master=,node.kubernetes.io/exclude-from-external-load-balancers=
kube02   Ready    <none>                 133d   v1.23.6   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/arch=amd64,kubernetes.io/hostname=kube02,kubernetes.io/os=linux,status=health
The Pod that selects status=health is scheduled onto kube02. If no node carried that label, the Pod would stay Pending: the scheduler's label-selector handling checks whether any node has the requested labels, and when none does the scoring step is skipped and the Pod is left unscheduled (a small Go sketch of this label match follows the output below).
# cat nodeSelector-Pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-nodeselector
  labels:
    app: jixingxing
spec:
  containers:
  - name: nodeselector
    image: busybox
    command: ["sleep", "36000"]
  nodeSelector:
    status: health

# kubectl apply -f nodeName-Job.yaml
pod/test-nodeselector created
# kubectl get pod -o wide
NAME                READY   STATUS              RESTARTS   AGE   IP       NODE     NOMINATED NODE   READINESS GATES
test-nodeselector   0/1     ContainerCreating   0          4s    <none>   kube02   <none>           <none>
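Under the hood a nodeSelector check is just an exact label match. As a rough illustration (not the scheduler's actual code path; in-tree the check is performed by a filter plugin), the decision reduces to a label-set match over the labels shown above:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/labels"
)

func main() {
    // Node labels trimmed to the relevant keys from `kubectl get nodes --show-labels`.
    nodeLabels := map[string]string{
        "kubernetes.io/hostname": "kube02",
        "status":                 "health",
    }
    // The Pod's spec.nodeSelector from nodeSelector-Pod.yaml.
    nodeSelector := map[string]string{"status": "health"}

    // A nodeSelector is an AND of exact label matches, so the fit check is a
    // label-set match; nodes that fail it are filtered out before scoring.
    sel := labels.SelectorFromSet(labels.Set(nodeSelector))
    fmt.Println("kube02 fits nodeSelector:", sel.Matches(labels.Set(nodeLabels))) // true
}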
What happens if nodeName and nodeSelector are both set but contradict each other? With nodeName pointing at kube01 and a nodeSelector that only kube02 satisfies, the Pod is placed on kube01 but never starts, failing with a NodeAffinity error. So both mechanisms behave as hard constraints.
# cat nodeSelector-Pod.yaml
apiVersion: v1
kind: Pod
metadata:
  name: test-nodeselector
  labels:
    app: jixingxing
spec:
  containers:
  - name: nodename
    image: busybox
    command: ["sleep", "36000"]
  nodeName: kube01
  nodeSelector:
    status: health

# kubectl apply -f nodeSelector-Pod.yaml
pod/test-nodeselector created
# kubectl get pod -o wide
NAME                READY   STATUS         RESTARTS   AGE   IP       NODE     NOMINATED NODE   READINESS GATES
test-nodeselector   0/1     NodeAffinity   0          8s    <none>   kube01   <none>           <none>
Pod affinity and anti-affinity
Affinity
Pod affinity constrains which nodes a Pod can be scheduled onto based on the labels of Pods already running on those nodes, rather than on the node labels themselves.
Pod affinity comes in two flavors: a hard policy and a soft policy. The hard policy forces the Pod onto a matching node and leaves it unscheduled if no node qualifies; the soft policy prefers nodes that match but still schedules the Pod elsewhere if none do.
# cat pod-affinity
apiVersion: v1
kind: Pod
metadata:
  name: with-pod-affinity
spec:
  affinity:
    podAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
          - key: app
            operator: In
            values:
            - jixingxing
        topologyKey: kubernetes.io/hostname
  containers:
  - name: busybox
    image: busybox
    command: ['sleep', '3600']
In this manifest, affinity holds the scheduling constraints and podAffinity the Pod-affinity rules. requiredDuringSchedulingIgnoredDuringExecution is the hard policy: if the affinity requirement cannot be met, the Pod is not scheduled. labelSelector selects the target Pods, and operator supports four expressions: In, NotIn, Exists and DoesNotExist. topologyKey defines where to look for matching Pods: here the scheduler searches, within each topology domain defined by the node label kubernetes.io/hostname, for Pods carrying the requested label, and because this is a hard policy the Pod stays unscheduled if none are found.
After applying the manifest, the Pod is scheduled onto kube01:
# kubectl get pod --show-labels -l app=jixingxing -o wide
NAME                READY   STATUS    RESTARTS      AGE     IP            NODE     NOMINATED NODE   READINESS GATES   LABELS
test-nodename       1/1     Running   2 (71m ago)   21h     172.30.0.25   kube01   <none>           <none>            app=jixingxing
test-nodeselector   1/1     Running   0             3h32m   172.30.0.89   kube01   <none>           <none>            app=jixingxing
[root@kube01 ~]# kubectl get nodes --show-labels
NAME     STATUS   ROLES                  AGE    VERSION   LABELS
kube01   Ready    control-plane,master   134d   v1.23.6   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/arch=amd64,kubernetes.io/hostname=kube01,kubernetes.io/os=linux,node-role.kubernetes.io/control-plane=,node-role.kubernetes.io/master=,node.kubernetes.io/exclude-from-external-load-balancers=
kube02   Ready    <none>                 134d   v1.23.6   beta.kubernetes.io/arch=amd64,beta.kubernetes.io/os=linux,kubernetes.io/arch=amd64,kubernetes.io/hostname=kube02,kubernetes.io/os=linux,status=health
[root@kube01 ~]# kubectl get pod --show-labels -o wide
NAME                READY   STATUS              RESTARTS   AGE   IP       NODE     NOMINATED NODE   READINESS GATES   LABELS
with-pod-affinity   0/1     ContainerCreating   0          7s    <none>   kube01   <none>           <none>            <none>
Now point the rule at a label that no Pod carries: the new Pod stays Pending.
apiVersion: v1
kind: Pod
metadata:
  name: with-pod-affinity
spec:
  affinity:
    podAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
          - key: app
            operator: In
            values:
            - jixingxing001
        topologyKey: kubernetes.io/hostname
  containers:
  - name: busybox
    image: busybox
    command: ['sleep', '3600']
[root@kube01 ~]# kubectl get pod -o wide
NAME                READY   STATUS    RESTARTS   AGE   IP       NODE     NOMINATED NODE   READINESS GATES
with-pod-affinity   0/1     Pending   0          9s    <none>   <none>   <none>           <none>
[root@kube01 ~]# kubectl describe pod with-pod-affinity
Name:         with-pod-affinity
Namespace:    default
Priority:     0
Node:         <none>
Labels:       <none>
Annotations:  <none>
Status:       Pending
IP:
IPs:          <none>
Containers:
  busybox:
    Image:        busybox
    Port:         <none>
    Host Port:    <none>
    Command:      sleep 3600
    Environment:  <none>
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-zw6p7 (ro)
Conditions:
  Type           Status
  PodScheduled   False
Volumes:
  kube-api-access-zw6p7:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
QoS Class:                   BestEffort
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type     Reason            Age   From               Message
  ----     ------            ----  ----               -------
  Warning  FailedScheduling  28s   default-scheduler  0/2 nodes are available: 2 node(s) didn't match pod affinity rules.
Next, try a soft policy that again targets a non-existent label. Note that the soft-policy template differs from the hard-policy one: preferredDuringSchedulingIgnoredDuringExecution expresses the preference, and weight controls how heavily a matching node is favored during scoring; 100 gives the rule maximum influence.
# cat pod-affinity
apiVersion: v1
kind: Pod
metadata:
  name: with-pod-affinity
spec:
  affinity:
    podAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          labelSelector:
            matchExpressions:
            - key: app
              operator: In
              values:
              - jixingxing001
          topologyKey: kubernetes.io/hostname
  containers:
  - name: busybox
    image: busybox
    command: ['sleep', '3600']
The result shows the soft behavior: even though nothing matches the label, the Pod is still created and scheduled.
[root@kube01 ~]# kubectl apply -f pod-affinity
pod/with-pod-affinity created
[root@kube01 ~]# kubectl get pod -o wide
NAME                READY   STATUS              RESTARTS   AGE   IP       NODE     NOMINATED NODE   READINESS GATES
with-pod-affinity   0/1     ContainerCreating   0          6s    <none>   kube01   <none>           <none>
Anti-affinity
Pod affinity matches the labels of Pods already running on a node and pulls the new Pod toward them; anti-affinity does the opposite and pushes the new Pod away from nodes where matching Pods run.
# cat pod-affinity
apiVersion: v1
kind: Pod
metadata:
  name: with-pod-affinity
spec:
  affinity:
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
      - labelSelector:
          matchExpressions:
          - key: app
            operator: In
            values:
            - jixingxing
        topologyKey: kubernetes.io/hostname
  containers:
  - name: busybox
    image: busybox
    command: ['sleep', '3600']
# cat pod-affinity
apiVersion: v1
kind: Pod
metadata:
  name: with-pod-affinity
spec:
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          labelSelector:
            matchExpressions:
            - key: app
              operator: In
              values:
              - jixingxing
          topologyKey: kubernetes.io/hostname
  containers:
  - name: busybox
    image: busybox
    command: ['sleep', '3600']
Check the result:
[root@kube01 ~]# kubectl apply -f pod-affinity
pod/with-pod-affinity created
[root@kube01 ~]# kubectl get pod -o wide --show-labels
NAME                READY   STATUS    RESTARTS      AGE     IP            NODE     NOMINATED NODE   READINESS GATES   LABELS
test-nodename       1/1     Running   2 (96m ago)   21h     172.30.0.25   kube01   <none>           <none>            app=jixingxing
test-nodeselector   1/1     Running   0             3h58m   172.30.0.89   kube01   <none>           <none>            app=jixingxing
with-pod-affinity   1/1     Running   0             115s    172.30.0.80   kube02   <none>           <none>            <none>
Node affinity
Nodes only have affinity, not anti-affinity, which kubectl explain confirms:
[root@kube01 ~]# kubectl explain pod.spec.affinity
KIND:     Pod
VERSION:  v1

RESOURCE: affinity <Object>

DESCRIPTION:
     If specified, the pod's scheduling constraints
     Affinity is a group of affinity scheduling rules.

FIELDS:
   nodeAffinity   <Object>
     Describes node affinity scheduling rules for the pod.

   podAffinity    <Object>
     Describes pod affinity scheduling rules (e.g. co-locate this pod in the same node, zone, etc. as some other pod(s)).

   podAntiAffinity        <Object>
     Describes pod anti-affinity scheduling rules (e.g. avoid putting this pod in the same node, zone, etc. as some other pod(s)).
Hard-policy node affinity:
[root@kube01 ~]# cat node-affinity
apiVersion: v1
kind: Pod
metadata:
  name: with-node-affinity
spec:
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: kubernetes.io/hostname
            operator: In
            values:
            - kube01
  containers:
  - name: busybox
    image: busybox
    command: ['sleep', '3600']
Soft-policy node affinity:
apiVersion: v1
kind: Pod
metadata:
  name: with-node-affinity
spec:
  affinity:
    nodeAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 1
        preference:
          matchExpressions:
          - key: kubernetes.io/hostname
            operator: In
            values:
            - kube01
  containers:
  - name: busybox
    image: busybox
    command: ['sleep', '3600']
Applying the manifest shows the node affinity taking effect:
[root@kube01 ~]# kubectl get node
NAME     STATUS   ROLES                  AGE    VERSION
kube01   Ready    control-plane,master   134d   v1.23.6
kube02   Ready    <none>                 134d   v1.23.6
[root@kube01 ~]# kubectl get pod -o wide --show-labels
NAME                 READY   STATUS              RESTARTS   AGE   IP       NODE     NOMINATED NODE   READINESS GATES   LABELS
with-node-affinity   0/1     ContainerCreating   0          2s    <none>   kube01   <none>           <none>            <none>
Taints and tolerations
If a node is marked with taints, no Pod is scheduled onto it unless the Pod declares that it tolerates those taints. In a real cluster, master nodes run core components such as the API server, so they are usually tainted to keep ordinary workloads off them.
Inspecting the node shows a NoSchedule taint. A taint's effect can take one of three values:
NoSchedule: new Pods are not scheduled onto the node
PreferNoSchedule: the scheduler tries to avoid the node but may still use it
NoExecute: Pods already running on the node without a matching toleration are evicted (the first two effects leave running Pods alone; this one does not)
[root@kube01 ~]# kubectl describe node kube01
Name:               kube01
Roles:              control-plane,master
Labels:             beta.kubernetes.io/arch=amd64
                    beta.kubernetes.io/os=linux
                    kubernetes.io/arch=amd64
                    kubernetes.io/hostname=kube01
                    kubernetes.io/os=linux
                    node-role.kubernetes.io/control-plane=
                    node-role.kubernetes.io/master=
                    node.kubernetes.io/exclude-from-external-load-balancers=
Annotations:        flannel.alpha.coreos.com/backend-data: {"VNI":1,"VtepMAC":"12:81:9d:ec:18:fe"}
                    flannel.alpha.coreos.com/backend-type: vxlan
                    flannel.alpha.coreos.com/kube-subnet-manager: true
                    flannel.alpha.coreos.com/public-ip: 192.168.17.42
                    kubeadm.alpha.kubernetes.io/cri-socket: /var/run/dockershim.sock
                    node.alpha.kubernetes.io/ttl: 0
                    volumes.kubernetes.io/controller-managed-attach-detach: true
CreationTimestamp:  Thu, 09 Mar 2023 11:24:38 +0800
Taints:             node-role.kubernetes.io/master:NoSchedule
Only a Pod that declares a matching toleration can use such a node. Note that tolerating a taint does not force the Pod onto the tainted node; it merely puts that node back into the candidate set. Taints and tolerations are most commonly used together with DaemonSets. A small sketch of how a toleration is matched against a taint follows the manifest below.
# cat tolerations.yaml
apiVersion: v1
kind: Pod
metadata:
  name: taint
spec:
  containers:
  - name: busybox
    image: busybox
    command: ['sleep', '3600']
  tolerations:
  - key: "node-role.kubernetes.io/master"
    operator: "Exists"
    effect: "NoSchedule"
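To see the matching rule programmatically, here is a minimal sketch using the ToleratesTaint helper from k8s.io/api/core/v1; the taint and toleration below mirror the master taint and the manifest above.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
)

func main() {
    // The taint reported by `kubectl describe node kube01`.
    taint := v1.Taint{
        Key:    "node-role.kubernetes.io/master",
        Effect: v1.TaintEffectNoSchedule,
    }
    // The toleration declared in tolerations.yaml.
    toleration := v1.Toleration{
        Key:      "node-role.kubernetes.io/master",
        Operator: v1.TolerationOpExists,
        Effect:   v1.TaintEffectNoSchedule,
    }
    // ToleratesTaint implements the rule the scheduler relies on: kube01
    // becomes a candidate again, but the Pod is not forced onto it.
    fmt.Println("toleration matches taint:", toleration.ToleratesTaint(&taint)) // true
}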
Scheduler scoring
Scheduling a Pod goes through the following stages:
Pre-enqueue: called before a Pod is added to the internal active queue, where Pods are marked as ready for scheduling
Queueing: sorts the Pods waiting in the scheduling queue
Filtering: removes nodes that cannot run the Pod
Scoring: ranks the remaining nodes by priority
Reserving resources: because Pod creation is asynchronous, resources are reserved on the chosen node before binding
Approve, reject, wait: once a placement has been decided, the Pod can be approved, rejected, or held waiting
Binding: the highest-ranked node is bound to the Pod; if any step along the way fails, the error is returned immediately
Pre-enqueue
PreEnqueue
These plugins are called before a Pod is added to the internal active queue, where the Pod is marked as ready for scheduling.
A Pod is only allowed into the active queue when every PreEnqueue plugin returns Success; otherwise it is placed in the internal list of unschedulable Pods and does not get the Unschedulable condition.
Queueing
Sort
Sort plugins order the Pods in the scheduling queue. A queue-sort plugin essentially provides a less(Pod1, Pod2) function, and only one queue-sort plugin can be enabled at a time.
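For reference, a queue-sort plugin really is just that less function. The sketch below is modeled on the in-tree PrioritySort plugin (higher-priority Pods first, then FIFO by enqueue time); the type name is invented.

package queuesort

import (
    corev1helpers "k8s.io/component-helpers/scheduling/corev1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// PrioritySortSketch mirrors what the default queue-sort plugin does.
type PrioritySortSketch struct{}

var _ framework.QueueSortPlugin = &PrioritySortSketch{}

func (*PrioritySortSketch) Name() string { return "PrioritySortSketch" }

// Less is the less(Pod1, Pod2) function: true means p1 should be scheduled before p2.
func (*PrioritySortSketch) Less(p1, p2 *framework.QueuedPodInfo) bool {
    prio1 := corev1helpers.PodPriority(p1.Pod)
    prio2 := corev1helpers.PodPriority(p2.Pod)
    if prio1 != prio2 {
        return prio1 > prio2
    }
    return p1.Timestamp.Before(p2.Timestamp)
}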
Filtering
PreFilter
These plugins pre-process information about the Pod, or check conditions that the cluster or the Pod must satisfy. If a PreFilter plugin returns an error, the scheduling cycle is aborted.
Filter
These plugins filter out nodes that cannot run the Pod. For each node, the scheduler calls the filter plugins in their configured order; as soon as one of them marks the node as infeasible, the remaining filter plugins are skipped for that node. Nodes may be evaluated concurrently.
PostFilter
These plugins run after the Filter phase, but only when no feasible node was found for the Pod. They are called in their configured order, and if any PostFilter plugin marks the Pod as schedulable, the rest are not called. The typical PostFilter implementation is preemption, which tries to make the Pod schedulable by preempting other Pods' resources.
Scoring
PreScore
These plugins perform "pre-scoring" work, producing a shared state for the Score plugins to use. If a PreScore plugin returns an error, the scheduling cycle is aborted.
Score
These plugins rank the nodes that passed the filtering phase; the scheduler calls every Score plugin for every node. Scores fall into a well-defined integer range between a minimum and a maximum. After the normalize-scoring phase, the scheduler combines the node scores from all plugins according to the configured plugin weights.
NormalizeScore
These plugins modify scores before the scheduler computes the final node ranking. A plugin registered at this extension point is called with the Score results of the same plugin, once per plugin per scheduling cycle.
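Score and NormalizeScore are usually implemented by the same plugin, with ScoreExtensions returning the plugin itself. A hedged sketch follows; the plugin name and the toy scoring rule are invented, only the framework interfaces are real.

package scoresketch

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

type ScoreSketch struct{}

var _ framework.ScorePlugin = &ScoreSketch{}

func (*ScoreSketch) Name() string { return "ScoreSketch" }

// Score returns a raw, plugin-specific number per node; here simply the
// length of the node name, purely to have something to normalize.
func (*ScoreSketch) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    return int64(len(nodeName)), nil
}

func (s *ScoreSketch) ScoreExtensions() framework.ScoreExtensions { return s }

// NormalizeScore rescales the raw scores into [0, MaxNodeScore] so the
// framework can combine them with other plugins' weighted scores.
func (*ScoreSketch) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
    var max int64 = 1
    for _, s := range scores {
        if s.Score > max {
            max = s.Score
        }
    }
    for i := range scores {
        scores[i].Score = scores[i].Score * framework.MaxNodeScore / max
    }
    return nil
}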
Reserving resources
Reserve
A plugin that implements the Reserve extension has two methods, Reserve and Unreserve, which back the two informational scheduling phases of the same names. Plugins that maintain runtime state ("stateful plugins") should use these phases so the scheduler notifies them when resources on a node are reserved for, or released from, a specific Pod.
The Reserve phase happens before the scheduler actually binds a Pod to its designated node. It exists to prevent race conditions while the scheduler waits for the bind to succeed. The Reserve method of each Reserve plugin may succeed or fail; if one fails, the subsequent plugins are not executed and the Reserve phase is considered failed. If all plugins' Reserve methods succeed, the Reserve phase succeeds and the rest of the scheduling cycle and the binding cycle proceed.
If the Reserve phase or a later phase fails, the Unreserve phase is triggered. When that happens, the Unreserve method of every Reserve plugin is executed in the reverse order of the Reserve calls. This phase exists to clean up state associated with the reserved Pod.
This is the last step of the scheduling cycle. Once a Pod is in the reserved state, it will either trigger the Unreserve plugins (on failure) or the PostBind plugins (on success) at the end of the binding cycle.
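A minimal sketch of such a stateful Reserve plugin, assuming an in-memory map as the "runtime state" to keep in sync; the plugin name and bookkeeping are invented.

package reservesketch

import (
    "context"
    "sync"

    v1 "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// ReserveSketch records that a node has been tentatively claimed for a Pod in
// Reserve, and rolls that claim back in Unreserve.
type ReserveSketch struct {
    mu       sync.Mutex
    reserved map[string]string // pod UID -> node name
}

var _ framework.ReservePlugin = &ReserveSketch{}

func (*ReserveSketch) Name() string { return "ReserveSketch" }

func (r *ReserveSketch) Reserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.reserved == nil {
        r.reserved = map[string]string{}
    }
    r.reserved[string(pod.UID)] = nodeName
    return nil // a non-nil error Status here would fail the Reserve phase
}

func (r *ReserveSketch) Unreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    delete(r.reserved, string(pod.UID))
}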
Approve, reject, wait
Permit
Permit plugins are invoked at the end of each Pod's scheduling cycle to prevent or delay the binding. A Permit plugin can do one of three things:
Approve: once all Permit plugins approve the Pod, it is sent on for binding.
Reject: if any Permit plugin rejects the Pod, it is returned to the scheduling queue, which triggers the Unreserve phase of the Reserve plugins.
Wait (with a timeout): if a Permit plugin returns "wait", the Pod is kept in an internal list of waiting Pods; its binding cycle starts but blocks until the Pod is approved. If the timeout expires, the wait turns into a reject, the Pod goes back to the scheduling queue, and the Unreserve phase of the Reserve plugins is triggered.
Binding
PreBind
These plugins perform any work required before the Pod is bound. For example, a PreBind plugin might provision a network volume and mount it on the target node before the Pod is allowed to run there.
If any PreBind plugin returns an error, the Pod is rejected and sent back to the scheduling queue.
Bind
Bind plugins bind the Pod to a node. They are not called until all PreBind plugins have completed, and they run in their configured order. A Bind plugin may choose whether to handle a given Pod; if one does, the remaining Bind plugins are skipped.
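The in-tree DefaultBinder does little more than post a Binding subresource for the Pod; a sketch along those lines (the type name is invented, the handle would be injected by the framework when the plugin is constructed):

package bindsketch

import (
    "context"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// BinderSketch binds a Pod to the chosen node by creating a Binding object,
// which is essentially what the default Bind plugin does.
type BinderSketch struct {
    handle framework.Handle
}

var _ framework.BindPlugin = &BinderSketch{}

func (*BinderSketch) Name() string { return "BinderSketch" }

func (b *BinderSketch) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
    binding := &v1.Binding{
        ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
        Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
    }
    // Writing the Binding is what actually sets spec.nodeName on the Pod.
    if err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
        return framework.AsStatus(err)
    }
    return nil
}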
PostBind
This is an informational extension point. PostBind plugins are called after a Pod has been successfully bound. This marks the end of the binding cycle and is typically used to clean up associated resources.
Unreserve
This is an informational extension point. If a Pod was reserved and then rejected in a later phase, the Unreserve plugins are notified so they can clean up the state associated with the reserved Pod.
Plugins that use this extension point usually also use Reserve.
A look at the plugins behind the extension points
The plugin registry lives at pkg/scheduler/framework/plugins/registry.go:
func NewInTreeRegistry() runtime.Registry { fts := plfeature.Features{ EnableDynamicResourceAllocation: feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation), EnableReadWriteOncePod: feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod), EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority), EnableMinDomainsInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread), EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread), EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread), EnablePodSchedulingReadiness: feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness), EnablePodDisruptionConditions: feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions), EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling), EnableSidecarContainers: feature.DefaultFeatureGate.Enabled(features.SidecarContainers), } registry := runtime.Registry{ dynamicresources.Name: runtime.FactoryAdapter(fts, dynamicresources.New), selectorspread.Name: selectorspread.New, imagelocality.Name: imagelocality.New, tainttoleration.Name: tainttoleration.New, nodename.Name: nodename.New, nodeports.Name: nodeports.New, nodeaffinity.Name: nodeaffinity.New, podtopologyspread.Name: runtime.FactoryAdapter(fts, podtopologyspread.New), nodeunschedulable.Name: nodeunschedulable.New, noderesources.Name: runtime.FactoryAdapter(fts, noderesources.NewFit), noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation), volumebinding.Name: runtime.FactoryAdapter(fts, volumebinding.New), volumerestrictions.Name: runtime.FactoryAdapter(fts, volumerestrictions.New), volumezone.Name: volumezone.New, nodevolumelimits.CSIName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI), nodevolumelimits.EBSName: runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS), nodevolumelimits.GCEPDName: runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD), nodevolumelimits.AzureDiskName: runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk), nodevolumelimits.CinderName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder), interpodaffinity.Name: interpodaffinity.New, queuesort.Name: queuesort.New, defaultbinder.Name: defaultbinder.New, defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New), schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New), } return registry }
Looking at pkg/scheduler/framework/interface.go shows that every extension point is defined as an interface, so a plugin only needs to implement the corresponding interface to be wired into that extension point.
// PreFilterPlugin is an interface that must be implemented by "PreFilter" plugins.// These plugins are called at the beginning of the scheduling cycle.type PreFilterPlugin interface { Plugin // PreFilter is called at the beginning of the scheduling cycle. All PreFilter // plugins must return success or the pod will be rejected. PreFilter could optionally // return a PreFilterResult to influence which nodes to evaluate downstream. This is useful // for cases where it is possible to determine the subset of nodes to process in O(1) time. // When it returns Skip status, returned PreFilterResult and other fields in status are just ignored, // and coupled Filter plugin/PreFilterExtensions() will be skipped in this scheduling cycle. PreFilter(ctx context.Context, state *CycleState, p *v1.Pod) (*PreFilterResult, *Status) // PreFilterExtensions returns a PreFilterExtensions interface if the plugin implements one, // or nil if it does not. A Pre-filter plugin can provide extensions to incrementally // modify its pre-processed info. The framework guarantees that the extensions // AddPod/RemovePod will only be called after PreFilter, possibly on a cloned // CycleState, and may call those functions more than once before calling // Filter again on a specific node. PreFilterExtensions() PreFilterExtensions }// FilterPlugin is an interface for Filter plugins. These plugins are called at the// filter extension point for filtering out hosts that cannot run a pod.// This concept used to be called 'predicate' in the original scheduler.// These plugins should return "Success", "Unschedulable" or "Error" in Status.code.// However, the scheduler accepts other valid codes as well.// Anything other than "Success" will lead to exclusion of the given host from// running the pod.type FilterPlugin interface { Plugin // Filter is called by the scheduling framework. // All FilterPlugins should return "Success" to declare that // the given node fits the pod. If Filter doesn't return "Success", // it will return "Unschedulable", "UnschedulableAndUnresolvable" or "Error". // For the node being evaluated, Filter plugins should look at the passed // nodeInfo reference for this particular node's information (e.g., pods // considered to be running on the node) instead of looking it up in the // NodeInfoSnapshot because we don't guarantee that they will be the same. // For example, during preemption, we may pass a copy of the original // nodeInfo object that has some pods removed from it to evaluate the // possibility of preempting them to schedule the target pod. Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status }
Pick one plugin and look at its methods, pkg/scheduler/framework/plugins/nodename/node_name.go: it implements only Filter and Name, so it plugs into the Filter extension point (a custom-plugin sketch follows the listing below).
type NodeName struct{}

var _ framework.FilterPlugin = &NodeName{}
var _ framework.EnqueueExtensions = &NodeName{}

const (
    // Name is the name of the plugin used in the plugin registry and configurations.
    Name = names.NodeName

    // ErrReason returned when node name doesn't match.
    ErrReason = "node(s) didn't match the requested node name"
)

// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (pl *NodeName) EventsToRegister() []framework.ClusterEventWithHint {
    return []framework.ClusterEventWithHint{
        {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}},
    }
}

// Name returns name of the plugin. It is used in logs, etc.
func (pl *NodeName) Name() string {
    return Name
}

// Filter invoked at the filter extension point.
func (pl *NodeName) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    if !Fits(pod, nodeInfo) {
        return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)
    }
    return nil
}
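Following the same shape, a custom Filter plugin only has to implement Name and Filter. The sketch below rejects nodes that do not carry the status=health label used earlier in this article; the plugin name, package and factory are invented, and the factory signature matches the scheduler version quoted above (older releases take no context argument).

package healthfilter

import (
    "context"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

const Name = "HealthLabelFilter"

type HealthLabelFilter struct{}

var _ framework.FilterPlugin = &HealthLabelFilter{}

func (*HealthLabelFilter) Name() string { return Name }

// Filter marks any node that is not labelled status=health as unschedulable
// for the Pod being placed.
func (*HealthLabelFilter) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    node := nodeInfo.Node()
    if node == nil {
        return framework.NewStatus(framework.Error, "node not found")
    }
    if node.Labels["status"] != "health" {
        return framework.NewStatus(framework.Unschedulable, "node is not labelled status=health")
    }
    return nil
}

// New is the plugin factory that gets registered with the framework.
func New(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
    return &HealthLabelFilter{}, nil
}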
Scheduler startup flow
The kube-scheduler entrypoint is a plain main function. cli.Run(command) is just the usual cobra invocation; the real setup happens in app.NewSchedulerCommand().
// cmd/kube-scheduler/scheduler.go
func main() {
    // Build the cobra.Command object.
    command := app.NewSchedulerCommand()
    // Execute the command.
    code := cli.Run(command)
    os.Exit(code)
}
NewSchedulerCommand initializes the options and hands them to the command's run function (a sketch of how the registryOptions argument is used follows the listing below).
// cmd/kube-scheduler/app/server.go
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    // Initialize the options.
    // Options only carries the parameters the program starts with; it is not
    // the final Scheduler struct.
    opts := options.NewOptions()

    cmd := &cobra.Command{
        Use:  "kube-scheduler",
        Long: `......`,
        RunE: func(cmd *cobra.Command, args []string) error {
            // runCommand does the actual work (standard cobra pattern); both
            // opts and args are passed through to it.
            return runCommand(cmd, opts, registryOptions...)
        },
        Args: func(cmd *cobra.Command, args []string) error {
            for _, arg := range args {
                if len(arg) > 0 {
                    return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
                }
            }
            return nil
        },
    }
    ......
    return cmd
}
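The registryOptions variadic argument is the hook for out-of-tree plugins: a custom scheduler binary wraps NewSchedulerCommand and injects its plugin factories through app.WithPlugin. A sketch, reusing the HealthLabelFilter sketch from the previous section (its module path is hypothetical):

package main

import (
    "os"

    "k8s.io/component-base/cli"
    "k8s.io/kubernetes/cmd/kube-scheduler/app"

    healthfilter "example.com/myscheduler/pkg/plugins/healthfilter" // hypothetical module path
)

func main() {
    // WithPlugin appends the factory to the out-of-tree registry that Setup
    // later merges into the in-tree one.
    command := app.NewSchedulerCommand(
        app.WithPlugin(healthfilter.Name, healthfilter.New),
    )
    os.Exit(cli.Run(command))
}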
Options produces the configuration the Scheduler struct needs, including (but not limited to) authentication, certificates, logging and leader election; as a final step its contents are handed over to the Scheduler struct.
// cmd/kube-scheduler/app/options/options.gotype Options struct { // The default values. ComponentConfig *kubeschedulerconfig.KubeSchedulerConfiguration SecureServing *apiserveroptions.SecureServingOptionsWithLoopback Authentication *apiserveroptions.DelegatingAuthenticationOptions Authorization *apiserveroptions.DelegatingAuthorizationOptions Metrics *metrics.Options Logs *logs.Options Deprecated *DeprecatedOptions LeaderElection *componentbaseconfig.LeaderElectionConfiguration // ConfigFile is the location of the scheduler server's configuration file. ConfigFile string // WriteConfigTo is the path where the default configuration will be written. WriteConfigTo string Master string // Flags hold the parsed CLI flags. Flags *cliflag.NamedFlagSets }// NewOptions returns default scheduler app options.func NewOptions() *Options { o := &Options{ SecureServing: apiserveroptions.NewSecureServingOptions().WithLoopback(), Authentication: apiserveroptions.NewDelegatingAuthenticationOptions(), Authorization: apiserveroptions.NewDelegatingAuthorizationOptions(), Deprecated: &DeprecatedOptions{ PodMaxInUnschedulablePodsDuration: 5 * time.Minute, }, LeaderElection: &componentbaseconfig.LeaderElectionConfiguration{ LeaderElect: true, LeaseDuration: metav1.Duration{Duration: 15 * time.Second}, RenewDeadline: metav1.Duration{Duration: 10 * time.Second}, RetryPeriod: metav1.Duration{Duration: 2 * time.Second}, ResourceLock: "leases", ResourceName: "kube-scheduler", ResourceNamespace: "kube-system", }, Metrics: metrics.NewOptions(), Logs: logs.NewOptions(), } o.Authentication.TolerateInClusterLookupFailure = true o.Authentication.RemoteKubeConfigFileOptional = true o.Authorization.RemoteKubeConfigFileOptional = true // Set the PairName but leave certificate directory blank to generate in-memory by default o.SecureServing.ServerCert.CertDirectory = "" o.SecureServing.ServerCert.PairName = "kube-scheduler" o.SecureServing.BindPort = kubeschedulerconfig.DefaultKubeSchedulerPort o.initFlags() return o }
The two most important calls in runCommand are Setup and Run.
// cmd/kube-scheduler/app/server.go
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
    ......
    // Setup builds the completed configuration and the scheduler from the
    // command args and options, i.e. it produces the final config and
    // initializes the Scheduler object.
    cc, sched, err := Setup(ctx, opts, registryOptions...)
    if err != nil {
        return err
    }
    ......
    // Run the scheduler with the given configuration.
    return Run(ctx, cc, sched)
}
The key call inside Setup is scheduler.New, which initializes the scheduler and loads the scheduling plugins.
// cmd/kube-scheduler/app/server.go
// Setup creates a completed config and a scheduler based on the command args and options.
// Note that Setup ultimately returns a CompletedConfig (the finished configuration)
// and a Scheduler (the scheduling struct).
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
    ......
    // Initialize the schedulerappconfig.Config object.
    // opts.Config(ctx) returns (*schedulerappconfig.Config, error), so c is the
    // fully initialized schedulerappconfig.Config.
    c, err := opts.Config(ctx)
    if err != nil {
        return nil, nil, err
    }

    // Get the completed config.
    // This wraps schedulerappconfig.Config one more time; see
    // cmd/kube-scheduler/app/config/config.go. schedulerserverconfig is just the
    // import alias for schedulerappconfig.
    cc := c.Complete()

    // Initialize the out-of-tree plugin registry (empty by default).
    outOfTreeRegistry := make(runtime.Registry)
    for _, option := range outOfTreeRegistryOptions {
        if err := option(outOfTreeRegistry); err != nil {
            return nil, nil, err
        }
    }

    recorderFactory := getRecorderFactory(&cc)
    completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)

    // Create the scheduler.
    // schedulerserverconfig.Config is now complete, so the Scheduler struct is
    // initialized from its contents.
    sched, err := scheduler.New(ctx,
        cc.Client,
        cc.InformerFactory,
        cc.DynInformerFactory,
        recorderFactory,
        scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
        scheduler.WithKubeConfig(cc.KubeConfig),
        scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
        scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
        scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
        scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
        scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
        scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
            // Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
            completedProfiles = append(completedProfiles, profile)
        }),
    )
    ......
    return &cc, sched, nil
}
Setup initializes schedulerserverconfig.Config and finishes constructing the scheduler, so the Run function can then invoke the scheduler's actual methods.
// cmd/kube-scheduler/app/server.go
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
    ......
    sched.Run(ctx)
    return fmt.Errorf("finished without leader elect")
}
Analysis of scheduler.New
The scheduler's core job is to watch for unscheduled Pods, find a suitable node for each one, append the node to the Pod object and write the binding back to the API server.
Running the scheduler ultimately calls the Run method of the Scheduler struct, so let's first look briefly at what that struct holds.
// pkg/scheduler/scheduler.go
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
    // It is expected that changes made via Cache will be observed
    // by NodeLister and Algorithm.
    Cache internalcache.Cache

    // Extenders are the interface through which external processes influence
    // the scheduling decisions Kubernetes makes, usually needed for resources
    // that Kubernetes does not manage directly.
    Extenders []framework.Extender

    // NextPod should be a function that blocks until the next pod
    // is available. We don't use a channel for this, because scheduling
    // a pod may take some amount of time and we don't want pods to get
    // stale while they sit in a channel.
    NextPod func() *framework.QueuedPodInfo

    // FailureHandler is called upon a scheduling failure.
    FailureHandler FailureHandlerFn

    // SchedulePod tries to schedule the given pod to one of the nodes in the node list.
    // Return a struct of ScheduleResult with the name of suggested host on success,
    // otherwise will return a FitError with reasons.
    SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)

    // Close this to shut down the scheduler.
    StopEverything <-chan struct{}

    // SchedulingQueue holds pods to be scheduled; only Pods taken from this
    // queue are scheduled.
    SchedulingQueue internalqueue.SchedulingQueue

    // Profiles are the scheduling profiles.
    Profiles profile.Map

    client clientset.Interface

    // Snapshot of node information.
    nodeInfoSnapshot *internalcache.Snapshot

    // Percentage of nodes to score.
    percentageOfNodesToScore int32

    nextStartNodeIndex int

    // logger *must* be initialized when creating a Scheduler,
    // otherwise logging functions will access a nil sink and
    // panic.
    logger klog.Logger

    // registeredHandlers contains the registrations of all handlers. It's used to check if all handlers have finished syncing before the scheduling cycles start.
    registeredHandlers []cache.ResourceEventHandlerRegistration
}
With the struct covered, let's look in more detail at what Setup feeds into scheduler.New: Setup copies the options values into schedulerappconfig.Config, and the completed schedulerserverconfig config is then used to initialize the scheduler.
// cmd/kube-scheduler/app/server.go
// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
    ......
    sched, err := scheduler.New(ctx,
        cc.Client,
        cc.InformerFactory,
        cc.DynInformerFactory,
        recorderFactory,
        scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
        scheduler.WithKubeConfig(cc.KubeConfig),
        scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
        scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
        scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
        scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
        scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
        scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
            // Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
            completedProfiles = append(completedProfiles, profile)
        }),
    )
    ......
    return &cc, sched, nil
}
Now look in detail at what the New function initializes.
// pkg/scheduler/scheduler.go// New returns a Schedulerfunc New(ctx context.Context, client clientset.Interface, informerFactory informers.SharedInformerFactory, dynInformerFactory dynamicinformer.DynamicSharedInformerFactory, recorderFactory profile.RecorderFactory, opts ...Option) (*Scheduler, error) { logger := klog.FromContext(ctx) stopEverything := ctx.Done() // 初始化 options options := defaultSchedulerOptions // 此处相当于回调,Option 类型是函数,所以相当于把 options 传给函数执行一下 // 把自定义值传给 schedulerOptions //scheduler.WithComponentConfigVersion 开始都是 option for _, opt := range opts { opt(&options) } // 此处出现了一个重要结构 Profile if options.applyDefaultProfile { var versionedCfg configv1.KubeSchedulerConfiguration scheme.Scheme.Default(&versionedCfg) cfg := schedulerapi.KubeSchedulerConfiguration{} if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil { return nil, err } // 就是把默认的 Profiles 传给 Options.profiles // 通过指定 "scheduler name" profile,可以用来控制Pod的调度行为,如何为空则默认使用 "default-scheduler" profile options.profiles = cfg.Profiles } // 注册表注册所有树内插件 // registry 类型是 map[string]PluginFactory,就是前面是名称后面是方法 registry := frameworkplugins.NewInTreeRegistry() // 注册表中合并树外插件,自定义调度插件开发就是在这合并进来 if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil { return nil, err } metrics.Register() // 此处出现了另一个重要结构 extenders // Extender 是外部进程影响 Kubernetes 调度决策的接口。这通常是 Kubernetes 不直接管理的资源所需要的 extenders, err := buildExtenders(logger, options.extenders, options.profiles) if err != nil { return nil, fmt.Errorf("couldn't build extenders: %w", err) } // 获取 pod、list 资源列表 podLister := informerFactory.Core().V1().Pods().Lister() nodeLister := informerFactory.Core().V1().Nodes().Lister() snapshot := internalcache.NewEmptySnapshot() // 集群Event事件 metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything) // 重点在这 // 按照profiles配置文件给出framework框架 // profile.NewMap返回的类型是map[string]framework.Framework,就是SchedulerName:Framework,其中 framework 里面结构多是 []PreEnqueuePlugin 这种,所以 map 里一个 profile 对应一个 framework profiles, err := profile.NewMap(ctx, // 调度器配置 options.profiles, // 注册表 registry, // 事件记录器 recorderFactory, // componentConfig 版本号 frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion), // client 客户端 frameworkruntime.WithClientSet(client), // kubeConfig 配置文件 frameworkruntime.WithKubeConfig(options.kubeConfig), // SharedInformerFactory frameworkruntime.WithInformerFactory(informerFactory), // SharedLister frameworkruntime.WithSnapshotSharedLister(snapshot), frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)), // 设置并行调度数量,默认为16 frameworkruntime.WithParallelism(int(options.parallelism)), // 外部扩展器 frameworkruntime.WithExtenders(extenders), // metrics 指标 frameworkruntime.WithMetricsRecorder(metricsRecorder), ) if err != nil { return nil, fmt.Errorf("initializing profiles: %v", err) } if len(profiles) == 0 { return nil, errors.New("at least one profile is required") } // 预入队插件映射 preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin) // 每个配置文件的排队提示 queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile) 首先遍历 profiles 获取其对应的已注册好的 PreQueuePlugin 插件,这些插件是在将Pods添加到 activeQ 之前调用。 for profileName, profile := range profiles { preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins() queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions()) } // 创建队列 // 将优先级队列初始化为新的调度队列 podQueue := internalqueue.NewSchedulingQueue( //排队函数 profiles[options.profiles[0].SchedulerName].QueueSortFunc(), // 
sharedInformerFactory informerFactory, // 设置 pod 的 Initial 阶段的 Backoff 的持续时间 internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second), // 最大backoff持续时间 internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second), // 设置podLister internalqueue.WithPodLister(podLister), // 一个 pod 在 unschedulablePods 停留的最长时间 internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration), // preEnqueuePluginMap 插件注册 internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap), // 每个带有配置文件的插件注册 internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile), // 指标 internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent), internalqueue.WithMetricsRecorder(*metricsRecorder), ) // 把 pod 队列给到每个 framework for _, fwk := range profiles { fwk.SetPodNominator(podQueue) } // 缓存 schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod) // Setup cache debugger. debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue) debugger.ListenForSignal(ctx) sched := &Scheduler{ Cache: schedulerCache, client: client, nodeInfoSnapshot: snapshot, percentageOfNodesToScore: options.percentageOfNodesToScore, Extenders: extenders, NextPod: internalqueue.MakeNextPodFunc(logger, podQueue), StopEverything: stopEverything, SchedulingQueue: podQueue, Profiles: profiles, logger: logger, } // 添加默认处理程序此处是重点 sched.applyDefaultHandlers() // event 处理程序 if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil { return nil, fmt.Errorf("adding event handlers: %w", err) } return sched, nil}
After Setup comes the Run function, which has two important parts: sched.SchedulingQueue.Run(logger) and sched.scheduleOne.
// pkg/scheduler/scheduler.go
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
    logger := klog.FromContext(ctx)
    // Start the scheduler's queue.
    sched.SchedulingQueue.Run(logger)

    // We need to start scheduleOne loop in a dedicated goroutine,
    // because scheduleOne function hangs on getting the next item
    // from the SchedulingQueue.
    // If there are no new pods to schedule, it will be hanging there
    // and if done in this goroutine it will be blocking closing
    // SchedulingQueue, in effect causing a deadlock on shutdown.
    // The scheduling loop itself is driven by wait.UntilWithContext.
    go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

    <-ctx.Done()
    sched.SchedulingQueue.Close()
}