diff --git a/.buildkite/test-e2e.yml b/.buildkite/test-e2e.yml index 95e392bfe8c..63b8ec67f2f 100644 --- a/.buildkite/test-e2e.yml +++ b/.buildkite/test-e2e.yml @@ -55,9 +55,29 @@ - set -o pipefail - mkdir -p "$(pwd)/tmp" && export KUBERAY_TEST_OUTPUT_DIR=$(pwd)/tmp - echo "KUBERAY_TEST_OUTPUT_DIR=$$KUBERAY_TEST_OUTPUT_DIR" - - KUBERAY_TEST_TIMEOUT_SHORT=1m KUBERAY_TEST_TIMEOUT_MEDIUM=5m KUBERAY_TEST_TIMEOUT_LONG=10m go test -timeout 30m -v ./test/e2erayservice 2>&1 | awk -f ../.buildkite/format.awk | tee $$KUBERAY_TEST_OUTPUT_DIR/gotest.log || (kubectl logs --tail -1 -l app.kubernetes.io/name=kuberay | tee $$KUBERAY_TEST_OUTPUT_DIR/kuberay-operator.log && cd $$KUBERAY_TEST_OUTPUT_DIR && find . -name "*.log" | tar -cf /artifact-mount/e2e-rayservice-log.tar -T - && exit 1) + - KUBERAY_TEST_TIMEOUT_SHORT=1m KUBERAY_TEST_TIMEOUT_MEDIUM=5m KUBERAY_TEST_TIMEOUT_LONG=10m go test -timeout 30m -v -skip Suspend ./test/e2erayservice 2>&1 | awk -f ../.buildkite/format.awk | tee $$KUBERAY_TEST_OUTPUT_DIR/gotest.log || (kubectl logs --tail -1 -l app.kubernetes.io/name=kuberay | tee $$KUBERAY_TEST_OUTPUT_DIR/kuberay-operator.log && cd $$KUBERAY_TEST_OUTPUT_DIR && find . -name "*.log" | tar -cf /artifact-mount/e2e-rayservice-log.tar -T - && exit 1) - echo "--- END:e2e rayservice (nightly operator) tests finished" +- label: 'Test E2E rayservice suspend (nightly operator)' + instance_size: large + image: golang:1.26-bookworm + commands: + - source .buildkite/setup-env.sh + - kind create cluster --wait 900s --config ./ci/kind-config-buildkite.yml + - kubectl config set clusters.kind-kind.server https://docker:6443 + # Build nightly KubeRay operator image + - pushd ray-operator + - source ../.buildkite/build-start-operator.sh + - kubectl wait --timeout=90s --for=condition=Available=true deployment kuberay-operator + # Run suspend e2e tests and print KubeRay operator logs if tests fail + - echo "--- START:Running e2e rayservice suspend (nightly operator) tests" + - if [ -n "$${KUBERAY_TEST_RAY_IMAGE}" ]; then echo "Using Ray Image $${KUBERAY_TEST_RAY_IMAGE}"; fi + - set -o pipefail + - mkdir -p "$(pwd)/tmp" && export KUBERAY_TEST_OUTPUT_DIR=$(pwd)/tmp + - echo "KUBERAY_TEST_OUTPUT_DIR=$$KUBERAY_TEST_OUTPUT_DIR" + - KUBERAY_TEST_TIMEOUT_SHORT=1m KUBERAY_TEST_TIMEOUT_MEDIUM=5m KUBERAY_TEST_TIMEOUT_LONG=10m go test -timeout 30m -v -run Suspend ./test/e2erayservice 2>&1 | awk -f ../.buildkite/format.awk | tee $$KUBERAY_TEST_OUTPUT_DIR/gotest.log || (kubectl logs --tail -1 -l app.kubernetes.io/name=kuberay | tee $$KUBERAY_TEST_OUTPUT_DIR/kuberay-operator.log && cd $$KUBERAY_TEST_OUTPUT_DIR && find . -name "*.log" | tar -cf /artifact-mount/e2e-rayservice-suspend-log.tar -T - && exit 1) + - echo "--- END:e2e rayservice suspend (nightly operator) tests finished" + - label: 'Test RayService Incremental Upgrade E2E (nightly operator)' instance_size: large image: golang:1.26-bookworm diff --git a/docs/reference/api.md b/docs/reference/api.md index d55777d8d9e..ea8d6a900d3 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -505,6 +505,7 @@ _Appears in:_ | `serveConfigV2` _string_ | Important: Run "make" to regenerate code after modifying this file
Defines the applications and deployments to deploy, should be a YAML multi-line scalar string. | | | | `rayClusterConfig` _[RayClusterSpec](#rayclusterspec)_ | | | | | `excludeHeadPodFromServeSvc` _boolean_ | If the field is set to true, the value of the label `ray.io/serve` on the head Pod should always be false.
Therefore, the head Pod's endpoint will not be added to the Kubernetes Serve service. | | | +| `suspend` _boolean_ | Suspend indicates whether the RayService should suspend its execution. When set to true,
all Kubernetes resources owned by the RayService controller (RayClusters, Kubernetes
Services, Gateway, HTTPRoute) will be deleted. Setting it back to false will allow the
RayService controller to recreate the resources. | | | diff --git a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml index 602bfdfb61a..754917291e1 100644 --- a/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml +++ b/helm-chart/kuberay-operator/crds/ray.io_rayservices.yaml @@ -8618,6 +8618,8 @@ spec: serviceUnhealthySecondThreshold: format: int32 type: integer + suspend: + type: boolean upgradeStrategy: properties: clusterUpgradeOptions: diff --git a/ray-operator/Makefile b/ray-operator/Makefile index 749f88afee5..e3361567fe9 100644 --- a/ray-operator/Makefile +++ b/ray-operator/Makefile @@ -81,8 +81,12 @@ test-e2e-autoscaler: manifests fmt vet ## Run e2e autoscaler tests. go test -timeout 30m -v $(WHAT) test-e2e-rayservice: WHAT ?= ./test/e2erayservice -test-e2e-rayservice: manifests fmt vet ## Run e2e RayService tests. - go test -timeout 30m -v $(WHAT) +test-e2e-rayservice: manifests fmt vet ## Run e2e RayService tests (excluding suspend tests, which have their own target). + go test -timeout 30m -v -skip Suspend $(WHAT) + +test-e2e-rayservice-suspend: WHAT ?= ./test/e2erayservice +test-e2e-rayservice-suspend: manifests fmt vet ## Run e2e RayService suspend tests. + go test -timeout 30m -v -run Suspend $(WHAT) test-e2e-upgrade: WHAT ?= ./test/e2eupgrade test-e2e-upgrade: manifests fmt vet ## Run e2e operator upgrade tests. diff --git a/ray-operator/apis/ray/v1/rayservice_types.go b/ray-operator/apis/ray/v1/rayservice_types.go index c69b59c2989..db68aec6e76 100644 --- a/ray-operator/apis/ray/v1/rayservice_types.go +++ b/ray-operator/apis/ray/v1/rayservice_types.go @@ -122,6 +122,12 @@ type RayServiceSpec struct { // Therefore, the head Pod's endpoint will not be added to the Kubernetes Serve service. // +optional ExcludeHeadPodFromServeSvc bool `json:"excludeHeadPodFromServeSvc,omitempty"` + // Suspend indicates whether the RayService should suspend its execution. When set to true, + // all Kubernetes resources owned by the RayService controller (RayClusters, Kubernetes + // Services, Gateway, HTTPRoute) will be deleted. Setting it back to false will allow the + // RayService controller to recreate the resources. + // +optional + Suspend bool `json:"suspend,omitempty"` } // RayServiceStatuses defines the observed state of RayService @@ -209,6 +215,11 @@ const ( UpgradeInProgress RayServiceConditionType = "UpgradeInProgress" // RollbackInProgress means the RayService is currently rolling back an in-progress upgrade to the original cluster state. RollbackInProgress RayServiceConditionType = "RollbackInProgress" + // RayServiceSuspending means the RayService is in the middle of deleting its owned resources in response to Spec.Suspend. + // Once entered, the suspend operation completes atomically regardless of later changes to Spec.Suspend. + RayServiceSuspending RayServiceConditionType = "Suspending" + // RayServiceSuspended means all resources owned by the RayService controller have been deleted and the RayService is suspended. + RayServiceSuspended RayServiceConditionType = "Suspended" ) const ( @@ -221,6 +232,9 @@ const ( NoActiveCluster RayServiceConditionReason = "NoActiveCluster" RayServiceValidationFailed RayServiceConditionReason = "ValidationFailed" TargetClusterChanged RayServiceConditionReason = "TargetClusterChanged" + SuspendRequested RayServiceConditionReason = "SuspendRequested" + SuspendInProgress RayServiceConditionReason = "SuspendInProgress" + SuspendComplete RayServiceConditionReason = "SuspendComplete" ) // +kubebuilder:object:root=true diff --git a/ray-operator/config/crd/bases/ray.io_rayservices.yaml b/ray-operator/config/crd/bases/ray.io_rayservices.yaml index 602bfdfb61a..754917291e1 100644 --- a/ray-operator/config/crd/bases/ray.io_rayservices.yaml +++ b/ray-operator/config/crd/bases/ray.io_rayservices.yaml @@ -8618,6 +8618,8 @@ spec: serviceUnhealthySecondThreshold: format: int32 type: integer + suspend: + type: boolean upgradeStrategy: properties: clusterUpgradeOptions: diff --git a/ray-operator/controllers/ray/rayservice_controller.go b/ray-operator/controllers/ray/rayservice_controller.go index 55ad78a3dc5..ea5ddb56bef 100644 --- a/ray-operator/controllers/ray/rayservice_controller.go +++ b/ray-operator/controllers/ray/rayservice_controller.go @@ -170,6 +170,17 @@ func (r *RayServiceReconciler) Reconcile(ctx context.Context, request ctrl.Reque return ctrl.Result{}, nil } + handled, result, err := r.handleSuspend(ctx, rayServiceInstance) + if updateErr := r.updateStatusIfChanged(ctx, originalRayServiceInstance, rayServiceInstance); updateErr != nil && err == nil { + err = updateErr + if (result == ctrl.Result{}) { + result = ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration} + } + } + if handled || err != nil { + return result, err + } + // If the RayService has timed out during initialization, skip the rest of the reconciliation. // The service is in a terminal failure state - only cleanup (above) is needed. // The user must delete and recreate the RayService to recover. @@ -338,6 +349,191 @@ func validateRayService(ctx context.Context, rayServiceInstance *rayv1.RayServic return "", nil } +// handleSuspend implements the Spec.Suspend lifecycle. It mutates +// rayServiceInstance.Status in-place; the caller persists the changes via a +// single Status().Update. +// +// Returns (handled, result, err): +// - handled=true: handleSuspend has fully serviced this reconcile; the caller +// must persist status changes and return (result, err) without running the +// rest of the reconcile loop. +// - handled=false: the RayService is not (or no longer) suspended; the +// caller should continue with the rest of the reconcile loop, which will +// persist any staged status mutations at its own status-update site. +// +// State machine: +// +// (no suspend) --Spec.Suspend=true--> Suspending --owned resources deleted--> Suspended --Spec.Suspend=false--> (no suspend) +// +// Atomicity comes from a persisted Suspending condition as the commit point. +// The first reconcile that observes Spec.Suspend=true only stages the status +// transition (Suspending=True + reset of ActiveServiceStatus, +// PendingServiceStatus, NumServeEndpoints, ServiceStatus) in the same status +// update; deletion runs on the next reconcile, once Suspending is durable. If +// a later deletion attempt errors out or Spec.Suspend is flipped back to +// false, Suspending stays True in storage and subsequent reconciles continue +// the cleanup. +func (r *RayServiceReconciler) handleSuspend(ctx context.Context, rayServiceInstance *rayv1.RayService) (bool, ctrl.Result, error) { + logger := ctrl.LoggerFrom(ctx) + + isSuspending := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RayServiceSuspending)) + isSuspended := meta.IsStatusConditionTrue(rayServiceInstance.Status.Conditions, string(rayv1.RayServiceSuspended)) + + // Case 1: already fully suspended. + if isSuspended { + if !rayServiceInstance.Spec.Suspend { + logger.Info("Spec.Suspend is false; exiting Suspended state and resuming reconcile") + meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RayServiceSuspended)) + return false, ctrl.Result{}, nil + } + // Stay suspended; nothing to reconcile. + return true, ctrl.Result{}, nil + } + + // Case 2: Spec.Suspend just transitioned to true. Stage the status + // transition (Suspending=True + reset status fields) and return so the + // caller can persist it. Deletion runs on the next reconcile once + // Suspending is durable. + if !isSuspending { + if !rayServiceInstance.Spec.Suspend { + return false, ctrl.Result{}, nil + } + logger.Info("Spec.Suspend is true; committing transition to Suspending state") + setCondition(rayServiceInstance, rayv1.RayServiceSuspending, metav1.ConditionTrue, rayv1.SuspendRequested, + "Spec.Suspend is true; will delete RayClusters, Services, Gateway, and HTTPRoute owned by this RayService.") + setCondition(rayServiceInstance, rayv1.RayServiceReady, metav1.ConditionFalse, rayv1.SuspendInProgress, "RayService is suspending.") + meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.UpgradeInProgress)) + meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RollbackInProgress)) + rayServiceInstance.Status.ActiveServiceStatus = rayv1.RayServiceStatus{} + rayServiceInstance.Status.PendingServiceStatus = rayv1.RayServiceStatus{} + rayServiceInstance.Status.NumServeEndpoints = 0 + rayServiceInstance.Status.ServiceStatus = rayv1.NotRunning + return true, ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil + } + + // Case 3: Suspending is committed in storage. Delete owned resources. + // Atomic: ignore Spec.Suspend here — once Suspending is persisted, the + // deletion always runs to completion. + allDeleted, err := r.deleteRayServiceOwnedResources(ctx, rayServiceInstance) + if err != nil { + return true, ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, err + } + + if !allDeleted { + setCondition(rayServiceInstance, rayv1.RayServiceSuspending, metav1.ConditionTrue, rayv1.SuspendInProgress, + "Waiting for RayClusters, Services, Gateway, and HTTPRoute owned by this RayService to be deleted.") + return true, ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil + } + + // All resources deleted: transition to Suspended. Requeue so that the + // next reconcile observes Spec.Suspend; this matters when the user + // flipped Spec.Suspend back to false mid-suspend (atomic completion put + // us here despite Spec.Suspend=false), since the status-only update + // below would not otherwise wake the controller up. + logger.Info("All RayService-owned resources deleted; transitioning to Suspended") + meta.RemoveStatusCondition(&rayServiceInstance.Status.Conditions, string(rayv1.RayServiceSuspending)) + setCondition(rayServiceInstance, rayv1.RayServiceSuspended, metav1.ConditionTrue, rayv1.SuspendComplete, "All owned resources have been deleted.") + setCondition(rayServiceInstance, rayv1.RayServiceReady, metav1.ConditionFalse, rayv1.SuspendComplete, "RayService is suspended.") + return true, ctrl.Result{RequeueAfter: ServiceDefaultRequeueDuration}, nil +} + +// deleteRayServiceOwnedResources deletes every Kubernetes resource that the +// RayService controller owns: RayClusters, head/serve Services, Gateway and +// HTTPRoute. Returns true when nothing remained to be deleted. +func (r *RayServiceReconciler) deleteRayServiceOwnedResources(ctx context.Context, rayServiceInstance *rayv1.RayService) (bool, error) { + logger := ctrl.LoggerFrom(ctx) + allDeleted := true + + // RayClusters. + rayClusterList := rayv1.RayClusterList{} + if err := r.List(ctx, &rayClusterList, common.RayServiceRayClustersAssociationOptions(rayServiceInstance).ToListOptions()...); err != nil { + return false, err + } + for i := range rayClusterList.Items { + cluster := &rayClusterList.Items[i] + allDeleted = false + if !cluster.DeletionTimestamp.IsZero() { + continue + } + logger.Info("Deleting RayCluster for suspend", "name", cluster.Name) + if err := r.Delete(ctx, cluster, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) { + r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteRayCluster), + "Failed to delete the RayCluster %s/%s during suspend: %v", cluster.Namespace, cluster.Name, err) + return false, err + } + r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeNormal, string(utils.DeletedRayCluster), + "Deleted the RayCluster %s/%s during suspend", cluster.Namespace, cluster.Name) + } + + // Kubernetes Services (head + serve, including per-cluster serve services in incremental upgrade). + svcList := corev1.ServiceList{} + if err := r.List(ctx, &svcList, common.RayServiceRayClustersAssociationOptions(rayServiceInstance).ToListOptions()...); err != nil { + return false, err + } + for i := range svcList.Items { + svc := &svcList.Items[i] + allDeleted = false + if !svc.DeletionTimestamp.IsZero() { + continue + } + logger.Info("Deleting Service for suspend", "name", svc.Name) + if err := r.Delete(ctx, svc, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) { + r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteService), + "Failed to delete the Service %s/%s during suspend: %v", svc.Namespace, svc.Name, err) + return false, err + } + } + + // Gateway and HTTPRoute live behind the RayServiceIncrementalUpgrade + // feature gate: the operator only registers gateway-api types in its + // scheme and ever creates these objects when the gate is on. Skip + // otherwise so a Get doesn't fail with "no kind is registered". + if features.Enabled(features.RayServiceIncrementalUpgrade) { + gateway := &gwv1.Gateway{} + if err := r.Get(ctx, common.RayServiceGatewayNamespacedName(rayServiceInstance), gateway); err == nil { + allDeleted = false + if gateway.DeletionTimestamp.IsZero() { + logger.Info("Deleting Gateway for suspend", "name", gateway.Name) + if err := r.Delete(ctx, gateway, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) { + r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteGateway), + "Failed to delete the Gateway %s/%s during suspend: %v", gateway.Namespace, gateway.Name, err) + return false, err + } + } + } else if !errors.IsNotFound(err) && !meta.IsNoMatchError(err) { + return false, err + } + + httpRoute := &gwv1.HTTPRoute{} + if err := r.Get(ctx, common.RayServiceHTTPRouteNamespacedName(rayServiceInstance), httpRoute); err == nil { + allDeleted = false + if httpRoute.DeletionTimestamp.IsZero() { + logger.Info("Deleting HTTPRoute for suspend", "name", httpRoute.Name) + if err := r.Delete(ctx, httpRoute, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !errors.IsNotFound(err) { + r.Recorder.Eventf(rayServiceInstance, corev1.EventTypeWarning, string(utils.FailedToDeleteHTTPRoute), + "Failed to delete the HTTPRoute %s/%s during suspend: %v", httpRoute.Namespace, httpRoute.Name, err) + return false, err + } + } + } else if !errors.IsNotFound(err) && !meta.IsNoMatchError(err) { + return false, err + } + } + + return allDeleted, nil +} + +// updateStatusIfChanged persists rayServiceInstance.Status if it differs from +// originalRayService.Status. It mirrors the status-update path used at the end +// of Reconcile so that handleSuspend can finalize a reconcile early. +func (r *RayServiceReconciler) updateStatusIfChanged(ctx context.Context, originalRayService, rayServiceInstance *rayv1.RayService) error { + if !utils.InconsistentRayServiceStatuses(originalRayService.Status, rayServiceInstance.Status) { + return nil + } + rayServiceInstance.Status.LastUpdateTime = &metav1.Time{Time: time.Now()} + return r.Status().Update(ctx, rayServiceInstance) +} + func (r *RayServiceReconciler) reconcileServicesToReadyCluster(ctx context.Context, rayServiceInstance *rayv1.RayService, rayClusterInstance *rayv1.RayCluster) (*corev1.Service, *corev1.Service, error) { // Create K8s services if they don't exist. If they do exist, update the services to point to the RayCluster passed in. headSvc, err := r.reconcileServices(ctx, rayServiceInstance, rayClusterInstance, utils.HeadService) @@ -1140,14 +1336,6 @@ func shouldUpdateCluster(rayServiceInstance *rayv1.RayService, cluster *rayv1.Ra } } - if ptr.Deref(rayServiceInstance.Spec.RayClusterSpec.Suspend, false) != ptr.Deref(cluster.Spec.Suspend, false) { - // Suspend toggles (e.g. from Kueue admitting or preempting the workload) must be - // applied in-place to the existing RayCluster. Otherwise the hash comparison below - // selects neither the update nor the new-cluster path, and the cluster stays - // suspended with no head pod (ray-project/kuberay#4686). - return true - } - if isClusterSpecHashEqual(rayServiceInstance, cluster, false) { // The RayCluster spec matches the cluster spec in the RayService. No need to update the cluster. return false @@ -1160,9 +1348,10 @@ func shouldUpdateCluster(rayServiceInstance *rayv1.RayService, cluster *rayv1.Ra func isClusterSpecHashEqual(rayServiceInstance *rayv1.RayService, cluster *rayv1.RayCluster, partial bool) bool { // If `partial` is true, only compare the first `len(cluster.Spec.WorkerGroupSpecs)` worker groups in the CR spec. clusterHash := cluster.ObjectMeta.Annotations[utils.HashWithoutReplicasAndWorkersToDeleteKey] + goalClusterSpec := rayClusterSpecForHashing(rayServiceInstance) goalClusterHash := "" if !partial { - goalClusterHash, _ = utils.GenerateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec) + goalClusterHash, _ = utils.GenerateHashWithoutReplicasAndWorkersToDelete(*goalClusterSpec) } else { // If everything is identical except for the Replicas and WorkersToDelete of // the existing workergroups, and one or more new workergroups are added at the end, then update the cluster. @@ -1170,11 +1359,10 @@ func isClusterSpecHashEqual(rayServiceInstance *rayv1.RayService, cluster *rayv1 if err != nil { return true } - goalNumWorkerGroups := len(rayServiceInstance.Spec.RayClusterSpec.WorkerGroupSpecs) + goalNumWorkerGroups := len(goalClusterSpec.WorkerGroupSpecs) if goalNumWorkerGroups >= clusterNumWorkerGroups { // Remove the new workergroup(s) from the end before calculating the hash. - goalClusterSpec := rayServiceInstance.Spec.RayClusterSpec.DeepCopy() goalClusterSpec.WorkerGroupSpecs = goalClusterSpec.WorkerGroupSpecs[:clusterNumWorkerGroups] // Generate the hash of the old worker group specs. @@ -1235,8 +1423,14 @@ func modifyRayCluster(ctx context.Context, currentCluster, goalCluster *rayv1.Ra } logger.Info("updateRayClusterInstance", "Name", goalCluster.Name, "Namespace", goalCluster.Namespace) - // Update the fetched RayCluster with new changes + // Update the fetched RayCluster with new changes. Suspend is propagated + // from the RayService to the RayCluster only at creation time; afterwards + // the RayCluster's Suspend is delegated to Kueue, so we preserve the + // existing cluster's Suspend here instead of letting the goal spec + // overwrite it. + existingSuspend := currentCluster.Spec.Suspend currentCluster.Spec = goalCluster.Spec + currentCluster.Spec.Suspend = existingSuspend // Update the labels and annotations currentCluster.Labels = goalCluster.Labels @@ -1262,6 +1456,20 @@ func (r *RayServiceReconciler) createRayClusterInstance(ctx context.Context, ray return rayClusterInstance, nil } +// rayClusterSpecForHashing returns a copy of the RayService's RayClusterSpec +// to use for hash comparisons. Fields that should not trigger reconciliation +// of the underlying RayCluster are cleared here. Suspend is excluded because +// the RayService controller propagates Suspend to the RayCluster only at +// creation time; once the RayCluster exists, Suspend is delegated to Kueue +// (or whichever external controller owns the RayCluster's queueing), and +// later changes to RayService.Spec.RayClusterSpec.Suspend must not trigger +// an in-place update or a new cluster preparation. +func rayClusterSpecForHashing(rayService *rayv1.RayService) *rayv1.RayClusterSpec { + spec := rayService.Spec.RayClusterSpec.DeepCopy() + spec.Suspend = nil + return spec +} + func constructRayClusterForRayService(rayService *rayv1.RayService, rayClusterName string, scheme *runtime.Scheme) (*rayv1.RayCluster, error) { var err error rayClusterLabel := make(map[string]string) @@ -1271,7 +1479,7 @@ func constructRayClusterForRayService(rayService *rayv1.RayService, rayClusterNa rayClusterAnnotations := make(map[string]string) maps.Copy(rayClusterAnnotations, rayService.Annotations) - rayClusterAnnotations[utils.HashWithoutReplicasAndWorkersToDeleteKey], err = utils.GenerateHashWithoutReplicasAndWorkersToDelete(rayService.Spec.RayClusterSpec) + rayClusterAnnotations[utils.HashWithoutReplicasAndWorkersToDeleteKey], err = utils.GenerateHashWithoutReplicasAndWorkersToDelete(*rayClusterSpecForHashing(rayService)) if err != nil { return nil, err } @@ -2091,7 +2299,7 @@ func shouldCompleteIncrementalRollback( func (r *RayServiceReconciler) reconcileRollbackState(ctx context.Context, rayServiceInstance *rayv1.RayService, activeCluster, pendingCluster *rayv1.RayCluster) error { logger := ctrl.LoggerFrom(ctx) - targetHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(rayServiceInstance.Spec.RayClusterSpec) + targetHash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(*rayClusterSpecForHashing(rayServiceInstance)) if err != nil { return fmt.Errorf("failed to generate hash for goal cluster spec: %w", err) } diff --git a/ray-operator/controllers/ray/rayservice_controller_unit_test.go b/ray-operator/controllers/ray/rayservice_controller_unit_test.go index 1b4f30c24ad..c997383ab69 100644 --- a/ray-operator/controllers/ray/rayservice_controller_unit_test.go +++ b/ray-operator/controllers/ray/rayservice_controller_unit_test.go @@ -2762,78 +2762,6 @@ func TestReconcileRollbackState(t *testing.T) { } } -// TestShouldUpdateCluster_SuspendFlip covers ray-project/kuberay#4686: when Kueue -// toggles RayService.Spec.RayClusterSpec.Suspend, the existing RayCluster must be -// updated in-place. Previously shouldUpdateCluster returned false because the -// cluster hash annotation encodes the old Suspend value, leaving the cluster -// stuck suspended with no head pod. -func TestShouldUpdateCluster_SuspendFlip(t *testing.T) { - namespace := "test-namespace" - - newRayService := func(suspend *bool) *rayv1.RayService { - return &rayv1.RayService{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-service", - Namespace: namespace, - }, - Spec: rayv1.RayServiceSpec{ - RayClusterSpec: rayv1.RayClusterSpec{ - RayVersion: "2.9.0", - Suspend: suspend, - }, - }, - } - } - - // newClusterFrom mirrors the annotation layout produced by - // constructRayClusterForRayService so the hash reflects the cluster's - // actual spec (including its Suspend value). - newClusterFrom := func(t *testing.T, service *rayv1.RayService, suspend *bool) *rayv1.RayCluster { - t.Helper() - clusterSpec := service.Spec.RayClusterSpec.DeepCopy() - clusterSpec.Suspend = suspend - hash, err := utils.GenerateHashWithoutReplicasAndWorkersToDelete(*clusterSpec) - require.NoError(t, err) - return &rayv1.RayCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-cluster", - Namespace: namespace, - Annotations: map[string]string{ - utils.HashWithoutReplicasAndWorkersToDeleteKey: hash, - utils.NumWorkerGroupsKey: strconv.Itoa(len(clusterSpec.WorkerGroupSpecs)), - utils.KubeRayVersion: utils.KUBERAY_VERSION, - }, - }, - Spec: *clusterSpec, - } - } - - tests := []struct { - name string - serviceSuspend *bool - clusterSuspend *bool - isActiveCluster bool - expect bool - }{ - {"pending unsuspended by Kueue: true -> false", new(false), new(true), false, true}, - {"pending suspended by Kueue: false -> true", new(true), new(false), false, true}, - {"active unsuspended by Kueue: true -> false", new(false), new(true), true, true}, - {"active suspended by Kueue: false -> true", new(true), new(false), true, true}, - {"no change, both nil", nil, nil, false, false}, - {"no change, both false", new(false), new(false), false, false}, - {"no change, both true", new(true), new(true), false, false}, - {"nil vs false treated equal", nil, new(false), false, false}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - service := newRayService(tt.serviceSuspend) - cluster := newClusterFrom(t, service, tt.clusterSuspend) - assert.Equal(t, tt.expect, shouldUpdateCluster(service, cluster, tt.isActiveCluster)) - }) - } -} - // headReadyCluster is a helper to construct a RayCluster with a specific HeadPodReady condition. func headReadyCluster(ready bool) *rayv1.RayCluster { status := metav1.ConditionFalse diff --git a/ray-operator/controllers/ray/utils/constant.go b/ray-operator/controllers/ray/utils/constant.go index 7a185557eec..7517a05b526 100644 --- a/ray-operator/controllers/ray/utils/constant.go +++ b/ray-operator/controllers/ray/utils/constant.go @@ -405,8 +405,11 @@ const ( FailedToUpdateTargetCapacity K8sEventType = "FailedToUpdateTargetCapacity" FailedToCreateGateway K8sEventType = "FailedToCreateGateway" FailedToUpdateGateway K8sEventType = "FailedToUpdateGateway" + FailedToDeleteGateway K8sEventType = "FailedToDeleteGateway" FailedToCreateHTTPRoute K8sEventType = "FailedToCreateHTTPRoute" FailedToUpdateHTTPRoute K8sEventType = "FailedToUpdateHTTPRoute" + FailedToDeleteHTTPRoute K8sEventType = "FailedToDeleteHTTPRoute" + FailedToDeleteService K8sEventType = "FailedToDeleteService" // Generic Pod event list DeletedPod K8sEventType = "DeletedPod" diff --git a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayservicespec.go b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayservicespec.go index 701d02d0c6c..6e1455c41f2 100644 --- a/ray-operator/pkg/client/applyconfiguration/ray/v1/rayservicespec.go +++ b/ray-operator/pkg/client/applyconfiguration/ray/v1/rayservicespec.go @@ -36,6 +36,11 @@ type RayServiceSpecApplyConfiguration struct { // If the field is set to true, the value of the label `ray.io/serve` on the head Pod should always be false. // Therefore, the head Pod's endpoint will not be added to the Kubernetes Serve service. ExcludeHeadPodFromServeSvc *bool `json:"excludeHeadPodFromServeSvc,omitempty"` + // Suspend indicates whether the RayService should suspend its execution. When set to true, + // all Kubernetes resources owned by the RayService controller (RayClusters, Kubernetes + // Services, Gateway, HTTPRoute) will be deleted. Setting it back to false will allow the + // RayService controller to recreate the resources. + Suspend *bool `json:"suspend,omitempty"` } // RayServiceSpecApplyConfiguration constructs a declarative configuration of the RayServiceSpec type for use with @@ -115,3 +120,11 @@ func (b *RayServiceSpecApplyConfiguration) WithExcludeHeadPodFromServeSvc(value b.ExcludeHeadPodFromServeSvc = &value return b } + +// WithSuspend sets the Suspend field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Suspend field is set to the value of the last call. +func (b *RayServiceSpecApplyConfiguration) WithSuspend(value bool) *RayServiceSpecApplyConfiguration { + b.Suspend = &value + return b +} diff --git a/ray-operator/test/e2erayservice/rayservice_suspend_test.go b/ray-operator/test/e2erayservice/rayservice_suspend_test.go new file mode 100644 index 00000000000..70bc40f2bc2 --- /dev/null +++ b/ray-operator/test/e2erayservice/rayservice_suspend_test.go @@ -0,0 +1,324 @@ +package e2erayservice + +import ( + "fmt" + "testing" + "time" + + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1" + . "github.com/ray-project/kuberay/ray-operator/test/support" +) + +// TestRayServiceSuspendResume covers the happy path: set Spec.Suspend=true on a +// Running RayService, observe that all owned resources are deleted and the +// status reaches Suspended, then set Spec.Suspend=false and observe that the +// service is recreated and serves traffic again. +func TestRayServiceSuspendResume(t *testing.T) { + test := With(t) + g := NewWithT(t) + + namespace := test.NewTestNamespace() + rayServiceName := "rayservice-suspend" + + rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name).WithSpec(RayServiceSampleYamlApplyConfiguration()) + rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Waiting for RayService %s/%s to be Ready", rayService.Namespace, rayService.Name) + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceReady, BeTrue())) + + curlPod, err := CreateCurlPod(g, test, "curl-pod", "curl-container", namespace.Name) + g.Expect(err).NotTo(HaveOccurred()) + + stdout, _ := CurlRayServicePod(test, rayService, curlPod, "curl-container", "/fruit", `["MANGO", 2]`) + g.Expect(stdout.String()).To(Equal("6")) + + LogWithTimestamp(test.T(), "Setting Spec.Suspend=true") + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec.Suspend = true + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Waiting for the Suspended condition to be True") + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceSuspended, BeTrue())) + + LogWithTimestamp(test.T(), "Asserting status fields are reset and all owned resources are gone") + g.Eventually(func(gg Gomega) { + rs, err := GetRayService(test, namespace.Name, rayServiceName) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(rs.Status.ActiveServiceStatus.RayClusterName).To(BeEmpty()) + gg.Expect(rs.Status.PendingServiceStatus.RayClusterName).To(BeEmpty()) + gg.Expect(rs.Status.NumServeEndpoints).To(BeEquivalentTo(0)) + gg.Expect(rs.Status.ServiceStatus).To(BeEquivalentTo("")) + gg.Expect(IsRayServiceReady(rs)).To(BeFalse()) + + rcList, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).List(test.Ctx(), metav1.ListOptions{ + LabelSelector: utils.RayOriginatedFromCRNameLabelKey + "=" + rayServiceName, + }) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(rcList.Items).To(BeEmpty()) + + svcList, err := test.Client().Core().CoreV1().Services(namespace.Name).List(test.Ctx(), metav1.ListOptions{ + LabelSelector: utils.RayOriginatedFromCRNameLabelKey + "=" + rayServiceName, + }) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(svcList.Items).To(BeEmpty()) + }, TestTimeoutMedium).Should(Succeed()) + + LogWithTimestamp(test.T(), "Setting Spec.Suspend=false to resume") + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec.Suspend = false + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Waiting for RayService %s/%s to be Ready again", rayService.Namespace, rayService.Name) + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceReady, BeTrue())) + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutShort). + Should(WithTransform(IsRayServiceSuspended, BeFalse())) + + LogWithTimestamp(test.T(), "Sending requests to verify the resumed RayService serves traffic again") + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + // --connect-timeout/--max-time keep each curl attempt short so Eventually + // can actually retry — without them a single attempt can hang on TCP + // retransmits longer than TestTimeoutShort, fooling Eventually into + // reporting a timeout after one attempt. + curlCmd := []string{ + "curl", "-sS", "--connect-timeout", "3", "--max-time", "5", + "-X", "POST", "-H", "Content-Type: application/json", + fmt.Sprintf("%s-serve-svc.%s.svc.cluster.local:8000/fruit", rayService.Name, rayService.Namespace), + "-d", `["MANGO", 2]`, + } + g.Eventually(func(gg Gomega) { + stdout, _, err := ExecPodCmdWithError(test, curlPod, "curl-container", curlCmd) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(stdout.String()).To(Equal("6")) + }, TestTimeoutShort).Should(Succeed()) +} + +// TestRayServiceSuspendAtomic verifies that once the Suspending condition has +// been committed, the suspend operation completes atomically regardless of +// subsequent flips on Spec.Suspend. The pattern mirrors the RayJob atomic +// suspend test: we pin the underlying RayCluster with a synthetic finalizer +// so deletion can't complete, then flip Spec.Suspend back and forth and +// assert via Consistently that Suspending stays True throughout. After the +// finalizer is removed, the deletion drains and the RayService comes back up. +func TestRayServiceSuspendAtomic(t *testing.T) { + test := With(t) + g := NewWithT(t) + + namespace := test.NewTestNamespace() + rayServiceName := "rayservice-suspend-atomic" + const deletionBlocker = "ray.io/test-suspend-block" + + rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name).WithSpec(RayServiceSampleYamlApplyConfiguration()) + rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Waiting for RayService %s/%s to be Ready", rayService.Namespace, rayService.Name) + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceReady, BeTrue())) + + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + originalClusterName := rayService.Status.ActiveServiceStatus.RayClusterName + g.Expect(originalClusterName).NotTo(BeEmpty()) + + LogWithTimestamp(test.T(), "Adding a synthetic finalizer to RayCluster %s so deletion blocks", originalClusterName) + rayCluster, err := GetRayCluster(test, namespace.Name, originalClusterName) + g.Expect(err).NotTo(HaveOccurred()) + rayCluster.Finalizers = append(rayCluster.Finalizers, deletionBlocker) + _, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Update(test.Ctx(), rayCluster, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Setting Spec.Suspend=true to enter Suspending") + rayService.Spec.Suspend = true + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Waiting for the Suspending condition to be True") + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutShort). + Should(WithTransform(IsRayServiceSuspending, BeTrue())) + + // Flipping Spec.Suspend back to false while a suspend is in flight must + // not unwind the Suspending state — the deletion has to complete first. + LogWithTimestamp(test.T(), "Flipping Spec.Suspend=false; Suspending must stay True") + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec.Suspend = false + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Consistently(RayService(test, rayService.Namespace, rayService.Name), 5*time.Second, 500*time.Millisecond). + Should(WithTransform(IsRayServiceSuspending, BeTrue())) + + // Flipping back to true is also a no-op — Suspending is already in flight. + LogWithTimestamp(test.T(), "Flipping Spec.Suspend=true; Suspending must stay True") + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec.Suspend = true + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + g.Consistently(RayService(test, rayService.Namespace, rayService.Name), 5*time.Second, 500*time.Millisecond). + Should(WithTransform(IsRayServiceSuspending, BeTrue())) + + // Settle on Spec.Suspend=false so the controller resumes once the + // finalizer is removed and deletion finishes. + LogWithTimestamp(test.T(), "Settling on Spec.Suspend=false and removing the deletion-blocker finalizer") + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec.Suspend = false + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + rayCluster, err = GetRayCluster(test, namespace.Name, originalClusterName) + g.Expect(err).NotTo(HaveOccurred()) + rayCluster.Finalizers = nil + _, err = test.Client().Ray().RayV1().RayClusters(namespace.Name).Update(test.Ctx(), rayCluster, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Original RayCluster %s must be deleted after the finalizer is removed", originalClusterName) + g.Eventually(func() error { + _, err := GetRayCluster(test, namespace.Name, originalClusterName) + return err + }, TestTimeoutMedium).Should(WithTransform(errors.IsNotFound, BeTrue())) + + LogWithTimestamp(test.T(), "RayService should become Ready again, backed by a different RayCluster") + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceReady, BeTrue())) + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(rayService.Status.ActiveServiceStatus.RayClusterName).NotTo(BeEmpty()) + g.Expect(rayService.Status.ActiveServiceStatus.RayClusterName).NotTo(Equal(originalClusterName)) +} + +// TestRayServiceCreatedSuspended verifies that a RayService created with +// Spec.Suspend=true never spins up its owned resources and reaches Suspended +// directly. Flipping Spec.Suspend=false afterwards must then bring the service +// up normally. +func TestRayServiceCreatedSuspended(t *testing.T) { + test := With(t) + g := NewWithT(t) + + namespace := test.NewTestNamespace() + rayServiceName := "rayservice-born-suspended" + + rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name). + WithSpec(RayServiceSampleYamlApplyConfiguration().WithSuspend(true)) + rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Waiting for the Suspended condition to be True") + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceSuspended, BeTrue())) + + LogWithTimestamp(test.T(), "Asserting no owned resources were ever created and status is empty") + g.Consistently(func(gg Gomega) { + rcList, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).List(test.Ctx(), metav1.ListOptions{ + LabelSelector: utils.RayOriginatedFromCRNameLabelKey + "=" + rayServiceName, + }) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(rcList.Items).To(BeEmpty()) + + svcList, err := test.Client().Core().CoreV1().Services(namespace.Name).List(test.Ctx(), metav1.ListOptions{ + LabelSelector: utils.RayOriginatedFromCRNameLabelKey + "=" + rayServiceName, + }) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(svcList.Items).To(BeEmpty()) + + rs, err := GetRayService(test, namespace.Name, rayServiceName) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(rs.Status.ActiveServiceStatus.RayClusterName).To(BeEmpty()) + gg.Expect(rs.Status.PendingServiceStatus.RayClusterName).To(BeEmpty()) + gg.Expect(rs.Status.NumServeEndpoints).To(BeEquivalentTo(0)) + }, TestTimeoutShort/2).Should(Succeed()) + + LogWithTimestamp(test.T(), "Setting Spec.Suspend=false; RayService should now come up normally") + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec.Suspend = false + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceReady, BeTrue())) + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutShort). + Should(WithTransform(IsRayServiceSuspended, BeFalse())) +} + +// TestRayServiceSuspendDuringUpgrade triggers a zero-downtime upgrade so both +// the active and pending RayClusters exist, then sets Spec.Suspend=true. Both +// clusters must be deleted and the RayService must transition to Suspended. +// After resuming, the new spec is applied and the service serves traffic. +func TestRayServiceSuspendDuringUpgrade(t *testing.T) { + test := With(t) + g := NewWithT(t) + + namespace := test.NewTestNamespace() + rayServiceName := "rayservice-suspend-upgrade" + + rayServiceAC := rayv1ac.RayService(rayServiceName, namespace.Name).WithSpec(RayServiceSampleYamlApplyConfiguration()) + rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Waiting for RayService %s/%s to be Ready", rayService.Namespace, rayService.Name) + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceReady, BeTrue())) + + LogWithTimestamp(test.T(), "Triggering a zero-downtime upgrade by bumping RayVersion") + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec.RayClusterSpec.RayVersion = rayService.Spec.RayClusterSpec.RayVersion + "-upgrade" + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Waiting until a pending RayCluster is created") + g.Eventually(func() string { + rs, err := GetRayService(test, namespace.Name, rayServiceName) + if err != nil { + return "" + } + return rs.Status.PendingServiceStatus.RayClusterName + }, TestTimeoutMedium).ShouldNot(BeEmpty()) + + LogWithTimestamp(test.T(), "Setting Spec.Suspend=true while upgrade is in progress") + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec.Suspend = true + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + LogWithTimestamp(test.T(), "Both active and pending RayClusters must be deleted and Suspended must reach True") + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceSuspended, BeTrue())) + g.Eventually(func(gg Gomega) { + rcList, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).List(test.Ctx(), metav1.ListOptions{ + LabelSelector: utils.RayOriginatedFromCRNameLabelKey + "=" + rayServiceName, + }) + gg.Expect(err).NotTo(HaveOccurred()) + gg.Expect(rcList.Items).To(BeEmpty()) + }, TestTimeoutMedium).Should(Succeed()) + + LogWithTimestamp(test.T(), "Resuming by setting Spec.Suspend=false; RayService should become Ready with the upgraded spec") + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + rayService.Spec.Suspend = false + _, err = test.Client().Ray().RayV1().RayServices(namespace.Name).Update(test.Ctx(), rayService, metav1.UpdateOptions{}) + g.Expect(err).NotTo(HaveOccurred()) + + g.Eventually(RayService(test, rayService.Namespace, rayService.Name), TestTimeoutMedium). + Should(WithTransform(IsRayServiceReady, BeTrue())) + + rayService, err = GetRayService(test, namespace.Name, rayServiceName) + g.Expect(err).NotTo(HaveOccurred()) + g.Expect(rayService.Spec.RayClusterSpec.RayVersion).To(HaveSuffix("-upgrade")) +} diff --git a/ray-operator/test/support/ray.go b/ray-operator/test/support/ray.go index 0f6ff06dd53..7a993631522 100644 --- a/ray-operator/test/support/ray.go +++ b/ray-operator/test/support/ray.go @@ -236,6 +236,14 @@ func IsRayServiceRollingBack(service *rayv1.RayService) bool { return meta.IsStatusConditionTrue(service.Status.Conditions, string(rayv1.RollbackInProgress)) } +func IsRayServiceSuspending(service *rayv1.RayService) bool { + return meta.IsStatusConditionTrue(service.Status.Conditions, string(rayv1.RayServiceSuspending)) +} + +func IsRayServiceSuspended(service *rayv1.RayService) bool { + return meta.IsStatusConditionTrue(service.Status.Conditions, string(rayv1.RayServiceSuspended)) +} + func RayServicesNumEndPoints(service *rayv1.RayService) int32 { return service.Status.NumServeEndpoints }