Kubevela源码解读(一):application_controller解读

雅泽1年前技术文章328


application_controller是kubevela最主要的一个控制器,作用就是将用户创建的applicaion转化为实际需要创建的资源对象,通过本文可以:

1、了解cue模版在kubevela中的运用,context key的由来

2、workflowstep如何关联注册的provider

3、了解application创建后,集群如何生成资源对象

基本流程如下

1、用户往集群提交application资源

2、控制器监听到资源创建,将资源对象转化为appfile对象

3、根据appfile对象,生成应用工作流程步骤WorkflowSteps

4、根据上一步生成的WorkflowSteps,生成runner,每个WorkflowSteps对应一个runner

5、循环的执行runner,直到结束。至此application_controller的工作就完成了


一、生成appfile

// 传人application, 初始化appfile
func (p *Parser) GenerateAppFileFromApp(ctx context.Context, app *v1beta1.Application) (*Appfile, error) {
    ns := app.Namespace
    appName := app.Name
    appfile := p.newAppfile(appName, ns, app)   // 初始化一个 appFile 结构体,内部主要是一些初始化操作
    if app.Status.LatestRevision != nil {
        appfile.AppRevisionName = app.Status.LatestRevision.Name
    }

    var wds []*Workload
    for _, comp := range app.Spec.Components {  // 将application的Components转化为workload,注入appfile
        wd, err := p.parseWorkload(ctx, comp)   
        if err != nil {
            return nil, err
        }

        wds = append(wds, wd)
    }
    appfile.Workloads = wds
    appfile.Components = app.Spec.Components

    var err error
    if err = p.parseWorkflowSteps(ctx, appfile); err != nil {  // 生成默认的WorkflowSteps,注入appfile
        return nil, errors.Wrapf(err, "failed to parseWorkflowSteps")
    }
    if err = p.parsePolicies(ctx, appfile); err != nil {
        return nil, errors.Wrapf(err, "failed to parsePolicies")
    }
    if err = p.parseReferredObjects(ctx, appfile); err != nil {
        return nil, errors.Wrapf(err, "failed to parseReferredObjects")
    }
    ...

    return appfile, nil
}


// 生成workload,内部模版的解析由engine实现Complete
func (p *Parser) convertTemplate2Workload(name, typ string, props *runtime.RawExtension, templ *Template) (*Workload, error) {
	settings, err := util.RawExtension2Map(props)
	if err != nil {
		return nil, errors.WithMessagef(err, "fail to parse settings for %s", name)
	}
	wlType, err := util.ConvertDefinitionRevName(typ)
	if err != nil {
		wlType = typ
	}
	return &Workload{
		Traits:             []*Trait{},
		ScopeDefinition:    []*v1beta1.ScopeDefinition{},
		Name:               name,
		Type:               wlType,
		CapabilityCategory: templ.CapabilityCategory,
		FullTemplate:       templ,
		Params:             settings,
		engine:             definition.NewWorkloadAbstractEngine(name, p.pd),
	}, nil
}

