From 76f1267a7e7407c981a5c29de3163daee61ad88a Mon Sep 17 00:00:00 2001 From: George Edward Nechitoaia <58257818+georgeedward2000@users.noreply.github.com> Date: Thu, 19 Mar 2026 13:09:23 +0000 Subject: [PATCH 1/3] Add DiffTracker core types, state management, and sync operations Introduces the difftracker package with core K8s/NRP state tracking, diff computation, and state mutation logic. Includes comprehensive test coverage (25+ test functions). --- pkg/provider/difftracker/config.go | 49 ++ pkg/provider/difftracker/difftracker.go | 63 ++ pkg/provider/difftracker/difftracker_test.go | 632 +++++++++++++++++ pkg/provider/difftracker/k8s_state_updates.go | 252 +++++++ pkg/provider/difftracker/nrp_state_updates.go | 108 +++ pkg/provider/difftracker/sync_operations.go | 191 ++++++ pkg/provider/difftracker/types.go | 136 ++++ pkg/provider/difftracker/util.go | 302 ++++++++ pkg/provider/difftracker/util_test.go | 648 ++++++++++++++++++ pkg/util/sets/string.go | 33 + pkg/util/sets/string_test.go | 78 +++ 11 files changed, 2492 insertions(+) create mode 100644 pkg/provider/difftracker/config.go create mode 100644 pkg/provider/difftracker/difftracker.go create mode 100644 pkg/provider/difftracker/difftracker_test.go create mode 100644 pkg/provider/difftracker/k8s_state_updates.go create mode 100644 pkg/provider/difftracker/nrp_state_updates.go create mode 100644 pkg/provider/difftracker/sync_operations.go create mode 100644 pkg/provider/difftracker/types.go create mode 100644 pkg/provider/difftracker/util.go create mode 100644 pkg/provider/difftracker/util_test.go diff --git a/pkg/provider/difftracker/config.go b/pkg/provider/difftracker/config.go new file mode 100644 index 0000000000..198df8fe98 --- /dev/null +++ b/pkg/provider/difftracker/config.go @@ -0,0 +1,49 @@ +package difftracker + +import "fmt" + +// Config holds the configuration values needed by DiffTracker +// to perform Azure operations without depending on the entire AzureCloud struct +// This allows DiffTracker to be more modular and testable +type Config struct { + // Azure subscription ID + SubscriptionID string + + // Azure resource group name + ResourceGroup string + + // Azure location/region + Location string + + // Service Gateway resource name + ServiceGatewayResourceName string + + // Full Service Gateway resource ID + ServiceGatewayID string + + // Virtual Network name (required for backend pool configuration) + VNetName string +} + +// Validate checks if the configuration has all required fields +func (c *Config) Validate() error { + if c.SubscriptionID == "" { + return fmt.Errorf("config validation failed: SubscriptionID is required") + } + if c.ResourceGroup == "" { + return fmt.Errorf("config validation failed: ResourceGroup is required") + } + if c.Location == "" { + return fmt.Errorf("config validation failed: Location is required") + } + if c.ServiceGatewayResourceName == "" { + return fmt.Errorf("config validation failed: ServiceGatewayResourceName is required") + } + if c.ServiceGatewayID == "" { + return fmt.Errorf("config validation failed: ServiceGatewayID is required") + } + if c.VNetName == "" { + return fmt.Errorf("config validation failed: VNetName is required") + } + return nil +} diff --git a/pkg/provider/difftracker/difftracker.go b/pkg/provider/difftracker/difftracker.go new file mode 100644 index 0000000000..c8d850bf3a --- /dev/null +++ b/pkg/provider/difftracker/difftracker.go @@ -0,0 +1,63 @@ +package difftracker + +import ( + "fmt" + + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + "sigs.k8s.io/cloud-provider-azure/pkg/azclient" + utilsets "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" +) + +// InitializeDiffTracker creates and initializes a new DiffTracker with the given state and configuration. +// It validates the configuration and ensures all required dependencies are present. +// Panics if critical dependencies (config, networkClientFactory, kubeClient) are invalid. +func InitializeDiffTracker(K8s K8s_State, NRP NRP_State, config Config, networkClientFactory azclient.ClientFactory, kubeClient kubernetes.Interface) *DiffTracker { + // Validate configuration + if err := config.Validate(); err != nil { + panic(fmt.Sprintf("InitializeDiffTracker: %v", err)) + } + + // Validate required dependencies + if networkClientFactory == nil { + panic("InitializeDiffTracker: networkClientFactory must not be nil") + } + if kubeClient == nil { + panic("InitializeDiffTracker: kubeClient must not be nil") + } + + klog.V(2).Infof("InitializeDiffTracker: initializing with config: subscription=%s, resourceGroup=%s, location=%s", + config.SubscriptionID, config.ResourceGroup, config.Location) + + // If any field is nil, initialize it + if K8s.Services == nil { + K8s.Services = utilsets.NewString() + } + if K8s.Egresses == nil { + K8s.Egresses = utilsets.NewString() + } + if K8s.Nodes == nil { + K8s.Nodes = make(map[string]Node) + } + if NRP.LoadBalancers == nil { + NRP.LoadBalancers = utilsets.NewString() + } + if NRP.NATGateways == nil { + NRP.NATGateways = utilsets.NewString() + } + if NRP.Locations == nil { + NRP.Locations = make(map[string]NRPLocation) + } + + diffTracker := &DiffTracker{ + K8sResources: K8s, + NRPResources: NRP, + + // Configuration and clients + config: config, + networkClientFactory: networkClientFactory, + kubeClient: kubeClient, + } + + return diffTracker +} diff --git a/pkg/provider/difftracker/difftracker_test.go b/pkg/provider/difftracker/difftracker_test.go new file mode 100644 index 0000000000..11dacb8ed7 --- /dev/null +++ b/pkg/provider/difftracker/difftracker_test.go @@ -0,0 +1,632 @@ +package difftracker + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + "k8s.io/client-go/kubernetes/fake" + "sigs.k8s.io/cloud-provider-azure/pkg/azclient/mock_azclient" + "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" +) + +func TestDiffTracker_DeepEqual(t *testing.T) { + tests := []struct { + name string + dt *DiffTracker + expected bool + }{ + { + name: "equal empty states", + dt: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString(), + Egresses: sets.NewString(), + Nodes: map[string]Node{}, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString(), + NATGateways: sets.NewString(), + Locations: map[string]NRPLocation{}, + }, + }, + expected: true, + }, + { + name: "equal states with services", + dt: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("service1", "service2"), + Egresses: sets.NewString(), + Nodes: map[string]Node{}, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("service1", "service2"), + NATGateways: sets.NewString(), + Locations: map[string]NRPLocation{}, + }, + }, + expected: true, + }, + { + name: "services not equal", + dt: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("service1", "service2"), + Egresses: sets.NewString(), + Nodes: map[string]Node{}, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("service1"), + NATGateways: sets.NewString(), + Locations: map[string]NRPLocation{}, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.dt.DeepEqual() + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestUpdateK8sService(t *testing.T) { + dt := &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString(), + }, + } + + // Test ADD operation + err := dt.UpdateK8sService(UpdateK8sResource{ + Operation: ADD, + ID: "service1", + }) + assert.NoError(t, err) + assert.True(t, dt.K8sResources.Services.Has("service1")) + + // Test REMOVE operation + err = dt.UpdateK8sService(UpdateK8sResource{ + Operation: REMOVE, + ID: "service1", + }) + assert.NoError(t, err) + assert.False(t, dt.K8sResources.Services.Has("service1")) + + // Test invalid operation + err = dt.UpdateK8sService(UpdateK8sResource{ + Operation: UPDATE, + ID: "service1", + }) + assert.Error(t, err) +} + +func TestGetSyncLoadBalancerServices(t *testing.T) { + dt := &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("service1", "service2", "service3"), + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("service2", "service3", "service4"), + }, + } + + result := dt.GetSyncLoadBalancerServices() + + assert.True(t, result.Additions.Has("service1")) + assert.Equal(t, 1, result.Additions.Len()) + + assert.True(t, result.Removals.Has("service4")) + assert.Equal(t, 1, result.Removals.Len()) +} + +func TestUpdateK8sEndpoints(t *testing.T) { + dt := &DiffTracker{ + K8sResources: K8s_State{ + Nodes: map[string]Node{}, + }, + } + + // Test adding new endpoint + input := UpdateK8sEndpointsInputType{ + InboundIdentity: "service1", + OldAddresses: map[string]string{}, + NewAddresses: map[string]string{"10.0.0.1": "node1"}, + } + + errs := dt.UpdateK8sEndpoints(input) + assert.Empty(t, errs) + assert.Contains(t, dt.K8sResources.Nodes, "node1") + assert.Contains(t, dt.K8sResources.Nodes["node1"].Pods, "10.0.0.1") + assert.True(t, dt.K8sResources.Nodes["node1"].Pods["10.0.0.1"].InboundIdentities.Has("service1")) + + // Test removing an endpoint + input = UpdateK8sEndpointsInputType{ + InboundIdentity: "service1", + OldAddresses: map[string]string{"10.0.0.1": "node1"}, + NewAddresses: map[string]string{}, + } + + errs = dt.UpdateK8sEndpoints(input) + assert.Empty(t, errs) + assert.NotContains(t, dt.K8sResources.Nodes["node1"].Pods, "10.0.0.1") +} + +func TestUpdateK8sPod(t *testing.T) { + dt := &DiffTracker{ + K8sResources: K8s_State{ + Nodes: map[string]Node{}, + }, + } + + // Test adding new egress assignment + input := UpdatePodInputType{ + PodOperation: ADD, + PublicOutboundIdentity: "public1", + Location: "node1", + Address: "10.0.0.1", + } + + err := dt.UpdateK8sPod(input) + assert.NoError(t, err) + assert.Contains(t, dt.K8sResources.Nodes, "node1") + assert.Contains(t, dt.K8sResources.Nodes["node1"].Pods, "10.0.0.1") + assert.Equal(t, "public1", dt.K8sResources.Nodes["node1"].Pods["10.0.0.1"].PublicOutboundIdentity) + + // Test removing egress assignment + input = UpdatePodInputType{ + PodOperation: REMOVE, + Location: "node1", + Address: "10.0.0.1", + } + + err = dt.UpdateK8sPod(input) + assert.NoError(t, err) + assert.NotContains(t, dt.K8sResources.Nodes["node1"].Pods, "10.0.0.1") +} + +func TestGetSyncLocationsAddresses(t *testing.T) { + dt := &DiffTracker{ + K8sResources: K8s_State{ + Nodes: map[string]Node{ + "node1": { + Pods: map[string]Pod{ + "10.0.0.1": { + InboundIdentities: sets.NewString("service1"), + PublicOutboundIdentity: "public1", + }, + }, + }, + }, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("service1"), + NATGateways: sets.NewString("public1"), + Locations: map[string]NRPLocation{}, + }, + } + + result := dt.GetSyncLocationsAddresses() + + assert.Equal(t, PartialUpdate, result.Action) + assert.Len(t, result.Locations, 1) + + location := result.Locations["node1"] + assert.NotNil(t, location) + assert.Equal(t, FullUpdate, location.AddressUpdateAction) + assert.Len(t, location.Addresses, 1) + + var address string + for addr := range location.Addresses { + address = addr + break + } + + assert.Equal(t, "10.0.0.1", address) + assert.True(t, location.Addresses[address].ServiceRef.Has("service1")) + assert.True(t, location.Addresses[address].ServiceRef.Has("public1")) +} + +func TestOperation_String(t *testing.T) { + assert.Equal(t, "ADD", ADD.String()) + assert.Equal(t, "REMOVE", REMOVE.String()) + assert.Equal(t, "UPDATE", UPDATE.String()) +} + +func TestUpdateNRPLoadBalancers(t *testing.T) { + tests := []struct { + name string + initialState *DiffTracker + expectedNRP *sets.IgnoreCaseSet + }{ + { + name: "add services from K8s to NRP", + initialState: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("service1", "service2", "service3"), + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("service1"), + }, + }, + expectedNRP: sets.NewString("service1", "service2", "service3"), + }, + { + name: "no changes needed when K8s and NRP are in sync", + initialState: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("service1", "service2"), + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("service1", "service2"), + }, + }, + expectedNRP: sets.NewString("service1", "service2"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + syncServices := tt.initialState.GetSyncLoadBalancerServices() + tt.initialState.UpdateNRPLoadBalancers(syncServices) + + assert.True(t, tt.expectedNRP.Equals(tt.initialState.NRPResources.LoadBalancers), + "Expected NRP LoadBalancers %v, but got %v", + tt.expectedNRP.UnsortedList(), + tt.initialState.NRPResources.LoadBalancers.UnsortedList()) + }) + } +} + +func TestUpdateK8sEgress(t *testing.T) { + dt := &DiffTracker{ + K8sResources: K8s_State{ + Egresses: sets.NewString(), + }, + } + + err := dt.UpdateK8sEgress(UpdateK8sResource{Operation: ADD, ID: "egress1"}) + assert.NoError(t, err) + assert.True(t, dt.K8sResources.Egresses.Has("egress1")) + + err = dt.UpdateK8sEgress(UpdateK8sResource{Operation: REMOVE, ID: "egress1"}) + assert.NoError(t, err) + assert.False(t, dt.K8sResources.Egresses.Has("egress1")) + + err = dt.UpdateK8sEgress(UpdateK8sResource{Operation: UPDATE, ID: "egress1"}) + assert.Error(t, err) + assert.Contains(t, err.Error(), "error - ResourceType=Egress, Operation=UPDATE and ID=egress1") +} + +func TestGetSyncNRPNATGateways(t *testing.T) { + tests := []struct { + name string + k8sEgresses []string + nrpNATGateways []string + expectedAdditions []string + expectedRemovals []string + }{ + { + name: "empty states", + k8sEgresses: []string{}, + nrpNATGateways: []string{}, + expectedAdditions: []string{}, + expectedRemovals: []string{}, + }, + { + name: "mixed state with additions and removals", + k8sEgresses: []string{"egress1", "egress3", "egress5"}, + nrpNATGateways: []string{"egress1", "egress2", "egress4"}, + expectedAdditions: []string{"egress3", "egress5"}, + expectedRemovals: []string{"egress2", "egress4"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dt := &DiffTracker{ + K8sResources: K8s_State{ + Egresses: sets.NewString(tt.k8sEgresses...), + }, + NRPResources: NRP_State{ + NATGateways: sets.NewString(tt.nrpNATGateways...), + }, + } + + result := dt.GetSyncNRPNATGateways() + + assert.Equal(t, len(tt.expectedAdditions), result.Additions.Len()) + for _, addition := range tt.expectedAdditions { + assert.True(t, result.Additions.Has(addition)) + } + + assert.Equal(t, len(tt.expectedRemovals), result.Removals.Len()) + for _, removal := range tt.expectedRemovals { + assert.True(t, result.Removals.Has(removal)) + } + }) + } +} + +func TestUpdateNRPNATGateways(t *testing.T) { + dt := &DiffTracker{ + K8sResources: K8s_State{ + Egresses: sets.NewString("egress1", "egress2", "egress4"), + }, + NRPResources: NRP_State{ + NATGateways: sets.NewString("egress1", "egress3", "egress5"), + }, + } + + syncServices := dt.GetSyncNRPNATGateways() + dt.UpdateNRPNATGateways(syncServices) + + expectedNRP := sets.NewString("egress1", "egress2", "egress4") + assert.True(t, expectedNRP.Equals(dt.NRPResources.NATGateways), + "Expected NRP NATGateways %v, but got %v", + expectedNRP.UnsortedList(), + dt.NRPResources.NATGateways.UnsortedList()) +} + +func TestUpdateLocationsAddresses(t *testing.T) { + tests := []struct { + name string + initialState *DiffTracker + expectedNRP map[string]map[string][]string + }{ + { + name: "sync empty states", + initialState: &DiffTracker{ + K8sResources: K8s_State{Nodes: map[string]Node{}}, + NRPResources: NRP_State{Locations: map[string]NRPLocation{}}, + }, + expectedNRP: map[string]map[string][]string{}, + }, + { + name: "add new location and address", + initialState: &DiffTracker{ + K8sResources: K8s_State{ + Nodes: map[string]Node{ + "node1": { + Pods: map[string]Pod{ + "10.0.0.1": { + InboundIdentities: sets.NewString("service1"), + PublicOutboundIdentity: "public1", + }, + }, + }, + }, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("service1"), + NATGateways: sets.NewString("public1"), + Locations: map[string]NRPLocation{}, + }, + }, + expectedNRP: map[string]map[string][]string{ + "node1": {"10.0.0.1": {"service1", "public1"}}, + }, + }, + { + name: "complex case with multiple operations", + initialState: &DiffTracker{ + K8sResources: K8s_State{ + Nodes: map[string]Node{ + "node1": { + Pods: map[string]Pod{ + "10.0.0.1": { + InboundIdentities: sets.NewString("service1", "service3"), + PublicOutboundIdentity: "public1", + }, + }, + }, + "node3": { + Pods: map[string]Pod{ + "10.0.0.5": {InboundIdentities: sets.NewString("service5")}, + }, + }, + }, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("service1", "service2", "service3", "service4", "service5"), + NATGateways: sets.NewString("public1"), + Locations: map[string]NRPLocation{ + "node1": { + Addresses: map[string]NRPAddress{ + "10.0.0.1": {Services: sets.NewString("service1", "service2")}, + "10.0.0.2": {Services: sets.NewString("service4")}, + }, + }, + "node2": { + Addresses: map[string]NRPAddress{ + "10.0.0.3": {Services: sets.NewString("service3")}, + }, + }, + }, + }, + }, + expectedNRP: map[string]map[string][]string{ + "node1": {"10.0.0.1": {"service1", "service3", "public1"}}, + "node3": {"10.0.0.5": {"service5"}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + locationData := tt.initialState.GetSyncLocationsAddresses() + tt.initialState.UpdateLocationsAddresses(locationData) + + assert.Equal(t, len(tt.expectedNRP), len(tt.initialState.NRPResources.Locations)) + + for locName, expectedAddressMap := range tt.expectedNRP { + nrpLoc, exists := tt.initialState.NRPResources.Locations[locName] + assert.True(t, exists, "Expected location %s not found", locName) + assert.Equal(t, len(expectedAddressMap), len(nrpLoc.Addresses)) + + for addr, expectedServices := range expectedAddressMap { + nrpAddr, exists := nrpLoc.Addresses[addr] + assert.True(t, exists, "Expected address %s not found in %s", addr, locName) + assert.Equal(t, len(expectedServices), nrpAddr.Services.Len()) + for _, svc := range expectedServices { + assert.True(t, nrpAddr.Services.Has(svc)) + } + } + } + }) + } +} + +func TestGetSyncOperations(t *testing.T) { + tests := []struct { + name string + initialState *DiffTracker + expectedSyncStatus SyncStatus + }{ + { + name: "states already in sync", + initialState: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("service1"), + Egresses: sets.NewString("egress1"), + Nodes: map[string]Node{ + "node1": {Pods: map[string]Pod{ + "10.0.0.1": {InboundIdentities: sets.NewString("service1")}, + }}, + }, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("service1"), + NATGateways: sets.NewString("egress1"), + Locations: map[string]NRPLocation{ + "node1": {Addresses: map[string]NRPAddress{ + "10.0.0.1": {Services: sets.NewString("service1")}, + }}, + }, + }, + }, + expectedSyncStatus: ALREADY_IN_SYNC, + }, + { + name: "services out of sync", + initialState: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("service1", "service2"), + Egresses: sets.NewString("egress1"), + Nodes: map[string]Node{ + "node1": {Pods: map[string]Pod{ + "10.0.0.1": {InboundIdentities: sets.NewString("service1")}, + }}, + }, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("service1"), + NATGateways: sets.NewString("egress1"), + Locations: map[string]NRPLocation{ + "node1": {Addresses: map[string]NRPAddress{ + "10.0.0.1": {Services: sets.NewString("service1")}, + }}, + }, + }, + }, + expectedSyncStatus: SUCCESS, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.initialState.GetSyncOperations() + assert.Equal(t, tt.expectedSyncStatus, result.SyncStatus) + }) + } +} + +// Real Scenario: CloudProvider is down and K8s Cluster is subject to continuous updates. +// This test verifies if the DiffTracker is able to sync K8s Cluster and NRP correctly +// when there is a huge discrepancy between K8s Cluster and NRP. +func TestInitializeDiffTracker(t *testing.T) { + K8sResources := K8s_State{ + Services: sets.NewString("Service0", "Service1", "Service2"), + Egresses: sets.NewString("Egress0", "Egress1", "Egress2"), + Nodes: map[string]Node{ + "Node1": { + Pods: map[string]Pod{ + "Pod34": {InboundIdentities: sets.NewString("Service0"), PublicOutboundIdentity: ""}, + "Pod0": {InboundIdentities: sets.NewString("Service0"), PublicOutboundIdentity: "Egress0"}, + "Pod1": {InboundIdentities: sets.NewString("Service1", "Service2"), PublicOutboundIdentity: "Egress1"}, + "Pod3": {InboundIdentities: sets.NewString(), PublicOutboundIdentity: "Egress2"}, + }, + }, + "Node2": { + Pods: map[string]Pod{ + "Pod2": {InboundIdentities: sets.NewString("Service1"), PublicOutboundIdentity: "Egress2"}, + }, + }, + }, + } + + NRPResources := NRP_State{ + LoadBalancers: sets.NewString("Service0", "Service6", "Service5"), + NATGateways: sets.NewString("Egress0", "Egress6", "Egress5"), + Locations: map[string]NRPLocation{ + "Node1": { + Addresses: map[string]NRPAddress{ + "Pod34": {Services: sets.NewString("Service0", "Service5")}, + "Pod00": {Services: sets.NewString("Service6", "Egress5")}, + "Pod0": {Services: sets.NewString("Service0", "Egress0")}, + }, + }, + "Node3": { + Addresses: map[string]NRPAddress{ + "Pod4": {Services: sets.NewString("Service6", "Eggres6")}, + "Pod5": {Services: sets.NewString("Egress5")}, + }, + }, + }, + } + + config := Config{ + SubscriptionID: "test-subscription", + ResourceGroup: "test-rg", + Location: "eastus", + VNetName: "test-vnet", + ServiceGatewayResourceName: "test-sgw", + ServiceGatewayID: "/subscriptions/test-subscription/resourceGroups/test-rg/providers/Microsoft.Network/serviceGateways/test-sgw", + } + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockFactory := mock_azclient.NewMockClientFactory(ctrl) + mockKubeClient := fake.NewSimpleClientset() + diffTracker := InitializeDiffTracker(K8sResources, NRPResources, config, mockFactory, mockKubeClient) + syncOperations := diffTracker.GetSyncOperations() + + diffTracker.UpdateNRPLoadBalancers(syncOperations.LoadBalancerUpdates) + diffTracker.UpdateNRPNATGateways(syncOperations.NATGatewayUpdates) + diffTracker.UpdateLocationsAddresses(syncOperations.LocationData) + + assert.Equal(t, SUCCESS, syncOperations.SyncStatus) + + expectedDiffTracker := &DiffTracker{ + K8sResources: K8sResources, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("Service0", "Service1", "Service2"), + NATGateways: sets.NewString("Egress0", "Egress1", "Egress2"), + Locations: map[string]NRPLocation{ + "Node1": { + Addresses: map[string]NRPAddress{ + "Pod34": {Services: sets.NewString("Service0")}, + "Pod0": {Services: sets.NewString("Service0", "Egress0")}, + }, + }, + }, + }, + } + + assert.True(t, diffTracker.Equals(expectedDiffTracker), + "DiffTracker does not match expected state") +} diff --git a/pkg/provider/difftracker/k8s_state_updates.go b/pkg/provider/difftracker/k8s_state_updates.go new file mode 100644 index 0000000000..4a421e58c4 --- /dev/null +++ b/pkg/provider/difftracker/k8s_state_updates.go @@ -0,0 +1,252 @@ +package difftracker + +import ( + "fmt" + "strings" + + "k8s.io/klog/v2" + utilsets "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" +) + +const ( + ResourceTypeService = "Service" + ResourceTypeEgress = "Egress" +) + +func updateK8Resource(input UpdateK8sResource, set *utilsets.IgnoreCaseSet, resourceType string) error { + if input.ID == "" { + return fmt.Errorf("%s: empty ID not allowed", resourceType) + } + + switch input.Operation { + case ADD: + set.Insert(input.ID) + case REMOVE: + set.Delete(input.ID) + default: + return fmt.Errorf("error - ResourceType=%s, Operation=%s and ID=%s", resourceType, input.Operation, input.ID) + } + return nil +} + +func (dt *DiffTracker) UpdateK8sService(input UpdateK8sResource) error { + dt.mu.Lock() + defer dt.mu.Unlock() + + return updateK8Resource(input, dt.K8sResources.Services, ResourceTypeService) +} + +func (dt *DiffTracker) UpdateK8sEgress(input UpdateK8sResource) error { + dt.mu.Lock() + defer dt.mu.Unlock() + + return updateK8Resource(input, dt.K8sResources.Egresses, ResourceTypeEgress) +} + +// updateK8sEndpointsLocked updates K8s endpoints state. Assumes lock is already held. +func (dt *DiffTracker) updateK8sEndpointsLocked(input UpdateK8sEndpointsInputType) []error { + var errs []error + for address, location := range input.NewAddresses { + + if location == "" { + errs = append(errs, fmt.Errorf("error UpdateK8sEndpoints, address=%s does not have a node associated", address)) + continue + } + + if _, exists := input.OldAddresses[address]; exists { + continue + } + + nodeState, exists := dt.K8sResources.Nodes[location] + if !exists { + nodeState = Node{ + Pods: make(map[string]Pod), + } + dt.K8sResources.Nodes[location] = nodeState + } + + pod, exists := nodeState.Pods[address] + if !exists { + pod = Pod{ + InboundIdentities: utilsets.NewString(), + } + nodeState.Pods[address] = pod + } + pod.InboundIdentities.Insert(input.InboundIdentity) + } + + for address, location := range input.OldAddresses { + if _, exists := input.NewAddresses[address]; exists { + continue + } + + if location == "" { + errs = append(errs, fmt.Errorf("error UpdateK8sEndpoints, address=%s does not have a node associated", address)) + } + + node, nodeExists := dt.K8sResources.Nodes[location] + if !nodeExists { + continue + } + + pod, podExists := node.Pods[address] + if !podExists { + continue + } + + pod.InboundIdentities.Delete(input.InboundIdentity) + + if !pod.HasIdentities() { + delete(node.Pods, address) + if !node.HasPods() { + delete(dt.K8sResources.Nodes, location) + } + } + } + + return errs +} + +// UpdateK8sEndpoints is a public wrapper that acquires lock before calling updateK8sEndpointsLocked. +func (dt *DiffTracker) UpdateK8sEndpoints(input UpdateK8sEndpointsInputType) []error { + dt.mu.Lock() + defer dt.mu.Unlock() + return dt.updateK8sEndpointsLocked(input) +} + +func (dt *DiffTracker) addOrUpdatePod(input UpdatePodInputType) error { + node, exists := dt.K8sResources.Nodes[input.Location] + if !exists { + node = Node{Pods: make(map[string]Pod)} + dt.K8sResources.Nodes[input.Location] = node + } + + pod, exists := node.Pods[input.Address] + if !exists { + pod = Pod{InboundIdentities: utilsets.NewString()} + } + + pod.PublicOutboundIdentity = input.PublicOutboundIdentity + node.Pods[input.Address] = pod + + return nil +} + +// removePod removes a pod from K8s state. Returns true if the pod was actually removed, +// false if it didn't exist (already removed by a previous call). +func (dt *DiffTracker) removePod(input UpdatePodInputType) (removed bool, err error) { + node, exists := dt.K8sResources.Nodes[input.Location] + if !exists { + return false, nil + } + + // Check if pod exists before removing + if _, podExists := node.Pods[input.Address]; !podExists { + return false, nil + } + + delete(node.Pods, input.Address) + if !node.HasPods() { + delete(dt.K8sResources.Nodes, input.Location) + } + + return true, nil +} + +// updateK8sPodLocked updates K8s pod state. Assumes lock is already held. +func (dt *DiffTracker) updateK8sPodLocked(input UpdatePodInputType) error { + switch input.PodOperation { + case ADD, UPDATE: + // Check if pod already exists with the same outbound identity + // This prevents double-counting when pod informer fires AddFunc for pods + // that were already counted during initialization + alreadyExists := false + if node, nodeExists := dt.K8sResources.Nodes[input.Location]; nodeExists { + if pod, podExists := node.Pods[input.Address]; podExists { + if pod.PublicOutboundIdentity == input.PublicOutboundIdentity { + alreadyExists = true + klog.V(4).Infof("updateK8sPodLocked: Pod at %s:%s already exists for service %s, skipping counter increment", + input.Location, input.Address, input.PublicOutboundIdentity) + } + } + } + + // Only increment counter if pod doesn't already exist + if !alreadyExists { + counter := 0 + if val, ok := dt.LocalServiceNameToNRPServiceMap.Load(strings.ToLower(input.PublicOutboundIdentity)); ok { + counter = val.(int) + } + dt.LocalServiceNameToNRPServiceMap.Store(strings.ToLower(input.PublicOutboundIdentity), counter+1) + } + return dt.addOrUpdatePod(input) + case REMOVE: + // First, try to remove the pod from K8s state + // This returns false if the pod doesn't exist (duplicate removal) + removed, err := dt.removePod(input) + if err != nil { + return err + } + if !removed { + // Pod didn't exist - this is a duplicate removal, don't decrement counter + klog.V(4).Infof("updateK8sPodLocked: Pod at %s:%s was already removed (duplicate delete), skipping counter decrement", + input.Location, input.Address) + return nil + } + + // Pod was actually removed, now decrement the counter + if val, ok := dt.LocalServiceNameToNRPServiceMap.Load(strings.ToLower(input.PublicOutboundIdentity)); ok { + counter := val.(int) + if counter <= 0 { + return fmt.Errorf("error - PublicOutboundIdentity %s has a negative count: %d", input.PublicOutboundIdentity, counter) + } + if counter == 1 { + dt.LocalServiceNameToNRPServiceMap.Delete(strings.ToLower(input.PublicOutboundIdentity)) + } else { + dt.LocalServiceNameToNRPServiceMap.Store(strings.ToLower(input.PublicOutboundIdentity), counter-1) + } + } + return nil + default: + return fmt.Errorf("invalid pod operation: %s for pod at %s:%s", + input.PodOperation, input.Location, input.Address) + } +} + +// UpdateK8sPod is a public wrapper that acquires lock before calling updateK8sPodLocked. +func (dt *DiffTracker) UpdateK8sPod(input UpdatePodInputType) error { + dt.mu.Lock() + defer dt.mu.Unlock() + return dt.updateK8sPodLocked(input) +} + +// removeServiceFromK8sStateLocked removes a service from all pod identities in K8s state. +// This is used during service deletion to proactively clear location/address references +// so the LocationsUpdater can sync the removal to NRP. +// Assumes lock is already held. +func (dt *DiffTracker) removeServiceFromK8sStateLocked(serviceUID string, isInbound bool) { + for nodeIP, node := range dt.K8sResources.Nodes { + for podIP, pod := range node.Pods { + if isInbound { + // Remove from inbound identities + if pod.InboundIdentities != nil && pod.InboundIdentities.Has(serviceUID) { + pod.InboundIdentities.Delete(serviceUID) + } + } else { + // Clear outbound identity if it matches + if strings.EqualFold(pod.PublicOutboundIdentity, serviceUID) { + pod.PublicOutboundIdentity = "" + node.Pods[podIP] = pod + } + } + + // Clean up empty pods and nodes + if !pod.HasIdentities() { + delete(node.Pods, podIP) + if !node.HasPods() { + delete(dt.K8sResources.Nodes, nodeIP) + } + } + } + } +} diff --git a/pkg/provider/difftracker/nrp_state_updates.go b/pkg/provider/difftracker/nrp_state_updates.go new file mode 100644 index 0000000000..91f05841b5 --- /dev/null +++ b/pkg/provider/difftracker/nrp_state_updates.go @@ -0,0 +1,108 @@ +package difftracker + +import ( + "k8s.io/klog/v2" + utilsets "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" +) + +func (dt *DiffTracker) UpdateNRPLoadBalancers(syncServicesReturnType SyncServicesReturnType) { + dt.mu.Lock() + defer dt.mu.Unlock() + + for _, service := range syncServicesReturnType.Additions.UnsortedList() { + dt.NRPResources.LoadBalancers.Insert(service) + klog.V(2).Infof("UpdateNRPLoadBalancers: Added service %s to NRP LoadBalancers\n", service) + } + + for _, service := range syncServicesReturnType.Removals.UnsortedList() { + dt.NRPResources.LoadBalancers.Delete(service) + klog.V(2).Infof("UpdateNRPLoadBalancers: Removed service %s from NRP LoadBalancers\n", service) + } +} + +func (dt *DiffTracker) UpdateNRPNATGateways(syncServicesReturnType SyncServicesReturnType) { + dt.mu.Lock() + defer dt.mu.Unlock() + + for _, service := range syncServicesReturnType.Additions.UnsortedList() { + dt.NRPResources.NATGateways.Insert(service) + klog.V(2).Infof("UpdateNRPNATGateways: Added service %s to NRP NATGateways\n", service) + } + + for _, service := range syncServicesReturnType.Removals.UnsortedList() { + dt.NRPResources.NATGateways.Delete(service) + klog.V(2).Infof("UpdateNRPNATGateways: Removed service %s from NRP NATGateways\n", service) + } +} + +func (dt *DiffTracker) UpdateLocationsAddresses(locationData LocationData) { + dt.mu.Lock() + defer dt.mu.Unlock() + + for locationKey, locationValue := range locationData.Locations { + // Remove empty locations + if len(locationValue.Addresses) == 0 { + delete(dt.NRPResources.Locations, locationKey) + continue + } + + // Get or create location + nrpLocation, exists := dt.NRPResources.Locations[locationKey] + isFullUpdate := !exists || locationValue.AddressUpdateAction == FullUpdate + + // For full updates, start with a fresh location + if isFullUpdate { + nrpLocation = NRPLocation{ + Addresses: make(map[string]NRPAddress), + } + } + + // Process address updates + for addressKey, addressValue := range locationValue.Addresses { + // Remove empty addresses + if addressValue.ServiceRef.Len() == 0 { + delete(nrpLocation.Addresses, addressKey) + continue + } + + // Create new service references set + serviceRefs := createServiceRefsFromAddress(addressValue) + + // For full update or when address doesn't exist, add new address + if isFullUpdate || !addressExists(nrpLocation, addressKey) { + nrpLocation.Addresses[addressKey] = NRPAddress{Services: serviceRefs} + continue + } + + // For partial updates with existing address + existingAddress := nrpLocation.Addresses[addressKey] + if !serviceRefs.Equals(existingAddress.Services) { + nrpLocation.Addresses[addressKey] = NRPAddress{ + Services: serviceRefs, + } + } + } + + // Save location if it has addresses, otherwise delete it + if len(nrpLocation.Addresses) > 0 { + dt.NRPResources.Locations[locationKey] = nrpLocation + } else { + delete(dt.NRPResources.Locations, locationKey) + } + } +} + +// Helper function to check if address exists in a location +func addressExists(location NRPLocation, addressKey string) bool { + _, exists := location.Addresses[addressKey] + return exists +} + +// Helper function to create service references from an address +func createServiceRefsFromAddress(addressValue Address) *utilsets.IgnoreCaseSet { + serviceRefs := utilsets.NewString() + for _, service := range addressValue.ServiceRef.UnsortedList() { + serviceRefs.Insert(service) + } + return serviceRefs +} diff --git a/pkg/provider/difftracker/sync_operations.go b/pkg/provider/difftracker/sync_operations.go new file mode 100644 index 0000000000..6b18b4d4d4 --- /dev/null +++ b/pkg/provider/difftracker/sync_operations.go @@ -0,0 +1,191 @@ +package difftracker + +import ( + "k8s.io/klog/v2" + utilsets "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" +) + +// GetServicesToSync handles the synchronization of services between K8s and NRP +func GetServicesToSync(k8sServices, Services *utilsets.IgnoreCaseSet) SyncServicesReturnType { + klog.Infof("GetServicesToSync: K8s services (%d): %v", k8sServices.Len(), k8sServices.UnsortedList()) + klog.Infof("GetServicesToSync: NRP services (%d): %v", Services.Len(), Services.UnsortedList()) + + syncServices := SyncServicesReturnType{ + Additions: utilsets.NewString(), + Removals: utilsets.NewString(), + } + + for _, service := range k8sServices.UnsortedList() { + if Services.Has(service) { + continue + } + syncServices.Additions.Insert(service) + klog.Infof("GetServicesToSync: Added service %s to additions", service) + } + + for _, service := range Services.UnsortedList() { + if k8sServices.Has(service) { + continue + } + syncServices.Removals.Insert(service) + klog.Infof("GetServicesToSync: Added service %s to removals", service) + } + + klog.Infof("GetServicesToSync: Result - Additions: %d, Removals: %d", syncServices.Additions.Len(), syncServices.Removals.Len()) + return syncServices +} + +func (dt *DiffTracker) GetSyncLoadBalancerServices() SyncServicesReturnType { + dt.mu.Lock() + defer dt.mu.Unlock() + + return GetServicesToSync(dt.K8sResources.Services, dt.NRPResources.LoadBalancers) +} + +func (dt *DiffTracker) GetSyncNRPNATGateways() SyncServicesReturnType { + dt.mu.Lock() + defer dt.mu.Unlock() + + return GetServicesToSync(dt.K8sResources.Egresses, dt.NRPResources.NATGateways) +} + +func (dt *DiffTracker) GetSyncLocationsAddresses() LocationData { + dt.mu.Lock() + defer dt.mu.Unlock() + + result := LocationData{ + Action: PartialUpdate, + Locations: make(map[string]Location), + } + + // Iterate over all nodes in the K8s state + for nodeIp, node := range dt.K8sResources.Nodes { + nrpLocation, locationExists := dt.NRPResources.Locations[nodeIp] + location := initializeLocation(locationExists) + locationUpdated := false + + for address, pod := range node.Pods { + // Filter services: only include services that exist in NRP + serviceRef := dt.createServiceRefFiltered(pod) + + // Check if address exists in NRP and if service list changed + nrpAddressData, addressExists := nrpLocation.Addresses[address] + + // Skip this address if: + // 1. No ready services AND address doesn't exist in NRP (nothing to sync) + // 2. ServiceRef matches what's already in NRP (no change) + if serviceRef.Len() == 0 && !addressExists { + continue + } + + if addressExists && serviceRef.Equals(nrpAddressData.Services) { + continue + } + + // ServiceRef changed (or address is new) - need to sync + addressData := Address{ServiceRef: serviceRef} + location.Addresses[address] = addressData + locationUpdated = true + } + if locationUpdated { + result.Locations[nodeIp] = location + } + } + + // Iterate over all locations in the NRP state + for location, nrpLocation := range dt.NRPResources.Locations { + node, exists := dt.K8sResources.Nodes[location] + if !exists { + result.Locations[location] = Location{ + AddressUpdateAction: PartialUpdate, + Addresses: make(map[string]Address), + } + } else { + locationData := findLocationData(result, location) + if locationData == nil { + locationData = &Location{ + AddressUpdateAction: PartialUpdate, + Addresses: make(map[string]Address), + } + } + for address := range nrpLocation.Addresses { + if _, exists := node.Pods[address]; !exists { + addressData := Address{ServiceRef: utilsets.NewString()} + locationData.Addresses[address] = addressData + result.Locations[location] = *locationData + } + } + } + } + return result +} + +// Helper function to initialize Location based on existence in NRP +func initializeLocation(exists bool) Location { + if !exists { + return Location{ + AddressUpdateAction: FullUpdate, + Addresses: make(map[string]Address), + } + } + return Location{ + AddressUpdateAction: PartialUpdate, + Addresses: make(map[string]Address), + } +} + +// createServiceRefFiltered creates ServiceRef but only includes services that exist in NRP. +// Must be called with dt.mu held. +func (dt *DiffTracker) createServiceRefFiltered(pod Pod) *utilsets.IgnoreCaseSet { + serviceRef := utilsets.NewString() + + // Check inbound services (LoadBalancers) + for _, serviceUID := range pod.InboundIdentities.UnsortedList() { + if dt.isServiceReady(serviceUID, true) { + serviceRef.Insert(serviceUID) + } + } + + // Check outbound service (NAT Gateway) + if pod.PublicOutboundIdentity != "" { + if dt.isServiceReady(pod.PublicOutboundIdentity, false) { + serviceRef.Insert(pod.PublicOutboundIdentity) + } + } + + return serviceRef +} + +// isServiceReady checks if a service is ready for location sync. +// Returns true if the service exists in NRP. +// Must be called with dt.mu held. +func (dt *DiffTracker) isServiceReady(serviceUID string, isInbound bool) bool { + if isInbound { + return dt.NRPResources.LoadBalancers.Has(serviceUID) + } + return dt.NRPResources.NATGateways.Has(serviceUID) +} + +// Helper function to find LocationData in result +func findLocationData(result LocationData, location string) *Location { + for keyCurrentLocation := range result.Locations { + if keyCurrentLocation == location { + loc := result.Locations[keyCurrentLocation] + return &loc + } + } + return nil +} + +func (dt *DiffTracker) GetSyncOperations() *SyncDiffTrackerReturnType { + if dt.DeepEqual() { + return &SyncDiffTrackerReturnType{SyncStatus: ALREADY_IN_SYNC} + } + + return &SyncDiffTrackerReturnType{ + SyncStatus: SUCCESS, + LoadBalancerUpdates: dt.GetSyncLoadBalancerServices(), + NATGatewayUpdates: dt.GetSyncNRPNATGateways(), + LocationData: dt.GetSyncLocationsAddresses(), + } +} diff --git a/pkg/provider/difftracker/types.go b/pkg/provider/difftracker/types.go new file mode 100644 index 0000000000..66c7f14e75 --- /dev/null +++ b/pkg/provider/difftracker/types.go @@ -0,0 +1,136 @@ +package difftracker + +import ( + "sync" + + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/cloud-provider-azure/pkg/azclient" + utilsets "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" +) + +// ================================================================================================ +// ENUMS +// ================================================================================================ +type Operation int + +const ( + ADD Operation = iota + REMOVE + UPDATE +) + +type UpdateAction int + +const ( + PartialUpdate UpdateAction = iota + FullUpdate +) + +type SyncStatus int + +const ( + ALREADY_IN_SYNC SyncStatus = iota + SUCCESS +) + +// -------------------------------------------------------------------------------- +// DiffTracker keeps track of the state of the K8s cluster and NRP +// -------------------------------------------------------------------------------- +type NRPAddress struct { + Services *utilsets.IgnoreCaseSet // all inbound and outbound identities +} + +type NRPLocation struct { + Addresses map[string]NRPAddress +} + +type NRP_State struct { + LoadBalancers *utilsets.IgnoreCaseSet + NATGateways *utilsets.IgnoreCaseSet + Locations map[string]NRPLocation +} + +type Pod struct { + InboundIdentities *utilsets.IgnoreCaseSet + PublicOutboundIdentity string +} + +type Node struct { + Pods map[string]Pod +} + +type K8s_State struct { + Services *utilsets.IgnoreCaseSet + Egresses *utilsets.IgnoreCaseSet + Nodes map[string]Node +} + +// DiffTracker is the main struct that contains the state of the K8s and NRP services +type DiffTracker struct { + mu sync.Mutex // Protects concurrent access to DiffTracker + + K8sResources K8s_State + NRPResources NRP_State + + LocalServiceNameToNRPServiceMap sync.Map + + // Configuration and clients + config Config + networkClientFactory azclient.ClientFactory + kubeClient kubernetes.Interface +} + +// -------------------------------------------------------------------------------- +// Types that are used while events are received and processed in order to update K8s state +// -------------------------------------------------------------------------------- + +// UpdateK8sResource represents input for K8s service or egress updates +type UpdateK8sResource struct { + Operation Operation + ID string +} + +// UpdateK8sEndpointsInputType represents input for K8s endpoints updates +type UpdateK8sEndpointsInputType struct { + InboundIdentity string + OldAddresses map[string]string // address -> location + NewAddresses map[string]string // address -> location +} + +// UpdatePodInputType represents input for K8s pod updates (egress assignments) +type UpdatePodInputType struct { + PodOperation Operation + PublicOutboundIdentity string + Location string + Address string +} + +// -------------------------------------------------------------------------------- +// Types that are used while syncing NRP state to K8s state +// -------------------------------------------------------------------------------- +type Address struct { + ServiceRef *utilsets.IgnoreCaseSet +} + +// Location uses a map for Addresses +type Location struct { + AddressUpdateAction UpdateAction + Addresses map[string]Address // key is Address.Address +} + +type LocationData struct { + Action UpdateAction + Locations map[string]Location // key is Location.Location +} + +type SyncServicesReturnType struct { + Additions *utilsets.IgnoreCaseSet + Removals *utilsets.IgnoreCaseSet +} + +type SyncDiffTrackerReturnType struct { + SyncStatus SyncStatus + LoadBalancerUpdates SyncServicesReturnType + NATGatewayUpdates SyncServicesReturnType + LocationData LocationData +} diff --git a/pkg/provider/difftracker/util.go b/pkg/provider/difftracker/util.go new file mode 100644 index 0000000000..eb853bfaa4 --- /dev/null +++ b/pkg/provider/difftracker/util.go @@ -0,0 +1,302 @@ +package difftracker + +import ( + "encoding/json" + "fmt" + + "k8s.io/klog/v2" +) + +func (operation Operation) String() string { + return [...]string{"ADD", "REMOVE", "UPDATE"}[operation] +} + +func (operation Operation) MarshalJSON() ([]byte, error) { + return json.Marshal(operation.String()) +} + +func (updateAction UpdateAction) String() string { + return [...]string{"PartialUpdate", "FullUpdate"}[updateAction] +} + +func (updateAction UpdateAction) MarshalJSON() ([]byte, error) { + return json.Marshal(updateAction.String()) +} + +func (updateAction *UpdateAction) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + + switch s { + case "PartialUpdate": + *updateAction = PartialUpdate + case "FullUpdate": + *updateAction = FullUpdate + default: + return fmt.Errorf("unknown UpdateAction: %q", s) + } + + return nil +} + +func (syncStatus SyncStatus) String() string { + return [...]string{"ALREADY_IN_SYNC", "SUCCESS"}[syncStatus] +} + +func (syncStatus SyncStatus) MarshalJSON() ([]byte, error) { + return json.Marshal(syncStatus.String()) +} + +func (node *Node) HasPods() bool { return len(node.Pods) > 0 } + +func (pod *Pod) HasIdentities() bool { + return pod.InboundIdentities.Len() > 0 || pod.PublicOutboundIdentity != "" +} + +// DeepEqual compares the K8s and NRP states to check if they are in sync +func (dt *DiffTracker) DeepEqual() bool { + klog.Infof("DeepEqual: Checking equality - K8s Services=%d, NRP LoadBalancers=%d, K8s Egresses=%d, NRP NATGateways=%d", + dt.K8sResources.Services.Len(), dt.NRPResources.LoadBalancers.Len(), + dt.K8sResources.Egresses.Len(), dt.NRPResources.NATGateways.Len()) + + // Compare Services with LoadBalancers + if dt.K8sResources.Services.Len() != dt.NRPResources.LoadBalancers.Len() { + klog.Infof("DeepEqual: Services and LoadBalancers length mismatch") + return false + } + for _, service := range dt.K8sResources.Services.UnsortedList() { + if !dt.NRPResources.LoadBalancers.Has(service) { + klog.Infof("DeepEqual: Service %s not found in LoadBalancers", service) + return false + } + } + for _, service := range dt.NRPResources.LoadBalancers.UnsortedList() { + if !dt.K8sResources.Services.Has(service) { + klog.Infof("DeepEqual: LoadBalancer %s not found in Services", service) + return false + } + } + + // Compare Egresses with NATGateways + if dt.K8sResources.Egresses.Len() != dt.NRPResources.NATGateways.Len() { + klog.Infof("DeepEqual: Egresses and NATGateways length mismatch") + return false + } + for _, egress := range dt.K8sResources.Egresses.UnsortedList() { + if !dt.NRPResources.NATGateways.Has(egress) { + klog.Infof("DeepEqual: Egress %s not found in NATGateways", egress) + return false + } + } + for _, egress := range dt.NRPResources.NATGateways.UnsortedList() { + if !dt.K8sResources.Egresses.Has(egress) { + klog.Infof("DeepEqual: NATGateway %s not found in Egresses", egress) + return false + } + } + + // Compare Nodes with Locations + if len(dt.K8sResources.Nodes) != len(dt.NRPResources.Locations) { + klog.V(2).Infof("DeepEqual: Nodes and Locations length mismatch") + return false + } + for nodeKey, node := range dt.K8sResources.Nodes { + nrpLocation, exists := dt.NRPResources.Locations[nodeKey] + if !exists { + klog.V(2).Infof("DeepEqual: Node %s not found in Locations\n", nodeKey) + return false + } + + // Compare Pods with Addresses + if len(node.Pods) != len(nrpLocation.Addresses) { + klog.V(2).Infof("DeepEqual: Pods and Addresses length mismatch for node %s\n", nodeKey) + return false + } + for podKey, pod := range node.Pods { + nrpAddress, exists := nrpLocation.Addresses[podKey] + if !exists { + klog.V(2).Infof("DeepEqual: Pod %s not found in Addresses for node %s\n", podKey, nodeKey) + return false + } + + // Compare [...InboundIdentities, PublicOutboundIdentity] with Services + combinedIdentities := []string{} + combinedIdentities = append(combinedIdentities, pod.InboundIdentities.UnsortedList()...) + if pod.PublicOutboundIdentity != "" { + combinedIdentities = append(combinedIdentities, pod.PublicOutboundIdentity) + } + + if len(combinedIdentities) != nrpAddress.Services.Len() { + klog.V(2).Infof("DeepEqual: Combined identities length mismatch for pod %s in node %s\n", podKey, nodeKey) + return false + } + + for _, identity := range combinedIdentities { + if !nrpAddress.Services.Has(identity) { + klog.V(2).Infof("DeepEqual: Identity %s not found in Services for pod %s in node %s\n", identity, podKey, nodeKey) + return false + } + } + } + } + + return true +} + +func (syncServicesReturnType *SyncServicesReturnType) Equals(other *SyncServicesReturnType) bool { + return syncServicesReturnType.Additions.Equals(other.Additions) && syncServicesReturnType.Removals.Equals(other.Removals) +} + +// Equals compares two LocationData objects for equality +func (ld *LocationData) Equals(other *LocationData) bool { + if ld.Action != other.Action { + return false + } + + if len(ld.Locations) != len(other.Locations) { + return false + } + + for locName, location := range ld.Locations { + otherLocation, exists := other.Locations[locName] + if !exists { + return false + } + + if location.AddressUpdateAction != otherLocation.AddressUpdateAction { + return false + } + + if len(location.Addresses) != len(otherLocation.Addresses) { + return false + } + + for addrName, address := range location.Addresses { + otherAddress, exists := otherLocation.Addresses[addrName] + if !exists { + return false + } + + if !address.ServiceRef.Equals(otherAddress.ServiceRef) { + return false + } + } + } + + return true +} + +// Equals compares two SyncDiffTrackerReturnType objects for equality +func (sdts *SyncDiffTrackerReturnType) Equals(other *SyncDiffTrackerReturnType) bool { + if sdts.SyncStatus != other.SyncStatus { + return false + } + + if !sdts.LoadBalancerUpdates.Additions.Equals(other.LoadBalancerUpdates.Additions) { + return false + } + + if !sdts.LoadBalancerUpdates.Removals.Equals(other.LoadBalancerUpdates.Removals) { + return false + } + + if !sdts.NATGatewayUpdates.Additions.Equals(other.NATGatewayUpdates.Additions) { + return false + } + + if !sdts.NATGatewayUpdates.Removals.Equals(other.NATGatewayUpdates.Removals) { + return false + } + + if !sdts.LocationData.Equals(&other.LocationData) { + return false + } + + return true +} + +// Equals compares two DiffTracker objects for equality +func (dt *DiffTracker) Equals(other *DiffTracker) bool { + dt.mu.Lock() + defer dt.mu.Unlock() + + other.mu.Lock() + defer other.mu.Unlock() + + if !dt.K8sResources.Services.Equals(other.K8sResources.Services) { + return false + } + + if !dt.K8sResources.Egresses.Equals(other.K8sResources.Egresses) { + return false + } + + if len(dt.K8sResources.Nodes) != len(other.K8sResources.Nodes) { + return false + } + + for nodeKey, node := range dt.K8sResources.Nodes { + otherNode, exists := other.K8sResources.Nodes[nodeKey] + if !exists { + return false + } + + if len(node.Pods) != len(otherNode.Pods) { + return false + } + + for podKey, pod := range node.Pods { + otherPod, exists := otherNode.Pods[podKey] + if !exists { + return false + } + + if !pod.InboundIdentities.Equals(otherPod.InboundIdentities) { + return false + } + + if pod.PublicOutboundIdentity != otherPod.PublicOutboundIdentity { + return false + } + } + } + + // Compare NRP state + if !dt.NRPResources.LoadBalancers.Equals(other.NRPResources.LoadBalancers) { + return false + } + + if !dt.NRPResources.NATGateways.Equals(other.NRPResources.NATGateways) { + return false + } + + if len(dt.NRPResources.Locations) != len(other.NRPResources.Locations) { + return false + } + + for location, nrpLocation := range dt.NRPResources.Locations { + otherNrpLocation, exists := other.NRPResources.Locations[location] + if !exists { + return false + } + + if len(nrpLocation.Addresses) != len(otherNrpLocation.Addresses) { + return false + } + + for address, nrpAddress := range nrpLocation.Addresses { + otherNrpAddress, exists := otherNrpLocation.Addresses[address] + if !exists { + return false + } + + if !nrpAddress.Services.Equals(otherNrpAddress.Services) { + return false + } + } + } + + return true +} diff --git a/pkg/provider/difftracker/util_test.go b/pkg/provider/difftracker/util_test.go new file mode 100644 index 0000000000..9ffce2621f --- /dev/null +++ b/pkg/provider/difftracker/util_test.go @@ -0,0 +1,648 @@ +package difftracker + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" +) + +// TestOperationStringAndJSON tests Operation String() and MarshalJSON() +func TestOperationStringAndJSON(t *testing.T) { + tests := []struct { + name string + op Operation + expected string + }{ + {"ADD operation", ADD, "ADD"}, + {"REMOVE operation", REMOVE, "REMOVE"}, + {"UPDATE operation", UPDATE, "UPDATE"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test String() + assert.Equal(t, tt.expected, tt.op.String()) + + // Test MarshalJSON() + data, err := tt.op.MarshalJSON() + assert.NoError(t, err) + assert.Equal(t, `"`+tt.expected+`"`, string(data)) + }) + } +} + +// TestUpdateActionStringAndJSON tests UpdateAction String(), MarshalJSON(), and UnmarshalJSON() +func TestUpdateActionStringAndJSON(t *testing.T) { + tests := []struct { + name string + action UpdateAction + expected string + }{ + {"PartialUpdate", PartialUpdate, "PartialUpdate"}, + {"FullUpdate", FullUpdate, "FullUpdate"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test String() + assert.Equal(t, tt.expected, tt.action.String()) + + // Test MarshalJSON() + data, err := tt.action.MarshalJSON() + assert.NoError(t, err) + assert.Equal(t, `"`+tt.expected+`"`, string(data)) + + // Test UnmarshalJSON() round-trip + var unmarshaled UpdateAction + err = unmarshaled.UnmarshalJSON(data) + assert.NoError(t, err) + assert.Equal(t, tt.action, unmarshaled) + }) + } +} + +// TestUpdateActionUnmarshalJSON_InvalidValue tests error handling +func TestUpdateActionUnmarshalJSON_InvalidValue(t *testing.T) { + var action UpdateAction + err := action.UnmarshalJSON([]byte(`"InvalidAction"`)) + assert.Error(t, err) + assert.Contains(t, err.Error(), "unknown UpdateAction") +} + +// TestUpdateActionUnmarshalJSON_InvalidJSON tests JSON parsing errors +func TestUpdateActionUnmarshalJSON_InvalidJSON(t *testing.T) { + var action UpdateAction + err := action.UnmarshalJSON([]byte(`{invalid json`)) + assert.Error(t, err) +} + +// TestSyncStatusStringAndJSON tests SyncStatus String() and MarshalJSON() +func TestSyncStatusStringAndJSON(t *testing.T) { + tests := []struct { + name string + status SyncStatus + expected string + }{ + {"ALREADY_IN_SYNC", ALREADY_IN_SYNC, "ALREADY_IN_SYNC"}, + {"SUCCESS", SUCCESS, "SUCCESS"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Test String() + assert.Equal(t, tt.expected, tt.status.String()) + + // Test MarshalJSON() + data, err := tt.status.MarshalJSON() + assert.NoError(t, err) + assert.Equal(t, `"`+tt.expected+`"`, string(data)) + }) + } +} + +// TestNodeHasPods tests Node.HasPods() +func TestNodeHasPods(t *testing.T) { + tests := []struct { + name string + node Node + expected bool + }{ + { + name: "node with pods", + node: Node{Pods: map[string]Pod{"pod1": {}}}, + expected: true, + }, + { + name: "node without pods", + node: Node{Pods: map[string]Pod{}}, + expected: false, + }, + { + name: "node with nil pods map", + node: Node{Pods: nil}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.node.HasPods()) + }) + } +} + +// TestPodHasIdentities tests Pod.HasIdentities() +func TestPodHasIdentities(t *testing.T) { + tests := []struct { + name string + pod Pod + expected bool + }{ + { + name: "pod with inbound identities", + pod: Pod{InboundIdentities: sets.NewString("id1", "id2")}, + expected: true, + }, + { + name: "pod with public outbound identity", + pod: Pod{PublicOutboundIdentity: "outbound-id"}, + expected: true, + }, + { + name: "pod with both identities", + pod: Pod{InboundIdentities: sets.NewString("id1"), PublicOutboundIdentity: "outbound-id"}, + expected: true, + }, + { + name: "pod without identities", + pod: Pod{InboundIdentities: sets.NewString(), PublicOutboundIdentity: ""}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.pod.HasIdentities()) + }) + } +} + +// TestDeepEqual tests DiffTracker.DeepEqual() +func TestDeepEqual(t *testing.T) { + tests := []struct { + name string + dt *DiffTracker + expected bool + }{ + { + name: "in sync - matching services and load balancers", + dt: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("svc1", "svc2"), + Egresses: sets.NewString(), + Nodes: map[string]Node{}, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("svc1", "svc2"), + NATGateways: sets.NewString(), + Locations: map[string]NRPLocation{}, + }, + }, + expected: true, + }, + { + name: "not in sync - service count mismatch", + dt: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("svc1"), + Egresses: sets.NewString(), + Nodes: map[string]Node{}, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("svc1", "svc2"), + NATGateways: sets.NewString(), + Locations: map[string]NRPLocation{}, + }, + }, + expected: false, + }, + { + name: "not in sync - service name mismatch", + dt: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("svc1"), + Egresses: sets.NewString(), + Nodes: map[string]Node{}, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString("svc2"), + NATGateways: sets.NewString(), + Locations: map[string]NRPLocation{}, + }, + }, + expected: false, + }, + { + name: "in sync - matching egresses and NAT gateways", + dt: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString(), + Egresses: sets.NewString("egress1", "egress2"), + Nodes: map[string]Node{}, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString(), + NATGateways: sets.NewString("egress1", "egress2"), + Locations: map[string]NRPLocation{}, + }, + }, + expected: true, + }, + { + name: "not in sync - egress count mismatch", + dt: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString(), + Egresses: sets.NewString("egress1"), + Nodes: map[string]Node{}, + }, + NRPResources: NRP_State{ + LoadBalancers: sets.NewString(), + NATGateways: sets.NewString("egress1", "egress2"), + Locations: map[string]NRPLocation{}, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.dt.DeepEqual()) + }) + } +} + +// TestLocationDataEquals tests LocationData.Equals() +func TestLocationDataEquals(t *testing.T) { + tests := []struct { + name string + ld1 LocationData + ld2 LocationData + expected bool + }{ + { + name: "equal location data - empty locations", + ld1: LocationData{ + Action: PartialUpdate, + Locations: map[string]Location{}, + }, + ld2: LocationData{ + Action: PartialUpdate, + Locations: map[string]Location{}, + }, + expected: true, + }, + { + name: "equal location data - with addresses", + ld1: LocationData{ + Action: PartialUpdate, + Locations: map[string]Location{ + "loc1": { + AddressUpdateAction: PartialUpdate, + Addresses: map[string]Address{ + "addr1": {ServiceRef: sets.NewString("svc1")}, + }, + }, + }, + }, + ld2: LocationData{ + Action: PartialUpdate, + Locations: map[string]Location{ + "loc1": { + AddressUpdateAction: PartialUpdate, + Addresses: map[string]Address{ + "addr1": {ServiceRef: sets.NewString("svc1")}, + }, + }, + }, + }, + expected: true, + }, + { + name: "different action", + ld1: LocationData{ + Action: PartialUpdate, + Locations: map[string]Location{}, + }, + ld2: LocationData{ + Action: FullUpdate, + Locations: map[string]Location{}, + }, + expected: false, + }, + { + name: "different location count", + ld1: LocationData{ + Action: PartialUpdate, + Locations: map[string]Location{ + "loc1": {AddressUpdateAction: PartialUpdate, Addresses: map[string]Address{}}, + }, + }, + ld2: LocationData{ + Action: PartialUpdate, + Locations: map[string]Location{ + "loc1": {AddressUpdateAction: PartialUpdate, Addresses: map[string]Address{}}, + "loc2": {AddressUpdateAction: PartialUpdate, Addresses: map[string]Address{}}, + }, + }, + expected: false, + }, + { + name: "different address update action", + ld1: LocationData{ + Action: PartialUpdate, + Locations: map[string]Location{ + "loc1": {AddressUpdateAction: PartialUpdate, Addresses: map[string]Address{}}, + }, + }, + ld2: LocationData{ + Action: PartialUpdate, + Locations: map[string]Location{ + "loc1": {AddressUpdateAction: FullUpdate, Addresses: map[string]Address{}}, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.ld1.Equals(&tt.ld2)) + }) + } +} + +// TestDiffTrackerEquals tests DiffTracker.Equals() +func TestDiffTrackerEquals(t *testing.T) { + tests := []struct { + name string + dt1 *DiffTracker + dt2 *DiffTracker + expected bool + }{ + { + name: "equal diff trackers", + dt1: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("svc1"), + Egresses: sets.NewString("egress1"), + Nodes: map[string]Node{}, + }, + }, + dt2: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("svc1"), + Egresses: sets.NewString("egress1"), + Nodes: map[string]Node{}, + }, + }, + expected: true, + }, + { + name: "different services", + dt1: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("svc1"), + Egresses: sets.NewString(), + Nodes: map[string]Node{}, + }, + }, + dt2: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString("svc2"), + Egresses: sets.NewString(), + Nodes: map[string]Node{}, + }, + }, + expected: false, + }, + { + name: "different egresses", + dt1: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString(), + Egresses: sets.NewString("egress1"), + Nodes: map[string]Node{}, + }, + }, + dt2: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString(), + Egresses: sets.NewString("egress2"), + Nodes: map[string]Node{}, + }, + }, + expected: false, + }, + { + name: "different node count", + dt1: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString(), + Egresses: sets.NewString(), + Nodes: map[string]Node{"node1": {}}, + }, + }, + dt2: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString(), + Egresses: sets.NewString(), + Nodes: map[string]Node{}, + }, + }, + expected: false, + }, + { + name: "different pod count in node", + dt1: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString(), + Egresses: sets.NewString(), + Nodes: map[string]Node{ + "node1": {Pods: map[string]Pod{"pod1": {}}}, + }, + }, + }, + dt2: &DiffTracker{ + K8sResources: K8s_State{ + Services: sets.NewString(), + Egresses: sets.NewString(), + Nodes: map[string]Node{ + "node1": {Pods: map[string]Pod{}}, + }, + }, + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.dt1.Equals(tt.dt2)) + }) + } +} + +// TestSyncServicesReturnTypeEquals tests SyncServicesReturnType.Equals() +func TestSyncServicesReturnTypeEquals(t *testing.T) { + tests := []struct { + name string + s1 SyncServicesReturnType + s2 SyncServicesReturnType + expected bool + }{ + { + name: "equal - both empty", + s1: SyncServicesReturnType{ + Additions: sets.NewString(), + Removals: sets.NewString(), + }, + s2: SyncServicesReturnType{ + Additions: sets.NewString(), + Removals: sets.NewString(), + }, + expected: true, + }, + { + name: "equal - same additions", + s1: SyncServicesReturnType{ + Additions: sets.NewString("svc1", "svc2"), + Removals: sets.NewString(), + }, + s2: SyncServicesReturnType{ + Additions: sets.NewString("svc1", "svc2"), + Removals: sets.NewString(), + }, + expected: true, + }, + { + name: "not equal - different additions", + s1: SyncServicesReturnType{ + Additions: sets.NewString("svc1"), + Removals: sets.NewString(), + }, + s2: SyncServicesReturnType{ + Additions: sets.NewString("svc2"), + Removals: sets.NewString(), + }, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, tt.s1.Equals(&tt.s2)) + }) + } +} + +// TestConfigValidate tests Config.Validate() +func TestConfigValidate(t *testing.T) { + tests := []struct { + name string + config Config + shouldError bool + errorMsg string + }{ + { + name: "valid config", + config: Config{ + SubscriptionID: "sub1", + ResourceGroup: "rg1", + Location: "eastus", + VNetName: "test-vnet", + ServiceGatewayResourceName: "sgw", + ServiceGatewayID: "/subscriptions/sub1/resourceGroups/rg1/providers/Microsoft.Network/serviceGateways/sgw", + }, + shouldError: false, + }, + { + name: "missing subscription ID", + config: Config{ + ResourceGroup: "rg1", + Location: "eastus", + ServiceGatewayResourceName: "sgw", + ServiceGatewayID: "/id", + }, + shouldError: true, + errorMsg: "SubscriptionID is required", + }, + { + name: "missing resource group", + config: Config{ + SubscriptionID: "sub1", + Location: "eastus", + ServiceGatewayResourceName: "sgw", + ServiceGatewayID: "/id", + }, + shouldError: true, + errorMsg: "ResourceGroup is required", + }, + { + name: "missing location", + config: Config{ + SubscriptionID: "sub1", + ResourceGroup: "rg1", + ServiceGatewayResourceName: "sgw", + ServiceGatewayID: "/id", + }, + shouldError: true, + errorMsg: "Location is required", + }, + { + name: "missing ServiceGatewayResourceName", + config: Config{ + SubscriptionID: "sub1", + ResourceGroup: "rg1", + Location: "eastus", + ServiceGatewayID: "/id", + }, + shouldError: true, + errorMsg: "ServiceGatewayResourceName is required", + }, + { + name: "missing ServiceGatewayID", + config: Config{ + SubscriptionID: "sub1", + ResourceGroup: "rg1", + Location: "eastus", + ServiceGatewayResourceName: "sgw", + }, + shouldError: true, + errorMsg: "ServiceGatewayID is required", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.Validate() + if tt.shouldError { + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.errorMsg) + } else { + assert.NoError(t, err) + } + }) + } +} + +// TestJSONRoundTrip tests JSON marshaling/unmarshaling for various types +func TestJSONRoundTrip(t *testing.T) { + t.Run("UpdateAction round trip", func(t *testing.T) { + original := PartialUpdate + data, err := json.Marshal(original) + assert.NoError(t, err) + + var unmarshaled UpdateAction + err = json.Unmarshal(data, &unmarshaled) + assert.NoError(t, err) + assert.Equal(t, original, unmarshaled) + }) + + t.Run("Operation round trip", func(t *testing.T) { + original := ADD + data, err := json.Marshal(original) + assert.NoError(t, err) + assert.Equal(t, `"ADD"`, string(data)) + }) + + t.Run("SyncStatus round trip", func(t *testing.T) { + original := SUCCESS + data, err := json.Marshal(original) + assert.NoError(t, err) + assert.Equal(t, `"SUCCESS"`, string(data)) + }) +} diff --git a/pkg/util/sets/string.go b/pkg/util/sets/string.go index 2562fcf6ea..8b1ee4495f 100644 --- a/pkg/util/sets/string.go +++ b/pkg/util/sets/string.go @@ -17,6 +17,7 @@ limitations under the License. package sets import ( + "encoding/json" "strings" "k8s.io/apimachinery/pkg/util/sets" @@ -98,3 +99,35 @@ func (s *IgnoreCaseSet) Len() int { } return s.set.Len() } + +// MarshalJSON marshals the set to JSON as an array of strings. +func (s *IgnoreCaseSet) MarshalJSON() ([]byte, error) { + if s == nil { + return []byte("null"), nil + } + if s.Len() == 0 { + return []byte("[]"), nil + } + return json.Marshal(s.UnsortedList()) +} + +// Equals returns true if the two sets are equal. +func (s1 *IgnoreCaseSet) Equals(s2 *IgnoreCaseSet) bool { + // Early exit if sizes are different + if len(s1.UnsortedList()) != len(s2.UnsortedList()) { + return false + } + // Check if all items in s1 are in s2 + for _, item := range s1.UnsortedList() { + if !s2.Has(item) { + return false + } + } + // Check if all items in s2 are in s1 + for _, item := range s2.UnsortedList() { + if !s1.Has(item) { + return false + } + } + return true +} diff --git a/pkg/util/sets/string_test.go b/pkg/util/sets/string_test.go index b0286b7d3d..968cdb3dc7 100644 --- a/pkg/util/sets/string_test.go +++ b/pkg/util/sets/string_test.go @@ -367,3 +367,81 @@ func TestLen(t *testing.T) { }) } } +func TestEquals(t *testing.T) { + tests := []struct { + name string + s1 *IgnoreCaseSet + s2 *IgnoreCaseSet + want bool + }{ + { + name: "both nil", + s1: nil, + s2: nil, + want: true, + }, + { + name: "first nil", + s1: nil, + s2: NewString("foo"), + want: false, + }, + { + name: "second nil", + s1: NewString("foo"), + s2: nil, + want: false, + }, + { + name: "empty sets", + s1: NewString(), + s2: NewString(), + want: true, + }, + { + name: "same elements", + s1: NewString("foo", "bar"), + s2: NewString("foo", "bar"), + want: true, + }, + { + name: "same elements with different case", + s1: NewString("foo", "bar"), + s2: NewString("FOO", "BAR"), + want: true, + }, + { + name: "different sizes", + s1: NewString("foo", "bar"), + s2: NewString("foo"), + want: false, + }, + { + name: "same size but different elements", + s1: NewString("foo", "bar"), + s2: NewString("foo", "baz"), + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.s1 == nil && tt.s2 == nil { + // Special case for nil sets + if !tt.want { + t.Errorf("Equals() = true, want %v", tt.want) + } + return + } + if tt.s1 == nil || tt.s2 == nil { + // One set is nil, they can't be equal + if tt.want { + t.Errorf("Equals() = false, want %v", tt.want) + } + return + } + if got := tt.s1.Equals(tt.s2); got != tt.want { + t.Errorf("Equals() = %v, want %v", got, tt.want) + } + }) + } +} From 0760d17e5fe0c567ea58d2aaebf91bfba6d4eb5f Mon Sep 17 00:00:00 2001 From: George Edward Nechitoaia <58257818+georgeedward2000@users.noreply.github.com> Date: Wed, 20 May 2026 07:27:38 +0000 Subject: [PATCH 2/3] addressed comments --- pkg/provider/difftracker/config.go | 16 +++ pkg/provider/difftracker/difftracker.go | 31 +++-- pkg/provider/difftracker/difftracker_test.go | 107 +++++++++++------- pkg/provider/difftracker/k8s_state_updates.go | 36 +++++- pkg/provider/difftracker/nrp_state_updates.go | 17 +++ pkg/provider/difftracker/sync_operations.go | 17 +++ pkg/provider/difftracker/types.go | 25 +++- pkg/provider/difftracker/util.go | 16 +++ pkg/provider/difftracker/util_test.go | 56 +++++---- 9 files changed, 240 insertions(+), 81 deletions(-) diff --git a/pkg/provider/difftracker/config.go b/pkg/provider/difftracker/config.go index 198df8fe98..3ce5d7c8c1 100644 --- a/pkg/provider/difftracker/config.go +++ b/pkg/provider/difftracker/config.go @@ -1,3 +1,19 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package difftracker import "fmt" diff --git a/pkg/provider/difftracker/difftracker.go b/pkg/provider/difftracker/difftracker.go index c8d850bf3a..19fb3c1794 100644 --- a/pkg/provider/difftracker/difftracker.go +++ b/pkg/provider/difftracker/difftracker.go @@ -1,3 +1,19 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package difftracker import ( @@ -5,25 +21,24 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "sigs.k8s.io/cloud-provider-azure/pkg/azclient" utilsets "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" ) // InitializeDiffTracker creates and initializes a new DiffTracker with the given state and configuration. // It validates the configuration and ensures all required dependencies are present. -// Panics if critical dependencies (config, networkClientFactory, kubeClient) are invalid. -func InitializeDiffTracker(K8s K8s_State, NRP NRP_State, config Config, networkClientFactory azclient.ClientFactory, kubeClient kubernetes.Interface) *DiffTracker { - // Validate configuration +// Returns an error if the configuration is invalid or if any required dependency is nil. +func InitializeDiffTracker(K8s K8sState, NRP NRPState, config Config, networkClientFactory azclient.ClientFactory, kubeClient kubernetes.Interface) (*DiffTracker, error) { if err := config.Validate(); err != nil { - panic(fmt.Sprintf("InitializeDiffTracker: %v", err)) + return nil, fmt.Errorf("InitializeDiffTracker: %w", err) } - // Validate required dependencies if networkClientFactory == nil { - panic("InitializeDiffTracker: networkClientFactory must not be nil") + return nil, fmt.Errorf("InitializeDiffTracker: networkClientFactory must not be nil") } if kubeClient == nil { - panic("InitializeDiffTracker: kubeClient must not be nil") + return nil, fmt.Errorf("InitializeDiffTracker: kubeClient must not be nil") } klog.V(2).Infof("InitializeDiffTracker: initializing with config: subscription=%s, resourceGroup=%s, location=%s", @@ -59,5 +74,5 @@ func InitializeDiffTracker(K8s K8s_State, NRP NRP_State, config Config, networkC kubeClient: kubeClient, } - return diffTracker + return diffTracker, nil } diff --git a/pkg/provider/difftracker/difftracker_test.go b/pkg/provider/difftracker/difftracker_test.go index 11dacb8ed7..d4e0366e16 100644 --- a/pkg/provider/difftracker/difftracker_test.go +++ b/pkg/provider/difftracker/difftracker_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package difftracker import ( @@ -5,7 +21,9 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" + "k8s.io/client-go/kubernetes/fake" + "sigs.k8s.io/cloud-provider-azure/pkg/azclient/mock_azclient" "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" ) @@ -19,12 +37,12 @@ func TestDiffTracker_DeepEqual(t *testing.T) { { name: "equal empty states", dt: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString(), Egresses: sets.NewString(), Nodes: map[string]Node{}, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString(), NATGateways: sets.NewString(), Locations: map[string]NRPLocation{}, @@ -35,12 +53,12 @@ func TestDiffTracker_DeepEqual(t *testing.T) { { name: "equal states with services", dt: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("service1", "service2"), Egresses: sets.NewString(), Nodes: map[string]Node{}, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("service1", "service2"), NATGateways: sets.NewString(), Locations: map[string]NRPLocation{}, @@ -51,12 +69,12 @@ func TestDiffTracker_DeepEqual(t *testing.T) { { name: "services not equal", dt: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("service1", "service2"), Egresses: sets.NewString(), Nodes: map[string]Node{}, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("service1"), NATGateways: sets.NewString(), Locations: map[string]NRPLocation{}, @@ -74,15 +92,15 @@ func TestDiffTracker_DeepEqual(t *testing.T) { } } -func TestUpdateK8sService(t *testing.T) { +func TestEnqueueK8sServiceOperation(t *testing.T) { dt := &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString(), }, } // Test ADD operation - err := dt.UpdateK8sService(UpdateK8sResource{ + err := dt.EnqueueK8sServiceOperation(UpdateK8sResource{ Operation: ADD, ID: "service1", }) @@ -90,7 +108,7 @@ func TestUpdateK8sService(t *testing.T) { assert.True(t, dt.K8sResources.Services.Has("service1")) // Test REMOVE operation - err = dt.UpdateK8sService(UpdateK8sResource{ + err = dt.EnqueueK8sServiceOperation(UpdateK8sResource{ Operation: REMOVE, ID: "service1", }) @@ -98,7 +116,7 @@ func TestUpdateK8sService(t *testing.T) { assert.False(t, dt.K8sResources.Services.Has("service1")) // Test invalid operation - err = dt.UpdateK8sService(UpdateK8sResource{ + err = dt.EnqueueK8sServiceOperation(UpdateK8sResource{ Operation: UPDATE, ID: "service1", }) @@ -107,10 +125,10 @@ func TestUpdateK8sService(t *testing.T) { func TestGetSyncLoadBalancerServices(t *testing.T) { dt := &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("service1", "service2", "service3"), }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("service2", "service3", "service4"), }, } @@ -126,7 +144,7 @@ func TestGetSyncLoadBalancerServices(t *testing.T) { func TestUpdateK8sEndpoints(t *testing.T) { dt := &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Nodes: map[string]Node{}, }, } @@ -158,7 +176,7 @@ func TestUpdateK8sEndpoints(t *testing.T) { func TestUpdateK8sPod(t *testing.T) { dt := &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Nodes: map[string]Node{}, }, } @@ -191,7 +209,7 @@ func TestUpdateK8sPod(t *testing.T) { func TestGetSyncLocationsAddresses(t *testing.T) { dt := &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Nodes: map[string]Node{ "node1": { Pods: map[string]Pod{ @@ -203,7 +221,7 @@ func TestGetSyncLocationsAddresses(t *testing.T) { }, }, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("service1"), NATGateways: sets.NewString("public1"), Locations: map[string]NRPLocation{}, @@ -246,10 +264,10 @@ func TestUpdateNRPLoadBalancers(t *testing.T) { { name: "add services from K8s to NRP", initialState: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("service1", "service2", "service3"), }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("service1"), }, }, @@ -258,10 +276,10 @@ func TestUpdateNRPLoadBalancers(t *testing.T) { { name: "no changes needed when K8s and NRP are in sync", initialState: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("service1", "service2"), }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("service1", "service2"), }, }, @@ -282,22 +300,22 @@ func TestUpdateNRPLoadBalancers(t *testing.T) { } } -func TestUpdateK8sEgress(t *testing.T) { +func TestEnqueueK8sEgressOperation(t *testing.T) { dt := &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Egresses: sets.NewString(), }, } - err := dt.UpdateK8sEgress(UpdateK8sResource{Operation: ADD, ID: "egress1"}) + err := dt.EnqueueK8sEgressOperation(UpdateK8sResource{Operation: ADD, ID: "egress1"}) assert.NoError(t, err) assert.True(t, dt.K8sResources.Egresses.Has("egress1")) - err = dt.UpdateK8sEgress(UpdateK8sResource{Operation: REMOVE, ID: "egress1"}) + err = dt.EnqueueK8sEgressOperation(UpdateK8sResource{Operation: REMOVE, ID: "egress1"}) assert.NoError(t, err) assert.False(t, dt.K8sResources.Egresses.Has("egress1")) - err = dt.UpdateK8sEgress(UpdateK8sResource{Operation: UPDATE, ID: "egress1"}) + err = dt.EnqueueK8sEgressOperation(UpdateK8sResource{Operation: UPDATE, ID: "egress1"}) assert.Error(t, err) assert.Contains(t, err.Error(), "error - ResourceType=Egress, Operation=UPDATE and ID=egress1") } @@ -329,10 +347,10 @@ func TestGetSyncNRPNATGateways(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { dt := &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Egresses: sets.NewString(tt.k8sEgresses...), }, - NRPResources: NRP_State{ + NRPResources: NRPState{ NATGateways: sets.NewString(tt.nrpNATGateways...), }, } @@ -354,10 +372,10 @@ func TestGetSyncNRPNATGateways(t *testing.T) { func TestUpdateNRPNATGateways(t *testing.T) { dt := &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Egresses: sets.NewString("egress1", "egress2", "egress4"), }, - NRPResources: NRP_State{ + NRPResources: NRPState{ NATGateways: sets.NewString("egress1", "egress3", "egress5"), }, } @@ -381,15 +399,15 @@ func TestUpdateLocationsAddresses(t *testing.T) { { name: "sync empty states", initialState: &DiffTracker{ - K8sResources: K8s_State{Nodes: map[string]Node{}}, - NRPResources: NRP_State{Locations: map[string]NRPLocation{}}, + K8sResources: K8sState{Nodes: map[string]Node{}}, + NRPResources: NRPState{Locations: map[string]NRPLocation{}}, }, expectedNRP: map[string]map[string][]string{}, }, { name: "add new location and address", initialState: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Nodes: map[string]Node{ "node1": { Pods: map[string]Pod{ @@ -401,7 +419,7 @@ func TestUpdateLocationsAddresses(t *testing.T) { }, }, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("service1"), NATGateways: sets.NewString("public1"), Locations: map[string]NRPLocation{}, @@ -414,7 +432,7 @@ func TestUpdateLocationsAddresses(t *testing.T) { { name: "complex case with multiple operations", initialState: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Nodes: map[string]Node{ "node1": { Pods: map[string]Pod{ @@ -431,7 +449,7 @@ func TestUpdateLocationsAddresses(t *testing.T) { }, }, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("service1", "service2", "service3", "service4", "service5"), NATGateways: sets.NewString("public1"), Locations: map[string]NRPLocation{ @@ -490,7 +508,7 @@ func TestGetSyncOperations(t *testing.T) { { name: "states already in sync", initialState: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("service1"), Egresses: sets.NewString("egress1"), Nodes: map[string]Node{ @@ -499,7 +517,7 @@ func TestGetSyncOperations(t *testing.T) { }}, }, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("service1"), NATGateways: sets.NewString("egress1"), Locations: map[string]NRPLocation{ @@ -514,7 +532,7 @@ func TestGetSyncOperations(t *testing.T) { { name: "services out of sync", initialState: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("service1", "service2"), Egresses: sets.NewString("egress1"), Nodes: map[string]Node{ @@ -523,7 +541,7 @@ func TestGetSyncOperations(t *testing.T) { }}, }, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("service1"), NATGateways: sets.NewString("egress1"), Locations: map[string]NRPLocation{ @@ -549,7 +567,7 @@ func TestGetSyncOperations(t *testing.T) { // This test verifies if the DiffTracker is able to sync K8s Cluster and NRP correctly // when there is a huge discrepancy between K8s Cluster and NRP. func TestInitializeDiffTracker(t *testing.T) { - K8sResources := K8s_State{ + K8sResources := K8sState{ Services: sets.NewString("Service0", "Service1", "Service2"), Egresses: sets.NewString("Egress0", "Egress1", "Egress2"), Nodes: map[string]Node{ @@ -569,7 +587,7 @@ func TestInitializeDiffTracker(t *testing.T) { }, } - NRPResources := NRP_State{ + NRPResources := NRPState{ LoadBalancers: sets.NewString("Service0", "Service6", "Service5"), NATGateways: sets.NewString("Egress0", "Egress6", "Egress5"), Locations: map[string]NRPLocation{ @@ -602,7 +620,8 @@ func TestInitializeDiffTracker(t *testing.T) { defer ctrl.Finish() mockFactory := mock_azclient.NewMockClientFactory(ctrl) mockKubeClient := fake.NewSimpleClientset() - diffTracker := InitializeDiffTracker(K8sResources, NRPResources, config, mockFactory, mockKubeClient) + diffTracker, err := InitializeDiffTracker(K8sResources, NRPResources, config, mockFactory, mockKubeClient) + assert.NoError(t, err) syncOperations := diffTracker.GetSyncOperations() diffTracker.UpdateNRPLoadBalancers(syncOperations.LoadBalancerUpdates) @@ -613,7 +632,7 @@ func TestInitializeDiffTracker(t *testing.T) { expectedDiffTracker := &DiffTracker{ K8sResources: K8sResources, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("Service0", "Service1", "Service2"), NATGateways: sets.NewString("Egress0", "Egress1", "Egress2"), Locations: map[string]NRPLocation{ diff --git a/pkg/provider/difftracker/k8s_state_updates.go b/pkg/provider/difftracker/k8s_state_updates.go index 4a421e58c4..34f6227f7c 100644 --- a/pkg/provider/difftracker/k8s_state_updates.go +++ b/pkg/provider/difftracker/k8s_state_updates.go @@ -1,3 +1,19 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package difftracker import ( @@ -5,6 +21,7 @@ import ( "strings" "k8s.io/klog/v2" + utilsets "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" ) @@ -13,7 +30,10 @@ const ( ResourceTypeEgress = "Egress" ) -func updateK8Resource(input UpdateK8sResource, set *utilsets.IgnoreCaseSet, resourceType string) error { +// enqueueK8sResourceOperation applies the requested operation (ADD/REMOVE) to the +// in-memory K8s resource set. It does not perform any Azure update calls; it only +// mutates the local desired-state model that will later be reconciled with NRP. +func enqueueK8sResourceOperation(input UpdateK8sResource, set *utilsets.IgnoreCaseSet, resourceType string) error { if input.ID == "" { return fmt.Errorf("%s: empty ID not allowed", resourceType) } @@ -29,18 +49,24 @@ func updateK8Resource(input UpdateK8sResource, set *utilsets.IgnoreCaseSet, reso return nil } -func (dt *DiffTracker) UpdateK8sService(input UpdateK8sResource) error { +// EnqueueK8sServiceOperation records a service ADD/REMOVE in the local K8s state set. +// The change is reconciled with NRP later by the sync operations; this method itself +// performs no Azure calls. +func (dt *DiffTracker) EnqueueK8sServiceOperation(input UpdateK8sResource) error { dt.mu.Lock() defer dt.mu.Unlock() - return updateK8Resource(input, dt.K8sResources.Services, ResourceTypeService) + return enqueueK8sResourceOperation(input, dt.K8sResources.Services, ResourceTypeService) } -func (dt *DiffTracker) UpdateK8sEgress(input UpdateK8sResource) error { +// EnqueueK8sEgressOperation records an egress ADD/REMOVE in the local K8s state set. +// The change is reconciled with NRP later by the sync operations; this method itself +// performs no Azure calls. +func (dt *DiffTracker) EnqueueK8sEgressOperation(input UpdateK8sResource) error { dt.mu.Lock() defer dt.mu.Unlock() - return updateK8Resource(input, dt.K8sResources.Egresses, ResourceTypeEgress) + return enqueueK8sResourceOperation(input, dt.K8sResources.Egresses, ResourceTypeEgress) } // updateK8sEndpointsLocked updates K8s endpoints state. Assumes lock is already held. diff --git a/pkg/provider/difftracker/nrp_state_updates.go b/pkg/provider/difftracker/nrp_state_updates.go index 91f05841b5..5b59e69d7b 100644 --- a/pkg/provider/difftracker/nrp_state_updates.go +++ b/pkg/provider/difftracker/nrp_state_updates.go @@ -1,7 +1,24 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package difftracker import ( "k8s.io/klog/v2" + utilsets "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" ) diff --git a/pkg/provider/difftracker/sync_operations.go b/pkg/provider/difftracker/sync_operations.go index 6b18b4d4d4..8d2050dd44 100644 --- a/pkg/provider/difftracker/sync_operations.go +++ b/pkg/provider/difftracker/sync_operations.go @@ -1,7 +1,24 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package difftracker import ( "k8s.io/klog/v2" + utilsets "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" ) diff --git a/pkg/provider/difftracker/types.go b/pkg/provider/difftracker/types.go index 66c7f14e75..9cab51fee6 100644 --- a/pkg/provider/difftracker/types.go +++ b/pkg/provider/difftracker/types.go @@ -1,9 +1,26 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package difftracker import ( "sync" "k8s.io/client-go/kubernetes" + "sigs.k8s.io/cloud-provider-azure/pkg/azclient" utilsets "sigs.k8s.io/cloud-provider-azure/pkg/util/sets" ) @@ -44,7 +61,7 @@ type NRPLocation struct { Addresses map[string]NRPAddress } -type NRP_State struct { +type NRPState struct { LoadBalancers *utilsets.IgnoreCaseSet NATGateways *utilsets.IgnoreCaseSet Locations map[string]NRPLocation @@ -59,7 +76,7 @@ type Node struct { Pods map[string]Pod } -type K8s_State struct { +type K8sState struct { Services *utilsets.IgnoreCaseSet Egresses *utilsets.IgnoreCaseSet Nodes map[string]Node @@ -69,8 +86,8 @@ type K8s_State struct { type DiffTracker struct { mu sync.Mutex // Protects concurrent access to DiffTracker - K8sResources K8s_State - NRPResources NRP_State + K8sResources K8sState + NRPResources NRPState LocalServiceNameToNRPServiceMap sync.Map diff --git a/pkg/provider/difftracker/util.go b/pkg/provider/difftracker/util.go index eb853bfaa4..f91a82df0c 100644 --- a/pkg/provider/difftracker/util.go +++ b/pkg/provider/difftracker/util.go @@ -1,3 +1,19 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package difftracker import ( diff --git a/pkg/provider/difftracker/util_test.go b/pkg/provider/difftracker/util_test.go index 9ffce2621f..7b7b81136b 100644 --- a/pkg/provider/difftracker/util_test.go +++ b/pkg/provider/difftracker/util_test.go @@ -1,3 +1,19 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package difftracker import ( @@ -179,12 +195,12 @@ func TestDeepEqual(t *testing.T) { { name: "in sync - matching services and load balancers", dt: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("svc1", "svc2"), Egresses: sets.NewString(), Nodes: map[string]Node{}, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("svc1", "svc2"), NATGateways: sets.NewString(), Locations: map[string]NRPLocation{}, @@ -195,12 +211,12 @@ func TestDeepEqual(t *testing.T) { { name: "not in sync - service count mismatch", dt: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("svc1"), Egresses: sets.NewString(), Nodes: map[string]Node{}, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("svc1", "svc2"), NATGateways: sets.NewString(), Locations: map[string]NRPLocation{}, @@ -211,12 +227,12 @@ func TestDeepEqual(t *testing.T) { { name: "not in sync - service name mismatch", dt: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("svc1"), Egresses: sets.NewString(), Nodes: map[string]Node{}, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString("svc2"), NATGateways: sets.NewString(), Locations: map[string]NRPLocation{}, @@ -227,12 +243,12 @@ func TestDeepEqual(t *testing.T) { { name: "in sync - matching egresses and NAT gateways", dt: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString(), Egresses: sets.NewString("egress1", "egress2"), Nodes: map[string]Node{}, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString(), NATGateways: sets.NewString("egress1", "egress2"), Locations: map[string]NRPLocation{}, @@ -243,12 +259,12 @@ func TestDeepEqual(t *testing.T) { { name: "not in sync - egress count mismatch", dt: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString(), Egresses: sets.NewString("egress1"), Nodes: map[string]Node{}, }, - NRPResources: NRP_State{ + NRPResources: NRPState{ LoadBalancers: sets.NewString(), NATGateways: sets.NewString("egress1", "egress2"), Locations: map[string]NRPLocation{}, @@ -376,14 +392,14 @@ func TestDiffTrackerEquals(t *testing.T) { { name: "equal diff trackers", dt1: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("svc1"), Egresses: sets.NewString("egress1"), Nodes: map[string]Node{}, }, }, dt2: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("svc1"), Egresses: sets.NewString("egress1"), Nodes: map[string]Node{}, @@ -394,14 +410,14 @@ func TestDiffTrackerEquals(t *testing.T) { { name: "different services", dt1: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("svc1"), Egresses: sets.NewString(), Nodes: map[string]Node{}, }, }, dt2: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString("svc2"), Egresses: sets.NewString(), Nodes: map[string]Node{}, @@ -412,14 +428,14 @@ func TestDiffTrackerEquals(t *testing.T) { { name: "different egresses", dt1: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString(), Egresses: sets.NewString("egress1"), Nodes: map[string]Node{}, }, }, dt2: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString(), Egresses: sets.NewString("egress2"), Nodes: map[string]Node{}, @@ -430,14 +446,14 @@ func TestDiffTrackerEquals(t *testing.T) { { name: "different node count", dt1: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString(), Egresses: sets.NewString(), Nodes: map[string]Node{"node1": {}}, }, }, dt2: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString(), Egresses: sets.NewString(), Nodes: map[string]Node{}, @@ -448,7 +464,7 @@ func TestDiffTrackerEquals(t *testing.T) { { name: "different pod count in node", dt1: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString(), Egresses: sets.NewString(), Nodes: map[string]Node{ @@ -457,7 +473,7 @@ func TestDiffTrackerEquals(t *testing.T) { }, }, dt2: &DiffTracker{ - K8sResources: K8s_State{ + K8sResources: K8sState{ Services: sets.NewString(), Egresses: sets.NewString(), Nodes: map[string]Node{ From 16a0b87da6e85794d45156436d73b4b537db91a3 Mon Sep 17 00:00:00 2001 From: George Edward Nechitoaia <58257818+georgeedward2000@users.noreply.github.com> Date: Wed, 20 May 2026 12:59:44 +0000 Subject: [PATCH 3/3] address review comments: scope helper to DiffTracker; clarify NRP type docs --- pkg/provider/difftracker/k8s_state_updates.go | 6 +++--- pkg/provider/difftracker/types.go | 14 ++++++++++++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/provider/difftracker/k8s_state_updates.go b/pkg/provider/difftracker/k8s_state_updates.go index 34f6227f7c..2c8a20b7e7 100644 --- a/pkg/provider/difftracker/k8s_state_updates.go +++ b/pkg/provider/difftracker/k8s_state_updates.go @@ -33,7 +33,7 @@ const ( // enqueueK8sResourceOperation applies the requested operation (ADD/REMOVE) to the // in-memory K8s resource set. It does not perform any Azure update calls; it only // mutates the local desired-state model that will later be reconciled with NRP. -func enqueueK8sResourceOperation(input UpdateK8sResource, set *utilsets.IgnoreCaseSet, resourceType string) error { +func (dt *DiffTracker) enqueueK8sResourceOperation(input UpdateK8sResource, set *utilsets.IgnoreCaseSet, resourceType string) error { if input.ID == "" { return fmt.Errorf("%s: empty ID not allowed", resourceType) } @@ -56,7 +56,7 @@ func (dt *DiffTracker) EnqueueK8sServiceOperation(input UpdateK8sResource) error dt.mu.Lock() defer dt.mu.Unlock() - return enqueueK8sResourceOperation(input, dt.K8sResources.Services, ResourceTypeService) + return dt.enqueueK8sResourceOperation(input, dt.K8sResources.Services, ResourceTypeService) } // EnqueueK8sEgressOperation records an egress ADD/REMOVE in the local K8s state set. @@ -66,7 +66,7 @@ func (dt *DiffTracker) EnqueueK8sEgressOperation(input UpdateK8sResource) error dt.mu.Lock() defer dt.mu.Unlock() - return enqueueK8sResourceOperation(input, dt.K8sResources.Egresses, ResourceTypeEgress) + return dt.enqueueK8sResourceOperation(input, dt.K8sResources.Egresses, ResourceTypeEgress) } // updateK8sEndpointsLocked updates K8s endpoints state. Assumes lock is already held. diff --git a/pkg/provider/difftracker/types.go b/pkg/provider/difftracker/types.go index 9cab51fee6..43a8865cb7 100644 --- a/pkg/provider/difftracker/types.go +++ b/pkg/provider/difftracker/types.go @@ -53,18 +53,28 @@ const ( // -------------------------------------------------------------------------------- // DiffTracker keeps track of the state of the K8s cluster and NRP // -------------------------------------------------------------------------------- +// NRPAddress holds the NRP-side state for a single pod address (pod IP). type NRPAddress struct { - Services *utilsets.IgnoreCaseSet // all inbound and outbound identities + // Services holds the SGW service identities (LBs for inbound, NATGWs for + // outbound) currently associated with this address on the NRP side. + // These are SGW service identities, not Kubernetes Service names. + Services *utilsets.IgnoreCaseSet } +// NRPLocation holds the NRP-side state for a single node/VM and groups the +// pod addresses running on it. type NRPLocation struct { + // Addresses is keyed by pod IP. Each pod IP is added to the ServiceGateway + // as an address under this location once the pod is created. Addresses map[string]NRPAddress } type NRPState struct { LoadBalancers *utilsets.IgnoreCaseSet NATGateways *utilsets.IgnoreCaseSet - Locations map[string]NRPLocation + // Locations is keyed by node/VM IP (e.g. "10.0.0.1"). "Location" here is + // an SGW concept identifying a node, not an Azure region (e.g. "eastus2"). + Locations map[string]NRPLocation } type Pod struct {