Volcano scheduler: Support network topology aware scheduling when pods are rescheduled #3936

Closed
28 changes: 28 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -233,6 +233,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
hyperNodesWithLeftTasks := make(map[string]*util.PriorityQueue)
ssn := alloc.session
selectedTier := 0
jobNewHyperNodeMap := map[string]string{}
jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode]

// Find a suitable hyperNode in one tier, from bottom to top each time, to ensure that the selected hyperNode spans the fewest tiers.
for index, tier := range alloc.hyperNodesTiers {
@@ -245,6 +247,18 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
break
}
for _, hyperNodeName := range ssn.HyperNodesListByTier[tier] {
// Default: when the job is scheduled for the first time, the candidate hyperNode itself becomes the job's LCA hyperNode.
jobNewHyperNodeMap[hyperNodeName] = hyperNodeName
if jobHyperNode != "" {
jobNewHyperNode, index := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, nil)
// check whether the hyperNode meets the requirements of the topology hard tier.
if index+1 > highestAllowedTier {
klog.V(4).ErrorS(nil, "Skip search for higher tiers because the highest allowed tier is reached", "jobName", job.UID, "highestAllowedTier", highestAllowedTier, "tier", tier)
break
}
jobNewHyperNodeMap[hyperNodeName] = jobNewHyperNode
}

nodes, ok := ssn.HyperNodes[hyperNodeName]
if !ok {
klog.ErrorS(nil, "HyperNode not exists.", "jobName", job.UID, "name", hyperNodeName, "tier", tier)
@@ -286,6 +300,8 @@ func (alloc *Action) allocateResourceForTasksWithTopology(tasks *util.PriorityQu
klog.V(4).InfoS("Find available hyperNodes for job", "jobName", job.UID, "tier", selectedTier, "hyperNodes", hyperNodes)
}
stmt, hyperNode := alloc.selectBestHyperNode(jobStmtsByTier[selectedTier], job)
jobNewHyperNode := jobNewHyperNodeMap[hyperNode]
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = jobNewHyperNode
return stmt, hyperNodesWithLeftTasks[hyperNode]
}

@@ -390,6 +406,18 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
continue
}

// record the hyperNode of the job
if hyperNode == "" {
jobHyperNode := job.PodGroup.Annotations[api.TopologyAllocateLCAHyperNode]
hyperNodeOfNode := util.FindHyperNodeOfNode(bestNode.Name, ssn.HyperNodes)
newJobHyperNode := hyperNodeOfNode
if jobHyperNode != "" {
// The job has been scheduled before; take the LCA of the recorded hyperNode and this node's hyperNode.
newJobHyperNode, _ = util.FindLCAHyperNode(hyperNodeOfNode, jobHyperNode, ssn.HyperNodeTree)
}
job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = newJobHyperNode
}

alloc.sumNodeScoresInHyperNode(string(job.UID), hyperNode, highestScore)
alloc.allocateResourcesForTask(stmt, task, bestNode, job)

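For context on the hunk above: the allocate action records the job's LCA hyperNode in the podgroup annotation and, on later scheduling attempts, skips any candidate hyperNode whose LCA with that recorded hyperNode climbs above the job's highest allowed tier. The standalone sketch below only illustrates that idea; the real lookup is util.FindLCAHyperNode working on ssn.HyperNodeTree (which appears to use a different index convention, hence the index+1 comparison in the hunk), and the parent map, hyperNode names and tier numbers here are hypothetical.

package main

import "fmt"

// parent maps a hyperNode to its parent hyperNode; tier maps a hyperNode
// to the tier it sits on (1 = leaf tier, larger = closer to the root).
var (
	parent = map[string]string{"s0": "s4", "s1": "s4", "s2": "s5", "s3": "s5", "s4": "s6", "s5": "s6"}
	tier   = map[string]int{"s0": 1, "s1": 1, "s2": 1, "s3": 1, "s4": 2, "s5": 2, "s6": 3}
)

// findLCA walks up from a and b until both paths meet, returning the
// lowest common ancestor and the tier it sits on (-1 if there is none).
func findLCA(a, b string) (string, int) {
	seen := map[string]bool{}
	for n := a; n != ""; n = parent[n] {
		seen[n] = true
	}
	for n := b; n != ""; n = parent[n] {
		if seen[n] {
			return n, tier[n]
		}
	}
	return "", -1
}

func main() {
	const highestAllowedTier = 2
	jobHyperNode := "s0" // recorded in the podgroup annotation by a previous session
	for _, candidate := range []string{"s1", "s2"} {
		lca, t := findLCA(candidate, jobHyperNode)
		if t > highestAllowedTier {
			fmt.Printf("skip %s: LCA %s sits on tier %d, above the allowed tier %d\n", candidate, lca, t, highestAllowedTier)
			continue
		}
		fmt.Printf("keep %s: LCA %s on tier %d\n", candidate, lca, t)
	}
}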
10 changes: 10 additions & 0 deletions pkg/scheduler/api/node_info.go
@@ -25,6 +25,7 @@ import (
"k8s.io/klog/v2"
k8sframework "k8s.io/kubernetes/pkg/scheduler/framework"

"volcano.sh/apis/pkg/apis/scheduling"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"

"volcano.sh/volcano/pkg/scheduler/api/devices/nvidia/gpushare"
@@ -90,6 +91,15 @@ type NodeInfo struct {
ImageStates map[string]*k8sframework.ImageStateSummary
}

// PodGroupOldState records a podgroup's old state captured at the start of a session.
type PodGroupOldState struct {
// Status caches the podgroup status during a schedule cycle.
// It should not be mutated after it is initialized.
Status map[JobID]scheduling.PodGroupStatus
// Annotations records the old podgroup annotations, used to detect changes.
Annotations map[JobID]map[string]string
}

// FutureIdle returns resources that will be idle in the future:
//
// That is current idle resources plus released resources minus pipelined resources.
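A hedged sketch of how a snapshot like PodGroupOldState is meant to be consumed (the actual comparison appears later in this diff, in job_updater.go): the annotations captured at session open are compared against the live podgroup annotations to decide whether the podgroup must be written back. The job ID and hyperNode names below are placeholders.

package main

import "fmt"

type JobID string

type PodGroupOldState struct {
	Annotations map[JobID]map[string]string
}

func main() {
	const key = "volcano.sh/allocate-lca-hypernode"
	old := PodGroupOldState{Annotations: map[JobID]map[string]string{
		"job-1": {key: "s4"},
	}}
	// The allocate action later rewrites the annotation on the live podgroup.
	current := map[string]string{key: "s6"}

	// The podgroup only needs an API update when the recorded LCA hyperNode changed.
	updatePG := old.Annotations["job-1"][key] != current[key]
	fmt.Println("podgroup needs an update:", updatePG) // true
}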
3 changes: 3 additions & 0 deletions pkg/scheduler/api/well_known_labels.go
@@ -42,4 +42,7 @@ const (

// topologyDecisionAnnotation is the key of topology decision about pod request resource
topologyDecisionAnnotation = "volcano.sh/topology-decision"

// TopologyAllocateLCAHyperNode is the annotation key that records the lowest common ancestor (LCA) hyperNode, in the network topology, of the nodes to which a job's tasks have been allocated.
TopologyAllocateLCAHyperNode = "volcano.sh/allocate-lca-hypernode"
)
5 changes: 4 additions & 1 deletion pkg/scheduler/cache/cache.go
@@ -64,6 +64,7 @@ import (
vcinformerv1 "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/features"
"volcano.sh/volcano/pkg/scheduler/api"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
"volcano.sh/volcano/pkg/scheduler/metrics"
@@ -1536,9 +1537,11 @@ func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePG b
if err != nil {
return nil, err
}
sc.Mutex.Lock()
sc.Jobs[job.UID].PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode] = job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
sc.Mutex.Unlock()
job.PodGroup = pg
}

sc.RecordJobStatusEvent(job, updatePG)

return job, nil
2 changes: 2 additions & 0 deletions pkg/scheduler/conf/scheduler_conf.go
@@ -87,6 +87,8 @@ type PluginOption struct {
EnabledOverused *bool `yaml:"enabledOverused"`
// EnabledAllocatable defines whether allocatable is enabled
EnabledAllocatable *bool `yaml:"enabledAllocatable"`
// EnabledNetworkTopology defines whether network topology is enabled
EnabledNetworkTopology *bool `yaml:"enabledNetworkTopology"`
// Arguments defines the different arguments that can be given to different plugins
Arguments map[string]interface{} `yaml:"arguments"`
}
6 changes: 4 additions & 2 deletions pkg/scheduler/framework/job_updater.go
@@ -91,10 +91,12 @@ func isPodGroupStatusUpdated(newStatus, oldStatus scheduling.PodGroupStatus) boo
func (ju *jobUpdater) updateJob(index int) {
job := ju.jobQueue[index]
ssn := ju.ssn
oldHyperNode := ssn.PodGroupOldState.Annotations[job.UID][api.TopologyAllocateLCAHyperNode]

job.PodGroup.Status = jobStatus(ssn, job)
oldStatus, found := ssn.podGroupStatus[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus)
oldStatus, found := ssn.PodGroupOldState.Status[job.UID]
updatePG := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) || oldHyperNode != job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]

if _, err := ssn.cache.UpdateJobStatus(job, updatePG); err != nil {
klog.Errorf("Failed to update job <%s/%s>: %v",
job.Namespace, job.Name, err)
16 changes: 11 additions & 5 deletions pkg/scheduler/framework/session.go
@@ -56,9 +56,10 @@ type Session struct {

TotalResource *api.Resource
TotalGuarantee *api.Resource
// podGroupStatus cache podgroup status during schedule

// PodGroupOldState contains the podgroup status and annotations captured during scheduling.
// This should not be mutated after it is initialized.
podGroupStatus map[api.JobID]scheduling.PodGroupStatus
api.PodGroupOldState

Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
@@ -81,6 +82,8 @@ type Session struct {
HyperNodesListByTier map[int][]string
// HyperNodes maps hyperNode Name -> nodes under the hyperNode.
HyperNodes map[string][]*api.NodeInfo
// HyperNodeTree is the hypernode tree of all hypernodes in the cluster.
HyperNodeTree []map[string][]string

plugins map[string]Plugin
eventHandlers []*EventHandler
@@ -127,8 +130,10 @@ func openSession(cache cache.Cache) *Session {

TotalResource: api.EmptyResource(),
TotalGuarantee: api.EmptyResource(),
podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{},

PodGroupOldState: api.PodGroupOldState{
Status: map[api.JobID]scheduling.PodGroupStatus{},
Annotations: map[api.JobID]map[string]string{},
},
Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
CSINodesStatus: map[string]*api.CSINodeStatusInfo{},
@@ -170,7 +175,8 @@ func openSession(cache cache.Cache) *Session {
ssn.Jobs = snapshot.Jobs
for _, job := range ssn.Jobs {
if job.PodGroup != nil {
ssn.podGroupStatus[job.UID] = *job.PodGroup.Status.DeepCopy()
ssn.PodGroupOldState.Status[job.UID] = *job.PodGroup.Status.DeepCopy()
ssn.PodGroupOldState.Annotations[job.UID] = job.PodGroup.GetAnnotations()
}

if vjr := ssn.JobValid(job); vjr != nil {
198 changes: 198 additions & 0 deletions pkg/scheduler/plugins/networktopologyaware/networktopologyaware.go
@@ -0,0 +1,198 @@
/*
Copyright 2019 The Volcano Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package networktopologyaware

import (
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/util"
)

const (
// PluginName indicates name of volcano scheduler plugin.
PluginName = "networktopologyaware"
BaseScore = 100.0
TaskBaseScore = 10.0
ZeroScore = 0.0
NetworkTopologyWeight = "weight"
)

type networkTopologyAwarePlugin struct {
// Arguments given for the plugin
pluginArguments framework.Arguments
}

// New returns a networkTopologyAwarePlugin object
func New(arguments framework.Arguments) framework.Plugin {
return &networkTopologyAwarePlugin{
pluginArguments: arguments,
}
}

func (nta *networkTopologyAwarePlugin) Name() string {
return PluginName
}

func calculateWeight(args framework.Arguments) int {
weight := 1
args.GetInt(&weight, NetworkTopologyWeight)
return weight
}

func (nta *networkTopologyAwarePlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(5).Infof("Enter networkTopologyAware plugin ...")
defer func() {
klog.V(5).Infof("Leaving networkTopologyAware plugin ...")
}()

weight := calculateWeight(nta.pluginArguments)
hyperNodeFn := func(job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) (map[string]float64, error) {
hyperNodeScores := make(map[string]float64)
jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
// The job is scheduled for the first time; all hyperNodes get the same score.
if jobHyperNode == "" {
for hyperNode := range hyperNodes {
hyperNodeScores[hyperNode] = ZeroScore
}
return hyperNodeScores, nil
}
// The job has been scheduled before; calculate the score based on the hyperNode tree.
maxScore := ZeroScore
scoreHyperNode := map[float64][]string{}
for hyperNode := range hyperNodes {
score := networkTopologyAwareScore(hyperNode, job, ssn.HyperNodeTree)
score *= float64(weight)
hyperNodeScores[hyperNode] = score
if score >= maxScore {
maxScore = score
scoreHyperNode[score] = append(scoreHyperNode[score], hyperNode)
}
}
// If more than one hyperNode holds the max score, break the tie by the number of the job's tasks already placed on each.
if len(scoreHyperNode[maxScore]) > 1 {
for hyperNode, score := range hyperNodeScores {
if score == maxScore {
taskNumScore := networkTopologyAwareScoreWithTaskNum(hyperNode, job, ssn.HyperNodes)
taskNumScore *= float64(weight)
hyperNodeScores[hyperNode] += taskNumScore
}
}
}

klog.V(1).Infof("networkTopologyAware score is: %v", hyperNodeScores)
return hyperNodeScores, nil
}

nodeFn := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
nodeScores := make(map[string]float64)

taskJob := ssn.Jobs[task.Job]
jobHyperNode := taskJob.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]
// The job is scheduled for the first time; all nodes get the same score.
if jobHyperNode == "" {
for _, node := range nodes {
nodeScores[node.Name] = ZeroScore
}
return nodeScores, nil
}
// The job has been scheduled before; score each node by its hyperNode's position in the hyperNode tree.
maxScore := ZeroScore
scoreNodes := map[float64][]string{}
for _, node := range nodes {
hyperNode := util.FindHyperNodeOfNode(node.Name, ssn.HyperNodes)
score := networkTopologyAwareScore(hyperNode, taskJob, ssn.HyperNodeTree)
score *= float64(weight)
nodeScores[node.Name] = score
if score >= maxScore {
maxScore = score
scoreNodes[score] = append(scoreNodes[score], node.Name)
}
}
// If more than one node holds the max score, break the tie by the number of the job's tasks already on each node's hyperNode.
if len(scoreNodes[maxScore]) > 1 {
for node, score := range nodeScores {
if score == maxScore {
hyperNode := util.FindHyperNodeOfNode(node, ssn.HyperNodes)
taskNumScore := networkTopologyAwareScoreWithTaskNum(hyperNode, taskJob, ssn.HyperNodes)
taskNumScore *= float64(weight)
nodeScores[node] += taskNumScore
}
}
}

klog.V(1).Infof("networkTopologyAware score is: %v", nodeScores)
return nodeScores, nil
}

ssn.AddHyperNodeOrederFn(nta.Name(), hyperNodeFn)
ssn.AddBatchNodeOrderFn(nta.Name(), nodeFn)
}

func (bp *networkTopologyAwarePlugin) OnSessionClose(ssn *framework.Session) {
}

// networkTopologyAwareScore uses the best-fit policy during scheduling.

// Goals:
// - The tier index to which the LCAHyperNode of a job belongs should be as low as possible.
func networkTopologyAwareScore(hyperNodeName string, job *api.JobInfo, hyperNodeTree []map[string][]string) float64 {
jobHyperNode := job.PodGroup.GetAnnotations()[api.TopologyAllocateLCAHyperNode]

if jobHyperNode == hyperNodeName {
return BaseScore
}
// Calculate hyperNode tier index score.
_, index := util.FindLCAHyperNode(hyperNodeName, jobHyperNode, hyperNodeTree)
if index <= 0 {
klog.V(4).Infof("failed to find LCA hyperNode for %s in hyperNodeTree", hyperNodeName)
return 0.0
}
tierIndexScore := BaseScore * scoreHyperNodeWithIndex(index, 1, len(hyperNodeTree))

return tierIndexScore
}

// Goals:
// - Tasks under a job should be scheduled to one hyperNode as much as possible.
func networkTopologyAwareScoreWithTaskNum(hyperNodeName string, job *api.JobInfo, hyperNodes map[string][]*api.NodeInfo) float64 {
// Calculate tasks num score.
taskNum := util.FindJobTaskNumOfHyperNode(hyperNodeName, job, hyperNodes)
taskNumScore := ZeroScore
if len(job.Tasks) > 0 {
taskNumScore = TaskBaseScore * scoreHyperNodeWithTaskNum(taskNum, len(job.Tasks))
}
return taskNumScore
}

func scoreHyperNodeWithIndex(index int, minIndex int, maxIndex int) float64 {
// Use tier index to calculate scores and map the original score to the range between 0 and 1.
return float64(maxIndex-index) / float64(maxIndex-minIndex)
}

func scoreHyperNodeWithTaskNum(taskNum int, maxTaskNum int) float64 {
// Calculate the task distribution rate as the score and map it to the range between 0 and 1.
return float64(taskNum) / float64(maxTaskNum)
}
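To make the two formulas above concrete, here is a small standalone program that evaluates them with made-up inputs (a three-tier hyperNode tree and a four-task job); it mirrors scoreHyperNodeWithIndex and scoreHyperNodeWithTaskNum but uses none of the scheduler types.

package main

import "fmt"

const (
	baseScore     = 100.0
	taskBaseScore = 10.0
)

// Same formula as scoreHyperNodeWithIndex: lower LCA tiers score higher.
func scoreWithIndex(index, minIndex, maxIndex int) float64 {
	return float64(maxIndex-index) / float64(maxIndex-minIndex)
}

// Same formula as scoreHyperNodeWithTaskNum: more already-placed tasks score higher.
func scoreWithTaskNum(taskNum, maxTaskNum int) float64 {
	return float64(taskNum) / float64(maxTaskNum)
}

func main() {
	// A three-tier hyperNode tree: an LCA on tier 1 keeps the job tight,
	// an LCA on tier 3 spans the whole fabric.
	for index := 1; index <= 3; index++ {
		fmt.Printf("LCA on tier %d -> tier score %.0f\n", index, baseScore*scoreWithIndex(index, 1, 3))
	}
	// Tie-break: a hyperNode already running 3 of the job's 4 tasks beats one running 1.
	for _, taskNum := range []int{1, 3} {
		fmt.Printf("%d of 4 tasks already placed -> +%.1f\n", taskNum, taskBaseScore*scoreWithTaskNum(taskNum, 4))
	}
}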