Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apis/kueue/v1beta2/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ type ConcurrentAdmissionMigration struct {
// mode defines the mode of Workload's migration.
// The possible values are:
// - `TryPreferredFlavors` (default): a Workload will try to migrate to the preferred flavor after it's admitted and running.
// - `HoldFirstAdmission`: a Workload, once admitted to a flavor, will stick to a flavor and will not be migrated.
//
// +required
Mode ConcurrentAdmissionMigrationMode `json:"mode,omitempty"`
Expand All @@ -230,12 +231,14 @@ type ConcurrentAdmissionConstraints struct {
}

// +kubebuilder:validation:MaxLength=253
// +kubebuilder:validation:Enum=TryPreferredFlavors
// +kubebuilder:validation:Enum=TryPreferredFlavors;HoldFirstAdmission
type ConcurrentAdmissionMigrationMode string

const (
// TryPreferredFlavors means that a Workload will try to migrate to the preferred flavor after it's admitted and running.
ConcurrentAdmissionTryPreferredFlavors ConcurrentAdmissionMigrationMode = "TryPreferredFlavors"
// HoldFirstAdmission means that a Workload, once admitted to a flavor, will not try to migrate to another flavor.
ConcurrentAdmissionHoldFirstAdmission ConcurrentAdmissionMigrationMode = "HoldFirstAdmission"
)

// +kubebuilder:validation:XValidation:rule="self.flavors.all(x, size(x.resources) == size(self.coveredResources))", message="flavors must have the same number of resources as the coveredResources"
Expand Down
2 changes: 2 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -966,8 +966,10 @@ spec:
mode defines the mode of Workload's migration.
The possible values are:
- `TryPreferredFlavors` (default): a Workload will try to migrate to the preferred flavor after it's admitted and running.
- `HoldFirstAdmission`: a Workload, once admitted to a flavor, will stick to a flavor and will not be migrated.
enum:
- TryPreferredFlavors
- HoldFirstAdmission
maxLength: 253
type: string
required:
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -958,8 +958,10 @@ spec:
mode defines the mode of Workload's migration.
The possible values are:
- `TryPreferredFlavors` (default): a Workload will try to migrate to the preferred flavor after it's admitted and running.
- `HoldFirstAdmission`: a Workload, once admitted to a flavor, will stick to a flavor and will not be migrated.
enum:
- TryPreferredFlavors
- HoldFirstAdmission
maxLength: 253
type: string
required:
Expand Down
160 changes: 102 additions & 58 deletions pkg/controller/concurrentadmission/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,22 @@ func (r *variantReconciler) deactivateVariant(ctx context.Context, v *kueue.Work
return nil
}

func (r *variantReconciler) deactivateMatchingVariants(ctx context.Context, variants []kueue.Workload, reason string, match func(*kueue.Workload) bool, logArgs ...any) error {
log := ctrl.LoggerFrom(ctx)
for i := range variants {
v := &variants[i]
if match != nil && !match(v) {
continue
}
logFields := append([]any{"variant", klog.KObj(v), "flavor", concurrentadmission.GetVariantFlavor(v), "reason", reason}, logArgs...)
log.V(2).Info("Deactivating variant", logFields...)
if err := r.deactivateVariant(ctx, v, reason); err != nil {
return err
}
}
return nil
}

func (r *variantReconciler) deactivateVariants(
ctx context.Context,
parent *kueue.Workload,
Expand All @@ -367,54 +383,65 @@ func (r *variantReconciler) deactivateVariants(
log := ctrl.LoggerFrom(ctx)
if !workload.IsActive(parent) {
log.V(2).Info("Parent is not active, deactivating all variants", "parent", klog.KObj(parent))
for i := range variants {
v := &variants[i]
if err := r.deactivateVariant(ctx, v, fmt.Sprintf("Parent Workload %q not active", klog.KObj(parent))); err != nil {
return err
}
}
return nil
return r.deactivateMatchingVariants(
ctx,
variants,
fmt.Sprintf("Parent Workload %q not active", klog.KObj(parent)),
nil,
)
}

admittedWl := getAdmittedVariant(variants)
if admittedWl == nil {
log.V(3).Info("No admitted variant, no need to deactivate any variant")
return nil
}
// deactivate Variants below lastAcceptableFlavor if specified
var lastAcceptableFlavor *kueue.ResourceFlavorReference
if cq.Spec.ConcurrentAdmissionPolicy != nil && cq.Spec.ConcurrentAdmissionPolicy.Migration.Constraints != nil {
lastAcceptableFlavor = cq.Spec.ConcurrentAdmissionPolicy.Migration.Constraints.LastAcceptableFlavorName
}
if lastAcceptableFlavor != nil {
log.V(3).Info("Deactivating variants below lastAcceptableFlavor", "lastAcceptableFlavor", *lastAcceptableFlavor)
for i := range variants {
v := &variants[i]
if v.Name == admittedWl.Name {
continue
}
if flavorOrder[concurrentadmission.GetVariantFlavor(v)] > flavorOrder[*lastAcceptableFlavor] {
log.V(2).
Info("Deactivating variant because it is below the lastAcceptableFlavor", "variant", klog.KObj(v), "flavor", concurrentadmission.GetVariantFlavor(v), "lastAcceptableFlavor", *lastAcceptableFlavor)
if err := r.deactivateVariant(ctx, v, fmt.Sprintf("being below lastAcceptableFlavor: %q and another Variant admitted %q", *lastAcceptableFlavor, klog.KObj(admittedWl))); err != nil {
return err
}
}
switch migrationMode(cq) {
case kueue.ConcurrentAdmissionHoldFirstAdmission:
log.V(3).Info("HoldFirstAdmission mode, deactivating all variants except the admitted one", "admittedVariant", klog.KObj(admittedWl))
return r.deactivateMatchingVariants(
ctx,
variants,
fmt.Sprintf("HoldFirstAdmission: another Variant %q is admitted", klog.KObj(admittedWl)),
func(v *kueue.Workload) bool { return v.Name != admittedWl.Name },
)
case kueue.ConcurrentAdmissionTryPreferredFlavors:
// deactivate Variants below lastAcceptableFlavor if specified
var lastAcceptableFlavor *kueue.ResourceFlavorReference
if cq.Spec.ConcurrentAdmissionPolicy != nil && cq.Spec.ConcurrentAdmissionPolicy.Migration.Constraints != nil {
lastAcceptableFlavor = cq.Spec.ConcurrentAdmissionPolicy.Migration.Constraints.LastAcceptableFlavorName
}
}
// also deactivate Variants below the admitted variant regardless of lastAcceptableFlavorName
log.V(3).Info("Deactivating variants below the admitted variant", "admittedVariant", klog.KObj(admittedWl), "admittedFlavor", concurrentadmission.GetVariantFlavor(admittedWl))
for i := range variants {
v := &variants[i]
if flavorOrder[concurrentadmission.GetVariantFlavor(v)] > flavorOrder[concurrentadmission.GetVariantFlavor(admittedWl)] {
log.V(2).
Info("Deactivating variant because it is below the admitted variant", "variant", klog.KObj(v), "flavor", concurrentadmission.GetVariantFlavor(v), "admittedFlavor", concurrentadmission.GetVariantFlavor(admittedWl))
if err := r.deactivateVariant(ctx, v, fmt.Sprintf("being lower priority than admitted Variant %q", klog.KObj(admittedWl))); err != nil {
if lastAcceptableFlavor != nil {
log.V(3).Info("Deactivating variants below lastAcceptableFlavor", "lastAcceptableFlavor", *lastAcceptableFlavor)
err := r.deactivateMatchingVariants(
ctx,
variants,
fmt.Sprintf("being below lastAcceptableFlavor: %q and another Variant admitted %q", *lastAcceptableFlavor, klog.KObj(admittedWl)),
func(v *kueue.Workload) bool {
return v.Name != admittedWl.Name &&
flavorOrder[concurrentadmission.GetVariantFlavor(v)] > flavorOrder[*lastAcceptableFlavor]
},
"lastAcceptableFlavor", *lastAcceptableFlavor,
)
if err != nil {
return err
}
}
// also deactivate Variants below the admitted variant regardless of lastAcceptableFlavorName
log.V(3).Info("Deactivating variants below the admitted variant", "admittedVariant", klog.KObj(admittedWl), "admittedFlavor", concurrentadmission.GetVariantFlavor(admittedWl))
return r.deactivateMatchingVariants(
ctx,
variants,
fmt.Sprintf("being lower priority than admitted Variant %q", klog.KObj(admittedWl)),
func(v *kueue.Workload) bool {
return flavorOrder[concurrentadmission.GetVariantFlavor(v)] > flavorOrder[concurrentadmission.GetVariantFlavor(admittedWl)]
},
"admittedFlavor", concurrentadmission.GetVariantFlavor(admittedWl),
)
default:
log.Error(nil, "Unknown migration mode, skipping deactivation", "mode", migrationMode(cq))
return nil
}
return nil
}

func (r *variantReconciler) activateVariants(ctx context.Context, parent *kueue.Workload, variants []kueue.Workload, cq *kueue.ClusterQueue, flavorOrder map[kueue.ResourceFlavorReference]int) error {
Expand All @@ -435,36 +462,45 @@ func (r *variantReconciler) activateVariants(ctx context.Context, parent *kueue.
}
return nil
}
// activate all variants that are at least at the lastAcceptableFlavorName if specificed
var lastAcceptableFlavor *kueue.ResourceFlavorReference
if cq.Spec.ConcurrentAdmissionPolicy != nil && cq.Spec.ConcurrentAdmissionPolicy.Migration.Constraints != nil {
lastAcceptableFlavor = cq.Spec.ConcurrentAdmissionPolicy.Migration.Constraints.LastAcceptableFlavorName
}
if lastAcceptableFlavor != nil {
switch migrationMode(cq) {
case kueue.ConcurrentAdmissionHoldFirstAdmission:
log.V(3).Info("HoldFirstAdmission mode and a Variant is admitted, not activating any other variant", "admittedVariant", klog.KObj(admittedVariant))
return nil
case kueue.ConcurrentAdmissionTryPreferredFlavors:
// activate all variants that are at least at the lastAcceptableFlavorName if specificed
var lastAcceptableFlavor *kueue.ResourceFlavorReference
if cq.Spec.ConcurrentAdmissionPolicy != nil && cq.Spec.ConcurrentAdmissionPolicy.Migration.Constraints != nil {
lastAcceptableFlavor = cq.Spec.ConcurrentAdmissionPolicy.Migration.Constraints.LastAcceptableFlavorName
}
if lastAcceptableFlavor != nil {
for i := range variants {
v := &variants[i]
if flavorOrder[concurrentadmission.GetVariantFlavor(v)] <= flavorOrder[*lastAcceptableFlavor] &&
flavorOrder[concurrentadmission.GetVariantFlavor(v)] < flavorOrder[concurrentadmission.GetVariantFlavor(admittedVariant)] {
// activate the variant, the smaller or equal the flavor order is to the lastAcceptableFlavor, the higher the priority is
if err := r.activateWl(ctx, v, fmt.Sprintf("being at least lastAcceptableFlavor: %q and higher priority than admitted Variant %q",
*lastAcceptableFlavor, klog.KObj(admittedVariant))); err != nil {
return err
}
}
}
return nil
}
// no lastAcceptableFlavorName specified, so activate all variants that are below the admitted variant in the flavor order
for i := range variants {
v := &variants[i]
if flavorOrder[concurrentadmission.GetVariantFlavor(v)] <= flavorOrder[*lastAcceptableFlavor] &&
flavorOrder[concurrentadmission.GetVariantFlavor(v)] < flavorOrder[concurrentadmission.GetVariantFlavor(admittedVariant)] {
// activate the variant, the smaller or equal the flavor order is to the lastAcceptableFlavor, the higher the priority is
if err := r.activateWl(ctx, v, fmt.Sprintf("being at least lastAcceptableFlavor: %q and higher priority than admitted Variant %q",
*lastAcceptableFlavor, klog.KObj(admittedVariant))); err != nil {
if flavorOrder[concurrentadmission.GetVariantFlavor(v)] < flavorOrder[concurrentadmission.GetVariantFlavor(admittedVariant)] {
// activate the variant, the smaller the flavor order is to the admitted variant, the higher the priority is
if err := r.activateWl(ctx, v, fmt.Sprintf("being higher priority than admitted Variant %q", klog.KObj(admittedVariant))); err != nil {
return err
}
}
}
return nil
default:
log.Error(nil, "Unknown migration mode, skipping activation", "mode", migrationMode(cq))
return nil
}
// no lastAcceptableFlavorName specified, so activate all variants that are below the admitted variant in the flavor order
for i := range variants {
v := &variants[i]
if flavorOrder[concurrentadmission.GetVariantFlavor(v)] < flavorOrder[concurrentadmission.GetVariantFlavor(admittedVariant)] {
// activate the variant, the smaller the flavor order is to the admitted variant, the higher the priority is
if err := r.activateWl(ctx, v, fmt.Sprintf("being higher priority than admitted Variant %q", klog.KObj(admittedVariant))); err != nil {
return err
}
}
}
return nil
}

func (r *variantReconciler) activateWl(ctx context.Context, wl *kueue.Workload, message string) error {
Expand Down Expand Up @@ -606,3 +642,11 @@ func getAdmittedVariant(variants []kueue.Workload) *kueue.Workload {
}
return nil
}

func migrationMode(cq *kueue.ClusterQueue) kueue.ConcurrentAdmissionMigrationMode {
if cq.Spec.ConcurrentAdmissionPolicy == nil ||
cq.Spec.ConcurrentAdmissionPolicy.Migration.Mode == "" {
return kueue.ConcurrentAdmissionTryPreferredFlavors
}
return cq.Spec.ConcurrentAdmissionPolicy.Migration.Mode
}
107 changes: 105 additions & 2 deletions pkg/controller/concurrentadmission/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ func TestReconcile(t *testing.T) {
Obj()
migrationLQNoConstraint := utiltestingapi.MakeLocalQueue("lq-migration-no-constraint", "default").ClusterQueue("cq-migration-no-constraint").Obj()

holdFirstAdmissionCQ := utiltestingapi.MakeClusterQueue("cq-hold").
ResourceGroup(
*utiltestingapi.MakeFlavorQuotas("reservation").Obj(),
*utiltestingapi.MakeFlavorQuotas("on-demand").Obj(),
*utiltestingapi.MakeFlavorQuotas("spot").Obj(),
).
ConcurrentAdmissionPolicy(kueue.ConcurrentAdmissionHoldFirstAdmission).
Obj()
holdFirstAdmissionLQ := utiltestingapi.MakeLocalQueue("lq-hold", "default").ClusterQueue("cq-hold").Obj()

testCases := map[string]struct {
parentWorkload *kueue.Workload
variantWorkloads []kueue.Workload
Expand Down Expand Up @@ -966,6 +976,99 @@ func TestReconcile(t *testing.T) {
},
},
},
"HoldFirstAdmission, variant admitted on spot, deactivate reservation and on-demand": {
parentWorkload: utiltestingapi.MakeWorkload("wl-12345", "default").
Queue("lq-hold").
Request(corev1.ResourceCPU, "1").
Label(constants.ConcurrentAdmissionParentLabelKey, "true").
Obj(),
variantWorkloads: []kueue.Workload{
*utiltestingapi.MakeWorkload("wl-variant-reservation", "default").
Queue("lq-hold").
AllowedFlavors("reservation").
ControllerReference(kueue.GroupVersion.WithKind("Workload"), "wl-12345", "").
Request(corev1.ResourceCPU, "1").
Obj(),
*utiltestingapi.MakeWorkload("wl-variant-on-demand", "default").
Queue("lq-hold").
AllowedFlavors("on-demand").
ControllerReference(kueue.GroupVersion.WithKind("Workload"), "wl-12345", "").
Request(corev1.ResourceCPU, "1").
Obj(),
*utiltestingapi.MakeWorkload("wl-variant-spot", "default").
Queue("lq-hold").
AllowedFlavors("spot").
ControllerReference(kueue.GroupVersion.WithKind("Workload"), "wl-12345", "").
Request(corev1.ResourceCPU, "1").
SimpleReserveQuota("cq-hold", "spot", metav1.Now().Time).
AdmittedAt(true, metav1.Now().Time).
Obj(),
},
wantParentWorkload: utiltestingapi.MakeWorkload("wl-12345", "default").
Queue("lq-hold").
Request(corev1.ResourceCPU, "1").
Label(constants.ConcurrentAdmissionParentLabelKey, "true").
Admission(utiltestingapi.MakeAdmission("cq-hold", "main").
PodSets(kueue.PodSetAssignment{
Name: "main",
Flavors: map[corev1.ResourceName]kueue.ResourceFlavorReference{
corev1.ResourceCPU: "spot",
},
Count: ptr.To[int32](1),
ResourceUsage: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1")},
}).Obj()).
Condition(metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
Reason: "Admitted",
Message: "The variant wl-variant-spot is admitted",
}).
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionTrue,
Reason: "QuotaReserved",
Message: "Quota reserved in ClusterQueue cq-hold",
}).
Obj(),
wantVariantWorkloads: []kueue.Workload{
*utiltestingapi.MakeWorkload("wl-variant-reservation", "default").
Queue("lq-hold").
AllowedFlavors("reservation").
ControllerReference(kueue.GroupVersion.WithKind("Workload"), "wl-12345", "").
Request(corev1.ResourceCPU, "1").
Active(false).
Obj(),
*utiltestingapi.MakeWorkload("wl-variant-on-demand", "default").
Queue("lq-hold").
AllowedFlavors("on-demand").
ControllerReference(kueue.GroupVersion.WithKind("Workload"), "wl-12345", "").
Request(corev1.ResourceCPU, "1").
Active(false).
Obj(),
*utiltestingapi.MakeWorkload("wl-variant-spot", "default").
Queue("lq-hold").
AllowedFlavors("spot").
ControllerReference(kueue.GroupVersion.WithKind("Workload"), "wl-12345", "").
Request(corev1.ResourceCPU, "1").
SimpleReserveQuota("cq-hold", "spot", metav1.Now().Time).
AdmittedAt(true, metav1.Now().Time).
Obj(),
},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Namespace: "default", Name: "wl-variant-reservation"},
EventType: corev1.EventTypeNormal,
Reason: ReasonDeactivatedVariant,
Message: "Variant Workload deactivated due to HoldFirstAdmission: another Variant \"default/wl-variant-spot\" is admitted",
},
{
Key: types.NamespacedName{Namespace: "default", Name: "wl-variant-on-demand"},
EventType: corev1.EventTypeNormal,
Reason: ReasonDeactivatedVariant,
Message: "Variant Workload deactivated due to HoldFirstAdmission: another Variant \"default/wl-variant-spot\" is admitted",
},
},
},
"parent marked finished, mark all variants finished": {
parentWorkload: utiltestingapi.MakeWorkload("wl-12345", "default").
Queue("lq").
Expand Down Expand Up @@ -1341,8 +1444,8 @@ func TestReconcile(t *testing.T) {
qManager := qcache.NewManagerForUnitTests(cl, nil, qcache.WithPreemptionExpectations(preemptionExpectations))
roleTracker := roletracker.NewFakeRoleTracker(roletracker.RoleLeader)

cqs := []*kueue.ClusterQueue{defaultCQ.DeepCopy(), migrationCQ.DeepCopy(), migrationCQNoConstraint.DeepCopy()}
lqs := []*kueue.LocalQueue{defaultLQ.DeepCopy(), migrationLQ.DeepCopy(), migrationLQNoConstraint.DeepCopy()}
cqs := []*kueue.ClusterQueue{defaultCQ.DeepCopy(), migrationCQ.DeepCopy(), migrationCQNoConstraint.DeepCopy(), holdFirstAdmissionCQ.DeepCopy()}
lqs := []*kueue.LocalQueue{defaultLQ.DeepCopy(), migrationLQ.DeepCopy(), migrationLQNoConstraint.DeepCopy(), holdFirstAdmissionLQ.DeepCopy()}

for _, cq := range cqs {
if err := cl.Create(t.Context(), cq); err != nil {
Expand Down
Loading