diff --git a/pkg/controller/jobframework/defaults.go b/pkg/controller/jobframework/defaults.go index 7803f15135b..8289a9af37d 100644 --- a/pkg/controller/jobframework/defaults.go +++ b/pkg/controller/jobframework/defaults.go @@ -18,10 +18,8 @@ package jobframework import ( "context" - "fmt" "github.com/go-logr/logr" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -45,38 +43,6 @@ func ApplyDefaultForSuspend(ctx context.Context, job GenericJob, k8sClient clien return nil } -// WorkloadShouldBeSuspended determines whether jobObj should be default suspended on creation -func WorkloadShouldBeSuspended(ctx context.Context, jobObj client.Object, k8sClient client.Client, - manageJobsWithoutQueueName bool, managedJobsNamespaceSelector labels.Selector) (bool, error) { - // Do not default suspend a job whose ancestor is already managed by Kueue - ancestorJob, err := FindAncestorJobManagedByKueue(ctx, k8sClient, jobObj, manageJobsWithoutQueueName) - if err != nil || ancestorJob != nil { - return false, err - } - - // Jobs with queue names whose parents are not managed by Kueue are default suspended - if QueueNameForObject(jobObj) != "" { - return true, nil - } - - // Logic for managing jobs without queue names. - if manageJobsWithoutQueueName { - if managedJobsNamespaceSelector != nil { - // Default suspend the job if the namespace selector matches - ns := corev1.Namespace{} - err := k8sClient.Get(ctx, client.ObjectKey{Name: jobObj.GetNamespace()}, &ns) - if err != nil { - return false, fmt.Errorf("failed to get namespace: %w", err) - } - return managedJobsNamespaceSelector.Matches(labels.Set(ns.GetLabels())), nil - } else { - // Namespace filtering is disabled; unconditionally default suspend - return true, nil - } - } - return false, nil -} - func ApplyDefaultLocalQueue(jobObj client.Object, defaultQueueExist func(string) bool) { if !defaultQueueExist(jobObj.GetNamespace()) { return diff --git a/pkg/controller/jobframework/interface.go b/pkg/controller/jobframework/interface.go index fa73580d2d9..418d3bf60b9 100644 --- a/pkg/controller/jobframework/interface.go +++ b/pkg/controller/jobframework/interface.go @@ -18,7 +18,6 @@ package jobframework import ( "context" - "strconv" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -29,166 +28,192 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2" - "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/podset" - "sigs.k8s.io/kueue/pkg/util/admissioncheck" - "sigs.k8s.io/kueue/pkg/util/maps" ) -// GenericJob if the interface which needs to be implemented by all jobs -// managed by the kueue's jobframework. +// GenericJob is a required interface that must be implemented by all jobs +// managed by Kueue's jobframework. type GenericJob interface { // Object returns the job instance. Object() client.Object - // IsSuspended returns whether the job is suspended or not. + + // IsSuspended returns whether the job is suspended. IsSuspended() bool - // Suspend will suspend the job. + + // Suspend suspends the job. Suspend() - // RunWithPodSetsInfo will inject the node affinity and podSet counts extracting from workload to job and unsuspend it. + + // RunWithPodSetsInfo injects node affinity and pod set counts extracted + // from the workload into the job and unsuspends it. RunWithPodSetsInfo(ctx context.Context, podSetsInfo []podset.PodSetInfo) error - // RestorePodSetsInfo will restore the original node affinity and podSet counts of the job. - // Returns whether any change was done. + + // RestorePodSetsInfo restores the original node affinity and pod set counts + // of the job. It returns whether any change was made. RestorePodSetsInfo(podSetsInfo []podset.PodSetInfo) bool - // Finished means whether the job is completed/failed or not, - // condition represents the workload finished condition. + + // Finished returns whether the job is completed or failed. + // The message describes the condition, and success indicates completion status. // Observed generation of the workload is set by the jobframework. Finished(ctx context.Context) (message string, success, finished bool) - // PodSets will build workload podSets corresponding to the job. + + // PodSets builds workload pod sets corresponding to the job. PodSets(ctx context.Context) ([]kueue.PodSet, error) + // IsActive returns true if there are any running pods. IsActive() bool - // PodsReady instructs whether job derived pods are all ready now. + + // PodsReady indicates whether all job-derived pods are ready. PodsReady(ctx context.Context) bool - // GVK returns GVK (Group Version Kind) for the job. + + // GVK returns the GroupVersionKind for the job. GVK() schema.GroupVersionKind } -// Optional interfaces, are meant to implemented by jobs to enable additional -// features of the jobframework reconciler. - +// JobWithPodLabelSelector is an optional interface that should be implemented by generic jobs +// when pod label selector information is needed. type JobWithPodLabelSelector interface { // PodLabelSelector returns the label selector used by pods for the job. PodLabelSelector() string } +// JobWithReclaimablePods is an optional interface that should be implemented by generic jobs +// when reclaimable pod information is needed. type JobWithReclaimablePods interface { // ReclaimablePods returns the list of reclaimable pods. ReclaimablePods(ctx context.Context) ([]kueue.ReclaimablePod, error) } -type StopReason string - -const ( - StopReasonWorkloadDeleted StopReason = "WorkloadDeleted" - StopReasonWorkloadEvicted StopReason = "WorkloadEvicted" - StopReasonNoMatchingWorkload StopReason = "NoMatchingWorkload" - StopReasonNotAdmitted StopReason = "NotAdmitted" -) - +// JobWithCustomStop is an optional interface that should be implemented by generic jobs +// when a custom stop procedure is needed. type JobWithCustomStop interface { // Stop implements a custom stop procedure. - // The function should be idempotent: not do any API calls if the job is already stopped. - // Returns whether the Job stopped with this call or an error + // The function should be idempotent and must not perform any API calls if the job is already stopped. + // Returns whether the Job was stopped by this call or an error. Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) (bool, error) } -// JobWithFinalize interface should be implemented by generic jobs, -// when custom finalization logic is needed for a job, after it's finished. +// JobWithFinalize is an optional interface that should be implemented by generic jobs +// when custom finalization logic is needed for a job after it has finished. type JobWithFinalize interface { Finalize(ctx context.Context, c client.Client) error } -// JobWithSkip interface should be implemented by generic jobs, -// when reconciliation should be skipped depending on the job's state +// JobWithSkip is an optional interface that should be implemented by generic jobs +// when reconciliation should be skipped depending on the job's state. type JobWithSkip interface { Skip(ctx context.Context) bool } +// JobWithPriorityClass is an optional interface that should be implemented by generic jobs +// when a custom priority class is needed. type JobWithPriorityClass interface { - // PriorityClass returns the job's priority class name. + // PriorityClass returns the name of the job's priority class. PriorityClass() string } -// JobWithCustomValidation optional interface that allows custom webhook validation -// for Jobs that use BaseWebhook. +// JobWithCustomValidation is an optional interface that allows custom webhook validation +// for jobs that use BaseWebhook. type JobWithCustomValidation interface { - // ValidateOnCreate returns list of webhook create validation errors. + // ValidateOnCreate returns a list of webhook create validation errors. ValidateOnCreate(ctx context.Context) (field.ErrorList, error) - // ValidateOnUpdate returns list of webhook update validation errors. + + // ValidateOnUpdate returns a list of webhook update validation errors. ValidateOnUpdate(ctx context.Context, oldJob GenericJob) (field.ErrorList, error) } -// ComposableJob interface should be implemented by generic jobs that -// are composed out of multiple API objects. +// ComposableJob is an optional interface that should be implemented by generic jobs +// composed of multiple API objects. type ComposableJob interface { - // Load loads all members of the composable job. If removeFinalizers == true, workload and job finalizers should be removed. + // Load loads all members of the composable job. If removeFinalizers is true, + // workload and job finalizers should be removed. Load(ctx context.Context, c client.Client, key *types.NamespacedName) (removeFinalizers bool, err error) - // Run unsuspends all members of the ComposableJob and injects the node affinity with podSet - // counts extracting from workload to all members of the ComposableJob. + + // Run unsuspends all members of the ComposableJob and injects node affinity + // with pod set counts extracted from the workload into all members of the job. Run(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, r events.EventRecorder, msg string) error - // ConstructComposableWorkload returns a new Workload that's assembled out of all members of the ComposableJob. + + // ConstructComposableWorkload builds a new Workload from all members of the ComposableJob. ConstructComposableWorkload(ctx context.Context, c client.Client, r events.EventRecorder, labelKeysToCopy []string) (*kueue.Workload, error) + // ListChildWorkloads returns all workloads related to the composable job. ListChildWorkloads(ctx context.Context, c client.Client, parent types.NamespacedName) (*kueue.WorkloadList, error) - // FindMatchingWorkloads returns all related workloads, workload that matches the ComposableJob and duplicates that has to be deleted. + + // FindMatchingWorkloads returns related workloads: the matching ComposableJob workload + // and any duplicates that should be deleted. FindMatchingWorkloads(ctx context.Context, c client.Client, r events.EventRecorder) (match *kueue.Workload, toDelete []*kueue.Workload, err error) + // Stop implements the custom stop procedure for ComposableJob. Stop(ctx context.Context, c client.Client, podSetsInfo []podset.PodSetInfo, stopReason StopReason, eventMsg string) ([]client.Object, error) + // ForEach calls f on each member of the ComposableJob. ForEach(f func(obj runtime.Object)) - // EnsureWorkloadOwnedByAllMembers ensures that the provided workload is owned by the specified owners. - // If the workload is not owned by all the specified owners, it adds them to the owner references. - // Returns true if the workload is updated, and an error if any issues occur. + + // EnsureWorkloadOwnedByAllMembers ensures that the provided workload is owned by all specified members. + // If not, it adds missing owner references and returns an error if any issue occurs. EnsureWorkloadOwnedByAllMembers(ctx context.Context, c client.Client, r events.EventRecorder, workload *kueue.Workload) error - // EquivalentToWorkload checks if the provided workload is equivalent to the target workload. + + // EquivalentToWorkload checks whether the provided workload is equivalent to the target workload. // Returns true if they are equivalent and an error if any issues occur. EquivalentToWorkload(ctx context.Context, c client.Client, wl *kueue.Workload) (bool, error) } -// JobWithCustomWorkloadConditions interface should be implemented by generic jobs, -// when custom workload conditions should be updated after ensure that the workload exists. +// JobWithCustomWorkloadConditions is an optional interface that should be implemented +// by generic jobs when custom workload conditions need to be updated after ensuring +// the workload exists. type JobWithCustomWorkloadConditions interface { - // CustomWorkloadConditions return custom workload conditions and status changed or not. + // CustomWorkloadConditions returns custom workload conditions and whether the status changed. CustomWorkloadConditions(wl *kueue.Workload) ([]metav1.Condition, bool) } -// JobWithCustomWorkloadActivation interface should be implemented by generic jobs, -// when custom logic is needed to determine if the workload is active. +// JobWithCustomWorkloadActivation is an optional interface that should be implemented +// by generic jobs when custom logic is needed to determine whether the workload is active. type JobWithCustomWorkloadActivation interface { + // IsWorkloadActive returns true if the workload is active. IsWorkloadActive() bool } -// JobWithManagedBy interface should be implemented by generic jobs -// that implement the managedBy protocol for Multi-Kueue +// JobWithManagedBy is an optional interface that should be implemented +// by generic jobs that support the managedBy protocol for Multi-Kueue. type JobWithManagedBy interface { - // CanDefaultManagedBy returns true of ManagedBy() would return nil or the default controller for the framework + // CanDefaultManagedBy returns true if ManagedBy() would return nil + // or the default controller for the framework. CanDefaultManagedBy() bool - // ManagedBy returns the name of the controller that is managing the Job + + // ManagedBy returns the name of the controller managing the Job. ManagedBy() *string - // SetManagedBy sets the field in the spec that contains the name of the managing controller + + // SetManagedBy sets the spec field containing the name + // of the managing controller. SetManagedBy(*string) } -// JobWithCustomAnnotations interface should be implemented by generic jobs -// when custom annotations should be updated to API server when there is change. -// An example is RayJob may have "kueue.x-k8s.io/podset-replica-sizes", which reflects the current replica sizes from -// underlying RayCluster. Job reconciler will call this method `UpdateAnnotations` to update such annotations to API Server. +// JobWithCustomAnnotations is an optional interface that should be implemented +// by generic jobs when custom annotations need to be updated in the API server +// after changes occur. +// +// For example, RayJob may have the "kueue.x-k8s.io/podset-replica-sizes" +// annotation, which reflects the current replica sizes of the underlying +// RayCluster. The job reconciler calls GetCustomAnnotations to update +// such annotations in the API server. type JobWithCustomAnnotations interface { - // GetCustomAnnotations gets extra annotations needed to be added to the job + // GetCustomAnnotations returns additional annotations + // that should be added to the job. GetCustomAnnotations(ctx context.Context, c client.Client, podSets []kueue.PodSet) (map[string]string, error) } -// ElasticWorkloadNameProvider interface contains method to provide extra information to build workload name for elastic job +// ElasticWorkloadNameProvider is an optional interface that provides additional +// information for building workload names for elastic jobs. type ElasticWorkloadNameProvider interface { - // GetWorkloadNameExtraPart gets extra information to build workload name + // GetWorkloadNameExtraPart returns additional information used + // to build the workload name. GetWorkloadNameExtraPart() string } -// TopLevelJob interface is an optional interface used to indicate -// that the Job owns/manages the Workload object, regardless of the Job +// TopLevelJob is an optional interface used to indicate +// that the Job owns or manages the Workload object, regardless of the Job's // owner references. type TopLevelJob interface { - // IsTopLevel returns true if the Job owns/manages the Workload. + // IsTopLevel returns true if the Job owns or manages the Workload. IsTopLevel() bool } @@ -199,96 +224,3 @@ type JobWithCustomQueueNameChange interface { // to customize the workload queue-name using custom logic. CustomQueueNameChange(ctx context.Context, c client.Client, wl *kueue.Workload) error } - -func QueueName(job GenericJob) kueue.LocalQueueName { - return QueueNameForObject(job.Object()) -} - -func QueueNameForObject(object client.Object) kueue.LocalQueueName { - return kueue.LocalQueueName(object.GetLabels()[constants.QueueLabel]) -} - -func MaximumExecutionTimeSeconds(job GenericJob) *int32 { - return MaximumExecutionTimeSecondsForObject(job.Object()) -} - -func MaximumExecutionTimeSecondsForObject(object client.Object) *int32 { - strVal, found := object.GetLabels()[constants.MaxExecTimeSecondsLabel] - if !found { - return nil - } - - v, err := strconv.ParseInt(strVal, 10, 32) - if err != nil || v <= 0 { - return nil - } - - return new(int32(v)) -} - -func WorkloadPriorityClassName(object client.Object) string { - if workloadPriorityClassLabel := object.GetLabels()[constants.WorkloadPriorityClassLabel]; workloadPriorityClassLabel != "" { - return workloadPriorityClassLabel - } - return "" -} - -func PrebuiltWorkloadFor(job GenericJob) (string, bool) { - name, found := job.Object().GetLabels()[constants.PrebuiltWorkloadLabel] - return name, found -} - -func NewWorkload(name string, obj client.Object, podSets []kueue.PodSet, labelKeysToCopy []string) *kueue.Workload { - return &kueue.Workload{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: obj.GetNamespace(), - Labels: maps.FilterKeys(obj.GetLabels(), labelKeysToCopy), - Finalizers: []string{kueue.ResourceInUseFinalizerName}, - Annotations: admissioncheck.FilterProvReqAnnotations(obj.GetAnnotations()), - }, - Spec: kueue.WorkloadSpec{ - QueueName: QueueNameForObject(obj), - PodSets: podSets, - MaximumExecutionTimeSeconds: MaximumExecutionTimeSecondsForObject(obj), - }, - } -} - -// MultiKueueAdapter interface needed for MultiKueue job delegation. -type MultiKueueAdapter interface { - // SyncJob creates the Job object in the worker cluster using remote client, if not already created. - // Copy the status from the remote job if already exists. - SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error - // DeleteRemoteObject deletes the Job in the worker cluster. - DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error - // IsJobManagedByKueue returns: - // - a bool indicating if the job object identified by key is managed by kueue and can be delegated. - // - a reason indicating why the job is not managed by Kueue - // - any API error encountered during the check - IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error) - // GVK returns GVK (Group Version Kind) for the job. - GVK() schema.GroupVersionKind -} - -// MultiKueueWatcher optional interface that can be implemented by a MultiKueueAdapter -// to receive job related watch events from the worker cluster. -// If not implemented, MultiKueue will only receive events related to the job's workload. -type MultiKueueWatcher interface { - // GetEmptyList returns an empty list of objects - GetEmptyList() client.ObjectList - // WorkloadKeysFor returns the keys of the workloads of interest - // - the object name for workloads - // - the prebuilt workload(s) for job types - WorkloadKeysFor(runtime.Object) ([]types.NamespacedName, error) -} - -// MultiKueueMultiWorkloadAdapter is an optional interface for MultiKueue adapters -// whose jobs create multiple workloads (e.g., LeaderWorkerSet creates one workload per replica). -type MultiKueueMultiWorkloadAdapter interface { - // GetExpectedWorkloadCount returns the number of workloads the job creates. - GetExpectedWorkloadCount(ctx context.Context, c client.Client, key types.NamespacedName) (int, error) - // GetWorkloadIndex extracts the numeric index from the workload for ordering. - // Returns -1 if the index cannot be determined. - GetWorkloadIndex(wl *kueue.Workload) int -} diff --git a/pkg/controller/jobframework/multikueue.go b/pkg/controller/jobframework/multikueue.go new file mode 100644 index 00000000000..caac4313d52 --- /dev/null +++ b/pkg/controller/jobframework/multikueue.go @@ -0,0 +1,66 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package jobframework + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2" +) + +// MultiKueueAdapter interface needed for MultiKueue job delegation. +type MultiKueueAdapter interface { + // SyncJob creates the Job object in the worker cluster using remote client, if not already created. + // Copy the status from the remote job if already exists. + SyncJob(ctx context.Context, localClient client.Client, remoteClient client.Client, key types.NamespacedName, workloadName, origin string) error + // DeleteRemoteObject deletes the Job in the worker cluster. + DeleteRemoteObject(ctx context.Context, remoteClient client.Client, key types.NamespacedName) error + // IsJobManagedByKueue returns: + // - a bool indicating if the job object identified by key is managed by kueue and can be delegated. + // - a reason indicating why the job is not managed by Kueue + // - any API error encountered during the check + IsJobManagedByKueue(ctx context.Context, localClient client.Client, key types.NamespacedName) (bool, string, error) + // GVK returns GVK (Group Version Kind) for the job. + GVK() schema.GroupVersionKind +} + +// MultiKueueWatcher optional interface that can be implemented by a MultiKueueAdapter +// to receive job related watch events from the worker cluster. +// If not implemented, MultiKueue will only receive events related to the job's workload. +type MultiKueueWatcher interface { + // GetEmptyList returns an empty list of objects + GetEmptyList() client.ObjectList + // WorkloadKeysFor returns the keys of the workloads of interest + // - the object name for workloads + // - the prebuilt workload(s) for job types + WorkloadKeysFor(runtime.Object) ([]types.NamespacedName, error) +} + +// MultiKueueMultiWorkloadAdapter is an optional interface for MultiKueue adapters +// whose jobs create multiple workloads (e.g., LeaderWorkerSet creates one workload per replica). +type MultiKueueMultiWorkloadAdapter interface { + // GetExpectedWorkloadCount returns the number of workloads the job creates. + GetExpectedWorkloadCount(ctx context.Context, c client.Client, key types.NamespacedName) (int, error) + // GetWorkloadIndex extracts the numeric index from the workload for ordering. + // Returns -1 if the index cannot be determined. + GetWorkloadIndex(wl *kueue.Workload) int +} diff --git a/pkg/controller/jobframework/stop_reason.go b/pkg/controller/jobframework/stop_reason.go new file mode 100644 index 00000000000..fb802697bb6 --- /dev/null +++ b/pkg/controller/jobframework/stop_reason.go @@ -0,0 +1,26 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package jobframework + +type StopReason string + +const ( + StopReasonWorkloadDeleted StopReason = "WorkloadDeleted" + StopReasonWorkloadEvicted StopReason = "WorkloadEvicted" + StopReasonNoMatchingWorkload StopReason = "NoMatchingWorkload" + StopReasonNotAdmitted StopReason = "NotAdmitted" +) diff --git a/pkg/controller/jobframework/utils.go b/pkg/controller/jobframework/utils.go index 1261a5f02de..44b18a08216 100644 --- a/pkg/controller/jobframework/utils.go +++ b/pkg/controller/jobframework/utils.go @@ -18,14 +18,21 @@ package jobframework import ( "context" + "fmt" + "strconv" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2" + controllerconstants "sigs.k8s.io/kueue/pkg/controller/constants" "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/metrics" + "sigs.k8s.io/kueue/pkg/util/admissioncheck" + "sigs.k8s.io/kueue/pkg/util/maps" "sigs.k8s.io/kueue/pkg/util/orderedgroups" utilqueue "sigs.k8s.io/kueue/pkg/util/queue" "sigs.k8s.io/kueue/pkg/util/roletracker" @@ -96,3 +103,103 @@ func RecordWorkloadCreationLatency(ctx context.Context, job client.Object, jobKi customLabelValues := customLabels.LQGet(utilqueue.KeyFromWorkload(wl)) metrics.RecordWorkloadCreationLatency(jobKind, latency, customLabelValues, tracker) } + +// WorkloadShouldBeSuspended determines whether jobObj should be default suspended on creation +func WorkloadShouldBeSuspended(ctx context.Context, jobObj client.Object, k8sClient client.Client, + manageJobsWithoutQueueName bool, managedJobsNamespaceSelector labels.Selector) (bool, error) { + // Do not default suspend a job whose ancestor is already managed by Kueue + ancestorJob, err := FindAncestorJobManagedByKueue(ctx, k8sClient, jobObj, manageJobsWithoutQueueName) + if err != nil || ancestorJob != nil { + return false, err + } + + // Jobs with queue names whose parents are not managed by Kueue are default suspended + if QueueNameForObject(jobObj) != "" { + return true, nil + } + + // Logic for managing jobs without queue names. + if manageJobsWithoutQueueName { + if managedJobsNamespaceSelector != nil { + // Default suspend the job if the namespace selector matches + ns := corev1.Namespace{} + err := k8sClient.Get(ctx, client.ObjectKey{Name: jobObj.GetNamespace()}, &ns) + if err != nil { + return false, fmt.Errorf("failed to get namespace: %w", err) + } + return managedJobsNamespaceSelector.Matches(labels.Set(ns.GetLabels())), nil + } else { + // Namespace filtering is disabled; unconditionally default suspend + return true, nil + } + } + return false, nil +} + +// QueueName extracts and returns the LocalQueueName for the given GenericJob +// by inspecting its underlying object labels. +func QueueName(job GenericJob) kueue.LocalQueueName { + return QueueNameForObject(job.Object()) +} + +// QueueNameForObject extracts and returns the LocalQueueName from the specified object's +// labels using the "kueue.x-k8s.io/queue-name" label. +func QueueNameForObject(object client.Object) kueue.LocalQueueName { + return kueue.LocalQueueName(object.GetLabels()[controllerconstants.QueueLabel]) +} + +// MaximumExecutionTimeSeconds determines the maximum execution time in seconds +// for a given GenericJob based on its labels. +func MaximumExecutionTimeSeconds(job GenericJob) *int32 { + return MaximumExecutionTimeSecondsForObject(job.Object()) +} + +// MaximumExecutionTimeSecondsForObject extracts and parses the maximum execution +// time in seconds from the given object's labels. +func MaximumExecutionTimeSecondsForObject(object client.Object) *int32 { + strVal, found := object.GetLabels()[controllerconstants.MaxExecTimeSecondsLabel] + if !found { + return nil + } + + v, err := strconv.ParseInt(strVal, 10, 32) + if err != nil || v <= 0 { + return nil + } + + return new(int32(v)) +} + +// WorkloadPriorityClassName retrieves the value of the "kueue.x-k8s.io/priority-class" label +// from the given object. If the label is not present, it returns an empty string. +func WorkloadPriorityClassName(object client.Object) string { + if workloadPriorityClassLabel := object.GetLabels()[controllerconstants.WorkloadPriorityClassLabel]; workloadPriorityClassLabel != "" { + return workloadPriorityClassLabel + } + return "" +} + +// PrebuiltWorkloadFor retrieves the prebuilt workload name and its existence from the given GenericJob's labels. +func PrebuiltWorkloadFor(job GenericJob) (string, bool) { + name, found := job.Object().GetLabels()[controllerconstants.PrebuiltWorkloadLabel] + return name, found +} + +// NewWorkload creates a new Workload object with the specified name, +// associated object, pod sets, and label keys to copy. +func NewWorkload(name string, obj client.Object, podSets []kueue.PodSet, labelKeysToCopy []string) *kueue.Workload { + return &kueue.Workload{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: obj.GetNamespace(), + Labels: maps.FilterKeys(obj.GetLabels(), labelKeysToCopy), + Finalizers: []string{kueue.ResourceInUseFinalizerName}, + Annotations: admissioncheck.FilterProvReqAnnotations(obj.GetAnnotations()), + }, + Spec: kueue.WorkloadSpec{ + QueueName: QueueNameForObject(obj), + PodSets: podSets, + MaximumExecutionTimeSeconds: MaximumExecutionTimeSecondsForObject(obj), + }, + } +}