// WorkflowSteps生成逻辑
func (p *Parser) loadWorkflowToAppfile(ctx context.Context, af *Appfile) error {
    var err error
    // parse workflow steps
    af.WorkflowMode = &workflowv1alpha1.WorkflowExecuteMode{
        Steps:    workflowv1alpha1.WorkflowModeDAG,
        SubSteps: workflowv1alpha1.WorkflowModeDAG,
    }
    // 如果用户手动指定了 Workflow 这里就解析一下
    if wfSpec := af.app.Spec.Workflow; wfSpec != nil {
        app := af.app
        mode := wfSpec.Mode
        // 根据 ref 中填的 name 找到对应 Workflow
        if wfSpec.Ref != "" && mode == nil {
            wf := &workflowv1alpha1.Workflow{}
            if err := af.WorkflowClient(p.client).Get(ctx, ktypes.NamespacedName{Namespace: af.app.Namespace, Name: app.Spec.Workflow.Ref}, wf); err != nil {
                return err
            }
            mode = wf.Mode
        }
        af.WorkflowSteps = wfSpec.Steps
        af.WorkflowMode.Steps = workflowv1alpha1.WorkflowModeStep
        if mode != nil {
            if mode.Steps != "" {
                af.WorkflowMode.Steps = mode.Steps
            }
            if mode.SubSteps != "" {
                af.WorkflowMode.SubSteps = mode.SubSteps
            }
        }
    }
    // 然后开始生成 Workflow ,chain 形式,按顺序执行多个 generator,直到其中某一个步骤生成 Workflow 为止
    af.WorkflowSteps, err = step.NewChainWorkflowStepGenerator(
        //  KubeVela 中支持引用集群中已经创建好的 Workflow,这里则是在处理这部分逻辑,根据 ref 字段名找到对应的 Workflow
        &step.RefWorkflowStepGenerator{Client: af.WorkflowClient(p.client), Context: ctx},
        // 根据 topology 类型的 policy 来生成 Workflow
        &step.DeployWorkflowStepGenerator{},
        // 部署到指定环境
        &step.Deploy2EnvWorkflowStepGenerator{},
        // 如果前面几个步骤都没有成功生成才会执行这部分逻辑,生成一个 apply-component 类型的 WorkflowStep,直接将组件部署到 local 集群的 default 命名空间。
        &step.ApplyComponentWorkflowStepGenerator{},
        // 在所有部署步骤之前生成挂起工作流步骤
        &step.DeployPreApproveWorkflowStepGenerator{},
    ).Generate(af.app, af.WorkflowSteps)
    return err
}

二、根据appfile生成runnner

func (h *AppHandler) GenerateApplicationSteps(ctx monitorContext.Context,
                                              app *v1beta1.Application,
                                              appParser *appfile.Parser,
                                              af *appfile.Appfile) (*wfTypes.WorkflowInstance, []wfTypes.TaskRunner, error) {

    appRev := h.currentAppRev
    t := time.Now()
    defer func() {
        metrics.AppReconcileStageDurationHistogram.WithLabelValues("generate-app-steps").Observe(time.Since(t).Seconds())
    }()

    appLabels := map[string]string{
        oam.LabelAppName:      app.Name,
        oam.LabelAppNamespace: app.Namespace,
    }
    //  Install 用于注册
    handlerProviders := providers.NewProviders()
    kube.Install(handlerProviders, h.r.Client, appLabels, &kube.Handlers{
        Apply:  h.Dispatch,
        Delete: h.Delete,
    })
    configprovider.Install(handlerProviders, h.r.Client, func(ctx context.Context, resources []*unstructured.Unstructured, applyOptions []apply.ApplyOption) error {
        for _, res := range resources {
            res.SetLabels(util.MergeMapOverrideWithDst(res.GetLabels(), appLabels))
        }
        return h.resourceKeeper.Dispatch(ctx, resources, applyOptions)
    })
    oamProvider.Install(handlerProviders, app, af, h.r.Client, h.applyComponentFunc(
        appParser, appRev, af), h.renderComponentFunc(appParser, appRev, af))
    pCtx := velaprocess.NewContext(generateContextDataFromApp(app, appRev.Name))
    renderer := func(ctx context.Context, comp common.ApplicationComponent) (*appfile.Workload, error) {
        return appParser.ParseWorkloadFromRevisionAndClient(ctx, comp, appRev)
    }
    multiclusterProvider.Install(handlerProviders, h.r.Client, app, af,
                                 h.applyComponentFunc(appParser, appRev, af),
                                 h.checkComponentHealth(appParser, appRev, af),
                                 renderer)
    terraformProvider.Install(handlerProviders, app, renderer)
    query.Install(handlerProviders, h.r.Client, nil)

    // 为application生成一个instance
    instance := generateWorkflowInstance(af, app)
    executor.InitializeWorkflowInstance(instance)
    // 用于后面执行runner
    runners, err := generator.GenerateRunners(ctx, instance, wfTypes.StepGeneratorOptions{
        Providers:       handlerProviders,
        PackageDiscover: h.r.pd,
        ProcessCtx:      pCtx,
        TemplateLoader:  template.NewWorkflowStepTemplateRevisionLoader(appRev, h.r.dm),
        Client:          h.r.Client,
        StepConvertor: map[string]func(step workflowv1alpha1.WorkflowStep) (workflowv1alpha1.WorkflowStep, error){
            wfTypes.WorkflowStepTypeApplyComponent: func(lstep workflowv1alpha1.WorkflowStep) (workflowv1alpha1.WorkflowStep, error) {
                copierStep := lstep.DeepCopy()
                if err := convertStepProperties(copierStep, app); err != nil {
                    return lstep, errors.WithMessage(err, "convert [apply-component]")
                }
                copierStep.Type = wfTypes.WorkflowStepTypeBuiltinApplyComponent
                return *copierStep, nil
            },
        },
    })
    if err != nil {
        return nil, nil, err
    }
    return instance, runners, nil
}

