Skip to content

Commit

Permalink
chore: add queue capacity metric
Browse files Browse the repository at this point in the history
Signed-off-by: zedongh <[email protected]>
  • Loading branch information
zedongh committed Jan 4, 2025
1 parent 588c7c1 commit 5f58a33
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 9 deletions.
35 changes: 35 additions & 0 deletions pkg/scheduler/metrics/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,30 @@ var (
Help: "If one queue is overused",
}, []string{"queue_name"},
)

queueCapacityMilliCPU = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: VolcanoNamespace,
Name: "queue_capacity_milli_cpu",
Help: "Capacity CPU count for one queue",
}, []string{"queue_name"},
)

queueCapacityMemory = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: VolcanoNamespace,
Name: "queue_capacity_memory_bytes",
Help: "Capacity memory for one queue",
}, []string{"queue_name"},
)

queueCapacityScalarResource = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: VolcanoNamespace,
Name: "queue_capacity_scalar_resources",
Help: "Capacity scalar resources for one queue",
}, []string{"queue_name", "resource"},
)
)

// UpdateQueueAllocated records allocated resources for one queue
Expand Down Expand Up @@ -168,6 +192,14 @@ func UpdateQueueOverused(queueName string, overused bool) {
queueOverused.WithLabelValues(queueName).Set(value)
}

func UpdateQueueCapacity(queueName string, milliCPU, memory float64, scalarResources map[v1.ResourceName]float64) {
queueCapacityMilliCPU.WithLabelValues(queueName).Set(milliCPU)
queueCapacityMemory.WithLabelValues(queueName).Set(memory)
for resource, value := range scalarResources {
queueCapacityScalarResource.WithLabelValues(queueName, string(resource)).Set(value)
}
}

