Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/framework/plugins/nodeutilization/highnodeutilization.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,17 @@ func (h *HighNodeUtilization) Name() string {
func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
logger := klog.FromContext(klog.NewContext(ctx, h.logger)).WithValues("ExtensionPoint", frameworktypes.BalanceExtensionPoint)

if err := h.usageClient.sync(ctx, nodes); err != nil {
syncedNodes, err := h.usageClient.sync(ctx, nodes)
if err != nil {
Comment thread
agentydragon marked this conversation as resolved.
return &frameworktypes.Status{
Err: fmt.Errorf("error getting node usage: %v", err),
}
}

// take a picture of the current state of the nodes, everything else
// here is based on this snapshot.
nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(nodes, h.usageClient)
capacities := referencedResourceListForNodesCapacity(nodes)
nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(syncedNodes, h.usageClient)
capacities := referencedResourceListForNodesCapacity(syncedNodes)

// node usages are not presented as percentages over the capacity.
// we need to normalize them to be able to compare them with the
Expand Down Expand Up @@ -232,7 +233,7 @@ func (h *HighNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fr
return nil
}

if len(lowNodes) == len(nodes) {
if len(lowNodes) == len(syncedNodes) {
logger.V(1).Info("All nodes are underutilized, nothing to do here")
return nil
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/framework/plugins/nodeutilization/lownodeutilization.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ func (l *LowNodeUtilization) Name() string {
func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
logger := klog.FromContext(klog.NewContext(ctx, l.logger)).WithValues("ExtensionPoint", frameworktypes.BalanceExtensionPoint)

if err := l.usageClient.sync(ctx, nodes); err != nil {
syncedNodes, err := l.usageClient.sync(ctx, nodes)
if err != nil {
return &frameworktypes.Status{
Err: fmt.Errorf("error getting node usage: %v", err),
}
Expand All @@ -149,8 +150,8 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
// starts by taking a snapshot of the nodes usage. we will use this
// snapshot to assess the nodes usage and classify them as
// underutilized or overutilized.
nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(nodes, l.usageClient)
capacities := referencedResourceListForNodesCapacity(nodes)
nodesMap, nodesUsageMap, podListMap := getNodeUsageSnapshot(syncedNodes, l.usageClient)
capacities := referencedResourceListForNodesCapacity(syncedNodes)

// usage, by default, is exposed in absolute values. we need to normalize
// them (convert them to percentages) to be able to compare them with the
Expand Down Expand Up @@ -271,7 +272,7 @@ func (l *LowNodeUtilization) Balance(ctx context.Context, nodes []*v1.Node) *fra
return nil
}

if len(lowNodes) == len(nodes) {
if len(lowNodes) == len(syncedNodes) {
logger.V(1).Info("All nodes are underutilized, nothing to do here")
return nil
}
Expand Down
56 changes: 37 additions & 19 deletions pkg/framework/plugins/nodeutilization/usageclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ type usageClient interface {
// Both low/high node utilization plugins are expected to invoke sync right
// after Balance method is invoked. There's no cache invalidation so each
// Balance is expected to get the latest data by invoking sync.
sync(ctx context.Context, nodes []*v1.Node) error
// sync returns the subset of input nodes for which usage data is
// available. Nodes without metrics (e.g. unreachable by metrics-server)
// are excluded from the returned list and logged at V(1).
sync(ctx context.Context, nodes []*v1.Node) ([]*v1.Node, error)
nodeUtilization(node string) api.ReferencedResourceList
pods(node string) []*v1.Pod
podUsage(pod *v1.Pod) (api.ReferencedResourceList, error)
Expand Down Expand Up @@ -105,31 +108,31 @@ func (s *requestedUsageClient) podUsage(pod *v1.Pod) (api.ReferencedResourceList
return usage, nil
}

func (s *requestedUsageClient) sync(ctx context.Context, nodes []*v1.Node) error {
func (s *requestedUsageClient) sync(ctx context.Context, nodes []*v1.Node) ([]*v1.Node, error) {
s._nodeUtilization = make(map[string]api.ReferencedResourceList)
s._pods = make(map[string][]*v1.Pod)

for _, node := range nodes {
pods, err := podutil.ListPodsOnANode(node.Name, s.getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
return nil, fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
}

nodeUsage, err := nodeutil.NodeUtilization(pods, s.resourceNames, func(pod *v1.Pod) (v1.ResourceList, error) {
req, _ := utils.PodRequestsAndLimits(pod)
return req, nil
})
if err != nil {
return err
return nil, err
}

// store the snapshot of pods from the same (or the closest) node utilization computation
s._pods[node.Name] = pods
s._nodeUtilization[node.Name] = nodeUsage
}

return nil
return nodes, nil
}

type actualUsageClient struct {
Expand Down Expand Up @@ -191,41 +194,49 @@ func (client *actualUsageClient) podUsage(pod *v1.Pod) (api.ReferencedResourceLi
return totalUsage, nil
}

func (client *actualUsageClient) sync(ctx context.Context, nodes []*v1.Node) error {
func (client *actualUsageClient) sync(ctx context.Context, nodes []*v1.Node) ([]*v1.Node, error) {
client._nodeUtilization = make(map[string]api.ReferencedResourceList)
client._pods = make(map[string][]*v1.Pod)

nodesUsage, err := client.metricsCollector.AllNodesUsage()
if err != nil {
return err
return nil, err
}

var syncedNodes []*v1.Node
for _, node := range nodes {
collectedNodeUsage, ok := nodesUsage[node.Name]
if !ok {
klog.V(1).InfoS("Node has no collected metrics and will be skipped", "node", klog.KObj(node))
continue
}

Comment thread
agentydragon marked this conversation as resolved.
pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
return nil, fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
}

collectedNodeUsage, ok := nodesUsage[node.Name]
if !ok {
return fmt.Errorf("unable to find node %q in the collected metrics", node.Name)
}
collectedNodeUsage[v1.ResourcePods] = resource.NewQuantity(int64(len(pods)), resource.DecimalSI)

nodeUsage := api.ReferencedResourceList{}
for _, resourceName := range client.resourceNames {
if _, exists := collectedNodeUsage[resourceName]; !exists {
return fmt.Errorf("unable to find %q resource for collected %q node metric", resourceName, node.Name)
return nil, fmt.Errorf("unable to find %q resource for collected %q node metric", resourceName, node.Name)
}
nodeUsage[resourceName] = collectedNodeUsage[resourceName]
}
// store the snapshot of pods from the same (or the closest) node utilization computation
client._pods[node.Name] = pods
client._nodeUtilization[node.Name] = nodeUsage
syncedNodes = append(syncedNodes, node)
}

if len(nodes) > 0 && len(syncedNodes) == 0 {
klog.InfoS("No nodes had available metrics, balance cycle will be a no-op", "totalNodes", len(nodes))
}

return nil
return syncedNodes, nil
}

type prometheusUsageClient struct {
Expand Down Expand Up @@ -294,29 +305,36 @@ func NodeUsageFromPrometheusMetrics(ctx context.Context, promClient promapi.Clie
return nodeUsages, nil
}

func (client *prometheusUsageClient) sync(ctx context.Context, nodes []*v1.Node) error {
func (client *prometheusUsageClient) sync(ctx context.Context, nodes []*v1.Node) ([]*v1.Node, error) {
client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
client._pods = make(map[string][]*v1.Pod)

nodeUsages, err := NodeUsageFromPrometheusMetrics(ctx, client.promClient, client.promQuery)
if err != nil {
return err
return nil, err
}

var syncedNodes []*v1.Node
for _, node := range nodes {
if _, exists := nodeUsages[node.Name]; !exists {
return fmt.Errorf("unable to find metric entry for %v", node.Name)
klog.V(1).InfoS("Node has no collected metrics and will be skipped", "node", klog.KObj(node))
continue
}
Comment thread
agentydragon marked this conversation as resolved.
pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil)
if err != nil {
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
return fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
return nil, fmt.Errorf("error accessing %q node's pods: %v", node.Name, err)
}

// store the snapshot of pods from the same (or the closest) node utilization computation
client._pods[node.Name] = pods
client._nodeUtilization[node.Name] = nodeUsages[node.Name]
syncedNodes = append(syncedNodes, node)
}

if len(nodes) > 0 && len(syncedNodes) == 0 {
klog.InfoS("No nodes had available metrics, balance cycle will be a no-op", "totalNodes", len(nodes))
}

return nil
return syncedNodes, nil
}
142 changes: 140 additions & 2 deletions pkg/framework/plugins/nodeutilization/usageclients_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func updateMetricsAndCheckNodeUtilization(
if err != nil {
t.Fatalf("failed to capture metrics: %v", err)
}
err = usageClient.sync(ctx, nodes)
_, err = usageClient.sync(ctx, nodes)
if err != nil {
t.Fatalf("failed to sync a snapshot: %v", err)
}
Expand Down Expand Up @@ -143,6 +143,144 @@ func TestActualUsageClient(t *testing.T) {
)
}

// TestActualUsageClientSkipsNodesWithoutMetrics verifies that sync on the
// metrics-backed usage client tolerates nodes that have no collected metrics:
// such nodes are left out of the returned node list instead of failing the
// whole sync, and no utilization data is recorded for them.
func TestActualUsageClientSkipsNodesWithoutMetrics(t *testing.T) {
	n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
	n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
	n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil)

	p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil)
	p2 := test.BuildTestPod("p2", 400, 0, n2.Name, nil)
	p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)

	// Only register metrics for n1 and n3, not n2.
	n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816)
	n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816)

	clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p2, p3)
	metricsClientset := fakemetricsclient.NewSimpleClientset()
	// A failed registration would make n1/n3 look like "nodes without
	// metrics" and surface later as a misleading count mismatch, so fail
	// right here if the fixture cannot be set up.
	if err := metricsClientset.Tracker().Create(nodesgvr, n1metrics, ""); err != nil {
		t.Fatalf("failed to register metrics for n1: %v", err)
	}
	if err := metricsClientset.Tracker().Create(nodesgvr, n3metrics, ""); err != nil {
		t.Fatalf("failed to register metrics for n3: %v", err)
	}

	ctx := context.TODO()

	resourceNames := []v1.ResourceName{
		v1.ResourceCPU,
		v1.ResourceMemory,
	}

	sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
	podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
	nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
	podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
	if err != nil {
		t.Fatalf("Build get pods assigned to node function error: %v", err)
	}

	// Informers must be requested from the factory before Start so that
	// they are actually run and synced.
	sharedInformerFactory.Start(ctx.Done())
	sharedInformerFactory.WaitForCacheSync(ctx.Done())

	collector := metricscollector.NewMetricsCollector(nodeLister, metricsClientset, labels.Everything())
	if err := collector.Collect(ctx); err != nil {
		t.Fatalf("failed to collect metrics: %v", err)
	}

	usageClient := newActualUsageClient(
		resourceNames,
		podsAssignedToNode,
		collector,
	)

	nodes := []*v1.Node{n1, n2, n3}
	syncedNodes, err := usageClient.sync(ctx, nodes)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// n2 should be excluded because it has no metrics.
	if len(syncedNodes) != 2 {
		t.Fatalf("expected 2 synced nodes, got %v", len(syncedNodes))
	}

	syncedNodeNames := map[string]bool{}
	for _, n := range syncedNodes {
		syncedNodeNames[n.Name] = true
	}
	if !syncedNodeNames["n1"] || !syncedNodeNames["n3"] {
		t.Fatalf("expected n1 and n3 in synced nodes, got %v", syncedNodeNames)
	}
	if syncedNodeNames["n2"] {
		t.Fatal("n2 should not be in synced nodes (no metrics)")
	}

	// Verify n1 and n3 have utilization data.
	if usageClient.nodeUtilization("n1") == nil {
		t.Fatal("expected utilization data for n1")
	}
	if usageClient.nodeUtilization("n3") == nil {
		t.Fatal("expected utilization data for n3")
	}
	// n2 was skipped by sync, so nothing should have been stored for it.
	if usageClient.nodeUtilization("n2") != nil {
		t.Fatal("expected no utilization data for n2")
	}
}

// TestPrometheusUsageClientSkipsNodesWithoutMetrics checks that sync on the
// Prometheus-backed usage client returns only the nodes that appear in the
// query result; a node with no sample is dropped rather than causing an error.
func TestPrometheusUsageClientSkipsNodesWithoutMetrics(t *testing.T) {
	const promQuery = "instance:node_cpu:rate:sum"

	n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
	n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
	n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil)
	nodes := []*v1.Node{n1, n2, n3}

	clientset := fakeclientset.NewSimpleClientset(
		n1, n2, n3,
		test.BuildTestPod("p1", 400, 0, n1.Name, nil),
		test.BuildTestPod("p2", 400, 0, n2.Name, nil),
		test.BuildTestPod("p3", 400, 0, n3.Name, nil),
	)

	// The fake Prometheus result carries samples for n1 and n3 only; n2 is
	// deliberately absent.
	samples := model.Vector{
		sample(promQuery, "n1", 0.42),
		sample(promQuery, "n3", 0.56),
	}
	pClient := &fakePromClient{result: samples, dataType: model.ValVector}

	ctx := context.TODO()
	factory := informers.NewSharedInformerFactory(clientset, 0)
	podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(factory.Core().V1().Pods().Informer())
	if err != nil {
		t.Fatalf("Build get pods assigned to node function error: %v", err)
	}

	factory.Start(ctx.Done())
	factory.WaitForCacheSync(ctx.Done())

	client := newPrometheusUsageClient(podsAssignedToNode, pClient, promQuery)
	synced, err := client.sync(ctx, nodes)
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// Exactly the two nodes with samples are expected back.
	if got := len(synced); got != 2 {
		t.Fatalf("expected 2 synced nodes, got %v", got)
	}

	seen := make(map[string]bool)
	for i := range synced {
		seen[synced[i].Name] = true
	}

	switch {
	case !seen["n1"], !seen["n3"]:
		t.Fatalf("expected n1 and n3 in synced nodes, got %v", seen)
	case seen["n2"]:
		t.Fatal("n2 should not be in synced nodes (no metrics)")
	}
}

type fakePromClient struct {
result interface{}
dataType model.ValueType
Expand Down Expand Up @@ -272,7 +410,7 @@ func TestPrometheusUsageClient(t *testing.T) {
sharedInformerFactory.WaitForCacheSync(ctx.Done())

prometheusUsageClient := newPrometheusUsageClient(podsAssignedToNode, pClient, "instance:node_cpu:rate:sum")
err = prometheusUsageClient.sync(ctx, nodes)
_, err = prometheusUsageClient.sync(ctx, nodes)
if tc.err == nil {
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down
Loading