Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 60 additions & 20 deletions pkg/provider/azure_local_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,35 @@ func (updater *loadBalancerBackendPoolUpdater) removeOperation(serviceName strin
}
}

// process processes all operations in the loadBalancerBackendPoolUpdater.
// It merges operations that have the same loadBalancerName and backendPoolName,
// and then processes them in batches. If an operation fails, it will be retried
// if it is retriable; otherwise, all operations in the batch targeting
// this backend pool will fail.
func (updater *loadBalancerBackendPoolUpdater) process(ctx context.Context) {
logger := log.FromContextOrBackground(ctx).WithName("loadBalancerBackendPoolUpdater.process")
// countOperations reports how many operations are currently queued on the
// updater. The queue length is read while holding updater.lock; the lock is
// released before the value is returned.
func (updater *loadBalancerBackendPoolUpdater) countOperations() int {
	updater.lock.Lock()
	pending := len(updater.operations)
	updater.lock.Unlock()

	return pending
}

// drainOperations removes every pending operation from the queue and hands
// them to the caller.
//
// The whole exchange happens under updater.lock. When the queue is empty it
// returns nil and leaves the queue untouched. Otherwise it returns the slice
// that was queued and installs a fresh empty slice in its place, so operations
// queued afterwards do not share the returned slice's backing array.
func (updater *loadBalancerBackendPoolUpdater) drainOperations() []batchOperation {
	updater.lock.Lock()
	defer updater.lock.Unlock()

	if len(updater.operations) == 0 {
		return nil
	}

	ops := updater.operations
	updater.operations = make([]batchOperation, 0)
	return ops
}

// groupOperations filters and groups operations by loadBalancerName:backendPoolName.
// Must be called under serviceReconcileLock so that
// localServiceNameToServiceInfoMap reads are consistent.
func (updater *loadBalancerBackendPoolUpdater) groupOperations(ctx context.Context, ops []batchOperation) map[string][]batchOperation {
logger := log.FromContextOrBackground(ctx).WithName("loadBalancerBackendPoolUpdater.groupOperations")

groups := make(map[string][]batchOperation)
for _, op := range updater.operations {
for _, op := range ops {
lbOp := op.(*loadBalancerBackendPoolUpdateOperation)
si, found := updater.az.getLocalServiceInfo(strings.ToLower(lbOp.serviceName))
if !found {
Expand All @@ -196,8 +207,39 @@ func (updater *loadBalancerBackendPoolUpdater) process(ctx context.Context) {
groups[key] = append(groups[key], op)
}

// Clear all jobs.
updater.operations = make([]batchOperation, 0)
return groups
}

// process processes all operations in the loadBalancerBackendPoolUpdater.
// It merges operations that have the same loadBalancerName and backendPoolName,
// and then processes them in batches. If an operation fails, it will be retried
// if it is retriable; otherwise, all operations in the batch targeting
// this backend pool will fail.
func (updater *loadBalancerBackendPoolUpdater) process(ctx context.Context) {
logger := log.FromContextOrBackground(ctx).WithName("loadBalancerBackendPoolUpdater.process")

// Acquire serviceReconcileLock before draining operations so that
// removeOperation can cancel queued operations and localServiceNameToServiceInfoMap
// reads in groupOperations are consistent. The lock ordering
// (serviceReconcileLock, azureResourceLocker, updater.lock) matches
// the main reconciliation loop.
updater.az.serviceReconcileLock.Lock()
defer updater.az.serviceReconcileLock.Unlock()

if updater.countOperations() == 0 {
return
}

// Serialize with other components that may update Azure load balancer resources.
if updater.az.azureResourceLocker != nil {
if err := updater.az.azureResourceLocker.Lock(ctx); err != nil {
return
}
defer func() { _ = updater.az.azureResourceLocker.Unlock(ctx) }()
}

ops := updater.drainOperations()
groups := updater.groupOperations(ctx, ops)

for key, ops := range groups {
parts := strings.Split(key, ":")
Expand All @@ -211,15 +253,12 @@ func (updater *loadBalancerBackendPoolUpdater) process(ctx context.Context) {
updater.az.getNetworkResourceSubscriptionID(),
"local_service_backend_pool_updater", // source name, use a constant source name for aggregation
)
isOperationSucceeded := false
defer func() {
mc.ObserveOperationWithResult(isOperationSucceeded)
}()

bp, err := updater.az.NetworkClientFactory.GetBackendAddressPoolClient().Get(ctx, updater.az.ResourceGroup, lbName, poolName)
if err != nil {
mc.ObserveOperationWithResult(false)
updater.processError(err, operationName, ops...)
continue // Metric will be recorded as failure via defer
continue
}

var changed bool
Expand All @@ -242,11 +281,12 @@ func (updater *loadBalancerBackendPoolUpdater) process(ctx context.Context) {
logger.V(2).Info("updating backend pool", "loadBalancer", lbName, "backendPool", poolName)
_, err = updater.az.NetworkClientFactory.GetBackendAddressPoolClient().CreateOrUpdate(ctx, updater.az.ResourceGroup, lbName, poolName, *bp)
if err != nil {
mc.ObserveOperationWithResult(false)
updater.processError(err, operationName, ops...)
continue // Metric will be recorded as failure via defer
continue
}
}
isOperationSucceeded = true // Mark operation as successful before notifying
mc.ObserveOperationWithResult(true)
updater.notify(newBatchOperationResult(operationName, true, nil), ops...)
}
}
Expand Down
Loading
Loading