Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions templates/test/ci/cluster-template-prow-aks-aso-kuberay.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions templates/test/ci/prow-aks-aso-kuberay/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ kind: Kustomization
namespace: default
resources:
- ../../../flavors/aks-aso
- patches/aks-gpu-pool.yaml

patches:
- patch: |-
Expand Down Expand Up @@ -43,6 +44,16 @@ patches:
value: "${AZURE_AKS_NODE_MACHINE_TYPE:=Standard_D2s_v3}"
target:
kind: AzureASOManagedMachinePool
- patch: |-
- op: test
path: /spec/resources/0/kind
value: ManagedClustersAgentPool
- op: replace
path: /spec/resources/0/spec/vmSize
value: "${AZURE_GPU_NODE_MACHINE_TYPE:=Standard_NC6s_v3}"
target:
kind: AzureASOManagedMachinePool
name: ".*-gpupool"

sortOptions:
order: fifo
42 changes: 42 additions & 0 deletions templates/test/ci/prow-aks-aso-kuberay/patches/aks-gpu-pool.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
apiVersion: cluster.x-k8s.io/v1beta1
kind: MachinePool
metadata:
name: "${CLUSTER_NAME}-gpupool"
spec:
clusterName: "${CLUSTER_NAME}"
replicas: 1
template:
metadata: {}
spec:
bootstrap:
dataSecretName: ""
clusterName: "${CLUSTER_NAME}"
infrastructureRef:
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: AzureASOManagedMachinePool
name: "${CLUSTER_NAME}-gpupool"
version: "${KUBERNETES_VERSION}"
---
apiVersion: infrastructure.cluster.x-k8s.io/v1beta1
kind: AzureASOManagedMachinePool
metadata:
name: "${CLUSTER_NAME}-gpupool"
spec:
resources:
- apiVersion: "containerservice.azure.com/v1api20240901"
kind: ManagedClustersAgentPool
metadata:
name: ${CLUSTER_NAME}-gpupool
annotations:
serviceoperator.azure.com/credential-from: ${ASO_CREDENTIAL_SECRET_NAME}
spec:
azureName: gpupool
owner:
name: ${CLUSTER_NAME}
mode: User
type: VirtualMachineScaleSets
vmSize: "${AZURE_GPU_NODE_MACHINE_TYPE:=Standard_NC6s_v3}"
tags:
EnableManagedGPUExperience: "true"
nodeTaints:
- "sku=gpu:NoSchedule"
270 changes: 270 additions & 0 deletions test/e2e/azure_kuberay.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,3 +505,273 @@ func describeRayJobStatus(ctx context.Context, dynamicClient dynamic.Interface,
b.WriteString(describeKubeRayOperatorLogs(ctx, clientset))
return b.String()
}

// KubeRayGPUClusterSpecInput is the input for KubeRayGPUClusterSpec.
type KubeRayGPUClusterSpecInput struct {
BootstrapClusterProxy framework.ClusterProxy
Namespace *corev1.Namespace
ClusterName string
SkipCleanup bool
}

