Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions pkg/controller/admissionchecks/multikueue/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,12 @@ func (w *wlReconciler) nominateAndSynchronizeWorkers(ctx context.Context, group
// supporting preferred or required placement constraints.
if clusterName := workload.ClusterName(group.local); group.IsElasticWorkload() && clusterName != "" {
nominatedWorkers = []string{clusterName}
} else if w.dispatcherName == config.MultiKueueDispatcherModeAllAtOnce {
} else if !features.Enabled(features.MultiKueueAllAtOnceExternal) && w.dispatcherName == config.MultiKueueDispatcherModeAllAtOnce {
// Legacy inline AllAtOnce nomination path. Kept under feature gate so
// operators can roll back to the pre-#10937 behavior if the dedicated
// AllAtOnceDispatcherReconciler controller misbehaves. When the
// MultiKueueAllAtOnceExternal gate goes GA, this branch (and the
// surrounding gate check) can be removed.
for workerName := range group.remotes {
nominatedWorkers = append(nominatedWorkers, workerName)
}
Expand All @@ -773,7 +778,9 @@ func (w *wlReconciler) nominateAndSynchronizeWorkers(ctx context.Context, group
}
}
} else {
// Incremental dispatcher and External dispatcher path
// A dispatcher implemented outside this file (AllAtOnce when
// MultiKueueAllAtOnceExternal=true, Incremental, or External) is responsible
// for populating Status.NominatedClusterNames; this branch only reads it.
nominatedWorkers = group.local.Status.NominatedClusterNames
}

