diff --git a/pkg/framework/plugins/nodeutilization/highnodeutilization.go b/pkg/framework/plugins/nodeutilization/highnodeutilization.go index 73633a52da..b75685b61c 100644 --- a/pkg/framework/plugins/nodeutilization/highnodeutilization.go +++ b/pkg/framework/plugins/nodeutilization/highnodeutilization.go @@ -140,7 +140,8 @@ func (h *HighNodeUtilization) Name() string { func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status { logger := klog.FromContext(klog.NewContext(ctx, h.logger)).WithValues("ExtensionPoint", frameworktypes.BalanceExtensionPoint) - if err := h.usageClient.sync(ctx, nodes); err != nil { + syncedNodes, err := h.usageClient.sync(ctx, nodes) + if err != nil { return &frameworktypes.Status{ Err: fmt.Errorf("error getting node usage: %v", err), } @@ -148,8 +149,8 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr // take a picture of the current state of the nodes, everything else // here is based on this snapshot. - nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(nodes, h.usageClient) - capacities := referencedResourceListForNodesCapacity(nodes) + nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(syncedNodes, h.usageClient) + capacities := referencedResourceListForNodesCapacity(syncedNodes) // node usages are not presented as percentages over the capacity. // we need to normalize them to be able to compare them with the @@ -232,7 +233,7 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr return nil } - if len(lowNodes) == len(nodes) { + if len(lowNodes) == len(syncedNodes) { logger.V(1).Info("All nodes are underutilized, nothing to do here") return nil } diff --git a/pkg/framework/plugins/nodeutilization/lownodeutilization.go b/pkg/framework/plugins/nodeutilization/lownodeutilization.go index 5748e73778..bd30f9e6ff 100644 --- a/pkg/framework/plugins/nodeutilization/lownodeutilization.go +++ b/pkg/framework/plugins/nodeutilization/lownodeutilization.go @@ -140,7 +140,8 @@ func (l *LowNodeUtilization) Name() string { func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status { logger := klog.FromContext(klog.NewContext(ctx, l.logger)).WithValues("ExtensionPoint", frameworktypes.BalanceExtensionPoint) - if err := l.usageClient.sync(ctx, nodes); err != nil { + syncedNodes, err := l.usageClient.sync(ctx, nodes) + if err != nil { return &frameworktypes.Status{ Err: fmt.Errorf("error getting node usage: %v", err), } @@ -149,8 +150,8 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra // starts by taking a snapshot ofthe nodes usage. we will use this // snapshot to assess the nodes usage and classify them as // underutilized or overutilized. - nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(nodes, l.usageClient) - capacities := referencedResourceListForNodesCapacity(nodes) + nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(syncedNodes, l.usageClient) + capacities := referencedResourceListForNodesCapacity(syncedNodes) // usage, by default, is exposed in absolute values. we need to normalize // them (convert them to percentages) to be able to compare them with the @@ -271,7 +272,7 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra return nil } - if len(lowNodes) == len(nodes) { + if len(lowNodes) == len(syncedNodes) { logger.V(1).Info("All nodes are underutilized, nothing to do here") return nil } diff --git a/pkg/framework/plugins/nodeutilization/usageclients.go b/pkg/framework/plugins/nodeutilization/usageclients.go index 90950d0b83..33ac1f50c2 100644 --- a/pkg/framework/plugins/nodeutilization/usageclients.go +++ b/pkg/framework/plugins/nodeutilization/usageclients.go @@ -63,7 +63,10 @@ type usageClient interface { // Both low/high node utilization plugins are expected to invoke sync right // after Balance method is invoked. There's no cache invalidation so each // Balance is expected to get the latest data by invoking sync. - sync(ctx context.Context, nodes []*v1.Node) error + // sync returns the subset of input nodes for which usage data is + // available. Nodes without metrics (e.g. unreachable by metrics-server) + // are excluded from the returned list and logged at V(1). + sync(ctx context.Context, nodes []*v1.Node) ([]*v1.Node, error) nodeUtilization(node string) api.ReferencedResourceList pods(node string) []*v1.Pod podUsage(pod *v1.Pod) (api.ReferencedResourceList, error) @@ -105,7 +108,7 @@ func (s *requestedUsageClient) podUsage(pod *v1.Pod) (api.ReferencedResourceList return usage, nil } -func (s *requestedUsageClient) sync(ctx context.Context, nodes []*v1.Node) error { +func (s *requestedUsageClient) sync(ctx context.Context, nodes []*v1.Node) ([]*v1.Node, error) { s._nodeUtilization = make(map[string]api.ReferencedResourceList) s._pods = make(map[string][]*v1.Pod) @@ -113,7 +116,7 @@ func (s *requestedUsageClient) sync(ctx context.Context, nodes []*v1.Node) error pods, err := podutil.ListPodsOnANode(node.Name, s.getPodsAssignedToNode, nil) if err != nil { klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err) - return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err) + return nil, fmt.Errorf("error accessing %q node's pods: %v", node.Name, err) } nodeUsage, err := nodeutil.NodeUtilization(pods, s.resourceNames, func(pod *v1.Pod) (v1.ResourceList, error) { @@ -121,7 +124,7 @@ func (s *requestedUsageClient) sync(ctx context.Context, nodes []*v1.Node) error return req, nil }) if err != nil { - return err + return nil, err } // store the snapshot of pods from the same (or the closest) node utilization computation @@ -129,7 +132,7 @@ func (s *requestedUsageClient) sync(ctx context.Context, nodes []*v1.Node) error s._nodeUtilization[node.Name] = nodeUsage } - return nil + return nodes, nil } type actualUsageClient struct { @@ -191,41 +194,49 @@ func (client *actualUsageClient) podUsage(pod *v1.Pod) (api.ReferencedResourceLi return totalUsage, nil } -func (client *actualUsageClient) sync(ctx context.Context, nodes []*v1.Node) error { +func (client *actualUsageClient) sync(ctx context.Context, nodes []*v1.Node) ([]*v1.Node, error) { client._nodeUtilization = make(map[string]api.ReferencedResourceList) client._pods = make(map[string][]*v1.Pod) nodesUsage, err := client.metricsCollector.AllNodesUsage() if err != nil { - return err + return nil, err } + var syncedNodes []*v1.Node for _, node := range nodes { + collectedNodeUsage, ok := nodesUsage[node.Name] + if !ok { + klog.V(1).InfoS("Node has no collected metrics and will be skipped", "node", klog.KObj(node)) + continue + } + pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil) if err != nil { klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err) - return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err) + return nil, fmt.Errorf("error accessing %q node's pods: %v", node.Name, err) } - collectedNodeUsage, ok := nodesUsage[node.Name] - if !ok { - return fmt.Errorf("unable to find node %q in the collected metrics", node.Name) - } collectedNodeUsage[v1.ResourcePods] = resource.NewQuantity(int64(len(pods)), resource.DecimalSI) nodeUsage := api.ReferencedResourceList{} for _, resourceName := range client.resourceNames { if _, exists := collectedNodeUsage[resourceName]; !exists { - return fmt.Errorf("unable to find %q resource for collected %q node metric", resourceName, node.Name) + return nil, fmt.Errorf("unable to find %q resource for collected %q node metric", resourceName, node.Name) } nodeUsage[resourceName] = collectedNodeUsage[resourceName] } // store the snapshot of pods from the same (or the closest) node utilization computation client._pods[node.Name] = pods client._nodeUtilization[node.Name] = nodeUsage + syncedNodes = append(syncedNodes, node) + } + + if len(nodes) > 0 && len(syncedNodes) == 0 { + klog.InfoS("No nodes had available metrics, balance cycle will be a no-op", "totalNodes", len(nodes)) } - return nil + return syncedNodes, nil } type prometheusUsageClient struct { @@ -294,29 +305,36 @@ func NodeUsageFromPrometheusMetrics(ctx context.Context, promClient promapi.Clie return nodeUsages, nil } -func (client *prometheusUsageClient) sync(ctx context.Context, nodes []*v1.Node) error { +func (client *prometheusUsageClient) sync(ctx context.Context, nodes []*v1.Node) ([]*v1.Node, error) { client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity) client._pods = make(map[string][]*v1.Pod) nodeUsages, err := NodeUsageFromPrometheusMetrics(ctx, client.promClient, client.promQuery) if err != nil { - return err + return nil, err } + var syncedNodes []*v1.Node for _, node := range nodes { if _, exists := nodeUsages[node.Name]; !exists { - return fmt.Errorf("unable to find metric entry for %v", node.Name) + klog.V(1).InfoS("Node has no collected metrics and will be skipped", "node", klog.KObj(node)) + continue } pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil) if err != nil { klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err) - return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err) + return nil, fmt.Errorf("error accessing %q node's pods: %v", node.Name, err) } // store the snapshot of pods from the same (or the closest) node utilization computation client._pods[node.Name] = pods client._nodeUtilization[node.Name] = nodeUsages[node.Name] + syncedNodes = append(syncedNodes, node) + } + + if len(nodes) > 0 && len(syncedNodes) == 0 { + klog.InfoS("No nodes had available metrics, balance cycle will be a no-op", "totalNodes", len(nodes)) } - return nil + return syncedNodes, nil } diff --git a/pkg/framework/plugins/nodeutilization/usageclients_test.go b/pkg/framework/plugins/nodeutilization/usageclients_test.go index 745b3a9bb2..214a9ab9b5 100644 --- a/pkg/framework/plugins/nodeutilization/usageclients_test.go +++ b/pkg/framework/plugins/nodeutilization/usageclients_test.go @@ -63,7 +63,7 @@ func updateMetricsAndCheckNodeUtilization( if err != nil { t.Fatalf("failed to capture metrics: %v", err) } - err = usageClient.sync(ctx, nodes) + _, err = usageClient.sync(ctx, nodes) if err != nil { t.Fatalf("failed to sync a snapshot: %v", err) } @@ -143,6 +143,144 @@ func TestActualUsageClient(t *testing.T) { ) } +func TestActualUsageClientSkipsNodesWithoutMetrics(t *testing.T) { + n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil) + n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil) + + p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil) + p2 := test.BuildTestPod("p2", 400, 0, n2.Name, nil) + p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil) + + // Only register metrics for n1 and n3, not n2 + n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816) + n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816) + + clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p2, p3) + metricsClientset := fakemetricsclient.NewSimpleClientset() + metricsClientset.Tracker().Create(nodesgvr, n1metrics, "") + metricsClientset.Tracker().Create(nodesgvr, n3metrics, "") + + ctx := context.TODO() + + resourceNames := []v1.ResourceName{ + v1.ResourceCPU, + v1.ResourceMemory, + } + + sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) + podInformer := sharedInformerFactory.Core().V1().Pods().Informer() + nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister() + podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer) + if err != nil { + t.Fatalf("Build get pods assigned to node function error: %v", err) + } + + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + + collector := metricscollector.NewMetricsCollector(nodeLister, metricsClientset, labels.Everything()) + if err := collector.Collect(ctx); err != nil { + t.Fatalf("failed to collect metrics: %v", err) + } + + usageClient := newActualUsageClient( + resourceNames, + podsAssignedToNode, + collector, + ) + + nodes := []*v1.Node{n1, n2, n3} + syncedNodes, err := usageClient.sync(ctx, nodes) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // n2 should be excluded because it has no metrics + if len(syncedNodes) != 2 { + t.Fatalf("expected 2 synced nodes, got %v", len(syncedNodes)) + } + + syncedNodeNames := map[string]bool{} + for _, n := range syncedNodes { + syncedNodeNames[n.Name] = true + } + if !syncedNodeNames["n1"] || !syncedNodeNames["n3"] { + t.Fatalf("expected n1 and n3 in synced nodes, got %v", syncedNodeNames) + } + if syncedNodeNames["n2"] { + t.Fatal("n2 should not be in synced nodes (no metrics)") + } + + // Verify n1 and n3 have utilization data + if usageClient.nodeUtilization("n1") == nil { + t.Fatal("expected utilization data for n1") + } + if usageClient.nodeUtilization("n3") == nil { + t.Fatal("expected utilization data for n3") + } + // n2 should have no utilization data + if usageClient.nodeUtilization("n2") != nil { + t.Fatal("expected no utilization data for n2") + } +} + +func TestPrometheusUsageClientSkipsNodesWithoutMetrics(t *testing.T) { + n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil) + n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil) + n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil) + + p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil) + p2 := test.BuildTestPod("p2", 400, 0, n2.Name, nil) + p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil) + + nodes := []*v1.Node{n1, n2, n3} + + // Only provide metrics for n1 and n3, not n2 + pClient := &fakePromClient{ + result: model.Vector{ + sample("instance:node_cpu:rate:sum", "n1", 0.42), + sample("instance:node_cpu:rate:sum", "n3", 0.56), + }, + dataType: model.ValVector, + } + + clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p2, p3) + + ctx := context.TODO() + sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0) + podInformer := sharedInformerFactory.Core().V1().Pods().Informer() + podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer) + if err != nil { + t.Fatalf("Build get pods assigned to node function error: %v", err) + } + + sharedInformerFactory.Start(ctx.Done()) + sharedInformerFactory.WaitForCacheSync(ctx.Done()) + + prometheusUsageClient := newPrometheusUsageClient(podsAssignedToNode, pClient, "instance:node_cpu:rate:sum") + syncedNodes, err := prometheusUsageClient.sync(ctx, nodes) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // n2 should be excluded because it has no metrics + if len(syncedNodes) != 2 { + t.Fatalf("expected 2 synced nodes, got %v", len(syncedNodes)) + } + + syncedNodeNames := map[string]bool{} + for _, n := range syncedNodes { + syncedNodeNames[n.Name] = true + } + if !syncedNodeNames["n1"] || !syncedNodeNames["n3"] { + t.Fatalf("expected n1 and n3 in synced nodes, got %v", syncedNodeNames) + } + if syncedNodeNames["n2"] { + t.Fatal("n2 should not be in synced nodes (no metrics)") + } +} + type fakePromClient struct { result interface{} dataType model.ValueType @@ -272,7 +410,7 @@ func TestPrometheusUsageClient(t *testing.T) { sharedInformerFactory.WaitForCacheSync(ctx.Done()) prometheusUsageClient := newPrometheusUsageClient(podsAssignedToNode, pClient, "instance:node_cpu:rate:sum") - err = prometheusUsageClient.sync(ctx, nodes) + _, err = prometheusUsageClient.sync(ctx, nodes) if tc.err == nil { if err != nil { t.Fatalf("unexpected error: %v", err) diff --git a/pr-body.md b/pr-body.md new file mode 100644 index 0000000000..e8f3b13a1e --- /dev/null +++ b/pr-body.md @@ -0,0 +1,34 @@ +## What type of PR is this? + +/kind bug + +## What this PR does / why we need it + +When using metrics-based node utilization (`KubernetesMetrics` or `Prometheus`), the `LowNodeUtilization` plugin fails entirely if **any** Ready node is missing from the collected metrics. This can happen when a node is Ready but unreachable by metrics-server (e.g. a roaming laptop node connected via WireGuard with intermittent connectivity — metrics-server reports "no route to host"). + +The metrics collector (`metricscollector.go`) already handles missing node metrics gracefully — it logs an error and continues to the next node. However, the downstream `actualUsageClient.sync()` and `prometheusUsageClient.sync()` methods treat a missing node as a **hard error**, returning `fmt.Errorf(...)` which aborts the entire balance cycle for **all** nodes. + +This means a single unreachable node prevents the descheduler from performing any load balancing, even among the remaining healthy nodes with valid metrics. + +Note: `HighNodeUtilization` currently only uses `requestedUsageClient` (which doesn't depend on metrics), so it is not affected by this bug today. However, the `usageClient` interface is shared, so this PR updates the interface and both callers for consistency. + +## How does this PR fix it + +- Change the `usageClient.sync()` interface to return `([]*v1.Node, error)` — the returned slice is the subset of input nodes for which usage data is available. +- `actualUsageClient.sync()` and `prometheusUsageClient.sync()` now log a warning at V(1) and skip nodes without metrics instead of returning a fatal error. +- `requestedUsageClient.sync()` returns the full input list (it doesn't depend on metrics). +- Both `LowNodeUtilization.Balance()` and `HighNodeUtilization.Balance()` now operate on the filtered node list returned by `sync()`. (`HighNodeUtilization` currently only uses `requestedUsageClient`, which always returns all nodes, but is updated for interface consistency.) + +## Which issue(s) this PR fixes + +None filed yet — discovered while running a mixed cluster with VPS + roaming nodes. + +## Special notes for your reviewer + +The `usageClient` interface is package-private, so the signature change only affects code within `nodeutilization/`. + +## Does this PR introduce a user-facing change? + +```release-note +nodeutilization plugins (LowNodeUtilization, HighNodeUtilization) now skip nodes without available metrics instead of failing the entire balance cycle. This allows the descheduler to continue operating when some Ready nodes are temporarily unreachable by metrics-server or Prometheus. +```