From 8ae317ca4de971c975fe2dbc4d12ec530b383923 Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Thu, 14 May 2026 17:54:23 -0700 Subject: [PATCH 1/4] Bound establishment phase of MultiKueue remote watches A hung client.Watch() against one remote MultiKueueCluster previously blocked the single multikueuecluster reconciler worker indefinitely, preventing every other cluster behind it from being reconciled. Those clusters keep remoteClient.connecting=true, the dispatcher then excludes them as inactive, and admission stops cluster-wide. Wrap the Watch establishment in a timeout-bounded helper. On timeout the in-flight Watch is canceled and an error is returned, so the existing failedConnAttempts / retryAfter backoff runs. Stream lifetime on the success path is unchanged: the returned watcher continues to use a context derived from the caller's ctx, and its cancel is owned by the watcher Stop method (no leak). Signed-off-by: Tri Lam --- .../multikueue/multikueuecluster.go | 61 +++++++++++++++-- .../multikueue/multikueuecluster_test.go | 65 +++++++++++++++++++ 2 files changed, 122 insertions(+), 4 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go index d970bed35f0..f8a429d93d1 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go @@ -72,6 +72,15 @@ const ( // this set will provide waiting time between 0 to 5m20s retryIncrement = 5 * time.Second retryMaxSteps = 7 + + // Bounds how long startWatcher waits for client.Watch() to return, + // so a hung remote cannot head-of-line block the reconciler. See #11206. + defaultWatchEstablishTimeout = 60 * time.Second +) + +var ( + watchEstablishTimeout = defaultWatchEstablishTimeout + errWatchEstablishTimeout = errors.New("watch establishment timed out") ) // retryAfter returns an exponentially increasing interval between @@ -226,12 +235,56 @@ func (rc *remoteClient) setConfig(watchCtx context.Context, config *clientConfig return nil, nil } +// cancelOnStopWatcher carries the establishment context's cancel func so it +// runs when Stop is called, satisfying govet's lostcancel check without +// shortening the watch's stream lifetime on the success path. +type cancelOnStopWatcher struct { + watch.Interface + cancel context.CancelFunc +} + +func (cw *cancelOnStopWatcher) Stop() { + cw.Interface.Stop() + cw.cancel() +} + +// establishWatch opens a MultiKueue remote watch, bounded by +// watchEstablishTimeout. On timeout the in-flight Watch is canceled and +// errWatchEstablishTimeout is returned so the caller falls back to the +// standard failedConnAttempts / retryAfter backoff in setConfig. +func establishWatch(ctx context.Context, c client.WithWatch, obj client.ObjectList, origin string) (watch.Interface, error) { + type result struct { + w watch.Interface + err error + } + resultCh := make(chan result, 1) + establishCtx, cancel := context.WithCancel(ctx) + + go func() { + w, err := c.Watch(establishCtx, obj, + client.MatchingLabels{kueue.MultiKueueOriginLabel: origin}, + &client.ListOptions{Raw: &metav1.ListOptions{AllowWatchBookmarks: true}}, + ) + resultCh <- result{w: w, err: err} + }() + + select { + case r := <-resultCh: + if r.err != nil { + cancel() + return nil, r.err + } + return &cancelOnStopWatcher{Interface: r.w, cancel: cancel}, nil + case <-time.After(watchEstablishTimeout): + cancel() + <-resultCh + return nil, errWatchEstablishTimeout + } +} + func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w jobframework.MultiKueueWatcher) error { log := ctrl.LoggerFrom(ctx).WithValues("watchKind", kind) - newWatcher, err := rc.client.Watch(ctx, w.GetEmptyList(), - client.MatchingLabels{kueue.MultiKueueOriginLabel: rc.origin}, - &client.ListOptions{Raw: &metav1.ListOptions{AllowWatchBookmarks: true}}, - ) + newWatcher, err := establishWatch(ctx, rc.client, w.GetEmptyList(), rc.origin) if err != nil { return err } diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go index d1a7a2d82b4..55bfe95c268 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go @@ -1159,3 +1159,68 @@ func TestClustersReconcilerEventFilters(t *testing.T) { }) } } + +func TestEstablishWatch(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + + prev := watchEstablishTimeout + watchEstablishTimeout = 100 * time.Millisecond + t.Cleanup(func() { watchEstablishTimeout = prev }) + + t.Run("hung Watch times out", func(t *testing.T) { + hung := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{ + Watch: func(ctx context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { + <-ctx.Done() + return nil, ctx.Err() + }, + }).Build() + + start := time.Now() + w, err := establishWatch(ctx, hung, &kueue.WorkloadList{}, "test-origin") + elapsed := time.Since(start) + + if !errors.Is(err, errWatchEstablishTimeout) { + t.Fatalf("want errWatchEstablishTimeout, got: %v", err) + } + if w != nil { + t.Fatalf("want nil watcher, got: %v", w) + } + if elapsed > 10*watchEstablishTimeout { + t.Fatalf("took %v, expected < 10x %v", elapsed, watchEstablishTimeout) + } + }) + + t.Run("Watch error propagates", func(t *testing.T) { + wantErr := errors.New("boom") + failing := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{ + Watch: func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { + return nil, wantErr + }, + }).Build() + + w, err := establishWatch(ctx, failing, &kueue.WorkloadList{}, "test-origin") + if !errors.Is(err, wantErr) { + t.Fatalf("want %v, got: %v", wantErr, err) + } + if w != nil { + t.Fatalf("want nil watcher, got: %v", w) + } + }) + + t.Run("success returns without waiting", func(t *testing.T) { + ok := getClientBuilder(ctx).Build() + start := time.Now() + w, err := establishWatch(ctx, ok, &kueue.WorkloadList{}, "test-origin") + elapsed := time.Since(start) + if err != nil { + t.Fatalf("want nil error, got: %v", err) + } + if w == nil { + t.Fatal("want non-nil watcher") + } + w.Stop() + if elapsed >= watchEstablishTimeout { + t.Fatalf("took %v, expected < %v", elapsed, watchEstablishTimeout) + } + }) +} From e47fdf1b5e4dc135edd850ee0f614d6fa1d111eb Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Fri, 15 May 2026 06:24:56 -0700 Subject: [PATCH 2/4] Table-driven TestEstablishWatch Address review feedback: refactor the three subtests into a map-keyed table following the codebase's prevailing test style. Behaviour is unchanged; the per-case interceptor, expected error (matched via errors.Is), and elapsed-time ceiling are uniform. Signed-off-by: Tri Lam --- .../multikueue/multikueuecluster_test.go | 100 +++++++++--------- 1 file changed, 48 insertions(+), 52 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go index 55bfe95c268..8ac4994937d 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go @@ -1167,60 +1167,56 @@ func TestEstablishWatch(t *testing.T) { watchEstablishTimeout = 100 * time.Millisecond t.Cleanup(func() { watchEstablishTimeout = prev }) - t.Run("hung Watch times out", func(t *testing.T) { - hung := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{ - Watch: func(ctx context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { - <-ctx.Done() - return nil, ctx.Err() - }, - }).Build() + errBoom := errors.New("boom") - start := time.Now() - w, err := establishWatch(ctx, hung, &kueue.WorkloadList{}, "test-origin") - elapsed := time.Since(start) + cases := map[string]struct { + interceptor interceptor.Funcs + wantErr error + maxElapsed time.Duration + }{ + "hung Watch times out": { + interceptor: interceptor.Funcs{ + Watch: func(ctx context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { + <-ctx.Done() + return nil, ctx.Err() + }, + }, + wantErr: errWatchEstablishTimeout, + maxElapsed: 10 * watchEstablishTimeout, + }, + "Watch error propagates": { + interceptor: interceptor.Funcs{ + Watch: func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { + return nil, errBoom + }, + }, + wantErr: errBoom, + }, + "success returns without waiting": { + maxElapsed: watchEstablishTimeout, + }, + } - if !errors.Is(err, errWatchEstablishTimeout) { - t.Fatalf("want errWatchEstablishTimeout, got: %v", err) - } - if w != nil { - t.Fatalf("want nil watcher, got: %v", w) - } - if elapsed > 10*watchEstablishTimeout { - t.Fatalf("took %v, expected < 10x %v", elapsed, watchEstablishTimeout) - } - }) + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + c := getClientBuilder(ctx).WithInterceptorFuncs(tc.interceptor).Build() - t.Run("Watch error propagates", func(t *testing.T) { - wantErr := errors.New("boom") - failing := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{ - Watch: func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { - return nil, wantErr - }, - }).Build() + start := time.Now() + w, err := establishWatch(ctx, c, &kueue.WorkloadList{}, "test-origin") + elapsed := time.Since(start) - w, err := establishWatch(ctx, failing, &kueue.WorkloadList{}, "test-origin") - if !errors.Is(err, wantErr) { - t.Fatalf("want %v, got: %v", wantErr, err) - } - if w != nil { - t.Fatalf("want nil watcher, got: %v", w) - } - }) - - t.Run("success returns without waiting", func(t *testing.T) { - ok := getClientBuilder(ctx).Build() - start := time.Now() - w, err := establishWatch(ctx, ok, &kueue.WorkloadList{}, "test-origin") - elapsed := time.Since(start) - if err != nil { - t.Fatalf("want nil error, got: %v", err) - } - if w == nil { - t.Fatal("want non-nil watcher") - } - w.Stop() - if elapsed >= watchEstablishTimeout { - t.Fatalf("took %v, expected < %v", elapsed, watchEstablishTimeout) - } - }) + if !errors.Is(err, tc.wantErr) { + t.Fatalf("want err %v, got: %v", tc.wantErr, err) + } + if (err == nil) != (w != nil) { + t.Fatalf("watcher/err mismatch: err=%v, w=%v", err, w) + } + if w != nil { + w.Stop() + } + if tc.maxElapsed > 0 && elapsed >= tc.maxElapsed { + t.Fatalf("took %v, expected < %v", elapsed, tc.maxElapsed) + } + }) + } } From daeafeedb61244327b43869b4ada293197591b24 Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Fri, 15 May 2026 06:35:21 -0700 Subject: [PATCH 3/4] Stop racing watcher on establishWatch timeout If c.Watch returns a non-nil watcher in the narrow window between time.After firing and the result-channel drain, the previous code discarded the watcher without calling Stop(). In production the watcher's HTTP stream is bound to establishCtx so cancel() tears it down indirectly, but fake clients used in tests ignore ctx and the watcher would leak. Drain the channel into a local and Stop() any returned watcher. Add a regression test using a sleeping interceptor and watch.NewFake() to assert Stop() was called. Signed-off-by: Tri Lam --- .../multikueue/multikueuecluster.go | 4 +++- .../multikueue/multikueuecluster_test.go | 23 +++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go index f8a429d93d1..156fd489279 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go @@ -277,7 +277,9 @@ func establishWatch(ctx context.Context, c client.WithWatch, obj client.ObjectLi return &cancelOnStopWatcher{Interface: r.w, cancel: cancel}, nil case <-time.After(watchEstablishTimeout): cancel() - <-resultCh + if r := <-resultCh; r.w != nil { + r.w.Stop() + } return nil, errWatchEstablishTimeout } } diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go index 8ac4994937d..ac241543aa4 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go @@ -1219,4 +1219,27 @@ func TestEstablishWatch(t *testing.T) { } }) } + + // Watch races with the timeout: returns a non-nil watcher just after + // time.After fires. Must Stop() it to avoid leaking the stream. + t.Run("racing watcher is stopped on timeout", func(t *testing.T) { + fw := watch.NewFake() + c := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{ + Watch: func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { + time.Sleep(2 * watchEstablishTimeout) + return fw, nil + }, + }).Build() + + w, err := establishWatch(ctx, c, &kueue.WorkloadList{}, "test-origin") + if !errors.Is(err, errWatchEstablishTimeout) { + t.Fatalf("want errWatchEstablishTimeout, got: %v", err) + } + if w != nil { + t.Fatalf("want nil watcher, got: %v", w) + } + if !fw.IsStopped() { + t.Fatal("racing watcher was not Stop()ed; would leak") + } + }) } From 531f16f5ece0283e4651b018abd9f7bc0198a00b Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Mon, 18 May 2026 10:26:09 -0700 Subject: [PATCH 4/4] Raise establishWatch timeout default to 10 min 60s would false-trip during apiserver watch-cache cold-start when the served version differs from the storage version and a conversion webhook is in play (kubernetes/kubernetes#136950, observed ~8 min at ~50k Workloads in Kueue 0.15). Expand the constant's doc comment to capture the rationale so future readers don't tighten the bound without understanding the cold-start path. --- .../multikueue/multikueuecluster.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go index 156fd489279..70b54c74370 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go @@ -75,7 +75,20 @@ const ( // Bounds how long startWatcher waits for client.Watch() to return, // so a hung remote cannot head-of-line block the reconciler. See #11206. - defaultWatchEstablishTimeout = 60 * time.Second + // + // The bound has to cover the slowest legitimate case, not just the common + // one. client.Watch() returns as soon as the apiserver sends HTTP 200 + // headers, but the apiserver can delay sending those headers while it + // warms its watch cache. When the served version differs from the storage + // version and a conversion webhook is in play, that warmup serializes a + // per-object conversion and has been observed to take ~8 min at ~50k + // Workloads (kubernetes/kubernetes#136950). 10 min leaves headroom above + // the documented worst case while still ringing the alarm on a remote + // that is truly hung. Today Kueue's served and storage versions match + // (v1beta2), so the warmup path is not exercised, but we keep the + // generous bound as a guard against future version skew and unknown + // apiserver behavior. + defaultWatchEstablishTimeout = 10 * time.Minute ) var (