diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index 7e34efa7f2b..b7d32453a16 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -759,7 +759,12 @@ func (w *wlReconciler) nominateAndSynchronizeWorkers(ctx context.Context, group // supporting preferred or required placement constraints. if clusterName := workload.ClusterName(group.local); group.IsElasticWorkload() && clusterName != "" { nominatedWorkers = []string{clusterName} - } else if w.dispatcherName == config.MultiKueueDispatcherModeAllAtOnce { + } else if !features.Enabled(features.MultiKueueAllAtOnceExternal) && w.dispatcherName == config.MultiKueueDispatcherModeAllAtOnce { + // Legacy inline AllAtOnce nomination path. Kept under feature gate so + // operators can roll back to the pre-#10937 behavior if the dedicated + // AllAtOnceDispatcherReconciler controller misbehaves. When the + // MultiKueueAllAtOnceExternal gate goes GA, this branch (and the + // surrounding gate check) can be removed. for workerName := range group.remotes { nominatedWorkers = append(nominatedWorkers, workerName) } @@ -773,7 +778,9 @@ func (w *wlReconciler) nominateAndSynchronizeWorkers(ctx context.Context, group } } } else { - // Incremental dispatcher and External dispatcher path + // A dispatcher placed outside this file (AllAtOnce when MultiKueueAllAtOnceExternal=true, + // Incremental, or External) is responsible for populating + // Status.NominatedClusterNames; the synchronizer just reads it. nominatedWorkers = group.local.Status.NominatedClusterNames } @@ -790,6 +797,18 @@ func (w *wlReconciler) nominateAndSynchronizeWorkers(ctx context.Context, group } } } else if remoteWl != nil { + // Preserve a remote workload that is currently Evicted: reconcileGroup + // needs it (with WorkloadEvicted=True) to call SyncJob and + // propagate the remote job's termination back to the manager Job. + // Deleting it here breaks the eviction-recovery flow because + // bestMatchByCondition(WorkloadEvicted) becomes nil, SyncJob is no + // longer called, and the manager Job's Status.Active is never updated. + // The remote will be cleaned up later, once the local workload is + // re-admitted (or finishes / loses its quota reservation). + if workload.IsEvicted(remoteWl) { + log.V(3).Info("Preserving evicted remote workload to allow eviction-recovery sync", "remote", rem) + continue + } if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, rem)); err != nil { log.V(2).Error(err, "removing non-nominated remote object", "remote", rem) errs = append(errs, err) diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go index f0e56bc1dbf..d956b7dfcff 100644 --- a/pkg/controller/admissionchecks/multikueue/workload_test.go +++ b/pkg/controller/admissionchecks/multikueue/workload_test.go @@ -321,6 +321,7 @@ func TestWlReconcile(t *testing.T) { AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now). + NominatedClusterNames("worker1", "worker2"). Obj(), }, useSecondWorker: true, @@ -351,6 +352,7 @@ func TestWlReconcile(t *testing.T) { AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now). + NominatedClusterNames("worker1", "worker2"). Obj(), }, worker1Workloads: []kueue.Workload{ @@ -389,6 +391,7 @@ func TestWlReconcile(t *testing.T) { AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now). + NominatedClusterNames("worker1", "worker2"). Obj(), }, managersJobs: []batchv1.Job{ @@ -1135,6 +1138,7 @@ func TestWlReconcile(t *testing.T) { AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now). + NominatedClusterNames("worker1", "worker2"). Obj(), }, managersJobs: []batchv1.Job{ @@ -1178,6 +1182,7 @@ func TestWlReconcile(t *testing.T) { AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now). + NominatedClusterNames("worker1", "worker2"). Obj(), }, managersJobs: []batchv1.Job{ @@ -1283,6 +1288,7 @@ func TestWlReconcile(t *testing.T) { AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now). + NominatedClusterNames("worker1", "worker2"). Obj(), }, managersJobs: []batchv1.Job{ @@ -1372,6 +1378,7 @@ func TestWlReconcile(t *testing.T) { AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now). + NominatedClusterNames("worker1", "worker2"). Obj(), }, managersJobs: []batchv1.Job{ @@ -1461,6 +1468,7 @@ func TestWlReconcile(t *testing.T) { AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}). ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now). + NominatedClusterNames("worker1", "worker2"). Obj(), }, managersJobs: []batchv1.Job{ @@ -1717,28 +1725,41 @@ func TestNominateAndSynchronizeWorkers_MoreCases(t *testing.T) { now := time.Now() tests := []struct { - name string - dispatcherMode string - remotes map[string]*kueue.Workload - nominatedWorkers []string - cond *metav1.Condition - createErr error - wantCreated []string - wantErr bool + name string + dispatcherMode string + allAtOnceExternalGate *bool // nil = use default; otherwise overrides MultiKueueAllAtOnceExternal for this case + remotes map[string]*kueue.Workload + nominatedWorkers []string + cond *metav1.Condition + createErr error + wantCreated []string + wantErr bool }{ { - name: "AllClusters: clone to all remotes, nominates all", - dispatcherMode: config.MultiKueueDispatcherModeAllAtOnce, - remotes: map[string]*kueue.Workload{remoteNames[0]: nil, remoteNames[1]: nil}, - wantCreated: []string{remoteNames[0], remoteNames[1]}, + name: "AllClusters: clone to all remotes, nominates all", + dispatcherMode: config.MultiKueueDispatcherModeAllAtOnce, + remotes: map[string]*kueue.Workload{remoteNames[0]: nil, remoteNames[1]: nil}, + nominatedWorkers: []string{remoteNames[0], remoteNames[1]}, + wantCreated: []string{remoteNames[0], remoteNames[1]}, }, { - name: "AllClusters: workloads already created on remotes, do not create again", - dispatcherMode: config.MultiKueueDispatcherModeAllAtOnce, - remotes: map[string]*kueue.Workload{remoteNames[0]: {}, remoteNames[1]: {}}, - wantCreated: nil, + name: "AllClusters: workloads already created on remotes, do not create again", + dispatcherMode: config.MultiKueueDispatcherModeAllAtOnce, + remotes: map[string]*kueue.Workload{remoteNames[0]: {}, remoteNames[1]: {}}, + nominatedWorkers: []string{remoteNames[0], remoteNames[1]}, + wantCreated: nil, }, - // Incremental dispatcher tests were moved to a separate file. + { + // Legacy inline AllAtOnce path: with the feature gate disabled, + // nominateAndSynchronizeWorkers itself populates NominatedClusterNames + // from group.remotes (no pre-populated nominatedWorkers needed). + name: "AllClusters legacy: gate off, inline branch nominates from group.remotes", + dispatcherMode: config.MultiKueueDispatcherModeAllAtOnce, + allAtOnceExternalGate: new(bool), + remotes: map[string]*kueue.Workload{remoteNames[0]: nil, remoteNames[1]: nil}, + wantCreated: []string{remoteNames[0], remoteNames[1]}, + }, + // Incremental and AllAtOnce dispatcher unit tests live in pkg/controller/workloaddispatcher. { name: "External controller: no nominated workers, nothing created", dispatcherMode: externalMultiKueueDispatcherController, @@ -1763,6 +1784,9 @@ func TestNominateAndSynchronizeWorkers_MoreCases(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + if tt.allAtOnceExternalGate != nil { + features.SetFeatureGateDuringTest(t, features.MultiKueueAllAtOnceExternal, *tt.allAtOnceExternalGate) + } fakeClock := testingclock.NewFakeClock(now) local := &kueue.Workload{ diff --git a/pkg/controller/workloaddispatcher/allatoncedispatcher.go b/pkg/controller/workloaddispatcher/allatoncedispatcher.go new file mode 100644 index 00000000000..8335586fbe2 --- /dev/null +++ b/pkg/controller/workloaddispatcher/allatoncedispatcher.go @@ -0,0 +1,315 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloaddispatcher + +import ( + "context" + "errors" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/equality" + apimeta "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + kueueconfig "sigs.k8s.io/kueue/apis/config/v1beta2" + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2" + "sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue" + "sigs.k8s.io/kueue/pkg/controller/core" + "sigs.k8s.io/kueue/pkg/util/admissioncheck" + "sigs.k8s.io/kueue/pkg/util/roletracker" + "sigs.k8s.io/kueue/pkg/workload" +) + +type AllAtOnceDispatcherReconciler struct { + client client.Client + helper *admissioncheck.MultiKueueStoreHelper + clock clock.Clock + roleTracker *roletracker.RoleTracker +} + +var _ reconcile.Reconciler = (*AllAtOnceDispatcherReconciler)(nil) + +const AllAtOnceDispatcherControllerName = "multikueue_all_at_once_dispatcher" + +func (r *AllAtOnceDispatcherReconciler) SetupWithManager(mgr ctrl.Manager, cfg *kueueconfig.Configuration) error { + return ctrl.NewControllerManagedBy(mgr). + Named(AllAtOnceDispatcherControllerName). + For(&kueue.Workload{}). + Watches(&kueue.MultiKueueConfig{}, &allAtOnceConfigHandler{client: r.client}). + Watches(&kueue.MultiKueueCluster{}, &allAtOnceClusterHandler{client: r.client}). + WithLogConstructor(roletracker.NewLogConstructor(r.roleTracker, AllAtOnceDispatcherControllerName)). + Complete(core.WithLeadingManager(mgr, r, &kueue.Workload{}, cfg)) +} + +func NewAllAtOnceDispatcherReconciler(c client.Client, helper *admissioncheck.MultiKueueStoreHelper, roleTracker *roletracker.RoleTracker) *AllAtOnceDispatcherReconciler { + return &AllAtOnceDispatcherReconciler{ + client: c, + helper: helper, + clock: realClock, + roleTracker: roleTracker, + } +} + +func (r *AllAtOnceDispatcherReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := ctrl.LoggerFrom(ctx) + wl := &kueue.Workload{} + if err := r.client.Get(ctx, req.NamespacedName, wl); err != nil { + log.Error(err, "Failed to retrieve Workload, skip the reconciliation") + return reconcile.Result{}, err + } + + if !wl.DeletionTimestamp.IsZero() { + log.V(3).Info("Workload is deleted, skip the reconciliation") + return reconcile.Result{}, nil + } + + mkAc, err := admissioncheck.GetMultiKueueAdmissionCheck(ctx, r.client, wl) + if err != nil { + log.Error(err, "Can not get MultiKueue AdmissionCheckState") + return reconcile.Result{}, err + } + + if mkAc == nil || mkAc.State != kueue.CheckStatePending { + log.V(3).Info("MultiKueue AdmissionCheckState is not Pending, skip the reconciliation") + return reconcile.Result{}, nil + } + + // The workload is already assigned to a cluster, no need to nominate workers. + if wl.Status.ClusterName != nil { + log.V(3).Info("The workload is already assigned to a cluster, no need to nominate workers") + return reconcile.Result{}, nil + } + + remoteClusters, err := admissioncheck.GetRemoteClusters(ctx, r.helper, mkAc.Name) + if err != nil { + log.Error(err, "Can not get workload group") + return reconcile.Result{}, err + } + + if workload.IsFinished(wl) || !workload.HasQuotaReservation(wl) { + log.V(3).Info("Workload is already finished or has no quota reserved, skip the reconciliation") + return reconcile.Result{}, nil + } + + // The workload is being evicted; let the core eviction flow complete (the Job + // reconciler will UnsetQuotaReservation once the job is no longer active, and + // the scheduler will requeue it) before re-nominating clusters. Re-nominating + // during eviction races with the post-eviction cleanup and prevents the + // workload from re-entering the queue. + if workload.IsEvicted(wl) { + log.V(3).Info("Workload is being evicted, skip the reconciliation") + return reconcile.Result{}, nil + } + + activeClusters, err := r.filterActiveClusters(ctx, remoteClusters) + if err != nil { + log.Error(err, "Failed to filter active clusters") + return reconcile.Result{}, err + } + + log.V(3).Info("Nominate Worker Clusters with AllAtOnce Dispatcher") + return r.nominateWorkers(ctx, wl, activeClusters, log) +} + +// filterActiveClusters returns the subset of remoteClusters whose MultiKueueCluster +// has the MultiKueueClusterActive condition set to True. Clusters that are missing +// or not active are excluded so they are not nominated for workload placement. +func (r *AllAtOnceDispatcherReconciler) filterActiveClusters(ctx context.Context, remoteClusters sets.Set[string]) (sets.Set[string], error) { + active := sets.New[string]() + for clusterName := range remoteClusters { + cluster := &kueue.MultiKueueCluster{} + if err := r.client.Get(ctx, types.NamespacedName{Name: clusterName}, cluster); err != nil { + if client.IgnoreNotFound(err) != nil { + return nil, err + } + // Missing cluster: skip. + continue + } + if apimeta.IsStatusConditionTrue(cluster.Status.Conditions, kueue.MultiKueueClusterActive) { + active.Insert(clusterName) + } + } + return active, nil +} + +func (r *AllAtOnceDispatcherReconciler) nominateWorkers(ctx context.Context, wl *kueue.Workload, remoteClusters sets.Set[string], log logr.Logger) (reconcile.Result, error) { + nominatedWorkers := sets.List(remoteClusters) + + if equality.Semantic.DeepEqual(wl.Status.NominatedClusterNames, nominatedWorkers) { + log.V(5).Info("Nominated cluster names already up to date, skip the reconciliation", "nominatedClusterNames", nominatedWorkers) + return reconcile.Result{}, nil + } + + log.V(5).Info("Nominating worker clusters", "nominatedClusterNames", nominatedWorkers) + if err := workload.PatchAdmissionStatus(ctx, r.client, wl, r.clock, func(wl *kueue.Workload) (bool, error) { + wl.Status.NominatedClusterNames = nominatedWorkers + return true, nil + }); err != nil { + log.V(2).Error(err, "Failed to patch nominated clusters") + return reconcile.Result{}, err + } + + return reconcile.Result{}, nil +} + +// Inline version had access to config and cluster handlers for free, we need to add them here. +// allAtOnceConfigHandler enqueues all Workloads referencing AdmissionChecks that +// use a given MultiKueueConfig whenever the config is created, updated or deleted. +type allAtOnceConfigHandler struct { + client client.Client +} + +var _ handler.EventHandler = (*allAtOnceConfigHandler)(nil) + +func (h *allAtOnceConfigHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + cfg, ok := e.Object.(*kueue.MultiKueueConfig) + if !ok { + return + } + if err := queueWorkloadsForConfig(ctx, h.client, cfg.Name, q); err != nil { + ctrl.LoggerFrom(ctx).V(2).Error(err, "Failed to queue workloads on config create", "multiKueueConfig", cfg.Name) + } +} + +func (h *allAtOnceConfigHandler) Update(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + oldCfg, isOld := e.ObjectOld.(*kueue.MultiKueueConfig) + newCfg, isNew := e.ObjectNew.(*kueue.MultiKueueConfig) + if !isOld || !isNew { + return + } + if equality.Semantic.DeepEqual(oldCfg.Spec.Clusters, newCfg.Spec.Clusters) { + return + } + if err := queueWorkloadsForConfig(ctx, h.client, newCfg.Name, q); err != nil { + ctrl.LoggerFrom(ctx).V(2).Error(err, "Failed to queue workloads on config update", "multiKueueConfig", newCfg.Name) + } +} + +func (h *allAtOnceConfigHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + cfg, ok := e.Object.(*kueue.MultiKueueConfig) + if !ok { + return + } + if err := queueWorkloadsForConfig(ctx, h.client, cfg.Name, q); err != nil { + ctrl.LoggerFrom(ctx).V(2).Error(err, "Failed to queue workloads on config delete", "multiKueueConfig", cfg.Name) + } +} + +func (h *allAtOnceConfigHandler) Generic(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { +} + +// allAtOnceClusterHandler enqueues all Workloads referencing AdmissionChecks +// whose MultiKueueConfig contains the cluster, but only when the cluster's +// activity status (MultiKueueClusterActive) actually transitions. +type allAtOnceClusterHandler struct { + client client.Client +} + +var _ handler.EventHandler = (*allAtOnceClusterHandler)(nil) + +func (h *allAtOnceClusterHandler) Create(ctx context.Context, e event.CreateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + cluster, ok := e.Object.(*kueue.MultiKueueCluster) + if !ok { + return + } + if err := h.queueForCluster(ctx, cluster.Name, q); err != nil { + ctrl.LoggerFrom(ctx).V(2).Error(err, "Failed to queue workloads on cluster create", "multiKueueCluster", cluster.Name) + } +} + +func (h *allAtOnceClusterHandler) Update(ctx context.Context, e event.UpdateEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + oldCluster, isOld := e.ObjectOld.(*kueue.MultiKueueCluster) + newCluster, isNew := e.ObjectNew.(*kueue.MultiKueueCluster) + if !isOld || !isNew { + return + } + oldActive := apimeta.FindStatusCondition(oldCluster.Status.Conditions, kueue.MultiKueueClusterActive) + newActive := apimeta.FindStatusCondition(newCluster.Status.Conditions, kueue.MultiKueueClusterActive) + if conditionStatusEqual(oldActive, newActive) { + return + } + if err := h.queueForCluster(ctx, newCluster.Name, q); err != nil { + ctrl.LoggerFrom(ctx).V(2).Error(err, "Failed to queue workloads on cluster update", "multiKueueCluster", newCluster.Name) + } +} + +func (h *allAtOnceClusterHandler) Delete(ctx context.Context, e event.DeleteEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) { + cluster, ok := e.Object.(*kueue.MultiKueueCluster) + if !ok { + return + } + if err := h.queueForCluster(ctx, cluster.Name, q); err != nil { + ctrl.LoggerFrom(ctx).V(2).Error(err, "Failed to queue workloads on cluster delete", "multiKueueCluster", cluster.Name) + } +} + +func (h *allAtOnceClusterHandler) Generic(context.Context, event.GenericEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) { +} + +func (h *allAtOnceClusterHandler) queueForCluster(ctx context.Context, clusterName string, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + configs := &kueue.MultiKueueConfigList{} + if err := h.client.List(ctx, configs, client.MatchingFields{multikueue.UsingMultiKueueClusters: clusterName}); err != nil { + return err + } + var errs []error + for _, cfg := range configs.Items { + if err := queueWorkloadsForConfig(ctx, h.client, cfg.Name, q); err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} + +// queueWorkloadsForConfig enqueues every workload that has a pending MultiKueue +// admission check whose MultiKueueConfig parameter matches the given config name. +func queueWorkloadsForConfig(ctx context.Context, c client.Client, configName string, q workqueue.TypedRateLimitingInterface[reconcile.Request]) error { + admissionChecks := &kueue.AdmissionCheckList{} + if err := c.List(ctx, admissionChecks, client.MatchingFields{multikueue.AdmissionCheckUsingConfigKey: configName}); err != nil { + return err + } + var errs []error + for _, ac := range admissionChecks.Items { + workloads := &kueue.WorkloadList{} + if err := c.List(ctx, workloads, client.MatchingFields{multikueue.WorkloadsWithAdmissionCheckKey: ac.Name}); err != nil { + errs = append(errs, err) + continue + } + for _, wl := range workloads.Items { + q.Add(reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&wl)}) + } + } + return errors.Join(errs...) +} + +func conditionStatusEqual(a, b *metav1.Condition) bool { + if a == b { + return true + } + if a == nil || b == nil { + return false + } + return a.Status == b.Status && a.Reason == b.Reason +} diff --git a/pkg/controller/workloaddispatcher/allatoncedispatcher_test.go b/pkg/controller/workloaddispatcher/allatoncedispatcher_test.go new file mode 100644 index 00000000000..579a21e628f --- /dev/null +++ b/pkg/controller/workloaddispatcher/allatoncedispatcher_test.go @@ -0,0 +1,259 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workloaddispatcher + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + testingclock "k8s.io/utils/clock/testing" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2" + "sigs.k8s.io/kueue/pkg/util/admissioncheck" + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" + utiltestingapi "sigs.k8s.io/kueue/pkg/util/testing/v1beta2" +) + +func TestAllAtOnceDispatcherReconciler_Reconcile(t *testing.T) { + const workloadName = "test-workload" + + now := time.Now() + fakeClock := testingclock.NewFakeClock(now) + baseWorkload := utiltestingapi.MakeWorkload(workloadName, metav1.NamespaceDefault) + + tests := map[string]struct { + workload *kueue.Workload + mkAcState *kueue.AdmissionCheckState + wantErr error + remoteClusters []string + clusters []kueue.MultiKueueCluster + }{ + "workload not found": { + workload: nil, + wantErr: apierrors.NewNotFound(schema.GroupResource{Group: kueue.GroupVersion.Group, Resource: "workloads"}, workloadName), + }, + "workload deleted": { + workload: baseWorkload.Clone().DeletionTimestamp(now).Finalizers("kubernetes").Obj(), + }, + "admission check nil": { + workload: baseWorkload.Clone().Obj(), + }, + "admission check is rejected": { + workload: baseWorkload.Clone().Obj(), + mkAcState: &kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateRejected, + }, + }, + "admission check is ready": { + workload: baseWorkload.Clone().Obj(), + mkAcState: &kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + }, + }, + "already assigned to cluster": { + workload: baseWorkload.Clone().ClusterName("assigned").Obj(), + mkAcState: &kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + }, + }, + "workload is already finished": { + workload: baseWorkload.Clone().Finished().Obj(), + mkAcState: &kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + }, + remoteClusters: []string{"cluster1"}, + clusters: []kueue.MultiKueueCluster{ + *utiltestingapi.MakeMultiKueueCluster("cluster1"). + KubeConfig(kueue.SecretLocationType, "cluster1"). + Generation(1). + Obj(), + }, + }, + "workload has quota reserved": { + workload: baseWorkload.Clone().Obj(), + mkAcState: &kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + }, + remoteClusters: []string{"cluster1"}, + clusters: []kueue.MultiKueueCluster{ + *utiltestingapi.MakeMultiKueueCluster("cluster1"). + KubeConfig(kueue.SecretLocationType, "cluster1"). + Generation(1). + Obj(), + }, + }, + "workload is being evicted": { + workload: baseWorkload.Clone().EvictedAt(now).Obj(), + mkAcState: &kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + }, + remoteClusters: []string{"cluster1"}, + clusters: []kueue.MultiKueueCluster{ + *utiltestingapi.MakeMultiKueueCluster("cluster1"). + KubeConfig(kueue.SecretLocationType, "cluster1"). + Generation(1). + Obj(), + }, + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + objs := []client.Object{} + if tc.mkAcState != nil { + tc.workload.Status.AdmissionChecks = []kueue.AdmissionCheckState{*tc.mkAcState} + ac := utiltestingapi.MakeAdmissionCheck(string(tc.mkAcState.Name)). + ControllerName(kueue.MultiKueueControllerName). + Parameters(kueue.GroupVersion.Group, "MultiKueueConfig", string(tc.mkAcState.Name)). + Obj() + + objs = append(objs, ac) + } + + if tc.workload != nil { + objs = append(objs, tc.workload) + } + + if tc.mkAcState != nil { + mkConfig := utiltestingapi.MakeMultiKueueConfig(string(tc.mkAcState.Name)).Clusters("cluster1").Obj() + objs = append(objs, mkConfig) + } + scheme := runtime.NewScheme() + if err := kueue.AddToScheme(scheme); err != nil { + t.Fatalf("Fail to add to scheme %s", err) + } + + if tc.clusters != nil { + for _, cluster := range tc.clusters { + objs = append(objs, &cluster) + } + } + cl := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build() + helper, _ := admissioncheck.NewMultiKueueStoreHelper(cl) + rec := &AllAtOnceDispatcherReconciler{ + client: cl, + helper: helper, + clock: fakeClock, + } + + req := ctrl.Request{NamespacedName: types.NamespacedName{Namespace: metav1.NamespaceDefault, Name: workloadName}} + ctx, _ := utiltesting.ContextWithLog(t) + _, gotErr := rec.Reconcile(ctx, req) + if diff := cmp.Diff(tc.wantErr, gotErr); diff != "" { + t.Errorf("Unexpected error (-want/+got)\n%s", diff) + } + }) + } +} + +func TestAllAtOnceDispatcherNominateWorkers(t *testing.T) { + const testName = "test-wl" + now := time.Now() + fakeClock := testingclock.NewFakeClock(now) + baseWl := utiltestingapi.MakeWorkload(testName, metav1.NamespaceDefault). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + }) + + testCases := map[string]struct { + remoteClusters sets.Set[string] + workload *kueue.Workload + wantNominatedClusters []string + }{ + "no remotes": { + remoteClusters: make(sets.Set[string]), + workload: baseWl.Clone().Obj(), + wantNominatedClusters: nil, + }, + "one remote": { + remoteClusters: sets.New("A"), + workload: baseWl.Clone().Obj(), + wantNominatedClusters: []string{"A"}, + }, + "three remotes": { + remoteClusters: sets.New("A", "B", "C"), + workload: baseWl.Clone().Obj(), + wantNominatedClusters: []string{"A", "B", "C"}, + }, + "remotes returned in sorted order": { + remoteClusters: sets.New("C", "A", "B"), + workload: baseWl.Clone().Obj(), + wantNominatedClusters: []string{"A", "B", "C"}, + }, + "all already nominated, no patch needed": { + remoteClusters: sets.New("A", "B", "C"), + workload: baseWl.Clone().NominatedClusterNames("A", "B", "C").Obj(), + wantNominatedClusters: []string{"A", "B", "C"}, + }, + "partial existing nomination, expanded to full set": { + remoteClusters: sets.New("A", "B", "C", "D"), + workload: baseWl.Clone().NominatedClusterNames("A", "B").Obj(), + wantNominatedClusters: []string{"A", "B", "C", "D"}, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + scheme := runtime.NewScheme() + if err := kueue.AddToScheme(scheme); err != nil { + t.Fatalf("Fail to add to scheme %s", err) + } + + objs := []client.Object{tc.workload} + cl := fake.NewClientBuilder().WithScheme(scheme). + WithInterceptorFuncs(interceptor.Funcs{ + SubResourcePatch: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error { + tc.workload.Status.NominatedClusterNames = obj.(*kueue.Workload).Status.NominatedClusterNames + return utiltesting.TreatSSAAsStrategicMerge(ctx, client, subResourceName, obj, patch, opts...) + }, + }).WithObjects(objs...).WithStatusSubresource(objs...).Build() + + reconciler := &AllAtOnceDispatcherReconciler{ + client: cl, + clock: fakeClock, + } + + ctx, log := utiltesting.ContextWithLog(t) + if _, err := reconciler.nominateWorkers(ctx, tc.workload, tc.remoteClusters, log); err != nil { + t.Fatalf("nominateWorkers returned unexpected error: %v", err) + } + + if diff := cmp.Diff(tc.wantNominatedClusters, tc.workload.Status.NominatedClusterNames); diff != "" { + t.Errorf("unexpected nominated clusters (-want/+got):\n%s", diff) + } + }) + } +} diff --git a/pkg/controller/workloaddispatcher/controllers.go b/pkg/controller/workloaddispatcher/controllers.go index ac7c1e394e2..abdca5bd133 100644 --- a/pkg/controller/workloaddispatcher/controllers.go +++ b/pkg/controller/workloaddispatcher/controllers.go @@ -20,24 +20,37 @@ import ( ctrl "sigs.k8s.io/controller-runtime" configapi "sigs.k8s.io/kueue/apis/config/v1beta2" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/util/admissioncheck" "sigs.k8s.io/kueue/pkg/util/roletracker" ) func SetupControllers(mgr ctrl.Manager, cfg *configapi.Configuration, roleTracker *roletracker.RoleTracker) (string, error) { - if *cfg.MultiKueue.DispatcherName != configapi.MultiKueueDispatcherModeIncremental { - return "", nil - } - helper, err := admissioncheck.NewMultiKueueStoreHelper(mgr.GetClient()) if err != nil { return "", err } - idRec := NewIncrementalDispatcherReconciler(mgr.GetClient(), helper, roleTracker) - err = idRec.SetupWithManager(mgr, cfg) - if err != nil { - return "multikueue-incremental-dispatcher", err + switch *cfg.MultiKueue.DispatcherName { + case configapi.MultiKueueDispatcherModeIncremental: + idRec := NewIncrementalDispatcherReconciler(mgr.GetClient(), helper, roleTracker) + if err := idRec.SetupWithManager(mgr, cfg); err != nil { + return "multikueue-incremental-dispatcher", err + } + case configapi.MultiKueueDispatcherModeAllAtOnce: + // When MultiKueueAllAtOnceExternal is disabled, AllAtOnce nomination is + // handled inline in the MultiKueue workload reconciler; do not register + // the dedicated dispatcher controller, otherwise it would race the + // inline path on Status.NominatedClusterNames. + if !features.Enabled(features.MultiKueueAllAtOnceExternal) { + return "", nil + } + aRec := NewAllAtOnceDispatcherReconciler(mgr.GetClient(), helper, roleTracker) + if err := aRec.SetupWithManager(mgr, cfg); err != nil { + return "multikueue-all-at-once-dispatcher", err + } + default: + // External dispatcher mode: no built-in controller is registered. } return "", nil diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 214634a828a..701adef2e4e 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -266,6 +266,17 @@ const ( // Redo admission on eviction in worker cluster. MultiKueueRedoAdmissionOnEvictionInWorker featuregate.Feature = "MultiKueueRedoAdmissionOnEvictionInWorker" + // owner: @andrewseif + // + // issue: https://github.com/kubernetes-sigs/kueue/issues/6803 + // Run the AllAtOnce MultiKueue dispatcher as a dedicated controller in the + // workloaddispatcher package, instead of inline in the MultiKueue workload + // reconciler. When disabled, falls back to the legacy inline AllAtOnce + // nomination path. The synchronizer in the MultiKueue workload reconciler + // reads Status.NominatedClusterNames in either case; only the producer of + // that field changes. + MultiKueueAllAtOnceExternal featuregate.Feature = "MultiKueueAllAtOnceExternal" + // owner: @kannon92 // // issue: https://github.com/kubernetes-sigs/kueue/issues/8190 @@ -565,6 +576,9 @@ var defaultVersionedFeatureGates = map[featuregate.Feature]featuregate.Versioned {Version: version.MustParse("0.16"), Default: true, PreRelease: featuregate.Beta}, // GA in 0.18 {Version: version.MustParse("0.18"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 0.20 }, + MultiKueueAllAtOnceExternal: { + {Version: version.MustParse("0.18"), Default: true, PreRelease: featuregate.Beta}, + }, TLSOptions: { {Version: version.MustParse("0.16"), Default: true, PreRelease: featuregate.Beta}, // GA in 0.20 (https://github.com/kubernetes-sigs/kueue/issues/10704) }, diff --git a/site/data/featuregates/versioned_feature_list.yaml b/site/data/featuregates/versioned_feature_list.yaml index 36961a186c6..4e2f8cc56a2 100644 --- a/site/data/featuregates/versioned_feature_list.yaml +++ b/site/data/featuregates/versioned_feature_list.yaml @@ -199,6 +199,12 @@ lockToDefault: false preRelease: Beta version: "0.15" +- name: MultiKueueAllAtOnceExternal + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "0.18" - name: MultiKueueAllowInsecureKubeconfigs versionedSpecs: - default: false diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index 36961a186c6..4e2f8cc56a2 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -199,6 +199,12 @@ lockToDefault: false preRelease: Beta version: "0.15" +- name: MultiKueueAllAtOnceExternal + versionedSpecs: + - default: true + lockToDefault: false + preRelease: Beta + version: "0.18" - name: MultiKueueAllowInsecureKubeconfigs versionedSpecs: - default: false diff --git a/test/integration/multikueue/scheduler/suite_test.go b/test/integration/multikueue/scheduler/suite_test.go index 8bc75601c6c..772520ea1d8 100644 --- a/test/integration/multikueue/scheduler/suite_test.go +++ b/test/integration/multikueue/scheduler/suite_test.go @@ -42,6 +42,7 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/controller/jobframework" workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" + "sigs.k8s.io/kueue/pkg/controller/workloaddispatcher" "sigs.k8s.io/kueue/pkg/scheduler" preemptexpectations "sigs.k8s.io/kueue/pkg/scheduler/preemption/expectations" "sigs.k8s.io/kueue/pkg/util/kubeversion" @@ -178,6 +179,15 @@ func managerAndMultiKueueSetup( multikueue.WithDispatcherName(dispatcherName), ) gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + configuration := &config.Configuration{ + MultiKueue: &config.MultiKueue{ + DispatcherName: &dispatcherName, + }, + } + mgr.GetScheme().Default(configuration) + _, err = workloaddispatcher.SetupControllers(mgr, configuration, nil) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } var _ = ginkgo.BeforeSuite(func() { diff --git a/test/integration/multikueue/tas/suite_test.go b/test/integration/multikueue/tas/suite_test.go index dca85781520..3cb00563fe3 100644 --- a/test/integration/multikueue/tas/suite_test.go +++ b/test/integration/multikueue/tas/suite_test.go @@ -44,6 +44,7 @@ import ( workloadjob "sigs.k8s.io/kueue/pkg/controller/jobs/job" "sigs.k8s.io/kueue/pkg/controller/tas" tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer" + "sigs.k8s.io/kueue/pkg/controller/workloaddispatcher" "sigs.k8s.io/kueue/pkg/scheduler" preemptexpectations "sigs.k8s.io/kueue/pkg/scheduler/preemption/expectations" "sigs.k8s.io/kueue/pkg/util/kubeversion" @@ -184,6 +185,15 @@ func managerAndMultiKueueSetup( multikueue.WithDispatcherName(dispatcherName), ) gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + configuration := &config.Configuration{ + MultiKueue: &config.MultiKueue{ + DispatcherName: &dispatcherName, + }, + } + mgr.GetScheme().Default(configuration) + _, err = workloaddispatcher.SetupControllers(mgr, configuration, nil) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) } var _ = ginkgo.BeforeSuite(func() {