// KubeRayGPUClusterSpec implements a test that verifies the KubeRay operator can schedule
// Ray workers on AKS GPU nodes provisioned with ManagedGPUExperiencePreview.
func KubeRayGPUClusterSpec(ctx context.Context, inputGetter func() KubeRayGPUClusterSpecInput) {
var (
specName = "kuberay-gpu"
input KubeRayGPUClusterSpecInput
)

input = inputGetter()
Expect(input.BootstrapClusterProxy).NotTo(BeNil(), "Invalid argument. input.BootstrapClusterProxy can't be nil when calling %s spec", specName)
Expect(input.Namespace).NotTo(BeNil(), "Invalid argument. input.Namespace can't be nil when calling %s spec", specName)
Expect(input.ClusterName).NotTo(BeEmpty(), "Invalid argument. input.ClusterName can't be empty when calling %s spec", specName)

By("creating a Kubernetes client to the workload cluster")
clusterProxy := input.BootstrapClusterProxy.GetWorkloadCluster(ctx, input.Namespace.Name, input.ClusterName)
Expect(clusterProxy).NotTo(BeNil())
clientset := clusterProxy.GetClientSet()
Expect(clientset).NotTo(BeNil())

By("installing the KubeRay operator via Helm")
InstallKubeRayOperator(ctx, clusterProxy, specName)

By("creating a GPU-enabled RayCluster")
dynamicClient := newDynamicClient(clusterProxy)
rayCluster := newGPURayClusterUnstructured("raycluster-gpu-e2e", corev1.NamespaceDefault)
_, err := dynamicClient.Resource(rayClusterGVR).Namespace(corev1.NamespaceDefault).Create(ctx, rayCluster, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())

By("waiting for the GPU RayCluster to become ready")
Eventually(func() bool {
rc, err := dynamicClient.Resource(rayClusterGVR).Namespace(corev1.NamespaceDefault).Get(ctx, "raycluster-gpu-e2e", metav1.GetOptions{})
if err != nil {
return false
}
state, found, err := unstructured.NestedString(rc.Object, "status", "state")
if err != nil || !found {
return false
}
return state == "ready"
}, e2eConfig.GetIntervals(specName, "wait-raycluster-ready")...).Should(BeTrue(), func() string {
return describeKubeRayOperatorLogs(ctx, clientset)
})

By("verifying the GPU worker pod is running on a GPU node")
Eventually(func() bool {
pods, err := clientset.CoreV1().Pods(corev1.NamespaceDefault).List(ctx, metav1.ListOptions{
LabelSelector: "ray.io/node-type=worker",
})
if err != nil || len(pods.Items) == 0 {
return false
}
for _, pod := range pods.Items {
if pod.Status.Phase != corev1.PodRunning {
continue
}
// Verify the pod was scheduled on the GPU pool
if pod.Spec.NodeName == "" {
continue
}
node, err := clientset.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
if err != nil {
continue
}
if pool, ok := node.Labels["agentpool"]; ok && pool == "gpupool" {
return true
}
}
return false
}, e2eConfig.GetIntervals(specName, "wait-deployment")...).Should(BeTrue(), "GPU worker pod did not reach Running state on a GPU node")

By("running a RayJob that verifies GPU access")
rayJob := newGPURayJobUnstructured("rayjob-gpu-e2e", corev1.NamespaceDefault)
_, err = dynamicClient.Resource(rayJobGVR).Namespace(corev1.NamespaceDefault).Create(ctx, rayJob, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())

By("waiting for the GPU RayJob to complete")
Eventually(func() bool {
rj, err := dynamicClient.Resource(rayJobGVR).Namespace(corev1.NamespaceDefault).Get(ctx, "rayjob-gpu-e2e", metav1.GetOptions{})
if err != nil {
return false
}
deploymentStatus, found, err := unstructured.NestedString(rj.Object, "status", "jobDeploymentStatus")
if err != nil || !found {
return false
}
return deploymentStatus == "Complete"
}, e2eConfig.GetIntervals(specName, "wait-rayjob-complete")...).Should(BeTrue(), func() string {
return describeRayJobStatus(ctx, dynamicClient, "rayjob-gpu-e2e", corev1.NamespaceDefault, clientset)
})

By("verifying the GPU RayJob completed with SUCCEEDED status")
rj, err := dynamicClient.Resource(rayJobGVR).Namespace(corev1.NamespaceDefault).Get(ctx, "rayjob-gpu-e2e", metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
jobStatus, _, _ := unstructured.NestedString(rj.Object, "status", "jobStatus")
Expect(jobStatus).To(Equal("SUCCEEDED"), "expected GPU RayJob status to be SUCCEEDED but got %s", jobStatus)

if !input.SkipCleanup {
By("deleting the GPU RayJob")
err = dynamicClient.Resource(rayJobGVR).Namespace(corev1.NamespaceDefault).Delete(ctx, "rayjob-gpu-e2e", metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())

By("deleting the GPU RayCluster")
err = dynamicClient.Resource(rayClusterGVR).Namespace(corev1.NamespaceDefault).Delete(ctx, "raycluster-gpu-e2e", metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
}
}

// gpuToleration returns the toleration needed to schedule pods on GPU-tainted nodes.
func gpuToleration() map[string]interface{} {
return map[string]interface{}{
"key": "sku",
"operator": "Equal",
"value": "gpu",
"effect": "NoSchedule",
}
}

// newGPURayClusterUnstructured creates a RayCluster with GPU worker nodes that tolerate
// the GPU taint and request nvidia.com/gpu resources.
func newGPURayClusterUnstructured(name, namespace string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "ray.io/v1",
"kind": "RayCluster",
"metadata": map[string]interface{}{
"name": name,
"namespace": namespace,
},
"spec": map[string]interface{}{
"rayVersion": "2.41.0",
"headGroupSpec": map[string]interface{}{
"rayStartParams": map[string]interface{}{
"dashboard-host": "0.0.0.0",
},
"template": map[string]interface{}{
"spec": map[string]interface{}{
"nodeSelector": map[string]interface{}{
"kubernetes.io/os": "linux",
},
"containers": []interface{}{
map[string]interface{}{
"name": "ray-head",
"image": rayImage,
"ports": []interface{}{
map[string]interface{}{
"containerPort": int64(6379),
"name": "gcs-server",
},
map[string]interface{}{
"containerPort": int64(8265),
"name": "dashboard",
},
map[string]interface{}{
"containerPort": int64(10001),
"name": "client",
},
},
"resources": map[string]interface{}{
"requests": map[string]interface{}{
"cpu": "300m",
"memory": "1Gi",
},
"limits": map[string]interface{}{
"cpu": "500m",
"memory": "2Gi",
},
},
},
},
},
},
},
"workerGroupSpecs": []interface{}{
map[string]interface{}{
"replicas": int64(1),
"minReplicas": int64(1),
"maxReplicas": int64(1),
"groupName": "gpu-group",
"rayStartParams": map[string]interface{}{
"num-gpus": "1",
},
"template": map[string]interface{}{
"spec": map[string]interface{}{
"nodeSelector": map[string]interface{}{
"kubernetes.io/os": "linux",
"agentpool": "gpupool",
},
"tolerations": []interface{}{
gpuToleration(),
},
"containers": []interface{}{
map[string]interface{}{
"name": "ray-worker",
"image": rayImage,
"resources": map[string]interface{}{
"requests": map[string]interface{}{
"cpu": "300m",
"memory": "1Gi",
"nvidia.com/gpu": "1",
},
"limits": map[string]interface{}{
"cpu": "500m",
"memory": "2Gi",
"nvidia.com/gpu": "1",
},
},
},
},
},
},
},
},
},
},
}
}