// 生成runner其实就是根据WorkflowSteps,调用generateTaskRunner方法
func GenerateRunners(ctx monitorContext.Context, instance *types.WorkflowInstance, options types.StepGeneratorOptions) ([]types.TaskRunner, error) {
	ctx.V(options.LogLevel)
	subCtx := ctx.Fork("generate-task-runners", monitorContext.DurationMetric(func(v float64) {
		metrics.GenerateTaskRunnersDurationHistogram.WithLabelValues("workflowrun").Observe(v)
	}))
	defer subCtx.Commit("finish generate task runners")
	options = initStepGeneratorOptions(ctx, instance, options)
	taskDiscover := tasks.NewTaskDiscover(ctx, options)
	var tasks []types.TaskRunner
	for _, step := range instance.Steps {
		opt := &types.TaskGeneratorOptions{
			ID:              generateStepID(instance.Status, step.Name),
			PackageDiscover: options.PackageDiscover,
			ProcessContext:  options.ProcessCtx,
		}
		for typ, convertor := range options.StepConvertor {
			if step.Type == typ {
				opt.StepConvertor = convertor
			}
		}
		task, err := generateTaskRunner(ctx, instance, step, taskDiscover, opt, options)
		if err != nil {
			return nil, err
		}
		tasks = append(tasks, task)
	}
	return tasks, nil
}

// runner的创建实现是由customTaskDiscover类型完成
func (td *taskDiscover) GetTaskGenerator(ctx context.Context, name string) (types.TaskGenerator, error) {
	tg, ok := td.builtin[name]
	if ok {
		return tg, nil
	}
	if td.customTaskDiscover != nil {
		var err error
		tg, err = td.customTaskDiscover.GetTaskGenerator(ctx, name)
		if err != nil {
			return nil, err
		}
		return tg, nil

	}
	return nil, errors.Errorf("can't find task generator: %s", name)
}

// 生成runner,makeTaskGenerator里面有居然的runner实现(单独讲解)
func (t *TaskLoader) GetTaskGenerator(ctx context.Context, name string) (types.TaskGenerator, error) {
	templ, err := t.loadTemplate(ctx, name)
	if err != nil {
		return nil, err
	}
	return t.makeTaskGenerator(templ)
}

// 上一步loadTemplate底层的实现如下,主要还是从k8s里面拿到实际的cue模版
func getDefinitionTemplate(ctx context.Context, cli client.Client, definitionName string) (string, error) {
	const (
		definitionAPIVersion       = "core.oam.dev/v1beta1"
		kindWorkflowStepDefinition = "WorkflowStepDefinition"
	)
	definition := &unstructured.Unstructured{}
	definition.SetAPIVersion(definitionAPIVersion)
	definition.SetKind(kindWorkflowStepDefinition)
	ns := getDefinitionNamespaceWithCtx(ctx)
	if err := cli.Get(ctx, types.NamespacedName{Name: definitionName, Namespace: ns}, definition); err != nil {
		if apierrors.IsNotFound(err) {
			if err := cli.Get(ctx, types.NamespacedName{Name: definitionName, Namespace: systemDefinitionNamespace}, definition); err != nil {
				return "", err
			}
		} else {
			return "", err
		}
	}
	d := new(def)
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(definition.Object, d); err != nil {
		return "", errors.Wrap(err, "invalid workflow step definition")
	}
	return d.Spec.Schematic.CUE.Template, nil
}


