diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go index ee0d36ddd40..4a0ef56677c 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go @@ -72,6 +72,28 @@ const ( // this set will provide waiting time between 0 to 5m20s retryIncrement = 5 * time.Second retryMaxSteps = 7 + + // Bounds how long startWatcher waits for client.Watch() to return, + // so a hung remote cannot head-of-line block the reconciler. See #11206. + // + // The bound has to cover the slowest legitimate case, not just the common + // one. client.Watch() returns as soon as the apiserver sends HTTP 200 + // headers, but the apiserver can delay sending those headers while it + // warms its watch cache. When the served version differs from the storage + // version and a conversion webhook is in play, that warmup serializes a + // per-object conversion and has been observed to take ~8 min at ~50k + // Workloads (kubernetes/kubernetes#136950). 10 min leaves headroom above + // the documented worst case while still ringing the alarm on a remote + // that is truly hung. Today Kueue's served and storage versions match + // (v1beta2), so the warmup path is not exercised, but we keep the + // generous bound as a guard against future version skew and unknown + // apiserver behavior. + defaultWatchEstablishTimeout = 10 * time.Minute +) + +var ( + watchEstablishTimeout = defaultWatchEstablishTimeout + errWatchEstablishTimeout = errors.New("watch establishment timed out") ) // retryAfter returns an exponentially increasing interval between @@ -226,12 +248,58 @@ func (rc *remoteClient) setConfig(watchCtx context.Context, config *clientConfig return nil, nil } +// cancelOnStopWatcher carries the establishment context's cancel func so it +// runs when Stop is called, satisfying govet's lostcancel check without +// shortening the watch's stream lifetime on the success path. +type cancelOnStopWatcher struct { + watch.Interface + cancel context.CancelFunc +} + +func (cw *cancelOnStopWatcher) Stop() { + cw.Interface.Stop() + cw.cancel() +} + +// establishWatch opens a MultiKueue remote watch, bounded by +// watchEstablishTimeout. On timeout the in-flight Watch is canceled and +// errWatchEstablishTimeout is returned so the caller falls back to the +// standard failedConnAttempts / retryAfter backoff in setConfig. +func establishWatch(ctx context.Context, c client.WithWatch, obj client.ObjectList, origin string) (watch.Interface, error) { + type result struct { + w watch.Interface + err error + } + resultCh := make(chan result, 1) + establishCtx, cancel := context.WithCancel(ctx) + + go func() { + w, err := c.Watch(establishCtx, obj, + client.MatchingLabels{kueue.MultiKueueOriginLabel: origin}, + &client.ListOptions{Raw: &metav1.ListOptions{AllowWatchBookmarks: true}}, + ) + resultCh <- result{w: w, err: err} + }() + + select { + case r := <-resultCh: + if r.err != nil { + cancel() + return nil, r.err + } + return &cancelOnStopWatcher{Interface: r.w, cancel: cancel}, nil + case <-time.After(watchEstablishTimeout): + cancel() + if r := <-resultCh; r.w != nil { + r.w.Stop() + } + return nil, errWatchEstablishTimeout + } +} + func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w jobframework.MultiKueueWatcher) error { log := ctrl.LoggerFrom(ctx).WithValues("watchKind", kind) - newWatcher, err := rc.client.Watch(ctx, w.GetEmptyList(), - client.MatchingLabels{kueue.MultiKueueOriginLabel: rc.origin}, - &client.ListOptions{Raw: &metav1.ListOptions{AllowWatchBookmarks: true}}, - ) + newWatcher, err := establishWatch(ctx, rc.client, w.GetEmptyList(), rc.origin) if err != nil { return err } diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go index 52fc53a8233..1fb31c7f864 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go @@ -1160,3 +1160,87 @@ func TestClustersReconcilerEventFilters(t *testing.T) { }) } } + +func TestEstablishWatch(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + + prev := watchEstablishTimeout + watchEstablishTimeout = 100 * time.Millisecond + t.Cleanup(func() { watchEstablishTimeout = prev }) + + errBoom := errors.New("boom") + + cases := map[string]struct { + interceptor interceptor.Funcs + wantErr error + maxElapsed time.Duration + }{ + "hung Watch times out": { + interceptor: interceptor.Funcs{ + Watch: func(ctx context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { + <-ctx.Done() + return nil, ctx.Err() + }, + }, + wantErr: errWatchEstablishTimeout, + maxElapsed: 10 * watchEstablishTimeout, + }, + "Watch error propagates": { + interceptor: interceptor.Funcs{ + Watch: func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { + return nil, errBoom + }, + }, + wantErr: errBoom, + }, + "success returns without waiting": { + maxElapsed: watchEstablishTimeout, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + c := getClientBuilder(ctx).WithInterceptorFuncs(tc.interceptor).Build() + + start := time.Now() + w, err := establishWatch(ctx, c, &kueue.WorkloadList{}, "test-origin") + elapsed := time.Since(start) + + if !errors.Is(err, tc.wantErr) { + t.Fatalf("want err %v, got: %v", tc.wantErr, err) + } + if (err == nil) != (w != nil) { + t.Fatalf("watcher/err mismatch: err=%v, w=%v", err, w) + } + if w != nil { + w.Stop() + } + if tc.maxElapsed > 0 && elapsed >= tc.maxElapsed { + t.Fatalf("took %v, expected < %v", elapsed, tc.maxElapsed) + } + }) + } + + // Watch races with the timeout: returns a non-nil watcher just after + // time.After fires. Must Stop() it to avoid leaking the stream. + t.Run("racing watcher is stopped on timeout", func(t *testing.T) { + fw := watch.NewFake() + c := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{ + Watch: func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { + time.Sleep(2 * watchEstablishTimeout) + return fw, nil + }, + }).Build() + + w, err := establishWatch(ctx, c, &kueue.WorkloadList{}, "test-origin") + if !errors.Is(err, errWatchEstablishTimeout) { + t.Fatalf("want errWatchEstablishTimeout, got: %v", err) + } + if w != nil { + t.Fatalf("want nil watcher, got: %v", w) + } + if !fw.IsStopped() { + t.Fatal("racing watcher was not Stop()ed; would leak") + } + }) +}