Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions pkg/controller/admissionchecks/multikueue/multikueuecluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,28 @@ const (
// this set will provide waiting time between 0 to 5m20s
retryIncrement = 5 * time.Second
retryMaxSteps = 7

// Bounds how long startWatcher waits for client.Watch() to return,
// so a hung remote cannot head-of-line block the reconciler. See #11206.
Comment on lines +76 to +77
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exploratory question to confirm my understanding: could this issue be also mitigated by increasing the GroupKindConcurrency level for MultiKueueCluster for the controller, wdyt? Code pointer to the configuration: https://github.com/kubernetes-sigs/kueue/blob/main/apis/config/v1beta2/configuration_types.go#L264C2-L264C22
In that case when one Reconcile is stuck the other may continue

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this issue be also mitigated by increasing the GroupKindConcurrency level…

I don't think it actually buys us anything in this code path. The knob is wired through correctly (the controller doesn't override MaxConcurrentReconciles), so in principle you'd get N parallel workers. But Reconcile calls setRemoteClientConfig, which grabs clustersReconciler.lock (a single controller-wide write mutex at multikueuecluster.go:394) and holds it through the entire setConfig → establishWatch → c.Watch() chain. So when one worker is parked inside c.Watch(), every other Reconcile that wants to do anything with a different cluster sits waiting on that same lock.

In practice concurrency=N collapses back to 1 whenever any remote is hung. Bumping the knob would only help after we also split that lock per cluster, or released it before establishing the watch. Probably a separate PR. Happy to file a follow up issue for it if you want.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thank you for the summary, I now understand why this would not work currently, and why fixing the problem is not straightforward.

Having said that, I think this renders GroupKindConcurrency useless for MultiKueueCluster due to very technical reasons which can be considered a bug by a user.

So, please file an issue for that. Feel free to work on it, or just keep it posted for some contributors.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filed #11297 to track the lock refactor. Leaving it unassigned for now so it's open for any contributor to pick up.

//
// The bound has to cover the slowest legitimate case, not just the common
// one. client.Watch() returns as soon as the apiserver sends HTTP 200
// headers, but the apiserver can delay sending those headers while it
// warms its watch cache. When the served version differs from the storage
// version and a conversion webhook is in play, that warmup serializes a
// per-object conversion and has been observed to take ~8 min at ~50k
// Workloads (kubernetes/kubernetes#136950). 10 min leaves headroom above
// the documented worst case while still ringing the alarm on a remote
// that is truly hung. Today Kueue's served and storage versions match
// (v1beta2), so the warmup path is not exercised, but we keep the
// generous bound as a guard against future version skew and unknown
// apiserver behavior.
defaultWatchEstablishTimeout = 10 * time.Minute
)

var (
	// errWatchEstablishTimeout is the sentinel error reported when a watch
	// could not be established within watchEstablishTimeout.
	errWatchEstablishTimeout = errors.New("watch establishment timed out")

	// watchEstablishTimeout is the effective bound on watch establishment.
	// It starts at defaultWatchEstablishTimeout and is a variable rather
	// than a constant so that tests can shorten it.
	watchEstablishTimeout = defaultWatchEstablishTimeout
)

// retryAfter returns an exponentially increasing interval between
Expand Down Expand Up @@ -226,12 +248,58 @@ func (rc *remoteClient) setConfig(watchCtx context.Context, config *clientConfig
return nil, nil
}

// cancelOnStopWatcher wraps a watch.Interface together with the cancel func
// of the context that was used to establish it.
//
// cancelOnStopWatcher carries the establishment context's cancel func so it
// runs when Stop is called, satisfying govet's lostcancel check without
// shortening the watch's stream lifetime on the success path.
type cancelOnStopWatcher struct {
// Embedded watcher; all of its methods other than Stop are promoted as-is.
watch.Interface
// cancel releases the establishment context; it is invoked from Stop.
cancel context.CancelFunc
}

// Stop stops the wrapped watcher first and then cancels the context the
// watch was established with, in that order.
func (w *cancelOnStopWatcher) Stop() {
	w.Interface.Stop()
	w.cancel()
}

