Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 15 additions & 22 deletions pkg/controller/admissionchecks/multikueue/multikueuecluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/roletracker"
utilwait "sigs.k8s.io/kueue/pkg/util/wait"
)

const (
Expand All @@ -73,27 +74,19 @@ const (
retryIncrement = 5 * time.Second
retryMaxSteps = 7

// Bounds how long startWatcher waits for client.Watch() to return,
// so a hung remote cannot head-of-line block the reconciler. See #11206.
//
// The bound has to cover the slowest legitimate case, not just the common
// one. client.Watch() returns as soon as the apiserver sends HTTP 200
// headers, but the apiserver can delay sending those headers while it
// warms its watch cache. When the served version differs from the storage
// version and a conversion webhook is in play, that warmup serializes a
// per-object conversion and has been observed to take ~8 min at ~50k
// Workloads (kubernetes/kubernetes#136950). 10 min leaves headroom above
// the documented worst case while still ringing the alarm on a remote
// that is truly hung. Today Kueue's served and storage versions match
// (v1beta2), so the warmup path is not exercised, but we keep the
// generous bound as a guard against future version skew and unknown
// apiserver behavior.
defaultWatchEstablishTimeout = 10 * time.Minute
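// Bounds how long startWatcher waits for client.Watch() to return. The
// per-attempt timeout doubles from initialEstablishTimeout up to
// maxEstablishTimeout (1m, 2m, 4m, 8m, 10m, ...), so a hung remote is
// detected quickly while the cap still covers the apiserver's cold
// watch-cache warmup with per-object conversion documented in
// kubernetes/kubernetes#136950 (~8 min at ~50k Workloads). Kueue's served
// and storage versions match today (v1beta2), so that path is not
// exercised; the cap is a guard against future version skew. See #11206.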
initialEstablishTimeout = 1 * time.Minute
maxEstablishTimeout = 10 * time.Minute
)

var (
watchEstablishTimeout = defaultWatchEstablishTimeout
errWatchEstablishTimeout = errors.New("watch establishment timed out")

establishBackoff = utilwait.NewBackoff(initialEstablishTimeout, maxEstablishTimeout, 2, 0)
)

// retryAfter returns an exponentially increasing interval between
Expand Down Expand Up @@ -261,11 +254,11 @@ func (cw *cancelOnStopWatcher) Stop() {
cw.cancel()
}

// establishWatch opens a MultiKueue remote watch, bounded by
// watchEstablishTimeout. On timeout the in-flight Watch is canceled and
// establishWatch opens a MultiKueue remote watch, bounded by the given
// timeout. On timeout the in-flight Watch is canceled and
// errWatchEstablishTimeout is returned so the caller falls back to the
// standard failedConnAttempts / retryAfter backoff in setConfig.
func establishWatch(ctx context.Context, c client.WithWatch, obj client.ObjectList, origin string) (watch.Interface, error) {
func establishWatch(ctx context.Context, c client.WithWatch, obj client.ObjectList, origin string, timeout time.Duration) (watch.Interface, error) {
type result struct {
w watch.Interface
err error
Expand All @@ -288,7 +281,7 @@ func establishWatch(ctx context.Context, c client.WithWatch, obj client.ObjectLi
return nil, r.err
}
return &cancelOnStopWatcher{Interface: r.w, cancel: cancel}, nil
case <-time.After(watchEstablishTimeout):
case <-time.After(timeout):
cancel()
if r := <-resultCh; r.w != nil {
r.w.Stop()
Expand All @@ -299,7 +292,7 @@ func establishWatch(ctx context.Context, c client.WithWatch, obj client.ObjectLi

func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w jobframework.MultiKueueWatcher) error {
log := ctrl.LoggerFrom(ctx).WithValues("watchKind", kind)
newWatcher, err := establishWatch(ctx, rc.client, w.GetEmptyList(), rc.origin)
newWatcher, err := establishWatch(ctx, rc.client, w.GetEmptyList(), rc.origin, establishBackoff.WaitTime(int(rc.failedConnAttempts)+1))
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1163,10 +1163,7 @@ func TestClustersReconcilerEventFilters(t *testing.T) {
func TestEstablishWatch(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)

prev := watchEstablishTimeout
watchEstablishTimeout = 100 * time.Millisecond
t.Cleanup(func() { watchEstablishTimeout = prev })

const testTimeout = 100 * time.Millisecond
errBoom := errors.New("boom")

cases := map[string]struct {
Expand All @@ -1182,7 +1179,7 @@ func TestEstablishWatch(t *testing.T) {
},
},
wantErr: errWatchEstablishTimeout,
maxElapsed: 10 * watchEstablishTimeout,
maxElapsed: 10 * testTimeout,
},
"Watch error propagates": {
interceptor: interceptor.Funcs{
Expand All @@ -1193,7 +1190,7 @@ func TestEstablishWatch(t *testing.T) {
wantErr: errBoom,
},
"success returns without waiting": {
maxElapsed: watchEstablishTimeout,
maxElapsed: testTimeout,
},
}

Expand All @@ -1202,7 +1199,7 @@ func TestEstablishWatch(t *testing.T) {
c := getClientBuilder(ctx).WithInterceptorFuncs(tc.interceptor).Build()

start := time.Now()
w, err := establishWatch(ctx, c, &kueue.WorkloadList{}, "test-origin")
w, err := establishWatch(ctx, c, &kueue.WorkloadList{}, "test-origin", testTimeout)
elapsed := time.Since(start)

if !errors.Is(err, tc.wantErr) {
Expand All @@ -1226,12 +1223,12 @@ func TestEstablishWatch(t *testing.T) {
fw := watch.NewFake()
c := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{
Watch: func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) {
time.Sleep(2 * watchEstablishTimeout)
time.Sleep(2 * testTimeout)
return fw, nil
},
}).Build()

w, err := establishWatch(ctx, c, &kueue.WorkloadList{}, "test-origin")
w, err := establishWatch(ctx, c, &kueue.WorkloadList{}, "test-origin", testTimeout)
if !errors.Is(err, errWatchEstablishTimeout) {
t.Fatalf("want errWatchEstablishTimeout, got: %v", err)
}
Expand All @@ -1243,3 +1240,29 @@ func TestEstablishWatch(t *testing.T) {
}
})
}

// TestEstablishBackoffSchedule locks in the watch-establishment timeout that
// establishBackoff yields for a given number of prior failed connection
// attempts: 1m, 2m, 4m, 8m, then 10m for every attempt after that.
//
// The backoff helper has its own tests in pkg/util/wait; this test only
// guards the initial/cap/factor values wired into establishBackoff.
func TestEstablishBackoffSchedule(t *testing.T) {
	type scheduleCase struct {
		failedAttempts uint
		want           time.Duration
	}

	schedule := map[string]scheduleCase{
		"first attempt is initial":  {failedAttempts: 0, want: 1 * time.Minute},
		"one failure doubles":       {failedAttempts: 1, want: 2 * time.Minute},
		"two failures":              {failedAttempts: 2, want: 4 * time.Minute},
		"three failures":            {failedAttempts: 3, want: 8 * time.Minute},
		"four failures hits cap":    {failedAttempts: 4, want: maxEstablishTimeout},
		"many failures stay at cap": {failedAttempts: 20, want: maxEstablishTimeout},
	}

	for desc, sc := range schedule {
		t.Run(desc, func(t *testing.T) {
			// WaitTime is queried with the 1-based attempt number, i.e. the
			// count of previous failures plus the attempt about to be made.
			attempt := int(sc.failedAttempts) + 1
			if got := establishBackoff.WaitTime(attempt); got != sc.want {
				t.Fatalf("WaitTime(%d) = %v, want %v", attempt, got, sc.want)
			}
		})
	}
}