前言

前文介绍Pod无论是启动时还是关闭时,处理是由kubelet的主循环syncLoop开始执行逻辑,而syncLoop的入参是一条传递变更Pod的通道,显然syncLoop往后的逻辑属于消费者一方,如何发现Pod的变更往通道里面传递变更消息的一方目前还没明朗,故本次来看一下kubelet是如何发现Pod的变更的。

调用链回溯

syncLoop的通道参数updates是经过在startKubelet函数(代码位于/cmd/kubelet/app/server.go)传入,

func startKubelet(...){
go k.Run(podCfg.Updates())
}

podCfg.Updates()方法只返回PodConfig对象的updates字段,updates通道是在构造PodConfig对象时创建出来的,它在构造PodStorage时传作入参,而后者又在构造Mux对象时传作入参,从而推断往updates通道传入Pod变更的跟Mux和Storage都会有关系,需要理清这三者间的关系。

代码位于/pkg/kubelet/config/config.go

func (c *PodConfig) Updates() <-chan kubetypes.PodUpdate {
return c.updates
} //PodConfig的结构定义
type PodConfig struct {
pods *podStorage
mux *config.Mux // the channel of denormalized changes passed to listeners
updates chan kubetypes.PodUpdate // contains the list of all configured sources
sourcesLock sync.Mutex
sources sets.String
} //PodConfig的构造函数
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) *PodConfig {
updates := make(chan kubetypes.PodUpdate, 50)
storage := newPodStorage(updates, mode, recorder)
podConfig := &PodConfig{
pods: storage,
mux: config.NewMux(storage),
updates: updates,
sources: sets.String{},
}
return podConfig
}

PodConfig相关对象内部结构

podStorage

podStorage的构造函数及结构定义如下,由结构名得知它主要是负责pod的存储,且它的成员中有一个用于存储pod对象的map,查看了对updates通道的引用是往里面塞入对象,主要通过PodStorage的Merge方法传入

代码位于/pkg/kubelet/config/config.go

func newPodStorage(updates chan<- kubetypes.PodUpdate, mode PodConfigNotificationMode, recorder record.EventRecorder) *podStorage {
return &podStorage{
pods: make(map[string]map[types.UID]*v1.Pod),
mode: mode,
updates: updates,
sourcesSeen: sets.String{},
recorder: recorder,
}
} type podStorage struct {
podLock sync.RWMutex
// map of source name to pod uid to pod reference
pods map[string]map[types.UID]*v1.Pod
mode PodConfigNotificationMode // ensures that updates are delivered in strict order
// on the updates channel
updateLock sync.Mutex
updates chan<- kubetypes.PodUpdate // contains the set of all sources that have sent at least one SET
sourcesSeenLock sync.RWMutex
sourcesSeen sets.String // the EventRecorder to use
recorder record.EventRecorder
}

暂且不管Merge方法里面的逻辑,Merge方法是podStorage实现Merger接口的方法,而调用Merger接口的地方,有且只有Mux的listen方法

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
for update := range listenChannel {
m.merger.Merge(source, update)
}
}

下面也了解一下Mux这个结构

Mux

Mux的结构和构造函数如下所示

type Mux struct {
// Invoked when an update is sent to a source.
merger Merger // Sources and their lock.
sourceLock sync.RWMutex
// Maps source names to channels
sources map[string]chan interface{}
} func NewMux(merger Merger) *Mux {
mux := &Mux{
sources: make(map[string]chan interface{}),
merger: merger,
}
return mux
}

在Mux的结构的成员及其构造函数得知podStorage在Mux这里只充当一个Merger的作用,此外它还有一个sources的map用于记录通道信息,作用待后续了解。

Mux的listen方法也只有一个地方用到,就是Channel方法,观其大意是用于记录每个新来的source到本地的sources这个map中,新来的source会为其开辟一条新的通道,每当收到内容就会调用merger的Merger进行合并操作

func (m *Mux) Channel(source string) chan interface{} {
if len(source) == 0 {
panic("Channel given an empty name")
}
m.sourceLock.Lock()
defer m.sourceLock.Unlock()
channel, exists := m.sources[source]
if exists {
return channel
}
newChannel := make(chan interface{})
m.sources[source] = newChannel
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
return newChannel
}

