Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pkg/controller/admissionchecks/multikueue/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,15 @@ func (w *wlReconciler) nominateAndSynchronizeWorkers(ctx context.Context, group
for workerName := range group.remotes {
nominatedWorkers = append(nominatedWorkers, workerName)
}
if !equality.Semantic.DeepEqual(group.local.Status.NominatedClusterNames, nominatedWorkers) {
// ClusterName != nil indicates possibly stale cache (eviction just cleared ClusterName
// but the informer hasn't caught up yet). Avoid creating remote workloads without a
// confirmed nomination — wait for the cache to sync.
if group.local.Status.ClusterName != nil {
return reconcile.Result{}, nil
}
}

if group.local.Status.ClusterName == nil && !equality.Semantic.DeepEqual(group.local.Status.NominatedClusterNames, nominatedWorkers) {
if err := workload.PatchAdmissionStatus(ctx, w.client, group.local, w.clock, func(wl *kueue.Workload) (bool, error) {
wl.Status.NominatedClusterNames = nominatedWorkers
Expand Down
150 changes: 76 additions & 74 deletions test/e2e/multikueue/baseline/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,90 +1373,92 @@ app = HelloWorld.bind()`,
})
})

ginkgo.It("Should re-do admission process when workload gets evicted in the worker", func() {
job := testingjob.MakeJob("job", managerNs.Name).
WorkloadPriorityClass(managerLowWPC.Name).
Queue(kueue.LocalQueueName(managerLq.Name)).
RequestAndLimit(corev1.ResourceCPU, "100m").
RequestAndLimit(corev1.ResourceMemory, "0.1G").
RequestAndLimit(extraResourceGPUHighCost, "2"). // This will make the workload only schedulable in worker1
Obj()
util.MustCreate(ctx, k8sManagerClient, job)
for i := range 200 {
ginkgo.It(fmt.Sprintf("Should re-do admission process when workload gets evicted in the worker %d", i), func() {
job := testingjob.MakeJob("job", managerNs.Name).
WorkloadPriorityClass(managerLowWPC.Name).
Queue(kueue.LocalQueueName(managerLq.Name)).
RequestAndLimit(corev1.ResourceCPU, "100m").
RequestAndLimit(corev1.ResourceMemory, "0.1G").
RequestAndLimit(extraResourceGPUHighCost, "2"). // This will make the workload only schedulable in worker1
Obj()
util.MustCreate(ctx, k8sManagerClient, job)

wlKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: managerNs.Name}
managerWl := &kueue.Workload{}
workerWorkload := &kueue.Workload{}
wlKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name, job.UID), Namespace: managerNs.Name}
managerWl := &kueue.Workload{}
workerWorkload := &kueue.Workload{}

ginkgo.By("Checking that the workload is created and admitted in the manager cluster", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlKey, managerWl)).To(gomega.Succeed())
g.Expect(workload.IsAdmitted(managerWl)).To(gomega.BeTrue())
}, util.Timeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload not admitted in manager", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})
ginkgo.By("Checking that the workload is created and admitted in the manager cluster", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlKey, managerWl)).To(gomega.Succeed())
g.Expect(workload.IsAdmitted(managerWl)).To(gomega.BeTrue())
}, util.Timeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload not admitted in manager", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})

ginkgo.By("Checking that the workload is created on worker1", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker1Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
g.Expect(workload.IsAdmitted(workerWorkload)).To(gomega.BeTrue())
g.Expect(workerWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload not admitted in worker1", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})
ginkgo.By("Checking that the workload is created on worker1", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker1Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
g.Expect(workload.IsAdmitted(workerWorkload)).To(gomega.BeTrue())
g.Expect(workerWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload not admitted in worker1", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})

ginkgo.By("Checking that the workload is not created on worker2", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker2Client.Get(ctx, wlKey, workerWorkload)).To(utiltesting.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload present in worker2", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})
ginkgo.By("Checking that the workload is not created on worker2", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker2Client.Get(ctx, wlKey, workerWorkload)).To(utiltesting.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload present in worker2", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})

ginkgo.By("Switching worker cluster queues' resources to enforce re-admission on the worker2", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(worker1Cq), worker1Cq)).To(gomega.Succeed())
g.Expect(k8sWorker1Client.Update(ctx, util.SetResourceNominalQuota(worker1Cq, extraResourceGPUHighCost, "1"))).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
ginkgo.By("Switching worker cluster queues' resources to enforce re-admission on the worker2", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker1Client.Get(ctx, client.ObjectKeyFromObject(worker1Cq), worker1Cq)).To(gomega.Succeed())
g.Expect(k8sWorker1Client.Update(ctx, util.SetResourceNominalQuota(worker1Cq, extraResourceGPUHighCost, "1"))).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())

gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(worker2Cq), worker2Cq)).To(gomega.Succeed())
g.Expect(k8sWorker2Client.Update(ctx, util.SetResourceNominalQuota(worker2Cq, extraResourceGPUHighCost, "2"))).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
ginkgo.By("Triggering eviction in worker1", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker1Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
g.Expect(workload.SetConditionAndUpdate(
ctx,
k8sWorker1Client,
workerWorkload,
kueue.WorkloadEvicted,
metav1.ConditionTrue,
kueue.WorkloadEvictedByPreemption,
"By test",
"evict",
util.RealClock,
)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker2Client.Get(ctx, client.ObjectKeyFromObject(worker2Cq), worker2Cq)).To(gomega.Succeed())
g.Expect(k8sWorker2Client.Update(ctx, util.SetResourceNominalQuota(worker2Cq, extraResourceGPUHighCost, "2"))).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
ginkgo.By("Triggering eviction in worker1", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker1Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
g.Expect(workload.SetConditionAndUpdate(
ctx,
k8sWorker1Client,
workerWorkload,
kueue.WorkloadEvicted,
metav1.ConditionTrue,
kueue.WorkloadEvictedByPreemption,
"By test",
"evict",
util.RealClock,
)).To(gomega.Succeed())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Checking that the workload is re-admitted in worker2", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlKey, managerWl)).To(gomega.Succeed())
g.Expect(managerWl.Status.ClusterName).To(gomega.HaveValue(gomega.Equal(workerCluster2.Name)))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload not Admitted in worker2", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})
ginkgo.By("Checking that the workload is re-admitted in worker2", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sManagerClient.Get(ctx, wlKey, managerWl)).To(gomega.Succeed())
g.Expect(managerWl.Status.ClusterName).To(gomega.HaveValue(gomega.Equal(workerCluster2.Name)))
}, util.LongTimeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload not Admitted in worker2", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})

ginkgo.By("Checking that the workload is created in worker2", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker2Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
g.Expect(workload.IsAdmitted(workerWorkload)).To(gomega.BeTrue())
g.Expect(workerWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload not Admitted in worker2", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})
ginkgo.By("Checking that the workload is created in worker2", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker2Client.Get(ctx, wlKey, workerWorkload)).To(gomega.Succeed())
g.Expect(workload.IsAdmitted(workerWorkload)).To(gomega.BeTrue())
g.Expect(workerWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec))
}, util.Timeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload not Admitted in worker2", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})

ginkgo.By("Checking that the workload is not created in worker1", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker1Client.Get(ctx, wlKey, workerWorkload)).To(utiltesting.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload present in worker1", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
ginkgo.By("Checking that the workload is not created in worker1", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sWorker1Client.Get(ctx, wlKey, workerWorkload)).To(utiltesting.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed(), util.AssertMsgForMk(ctx, "Workload present in worker1", wlKey, k8sManagerClient, k8sWorker1Client, k8sWorker2Client))
})
})
})
}

ginkgo.It("Should preempt a running low-priority workload when a high-priority workload is admitted (other workers)", func() {
lowJob := testingjob.MakeJob("", managerNs.Name).
Expand Down