diff --git a/config/crd/bases/http.keda.sh_httpscaledobjects.yaml b/config/crd/bases/http.keda.sh_httpscaledobjects.yaml index e7540b926..6aa65c24b 100644 --- a/config/crd/bases/http.keda.sh_httpscaledobjects.yaml +++ b/config/crd/bases/http.keda.sh_httpscaledobjects.yaml @@ -60,6 +60,27 @@ spec: spec: description: HTTPScaledObjectSpec defines the desired state of HTTPScaledObject properties: + coldStartStreamingCallback: + description: (optional) Configuration for streaming SSE callback messages + during cold starts + properties: + intervalSeconds: + default: 5 + description: How often to send keepalive events in seconds (Default + 5) + format: int32 + type: integer + keepaliveMessage: + description: The message to send as SSE event content for keepalive + events (Default "") + type: string + message: + description: The message to send as SSE event content during cold + start + type: string + required: + - message + type: object coldStartTimeoutFailoverRef: description: (optional) The name of the failover service to route HTTP requests to when the target is not available diff --git a/config/crd/bases/http.keda.sh_interceptorroutes.yaml b/config/crd/bases/http.keda.sh_interceptorroutes.yaml index c6da4f3c0..647bde7fa 100644 --- a/config/crd/bases/http.keda.sh_interceptorroutes.yaml +++ b/config/crd/bases/http.keda.sh_interceptorroutes.yaml @@ -80,12 +80,32 @@ spec: x-kubernetes-validations: - message: exactly one of 'port' or 'portName' must be set rule: has(self.port) != has(self.portName) - required: - - fallback + streamingCallback: + description: Streaming callback to send SSE events while scaling + from zero. + properties: + interval: + default: 5s + description: Interval between keepalive events. + type: string + keepaliveMessage: + description: |- + Message text to include in SSE keepalive event delta content. + When empty, keepalive events contain an empty string. + type: string + message: + description: Message text to include in the SSE event delta + content. + minLength: 1 + type: string + required: + - message + type: object type: object x-kubernetes-validations: - - message: '''fallback'' must be set' - rule: has(self.fallback) + - message: at least one of 'fallback' or 'streamingCallback' must + be set + rule: has(self.fallback) || has(self.streamingCallback) rules: description: Routing rules that define how requests are matched to this target. diff --git a/interceptor/middleware/endpoint_resolver.go b/interceptor/middleware/endpoint_resolver.go index 02cdf94ad..91bca4c11 100644 --- a/interceptor/middleware/endpoint_resolver.go +++ b/interceptor/middleware/endpoint_resolver.go @@ -1,13 +1,17 @@ package middleware import ( + "bytes" "context" + "encoding/json" "fmt" + "io" "net/http" "strconv" "time" "github.com/kedacore/http-add-on/interceptor/handler" + httpv1beta1 "github.com/kedacore/http-add-on/operator/apis/http/v1beta1" kedahttp "github.com/kedacore/http-add-on/pkg/http" "github.com/kedacore/http-add-on/pkg/k8s" "github.com/kedacore/http-add-on/pkg/util" @@ -64,6 +68,22 @@ func (er *EndpointResolver) ServeHTTP(w http.ResponseWriter, r *http.Request) { } serviceKey := ir.Namespace + "/" + ir.Spec.Target.Service + + // Streaming callback: if the route has a StreamingCallback configured and + // the backend is not ready, check if this is a streaming request. If so, + // send SSE keepalive events while waiting for the backend. + hasStreamingCallback := ir.Spec.ColdStart != nil && ir.Spec.ColdStart.StreamingCallback != nil + if hasStreamingCallback && !er.readyCache.HasReadyEndpoints(serviceKey) { + streaming, err := isStreamingRequest(r) + if err != nil { + util.LoggerFromContext(ctx).Error(err, "failed to check streaming request") + } + if err == nil && streaming { + er.serveStreamingCallback(waitCtx, ctx, w, r, serviceKey, ir) + return + } + } + isColdStart, err := er.readyCache.WaitForReady(waitCtx, serviceKey) if err != nil { // No fallback, return an error @@ -100,3 +120,159 @@ func (er *EndpointResolver) ServeHTTP(w http.ResponseWriter, r *http.Request) { er.next.ServeHTTP(w, r) } + +// serveStreamingCallback handles cold-start waits for streaming requests by +// sending OpenAI-compatible SSE keepalive events until the backend is ready. +func (er *EndpointResolver) serveStreamingCallback( + waitCtx, parentCtx context.Context, + w http.ResponseWriter, + r *http.Request, + serviceKey string, + ir *httpv1beta1.InterceptorRoute, +) { + logger := util.LoggerFromContext(parentCtx) + cb := ir.Spec.ColdStart.StreamingCallback + + rc := http.NewResponseController(w) + + // Write SSE headers — commits to a 200 response. + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + w.WriteHeader(http.StatusOK) + + // Send initial loading message. + if err := writeSSEEvent(w, cb.Message); err != nil { + logger.Error(err, "failed to write initial streaming callback event") + return + } + if err := rc.Flush(); err != nil { + logger.Error(err, "failed to flush initial streaming callback event") + return + } + + interval := cb.Interval.Duration + if interval <= 0 { + interval = 5 * time.Second + } + + // Start keepalive goroutine. + callbackDone := make(chan struct{}) + callbackStopped := make(chan struct{}) + go func() { + defer close(callbackStopped) + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if err := writeSSEEvent(w, cb.KeepaliveMessage); err != nil { + logger.Error(err, "failed to write keepalive streaming callback event") + return + } + if err := rc.Flush(); err != nil { + logger.Error(err, "failed to flush keepalive streaming callback event") + return + } + case <-callbackDone: + return + } + } + }() + + // Wait for the backend to become ready. + isColdStart, err := er.readyCache.WaitForReady(waitCtx, serviceKey) + close(callbackDone) + <-callbackStopped // ensure goroutine exits before touching w again + + if err != nil { + // Already committed to 200, so send an SSE error event instead of HTTP error. + logger.Error(err, "backend not ready during streaming callback") + errMsg := fmt.Sprintf("Backend did not become ready: %v", err) + _ = writeSSEEvent(w, errMsg) + _, _ = fmt.Fprintf(w, "data: [DONE]\n\n") + _ = rc.Flush() + return + } + + if er.cfg.EnableColdStartHeader { + w.Header().Set(kedahttp.HeaderColdStart, strconv.FormatBool(isColdStart)) + } + + // Send a visual separator so the real model response starts on a fresh + // line, rather than being appended directly after the keepalive dots. + // The content ends with a zero-width space (U+200B) after the final + // newline because common shell-based clients use bash command + // substitution which strips trailing newlines. The zero-width space + // anchors the newline so it survives extraction while remaining + // invisible in the terminal output. + if err := writeSSEEvent(w, "\n\n---\n\u200B"); err != nil { + logger.Error(err, "failed to write separator streaming callback event") + } else { + _ = rc.Flush() + } + + // Wrap the writer to suppress duplicate WriteHeader from the upstream proxy. + er.next.ServeHTTP(&headerSuppressingWriter{ + ResponseWriter: w, + headerWritten: true, + }, r) +} + +// isStreamingRequest checks whether the JSON request body contains "stream": true. +// It restores the body so subsequent handlers can re-read it. +func isStreamingRequest(r *http.Request) (bool, error) { + if r.Body == nil { + return false, nil + } + body, err := io.ReadAll(r.Body) + r.Body = io.NopCloser(bytes.NewReader(body)) + if err != nil { + return false, err + } + if len(body) == 0 { + return false, nil + } + var payload struct { + Stream bool `json:"stream"` + } + if err := json.Unmarshal(body, &payload); err != nil { + return false, nil //nolint:nilerr // non-JSON body is not an error + } + return payload.Stream, nil +} + +// writeSSEEvent writes a single OpenAI-compatible chat.completion.chunk SSE event. +func writeSSEEvent(w http.ResponseWriter, content string) error { + contentJSON, _ := json.Marshal(content) + _, err := fmt.Fprintf(w, + "data: {\"id\":\"keda-cold-start\",\"object\":\"chat.completion.chunk\",\"created\":%d,\"model\":\"system\",\"choices\":[{\"index\":0,\"delta\":{\"content\":%s},\"finish_reason\":null}]}\n\n", + time.Now().Unix(), + contentJSON, + ) + return err +} + +// headerSuppressingWriter wraps a ResponseWriter and silently ignores +// WriteHeader calls after the first one. This prevents the upstream +// reverse proxy from logging "superfluous response.WriteHeader call" +// when we have already committed to a 200 for SSE streaming. +type headerSuppressingWriter struct { + http.ResponseWriter + headerWritten bool +} + +func (w *headerSuppressingWriter) WriteHeader(code int) { + if w.headerWritten { + return + } + w.headerWritten = true + w.ResponseWriter.WriteHeader(code) +} + +// Unwrap exposes the underlying ResponseWriter so that +// http.NewResponseController can find optional interfaces (Flusher, Hijacker, etc.). +func (w *headerSuppressingWriter) Unwrap() http.ResponseWriter { + return w.ResponseWriter +} diff --git a/interceptor/middleware/endpoint_resolver_test.go b/interceptor/middleware/endpoint_resolver_test.go index a81ffb4b3..48cdbf9fb 100644 --- a/interceptor/middleware/endpoint_resolver_test.go +++ b/interceptor/middleware/endpoint_resolver_test.go @@ -1,10 +1,12 @@ package middleware import ( + "bytes" "context" "net/http" "net/http/httptest" "net/url" + "strings" "testing" "testing/synctest" "time" @@ -400,6 +402,208 @@ func TestEndpointResolver_PerRouteZeroReadinessDisablesGlobal(t *testing.T) { }) } +func TestEndpointResolver_StreamingCallbackDuringColdStart(t *testing.T) { + cache := k8s.NewReadyEndpointsCache(logr.Discard()) + + var nextCalled bool + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + nextCalled = true + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("data: {\"id\":\"real\",\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hello\"}}]}\n\n")) + }) + + ir := defaultIR() + ir.Spec.ColdStart = &httpv1beta1.ColdStartSpec{ + StreamingCallback: &httpv1beta1.StreamingCallbackSpec{ + Message: "Model is waking up...", + Interval: metav1.Duration{Duration: 100 * time.Millisecond}, + }, + } + + mw := NewEndpointResolver(next, cache, EndpointResolverConfig{ + ReadinessTimeout: 5 * time.Second, + EnableColdStartHeader: true, + }) + + rec := httptest.NewRecorder() + req := newStreamingRequest(t, ir, `{"stream":true,"messages":[]}`) + + // Mark ready after a short delay to simulate cold start. + go func() { + time.Sleep(150 * time.Millisecond) + addReadyEndpoint(cache) + }() + + mw.ServeHTTP(rec, req) + + if !nextCalled { + t.Fatal("expected next handler to be called after cold start") + } + if got, want := rec.Header().Get("Content-Type"), "text/event-stream"; got != want { + t.Fatalf("Content-Type = %q, want %q", got, want) + } + body := rec.Body.String() + if !strings.Contains(body, "Model is waking up...") { + t.Fatalf("response body should contain callback message, got:\n%s", body) + } + if !strings.Contains(body, `"id":"real"`) { + t.Fatalf("response body should contain upstream response, got:\n%s", body) + } +} + +func TestEndpointResolver_StreamingCallbackNonStreamingRequest(t *testing.T) { + cache := k8s.NewReadyEndpointsCache(logr.Discard()) + + var nextCalled bool + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + nextCalled = true + w.WriteHeader(http.StatusOK) + }) + + ir := defaultIR() + ir.Spec.ColdStart = &httpv1beta1.ColdStartSpec{ + StreamingCallback: &httpv1beta1.StreamingCallbackSpec{ + Message: "Model is waking up...", + Interval: metav1.Duration{Duration: 100 * time.Millisecond}, + }, + } + + mw := NewEndpointResolver(next, cache, EndpointResolverConfig{ + ReadinessTimeout: 25 * time.Millisecond, + }) + + rec := httptest.NewRecorder() + // Non-streaming request — "stream": false + req := newStreamingRequest(t, ir, `{"stream":false,"messages":[]}`) + mw.ServeHTTP(rec, req) + + if nextCalled { + t.Fatal("expected next handler not to be called on timeout") + } + // Should use normal path and return timeout error + if got, want := rec.Code, http.StatusGatewayTimeout; got != want { + t.Fatalf("status code = %d, want %d", got, want) + } +} + +func TestEndpointResolver_StreamingCallbackTimeout(t *testing.T) { + cache := k8s.NewReadyEndpointsCache(logr.Discard()) + + var nextCalled bool + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + nextCalled = true + }) + + ir := defaultIR() + ir.Spec.ColdStart = &httpv1beta1.ColdStartSpec{ + StreamingCallback: &httpv1beta1.StreamingCallbackSpec{ + Message: "Loading...", + Interval: metav1.Duration{Duration: 50 * time.Millisecond}, + }, + } + + mw := NewEndpointResolver(next, cache, EndpointResolverConfig{ + ReadinessTimeout: 100 * time.Millisecond, + }) + + rec := httptest.NewRecorder() + req := newStreamingRequest(t, ir, `{"stream":true}`) + mw.ServeHTTP(rec, req) + + if nextCalled { + t.Fatal("expected next handler not to be called on timeout") + } + body := rec.Body.String() + if !strings.Contains(body, "Loading...") { + t.Fatalf("response body should contain callback message, got:\n%s", body) + } + if !strings.Contains(body, "Backend did not become ready") { + t.Fatalf("response body should contain error message, got:\n%s", body) + } + if !strings.Contains(body, "data: [DONE]") { + t.Fatalf("response body should contain [DONE] marker, got:\n%s", body) + } +} + +func TestEndpointResolver_StreamingCallbackNotConfigured(t *testing.T) { + cache := k8s.NewReadyEndpointsCache(logr.Discard()) + addReadyEndpoint(cache) + + var nextCalled bool + next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + nextCalled = true + w.WriteHeader(http.StatusOK) + }) + + // No StreamingCallback configured + ir := defaultIR() + + mw := NewEndpointResolver(next, cache, EndpointResolverConfig{ + ReadinessTimeout: 5 * time.Second, + }) + + rec := httptest.NewRecorder() + req := newStreamingRequest(t, ir, `{"stream":true}`) + mw.ServeHTTP(rec, req) + + if !nextCalled { + t.Fatal("expected next handler to be called (warm path)") + } + // Should NOT have SSE content type + if got := rec.Header().Get("Content-Type"); got == "text/event-stream" { + t.Fatal("should not use SSE when StreamingCallback is not configured") + } +} + +func TestIsStreamingRequest(t *testing.T) { + tests := map[string]struct { + body string + want bool + }{ + "stream true": {body: `{"stream":true,"model":"test"}`, want: true}, + "stream false": {body: `{"stream":false}`, want: false}, + "no stream field": {body: `{"model":"test"}`, want: false}, + "empty body": {body: "", want: false}, + "invalid json": {body: `not json`, want: false}, + } + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + var req *http.Request + if tt.body == "" { + req = httptest.NewRequest("POST", "/v1/chat/completions", nil) + } else { + req = httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(tt.body)) + } + got, err := isStreamingRequest(req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if got != tt.want { + t.Fatalf("isStreamingRequest() = %v, want %v", got, tt.want) + } + // Verify body is restored for non-empty inputs + if tt.body != "" && req.Body != nil { + restored := new(bytes.Buffer) + _, _ = restored.ReadFrom(req.Body) + if restored.String() != tt.body { + t.Fatalf("body not restored: got %q, want %q", restored.String(), tt.body) + } + } + }) + } +} + +func newStreamingRequest(t *testing.T, ir *httpv1beta1.InterceptorRoute, body string) *http.Request { + t.Helper() + req := httptest.NewRequest("POST", "/v1/chat/completions", bytes.NewBufferString(body)) + req.Header.Set("Content-Type", "application/json") + ctx := util.ContextWithLogger(req.Context(), logr.Discard()) + ctx = util.ContextWithInterceptorRoute(ctx, ir) + ctx = util.ContextWithUpstreamURL(ctx, &url.URL{Host: "upstream"}) + req = req.WithContext(ctx) + return req +} + func newRequest(t *testing.T, ir *httpv1beta1.InterceptorRoute) *http.Request { t.Helper() req := httptest.NewRequest("GET", "/test", nil) diff --git a/operator/apis/http/v1alpha1/httpscaledobject_types.go b/operator/apis/http/v1alpha1/httpscaledobject_types.go index 85b9a5e9c..cc6b4346b 100644 --- a/operator/apis/http/v1alpha1/httpscaledobject_types.go +++ b/operator/apis/http/v1alpha1/httpscaledobject_types.go @@ -81,6 +81,20 @@ func (s *ColdStartTimeoutFailoverRef) GetPortName() string { return s.PortName } +// ColdStartStreamingCallbackSpec configures SSE streaming callback messages +// sent to clients during cold starts to keep the connection alive. +type ColdStartStreamingCallbackSpec struct { + // The message to send as SSE event content during cold start + Message string `json:"message"` + // How often to send keepalive events in seconds (Default 5) + // +kubebuilder:default=5 + // +optional + IntervalSeconds int32 `json:"intervalSeconds,omitempty"` + // The message to send as SSE event content for keepalive events (Default "") + // +optional + KeepaliveMessage string `json:"keepaliveMessage,omitempty"` +} + // ReplicaStruct contains the minimum and maximum amount of replicas to have in the deployment type ReplicaStruct struct { // Minimum amount of replicas to have in the deployment (Default 0) @@ -188,6 +202,9 @@ type HTTPScaledObjectSpec struct { // (optional) Timeouts that override the global ones // +optional Timeouts *HTTPScaledObjectTimeoutsSpec `json:"timeouts,omitempty" description:"Timeouts that override the global ones"` + // (optional) Configuration for streaming SSE callback messages during cold starts + // +optional + ColdStartStreamingCallback *ColdStartStreamingCallbackSpec `json:"coldStartStreamingCallback,omitempty"` } // HTTPScaledObjectStatus defines the observed state of HTTPScaledObject diff --git a/operator/apis/http/v1alpha1/zz_generated.deepcopy.go b/operator/apis/http/v1alpha1/zz_generated.deepcopy.go index 9d896b40d..10a6573db 100644 --- a/operator/apis/http/v1alpha1/zz_generated.deepcopy.go +++ b/operator/apis/http/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,21 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ColdStartStreamingCallbackSpec) DeepCopyInto(out *ColdStartStreamingCallbackSpec) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ColdStartStreamingCallbackSpec. +func (in *ColdStartStreamingCallbackSpec) DeepCopy() *ColdStartStreamingCallbackSpec { + if in == nil { + return nil + } + out := new(ColdStartStreamingCallbackSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ColdStartTimeoutFailoverRef) DeepCopyInto(out *ColdStartTimeoutFailoverRef) { *out = *in @@ -170,6 +185,11 @@ func (in *HTTPScaledObjectSpec) DeepCopyInto(out *HTTPScaledObjectSpec) { *out = new(HTTPScaledObjectTimeoutsSpec) **out = **in } + if in.ColdStartStreamingCallback != nil { + in, out := &in.ColdStartStreamingCallback, &out.ColdStartStreamingCallback + *out = new(ColdStartStreamingCallbackSpec) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HTTPScaledObjectSpec. diff --git a/operator/apis/http/v1beta1/interceptorroute_types.go b/operator/apis/http/v1beta1/interceptorroute_types.go index c3281dd52..e6bf675d7 100644 --- a/operator/apis/http/v1beta1/interceptorroute_types.go +++ b/operator/apis/http/v1beta1/interceptorroute_types.go @@ -118,12 +118,32 @@ type TargetRef struct { PortName string `json:"portName,omitzero"` } +// StreamingCallbackSpec configures SSE keepalive events sent to streaming +// clients while the backend scales from zero. +type StreamingCallbackSpec struct { + // Message text to include in the SSE event delta content. + // +kubebuilder:validation:MinLength=1 + Message string `json:"message"` + // Interval between keepalive events. + // +kubebuilder:default="5s" + // +optional + Interval metav1.Duration `json:"interval,omitzero"` + // Message text to include in SSE keepalive event delta content. + // When empty, keepalive events contain an empty string. + // +optional + KeepaliveMessage string `json:"keepaliveMessage,omitzero"` +} + // ColdStartSpec configures behavior while the target is not ready. -// +kubebuilder:validation:XValidation:rule="has(self.fallback)",message="'fallback' must be set" +// +kubebuilder:validation:XValidation:rule="has(self.fallback) || has(self.streamingCallback)",message="at least one of 'fallback' or 'streamingCallback' must be set" type ColdStartSpec struct { // Fallback service to route to when the target is scaling from zero // and the wait timeout expires. + // +optional Fallback *TargetRef `json:"fallback,omitzero"` + // Streaming callback to send SSE events while scaling from zero. + // +optional + StreamingCallback *StreamingCallbackSpec `json:"streamingCallback,omitzero"` } // InterceptorRouteTimeouts configures per-route request handling timeouts. diff --git a/operator/apis/http/v1beta1/zz_generated.deepcopy.go b/operator/apis/http/v1beta1/zz_generated.deepcopy.go index f49670b5a..b9e099a85 100644 --- a/operator/apis/http/v1beta1/zz_generated.deepcopy.go +++ b/operator/apis/http/v1beta1/zz_generated.deepcopy.go @@ -33,6 +33,11 @@ func (in *ColdStartSpec) DeepCopyInto(out *ColdStartSpec) { *out = new(TargetRef) **out = **in } + if in.StreamingCallback != nil { + in, out := &in.StreamingCallback, &out.StreamingCallback + *out = new(StreamingCallbackSpec) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ColdStartSpec. @@ -310,6 +315,22 @@ func (in *ScalingMetricSpec) DeepCopy() *ScalingMetricSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StreamingCallbackSpec) DeepCopyInto(out *StreamingCallbackSpec) { + *out = *in + out.Interval = in.Interval +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StreamingCallbackSpec. +func (in *StreamingCallbackSpec) DeepCopy() *StreamingCallbackSpec { + if in == nil { + return nil + } + out := new(StreamingCallbackSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TargetRef) DeepCopyInto(out *TargetRef) { *out = *in diff --git a/pkg/routing/table.go b/pkg/routing/table.go index e1f7dd3dc..4e67303d8 100644 --- a/pkg/routing/table.go +++ b/pkg/routing/table.go @@ -139,6 +139,21 @@ func (t *table) refreshMemory(ctx context.Context) error { } } + if cb := httpso.Spec.ColdStartStreamingCallback; cb != nil { + if ir.Spec.ColdStart == nil { + ir.Spec.ColdStart = &httpv1beta1.ColdStartSpec{} + } + interval := 5 * time.Second + if cb.IntervalSeconds > 0 { + interval = time.Duration(cb.IntervalSeconds) * time.Second + } + ir.Spec.ColdStart.StreamingCallback = &httpv1beta1.StreamingCallbackSpec{ + Message: cb.Message, + Interval: metav1.Duration{Duration: interval}, + KeepaliveMessage: cb.KeepaliveMessage, + } + } + if httpso.Spec.ScalingMetric != nil { if httpso.Spec.ScalingMetric.Concurrency != nil { ir.Spec.ScalingMetric.Concurrency = &httpv1beta1.ConcurrencyTargetSpec{