这个Channel方法也只在PodConfig的Channel方法被调用而已

func (c *PodConfig) Channel(source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source)
}

再往上溯就来到makePodSourceConfig函数了,这个函数在kubelet启动的流程中,构件kubelet对象的NewMainKubelet函数中被调用,那么Pod变更来源的关键就在这个makePodSourceConfig函数里面了

func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if len(kubeCfg.StaticPodURLHeader) > 0 {
for k, v := range kubeCfg.StaticPodURLHeader {
for i := range v {
manifestURLHeader.Add(k, v[i])
}
}
} // source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder) // define file config source
if kubeCfg.StaticPodPath != "" {
klog.Infof("Adding pod path: %v", kubeCfg.StaticPodPath)
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
} // define url config source
if kubeCfg.StaticPodURL != "" {
klog.Infof("Adding pod url %q with HTTP header %v", kubeCfg.StaticPodURL, manifestURLHeader)
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
} var updatechannel chan<- interface{}
if kubeDeps.KubeClient != nil {
klog.Infof("Watching apiserver")
if updatechannel == nil {
updatechannel = cfg.Channel(kubetypes.ApiserverSource)
}
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, updatechannel)
}
return cfg, nil
}

在这里看到Pod有三个来源,且在syncLoop方法也提及updates通道会传递来自file, apiserver和 http三种来源的Pod变更事件,下面则了解这三种Pod的来源

Pod的来源分类

  • FileSource:Kubelet会读取文件里面定义的pod进行创建.常常我们使用来定义kubelet管理的static pod ,从配置参数处获得,作为master则有master几个组件的pod定义
  • HttpSource:通过http Get该manifest url获取到pod的定义,配置中没指定,暂未发现其使用场景
  • ApiSource:通过定义跟kube-apiserver进行通过的kubeclient, 从kube-apiserver中获取需要本节点创建的pod的信息。通常kubelet获得pod的途径

FileSource的路径是在/etc/kubernetes/manifests/

ls /etc/kubernetes/manifests/
etcd-external.yaml kube-apiserver.yaml kube-controller-manager.yaml kube-scheduler.yaml

路径的来源是在kubetel的一个配置文件中获取,配置文件路径通过kubelet的启动参数--config=/var/lib/kubelet/config.yaml,打开这个文件里面有一项就是staticPodPath

cat /var/lib/kubelet/config.yaml|grep staticPodPath
staticPodPath: /etc/kubernetes/manifests

静态Pod

静态 Pod 在指定的节点上由 kubelet 守护进程直接管理,不需要 API 服务器 监管。 与由控制面管理的 Pod(例如,Deployment) 不同;kubelet 监视每个静态 Pod(在它崩溃之后重新启动)。

静态 Pod 永远都会绑定到一个指定节点上的 Kubelet。

kubelet 会尝试通过 Kubernetes API 服务器为每个静态 Pod 自动创建一个 镜像 Pod。 这意味着节点上运行的静态 Pod 对 API 服务来说是可见的,但是不能通过 API 服务器来控制。 Pod 名称将把以连字符开头的节点主机名作为后缀。

从yaml文件中可以得知k8s集群中,etcd,api-server,controller-manager,scheduler这几个控制面的pod都是以静态Pod的形式运行在master节点上。

PodConfig应对3种类型的pod的变更

本地yaml文件

makePodSourceConfig		/pkg/kubelet/kubelet.go
|--config.NewSourceFile /pkg/kubelet/config/file.go
|--config.run()

拿到了channel内部就从指定路径中读取所有yaml,无论是否读到,都会往通道中塞一个数据,而且Op全部都是Set,内部是执行以下方法,开的协程有两个作用,定时List以下目录里面有那些文件更改,更新缓存;另外的startWatch和watchEvents通道就是对路径进行监控,凡是对该路径的文件进行任何修改都会触发,更新缓存。这个操作很像Informer的ListAndWatch。

