之前了解了client-go中的架构设计,也就是 tools/cache 下面的一些概念,那么下面将对informer进行分析

Controller

在client-go informer架构中存在一个 controller ,这个不是 Kubernetes 中的Controller组件;而是在 tools/cache 中的一个概念,controller 位于 informer 之下,Reflector 之上。code

Config

从严格意义上来讲,controller 是作为一个 sharedInformer 使用,通过接受一个 Config ,而 Reflector 则作为 controller 的 slot。Config 则包含了这个 controller 里所有的设置。

type Config struct {
Queue // DeltaFIFO
ListerWatcher // 用于list watch的
Process ProcessFunc // 定义如何从DeltaFIFO中弹出数据后处理的操作
ObjectType runtime.Object // Controller处理的对象数据,实际上就是kubernetes中的资源
FullResyncPeriod time.Duration // 全量同步的周期
ShouldResync ShouldResyncFunc // Reflector通过该标记来确定是否应该重新同步
RetryOnError bool
}

controller

然后 controller 又为 reflertor 的上层

type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
} type Controller interface {
// controller 主要做两件事,
// 1. 构建并运行 Reflector,将listerwacther中的泵压到queue(Delta fifo)中
// 2. Queue用Pop()弹出数据,具体的操作是Process
// 直到 stopCh 不阻塞,这两个协程将退出
Run(stopCh <-chan struct{})
HasSynced() bool // 这个实际上是从store中继承的,标记这个controller已经
LastSyncResourceVersion() string
}

controller 中的方法,仅有一个 Run()New();这意味着,controller 只是一个抽象的概念,作为 Reflector, Delta FIFO 整合的工作流

controller 则是 SharedInformer 了。

Queue

这里的 queue 可以理解为是一个具有 Pop() 功能的 Indexer ;而 Pop() 的功能则是 controller 中的一部分;也就是说 queue 是一个扩展的 StoreStore 是不具备弹出功能的。

type Queue interface {
Store
// Pop会阻塞等待,直到有内容弹出,删除对应的值并处理计数器
Pop(PopProcessFunc) (interface{}, error) // AddIfNotPresent puts the given accumulator into the Queue (in
// association with the accumulator's key) if and only if that key
// is not already associated with a non-empty accumulator.
AddIfNotPresent(interface{}) error // HasSynced returns true if the first batch of keys have all been
// popped. The first batch of keys are those of the first Replace
// operation if that happened before any Add, Update, or Delete;
// otherwise the first batch is empty.
HasSynced() bool
Close() // 关闭queue
}

而弹出的操作是通过 controller 中的 processLoop() 进行的,最终走到Delta FIFO中进行处理。

通过忙等待去读取要弹出的数据,然后在弹出前 通过PopProcessFunc 进行处理

func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

DeltaFIFO.Pop()

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, ErrFIFOClosed
} f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
err := process(item) // 进行处理
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item) // 如果失败,再重新加入到队列中
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}

Informer

通过对 Reflector, Store, Queue, ListerWatcherProcessFunc, 等的概念,发现由 controller 所包装的起的功能并不能完成通过对API的动作监听,并通过动作来处理本地缓存的一个能力;这个情况下诞生了 informer 严格意义上来讲是 sharedInformer

func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
) Controller {
// This will hold incoming changes. Note how we pass clientState in as a
// KeyLister, that way resync operations will result in the correct set
// of update/delete deltas.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
}) cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false, Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return New(cfg)
}

newInformer是位于 tools/cache/controller.go 下,可以看出,这里面并没有informer的概念,这里通过注释可以看到,newInformer实际上是一个提供了存储和事件通知的informer。他关联的 queue 则是 Delta FIFO,并包含了 ProcessFunc, Store 等 controller的概念。最终对外的方法为 NewInformer()

func NewInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, Controller) {
// This will hold the client state, as we know it.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc) return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
} type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}

可以看到 NewInformer() 就是一个带有 Store功能的controller,通过这些可以假定出,Informer 就是controller ,将queue中相关操作分发给不同事件处理的功能

SharedIndexInformer

shareInformer 为客户端提供了与apiserver一致的数据对象本地缓存,并支持多事件处理程序的informer,而 shareIndexInformer 则是对shareInformer 的扩展

