daemonset controller分析

daemonset controller简介

daemonset controller是kube-controller-manager组件中众多控制器中的一个,是 daemonset 资源对象的控制器,其通过对daemonset、pod、node、ControllerRevision四种资源的监听,当这四种资源发生变化时会触发 daemonset controller 对相应的daemonset资源进行调谐操作,从而完成daemonset在合适node上pod的创建、在不合适node上pod的删除、daemonset的滚动更新、daemonset状态status更新、旧版本daemonset清理等操作。

daemonset controller架构图

daemonset controller的大致组成和处理流程如下图,daemonset controller对daemonset、pod、node、ControllerRevision对象注册了event handler,当有事件时,会watch到然后将对应的daemonset对象放入到queue中,然后syncDaemonset方法为daemonset controller调谐daemonset对象的核心处理逻辑所在,从queue中取出daemonset对象,做调谐处理。

daemonset更新策略

(1)OnDelete:使用 OnDelete 更新策略时,在更新 DaemonSet pod模板后,只有当你手动删除老的 DaemonSet pods 之后,新的 DaemonSet Pod 才会被自动创建。

(2)RollingUpdate:默认的更新策略。使用 RollingUpdate 更新策略时,在更新 DaemonSet pod模板后, 老的 DaemonSet pods 将被删除,并且将根据滚动更新配置自动创建新的 DaemonSet pods。 滚动更新期间,最多只能有 DaemonSet 的一个 Pod 运行于每个节点上。

daemonset controller分析将分为两大块进行,分别是:

(1)daemonset controller初始化与启动分析;

(2)daemonset controller处理逻辑分析。

1.daemonset controller初始化与启动分析

基于tag v1.17.4

https://github.com/kubernetes/kubernetes/releases/tag/v1.17.4

直接看到startDaemonSetController函数,作为daemonset controller初始化与启动分析的入口。

startDaemonSetController

startDaemonSetController主要逻辑:

(1)调用daemon.NewDaemonSetsController新建并初始化DaemonSetsController;

(2)拉起一个goroutine,跑DaemonSetsController的Run方法。

// cmd/kube-controller-manager/app/apps.go
func startDaemonSetController(ctx ControllerContext) (http.Handler, bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}] {
return nil, false, nil
}
dsc, err := daemon.NewDaemonSetsController(
ctx.InformerFactory.Apps().V1().DaemonSets(),
ctx.InformerFactory.Apps().V1().ControllerRevisions(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
flowcontrol.NewBackOff(1*time.Second, 15*time.Minute),
)
if err != nil {
return nil, true, fmt.Errorf("error creating DaemonSets controller: %v", err)
}
go dsc.Run(int(ctx.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs), ctx.Stop)
return nil, true, nil
}

1.1 daemon.NewDaemonSetsController

daemon.NewDaemonSetsController函数代码中可以看到,daemonset controller注册了daemonset、node、pod与ControllerRevisions对象的EventHandler,也即对这几个对象的event进行监听,把event放入事件队列并做处理。并且将dsc.syncDaemonSet方法赋值给dsc.syncHandler,也即注册为核心处理方法,在dsc.Run方法中会调用该核心处理方法来调谐daemonset对象(核心处理方法后面会进行详细分析)。

