Skip to content
Open
17 changes: 17 additions & 0 deletions apis/config/v1beta1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,23 @@ type MultiKueue struct {
// GroupVersionKind (GVK) for MultiKueue operations.
// +optional
ExternalFrameworks []MultiKueueExternalFramework `json:"externalFrameworks,omitempty"`

// IncrementalDispatcherConfig contains the configuration for the incremental dispatcher.
// This field is only valid when DispatcherName is set to the incremental dispatcher.
// Note: This field is going to be ignored when the MultiKueueIncrementalDispatcherConfig feature gate is disabled.
// +optional
IncrementalDispatcherConfig *IncrementalDispatcherConfig `json:"incrementalDispatcherConfig,omitempty"`
}

// IncrementalDispatcherConfig holds configuration for the MultiKueue Incremental Dispatcher.
type IncrementalDispatcherConfig struct {
// StepSize defines the number of worker clusters the Incremental Dispatcher
// will query simultaneously.
// Minimum value is 1. If not set, defaults to 3.
// +optional
// +kubebuilder:default=3
// +kubebuilder:validation:Minimum=1
StepSize *int32 `json:"stepSize,omitempty"`
}

// MultiKueueExternalFramework defines a framework that is not built-in.
Expand Down
2 changes: 2 additions & 0 deletions apis/config/v1beta1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func SetDefaults_Configuration(cfg *Configuration) {
cfg.MultiKueue.Origin = new(cmp.Or(ptr.Deref(cfg.MultiKueue.Origin, ""), DefaultMultiKueueOrigin))
cfg.MultiKueue.WorkerLostTimeout = cmp.Or(cfg.MultiKueue.WorkerLostTimeout, &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout})
cfg.MultiKueue.DispatcherName = cmp.Or(cfg.MultiKueue.DispatcherName, new(MultiKueueDispatcherModeAllAtOnce))
cfg.MultiKueue.IncrementalDispatcherConfig = cmp.Or(cfg.MultiKueue.IncrementalDispatcherConfig, &IncrementalDispatcherConfig{})
cfg.MultiKueue.IncrementalDispatcherConfig.StepSize = cmp.Or(cfg.MultiKueue.IncrementalDispatcherConfig.StepSize, ptr.To[int32](3))

if fs := cfg.FairSharing; fs != nil && fs.Enable && len(fs.PreemptionStrategies) == 0 {
fs.PreemptionStrategies = []PreemptionStrategy{LessThanOrEqualToFinalShare, LessThanInitialShare}
Expand Down
12 changes: 12 additions & 0 deletions apis/config/v1beta1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Origin: new(DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout},
DispatcherName: new(MultiKueueDispatcherModeAllAtOnce),
IncrementalDispatcherConfig: &IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
}

podsReadyTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
Expand Down Expand Up @@ -512,6 +515,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Origin: new("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: time.Minute},
DispatcherName: new(MultiKueueDispatcherModeIncremental),
IncrementalDispatcherConfig: &IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
},
ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector,
WaitForPodsReady: &WaitForPodsReady{},
Expand Down Expand Up @@ -542,6 +548,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Origin: new(DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: time.Minute},
DispatcherName: defaultMultiKueue.DispatcherName,
IncrementalDispatcherConfig: &IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
},
ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector,
WaitForPodsReady: &WaitForPodsReady{},
Expand Down Expand Up @@ -570,6 +579,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Origin: new("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: 15 * time.Minute},
DispatcherName: defaultMultiKueue.DispatcherName,
IncrementalDispatcherConfig: &IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
},
ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector,
WaitForPodsReady: &WaitForPodsReady{},
Expand Down
32 changes: 32 additions & 0 deletions apis/config/v1beta1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions apis/config/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions apis/config/v1beta2/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,23 @@ type MultiKueue struct {
// ClusterProfile defines configuration for using the ClusterProfile API.
// +optional
ClusterProfile *ClusterProfile `json:"clusterProfile,omitempty"`

// IncrementalDispatcherConfig contains the configuration for the incremental dispatcher.
// This field is only valid when DispatcherName is set to the incremental dispatcher.
// Note: This field is going to be ignored when the MultiKueueIncrementalDispatcherConfig feature gate is disabled.
// +optional
IncrementalDispatcherConfig *IncrementalDispatcherConfig `json:"incrementalDispatcherConfig,omitempty"`
}

// IncrementalDispatcherConfig holds configuration for the MultiKueue Incremental Dispatcher.
type IncrementalDispatcherConfig struct {
// StepSize defines the number of worker clusters the Incremental Dispatcher
// will query simultaneously.
// Minimum value is 1. If not set, defaults to 3.
// +optional
// +kubebuilder:default=3
// +kubebuilder:validation:Minimum=1
StepSize *int32 `json:"stepSize,omitempty"`
}

