excel求和與計算器求和相差0.01(excel求和0.00)
1206
2022-05-30
一 前言
Informer 是 Client-go 中的一個核心工具包,其實就是一個帶有本地緩存和索引機制的、可以注冊 EventHandler 的 client,本地緩存被稱為 Store,索引被稱為 Index。Informer 中主要包含 Controller、Reflector、DeltaFIFO、LocalStore、Lister 和 Processor 六個組件,這篇文章主要從Controller來講,單獨拿Controller來將,注意Informer中的Controller和我們K8s內部傳統的controller不是一個概念。
Informer中的controller來看,processFunc以一個參數單獨穿入NewInformer中,如果有另一個程序需要處理相同的資源,那么就需要另外再創建一個Informer對象,而隊列也無法復用,隊列不能被兩個消費者同時消費,因此在Client-go中又設計有ShareInformer,后續的示例包括K8s的控制器中也都適用的是此類共享型的對象。
二 相關概念
2.1 資源Informer
每一種資源都實現了Informer機制,允許監控不同的資源事件
每一個Informer都會實現Informer和Lister方法
type PodInformer interface { Informer() cache.SharedIndexInformer Lister() v1.PodLister }
2.2 SharedInformer
若同一個資源的Informer被實例化了多次,每個Informer使用一個Reflector,那么會運行過多相同的ListAndWatch,太多重復的序列化和反序列化操作會導致api-server負載過重
SharedInformer可以使同一類資源Informer共享一個Reflector。內部定義了一個map字段,用于存放所有Infromer的字段。
通常會使用informerFactory來管理控制器需要的多個資源對象的informer實例,例如創建一個deployment的Informer
// 創建一個informer factory sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0) // factory已經為所有k8s的內置資源對象提供了創建對應informer實例的方法,調用具體informer實例的Lister或Informer方法 // 就完成了將informer注冊到factory的過程 deploymentLister := sharedInformerFactory.Apps().V1().Deployments().Lister() // 啟動注冊到factory的所有informer kubeInformerFactory.Start(stopCh)
SharedInformer是一個接口,包含添加事件,當有資源變化時,會回掉通知使用者,啟動函數及獲取是否全利卿對象已經同步到本地存儲中。
type SharedInformer interface { // 添加資源事件處理器,當有資源變化時就會通過回調通知使用者 AddEventHandler(handler ResourceEventHandler) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) // 獲取一個 Store 對象 GetStore() Store // 主要是用來將 Reflector 和 DeltaFIFO 組合到一起工作 GetController() Controller // SharedInformer 的核心實現,啟動并運行這個 SharedInformer // 當 stopCh 關閉時候,informer 才會退出 Run(stopCh <-chan struct{}) // 告訴使用者全量的對象是否已經同步到了本地存儲中 HasSynced() bool // 最新同步資源的版本 LastSyncResourceVersion() string } // SharedIndexInformer在SharedInformer基礎上擴展了添加和獲取Indexers的能力 type SharedIndexInformer interface { SharedInformer // 在啟動之前添加 indexers 到 informer 中 AddIndexers(indexers Indexers) error GetIndexer() Indexer }
三 源碼分析
3.1 SharedInformerFactory
SharedInformerFactory 為所有已知 API 組版本中的資源提供共享informer
type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Admissionregistration() admissionregistration.Interface Internal() apiserverinternal.Interface Apps() apps.Interface Autoscaling() autoscaling.Interface Batch() batch.Interface Certificates() certificates.Interface Coordination() coordination.Interface Core() core.Interface Discovery() discovery.Interface Events() events.Interface Extensions() extensions.Interface Flowcontrol() flowcontrol.Interface Networking() networking.Interface Node() node.Interface Policy() policy.Interface Rbac() rbac.Interface Scheduling() scheduling.Interface Storage() storage.Interface }
這里的informer實現是shareIndexInformer NewSharedInformerFactory調用了NewSharedInformerFactoryWithOptions,將返回一個sharedInformerFactory對象
type sharedInformerFactory struct { client kubernetes.Interface //關注的namepace,可以通過WithNamespace Option配置 namespace string tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration customResync map[reflect.Type]time.Duration //針對每種類型資源存儲一個informer,informer的類型是ShareIndexInformer informers map[reflect.Type]cache.SharedIndexInformer // startedInformers is used for tracking which informers have been started. // This allows Start() to be called multiple times safely. startedInformers map[reflect.Type]bool } // SharedInformerFactory 構造方法 func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ client: client, namespace: v1.NamespaceAll, defaultResync: defaultResync, // 初始化map informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), } // Apply all options for _, opt := range options { factory = opt(factory) } return factory }
3.2 Start
啟動factory下的所有informer,其實就是啟動每個informer中的Reflector
// Start initializes all requested informers. func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() // 啟動informers中所有informer for informerType, informer := range f.informers { if !f.startedInformers[informerType] { go informer.Run(stopCh) f.startedInformers[informerType] = true } } }
3.3 WaitForCacheSync
sharedInformerFactory的WaitForCacheSync將會不斷調用factory持有的所有informer的HasSynced方法,直到返回true
而informer的HasSynced方法調用的自己持有的controller的HasSynced方法(informer結構持有controller對象,下文會分析informer的結構)
informer中的controller的HasSynced方法則調用的是controller持有的deltaFIFO對象的HasSynced方法
也就說sharedInformerFactory的WaitForCacheSync方法判斷informer的cache是否同步,最終看的是informer中的deltaFIFO是否同步了。
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { informers := func() map[reflect.Type]cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() informers := map[reflect.Type]cache.SharedIndexInformer{} for informerType, informer := range f.informers { if f.startedInformers[informerType] { informers[informerType] = informer } } return informers }() res := map[reflect.Type]bool{} for informType, informer := range informers { // 調用Informer中的HasSynced res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) } return res }
3.4 InformerFor
只有向factory中添加informer,factory才有意義,obj: informer關注的資源如deployment{} newFunc: 一個知道如何創建指定informer的方法,k8s為每一個內置的對象都實現了這個方法,比如創建deployment的ShareIndexInformer的方法
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() // 獲取對象類型 informerType := reflect.TypeOf(obj) informer, exists := f.informers[informerType] if exists { return informer } resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } // 根據用戶傳入的informer生成方法生成informer informer = newFunc(f.client, resyncPeriod) // 將informer添加進SharedIndexInformer f.informers[informerType] = informer return informer }
shareIndexInformer對應的newFunc的實現,client-go中已經為所有內置對象都提供了NewInformerFunc,例如podinformer。
// podinformer 接口,包含Informer/Lister兩個方法 type PodInformer interface { Informer() cache.SharedIndexInformer Lister() v1.PodLister } // podInformer結構體,包含SharedInformerFactory接口和ns type podInformer struct { factory internalinterfaces.SharedInformerFactory tweakListOptions internalinterfaces.TweakListOptionsFunc namespace string } func NewPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { return NewFilteredPodInformer(client, namespace, resyncPeriod, indexers, nil) } // 真正生成SharedIndexInformer,在其中添加了eventHandler func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).Watch(context.TODO(), options) }, }, &corev1.Pod{}, resyncPeriod, indexers, ) } func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } func (f *podInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) } func (f *podInformer) Lister() v1.PodLister { return v1.NewPodLister(f.Informer().GetIndexer()) }
調用sharedInformerFactory.Core().V1().Pods(),就掉用了podInformer構造函數,生成Podinformer對象。
// Pods returns a PodInformer. func (v *version) Pods() PodInformer { return &podInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} }
cache 包暴露的 Informer 創建方法有以下 5 個:
New
NewInformer
NewIndexerInformer
NewSharedInformer
NewSharedIndexInformer
它們有著不同程度的抽象和封裝,NewSharedIndexInformer 是其中抽象程度最低,封裝程度最高的一個,但即使是 NewSharedIndexInformer,也沒有封裝具體的資源類型,需要接收 ListerWatcher 和 Indexers 作為參數:
func NewSharedIndexInformer( lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers, ) SharedIndexInformer {}
3.5 sharedProcessor
sharedProcessor 有一個processorListener 的集合,可以將一個通知對象分發給它的-。有兩種分發操作。同步分發到偵聽器的子集,(a) 在偶爾調用 shouldResync 時重新計算,(b) 每個偵聽器最初都被放入。非同步分發到每個偵聽器。在SharedInformer中有非常重要的一個屬性sharedProcessor,其包含了processorListener,來通知從sharedProcessor到ResourceEventHandler,其使用兩個無緩沖chanel,兩個goroutines和一個無界環形緩沖區,一個 goroutine 運行 pop(),它使用環形緩沖區中的存儲將通知從 addCh 泵到 nextCh,而 nextCh 沒有跟上。另一個 goroutine 運行 run(),它接收來自 nextCh 的通知并同步調用適當的處理程序方法。
type sharedProcessor struct { // 所有processor是否都已經啟動 listenersStarted bool listenersLock sync.RWMutex // 通用處理列表 listeners []*processorListener // 定時同步的處理列表 syncingListeners []*processorListener clock clock.Clock wg wait.Group } // processorListener 通知 type processorListener struct { nextCh chan interface{} addCh chan interface{} // 添加事件的通道 handler ResourceEventHandler // pendingNotifications 是一個無邊界的環形緩沖區,用于保存所有尚未分發的通知 pendingNotifications buffer.RingGrowing requestedResyncPeriod time.Duration resyncPeriod time.Duration nextResync time.Time resyncLock sync.Mutex }
add:事件添加通過addCh通道接受,notification就是事件,也就是從DeltaFIFO彈出的Deltas,addCh是一個無緩沖通道,所以可以將其看作一個事件分發器,從DeltaFIFO彈出的對象需要逐一送到多個處理器,如果處理器沒有及時處理addCh則會被阻塞。
func (p *processorListener) add(notification interface{}) { p.addCh <- notification }
pop:利用golang select來同時處理多個channel,直到至少有一個case表達式滿足條件為止。
func (p *processorListener) pop() { defer utilruntime.HandleCrash() // 通知run停止函數運行 defer close(p.nextCh) // Tell .run() to stop var nextCh chan<- interface{} var notification interface{} for { select { // 由于nextCh還沒有進行初始化,在此會zuse case nextCh <- notification: // 通知分發, var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // 沒有事件被Pop,則設置nextCh為nil nextCh = nil // Disable 這個 select的 case } // 從p.addCh 中讀取一個事件,消費addCh case notificationToAdd, ok := <-p.addCh: if !ok { // 如果關閉則直接退出 return } // pendingNotifications 為空,則說明沒有notification 去pop if notification == nil { // 把剛剛獲取的事件通過 p.nextCh 發送給處理器 notification = notificationToAdd nextCh = p.nextCh } else { // 上一個事件還沒發送完成(已經有一個通知等待發送),就先放到緩沖通道中 p.pendingNotifications.WriteOne(notificationToAdd) } } } }
run() 和 pop() 是 processorListener 的兩個最核心的函數,processorListener 就是實現了事件的緩沖和處理,在沒有事件的時候可以阻塞處理器,當事件較多是可以把事件緩沖起來,實現了事件分發器與處理器的異步處理。processorListener 的 run() 和 pop() 函數其實都是通過 sharedProcessor 啟動的協程來調用的,所以下面我們再來對 sharedProcessor 進行分析了。首先看下如何添加一個 processorListener:
// 添加處理器 func (p *sharedProcessor) addListener(listener *processorListener) { p.listenersLock.Lock() // 加鎖 defer p.listenersLock.Unlock() // 調用 addListenerLocked 函數 p.addListenerLocked(listener) // 如果事件處理列表中的處理器已經啟動了,則手動啟動下面的兩個協程 // 相當于啟動后了 if p.listenersStarted { // 通過 wait.Group 啟動兩個協程,就是上面我們提到的 run 和 pop 函數 p.wg.Start(listener.run) p.wg.Start(listener.pop) } } // 將處理器添加到處理器的列表中 func (p *sharedProcessor) addListenerLocked(listener *processorListener) { // 添加到通用處理器列表中 p.listeners = append(p.listeners, listener) // 添加到需要定時同步的處理器列表中 p.syncingListeners = append(p.syncingListeners, listener) }
這里添加處理器的函數 addListener 其實在 sharedIndexInformer 中的 AddEventHandler 函數中就會調用這個函數來添加處理器。然后就是事件分發的函數實現:
func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() // sync 表示 obj 對象是否是同步事件對象 // 將對象分發給每一個事件處理器列表中的處理器 if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } }
然后就是將 sharedProcessor 運行起來:
func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() // 遍歷所有的處理器,為處理器啟動兩個后臺協程:run 和 pop 操作 // 后續添加的處理器就是在上面的 addListener 中去啟動的 for _, listener := range p.listeners { p.wg.Start(listener.run) p.wg.Start(listener.pop) } // 標記為所有處理器都已啟動 p.listenersStarted = true }() // 等待退出信號 <-stopCh // 接收到退出信號后,關閉所有的處理器 p.listenersLock.RLock() defer p.listenersLock.RUnlock() // 遍歷所有處理器 for _, listener := range p.listeners { // 關閉 addCh,pop 會停止,pop 會通知 run 停止 close(listener.addCh) } // 等待所有協程退出,就是上面所有處理器中啟動的兩個協程 pop 與 run p.wg.Wait() }
到這里 sharedProcessor 就完成了對 ResourceEventHandler 的封裝處理,當然最終 sharedProcessor 還是在 SharedInformer 中去調用的。
sharedIndexInformer的HandleDeltas處理從deltaFIFO pod出來的增量時,先嘗試更新到本地緩存cache,更新成功的話會調用processor.distribute方法向processor中的listener添加notification,listener啟動之后會不斷獲取notification回調用戶的EventHandler方法。
在此就不在過度展開,更詳細的內容還需要去看k8s源碼。
四 小試牛刀
4.1 添加event事件
添加有AddEventHandler,首次list關注的資源后,后期通過watch判斷資源狀態執行相應操作,通過indexer來緩存中獲取對象。
func Muste(e error) { if e != nil { panic(e) } } func InitClientSet() *kubernetes.Clientset { kubeconfig := filepath.Join(homedir.HomeDir(), ".kube", "config") restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig) Muste(err) return kubernetes.NewForConfigOrDie(restConfig) } func main() { clientSet := InitClientSet() // 初始化sharedInformerFactory sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0) // 生成podInformer podInformer := sharedInformerFactory.Core().V1().Pods() // 生成具體informer/indexer informer := podInformer.Informer() indexer := podInformer.Lister() // 添加Event事件處理函數 informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ func(obj interface{}) { podObj := obj.(*corev1.Pod) fmt.Printf("AddFunc: %s\n", podObj.GetName()) }, func(oldObj, newObj interface{}) { oldPodObj := oldObj.(*corev1.Pod) newPodObj := newObj.(*corev1.Pod) fmt.Printf("old: %s\n", oldPodObj.GetName()) fmt.Printf("new: %s\n", newPodObj.GetName()) }, func(obj interface{}) { podObj := obj.(*corev1.Pod) fmt.Printf("deleteFunc: %s\n", podObj.GetName()) }, }) stopCh := make(chan struct{}) defer close(stopCh) // 啟動informer sharedInformerFactory.Start(stopCh) // 等待同步完成 sharedInformerFactory.WaitForCacheSync(stopCh) // 利用indexer獲取資源 pods, err := indexer.List(labels.Everything()) Muste(err) for _, items := range pods { fmt.Printf("namespace: %s, name:%s\n", items.GetNamespace(), items.GetName()) } <-stopCh }
4.2 通過Gin實現K8s資源獲取
通過生成deployinformer,通過gin框架暴露訪問入口,通過informer中的indexer來獲取deployment資源。
func main() { clientSet, err := initClientSet() if err != nil { log.Fatal(err) } sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0) // 獲取deployinformeer deployInformer := sharedInformerFactory.Apps().V1().Deployments() _ = deployInformer.Informer() lister := deployInformer.Lister() stopCh := make(chan struct{}) sharedInformerFactory.Start(stopCh) sharedInformerFactory.WaitForCacheSync(stopCh) // http://localhost:8888/deploy?labels[k8s-app]=kube-dns r := gin.Default() r.GET("/deploy", func(c *gin.Context) { var set map[string]string // 通過labels過濾deploy if content, ok := c.GetQueryMap("labels"); ok { set = content } selectQuery := labels.SelectorFromSet(set) podList, err := lister.List(selectQuery) if err != nil { c.JSON(400, gin.H{ "msg": err.Error(), }) return } c.JSON(200, gin.H{ "data": podList, }) }) r.Run(":8888") //<-stopCh }
之前的示例可以獲取制定資源,下面示例將gvr通過gin路徑參數傳遞進來,進而實現一個API可以獲取K8s所有資源。
func main() { clientSet := InitClientSet() sharedInformerFactory := informers.NewSharedInformerFactory(clientSet, 0) stopCh := make(chan struct{}) defer close(stopCh) engin := gin.Default() // 構造一個通用的根據不同group/version/kind 獲取對象類型 //http://localhost:8888/core/v1/pods engin.GET("/:group/:version/:resource", func(c *gin.Context) { g, v, r := c.Param("group"), c.Param("version"), c.Param("resource") // 如果是core組則為空字符串 if g == "core" { g = "" } gvr := schema.GroupVersionResource{g, v, r} genericInformer, _ := sharedInformerFactory.ForResource(gvr) _ = genericInformer.Informer() sharedInformerFactory.Start(stopCh) sharedInformerFactory.WaitForCacheSync(stopCh) items, err := genericInformer.Lister().List(labels.Everything()) if err != nil { c.JSON(500, gin.H{ "msg": err, }) } c.JSON(200, gin.H{ "msg": items, }) }) engin.Run(":8888") }
總結
在以上的內容中,我們分析了 informers 包的源代碼,并簡單測試了informer的原理,更詳細的內容還是需要自己去看源碼領會,后續看完indexer后,我們自己手動去編寫一個Controller來加深對Informer整個流程的理解。
參考鏈接
https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html
其他
【與云原生的故事】有獎征文火熱進行中:https://bbs.huaweicloud.com/blogs/345260
Go Kubernetes 云原生 云端實踐
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。