type SharedInformer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
AddEventHandler(handler ResourceEventHandler)
// AddEventHandlerWithResyncPeriod adds an event handler to the
// shared informer with the requested resync period; zero means
// this handler does not care about resyncs. The resync operation
// consists of delivering to the handler an update notification
// for every object in the informer's local cache; it does not add
// any interactions with the authoritative storage. Some
// informers do no resyncs at all, not even for handlers added
// with a non-zero resyncPeriod. For an informer that does
// resyncs, and for each handler that requests resyncs, that
// informer develops a nominal resync period that is no shorter
// than the requested period but may be longer. The actual time
// between any two resyncs may be longer than the nominal period
// because the implementation takes time to do work and there may
// be competing load and scheduling noise.
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// GetStore returns the informer's local cache as a Store.
GetStore() Store
// GetController is deprecated, it does nothing useful
GetController() Controller
// Run starts and runs the shared informer, returning after it stops.
// The informer will be stopped when stopCh is closed.
Run(stopCh <-chan struct{})
// HasSynced returns true if the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
HasSynced() bool
// LastSyncResourceVersion is the resource version observed when last synced with the underlying
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
LastSyncResourceVersion() string
}

SharedIndexInformer 是对SharedInformer的实现,可以从结构中看出,SharedIndexInformer 大致具有如下功能:

  • 索引本地缓存
  • controller,通过list watch拉取API并推入 Deltal FIFO
  • 事件的处理
type sharedIndexInformer struct {
indexer Indexer // 具有索引的本地缓存
controller Controller // controller processor *sharedProcessor // 事件处理函数集合
cacheMutationDetector MutationDetector listerWatcher ListerWatcher
objectType runtime.Object
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
blockDeltas sync.Mutex
}

而在 tools/cache/share_informer.go 可以看到 shareIndexInformer 的运行过程

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: s.indexer,
EmitDeltaTypeReplaced: true,
}) cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, // process 弹出时操作的流程
} func() {
s.startedLock.Lock()
defer s.startedLock.Unlock() s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}() // Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run) // 启动事件处理函数 defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
s.controller.Run(stopCh) // 启动controller,controller会启动Reflector和fifo的Pop()
}

而在操作Delta FIFO中可以看到,做具体操作时,会将动作分发至对应的事件处理函数中,这个是informer初始化时对事件操作的函数

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock() for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
} isSync := false
switch {
case d.Type == Sync:
isSync = true
case d.Type == Replaced:
if accessor, err := meta.Accessor(d.Object); err == nil {
if oldAccessor, err := meta.Accessor(old); err == nil {
isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
}
}
}
// 事件的分发
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 事件的分发
s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

事件处理函数 processor

启动informer时也会启动注册进来的事件处理函数;processor 就是这个事件处理函数。

run() 函数会启动两个 listener,j监听事件处理业务函数 listener.run 和 事件的处理

wg.StartWithChannel(processorStopCh, s.processor.run)

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}

可以看出,就是拿到的事件,根据注册的到informer的事件函数进行处理

func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
for next := range p.nextCh { // 消费事件
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
close(stopCh)
}, 1*time.Second, stopCh)
}

informer中的事件的设计

了解了informer如何处理事件,就需要学习下,informer的事件系统设计 prossorListener

事件的添加

当在handleDelta时,会分发具体的事件

// 事件的分发
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)

此时,事件泵 Pop() 会根据接收到的事件进行处理

// run() 时会启动一个事件泵
p.wg.Start(listener.pop) func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification: // 这里实际上是一个阻塞的等待
// 单向channel 可能不会走到这步骤
var ok bool
// deltahandle 中 distribute 会将事件添加到addCh待处理事件中
// 处理完事件会再次拿到一个事件
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
// 处理 分发过来的事件 addCh
case notificationToAdd, ok := <-p.addCh: // distribute分发的事件
if !ok {
return
}
// 这里代表第一次,没有任何事件时,或者上面步骤完成读取
if notification == nil { // 就会走这里
notification = notificationToAdd
nextCh = p.nextCh
} else {
// notification否则代表没有处理完,将数据再次添加到待处理中
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}

该消息事件的流程图为

通过一个简单实例来学习client-go中的消息通知机制

package main