// DeleteQueueMetrics delete all metrics related to the queue
func DeleteQueueMetrics(queueName string) {
queueAllocatedMilliCPU.DeleteLabelValues(queueName)
Expand All @@ -179,8 +211,11 @@ func DeleteQueueMetrics(queueName string) {
queueShare.DeleteLabelValues(queueName)
queueWeight.DeleteLabelValues(queueName)
queueOverused.DeleteLabelValues(queueName)
queueCapacityMilliCPU.DeleteLabelValues(queueName)
queueCapacityMemory.DeleteLabelValues(queueName)
partialLabelMap := map[string]string{"queue_name": queueName}
queueAllocatedScalarResource.DeletePartialMatch(partialLabelMap)
queueRequestScalarResource.DeletePartialMatch(partialLabelMap)
queueDeservedScalarResource.DeletePartialMatch(partialLabelMap)
queueCapacityScalarResource.DeletePartialMatch(partialLabelMap)
}
25 changes: 16 additions & 9 deletions pkg/scheduler/metrics/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,10 @@ func retryOnConnectionRefused(fn func() error) error {
}

func TestQueueResourceMetric(t *testing.T) {
UpdateQueueAllocated("queue1", 100, 1024, map[v1.ResourceName]float64{
"nvidia.com/gpu": 0,
})
UpdateQueueRequest("queue2", 200, 2048, map[v1.ResourceName]float64{
"nvidia.com/gpu": 1,
})
UpdateQueueDeserved("queue3", 300, 3096, map[v1.ResourceName]float64{
"nvidia.com/gpu": 2,
})
UpdateQueueAllocated("queue1", 100, 1024, map[v1.ResourceName]float64{"nvidia.com/gpu": 0})
UpdateQueueRequest("queue2", 200, 2048, map[v1.ResourceName]float64{"nvidia.com/gpu": 1})
UpdateQueueDeserved("queue3", 300, 3096, map[v1.ResourceName]float64{"nvidia.com/gpu": 2})
UpdateQueueCapacity("queue1", 200., 2048., map[v1.ResourceName]float64{v1.ResourceName("nvidia.com/gpu"): 10.})
go func() {
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(":8081", nil)
Expand All @@ -93,6 +88,11 @@ func TestQueueResourceMetric(t *testing.T) {
assert.Contains(t, m, "volcano_queue_deserved_scalar_resources{queue_name=\"queue3\",resource=\"nvidia.com/gpu\"}",
"volcano_queue_deserved_scalar_resources for queue3 and resource nvidia.com/gpu should be present")
assert.Equal(t, 2., m["volcano_queue_deserved_scalar_resources{queue_name=\"queue3\",resource=\"nvidia.com/gpu\"}"])
assert.Equal(t, 200., m["volcano_queue_capacity_milli_cpu{queue_name=\"queue1\"}"])
assert.Equal(t, 2048., m["volcano_queue_capacity_memory_bytes{queue_name=\"queue1\"}"])
assert.Contains(t, m, "volcano_queue_capacity_scalar_resources{queue_name=\"queue1\",resource=\"nvidia.com/gpu\"}",
"volcano_queue_capacity_scalar_resources for queue1 and resource nvidia.com/gpu should be present")
assert.Equal(t, 10., m["volcano_queue_capacity_scalar_resources{queue_name=\"queue1\",resource=\"nvidia.com/gpu\"}"])
})
})
if err != nil {
Expand All @@ -108,6 +108,12 @@ func TestQueueResourceMetric(t *testing.T) {
"volcano_queue_allocated_memory_bytes for queue1 should be removed")
assert.NotContains(t, m, "volcano_queue_allocated_scalar_resources{queue_name=\"queue1\",resource=\"nvidia.com/gpu\"}",
"volcano_queue_allocated_scalar_resources for queue1 should be removed")
assert.NotContains(t, m, "volcano_queue_capacity_milli_cpu{queue_name=\"queue1\"}",
"volcano_queue_capacity_milli_cpu for queue1 should be removed")
assert.NotContains(t, m, "volcano_queue_capacity_memory_bytes{queue_name=\"queue1\"}",
"volcano_queue_capacity_memory_bytes for queue1 should be removed")
assert.NotContains(t, m, "volcano_queue_capacity_scalar_resources{queue_name=\"queue1\",resource=\"nvidia.com/gpu\"}",
"volcano_queue_capacity_scalar_resources for queue1 should be removed")
assert.NotContains(t, m, "volcano_queue_request_milli_cpu{queue_name=\"queue2\"}",
"volcano_queue_request_milli_cpu for queue2 should be removed")
assert.NotContains(t, m, "volcano_queue_request_memory_bytes{queue_name=\"queue2\"}",
Expand All @@ -123,6 +129,7 @@ func TestQueueResourceMetric(t *testing.T) {
assert.Contains(t, m, "volcano_queue_deserved_scalar_resources{queue_name=\"queue3\",resource=\"nvidia.com/gpu\"}",
"volcano_queue_deserved_scalar_resources for queue3 & nvidia.com/gpu should not be removed")
assert.Equal(t, 2., m["volcano_queue_deserved_scalar_resources{queue_name=\"queue3\",resource=\"nvidia.com/gpu\"}"])

})
})
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions pkg/scheduler/plugins/capacity/capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ func (cp *capacityPlugin) buildQueueAttrs(ssn *framework.Session) {
metrics.UpdateQueueDeserved(attr.name, attr.deserved.MilliCPU, attr.deserved.Memory, attr.deserved.ScalarResources)
metrics.UpdateQueueAllocated(attr.name, attr.allocated.MilliCPU, attr.allocated.Memory, attr.allocated.ScalarResources)
metrics.UpdateQueueRequest(attr.name, attr.request.MilliCPU, attr.request.Memory, attr.request.ScalarResources)
metrics.UpdateQueueCapacity(attr.name, attr.realCapability.MilliCPU, attr.realCapability.Memory, attr.realCapability.ScalarResources)
continue
}
deservedCPU, deservedMem, scalarResources := 0.0, 0.0, map[v1.ResourceName]float64{}
Expand All @@ -387,6 +388,14 @@ func (cp *capacityPlugin) buildQueueAttrs(ssn *framework.Session) {
metrics.UpdateQueueDeserved(queueInfo.Name, deservedCPU, deservedMem, scalarResources)
metrics.UpdateQueueAllocated(queueInfo.Name, 0, 0, map[v1.ResourceName]float64{})
metrics.UpdateQueueRequest(queueInfo.Name, 0, 0, map[v1.ResourceName]float64{})
if queue.Queue.Spec.Capability != nil {
guarantee := api.EmptyResource()
if len(queue.Queue.Spec.Guarantee.Resource) != 0 {
guarantee = api.NewResource(queue.Queue.Spec.Guarantee.Resource)
}
realCapacity := api.ExceededPart(cp.totalResource, cp.totalGuarantee).Add(guarantee)
metrics.UpdateQueueCapacity(queueInfo.Name, realCapacity.MilliCPU, realCapacity.Memory, realCapacity.ScalarResources)
}
}

ssn.AddQueueOrderFn(cp.Name(), func(l, r interface{}) int {
Expand Down

0 comments on commit 5f58a33

Please sign in to comment.