// newGPURayJobUnstructured creates a RayJob that verifies GPU access via nvidia-smi,
// using the existing GPU RayCluster rather than creating an inline one.
func newGPURayJobUnstructured(name, namespace string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "ray.io/v1",
"kind": "RayJob",
"metadata": map[string]interface{}{
"name": name,
"namespace": namespace,
},
"spec": map[string]interface{}{
"entrypoint": "python -c \"import ray; ray.init(); print('GPU IDs:', ray.get_gpu_ids()); assert len(ray.get_gpu_ids()) > 0, 'No GPUs found'; print('GPU test passed'); ray.shutdown()\"",
"clusterSelector": map[string]interface{}{
"ray.io/cluster": "raycluster-gpu-e2e",
},
"submitterPodTemplate": map[string]interface{}{
"spec": map[string]interface{}{
"restartPolicy": "Never",
"nodeSelector": map[string]interface{}{
"kubernetes.io/os": "linux",
},
"containers": []interface{}{
map[string]interface{}{
"name": "ray-job-submitter",
"image": rayImage,
"resources": map[string]interface{}{
"requests": map[string]interface{}{
"cpu": "200m",
"memory": "200Mi",
},
"limits": map[string]interface{}{
"cpu": "500m",
"memory": "500Mi",
},
},
},
},
},
},
},
},
}
}
Loading
Loading