三、runner实现

func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error) {
	return func(wfStep v1alpha1.WorkflowStep, genOpt *types.TaskGeneratorOptions) (types.TaskRunner, error) {
    	// ...
    	// 获取cue模版的参数
		paramsStr, err := GetParameterTemplate(wfStep)
		if err != nil {
			return nil, err
		}

		tRunner := new(taskRunner)
		tRunner.name = wfStep.Name
		tRunner.checkPending = func(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) {
			options := &types.TaskRunOptions{}
			if t.runOptionsProcess != nil {
				t.runOptionsProcess(options)
			}
			basicVal, _, _ := MakeBasicValue(ctx, wfCtx, t.pd, wfStep.Name, exec.wfStatus.ID, paramsStr, options.PCtx)
			return CheckPending(wfCtx, wfStep, exec.wfStatus.ID, stepStatus, basicVal)
		}
        // 这个run方法用于Executor的调用
		tRunner.run = func(ctx wfContext.Context, options *types.TaskRunOptions) (stepStatus v1alpha1.StepStatus, operations *types.Operation, rErr error) {
			if options.GetTracer == nil {
				options.GetTracer = func(id string, step v1alpha1.WorkflowStep) monitorContext.Context {
					return monitorContext.NewTraceContext(context.Background(), "")
				}
			}
			tracer := options.GetTracer(exec.wfStatus.ID, wfStep).AddTag("step_name", wfStep.Name, "step_type", wfStep.Type)
			tracer.V(t.logLevel)
			defer func() {
				tracer.Commit(string(exec.status().Phase))
			}()

			if t.runOptionsProcess != nil {
				t.runOptionsProcess(options)
			}

			basicVal, basicTemplate, err := MakeBasicValue(tracer, ctx, t.pd, wfStep.Name, exec.wfStatus.ID, paramsStr, options.PCtx)
			if err != nil {
				tracer.Error(err, "make context parameter")
				return v1alpha1.StepStatus{}, nil, errors.WithMessage(err, "make context parameter")
			}

			var taskv *value.Value
        	// ...

			for _, hook := range options.PreCheckHooks {
				result, err := hook(wfStep, &types.PreCheckOptions{
					PackageDiscover: t.pd,
					BasicTemplate:   basicTemplate,
					BasicValue:      basicVal,
				})
				if err != nil {
					tracer.Error(err, "do preCheckHook")
					exec.Skip(fmt.Sprintf("pre check error: %s", err.Error()))
					return exec.status(), exec.operation(), nil
				}
				if result.Skip {
					exec.Skip("")
					return exec.status(), exec.operation(), nil
				}
				if result.Timeout {
					exec.timeout("")
				}
			}

			for _, hook := range options.PreStartHooks {
				if err := hook(ctx, basicVal, wfStep); err != nil {
					tracer.Error(err, "do preStartHook")
					exec.err(ctx, false, err, types.StatusReasonInput)
					return exec.status(), exec.operation(), nil
				}
			}

			// refresh the basic template to get inputs value involved
			basicTemplate, err = basicVal.String()
			if err != nil {
				exec.err(ctx, false, err, types.StatusReasonParameter)
				return exec.status(), exec.operation(), nil
			}

			if status, ok := options.StepStatus[wfStep.Name]; ok {
				exec.stepStatus = status
			}
			taskv, err = value.NewValue(strings.Join([]string{templ, basicTemplate}, "\n"), t.pd, "", value.ProcessScript, value.TagFieldOrder)
			if err != nil {
				exec.err(ctx, false, err, types.StatusReasonRendering)
				return exec.status(), exec.operation(), nil
			}

			exec.tracer = tracer
			if debugLog(taskv) {
				exec.printStep("workflowStepStart", "workflow", "", taskv)
				defer exec.printStep("workflowStepEnd", "workflow", "", taskv)
			}
            // runner会调用doSteps方法
			if err := exec.doSteps(tracer, ctx, taskv); err != nil {
				tracer.Error(err, "do steps")
				exec.err(ctx, true, err, types.StatusReasonExecute)
				return exec.status(), exec.operation(), nil
			}

			return exec.status(), exec.operation(), nil
		}
		return tRunner, nil
	}, nil
}


