A First Look at Developing Operators with kubebuilder


1. Initialize the project with kubebuilder

$ mkdir project
$ cd project
$ kubebuilder init --domain tutorial.kubebuilder.io --repo tutorial.kubebuilder.io/project

$ tree .
.
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── config
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   └── manager_config_patch.yaml
│   ├── manager
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── prometheus
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   └── rbac
│       ├── auth_proxy_client_clusterrole.yaml
│       ├── auth_proxy_role.yaml
│       ├── auth_proxy_role_binding.yaml
│       ├── auth_proxy_service.yaml
│       ├── kustomization.yaml
│       ├── leader_election_role.yaml
│       ├── leader_election_role_binding.yaml
│       ├── role_binding.yaml
│       └── service_account.yaml
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go

6 directories, 24 files
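
Besides the config/ manifests, `kubebuilder init` scaffolds a main.go that creates a controller-runtime manager and starts it. The following is only a stripped-down sketch of that setup (exact options such as the metrics address and leader-election flags vary by kubebuilder version):

package main

import (
    "os"

    "k8s.io/apimachinery/pkg/runtime"
    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    ctrl "sigs.k8s.io/controller-runtime"
)

var (
    scheme   = runtime.NewScheme()
    setupLog = ctrl.Log.WithName("setup")
)

func main() {
    // register the built-in Kubernetes types; our CronJob types get added in step 2
    _ = clientgoscheme.AddToScheme(scheme)

    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
    if err != nil {
        setupLog.Error(err, "unable to start manager")
        os.Exit(1)
    }

    setupLog.Info("starting manager")
    if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
        setupLog.Error(err, "problem running manager")
        os.Exit(1)
    }
}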

2. Scaffold the API object code with kubebuilder

$ kubebuilder create api --group batch --version v1 --kind CronJob
$ tree .
.
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── api
│   └── v1
│       ├── cronjob_types.go
│       ├── groupversion_info.go
│       └── zz_generated.deepcopy.go
├── bin
│   └── controller-gen
├── config
│   ├── crd
│   │   ├── kustomization.yaml
│   │   ├── kustomizeconfig.yaml
│   │   └── patches
│   │       ├── cainjection_in_cronjobs.yaml
│   │       └── webhook_in_cronjobs.yaml
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   └── manager_config_patch.yaml
│   ├── manager
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── prometheus
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   ├── rbac
│   │   ├── auth_proxy_client_clusterrole.yaml
│   │   ├── auth_proxy_role.yaml
│   │   ├── auth_proxy_role_binding.yaml
│   │   ├── auth_proxy_service.yaml
│   │   ├── cronjob_editor_role.yaml
│   │   ├── cronjob_viewer_role.yaml
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── role_binding.yaml
│   │   └── service_account.yaml
│   └── samples
│       └── batch_v1_cronjob.yaml
├── controllers
│   ├── cronjob_controller.go
│   └── suite_test.go
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go

13 directories, 37 files
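
At this point api/v1/cronjob_types.go only contains placeholder Spec/Status fields, and the reconciler in controllers/cronjob_controller.go is an empty stub. Roughly (approximate sketch of the generated code; step 4 replaces the body):

func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    _ = log.FromContext(ctx)

    // TODO(user): your reconcile logic goes here

    return ctrl.Result{}, nil
}

// SetupWithManager registers the controller with the manager.
func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&batchv1.CronJob{}).
        Complete(r)
}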

3. Edit the API types file to define the CR's data structures

# Only the relevant parts of the file are shown here
$ vim api/v1/cronjob_types.go
import (
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +kubebuilder:docs-gen:collapse=Imports

// CronJobSpec defines the desired state of CronJob
type CronJobSpec struct {
    // The schedule in Cron format.
    //+kubebuilder:validation:MinLength=0
    Schedule string `json:"schedule"`

    // Optional deadline in seconds for starting the job if it misses the scheduled time.
    //+kubebuilder:validation:Minimum=0
    //+optional
    StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"`

    // Specifies how to treat concurrent executions of a Job.
    //+optional
    ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`

    // This flag tells the controller to suspend subsequent executions.
    //+optional
    Suspend *bool `json:"suspend,omitempty"`

    // Specifies the job that will be created when executing a CronJob.
    JobTemplate batchv1.JobTemplateSpec `json:"jobTemplate"`

    // The number of successful finished jobs to retain.
    //+kubebuilder:validation:Minimum=0
    //+optional
    SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"`

    // The number of failed finished jobs to retain.
    //+kubebuilder:validation:Minimum=0
    //+optional
    FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`
}

// ConcurrencyPolicy describes how concurrent executions of a Job are handled.
// +kubebuilder:validation:Enum=Allow;Forbid;Replace
type ConcurrencyPolicy string

const (
    // AllowConcurrent allows CronJobs to run concurrently.
    AllowConcurrent ConcurrencyPolicy = "Allow"

    // ForbidConcurrent forbids concurrent runs, skipping the next run if the previous
    // one hasn't finished yet.
    ForbidConcurrent ConcurrencyPolicy = "Forbid"

    // ReplaceConcurrent cancels the currently running job and replaces it with a new one.
    ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

// CronJobStatus defines the observed state of CronJob
type CronJobStatus struct {
    // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
    // Important: Run "make" to regenerate code after modifying this file

    // A list of pointers to currently running jobs.
    // +optional
    Active []corev1.ObjectReference `json:"active,omitempty"`

    // Information about the last time the job was successfully scheduled.
    // +optional
    LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
}

//+kubebuilder:docs-gen:collapse=Root Object Definitions
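
The root object definitions are collapsed above. For reference, they look like this (standard scaffold shape, with the status subresource enabled, which the controller's r.Status().Update call relies on):

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

// CronJob is the Schema for the cronjobs API
type CronJob struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   CronJobSpec   `json:"spec,omitempty"`
    Status CronJobStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

// CronJobList contains a list of CronJob
type CronJobList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []CronJob `json:"items"`
}

func init() {
    SchemeBuilder.Register(&CronJob{}, &CronJobList{})
}

After editing the types, `make generate` refreshes zz_generated.deepcopy.go and `make manifests` regenerates the CRD YAML under config/crd/bases.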

4. Edit the controller file, which implements the operator's reconciliation logic

# Only the relevant parts of the file are shown here
$ vim controllers/cronjob_controller.go
import (
    "context"
    "fmt"
    "sort"
    "time"

    "github.com/robfig/cron"
    kbatch "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    ref "k8s.io/client-go/tools/reference"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/log"

    batchv1 "tutorial.kubebuilder.io/project/api/v1"
)

// CronJobReconciler reconciles a CronJob object.
// Clock is added on top of the scaffolded Client/Scheme fields so tests can fake the current time.
type CronJobReconciler struct {
    client.Client
    Scheme *runtime.Scheme
    Clock
}

// Clock knows how to get the current time; realClock is the production implementation.
type Clock interface {
    Now() time.Time
}

type realClock struct{}

func (_ realClock) Now() time.Time { return time.Now() }

// +kubebuilder:docs-gen:collapse=Clock

//+kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get

var (
    scheduledTimeAnnotation = "batch.tutorial.kubebuilder.io/scheduled-at"
)

func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    log := log.FromContext(ctx)

    var cronJob batchv1.CronJob
    if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
        log.Error(err, "unable to fetch CronJob")
        // we'll ignore not-found errors, since they can't be fixed by an immediate
        // requeue (we'll need to wait for a new notification), and we can get them
        // on deleted requests.
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    var childJobs kbatch.JobList
    if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}); err != nil {
        log.Error(err, "unable to list child Jobs")
        return ctrl.Result{}, err
    }

    // find the active list of jobs
    var activeJobs []*kbatch.Job
    var successfulJobs []*kbatch.Job
    var failedJobs []*kbatch.Job
    var mostRecentTime *time.Time // find the last run so we can update the status

    isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
        for _, c := range job.Status.Conditions {
            if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
                return true, c.Type
            }
        }

        return false, ""
    }
    // +kubebuilder:docs-gen:collapse=isJobFinished
    getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
        timeRaw := job.Annotations[scheduledTimeAnnotation]
        if len(timeRaw) == 0 {
            return nil, nil
        }

        timeParsed, err := time.Parse(time.RFC3339, timeRaw)
        if err != nil {
            return nil, err
        }
        return &timeParsed, nil
    }
    // +kubebuilder:docs-gen:collapse=getScheduledTimeForJob

    for i, job := range childJobs.Items {
        _, finishedType := isJobFinished(&job)
        switch finishedType {
        case "": // ongoing
            activeJobs = append(activeJobs, &childJobs.Items[i])
        case kbatch.JobFailed:
            failedJobs = append(failedJobs, &childJobs.Items[i])
        case kbatch.JobComplete:
            successfulJobs = append(successfulJobs, &childJobs.Items[i])
        }

        // We'll store the launch time in an annotation, so we'll reconstitute that from
        // the active jobs themselves.
        scheduledTimeForJob, err := getScheduledTimeForJob(&job)
        if err != nil {
            log.Error(err, "unable to parse schedule time for child job", "job", &job)
            continue
        }
        if scheduledTimeForJob != nil {
            if mostRecentTime == nil {
                mostRecentTime = scheduledTimeForJob
            } else if mostRecentTime.Before(*scheduledTimeForJob) {
                mostRecentTime = scheduledTimeForJob
            }
        }
    }
    if mostRecentTime != nil {
        cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
    } else {
        cronJob.Status.LastScheduleTime = nil
    }
    cronJob.Status.Active = nil
    for _, activeJob := range activeJobs {
        jobRef, err := ref.GetReference(r.Scheme, activeJob)
        if err != nil {
            log.Error(err, "unable to make reference to active job", "job", activeJob)
            continue
        }
        cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
    }

    log.V(1).Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))

    if err := r.Status().Update(ctx, &cronJob); err != nil {
        log.Error(err, "unable to update CronJob status")
        return ctrl.Result{}, err
    }

    if cronJob.Spec.FailedJobsHistoryLimit != nil {
        sort.Slice(failedJobs, func(i, j int) bool {
            if failedJobs[i].Status.StartTime == nil {
                return failedJobs[j].Status.StartTime != nil
            }
            return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
        })
        for i, job := range failedJobs {
            if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
                break
            }
            if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
                log.Error(err, "unable to delete old failed job", "job", job)
            } else {
                log.V(0).Info("deleted old failed job", "job", job)
            }
        }
    }

    if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
        sort.Slice(successfulJobs, func(i, j int) bool {
            if successfulJobs[i].Status.StartTime == nil {
                return successfulJobs[j].Status.StartTime != nil
            }
            return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
        })
        for i, job := range successfulJobs {
            if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
                break
            }
            if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil {
                log.Error(err, "unable to delete old successful job", "job", job)
            } else {
                log.V(0).Info("deleted old successful job", "job", job)
            }
        }
    }

    if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
        log.V(1).Info("cronjob suspended, skipping")
        return ctrl.Result{}, nil
    }

    getNextSchedule := func(cronJob *batchv1.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
        sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
        if err != nil {
            return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
        }

        // for optimization purposes, cheat a bit and start from our last observed run time
        // we could reconstitute this here, but there's not much point, since we've
        // just updated it.
        var earliestTime time.Time
        if cronJob.Status.LastScheduleTime != nil {
            earliestTime = cronJob.Status.LastScheduleTime.Time
        } else {
            earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
        }
        if cronJob.Spec.StartingDeadlineSeconds != nil {
            // controller is not going to schedule anything below this point
            schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))

            if schedulingDeadline.After(earliestTime) {
                earliestTime = schedulingDeadline
            }
        }
        if earliestTime.After(now) {
            return time.Time{}, sched.Next(now), nil
        }

        starts := 0
        for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
            lastMissed = t
            // An object might miss several starts. For example, if
            // controller gets wedged on Friday at 5:01pm when everyone has
            // gone home, and someone comes in on Tuesday AM and discovers
            // the problem and restarts the controller, then all the hourly
            // jobs, more than 80 of them for one hourly scheduledJob, should
            // all start running with no further intervention (if the scheduledJob
            // allows concurrency and late starts).
            //
            // However, if there is a bug somewhere, or incorrect clock
            // on controller's server or apiservers (for setting creationTimestamp)
            // then there could be so many missed start times (it could be off
            // by decades or more), that it would eat up all the CPU and memory
            // of this controller. In that case, we want to not try to list
            // all the missed start times.
            starts++
            if starts > 100 {
                // We can't get the most recent times so just return an empty slice
                return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
            }
        }
        return lastMissed, sched.Next(now), nil
    }
    // +kubebuilder:docs-gen:collapse=getNextSchedule

    // figure out the next times that we need to create
    // jobs at (or anything we missed).
    missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
    if err != nil {
        log.Error(err, "unable to figure out CronJob schedule")
        // we don't really care about requeuing until we get an update that
        // fixes the schedule, so don't return an error
        return ctrl.Result{}, nil
    }

    /*
        We'll prep our eventual request to requeue until the next job, and then figure
        out if we actually need to run.
    */
    scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())} // save this so we can re-use it elsewhere
    log = log.WithValues("now", r.Now(), "next run", nextRun)

    /*
        ### 6: Run a new job if it's on schedule, not past the deadline, and not blocked by our concurrency policy

        If we've missed a run, and we're still within the deadline to start it, we'll need to run a job.
    */
    if missedRun.IsZero() {
        log.V(1).Info("no upcoming scheduled times, sleeping until next")
        return scheduledResult, nil
    }

    // make sure we're not too late to start the run
    log = log.WithValues("current run", missedRun)
    tooLate := false
    if cronJob.Spec.StartingDeadlineSeconds != nil {
        tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
    }
    if tooLate {
        log.V(1).Info("missed starting deadline for last run, sleeping till next")
        // TODO(directxman12): events
        return scheduledResult, nil
    }

    /*
        If we actually have to run a job, we'll need to either wait till existing ones finish,
        replace the existing ones, or just add new ones.  If our information is out of date due
        to cache delay, we'll get a requeue when we get up-to-date information.
    */
    // figure out how to run this job -- concurrency policy might forbid us from running
    // multiple at the same time...
    if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(activeJobs) > 0 {
        log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
        return scheduledResult, nil
    }

    // ...or instruct us to replace existing ones...
    if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
        for _, activeJob := range activeJobs {
            // we don't care if the job was already deleted
            if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
                log.Error(err, "unable to delete active job", "job", activeJob)
                return ctrl.Result{}, err
            }
        }
    }

    /*
        Once we've figured out what to do with existing jobs, we'll actually create our desired job.

        We need to construct a job based on our CronJob's template.  We'll copy over the spec
        from the template and copy some basic object meta.

        Then, we'll set the "scheduled time" annotation so that we can reconstitute our
        `LastScheduleTime` field each reconcile.

        Finally, we'll need to set an owner reference.  This allows the Kubernetes garbage collector
        to clean up jobs when we delete the CronJob, and allows controller-runtime to figure out
        which cronjob needs to be reconciled when a given job changes (is added, deleted, completes, etc).
    */
    constructJobForCronJob := func(cronJob *batchv1.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
        // We want job names for a given nominal start time to have a deterministic name to avoid the same job being created twice
        name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())

        job := &kbatch.Job{
            ObjectMeta: metav1.ObjectMeta{
                Labels:      make(map[string]string),
                Annotations: make(map[string]string),
                Name:        name,
                Namespace:   cronJob.Namespace,
            },
            Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
        }
        for k, v := range cronJob.Spec.JobTemplate.Annotations {
            job.Annotations[k] = v
        }
        job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
        for k, v := range cronJob.Spec.JobTemplate.Labels {
            job.Labels[k] = v
        }
        if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
            return nil, err
        }

        return job, nil
    }
    // +kubebuilder:docs-gen:collapse=constructJobForCronJob

    // actually make the job...
    job, err := constructJobForCronJob(&cronJob, missedRun)
    if err != nil {
        log.Error(err, "unable to construct job from template")
        // don't bother requeuing until we get a change to the spec
        return scheduledResult, nil
    }

    // ...and create it on the cluster
    if err := r.Create(ctx, job); err != nil {
        log.Error(err, "unable to create Job for CronJob", "job", job)
        return ctrl.Result{}, err
    }

    log.V(1).Info("created Job for CronJob run", "job", job)

    /*
        ### 7: Requeue when we either see a running job or it's time for the next scheduled run

        Finally, we'll return the result that we prepped above, that says we want to requeue
        when our next run would need to occur.  This is taken as a maximum deadline -- if something
        else changes in between, like our job starts or finishes, we get modified, etc, we might
        reconcile again sooner.
    */
    // we'll requeue once we see the running job, and update our status
    return scheduledResult, nil
}

/*
### Setup

Finally, we'll update our setup.  In order to allow our reconciler to quickly
look up Jobs by their owner, we'll need an index.  We declare an index key that
we can later use with the client as a pseudo-field name, and then describe how to
extract the indexed value from the Job object.  The indexer will automatically take
care of namespaces for us, so we just have to extract the owner name if the Job has
a CronJob owner.

Additionally, we'll inform the manager that this controller owns some Jobs, so that it
will automatically call Reconcile on the underlying CronJob when a Job changes, is
deleted, etc.
*/
var (
    jobOwnerKey = ".metadata.controller"
    apiGVStr    = batchv1.GroupVersion.String()
)

func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
    // set up a real clock, since we're not in a test
    if r.Clock == nil {
        r.Clock = realClock{}
    }

    if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(rawObj client.Object) []string {
        // grab the job object, extract the owner...
        job := rawObj.(*kbatch.Job)
        owner := metav1.GetControllerOf(job)
        if owner == nil {
            return nil
        }
        // ...make sure it's a CronJob...
        if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
            return nil
        }

        // ...and if so, return it
        return []string{owner.Name}
    }); err != nil {
        return err
    }

    return ctrl.NewControllerManagedBy(mgr).
        For(&batchv1.CronJob{}).
        Owns(&kbatch.Job{}).
        Complete(r)
}
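
For the reconciler to actually receive its Client and Scheme, main.go registers it with the manager. The scaffold-generated registration looks roughly like this (approximate sketch):

// in main(), after the manager has been created
if err = (&controllers.CronJobReconciler{
    Client: mgr.GetClient(),
    Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
    setupLog.Error(err, "unable to create controller", "controller", "CronJob")
    os.Exit(1)
}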

5. Edit main.go so the webhook can be disabled during debugging

$ vim main.go
// +kubebuilder:docs-gen:collapse=Imports

func main() {
    if os.Getenv("ENABLE_WEBHOOKS") != "false" {
        if err = (&batchv1.CronJob{}).SetupWebhookWithManager(mgr); err != nil {
            setupLog.Error(err, "unable to create webhook", "webhook", "CronJob")
            os.Exit(1)
        }
    }
    // +kubebuilder:docs-gen:collapse=old stuff
}
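
With this guard in place, the manager can be run locally against the cluster in your current kubeconfig without serving webhooks, for example (using the standard scaffolded Makefile targets):

$ make install
$ ENABLE_WEBHOOKS=false make run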

6. Scaffold the initial webhook code

$ kubebuilder create webhook --group batch --version v1 --kind CronJob --defaulting --programmatic-validation
$ tree .
.
├── Dockerfile
├── Makefile
├── PROJECT
├── README.md
├── api
│   └── v1
│       ├── cronjob_types.go
│       ├── cronjob_webhook.go
│       ├── groupversion_info.go
│       ├── webhook_suite_test.go
│       └── zz_generated.deepcopy.go
├── bin
│   ├── controller-gen
│   └── kustomize
├── config
│   ├── certmanager
│   │   ├── certificate.yaml
│   │   ├── kustomization.yaml
│   │   └── kustomizeconfig.yaml
│   ├── crd
│   │   ├── bases
│   │   │   └── batch.tutorial.kubebuilder.io_cronjobs.yaml
│   │   ├── kustomization.yaml
│   │   ├── kustomizeconfig.yaml
│   │   └── patches
│   │       ├── cainjection_in_cronjobs.yaml
│   │       └── webhook_in_cronjobs.yaml
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   ├── manager_config_patch.yaml
│   │   ├── manager_webhook_patch.yaml
│   │   └── webhookcainjection_patch.yaml
│   ├── manager
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── prometheus
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   ├── rbac
│   │   ├── auth_proxy_client_clusterrole.yaml
│   │   ├── auth_proxy_role.yaml
│   │   ├── auth_proxy_role_binding.yaml
│   │   ├── auth_proxy_service.yaml
│   │   ├── cronjob_editor_role.yaml
│   │   ├── cronjob_viewer_role.yaml
│   │   ├── kustomization.yaml
│   │   ├── leader_election_role.yaml
│   │   ├── leader_election_role_binding.yaml
│   │   ├── role.yaml
│   │   ├── role_binding.yaml
│   │   └── service_account.yaml
│   ├── samples
│   │   └── batch_v1_cronjob.yaml
│   └── webhook
│       ├── kustomization.yaml
│       ├── kustomizeconfig.yaml
│       ├── manifests.yaml
│       └── service.yaml
├── controllers
│   ├── cronjob_controller.go
│   └── suite_test.go
├── go.mod
├── go.sum
├── hack
│   └── boilerplate.go.txt
└── main.go

16 directories, 51 files
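
Besides the new config/webhook and config/certmanager directories, `kubebuilder create webhook` also added api/v1/cronjob_webhook.go with a registration helper that hooks the type into the manager's webhook server. It looks roughly like this (scaffold-generated, shown here for context):

func (r *CronJob) SetupWebhookWithManager(mgr ctrl.Manager) error {
    return ctrl.NewWebhookManagedBy(mgr).
        For(r).
        Complete()
}

Note that before deploying with `make deploy`, the webhook and cert-manager sections of config/default/kustomization.yaml (commented out by default in the scaffold) typically need to be enabled so that cert-manager can issue the serving certificate.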

7. Implement the webhook code

$ vim api/v1/cronjob_webhook.go

package v1

import (
    "github.com/robfig/cron"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    validationutils "k8s.io/apimachinery/pkg/util/validation"
    "k8s.io/apimachinery/pkg/util/validation/field"
    logf "sigs.k8s.io/controller-runtime/pkg/log"
)

// log is for logging in this package (scaffold-generated).
var cronjoblog = logf.Log.WithName("cronjob-resource")

//+kubebuilder:webhook:path=/mutate-batch-tutorial-kubebuilder-io-v1-cronjob,mutating=true,failurePolicy=fail,groups=batch.tutorial.kubebuilder.io,resources=cronjobs,verbs=create;update,versions=v1,name=mcronjob.kb.io,sideEffects=None,admissionReviewVersions=v1

// Default implements webhook.Defaulter: it fills in defaults for optional fields.
func (r *CronJob) Default() {
    cronjoblog.Info("default", "name", r.Name)

    if r.Spec.ConcurrencyPolicy == "" {
        r.Spec.ConcurrencyPolicy = AllowConcurrent
    }
    if r.Spec.Suspend == nil {
        r.Spec.Suspend = new(bool)
    }
    if r.Spec.SuccessfulJobsHistoryLimit == nil {
        r.Spec.SuccessfulJobsHistoryLimit = new(int32)
        *r.Spec.SuccessfulJobsHistoryLimit = 3
    }
    if r.Spec.FailedJobsHistoryLimit == nil {
        r.Spec.FailedJobsHistoryLimit = new(int32)
        *r.Spec.FailedJobsHistoryLimit = 1
    }
}

//+kubebuilder:webhook:verbs=create;update;delete,path=/validate-batch-tutorial-kubebuilder-io-v1-cronjob,mutating=false,failurePolicy=fail,groups=batch.tutorial.kubebuilder.io,resources=cronjobs,versions=v1,name=vcronjob.kb.io,sideEffects=None,admissionReviewVersions=v1

// ValidateCreate implements webhook.Validator.
func (r *CronJob) ValidateCreate() error {
    cronjoblog.Info("validate create", "name", r.Name)

    return r.validateCronJob()
}

// ValidateUpdate implements webhook.Validator.
func (r *CronJob) ValidateUpdate(old runtime.Object) error {
    cronjoblog.Info("validate update", "name", r.Name)

    return r.validateCronJob()
}

// ValidateDelete is required by the validating-webhook interface; we don't
// validate anything on deletion (this stub is generated by the scaffold).
func (r *CronJob) ValidateDelete() error {
    cronjoblog.Info("validate delete", "name", r.Name)

    return nil
}

func (r *CronJob) validateCronJob() error {
    var allErrs field.ErrorList
    if err := r.validateCronJobName(); err != nil {
        allErrs = append(allErrs, err)
    }
    if err := r.validateCronJobSpec(); err != nil {
        allErrs = append(allErrs, err)
    }
    if len(allErrs) == 0 {
        return nil
    }

    return apierrors.NewInvalid(
        schema.GroupKind{Group: "batch.tutorial.kubebuilder.io", Kind: "CronJob"},
        r.Name, allErrs)
}

func (r *CronJob) validateCronJobSpec() *field.Error {
    // The field helpers from the kubernetes API machinery help us return nicely
    // structured validation errors.
    return validateScheduleFormat(
        r.Spec.Schedule,
        field.NewPath("spec").Child("schedule"))
}

func validateScheduleFormat(schedule string, fldPath *field.Path) *field.Error {
    if _, err := cron.ParseStandard(schedule); err != nil {
        return field.Invalid(fldPath, schedule, err.Error())
    }
    return nil
}

func (r *CronJob) validateCronJobName() *field.Error {
    if len(r.ObjectMeta.Name) > validationutils.DNS1035LabelMaxLength-11 {
        // The job name length is limited to 63 characters like all Kubernetes objects
        // (which must fit in a DNS subdomain). The cronjob controller appends an
        // 11-character suffix (`-$TIMESTAMP`) when creating a job, so cronjob names
        // must be no longer than 63-11=52 characters. If we don't validate this here,
        // job creation will fail later.
        return field.Invalid(field.NewPath("metadata").Child("name"), r.Name, "must be no more than 52 characters")
    }
    return nil
}

// +kubebuilder:docs-gen:collapse=Validate object name
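
Finally, to build and deploy the operator end to end, the scaffolded Makefile targets can be used (the webhook requires cert-manager in the cluster so the certificate under config/certmanager can be issued). A typical sequence, with <some-registry>/project:tag standing in for your own image name:

$ make manifests    # regenerate CRD, RBAC and webhook manifests
$ make install      # apply the CRDs to the current cluster
$ make docker-build docker-push IMG=<some-registry>/project:tag
$ make deploy IMG=<some-registry>/project:tag
$ kubectl apply -f config/samples/batch_v1_cronjob.yaml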