func (s *sourceFile) run() {
listTicker := time.NewTicker(s.period) go func() {
// Read path immediately to speed up startup.
if err := s.listConfig(); err != nil {
klog.Errorf("Unable to read config path %q: %v", s.path, err)
}
for {
select {
case <-listTicker.C:
if err := s.listConfig(); err != nil {
klog.Errorf("Unable to read config path %q: %v", s.path, err)
}
case e := <-s.watchEvents:
if err := s.consumeWatchEvent(e); err != nil {
klog.Errorf("Unable to process watch event: %v", err)
}
}
}
}() s.startWatch()
}

指定URL的yaml

makePodSourceConfig		/pkg/kubelet/kubelet.go
|--config.NewSourceURL /pkg/kubelet/config/http.go
|--config.run()

这里也是跟本地yaml文件的很类似,定期往指定URL处拿pod的yaml,先把他当作单体的pod进行反序列化,不行则当作podList进行反序列化,得到的结果同样以Set形式往通道塞数据。

func (s *sourceURL) run() {
if err := s.extractFromURL(); err != nil {
// Don't log this multiple times per minute. The first few entries should be
// enough to get the point across.
if s.failureLogs < 3 {
klog.Warningf("Failed to read pods from URL: %v", err)
} else if s.failureLogs == 3 {
klog.Warningf("Failed to read pods from URL. Dropping verbosity of this message to V(4): %v", err)
} else {
klog.V(4).Infof("Failed to read pods from URL: %v", err)
}
s.failureLogs++
} else {
if s.failureLogs > 0 {
klog.Info("Successfully read pods from URL.")
s.failureLogs = 0
}
}
}

处理api-server的list&watch

跟api-server对接并没有用client-go的Informer,而是更直接地使用底层的reflector,将list&watch发现有变更的Pod塞到通道里面

代码位于/pkg/kubelet/config/apiserver.go

func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
} // newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
go r.Run(wait.NeverStop)
}

PosStorage的Merge方法

目前已经查明三种Pod变更的来源,但是从源头到syncLoop之间还缺了一环,就是之前先不纠结的PosStorage.Merge方法,因为三个来源的只是把Pod往通道里面塞,但是并没有明确本次变更的Pod是属于哪一种变更类型(新增,修改,删除,移除,调谐)

func (s *podStorage) Merge(source string, change interface{}) error {
s.updateLock.Lock()
defer s.updateLock.Unlock() seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, removes, reconciles := s.merge(source, change)
//传入的change是SET时才会是firstSet
firstSet := !seenBefore && s.sourcesSeen.Has(source) // deliver update notifications
switch s.mode {
///按podStorage只会有这种cast
case PodConfigNotificationIncremental:
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
// Only add reconcile support here, because kubelet doesn't support Snapshot update now.
if len(reconciles.Pods) > 0 {
s.updates <- *reconciles
} ...
} return nil
}

podStorage.Merge是调用它的内部方法merge将变更的pod分成add,update,delete,remove,reconcile几类,然后就按照podStorage的模式来执行后续的操作,而在makePodSourceConfig处构造PodConfig时就指定使用PodConfigNotificationIncremental模式。但也只是把不同类型的Pod变更放入updates通道而已,区分pod变更的类型主要逻辑还是在内部方法merge中

func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {

	addPods := []*v1.Pod{}
updatePods := []*v1.Pod{}
deletePods := []*v1.Pod{}
removePods := []*v1.Pod{}
reconcilePods := []*v1.Pod{} pods := s.pods[source]
if pods == nil {
pods = make(map[types.UID]*v1.Pod)
} update := change.(kubetypes.PodUpdate)
switch update.Op {
....
case kubetypes.SET:
klog.V(4).Infof("Setting pods for source %s", source)
s.markSourceSet(source)
// Clear the old map entries by just creating a new map
oldPods := pods
pods = make(map[types.UID]*v1.Pod)
///按set的方式,只有update.Pods的内容才会被保留,原本只有cache才有的pod都会被移除
updatePodsFunc(update.Pods, oldPods, pods)
for uid, existing := range oldPods {
if _, found := pods[uid]; !found {
// this is a delete
removePods = append(removePods, existing)
}
} default:
klog.Warningf("Received invalid update type: %v", update) } s.pods[source] = pods adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source} return adds, updates, deletes, removes, reconciles
}