// doSteps就是从provider注册的方法中获取,然后调用
func (exec *executor) doSteps(ctx monitorContext.Context, wfCtx wfContext.Context, v *value.Value) error {
	do := OpTpy(v)
	if do != "" && do != "steps" {
		provider := opProvider(v)
		if err := exec.Handle(ctx, wfCtx, provider, do, v); err != nil {
			return errors.WithMessagef(err, "run step(provider=%s,do=%s)", provider, do)
		}
		return nil
	}
	return v.StepByFields(func(fieldName string, in *value.Value) (bool, error) {
		if in.CueValue().IncompleteKind() == cue.BottomKind {
			errInfo, err := sets.ToString(in.CueValue())
			if err != nil {
				errInfo = "value is _|_"
			}
			return true, errors.New(errInfo + "(bottom kind)")
		}
		if retErr := in.CueValue().Err(); retErr != nil {
			errInfo, err := sets.ToString(in.CueValue())
			if err == nil {
				retErr = errors.WithMessage(retErr, errInfo)
			}
			return false, retErr
		}

		if isStepList(fieldName) {
			return false, in.StepByList(func(name string, item *value.Value) (bool, error) {
				do := OpTpy(item)
				if do == "" {
					return false, nil
				}
				return false, exec.doSteps(ctx, wfCtx, item)
			})
		}
		do := OpTpy(in)
		if do == "" {
			return false, nil
		}
		if do == "steps" {
			if err := exec.doSteps(ctx, wfCtx, in); err != nil {
				return false, err
			}
		} else {
			provider := opProvider(in)
			if err := exec.Handle(ctx, wfCtx, provider, do, in); err != nil {
				return false, errors.WithMessagef(err, "run step(provider=%s,do=%s)", provider, do)
			}
		}

		if exec.suspend || exec.terminated || exec.wait {
			return true, nil
		}
		return false, nil
	})
}


四、runner执行

// 生成一个runengine
func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunners []types.TaskRunner) (v1alpha1.WorkflowRunPhase, error) {
	InitializeWorkflowInstance(w.instance)
	status := &w.instance.Status
	dagMode := status.Mode.Steps == v1alpha1.WorkflowModeDAG
	cacheKey := fmt.Sprintf("%s-%s", w.instance.Name, w.instance.Namespace)

	allRunnersDone, allRunnersSucceeded := checkRunners(taskRunners, w.instance.Status)
	if status.Finished {
		StepStatusCache.Delete(cacheKey)
	}
	if checkWorkflowTerminated(status, allRunnersDone) {
		if isTerminatedManually(status) {
			return v1alpha1.WorkflowStateTerminated, nil
		}
		return v1alpha1.WorkflowStateFailed, nil
	}
	if checkWorkflowSuspended(status) {
		return v1alpha1.WorkflowStateSuspending, nil
	}
	if allRunnersSucceeded {
		return v1alpha1.WorkflowStateSucceeded, nil
	}

	wfCtx, err := w.makeContext(ctx, w.instance.Name)
	if err != nil {
		ctx.Error(err, "make context")
		return v1alpha1.WorkflowStateExecuting, err
	}
	w.wfCtx = wfCtx

	if cacheValue, ok := StepStatusCache.Load(cacheKey); ok {
    if len(status.Steps) < cacheValue.(int) {
			return v1alpha1.WorkflowStateSkipped, nil
		}
	}

	e := newEngine(ctx, wfCtx, w, status, taskRunners)

	err = e.Run(ctx, taskRunners, dagMode)  
	if err != nil {
		ctx.Error(err, "run steps")
		StepStatusCache.Store(cacheKey, len(status.Steps))
		return v1alpha1.WorkflowStateExecuting, err
	}

	StepStatusCache.Store(cacheKey, len(status.Steps))
	if feature.DefaultMutableFeatureGate.Enabled(features.EnablePatchStatusAtOnce) {
		return e.status.Phase, nil
	}
	return e.checkWorkflowPhase(), nil
}

