diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 71176900a81..17f197b2d08 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -449,10 +449,9 @@ func (s *Scheduler) processEntry( s.waitForPodsReadyIfBlocked(ctx, log, e) + // Copy ClusterName from old slice before admission (needed for MultiKueue). if features.Enabled(features.ElasticJobsViaWorkloadSlices) && oldWorkloadSlice != nil { - if err := s.replaceOldWorkloadSlice(ctx, log, e, oldWorkloadSlice); err != nil { - return - } + e.Obj.Status.ClusterName = oldWorkloadSlice.WorkloadInfo.Obj.Status.ClusterName } if features.Enabled(features.ConcurrentAdmission) && concurrentadmission.IsVariant(e.Obj) { @@ -463,7 +462,7 @@ func (s *Scheduler) processEntry( } e.markNominated() - if err := s.admit(ctx, e, cq); err != nil { + if err := s.admit(ctx, e, cq, oldWorkloadSlice); err != nil { e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err) } } @@ -531,18 +530,14 @@ func (s *Scheduler) waitForPodsReadyIfBlocked(ctx context.Context, log logr.Logg log.V(5).Info("Finished waiting for all admitted workloads to be in the PodsReady condition") } -// replaceOldWorkloadSlice deactivates the old slice and finalizes its status. -// The new slice's clusterName is copied from the old one so that MultiKueue -// placement stays consistent across the replacement. If the subsequent admit -// fails, the old slice may end up Finished while the new one is not admitted; -// downstream controllers handle suspension/eviction. -func (s *Scheduler) replaceOldWorkloadSlice(ctx context.Context, log logr.Logger, e *entry, oldWorkloadSlice *preemption.Target) error { - e.Obj.Status.ClusterName = oldWorkloadSlice.WorkloadInfo.Obj.Status.ClusterName +// replaceOldWorkloadSlice finishes the old slice after the new slice has been +// admitted. Called inside the admit success path so the old slice is only +// finished when the new one is confirmed. 
If this fails, the job reconciler's +// EnsureWorkloadSlices finishes the old slice once the new one holds its quota reservation. +func (s *Scheduler) replaceOldWorkloadSlice(ctx context.Context, log logr.Logger, e *entry, oldWorkloadSlice *preemption.Target) { if err := s.replaceWorkloadSlice(ctx, oldWorkloadSlice.WorkloadInfo.ClusterQueue, e.Obj, oldWorkloadSlice.WorkloadInfo.Obj.DeepCopy()); err != nil { - log.Error(err, "Failed to replace workload slice") - return err + log.Error(err, "Failed to finish old workload slice after admitting replacement; job reconciler will handle recovery") } - return nil } type entryStatus string @@ -788,7 +783,7 @@ func updateAssignmentForTAS(log logr.Logger, snapshot *schdcache.Snapshot, cq *s // the entry, and asynchronously updates the object in the apiserver after // assuming it in the cache. // Note: this does not necessarily make the workload "admitted". -func (s *Scheduler) admit(ctx context.Context, e *entry, cq *schdcache.ClusterQueueSnapshot) error { +func (s *Scheduler) admit(ctx context.Context, e *entry, cq *schdcache.ClusterQueueSnapshot, oldWorkloadSlice *preemption.Target) error { log := ctrl.LoggerFrom(ctx) admission := &kueue.Admission{ ClusterQueue: e.ClusterQueue, @@ -816,6 +811,9 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, cq *schdcache.ClusterQu s.recordWorkloadAdmissionMetrics(log, newWorkload, e.Obj, admission, consideredStr) log.V(2).Info("Workload successfully admitted and assigned flavors", "assignments", admission.PodSetAssignments) + if features.Enabled(features.ElasticJobsViaWorkloadSlices) && oldWorkloadSlice != nil { + s.replaceOldWorkloadSlice(ctx, log, e, oldWorkloadSlice) + } return } // Ignore errors because the workload or clusterQueue could have been deleted diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 021ae561c72..3f53360996e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -5062,9 +5062,9 @@ func TestSchedule(t 
*testing.T) { }, eventCmpOpts: ignoreEventMessageCmpOpts, wantEvents: []utiltesting.EventRecord{ - utiltesting.MakeEventRecord("sales", "foo-1", kueue.WorkloadSliceReplaced, corev1.EventTypeNormal).Obj(), utiltesting.MakeEventRecord("sales", "foo-2", "QuotaReserved", corev1.EventTypeNormal).Obj(), utiltesting.MakeEventRecord("sales", "foo-2", "Admitted", corev1.EventTypeNormal).Obj(), + utiltesting.MakeEventRecord("sales", "foo-1", kueue.WorkloadSliceReplaced, corev1.EventTypeNormal).Obj(), }, }, "pending admission check with nofit and fit flavors": { diff --git a/pkg/workloadslicing/workloadslicing.go b/pkg/workloadslicing/workloadslicing.go index dd1c3c3d3e8..383f8b807bf 100644 --- a/pkg/workloadslicing/workloadslicing.go +++ b/pkg/workloadslicing/workloadslicing.go @@ -217,7 +217,7 @@ func EnsureWorkloadSlices( // Finish the old workload slice if: // a. It lost its quota reservation, or - // b. It was explicitly evicted, or + // b. It was explicitly evicted (preemption, timeout, admin action), or // c. The new workload has been admitted (has quota reservation) AND has a replacement // annotation pointing to the old workload. This handles the case where the scheduler // admitted the new slice but failed to finish the old slice. 
diff --git a/test/integration/singlecluster/controller/jobs/job/job_controller_test.go b/test/integration/singlecluster/controller/jobs/job/job_controller_test.go index 65ba53c25d2..2fb7f423894 100644 --- a/test/integration/singlecluster/controller/jobs/job/job_controller_test.go +++ b/test/integration/singlecluster/controller/jobs/job/job_controller_test.go @@ -4326,6 +4326,52 @@ var _ = ginkgo.Describe("Job with elastic jobs via workload-slices support", gin util.ExpectFinishedWorkloadsTotalMetric(clusterQueue, highPriorityClass.Name, 0) util.ExpectLQFinishedWorkloadsTotalMetric(localQueue, highPriorityClass.Name, 0) }) + + ginkgo.It("Should keep old workload-slice unfinished while replacement slice is pending", func() { + features.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), features.ElasticJobsViaWorkloadSlices, true) + + elasticJob := testingjob.MakeJob("job-pending-replacement", ns.Name). + SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue). + Queue(kueue.LocalQueueName(localQueue.Name)). + Request(corev1.ResourceCPU, "1000m"). + Parallelism(3). + Completions(int32(cpuNominalQuota + 1)). 
+ Obj() + + ginkgo.By("creating an elastic job") + util.MustCreate(ctx, k8sClient, elasticJob) + + ginkgo.By("the original workload slice is admitted") + workloads := util.ExpectWorkloadsInNamespace(ctx, k8sClient, elasticJob.Namespace, 1) + oldWorkloadSlice := &workloads[0] + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, oldWorkloadSlice) + util.ExpectJobUnsuspendedWithNodeSelectors(ctx, k8sClient, client.ObjectKeyFromObject(elasticJob), nil) + + ginkgo.By("scaling the job beyond the remaining quota") + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(elasticJob), elasticJob)).Should(gomega.Succeed()) + elasticJob.Spec.Parallelism = new(int32(cpuNominalQuota + 1)) + g.Expect(k8sClient.Update(ctx, elasticJob)).Should(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + ginkgo.By("observing a new pending replacement workload slice") + newWorkloadSlice := util.ExpectNewWorkloadSlice(ctx, k8sClient, oldWorkloadSlice) + gomega.Expect(newWorkloadSlice).ShouldNot(gomega.BeNil()) + util.ExpectWorkloadsToBePending(ctx, k8sClient, newWorkloadSlice) + + ginkgo.By("keeping the old workload slice unfinished until the replacement is admitted") + gomega.Consistently(func(g gomega.Gomega) { + currentOldSlice := &kueue.Workload{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(oldWorkloadSlice), currentOldSlice)).Should(gomega.Succeed()) + g.Expect(currentOldSlice.Status.Conditions).ShouldNot(utiltesting.HaveConditionStatusTrue(kueue.WorkloadFinished)) + g.Expect(workload.IsAdmitted(currentOldSlice)).Should(gomega.BeTrue()) + + currentNewSlice := &kueue.Workload{} + g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(newWorkloadSlice), currentNewSlice)).Should(gomega.Succeed()) + g.Expect(currentNewSlice.Status.Conditions).Should(utiltesting.HaveConditionStatusFalseAndReason(kueue.WorkloadQuotaReserved, "Pending")) + 
g.Expect(currentNewSlice.Status.Conditions).ShouldNot(utiltesting.HaveConditionStatusTrue(kueue.WorkloadFinished)) + }, util.ConsistentDuration, util.ShortInterval).Should(gomega.Succeed()) + }) }) var _ = ginkgo.Describe("Job reconciliation", ginkgo.Ordered, func() {