import (
"fmt"
"time" "k8s.io/utils/buffer"
) var nextCh1 = make(chan interface{})
var addCh = make(chan interface{})
var stopper = make(chan struct{})
var notification interface{}
var pendding = *buffer.NewRingGrowing(2) func main() {
// pop
go func() {
var nextCh chan<- interface{}
var notification interface{}
//var n int
for {
fmt.Println("busy wait")
fmt.Println("entry select", notification)
select {
// 初始时,一个未初始化的channel,nil,形成一个阻塞(单channel下是死锁)
case nextCh <- notification:
fmt.Println("entry nextCh", notification)
var ok bool
// 读不到数据代表已处理完,置空锁
notification, ok = pendding.ReadOne()
if !ok {
fmt.Println("unactive nextch")
nextCh = nil
}
// 事件的分发,监听,初始时也是一个阻塞
case notificationToAdd, ok := <-addCh:
fmt.Println(notificationToAdd, notification)
if !ok {
return
}
// 线程安全
// 当消息为空时,没有被处理
// 锁为空,就分发数据
if notification == nil {
fmt.Println("frist notification nil")
notification = notificationToAdd
nextCh = nextCh1 // 这步骤等于初始化了局部的nextCh,会触发上面的流程
} else {
// 在第三次时,会走到这里,数据进入环
fmt.Println("into ring", notificationToAdd)
pendding.WriteOne(notificationToAdd)
}
}
}
}()
// producer
go func() {
i := 0
for {
i++
if i%5 == 0 {
addCh <- fmt.Sprintf("thread 2 inner -- %d", i)
time.Sleep(time.Millisecond * 9000)
} else {
addCh <- fmt.Sprintf("thread 2 outer -- %d", i)
time.Sleep(time.Millisecond * 500)
}
}
}()
// subsriber
go func() {
for {
for next := range nextCh1 {
time.Sleep(time.Millisecond * 300)
fmt.Println("consumer", next)
}
}
}()
<-stopper
}

总结,这里的机制类似于线程安全,进入临界区的一些算法,临界区就是 nextChnotification 就是保证了至少有一个进程可以进入临界区(要么分发事件,要么生产事件);nextChnextCh1 一个是局部管道一个是全局的,管道未初始化代表了死锁(阻塞);当有消息要处理时,会将局部管道 nextCh 赋值给 全局 nextCh1 此时相当于解除了分发的步骤(对管道赋值,触发分发操作);ringbuffer 实际上是提供了一个对 notification 加锁的操作,在没有处理的消息时,需要保障 notification 为空,同时也关闭了流程 nextCh 的写入。这里主要是考虑对golang中channel的用法

最新文章

  1. 最全的Windows Azure学习教程汇总
  2. CentOS7下安装Mysql和Memcached 以及 使用C#操作Mysql和Memcached
  3. 【MySQL】探究之常用SQL
  4. NGUI悬浮菜单思路实践
  5. install usb serial
  6. python join字符连接函数的使用方法
  7. 子查询有OR无法展开,改写成union
  8. REST API出错响应的设计(转)
  9. seajs简记
  10. The message filter indicated that the application is busy. (Exception from HRESULT: 0x8001010A (RPC_E_SERVERCALL_RETRYLATER))
  11. VS下载地址
  12. 【转载】接触Matlab10年后的一个总结,随时使用Matlab要掌握的一些要点
  13. JSP第六篇【自定义标签之传统标签】
  14. 文本可视化[二]——《今生今世》人物关系可视化python实现
  15. Centos环境下给PHP7.0安装yaf扩展
  16. NoSQL还是SQL?这一篇讲清楚
  17. iOS - 解决 Cocoapods 第三方库下载不下来
  18. numpy数组(4)-二维数组
  19. 说说最小生成树(Minimum Spanning Tree)
  20. JNative用法注意事项

热门文章

  1. html5的video元素学习手札
  2. 该如何选择 background-image 和 img 标签
  3. React 可视化开发工具 shadow-widget 的非可视开发方法
  4. 从ES6重新认识JavaScript设计模式(三): 建造者模式
  5. 牛客网-剑指Offer 二维数组中的查找
  6. Java中 i++和++i 的区别
  7. Python入门-字符串格式化
  8. Docker-操作容器1
  9. spring-xml实现aop-通知的种类
  10. 使用SQL的FOR XML PATH(&#39;&#39;)将字段用逗号隔开