Kubevela源码解读(一):application_controller解读
application_controller是kubevela最主要的一个控制器,作用就是将用户创建的applicaion转化为实际需要创建的资源对象,通过本文可以:
1、了解cue模版在kubevela中的运用,context key的由来
2、workflowstep如何关联注册的provider
3、了解application创建后,集群如何生成资源对象
基本流程如下
1、用户往集群提交application资源
2、控制器监听到资源创建,将资源对象转化为appfile对象
3、根据appfile对象,生成应用工作流程步骤WorkflowSteps
4、根据上一步生成的WorkflowSteps,生成runner,每个WorkflowSteps对应一个runner
5、循环的执行runner,直到结束。至此application_controller的工作就完成了
一、生成appfile
// 传人application, 初始化appfile
func (p *Parser) GenerateAppFileFromApp(ctx context.Context, app *v1beta1.Application) (*Appfile, error) {
ns := app.Namespace
appName := app.Name
appfile := p.newAppfile(appName, ns, app) // 初始化一个 appFile 结构体,内部主要是一些初始化操作
if app.Status.LatestRevision != nil {
appfile.AppRevisionName = app.Status.LatestRevision.Name
}
var wds []*Workload
for _, comp := range app.Spec.Components { // 将application的Components转化为workload,注入appfile
wd, err := p.parseWorkload(ctx, comp)
if err != nil {
return nil, err
}
wds = append(wds, wd)
}
appfile.Workloads = wds
appfile.Components = app.Spec.Components
var err error
if err = p.parseWorkflowSteps(ctx, appfile); err != nil { // 生成默认的WorkflowSteps,注入appfile
return nil, errors.Wrapf(err, "failed to parseWorkflowSteps")
}
if err = p.parsePolicies(ctx, appfile); err != nil {
return nil, errors.Wrapf(err, "failed to parsePolicies")
}
if err = p.parseReferredObjects(ctx, appfile); err != nil {
return nil, errors.Wrapf(err, "failed to parseReferredObjects")
}
...
return appfile, nil
}
// 生成workload,内部模版的解析由engine实现Complete
func (p *Parser) convertTemplate2Workload(name, typ string, props *runtime.RawExtension, templ *Template) (*Workload, error) {
settings, err := util.RawExtension2Map(props)
if err != nil {
return nil, errors.WithMessagef(err, "fail to parse settings for %s", name)
}
wlType, err := util.ConvertDefinitionRevName(typ)
if err != nil {
wlType = typ
}
return &Workload{
Traits: []*Trait{},
ScopeDefinition: []*v1beta1.ScopeDefinition{},
Name: name,
Type: wlType,
CapabilityCategory: templ.CapabilityCategory,
FullTemplate: templ,
Params: settings,
engine: definition.NewWorkloadAbstractEngine(name, p.pd),
}, nil
}
// WorkflowSteps生成逻辑
func (p *Parser) loadWorkflowToAppfile(ctx context.Context, af *Appfile) error {
var err error
// parse workflow steps
af.WorkflowMode = &workflowv1alpha1.WorkflowExecuteMode{
Steps: workflowv1alpha1.WorkflowModeDAG,
SubSteps: workflowv1alpha1.WorkflowModeDAG,
}
// 如果用户手动指定了 Workflow 这里就解析一下
if wfSpec := af.app.Spec.Workflow; wfSpec != nil {
app := af.app
mode := wfSpec.Mode
// 根据 ref 中填的 name 找到对应 Workflow
if wfSpec.Ref != "" && mode == nil {
wf := &workflowv1alpha1.Workflow{}
if err := af.WorkflowClient(p.client).Get(ctx, ktypes.NamespacedName{Namespace: af.app.Namespace, Name: app.Spec.Workflow.Ref}, wf); err != nil {
return err
}
mode = wf.Mode
}
af.WorkflowSteps = wfSpec.Steps
af.WorkflowMode.Steps = workflowv1alpha1.WorkflowModeStep
if mode != nil {
if mode.Steps != "" {
af.WorkflowMode.Steps = mode.Steps
}
if mode.SubSteps != "" {
af.WorkflowMode.SubSteps = mode.SubSteps
}
}
}
// 然后开始生成 Workflow ,chain 形式,按顺序执行多个 generator,直到其中某一个步骤生成 Workflow 为止
af.WorkflowSteps, err = step.NewChainWorkflowStepGenerator(
// KubeVela 中支持引用集群中已经创建好的 Workflow,这里则是在处理这部分逻辑,根据 ref 字段名找到对应的 Workflow
&step.RefWorkflowStepGenerator{Client: af.WorkflowClient(p.client), Context: ctx},
// 根据 topology 类型的 policy 来生成 Workflow
&step.DeployWorkflowStepGenerator{},
// 部署到指定环境
&step.Deploy2EnvWorkflowStepGenerator{},
// 如果前面几个步骤都没有成功生成才会执行这部分逻辑,生成一个 apply-component 类型的 WorkflowStep,直接将组件部署到 local 集群的 default 命名空间。
&step.ApplyComponentWorkflowStepGenerator{},
// 在所有部署步骤之前生成挂起工作流步骤
&step.DeployPreApproveWorkflowStepGenerator{},
).Generate(af.app, af.WorkflowSteps)
return err
}二、根据appfile生成runnner
func (h *AppHandler) GenerateApplicationSteps(ctx monitorContext.Context,
app *v1beta1.Application,
appParser *appfile.Parser,
af *appfile.Appfile) (*wfTypes.WorkflowInstance, []wfTypes.TaskRunner, error) {
appRev := h.currentAppRev
t := time.Now()
defer func() {
metrics.AppReconcileStageDurationHistogram.WithLabelValues("generate-app-steps").Observe(time.Since(t).Seconds())
}()
appLabels := map[string]string{
oam.LabelAppName: app.Name,
oam.LabelAppNamespace: app.Namespace,
}
// Install 用于注册
handlerProviders := providers.NewProviders()
kube.Install(handlerProviders, h.r.Client, appLabels, &kube.Handlers{
Apply: h.Dispatch,
Delete: h.Delete,
})
configprovider.Install(handlerProviders, h.r.Client, func(ctx context.Context, resources []*unstructured.Unstructured, applyOptions []apply.ApplyOption) error {
for _, res := range resources {
res.SetLabels(util.MergeMapOverrideWithDst(res.GetLabels(), appLabels))
}
return h.resourceKeeper.Dispatch(ctx, resources, applyOptions)
})
oamProvider.Install(handlerProviders, app, af, h.r.Client, h.applyComponentFunc(
appParser, appRev, af), h.renderComponentFunc(appParser, appRev, af))
pCtx := velaprocess.NewContext(generateContextDataFromApp(app, appRev.Name))
renderer := func(ctx context.Context, comp common.ApplicationComponent) (*appfile.Workload, error) {
return appParser.ParseWorkloadFromRevisionAndClient(ctx, comp, appRev)
}
multiclusterProvider.Install(handlerProviders, h.r.Client, app, af,
h.applyComponentFunc(appParser, appRev, af),
h.checkComponentHealth(appParser, appRev, af),
renderer)
terraformProvider.Install(handlerProviders, app, renderer)
query.Install(handlerProviders, h.r.Client, nil)
// 为application生成一个instance
instance := generateWorkflowInstance(af, app)
executor.InitializeWorkflowInstance(instance)
// 用于后面执行runner
runners, err := generator.GenerateRunners(ctx, instance, wfTypes.StepGeneratorOptions{
Providers: handlerProviders,
PackageDiscover: h.r.pd,
ProcessCtx: pCtx,
TemplateLoader: template.NewWorkflowStepTemplateRevisionLoader(appRev, h.r.dm),
Client: h.r.Client,
StepConvertor: map[string]func(step workflowv1alpha1.WorkflowStep) (workflowv1alpha1.WorkflowStep, error){
wfTypes.WorkflowStepTypeApplyComponent: func(lstep workflowv1alpha1.WorkflowStep) (workflowv1alpha1.WorkflowStep, error) {
copierStep := lstep.DeepCopy()
if err := convertStepProperties(copierStep, app); err != nil {
return lstep, errors.WithMessage(err, "convert [apply-component]")
}
copierStep.Type = wfTypes.WorkflowStepTypeBuiltinApplyComponent
return *copierStep, nil
},
},
})
if err != nil {
return nil, nil, err
}
return instance, runners, nil
}
// 生成runner其实就是根据WorkflowSteps,调用generateTaskRunner方法
func GenerateRunners(ctx monitorContext.Context, instance *types.WorkflowInstance, options types.StepGeneratorOptions) ([]types.TaskRunner, error) {
ctx.V(options.LogLevel)
subCtx := ctx.Fork("generate-task-runners", monitorContext.DurationMetric(func(v float64) {
metrics.GenerateTaskRunnersDurationHistogram.WithLabelValues("workflowrun").Observe(v)
}))
defer subCtx.Commit("finish generate task runners")
options = initStepGeneratorOptions(ctx, instance, options)
taskDiscover := tasks.NewTaskDiscover(ctx, options)
var tasks []types.TaskRunner
for _, step := range instance.Steps {
opt := &types.TaskGeneratorOptions{
ID: generateStepID(instance.Status, step.Name),
PackageDiscover: options.PackageDiscover,
ProcessContext: options.ProcessCtx,
}
for typ, convertor := range options.StepConvertor {
if step.Type == typ {
opt.StepConvertor = convertor
}
}
task, err := generateTaskRunner(ctx, instance, step, taskDiscover, opt, options)
if err != nil {
return nil, err
}
tasks = append(tasks, task)
}
return tasks, nil
}
// runner的创建实现是由customTaskDiscover类型完成
func (td *taskDiscover) GetTaskGenerator(ctx context.Context, name string) (types.TaskGenerator, error) {
tg, ok := td.builtin[name]
if ok {
return tg, nil
}
if td.customTaskDiscover != nil {
var err error
tg, err = td.customTaskDiscover.GetTaskGenerator(ctx, name)
if err != nil {
return nil, err
}
return tg, nil
}
return nil, errors.Errorf("can't find task generator: %s", name)
}
// 生成runner,makeTaskGenerator里面有居然的runner实现(单独讲解)
func (t *TaskLoader) GetTaskGenerator(ctx context.Context, name string) (types.TaskGenerator, error) {
templ, err := t.loadTemplate(ctx, name)
if err != nil {
return nil, err
}
return t.makeTaskGenerator(templ)
}
// 上一步loadTemplate底层的实现如下,主要还是从k8s里面拿到实际的cue模版
func getDefinitionTemplate(ctx context.Context, cli client.Client, definitionName string) (string, error) {
const (
definitionAPIVersion = "core.oam.dev/v1beta1"
kindWorkflowStepDefinition = "WorkflowStepDefinition"
)
definition := &unstructured.Unstructured{}
definition.SetAPIVersion(definitionAPIVersion)
definition.SetKind(kindWorkflowStepDefinition)
ns := getDefinitionNamespaceWithCtx(ctx)
if err := cli.Get(ctx, types.NamespacedName{Name: definitionName, Namespace: ns}, definition); err != nil {
if apierrors.IsNotFound(err) {
if err := cli.Get(ctx, types.NamespacedName{Name: definitionName, Namespace: systemDefinitionNamespace}, definition); err != nil {
return "", err
}
} else {
return "", err
}
}
d := new(def)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(definition.Object, d); err != nil {
return "", errors.Wrap(err, "invalid workflow step definition")
}
return d.Spec.Schematic.CUE.Template, nil
}三、runner实现
func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error) {
return func(wfStep v1alpha1.WorkflowStep, genOpt *types.TaskGeneratorOptions) (types.TaskRunner, error) {
// ...
// 获取cue模版的参数
paramsStr, err := GetParameterTemplate(wfStep)
if err != nil {
return nil, err
}
tRunner := new(taskRunner)
tRunner.name = wfStep.Name
tRunner.checkPending = func(ctx monitorContext.Context, wfCtx wfContext.Context, stepStatus map[string]v1alpha1.StepStatus) (bool, v1alpha1.StepStatus) {
options := &types.TaskRunOptions{}
if t.runOptionsProcess != nil {
t.runOptionsProcess(options)
}
basicVal, _, _ := MakeBasicValue(ctx, wfCtx, t.pd, wfStep.Name, exec.wfStatus.ID, paramsStr, options.PCtx)
return CheckPending(wfCtx, wfStep, exec.wfStatus.ID, stepStatus, basicVal)
}
// 这个run方法用于Executor的调用
tRunner.run = func(ctx wfContext.Context, options *types.TaskRunOptions) (stepStatus v1alpha1.StepStatus, operations *types.Operation, rErr error) {
if options.GetTracer == nil {
options.GetTracer = func(id string, step v1alpha1.WorkflowStep) monitorContext.Context {
return monitorContext.NewTraceContext(context.Background(), "")
}
}
tracer := options.GetTracer(exec.wfStatus.ID, wfStep).AddTag("step_name", wfStep.Name, "step_type", wfStep.Type)
tracer.V(t.logLevel)
defer func() {
tracer.Commit(string(exec.status().Phase))
}()
if t.runOptionsProcess != nil {
t.runOptionsProcess(options)
}
basicVal, basicTemplate, err := MakeBasicValue(tracer, ctx, t.pd, wfStep.Name, exec.wfStatus.ID, paramsStr, options.PCtx)
if err != nil {
tracer.Error(err, "make context parameter")
return v1alpha1.StepStatus{}, nil, errors.WithMessage(err, "make context parameter")
}
var taskv *value.Value
// ...
for _, hook := range options.PreCheckHooks {
result, err := hook(wfStep, &types.PreCheckOptions{
PackageDiscover: t.pd,
BasicTemplate: basicTemplate,
BasicValue: basicVal,
})
if err != nil {
tracer.Error(err, "do preCheckHook")
exec.Skip(fmt.Sprintf("pre check error: %s", err.Error()))
return exec.status(), exec.operation(), nil
}
if result.Skip {
exec.Skip("")
return exec.status(), exec.operation(), nil
}
if result.Timeout {
exec.timeout("")
}
}
for _, hook := range options.PreStartHooks {
if err := hook(ctx, basicVal, wfStep); err != nil {
tracer.Error(err, "do preStartHook")
exec.err(ctx, false, err, types.StatusReasonInput)
return exec.status(), exec.operation(), nil
}
}
// refresh the basic template to get inputs value involved
basicTemplate, err = basicVal.String()
if err != nil {
exec.err(ctx, false, err, types.StatusReasonParameter)
return exec.status(), exec.operation(), nil
}
if status, ok := options.StepStatus[wfStep.Name]; ok {
exec.stepStatus = status
}
taskv, err = value.NewValue(strings.Join([]string{templ, basicTemplate}, "\n"), t.pd, "", value.ProcessScript, value.TagFieldOrder)
if err != nil {
exec.err(ctx, false, err, types.StatusReasonRendering)
return exec.status(), exec.operation(), nil
}
exec.tracer = tracer
if debugLog(taskv) {
exec.printStep("workflowStepStart", "workflow", "", taskv)
defer exec.printStep("workflowStepEnd", "workflow", "", taskv)
}
// runner会调用doSteps方法
if err := exec.doSteps(tracer, ctx, taskv); err != nil {
tracer.Error(err, "do steps")
exec.err(ctx, true, err, types.StatusReasonExecute)
return exec.status(), exec.operation(), nil
}
return exec.status(), exec.operation(), nil
}
return tRunner, nil
}, nil
}
// doSteps就是从provider注册的方法中获取,然后调用
func (exec *executor) doSteps(ctx monitorContext.Context, wfCtx wfContext.Context, v *value.Value) error {
do := OpTpy(v)
if do != "" && do != "steps" {
provider := opProvider(v)
if err := exec.Handle(ctx, wfCtx, provider, do, v); err != nil {
return errors.WithMessagef(err, "run step(provider=%s,do=%s)", provider, do)
}
return nil
}
return v.StepByFields(func(fieldName string, in *value.Value) (bool, error) {
if in.CueValue().IncompleteKind() == cue.BottomKind {
errInfo, err := sets.ToString(in.CueValue())
if err != nil {
errInfo = "value is _|_"
}
return true, errors.New(errInfo + "(bottom kind)")
}
if retErr := in.CueValue().Err(); retErr != nil {
errInfo, err := sets.ToString(in.CueValue())
if err == nil {
retErr = errors.WithMessage(retErr, errInfo)
}
return false, retErr
}
if isStepList(fieldName) {
return false, in.StepByList(func(name string, item *value.Value) (bool, error) {
do := OpTpy(item)
if do == "" {
return false, nil
}
return false, exec.doSteps(ctx, wfCtx, item)
})
}
do := OpTpy(in)
if do == "" {
return false, nil
}
if do == "steps" {
if err := exec.doSteps(ctx, wfCtx, in); err != nil {
return false, err
}
} else {
provider := opProvider(in)
if err := exec.Handle(ctx, wfCtx, provider, do, in); err != nil {
return false, errors.WithMessagef(err, "run step(provider=%s,do=%s)", provider, do)
}
}
if exec.suspend || exec.terminated || exec.wait {
return true, nil
}
return false, nil
})
}四、runner执行
// 生成一个runengine
func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunners []types.TaskRunner) (v1alpha1.WorkflowRunPhase, error) {
InitializeWorkflowInstance(w.instance)
status := &w.instance.Status
dagMode := status.Mode.Steps == v1alpha1.WorkflowModeDAG
cacheKey := fmt.Sprintf("%s-%s", w.instance.Name, w.instance.Namespace)
allRunnersDone, allRunnersSucceeded := checkRunners(taskRunners, w.instance.Status)
if status.Finished {
StepStatusCache.Delete(cacheKey)
}
if checkWorkflowTerminated(status, allRunnersDone) {
if isTerminatedManually(status) {
return v1alpha1.WorkflowStateTerminated, nil
}
return v1alpha1.WorkflowStateFailed, nil
}
if checkWorkflowSuspended(status) {
return v1alpha1.WorkflowStateSuspending, nil
}
if allRunnersSucceeded {
return v1alpha1.WorkflowStateSucceeded, nil
}
wfCtx, err := w.makeContext(ctx, w.instance.Name)
if err != nil {
ctx.Error(err, "make context")
return v1alpha1.WorkflowStateExecuting, err
}
w.wfCtx = wfCtx
if cacheValue, ok := StepStatusCache.Load(cacheKey); ok {
if len(status.Steps) < cacheValue.(int) {
return v1alpha1.WorkflowStateSkipped, nil
}
}
e := newEngine(ctx, wfCtx, w, status, taskRunners)
err = e.Run(ctx, taskRunners, dagMode)
if err != nil {
ctx.Error(err, "run steps")
StepStatusCache.Store(cacheKey, len(status.Steps))
return v1alpha1.WorkflowStateExecuting, err
}
StepStatusCache.Store(cacheKey, len(status.Steps))
if feature.DefaultMutableFeatureGate.Enabled(features.EnablePatchStatusAtOnce) {
return e.status.Phase, nil
}
return e.checkWorkflowPhase(), nil
}
// 调用每一个runner的run方法
func (e *engine) steps(ctx monitorContext.Context, taskRunners []types.TaskRunner, dag bool) error {
wfCtx := e.wfCtx
for index, runner := range taskRunners {
//...
options := e.generateRunOptions(ctx, e.findDependPhase(taskRunners, index, dag))
status, operation, err := runner.Run(wfCtx, options)
if err != nil {
return err
}
//...
}
return nil
}五、Provider注册的方案示例
func (h *AppHandler) applyComponentFunc(appParser *appfile.Parser, appRev *v1beta1.ApplicationRevision, af *appfile.Appfile) oamProvider.ComponentApply {
return func(baseCtx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string, env string) (*unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {
// 生成k8s对象
wl, manifest, err := h.prepareWorkloadAndManifests(ctx, appParser, comp, appRev, patcher, af)
if err != nil {
return nil, nil, false, err
}
if len(manifest.PackagedWorkloadResources) != 0 {
if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, manifest.PackagedWorkloadResources...); err != nil {
return nil, nil, false, errors.WithMessage(err, "cannot dispatch packaged workload resources")
}
}
wl.Ctx.SetCtx(auth.ContextWithUserInfo(ctx, h.app))
readyWorkload, readyTraits, err := renderComponentsAndTraits(h.r.Client, manifest, appRev, clusterName, overrideNamespace, env)
if err != nil {
return nil, nil, false, err
}
checkSkipApplyWorkload(wl)
isHealth := true
if utilfeature.DefaultMutableFeatureGate.Enabled(features.MultiStageComponentApply) {
manifestDispatchers, err := h.generateDispatcher(appRev, readyWorkload, readyTraits, overrideNamespace)
if err != nil {
return nil, nil, false, errors.WithMessage(err, "generateDispatcher")
}
for _, dispatcher := range manifestDispatchers {
if isHealth, err := dispatcher.run(ctx, wl, appRev, clusterName); !isHealth || err != nil {
return nil, nil, false, err
}
}
} else {
dispatchResources := readyTraits
if !wl.SkipApplyWorkload {
dispatchResources = append([]*unstructured.Unstructured{readyWorkload}, readyTraits...)
}
if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchResources...); err != nil {
return nil, nil, false, errors.WithMessage(err, "Dispatch")
}
_, _, _, isHealth, err = h.collectHealthStatus(ctx, wl, appRev, overrideNamespace, false)
if err != nil {
return nil, nil, false, errors.WithMessage(err, "CollectHealthStatus")
}
}
if DisableResourceApplyDoubleCheck {
return readyWorkload, readyTraits, isHealth, nil
}
workload, traits, err := getComponentResources(auth.ContextWithUserInfo(ctx, h.app), manifest, wl.SkipApplyWorkload, h.r.Client)
return workload, traits, isHealth, err
}
}
func (af *Appfile) GenerateComponentManifest(wl *Workload, mutate func(*velaprocess.ContextData)) (*types.ComponentManifest, error) {
if af.Namespace == "" {
af.Namespace = corev1.NamespaceDefault
}
// 注入公共的context key,用于模版参数
ctxData := GenerateContextDataFromAppFile(af, wl.Name)
if mutate != nil {
mutate(&ctxData)
}
wl.Ctx = NewBasicContext(ctxData, wl.Params)
switch wl.CapabilityCategory {
case types.HelmCategory:
return generateComponentFromHelmModule(wl, ctxData)
case types.KubeCategory:
return generateComponentFromKubeModule(wl, ctxData)
case types.TerraformCategory:
return generateComponentFromTerraformModule(wl, af.Name, af.Namespace)
default:
return generateComponentFromCUEModule(wl, ctxData)
}
}
// generateComponentFromCUEModule的实现
func baseGenerateComponent(pCtx process.Context, wl *Workload, appName, ns string) (*types.ComponentManifest, error) {
var err error
pCtx.PushData(velaprocess.ContextComponentType, wl.Type)
for _, tr := range wl.Traits {
if err := tr.EvalContext(pCtx); err != nil { // cue的渲染模版,调用engine的Complete方法
return nil, errors.Wrapf(err, "evaluate template trait=%s app=%s", tr.Name, wl.Name)
}
}
if patcher := wl.Patch; patcher != nil {
workload, auxiliaries := pCtx.Output()
if p, err := patcher.LookupValue("workload"); err == nil {
if err := workload.Unify(p.CueValue()); err != nil {
return nil, errors.WithMessage(err, "patch workload")
}
}
for _, aux := range auxiliaries {
if p, err := patcher.LookupByScript(fmt.Sprintf("traits[\"%s\"]", aux.Name)); err == nil && p.CueValue().Err() == nil {
if err := aux.Ins.Unify(p.CueValue()); err != nil {
return nil, errors.WithMessagef(err, "patch outputs.%s", aux.Name)
}
}
}
}
compManifest, err := evalWorkloadWithContext(pCtx, wl, ns, appName)
if err != nil {
return nil, err
}
compManifest.Name = wl.Name
// we record the external revision name in ExternalRevision field
compManifest.ExternalRevision = wl.ExternalRevision
compManifest.Scopes = make([]*corev1.ObjectReference, len(wl.Scopes))
for i, s := range wl.Scopes {
compManifest.Scopes[i] = &corev1.ObjectReference{
APIVersion: metav1.GroupVersion{
Group: s.GVK.Group,
Version: s.GVK.Version,
}.String(),
Kind: s.GVK.Kind,
Name: s.Name,
}
}
return compManifest, nil
}



