diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go index 483b6cff630..f5376917677 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go @@ -120,6 +120,10 @@ type remoteClient struct { failedConnAttempts uint retryConnNextAttempt metav1.Time + // Held during setConfig. Without it, one stuck remote would stall every + // other cluster's reconcile via clustersReconciler.lock. See #11297. + setConfigLock sync.Mutex + clock clock.Clock // For unit testing only. There is now need of creating fully functional remote clients in the unit tests @@ -488,7 +492,9 @@ func (c *clustersReconciler) stopAndRemoveCluster(clusterName string) { } } -func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, config *clientConfig, origin string) (*time.Duration, error) { +// findOrCreateRemoteClient returns the remoteClient for clusterName, creating +// one if absent. Only the brief map operation runs under c.lock. +func (c *clustersReconciler) findOrCreateRemoteClient(clusterName, origin string) *remoteClient { c.lock.Lock() defer c.lock.Unlock() @@ -500,6 +506,14 @@ func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterN } c.remoteClients[clusterName] = client } + return client +} + +func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, config *clientConfig, origin string) (*time.Duration, error) { + client := c.findOrCreateRemoteClient(clusterName, origin) + + client.setConfigLock.Lock() + defer client.setConfigLock.Unlock() clientLog := ctrl.LoggerFrom(c.rootContext).WithValues("clusterName", clusterName) clientCtx := ctrl.LoggerInto(c.rootContext, clientLog) diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go index f7880bdd76a..295e15dc4d8 100644 --- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go +++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -1267,3 +1268,91 @@ func TestEstablishBackoffSchedule(t *testing.T) { }) } } + +// Regression for #11297. A remote stuck inside Watch must not stop +// reconciles of other clusters. +func TestSetRemoteClientConfigDoesNotBlockOtherClusters(t *testing.T) { + // stuckWatchTimeout is the per-phase budget for the test: how long we + // wait for the slow watch to be reached, for the fast reconcile to + // finish, and for the slow goroutine to clean up after release. Generous + // enough to survive a loaded CI runner. + const stuckWatchTimeout = 5 * time.Second + + ctx, _ := utiltesting.ContextWithLog(t) + + slowReached := make(chan struct{}) + slowRelease := make(chan struct{}) + var releaseOnce sync.Once + releaseSlow := func() { releaseOnce.Do(func() { close(slowRelease) }) } + t.Cleanup(releaseSlow) + + gatedBuilder := func(config *clientConfig, _ client.Options) (client.WithWatch, error) { + kubeconfig := string(config.Kubeconfig) + b := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{ + Watch: func(watchCtx context.Context, c client.WithWatch, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error) { + if strings.Contains(kubeconfig, "slow-user") { + close(slowReached) + select { + case <-slowRelease: + return nil, errors.New("released") + case <-watchCtx.Done(): + return nil, watchCtx.Err() + } + } + return c.Watch(watchCtx, obj, opts...) + }, + }) + return b.Build(), nil + } + + slowCluster := utiltestingapi.MakeMultiKueueCluster("cluster-slow"). + KubeConfig(kueue.SecretLocationType, "secret-slow").Generation(1).Obj() + fastCluster := utiltestingapi.MakeMultiKueueCluster("cluster-fast"). + KubeConfig(kueue.SecretLocationType, "secret-fast").Generation(1).Obj() + slowSecret := makeTestSecret("secret-slow", testKubeconfig("slow-user")) + fastSecret := makeTestSecret("secret-fast", testKubeconfig("fast-user")) + + localClient := getClientBuilder(ctx). + WithLists(&kueue.MultiKueueClusterList{Items: []kueue.MultiKueueCluster{*slowCluster, *fastCluster}}). + WithLists(&corev1.SecretList{Items: []corev1.Secret{slowSecret, fastSecret}}). + WithStatusSubresource(slowCluster, fastCluster). + Build() + + reconciler := newClustersReconciler(localClient, TestNamespace, 0, defaultOrigin, nil, nil, &NoOpClusterProfileCreds{}, nil) + reconciler.rootContext = ctx + reconciler.builderOverride = gatedBuilder + + slowDone := make(chan struct{}) + go func() { + defer close(slowDone) + _, _ = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "cluster-slow"}}) + }() + + select { + case <-slowReached: + case <-time.After(stuckWatchTimeout): + t.Fatal("slow cluster's reconcile did not reach the Watch call in time") + } + + fastDone := make(chan error, 1) + go func() { + _, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "cluster-fast"}}) + fastDone <- err + }() + + select { + case err := <-fastDone: + if err != nil { + t.Fatalf("cluster-fast reconcile returned err: %v", err) + } + case <-time.After(stuckWatchTimeout): + t.Fatal("cluster-fast reconcile was blocked by cluster-slow (head-of-line)") + } + + releaseSlow() + select { + case <-slowDone: + case <-time.After(stuckWatchTimeout): + t.Fatal("slow goroutine did not exit after release") + } +}