Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions config/crd/bases/http.keda.sh_httpscaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,27 @@ spec:
spec:
description: HTTPScaledObjectSpec defines the desired state of HTTPScaledObject
properties:
coldStartStreamingCallback:
description: (optional) Configuration for streaming SSE callback messages
during cold starts
properties:
intervalSeconds:
default: 5
description: How often to send keepalive events in seconds (Default
5)
format: int32
type: integer
keepaliveMessage:
description: The message to send as SSE event content for keepalive
events (Default "")
type: string
message:
description: The message to send as SSE event content during cold
start
type: string
required:
- message
type: object
coldStartTimeoutFailoverRef:
description: (optional) The name of the failover service to route
HTTP requests to when the target is not available
Expand Down
28 changes: 24 additions & 4 deletions config/crd/bases/http.keda.sh_interceptorroutes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,32 @@ spec:
x-kubernetes-validations:
- message: exactly one of 'port' or 'portName' must be set
rule: has(self.port) != has(self.portName)
required:
- fallback
streamingCallback:
description: Streaming callback to send SSE events while scaling
from zero.
properties:
interval:
default: 5s
description: Interval between keepalive events.
type: string
keepaliveMessage:
description: |-
Message text to include in SSE keepalive event delta content.
When empty, keepalive events contain an empty string.
type: string
message:
description: Message text to include in the SSE event delta
content.
minLength: 1
type: string
required:
- message
type: object
type: object
x-kubernetes-validations:
- message: '''fallback'' must be set'
rule: has(self.fallback)
- message: at least one of 'fallback' or 'streamingCallback' must
be set
rule: has(self.fallback) || has(self.streamingCallback)
rules:
description: Routing rules that define how requests are matched to
this target.
Expand Down
176 changes: 176 additions & 0 deletions interceptor/middleware/endpoint_resolver.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package middleware

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"time"

"github.com/kedacore/http-add-on/interceptor/handler"
httpv1beta1 "github.com/kedacore/http-add-on/operator/apis/http/v1beta1"
kedahttp "github.com/kedacore/http-add-on/pkg/http"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/util"
Expand Down Expand Up @@ -64,6 +68,22 @@ func (er *EndpointResolver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

serviceKey := ir.Namespace + "/" + ir.Spec.Target.Service

// Streaming callback: if the route has a StreamingCallback configured and
// the backend is not ready, check if this is a streaming request. If so,
// send SSE keepalive events while waiting for the backend.
hasStreamingCallback := ir.Spec.ColdStart != nil && ir.Spec.ColdStart.StreamingCallback != nil
if hasStreamingCallback && !er.readyCache.HasReadyEndpoints(serviceKey) {
streaming, err := isStreamingRequest(r)
if err != nil {
util.LoggerFromContext(ctx).Error(err, "failed to check streaming request")
}
if err == nil && streaming {
er.serveStreamingCallback(waitCtx, ctx, w, r, serviceKey, ir)
return
}
}

isColdStart, err := er.readyCache.WaitForReady(waitCtx, serviceKey)
if err != nil {
// No fallback, return an error
Expand Down Expand Up @@ -100,3 +120,159 @@ func (er *EndpointResolver) ServeHTTP(w http.ResponseWriter, r *http.Request) {

er.next.ServeHTTP(w, r)
}

// serveStreamingCallback handles cold-start waits for streaming requests by
// sending OpenAI-compatible SSE keepalive events until the backend is ready.
func (er *EndpointResolver) serveStreamingCallback(
waitCtx, parentCtx context.Context,
w http.ResponseWriter,
r *http.Request,
serviceKey string,
ir *httpv1beta1.InterceptorRoute,
) {
logger := util.LoggerFromContext(parentCtx)
cb := ir.Spec.ColdStart.StreamingCallback

rc := http.NewResponseController(w)

// Write SSE headers — commits to a 200 response.
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)

// Send initial loading message.
if err := writeSSEEvent(w, cb.Message); err != nil {
logger.Error(err, "failed to write initial streaming callback event")
return
}
if err := rc.Flush(); err != nil {
logger.Error(err, "failed to flush initial streaming callback event")
return
}

interval := cb.Interval.Duration
if interval <= 0 {
interval = 5 * time.Second
}

// Start keepalive goroutine.
callbackDone := make(chan struct{})
callbackStopped := make(chan struct{})
go func() {
defer close(callbackStopped)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := writeSSEEvent(w, cb.KeepaliveMessage); err != nil {
logger.Error(err, "failed to write keepalive streaming callback event")
return
}
if err := rc.Flush(); err != nil {
logger.Error(err, "failed to flush keepalive streaming callback event")
return
}
case <-callbackDone:
return
}
}
}()

// Wait for the backend to become ready.
isColdStart, err := er.readyCache.WaitForReady(waitCtx, serviceKey)
close(callbackDone)
<-callbackStopped // ensure goroutine exits before touching w again

if err != nil {
// Already committed to 200, so send an SSE error event instead of HTTP error.
logger.Error(err, "backend not ready during streaming callback")
errMsg := fmt.Sprintf("Backend did not become ready: %v", err)
_ = writeSSEEvent(w, errMsg)
_, _ = fmt.Fprintf(w, "data: [DONE]\n\n")
_ = rc.Flush()
return
}

if er.cfg.EnableColdStartHeader {
w.Header().Set(kedahttp.HeaderColdStart, strconv.FormatBool(isColdStart))
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the streaming-callback path the response is already committed with WriteHeader(200), so setting X-KEDA-HTTP-Cold-Start here will not reach real clients (only httptest.ResponseRecorder). If this header is intended for clients, set it before WriteHeader (e.g., set it to "true" up-front for this path) or expose the value via an SSE event/trailer instead.

Suggested change
w.Header().Set(kedahttp.HeaderColdStart, strconv.FormatBool(isColdStart))
// The streaming callback response has already been committed, so a
// normal response header set here would not reach real clients.
// Expose the cold-start value through the SSE stream instead.
if err := writeSSEEvent(w, fmt.Sprintf("%s=%s", kedahttp.HeaderColdStart, strconv.FormatBool(isColdStart))); err != nil {
logger.Error(err, "failed to write cold start streaming callback event")
} else {
_ = rc.Flush()
}

Copilot uses AI. Check for mistakes.
}

// Send a visual separator so the real model response starts on a fresh
// line, rather than being appended directly after the keepalive dots.
// The content ends with a zero-width space (U+200B) after the final
// newline because common shell-based clients use bash command
// substitution which strips trailing newlines. The zero-width space
// anchors the newline so it survives extraction while remaining
// invisible in the terminal output.
if err := writeSSEEvent(w, "\n\n---\n\u200B"); err != nil {
logger.Error(err, "failed to write separator streaming callback event")
} else {
_ = rc.Flush()
}

// Wrap the writer to suppress duplicate WriteHeader from the upstream proxy.
er.next.ServeHTTP(&headerSuppressingWriter{
ResponseWriter: w,
headerWritten: true,
}, r)
}

// isStreamingRequest checks whether the JSON request body contains "stream": true.
// It restores the body so subsequent handlers can re-read it.
func isStreamingRequest(r *http.Request) (bool, error) {
if r.Body == nil {
return false, nil
}
body, err := io.ReadAll(r.Body)
r.Body = io.NopCloser(bytes.NewReader(body))
if err != nil {
return false, err
}
Comment on lines +229 to +233
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isStreamingRequest buffers the entire request body via io.ReadAll, which can be very large for chat/completions and changes request handling from streaming to full-buffering (memory/latency impact) even though you only need to detect the "stream" flag. Consider bounding the read (e.g., MaxBytesReader / Content-Length guard) and/or using a streaming JSON tokenizer/tee approach so large requests don’t require full buffering just to detect streaming.

Copilot uses AI. Check for mistakes.
if len(body) == 0 {
return false, nil
}
var payload struct {
Stream bool `json:"stream"`
}
if err := json.Unmarshal(body, &payload); err != nil {
return false, nil //nolint:nilerr // non-JSON body is not an error
}
return payload.Stream, nil
}

// writeSSEEvent emits one server-sent-event frame to w. The frame's data
// payload is a JSON object shaped like an OpenAI chat.completion.chunk whose
// single choice carries content as its delta text. The "created" field is the
// current Unix time in seconds. It returns the error from the underlying
// write, if any.
func writeSSEEvent(w http.ResponseWriter, content string) error {
	const frameFormat = "data: {\"id\":\"keda-cold-start\",\"object\":\"chat.completion.chunk\",\"created\":%d,\"model\":\"system\",\"choices\":[{\"index\":0,\"delta\":{\"content\":%s},\"finish_reason\":null}]}\n\n"

	// Encode content as a JSON string literal so quotes, newlines and other
	// special characters are escaped; the marshal error is discarded.
	encoded, _ := json.Marshal(content)
	created := time.Now().Unix()

	if _, err := fmt.Fprintf(w, frameFormat, created, encoded); err != nil {
		return err
	}
	return nil
}

// headerSuppressingWriter wraps a ResponseWriter and silently ignores
// WriteHeader calls after the first one. This prevents the upstream
// reverse proxy from logging "superfluous response.WriteHeader call"
// when we have already committed to a 200 for SSE streaming.
//
// NOTE(review): a suppressed WriteHeader call is dropped entirely, so the
// status code passed by the wrapped handler is not forwarded once
// headerWritten is true — confirm that masking non-200 upstream statuses
// is acceptable for the streaming path.
type headerSuppressingWriter struct {
// Embedded writer; methods not overridden on this type are promoted from it.
http.ResponseWriter
// headerWritten is true once the status line has been sent (or when the
// wrapper is constructed after the caller already sent it); while true,
// WriteHeader is a no-op.
headerWritten bool
}

func (w *headerSuppressingWriter) WriteHeader(code int) {
if w.headerWritten {
return
}
w.headerWritten = true
Comment on lines +266 to +270
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

headerSuppressingWriter currently drops all subsequent WriteHeader calls, which means any non-200 status from the upstream proxy (4xx/5xx) will be masked as 200 once SSE headers are committed. Consider capturing the upstream status and translating it into an SSE error/[DONE] (and aborting proxying), or restrict suppression to redundant 200-only calls and log/emit an SSE error for other codes.

Copilot uses AI. Check for mistakes.
w.ResponseWriter.WriteHeader(code)
}

// Unwrap returns the wrapped ResponseWriter. http.NewResponseController
// relies on this method to reach the optional interfaces (Flusher,
// Hijacker, etc.) implemented by the underlying writer.
func (w *headerSuppressingWriter) Unwrap() http.ResponseWriter {
	underlying := w.ResponseWriter
	return underlying
}
Loading
Loading