diff --git a/app/internal/k8s/app_client.go b/app/internal/k8s/app_client.go index 70a92a33e81..8b608600615 100644 --- a/app/internal/k8s/app_client.go +++ b/app/internal/k8s/app_client.go @@ -26,6 +26,7 @@ import ( "github.com/flyteorg/flyte/v2/app/internal/config" "github.com/flyteorg/flyte/v2/flytestdlib/k8s" "github.com/flyteorg/flyte/v2/flytestdlib/logger" + "github.com/flyteorg/flyte/v2/flytestdlib/utils" flyteapp "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/app" flytecore "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/core" ) @@ -582,7 +583,7 @@ func (c *AppK8sClient) buildKService(app *flyteapp.App) (*servingv1.Service, err } // buildPodSpec constructs a corev1.PodSpec from an App Spec. -// Supports Container payload only for now; K8sPod support can be added in a follow-up. +// Supports Container and K8sPod payloads. func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) { switch p := spec.GetAppPayload().(type) { case *flyteapp.Spec_Container: @@ -612,9 +613,18 @@ func buildPodSpec(spec *flyteapp.Spec) (corev1.PodSpec, error) { }, nil case *flyteapp.Spec_Pod: - // K8sPod payloads are not yet supported — the pod spec serialization - // from flyteplugins is needed for a complete implementation. - return corev1.PodSpec{}, fmt.Errorf("K8sPod app payload is not yet supported") + pod := p.Pod + if pod == nil || pod.GetPodSpec() == nil { + return corev1.PodSpec{}, fmt.Errorf("K8sPod app payload has no pod spec") + } + var podSpec corev1.PodSpec + if err := utils.UnmarshalStructToObj(pod.GetPodSpec(), &podSpec); err != nil { + return corev1.PodSpec{}, fmt.Errorf("failed to unmarshal K8sPod spec: %w", err) + } + if podSpec.EnableServiceLinks == nil { + podSpec.EnableServiceLinks = boolPtr(false) + } + return podSpec, nil default: return corev1.PodSpec{}, fmt.Errorf("app spec has no payload (container or pod required)") diff --git a/app/internal/k8s/app_client_test.go b/app/internal/k8s/app_client_test.go index 29a84cb2ca1..c552de7deb0 100644 --- a/app/internal/k8s/app_client_test.go +++ b/app/internal/k8s/app_client_test.go @@ -4,9 +4,12 @@ import ( "context" "crypto/sha256" "encoding/hex" + "encoding/json" "testing" "time" + "google.golang.org/protobuf/types/known/structpb" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" @@ -740,3 +743,139 @@ func TestKserviceToStatus_Messages(t *testing.T) { }) } } + + +// transformStructToStructPB converts an arbitrary Go object into a *structpb.Struct +// by round-tripping through JSON. It fails the test on any marshaling error. +func transformStructToStructPB(t *testing.T, obj interface{}) *structpb.Struct { + t.Helper() + data, err := json.Marshal(obj) + require.NoError(t, err) + m := make(map[string]interface{}) + err = json.Unmarshal(data, &m) + require.NoError(t, err) + s, err := structpb.NewStruct(m) + require.NoError(t, err) + return s +} + +func TestBuildPodSpec_Container(t *testing.T) { + spec := &flyteapp.Spec{ + AppPayload: &flyteapp.Spec_Container{ + Container: &flytecoreapp.Container{ + Image: "nginx:latest", + Command: []string{"nginx"}, + Args: []string{"-g", "daemon off;"}, + Env: []*flytecoreapp.KeyValuePair{ + {Key: "FOO", Value: "bar"}, + }, + Ports: []*flytecoreapp.ContainerPort{ + {ContainerPort: 8080, Name: "http"}, + }, + Resources: &flytecoreapp.Resources{ + Requests: []*flytecoreapp.Resources_ResourceEntry{ + {Name: flytecoreapp.Resources_CPU, Value: "100m"}, + {Name: flytecoreapp.Resources_MEMORY, Value: "128Mi"}, + }, + }, + }, + }, + } + + podSpec, err := buildPodSpec(spec) + require.NoError(t, err) + require.Len(t, podSpec.Containers, 1) + assert.Equal(t, "app", podSpec.Containers[0].Name) + assert.Equal(t, "nginx:latest", podSpec.Containers[0].Image) + assert.Equal(t, []string{"nginx"}, podSpec.Containers[0].Command) + assert.Equal(t, []string{"-g", "daemon off;"}, podSpec.Containers[0].Args) + assert.Equal(t, []corev1.EnvVar{{Name: "FOO", Value: "bar"}}, podSpec.Containers[0].Env) + assert.Equal(t, []corev1.ContainerPort{{ContainerPort: 8080, Name: "http"}}, podSpec.Containers[0].Ports) + assert.Equal(t, "100m", podSpec.Containers[0].Resources.Requests.Cpu().String()) + assert.Equal(t, "128Mi", podSpec.Containers[0].Resources.Requests.Memory().String()) + assert.NotNil(t, podSpec.EnableServiceLinks) + assert.False(t, *podSpec.EnableServiceLinks) +} + +func TestBuildPodSpec_Pod(t *testing.T) { + podSpecMap := map[string]interface{}{ + "containers": []map[string]interface{}{ + { + "name": "app", + "image": "my-image:v1", + "ports": []map[string]interface{}{ + {"containerPort": float64(80), "name": "http"}, + }, + "resources": map[string]interface{}{ + "requests": map[string]interface{}{ + "cpu": "250m", + "memory": "256Mi", + }, + }, + }, + }, + "restartPolicy": "Always", + } + + spec := &flyteapp.Spec{ + AppPayload: &flyteapp.Spec_Pod{ + Pod: &flytecoreapp.K8SPod{ + PodSpec: transformStructToStructPB(t, podSpecMap), + }, + }, + } + + podSpec, err := buildPodSpec(spec) + require.NoError(t, err) + require.Len(t, podSpec.Containers, 1) + assert.Equal(t, "app", podSpec.Containers[0].Name) + assert.Equal(t, "my-image:v1", podSpec.Containers[0].Image) + assert.Len(t, podSpec.Containers[0].Ports, 1) + assert.Equal(t, int32(80), podSpec.Containers[0].Ports[0].ContainerPort) + assert.Equal(t, "250m", podSpec.Containers[0].Resources.Requests.Cpu().String()) + assert.Equal(t, "256Mi", podSpec.Containers[0].Resources.Requests.Memory().String()) + assert.Equal(t, corev1.RestartPolicyAlways, podSpec.RestartPolicy) + assert.NotNil(t, podSpec.EnableServiceLinks) + assert.False(t, *podSpec.EnableServiceLinks) +} + +func TestBuildPodSpec_Pod_NilPodSpec(t *testing.T) { + spec := &flyteapp.Spec{ + AppPayload: &flyteapp.Spec_Pod{ + Pod: &flytecoreapp.K8SPod{}, + }, + } + + _, err := buildPodSpec(spec) + require.Error(t, err) + assert.Contains(t, err.Error(), "has no pod spec") +} + +func TestBuildPodSpec_Pod_InvalidJSON(t *testing.T) { + // Create a Struct that cannot be unmarshaled into corev1.PodSpec. + // "containers" must be an array, not a string. + s := &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "containers": {Kind: &structpb.Value_StringValue{StringValue: "not-an-array"}}, + }, + } + + spec := &flyteapp.Spec{ + AppPayload: &flyteapp.Spec_Pod{ + Pod: &flytecoreapp.K8SPod{ + PodSpec: s, + }, + }, + } + + _, err := buildPodSpec(spec) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to unmarshal K8sPod spec") +} + +func TestBuildPodSpec_NoPayload(t *testing.T) { + spec := &flyteapp.Spec{} + _, err := buildPodSpec(spec) + require.Error(t, err) + assert.Contains(t, err.Error(), "app spec has no payload") +}