From 34204c5f3a0ed64477fc7433b6e07c17521fb4c5 Mon Sep 17 00:00:00 2001 From: Jian Qiu Date: Tue, 31 Dec 2024 10:42:15 +0800 Subject: [PATCH] fix: task query optimize (#21904) Co-authored-by: Qiu Jian --- pkg/apis/tasks.go | 5 +- pkg/cloudcommon/db/taskman/subtasks.go | 1 + pkg/cloudcommon/db/taskman/taskobjs.go | 11 ++- pkg/cloudcommon/db/taskman/tasks.go | 108 ++++++++++++++++--------- 4 files changed, 86 insertions(+), 39 deletions(-) diff --git a/pkg/apis/tasks.go b/pkg/apis/tasks.go index b4cf7f96b9b..402e6002c33 100644 --- a/pkg/apis/tasks.go +++ b/pkg/apis/tasks.go @@ -33,9 +33,12 @@ type TaskListInput struct { IsMulti *bool `json:"is_multi" negative:"is_single" help:"is multi task"` IsComplete *bool `json:"is_complete" negative:"not_complete" help:"is task completed, either fail or complete"` IsInit *bool `json:"is_init" negative:"not_init" help:"is task started?"` - Stage []string `json:"stage" help:"task stage"` + Stage []string `json:"stage" help:"tasks in stages"` + NotStage []string `json:"not_stage" help:"tasks not in stages"` ParentId []string `json:"parent_id" help:"filter tasks by parent_task_id"` IsRoot *bool `json:"is_root" help:"filter root tasks"` + + SubTask *bool `json:"sub_task" help:"show sub task states"` } type TaskDetails struct { diff --git a/pkg/cloudcommon/db/taskman/subtasks.go b/pkg/cloudcommon/db/taskman/subtasks.go index f3e2eecc8a3..a7bf24e3fdd 100644 --- a/pkg/cloudcommon/db/taskman/subtasks.go +++ b/pkg/cloudcommon/db/taskman/subtasks.go @@ -44,6 +44,7 @@ func init() { "subtasks", )} SubTaskManager.SetVirtualObject(SubTaskManager) + SubTaskManager.TableSpec().AddIndex(true, "task_id", "stage", "subtask_id", "status") } type SSubTask struct { diff --git a/pkg/cloudcommon/db/taskman/taskobjs.go b/pkg/cloudcommon/db/taskman/taskobjs.go index 254f2805476..ae94e4a9fb0 100644 --- a/pkg/cloudcommon/db/taskman/taskobjs.go +++ b/pkg/cloudcommon/db/taskman/taskobjs.go @@ -38,7 +38,16 @@ type STaskObjectManager struct { var TaskObjectManager *STaskObjectManager func init() { - TaskObjectManager = &STaskObjectManager{SModelBaseManager: db.NewModelBaseManager(STaskObject{}, "taskobjects_tbl", "taskobject", "taskobjects")} + TaskObjectManager = &STaskObjectManager{ + SModelBaseManager: db.NewModelBaseManager( + STaskObject{}, + "taskobjects_tbl", + "taskobject", + "taskobjects", + ), + } + TaskObjectManager.SetVirtualObject(TaskObjectManager) + TaskObjectManager.TableSpec().AddIndex(true, "task_id", "obj_id", "tenant_id", "domain_id") } type STaskObject struct { diff --git a/pkg/cloudcommon/db/taskman/tasks.go b/pkg/cloudcommon/db/taskman/tasks.go index 80d381f45f9..6f7b35bab3f 100644 --- a/pkg/cloudcommon/db/taskman/tasks.go +++ b/pkg/cloudcommon/db/taskman/tasks.go @@ -94,6 +94,7 @@ func init() { userCredWidthLimit, _ = strconv.Atoi(widthStr) } } + TaskManager.TableSpec().AddIndex(true, "id", "created_at", "tenant_id", "domain_id", "parent_task_id", "obj_id", "stage") } type STask struct { @@ -118,13 +119,13 @@ type STask struct { ObjType string `old_name:"obj_name" json:"obj_type" width:"128" charset:"utf8" nullable:"true" list:"user"` Object string `json:"object" width:"128" charset:"utf8" nullable:"true" list:"user"` // Column(VARCHAR(128, charset='utf8'), nullable=False) ObjId string `width:"128" charset:"ascii" nullable:"false" list:"user" index:"true"` // Column(VARCHAR(ID_LENGTH, charset='ascii'), nullable=False) - TaskName string `width:"64" charset:"ascii" nullable:"false" list:"user"` // Column(VARCHAR(64, charset='ascii'), nullable=False) + TaskName string `width:"64" charset:"ascii" nullable:"false" list:"user" index:"true"` // Column(VARCHAR(64, charset='ascii'), nullable=False) UserCred mcclient.TokenCredential `width:"1024" charset:"utf8" nullable:"false" get:"user"` // Column(VARCHAR(1024, charset='ascii'), nullable=False) // OwnerCred string `width:"512" charset:"ascii" nullable:"true"` // Column(VARCHAR(512, charset='ascii'), nullable=True) Params *jsonutils.JSONDict `charset:"utf8" length:"medium" nullable:"false" get:"user"` // Column(MEDIUMTEXT(charset='ascii'), nullable=False) - Stage string `width:"64" charset:"ascii" nullable:"false" default:"on_init" list:"user"` // Column(VARCHAR(64, charset='ascii'), nullable=False, default='on_init') + Stage string `width:"64" charset:"ascii" nullable:"false" default:"on_init" list:"user" index:"true"` // Column(VARCHAR(64, charset='ascii'), nullable=False, default='on_init') // 父任务Id ParentTaskId string `width:"36" charset:"ascii" list:"user" index:"true" json:"parent_task_id"` @@ -132,9 +133,9 @@ type STask struct { taskObject db.IStandaloneModel `ignore:"true"` taskObjects []db.IStandaloneModel `ignore:"true"` - SubTaskCount int `ignore:"true"` - FailSubTaskCnt int `ignore:"true"` - SUccSubTaskCnt int `ignore:"true"` + SubTaskCount int `ignore:"true" json:"sub_task_count"` + FailSubTaskCnt int `ignore:"true" json:"fail_sub_task_cnt"` + SuccSubTaskCnt int `ignore:"true" json:"succ_sub_task_cnt"` } func (manager *STaskManager) CreateByInsertOrUpdate() bool { @@ -182,23 +183,55 @@ func (task *STask) GetOwnerId() mcclient.IIdentityProvider { return task.SProjectizedResourceBase.GetOwnerId() } -func (manager *STaskManager) FilterByOwner(ctx context.Context, q *sqlchemy.SQuery, man db.FilterByOwnerProvider, userCred mcclient.TokenCredential, owner mcclient.IIdentityProvider, scope rbacscope.TRbacScope) *sqlchemy.SQuery { - objectQAltered := false - objectQ := TaskObjectManager.Query().Snapshot() - objectQ = TaskObjectManager.SProjectizedResourceBaseManager.FilterByOwner(ctx, objectQ, man, userCred, owner, scope) - objectQAltered = objectQ.IsAltered() - objectQ = objectQ.AppendField(sqlchemy.DISTINCT("task_id", objectQ.Field("task_id"))) - - singleTaskQAltered := false - singleTaskQ := TaskManager.Query().NotEquals("obj_id", MULTI_OBJECTS_ID).Snapshot() - singleTaskQ = TaskManager.SProjectizedResourceBaseManager.FilterByOwner(ctx, singleTaskQ, man, userCred, owner, scope) - singleTaskQAltered = singleTaskQ.IsAltered() - singleTaskQ = singleTaskQ.AppendField(sqlchemy.DISTINCT("task_id", singleTaskQ.Field("id"))) - - if objectQAltered || singleTaskQAltered { - subQ := sqlchemy.Union(objectQ, singleTaskQ).Query().SubQuery() - q = q.Join(subQ, sqlchemy.Equals(q.Field("id"), subQ.Field("task_id"))) +func getExtendTaskObjectQuery(fields ...string) *sqlchemy.SSubQuery { + taskQ := TaskManager.Query("id", "obj_id", "tenant_id", "domain_id").SubQuery() + objectQ := TaskObjectManager.Query().SubQuery() + + taskTenantQ := taskQ.Query() + taskTenantQ = taskTenantQ.LeftJoin(objectQ, sqlchemy.Equals(taskQ.Field("id"), objectQ.Field("task_id"))) + if utils.IsInArray("task_id", fields) { + taskTenantQ = taskTenantQ.AppendField(taskQ.Field("id").Label("task_id")) + } + if utils.IsInArray("tenant_id", fields) { + taskTenantQ = taskTenantQ.AppendField(sqlchemy.NewFunction( + sqlchemy.NewCase(). + When(sqlchemy.Equals(taskQ.Field("obj_id"), MULTI_OBJECTS_ID), objectQ.Field("tenant_id")).Else(taskQ.Field("tenant_id")), + "tenant_id", + true, + )) + } + if utils.IsInArray("domain_id", fields) { + taskTenantQ = taskTenantQ.AppendField(sqlchemy.NewFunction( + sqlchemy.NewCase(). + When(sqlchemy.Equals(taskQ.Field("obj_id"), MULTI_OBJECTS_ID), objectQ.Field("domain_id")).Else(taskQ.Field("domain_id")), + "domain_id", + true, + )) } + if utils.IsInArray("obj_id", fields) { + taskTenantQ = taskTenantQ.AppendField(sqlchemy.NewFunction( + sqlchemy.NewCase(). + When(sqlchemy.Equals(taskQ.Field("obj_id"), MULTI_OBJECTS_ID), objectQ.Field("obj_id")).Else(taskQ.Field("obj_id")), + "obj_id", + true, + )) + } + return taskTenantQ.SubQuery() +} + +func (manager *STaskManager) FilterByOwner(ctx context.Context, q *sqlchemy.SQuery, man db.FilterByOwnerProvider, userCred mcclient.TokenCredential, owner mcclient.IIdentityProvider, scope rbacscope.TRbacScope) *sqlchemy.SQuery { + taskTenantSubQ := getExtendTaskObjectQuery("task_id", "tenant_id", "domain_id").Query() + + taskTenantSubQ = TaskManager.SProjectizedResourceBaseManager.FilterByOwner(ctx, taskTenantSubQ, man, userCred, owner, scope) + taskTenantSubQ = taskTenantSubQ.SubQuery().Query() + taskTenantSubQ = taskTenantSubQ.AppendField(taskTenantSubQ.Field("task_id")) + taskTenantSubQ = taskTenantSubQ.AppendField(taskTenantSubQ.Field("tenant_id")) + taskTenantSubQ = taskTenantSubQ.AppendField(taskTenantSubQ.Field("domain_id")) + taskTenantSubQ = taskTenantSubQ.GroupBy("task_id", "tenant_id", "domain_id") + taskTenantTaskIdQ := taskTenantSubQ.Distinct().SubQuery() + + q = q.Join(taskTenantTaskIdQ, sqlchemy.Equals(q.Field("id"), taskTenantTaskIdQ.Field("task_id"))) + return q } @@ -460,14 +493,6 @@ func (manager *STaskManager) fetchTask(idStr string) *STask { return task } -/*func (manager *STaskManager) getTaskName(taskId string) string { - baseTask := manager.fetchTask(taskId) - if baseTask == nil { - return "" - } - return baseTask.TaskName -}*/ - func (manager *STaskManager) execTask(taskId string, data jsonutils.JSONObject) { baseTask := manager.fetchTask(taskId) if baseTask == nil { @@ -884,7 +909,7 @@ func (task *STask) IsCurrentStageComplete() bool { totalSubtasksCnt, _ := SubTaskManager.GetTotalSubtasksCount(task.Id, task.Stage) initSubtasksCnt, _ := SubTaskManager.GetInitSubtasksCount(task.Id, task.Stage) log.Debugf("Task %s IsCurrentStageComplete totalSubtasks %d initSubtasks %d ", task.String(), totalSubtasksCnt, initSubtasksCnt) - task.SetProgress(float32(totalSubtasksCnt-initSubtasksCnt) / float32(totalSubtasksCnt)) + task.SetProgress(float32(totalSubtasksCnt-initSubtasksCnt) * 100 / float32(totalSubtasksCnt)) if totalSubtasksCnt > 0 && initSubtasksCnt == 0 { return true } else { @@ -1089,12 +1114,17 @@ func (manager *STaskManager) ListItemFilter( } if len(input.ObjId) > 0 { - taskObjsQ := TaskObjectManager.Query().In("obj_id", input.ObjId) - taskObjsQ = taskObjsQ.AppendField(sqlchemy.DISTINCT("task_id", taskObjsQ.Field("task_id"))) - tasksQ := TaskManager.Query().In("obj_id", input.ObjId) - tasksQ = tasksQ.AppendField(tasksQ.Field("id").Label("task_id")) - taskIdQ := sqlchemy.Union(taskObjsQ, tasksQ).Query().SubQuery() - q = q.Join(taskIdQ, sqlchemy.Equals(taskIdQ.Field("task_id"), q.Field("id"))) + taskExtSubQ := getExtendTaskObjectQuery("task_id", "obj_id").Query() + + taskExtSubQ = taskExtSubQ.In("obj_id", input.ObjId) + + taskExtSubQ = taskExtSubQ.SubQuery().Query() + taskExtSubQ = taskExtSubQ.AppendField(taskExtSubQ.Field("task_id")) + taskExtSubQ = taskExtSubQ.AppendField(taskExtSubQ.Field("obj_id")) + taskExtSubQ = taskExtSubQ.GroupBy("task_id", "obj_id") + taskObjTaskIdQ := taskExtSubQ.Distinct().SubQuery() + + q = q.Join(taskObjTaskIdQ, sqlchemy.Equals(taskObjTaskIdQ.Field("task_id"), q.Field("id"))) } if len(input.ObjName) > 0 { @@ -1113,6 +1143,10 @@ func (manager *STaskManager) ListItemFilter( q = q.In("stage", input.Stage) } + if len(input.NotStage) > 0 { + q = q.NotIn("stage", input.NotStage) + } + if len(input.ParentId) > 0 { q = q.In("parent_task_id", input.ParentId) } @@ -1149,7 +1183,7 @@ func (manager *STaskManager) ListItemFilter( } } - if input.Details != nil && *input.Details { + if input.SubTask != nil && *input.SubTask { subSQFunc := func(status string, cntField string) *sqlchemy.SSubQuery { subQ := SubTaskManager.Query() if len(status) > 0 {