// pkg/controller/daemon/daemon_controller.go
func NewDaemonSetsController(
daemonSetInformer appsinformers.DaemonSetInformer,
historyInformer appsinformers.ControllerRevisionInformer,
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
failedPodsBackoff *flowcontrol.Backoff,
) (*DaemonSetsController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
dsc := &DaemonSetsController{
kubeClient: kubeClient,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
podControl: controller.RealPodControl{
KubeClient: kubeClient,
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "daemonset-controller"}),
},
crControl: controller.RealControllerRevisionControl{
KubeClient: kubeClient,
},
burstReplicas: BurstReplicas,
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
} daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ds := obj.(*apps.DaemonSet)
klog.V(4).Infof("Adding daemon set %s", ds.Name)
dsc.enqueueDaemonSet(ds)
},
UpdateFunc: func(old, cur interface{}) {
oldDS := old.(*apps.DaemonSet)
curDS := cur.(*apps.DaemonSet)
klog.V(4).Infof("Updating daemon set %s", oldDS.Name)
dsc.enqueueDaemonSet(curDS)
},
DeleteFunc: dsc.deleteDaemonset,
})
dsc.dsLister = daemonSetInformer.Lister()
dsc.dsStoreSynced = daemonSetInformer.Informer().HasSynced historyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addHistory,
UpdateFunc: dsc.updateHistory,
DeleteFunc: dsc.deleteHistory,
})
dsc.historyLister = historyInformer.Lister()
dsc.historyStoreSynced = historyInformer.Informer().HasSynced // Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addPod,
UpdateFunc: dsc.updatePod,
DeleteFunc: dsc.deletePod,
})
dsc.podLister = podInformer.Lister() // This custom indexer will index pods based on their NodeName which will decrease the amount of pods we need to get in simulate() call.
podInformer.Informer().GetIndexer().AddIndexers(cache.Indexers{
"nodeName": indexByPodNodeName,
})
dsc.podNodeIndex = podInformer.Informer().GetIndexer()
dsc.podStoreSynced = podInformer.Informer().HasSynced nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: dsc.addNode,
UpdateFunc: dsc.updateNode,
},
)
dsc.nodeStoreSynced = nodeInformer.Informer().HasSynced
dsc.nodeLister = nodeInformer.Lister() dsc.syncHandler = dsc.syncDaemonSet
dsc.enqueueDaemonSet = dsc.enqueue dsc.failedPodsBackoff = failedPodsBackoff return dsc, nil
}

1.2 dsc.Run

主要看到for循环处,根据workers的值(默认值为2),启动相应数量的goroutine,跑dsc.runWorker方法,主要是调用前面讲到的daemonset controller核心处理方法dsc.syncDaemonSet

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer dsc.queue.ShutDown() klog.Infof("Starting daemon sets controller")
defer klog.Infof("Shutting down daemon sets controller") if !cache.WaitForNamedCacheSync("daemon sets", stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced, dsc.historyStoreSynced, dsc.dsStoreSynced) {
return
} for i := 0; i < workers; i++ {
go wait.Until(dsc.runWorker, time.Second, stopCh)
} go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, stopCh) <-stopCh
}

1.2.1 dsc.runWorker

从queue队列中取出事件key,并调用dsc.syncHandledsc.syncDaemonSet做调谐处理。queue队列里的事件来源前面讲过,是daemonset controller注册的daemonset、node、pod与ControllerRevisions对象的EventHandler,它们的变化event会被监听到然后放入queue中。

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) runWorker() {
for dsc.processNextWorkItem() {
}
} func (dsc *DaemonSetsController) processNextWorkItem() bool {
dsKey, quit := dsc.queue.Get()
if quit {
return false
}
defer dsc.queue.Done(dsKey) err := dsc.syncHandler(dsKey.(string))
if err == nil {
dsc.queue.Forget(dsKey)
return true
} utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
dsc.queue.AddRateLimited(dsKey) return true
}

2.daemonset controller核心处理逻辑分析

syncDaemonSet

直接看到daemonset controller核心处理方法syncDaemonSet。

主要逻辑:

(1)获取执行方法时的当前时间,并定义defer函数,用于计算该方法总执行时间,也即统计对一个 daemonset 进行同步调谐操作的耗时;

(2)根据 daemonset 对象的命名空间与名称,获取 daemonset 对象;

(3)获取所有node的对象列表;

(4)判断daemonset对象的DeletionTimestamp是否为空,不为空则直接return,代表该daemonset对象正在被删除,无需再调谐;

(5)调用dsc.constructHistory获取daemonset的历史版本;

(6)调用dsc.expectations.SatisfiedExpectations,判断该daemonset对象是否满足expectations机制(expectations机制与replicaset controller分析中的用途一致,这里不再展开分析),不满足则调用dsc.updateDaemonSetStatus更新daemonset状态后直接return;