Expand All @@ -790,6 +797,18 @@ func (w *wlReconciler) nominateAndSynchronizeWorkers(ctx context.Context, group
}
}
} else if remoteWl != nil {
// Preserve a remote workload that is currently Evicted: reconcileGroup
// needs it (with WorkloadEvicted=True) to call SyncJob and
// propagate the remote job's termination back to the manager Job.
// Deleting it here breaks the eviction-recovery flow because
// bestMatchByCondition(WorkloadEvicted) becomes nil, SyncJob is no
// longer called, and the manager Job's Status.Active is never updated.
// The remote will be cleaned up later, once the local workload is
// re-admitted (or finishes / loses its quota reservation).
if workload.IsEvicted(remoteWl) {
log.V(3).Info("Preserving evicted remote workload to allow eviction-recovery sync", "remote", rem)
continue
}
if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, rem)); err != nil {
log.V(2).Error(err, "removing non-nominated remote object", "remote", rem)
errs = append(errs, err)
Expand Down
58 changes: 41 additions & 17 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ func TestWlReconcile(t *testing.T) {
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
NominatedClusterNames("worker1", "worker2").
Obj(),
},
useSecondWorker: true,
Expand Down Expand Up @@ -351,6 +352,7 @@ func TestWlReconcile(t *testing.T) {
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
NominatedClusterNames("worker1", "worker2").
Obj(),
},
worker1Workloads: []kueue.Workload{
Expand Down Expand Up @@ -389,6 +391,7 @@ func TestWlReconcile(t *testing.T) {
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
NominatedClusterNames("worker1", "worker2").
Obj(),
},
managersJobs: []batchv1.Job{
Expand Down Expand Up @@ -1135,6 +1138,7 @@ func TestWlReconcile(t *testing.T) {
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
NominatedClusterNames("worker1", "worker2").
Obj(),
},
managersJobs: []batchv1.Job{
Expand Down Expand Up @@ -1178,6 +1182,7 @@ func TestWlReconcile(t *testing.T) {
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
NominatedClusterNames("worker1", "worker2").
Obj(),
},
managersJobs: []batchv1.Job{
Expand Down Expand Up @@ -1283,6 +1288,7 @@ func TestWlReconcile(t *testing.T) {
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
NominatedClusterNames("worker1", "worker2").
Obj(),
},
managersJobs: []batchv1.Job{
Expand Down Expand Up @@ -1372,6 +1378,7 @@ func TestWlReconcile(t *testing.T) {
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
NominatedClusterNames("worker1", "worker2").
Obj(),
},
managersJobs: []batchv1.Job{
Expand Down Expand Up @@ -1461,6 +1468,7 @@ func TestWlReconcile(t *testing.T) {
AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
ReserveQuotaAt(utiltestingapi.MakeAdmission("q1").Obj(), now).
NominatedClusterNames("worker1", "worker2").
Obj(),
},
managersJobs: []batchv1.Job{
Expand Down Expand Up @@ -1717,28 +1725,41 @@ func TestNominateAndSynchronizeWorkers_MoreCases(t *testing.T) {
now := time.Now()

tests := []struct {
name string
dispatcherMode string
remotes map[string]*kueue.Workload
nominatedWorkers []string
cond *metav1.Condition
createErr error
wantCreated []string
wantErr bool
name string
dispatcherMode string
allAtOnceExternalGate *bool // nil = use default; otherwise overrides MultiKueueAllAtOnceExternal for this case
remotes map[string]*kueue.Workload
nominatedWorkers []string
cond *metav1.Condition
createErr error
wantCreated []string
wantErr bool
}{
{
name: "AllClusters: clone to all remotes, nominates all",
dispatcherMode: config.MultiKueueDispatcherModeAllAtOnce,
remotes: map[string]*kueue.Workload{remoteNames[0]: nil, remoteNames[1]: nil},
wantCreated: []string{remoteNames[0], remoteNames[1]},
name: "AllClusters: clone to all remotes, nominates all",
dispatcherMode: config.MultiKueueDispatcherModeAllAtOnce,
remotes: map[string]*kueue.Workload{remoteNames[0]: nil, remoteNames[1]: nil},
nominatedWorkers: []string{remoteNames[0], remoteNames[1]},
wantCreated: []string{remoteNames[0], remoteNames[1]},
},
{
name: "AllClusters: workloads already created on remotes, do not create again",
dispatcherMode: config.MultiKueueDispatcherModeAllAtOnce,
remotes: map[string]*kueue.Workload{remoteNames[0]: {}, remoteNames[1]: {}},
wantCreated: nil,
name: "AllClusters: workloads already created on remotes, do not create again",
dispatcherMode: config.MultiKueueDispatcherModeAllAtOnce,
remotes: map[string]*kueue.Workload{remoteNames[0]: {}, remoteNames[1]: {}},
nominatedWorkers: []string{remoteNames[0], remoteNames[1]},
wantCreated: nil,
},
// Incremental dispatcher tests were moved to a separate file.
{
// Legacy inline AllAtOnce path: with the feature gate disabled,
// nominateAndSynchronizeWorkers itself populates NominatedClusterNames
// from group.remotes (no pre-populated nominatedWorkers needed).
name: "AllClusters legacy: gate off, inline branch nominates from group.remotes",
dispatcherMode: config.MultiKueueDispatcherModeAllAtOnce,
allAtOnceExternalGate: new(bool),
remotes: map[string]*kueue.Workload{remoteNames[0]: nil, remoteNames[1]: nil},
wantCreated: []string{remoteNames[0], remoteNames[1]},
},
// Incremental and AllAtOnce dispatcher unit tests live in pkg/controller/workloaddispatcher.
{
name: "External controller: no nominated workers, nothing created",
dispatcherMode: externalMultiKueueDispatcherController,
Expand All @@ -1763,6 +1784,9 @@ func TestNominateAndSynchronizeWorkers_MoreCases(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.allAtOnceExternalGate != nil {
features.SetFeatureGateDuringTest(t, features.MultiKueueAllAtOnceExternal, *tt.allAtOnceExternalGate)
}
fakeClock := testingclock.NewFakeClock(now)

local := &kueue.Workload{
Expand Down
Loading