Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

finishedCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadFinished)
if finishedCond != nil && finishedCond.Status == metav1.ConditionTrue {
metrics.ClearWorkloadPreemptionMetrics(wl.Namespace, wl.Name)
if r.workloadRetention == nil || r.workloadRetention.afterFinished == nil {
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -1137,6 +1138,7 @@ func (r *WorkloadReconciler) Delete(e event.TypedDeleteEvent[*kueue.Workload]) b
// Even if the state is unknown, the last cached state tells us whether the
// workload was in the queues and should be cleared from them.
r.queues.DeleteAndForgetWorkload(log, wlKey)
metrics.ClearWorkloadPreemptionMetrics(e.Object.Namespace, e.Object.Name)
return true
}

Expand Down
34 changes: 33 additions & 1 deletion pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ var (
// +metricsdoc:labels=preempting_cluster_queue="the ClusterQueue executing preemption",reason="eviction or preemption reason",replica_role="one of `leader`, `follower`, or `standalone`"
PreemptedWorkloadsTotal *prometheus.CounterVec

// +metricsdoc:group=workload
// +metricsdoc:labels=namespace="the namespace of the preempted workload",workload="the name of the preempted workload",cluster_queue="the ClusterQueue of the preempted workload",reason="eviction or preemption reason"
WorkloadPreemptionsCount *prometheus.GaugeVec

// +metricsdoc:group=clusterqueue
// +metricsdoc:labels=cluster_queue="the evicted workload's ClusterQueue from status.admission on the workload before quota was released (only present when the metric records a sample)",reason="eviction or preemption reason (same values as evicted_workloads_total)",replica_role="one of `leader`, `follower`, or `standalone`"
WorkloadEvictionLatencySeconds *prometheus.HistogramVec
Expand Down Expand Up @@ -651,6 +655,21 @@ The label 'reason' can have the following values:
}, append([]string{"preempting_cluster_queue", "reason", "replica_role"}, extraLabels...),
)

WorkloadPreemptionsCount = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: constants.KueueName,
Name: "workload_preemptions",
Help: `The cumulative number of times each workload has been preempted.
The label 'reason' can have the following values:
- "InClusterQueue" means that the workload was preempted by a workload in the same ClusterQueue.
- "InCohortReclamation" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota.
- "InCohortFairSharing" means that the workload was preempted by a workload in the same cohort Fair Sharing.
- "InCohortReclaimWhileBorrowing" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota while borrowing.
Uses a Gauge (rather than a Counter) so that the time series can be deleted when the workload finishes or is removed, keeping cardinality bounded.
This gauge is deleted when the workload is completed or deleted.`,
}, []string{"namespace", "workload", "cluster_queue", "reason"},
)

WorkloadEvictionLatencySeconds = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: constants.KueueName,
Expand Down Expand Up @@ -1008,9 +1027,21 @@ func ReportEvictedWorkloadsOnce(cqName kueue.ClusterQueueReference, reason, unde
EvictedWorkloadsOnceTotal.WithLabelValues(labels...).Inc()
}

