feat(interceptor): add SSE streaming callback for cold-start scaling #1592
guimou wants to merge 1 commit into
When a backend is scaling from zero and the incoming request is a streaming request (JSON body with "stream": true), the interceptor can now send OpenAI-compatible SSE keepalive events to keep the client connection alive until the backend becomes ready. This is configured via a new `coldStartStreamingCallback` field on HTTPScaledObject (v1alpha1) and a `streamingCallback` field inside `coldStart` on InterceptorRoute (v1beta1). The spec includes the initial message, keepalive interval, and optional keepalive message text. Once the backend is ready, the interceptor inserts a visual separator and proxies the real upstream response through the same SSE stream.

Signed-off-by: Guillaume Moutier <guimou@users.noreply.github.com>
Pull request overview
This PR adds an optional Server-Sent Events (SSE) “streaming callback” during cold starts so that OpenAI-style streaming clients receive periodic `chat.completion.chunk` keepalive events while the backend scales from zero. Once the backend is ready, the interceptor continues streaming the real upstream response.
Changes:
- Add interceptor support for detecting streaming requests and emitting SSE keepalive/separator events during cold-start waits.
- Extend CRD APIs/manifests (v1alpha1 HTTPScaledObject and v1beta1 InterceptorRoute) to configure the streaming callback and relax cold-start validation to allow either fallback or streamingCallback.
- Convert v1alpha1 `coldStartStreamingCallback` into v1beta1 `coldStart.streamingCallback` in the routing table refresh logic, plus unit tests for the new behavior.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `interceptor/middleware/endpoint_resolver.go` | Adds SSE callback flow for streaming requests during cold starts, plus request-body streaming detection and a ResponseWriter wrapper. |
| `interceptor/middleware/endpoint_resolver_test.go` | Adds unit tests for streaming callback behavior and request-body detection. |
| `operator/apis/http/v1beta1/interceptorroute_types.go` | Introduces StreamingCallbackSpec and relaxes coldStart validation. |
| `operator/apis/http/v1beta1/zz_generated.deepcopy.go` | Generated DeepCopy updates for the new v1beta1 types. |
| `operator/apis/http/v1alpha1/httpscaledobject_types.go` | Adds ColdStartStreamingCallbackSpec and a new HTTPSO spec field. |
| `operator/apis/http/v1alpha1/zz_generated.deepcopy.go` | Generated DeepCopy updates for the new v1alpha1 types. |
| `pkg/routing/table.go` | Converts HTTPSO coldStartStreamingCallback into IR coldStart.streamingCallback during refresh. |
| `config/crd/bases/http.keda.sh_interceptorroutes.yaml` | CRD schema update for streamingCallback and relaxed coldStart validation. |
| `config/crd/bases/http.keda.sh_httpscaledobjects.yaml` | CRD schema update for coldStartStreamingCallback. |
    next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    	nextCalled = true
    	w.WriteHeader(http.StatusOK)
    	_, _ = w.Write([]byte("data: {\"id\":\"real\",\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hello\"}}]}\n\n"))
    })

    ir := defaultIR()
    ir.Spec.ColdStart = &httpv1beta1.ColdStartSpec{
    	StreamingCallback: &httpv1beta1.StreamingCallbackSpec{
    		Message:  "Model is waking up...",
    		Interval: metav1.Duration{Duration: 100 * time.Millisecond},
    	},
    }

    mw := NewEndpointResolver(next, cache, EndpointResolverConfig{
    	ReadinessTimeout:      5 * time.Second,
    	EnableColdStartHeader: true,
    })

    rec := httptest.NewRecorder()
    req := newStreamingRequest(t, ir, `{"stream":true,"messages":[]}`)

    // Mark ready after a short delay to simulate cold start.
    go func() {
    	time.Sleep(150 * time.Millisecond)
    	addReadyEndpoint(cache)
    }()

    mw.ServeHTTP(rec, req)

    if !nextCalled {
    	t.Fatal("expected next handler to be called after cold start")
    }
    if got, want := rec.Header().Get("Content-Type"), "text/event-stream"; got != want {
    	t.Fatalf("Content-Type = %q, want %q", got, want)
    }
    body := rec.Body.String()
    if !strings.Contains(body, "Model is waking up...") {
    	t.Fatalf("response body should contain callback message, got:\n%s", body)
    }
    if !strings.Contains(body, `"id":"real"`) {
    	t.Fatalf("response body should contain upstream response, got:\n%s", body)
    }

