Kubevela源码解读(一):application_controller解读
application_controller是kubevela最主要的一个控制器,作用就是将用户创建的applicaion转化为实际需要创建的资源对象,通过本文可以:
1、了解cue模版在kubevela中的运用,context key的由来
2、workflowstep如何关联注册的provider
3、了解application创建后,集群如何生成资源对象
基本流程如下
1、用户往集群提交application资源
2、控制器监听到资源创建,将资源对象转化为appfile对象
3、根据appfile对象,生成应用工作流程步骤WorkflowSteps
4、根据上一步生成的WorkflowSteps,生成runner,每个WorkflowSteps对应一个runner
5、循环的执行runner,直到结束。至此application_controller的工作就完成了
一、生成appfile
// 传人application, 初始化appfile func (p *Parser) GenerateAppFileFromApp(ctx context.Context, app *v1beta1.Application) (*Appfile, error) { ns := app.Namespace appName := app.Name appfile := p.newAppfile(appName, ns, app) // 初始化一个 appFile 结构体,内部主要是一些初始化操作 if app.Status.LatestRevision != nil { appfile.AppRevisionName = app.Status.LatestRevision.Name } var wds []*Workload for _, comp := range app.Spec.Components { // 将application的Components转化为workload,注入appfile wd, err := p.parseWorkload(ctx, comp) if err != nil { return nil, err } wds = append(wds, wd) } appfile.Workloads = wds appfile.Components = app.Spec.Components var err error if err = p.parseWorkflowSteps(ctx, appfile); err != nil { // 生成默认的WorkflowSteps,注入appfile return nil, errors.Wrapf(err, "failed to parseWorkflowSteps") } if err = p.parsePolicies(ctx, appfile); err != nil { return nil, errors.Wrapf(err, "failed to parsePolicies") } if err = p.parseReferredObjects(ctx, appfile); err != nil { return nil, errors.Wrapf(err, "failed to parseReferredObjects") } ... return appfile, nil } // 生成workload,内部模版的解析由engine实现Complete func (p *Parser) convertTemplate2Workload(name, typ string, props *runtime.RawExtension, templ *Template) (*Workload, error) { settings, err := util.RawExtension2Map(props) if err != nil { return nil, errors.WithMessagef(err, "fail to parse settings for %s", name) } wlType, err := util.ConvertDefinitionRevName(typ) if err != nil { wlType = typ } return &Workload{ Traits: []*Trait{}, ScopeDefinition: []*v1beta1.ScopeDefinition{}, Name: name, Type: wlType, CapabilityCategory: templ.CapabilityCategory, FullTemplate: templ, Params: settings, engine: definition.NewWorkloadAbstractEngine(name, p.pd), }, nil } // WorkflowSteps生成逻辑 func (p *Parser) loadWorkflowToAppfile(ctx context.Context, af *Appfile) error { var err error // parse workflow steps af.WorkflowMode = &workflowv1alpha1.WorkflowExecuteMode{ Steps: workflowv1alpha1.WorkflowModeDAG, SubSteps: workflowv1alpha1.WorkflowModeDAG, } // 如果用户手动指定了 Workflow 这里就解析一下 if wfSpec := af.app.Spec.Workflow; wfSpec != nil { app := af.app mode := wfSpec.Mode // 根据 ref 中填的 name 找到对应 Workflow if wfSpec.Ref != "" && mode == nil { wf := &workflowv1alpha1.Workflow{} if err := af.WorkflowClient(p.client).Get(ctx, ktypes.NamespacedName{Namespace: af.app.Namespace, Name: app.Spec.Workflow.Ref}, wf); err != nil { return err } mode = wf.Mode } af.WorkflowSteps = wfSpec.Steps af.WorkflowMode.Steps = workflowv1alpha1.WorkflowModeStep if mode != nil { if mode.Steps != "" { af.WorkflowMode.Steps = mode.Steps } if mode.SubSteps != "" { af.WorkflowMode.SubSteps = mode.SubSteps } } } // 然后开始生成 Workflow ,chain 形式,按顺序执行多个 generator,直到其中某一个步骤生成 Workflow 为止 af.WorkflowSteps, err = step.NewChainWorkflowStepGenerator( // KubeVela 中支持引用集群中已经创建好的 Workflow,这里则是在处理这部分逻辑,根据 ref 字段名找到对应的 Workflow &step.RefWorkflowStepGenerator{Client: af.WorkflowClient(p.client), Context: ctx}, // 根据 topology 类型的 policy 来生成 Workflow &step.DeployWorkflowStepGenerator{}, // 部署到指定环境 &step.Deploy2EnvWorkflowStepGenerator{}, // 如果前面几个步骤都没有成功生成才会执行这部分逻辑,生成一个 apply-component 类型的 WorkflowStep,直接将组件部署到 local 集群的 default 命名空间。 &step.ApplyComponentWorkflowStepGenerator{}, // 在所有部署步骤之前生成挂起工作流步骤 &step.DeployPreApproveWorkflowStepGenerator{}, ).Generate(af.app, af.WorkflowSteps) return err }
二、根据appfile生成runnner
func (h *AppHandler) GenerateApplicationSteps(ctx monitorContext.Context, app *v1beta1.Application, appParser *appfile.Parser, af *appfile.Appfile) (*wfTypes.WorkflowInstance, []wfTypes.TaskRunner, error) { appRev := h.currentAppRev t := time.Now() defer func() { metrics.AppReconcileStageDurationHistogram.WithLabelValues("generate-app-steps").Observe(time.Since(t).Seconds()) }() appLabels := map[string]string{ oam.LabelAppName: app.Name, oam.LabelAppNamespace: app.Namespace, } // Install 用于注册 handlerProviders := providers.NewProviders() kube.Install(handlerProviders, h.r.Client, appLabels, &kube.Handlers{ Apply: h.Dispatch, Delete: h.Delete, }) configprovider.Install(handlerProviders, h.r.Client, func(ctx context.Context, resources []*unstructured.Unstructured, applyOptions []apply.ApplyOption) error { for _, res := range resources { res.SetLabels(util.MergeMapOverrideWithDst(res.GetLabels(), appLabels)) } return h.resourceKeeper.Dispatch(ctx, resources, applyOptions) }) oamProvider.Install(handlerProviders, app, af, h.r.Client, h.applyComponentFunc( appParser, appRev, af), h.renderComponentFunc(appParser, appRev, af)) pCtx := velaprocess.NewContext(generateContextDataFromApp(app, appRev.Name)) renderer := func(ctx context.Context, comp common.ApplicationComponent) (*appfile.Workload, error) { return appParser.ParseWorkloadFromRevisionAndClient(ctx, comp, appRev) } multiclusterProvider.Install(handlerProviders, h.r.Client, app, af, h.applyComponentFunc(appParser, appRev, af), h.checkComponentHealth(appParser, appRev, af), renderer) terraformProvider.Install(handlerProviders, app, renderer) query.Install(handlerProviders, h.r.Client, nil) // 为application生成一个instance instance := generateWorkflowInstance(af, app) executor.InitializeWorkflowInstance(instance) // 用于后面执行runner runners, err := generator.GenerateRunners(ctx, instance, wfTypes.StepGeneratorOptions{ Providers: handlerProviders, PackageDiscover: h.r.pd, ProcessCtx: pCtx, TemplateLoader: template.NewWorkflowStepTemplateRevisionLoader(appRev, h.r.dm), Client: h.r.Client, StepConvertor: map[string]func(step workflowv1alpha1.WorkflowStep) (workflowv1alpha1.WorkflowStep, error){ wfTypes.WorkflowStepTypeApplyComponent: func(lstep workflowv1alpha1.WorkflowStep) (workflowv1alpha1.WorkflowStep, error) { copierStep := lstep.DeepCopy() if err := convertStepProperties(copierStep, app); err != nil { return lstep, errors.WithMessage(err, "convert [apply-component]") } copierStep.Type = wfTypes.WorkflowStepTypeBuiltinApplyComponent return *copierStep, nil }, }, }) if err != nil { return nil, nil, err } return instance, runners, nil } // 生成runner其实就是根据WorkflowSteps,调用generateTaskRunner方法 func GenerateRunners(ctx monitorContext.Context, instance *types.WorkflowInstance, options types.StepGeneratorOptions) ([]types.TaskRunner, error) { ctx.V(options.LogLevel) subCtx := ctx.Fork("generate-task-runners", monitorContext.DurationMetric(func(v float64) { metrics.GenerateTaskRunnersDurationHistogram.WithLabelValues("workflowrun").Observe(v) })) defer subCtx.Commit("finish generate task runners") options = initStepGeneratorOptions(ctx, instance, options) taskDiscover := tasks.NewTaskDiscover(ctx, options) var tasks []types.TaskRunner for _, step := range instance.Steps { opt := &types.TaskGeneratorOptions{ ID: generateStepID(instance.Status, step.Name), PackageDiscover: options.PackageDiscover, ProcessContext: options.ProcessCtx, } for typ, convertor := range options.StepConvertor { if step.Type == typ { opt.StepConvertor = convertor } } task, err := generateTaskRunner(ctx, instance, step, taskDiscover, opt, options) if err != nil { return nil, err } tasks = append(tasks, task) } return tasks, nil } // runner的创建实现是由customTaskDiscover类型完成 func (td *taskDiscover) GetTaskGenerator(ctx context.Context, name string) (types.TaskGenerator, error) { tg, ok := td.builtin[name] if ok { return tg, nil } if td.customTaskDiscover != nil { var err error tg, err = td.customTaskDiscover.GetTaskGenerator(ctx, name) if err != nil { return nil, err } return tg, nil } return nil, errors.Errorf("can't find task generator: %s", name) } // 生成runner,makeTaskGenerator里面有居然的runner实现(单独讲解) func (t *TaskLoader) GetTaskGenerator(ctx context.Context, name string) (types.TaskGenerator, error) { templ, err := t.loadTemplate(ctx, name) if err != nil { return nil, err } return t.makeTaskGenerator(templ) } // 上一步loadTemplate底层的实现如下,主要还是从k8s里面拿到实际的cue模版 func getDefinitionTemplate(ctx context.Context, cli client.Client, definitionName string) (string, error) { const ( definitionAPIVersion = "core.oam.dev/v1beta1" kindWorkflowStepDefinition = "WorkflowStepDefinition" ) definition := &unstructured.Unstructured{} definition.SetAPIVersion(definitionAPIVersion) definition.SetKind(kindWorkflowStepDefinition) ns := getDefinitionNamespaceWithCtx(ctx) if err := cli.Get(ctx, types.NamespacedName{Name: definitionName, Namespace: ns}, definition); err != nil { if apierrors.IsNotFound(err) { if err := cli.Get(ctx, types.NamespacedName{Name: definitionName, Namespace: systemDefinitionNamespace}, definition); err != nil { return "", err } } else { return "", err } } d := new(def) if err := runtime.DefaultUnstructuredConverter.FromUnstructured(definition.Object, d); err != nil { return "", errors.Wrap(err, "invalid workflow step definition") } return d.Spec.Schematic.CUE.Template, nil }
三、runner实现
func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error) { return func(wfStep v1alpha1.WorkflowStep, genOpt *types.TaskGeneratorOptions) (types.TaskRunner, error) { // ... // 获取cue模版的参数 paramsStr, err := GetParameterTemplate(wfStep) if err != nil { return nil, err } tRunner := new(taskRunner) tRunner.name = wfStep.Name tRunner.checkPending = func(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) { options := &types.TaskRunOptions{} if t.runOptionsProcess != nil { t.runOptionsProcess(options) } basicVal, _, _ := MakeBasicValue(ctx, wfCtx, t.pd, wfStep.Name, exec.wfStatus.ID, paramsStr, options.PCtx) return CheckPending(wfCtx, wfStep, exec.wfStatus.ID, stepStatus, basicVal) } // 这个run方法用于Executor的调用 tRunner.run = func(ctx wfContext.Context, options *types.TaskRunOptions) (stepStatus v1alpha1.StepStatus, operations *types.Operation, rErr error) { if options.GetTracer == nil { options.GetTracer = func(id string, step v1alpha1.WorkflowStep) monitorContext.Context { return monitorContext.NewTraceContext(context.Background(), "") } } tracer := options.GetTracer(exec.wfStatus.ID, wfStep).AddTag("step_name", wfStep.Name, "step_type", wfStep.Type) tracer.V(t.logLevel) defer func() { tracer.Commit(string(exec.status().Phase)) }() if t.runOptionsProcess != nil { t.runOptionsProcess(options) } basicVal, basicTemplate, err := MakeBasicValue(tracer, ctx, t.pd, wfStep.Name, exec.wfStatus.ID, paramsStr, options.PCtx) if err != nil { tracer.Error(err, "make context parameter") return v1alpha1.StepStatus{}, nil, errors.WithMessage(err, "make context parameter") } var taskv *value.Value // ... for _, hook := range options.PreCheckHooks { result, err := hook(wfStep, &types.PreCheckOptions{ PackageDiscover: t.pd, BasicTemplate: basicTemplate, BasicValue: basicVal, }) if err != nil { tracer.Error(err, "do preCheckHook") exec.Skip(fmt.Sprintf("pre check error: %s", err.Error())) return exec.status(), exec.operation(), nil } if result.Skip { exec.Skip("") return exec.status(), exec.operation(), nil } if result.Timeout { exec.timeout("") } } for _, hook := range options.PreStartHooks { if err := hook(ctx, basicVal, wfStep); err != nil { tracer.Error(err, "do preStartHook") exec.err(ctx, false, err, types.StatusReasonInput) return exec.status(), exec.operation(), nil } } // refresh the basic template to get inputs value involved basicTemplate, err = basicVal.String() if err != nil { exec.err(ctx, false, err, types.StatusReasonParameter) return exec.status(), exec.operation(), nil } if status, ok := options.StepStatus[wfStep.Name]; ok { exec.stepStatus = status } taskv, err = value.NewValue(strings.Join([]string{templ, basicTemplate}, "\n"), t.pd, "", value.ProcessScript, value.TagFieldOrder) if err != nil { exec.err(ctx, false, err, types.StatusReasonRendering) return exec.status(), exec.operation(), nil } exec.tracer = tracer if debugLog(taskv) { exec.printStep("workflowStepStart", "workflow", "", taskv) defer exec.printStep("workflowStepEnd", "workflow", "", taskv) } // runner会调用doSteps方法 if err := exec.doSteps(tracer, ctx, taskv); err != nil { tracer.Error(err, "do steps") exec.err(ctx, true, err, types.StatusReasonExecute) return exec.status(), exec.operation(), nil } return exec.status(), exec.operation(), nil } return tRunner, nil }, nil } // doSteps就是从provider注册的方法中获取,然后调用 func (exec *executor) doSteps(ctx monitorContext.Context, wfCtx wfContext.Context, v *value.Value) error { do := OpTpy(v) if do != "" && do != "steps" { provider := opProvider(v) if err := exec.Handle(ctx, wfCtx, provider, do, v); err != nil { return errors.WithMessagef(err, "run step(provider=%s,do=%s)", provider, do) } return nil } return v.StepByFields(func(fieldName string, in *value.Value) (bool, error) { if in.CueValue().IncompleteKind() == cue.BottomKind { errInfo, err := sets.ToString(in.CueValue()) if err != nil { errInfo = "value is _|_" } return true, errors.New(errInfo + "(bottom kind)") } if retErr := in.CueValue().Err(); retErr != nil { errInfo, err := sets.ToString(in.CueValue()) if err == nil { retErr = errors.WithMessage(retErr, errInfo) } return false, retErr } if isStepList(fieldName) { return false, in.StepByList(func(name string, item *value.Value) (bool, error) { do := OpTpy(item) if do == "" { return false, nil } return false, exec.doSteps(ctx, wfCtx, item) }) } do := OpTpy(in) if do == "" { return false, nil } if do == "steps" { if err := exec.doSteps(ctx, wfCtx, in); err != nil { return false, err } } else { provider := opProvider(in) if err := exec.Handle(ctx, wfCtx, provider, do, in); err != nil { return false, errors.WithMessagef(err, "run step(provider=%s,do=%s)", provider, do) } } if exec.suspend || exec.terminated || exec.wait { return true, nil } return false, nil }) }
四、runner执行
// 生成一个runengine func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunners []types.TaskRunner) (v1alpha1.WorkflowRunPhase, error) { InitializeWorkflowInstance(w.instance) status := &w.instance.Status dagMode := status.Mode.Steps == v1alpha1.WorkflowModeDAG cacheKey := fmt.Sprintf("%s-%s", w.instance.Name, w.instance.Namespace) allRunnersDone, allRunnersSucceeded := checkRunners(taskRunners, w.instance.Status) if status.Finished { StepStatusCache.Delete(cacheKey) } if checkWorkflowTerminated(status, allRunnersDone) { if isTerminatedManually(status) { return v1alpha1.WorkflowStateTerminated, nil } return v1alpha1.WorkflowStateFailed, nil } if checkWorkflowSuspended(status) { return v1alpha1.WorkflowStateSuspending, nil } if allRunnersSucceeded { return v1alpha1.WorkflowStateSucceeded, nil } wfCtx, err := w.makeContext(ctx, w.instance.Name) if err != nil { ctx.Error(err, "make context") return v1alpha1.WorkflowStateExecuting, err } w.wfCtx = wfCtx if cacheValue, ok := StepStatusCache.Load(cacheKey); ok { if len(status.Steps) < cacheValue.(int) { return v1alpha1.WorkflowStateSkipped, nil } } e := newEngine(ctx, wfCtx, w, status, taskRunners) err = e.Run(ctx, taskRunners, dagMode) if err != nil { ctx.Error(err, "run steps") StepStatusCache.Store(cacheKey, len(status.Steps)) return v1alpha1.WorkflowStateExecuting, err } StepStatusCache.Store(cacheKey, len(status.Steps)) if feature.DefaultMutableFeatureGate.Enabled(features.EnablePatchStatusAtOnce) { return e.status.Phase, nil } return e.checkWorkflowPhase(), nil } // 调用每一个runner的run方法 func (e *engine) steps(ctx monitorContext.Context, taskRunners []types.TaskRunner, dag bool) error { wfCtx := e.wfCtx for index, runner := range taskRunners { //... options := e.generateRunOptions(ctx, e.findDependPhase(taskRunners, index, dag)) status, operation, err := runner.Run(wfCtx, options) if err != nil { return err } //... } return nil }
五、Provider注册的方案示例
func (h *AppHandler) applyComponentFunc(appParser *appfile.Parser, appRev *v1beta1.ApplicationRevision, af *appfile.Appfile) oamProvider.ComponentApply { return func(baseCtx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (*unstructured.Unstructured, []*unstructured.Unstructured, bool, error) { // 生成k8s对象 wl, manifest, err := h.prepareWorkloadAndManifests(ctx, appParser, comp, appRev, patcher, af) if err != nil { return nil, nil, false, err } if len(manifest.PackagedWorkloadResources) != 0 { if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, manifest.PackagedWorkloadResources...); err != nil { return nil, nil, false, errors.WithMessage(err, "cannot dispatch packaged workload resources") } } wl.Ctx.SetCtx(auth.ContextWithUserInfo(ctx, h.app)) readyWorkload, readyTraits, err := renderComponentsAndTraits(h.r.Client, manifest, appRev, clusterName, overrideNamespace, env) if err != nil { return nil, nil, false, err } checkSkipApplyWorkload(wl) isHealth := true if utilfeature.DefaultMutableFeatureGate.Enabled(features.MultiStageComponentApply) { manifestDispatchers, err := h.generateDispatcher(appRev, readyWorkload, readyTraits, overrideNamespace) if err != nil { return nil, nil, false, errors.WithMessage(err, "generateDispatcher") } for _, dispatcher := range manifestDispatchers { if isHealth, err := dispatcher.run(ctx, wl, appRev, clusterName); !isHealth || err != nil { return nil, nil, false, err } } } else { dispatchResources := readyTraits if !wl.SkipApplyWorkload { dispatchResources = append([]*unstructured.Unstructured{readyWorkload}, readyTraits...) } if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchResources...); err != nil { return nil, nil, false, errors.WithMessage(err, "Dispatch") } _, _, _, isHealth, err = h.collectHealthStatus(ctx, wl, appRev, overrideNamespace, false) if err != nil { return nil, nil, false, errors.WithMessage(err, "CollectHealthStatus") } } if DisableResourceApplyDoubleCheck { return readyWorkload, readyTraits, isHealth, nil } workload, traits, err := getComponentResources(auth.ContextWithUserInfo(ctx, h.app), manifest, wl.SkipApplyWorkload, h.r.Client) return workload, traits, isHealth, err } } func (af *Appfile) GenerateComponentManifest(wl *Workload, mutate func(*velaprocess.ContextData)) (*types.ComponentManifest, error) { if af.Namespace == "" { af.Namespace = corev1.NamespaceDefault } // 注入公共的context key,用于模版参数 ctxData := GenerateContextDataFromAppFile(af, wl.Name) if mutate != nil { mutate(&ctxData) } wl.Ctx = NewBasicContext(ctxData, wl.Params) switch wl.CapabilityCategory { case types.HelmCategory: return generateComponentFromHelmModule(wl, ctxData) case types.KubeCategory: return generateComponentFromKubeModule(wl, ctxData) case types.TerraformCategory: return generateComponentFromTerraformModule(wl, af.Name, af.Namespace) default: return generateComponentFromCUEModule(wl, ctxData) } } // generateComponentFromCUEModule的实现 func baseGenerateComponent(pCtx process.Context, wl *Workload, appName, ns string) (*types.ComponentManifest, error) { var err error pCtx.PushData(velaprocess.ContextComponentType, wl.Type) for _, tr := range wl.Traits { if err := tr.EvalContext(pCtx); err != nil { // cue的渲染模版,调用engine的Complete方法 return nil, errors.Wrapf(err, "evaluate template trait=%s app=%s", tr.Name, wl.Name) } } if patcher := wl.Patch; patcher != nil { workload, auxiliaries := pCtx.Output() if p, err := patcher.LookupValue("workload"); err == nil { if err := workload.Unify(p.CueValue()); err != nil { return nil, errors.WithMessage(err, "patch workload") } } for _, aux := range auxiliaries { if p, err := patcher.LookupByScript(fmt.Sprintf("traits[\"%s\"]", aux.Name)); err == nil && p.CueValue().Err() == nil { if err := aux.Ins.Unify(p.CueValue()); err != nil { return nil, errors.WithMessagef(err, "patch outputs.%s", aux.Name) } } } } compManifest, err := evalWorkloadWithContext(pCtx, wl, ns, appName) if err != nil { return nil, err } compManifest.Name = wl.Name // we record the external revision name in ExternalRevision field compManifest.ExternalRevision = wl.ExternalRevision compManifest.Scopes = make([]*corev1.ObjectReference, len(wl.Scopes)) for i, s := range wl.Scopes { compManifest.Scopes[i] = &corev1.ObjectReference{ APIVersion: metav1.GroupVersion{ Group: s.GVK.Group, Version: s.GVK.Version, }.String(), Kind: s.GVK.Kind, Name: s.Name, } } return compManifest, nil }