Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions pkg/controller/admissionchecks/multikueue/multikueuecluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,28 @@ const (
// this set will provide waiting time between 0 to 5m20s
retryIncrement = 5 * time.Second
retryMaxSteps = 7

// Bounds how long startWatcher waits for client.Watch() to return,
// so a hung remote cannot head-of-line block the reconciler. See #11206.
//
// The bound has to cover the slowest legitimate case, not just the common
// one. client.Watch() returns as soon as the apiserver sends HTTP 200
// headers, but the apiserver can delay sending those headers while it
// warms its watch cache. When the served version differs from the storage
// version and a conversion webhook is in play, that warmup serializes a
// per-object conversion and has been observed to take ~8 min at ~50k
// Workloads (kubernetes/kubernetes#136950). 10 min leaves headroom above
// the documented worst case while still ringing the alarm on a remote
// that is truly hung. Today Kueue's served and storage versions match
// (v1beta2), so the warmup path is not exercised, but we keep the
// generous bound as a guard against future version skew and unknown
// apiserver behavior.
defaultWatchEstablishTimeout = 10 * time.Minute
)

// watchEstablishTimeout is the bound establishWatch applies while waiting for
// client.Watch() to return. It is a variable rather than a constant so that
// tests can temporarily shorten it.
var watchEstablishTimeout = defaultWatchEstablishTimeout

// errWatchEstablishTimeout is the sentinel returned by establishWatch when
// the watch could not be opened within watchEstablishTimeout.
var errWatchEstablishTimeout = errors.New("watch establishment timed out")

// retryAfter returns an exponentially increasing interval between
Expand Down Expand Up @@ -226,12 +248,58 @@ func (rc *remoteClient) setConfig(watchCtx context.Context, config *clientConfig
return nil, nil
}

// cancelOnStopWatcher decorates a watch.Interface with the cancel func of the
// context the watch was established with. The cancel func is invoked from
// Stop, which keeps govet's lostcancel check satisfied without shortening the
// watch's stream lifetime on the success path.
type cancelOnStopWatcher struct {
	// Interface is the wrapped watcher; every method except Stop is
	// promoted from it unchanged.
	watch.Interface
	// cancel releases the establishment context once the watcher is stopped.
	cancel context.CancelFunc
}

// Compile-time check that the wrapper can be used wherever a watch.Interface
// is expected.
var _ watch.Interface = (*cancelOnStopWatcher)(nil)

// Stop stops the wrapped watcher first and then cancels the establishment
// context it was created with.
func (w *cancelOnStopWatcher) Stop() {
	w.Interface.Stop()
	w.cancel()
}

// establishWatch opens a MultiKueue remote watch on obj, restricted to objects
// labeled with the given origin, and waits at most watchEstablishTimeout for
// client.Watch() to return.
//
// On success the returned watcher is wrapped in a cancelOnStopWatcher, so the
// context used to establish the watch is released when the caller calls Stop.
// If Watch fails, its error is returned as is. If the timeout elapses first,
// the in-flight Watch is canceled and errWatchEstablishTimeout is returned so
// the caller falls back to the standard failedConnAttempts / retryAfter
// backoff in setConfig.
func establishWatch(ctx context.Context, c client.WithWatch, obj client.ObjectList, origin string) (watch.Interface, error) {
	type outcome struct {
		watcher watch.Interface
		err     error
	}

	// A cancelable context (rather than one with a deadline) is used so that
	// a successfully established watch is not cut short later on.
	establishCtx, cancel := context.WithCancel(ctx)

	// Capacity 1 lets the goroutine below always deliver its outcome and exit,
	// whichever select branch ends up being taken.
	done := make(chan outcome, 1)

	go func() {
		var out outcome
		out.watcher, out.err = c.Watch(establishCtx, obj,
			client.MatchingLabels{kueue.MultiKueueOriginLabel: origin},
			&client.ListOptions{Raw: &metav1.ListOptions{AllowWatchBookmarks: true}},
		)
		done <- out
	}()

	select {
	case out := <-done:
		if out.err == nil {
			// Ownership of cancel moves to the wrapper; it runs on Stop.
			return &cancelOnStopWatcher{Interface: out.watcher, cancel: cancel}, nil
		}
		cancel()
		return nil, out.err
	case <-time.After(watchEstablishTimeout):
		// Timed out; handled below.
	}

	// Abort the in-flight Watch and wait for it to return. A watcher that won
	// the race against the cancellation must still be stopped, otherwise its
	// stream would leak.
	cancel()
	if late := <-done; late.watcher != nil {
		late.watcher.Stop()
	}
	return nil, errWatchEstablishTimeout
}

