From ac18d60e0e8c453874ae110e7d4f7c0ad9b207b3 Mon Sep 17 00:00:00 2001 From: Guangya Liu Date: Fri, 15 May 2026 16:54:23 -0400 Subject: [PATCH] metrics: add workload-level preemption count gauge --- pkg/controller/core/workload_controller.go | 2 + pkg/metrics/metrics.go | 34 ++++++++++++++- pkg/metrics/metrics_test.go | 42 ++++++++++++++++--- pkg/scheduler/preemption/preemption.go | 10 ++++- pkg/workload/workload.go | 11 ++++- site/content/en/docs/reference/metrics.md | 10 +++++ site/content/zh-CN/docs/reference/metrics.md | 10 +++++ .../singlecluster/baseline/metrics_test.go | 1 + 8 files changed, 111 insertions(+), 9 deletions(-) diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index d1faa3a2d4e..85413f2fd77 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -252,6 +252,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c finishedCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadFinished) if finishedCond != nil && finishedCond.Status == metav1.ConditionTrue { + metrics.ClearWorkloadPreemptionMetrics(wl.Namespace, wl.Name) if r.workloadRetention == nil || r.workloadRetention.afterFinished == nil { return ctrl.Result{}, nil } @@ -1137,6 +1138,7 @@ func (r *WorkloadReconciler) Delete(e event.TypedDeleteEvent[*kueue.Workload]) b // Even if the state is unknown, the last cached state tells us whether the // workload was in the queues and should be cleared from them. 
r.queues.DeleteAndForgetWorkload(log, wlKey) + metrics.ClearWorkloadPreemptionMetrics(e.Object.Namespace, e.Object.Name) return true } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 31eca295f31..24c154ecd7e 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -188,6 +188,10 @@ var ( // +metricsdoc:labels=preempting_cluster_queue="the ClusterQueue executing preemption",reason="eviction or preemption reason",replica_role="one of `leader`, `follower`, or `standalone`" PreemptedWorkloadsTotal *prometheus.CounterVec + // +metricsdoc:group=workload + // +metricsdoc:labels=namespace="the namespace of the preempted workload",workload="the name of the preempted workload",cluster_queue="the ClusterQueue of the preempted workload",reason="eviction or preemption reason" + WorkloadPreemptionsCount *prometheus.GaugeVec + // +metricsdoc:group=clusterqueue // +metricsdoc:labels=cluster_queue="the evicted workload's ClusterQueue from status.admission on the workload before quota was released (only present when the metric records a sample)",reason="eviction or preemption reason (same values as evicted_workloads_total)",replica_role="one of `leader`, `follower`, or `standalone`" WorkloadEvictionLatencySeconds *prometheus.HistogramVec @@ -651,6 +655,21 @@ The label 'reason' can have the following values: }, append([]string{"preempting_cluster_queue", "reason", "replica_role"}, extraLabels...), ) + WorkloadPreemptionsCount = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Subsystem: constants.KueueName, + Name: "workload_preemptions", + Help: `The cumulative number of times each workload has been preempted. +The label 'reason' can have the following values: +- "InClusterQueue" means that the workload was preempted by a workload in the same ClusterQueue. +- "InCohortReclamation" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota. 
+- "InCohortFairSharing" means that the workload was preempted by a workload in the same cohort Fair Sharing. +- "InCohortReclaimWhileBorrowing" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota while borrowing. +Uses a Gauge (rather than a Counter) so that the time series can be deleted when the workload finishes or is removed, keeping cardinality bounded. +This gauge is deleted when the workload is completed or deleted.`, + }, []string{"namespace", "workload", "cluster_queue", "reason"}, + ) + WorkloadEvictionLatencySeconds = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Subsystem: constants.KueueName, @@ -1008,9 +1027,21 @@ func ReportEvictedWorkloadsOnce(cqName kueue.ClusterQueueReference, reason, unde EvictedWorkloadsOnceTotal.WithLabelValues(labels...).Inc() } -func ReportPreemption(preemptingCqName kueue.ClusterQueueReference, preemptingReason string, targetCqName kueue.ClusterQueueReference, customLabelValues []string, tracker *roletracker.RoleTracker) { +func ReportPreemption( + preemptingCqName kueue.ClusterQueueReference, + preemptingReason string, + targetCqName kueue.ClusterQueueReference, + targetNamespace, targetWorkloadName string, + customLabelValues []string, + tracker *roletracker.RoleTracker, +) { labels := append([]string{string(preemptingCqName), preemptingReason, roletracker.GetRole(tracker)}, customLabelValues...) 
PreemptedWorkloadsTotal.WithLabelValues(labels...).Inc() + WorkloadPreemptionsCount.WithLabelValues(targetNamespace, targetWorkloadName, string(targetCqName), preemptingReason).Inc() +} + +func ClearWorkloadPreemptionMetrics(namespace, workloadName string) { + WorkloadPreemptionsCount.DeletePartialMatch(prometheus.Labels{"namespace": namespace, "workload": workloadName}) } func LQRefFromWorkload(wl *kueue.Workload) LocalQueueReference { @@ -1321,6 +1352,7 @@ func Register() { EvictedWorkloadsTotal, EvictedWorkloadsOnceTotal, PreemptedWorkloadsTotal, + WorkloadPreemptionsCount, WorkloadEvictionLatencySeconds, ReservingActiveWorkloads, AdmittedActiveWorkloads, diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 64ec30ed337..cfeb8da35f0 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -195,10 +195,10 @@ func TestReportAndCleanupClusterQueueEvictedNumber(t *testing.T) { } func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) { - ReportPreemption("cluster_queue1", "InClusterQueue", "cluster_queue1", nil, nil) - ReportPreemption("cluster_queue1", "InCohortReclamation", "cluster_queue1", nil, nil) - ReportPreemption("cluster_queue1", "InCohortFairSharing", "cluster_queue1", nil, nil) - ReportPreemption("cluster_queue1", "InCohortReclaimWhileBorrowing", "cluster_queue1", nil, nil) + ReportPreemption("cluster_queue1", "InClusterQueue", "cluster_queue1", "ns1", "wl1", nil, nil) + ReportPreemption("cluster_queue1", "InCohortReclamation", "cluster_queue1", "ns1", "wl2", nil, nil) + ReportPreemption("cluster_queue1", "InCohortFairSharing", "cluster_queue1", "ns1", "wl3", nil, nil) + ReportPreemption("cluster_queue1", "InCohortReclaimWhileBorrowing", "cluster_queue1", "ns1", "wl4", nil, nil) expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 4, "preempting_cluster_queue", "cluster_queue1") expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", 
"InClusterQueue") @@ -206,10 +206,42 @@ func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) { expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation") expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortReclaimWhileBorrowing") + expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 4, "namespace", "ns1", "cluster_queue", "cluster_queue1") + expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 1, "namespace", "ns1", "workload", "wl1", "reason", "InClusterQueue") + expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 1, "namespace", "ns1", "workload", "wl2", "reason", "InCohortReclamation") + expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 1, "namespace", "ns1", "workload", "wl3", "reason", "InCohortFairSharing") + expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 1, "namespace", "ns1", "workload", "wl4", "reason", "InCohortReclaimWhileBorrowing") + + ClearWorkloadPreemptionMetrics("ns1", "wl1") + expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 0, "namespace", "ns1", "workload", "wl1") + expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 3, "namespace", "ns1", "cluster_queue", "cluster_queue1") + ClearClusterQueueMetrics("cluster_queue1") expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preempting_cluster_queue", "cluster_queue1") } +func TestWorkloadPreemptionsCountAccumulates(t *testing.T) { + // Preempt the same workload three times; the gauge value must accumulate. + ReportPreemption("cq1", "InClusterQueue", "cq1", "ns-acc", "wl-acc", nil, nil) + ReportPreemption("cq1", "InClusterQueue", "cq1", "ns-acc", "wl-acc", nil, nil) + ReportPreemption("cq1", "InClusterQueue", "cq1", "ns-acc", "wl-acc", nil, nil) + + // One series, value == 3. 
+ pts := metrics.CollectFilteredGaugeVec(WorkloadPreemptionsCount, prometheus.Labels{ + "namespace": "ns-acc", "workload": "wl-acc", "reason": "InClusterQueue", + }) + if len(pts) != 1 { + t.Fatalf("expected 1 data point, got %d", len(pts)) + } + if pts[0].Value != 3 { + t.Errorf("expected gauge value 3 after 3 preemptions, got %v", pts[0].Value) + } + + ClearWorkloadPreemptionMetrics("ns-acc", "wl-acc") + expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 0, "namespace", "ns-acc", "workload", "wl-acc") + ClearClusterQueueMetrics("cq1") +} + func TestReportAndCleanupLocalQueueEvictedNumber(t *testing.T) { lq := LocalQueueReference{Name: kueue.LocalQueueName("lq1"), Namespace: "ns1"} ReportLocalQueueEvictedWorkloads(lq, "Preempted", "", "", nil, nil) @@ -268,7 +300,7 @@ func TestMetricsWithReplicaRoleLabel(t *testing.T) { ReportEvictedWorkloads("cq_standalone", "Preempted", "", "", nil, nil) expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cq_standalone", "replica_role", "standalone") - ReportPreemption("cq_standalone", "InClusterQueue", "cq_standalone", nil, nil) + ReportPreemption("cq_standalone", "InClusterQueue", "cq_standalone", "ns", "wl-standalone", nil, nil) expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cq_standalone", "replica_role", "standalone") lq := LocalQueueReference{Name: "lq_standalone", Namespace: "ns"} diff --git a/pkg/scheduler/preemption/preemption.go b/pkg/scheduler/preemption/preemption.go index a12a32dfc90..1b511e7d9bf 100644 --- a/pkg/scheduler/preemption/preemption.go +++ b/pkg/scheduler/preemption/preemption.go @@ -250,7 +250,15 @@ func (p *Preemptor) IssuePreemptions( "Preempted workload %s (UID: %s) in ClusterQueue %s; preemptor effective priority: %d (base: %d, boost: %d); preemptee effective priority: %d (base: %d, boost: %d)", klog.KObj(target.WorkloadInfo.Obj), target.WorkloadInfo.Obj.UID, target.WorkloadInfo.ClusterQueue, preemptorEffPri, preemptorBase, 
preemptorBoost, targetEffPri, targetBase, targetBoost) - workload.ReportPreemption(preemptor.ClusterQueue, target.Reason, target.WorkloadInfo.ClusterQueue, p.roleTracker, p.customLabels) + workload.ReportPreemption( + preemptor.ClusterQueue, + target.Reason, + target.WorkloadInfo.ClusterQueue, + target.WorkloadInfo.Obj.Namespace, + target.WorkloadInfo.Obj.Name, + p.roleTracker, + p.customLabels, + ) successfullyPreempted.Add(1) }) return int(successfullyPreempted.Load()), int(preemptionErrors.Load()), errCh.ReceiveError() diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index 018ba74162e..56340bc6cd2 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -1812,8 +1812,15 @@ func reportEvictedWorkload(recorder events.EventRecorder, wl *kueue.Workload, cq recorder.Eventf(wl, nil, corev1.EventTypeNormal, eventReason, eventReason, message) } -func ReportPreemption(preemptingCqName kueue.ClusterQueueReference, preemptingReason string, targetCqName kueue.ClusterQueueReference, tracker *roletracker.RoleTracker, cl *metrics.CustomLabels) { - metrics.ReportPreemption(preemptingCqName, preemptingReason, targetCqName, cl.CQGet(preemptingCqName), tracker) +func ReportPreemption( + preemptingCqName kueue.ClusterQueueReference, + preemptingReason string, + targetCqName kueue.ClusterQueueReference, + targetNamespace, targetWorkloadName string, + tracker *roletracker.RoleTracker, + cl *metrics.CustomLabels, +) { + metrics.ReportPreemption(preemptingCqName, preemptingReason, targetCqName, targetNamespace, targetWorkloadName, cl.CQGet(preemptingCqName), tracker) } func References(wls []*Info) []klog.ObjectRef { diff --git a/site/content/en/docs/reference/metrics.md b/site/content/en/docs/reference/metrics.md index bfa465ef377..dda8522f8fe 100644 --- a/site/content/en/docs/reference/metrics.md +++ b/site/content/en/docs/reference/metrics.md @@ -80,6 +80,16 @@ The following metrics are available only if `LocalQueueMetrics` feature gate is | 
`kueue_local_queue_status` | Gauge | Reports 'localQueue' with its 'active' status (with possible values 'True', 'False', or 'Unknown').
For a LocalQueue, the metric only reports a value of 1 for one of the statuses. | `name`: the name of the LocalQueue
`namespace`: the namespace of the LocalQueue
`active`: one of `True`, `False`, or `Unknown`
`replica_role`: one of `leader`, `follower`, or `standalone` | +## Workload Status + +Use the following metrics to monitor individual workload preemption activity: + + +| Metric name | Type | Description | Labels | +| --- | --- | --- | --- | +| `kueue_workload_preemptions` | Gauge | The cumulative number of times each workload has been preempted.
The label 'reason' can have the following values:
- "InClusterQueue" means that the workload was preempted by a workload in the same ClusterQueue.
- "InCohortReclamation" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota.
- "InCohortFairSharing" means that the workload was preempted by a workload in the same cohort due to Fair Sharing.
- "InCohortReclaimWhileBorrowing" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota while borrowing.
This metric is a Gauge (rather than a Counter) so that its time series can be deleted when the workload finishes or is removed, keeping cardinality bounded.
This gauge is deleted when the workload is completed or deleted. | `namespace`: the namespace of the preempted workload
`workload`: the name of the preempted workload
`cluster_queue`: the ClusterQueue of the preempted workload
`reason`: eviction or preemption reason | + + ## Cohort Status diff --git a/site/content/zh-CN/docs/reference/metrics.md b/site/content/zh-CN/docs/reference/metrics.md index 9a68be16c2f..0dbef9f818e 100644 --- a/site/content/zh-CN/docs/reference/metrics.md +++ b/site/content/zh-CN/docs/reference/metrics.md @@ -78,6 +78,16 @@ Kueue 暴露了 [prometheus](https://prometheus.io) 指标来监控系统的健 | `kueue_local_queue_status` | 仪表盘 | 报告 'localQueue' 的 'active' 状态(可能的值为 'True'、'False' 或 'Unknown')。
对于 LocalQueue,指标仅报告其中一个状态的值为 1 | `name`: LocalQueue 的名称
`namespace`: LocalQueue 的命名空间
`active`: `True`、`False` 或 `Unknown` 其中之一
`replica_role`: `leader`、`follower` 或 `standalone` 其中之一 | +## 工作负载状态 + +使用以下指标监控各工作负载的抢占活动: + + +| 指标名称 | 类型 | 描述 | 标签 | +| --- | --- | --- | --- | +| `kueue_workload_preemptions` | 仪表盘 | 每个工作负载被抢占的累计次数。
`reason` 标签可取以下值:
- "InClusterQueue" 表示工作负载被同一 ClusterQueue 中的工作负载抢占。
- "InCohortReclamation" 表示工作负载被同一 cohort 中的工作负载抢占(名义配额回收)。
- "InCohortFairSharing" 表示工作负载被同一 cohort 中的工作负载抢占(公平共享)。
- "InCohortReclaimWhileBorrowing" 表示工作负载被同一 cohort 中的工作负载抢占(借用时回收名义配额)。
当工作负载完成或被删除时,此仪表盘条目将被删除。 | `namespace`: 被抢占工作负载的命名空间
`workload`: 被抢占工作负载的名称
`cluster_queue`: 被抢占工作负载所属的 ClusterQueue
`reason`: 驱逐或抢占原因 | + + ## Cohort 状态 diff --git a/test/e2e/singlecluster/baseline/metrics_test.go b/test/e2e/singlecluster/baseline/metrics_test.go index 36f26a57ded..23a11c4c94b 100644 --- a/test/e2e/singlecluster/baseline/metrics_test.go +++ b/test/e2e/singlecluster/baseline/metrics_test.go @@ -531,6 +531,7 @@ var _ = ginkgo.Describe("Metrics", ginkgo.Label("area:singlecluster", "feature:m {"kueue_evicted_workloads_total"}, {"kueue_evicted_workloads_once_total"}, {"kueue_preempted_workloads_total"}, + {"kueue_workload_preemptions", ns.Name}, {"kueue_local_queue_evicted_workloads_total", ns.Name, localQueue2.Name}, {"kueue_local_queue_resource_reservation", ns.Name, localQueue1.Name},