介绍
假设一个Nginx的QPS(服务器一秒内处理的请求数)上限为500,如果外部访问的QPS达到了600,为了保证服务质量,必须扩容一个Nginx来分摊请求。
在Kubernetes环境中,如果外部请求超过了单个Pod的处理极限,我们则可以增加Pod数量来达到横向扩容的目的。
假设我们的服务是无状态服务,我们来利用kubebuilder来开发一个operator,来模拟我们已上所述的场景。
项目初始化
在开发 Operator 之前我们需要先提前想好我们的 CRD 资源对象,比如我们想要通过下面的 CR 资源来创建我们的Operator :
apiVersion: elasticweb.example.com/v1 kind: ElasticWeb metadata: name: elasticweb-sample namespace: dev spec: image: nginx:1.17.1 # 镜像 port: 30003 # 外部访问的端口 singlePodsQPS: 800 # 单个 Pod 的 QPS totalQPS: 2400 # 总 QPS
首先初始化项目,这里使用kubebuilder来构建我们的脚手架:
$ mkdir app-operator && cd app-operator $ go mod init app-operator $ kubebuilder init --domain example.com kubebuilder init --domain example.com Writing kustomize manifests for you to edit... Writing scaffold for you to edit... ...
脚手架创建完成后,然后定义资源API:
$ kubebuilder create api --group elasticweb --version v1 --kind El asticWeb Create Resource [y/n] y Create Controller [y/n] y Writing kustomize manifests for you to edit... Writing scaffold for you to edit... ...
这样我们的项目初始化就完成了,整体的代码结构如下:
$ tree -L 2 . ├── Dockerfile ├── Makefile ├── PROJECT ├── api │ └── v1 ├── bin │ └── controller-gen ├── config │ ├── crd │ ├── default │ ├── manager │ ├── prometheus │ ├── rbac │ └── samples ├── controllers │ ├── elasticweb_controller.go │ └── suite_test.go ├── go.mod ├── go.sum ├── hack │ └── boilerplate.go.txt └── main.go 12 directories, 10 files
然后根据我们上面设计的 ElasticWeb 这个对象来编辑 Operator 的结构体即可,修改文件 api/v1/elasticweb_types.go
中的 ElasticWebSpec
结构体以及ElasticWebStatus
结构体,ElasticWebStatus
结构体主要用来记录当前集群实际支持的总QPS:
// api/v1/elasticweb_types.go type ElasticWebSpec struct { Image string `json:"image"` Port *int32 `json:"port"` // 单个pod的QPS上限 SinglePodsQPS *int32 `json:"singlePodsQPS"` // 当前整个业务的QPS TotalQPS *int32 `json:"totalQPS,omitempty"` } type ElasticWebStatus struct { // 当前 Kubernetes 集群实际支持的总QPS RealQPS *int32 `json:"realQPS"` }
同样,为了打印的日志方便我们阅读,我们给ElasticWeb
添加一个String
方法:
// api/v1/elasticweb_types.go func (e *ElasticWeb) String() string { var realQPS string if nil == e.Status.RealQPS { realQPS = "" } else { realQPS = strconv.Itoa(int(*e.Status.RealQPS)) } return fmt.Sprintf("Image [%s], Port [%d], SinglePodQPS [%d], TotalQPS [%d], RealQPS [%s]", e.Spec.Image, *e.Spec.Port, *e.Spec.SinglePodsQPS, *e.Spec.TotalQPS, realQPS) }
要注意每次修改完成需要执行make命令重新生成代码:
$ make make /Users/Christian/Documents/code/negan/app-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..." go fmt ./... api/v1/elasticweb_types.go go vet ./... go build -o bin/manager main.go
接下来我们就可以去控制器的 Reconcile 函数中来实现我们自己的业务逻辑了。
业务逻辑
首先在目录 controllers 下面创建一个 resource.go
文件,用来根据我们的ElasticWeb
对象生成对应的deployment
和service
以及更新状态。
// controllers/resource.go package controllers import ( v1 "app-operator/api/v1" "context" "fmt" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/utils/pointer" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" ) var ( ElasticWebCommonLabelKey = "app" ) const ( // APP_NAME deployment 中 App 标签名 APP_NAME = "elastic-app" // CONTAINER_PORT 容器的端口号 CONTAINER_PORT = 8080 // CPU_REQUEST 单个POD的CPU资源申请 CPU_REQUEST = "100m" // CPU_LIMIT 单个POD的CPU资源上限 CPU_LIMIT = "100m" // MEM_REQUEST 单个POD的内存资源申请 MEM_REQUEST = "512Mi" // MEM_LIMIT 单个POD的内存资源上限 MEM_LIMIT = "512Mi" ) // 根据总QPS以及单个POD的QPS,计算需要多少个Pod func getExpectReplicas(elasticWeb *v1.ElasticWeb) int32 { // 单个pod的QPS singlePodQPS := *elasticWeb.Spec.SinglePodsQPS // 期望的总QPS totalQPS := *elasticWeb.Spec.TotalQPS // 需要创建的副本数 replicas := totalQPS / singlePodQPS if totalQPS%singlePodQPS != 0 { replicas += 1 } return replicas } // CreateServiceIfNotExists 创建service func CreateServiceIfNotExists(ctx context.Context, r *ElasticWebReconciler, elasticWeb *v1.ElasticWeb, req ctrl.Request) error { logger := log.FromContext(ctx) logger.WithValues("func", "createService") svc := &corev1.Service{} svc.Name = elasticWeb.Name svc.Namespace = elasticWeb.Namespace svc.Spec = corev1.ServiceSpec{ Ports: []corev1.ServicePort{ { Name: "http", Port: CONTAINER_PORT, NodePort: *elasticWeb.Spec.Port, }, }, Type: corev1.ServiceTypeNodePort, Selector: map[string]string{ ElasticWebCommonLabelKey: APP_NAME, }, } // 设置关联关系 logger.Info("set reference") if err := controllerutil.SetControllerReference(elasticWeb, svc, r.Scheme); err != nil { logger.Error(err, "SetControllerReference error") return err } logger.Info("start create service") if err := r.Create(ctx, svc); err != nil { logger.Error(err, "create service error") return err } return nil } // CreateDeployment 创建deployment func CreateDeployment(ctx context.Context, r *ElasticWebReconciler, elasticWeb *v1.ElasticWeb) error { logger := log.FromContext(ctx) logger.WithValues("func", "createDeploy") // 计算期待pod的数量 expectReplicas := getExpectReplicas(elasticWeb) logger.Info(fmt.Sprintf("expectReplicas [%d]", expectReplicas)) deploy := &appsv1.Deployment{} deploy.Labels = map[string]string{ ElasticWebCommonLabelKey: APP_NAME, } deploy.Name = elasticWeb.Name deploy.Namespace = elasticWeb.Namespace deploy.Spec = appsv1.DeploymentSpec{ Replicas: pointer.Int32Ptr(expectReplicas), Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ ElasticWebCommonLabelKey: APP_NAME, }, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ ElasticWebCommonLabelKey: APP_NAME, }, }, Spec: corev1.PodSpec{ Containers: []corev1.Container{ { Name: APP_NAME, Image: elasticWeb.Spec.Image, Ports: []corev1.ContainerPort{ { Name: "http", ContainerPort: CONTAINER_PORT, Protocol: corev1.ProtocolSCTP, }, }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse(CPU_LIMIT), corev1.ResourceMemory: resource.MustParse(MEM_LIMIT), }, Requests: corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse(CPU_REQUEST), corev1.ResourceMemory: resource.MustParse(MEM_REQUEST), }, }, }, }, }, }, } // 建立关联,删除web后会将deploy一起删除 logger.Info("set reference") if err := controllerutil.SetControllerReference(elasticWeb, deploy, r.Scheme); err != nil { logger.Error(err, "SetControllerReference error") return err } // 创建Deployment logger.Info("start create deploy") if err := r.Create(ctx, deploy); err != nil { logger.Error(err, "create deploy error") return err } logger.Info("create deploy success") return nil } func UpdateStatus(ctx context.Context, r *ElasticWebReconciler, elasticWeb *v1.ElasticWeb) error { logger := log.FromContext(ctx) logger.WithValues("func", "updateStatus") // 单个pod的QPS singlePodQPS := *elasticWeb.Spec.SinglePodsQPS // pod 总数 replicas := getExpectReplicas(elasticWeb) // 当pod创建完成后,当前系统的QPS为: 单个pod的QPS * pod总数 // 如果没有初始化,则需要先初始化 if nil == elasticWeb.Status.RealQPS { elasticWeb.Status.RealQPS = new(int32) } *elasticWeb.Status.RealQPS = singlePodQPS * replicas logger.Info(fmt.Sprintf("singlePodQPS [%d],replicas [%d],realQPS[%d]", singlePodQPS, replicas, *elasticWeb.Status.RealQPS)) if err := r.Update(ctx, elasticWeb); err != nil { logger.Error(err, "update instance error") return err } return nil }
上面的代码虽然很多,但逻辑很简单,就是根据我们的 ElasticWeb 去构造 deploy
和 service
资源对象,构造完成后,当我们创建 ElasticWeb 的时候就可以在控制器的 Reconcile 函数中去进行逻辑处理了。
同时,我们需要在Reconcile函数注释中添加 deploy
和service
的RBAC声明。
// controllers/elasticweb_controller.go //+kubebuilder:rbac:groups=elasticweb.example.com,resources=elasticwebs,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=elasticweb.example.com,resources=elasticwebs/status,verbs=get;update;patch //+kubebuilder:rbac:groups=elasticweb.example.com,resources=elasticwebs/finalizers,verbs=update //+kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete func (r *ElasticWebReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) instance := &elasticwebv1.ElasticWeb{} if err := r.Get(ctx, req.NamespacedName, instance); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } logger.Info(fmt.Sprintf("instance:%s", instance.String())) // 获取deployment deploy := &appsv1.Deployment{} if err := r.Get(ctx, req.NamespacedName, deploy); err != nil { if errors.IsNotFound(err) { // 如果没有查找到,则需要创建 logger.Info("deploy not exists") // 判断qps的需求,如果qps没有需求,则啥都不做 if *instance.Spec.TotalQPS < 1 { logger.Info("not need deployment") return ctrl.Result{}, nil } // 创建service if err = CreateServiceIfNotExists(ctx, r, instance, req); err != nil { return ctrl.Result{}, err } // 创建Deploy if err := CreateDeployment(ctx, r, instance); err != nil { return ctrl.Result{}, err } // 更新状态 if err := UpdateStatus(ctx, r, instance); err != nil { return ctrl.Result{}, err } return ctrl.Result{}, nil } logger.Error(err, "failed to get deploy") return ctrl.Result{}, err } // 根据单个Pod的QPS计算期望pod的副本 expectReplicas := getExpectReplicas(instance) // 获取当前deployment实际的pod副本 realReplicas := deploy.Spec.Replicas if expectReplicas == *realReplicas { logger.Info("not need to reconcile") return ctrl.Result{}, nil } // 重新赋值 deploy.Spec.Replicas = &expectReplicas // 更新 deploy if err := r.Update(ctx, deploy); err != nil { logger.Error(err, "update deploy replicas error") return ctrl.Result{}, err } // 更新状态 if err := UpdateStatus(ctx, r, instance); err != nil { logger.Error(err, "update status error") return ctrl.Result{}, err } return ctrl.Result{}, nil }
调试
接下来我们首先安装我们的 CRD 对象,让我们的 Kubernetes 系统识别我们的 ElasitcWeb 对象:
$ make install /Users/Christian/Documents/code/negan/app-operator/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases /Users/Christian/Documents/code/negan/app-operator/bin/kustomize build config/crd | kubectl apply -f - customresourcedefinition.apiextensions.k8s.io/elasticwebs.elasticweb.example.com configured
接着运行控制器:
$ make install /Users/Christian/Documents/code/negan/app-operator/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases /Users/Christian/Documents/code/negan/app-operator/bin/controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..." go fmt ./... controllers/elasticweb_controller.go go vet ./... go run ./main.go 1.652941435373431e+09 INFO controller-runtime.metrics Metrics server is starting to listen {"addr": ":8080"} 1.6529414353737469e+09 INFO setup starting manager 1.6529414353739378e+09 INFO Starting server {"path": "/metrics", "kind": "metrics", "addr": "[::]:8080"} 1.652941435373951e+09 INFO Starting server {"kind": "health probe", "addr": "[::]:8081"} 1.6529414353741682e+09 INFO controller.elasticweb Starting EventSource {"reconciler group": "elasticweb.example.com", "reconciler kind": "ElasticWeb", "source": "kind source: *v1.ElasticWeb"} 1.652941435374196e+09 INFO controller.elasticweb Starting EventSource {"reconciler group": "elasticweb.example.com", "reconciler kind": "ElasticWeb", "source": "kind source: *v1.Deployment"} 1.652941435374202e+09 INFO controller.elasticweb Starting Controller {"reconciler group": "elasticweb.example.com", "reconciler kind": "ElasticWeb"} 1.65294143547575e+09 INFO controller.elasticweb Starting workers {"reconciler group": "elasticweb.example.com", "reconciler kind": "ElasticWeb", "worker count": 1}
控制器启动成功后我们就可以去创建我们的CR了,将示例 CR 资源清单修改成下面的 YAML:
apiVersion: elasticweb.example.com/v1 kind: ElasticWeb metadata: name: elasticweb-sample spec: image: nginx:1.17.1 port: 30003 singlePodsQPS: 800 totalQPS: 2400
另外开启一个终端创建上面的资源对象:
$ kubectl apply -f config/samples/elasticweb_v1_elasticweb.yaml elasticweb.elasticweb.example.com/elasticweb-sample created
创建完成后我们可以查看对应的 ElasticWeb对象:
$ kubectl get ElasticWeb NAME AGE elasticweb-sample 40s
对应也会自动创建我们的 Deployment 和 Service 资源清单:
$ kubectl get all NAME READY STATUS RESTARTS AGE pod/elasticweb-sample-6879bdfcf4-42jtc 1/1 Running 0 2m40s pod/elasticweb-sample-6879bdfcf4-sdmbp 1/1 Running 0 2m40s pod/elasticweb-sample-6879bdfcf4-w87tj 1/1 Running 0 2m40s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/elasticweb-sample NodePort 10.100.200.7 <none> 8080:30003/TCP 2m40s service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 14d NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/elasticweb-sample 3/3 3 3 2m40s NAME DESIRED CURRENT READY AGE replicaset.apps/elasticweb-sample-6879bdfcf4 3 3 3 2m40s
优化
现在我们需要对Deploy进行Watch,Service是的创建包含在创建Deploy的逻辑里,所以Deploy出现变化,我们需要重新进行调谐。当然我们只需要Watch被ElasticWeb控制的这部分独享即可。在elasticweb_controller.go
文件中更新SetupWithManager
函数即可:
func (r *ElasticWebReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&elasticwebv1.ElasticWeb{}). Owns(&appsv1.Deployment{}). Complete(r) }
而且我们发现在终端打印的日志中,worker count 为1,这时候我们同样可以更新SetupWithManager
函数:
func (r *ElasticWebReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). WithOptions(controller.Options{MaxConcurrentReconciles: 5}). For(&elasticwebv1.ElasticWeb{}). Owns(&appsv1.Deployment{}). Complete(r) }
同样我们发现输出的日志是时间戳格式,不够直观。 在 main 函数中有个zap的Options
,我们可以在这里面进行设置:
opts := zap.Options{ Development: true, TimeEncoder: zapcore.ISO8601TimeEncoder, }
自定义输出列
我们这里的 Elastic 实例,我们可以使用 kubectl 命令列出这个对象:
$ kubectl get ElasticWeb NAME AGE elasticweb-sample 40s
但是这个信息太过于简单,如果我们想要查看这个对象使用了什么镜像,部署了多少个副本,我们可能还需要通过 kubectl describe
命令去查看,这样就太过于麻烦了。这个时候我们就可以在 CRD 定义的结构体类型中使用 +kubebuilder:printcolumn
这个注释来告诉 kubebuilder 将我们所需的信息添加到 CRD 中,比如我们想要打印使用的镜像,在 +kubebuilder:object:root=true 注释下面添加一列新的注释,如下所示:
/+kubebuilder:object:root=true // +kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image",description="The Docker Image of MyAPP" //+kubebuilder:subresource:status // ElasticWeb is the Schema for the elasticwebs API type ElasticWeb struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec ElasticWebSpec `json:"spec,omitempty"` Status ElasticWebStatus `json:"status,omitempty"` }
printcolumn
注释有几个不同的选项,在这里我们只使用了其中一部分:
- name:这是我们新增的列的标题,由 kubectl 打印在标题中
- type:要打印的值的数据类型,有效类型为 integer、number、string、boolean 和 date
- JSONPath:这是要打印数据的路径,在我们的例子中,镜像 image 属于 spec 下面的属性,所以我们使用 .
spec.image
。需要注意的是 JSONPath 属性引用的是生成的 JSON CRD,而不是引用本地 Go 类。 - description:描述列的可读字符串,目前暂未发现该属性的作用…
新增了注释后,我们需要运行 make install
命令重新生成 CRD 并安装,然后我们再次尝试列出 CRD。
$ kubectl get ElasticWeb NAME IMAGE elasticweb-sample nginx:1.17.1
可以看到现在列出来的数据有一列 IMAGE
的数据了,不过却没有了之前列出来的 AGE
这一列了。这是因为当我们添加自定义列的时候,就不会再显示其他默认的列了(NAME
除外),所以如果我们还想出现 AGE
这一列,我们还需要在 EtcdCluster 的结构体上面添加对应的注释信息,如下所示:
// +kubebuilder:object:root=true // +kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.image",description="The Docker Image of Etcd" // +kubebuilder:printcolumn:name="Port",type="integer",JSONPath=".spec.port",description="container port" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // +kubebuilder:subresource:status // ElasticWeb is the Schema for the elasticwebs API type ElasticWeb struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` Spec ElasticWebSpec `json:"spec,omitempty"` Status ElasticWebStatus `json:"status,omitempty"` }
运行 make install
命令行,再次查看 CRD 数据:
$ kubectl get ElasticWeb NAME IMAGE PORT AGE elasticweb-sample nginx:1.17.1 30003 37m
如果我们还想获取当前应用的状态,同样也可以通过 +kubebuilder:printcolumn
来添加对应的信息,只是状态的数据是通过 .status
在 JSONPath 属性中去获取了。
如果你觉得这里添加了太多的信息,如果我们想隐藏某个字段并只在需要时显示该字段怎么办?
这个时候就需要使用 priority 这个属性了,如果没有配置这个属性,默认值为0,也就是默认情况下列出显示的数据是 priority=0
的列,如果将 priority 设置为大于1的数字,那么则只会当我们使用 -o wide
参数的时候才会显示,比如我们给 Port 这一列添加一个 priority=1
的属性:
// +kubebuilder:printcolumn:name="Port",type="string",priority=1,JSONPath=".spec.image",description="The Docker Image of Etcd"
同样重新运行make install
命令后,再次查看CRD:
$ kubectl get ElasticWeb NAME IMAGE AGE elasticweb-sample nginx:1.17.1 41m $ kubectl get ElasticWeb -o wide NAME IMAGE PORT AGE elasticweb-sample nginx:1.17.1 30003 41m
了解更多详细信息请查看 CRD 文档上的 AdditionalPrinterColumns 字段。
部署
现在我们已经完成了开发工作,并在本地完成了测试工作,这时候我们就需要把我们的operator部署到kubernetes环境中。
首先我们需要修改Dockerfile
文件,需要添加上go mod的代理配置:
# Build the manager binary FROM golang:1.17 as builder WORKDIR /workspace # Copy the Go Modules manifests COPY go.mod go.mod COPY go.sum go.sum # cache deps before building and copying source so that we don't need to re-download as much # and so that source changes don't invalidate our downloaded layer ENV GOPROXY https://goproxy.cn RUN go mod download # Copy the go source COPY main.go main.go COPY api/ api/ COPY controllers/ controllers/ # Build RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o manager main.go # Use distroless as minimal base image to package the manager binary # Refer to https://github.com/GoogleContainerTools/distroless for more details FROM gcr.io/distroless/static:nonroot WORKDIR / COPY --from=builder /workspace/manager . USER 65532:65532 ENTRYPOINT ["/manager"]
接下来就是登陆docker了,我这边使用的docker hub,直接在命令行登陆即可。
$ docker login Authenticating with existing credentials... Login Succeeded Logging in with your password grants your terminal complete access to your account. For better security, log in with a limited-privilege personal access token. Learn more at https://docs.docker.com/go/access-tokens/
登陆成功后,就可以构建镜像了。
注意如果你用的是Mac M1的电脑,那么需要对Makefile
做一小点修改,具体可见issues
.PHONY: test test: manifests generate fmt vet envtest ## Run tests. #KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out KUBEBUILDER_ASSETS="$(shell $(ENVTEST) --arch=amd64 use $(ENVTEST_K8S_VERSION) -p path)" go test ./... -coverprofile cover.out
接下来就是构建并将镜像推送到镜像仓库:
$ make docker-build docker-push IMG=<some-registry>/<project-name>:tag $ make docker-build docker-push IMG=huiyichanmian/elasitcweb:v0.0.1
等待推送成功后,就可以根据IMG
指定的镜像将控制器部署到集群中:
$ make deploy IMG=<some-registry>/<project-name>:tag $ make deploy IMG=huiyichanmian/elasticweb:v0.0.1
同样,这里可能会遇到镜像gcr.io/kubebuilder/kube-rbac-proxy:v0.8.0
这个镜像拉不下来的情况,这里可以使用kubesphere/kube-rbac-proxy:v0.8.0
进行替代。
可以直接修改config/default/manager_auth_proxy_patch.yaml
或者使用docker tag进行改名。
部署完成后,系统会自动创建项目名- system
的命名空间,我们的控制器所有东西都在这个namespace下。
最后如果要从集群中卸载operator也很简单:
$ make undeploy