func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w jobframework.MultiKueueWatcher) error {
log := ctrl.LoggerFrom(ctx).WithValues("watchKind", kind)
newWatcher, err := rc.client.Watch(ctx, w.GetEmptyList(),
client.MatchingLabels{kueue.MultiKueueOriginLabel: rc.origin},
&client.ListOptions{Raw: &metav1.ListOptions{AllowWatchBookmarks: true}},
)
newWatcher, err := establishWatch(ctx, rc.client, w.GetEmptyList(), rc.origin)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1159,3 +1159,87 @@ func TestClustersReconcilerEventFilters(t *testing.T) {
})
}
}

// TestEstablishWatch exercises establishWatch against a fake client whose
// Watch call is intercepted to hang, fail, succeed, or return a watcher only
// after the establishment timeout has already fired.
func TestEstablishWatch(t *testing.T) {
	ctx, _ := utiltesting.ContextWithLog(t)

	// Shrink the package-level timeout so the timeout paths finish quickly,
	// and put the previous value back once the test is done.
	savedTimeout := watchEstablishTimeout
	t.Cleanup(func() { watchEstablishTimeout = savedTimeout })
	watchEstablishTimeout = 100 * time.Millisecond

	errBoom := errors.New("boom")

	// blockUntilCanceled simulates a hung remote: it only returns once the
	// context handed to Watch is canceled.
	blockUntilCanceled := func(watchCtx context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) {
		<-watchCtx.Done()
		return nil, watchCtx.Err()
	}
	// failImmediately simulates a Watch call that is rejected right away.
	failImmediately := func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) {
		return nil, errBoom
	}

	type watchCase struct {
		// interceptor customizes the fake client's Watch; the zero value
		// leaves the fake client's own Watch in place.
		interceptor interceptor.Funcs
		// wantErr is the error establishWatch must return (nil for success).
		wantErr error
		// maxElapsed, when positive, is the exclusive upper bound on how long
		// establishWatch may take.
		maxElapsed time.Duration
	}

	cases := map[string]watchCase{
		"hung Watch times out": {
			interceptor: interceptor.Funcs{Watch: blockUntilCanceled},
			wantErr:     errWatchEstablishTimeout,
			maxElapsed:  10 * watchEstablishTimeout,
		},
		"Watch error propagates": {
			interceptor: interceptor.Funcs{Watch: failImmediately},
			wantErr:     errBoom,
		},
		"success returns without waiting": {
			maxElapsed: watchEstablishTimeout,
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			cl := getClientBuilder(ctx).WithInterceptorFuncs(tc.interceptor).Build()

			begin := time.Now()
			got, err := establishWatch(ctx, cl, &kueue.WorkloadList{}, "test-origin")
			took := time.Since(begin)

			if !errors.Is(err, tc.wantErr) {
				t.Fatalf("want err %v, got: %v", tc.wantErr, err)
			}
			// Exactly one of (watcher, error) must be set.
			if (err == nil) == (got == nil) {
				t.Fatalf("watcher/err mismatch: err=%v, w=%v", err, got)
			}
			if got != nil {
				got.Stop()
			}
			if limit := tc.maxElapsed; limit > 0 && took >= limit {
				t.Fatalf("took %v, expected < %v", took, limit)
			}
		})
	}

	// Watch races with the timeout: returns a non-nil watcher just after
	// time.After fires. Must Stop() it to avoid leaking the stream.
	t.Run("racing watcher is stopped on timeout", func(t *testing.T) {
		lateWatcher := watch.NewFake()
		slowWatch := func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) {
			time.Sleep(2 * watchEstablishTimeout)
			return lateWatcher, nil
		}
		cl := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{Watch: slowWatch}).Build()

		got, err := establishWatch(ctx, cl, &kueue.WorkloadList{}, "test-origin")
		if !errors.Is(err, errWatchEstablishTimeout) {
			t.Fatalf("want errWatchEstablishTimeout, got: %v", err)
		}
		if got != nil {
			t.Fatalf("want nil watcher, got: %v", got)
		}
		if !lateWatcher.IsStopped() {
			t.Fatal("racing watcher was not Stop()ed; would leak")
		}
	})
}