Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,16 +370,13 @@ func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
// - the new workload is not admitted.
// In a single-cluster context, this should lead to Job suspension.
// In a MultiKueue context, this should also trigger removal of remote workload/Job objects.
// Copy ClusterName from old slice before admission (needed for MultiKueue).
if features.Enabled(features.ElasticJobsViaWorkloadSlices) && oldWorkloadSlice != nil {
e.Obj.Status.ClusterName = oldWorkloadSlice.WorkloadInfo.Obj.Status.ClusterName
if err := s.replaceWorkloadSlice(ctx, oldWorkloadSlice.WorkloadInfo.ClusterQueue, e.Obj, oldWorkloadSlice.WorkloadInfo.Obj.DeepCopy()); err != nil {
log.Error(err, "Failed to replace workload slice")
continue
}
}

e.status = nominated
if err := s.admit(ctx, e, cq); err != nil {
if err := s.admit(ctx, e, cq, oldWorkloadSlice); err != nil {
e.inadmissibleMsg = fmt.Sprintf("Failed to admit workload: %v", err)
}
}
Expand Down Expand Up @@ -651,7 +648,7 @@ func updateAssignmentForTAS(log logr.Logger, snapshot *schdcache.Snapshot, cq *s
// admit sets the admitting clusterQueue and flavors into the workload of
// the entry, and asynchronously updates the object in the apiserver after
// assuming it in the cache.
func (s *Scheduler) admit(ctx context.Context, e *entry, cq *schdcache.ClusterQueueSnapshot) error {
func (s *Scheduler) admit(ctx context.Context, e *entry, cq *schdcache.ClusterQueueSnapshot, oldWorkloadSlice *preemption.Target) error {
log := ctrl.LoggerFrom(ctx)
admission := &kueue.Admission{
ClusterQueue: e.ClusterQueue,
Expand Down Expand Up @@ -679,6 +676,11 @@ func (s *Scheduler) admit(ctx context.Context, e *entry, cq *schdcache.ClusterQu
s.recordWorkloadAdmissionMetrics(newWorkload, e.Obj, admission, consideredStr)

log.V(2).Info("Workload successfully admitted and assigned flavors", "assignments", admission.PodSetAssignments)
if features.Enabled(features.ElasticJobsViaWorkloadSlices) && oldWorkloadSlice != nil {
if err := s.replaceWorkloadSlice(ctx, oldWorkloadSlice.WorkloadInfo.ClusterQueue, e.Obj, oldWorkloadSlice.WorkloadInfo.Obj.DeepCopy()); err != nil {
log.Error(err, "Failed to finish old workload slice after admitting replacement; job reconciler will handle recovery")
}
}
Comment on lines +679 to +683
return
}
// Ignore errors because the workload or clusterQueue could have been deleted
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4872,9 +4872,9 @@ func TestSchedule(t *testing.T) {
},
eventCmpOpts: ignoreEventMessageCmpOpts,
wantEvents: []utiltesting.EventRecord{
utiltesting.MakeEventRecord("sales", "foo-1", kueue.WorkloadSliceReplaced, corev1.EventTypeNormal).Obj(),
utiltesting.MakeEventRecord("sales", "foo-2", "QuotaReserved", corev1.EventTypeNormal).Obj(),
utiltesting.MakeEventRecord("sales", "foo-2", "Admitted", corev1.EventTypeNormal).Obj(),
utiltesting.MakeEventRecord("sales", "foo-1", kueue.WorkloadSliceReplaced, corev1.EventTypeNormal).Obj(),
},
},
"pending admission check with nofit and fit flavors": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4188,6 +4188,49 @@ var _ = ginkgo.Describe("Job with elastic jobs via workload-slices support", gin
util.ExpectFinishedWorkloadsTotalMetric(clusterQueue, highPriorityClass.Name, 0)
util.ExpectLQFinishedWorkloadsTotalMetric(localQueue, highPriorityClass.Name, 0)
})

ginkgo.It("Should not finish old workload-slice before new slice is admitted", func() {
features.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), features.ElasticJobsViaWorkloadSlices, true)

elasticJob := testingjob.MakeJob("job-slice-ordering", ns.Name).
SetAnnotation(workloadslicing.EnabledAnnotationKey, workloadslicing.EnabledAnnotationValue).
Queue(kueue.LocalQueueName(localQueue.Name)).
Request(corev1.ResourceCPU, "100m").
Parallelism(1).
Completions(3).
Obj()

ginkgo.By("creating an elastic job")
util.MustCreate(ctx, k8sClient, elasticJob)

ginkgo.By("the original workload slice is admitted")
workloads := util.ExpectWorkloadsInNamespace(ctx, k8sClient, elasticJob.Namespace, 1)
oldWorkloadSlice := &workloads[0]
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, oldWorkloadSlice)
util.ExpectJobUnsuspendedWithNodeSelectors(ctx, k8sClient, client.ObjectKeyFromObject(elasticJob), nil)

ginkgo.By("setting up a watch on workloads before scaling")
watchClient, ok := k8sClient.(client.WithWatch)
gomega.Expect(ok).Should(gomega.BeTrue(), "k8sClient must implement client.WithWatch")
watcher, err := watchClient.Watch(ctx, &kueue.WorkloadList{}, &client.ListOptions{
Namespace: elasticJob.Namespace,
})
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
defer watcher.Stop()

ginkgo.By("scaling the job up within quota")
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(elasticJob), elasticJob)).Should(gomega.Succeed())
elasticJob.Spec.Parallelism = ptr.To[int32](2)
g.Expect(k8sClient.Update(ctx, elasticJob)).Should(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("verifying the old slice is not finished when the new slice becomes admitted")
util.ExpectWorkloadSliceAdmittedBeforeOldFinished(watcher, oldWorkloadSlice.Name, util.Timeout)

ginkgo.By("old workload is finished")
util.ExpectWorkloadToFinish(ctx, k8sClient, client.ObjectKeyFromObject(oldWorkloadSlice))
})
})