(7)调用dsc.manage,dsc.manage方法中不区分新旧daemonset版本的pod,只保证daemonset的pod运行在每一个合适条件的node上,在合适的node上没有daemonset的pod时创建pod,且把不符合条件的node上的daemonset pod删除掉;

(8)再次调用dsc.expectations.SatisfiedExpectations判断是否满足expectations机制,满足则判断daemonset配置的更新策略,如果是滚动更新则调用dsc.rollingUpdate,主要用于处理daemonset对象的滚动更新处理,根据配置的滚动更新配置,删除旧的pod(pod的创建操作在dsc.manage方法中进行);

当daemonset更新策略配置为OnDelete时,这里不做额外处理,因为只有当手动删除老的 DaemonSet pods 之后,新的 DaemonSet Pod 才会被自动创建,手动删除老的pod后,将在dsc.manage方法中创建新版本的pod;

(9)调用dsc.cleanupHistory,根据daemonset的spec.revisionHistoryLimit配置以及版本新旧顺序(优先清理最老旧版本)来清理daemonset的已经不存在pod的历史版本;

(10)最后调用dsc.updateDaemonSetStatus,根据现存daemonset pod的部署情况以及pod的状态、node是否满足pod运行条件等信息,更新daemonset的status。

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
}() namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
if errors.IsNotFound(err) {
klog.V(3).Infof("daemon set has been deleted %v", key)
dsc.expectations.DeleteExpectations(key)
return nil
}
if err != nil {
return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
} nodeList, err := dsc.nodeLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
} everything := metav1.LabelSelector{}
if reflect.DeepEqual(ds.Spec.Selector, &everything) {
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, SelectingAllReason, "This daemon set is selecting all pods. A non-empty selector is required.")
return nil
} // Don't process a daemon set until all its creations and deletions have been processed.
// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
// then we do not want to call manage on foo until the daemon pods have been created.
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
} // If the DaemonSet is being deleted (either by foreground deletion or
// orphan deletion), we cannot be sure if the DaemonSet history objects
// it owned still exist -- those history objects can either be deleted
// or orphaned. Garbage collector doesn't guarantee that it will delete
// DaemonSet pods before deleting DaemonSet history objects, because
// DaemonSet history doesn't own DaemonSet pods. We cannot reliably
// calculate the status of a DaemonSet being deleted. Therefore, return
// here without updating status for the DaemonSet being deleted.
if ds.DeletionTimestamp != nil {
return nil
} // Construct histories of the DaemonSet, and get the hash of current history
cur, old, err := dsc.constructHistory(ds)
if err != nil {
return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey] if !dsc.expectations.SatisfiedExpectations(dsKey) {
// Only update status. Don't raise observedGeneration since controller didn't process object of that generation.
return dsc.updateDaemonSetStatus(ds, nodeList, hash, false)
} err = dsc.manage(ds, nodeList, hash)
if err != nil {
return err
} // Process rolling updates if we're ready.
if dsc.expectations.SatisfiedExpectations(dsKey) {
switch ds.Spec.UpdateStrategy.Type {
case apps.OnDeleteDaemonSetStrategyType:
case apps.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ds, nodeList, hash)
}
if err != nil {
return err
}
} err = dsc.cleanupHistory(ds, old)
if err != nil {
return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
} return dsc.updateDaemonSetStatus(ds, nodeList, hash, true)
}

2.1 dsc.manage

dsc.manage方法中不区分新旧daemonset版本的pod,主要是用于保证daemonset的pod运行在每一个合适条件的node上,在合适的node上没有daemonset的pod时创建pod,且把不符合条件的node上的daemonset pod删除掉。

主要逻辑:

(1)调用dsc.getNodesToDaemonPods,根据daemonset的Selector获取daemonset的所有pod,然后返回pod与node的对应关联关系map;

(2)遍历前面获取到的node列表,执行dsc.podsShouldBeOnNode,根据pod是否指定了nodeName、nodeSelector、ToleratesNodeTaints等,以及node对象的相关信息来做比对,来确定在某个node上是否已经存在daemonset对应的pod,以及是要为该daemonset创建pod还是删除pod;