内部方法merge先从指定来源的缓存中把pod取出来,根据本次变更的类型,执行不一样的操作,但是看之前三个来源都发现update.Op的值都是kubetypes.SET。将原本缓存的pod作为ooldPod,另外再创建一个新的空的集合,调用局部函数updatePodsFunc,执行完毕后则把刚创建的pod集合整个放入缓存中,pod变更类型判定的逻辑在updatePodsFunc里面

//根据oldPods和newPods整合成新的pods用于缓存,同时填充add,update,delete,reconcile四个集合

	updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
//只是对newPods进行去重
filtered := filterInvalidPods(newPods, source, s.recorder)
for _, ref := range filtered {
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[ref.UID]; found {
pods[ref.UID] = existing
//比较两个新pod与缓存pod,如果其他一致,只是status不同,则单纯reconcile
//否则就更新缓存pod,如果有删除时间的(DeletionTimestamp)就执行删除
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
} else if needReconcile {
reconcilePods = append(reconcilePods, existing)
} else if needGracefulDelete {
deletePods = append(deletePods, existing)
}
continue
}
//oldPods里面无的就是新的
recordFirstSeenTime(ref)
pods[ref.UID] = ref
addPods = append(addPods, ref)
}
}

对入参newPods集合先调用filterInvalidPods函数进行按pod的fullname去重,然后尝试从缓存中查找有否UID一样的pod,如果没有则表明是新增操作,放入新增集合中。如果有就是剩下几个类型变更,需要与上一个版本的Pod进行比较方可判定,如果只是status不同,则属于reconcile操作,放入reconcile集合。如果有删除时间的(DeletionTimestamp)属于删除操作,放入删除集合。剩下就是更新操作,放入更新集合。

小结

本篇旨在衔接kubelet启动与kubelet处理pod的启动、关闭之间的逻辑。通过回溯调用链的形式找到pod的变更是如何发现,然后分辨pod变更的类型,最后传递到syncLoop方法针对不同的变更类型执行相应的操作。

其中Pod的来源有三种:本地yaml文件,指定URL上的yaml,还有从api-server中list&watch的结果

三种来源监控任务都由PodConfig中的Mux来承担,它为每种来源类型都开辟一个协程来执行监控任务

当发现有Pod的变更事件就会往通道里传递对象,会传到PodConfig的podStorage里作分辨变更类型的逻辑

最后把区分好变更类型的pod放入PodConfig的updates通道中传递给syncLoop

如要回顾本系列的文章可点击

kubelet源码分析——kubelet简介与启动

kubelet源码分析——启动Pod

kubelet源码分析——关闭Pod

kubelet源码分析——监控Pod变更

最新文章

  1. C++ URLDecode和URLEncode实现——仅限gb2312,非utf8
  2. ORA-00030: User session ID does not exist.
  3. TNS-01201: Listener cannot find executablen
  4. 图片轮换cycle插件的运用
  5. 自定义ContentProvider
  6. mysql 插入语句
  7. css编译工具Sass中混合宏,继承,占位符分别在什么时候使用
  8. Memory Limits for Windows and Windows Server Releases
  9. Ubuntu配置Samba
  10. 【linux 爱好者群】程序猿的那些聊天记录
  11. C语言代码训练(一)
  12. css选择器的优先级问题
  13. 5dfda1332b67817b0f2d7839242021ce&#39;Java数据结构和算法
  14. linux centos 安装Jenkins(非docker方式)
  15. Python练手例子(3)
  16. C#File类常用文件操作以及一个模拟的控制台文件管理系统
  17. APP多开教程
  18. JavaScript -- Window-Name
  19. Android使用动态代理搭建网络模块框架
  20. 《linux内核分析》作业一:分析汇编代码

热门文章

  1. 【mysql】用户和权限管理
  2. 【springcloud】springcloud与springboot的版本对应关系
  3. mfc HackerTools进程令牌设置为debug权限
  4. CentOS7 yum方式安装MySQL5.7 + 远程连接
  5. VMware中安装CentOS Linux release 7.4.1708 (Core)
  6. 二、vue组件化开发(轻松入门vue)
  7. roslaunch保存的log文件没有打印的ERROR信息
  8. system的使用
  9. Python中 sys.argv[]
  10. pyRevit开发:如何创建轴网