diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index 2d413954aff..cbaacdaa6b1 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -338,6 +338,23 @@ type MultiKueue struct { // GroupVersionKind (GVK) for MultiKueue operations. // +optional ExternalFrameworks []MultiKueueExternalFramework `json:"externalFrameworks,omitempty"` + + // IncrementalDispatcherConfig contains the configuration for the incremental dispatcher. + // This field is only valid when DispatcherName is set to the incremental dispatcher. + // Note: This field is going to be ignored when the MultiKueueIncrementalDispatcherConfig feature gate is disabled. + // +optional + IncrementalDispatcherConfig *IncrementalDispatcherConfig `json:"incrementalDispatcherConfig,omitempty"` +} + +// IncrementalDispatcherConfig holds configuration for the MultiKueue Incremental Dispatcher. +type IncrementalDispatcherConfig struct { + // StepSize defines the number of worker clusters the Incremental Dispatcher + // will query simultaneously. + // Minimum value is 1. If not set, defaults to 3. + // +optional + // +kubebuilder:default=3 + // +kubebuilder:validation:Minimum=1 + StepSize *int32 `json:"stepSize,omitempty"` } // MultiKueueExternalFramework defines a framework that is not built-in. diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go index f3c396404d3..89dfbdb96f3 100644 --- a/apis/config/v1beta1/defaults.go +++ b/apis/config/v1beta1/defaults.go @@ -125,6 +125,8 @@ func SetDefaults_Configuration(cfg *Configuration) { cfg.MultiKueue.Origin = new(cmp.Or(ptr.Deref(cfg.MultiKueue.Origin, ""), DefaultMultiKueueOrigin)) cfg.MultiKueue.WorkerLostTimeout = cmp.Or(cfg.MultiKueue.WorkerLostTimeout, &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}) cfg.MultiKueue.DispatcherName = cmp.Or(cfg.MultiKueue.DispatcherName, new(MultiKueueDispatcherModeAllAtOnce)) + cfg.MultiKueue.IncrementalDispatcherConfig = cmp.Or(cfg.MultiKueue.IncrementalDispatcherConfig, &IncrementalDispatcherConfig{}) + cfg.MultiKueue.IncrementalDispatcherConfig.StepSize = cmp.Or(cfg.MultiKueue.IncrementalDispatcherConfig.StepSize, ptr.To[int32](3)) if fs := cfg.FairSharing; fs != nil && fs.Enable && len(fs.PreemptionStrategies) == 0 { fs.PreemptionStrategies = []PreemptionStrategy{LessThanOrEqualToFinalShare, LessThanInitialShare} diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go index 7a91f5d5224..4fe1a2a760c 100644 --- a/apis/config/v1beta1/defaults_test.go +++ b/apis/config/v1beta1/defaults_test.go @@ -96,6 +96,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Origin: new(DefaultMultiKueueOrigin), WorkerLostTimeout: &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}, DispatcherName: new(MultiKueueDispatcherModeAllAtOnce), + IncrementalDispatcherConfig: &IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, } podsReadyTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout} @@ -512,6 +515,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Origin: new("multikueue-manager1"), WorkerLostTimeout: &metav1.Duration{Duration: time.Minute}, DispatcherName: new(MultiKueueDispatcherModeIncremental), + IncrementalDispatcherConfig: &IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, }, ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector, WaitForPodsReady: &WaitForPodsReady{}, @@ -542,6 +548,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Origin: new(DefaultMultiKueueOrigin), WorkerLostTimeout: &metav1.Duration{Duration: time.Minute}, DispatcherName: defaultMultiKueue.DispatcherName, + IncrementalDispatcherConfig: &IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, }, ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector, WaitForPodsReady: &WaitForPodsReady{}, @@ -570,6 +579,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Origin: new("multikueue-manager1"), WorkerLostTimeout: &metav1.Duration{Duration: 15 * time.Minute}, DispatcherName: defaultMultiKueue.DispatcherName, + IncrementalDispatcherConfig: &IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, }, ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector, WaitForPodsReady: &WaitForPodsReady{}, diff --git a/apis/config/v1beta1/zz_generated.conversion.go b/apis/config/v1beta1/zz_generated.conversion.go index b5b0ec7340f..2f7a1499096 100644 --- a/apis/config/v1beta1/zz_generated.conversion.go +++ b/apis/config/v1beta1/zz_generated.conversion.go @@ -130,6 +130,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*IncrementalDispatcherConfig)(nil), (*v1beta2.IncrementalDispatcherConfig)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta1_IncrementalDispatcherConfig_To_v1beta2_IncrementalDispatcherConfig(a.(*IncrementalDispatcherConfig), b.(*v1beta2.IncrementalDispatcherConfig), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*v1beta2.IncrementalDispatcherConfig)(nil), (*IncrementalDispatcherConfig)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta2_IncrementalDispatcherConfig_To_v1beta1_IncrementalDispatcherConfig(a.(*v1beta2.IncrementalDispatcherConfig), b.(*IncrementalDispatcherConfig), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*v1beta2.Integrations)(nil), (*Integrations)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta2_Integrations_To_v1beta1_Integrations(a.(*v1beta2.Integrations), b.(*Integrations), scope) }); err != nil { @@ -638,6 +648,26 @@ func autoConvert_v1beta2_FairSharing_To_v1beta1_FairSharing(in *v1beta2.FairShar return nil } +func autoConvert_v1beta1_IncrementalDispatcherConfig_To_v1beta2_IncrementalDispatcherConfig(in *IncrementalDispatcherConfig, out *v1beta2.IncrementalDispatcherConfig, s conversion.Scope) error { + out.StepSize = (*int32)(unsafe.Pointer(in.StepSize)) + return nil +} + +// Convert_v1beta1_IncrementalDispatcherConfig_To_v1beta2_IncrementalDispatcherConfig is an autogenerated conversion function. +func Convert_v1beta1_IncrementalDispatcherConfig_To_v1beta2_IncrementalDispatcherConfig(in *IncrementalDispatcherConfig, out *v1beta2.IncrementalDispatcherConfig, s conversion.Scope) error { + return autoConvert_v1beta1_IncrementalDispatcherConfig_To_v1beta2_IncrementalDispatcherConfig(in, out, s) +} + +func autoConvert_v1beta2_IncrementalDispatcherConfig_To_v1beta1_IncrementalDispatcherConfig(in *v1beta2.IncrementalDispatcherConfig, out *IncrementalDispatcherConfig, s conversion.Scope) error { + out.StepSize = (*int32)(unsafe.Pointer(in.StepSize)) + return nil +} + +// Convert_v1beta2_IncrementalDispatcherConfig_To_v1beta1_IncrementalDispatcherConfig is an autogenerated conversion function. +func Convert_v1beta2_IncrementalDispatcherConfig_To_v1beta1_IncrementalDispatcherConfig(in *v1beta2.IncrementalDispatcherConfig, out *IncrementalDispatcherConfig, s conversion.Scope) error { + return autoConvert_v1beta2_IncrementalDispatcherConfig_To_v1beta1_IncrementalDispatcherConfig(in, out, s) +} + func autoConvert_v1beta1_Integrations_To_v1beta2_Integrations(in *Integrations, out *v1beta2.Integrations, s conversion.Scope) error { out.Frameworks = *(*[]string)(unsafe.Pointer(&in.Frameworks)) out.ExternalFrameworks = *(*[]string)(unsafe.Pointer(&in.ExternalFrameworks)) @@ -710,6 +740,7 @@ func autoConvert_v1beta1_MultiKueue_To_v1beta2_MultiKueue(in *MultiKueue, out *v out.WorkerLostTimeout = (*metav1.Duration)(unsafe.Pointer(in.WorkerLostTimeout)) out.DispatcherName = (*string)(unsafe.Pointer(in.DispatcherName)) out.ExternalFrameworks = *(*[]v1beta2.MultiKueueExternalFramework)(unsafe.Pointer(&in.ExternalFrameworks)) + out.IncrementalDispatcherConfig = (*v1beta2.IncrementalDispatcherConfig)(unsafe.Pointer(in.IncrementalDispatcherConfig)) return nil } @@ -725,6 +756,7 @@ func autoConvert_v1beta2_MultiKueue_To_v1beta1_MultiKueue(in *v1beta2.MultiKueue out.DispatcherName = (*string)(unsafe.Pointer(in.DispatcherName)) out.ExternalFrameworks = *(*[]MultiKueueExternalFramework)(unsafe.Pointer(&in.ExternalFrameworks)) // WARNING: in.ClusterProfile requires manual conversion: does not exist in peer-type + out.IncrementalDispatcherConfig = (*IncrementalDispatcherConfig)(unsafe.Pointer(in.IncrementalDispatcherConfig)) return nil } diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go index 9e5a81bff2a..060a2d7cd46 100644 --- a/apis/config/v1beta1/zz_generated.deepcopy.go +++ b/apis/config/v1beta1/zz_generated.deepcopy.go @@ -359,6 +359,26 @@ func (in *FairSharing) DeepCopy() *FairSharing { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IncrementalDispatcherConfig) DeepCopyInto(out *IncrementalDispatcherConfig) { + *out = *in + if in.StepSize != nil { + in, out := &in.StepSize, &out.StepSize + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IncrementalDispatcherConfig. +func (in *IncrementalDispatcherConfig) DeepCopy() *IncrementalDispatcherConfig { + if in == nil { + return nil + } + out := new(IncrementalDispatcherConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Integrations) DeepCopyInto(out *Integrations) { *out = *in @@ -472,6 +492,11 @@ func (in *MultiKueue) DeepCopyInto(out *MultiKueue) { *out = make([]MultiKueueExternalFramework, len(*in)) copy(*out, *in) } + if in.IncrementalDispatcherConfig != nil { + in, out := &in.IncrementalDispatcherConfig, &out.IncrementalDispatcherConfig + *out = new(IncrementalDispatcherConfig) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKueue. diff --git a/apis/config/v1beta2/configuration_types.go b/apis/config/v1beta2/configuration_types.go index ba47b8d2d3c..602b5ba3523 100644 --- a/apis/config/v1beta2/configuration_types.go +++ b/apis/config/v1beta2/configuration_types.go @@ -334,6 +334,23 @@ type MultiKueue struct { // ClusterProfile defines configuration for using the ClusterProfile API. // +optional ClusterProfile *ClusterProfile `json:"clusterProfile,omitempty"` + + // IncrementalDispatcherConfig contains the configuration for the incremental dispatcher. + // This field is only valid when DispatcherName is set to the incremental dispatcher. + // Note: This field is going to be ignored when the MultiKueueIncrementalDispatcherConfig feature gate is disabled. + // +optional + IncrementalDispatcherConfig *IncrementalDispatcherConfig `json:"incrementalDispatcherConfig,omitempty"` +} + +// IncrementalDispatcherConfig holds configuration for the MultiKueue Incremental Dispatcher. +type IncrementalDispatcherConfig struct { + // StepSize defines the number of worker clusters the Incremental Dispatcher + // will query simultaneously. + // Minimum value is 1. If not set, defaults to 3. + // +optional + // +kubebuilder:default=3 + // +kubebuilder:validation:Minimum=1 + StepSize *int32 `json:"stepSize,omitempty"` } // MultiKueueExternalFramework defines a framework that is not built-in. diff --git a/apis/config/v1beta2/defaults.go b/apis/config/v1beta2/defaults.go index 5423297dae7..f32a3d72f72 100644 --- a/apis/config/v1beta2/defaults.go +++ b/apis/config/v1beta2/defaults.go @@ -123,6 +123,8 @@ func SetDefaults_Configuration(cfg *Configuration) { cfg.MultiKueue.Origin = new(cmp.Or(ptr.Deref(cfg.MultiKueue.Origin, ""), DefaultMultiKueueOrigin)) cfg.MultiKueue.WorkerLostTimeout = cmp.Or(cfg.MultiKueue.WorkerLostTimeout, &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}) cfg.MultiKueue.DispatcherName = cmp.Or(cfg.MultiKueue.DispatcherName, new(MultiKueueDispatcherModeAllAtOnce)) + cfg.MultiKueue.IncrementalDispatcherConfig = cmp.Or(cfg.MultiKueue.IncrementalDispatcherConfig, &IncrementalDispatcherConfig{}) + cfg.MultiKueue.IncrementalDispatcherConfig.StepSize = cmp.Or(cfg.MultiKueue.IncrementalDispatcherConfig.StepSize, ptr.To[int32](3)) if afs := cfg.AdmissionFairSharing; afs != nil { afs.UsageSamplingInterval.Duration = cmp.Or(afs.UsageSamplingInterval.Duration, 5*time.Minute) diff --git a/apis/config/v1beta2/defaults_test.go b/apis/config/v1beta2/defaults_test.go index 79147d1f738..f5e76036404 100644 --- a/apis/config/v1beta2/defaults_test.go +++ b/apis/config/v1beta2/defaults_test.go @@ -96,6 +96,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Origin: new(DefaultMultiKueueOrigin), WorkerLostTimeout: &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}, DispatcherName: new(MultiKueueDispatcherModeAllAtOnce), + IncrementalDispatcherConfig: &IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, } defaultVisibilityServer := &VisibilityServerConfiguration{ @@ -554,6 +557,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Origin: new("multikueue-manager1"), WorkerLostTimeout: &metav1.Duration{Duration: time.Minute}, DispatcherName: new(MultiKueueDispatcherModeIncremental), + IncrementalDispatcherConfig: &IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, }, ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector, VisibilityServer: defaultVisibilityServer, @@ -584,6 +590,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Origin: new(DefaultMultiKueueOrigin), WorkerLostTimeout: &metav1.Duration{Duration: time.Minute}, DispatcherName: defaultMultiKueue.DispatcherName, + IncrementalDispatcherConfig: &IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, }, ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector, VisibilityServer: defaultVisibilityServer, @@ -612,6 +621,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Origin: new("multikueue-manager1"), WorkerLostTimeout: &metav1.Duration{Duration: 15 * time.Minute}, DispatcherName: defaultMultiKueue.DispatcherName, + IncrementalDispatcherConfig: &IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, }, ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector, VisibilityServer: defaultVisibilityServer, diff --git a/apis/config/v1beta2/zz_generated.deepcopy.go b/apis/config/v1beta2/zz_generated.deepcopy.go index f84613522d6..93ee6125f99 100644 --- a/apis/config/v1beta2/zz_generated.deepcopy.go +++ b/apis/config/v1beta2/zz_generated.deepcopy.go @@ -382,6 +382,26 @@ func (in *FairSharing) DeepCopy() *FairSharing { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *IncrementalDispatcherConfig) DeepCopyInto(out *IncrementalDispatcherConfig) { + *out = *in + if in.StepSize != nil { + in, out := &in.StepSize, &out.StepSize + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new IncrementalDispatcherConfig. +func (in *IncrementalDispatcherConfig) DeepCopy() *IncrementalDispatcherConfig { + if in == nil { + return nil + } + out := new(IncrementalDispatcherConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Integrations) DeepCopyInto(out *Integrations) { *out = *in @@ -495,6 +515,11 @@ func (in *MultiKueue) DeepCopyInto(out *MultiKueue) { *out = new(ClusterProfile) (*in).DeepCopyInto(*out) } + if in.IncrementalDispatcherConfig != nil { + in, out := &in.IncrementalDispatcherConfig, &out.IncrementalDispatcherConfig + *out = new(IncrementalDispatcherConfig) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKueue. diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 976228e95e9..04a9656e585 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -416,6 +416,9 @@ objectRetentionPolicies: Origin: ptr.To(configapi.DefaultMultiKueueOrigin), WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout}, DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeAllAtOnce), + IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, } defaultVisibility := &configapi.VisibilityServerConfiguration{ @@ -749,6 +752,9 @@ objectRetentionPolicies: Origin: new("multikueue-manager1"), WorkerLostTimeout: &metav1.Duration{Duration: 10 * time.Minute}, DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeIncremental), + IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, ClusterProfile: &configapi.ClusterProfile{ CredentialsProviders: []configapi.ClusterProfileCredentialsProvider{ { @@ -948,6 +954,9 @@ webhook: Origin: ptr.To(configapi.DefaultMultiKueueOrigin), WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout}, DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeAllAtOnce), + IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, }, ManagedJobsNamespaceSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ @@ -998,6 +1007,9 @@ webhook: Origin: ptr.To(configapi.DefaultMultiKueueOrigin), WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout}, DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeAllAtOnce), + IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, }, ManagedJobsNamespaceSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ @@ -1048,6 +1060,9 @@ webhook: Origin: ptr.To(configapi.DefaultMultiKueueOrigin), WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout}, DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeAllAtOnce), + IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, }, ManagedJobsNamespaceSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ @@ -1094,6 +1109,9 @@ webhook: Origin: ptr.To(configapi.DefaultMultiKueueOrigin), WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout}, DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeAllAtOnce), + IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](3), + }, }, ManagedJobsNamespaceSelector: &metav1.LabelSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ @@ -1245,6 +1263,9 @@ func TestEncode(t *testing.T) { "origin": "multikueue", "workerLostTimeout": "15m0s", "dispatcherName": configapi.MultiKueueDispatcherModeAllAtOnce, + "incrementalDispatcherConfig": map[string]any{ + "stepSize": int64(3), + }, }, "visibilityServer": map[string]any{ "bindPort": int64(8082), diff --git a/pkg/controller/workloaddispatcher/controllers.go b/pkg/controller/workloaddispatcher/controllers.go index ac7c1e394e2..39f6550f616 100644 --- a/pkg/controller/workloaddispatcher/controllers.go +++ b/pkg/controller/workloaddispatcher/controllers.go @@ -34,7 +34,7 @@ func SetupControllers(mgr ctrl.Manager, cfg *configapi.Configuration, roleTracke return "", err } - idRec := NewIncrementalDispatcherReconciler(mgr.GetClient(), helper, roleTracker) + idRec := NewIncrementalDispatcherReconciler(mgr.GetClient(), helper, roleTracker, cfg) err = idRec.SetupWithManager(mgr, cfg) if err != nil { return "multikueue-incremental-dispatcher", err diff --git a/pkg/controller/workloaddispatcher/incrementaldispatcher.go b/pkg/controller/workloaddispatcher/incrementaldispatcher.go index 4719a02749b..60ed7c63f3f 100644 --- a/pkg/controller/workloaddispatcher/incrementaldispatcher.go +++ b/pkg/controller/workloaddispatcher/incrementaldispatcher.go @@ -26,6 +26,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -34,6 +35,7 @@ import ( kueueconfig "sigs.k8s.io/kueue/apis/config/v1beta2" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2" "sigs.k8s.io/kueue/pkg/controller/core" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/util/admissioncheck" utilmaps "sigs.k8s.io/kueue/pkg/util/maps" "sigs.k8s.io/kueue/pkg/util/roletracker" @@ -52,6 +54,7 @@ type IncrementalDispatcherReconciler struct { clock clock.Clock roundStartTimes *utilmaps.SyncMap[types.NamespacedName, time.Time] roleTracker *roletracker.RoleTracker + cfg *kueueconfig.Configuration } var realClock = clock.RealClock{} @@ -67,15 +70,22 @@ func (r *IncrementalDispatcherReconciler) SetupWithManager(mgr ctrl.Manager, cfg Complete(core.WithLeadingManager(mgr, r, &kueue.Workload{}, cfg)) } -func NewIncrementalDispatcherReconciler(c client.Client, helper *admissioncheck.MultiKueueStoreHelper, roleTracker *roletracker.RoleTracker) *IncrementalDispatcherReconciler { +func NewIncrementalDispatcherReconciler( + c client.Client, + helper *admissioncheck.MultiKueueStoreHelper, + roleTracker *roletracker.RoleTracker, + cfg *kueueconfig.Configuration, +) *IncrementalDispatcherReconciler { return &IncrementalDispatcherReconciler{ client: c, helper: helper, clock: realClock, roundStartTimes: utilmaps.NewSyncMap[types.NamespacedName, time.Time](0), roleTracker: roleTracker, + cfg: cfg, } } + func (r *IncrementalDispatcherReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx) wl := &kueue.Workload{} @@ -138,7 +148,7 @@ func (r *IncrementalDispatcherReconciler) nominateWorkers(ctx context.Context, w return reconcile.Result{RequeueAfter: remainingWaitTime}, nil } - nextNominatedWorkers, err := getNextNominatedWorkers(log, wl, remoteClusters) + nextNominatedWorkers, err := getNextNominatedWorkers(log, wl, remoteClusters, r.stepSize()) log.V(5).Info("revoke outdated nomination and nominate new worker clusters", "revokedWorkerClusters", wl.Status.NominatedClusterNames, "nominatedWorkerClusters", nextNominatedWorkers) if err != nil { log.Error(err, "Failed to nominate next worker clusters") @@ -159,9 +169,7 @@ func (r *IncrementalDispatcherReconciler) nominateWorkers(ctx context.Context, w return reconcile.Result{}, nil } -// getNextNominatedWorkers returns the next set of nominated workers for incremental dispatching. -// It nominates up to 3 remotes that have not yet been nominated, in sorted order. -func getNextNominatedWorkers(log logr.Logger, wl *kueue.Workload, remoteClusters sets.Set[string]) ([]string, error) { +func getNextNominatedWorkers(log logr.Logger, wl *kueue.Workload, remoteClusters sets.Set[string], batchSize int) ([]string, error) { alreadyNominated := sets.New(wl.Status.NominatedClusterNames...) workers := make([]string, 0, len(remoteClusters)) @@ -177,13 +185,26 @@ func getNextNominatedWorkers(log logr.Logger, wl *kueue.Workload, remoteClusters if len(workers) == 0 { return nil, ErrNoMoreWorkers } - batchSize := 3 if len(workers) < batchSize { return workers, nil } return workers[:batchSize], nil } +func (r *IncrementalDispatcherReconciler) stepSize() int { + const defaultStepSize = 3 + if !utilfeature.DefaultFeatureGate.Enabled(features.MultiKueueIncrementalDispatcherConfig) { + return defaultStepSize + } + if r.cfg != nil && + r.cfg.MultiKueue != nil && + r.cfg.MultiKueue.IncrementalDispatcherConfig != nil && + r.cfg.MultiKueue.IncrementalDispatcherConfig.StepSize != nil { + return int(*r.cfg.MultiKueue.IncrementalDispatcherConfig.StepSize) + } + return defaultStepSize +} + func (r *IncrementalDispatcherReconciler) setRoundStartTime(key types.NamespacedName, t time.Time) { r.roundStartTimes.Add(key, t) } diff --git a/pkg/controller/workloaddispatcher/incrementaldispatcher_test.go b/pkg/controller/workloaddispatcher/incrementaldispatcher_test.go index 0c92997731a..13c57dba39f 100644 --- a/pkg/controller/workloaddispatcher/incrementaldispatcher_test.go +++ b/pkg/controller/workloaddispatcher/incrementaldispatcher_test.go @@ -30,12 +30,15 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" testingclock "k8s.io/utils/clock/testing" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + kueueconfig "sigs.k8s.io/kueue/apis/config/v1beta2" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/util/admissioncheck" utilmaps "sigs.k8s.io/kueue/pkg/util/maps" utiltesting "sigs.k8s.io/kueue/pkg/util/testing" @@ -180,11 +183,14 @@ func TestIncrementalDispatcherNominateWorkers(t *testing.T) { testCases := map[string]struct { remoteClusters sets.Set[string] workload *kueue.Workload + cfg *kueueconfig.Configuration + featureGateDisabled bool wantNominatedClustersCount int wantErr error advanceRoundTime bool wantNominatedClusters []string }{ + "one remote": { remoteClusters: sets.New("A"), workload: baseWl.DeepCopy(), @@ -265,10 +271,65 @@ func TestIncrementalDispatcherNominateWorkers(t *testing.T) { advanceRoundTime: false, wantNominatedClusters: []string{}, }, + + "stepSize=2, five remotes — first batch is exactly 2": { + remoteClusters: sets.New("A", "B", "C", "D", "E"), + workload: baseWl.DeepCopy(), + cfg: &kueueconfig.Configuration{ + MultiKueue: &kueueconfig.MultiKueue{ + IncrementalDispatcherConfig: &kueueconfig.IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](2), + }, + }, + }, + wantNominatedClustersCount: 2, + wantErr: nil, + advanceRoundTime: false, + wantNominatedClusters: []string{"A", "B"}, + }, + + "stepSize=2, round expired — second batch is next 2": { + remoteClusters: sets.New("A", "B", "C", "D", "E"), + workload: baseWl.Clone().NominatedClusterNames("A", "B").Obj(), + cfg: &kueueconfig.Configuration{ + MultiKueue: &kueueconfig.MultiKueue{ + IncrementalDispatcherConfig: &kueueconfig.IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](2), + }, + }, + }, + wantNominatedClustersCount: 4, + wantErr: nil, + advanceRoundTime: true, + wantNominatedClusters: []string{"A", "B", "C", "D"}, + }, + + "feature gate disabled — ignores stepSize=10, uses default 3": { + remoteClusters: sets.New("A", "B", "C", "D", "E", "F", "G"), + workload: baseWl.DeepCopy(), + cfg: &kueueconfig.Configuration{ + MultiKueue: &kueueconfig.MultiKueue{ + IncrementalDispatcherConfig: &kueueconfig.IncrementalDispatcherConfig{ + StepSize: ptr.To[int32](10), + }, + }, + }, + featureGateDisabled: true, + wantNominatedClustersCount: 3, + wantErr: nil, + advanceRoundTime: false, + wantNominatedClusters: []string{"A", "B", "C"}, + }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { + if tc.featureGateDisabled { + features.SetFeatureGateDuringTest(t, features.MultiKueueIncrementalDispatcherConfig, false) + } else { + features.SetFeatureGateDuringTest(t, features.MultiKueueIncrementalDispatcherConfig, true) + } + scheme := runtime.NewScheme() if err := kueue.AddToScheme(scheme); err != nil { t.Fatalf("Fail to add to scheme %s", err) @@ -287,6 +348,7 @@ func TestIncrementalDispatcherNominateWorkers(t *testing.T) { client: client, clock: fakeClock, roundStartTimes: utilmaps.NewSyncMap[types.NamespacedName, time.Time](0), + cfg: tc.cfg, } key := types.NamespacedName{Namespace: tc.workload.Namespace, Name: tc.workload.Name} diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 14e76c2d089..4cf37510f81 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -380,6 +380,12 @@ const ( // deletes a Deployment-owned pod). FinishOrphanedWorkloads featuregate.Feature = "FinishOrphanedWorkloads" + // owner: @Mostafahassen1 + // + // kep: https://github.com/kubernetes-sigs/kueue/pull/10877#issuecomment-4412688735 + // Enables configurable stepSize for the MultiKueue Incremental Dispatcher. + MultiKueueIncrementalDispatcherConfig featuregate.Feature = "MultiKueueIncrementalDispatcherConfig" + // owner: @pbundyra // kep: https://github.com/kubernetes-sigs/kueue/tree/main/keps/8691-concurrent-admission // @@ -609,6 +615,9 @@ var defaultVersionedFeatureGates = map[featuregate.Feature]featuregate.Versioned FinishOrphanedWorkloads: { {Version: version.MustParse("0.18"), Default: false, PreRelease: featuregate.Alpha}, }, + MultiKueueIncrementalDispatcherConfig: { + {Version: version.MustParse("0.19"), Default: false, PreRelease: featuregate.Alpha}, + }, ConcurrentAdmission: { {Version: version.MustParse("0.18"), Default: false, PreRelease: featuregate.Alpha}, }, diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md index 3c8d8abfce6..1441e2901af 100644 --- a/site/content/en/docs/reference/kueue-config.v1beta1.md +++ b/site/content/en/docs/reference/kueue-config.v1beta1.md @@ -669,6 +669,34 @@ The default strategy is ["LessThanOrEqualToFinalShare", "LessThan +## `IncrementalDispatcherConfig` {#config-kueue-x-k8s-io-v1beta1-IncrementalDispatcherConfig} + + +**Appears in:** + +- [MultiKueue](#config-kueue-x-k8s-io-v1beta1-MultiKueue) + + +

IncrementalDispatcherConfig holds configuration for the MultiKueue Incremental Dispatcher.

+ + + + + + + + + + + +
FieldDescription
stepSize
+int32 +
+

StepSize defines the number of worker clusters the Incremental Dispatcher +will query simultaneously. +Minimum value is 1. If not set, defaults to 3.

+
+ ## `Integrations` {#config-kueue-x-k8s-io-v1beta1-Integrations} @@ -893,6 +921,15 @@ by the generic MultiKueue adapter. Each entry defines how to handle a specific GroupVersionKind (GVK) for MultiKueue operations.

+incrementalDispatcherConfig
+IncrementalDispatcherConfig + + +

IncrementalDispatcherConfig contains the configuration for the incremental dispatcher. +This field is only valid when DispatcherName is set to the incremental dispatcher. +Note: This field is going to be ignored when the MultiKueueIncrementalDispatcherConfig feature gate is disabled.

+ + diff --git a/site/content/en/docs/reference/kueue-config.v1beta2.md b/site/content/en/docs/reference/kueue-config.v1beta2.md index bfa0c0e4ebf..8fa3b24e38f 100644 --- a/site/content/en/docs/reference/kueue-config.v1beta2.md +++ b/site/content/en/docs/reference/kueue-config.v1beta2.md @@ -688,6 +688,34 @@ newest start time first. +## `IncrementalDispatcherConfig` {#config-kueue-x-k8s-io-v1beta2-IncrementalDispatcherConfig} + + +**Appears in:** + +- [MultiKueue](#config-kueue-x-k8s-io-v1beta2-MultiKueue) + + +

IncrementalDispatcherConfig holds configuration for the MultiKueue Incremental Dispatcher.

+ + + + + + + + + + + +
FieldDescription
stepSize
+int32 +
+

StepSize defines the number of worker clusters the Incremental Dispatcher +will query simultaneously. +Minimum value is 1. If not set, defaults to 3.

+
+ ## `Integrations` {#config-kueue-x-k8s-io-v1beta2-Integrations} @@ -909,6 +937,15 @@ GroupVersionKind (GVK) for MultiKueue operations.

ClusterProfile defines configuration for using the ClusterProfile API.

+incrementalDispatcherConfig
+IncrementalDispatcherConfig + + +

IncrementalDispatcherConfig contains the configuration for the incremental dispatcher. +This field is only valid when DispatcherName is set to the incremental dispatcher. +Note: This field is going to be ignored when the MultiKueueIncrementalDispatcherConfig feature gate is disabled.

+ + diff --git a/site/data/featuregates/versioned_feature_list.yaml b/site/data/featuregates/versioned_feature_list.yaml index c821ea7318c..ac22f4b3a87 100644 --- a/site/data/featuregates/versioned_feature_list.yaml +++ b/site/data/featuregates/versioned_feature_list.yaml @@ -229,6 +229,12 @@ lockToDefault: false preRelease: Alpha version: "0.15" +- name: MultiKueueIncrementalDispatcherConfig + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Alpha + version: "0.19" - name: MultiKueueManagerQuotaAutomation versionedSpecs: - default: false diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index c821ea7318c..d190caa709d 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -235,6 +235,12 @@ lockToDefault: false preRelease: Alpha version: "0.18" +- name: MultiKueueIncrementalDispatcherConfig + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Alpha + version: "0.19" - name: MultiKueueOrchestratedPreemption versionedSpecs: - default: false