(3)调用getUnscheduledPodsWithoutNode,将pod的nodeName与前面获取到的node列表比对,将nodeName不存在的pod加入到要被删除的pod列表中;

(4)调用dsc.syncNodes,根据前面获取到的要创建的pod的node列表以及要删除的pod列表,做相应的创建、删除pod的操作。

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
// Find out the pods which are created for the nodes by DaemonSet.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
} // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
var nodesNeedingDaemonPods, podsToDelete []string
for _, node := range nodeList {
nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
node, nodeToDaemonPods, ds) if err != nil {
continue
} nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
} // Remove unscheduled pods assigned to not existing nodes when daemonset pods are scheduled by scheduler.
// If node doesn't exist then pods are never scheduled and can't be deleted by PodGCController.
podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...) // Label new pods using the hash label value of the current history when creating them
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return err
} return nil
}

2.1.1 dsc.podsShouldBeOnNode

dsc.podsShouldBeOnNode方法用于判断一个node上是否需要运行daemonset pod,方法返回nodesNeedingDaemonPods与podsToDelete,分别代表需要运行daemonset pod的node、需要被删除的pod列表。

主要逻辑:

(1)调用dsc.nodeShouldRunDaemonPod,返回shouldSchedule与shouldContinueRunning,分别代表daemonset pod是否应该调度到某node、某node上的daemonset pod是否可以继续运行;

(2)当shouldSchedule为true,即pod应该调度到某node,但现在不存在时,将该node添加到nodesNeedingDaemonPods;

(3)当shouldContinueRunning为true,找出在该node上还在运行没有退出的daemonset pod列表,然后按照pod创建时间排序,只保留最新创建的pod,其余的加入到podsToDelete;

(4)当shouldContinueRunning为false,即daemonset pod不应继续在某node上运行,且现在该node已经存在该daemonset pod时,将node上该daemonset的所有pod都加入到podsToDelete;

(5)返回nodesNeedingDaemonPods与podsToDelete,分别代表需要运行daemonset pod的node、需要被删除的pod列表。

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) podsShouldBeOnNode(
node *v1.Node,
nodeToDaemonPods map[string][]*v1.Pod,
ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, err error) { _, shouldSchedule, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return
} daemonPods, exists := nodeToDaemonPods[node.Name] switch {
case shouldSchedule && !exists:
// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
case shouldContinueRunning:
// If a daemon pod failed, delete it
// If there's non-daemon pods left on this node, we will create it in the next sync loop
var daemonPodsRunning []*v1.Pod
for _, pod := range daemonPods {
if pod.DeletionTimestamp != nil {
continue
}
if pod.Status.Phase == v1.PodFailed {
// This is a critical place where DS is often fighting with kubelet that rejects pods.
// We need to avoid hot looping and backoff.
backoffKey := failedPodsBackoffKey(ds, node.Name) now := dsc.failedPodsBackoff.Clock.Now()
inBackoff := dsc.failedPodsBackoff.IsInBackOffSinceUpdate(backoffKey, now)
if inBackoff {
delay := dsc.failedPodsBackoff.Get(backoffKey)
klog.V(4).Infof("Deleting failed pod %s/%s on node %s has been limited by backoff - %v remaining",
pod.Namespace, pod.Name, node.Name, delay)
dsc.enqueueDaemonSetAfter(ds, delay)
continue
} dsc.failedPodsBackoff.Next(backoffKey, now) msg := fmt.Sprintf("Found failed daemon pod %s/%s on node %s, will try to kill it", pod.Namespace, pod.Name, node.Name)
klog.V(2).Infof(msg)
// Emit an event so that it's discoverable to users.
dsc.eventRecorder.Eventf(ds, v1.EventTypeWarning, FailedDaemonPodReason, msg)
podsToDelete = append(podsToDelete, pod.Name)
} else {
daemonPodsRunning = append(daemonPodsRunning, pod)
}
}
// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
// Sort the daemon pods by creation time, so the oldest is preserved.
if len(daemonPodsRunning) > 1 {
sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
for i := 1; i < len(daemonPodsRunning); i++ {
podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
}
}
case !shouldContinueRunning && exists:
// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
for _, pod := range daemonPods {
if pod.DeletionTimestamp != nil {
continue
}
podsToDelete = append(podsToDelete, pod.Name)
}
} return nodesNeedingDaemonPods, podsToDelete, nil
}
dsc.nodeShouldRunDaemonPod