// establishWatch starts a watch on the remote cluster for objects carrying
// the MultiKueue origin label set to origin, with watch bookmarks enabled.
//
// The Watch call runs in its own goroutine and must return within
// watchEstablishTimeout. When it does not, the in-flight call is canceled,
// any watcher it still returns is stopped, and errWatchEstablishTimeout is
// returned so the caller falls back to the standard failedConnAttempts /
// retryAfter backoff in setConfig. On success the returned watcher cancels
// the establishment context when it is stopped.
func establishWatch(ctx context.Context, c client.WithWatch, obj client.ObjectList, origin string) (watch.Interface, error) {
	type outcome struct {
		watcher watch.Interface
		err     error
	}

	establishCtx, cancel := context.WithCancel(ctx)
	// Buffered so the goroutine can always deliver its outcome and exit.
	done := make(chan outcome, 1)

	byOrigin := client.MatchingLabels{kueue.MultiKueueOriginLabel: origin}
	withBookmarks := &client.ListOptions{Raw: &metav1.ListOptions{AllowWatchBookmarks: true}}

	go func() {
		watcher, err := c.Watch(establishCtx, obj, byOrigin, withBookmarks)
		done <- outcome{watcher: watcher, err: err}
	}()

	select {
	case <-time.After(watchEstablishTimeout):
		cancel()
		// Wait for the canceled call to return so that a watcher which
		// raced with the timeout is stopped rather than leaked.
		late := <-done
		if late.watcher != nil {
			late.watcher.Stop()
		}
		return nil, errWatchEstablishTimeout
	case got := <-done:
		if got.err == nil {
			// Hand the cancel func to the wrapper; it runs on Stop.
			return &cancelOnStopWatcher{Interface: got.watcher, cancel: cancel}, nil
		}
		cancel()
		return nil, got.err
	}
}

func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w jobframework.MultiKueueWatcher) error {
log := ctrl.LoggerFrom(ctx).WithValues("watchKind", kind)
newWatcher, err := rc.client.Watch(ctx, w.GetEmptyList(),
client.MatchingLabels{kueue.MultiKueueOriginLabel: rc.origin},
&client.ListOptions{Raw: &metav1.ListOptions{AllowWatchBookmarks: true}},
)
newWatcher, err := establishWatch(ctx, rc.client, w.GetEmptyList(), rc.origin)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1160,3 +1160,87 @@ func TestClustersReconcilerEventFilters(t *testing.T) {
})
}
}

// TestEstablishWatch exercises establishWatch against a client built with
// intercepted Watch calls. It covers a hung Watch that must time out, a Watch
// error that must be propagated, the success path, and a watcher that arrives
// only after the timeout and therefore must be stopped.
func TestEstablishWatch(t *testing.T) {
Comment thread
trilamsr marked this conversation as resolved.
ctx, _ := utiltesting.ContextWithLog(t)

// Shrink the package-level timeout so the timeout cases finish quickly,
// and restore the previous value once the test is done.
prev := watchEstablishTimeout
watchEstablishTimeout = 100 * time.Millisecond
t.Cleanup(func() { watchEstablishTimeout = prev })

errBoom := errors.New("boom")

// maxElapsed is an exclusive upper bound on how long establishWatch may
// take; a zero value disables the duration check.
cases := map[string]struct {
interceptor interceptor.Funcs
wantErr error
maxElapsed time.Duration
}{
"hung Watch times out": {
interceptor: interceptor.Funcs{
// Blocks until the context passed to Watch is canceled.
Watch: func(ctx context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) {
<-ctx.Done()
return nil, ctx.Err()
},
},
wantErr: errWatchEstablishTimeout,
// Evaluated after the override above, i.e. ten times the shortened timeout.
maxElapsed: 10 * watchEstablishTimeout,
},
"Watch error propagates": {
interceptor: interceptor.Funcs{
Watch: func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) {
return nil, errBoom
},
},
wantErr: errBoom,
},
// No Watch interceptor is set here; presumably the built client's own
// Watch implementation is used — TODO confirm against getClientBuilder.
"success returns without waiting": {
maxElapsed: watchEstablishTimeout,
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
c := getClientBuilder(ctx).WithInterceptorFuncs(tc.interceptor).Build()

start := time.Now()
w, err := establishWatch(ctx, c, &kueue.WorkloadList{}, "test-origin")
elapsed := time.Since(start)

// With a nil wantErr this passes only when err is nil as well.
if !errors.Is(err, tc.wantErr) {
t.Fatalf("want err %v, got: %v", tc.wantErr, err)
}
// Exactly one of the watcher and the error must be set.
if (err == nil) != (w != nil) {
t.Fatalf("watcher/err mismatch: err=%v, w=%v", err, w)
}
if w != nil {
w.Stop()
}
if tc.maxElapsed > 0 && elapsed >= tc.maxElapsed {
t.Fatalf("took %v, expected < %v", elapsed, tc.maxElapsed)
}
})
}

// Watch races with the timeout: returns a non-nil watcher just after
// time.After fires. Must Stop() it to avoid leaking the stream.
t.Run("racing watcher is stopped on timeout", func(t *testing.T) {
fw := watch.NewFake()
c := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{
// Ignores the context and returns only after twice the timeout has elapsed.
Watch: func(_ context.Context, _ client.WithWatch, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) {
time.Sleep(2 * watchEstablishTimeout)
return fw, nil
},
}).Build()

w, err := establishWatch(ctx, c, &kueue.WorkloadList{}, "test-origin")
if !errors.Is(err, errWatchEstablishTimeout) {
t.Fatalf("want errWatchEstablishTimeout, got: %v", err)
}
if w != nil {
t.Fatalf("want nil watcher, got: %v", w)
}
if !fw.IsStopped() {
t.Fatal("racing watcher was not Stop()ed; would leak")
}
})
}