func ReportPreemption(preemptingCqName kueue.ClusterQueueReference, preemptingReason string, targetCqName kueue.ClusterQueueReference, customLabelValues []string, tracker *roletracker.RoleTracker) {
func ReportPreemption(
preemptingCqName kueue.ClusterQueueReference,
preemptingReason string,
targetCqName kueue.ClusterQueueReference,
targetNamespace, targetWorkloadName string,
customLabelValues []string,
tracker *roletracker.RoleTracker,
) {
labels := append([]string{string(preemptingCqName), preemptingReason, roletracker.GetRole(tracker)}, customLabelValues...)
PreemptedWorkloadsTotal.WithLabelValues(labels...).Inc()
WorkloadPreemptionsCount.WithLabelValues(targetNamespace, targetWorkloadName, string(targetCqName), preemptingReason).Inc()
}

func ClearWorkloadPreemptionMetrics(namespace, workloadName string) {
WorkloadPreemptionsCount.DeletePartialMatch(prometheus.Labels{"namespace": namespace, "workload": workloadName})
}

func LQRefFromWorkload(wl *kueue.Workload) LocalQueueReference {
Expand Down Expand Up @@ -1321,6 +1352,7 @@ func Register() {
EvictedWorkloadsTotal,
EvictedWorkloadsOnceTotal,
PreemptedWorkloadsTotal,
WorkloadPreemptionsCount,
WorkloadEvictionLatencySeconds,
ReservingActiveWorkloads,
AdmittedActiveWorkloads,
Expand Down
42 changes: 37 additions & 5 deletions pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,21 +195,53 @@ func TestReportAndCleanupClusterQueueEvictedNumber(t *testing.T) {
}

func TestReportAndCleanupClusterQueuePreemptedNumber(t *testing.T) {
ReportPreemption("cluster_queue1", "InClusterQueue", "cluster_queue1", nil, nil)
ReportPreemption("cluster_queue1", "InCohortReclamation", "cluster_queue1", nil, nil)
ReportPreemption("cluster_queue1", "InCohortFairSharing", "cluster_queue1", nil, nil)
ReportPreemption("cluster_queue1", "InCohortReclaimWhileBorrowing", "cluster_queue1", nil, nil)
ReportPreemption("cluster_queue1", "InClusterQueue", "cluster_queue1", "ns1", "wl1", nil, nil)
ReportPreemption("cluster_queue1", "InCohortReclamation", "cluster_queue1", "ns1", "wl2", nil, nil)
ReportPreemption("cluster_queue1", "InCohortFairSharing", "cluster_queue1", "ns1", "wl3", nil, nil)
ReportPreemption("cluster_queue1", "InCohortReclaimWhileBorrowing", "cluster_queue1", "ns1", "wl4", nil, nil)

expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 4, "preempting_cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InClusterQueue")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortFairSharing")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortReclamation")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cluster_queue1", "reason", "InCohortReclaimWhileBorrowing")

expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 4, "namespace", "ns1", "cluster_queue", "cluster_queue1")
expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 1, "namespace", "ns1", "workload", "wl1", "reason", "InClusterQueue")
expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 1, "namespace", "ns1", "workload", "wl2", "reason", "InCohortReclamation")
expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 1, "namespace", "ns1", "workload", "wl3", "reason", "InCohortFairSharing")
expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 1, "namespace", "ns1", "workload", "wl4", "reason", "InCohortReclaimWhileBorrowing")

ClearWorkloadPreemptionMetrics("ns1", "wl1")
expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 0, "namespace", "ns1", "workload", "wl1")
expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 3, "namespace", "ns1", "cluster_queue", "cluster_queue1")

ClearClusterQueueMetrics("cluster_queue1")
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 0, "preempting_cluster_queue", "cluster_queue1")
}

func TestWorkloadPreemptionsCountAccumulates(t *testing.T) {
// Preempt the same workload three times; the gauge value must accumulate.
ReportPreemption("cq1", "InClusterQueue", "cq1", "ns-acc", "wl-acc", nil, nil)
ReportPreemption("cq1", "InClusterQueue", "cq1", "ns-acc", "wl-acc", nil, nil)
ReportPreemption("cq1", "InClusterQueue", "cq1", "ns-acc", "wl-acc", nil, nil)

// One series, value == 3.
pts := metrics.CollectFilteredGaugeVec(WorkloadPreemptionsCount, prometheus.Labels{
"namespace": "ns-acc", "workload": "wl-acc", "reason": "InClusterQueue",
})
if len(pts) != 1 {
t.Fatalf("expected 1 data point, got %d", len(pts))
}
if pts[0].Value != 3 {
t.Errorf("expected gauge value 3 after 3 preemptions, got %v", pts[0].Value)
}

ClearWorkloadPreemptionMetrics("ns-acc", "wl-acc")
expectFilteredMetricsCount(t, WorkloadPreemptionsCount, 0, "namespace", "ns-acc", "workload", "wl-acc")
ClearClusterQueueMetrics("cq1")
}