关于dsc.nodeShouldRunDaemonPod方法,不做展开分析,它主要是调用dsc.simulate执行Predicates预选算法来检查某个node 是否满足pod的运行条件,如果预选失败,则根据失败信息,返回wantToRun、shouldSchedule、shouldContinueRunning,分别代表node与pod的selector、taints 等是否匹配(不考虑node资源是否充足)、daemonset pod是否应该调度到某node、某node上的daemonset pod是否可以继续运行,预选成功则全都返回true。

2.1.2 dsc.syncNodes

dsc.syncNodes是daemonset controller对pod进行创建和删除操作的方法。

该方法也涉及到expectations机制,与replicaset controller中的expectations机制作用一致,使用上也基本一致,忘记的可以回头看下replicaset controller分析中对expectations机制的分析,这里不再对expectations机制展开分析。

主要逻辑:

(1)计算要创建、删除pod的数量,上限为dsc.burstReplicas(250),即每一次对daemonset对象的同步操作,能创建/删除的pod数量上限为250,超出的部分需要在下一次同步操作才能进行;

(2)调用dsc.expectations.SetExpectations,设置expectations;

(3)调用util.CreatePodTemplate,计算并获取要创建的podTemplate;

(4)先进行pod的创建操作:pod的创建与replicaset controller创建pod类似,使用了慢开始算法,分多批次进行创建,第一批创建1个pod,第二批创建2个pod,第三批创建4个pod,以2倍往下依次执行,直到达到期望为止;而每一批次的创建,会拉起与要创建pod数量相等的goroutine,每个goroutine负责创建一个pod,并使用WaitGroup等待该批次的所有创建任务完成,再进行下一批次的创建;

(4)再进行pod的删除操作:对于每个要删除的pod,都拉起一个goroutine来做删除操作,并使用WaitGroup等待所有goroutine完成。

// pkg/controller/daemon/daemon_controller.go
func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error {
// We need to set expectations before creating/deleting pods to avoid race conditions.
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
} createDiff := len(nodesNeedingDaemonPods)
deleteDiff := len(podsToDelete) if createDiff > dsc.burstReplicas {
createDiff = dsc.burstReplicas
}
if deleteDiff > dsc.burstReplicas {
deleteDiff = dsc.burstReplicas
} dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff) // error channel to communicate back failures. make the buffer big enough to avoid any blocking
errCh := make(chan error, createDiff+deleteDiff) klog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
createWait := sync.WaitGroup{}
// If the returned error is not nil we have a parse error.
// The controller handles this via the hash.
generation, err := util.GetTemplateGeneration(ds)
if err != nil {
generation = nil
}
template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would
// likely all fail with the same error. For example a project with a
// low quota that attempts to create a large number of pods will be
// prevented from spamming the API service with the pod create requests
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
errorCount := len(errCh)
createWait.Add(batchSize)
for i := pos; i < pos+batchSize; i++ {
go func(ix int) {
defer createWait.Done() podTemplate := template.DeepCopy()
// The pod's NodeAffinity will be updated to make sure the Pod is bound
// to the target node by default scheduler. It is safe to do so because there
// should be no conflicting node affinity with the target node.
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix]) err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind)) if err != nil {
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore
// this error since all subsequent creations will fail.
return
}
}
if err != nil {
klog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.CreationObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
createWait.Wait()
// any skipped pods that we never attempted to start shouldn't be expected.
skippedPods := createDiff - (batchSize + pos)
if errorCount < len(errCh) && skippedPods > 0 {
klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for set %q/%q", skippedPods, ds.Namespace, ds.Name)
dsc.expectations.LowerExpectations(dsKey, skippedPods, 0)
// The skipped pods will be retried later. The next controller resync will
// retry the slow start process.
break
}
} klog.V(4).Infof("Pods to delete for daemon set %s: %+v, deleting %d", ds.Name, podsToDelete, deleteDiff)
deleteWait := sync.WaitGroup{}
deleteWait.Add(deleteDiff)
for i := 0; i < deleteDiff; i++ {
go func(ix int) {
defer deleteWait.Done()
if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
klog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
dsc.expectations.DeletionObserved(dsKey)
errCh <- err
utilruntime.HandleError(err)
}
}(i)
}
deleteWait.Wait() // collect errors if any for proper reporting/retry logic in the controller
errors := []error{}
close(errCh)
for err := range errCh {
errors = append(errors, err)
}
return utilerrors.NewAggregate(errors)
}

