Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 13 additions & 15 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,10 +449,9 @@ func (s *Scheduler) processEntry(

s.waitForPodsReadyIfBlocked(ctx, log, e)

// Copy ClusterName from old slice before admission (needed for MultiKueue).
if features.Enabled(features.ElasticJobsViaWorkloadSlices) && oldWorkloadSlice != nil {
if err := s.replaceOldWorkloadSlice(ctx, log, e, oldWorkloadSlice); err != nil {
return
}
e.Obj.Status.ClusterName = oldWorkloadSlice.WorkloadInfo.Obj.Status.ClusterName
}

if features.Enabled(features.ConcurrentAdmission) && concurrentadmission.IsVariant(e.Obj) {
Expand All @@ -463,7 +462,7 @@ func (s *Scheduler) processEntry(
}

e.markNominated()
if err := s.admit(ctx, e, cq); err != nil {
if err := s.admit(ctx, e, cq, oldWorkloadSlice); err != nil {
e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
}
}
Expand Down Expand Up @@ -531,18 +530,14 @@ func (s *Scheduler) waitForPodsReadyIfBlocked(ctx context.Context, log logr.Logg
log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition")
}

// replaceOldWorkloadSlice deactivates the old slice and finalizes its status.
// The new slice's clusterName is copied from the old one so that MultiKueue
// placement stays consistent across the replacement. If the subsequent admit
// fails, the old slice may end up Finished while the new one is not admitted;
// downstream controllers handle suspension/eviction.
func (s *Scheduler) replaceOldWorkloadSlice(ctx context.Context, log logr.Logger, e *entry, oldWorkloadSlice *preemption.Target) error {
e.Obj.Status.ClusterName = oldWorkloadSlice.WorkloadInfo.Obj.Status.ClusterName
// replaceOldWorkloadSlice finishes the old slice after the new slice has been
// admitted. Called inside the admit success path so the old slice is only
// finished when the new one is confirmed. If this fails, the job reconciler's
// EnsureWorkloadSlices detects both slices admitted and finishes the old one.
func (s *Scheduler) replaceOldWorkloadSlice(ctx context.Context, log logr.Logger, e *entry, oldWorkloadSlice *preemption.Target) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: After the refactoring, replaceOldWorkloadSlice no longer appears to have meaningful control-flow value beyond local error reporting.

Would it make sense to make it fire-and-forget and move the error logging into the processEntry call site instead?

Something along the lines of:

if err := s.admit(ctx, e, cq); err != nil {
    e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
    return
}

if features.Enabled(features.ElasticJobsViaWorkloadSlices) && oldWorkloadSlice != nil {
    if err := s.replaceOldWorkloadSlice(...); err != nil {
        log.Error(err, "Failed to finish old workload slice after admitting replacement")
    }
}

This would keep admit generic and avoid exposing it to workload-slice replacement semantics for the common non-elastic workload path.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason as above — since this runs inside the async goroutine, there's no synchronous call site to bubble the error up to. Logging inside the function is the only option here.

if err := s.replaceWorkloadSlice(ctx, oldWorkloadSlice.WorkloadInfo.ClusterQueue, e.Obj, oldWorkloadSlice.WorkloadInfo.Obj.DeepCopy()); err != nil {
log.Error(err, "Failed to replace workload slice")
return err
log.Error(err, "Failed to finish old workload slice after admitting replacement; job reconciler will handle recovery")
Copy link
Copy Markdown
Contributor

@mimowo mimowo May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is ok, but IIUC if this fails, then we may observe double counting of the quota for the brief moment - before the JobReconciler finishes the old slices.

The quota bump may result in temporary excessive preemptions of other workloads, or blocking workloads that could be admitted otherwise.

So, I think ideally if the scheduler cache was aware of the fact that both slices are admitted at the same time, and only count capacity from the "max" of them, rather than "sum".

However, that quota bump is very rare (requires request failure), and the consequence is also limited (excessive preemptions for a brief moment), so my comment is mostly to confirm my understanding rather than to request changes.

We may re-evaluate if this is good enough or not before GA graduation.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, that's right. The double-count only happens if replaceOldWorkloadSlice fails after admission succeeds i.e. the old slice stays admitted in the cache alongside the new one until the job reconciler's EnsureWorkloadSlices finishes the old slice on its next reconcile. This is conservative (over-reports usage, blocks other admits temporarily) and resolves within one reconcile cycle. I agree it's fine for now, and the cache-level max-not-sum optimization is a good candidate for GA if it shows up in practice.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had discussed earlier that it could be helpful to "teach" the queue/cache how to exclude the unfinished old slice from quota accounting, so we don't observe an artificial quota increase in the case where updating the old slice fails.

It would be great if we could address that in this PR, but I also think it is reasonable as a follow-up.

In that case, since you now have all the context around this flow 🙂, would you mind creating a follow-up issue and linking it to this PR?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about I document this as a GA consideration in the ElasticJobsViaWorkloadSlices KEP instead? It's a pretty narrow failure path (API write failing right after a successful one) and @mimowo was also leaning toward revisiting at GA.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to updating the KEP, could you do it in this PR, please?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added here: #11242

}
return nil
}

type entryStatus string
Expand Down Expand Up @@ -788,7 +783,7 @@ func updateAssignmentForTAS(log logr.Logger, snapshot *schdcache.Snapshot, cq *s
// the entry, and asynchronously updates the object in the apiserver after
// assuming it in the cache.
// Note: this does not necessarily make the workload "admitted".
func (s *Scheduler) admit(ctx context.Context, e *entry, cq *schdcache.ClusterQueueSnapshot) error {
func (s *Scheduler) admit(ctx context.Context, e *entry, cq *schdcache.ClusterQueueSnapshot, oldWorkloadSlice *preemption.Target) error {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I understand why replaceOldWorkloadSlice needs to happen only after successful admission, but is there a reason it has to be inside admit rather than immediately after a successful admit call?

For example, we could keep admit generic and do:

if err := s.admit(ctx, e, cq); err != nil {
    e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
    return
}

if features.Enabled(features.ElasticJobsViaWorkloadSlices) && oldWorkloadSlice != nil {
    s.replaceOldWorkloadSlice(ctx, log, e, oldWorkloadSlice)
}

That would preserve the important ordering, finish the old slice only after the replacement was successfully admitted, while avoiding exposing admit to the workload-slice replacement concept. Most workloads are not elastic slice replacements, so passing oldWorkloadSlice through admit feels a bit leaky to me.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think admit() is async. The admissionRoutineWrapper.Run() launches a goroutine and admit() returns nil immediately before PatchAdmissionStatus completes. If we finish the old slice after admit() returns in processEntry(), we'd be finishing it before admission is actually confirmed. It's the same race we're fixing. That's why it has to live inside the goroutine's success path.

log := ctrl.LoggerFrom(ctx)
admission := &kueue.Admission{
ClusterQueue: e.ClusterQueue,
Expand Down Expand Up @@ -816,6 +811,9 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, cq *schdcache.ClusterQu
s.recordWorkloadAdmissionMetrics(log, newWorkload, e.Obj, admission, consideredStr)

log.V(2).Info("Workload successfully admitted and assigned flavors", "assignments", admission.PodSetAssignments)
if features.Enabled(features.ElasticJobsViaWorkloadSlices) && oldWorkloadSlice != nil {
s.replaceOldWorkloadSlice(ctx, log, e, oldWorkloadSlice)
}
return
}
// Ignore errors because the workload or clusterQueue could have been deleted
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5062,9 +5062,9 @@ func TestSchedule(t *testing.T) {
},
eventCmpOpts: ignoreEventMessageCmpOpts,
wantEvents: []utiltesting.EventRecord{
utiltesting.MakeEventRecord("sales", "foo-1", kueue.WorkloadSliceReplaced, corev1.EventTypeNormal).Obj(),
utiltesting.MakeEventRecord("sales", "foo-2", "QuotaReserved", corev1.EventTypeNormal).Obj(),
utiltesting.MakeEventRecord("sales", "foo-2", "Admitted", corev1.EventTypeNormal).Obj(),
utiltesting.MakeEventRecord("sales", "foo-1", kueue.WorkloadSliceReplaced, corev1.EventTypeNormal).Obj(),
},
},
"pending admission check with nofit and fit flavors": {
Expand Down
2 changes: 1 addition & 1 deletion pkg/workloadslicing/workloadslicing.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func EnsureWorkloadSlices(

// Finish the old workload slice if:
// a. It lost its quota reservation, or
// b. It was explicitly evicted, or
// b. It was explicitly evicted (preemption, timeout, admin action), or
// c. The new workload has been admitted (has quota reservation) AND has a replacement
// annotation pointing to the old workload. This handles the case where the scheduler
// admitted the new slice but failed to finish the old slice.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4326,6 +4326,52 @@ var _ = ginkgo.Describe("Job with elastic jobs via workload-slices support", gin
util.ExpectFinishedWorkloadsTotalMetric(clusterQueue, highPriorityClass.Name, 0)
util.ExpectLQFinishedWorkloadsTotalMetric(localQueue, highPriorityClass.Name, 0)
})

ginkgo.It("Should keep old workload-slice unfinished while replacement slice is pending", func() {
features.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), features.ElasticJobsViaWorkloadSlices, true)

elasticJob := testingjob.MakeJob("job-pending-replacement", ns.Name).
SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue).
Queue(kueue.LocalQueueName(localQueue.Name)).
Request(corev1.ResourceCPU, "1000m").
Parallelism(3).
Completions(int32(cpuNominalQuota + 1)).
Obj()

ginkgo.By("creating an elastic job")
util.MustCreate(ctx, k8sClient, elasticJob)

ginkgo.By("the original workload slice is admitted")
workloads := util.ExpectWorkloadsInNamespace(ctx, k8sClient, elasticJob.Namespace, 1)
oldWorkloadSlice := &workloads[0]
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, oldWorkloadSlice)
util.ExpectJobUnsuspendedWithNodeSelectors(ctx, k8sClient, client.ObjectKeyFromObject(elasticJob), nil)

ginkgo.By("scaling the job beyond the remaining quota")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IFIUC If the newWorkload is beyond the remaining quota the newWorkload would not reserve the quota, and so Kueue won't try to admit the newWorkload and return in line 401.
So I don't think the added changes are tested here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The scheduler test framework can only fail all workload patches at once, not just the new slice's, so we can't simulate a selective admission failure here. The scheduler fix is structural: replaceOldWorkloadSlice only fires after PatchAdmissionStatus succeeds. This integration test covers the other side, making sure the job reconciler doesn't finish the old slice while the replacement is still pending.

Copy link
Copy Markdown
Contributor

@yaroslava-serdiuk yaroslava-serdiuk May 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This integration test covers the other side, making sure the job reconciler doesn't finish the old slice while the replacement is still pending.

I don't think it's actually tested here, because the replacement doesn't happening here since the new workload is beyond the quota.
Additionally, the test passes when run without the changes.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @yaroslava-serdiuk for spotting that. I think this test is wrong indeed. If the test is not testing the behavior change (passes before and after the change), then let's drop it.

We may consider a test which explicitly tests the invariant that OldSlice it not finished when transitioning the new slice to Admitted. This can be done with the watch pattern.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened the follow up: #11283

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(elasticJob), elasticJob)).Should(gomega.Succeed())
elasticJob.Spec.Parallelism = new(int32(cpuNominalQuota + 1))
g.Expect(k8sClient.Update(ctx, elasticJob)).Should(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("observing a new pending replacement workload slice")
newWorkloadSlice := util.ExpectNewWorkloadSlice(ctx, k8sClient, oldWorkloadSlice)
gomega.Expect(newWorkloadSlice).ShouldNot(gomega.BeNil())
util.ExpectWorkloadsToBePending(ctx, k8sClient, newWorkloadSlice)

ginkgo.By("keeping the old workload slice unfinished until the replacement is admitted")
gomega.Consistently(func(g gomega.Gomega) {
currentOldSlice := &kueue.Workload{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(oldWorkloadSlice), currentOldSlice)).Should(gomega.Succeed())
g.Expect(currentOldSlice.Status.Conditions).ShouldNot(utiltesting.HaveConditionStatusTrue(kueue.WorkloadFinished))
g.Expect(workload.IsAdmitted(currentOldSlice)).Should(gomega.BeTrue())

currentNewSlice := &kueue.Workload{}
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(newWorkloadSlice), currentNewSlice)).Should(gomega.Succeed())
g.Expect(currentNewSlice.Status.Conditions).Should(utiltesting.HaveConditionStatusFalseAndReason(kueue.WorkloadQuotaReserved, "Pending"))
g.Expect(currentNewSlice.Status.Conditions).ShouldNot(utiltesting.HaveConditionStatusTrue(kueue.WorkloadFinished))
}, util.ConsistentDuration, util.ShortInterval).Should(gomega.Succeed())
})
})

var _ = ginkgo.Describe("Job reconciliation", ginkgo.Ordered, func() {
Expand Down
Loading