From ac60d74a24a2f53fcf7b1922bccb0f2385663b83 Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Mon, 18 May 2026 19:14:01 -0700 Subject: [PATCH 1/3] Narrow clustersReconciler.lock to map ops; per cluster setConfig lock The single controller wide write lock in setRemoteClientConfig used to be held across the synchronous remote watch establishment in setConfig, which can take minutes against an unresponsive or warming apiserver. While that lock is held, every other MultiKueueCluster reconcile that calls setRemoteClientConfig blocks, so one slow remote stalls admission for every other cluster regardless of GroupKindConcurrency. Narrow the controller wide lock to just the remoteClients map find or insert, then serialize setConfig with a per cluster sync.Mutex on remoteClient. Concurrent reconciles for different clusters now run in parallel under their own locks. Same cluster reconciles still serialize through the workqueue dedup contract; the per cluster lock is belt and suspenders. Adds TestSetRemoteClientConfigDoesNotBlockOtherClusters which pins the property: while one cluster is parked inside its remote Watch call, another cluster's setRemoteClientConfig still completes. Refs 11297. --- .../multikueue/multikueuecluster.go | 10 ++- .../multikueue/multikueuecluster_test.go | 74 +++++++++++++++++++ 2 files changed, 82 insertions(+), 2 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go index 483b6cff630..d9cd594c81f 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go @@ -120,6 +120,10 @@ type remoteClient struct { failedConnAttempts uint retryConnNextAttempt metav1.Time + // Serializes setConfig for this cluster. It replaces holding clustersReconciler.lock + // across setConfig, which let one stuck remote stall every other cluster. See #11297. 
+ setConfigLock sync.Mutex + clock clock.Clock // For unit testing only. There is now need of creating fully functional remote clients in the unit tests @@ -490,8 +494,6 @@ func (c *clustersReconciler) stopAndRemoveCluster(clusterName string) { func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, config *clientConfig, origin string) (*time.Duration, error) { c.lock.Lock() - defer c.lock.Unlock() - client, found := c.remoteClients[clusterName] if !found { client = newRemoteClient(c.localClient, c.wlUpdateCh, c.watchEndedCh, origin, clusterName, c.adapters) @@ -500,6 +502,10 @@ func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterN } c.remoteClients[clusterName] = client } + c.lock.Unlock() + + client.setConfigLock.Lock() + defer client.setConfigLock.Unlock() clientLog := ctrl.LoggerFrom(c.rootContext).WithValues("clusterName", clusterName) clientCtx := ctrl.LoggerInto(c.rootContext, clientLog) diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go index f7880bdd76a..14ef3c03514 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -1267,3 +1268,76 @@ func TestEstablishBackoffSchedule(t *testing.T) { }) } } + +// Regression for #11297. A remote stuck inside Watch must not stop +// reconciles of other clusters. 
+func TestSetRemoteClientConfigDoesNotBlockOtherClusters(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + + slowReached := make(chan struct{}) + slowRelease := make(chan struct{}) + var releaseOnce sync.Once + releaseSlow := func() { releaseOnce.Do(func() { close(slowRelease) }) } + t.Cleanup(releaseSlow) + + gatedBuilder := func(config *clientConfig, _ client.Options) (client.WithWatch, error) { + kubeconfig := string(config.Kubeconfig) + b := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{ + Watch: func(watchCtx context.Context, c client.WithWatch, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error) { + if strings.Contains(kubeconfig, "slow-user") { + close(slowReached) + select { + case <-slowRelease: + return nil, errors.New("released") + case <-watchCtx.Done(): + return nil, watchCtx.Err() + } + } + return c.Watch(watchCtx, obj, opts...) + }, + }) + return b.Build(), nil + } + + localClient := getClientBuilder(ctx).Build() + reconciler := newClustersReconciler(localClient, TestNamespace, 0, defaultOrigin, nil, nil, &NoOpClusterProfileCreds{}, nil) + reconciler.rootContext = ctx + reconciler.builderOverride = gatedBuilder + + slowConfig := &clientConfig{Kubeconfig: []byte(testKubeconfig("slow-user"))} + fastConfig := &clientConfig{Kubeconfig: []byte(testKubeconfig("fast-user"))} + + slowDone := make(chan struct{}) + go func() { + defer close(slowDone) + _, _ = reconciler.setRemoteClientConfig(ctx, "cluster-slow", slowConfig, defaultOrigin) + }() + + select { + case <-slowReached: + case <-time.After(2 * time.Second): + t.Fatal("slow cluster's setConfig did not reach the Watch call in time") + } + + fastDone := make(chan error, 1) + go func() { + _, err := reconciler.setRemoteClientConfig(ctx, "cluster-fast", fastConfig, defaultOrigin) + fastDone <- err + }() + + select { + case err := <-fastDone: + if err != nil { + t.Fatalf("cluster-fast setRemoteClientConfig returned err: %v", err) + } + case <-time.After(2 * 
time.Second): + t.Fatal("cluster-fast setRemoteClientConfig was blocked by cluster-slow (head-of-line)") + } + + releaseSlow() + select { + case <-slowDone: + case <-time.After(2 * time.Second): + t.Fatal("slow goroutine did not exit after release") + } +} From 1a0607f122005f26f27a447f1b3630a6193777de Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Tue, 19 May 2026 02:56:28 -0700 Subject: [PATCH 2/3] Split out findOrCreateRemoteClient per review Per @mimowo's review, factor the map find or insert out of setRemoteClientConfig so each function uses the simple Lock(); defer Unlock() pattern instead of a manual unlock in the middle. --- .../admissionchecks/multikueue/multikueuecluster.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go index d9cd594c81f..f5376917677 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go @@ -492,8 +492,12 @@ func (c *clustersReconciler) stopAndRemoveCluster(clusterName string) { } } -func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, config *clientConfig, origin string) (*time.Duration, error) { +// findOrCreateRemoteClient returns the remoteClient for clusterName, creating +// one if absent. Only the brief map operation runs under c.lock. 
+func (c *clustersReconciler) findOrCreateRemoteClient(clusterName, origin string) *remoteClient { c.lock.Lock() + defer c.lock.Unlock() + client, found := c.remoteClients[clusterName] if !found { client = newRemoteClient(c.localClient, c.wlUpdateCh, c.watchEndedCh, origin, clusterName, c.adapters) @@ -502,7 +506,11 @@ func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterN } c.remoteClients[clusterName] = client } - c.lock.Unlock() + return client +} + +func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, config *clientConfig, origin string) (*time.Duration, error) { + client := c.findOrCreateRemoteClient(clusterName, origin) client.setConfigLock.Lock() defer client.setConfigLock.Unlock() From 4620ba5d814227aa5dd5b7a57d4204aa9fdebc6e Mon Sep 17 00:00:00 2001 From: Tri Lam Date: Tue, 19 May 2026 09:47:14 -0700 Subject: [PATCH 3/3] Drive concurrency test through Reconcile, bump budget to 5s Per @mimowo's review on the test: - use Reconcile as the entry point instead of calling setRemoteClientConfig directly, so the test stays valid if watches ever get triggered from a different path - replace the magic 2s timeouts with a named stuckWatchTimeout = 5s so they survive a loaded CI runner --- .../multikueue/multikueuecluster_test.go | 39 +++++++++++++------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go index 14ef3c03514..295e15dc4d8 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go @@ -1272,6 +1272,12 @@ func TestEstablishBackoffSchedule(t *testing.T) { // Regression for #11297. A remote stuck inside Watch must not stop // reconciles of other clusters. 
func TestSetRemoteClientConfigDoesNotBlockOtherClusters(t *testing.T) { + // stuckWatchTimeout is the per-phase budget for the test: how long we + // wait for the slow watch to be reached, for the fast reconcile to + // finish, and for the slow goroutine to clean up after release. Generous + // enough to survive a loaded CI runner. + const stuckWatchTimeout = 5 * time.Second + ctx, _ := utiltesting.ContextWithLog(t) slowReached := make(chan struct{}) @@ -1299,45 +1305,54 @@ func TestSetRemoteClientConfigDoesNotBlockOtherClusters(t *testing.T) { return b.Build(), nil } - localClient := getClientBuilder(ctx).Build() + slowCluster := utiltestingapi.MakeMultiKueueCluster("cluster-slow"). + KubeConfig(kueue.SecretLocationType, "secret-slow").Generation(1).Obj() + fastCluster := utiltestingapi.MakeMultiKueueCluster("cluster-fast"). + KubeConfig(kueue.SecretLocationType, "secret-fast").Generation(1).Obj() + slowSecret := makeTestSecret("secret-slow", testKubeconfig("slow-user")) + fastSecret := makeTestSecret("secret-fast", testKubeconfig("fast-user")) + + localClient := getClientBuilder(ctx). + WithLists(&kueue.MultiKueueClusterList{Items: []kueue.MultiKueueCluster{*slowCluster, *fastCluster}}). + WithLists(&corev1.SecretList{Items: []corev1.Secret{slowSecret, fastSecret}}). + WithStatusSubresource(slowCluster, fastCluster). 
+ Build() + reconciler := newClustersReconciler(localClient, TestNamespace, 0, defaultOrigin, nil, nil, &NoOpClusterProfileCreds{}, nil) reconciler.rootContext = ctx reconciler.builderOverride = gatedBuilder - slowConfig := &clientConfig{Kubeconfig: []byte(testKubeconfig("slow-user"))} - fastConfig := &clientConfig{Kubeconfig: []byte(testKubeconfig("fast-user"))} - slowDone := make(chan struct{}) go func() { defer close(slowDone) - _, _ = reconciler.setRemoteClientConfig(ctx, "cluster-slow", slowConfig, defaultOrigin) + _, _ = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "cluster-slow"}}) }() select { case <-slowReached: - case <-time.After(2 * time.Second): - t.Fatal("slow cluster's setConfig did not reach the Watch call in time") + case <-time.After(stuckWatchTimeout): + t.Fatal("slow cluster's reconcile did not reach the Watch call in time") } fastDone := make(chan error, 1) go func() { - _, err := reconciler.setRemoteClientConfig(ctx, "cluster-fast", fastConfig, defaultOrigin) + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "cluster-fast"}}) fastDone <- err }() select { case err := <-fastDone: if err != nil { - t.Fatalf("cluster-fast setRemoteClientConfig returned err: %v", err) + t.Fatalf("cluster-fast reconcile returned err: %v", err) } - case <-time.After(2 * time.Second): - t.Fatal("cluster-fast setRemoteClientConfig was blocked by cluster-slow (head-of-line)") + case <-time.After(stuckWatchTimeout): + t.Fatal("cluster-fast reconcile was blocked by cluster-slow (head-of-line)") } releaseSlow() select { case <-slowDone: - case <-time.After(2 * time.Second): + case <-time.After(stuckWatchTimeout): t.Fatal("slow goroutine did not exit after release") } }