2.2 dsc.rollingUpdate

dsc.rollingUpdate方法主要用于处理daemonset对象的滚动更新处理,根据配置的滚动更新配置,删除旧的pod(pod的创建操作在dsc.manage方法中进行)。

主要逻辑:

(1)调用dsc.getNodesToDaemonPods,获取daemonset所属pod与node的对应关联关系map;

(2)调用dsc.getAllDaemonSetPods,获取所有的旧版本daemonset的pod;

(3)调用dsc.getUnavailableNumbers,根据daemonset的滚动更新策略配置获取maxUnavailable值,再获取numUnavailable值,numUnavailable代表在符合条件的node节点中,没有daemonset对应的pod或者pod处于Unavailable状态的node数量;

(4)调用util.SplitByAvailablePods,将旧版本daemonset的所有pod分成oldAvailablePods列表,以及oldUnavailablePods列表;

(5)定义一个字符串数组oldPodsToDelete,用于储存准备要删除的pod;

(6)将全部oldUnavailablePods加入到oldPodsToDelete数组中;

(7)遍历oldAvailablePods列表,当numUnavailable小于maxUnavailable值时,将pod加入到oldPodsToDelete数组中,且numUnavailable值加一;

(8)调用dsc.syncNodes,将oldPodsToDelete数组中的pod删除。

// pkg/controller/daemon/update.go
func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error {
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
} _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash)
maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods)
if err != nil {
return fmt.Errorf("couldn't get unavailable numbers: %v", err)
}
oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods) // for oldPods delete all not running pods
var oldPodsToDelete []string
klog.V(4).Infof("Marking all unavailable old pods for deletion")
for _, pod := range oldUnavailablePods {
// Skip terminating pods. We won't delete them again
if pod.DeletionTimestamp != nil {
continue
}
klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
oldPodsToDelete = append(oldPodsToDelete, pod.Name)
} klog.V(4).Infof("Marking old pods for deletion")
for _, pod := range oldAvailablePods {
if numUnavailable >= maxUnavailable {
klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable)
break
}
klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name)
oldPodsToDelete = append(oldPodsToDelete, pod.Name)
numUnavailable++
}
return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
}

2.3 dsc.updateDaemonSetStatus

dsc.updateDaemonSetStatus方法负责根据现存daemonset pod的部署情况以及pod的状态、node是否满足pod运行条件等信息,来更新daemonset的status状态值,这里不对代码展开分析,只分析一下daemonset的status中各个字段的意思。

(1)currentNumberScheduled: 已经调度了daemonset pod的节点数量;

(2)desiredNumberScheduled: 期望调度daemonset pod的节点数量;

(3)numberMisscheduled:不需要调度daemonset pod但已经调度完成了的节点数量;

(4)numberAvailable: pod状态达到Available的数量(pod达到Ready状态MinReadySeconds时间后,就认为达到了Available状态);

(5)numberReady: pod状态达到Ready的数量;

(6)numberUnavailable: desiredNumberScheduled - numberAvailable;

(7)updatedNumberScheduled: 已经调度了最新版本daemonset pod的节点数量。

总结

