Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion pkg/controller/admissionchecks/multikueue/multikueuecluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ type remoteClient struct {
failedConnAttempts uint
retryConnNextAttempt metav1.Time

// Held during setConfig. Without it, one stuck remote would stall every
// other cluster's reconcile via clustersReconciler.lock. See #11297.
setConfigLock sync.Mutex

clock clock.Clock

// For unit testing only. There is no need to create fully functional remote clients in the unit tests
Expand Down Expand Up @@ -488,7 +492,9 @@ func (c *clustersReconciler) stopAndRemoveCluster(clusterName string) {
}
}

func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, config *clientConfig, origin string) (*time.Duration, error) {
// findOrCreateRemoteClient returns the remoteClient for clusterName, creating
// one if absent. Only the brief map operation runs under c.lock.
func (c *clustersReconciler) findOrCreateRemoteClient(clusterName, origin string) *remoteClient {
c.lock.Lock()
defer c.lock.Unlock()

Expand All @@ -500,6 +506,14 @@ func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterN
}
c.remoteClients[clusterName] = client
}
return client
}

func (c *clustersReconciler) setRemoteClientConfig(ctx context.Context, clusterName string, config *clientConfig, origin string) (*time.Duration, error) {
client := c.findOrCreateRemoteClient(clusterName, origin)

client.setConfigLock.Lock()
defer client.setConfigLock.Unlock()
Comment on lines +515 to +516
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, in theory this lock is redundant, because I think there's only one reconcile running per MultiKueueCluster at any given point, so in theory there's at most one thread accessing c.remoteClients[clusterName].

But this would be very implicit, so having this lock just in case sounds good to me.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, the workqueue dedup makes it redundant in production. Kept as a local safety net because that guarantee is invisible from this file, and tests can bypass the workqueue (the one in this PR does).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I missed that. Having said that, I'm OK with either option; both have some merit: an explicit check plus slightly more robust tests vs. less production code.

I'm ok to keep as is.


clientLog := ctrl.LoggerFrom(c.rootContext).WithValues("clusterName", clusterName)
clientCtx := ctrl.LoggerInto(c.rootContext, clientLog)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1267,3 +1268,91 @@ func TestEstablishBackoffSchedule(t *testing.T) {
})
}
}

// TestSetRemoteClientConfigDoesNotBlockOtherClusters is a regression test for
// #11297. A remote stuck inside Watch must not stop reconciles of other
// clusters.
//
// The test parks the reconcile of "cluster-slow" inside an intercepted Watch
// call, then reconciles "cluster-fast" on the same reconciler and requires it
// to return without error while the slow reconcile is still parked. Finally
// it releases the slow Watch and waits for that goroutine to exit.
func TestSetRemoteClientConfigDoesNotBlockOtherClusters(t *testing.T) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice test, thanks for adding 👍

// stuckWatchTimeout is the per-phase budget for the test: how long we
// wait for the slow watch to be reached, for the fast reconcile to
// finish, and for the slow goroutine to clean up after release. Generous
// enough to survive a loaded CI runner.
const stuckWatchTimeout = 5 * time.Second

ctx, _ := utiltesting.ContextWithLog(t)

// slowReached is closed by the Watch interceptor once the slow cluster's
// Watch call has been entered; slowRelease is closed to let that call return.
slowReached := make(chan struct{})
slowRelease := make(chan struct{})
// releaseSlow may run twice (explicitly below and again from t.Cleanup), so
// the close is guarded by a sync.Once. Registering it as a cleanup ensures
// the parked Watch is released even when the test fails early.
var releaseOnce sync.Once
releaseSlow := func() { releaseOnce.Do(func() { close(slowRelease) }) }
t.Cleanup(releaseSlow)

// gatedBuilder builds a client whose Watch blocks when the kubeconfig it was
// built from contains "slow-user"; for any other kubeconfig Watch is
// forwarded to the underlying client unchanged.
gatedBuilder := func(config *clientConfig, _ client.Options) (client.WithWatch, error) {
kubeconfig := string(config.Kubeconfig)
b := getClientBuilder(ctx).WithInterceptorFuncs(interceptor.Funcs{
Watch: func(watchCtx context.Context, c client.WithWatch, obj client.ObjectList, opts ...client.ListOption) (watch.Interface, error) {
if strings.Contains(kubeconfig, "slow-user") {
// NOTE(review): this close is not guarded; a second Watch call on the
// slow client would panic with "close of closed channel". Confirm the
// code under test issues at most one Watch here, or guard with sync.Once.
close(slowReached)
// Park until the test releases us or the watch context is cancelled.
select {
case <-slowRelease:
return nil, errors.New("released")
case <-watchCtx.Done():
return nil, watchCtx.Err()
}
}
return c.Watch(watchCtx, obj, opts...)
},
})
return b.Build(), nil
}

// Two clusters, each pointing at its own secret; the kubeconfig user name
// ("slow-user" / "fast-user") is what gatedBuilder keys on.
slowCluster := utiltestingapi.MakeMultiKueueCluster("cluster-slow").
KubeConfig(kueue.SecretLocationType, "secret-slow").Generation(1).Obj()
fastCluster := utiltestingapi.MakeMultiKueueCluster("cluster-fast").
KubeConfig(kueue.SecretLocationType, "secret-fast").Generation(1).Obj()
slowSecret := makeTestSecret("secret-slow", testKubeconfig("slow-user"))
fastSecret := makeTestSecret("secret-fast", testKubeconfig("fast-user"))

// The local client is pre-populated with both clusters and both secrets.
localClient := getClientBuilder(ctx).
WithLists(&kueue.MultiKueueClusterList{Items: []kueue.MultiKueueCluster{*slowCluster, *fastCluster}}).
WithLists(&corev1.SecretList{Items: []corev1.Secret{slowSecret, fastSecret}}).
WithStatusSubresource(slowCluster, fastCluster).
Build()

// Remote clients are created through gatedBuilder instead of the default
// builder, and rootContext is set directly on the reconciler.
reconciler := newClustersReconciler(localClient, TestNamespace, 0, defaultOrigin, nil, nil, &NoOpClusterProfileCreds{}, nil)
reconciler.rootContext = ctx
reconciler.builderOverride = gatedBuilder

// Phase 1: start the slow reconcile in the background. Its result is
// intentionally ignored; only its completion (slowDone) matters.
slowDone := make(chan struct{})
go func() {
defer close(slowDone)
_, _ = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "cluster-slow"}})
}()

// Wait until the slow reconcile is actually parked inside Watch before
// starting the fast one, so the two are known to overlap.
select {
case <-slowReached:
case <-time.After(stuckWatchTimeout):
t.Fatal("slow cluster's reconcile did not reach the Watch call in time")
}

// Phase 2: reconcile the fast cluster. fastDone has capacity 1 so the
// goroutine can deliver its result and exit even if nobody receives it.
fastDone := make(chan error, 1)
go func() {
_, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "cluster-fast"}})
fastDone <- err
}()

// The fast reconcile must finish while the slow Watch is still parked.
select {
case err := <-fastDone:
if err != nil {
t.Fatalf("cluster-fast reconcile returned err: %v", err)
}
case <-time.After(stuckWatchTimeout):
t.Fatal("cluster-fast reconcile was blocked by cluster-slow (head-of-line)")
}

// Phase 3: unblock the slow Watch and make sure its goroutine terminates.
releaseSlow()
select {
case <-slowDone:
case <-time.After(stuckWatchTimeout):
t.Fatal("slow goroutine did not exit after release")
}
}