1. kube-scheduler 的設計
Scheduler 在整個系統中承擔了“承上啟下”的重要功能。“承上”是指它負責接受 Controller Manager 創建的新 Pod,為其安排 Node;“啟下”是指安置工作完成后,目標 Node 上的 kubelet 服務進程接管后續工作。Pod 是 Kubernetes 中最小的調度單元,Pod 被創建出來的工作流程如圖所示:
在這張圖中
第一步通過 apiserver REST API 創建一個 Pod。
然后 apiserver 接收到數據后將數據寫入到 etcd 中。
由于 kube-scheduler 通過 apiserver watch API 一直在監聽資源的變化,這個時候發現有一個新的 Pod,但是這個時候該 Pod 還沒和任何 Node 節點進行綁定,所以 kube-scheduler 就進行調度,選擇出一個合適的 Node 節點,將該 Pod 和該目標 Node 進行綁定。綁定之后再更新消息到 etcd 中。
這個時候一樣的目標 Node 節點上的 kubelet 通過 apiserver watch API 檢測到有一個新的 Pod 被調度過來了,他就將該 Pod 的相關數據傳遞給后面的容器運行時(container runtime),比如 Docker,讓他們去運行該 Pod。
而且 kubelet 還會通過 container runtime 獲取 Pod 的狀態,然后更新到 apiserver 中,當然最后也是寫入到 etcd 中去的。
通過這個流程我們可以看出整個過程中最重要的就是 apiserver watch API 和kube-scheduler的調度策略。
總之,kube-scheduler 的功能是為還沒有和任何 Node 節點綁定的 Pods 逐個地挑選最合適 Pod 的 Node 節點,并將綁定信息寫入 etcd 中。整個調度流程分為,預選(Predicates)和優選(Priorities)兩個步驟。
預選(Predicates):kube-scheduler 根據預選策略(xxx Predicates)過濾掉不滿足策略的 Nodes。例如,官網中給的例子 node3 因為沒有足夠的資源而被剔除。
優選(Priorities):優選會根據優先策略(xxx Priority)為通過預選的 Nodes 進行打分排名,選擇得分最高的 Node。例如,資源越富裕、負載越小的 Node 可能具有越高的排名。
2. kube-scheduler 源碼分析
kubernetes 版本: v1.21
2.1 scheduler.New() 初始化 scheduler 結構體
在程序的入口,是通過一個 runCommand 函數來喚醒 scheduler 的操作的。首先會進入 Setup 函數,它會根據命令參數和選項創建一個完整的 config 和 scheduler。創建 scheduler 的方式就是使用 New 函數。
Scheduler 結構體:
?
SchedulerCache:通過 SchedulerCache 做出的改變將被 NodeLister 和 Algorithm 觀察到。
NextPod :應該是一個阻塞直到下一個 Pod 存在的函數。之所以不使用 channel 結構,是因為調度 pod 可能需要一些時間,k8s 不希望 pod 位于通道中變得陳舊。
Error:在出現錯誤的時候被調用。如果有錯誤,它會傳遞有問題的 pod 信息,和錯誤。
StopEverything:通過關閉它來停止 scheduler。
SchedulingQueue:保存著正在準備被調度的 pod 列表。
Profiles:調度的策略。
scheduler.New() 方法是初始化 scheduler 結構體的,該方法主要的功能是初始化默認的調度算法以及默認的調度器 GenericScheduler。
創建 scheduler 配置文件
根據默認的 DefaultProvider 初始化schedulerAlgorithmSource然后加載默認的預選及優選算法,然后初始化 GenericScheduler
若啟動參數提供了 policy config 則使用其覆蓋默認的預選及優選算法并初始化 GenericScheduler,不過該參數現已被棄用
kubernetes/pkg/scheduler/scheduler.go:189
//New函數創建一個新的scheduler funcNew(clientclientset.Interface,informerFactoryinformers.SharedInformerFactory,recorderFactoryprofile.RecorderFactory,stopCh<-chan?struct{},opts?...Option)?(*Scheduler,?error)?{ ??//查看并設置傳入的參數 ??????…… ??snapshot?:=?internalcache.NewEmptySnapshot() ??//?創建scheduler的配置文件 ??configurator?:=?&Configurator{……} ??metrics.Register() ??var?sched?*Scheduler ??source?:=?options.schedulerAlgorithmSource ??switch?{ ??case?source.Provider?!=?nil: ????//?根據Provider創建config ????sc,?err?:=?configurator.createFromProvider(*source.Provider) ????…… ??case?source.Policy?!=?nil: ????//?根據用戶指定的策略(policy?source)創建config ???? ????//?既然已經設置了策略,在configuation內設置extender為nil ????//?如果沒有,從Configuration的實例里設置extender ????configurator.extenders?=?policy.Extenders ????sc,?err?:=?configurator.createFromConfig(*policy) ????…… ??} ??//?對配置器生成的配置進行額外的調整 ??sched.StopEverything?=?stopEverything ??sched.client?=?client ??addAllEventHandlers(sched,?informerFactory) ??return?sched,?nil }
在 New 函數里提供了兩種初始化 scheduler 的方式,一種是 source.Provider,一種是 source.Policy,最后生成的 config 信息都會通過sched = sc創建新的調度器。Provider 方法對應的是createFromProvider函數,Policy 方法對應的是createFromConfig函數,最后它們都會調用 Create 函數,實例化 podQueue,返回配置好的 Scheduler 結構體。
2.2 Run() 啟動主邏輯
kubernetes 中所有組件的啟動流程都是類似的,首先會解析命令行參數、添加默認值,kube-scheduler 的默認參數在 k8s.io/kubernetes/pkg/scheduler/apis/config/v1alpha1/defaults.go 中定義的。然后會執行 run 方法啟動主邏輯,下面直接看 kube-scheduler 的主邏輯 run 方法執行過程。
Run() 方法主要做了以下工作:
配置了 Configz 參數
啟動事件廣播器,健康檢測服務,http server
啟動所有的 informer
執行 sched.Run() 方法,執行主調度邏輯
kubernetes/cmd/kube-scheduler/app/server.go:136
// Run 函數根據指定的配置執行調度程序。當出現錯誤或者上下文完成的時候才會返回。 funcRun(ctxcontext.Context,cc*schedulerserverconfig.CompletedConfig,sched*scheduler.Scheduler)error{ //為了幫助debug,先記錄Kubernetes的版本號 klog.V(1).Infof("StartingKubernetesSchedulerversion%+v",version.Get()) //1、配置Configz ifcz,err:=configz.New("componentconfig");err==nil{……} //2、準備事件廣播管理器,此處涉及到Events事件 cc.EventBroadcaster.StartRecordingToSink(ctx.Done()) //3、啟動httpserver,進行健康監控服務器監聽 ifcc.InsecureServing!=nil{……} ifcc.InsecureMetricsServing!=nil{……} ifcc.SecureServing!=nil{……} //4、啟動所有informer cc.InformerFactory.Start(ctx.Done()) //等待所有的緩存同步后再進行調度。 cc.InformerFactory.WaitForCacheSync(ctx.Done()) // 5、因為Master節點可以存在多個,選舉一個作為Leader。通過 LeaderElector 運行命令直到完成并退出。 ifcc.LeaderElection!=nil{ cc.LeaderElection.Callbacks=leaderelection.LeaderCallbacks{ OnStartedLeading:func(ctxcontext.Context){ close(waitingForLeader) //6、執行sched.Run()方法,執行主調度邏輯 sched.Run(ctx) }, //鉤子函數,開啟Leading時運行調度,結束時打印報錯 OnStoppedLeading:func(){ klog.Fatalf("leaderelectionlost") }, } leaderElector,err:=leaderelection.NewLeaderElector(*cc.LeaderElection) //參加選舉的會持續通信 leaderElector.Run(ctx) returnfmt.Errorf("lostlease") } //領導者選舉失敗,所以runCommand函數會一直運行直到完成 close(waitingForLeader) //6、執行sched.Run()方法,執行主調度邏輯 sched.Run(ctx) returnfmt.Errorf("finishedwithoutleaderelect") }
這里相比 16 版本增加了一個waitingForLeader的 channel 用來監聽信號
Setup 函數中提到了 Informer。k8s 中有各種類型的資源,包括自定義的。而 Informer 的實現就將調度和資源結合了起來。pod informer 的啟動邏輯是,只監聽 status.phase 不為 succeeded 以及 failed 狀態的 pod,即非 terminating 的 pod。
2.3 sched.Run()開始監聽和調度
然后繼續看 Run() 方法中最后執行的 sched.Run() 調度循環邏輯,若 informer 中的 cache 同步完成后會啟動一個循環邏輯執行 sched.scheduleOne 方法。
kubernetes/pkg/scheduler/scheduler.go:313
// Run函數開始監視和調度。SchedulingQueue開始運行。一直處于調度狀態直到Context完成一直阻塞。 func (sched *Scheduler) Run(ctx context.Context) { sched.SchedulingQueue.Run() wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() }
sched.SchedulingQueue.Run():會將 backoffQ 中的 Pods 節點和 unschedulableQ 中的節點移至 activeQ 中。即將之前運行失敗的節點和已經等待了很長時間超過時間設定的節點重新進入活躍節點隊列中。
backoffQ 是并發編程中常見的一種機制,就是如果一個任務重復執行,但依舊失敗,則會按照失敗的次數提高重試等待時間,避免頻繁重試浪費資源。
sched.SchedulingQueue.Close(),關閉調度之后,對隊列也進行關閉。SchedulingQueue 是一個優先隊列。
優先作為實現 SchedulingQueue 的實現,其核心數據結構主要包含三個隊列:activeQ、podBackoffQ、unschedulableQ 內部通過 cond 來實現 Pop 操作的阻塞與通知。當前隊列中沒有可調度的 pod 的時候,則通過 cond.Wait 來進行阻塞,然后在往 activeQ 中添加 pod 的時候通過 cond.Broadcast 來實現通知。
wait.UntilWithContext()中出現了 sched.scheduleOne 函數,它負責了為單個 Pod 執行整個調度工作流程,也是本次研究的重點,接下來將會詳細地進行分析。
2.4 scheduleOne() 分配 pod 的流程
scheduleOne() 每次對一個 pod 進行調度,主要有以下步驟:
從 scheduler 調度隊列中取出一個 pod,如果該 pod 處于刪除狀態則跳過
執行調度邏輯 sched.schedule() 返回通過預算及優選算法過濾后選出的最佳 node
如果過濾算法沒有選出合適的 node,則返回 core.FitError
若沒有合適的 node 會判斷是否啟用了搶占策略,若啟用了則執行搶占機制
執行 reserve plugin
pod 對應的 spec.NodeName 寫上 scheduler 最終選擇的 node,更新 scheduler cache
執行 permit plugin
執行 prebind plugin
進行綁定,請求 apiserver 異步處理最終的綁定操作,寫入到 etcd
執行 postbind plugin
kubernetes/pkg/scheduler/scheduler.go:441
準備工作
// scheduleOne為單個pod做整個調度工作流程。它被序列化在調度算法的主機擬合上。 func(sched*Scheduler)scheduleOne(ctxcontext.Context){ //podInfo就是從隊列中獲取到的Pod對象 podInfo:=sched.NextPod() //檢查pod的有效性,當schedulerQueue關閉時,pod可能為nil ifpodInfo==nil||podInfo.Pod==nil{ return } pod:=podInfo.Pod //根據定義的pod.Spec.SchedulerName查到對應的profile fwk,err:=sched.frameworkForPod(pod) iferr!=nil{ //這不應該發生,因為我們只接受調度指定與配置文件之一匹配的調度程序名稱的pod。 klog.ErrorS(err,"Erroroccurred") return } //可以跳過調度的情況,一般pod進不來 ifsched.skipPodSchedule(fwk,pod){ return } klog.V(3).InfoS("Attemptingtoschedulepod","pod",klog.KObj(pod))
調用調度算法,獲取結果
//執行調度策略選擇node start:=time.Now() state:=framework.NewCycleState() state.SetRecordPluginMetrics(rand.Intn(100)
assumedPod 是假設這個 Pod 按照前面的調度算法分配后,進行驗證。告訴緩存假設一個 pod 現在正在某個節點上運行,即使它還沒有被綁定。這使得我們可以繼續調度,而不需要等待綁定的發生。
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) assumedPodInfo:=podInfo.DeepCopy() assumedPod:=assumedPodInfo.Pod //為pod設置NodeName字段,更新scheduler緩存 err=sched.assume(assumedPod,scheduleResult.SuggestedHost) iferr!=nil{……}//如果出現錯誤,重新開始調度 //運行相關插件的代碼不作展示,這里省略運行reserve插件的Reserve方法、運行"permit"插件、運行"prebind"插件. //真正做綁定的動作 err:=sched.bind(bindingCycleCtx,fwk,assumedPod,scheduleResult.SuggestedHost,state) iferr!=nil{ //錯誤處理,清除狀態并重試 }else{ //打印結果,調試時將loglevel調整到2以上 ifklog.V(2).Enabled(){ klog.InfoS("Successfullyboundpodtonode","pod",klog.KObj(pod),"node",scheduleResult.SuggestedHost,"evaluatedNodes",scheduleResult.EvaluatedNodes,"feasibleNodes",scheduleResult.FeasibleNodes) } //metrics中記錄相關的監控指標 metrics.PodScheduled(fwk.ProfileName(),metrics.SinceInSeconds(start)) metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts)) metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) //運行"postbind"插件
Binder 負責將調度器的調度結果,傳遞給 apiserver,即將一個 pod 綁定到選擇出來的 node 節點。
2.5 sched.Algorithm.Schedule() 選出 node
在上一節中scheduleOne() 通過調用 sched.Algorithm.Schedule() 來執行預選與優選算法處理:
scheduleResult,err:=sched.Algorithm.Schedule(schedulingCycleCtx,fwk,state,pod)
Schedule()方法屬于 ScheduleAlgorithm 接口的一個方法實現。ScheduleAlgorithm 是一個知道如何將 pods 調度到機器上的事物實現的接口。在 1.16 版本中 ScheduleAlgorithm 有四個方法——Schedule()、Preempt()、Predicates():Prioritizers(),現在則是Schedule()、Extenders() 在目前的代碼中進行優化,保證了程序的安全性。代碼中有一個 todo,目前的
名字已經不太符合這個接口所做的工作。
kubernetes/pkg/scheduler/core/generic_scheduler.go 61
type ScheduleAlgorithm interface { Schedule(context.Context, framework.Framework, *framework.CycleState, *v1.Pod) (scheduleResult ScheduleResult, err error) // 擴展器返回擴展器配置的一個片斷。這是為測試而暴露的。 Extenders() []framework.Extender }
點擊查看 Scheduler()的具體實現,發現它是由 genericScheduler 來進行實現的。
kubernetes/pkg/scheduler/core/generic_scheduler.go 97
func(g*genericScheduler)Schedule(ctxcontext.Context,fwkframework.Framework,state*framework.CycleState,pod*v1.Pod)(resultScheduleResult,errerror){ trace:=utiltrace.New("Scheduling",utiltrace.Field{Key:"namespace",Value:pod.Namespace},utiltrace.Field{Key:"name",Value:pod.Name}) defertrace.LogIfLong(100*time.Millisecond) //1.快照node信息,每次調度pod時都會獲取一次快照 iferr:=g.snapshot();err!=nil{ returnresult,err } trace.Step("Snapshottingschedulercacheandnodeinfosdone") ifg.nodeInfoSnapshot.NumNodes()==0{ returnresult,ErrNoNodesAvailable } // 2.Predict階段:找到所有滿足調度條件的節點feasibleNodes,不滿足的就直接過濾 feasibleNodes,diagnosis,err:=g.findNodesThatFitPod(ctx,fwk,state,pod) iferr!=nil{ returnresult,err } trace.Step("Computingpredicatesdone") //3.預選后沒有合適的node直接返回 iflen(feasibleNodes)==0{ returnresult,&framework.FitError{ Pod:pod, NumAllNodes:g.nodeInfoSnapshot.NumNodes(), Diagnosis:diagnosis, } } //4.當預選之后只剩下一個node,就使用它 iflen(feasibleNodes)==1{ returnScheduleResult{ SuggestedHost:feasibleNodes[0].Name, EvaluatedNodes:1+len(diagnosis.NodeToStatusMap), FeasibleNodes:1, },nil } // 5.Priority階段:執行優選算法,獲取打分之后的node列表 priorityList,err:=g.prioritizeNodes(ctx,fwk,state,pod,feasibleNodes) iferr!=nil{ returnresult,err } //6.根據打分選擇分數最高的node host,err:=g.selectHost(priorityList) trace.Step("Prioritizingdone") returnScheduleResult{ SuggestedHost:host, EvaluatedNodes:len(feasibleNodes)+len(diagnosis.NodeToStatusMap), FeasibleNodes:len(feasibleNodes), },err }
流程圖如圖所示:
在程序運行的整個過程中會使用 trace 來記錄當前的運行狀態,做安全處理。
如果超過了 trace 預定的時間會進行回滾
至此整個 Scheduler 分配 node 節點給 pod 的調度策略的基本流程介紹完畢。
2.6 總結
在本章節中,首先對 Kube-scheduler 進行了介紹。它在整個 k8s 的系統里,啟承上啟下的中藥作用,是核心組件之一。它的目的就是為每一個 pod 選擇一個合適的 node,整體流程可以概括為五步:
首先是 scheduler 組件的初始化;
其次是客戶端發起 command,啟動調度過程中用的服務,比如事件廣播管理器,啟動所有的 informer 組件等等;
再次是啟動整個調度器的主流程,特別需要指出的是,整個流程都會堵塞在wait.UntilWithContext()這個函數中,一直調用ScheduleOne()進行 pod 的調度分配策略。
然后客戶獲取未調度的 podList,通過執行調度邏輯 sched.schedule() 為 pod 選擇一個合適的 node,如果沒有合適的 node,則觸發搶占的操作,最后提進行綁定,請求 apiserver 異步處理最終的綁定操作,寫入到 etcd,其核心則是一系列調度算法的設計與執行。
最后對一系列的調度算法進行了解讀,調度過程主要為,對當前的節點情況做快照,然后通過預選和優選兩個主要步驟,為 pod 分配一個合適的 node。
3. 預選與優選算法源碼細節分析
3.1 預選算法
預選顧名思義就是從當前集群中的所有的 node 中進行過濾,選出符合當前 pod 運行的 nodes。預選的核心流程是通過findNodesThatFit來完成,其返回預選結果供優選流程使用。預選算法的主要邏輯如圖所示:
kubernetes/pkg/scheduler/core/generic_scheduler.go 223
//根據prefilter插件和extender過濾節點以找到適合 pod 的節點。 func(g*genericScheduler)findNodesThatFitPod(ctxcontext.Context,fwkframework.Framework,state*framework.CycleState,pod*v1.Pod)([]*v1.Node,framework.Diagnosis,error){ // prefilter插件用于預處理 Pod 的相關信息,或者檢查集群或 Pod 必須滿足的某些條件。 s:=fwk.RunPreFilterPlugins(ctx,state,pod) …… //查找能夠滿足filter過濾插件的節點,返回結果有可能是0,1,N feasibleNodes,err:=g.findNodesThatPassFilters(ctx,fwk,state,pod,diagnosis,allNodes) //查找能夠滿足Extenders過濾插件的節點,返回結果有可能是0,1,N feasibleNodes,err=g.findNodesThatPassExtenders(pod,feasibleNodes,diagnosis.NodeToStatusMap) returnfeasibleNodes,diagnosis,nil }
這個方法首先會通過前置過濾器來校驗 pod 是否符合條件;
然后調用findNodesThatPassFilters方法過濾掉不符合條件的 node。這樣就能設定最多需要檢查的節點數,作為預選節點數組的容量,避免總結點過多影響效率。
最后是findNodesThatPassExtenders函數,它是 kubernets 留給用戶的外部擴展方式,暫且不表。
findNodesThatPassFilters查找適合過濾器插件的節點,在這個方法中首先會根據numFeasibleNodesToFind方法選擇參與調度的節點的數量,調用Parallelizer().Until方法開啟 16 個線程來調用checkNode方法尋找合適的節點。判別節點合適的方式函數為checkNode(),函數中會對節點進行兩次檢查,確保所有的節點都有相同的機會被選擇。
kubernetes/pkg/scheduler/core/generic_scheduler.go 274
func(g*genericScheduler)findNodesThatPassFilters(ctxcontext.Context,fwkframework.Framework,state*framework.CycleState,pod*v1.Pod,diagnosisframework.Diagnosis,nodes[]*framework.NodeInfo)([]*v1.Node,error){……} //根據集群節點數量選擇參與調度的節點的數量 numNodesToFind:=g.numFeasibleNodesToFind(int32(len(nodes))) //初始化一個大小和numNodesToFind一樣的數組,用來存放node節點 feasibleNodes:=make([]*v1.Node,numNodesToFind) …… checkNode:=func(iint){ //我們從上一個調度周期中停止的地方開始檢查節點,這是為了確保所有節點都有相同的機會在pod中被檢查 nodeInfo:=nodes[(g.nextStartNodeIndex+i)%len(nodes)] status:=fwk.RunFilterPluginsWithNominatedPods(ctx,state,pod,nodeInfo) ifstatus.Code()==framework.Error{ errCh.SendErrorWithCancel(status.AsError(),cancel) return } //如果該節點合適,那么放入到feasibleNodes列表中 ifstatus.IsSuccess(){……} } …… //開啟N個線程并行尋找符合條件的node節點,數量等于feasibleNodes。一旦找到配置的可行節點數,就停止搜索更多節點。 fwk.Parallelizer().Until(ctx,len(nodes),checkNode) processedNodes:=int(feasibleNodesLen)+len(diagnosis.NodeToStatusMap) //設置下次開始尋找node的位置 g.nextStartNodeIndex=(g.nextStartNodeIndex+processedNodes)%len(nodes) //合并返回結果 feasibleNodes=feasibleNodes[:feasibleNodesLen] returnfeasibleNodes,nil }
在整個函數調用的過程中,有個很重要的函數——checkNode()會被傳入函數,進行每個 node 節點的判斷。具體更深入的細節將會在 3.1.2 節進行介紹。現在根據這個函數的定義可以看出,RunFilterPluginsWithNominatedPods會判斷當前的 node 是否符合要求。如果當前的 node 符合要求,就講當前的 node 加入預選節點的數組中(feasibleNodes),如果不符合要求,那么就加入到失敗的數組中,并且記錄原因。
3.1.1 確定參與調度的節點的數量
numFeasibleNodesToFind 返回找到的可行節點的數量,調度程序停止搜索更多可行節點。算法的具體邏輯如下圖所示:
找出能夠進行調度的節點,如果節點小于minFeasibleNodesToFind(默認值為 100),那么全部節點參與調度。
percentageOfNodesToScore參數值是一個集群中所有節點的百分比,范圍是 1 和 100 之間,0 表示不啟用。如果集群節點數大于 100,那么就會根據這個值來計算讓合適的節點數參與調度。
舉個例子,如果一個 5000 個節點的集群,percentageOfNodesToScore會默認設置為 10%,也就是 500 個節點參與調度。因為如果一個 5000 節點的集群來進行調度的話,不進行控制時,每個 pod 調度都需要嘗試 5000 次的節點預選過程時非常消耗資源的。
如果百分比后的數目小于minFeasibleNodesToFind,那么還是要返回最小節點的數目。
kubernetes/pkg/scheduler/core/generic_scheduler.go 179
func(g*genericScheduler)numFeasibleNodesToFind(numAllNodesint32)(numNodesint32){ //對于一個小于minFeasibleNodesToFind(100)的節點,全部節點參與調度 //percentageOfNodesToScore參數值是一個集群中所有節點的百分比,范圍是1和100之間,0表示不啟用,如果大于100,就是全量取樣 //這兩種情況都是直接便利整個集群中的所有節點 ifnumAllNodes=100{ returnnumAllNodes } adaptivePercentage:=g.percentageOfNodesToScore //當numAllNodes大于100時,如果沒有設置percentageOfNodesToScore,那么這里需要計算出一個值 ifadaptivePercentage<=?0?{ ??????basePercentageOfNodesToScore?:=?int32(50) ??????adaptivePercentage?=?basePercentageOfNodesToScore?-?numAllNodes/125 ??????if?adaptivePercentage?
3.1.2 并行化二次篩選節點
并行取樣主要通過調用工作隊列的ParallelizeUntil函數來啟動 N 個 goroutine 來進行并行取樣,并通過 ctx 來協調退出。選取節點的規則由函數 checkNode 來定義,checkNode里面使用RunFilterPluginsWithNominatedPods篩選出合適的節點。
在 k8s 中經過調度器調度后的 pod 結果會放入到 SchedulingQueue 中進行暫存,這些 pod 未來可能會經過后續調度流程運行在提議的 node 上,也可能因為某些原因導致最終沒有運行,而預選流程為了減少后續因為調度沖突,則會在進行預選的時候,將這部分 pod 考慮進去。如果在這些 pod 存在的情況下,node 可以滿足當前 pod 的篩選條件,則可以去除被提議的 pod 再進行篩選。
在搶占的情況下我們會運行兩次過濾器。如果節點有大于或等于優先級的被提名的 pod,我們在這些 pod 被添加到 PreFilter 狀態和 nodeInfo 時運行它們。如果所有的過濾器在這一次都成功了,我們在這些被提名的 pod 沒有被添加時再運行它們。
kubernetes/pkg/scheduler/framework/runtime/framework.go 650
func(f*frameworkImpl)RunFilterPluginsWithNominatedPods(ctxcontext.Context,state*framework.CycleState,pod*v1.Pod,info*framework.NodeInfo)*framework.Status{ varstatus*framework.Status // podsAdded主要用于標識當前是否有提議的pod如果沒有提議的pod則就不需要再進行一輪篩選了。 podsAdded:=false //待檢查的 Node 是一個即將被搶占的節點,調度器就會對這個Node用同樣的 Predicates 算法運行兩遍。 fori:=0;i2;?i++?{ ??????stateToUse?:=?state ??????nodeInfoToUse?:=?info ??????//處理優先級pod的邏輯 ??????if?i?==?0?{ ?????????var?err?error ??????//查找是否有優先級大于或等于當前pod的NominatedPods,然后加入到nodeInfoToUse中 ?????????podsAdded,?stateToUse,?nodeInfoToUse,?err?=?addNominatedPods(ctx,?f,?pod,?state,?info) ????????//?如果第一輪篩選出錯,則不會進行第二輪篩選 ?????????if?err?!=?nil?{ ????????????return?framework.AsStatus(err) ?????????} ??????}?else?if?!podsAdded?||?!status.IsSuccess()?{ ?????????break ??????} ??????//運行過濾器檢查該pod是否能運行在該節點上 ??????statusMap?:=?f.RunFilterPlugins(ctx,?stateToUse,?pod,?nodeInfoToUse) ??????status?=?statusMap.Merge() ??????if?!status.IsSuccess()?&&?!status.IsUnschedulable()?{ ?????????return?status ??????} ???} ???return?status }
這個方法用來檢測 node 是否能通過過濾器,此方法會在調度 Schedule 和搶占 Preempt 的時被調用,如果在 Schedule 時被調用,那么會測試 node,能否可以讓所有存在的 pod 以及更高優先級的 pod 在該 node 上運行。如果在搶占時被調用,那么我們首先要移除搶占失敗的 pod,添加將要搶占的 pod。
RunFilterPlugins 會運行過濾器,過濾器總共有這些:nodeunschedulable, noderesources, nodename, nodeports, nodeaffinity, volumerestrictions, tainttoleration, nodevolumelimits, nodevolumelimits, nodevolumelimits, nodevolumelimits, volumebinding, volumezone, podtopologyspread, interpodaffinity。這里就不詳細贅述。
至此關于預選模式的調度算法的執行過程已經分析完畢。
3.2 優選算法
優選階段通過分離計算對象來實現多個 node 和多種算法的并行計算,并且通過基于二級索引來設計最終的存儲結果,從而達到整個計算過程中的無鎖設計,同時為了保證分配的隨機性,針對同等優先級的采用了隨機的方式來進行最終節點的分配。這個思路很值得借鑒。
在上文中,我們提到在優化過程是先通過 prioritizeNodes 獲得 priorityList,然后再通過 selectHost 函數獲得得分最高的 Node,返回結果。
3.2.1 prioritizeNodes
在 prioritizeNodes 函數中會將需要調度的 Pod 列表和 Node 列表傳入各種優選算法進行打分排序,最終整合成結果集 priorityList。priorityList 是一個 framework.NodeScoreList 的結構體,結構如下面的代碼所示:
//NodeScoreList聲明一個節點列表及節點分數 typeNodeScoreList[]NodeScore //NodeScore節點和節點分數的結構體 typeNodeScorestruct{ Namestring Scoreint64 }
prioritizeNodes 通過運行評分插件對節點進行優先排序,這些插件從 RunScorePlugins()的調用中為每個節點返回一個分數。每個插件的分數和 Extender 的分數加在一起,成為該節點的分數。整個流程如圖所示:
由于 prioritizeNodes 的邏輯太長,這里將他們分四個部分,如下所示:
準備階段
func(g*genericScheduler)prioritizeNodes(ctxcontext.Context,fwkframework.Framework,state*framework.CycleState,pod*v1.Pod,nodes[]*v1.Node,)(framework.NodeScoreList,error){ //如果沒有提供優先級配置(即沒有Extender也沒有ScorePlugins),則所有節點的得分為 1。這是生成所需格式的優先級列表所必需的 iflen(g.extenders)==0&&!fwk.HasScorePlugins(){ result:=make(framework.NodeScoreList,0,len(nodes)) fori:=rangenodes{ result=append(result,framework.NodeScore{ Name:nodes[i].Name, Score:1, }) } returnresult,nil } //運行PreScore插件,準備評分數據 preScoreStatus:=fwk.RunPreScorePlugins(ctx,state,pod,nodes) if!preScoreStatus.IsSuccess(){ returnnil,preScoreStatus.AsError() }
運行 Score 插件進行評分
//運行Score插件對Node進行評分,此處需要知道的是scoresMap的類型是map[string][]NodeScore。scoresMap的key是插件名字,value是該插件對所有Node的評分 scoresMap,scoreStatus:=fwk.RunScorePlugins(ctx,state,pod,nodes) if!scoreStatus.IsSuccess(){ returnnil,scoreStatus.AsError() } //result用于匯總所有分數 result:=make(framework.NodeScoreList,0,len(nodes)) //將分數按照node的維度進行匯總,循環執行len(nodes)次 fori:=rangenodes{ //先在result中塞滿所有node的Name,Score初始化為0; result=append(result,framework.NodeScore{Name:nodes[i].Name,Score:0}) //執行了多少個scoresMap就有多少個Score,所以這里遍歷len(scoresMap)次; forj:=rangescoresMap{ //每個算法對應第i個node的結果分值加權后累加; result[i].Score+=scoresMap[j][i].Score } }
Score 插件中獲取的分數會直接記錄在 result[i].Score,result 就是最終返回結果的 priorityList。
RunScorePlugins里面分別調用 parallelize.Until 方法跑三次來進行打分:
第一次會調用runScorePlugin方法,里面會調用 getDefaultConfig 里面設置的 score 的 Plugin 來進行打分;
第二次會調用runScoreExtension方法,里面會調用 Plugin 的NormalizeScore方法,用來保證分數必須是 0 到 100 之間,不是每一個 plugin 都會實現 NormalizeScore 方法。
第三次會調用遍歷所有的scorePlugins,并對對應的算出的來的分數乘以一個權重。
打分的 plugin 共有:noderesources, imagelocality, interpodaffinity, noderesources, nodeaffinity, nodepreferavoidpods, podtopologyspread, tainttoleration
配置的 Extender 的評分獲取
//如果配置了Extender,還要調用Extender對Node評分并累加到result中 iflen(g.extenders)!=0&&nodes!=nil{ //因為要多協程并發調用Extender并統計分數,所以需要鎖來互斥寫入Node分數 varmusync.Mutex varwgsync.WaitGroup //combinedScores的key是Node名字,value是Node評分 combinedScores:=make(map[string]int64,len(nodes)) fori:=rangeg.extenders{ //如果Extender不管理Pod申請的資源則跳過 if!g.extenders[i].IsInterested(pod){ continue } //啟動協程調用Extender對所有Node評分。 wg.Add(1) gofunc(extIndexint){ deferfunc(){ wg.Done() }() //調用Extender對Node進行評分 prioritizedList,weight,err:=g.extenders[extIndex].Prioritize(pod,nodes) iferr!=nil{ //擴展器的優先級錯誤可以忽略,讓k8s/其他擴展器確定優先級。 return } mu.Lock() fori:=range*prioritizedList{ host,score:=(*prioritizedList)[i].Host,(*prioritizedList)[i].Score // Extender的權重是通過Prioritize()返回的,其實該權重是人工配置的,只是通過Prioritize()返回使用上更方便。 //合并后的評分是每個Extender對Node評分乘以權重的累加和 combinedScores[host]+=score*weight } mu.Unlock() }(i) } //等待所有的go routines結束,調用時間取決于最慢的Extender。 wg.Wait()
Extender 這里有幾個很有趣的設置
首先是擴展器中如果出現了評分的錯誤,可以忽略,而不是想預選階段那樣直接返回報錯。
能這樣做的原因是,因為評分不同于過濾,對錯誤不敏感。過濾如果失敗是要返回錯誤的(如果不能忽略),因為 Node 可能無法滿足 Pod 需求;而評分無非是選擇最優的節點,評分錯誤只會對選擇最優有一點影響,但是不會造成故障。
其次是使用了 combinedScores 來記錄分數,考慮到 Extender 和 Score 插件返回的評分的體系會存在出入,所以這邊并沒有直接累加。而是后續再進行一次遍歷麻將 Extender 的評分標準化之后才與原先的 Score 插件評分進行累加。
最后是關于鎖的使用
在評分的設置里面,使用了多協程來并發進行評分。在最后分數進行匯總的時候會出現并發寫的問題,為了避免這種現象的出現,k8s 的程序中對從 prioritizedList 里面讀取節點名稱和分數,然后寫入combinedScores的過程中上了互斥鎖。
為了記錄所有并發讀取 Extender 的協程,這里使用了 wait Group 這樣的數據結構來保證,所有的 go routines 結束再進行最后的分數累加。這里存在一個程序性能的問題,所有的線程只要有一個沒有運行完畢,程序就會卡在這一步。即便是多協程并發調用 Extender,也會存在木桶效應,即調用時間取決于最慢的 Extender。雖然 Extender 可能都很快,但是網絡延時是一個比較常見的事情,更嚴重的是如果 Extender 異常造成調度超時,那么就拖累了整個 kube-scheduler 的調度效率。這是一個后續需要解決的問題
分數的累加,返回結果集 priorityList
fori:=rangeresult{ //最終Node的評分是所有ScorePlugin分數總和+所有Extender分數總和 //此處標準化了Extender的評分,使其范圍與ScorePlugin一致,否則二者沒法累加在一起。 result[i].Score+=combinedScores[result[i].Name]*(framework.MaxNodeScore/extenderv1.MaxExtenderPriority) } } returnresult,nil }
優選算法由一系列的 PriorityConfig(也就是 PriorityConfig 數組)組成,每個 Config 代表了一個算法,Config 描述了權重 Weight、Function(一種優選算法函數類型)。需要調度的 Pod 分別對每個合適的 Node(N)執行每個優選算法(A)進行打分,最后得到一個二維數組,元素分別為 A1N1,A1N2,A1N3… ,行代表一個算法對應不同的 Node 計算得到的分值,列代表同一個 Node 對應不同算法的分值:
N1 | N2 | N3 | |
---|---|---|---|
A1 | { Name:“node1”,Score:5,PriorityConfig:{…weight:1}} | { Name:“node2”,Score:3,PriorityConfig:{…weight:1}} | { Name:“node3”,Score:1,PriorityConfig:{…weight:1}} |
A2 | { Name:“node1”,Score:6,PriorityConfig:{…weight:1}} | { Name:“node2”,Score:2,PriorityConfig:{…weight:1}} | { Name:“node3”,Score:3,PriorityConfig:{…weight:1}} |
A3 | { Name:“node1”,Score:4,PriorityConfig:{…weight:1}} | { Name:“node2”,Score:7,PriorityConfig:{…weight:1.}} | { Name:“node3”,Score:2,PriorityConfig:{…weight:1}} |
最后將結果合并(Combine)成一維數組 HostPriorityList :HostPriorityList =[{ Name:"node1",Score:15},{ Name:"node2",Score:12},{ Name:"node3",Score:6}]這樣就完成了對每個 Node 進行優選算法打分的流程。
Combine 的過程非常簡單,只需要將 Node 名字相同的分數進行加權求和統計即可。
最終得到一維數組 HostPriorityList,也就是前面提到的 HostPriority 結構體的集合。就這樣實現了為每個 Node 的打分 Priority 優選過程。
3.2.2 selectHost選出得分最高的 Node
priorityList 數組保存了每個 Node 的名字和它對應的分數,最后通過selectHost函數選出分數最高的 Node 對 Pod 進行綁定和調度。selectHost通過傳入的 priorityList,然后以隨機篩選的的方式從得分最高的節點們中挑選一個。
這里的隨機篩選是指的當多個 host 優先級相同的時候,會有一定的概率用當前的 node 替換之前的優先級相等的 node(到目前為止的優先級最高的 node), 其主要通過`cntOfMaxScore和rand.Intn(cntOfMaxScore)來進行實現。
//selectHost()根據所有可行Node的評分找到最優的Node func(g*genericScheduler)selectHost(nodeScoreListframework.NodeScoreList)(string,error){ //沒有可行Node的評分,返回錯誤 iflen(nodeScoreList)==0{ return"",fmt.Errorf("emptypriorityList") } //在nodeScoreList中找到分數最高的Node,初始化第0個Node分數最高 maxScore:=nodeScoreList[0].Score selected:=nodeScoreList[0].Name //如果最高分數相同,先統計數量(cntOfMaxScore) cntOfMaxScore:=1 for_,ns:=rangenodeScoreList[1:]{ ifns.Score>maxScore{ maxScore=ns.Score selected=ns.Name cntOfMaxScore=1 }elseifns.Score==maxScore{ //分數相同就累計數量 cntOfMaxScore++ ifrand.Intn(cntOfMaxScore)==0{ //以1/cntOfMaxScore的概率成為最優Node selected=ns.Name } } } returnselected,nil }
只有同時滿足 FilterPlugin 和 Extender 的過濾條件的 Node 才是可行 Node,調度算法優先用 FilterPlugin 過濾,然后在用 Extender 過濾,這樣可以盡量減少傳給 Extender 的 Node 數量;調度算法為待調度的 Pod 對每個可行 Node(過濾通過)進行評分,評分方法是:
其中 f(x)和 g(x)是標準化分數函數,w 為權重;分數最高的 Node 為最優候選 Node,當有多個 Node 都為最高分數時,每個 Node 有 1/n 的概率成最優 Node;調度算法并不是對調度框架和調度插件再抽象和封裝,只是對調度周期從 PreFilter 到 Score 的過程的一種抽象,其中 PostFilter 不在調度算法抽象范圍內。因為 PostFilter 與過濾無關,是用來實現搶占的擴展點;
3.3 總結
Scheduler 調度器,在 k8s 的整個代碼中處于一個承上啟下的作用。了解 Scheduler 在哪個過程中發揮作用,更能夠理解它的重要性。
本文第二章,主要對于 kube-scheduler v1.21 的調度流程進行了分析,但由于選擇的議題實在是太大,這里這對正常流程中的調度進行源碼的解析,其中有大量的細節都暫未提及,包括搶占調度、framework、extender 等實現。通過源碼閱讀可以發現,Pod 的調度是通過一個隊列 SchedulingQueue 異步工作的,隊列對 pod 時間進行監聽,并且進行調度流程。單個 pod 的調度主要分為 3 個步驟:
1)根據 Predict 和 Priority 兩個階段選擇最優的 Node;
2)為了提升效率,假設 Pod 已經被調度到對應的 Node,保存到 cache 中;
3)通過 extender 和各種插件進行驗證,如果通過就進行綁定。
在接受到命令之后,程序會現在scheduler.New() 初始化 scheduler 結構體,然后通過 Run() 函數啟動調度的主邏輯,喚醒 sched.Run()。在 sched.Run()中會一直監聽和調度,通過隊列的方式給 pod 分配合適的 node。scheduleOne() 里面是整個分配 pod 調度過程的主要邏輯,因為篇幅有限,這里只對 sched.Algorithm.Schedule() 進行了深入的了解。bind 和后續的操作就停留在scheduleOne()這里沒有再進行深入。
因篇幅有限,以及個人的興趣導向,在正常流程介紹完畢之后第三章對正常調度過程中的優選和預選策略再次進行深入的代碼閱讀。以期能夠對正常調度的細節有更好的把握。如果時間可以再多些,可以更細致到對具體的調度算法進行分析,這里因為篇幅有限,預選的部分就只介紹了根據 predict 過程中的 NameNode 函數。
-
函數
+關注
關注
3文章
4355瀏覽量
63319 -
代碼
+關注
關注
30文章
4858瀏覽量
69553 -
調度算法
+關注
關注
1文章
68瀏覽量
12024
原文標題:還沒吃透 K8S 調度器?看這篇文章就夠了
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
leader選舉在kubernetes controller中是如何實現的
請問AD9835的refin和refout引腳檢測不到1.21V會影響波形輸出嗎
long double to string函數找不到本地支持
終端重置功能返回SDK V1.21的問題
關于RTT中scheduler線程調度的學習
從零開始入門 K8s | 調度器的調度流程和算法介紹
容器進程調度時是該優先考慮CPU資源還是內存資源
NVIDIA Triton 系列文章(11):模型類別與調度器-1
Linux調度器的核心scheduler_tick介紹
重新分配pod節點

評論