// 调用每一个runner的run方法
func (e *engine) steps(ctx monitorContext.Context, taskRunners []types.TaskRunner, dag bool) error {
	wfCtx := e.wfCtx
	for index, runner := range taskRunners {
    	//...
		options := e.generateRunOptions(ctx, e.findDependPhase(taskRunners, index, dag))

		status, operation, err := runner.Run(wfCtx, options)
		if err != nil {
			return err
		}
        //...
	}
	return nil
}


五、Provider注册的方案示例

func (h *AppHandler) applyComponentFunc(appParser *appfile.Parser, appRev *v1beta1.ApplicationRevision, af *appfile.Appfile) oamProvider.ComponentApply {
	return func(baseCtx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (*unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {

        // 生成k8s对象
        wl, manifest, err := h.prepareWorkloadAndManifests(ctx, appParser, comp, appRev, patcher, af)
		if err != nil {
			return nil, nil, false, err
		}
		if len(manifest.PackagedWorkloadResources) != 0 {
			if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, manifest.PackagedWorkloadResources...); err != nil {
				return nil, nil, false, errors.WithMessage(err, "cannot dispatch packaged workload resources")
			}
		}
		wl.Ctx.SetCtx(auth.ContextWithUserInfo(ctx, h.app))

		readyWorkload, readyTraits, err := renderComponentsAndTraits(h.r.Client, manifest, appRev, clusterName, overrideNamespace, env)
		if err != nil {
			return nil, nil, false, err
		}
		checkSkipApplyWorkload(wl)

		isHealth := true
		if utilfeature.DefaultMutableFeatureGate.Enabled(features.MultiStageComponentApply) {
			manifestDispatchers, err := h.generateDispatcher(appRev, readyWorkload, readyTraits, overrideNamespace)
			if err != nil {
				return nil, nil, false, errors.WithMessage(err, "generateDispatcher")
			}

			for _, dispatcher := range manifestDispatchers {
				if isHealth, err := dispatcher.run(ctx, wl, appRev, clusterName); !isHealth || err != nil {
					return nil, nil, false, err
				}
			}
		} else {
			dispatchResources := readyTraits
			if !wl.SkipApplyWorkload {
				dispatchResources = append([]*unstructured.Unstructured{readyWorkload}, readyTraits...)
			}

			if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchResources...); err != nil {
				return nil, nil, false, errors.WithMessage(err, "Dispatch")
			}
			_, _, _, isHealth, err = h.collectHealthStatus(ctx, wl, appRev, overrideNamespace, false)
			if err != nil {
				return nil, nil, false, errors.WithMessage(err, "CollectHealthStatus")
			}
		}

		if DisableResourceApplyDoubleCheck {
			return readyWorkload, readyTraits, isHealth, nil
		}
		workload, traits, err := getComponentResources(auth.ContextWithUserInfo(ctx, h.app), manifest, wl.SkipApplyWorkload, h.r.Client)
		return workload, traits, isHealth, err
	}
}


func (af *Appfile) GenerateComponentManifest(wl *Workload, mutate func(*velaprocess.ContextData)) (*types.ComponentManifest, error) {
	if af.Namespace == "" {
		af.Namespace = corev1.NamespaceDefault
	}
    // 注入公共的context key,用于模版参数
	ctxData := GenerateContextDataFromAppFile(af, wl.Name)
	if mutate != nil {
		mutate(&ctxData)
	}
	wl.Ctx = NewBasicContext(ctxData, wl.Params)
	switch wl.CapabilityCategory {
	case types.HelmCategory:
		return generateComponentFromHelmModule(wl, ctxData)
	case types.KubeCategory:
		return generateComponentFromKubeModule(wl, ctxData)
	case types.TerraformCategory:
		return generateComponentFromTerraformModule(wl, af.Name, af.Namespace)
	default:
		return generateComponentFromCUEModule(wl, ctxData)
	}
}

