编写一个 Kubernetes Controller

Overview

根据 Kubernetes 文档对 Controller 的描述,Controller 在 Kubernetes 中是负责协调(reconcile)的组件。按照其设计模式,Controller 会不断地将你的对象(如 Pod)从当前状态向期望状态同步。为此,Controller 会持续监听对象的实际状态与期望状态。

Writing Controllers

package main

import (
	"flag"
	"fmt"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
)

// Controller ties together an informer, the local cache it fills, and the
// work queue that carries object keys from the event handlers to the workers.
type Controller struct {
	lister     cache.Indexer
	controller cache.Controller
	queue      workqueue.RateLimitingInterface
}

// NewController returns a Controller that reads objects from lister, drives
// the given informer controller, and takes its work items from queue.
func NewController(lister cache.Indexer, controller cache.Controller, queue workqueue.RateLimitingInterface) *Controller {
	return &Controller{
		lister:     lister,
		controller: controller,
		queue:      queue,
	}
}

// processItem takes one key off the queue and processes it. It returns false
// only when the queue has been shut down, which tells the worker loop to stop.
func (c *Controller) processItem() bool {
	item, quit := c.queue.Get()
	if quit {
		return false
	}
	// Done must always be called so the queue knows the item is no longer
	// being worked on and may be handed out again if it is re-added.
	defer c.queue.Done(item)
	fmt.Println(item)

	key, ok := item.(string)
	if !ok {
		// Only string keys are ever enqueued; drop anything else rather
		// than panicking on the type assertion.
		c.queue.Forget(item)
		utilruntime.HandleError(fmt.Errorf("expected string key in queue, got %T", item))
		return true
	}

	if err := c.processWrapper(key); err != nil {
		c.handleError(key)
		return true
	}
	// Success: clear the rate limiter's failure history for this key so a
	// later failure starts again from the shortest back-off.
	c.queue.Forget(key)
	return true
}

// handleError re-queues key with rate limiting while it has been retried
// fewer than 3 times; after that the key is forgotten and dropped.
func (c *Controller) handleError(key string) {
	if c.queue.NumRequeues(key) < 3 {
		c.queue.AddRateLimited(key)
		return
	}
	c.queue.Forget(key)
	klog.Infof("Drop Object %s in queue", key)
}

// processWrapper looks key up in the local cache and prints the pod name when
// the object exists. A missing object is logged and is not an error.
func (c *Controller) processWrapper(key string) error {
	item, exists, err := c.lister.GetByKey(key)
	if err != nil {
		klog.Error(err)
		return err
	}
	if !exists {
		// item is nil here, so report the key that was looked up.
		klog.Infof("item %s not exists in cache", key)
		return nil
	}
	pod, ok := item.(*v1.Pod)
	if !ok {
		return fmt.Errorf("unexpected object type %T for key %q", item, key)
	}
	fmt.Println(pod.GetName())
	return nil
}

// Run starts the informer, waits for its cache to sync, launches threadiness
// worker goroutines and then blocks until stopCh is closed. The queue is shut
// down when Run returns, which makes the workers' processItem return false.
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()
	klog.Infof("Starting custom controller")

	go c.controller.Run(stopCh)

	if !cache.WaitForCacheSync(stopCh, c.controller.HasSynced) {
		utilruntime.HandleError(fmt.Errorf("sync failed."))
		return
	}

	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	<-stopCh
	klog.Info("Stopping custom controller")
}

// runWorker drains the queue until processItem reports that it was shut down.
func (c *Controller) runWorker() {
	for c.processItem() {
	}
}

// printConditions prints the status (and, when it is not "True", the reason)
// of up to the first four pod conditions, stopping at the end of conds so a
// pod with fewer conditions never causes an index-out-of-range panic.
//
// NOTE(review): the labels are positional, not matched on Condition.Type, so
// a label may not name the condition actually stored at that index — consider
// matching on Type instead.
func printConditions(conds []v1.PodCondition) {
	labels := [...]string{"Initialized", "Ready", "PodCondition", "PodScheduled"}
	for i, label := range labels {
		if i >= len(conds) {
			return
		}
		cond := conds[i]
		if cond.Status == "True" {
			fmt.Println("update: the "+label+" Status", cond.Status)
			continue
		}
		fmt.Println("update: the "+label+" Status ", cond.Status)
		fmt.Println("update: the "+label+" Reason ", cond.Reason)
	}
}

func main() {
	// Authenticate with a kubeconfig file; default to ~/.kube/config when a
	// home directory is known. The flag is always registered so that
	// dereferencing it below is safe even without a home directory.
	defaultKubeconfig := ""
	if home := homedir.HomeDir(); home != "" {
		defaultKubeconfig = filepath.Join(home, ".kube", "config")
	}
	kubeconfig := flag.String("kubeconfig", defaultKubeconfig, "kubernetes auth config")
	flag.Parse()

	// Prefer in-cluster configuration; otherwise build the config from the
	// master URL or the kubeconfig file (one of the two).
	restConfig, err := rest.InClusterConfig()
	if err != nil {
		restConfig, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
		if err != nil {
			panic(err)
		}
	}

	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		panic(err)
	}

	listWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", "default", fields.Everything())
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	indexer, informer := cache.NewIndexerInformer(listWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			fmt.Println("add ", obj.(*v1.Pod).GetName())
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			pod := newObj.(*v1.Pod)
			fmt.Println("update", pod.GetName())
			printConditions(pod.Status.Conditions)
		},
		DeleteFunc: func(obj interface{}) {
			// A delete event may carry a DeletedFinalStateUnknown tombstone
			// instead of the pod itself, so unwrap it before printing.
			pod, ok := obj.(*v1.Pod)
			if !ok {
				if tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown); isTombstone {
					pod, ok = tombstone.Obj.(*v1.Pod)
				}
			}
			if ok {
				fmt.Println("delete ", pod.GetName(), "Status ", pod.Status.Phase)
			}
			// Above is the event handling; below is the work queue operation.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	c := NewController(indexer, informer, queue)

	// Close stopCh on SIGINT/SIGTERM so Run returns and the queue shuts down.
	stopCh := make(chan struct{})
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigCh
		close(stopCh)
	}()

	c.Run(1, stopCh)
}

通过日志可以看出,Pod 创建后大致会经历 4 个步骤:

  • Initialized:初始化好后状态为Pending
  • PodScheduled:然后调度
  • PodCondition(对应日志中 Reason 为 ContainersNotReady 的条件,即 ContainersReady)
  • Ready
add  netbox default/netbox netbox update netbox status Pending to Pending update: the Initialized Status True update netbox status Pending to Pending update: the Initialized Status True update: the Ready Status  False update: the Ready Reason  ContainersNotReady update: the PodCondition Status  False update: the PodCondition Reason  ContainersNotReady update: the PodScheduled Status True   update netbox status Pending to Running update: the Initialized Status True update: the Ready Status True update: the PodCondition Status True update: the PodScheduled Status True 

大致上与 kubectl describe pod 看到的内容也相似

default-scheduler  Successfully assigned default/netbox to master-machine   Normal  Pulling    85s   kubelet            Pulling image "cylonchau/netbox"   Normal  Pulled     30s   kubelet            Successfully pulled image "cylonchau/netbox"   Normal  Created    30s   kubelet            Created container netbox   Normal  Started    30s   kubelet            Started container netbox 

Reference

controllers.md

商匡云商
Logo
注册新帐户
对比商品
  • 合计 (0)
对比
0
购物车