func TestReportAndCleanupLocalQueueEvictedNumber(t *testing.T) {
lq := LocalQueueReference{Name: kueue.LocalQueueName("lq1"), Namespace: "ns1"}
ReportLocalQueueEvictedWorkloads(lq, "Preempted", "", "", nil, nil)
Expand Down Expand Up @@ -268,7 +300,7 @@ func TestMetricsWithReplicaRoleLabel(t *testing.T) {
ReportEvictedWorkloads("cq_standalone", "Preempted", "", "", nil, nil)
expectFilteredMetricsCount(t, EvictedWorkloadsTotal, 1, "cluster_queue", "cq_standalone", "replica_role", "standalone")

ReportPreemption("cq_standalone", "InClusterQueue", "cq_standalone", nil, nil)
ReportPreemption("cq_standalone", "InClusterQueue", "cq_standalone", "ns", "wl-standalone", nil, nil)
expectFilteredMetricsCount(t, PreemptedWorkloadsTotal, 1, "preempting_cluster_queue", "cq_standalone", "replica_role", "standalone")

lq := LocalQueueReference{Name: "lq_standalone", Namespace: "ns"}
Expand Down
10 changes: 9 additions & 1 deletion pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,15 @@ func (p *Preemptor) IssuePreemptions(
"Preempted workload %s (UID: %s) in ClusterQueue %s; preemptor effective priority: %d (base: %d, boost: %d); preemptee effective priority: %d (base: %d, boost: %d)",
klog.KObj(target.WorkloadInfo.Obj), target.WorkloadInfo.Obj.UID, target.WorkloadInfo.ClusterQueue,
preemptorEffPri, preemptorBase, preemptorBoost, targetEffPri, targetBase, targetBoost)
workload.ReportPreemption(preemptor.ClusterQueue, target.Reason, target.WorkloadInfo.ClusterQueue, p.roleTracker, p.customLabels)
workload.ReportPreemption(
preemptor.ClusterQueue,
target.Reason,
target.WorkloadInfo.ClusterQueue,
target.WorkloadInfo.Obj.Namespace,
target.WorkloadInfo.Obj.Name,
p.roleTracker,
p.customLabels,
)
successfullyPreempted.Add(1)
})
return int(successfullyPreempted.Load()), int(preemptionErrors.Load()), errCh.ReceiveError()
Expand Down
11 changes: 9 additions & 2 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -1812,8 +1812,15 @@ func reportEvictedWorkload(recorder events.EventRecorder, wl *kueue.Workload, cq
recorder.Eventf(wl, nil, corev1.EventTypeNormal, eventReason, eventReason, message)
}