// generateComponentFromCUEModule的实现
func baseGenerateComponent(pCtx process.Context, wl *Workload, appName, ns string) (*types.ComponentManifest, error) {
	var err error
	pCtx.PushData(velaprocess.ContextComponentType, wl.Type)
	for _, tr := range wl.Traits {
		if err := tr.EvalContext(pCtx); err != nil {  // cue的渲染模版,调用engine的Complete方法
			return nil, errors.Wrapf(err, "evaluate template trait=%s app=%s", tr.Name, wl.Name)
		}
	}
	if patcher := wl.Patch; patcher != nil {
		workload, auxiliaries := pCtx.Output()
		if p, err := patcher.LookupValue("workload"); err == nil {
			if err := workload.Unify(p.CueValue()); err != nil {
				return nil, errors.WithMessage(err, "patch workload")
			}
		}
		for _, aux := range auxiliaries {
			if p, err := patcher.LookupByScript(fmt.Sprintf("traits[\"%s\"]", aux.Name)); err == nil && p.CueValue().Err() == nil {
				if err := aux.Ins.Unify(p.CueValue()); err != nil {
					return nil, errors.WithMessagef(err, "patch outputs.%s", aux.Name)
				}
			}
		}
	}
	compManifest, err := evalWorkloadWithContext(pCtx, wl, ns, appName)
	if err != nil {
		return nil, err
	}
	compManifest.Name = wl.Name
	// we record the external revision name in ExternalRevision field
	compManifest.ExternalRevision = wl.ExternalRevision

	compManifest.Scopes = make([]*corev1.ObjectReference, len(wl.Scopes))
	for i, s := range wl.Scopes {
		compManifest.Scopes[i] = &corev1.ObjectReference{
			APIVersion: metav1.GroupVersion{
				Group:   s.GVK.Group,
				Version: s.GVK.Version,
			}.String(),
			Kind: s.GVK.Kind,
			Name: s.Name,
		}
	}
	return compManifest, nil
}







相关文章

PG查询性能Top SQL

一、查询当前正在运行的Top SQL    查询当前正在运行的会话中耗时最长的Top SQL,where条件可按需修改SELECT pgsa.datname AS database_name    ...

Kubernetes安全--基于sa和user的rbac认证机制

前言Kubernetes中的用户K8S中有两种用户(User)——服务账号(ServiceAccount)和普通意义上的用户(User)ServiceAccount是由K8S管理的,而User通常是在...

grafana版本升级

grafana版本升级

      因Grafana需接入腾讯云监控数据,安装腾讯云监控插件。腾讯云监控应用插件需运行在 Grafana ≥ 7.3且 < 8.0 的版本上。当前使用版本为...

数仓主流架构简介之三

数仓主流架构简介之三

一、数仓架构经历过程随着数据量的暴增和数据实时性要求越来越高,以及大数据技术的发展驱动企业不断升级迭代,数据仓库架构方面也在不断演进,分别经历了以下过程:早期经典数仓架构 > 离线大数据架构 &...

静默安装oracle11g单实例

环境: CentOS 7.8 11.2.0.4.0 orclp:172.16.104.31一、准备1、依赖包检查pdksh 在 redhat 上叫 ksh检查是否有安装root# rpm -q bin...

ChaosBlade介绍

ChaosBlade介绍

ChaosBlade 是阿里巴巴开源的一款遵循混沌工程原理和混沌实验模型的实验注入工具,帮助企业提升分布式系统的容错能力,并且在企业上云或往云原生系统迁移过程中业务连续性保障。Chaosblade 是...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。