daemonset controller创建 pod 的流程与 replicaset controller 创建 pod 的流程是相似的,都使用了 expectations 机制并且限制了在一次调谐过程中最多创建或删除的 pod 数量。daemonset的更新方式与 statefulset 一样包含 OnDelete 和 RollingUpdate(滚动更新) 两种,OnDelete 方式需要手动删除对应的 pod,然后daemonset controller才会创建出新的pod,而 RollingUpdate 方式与 statefulset 和 deployment 有所区别, RollingUpdate方式更新时是按照先删除pod再创建pod的顺序进行,不像deployment那样可以先创建出新的pod再删除旧的pod。

daemonset controller架构

daemonset controller的大致组成和处理流程如下图,daemonset controller对daemonset、pod、node、ControllerRevision对象注册了event handler,当有事件时,会watch到然后将对应的daemonset对象放入到queue中,然后syncDaemonset方法为daemonset controller调谐daemonset对象的核心处理逻辑所在,从queue中取出daemonset对象,做调谐处理。

daemonset controller核心处理逻辑

daemonset controller的核心处理逻辑是调谐daomonset对象,使得daemonset在合适node上完成pod的创建、在不合适node上完成pod的删除,触发滚动更新时按照配置的滚动更新策略配置来删除旧的pod、创建新的pod,并根据历史版本限制配置清理daemonset的历史版本,最后更新daemonset对象的status状态。

daemonset controller创建pod算法

daemonset controller创建pod的算法与replicaset controller创建pod的算法几乎相同,按1、2、4、8...的递增趋势分多批次进行(每次调谐中创建pod的数量上限为250个,超过上限的会在下次调谐中再创建),若某批次创建pod有失败的(如apiserver限流,丢弃请求等,注意:超时除外,因为initialization处理有可能超时),则后续批次的pod创建不再进行,需等待该daemonset对象下次调谐时再触发该pod创建算法,进行pod的创建,直至所有满足条件的node上都有该daemonset的pod。

daemonset controller删除pod算法

daemonset controller删除pod的算法是,拉起与要删除的pod数量相同的goroutine来删除pod(每次调谐中删除pod的数量上限为250),并等待所有goroutine执行完成。删除pod有失败的(如apiserver限流,丢弃请求)或超过250上限的部分,需等待该daemonset对象下次调谐时再触发该pod删除算法,进行pod的删除,直至所有期望被删除的pod都被删除。

最新文章

  1. Asp.Net WebApi开发注意
  2. Mesh Data Structure in OpenCascade
  3. CodeForces 279D The Minimum Number of Variables 题解
  4. sax xpath读取xml字符串
  5. 1千万英国用户被Cryptolocker勒索软件瞄准
  6. 使用Microsoft Fakes进行单元测试(2)
  7. 【PHP的异常处理【完整】】
  8. XStream的例子
  9. Linux 下的另一个密码破解工具medusa
  10. linux 命令——文件管理 ls
  11. web文档类型DOCTYPE html很重要
  12. Python之路第六天,基础(8)-反射
  13. Linux上传下载文件命令
  14. html5 拖拽文件到页面实现上传
  15. Openlayers修改矢量要素并且可捕捉
  16. 虚拟机Ubuntu无法上网问题解决过程
  17. 一个例子理解break和continue的区别
  18. 用shell脚本挂载linux主机拷贝相应文件copy.sh
  19. xampp——apache服务启动问题(端口占用)
  20. oracle(sql)基础篇系列(五)——PLSQL、游标、存储过程、触发器

热门文章

  1. idea给类增加注释
  2. git连接远程仓库
  3. Codeforces 1264D - Beautiful Bracket Sequence(组合数学)
  4. Codeforces 585E - Present for Vitalik the Philatelist(简单莫反+狄利克雷前缀和)
  5. 65-Binary Tree Zigzag Level Order Traversal
  6. 37-Invert Binary Tree
  7. excel--CLEAN()函数,解决为什么看着相同的字符串但是len()长度不同
  8. kubernetes部署Docker私有仓库Registry
  9. Hbase与Phoenix整合
  10. Oracle中的job(定时任务)