func ReportPreemption(preemptingCqName kueue.ClusterQueueReference, preemptingReason string, targetCqName kueue.ClusterQueueReference, tracker *roletracker.RoleTracker, cl *metrics.CustomLabels) {
metrics.ReportPreemption(preemptingCqName, preemptingReason, targetCqName, cl.CQGet(preemptingCqName), tracker)
func ReportPreemption(
preemptingCqName kueue.ClusterQueueReference,
preemptingReason string,
targetCqName kueue.ClusterQueueReference,
targetNamespace, targetWorkloadName string,
tracker *roletracker.RoleTracker,
cl *metrics.CustomLabels,
) {
metrics.ReportPreemption(preemptingCqName, preemptingReason, targetCqName, targetNamespace, targetWorkloadName, cl.CQGet(preemptingCqName), tracker)
}

func References(wls []*Info) []klog.ObjectRef {
Expand Down
10 changes: 10 additions & 0 deletions site/content/en/docs/reference/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ The following metrics are available only if `LocalQueueMetrics` feature gate is
| `kueue_local_queue_status` | Gauge | Reports 'localQueue' with its 'active' status (with possible values 'True', 'False', or 'Unknown').<br>For a LocalQueue, the metric only reports a value of 1 for one of the statuses. | `name`: the name of the LocalQueue<br> `namespace`: the namespace of the LocalQueue<br> `active`: one of `True`, `False`, or `Unknown`<br> `replica_role`: one of `leader`, `follower`, or `standalone` |
<!-- END GENERATED TABLE: localqueue -->

## Workload Status

Use the following metrics to monitor individual workload preemption activity:

<!-- BEGIN GENERATED TABLE: workload -->
| Metric name | Type | Description | Labels |
| --- | --- | --- | --- |
| `kueue_workload_preemptions` | Gauge | The cumulative number of times each workload has been preempted.<br>The label 'reason' can have the following values:<br>- "InClusterQueue" means that the workload was preempted by a workload in the same ClusterQueue.<br>- "InCohortReclamation" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota.<br>- "InCohortFairSharing" means that the workload was preempted by a workload in the same cohort Fair Sharing.<br>- "InCohortReclaimWhileBorrowing" means that the workload was preempted by a workload in the same cohort due to reclamation of nominal quota while borrowing.<br>Uses a Gauge (rather than a Counter) so that the time series can be deleted when the workload finishes or is removed, keeping cardinality bounded.<br>This gauge is deleted when the workload is completed or deleted. | `namespace`: the namespace of the preempted workload<br> `workload`: the name of the preempted workload<br> `cluster_queue`: the ClusterQueue of the preempted workload<br> `reason`: eviction or preemption reason |
<!-- END GENERATED TABLE: workload -->

## Cohort Status

<!-- BEGIN GENERATED TABLE: cohort -->
Expand Down
10 changes: 10 additions & 0 deletions site/content/zh-CN/docs/reference/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ Kueue 暴露了 [prometheus](https://prometheus.io) 指标来监控系统的健
| `kueue_local_queue_status` | 仪表盘 | 报告 'localQueue' 的 'active' 状态(可能的值为 'True'、'False' 或 'Unknown')。<br>对于 LocalQueue,指标仅报告其中一个状态的值为 1 | `name`: LocalQueue 的名称<br> `namespace`: LocalQueue 的命名空间<br> `active`: `True`、`False` 或 `Unknown` 其中之一<br> `replica_role`: `leader`、`follower` 或 `standalone` 其中之一 |
<!-- END GENERATED TABLE: localqueue -->

## 工作负载状态

使用以下指标监控各工作负载的抢占活动:

<!-- BEGIN GENERATED TABLE: workload -->
| 指标名称 | 类型 | 描述 | 标签 |
| --- | --- | --- | --- |
| `kueue_workload_preemptions` | 仪表盘 | 每个工作负载被抢占的累计次数。<br>`reason` 标签可取以下值:<br>- "InClusterQueue" 表示工作负载被同一 ClusterQueue 中的工作负载抢占。<br>- "InCohortReclamation" 表示工作负载被同一 cohort 中的工作负载抢占(名义配额回收)。<br>- "InCohortFairSharing" 表示工作负载被同一 cohort 中的工作负载抢占(公平共享)。<br>- "InCohortReclaimWhileBorrowing" 表示工作负载被同一 cohort 中的工作负载抢占(借用时回收名义配额)。<br>当工作负载完成或被删除时,此仪表盘条目将被删除。 | `namespace`: 被抢占工作负载的命名空间<br> `workload`: 被抢占工作负载的名称<br> `cluster_queue`: 被抢占工作负载所属的 ClusterQueue<br> `reason`: 驱逐或抢占原因 |
<!-- END GENERATED TABLE: workload -->

## Cohort 状态

<!-- BEGIN GENERATED TABLE: cohort -->
Expand Down
1 change: 1 addition & 0 deletions test/e2e/singlecluster/baseline/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ var _ = ginkgo.Describe("Metrics", ginkgo.Label("area:singlecluster", "feature:m
{"kueue_evicted_workloads_total"},
{"kueue_evicted_workloads_once_total"},
{"kueue_preempted_workloads_total"},
{"kueue_workload_preemptions", ns.Name},

{"kueue_local_queue_evicted_workloads_total", ns.Name, localQueue2.Name},
{"kueue_local_queue_resource_reservation", ns.Name, localQueue1.Name},
Expand Down