Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 69 additions & 5 deletions pkg/controller/jobs/raycluster/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ import (
"context"
"encoding/json"
"fmt"
"maps"
"strconv"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
rayutils "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation/field"
Expand All @@ -40,6 +44,9 @@ import (
)

const (
redisCleanupPodSetName = kueue.PodSetReference(rayv1.RedisCleanupNode)
maxRayClusterPodSets = 8
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a Kueue limitation, not a RayCluster one, so let's put it in the constants file and name it MaxPodSetsSize or something similar.


// RayClusterPodsetReplicaSizesAnnotation is set on the job when autoscaling causes
// PodSet replica sizes to differ from the original spec. The value is a JSON
// array compatible with []kueue.PodSet, containing only the changed PodSets.
Expand All @@ -51,8 +58,8 @@ const (
RayClusterGenerationAnnotation = "kueue.x-k8s.io/raycluster-generation"
)

// BuildPodSets builds PodSets from RayClusterSpec
func BuildPodSets(rayClusterSpec *rayv1.RayClusterSpec) ([]kueue.PodSet, error) {
// BuildPodSets builds PodSets from RayClusterSpec.
func BuildPodSets(rayClusterSpec *rayv1.RayClusterSpec, annotations map[string]string) ([]kueue.PodSet, error) {
podSets := make([]kueue.PodSet, 0)

// head
Expand Down Expand Up @@ -96,9 +103,66 @@ func BuildPodSets(rayClusterSpec *rayv1.RayClusterSpec) ([]kueue.PodSet, error)
podSets = append(podSets, workerPodSet)
}

if shouldAccountForRedisCleanup(rayClusterSpec, annotations) {
podSets = append(podSets, buildRedisCleanupPodSet(rayClusterSpec))
}

return podSets, nil
}

// buildRedisCleanupPodSet returns a single-replica PodSet named
// redisCleanupPodSetName whose template is derived from a deep copy of the
// head group's pod template, so the RayClusterSpec passed in is not modified.
//
// The copied template is adjusted as follows:
//   - the rayutils.RayNodeTypeLabelKey label is set to rayv1.RedisCleanupNode;
//   - if the head template has containers, only the one at
//     rayutils.RayContainerIndex is kept, with its resources replaced by
//     redisCleanupResourceRequirements();
//   - the restart policy is set to Never.
func buildRedisCleanupPodSet(rayClusterSpec *rayv1.RayClusterSpec) kueue.PodSet {
	podTemplate := *rayClusterSpec.HeadGroupSpec.Template.DeepCopy()

	// Work on a private label map; allocate one when the head template has no labels.
	labels := maps.Clone(podTemplate.Labels)
	if labels == nil {
		labels = make(map[string]string, 1)
	}
	labels[rayutils.RayNodeTypeLabelKey] = string(rayv1.RedisCleanupNode)
	podTemplate.Labels = labels

	if len(podTemplate.Spec.Containers) > 0 {
		rayContainer := podTemplate.Spec.Containers[rayutils.RayContainerIndex].DeepCopy()
		podTemplate.Spec.Containers = []corev1.Container{*rayContainer}
		podTemplate.Spec.Containers[rayutils.RayContainerIndex].Resources = redisCleanupResourceRequirements()
	}
	podTemplate.Spec.RestartPolicy = corev1.RestartPolicyNever

	cleanupPodSet := kueue.PodSet{
		Name:     redisCleanupPodSetName,
		Template: podTemplate,
		Count:    1,
	}
	return cleanupPodSet
}

// redisCleanupResourceRequirements returns the fixed resource requirements
// applied to the container of the Redis cleanup PodSet: 200m CPU and 256Mi
// memory, with requests equal to limits.
func redisCleanupResourceRequirements() corev1.ResourceRequirements {
	// Build a fresh list per field so Limits and Requests never share a map.
	newResourceList := func() corev1.ResourceList {
		return corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("200m"),
			corev1.ResourceMemory: resource.MustParse("256Mi"),
		}
	}

	var requirements corev1.ResourceRequirements
	requirements.Limits = newResourceList()
	requirements.Requests = newResourceList()
	return requirements
}

// shouldAccountForRedisCleanup reports whether an extra Redis cleanup PodSet
// should be accounted for: the KubeRayAccountForRedisCleanup feature gate must
// be enabled and rayutils.IsGCSFaultToleranceEnabled must return true for the
// given spec and annotations.
func shouldAccountForRedisCleanup(rayClusterSpec *rayv1.RayClusterSpec, annotations map[string]string) bool {
	// Check the feature gate first; the spec is not inspected when it is off.
	if !features.Enabled(features.KubeRayAccountForRedisCleanup) {
		return false
	}
	return rayutils.IsGCSFaultToleranceEnabled(rayClusterSpec, annotations)
}

// ExpectedPodSetsCount returns the number of PodSets expected for the given
// RayClusterSpec: one for the head group, one per worker group, plus one more
// when shouldAccountForRedisCleanup reports true for the spec and annotations.
func ExpectedPodSetsCount(rayClusterSpec *rayv1.RayClusterSpec, annotations map[string]string) int {
	const headPodSets = 1

	redisCleanupPodSets := 0
	if shouldAccountForRedisCleanup(rayClusterSpec, annotations) {
		redisCleanupPodSets = 1
	}
	return headPodSets + len(rayClusterSpec.WorkerGroupSpecs) + redisCleanupPodSets
}

func maxWorkerGroupsForSpec(rayClusterSpec *rayv1.RayClusterSpec, annotations map[string]string) int {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need both ExpectedPodSetsCount and maxWorkerGroupsForSpec, since they count the same thing.
For example, instead of limiting the number of worker groups at line 284, we can just verify that ExpectedPodSetsCount is <= maxPodSetsSize (8).

maxWorkerGroups := maxRayClusterPodSets - 1
if shouldAccountForRedisCleanup(rayClusterSpec, annotations) {
maxWorkerGroups--
}
return maxWorkerGroups
}

func UpdatePodSets(ctx context.Context, podSets []kueue.PodSet, c client.Client, object client.Object, enableInTreeAutoscaling *bool, rayClusterName string) ([]kueue.PodSet, error) {
log := ctrl.LoggerFrom(ctx)

Expand Down Expand Up @@ -216,9 +280,9 @@ func ValidateCreate(object client.Object, rayClusterSpec *rayv1.RayClusterSpec,
)
}

// Should limit the worker count to 8 - 1 (max podSets num - cluster head)
if len(rayClusterSpec.WorkerGroupSpecs) > 7 {
allErrors = append(allErrors, field.TooMany(rayClusterSpecPath.Child("workerGroupSpecs"), len(rayClusterSpec.WorkerGroupSpecs), 7))
// Should limit the worker count to the maximum PodSet count, minus the head and optional Redis cleanup PodSets.
if maxWorkerGroups := maxWorkerGroupsForSpec(rayClusterSpec, object.GetAnnotations()); len(rayClusterSpec.WorkerGroupSpecs) > maxWorkerGroups {
allErrors = append(allErrors, field.TooMany(rayClusterSpecPath.Child("workerGroupSpecs"), len(rayClusterSpec.WorkerGroupSpecs), maxWorkerGroups))
}

// None of the workerGroups should be named "head"
Expand Down
Loading