diff --git a/pkg/provider/azure_local_services.go b/pkg/provider/azure_local_services.go index 70eaf7b5fc..1709bc6e5d 100644 --- a/pkg/provider/azure_local_services.go +++ b/pkg/provider/azure_local_services.go @@ -160,24 +160,35 @@ func (updater *loadBalancerBackendPoolUpdater) removeOperation(serviceName strin } } -// process processes all operations in the loadBalancerBackendPoolUpdater. -// It merges operations that have the same loadBalancerName and backendPoolName, -// and then processes them in batches. If an operation fails, it will be retried -// if it is retriable, otherwise all operations in the batch targeting to -// this backend pool will fail. -func (updater *loadBalancerBackendPoolUpdater) process(ctx context.Context) { - logger := log.FromContextOrBackground(ctx).WithName("loadBalancerBackendPoolUpdater.process") +// countOperations returns the number of pending operations in the queue. +func (updater *loadBalancerBackendPoolUpdater) countOperations() int { + updater.lock.Lock() + defer updater.lock.Unlock() + return len(updater.operations) +} + +// drainOperations drains all pending operations from the queue and clears it. +func (updater *loadBalancerBackendPoolUpdater) drainOperations() []batchOperation { updater.lock.Lock() defer updater.lock.Unlock() if len(updater.operations) == 0 { - logger.V(4).Info("no operations to process") - return + return nil } - // Group operations by loadBalancerName:backendPoolName + ops := updater.operations + updater.operations = make([]batchOperation, 0) + return ops +} + +// groupOperations filters and groups operations by loadBalancerName:backendPoolName. +// Must be called under serviceReconcileLock so that +// localServiceNameToServiceInfoMap reads are consistent. +func (updater *loadBalancerBackendPoolUpdater) groupOperations(ctx context.Context, ops []batchOperation) map[string][]batchOperation { + logger := log.FromContextOrBackground(ctx).WithName("loadBalancerBackendPoolUpdater.groupOperations") + groups := make(map[string][]batchOperation) - for _, op := range updater.operations { + for _, op := range ops { lbOp := op.(*loadBalancerBackendPoolUpdateOperation) si, found := updater.az.getLocalServiceInfo(strings.ToLower(lbOp.serviceName)) if !found { @@ -196,8 +207,39 @@ func (updater *loadBalancerBackendPoolUpdater) process(ctx context.Context) { groups[key] = append(groups[key], op) } - // Clear all jobs. - updater.operations = make([]batchOperation, 0) + return groups +} + +// process processes all operations in the loadBalancerBackendPoolUpdater. +// It merges operations that have the same loadBalancerName and backendPoolName, +// and then processes them in batches. If an operation fails, it will be retried +// if it is retriable, otherwise all operations in the batch targeting to +// this backend pool will fail. +func (updater *loadBalancerBackendPoolUpdater) process(ctx context.Context) { + logger := log.FromContextOrBackground(ctx).WithName("loadBalancerBackendPoolUpdater.process") + + // Acquire serviceReconcileLock before draining operations so that + // removeOperation can cancel queued operations and localServiceNameToServiceInfoMap + // reads in groupOperations are consistent. The lock ordering + // (serviceReconcileLock, azureResourceLocker, updater.lock) matches + // the main reconciliation loop. + updater.az.serviceReconcileLock.Lock() + defer updater.az.serviceReconcileLock.Unlock() + + if updater.countOperations() == 0 { + return + } + + // Serialize with other components that may update Azure load balancer resources. + if updater.az.azureResourceLocker != nil { + if err := updater.az.azureResourceLocker.Lock(ctx); err != nil { + return + } + defer func() { _ = updater.az.azureResourceLocker.Unlock(ctx) }() + } + + ops := updater.drainOperations() + groups := updater.groupOperations(ctx, ops) for key, ops := range groups { parts := strings.Split(key, ":") @@ -211,15 +253,12 @@ func (updater *loadBalancerBackendPoolUpdater) process(ctx context.Context) { updater.az.getNetworkResourceSubscriptionID(), "local_service_backend_pool_updater", // source name, use a constant source name for aggregation ) - isOperationSucceeded := false - defer func() { - mc.ObserveOperationWithResult(isOperationSucceeded) - }() bp, err := updater.az.NetworkClientFactory.GetBackendAddressPoolClient().Get(ctx, updater.az.ResourceGroup, lbName, poolName) if err != nil { + mc.ObserveOperationWithResult(false) updater.processError(err, operationName, ops...) - continue // Metric will be recorded as failure via defer + continue } var changed bool @@ -242,11 +281,12 @@ func (updater *loadBalancerBackendPoolUpdater) process(ctx context.Context) { logger.V(2).Info("updating backend pool", "loadBalancer", lbName, "backendPool", poolName) _, err = updater.az.NetworkClientFactory.GetBackendAddressPoolClient().CreateOrUpdate(ctx, updater.az.ResourceGroup, lbName, poolName, *bp) if err != nil { + mc.ObserveOperationWithResult(false) updater.processError(err, operationName, ops...) - continue // Metric will be recorded as failure via defer + continue } } - isOperationSucceeded = true // Mark operation as successful before notifying + mc.ObserveOperationWithResult(true) updater.notify(newBatchOperationResult(operationName, true, nil), ops...) } } diff --git a/pkg/provider/azure_local_services_test.go b/pkg/provider/azure_local_services_test.go index 7375d44775..0eada5ff7e 100644 --- a/pkg/provider/azure_local_services_test.go +++ b/pkg/provider/azure_local_services_test.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "sync" + "sync/atomic" "testing" "time" @@ -33,8 +34,10 @@ import ( v1 "k8s.io/api/core/v1" discovery_v1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + clientgotesting "k8s.io/client-go/testing" "k8s.io/utils/ptr" "sigs.k8s.io/cloud-provider-azure/pkg/azclient/backendaddresspoolclient/mock_backendaddresspoolclient" @@ -136,6 +139,30 @@ func TestLoadBalancerBackendPoolUpdater(t *testing.T) { operations: []batchOperation{addOperationPool1}, changeLB: true, }, + { + name: "empty queue returns without ARM calls", + operations: nil, + }, + { + name: "removing non-existent IPs does not trigger CreateOrUpdate", + operations: []batchOperation{removeOperationPool1}, + existingBackendPools: []*armnetwork.BackendAddressPool{ + getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{}), + }, + expectedBackendPools: []*armnetwork.BackendAddressPool{ + getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{}), + }, + }, + { + name: "adding already-present IPs does not trigger CreateOrUpdate", + operations: []batchOperation{addOperationPool1}, + existingBackendPools: []*armnetwork.BackendAddressPool{ + getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{"10.0.0.1", "10.0.0.2"}), + }, + expectedBackendPools: []*armnetwork.BackendAddressPool{ + getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{"10.0.0.1", "10.0.0.2"}), + }, + }, } for _, tc := range testCases { @@ -662,3 +689,530 @@ func TestCheckAndApplyLocalServiceBackendPoolUpdates(t *testing.T) { }) } } + +func TestCountOperations(t *testing.T) { + op1 := getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1"}) + op2 := getRemoveIPsFromBackendPoolOperation("ns1/svc2", "lb1", "pool1", []string{"10.0.0.2"}) + + tests := []struct { + name string + ops []batchOperation + expected int + }{ + {name: "empty queue", ops: nil, expected: 0}, + {name: "one operation", ops: []batchOperation{op1}, expected: 1}, + {name: "two operations", ops: []batchOperation{op1, op2}, expected: 2}, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cloud := &Cloud{} + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + for _, op := range tc.ops { + u.addOperation(op) + } + assert.Equal(t, tc.expected, u.countOperations()) + }) + } + + t.Run("acquires updater lock", func(t *testing.T) { + cloud := &Cloud{} + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + u.addOperation(op1) + + u.lock.Lock() + + done := make(chan struct{}) + go func() { + defer close(done) + u.countOperations() + }() + + select { + case <-done: + t.Fatal("countOperations should block while lock is held") + case <-time.After(100 * time.Millisecond): + } + + u.lock.Unlock() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("countOperations did not complete after lock released") + } + }) +} + +func TestDrainOperations(t *testing.T) { + op1 := getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1"}) + op2 := getAddIPsToBackendPoolOperation("ns1/svc2", "lb1", "pool2", []string{"10.0.0.2"}) + + testCases := []struct { + name string + operations []batchOperation + expected []batchOperation + }{ + { + name: "returns nil when queue is empty", + expected: nil, + }, + { + name: "returns single operation", + operations: []batchOperation{op1}, + expected: []batchOperation{op1}, + }, + { + name: "returns all operations in order", + operations: []batchOperation{op1, op2}, + expected: []batchOperation{op1, op2}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cloud := &Cloud{} + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + for _, op := range tc.operations { + u.addOperation(op) + } + + ops := u.drainOperations() + + assert.Equal(t, tc.expected, ops) + + u.lock.Lock() + assert.Equal(t, 0, len(u.operations), "queue should be cleared after drain") + u.lock.Unlock() + }) + } + + t.Run("acquires updater lock", func(t *testing.T) { + cloud := &Cloud{} + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + u.addOperation(op1) + + u.lock.Lock() + + done := make(chan struct{}) + go func() { + defer close(done) + u.drainOperations() + }() + + select { + case <-done: + t.Fatal("drainOperations returned while lock was held") + case <-time.After(100 * time.Millisecond): + } + + u.lock.Unlock() + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("drainOperations did not complete after lock released") + } + }) +} + +func TestGroupOperations(t *testing.T) { + addPool1 := getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1"}) + removePool1 := getRemoveIPsFromBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1"}) + addPool1WithExtra := getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1", "10.0.0.2"}) + addPool2 := getAddIPsToBackendPoolOperation("ns1/svc2", "lb1", "pool2", []string{"10.0.0.2"}) + + testCases := []struct { + name string + operations []batchOperation + localServices map[string]*serviceInfo + expectedGroups map[string][]batchOperation + }{ + { + name: "returns empty map for nil input", + operations: nil, + localServices: map[string]*serviceInfo{"ns1/svc1": {lbName: "lb1"}}, + expectedGroups: map[string][]batchOperation{}, + }, + { + name: "groups single operation by pool key", + operations: []batchOperation{addPool1}, + localServices: map[string]*serviceInfo{"ns1/svc1": {lbName: "lb1"}}, + expectedGroups: map[string][]batchOperation{"lb1:pool1": {addPool1}}, + }, + { + name: "groups operations targeting same pool together", + operations: []batchOperation{removePool1, addPool1WithExtra}, + localServices: map[string]*serviceInfo{"ns1/svc1": {lbName: "lb1"}}, + expectedGroups: map[string][]batchOperation{"lb1:pool1": {removePool1, addPool1WithExtra}}, + }, + { + name: "groups operations targeting different pools separately", + operations: []batchOperation{addPool1, addPool2}, + localServices: map[string]*serviceInfo{ + "ns1/svc1": {lbName: "lb1"}, + "ns1/svc2": {lbName: "lb1"}, + }, + expectedGroups: map[string][]batchOperation{"lb1:pool1": {addPool1}, "lb1:pool2": {addPool2}}, + }, + { + name: "skips operations for non-local services", + operations: []batchOperation{addPool1}, + localServices: map[string]*serviceInfo{}, + expectedGroups: map[string][]batchOperation{}, + }, + { + name: "skips operations targeting stale load balancer", + operations: []batchOperation{addPool1}, + localServices: map[string]*serviceInfo{"ns1/svc1": {lbName: "lb2"}}, + expectedGroups: map[string][]batchOperation{}, + }, + { + name: "keeps valid operation when another is filtered", + operations: []batchOperation{addPool1, addPool2}, + localServices: map[string]*serviceInfo{"ns1/svc1": {lbName: "lb1"}}, + expectedGroups: map[string][]batchOperation{ + "lb1:pool1": {addPool1}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cloud := &Cloud{} + for k, v := range tc.localServices { + cloud.localServiceNameToServiceInfoMap.Store(k, v) + } + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + + groups := u.groupOperations(context.Background(), tc.operations) + + assert.Equal(t, len(tc.expectedGroups), len(groups)) + for key, expectedOps := range tc.expectedGroups { + assert.Equal(t, len(expectedOps), len(groups[key]), "group %s count", key) + for i, expectedOp := range expectedOps { + assert.Equal(t, expectedOp, groups[key][i], "group %s op %d", key, i) + } + } + }) + } +} + +func TestLoadBalancerBackendPoolUpdaterSerializesWithServiceReconcileLock(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cloud := GetTestCloud(ctrl) + cloud.localServiceNameToServiceInfoMap = sync.Map{} + cloud.localServiceNameToServiceInfoMap.Store("ns1/svc1", &serviceInfo{lbName: "lb1"}) + + svc := getTestService("svc1", v1.ProtocolTCP, nil, false) + client := fake.NewSimpleClientset(&svc) + informerFactory := informers.NewSharedInformerFactory(client, 0) + cloud.serviceLister = informerFactory.Core().V1().Services().Lister() + + existingBP := getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{}) + mockBPClient := cloud.NetworkClientFactory.GetBackendAddressPoolClient().(*mock_backendaddresspoolclient.MockInterface) + + // ARM calls should only happen after the lock is released. + armCalled := make(chan struct{}) + mockBPClient.EXPECT().Get( + gomock.Any(), gomock.Any(), "lb1", "pool1", + ).DoAndReturn(func(_ context.Context, _, _, _ string) (*armnetwork.BackendAddressPool, error) { + close(armCalled) + return existingBP, nil + }) + mockBPClient.EXPECT().CreateOrUpdate( + gomock.Any(), gomock.Any(), "lb1", "pool1", + *getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{"10.0.0.1"}), + ).Return(nil, nil) + + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + u.addOperation(getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1"})) + + // Hold serviceReconcileLock to simulate the main reconcile path. + cloud.serviceReconcileLock.Lock() + + processDone := make(chan struct{}) + go func() { + defer close(processDone) + u.process(context.Background()) + }() + + // process() should be blocked, ARM call should not have happened yet. + select { + case <-armCalled: + t.Fatal("ARM call happened while serviceReconcileLock was held") + case <-time.After(500 * time.Millisecond): + // Expected: process() is blocked waiting for the lock. + } + + // Release the lock, process() should now complete. + cloud.serviceReconcileLock.Unlock() + + select { + case <-processDone: + // process() completed after lock was released. + case <-time.After(5 * time.Second): + t.Fatal("process() did not complete after serviceReconcileLock was released") + } +} + +func TestLoadBalancerBackendPoolUpdaterAddOperationNotBlockedDuringProcess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cloud := GetTestCloud(ctrl) + cloud.localServiceNameToServiceInfoMap = sync.Map{} + cloud.localServiceNameToServiceInfoMap.Store("ns1/svc1", &serviceInfo{lbName: "lb1"}) + + svc := getTestService("svc1", v1.ProtocolTCP, nil, false) + client := fake.NewSimpleClientset(&svc) + informerFactory := informers.NewSharedInformerFactory(client, 0) + cloud.serviceLister = informerFactory.Core().V1().Services().Lister() + + // Block the ARM Get call so process() is in the middle of ARM work. + armBlocked := make(chan struct{}) + armUnblock := make(chan struct{}) + existingBP := getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{}) + mockBPClient := cloud.NetworkClientFactory.GetBackendAddressPoolClient().(*mock_backendaddresspoolclient.MockInterface) + mockBPClient.EXPECT().Get( + gomock.Any(), gomock.Any(), "lb1", "pool1", + ).DoAndReturn(func(_ context.Context, _, _, _ string) (*armnetwork.BackendAddressPool, error) { + close(armBlocked) + <-armUnblock + return existingBP, nil + }) + mockBPClient.EXPECT().CreateOrUpdate( + gomock.Any(), gomock.Any(), "lb1", "pool1", + *getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{"10.0.0.1"}), + ).Return(nil, nil) + + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + u.addOperation(getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1"})) + + processDone := make(chan struct{}) + go func() { + defer close(processDone) + u.process(context.Background()) + }() + + // Wait until the ARM call is in flight (updater.lock has been released). + select { + case <-armBlocked: + case <-time.After(5 * time.Second): + t.Fatal("ARM call did not start") + } + + // addOperation should not block because updater.lock is released during ARM calls. + addDone := make(chan struct{}) + go func() { + defer close(addDone) + u.addOperation(getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.2"})) + }() + + select { + case <-addDone: + // addOperation returned without blocking. + case <-time.After(500 * time.Millisecond): + t.Fatal("addOperation blocked while process() was performing ARM calls") + } + + // Unblock the ARM call and wait for process() to finish + // so that mock expectations (CreateOrUpdate) are satisfied. + close(armUnblock) + select { + case <-processDone: + case <-time.After(5 * time.Second): + t.Fatal("process() did not complete after unblocking ARM call") + } +} + +func TestLoadBalancerBackendPoolUpdaterPreservesOperationsOnLeaseLockFailure(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cloud := GetTestCloud(ctrl) + cloud.localServiceNameToServiceInfoMap = sync.Map{} + cloud.localServiceNameToServiceInfoMap.Store("ns1/svc1", &serviceInfo{lbName: "lb1"}) + + // Set up azureResourceLocker with a KubeClient that fails lease operations. + leaseClient := fake.NewSimpleClientset() + leaseClient.PrependReactor("get", "leases", func(_ clientgotesting.Action) (bool, runtime.Object, error) { + return true, nil, fmt.Errorf("simulated lease API failure") + }) + + cloud.KubeClient = leaseClient + cloud.azureResourceLocker = NewAzureResourceLocker( + cloud, + "test-holder", + "test-lease", + "test-namespace", + 15, + ) + + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + u.addOperation(getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1"})) + + // No ARM mock expectations, ARM call should not happen. + u.process(context.Background()) + + u.lock.Lock() + assert.Equal(t, 1, len(u.operations), "Operations should be preserved after lease lock failure") + u.lock.Unlock() +} + +func TestLoadBalancerBackendPoolUpdaterCompletesOnUnlockFailure(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cloud := GetTestCloud(ctrl) + cloud.localServiceNameToServiceInfoMap = sync.Map{} + cloud.localServiceNameToServiceInfoMap.Store("ns1/svc1", &serviceInfo{lbName: "lb1"}) + + svc := getTestService("svc1", v1.ProtocolTCP, nil, false) + client := fake.NewSimpleClientset(&svc) + informerFactory := informers.NewSharedInformerFactory(client, 0) + cloud.serviceLister = informerFactory.Core().V1().Services().Lister() + + // Fail the second lease update (releaseLease), let the first (acquireLease) pass. + var updateCount int32 + leaseClient := fake.NewSimpleClientset() + leaseClient.PrependReactor("update", "leases", func(_ clientgotesting.Action) (bool, runtime.Object, error) { + if atomic.AddInt32(&updateCount, 1) >= 2 { + return true, nil, fmt.Errorf("simulated unlock failure") + } + return false, nil, nil + }) + + cloud.KubeClient = leaseClient + cloud.azureResourceLocker = NewAzureResourceLocker( + cloud, + "test-holder", + "test-lease", + "test-namespace", + 15, + ) + + existingBP := getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{}) + mockBPClient := cloud.NetworkClientFactory.GetBackendAddressPoolClient().(*mock_backendaddresspoolclient.MockInterface) + mockBPClient.EXPECT().Get( + gomock.Any(), gomock.Any(), "lb1", "pool1", + ).Return(existingBP, nil) + mockBPClient.EXPECT().CreateOrUpdate( + gomock.Any(), gomock.Any(), "lb1", "pool1", + *getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{"10.0.0.1"}), + ).Return(nil, nil) + + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + u.addOperation(getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1"})) + + // ARM calls should complete despite unlock failure. + u.process(context.Background()) + + u.lock.Lock() + assert.Equal(t, 0, len(u.operations), "Operations should have been processed despite unlock failure") + u.lock.Unlock() +} + +func TestLoadBalancerBackendPoolUpdaterFiltersOperationsWhenLBChangedDuringProcess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cloud := GetTestCloud(ctrl) + cloud.localServiceNameToServiceInfoMap = sync.Map{} + cloud.localServiceNameToServiceInfoMap.Store("ns1/svc1", &serviceInfo{lbName: "lb1"}) + + svc := getTestService("svc1", v1.ProtocolTCP, nil, false) + client := fake.NewSimpleClientset(&svc) + informerFactory := informers.NewSharedInformerFactory(client, 0) + cloud.serviceLister = informerFactory.Core().V1().Services().Lister() + + // No ARM mock expectations. The operation targets lb1 but the service moves + // to lb2 while process() is blocked on serviceReconcileLock, so groupOperations + // filters it. + + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + u.addOperation(getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1"})) + + // Hold serviceReconcileLock to simulate the main reconcile loop running. + cloud.serviceReconcileLock.Lock() + + processDone := make(chan struct{}) + go func() { + defer close(processDone) + u.process(context.Background()) + }() + + // process() is blocked on serviceReconcileLock. The queue has not been drained yet. + time.Sleep(200 * time.Millisecond) + u.lock.Lock() + assert.Equal(t, 1, len(u.operations), "queue should not be drained while process() is blocked") + u.lock.Unlock() + + // Simulate the main reconcile loop moving svc1 from lb1 to lb2. + cloud.localServiceNameToServiceInfoMap.Store("ns1/svc1", &serviceInfo{lbName: "lb2"}) + + // Release the lock. process() will drain and filter the operation via groupOperations. + cloud.serviceReconcileLock.Unlock() + + select { + case <-processDone: + case <-time.After(5 * time.Second): + t.Fatal("process() did not complete") + } + + u.lock.Lock() + assert.Equal(t, 0, len(u.operations)) + u.lock.Unlock() +} + +func TestLoadBalancerBackendPoolUpdaterRemoveOperationCancelsOperationsBeforeDrain(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cloud := GetTestCloud(ctrl) + cloud.localServiceNameToServiceInfoMap = sync.Map{} + cloud.localServiceNameToServiceInfoMap.Store("ns1/svc1", &serviceInfo{lbName: "lb1"}) + + svc := getTestService("svc1", v1.ProtocolTCP, nil, false) + client := fake.NewSimpleClientset(&svc) + informerFactory := informers.NewSharedInformerFactory(client, 0) + cloud.serviceLister = informerFactory.Core().V1().Services().Lister() + + // No ARM mock expectations. removeOperation cancels the operation + // before process() drains the queue. + + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + u.addOperation(getAddIPsToBackendPoolOperation("ns1/svc1", "lb1", "pool1", []string{"10.0.0.1"})) + + // Hold serviceReconcileLock to simulate the main reconcile loop running. + cloud.serviceReconcileLock.Lock() + + processDone := make(chan struct{}) + go func() { + defer close(processDone) + u.process(context.Background()) + }() + + // process() is blocked on serviceReconcileLock. The queue has not been drained yet. + time.Sleep(200 * time.Millisecond) + u.lock.Lock() + assert.Equal(t, 1, len(u.operations), "queue should not be drained while process() is blocked") + u.lock.Unlock() + + // Simulate the main reconcile loop calling removeOperation to cancel pending operations. + u.removeOperation("ns1/svc1") + + // Release the lock. process() should not make ARM calls for the cancelled service. + cloud.serviceReconcileLock.Unlock() + + select { + case <-processDone: + case <-time.After(5 * time.Second): + t.Fatal("process() did not complete") + } + + u.lock.Lock() + assert.Equal(t, 0, len(u.operations)) + u.lock.Unlock() +}