var _ = ginkgo.Describe("Job reconciliation", ginkgo.Ordered, func() {
Expand Down
37 changes: 37 additions & 0 deletions test/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -1543,3 +1544,39 @@ func waitForDummyWorkloadToRunOnNode(ctx context.Context, c client.Client, node
}, LongTimeout, Interval).Should(gomega.Succeed(), AssertMsg(fmt.Sprintf("Dummy workload did not complete on node %s", node.Name), &createdDummyJob))
})
}

// ExpectWorkloadSliceAdmittedBeforeOldFinished watches workload events and asserts
// that the old workload slice is not marked Finished before the new slice is Admitted.
func ExpectWorkloadSliceAdmittedBeforeOldFinished(watcher watch.Interface, oldWorkloadName string, timeout time.Duration) {
ginkgo.GinkgoHelper()
oldSliceFinished := false
newSliceAdmitted := false
timeoutCh := time.After(timeout)
for !newSliceAdmitted {
select {
case evt, ok := <-watcher.ResultChan():
gomega.Expect(ok).Should(gomega.BeTrue(), "watch channel closed unexpectedly")
if evt.Type == watch.Error {
status, _ := evt.Object.(*metav1.Status)
gomega.Expect(evt.Type).ShouldNot(gomega.Equal(watch.Error), fmt.Sprintf("watch error: %v", status))
}
if evt.Type != watch.Modified {
continue
}
wl, isWorkload := evt.Object.(*kueue.Workload)
gomega.Expect(isWorkload).Should(gomega.BeTrue())

if wl.Name == oldWorkloadName && workload.IsFinished(wl) {
oldSliceFinished = true
}
if wl.Name != oldWorkloadName && workload.IsAdmitted(wl) {
gomega.Expect(oldSliceFinished).Should(gomega.BeFalse(),
"old workload slice was finished before new slice was admitted")
newSliceAdmitted = true
}
Comment on lines +1569 to +1576
case <-timeoutCh:
gomega.Expect(newSliceAdmitted).Should(gomega.BeTrue(),
"timed out waiting for new workload slice to be admitted")
}
}
}