These tests rely on real time.Sleep with tight deadlines/intervals; this can be flaky under load/CI scheduling. The file already uses testing/synctest for deterministic time—consider wrapping the streaming-callback tests in synctest.Test and advancing the fake clock instead of sleeping.
Suggested change:

    synctest.Test(t, func(t *testing.T) {
    	next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    		nextCalled = true
    		w.WriteHeader(http.StatusOK)
    		_, _ = w.Write([]byte("data: {\"id\":\"real\",\"object\":\"chat.completion.chunk\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hello\"}}]}\n\n"))
    	})
    	ir := defaultIR()
    	ir.Spec.ColdStart = &httpv1beta1.ColdStartSpec{
    		StreamingCallback: &httpv1beta1.StreamingCallbackSpec{
    			Message:  "Model is waking up...",
    			Interval: metav1.Duration{Duration: 100 * time.Millisecond},
    		},
    	}
    	mw := NewEndpointResolver(next, cache, EndpointResolverConfig{
    		ReadinessTimeout:      5 * time.Second,
    		EnableColdStartHeader: true,
    	})
    	rec := httptest.NewRecorder()
    	req := newStreamingRequest(t, ir, `{"stream":true,"messages":[]}`)
    	// Mark ready after a short delay to simulate cold start.
    	go func() {
    		time.Sleep(150 * time.Millisecond)
    		addReadyEndpoint(cache)
    	}()
    	mw.ServeHTTP(rec, req)
    	if !nextCalled {
    		t.Fatal("expected next handler to be called after cold start")
    	}
    	if got, want := rec.Header().Get("Content-Type"), "text/event-stream"; got != want {
    		t.Fatalf("Content-Type = %q, want %q", got, want)
    	}
    	body := rec.Body.String()
    	if !strings.Contains(body, "Model is waking up...") {
    		t.Fatalf("response body should contain callback message, got:\n%s", body)
    	}
    	if !strings.Contains(body, `"id":"real"`) {
    		t.Fatalf("response body should contain upstream response, got:\n%s", body)
    	}
    })

    // ColdStartStreamingCallbackSpec configures SSE streaming callback messages
    // sent to clients during cold starts to keep the connection alive.
    type ColdStartStreamingCallbackSpec struct {
    	// The message to send as SSE event content during cold start

ColdStartStreamingCallbackSpec.Message is required but has no MinLength validation, so an empty string is still accepted and results in empty SSE content. Consider adding +kubebuilder:validation:MinLength=1 (matching v1beta1 StreamingCallbackSpec) to prevent misconfiguration.
Suggested change:

    	// The message to send as SSE event content during cold start
    	// +kubebuilder:validation:MinLength=1

    }

    if er.cfg.EnableColdStartHeader {
    	w.Header().Set(kedahttp.HeaderColdStart, strconv.FormatBool(isColdStart))

In the streaming-callback path the response is already committed with WriteHeader(200), so setting X-KEDA-HTTP-Cold-Start here will not reach real clients (only httptest.ResponseRecorder). If this header is intended for clients, set it before WriteHeader (e.g., set it to "true" up-front for this path) or expose the value via an SSE event/trailer instead.
Suggested change:

    	// The streaming callback response has already been committed, so a
    	// normal response header set here would not reach real clients.
    	// Expose the cold-start value through the SSE stream instead.
    	if err := writeSSEEvent(w, fmt.Sprintf("%s=%s", kedahttp.HeaderColdStart, strconv.FormatBool(isColdStart))); err != nil {
    		logger.Error(err, "failed to write cold start streaming callback event")
    	} else {
    		_ = rc.Flush()
    	}

    func (w *headerSuppressingWriter) WriteHeader(code int) {
    	if w.headerWritten {
    		return
    	}
    	w.headerWritten = true

headerSuppressingWriter currently drops all subsequent WriteHeader calls, which means any non-200 status from the upstream proxy (4xx/5xx) will be masked as 200 once SSE headers are committed. Consider capturing the upstream status and translating it into an SSE error/[DONE] (and aborting proxying), or restrict suppression to redundant 200-only calls and log/emit an SSE error for other codes.
    body, err := io.ReadAll(r.Body)
    r.Body = io.NopCloser(bytes.NewReader(body))
    if err != nil {
    	return false, err
    }

isStreamingRequest buffers the entire request body via io.ReadAll, which can be very large for chat/completions and changes request handling from streaming to full-buffering (memory/latency impact) even though you only need to detect the "stream" flag. Consider bounding the read (e.g., MaxBytesReader / Content-Length guard) and/or using a streaming JSON tokenizer/tee approach so large requests don’t require full buffering just to detect streaming.
Hi @guimou, thanks for the PR!
Hey @linkvt!
Summary
When serving LLM inference workloads behind the KEDA HTTP Add-on, scaling from zero can take a significant amount of time (from 30 seconds to several minutes) while the model loads into GPU memory. During this cold start, streaming clients (e.g. chat UIs, CLI tools using the OpenAI-compatible API) hold an open connection with no feedback, leading to:
This PR adds an optional SSE streaming callback to the interceptor's cold-start path. When configured and the incoming request is a streaming request (`"stream": true` in the JSON body), the interceptor sends OpenAI-compatible `chat.completion.chunk` SSE keepalive events to the client while waiting for the backend to become ready. Once the backend is available, it inserts a visual separator and proxies the real upstream response through the same SSE stream.
Configuration
- `coldStartStreamingCallback` field on HTTPScaledObject (v1alpha1) with `message`, `intervalSeconds`, and `keepaliveMessage`.
- `streamingCallback` field inside `coldStart` on InterceptorRoute (v1beta1) with `message`, `interval`, and `keepaliveMessage`.
- `coldStart` validation is relaxed from requiring `fallback` to requiring at least one of `fallback` or `streamingCallback`.
Example output (CLI client)
Changes
- `interceptor/middleware/endpoint_resolver.go`: new `serveStreamingCallback` method, `isStreamingRequest` body parser, `writeSSEEvent` SSE formatter, and `headerSuppressingWriter` to avoid duplicate `WriteHeader` from the upstream proxy.
- `ColdStartStreamingCallbackSpec` (v1alpha1), `StreamingCallbackSpec` (v1beta1), plus generated DeepCopy methods.
- `controller-gen`.
- `pkg/routing/table.go`: converts v1alpha1 `coldStartStreamingCallback` to v1beta1 `streamingCallback` during `refreshMemory`.
Test plan
- `make test` passes (unit tests including new streaming callback tests)
- `make lint` passes
- When `streamingCallback` is not configured, behavior is unchanged