// MultiKueueExternalFramework defines a framework that is not built-in.
Expand Down
2 changes: 2 additions & 0 deletions apis/config/v1beta2/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func SetDefaults_Configuration(cfg *Configuration) {
cfg.MultiKueue.Origin = new(cmp.Or(ptr.Deref(cfg.MultiKueue.Origin, ""), DefaultMultiKueueOrigin))
cfg.MultiKueue.WorkerLostTimeout = cmp.Or(cfg.MultiKueue.WorkerLostTimeout, &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout})
cfg.MultiKueue.DispatcherName = cmp.Or(cfg.MultiKueue.DispatcherName, new(MultiKueueDispatcherModeAllAtOnce))
cfg.MultiKueue.IncrementalDispatcherConfig = cmp.Or(cfg.MultiKueue.IncrementalDispatcherConfig, &IncrementalDispatcherConfig{})
cfg.MultiKueue.IncrementalDispatcherConfig.StepSize = cmp.Or(cfg.MultiKueue.IncrementalDispatcherConfig.StepSize, ptr.To[int32](3))

if afs := cfg.AdmissionFairSharing; afs != nil {
afs.UsageSamplingInterval.Duration = cmp.Or(afs.UsageSamplingInterval.Duration, 5*time.Minute)
Expand Down
12 changes: 12 additions & 0 deletions apis/config/v1beta2/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Origin: new(DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout},
DispatcherName: new(MultiKueueDispatcherModeAllAtOnce),
IncrementalDispatcherConfig: &IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
}

defaultVisibilityServer := &VisibilityServerConfiguration{
Expand Down Expand Up @@ -554,6 +557,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Origin: new("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: time.Minute},
DispatcherName: new(MultiKueueDispatcherModeIncremental),
IncrementalDispatcherConfig: &IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
},
ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector,
VisibilityServer: defaultVisibilityServer,
Expand Down Expand Up @@ -584,6 +590,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Origin: new(DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: time.Minute},
DispatcherName: defaultMultiKueue.DispatcherName,
IncrementalDispatcherConfig: &IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
},
ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector,
VisibilityServer: defaultVisibilityServer,
Expand Down Expand Up @@ -612,6 +621,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Origin: new("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: 15 * time.Minute},
DispatcherName: defaultMultiKueue.DispatcherName,
IncrementalDispatcherConfig: &IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
},
ManagedJobsNamespaceSelector: defaultManagedJobsNamespaceSelector,
VisibilityServer: defaultVisibilityServer,
Expand Down
25 changes: 25 additions & 0 deletions apis/config/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,9 @@ objectRetentionPolicies:
Origin: ptr.To(configapi.DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout},
DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeAllAtOnce),
IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
}

defaultVisibility := &configapi.VisibilityServerConfiguration{
Expand Down Expand Up @@ -749,6 +752,9 @@ objectRetentionPolicies:
Origin: new("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: 10 * time.Minute},
DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeIncremental),
IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
ClusterProfile: &configapi.ClusterProfile{
CredentialsProviders: []configapi.ClusterProfileCredentialsProvider{
{
Expand Down Expand Up @@ -948,6 +954,9 @@ webhook:
Origin: ptr.To(configapi.DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout},
DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeAllAtOnce),
IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
},
ManagedJobsNamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
Expand Down Expand Up @@ -998,6 +1007,9 @@ webhook:
Origin: ptr.To(configapi.DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout},
DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeAllAtOnce),
IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
},
ManagedJobsNamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
Expand Down Expand Up @@ -1048,6 +1060,9 @@ webhook:
Origin: ptr.To(configapi.DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout},
DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeAllAtOnce),
IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
},
ManagedJobsNamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
Expand Down Expand Up @@ -1094,6 +1109,9 @@ webhook:
Origin: ptr.To(configapi.DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout},
DispatcherName: ptr.To(configapi.MultiKueueDispatcherModeAllAtOnce),
IncrementalDispatcherConfig: &configapi.IncrementalDispatcherConfig{
StepSize: ptr.To[int32](3),
},
},
ManagedJobsNamespaceSelector: &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
Expand Down Expand Up @@ -1245,6 +1263,9 @@ func TestEncode(t *testing.T) {
"origin": "multikueue",
"workerLostTimeout": "15m0s",
"dispatcherName": configapi.MultiKueueDispatcherModeAllAtOnce,
"incrementalDispatcherConfig": map[string]any{
"stepSize": int64(3),
},
},
"visibilityServer": map[string]any{
"bindPort": int64(8082),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/workloaddispatcher/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func SetupControllers(mgr ctrl.Manager, cfg *configapi.Configuration, roleTracke
return "", err
}

idRec := NewIncrementalDispatcherReconciler(mgr.GetClient(), helper, roleTracker)
idRec := NewIncrementalDispatcherReconciler(mgr.GetClient(), helper, roleTracker, cfg)
err = idRec.SetupWithManager(mgr, cfg)
if err != nil {
return "multikueue-incremental-dispatcher", err
Expand Down
Loading