From 8887d3b3bba959b182fd1dce2abd2648b3c1eed4 Mon Sep 17 00:00:00 2001 From: Martin Levesque Date: Fri, 15 May 2026 09:32:27 +0000 Subject: [PATCH 1/9] feat: add reverse mapping between podUID and podIdentifier to delete stale entries --- .../internal/kube/client.go | 216 ++++++++++++++++-- .../internal/kube/client_test.go | 166 ++++++++++++++ 2 files changed, 357 insertions(+), 25 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index fd82d9302376b..281a5e085a2af 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -56,11 +56,14 @@ type WatchClient struct { // A map containing Pod related data, used to associate them with resources. // Key can be either an IP address or Pod UID - Pods map[PodIdentifier]*Pod - Rules ExtractionRules - Filters Filters - Associations []Association - Exclude Excludes + Pods map[PodIdentifier]*Pod + // podIdentifiers tracks the PodIdentifier keys associated with each pod UID so + // identifiers that disappear on pod update can be cleaned after the delete grace period. + podIdentifiers map[string]map[PodIdentifier]struct{} + Rules ExtractionRules + Filters Filters + Associations []Association + Exclude Excludes // A map containing Namespace related data, used to associate them with resources. 
// Key is namespace name @@ -146,6 +149,7 @@ func New( } c.Pods = map[PodIdentifier]*Pod{} + c.podIdentifiers = map[string]map[PodIdentifier]struct{}{} c.Namespaces = map[string]*Namespace{} c.Nodes = map[string]*Node{} c.ReplicaSets = map[string]*ReplicaSet{} @@ -390,6 +394,7 @@ func (c *WatchClient) Stop() { } func (c *WatchClient) handlePodAdd(obj any) { + podTableSize := 0 if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() { c.telemetryBuilder.OtelsvcK8sPodAdded.Add(context.Background(), 1) } @@ -397,11 +402,11 @@ func (c *WatchClient) handlePodAdd(obj any) { c.telemetryBuilder.K8sWatcherPodAdded.Add(context.Background(), 1) } if pod, ok := obj.(*api_v1.Pod); ok { - c.addOrUpdatePod(pod) + podTableSize = c.addOrUpdatePod(pod) } else { c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj)) + podTableSize = c.podTableSize() } - podTableSize := len(c.Pods) if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() { c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize)) } @@ -411,6 +416,7 @@ func (c *WatchClient) handlePodAdd(obj any) { } func (c *WatchClient) handlePodUpdate(_, newPod any) { + podTableSize := 0 if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() { c.telemetryBuilder.OtelsvcK8sPodUpdated.Add(context.Background(), 1) } @@ -419,11 +425,11 @@ func (c *WatchClient) handlePodUpdate(_, newPod any) { } if pod, ok := newPod.(*api_v1.Pod); ok { // TODO: update or remove based on whether container is ready/unready?. 
- c.addOrUpdatePod(pod) + podTableSize = c.addOrUpdatePod(pod) } else { c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", newPod)) + podTableSize = c.podTableSize() } - podTableSize := len(c.Pods) if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() { c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize)) } @@ -433,6 +439,7 @@ func (c *WatchClient) handlePodUpdate(_, newPod any) { } func (c *WatchClient) handlePodDelete(obj any) { + podTableSize := 0 if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() { c.telemetryBuilder.OtelsvcK8sPodDeleted.Add(context.Background(), 1) } @@ -440,11 +447,11 @@ func (c *WatchClient) handlePodDelete(obj any) { c.telemetryBuilder.K8sWatcherPodDeleted.Add(context.Background(), 1) } if pod, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Pod); ok { - c.forgetPod(pod) + podTableSize = c.forgetPod(pod) } else { c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj)) + podTableSize = c.podTableSize() } - podTableSize := len(c.Pods) if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() { c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize)) } @@ -770,6 +777,7 @@ func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) { // and the underlying state (ip<>pod mapping) has not changed. 
if p.PodUID == d.podUID { delete(c.Pods, d.id) + c.removePodIdentifierLocked(d.podUID, d.id) deleted = true } } @@ -1586,13 +1594,15 @@ func (c *WatchClient) getIdentifiersFromAssoc(pod *Pod) []PodIdentifier { return ids } -func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) { +func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) int { newPod := c.podFromAPI(pod) + identifiers := c.getIdentifiersFromAssoc(newPod) + staleIdentifiers := make([]deleteRequest, 0) c.m.Lock() - defer c.m.Unlock() - - identifiers := c.getIdentifiersFromAssoc(newPod) + if newPod.PodUID != "" { + staleIdentifiers = c.getStalePodIdentifiersForDeletionLocked(newPod.PodUID, identifiers) + } for i := range identifiers { id := identifiers[i] // compare initial scheduled timestamp for existing pod and new pod with same identifier @@ -1603,34 +1613,190 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) { if pod.Status.StartTime.Before(p.StartTime) { continue } + c.removePodIdentifierLocked(p.PodUID, id) } c.Pods[id] = newPod + c.addPodIdentifierLocked(newPod.PodUID, id) } + podTableSize := len(c.Pods) + c.m.Unlock() + + c.appendDeleteRequests(staleIdentifiers, true) + return podTableSize } -func (c *WatchClient) forgetPod(pod *api_v1.Pod) { +func (c *WatchClient) forgetPod(pod *api_v1.Pod) int { podToRemove := c.podFromAPI(pod) identifiers := c.getIdentifiersFromAssoc(podToRemove) + podUID := string(pod.UID) + deleteRequests := make([]deleteRequest, 0) + + c.m.Lock() + if podUID != "" { + // Use the reverse index so deletes include identifiers that disappeared from the final pod object. + deleteRequests = c.deleteRequestsForPodLocked(podUID, identifiers) + } else { + // Pods without a UID are not tracked in the reverse index; preserve the existing identifier-based cleanup. 
+ identifiers = c.existingPodIdentifiersLocked(podUID, identifiers) + deleteRequests = c.buildDeletionRequestsForIdentifiersLocked(podUID, identifiers) + } + podTableSize := len(c.Pods) + c.m.Unlock() + + c.appendDeleteRequests(deleteRequests, false) + return podTableSize +} + +func (c *WatchClient) podTableSize() int { + c.m.RLock() + defer c.m.RUnlock() + return len(c.Pods) +} + +// addPodIdentifierLocked records that id is currently associated with podUID. +func (c *WatchClient) addPodIdentifierLocked(podUID string, id PodIdentifier) { + if podUID == "" { + return + } + if c.podIdentifiers == nil { + c.podIdentifiers = map[string]map[PodIdentifier]struct{}{} + } + identifiers := c.podIdentifiers[podUID] + if identifiers == nil { + identifiers = map[PodIdentifier]struct{}{} + c.podIdentifiers[podUID] = identifiers + } + identifiers[id] = struct{}{} +} + +// removePodIdentifierLocked removes id from podUID's reverse index entry. +func (c *WatchClient) removePodIdentifierLocked(podUID string, id PodIdentifier) { + if podUID == "" || c.podIdentifiers == nil { + return + } + identifiers := c.podIdentifiers[podUID] + delete(identifiers, id) + if len(identifiers) == 0 { + delete(c.podIdentifiers, podUID) + } +} + +// getStalePodIdentifiersForDeletionLocked returns delete requests for indexed identifiers missing from the current pod state. +func (c *WatchClient) getStalePodIdentifiersForDeletionLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { + // Construct a set of current identifiers in order to diff against the indexed identifiers. 
+ currentIdentifiers := make(map[PodIdentifier]struct{}, len(identifiers)) + for i := range identifiers { + currentIdentifiers[identifiers[i]] = struct{}{} + } + indexedIdentifiers := c.podIdentifiers[podUID] + staleIdentifiers := make([]deleteRequest, 0) + for id := range indexedIdentifiers { + if _, ok := currentIdentifiers[id]; ok { + continue + } + if p, ok := c.Pods[id]; ok && p.PodUID == podUID { + staleIdentifiers = append(staleIdentifiers, deleteRequest{id: id, podUID: podUID}) + } + } + return staleIdentifiers +} + +// deleteRequestsForPodLocked returns delete requests for all identifiers known for podUID. +func (c *WatchClient) deleteRequestsForPodLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { + if len(c.podIdentifiers[podUID]) == 0 { + return c.buildDeletionRequestsForIdentifiersLocked(podUID, c.existingPodIdentifiersLocked(podUID, identifiers)) + } + + indexedIdentifiers := c.podIdentifiers[podUID] + identifiersToDelete := make([]PodIdentifier, 0, len(indexedIdentifiers)) + seen := map[PodIdentifier]struct{}{} + // Prefer identifiers recomputed from the delete event. + for i := range identifiers { + id := identifiers[i] + if _, ok := indexedIdentifiers[id]; !ok { + continue + } + if p, ok := c.Pods[id]; ok && p.PodUID == podUID { + identifiersToDelete = append(identifiersToDelete, id) + seen[id] = struct{}{} + } + } + // Include indexed identifiers that are missing from the delete event, such as historical container IDs. + for id := range indexedIdentifiers { + if _, ok := seen[id]; ok { + continue + } + if p, ok := c.Pods[id]; ok && p.PodUID == podUID { + identifiersToDelete = append(identifiersToDelete, id) + } + } + return c.buildDeletionRequestsForIdentifiersLocked(podUID, identifiersToDelete) +} + +// buildDeletionRequestsForIdentifiersLocked converts identifiers owned by podUID into delayed delete requests. 
+func (c *WatchClient) buildDeletionRequestsForIdentifiersLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { + requests := make([]deleteRequest, 0, len(identifiers)) for i := range identifiers { id := identifiers[i] - p, ok := c.GetPod(id) + p, ok := c.Pods[id] + if !ok || p.PodUID != podUID { + continue + } + requests = append(requests, deleteRequest{id: id, podUID: podUID}) + } + return requests +} - if ok && p.PodUID == string(pod.UID) { - c.appendDeleteQueue(id, p.PodUID) +// existingPodIdentifiersLocked filters identifiers to those still mapped to podUID in c.Pods. +func (c *WatchClient) existingPodIdentifiersLocked(podUID string, identifiers []PodIdentifier) []PodIdentifier { + existingIdentifiers := make([]PodIdentifier, 0, len(identifiers)) + for i := range identifiers { + id := identifiers[i] + p, ok := c.Pods[id] + if !ok { + continue + } + if p.PodUID != podUID { + continue } + existingIdentifiers = append(existingIdentifiers, id) } + return existingIdentifiers } -func (c *WatchClient) appendDeleteQueue(podID PodIdentifier, podUID string) { +// appendDeleteRequests appends requests to the delayed delete queue. +func (c *WatchClient) appendDeleteRequests(requests []deleteRequest, skipQueued bool) { + if len(requests) == 0 { + return + } c.deleteMut.Lock() - c.deleteQueue = append(c.deleteQueue, deleteRequest{ - id: podID, - podUID: podUID, - ts: time.Now(), - }) + now := time.Now() + for i := range requests { + request := requests[i] + // Stale identifiers can be observed on many pod updates before the delete grace period expires. + // Avoid queueing the same delayed delete repeatedly in that path. + if skipQueued && c.isDeleteQueuedLocked(request.id, request.podUID) { + continue + } + c.deleteQueue = append(c.deleteQueue, deleteRequest{ + id: request.id, + podUID: request.podUID, + ts: now, + }) + } c.deleteMut.Unlock() } +// isDeleteQueuedLocked reports whether podID is already waiting for delayed deletion. 
+func (c *WatchClient) isDeleteQueuedLocked(podID PodIdentifier, podUID string) bool { + for i := range c.deleteQueue { + if c.deleteQueue[i].id == podID && c.deleteQueue[i].podUID == podUID { + return true + } + } + return false +} + func (c *WatchClient) shouldIgnorePod(pod *api_v1.Pod) bool { // Check if user requested the pod to be ignored through annotations if v, ok := pod.Annotations[ignoreAnnotation]; ok { diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index 8781db2c6bf8a..0a4ae5d50c926 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -57,6 +57,45 @@ func newPodIdentifier(from, name, value string) PodIdentifier { } } +func podWithContainerID(name, uid, containerID string, restartCount int32) *api_v1.Pod { + return &api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + }, + Status: api_v1.PodStatus{ + ContainerStatuses: []api_v1.ContainerStatus{{ + Name: "container", + ContainerID: "containerd://" + containerID, + RestartCount: restartCount, + }}, + }, + } +} + +func podWithIP(name, uid, podIP string, startTime time.Time) *api_v1.Pod { + return &api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: name, + UID: types.UID(uid), + }, + Status: api_v1.PodStatus{ + PodIP: podIP, + StartTime: &meta_v1.Time{Time: startTime}, + }, + } +} + +func assertDeleteQueueContains(t *testing.T, queue []deleteRequest, id PodIdentifier, podUID string) { + t.Helper() + for i := range queue { + if queue[i].id == id && queue[i].podUID == podUID { + return + } + } + assert.Failf(t, "missing delete request", "delete queue does not contain id %v for pod UID %q", id, podUID) +} + func podAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj any)) { assert.Empty(t, c.Pods) @@ -420,6 +459,133 @@ func TestPodUpdate(t *testing.T) { }) } +func 
TestPodUpdateCleansStaleContainerIDAssociation(t *testing.T) { + c, _ := newTestClient(t) + c.Rules = ExtractionRules{ContainerID: true} + c.Associations = []Association{{ + Sources: []AssociationSource{{ + From: ResourceSource, + Name: "container.id", + }}, + }} + + oldID := newPodIdentifier(ResourceSource, "container.id", "old-container-id") + newID := newPodIdentifier(ResourceSource, "container.id", "new-container-id") + pod := podWithContainerID("pod-a", "pod-uid", "old-container-id", 0) + + c.handlePodAdd(pod) + require.Contains(t, c.Pods, oldID) + require.Len(t, c.Pods, 2) + + updatedPod := podWithContainerID("pod-a", "pod-uid", "new-container-id", 1) + c.handlePodUpdate(pod, updatedPod) + + require.Contains(t, c.Pods, oldID) + require.Contains(t, c.Pods, newID) + require.Len(t, c.deleteQueue, 1) + assert.Equal(t, oldID, c.deleteQueue[0].id) + assert.Equal(t, "pod-uid", c.deleteQueue[0].podUID) + assert.Contains(t, c.podIdentifiers["pod-uid"], oldID) + assert.Contains(t, c.podIdentifiers["pod-uid"], newID) + + c.deleteLoopProcessing(time.Hour) + + assert.Contains(t, c.Pods, oldID) + assert.Contains(t, c.Pods, newID) + + c.deleteLoopProcessing(0) + + assert.NotContains(t, c.Pods, oldID) + assert.Contains(t, c.Pods, newID) + assert.Contains(t, c.podIdentifiers["pod-uid"], newID) +} + +func TestPodDeleteContainerIDAssociation(t *testing.T) { + c, _ := newTestClient(t) + c.Rules = ExtractionRules{ContainerID: true} + c.Associations = []Association{{ + Sources: []AssociationSource{{ + From: ResourceSource, + Name: "container.id", + }}, + }} + + oldID := newPodIdentifier(ResourceSource, "container.id", "old-container-id") + newID := newPodIdentifier(ResourceSource, "container.id", "new-container-id") + pod := podWithContainerID("pod-a", "pod-uid", "old-container-id", 0) + updatedPod := podWithContainerID("pod-a", "pod-uid", "new-container-id", 1) + + c.handlePodAdd(pod) + c.handlePodUpdate(pod, updatedPod) + c.deleteQueue = nil + + c.handlePodDelete(updatedPod) + + 
require.Len(t, c.deleteQueue, 3) + assertDeleteQueueContains(t, c.deleteQueue, oldID, "pod-uid") + assertDeleteQueueContains(t, c.deleteQueue, newID, "pod-uid") + assertDeleteQueueContains(t, c.deleteQueue, newPodIdentifier(ResourceSource, "k8s.pod.uid", "pod-uid"), "pod-uid") + + c.deleteLoopProcessing(0) + + assert.NotContains(t, c.Pods, oldID) + assert.NotContains(t, c.Pods, newID) + assert.NotContains(t, c.podIdentifiers, "pod-uid") +} + +func TestPodUpdateWithoutUIDDoesNotUseReverseIndexCleanup(t *testing.T) { + c, _ := newTestClient(t) + c.Rules = ExtractionRules{ContainerID: true} + c.Associations = []Association{{ + Sources: []AssociationSource{{ + From: ResourceSource, + Name: "container.id", + }}, + }} + + oldID := newPodIdentifier(ResourceSource, "container.id", "old-container-id") + newID := newPodIdentifier(ResourceSource, "container.id", "new-container-id") + pod := podWithContainerID("pod-a", "", "old-container-id", 0) + updatedPod := podWithContainerID("pod-a", "", "new-container-id", 1) + + c.handlePodAdd(pod) + c.handlePodUpdate(pod, updatedPod) + + assert.Contains(t, c.Pods, oldID) + assert.Contains(t, c.Pods, newID) + assert.Empty(t, c.deleteQueue) + assert.Empty(t, c.podIdentifiers) +} + +func TestPodUpdateDoesNotDeleteSharedIdentifierOwnedByNewerPod(t *testing.T) { + c, _ := newTestClient(t) + c.Associations = []Association{{ + Sources: []AssociationSource{{ + From: ResourceSource, + Name: "k8s.pod.ip", + }}, + }} + + sharedIP := newPodIdentifier(ResourceSource, "k8s.pod.ip", "1.1.1.1") + podAUID := newPodIdentifier(ResourceSource, "k8s.pod.uid", "pod-a-uid") + podA := podWithIP("pod-a", "pod-a-uid", "1.1.1.1", time.Now()) + podB := podWithIP("pod-b", "pod-b-uid", "1.1.1.1", time.Now().Add(time.Second)) + + c.handlePodAdd(podA) + c.handlePodAdd(podB) + c.handlePodDelete(podA) + + require.Contains(t, c.Pods, sharedIP) + assert.Equal(t, "pod-b-uid", c.Pods[sharedIP].PodUID) + + c.deleteLoopProcessing(0) + + assert.NotContains(t, c.Pods, podAUID) 
+ assert.NotContains(t, c.podIdentifiers, "pod-a-uid") + require.Contains(t, c.Pods, sharedIP) + assert.Equal(t, "pod-b-uid", c.Pods[sharedIP].PodUID) +} + func TestNamespaceUpdate(t *testing.T) { c, _ := newTestClient(t) namespaceAddAndUpdateTest(t, c, func(obj any) { From f961c3cc1b996a91c542cb0ff19cfaf7bd6d1d25 Mon Sep 17 00:00:00 2001 From: Martin Levesque Date: Mon, 18 May 2026 09:26:51 +0000 Subject: [PATCH 2/9] fix: apply codex review --- .../internal/kube/client.go | 169 ++++++++++-------- .../internal/kube/client_test.go | 41 ++++- .../internal/kube/kube.go | 3 + 3 files changed, 136 insertions(+), 77 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 281a5e085a2af..5212844eddba4 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -34,21 +34,24 @@ import ( // WatchClient is the main interface provided by this package to a kubernetes cluster. 
type WatchClient struct { - m sync.RWMutex - deleteMut sync.Mutex - logger *zap.Logger - kc kubernetes.Interface - mc clientmeta.Interface - informer cache.SharedInformer - namespaceInformer cache.SharedInformer - nodeInformer cache.SharedInformer - deploymentInformer cache.SharedInformer - statefulsetInformer cache.SharedInformer - daemonsetInformer cache.SharedInformer - jobInformer cache.SharedInformer - replicasetInformer cache.SharedInformer - cronJobRegex *regexp.Regexp - deleteQueue []deleteRequest + m sync.RWMutex + deleteMut sync.Mutex + logger *zap.Logger + kc kubernetes.Interface + mc clientmeta.Interface + informer cache.SharedInformer + namespaceInformer cache.SharedInformer + nodeInformer cache.SharedInformer + deploymentInformer cache.SharedInformer + statefulsetInformer cache.SharedInformer + daemonsetInformer cache.SharedInformer + jobInformer cache.SharedInformer + replicasetInformer cache.SharedInformer + cronJobRegex *regexp.Regexp + deleteQueue []*deleteRequest + // pendingStaleDeletes indexes delayed deletes created when an identifier disappears on pod update. + // The value points to the matching deleteQueue entry so it can be cancelled if the identifier returns. 
+ pendingStaleDeletes map[pendingStaleDeleteKey]*deleteRequest stopCh chan struct{} waitForMetadata bool waitForMetadataTimeout time.Duration @@ -109,6 +112,11 @@ const podTemplateHashLabel = "pod-template-hash" var errCannotRetrieveImage = errors.New("cannot retrieve image name") +type pendingStaleDeleteKey struct { + podUID string + id PodIdentifier +} + type InformersFactoryList struct { newInformer InformerProvider newNamespaceInformer InformerProviderNamespace @@ -756,6 +764,7 @@ func (c *WatchClient) deleteLoop(interval, gracePeriod time.Duration) { func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) { var cutoff int now := time.Now() + c.m.Lock() c.deleteMut.Lock() for i := range c.deleteQueue { d := c.deleteQueue[i] @@ -766,12 +775,17 @@ func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) { } toDelete := c.deleteQueue[:cutoff] c.deleteQueue = c.deleteQueue[cutoff:] - c.deleteMut.Unlock() - c.m.Lock() deleted := false for i := range toDelete { d := toDelete[i] + if d.cancelled { + continue + } + key := pendingStaleDeleteKey{podUID: d.podUID, id: d.id} + if c.pendingStaleDeletes[key] == d { + delete(c.pendingStaleDeletes, key) + } if p, ok := c.Pods[d.id]; ok { // Sanity check: make sure we are deleting the same pod // and the underlying state (ip<>pod mapping) has not changed. 
@@ -782,6 +796,7 @@ func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) { } } } + c.deleteMut.Unlock() if deleted { c.compactPodMap() @@ -1603,6 +1618,7 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) int { if newPod.PodUID != "" { staleIdentifiers = c.getStalePodIdentifiersForDeletionLocked(newPod.PodUID, identifiers) } + currentIdentifiers := make([]PodIdentifier, 0, len(identifiers)) for i := range identifiers { id := identifiers[i] // compare initial scheduled timestamp for existing pod and new pod with same identifier @@ -1617,6 +1633,12 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) int { } c.Pods[id] = newPod c.addPodIdentifierLocked(newPod.PodUID, id) + currentIdentifiers = append(currentIdentifiers, id) + } + if len(currentIdentifiers) > 0 { + c.deleteMut.Lock() + c.cancelStaleDeletesLocked(newPod.PodUID, currentIdentifiers) + c.deleteMut.Unlock() } podTableSize := len(c.Pods) c.m.Unlock() @@ -1634,10 +1656,9 @@ func (c *WatchClient) forgetPod(pod *api_v1.Pod) int { c.m.Lock() if podUID != "" { // Use the reverse index so deletes include identifiers that disappeared from the final pod object. - deleteRequests = c.deleteRequestsForPodLocked(podUID, identifiers) + deleteRequests = c.buildDeletionRequestsForPodLocked(podUID, identifiers) } else { // Pods without a UID are not tracked in the reverse index; preserve the existing identifier-based cleanup. - identifiers = c.existingPodIdentifiersLocked(podUID, identifiers) deleteRequests = c.buildDeletionRequestsForIdentifiersLocked(podUID, identifiers) } podTableSize := len(c.Pods) @@ -1647,10 +1668,27 @@ func (c *WatchClient) forgetPod(pod *api_v1.Pod) int { return podTableSize } -func (c *WatchClient) podTableSize() int { - c.m.RLock() - defer c.m.RUnlock() - return len(c.Pods) +// getStalePodIdentifiersForDeletionLocked returns delete requests for indexed identifiers missing from the current pod state. 
+func (c *WatchClient) getStalePodIdentifiersForDeletionLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { + indexedIdentifiers := c.podIdentifiers[podUID] + if len(indexedIdentifiers) == 0 { + return nil + } + // Construct a set of current identifiers in order to diff against the indexed identifiers. + currentIdentifiers := make(map[PodIdentifier]struct{}, len(identifiers)) + for i := range identifiers { + currentIdentifiers[identifiers[i]] = struct{}{} + } + staleIdentifiers := make([]deleteRequest, 0) + for id := range indexedIdentifiers { + if _, ok := currentIdentifiers[id]; ok { + continue + } + if p, ok := c.Pods[id]; ok && p.PodUID == podUID { + staleIdentifiers = append(staleIdentifiers, deleteRequest{id: id, podUID: podUID}) + } + } + return staleIdentifiers } // addPodIdentifierLocked records that id is currently associated with podUID. @@ -1681,35 +1719,30 @@ func (c *WatchClient) removePodIdentifierLocked(podUID string, id PodIdentifier) } } -// getStalePodIdentifiersForDeletionLocked returns delete requests for indexed identifiers missing from the current pod state. -func (c *WatchClient) getStalePodIdentifiersForDeletionLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { - // Construct a set of current identifiers in order to diff against the indexed identifiers. - currentIdentifiers := make(map[PodIdentifier]struct{}, len(identifiers)) - for i := range identifiers { - currentIdentifiers[identifiers[i]] = struct{}{} +// cancelStaleDeletesLocked invalidates delayed stale deletes when their identifiers become current again. 
+func (c *WatchClient) cancelStaleDeletesLocked(podUID string, identifiers []PodIdentifier) { + if podUID == "" || len(c.pendingStaleDeletes) == 0 { + return } - indexedIdentifiers := c.podIdentifiers[podUID] - staleIdentifiers := make([]deleteRequest, 0) - for id := range indexedIdentifiers { - if _, ok := currentIdentifiers[id]; ok { - continue - } - if p, ok := c.Pods[id]; ok && p.PodUID == podUID { - staleIdentifiers = append(staleIdentifiers, deleteRequest{id: id, podUID: podUID}) + for i := range identifiers { + key := pendingStaleDeleteKey{podUID: podUID, id: identifiers[i]} + if request := c.pendingStaleDeletes[key]; request != nil { + request.cancelled = true + delete(c.pendingStaleDeletes, key) } } - return staleIdentifiers } -// deleteRequestsForPodLocked returns delete requests for all identifiers known for podUID. -func (c *WatchClient) deleteRequestsForPodLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { +// buildDeletionRequestsForPodLocked gathers identifiers known for podUID, then builds delayed delete requests for them. +func (c *WatchClient) buildDeletionRequestsForPodLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { if len(c.podIdentifiers[podUID]) == 0 { - return c.buildDeletionRequestsForIdentifiersLocked(podUID, c.existingPodIdentifiersLocked(podUID, identifiers)) + return c.buildDeletionRequestsForIdentifiersLocked(podUID, identifiers) } indexedIdentifiers := c.podIdentifiers[podUID] identifiersToDelete := make([]PodIdentifier, 0, len(indexedIdentifiers)) seen := map[PodIdentifier]struct{}{} + // Prefer identifiers recomputed from the delete event. for i := range identifiers { id := identifiers[i] @@ -1747,25 +1780,8 @@ func (c *WatchClient) buildDeletionRequestsForIdentifiersLocked(podUID string, i return requests } -// existingPodIdentifiersLocked filters identifiers to those still mapped to podUID in c.Pods. 
-func (c *WatchClient) existingPodIdentifiersLocked(podUID string, identifiers []PodIdentifier) []PodIdentifier { - existingIdentifiers := make([]PodIdentifier, 0, len(identifiers)) - for i := range identifiers { - id := identifiers[i] - p, ok := c.Pods[id] - if !ok { - continue - } - if p.PodUID != podUID { - continue - } - existingIdentifiers = append(existingIdentifiers, id) - } - return existingIdentifiers -} - // appendDeleteRequests appends requests to the delayed delete queue. -func (c *WatchClient) appendDeleteRequests(requests []deleteRequest, skipQueued bool) { +func (c *WatchClient) appendDeleteRequests(requests []deleteRequest, cancellable bool) { if len(requests) == 0 { return } @@ -1773,28 +1789,29 @@ func (c *WatchClient) appendDeleteRequests(requests []deleteRequest, skipQueued now := time.Now() for i := range requests { request := requests[i] - // Stale identifiers can be observed on many pod updates before the delete grace period expires. - // Avoid queueing the same delayed delete repeatedly in that path. - if skipQueued && c.isDeleteQueuedLocked(request.id, request.podUID) { - continue + request.ts = now + req := &request + if cancellable { + // Cancellable requests are created for identifiers that disappeared on pod update. + // Index one pending request per identifier so it can be cancelled if the identifier returns. + key := pendingStaleDeleteKey{podUID: request.podUID, id: request.id} + if _, ok := c.pendingStaleDeletes[key]; ok { + continue + } + if c.pendingStaleDeletes == nil { + c.pendingStaleDeletes = map[pendingStaleDeleteKey]*deleteRequest{} + } + c.pendingStaleDeletes[key] = req } - c.deleteQueue = append(c.deleteQueue, deleteRequest{ - id: request.id, - podUID: request.podUID, - ts: now, - }) + c.deleteQueue = append(c.deleteQueue, req) } c.deleteMut.Unlock() } -// isDeleteQueuedLocked reports whether podID is already waiting for delayed deletion. 
-func (c *WatchClient) isDeleteQueuedLocked(podID PodIdentifier, podUID string) bool { - for i := range c.deleteQueue { - if c.deleteQueue[i].id == podID && c.deleteQueue[i].podUID == podUID { - return true - } - } - return false +func (c *WatchClient) podTableSize() int { + c.m.RLock() + defer c.m.RUnlock() + return len(c.Pods) } func (c *WatchClient) shouldIgnorePod(pod *api_v1.Pod) bool { diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index 0a4ae5d50c926..efca592559e47 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -86,7 +86,7 @@ func podWithIP(name, uid, podIP string, startTime time.Time) *api_v1.Pod { } } -func assertDeleteQueueContains(t *testing.T, queue []deleteRequest, id PodIdentifier, podUID string) { +func assertDeleteQueueContains(t *testing.T, queue []*deleteRequest, id PodIdentifier, podUID string) { t.Helper() for i := range queue { if queue[i].id == id && queue[i].podUID == podUID { @@ -500,6 +500,45 @@ func TestPodUpdateCleansStaleContainerIDAssociation(t *testing.T) { assert.Contains(t, c.podIdentifiers["pod-uid"], newID) } +func TestPodUpdateCancelStaleIdentifierDeletion(t *testing.T) { + c, _ := newTestClient(t) + c.Rules = ExtractionRules{ContainerID: true} + c.Associations = []Association{{ + Sources: []AssociationSource{{ + From: ResourceSource, + Name: "container.id", + }}, + }} + + oldID := newPodIdentifier(ResourceSource, "container.id", "old-container-id") + newID := newPodIdentifier(ResourceSource, "container.id", "new-container-id") + pod := podWithContainerID("pod-a", "pod-uid", "old-container-id", 0) + updatedPod := podWithContainerID("pod-a", "pod-uid", "new-container-id", 1) + restoredPod := podWithContainerID("pod-a", "pod-uid", "old-container-id", 2) + + c.handlePodAdd(pod) + c.handlePodUpdate(pod, updatedPod) + + require.Len(t, 
c.deleteQueue, 1) + assert.Equal(t, oldID, c.deleteQueue[0].id) + require.Len(t, c.pendingStaleDeletes, 1) + + c.handlePodUpdate(updatedPod, restoredPod) + + require.Len(t, c.deleteQueue, 2) + assert.True(t, c.deleteQueue[0].cancelled) + assert.Equal(t, oldID, c.deleteQueue[0].id) + assert.False(t, c.deleteQueue[1].cancelled) + assert.Equal(t, newID, c.deleteQueue[1].id) + require.Len(t, c.pendingStaleDeletes, 1) + + c.deleteLoopProcessing(0) + + assert.Contains(t, c.Pods, oldID) + assert.NotContains(t, c.Pods, newID) + assert.Empty(t, c.pendingStaleDeletes) +} + func TestPodDeleteContainerIDAssociation(t *testing.T) { c, _ := newTestClient(t) c.Rules = ExtractionRules{ContainerID: true} diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index 7101d74319ca8..e1a0519e58fb8 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -175,6 +175,9 @@ type deleteRequest struct { // contains uid of pod to remove from pods map podUID string ts time.Time + // cancelled is true when this delayed delete was created for a stale identifier, + // but the identifier became current again before the grace period expired. + cancelled bool } // Filters is used to instruct the client on how to filter out k8s pods. 
From b85821d359a538dd710ed1b659b2185c020a8f98 Mon Sep 17 00:00:00 2001 From: Martin Levesque Date: Mon, 18 May 2026 09:46:45 +0000 Subject: [PATCH 3/9] refactor: regroup both deletion path under buildDeletionRequestsForPodLocked --- .../k8sattributesprocessor/internal/kube/client.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 5212844eddba4..a15041d37a5de 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -1654,13 +1654,7 @@ func (c *WatchClient) forgetPod(pod *api_v1.Pod) int { deleteRequests := make([]deleteRequest, 0) c.m.Lock() - if podUID != "" { - // Use the reverse index so deletes include identifiers that disappeared from the final pod object. - deleteRequests = c.buildDeletionRequestsForPodLocked(podUID, identifiers) - } else { - // Pods without a UID are not tracked in the reverse index; preserve the existing identifier-based cleanup. - deleteRequests = c.buildDeletionRequestsForIdentifiersLocked(podUID, identifiers) - } + deleteRequests = c.buildDeletionRequestsForPodLocked(podUID, identifiers) podTableSize := len(c.Pods) c.m.Unlock() @@ -1735,10 +1729,12 @@ func (c *WatchClient) cancelStaleDeletesLocked(podUID string, identifiers []PodI // buildDeletionRequestsForPodLocked gathers identifiers known for podUID, then builds delayed delete requests for them. func (c *WatchClient) buildDeletionRequestsForPodLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { - if len(c.podIdentifiers[podUID]) == 0 { + if podUID == "" || len(c.podIdentifiers[podUID]) == 0 { + // Pods without a UID are not tracked in the reverse index; preserve the existing identifier-based cleanup. 
return c.buildDeletionRequestsForIdentifiersLocked(podUID, identifiers) } + // Use the reverse index so deletes include identifiers that disappeared from the final pod object. indexedIdentifiers := c.podIdentifiers[podUID] identifiersToDelete := make([]PodIdentifier, 0, len(indexedIdentifiers)) seen := map[PodIdentifier]struct{}{} From 3e0ffe3faa5cfa5741381142ac45806c4a1f64db Mon Sep 17 00:00:00 2001 From: Martin Levesque Date: Wed, 20 May 2026 13:33:27 +0000 Subject: [PATCH 4/9] feat: add changelog + remove unneeded checks --- ...fix-k8sattributes-stale-container-ids.yaml | 27 +++++++++++++++++++ .../internal/kube/client.go | 5 +--- 2 files changed, 28 insertions(+), 4 deletions(-) create mode 100644 .chloggen/fix-k8sattributes-stale-container-ids.yaml diff --git a/.chloggen/fix-k8sattributes-stale-container-ids.yaml b/.chloggen/fix-k8sattributes-stale-container-ids.yaml new file mode 100644 index 0000000000000..26c044004d5b8 --- /dev/null +++ b/.chloggen/fix-k8sattributes-stale-container-ids.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: processor/k8s_attributes + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Prevent unbounded memory growth by cleaning up stale pod identifiers, including container.id entries left behind after container restarts + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [48398] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index a15041d37a5de..ae557b8023c7f 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -1690,9 +1690,6 @@ func (c *WatchClient) addPodIdentifierLocked(podUID string, id PodIdentifier) { if podUID == "" { return } - if c.podIdentifiers == nil { - c.podIdentifiers = map[string]map[PodIdentifier]struct{}{} - } identifiers := c.podIdentifiers[podUID] if identifiers == nil { identifiers = map[PodIdentifier]struct{}{} @@ -1703,7 +1700,7 @@ func (c *WatchClient) addPodIdentifierLocked(podUID string, id PodIdentifier) { // removePodIdentifierLocked removes id from podUID's reverse index entry. 
func (c *WatchClient) removePodIdentifierLocked(podUID string, id PodIdentifier) { - if podUID == "" || c.podIdentifiers == nil { + if podUID == "" { return } identifiers := c.podIdentifiers[podUID] From e853f29485463ee727d04fd3253671c9577bd589 Mon Sep 17 00:00:00 2001 From: Martin Levesque Date: Wed, 20 May 2026 14:55:12 +0000 Subject: [PATCH 5/9] feat: replace pendingStaleDletes with time marker --- .../internal/kube/client.go | 139 ++++++++---------- .../internal/kube/client_test.go | 40 ++++- .../internal/kube/kube.go | 3 - 3 files changed, 94 insertions(+), 88 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index ae557b8023c7f..91b2d55e03c52 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -34,24 +34,21 @@ import ( // WatchClient is the main interface provided by this package to a kubernetes cluster. type WatchClient struct { - m sync.RWMutex - deleteMut sync.Mutex - logger *zap.Logger - kc kubernetes.Interface - mc clientmeta.Interface - informer cache.SharedInformer - namespaceInformer cache.SharedInformer - nodeInformer cache.SharedInformer - deploymentInformer cache.SharedInformer - statefulsetInformer cache.SharedInformer - daemonsetInformer cache.SharedInformer - jobInformer cache.SharedInformer - replicasetInformer cache.SharedInformer - cronJobRegex *regexp.Regexp - deleteQueue []*deleteRequest - // pendingStaleDeletes indexes delayed deletes created when an identifier disappears on pod update. - // The value points to the matching deleteQueue entry so it can be cancelled if the identifier returns. 
- pendingStaleDeletes map[pendingStaleDeleteKey]*deleteRequest + m sync.RWMutex + deleteMut sync.Mutex + logger *zap.Logger + kc kubernetes.Interface + mc clientmeta.Interface + informer cache.SharedInformer + namespaceInformer cache.SharedInformer + nodeInformer cache.SharedInformer + deploymentInformer cache.SharedInformer + statefulsetInformer cache.SharedInformer + daemonsetInformer cache.SharedInformer + jobInformer cache.SharedInformer + replicasetInformer cache.SharedInformer + cronJobRegex *regexp.Regexp + deleteQueue []deleteRequest stopCh chan struct{} waitForMetadata bool waitForMetadataTimeout time.Duration @@ -62,7 +59,7 @@ type WatchClient struct { Pods map[PodIdentifier]*Pod // podIdentifiers tracks the PodIdentifier keys associated with each pod UID so // identifiers that disappear on pod update can be cleaned after the delete grace period. - podIdentifiers map[string]map[PodIdentifier]struct{} + podIdentifiers map[string]map[PodIdentifier]podIdentifierState Rules ExtractionRules Filters Filters Associations []Association @@ -112,9 +109,10 @@ const podTemplateHashLabel = "pod-template-hash" var errCannotRetrieveImage = errors.New("cannot retrieve image name") -type pendingStaleDeleteKey struct { - podUID string - id PodIdentifier +type podIdentifierState struct { + // staleSinceUnixNano is zero when the identifier is currently active. + // Non-zero values identify since when the stale period represented by a delayed delete request started. 
+ staleSinceUnixNano int64 } type InformersFactoryList struct { @@ -157,7 +155,7 @@ func New( } c.Pods = map[PodIdentifier]*Pod{} - c.podIdentifiers = map[string]map[PodIdentifier]struct{}{} + c.podIdentifiers = map[string]map[PodIdentifier]podIdentifierState{} c.Namespaces = map[string]*Namespace{} c.Nodes = map[string]*Node{} c.ReplicaSets = map[string]*ReplicaSet{} @@ -764,7 +762,6 @@ func (c *WatchClient) deleteLoop(interval, gracePeriod time.Duration) { func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) { var cutoff int now := time.Now() - c.m.Lock() c.deleteMut.Lock() for i := range c.deleteQueue { d := c.deleteQueue[i] @@ -775,28 +772,37 @@ func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) { } toDelete := c.deleteQueue[:cutoff] c.deleteQueue = c.deleteQueue[cutoff:] + c.deleteMut.Unlock() + c.m.Lock() deleted := false for i := range toDelete { d := toDelete[i] - if d.cancelled { - continue - } - key := pendingStaleDeleteKey{podUID: d.podUID, id: d.id} - if c.pendingStaleDeletes[key] == d { - delete(c.pendingStaleDeletes, key) + identifiers := c.podIdentifiers[d.podUID] + identifierState, indexed := identifiers[d.id] + if indexed { + // staleSinceUnixNano == 0 means the identifier is currently active. + // Otherwise, we check if the stale period is identical to the delete request timestamp in order to avoid deleting the identifier prematurely. + if identifierState.staleSinceUnixNano == 0 || identifierState.staleSinceUnixNano != d.ts.UnixNano() { + continue + } + delete(identifiers, d.id) + if len(identifiers) == 0 { + delete(c.podIdentifiers, d.podUID) + } } if p, ok := c.Pods[d.id]; ok { // Sanity check: make sure we are deleting the same pod // and the underlying state (ip<>pod mapping) has not changed. 
if p.PodUID == d.podUID { delete(c.Pods, d.id) - c.removePodIdentifierLocked(d.podUID, d.id) + if !indexed { + c.removePodIdentifierLocked(d.podUID, d.id) + } deleted = true } } } - c.deleteMut.Unlock() if deleted { c.compactPodMap() @@ -1616,9 +1622,8 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) int { c.m.Lock() if newPod.PodUID != "" { - staleIdentifiers = c.getStalePodIdentifiersForDeletionLocked(newPod.PodUID, identifiers) + staleIdentifiers = c.markStalePodIdentifiersForDeletionLocked(newPod.PodUID, identifiers) } - currentIdentifiers := make([]PodIdentifier, 0, len(identifiers)) for i := range identifiers { id := identifiers[i] // compare initial scheduled timestamp for existing pod and new pod with same identifier @@ -1633,17 +1638,11 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) int { } c.Pods[id] = newPod c.addPodIdentifierLocked(newPod.PodUID, id) - currentIdentifiers = append(currentIdentifiers, id) - } - if len(currentIdentifiers) > 0 { - c.deleteMut.Lock() - c.cancelStaleDeletesLocked(newPod.PodUID, currentIdentifiers) - c.deleteMut.Unlock() } podTableSize := len(c.Pods) c.m.Unlock() - c.appendDeleteRequests(staleIdentifiers, true) + c.appendDeleteRequests(staleIdentifiers) return podTableSize } @@ -1658,12 +1657,12 @@ func (c *WatchClient) forgetPod(pod *api_v1.Pod) int { podTableSize := len(c.Pods) c.m.Unlock() - c.appendDeleteRequests(deleteRequests, false) + c.appendDeleteRequests(deleteRequests) return podTableSize } -// getStalePodIdentifiersForDeletionLocked returns delete requests for indexed identifiers missing from the current pod state. -func (c *WatchClient) getStalePodIdentifiersForDeletionLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { +// markStalePodIdentifiersForDeletionLocked returns delete requests for indexed identifiers missing from the current pod state and marks them as stale by setting the staleSinceUnixNano field. 
+func (c *WatchClient) markStalePodIdentifiersForDeletionLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { indexedIdentifiers := c.podIdentifiers[podUID] if len(indexedIdentifiers) == 0 { return nil @@ -1674,12 +1673,19 @@ func (c *WatchClient) getStalePodIdentifiersForDeletionLocked(podUID string, ide currentIdentifiers[identifiers[i]] = struct{}{} } staleIdentifiers := make([]deleteRequest, 0) + now := time.Now() for id := range indexedIdentifiers { if _, ok := currentIdentifiers[id]; ok { continue } if p, ok := c.Pods[id]; ok && p.PodUID == podUID { - staleIdentifiers = append(staleIdentifiers, deleteRequest{id: id, podUID: podUID}) + identifierState := indexedIdentifiers[id] + if identifierState.staleSinceUnixNano != 0 { + continue + } + identifierState.staleSinceUnixNano = now.UnixNano() + indexedIdentifiers[id] = identifierState + staleIdentifiers = append(staleIdentifiers, deleteRequest{id: id, podUID: podUID, ts: now}) } } return staleIdentifiers @@ -1692,10 +1698,10 @@ func (c *WatchClient) addPodIdentifierLocked(podUID string, id PodIdentifier) { } identifiers := c.podIdentifiers[podUID] if identifiers == nil { - identifiers = map[PodIdentifier]struct{}{} + identifiers = map[PodIdentifier]podIdentifierState{} c.podIdentifiers[podUID] = identifiers } - identifiers[id] = struct{}{} + identifiers[id] = podIdentifierState{} } // removePodIdentifierLocked removes id from podUID's reverse index entry. @@ -1710,20 +1716,6 @@ func (c *WatchClient) removePodIdentifierLocked(podUID string, id PodIdentifier) } } -// cancelStaleDeletesLocked invalidates delayed stale deletes when their identifiers become current again. 
-func (c *WatchClient) cancelStaleDeletesLocked(podUID string, identifiers []PodIdentifier) { - if podUID == "" || len(c.pendingStaleDeletes) == 0 { - return - } - for i := range identifiers { - key := pendingStaleDeleteKey{podUID: podUID, id: identifiers[i]} - if request := c.pendingStaleDeletes[key]; request != nil { - request.cancelled = true - delete(c.pendingStaleDeletes, key) - } - } -} - // buildDeletionRequestsForPodLocked gathers identifiers known for podUID, then builds delayed delete requests for them. func (c *WatchClient) buildDeletionRequestsForPodLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { if podUID == "" || len(c.podIdentifiers[podUID]) == 0 { @@ -1762,19 +1754,23 @@ func (c *WatchClient) buildDeletionRequestsForPodLocked(podUID string, identifie // buildDeletionRequestsForIdentifiersLocked converts identifiers owned by podUID into delayed delete requests. func (c *WatchClient) buildDeletionRequestsForIdentifiersLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { requests := make([]deleteRequest, 0, len(identifiers)) + now := time.Now() for i := range identifiers { id := identifiers[i] p, ok := c.Pods[id] if !ok || p.PodUID != podUID { continue } - requests = append(requests, deleteRequest{id: id, podUID: podUID}) + if indexedIdentifiers := c.podIdentifiers[podUID]; indexedIdentifiers != nil { + indexedIdentifiers[id] = podIdentifierState{staleSinceUnixNano: now.UnixNano()} + } + requests = append(requests, deleteRequest{id: id, podUID: podUID, ts: now}) } return requests } // appendDeleteRequests appends requests to the delayed delete queue. 
-func (c *WatchClient) appendDeleteRequests(requests []deleteRequest, cancellable bool) { +func (c *WatchClient) appendDeleteRequests(requests []deleteRequest) { if len(requests) == 0 { return } @@ -1782,21 +1778,10 @@ func (c *WatchClient) appendDeleteRequests(requests []deleteRequest, cancellable now := time.Now() for i := range requests { request := requests[i] - request.ts = now - req := &request - if cancellable { - // Cancellable requests are created for identifiers that disappeared on pod update. - // Index one pending request per identifier so it can be cancelled if the identifier returns. - key := pendingStaleDeleteKey{podUID: request.podUID, id: request.id} - if _, ok := c.pendingStaleDeletes[key]; ok { - continue - } - if c.pendingStaleDeletes == nil { - c.pendingStaleDeletes = map[pendingStaleDeleteKey]*deleteRequest{} - } - c.pendingStaleDeletes[key] = req + if request.ts.IsZero() { + request.ts = now } - c.deleteQueue = append(c.deleteQueue, req) + c.deleteQueue = append(c.deleteQueue, request) } c.deleteMut.Unlock() } diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index efca592559e47..b0bf4a986bf57 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -86,7 +86,7 @@ func podWithIP(name, uid, podIP string, startTime time.Time) *api_v1.Pod { } } -func assertDeleteQueueContains(t *testing.T, queue []*deleteRequest, id PodIdentifier, podUID string) { +func assertDeleteQueueContains(t *testing.T, queue []deleteRequest, id PodIdentifier, podUID string) { t.Helper() for i := range queue { if queue[i].id == id && queue[i].podUID == podUID { @@ -515,28 +515,52 @@ func TestPodUpdateCancelStaleIdentifierDeletion(t *testing.T) { pod := podWithContainerID("pod-a", "pod-uid", "old-container-id", 0) updatedPod := podWithContainerID("pod-a", "pod-uid", "new-container-id", 1) 
restoredPod := podWithContainerID("pod-a", "pod-uid", "old-container-id", 2) + secondUpdatedPod := podWithContainerID("pod-a", "pod-uid", "new-container-id", 3) c.handlePodAdd(pod) c.handlePodUpdate(pod, updatedPod) require.Len(t, c.deleteQueue, 1) assert.Equal(t, oldID, c.deleteQueue[0].id) - require.Len(t, c.pendingStaleDeletes, 1) + assert.NotZero(t, c.podIdentifiers["pod-uid"][oldID].staleSinceUnixNano) c.handlePodUpdate(updatedPod, restoredPod) require.Len(t, c.deleteQueue, 2) - assert.True(t, c.deleteQueue[0].cancelled) assert.Equal(t, oldID, c.deleteQueue[0].id) - assert.False(t, c.deleteQueue[1].cancelled) assert.Equal(t, newID, c.deleteQueue[1].id) - require.Len(t, c.pendingStaleDeletes, 1) + assert.Zero(t, c.podIdentifiers["pod-uid"][oldID].staleSinceUnixNano) + assert.NotZero(t, c.podIdentifiers["pod-uid"][newID].staleSinceUnixNano) - c.deleteLoopProcessing(0) + c.handlePodUpdate(restoredPod, secondUpdatedPod) + + require.Len(t, c.deleteQueue, 3) + assert.Equal(t, oldID, c.deleteQueue[0].id) + assert.Equal(t, newID, c.deleteQueue[1].id) + assert.Equal(t, oldID, c.deleteQueue[2].id) + assert.NotZero(t, c.podIdentifiers["pod-uid"][oldID].staleSinceUnixNano) + assert.Zero(t, c.podIdentifiers["pod-uid"][newID].staleSinceUnixNano) + + gracePeriod := time.Hour + expiredTS := c.deleteQueue[2].ts.Add(-2 * gracePeriod) + c.deleteQueue[0].ts = expiredTS + c.deleteQueue[1].ts = expiredTS + + c.deleteLoopProcessing(gracePeriod) assert.Contains(t, c.Pods, oldID) - assert.NotContains(t, c.Pods, newID) - assert.Empty(t, c.pendingStaleDeletes) + assert.Contains(t, c.Pods, newID) + assert.Contains(t, c.podIdentifiers["pod-uid"], oldID) + assert.Contains(t, c.podIdentifiers["pod-uid"], newID) + require.Len(t, c.deleteQueue, 1) + assert.Equal(t, oldID, c.deleteQueue[0].id) + + c.deleteLoopProcessing(0) + + assert.NotContains(t, c.Pods, oldID) + assert.Contains(t, c.Pods, newID) + assert.NotContains(t, c.podIdentifiers["pod-uid"], oldID) + assert.Contains(t, 
c.podIdentifiers["pod-uid"], newID) } func TestPodDeleteContainerIDAssociation(t *testing.T) { diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go index e1a0519e58fb8..7101d74319ca8 100644 --- a/processor/k8sattributesprocessor/internal/kube/kube.go +++ b/processor/k8sattributesprocessor/internal/kube/kube.go @@ -175,9 +175,6 @@ type deleteRequest struct { // contains uid of pod to remove from pods map podUID string ts time.Time - // cancelled is true when this delayed delete was created for a stale identifier, - // but the identifier became current again before the grace period expired. - cancelled bool } // Filters is used to instruct the client on how to filter out k8s pods. From 6d0535df9d302eec6ca8b19cf59cce4fd20240f0 Mon Sep 17 00:00:00 2001 From: Martin Levesque Date: Thu, 21 May 2026 08:55:38 +0000 Subject: [PATCH 6/9] chore: apply review + lint --- .../internal/kube/client.go | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 91b2d55e03c52..d1ff465de967f 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -400,7 +400,7 @@ func (c *WatchClient) Stop() { } func (c *WatchClient) handlePodAdd(obj any) { - podTableSize := 0 + var podTableSize int if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() { c.telemetryBuilder.OtelsvcK8sPodAdded.Add(context.Background(), 1) } @@ -422,7 +422,7 @@ func (c *WatchClient) handlePodAdd(obj any) { } func (c *WatchClient) handlePodUpdate(_, newPod any) { - podTableSize := 0 + var podTableSize int if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() { c.telemetryBuilder.OtelsvcK8sPodUpdated.Add(context.Background(), 1) } @@ -445,7 +445,7 @@ 
func (c *WatchClient) handlePodUpdate(_, newPod any) { } func (c *WatchClient) handlePodDelete(obj any) { - podTableSize := 0 + var podTableSize int if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() { c.telemetryBuilder.OtelsvcK8sPodDeleted.Add(context.Background(), 1) } @@ -759,6 +759,20 @@ func (c *WatchClient) deleteLoop(interval, gracePeriod time.Duration) { } } +// deleteLoopProcessing drains delete requests that are past the grace period. +// +// Delete requests come from two paths. +// - Pod updates queue a request when an +// identifier disappears from the current pod state +// - Pod deletes queue requests for every indexed identifier for that UID, +// including historical identifiers missing from the final pod object. +// +// podIdentifiers stores the per-identifier state used by both paths. Active +// identifiers have a zero staleSinceUnixNano. Identifiers pending deletion keep +// the same timestamp as their delete request ts. At delete time, the timestamps +// must match; otherwise the queued request is obsolete because the identifier +// became active again or entered a newer stale period before the grace period +// expired. 
func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) { var cutoff int now := time.Now() @@ -1650,10 +1664,9 @@ func (c *WatchClient) forgetPod(pod *api_v1.Pod) int { podToRemove := c.podFromAPI(pod) identifiers := c.getIdentifiersFromAssoc(podToRemove) podUID := string(pod.UID) - deleteRequests := make([]deleteRequest, 0) c.m.Lock() - deleteRequests = c.buildDeletionRequestsForPodLocked(podUID, identifiers) + deleteRequests := c.buildDeletionRequestsForPodLocked(podUID, identifiers) podTableSize := len(c.Pods) c.m.Unlock() From 5c04d0d6f0870543375f54b6de0140e32bdfa789 Mon Sep 17 00:00:00 2001 From: Martin Levesque Date: Thu, 21 May 2026 14:23:14 +0000 Subject: [PATCH 7/9] apply review from Gandem --- .../internal/kube/client.go | 21 +++++----------- .../internal/kube/client_test.go | 24 +++++++++---------- 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index d1ff465de967f..0352f1a4bdd9a 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -800,10 +800,7 @@ func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) { if identifierState.staleSinceUnixNano == 0 || identifierState.staleSinceUnixNano != d.ts.UnixNano() { continue } - delete(identifiers, d.id) - if len(identifiers) == 0 { - delete(c.podIdentifiers, d.podUID) - } + c.removePodIdentifierLocked(d.podUID, d.id) } if p, ok := c.Pods[d.id]; ok { // Sanity check: make sure we are deleting the same pod @@ -1731,13 +1728,13 @@ func (c *WatchClient) removePodIdentifierLocked(podUID string, id PodIdentifier) // buildDeletionRequestsForPodLocked gathers identifiers known for podUID, then builds delayed delete requests for them. 
func (c *WatchClient) buildDeletionRequestsForPodLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { - if podUID == "" || len(c.podIdentifiers[podUID]) == 0 { + indexedIdentifiers := c.podIdentifiers[podUID] + if podUID == "" || len(indexedIdentifiers) == 0 { // Pods without a UID are not tracked in the reverse index; preserve the existing identifier-based cleanup. return c.buildDeletionRequestsForIdentifiersLocked(podUID, identifiers) } // Use the reverse index so deletes include identifiers that disappeared from the final pod object. - indexedIdentifiers := c.podIdentifiers[podUID] identifiersToDelete := make([]PodIdentifier, 0, len(indexedIdentifiers)) seen := map[PodIdentifier]struct{}{} @@ -1768,13 +1765,14 @@ func (c *WatchClient) buildDeletionRequestsForPodLocked(podUID string, identifie func (c *WatchClient) buildDeletionRequestsForIdentifiersLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { requests := make([]deleteRequest, 0, len(identifiers)) now := time.Now() + indexedIdentifiers := c.podIdentifiers[podUID] for i := range identifiers { id := identifiers[i] p, ok := c.Pods[id] if !ok || p.PodUID != podUID { continue } - if indexedIdentifiers := c.podIdentifiers[podUID]; indexedIdentifiers != nil { + if indexedIdentifiers != nil { indexedIdentifiers[id] = podIdentifierState{staleSinceUnixNano: now.UnixNano()} } requests = append(requests, deleteRequest{id: id, podUID: podUID, ts: now}) @@ -1788,14 +1786,7 @@ func (c *WatchClient) appendDeleteRequests(requests []deleteRequest) { return } c.deleteMut.Lock() - now := time.Now() - for i := range requests { - request := requests[i] - if request.ts.IsZero() { - request.ts = now - } - c.deleteQueue = append(c.deleteQueue, request) - } + c.deleteQueue = append(c.deleteQueue, requests...) 
c.deleteMut.Unlock() } diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index b0bf4a986bf57..f6dcc64cbbaf5 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -57,10 +57,10 @@ func newPodIdentifier(from, name, value string) PodIdentifier { } } -func podWithContainerID(name, uid, containerID string, restartCount int32) *api_v1.Pod { +func podWithContainerID(uid, containerID string, restartCount int32) *api_v1.Pod { return &api_v1.Pod{ ObjectMeta: meta_v1.ObjectMeta{ - Name: name, + Name: "pod-a", UID: types.UID(uid), }, Status: api_v1.PodStatus{ @@ -471,13 +471,13 @@ func TestPodUpdateCleansStaleContainerIDAssociation(t *testing.T) { oldID := newPodIdentifier(ResourceSource, "container.id", "old-container-id") newID := newPodIdentifier(ResourceSource, "container.id", "new-container-id") - pod := podWithContainerID("pod-a", "pod-uid", "old-container-id", 0) + pod := podWithContainerID("pod-uid", "old-container-id", 0) c.handlePodAdd(pod) require.Contains(t, c.Pods, oldID) require.Len(t, c.Pods, 2) - updatedPod := podWithContainerID("pod-a", "pod-uid", "new-container-id", 1) + updatedPod := podWithContainerID("pod-uid", "new-container-id", 1) c.handlePodUpdate(pod, updatedPod) require.Contains(t, c.Pods, oldID) @@ -512,10 +512,10 @@ func TestPodUpdateCancelStaleIdentifierDeletion(t *testing.T) { oldID := newPodIdentifier(ResourceSource, "container.id", "old-container-id") newID := newPodIdentifier(ResourceSource, "container.id", "new-container-id") - pod := podWithContainerID("pod-a", "pod-uid", "old-container-id", 0) - updatedPod := podWithContainerID("pod-a", "pod-uid", "new-container-id", 1) - restoredPod := podWithContainerID("pod-a", "pod-uid", "old-container-id", 2) - secondUpdatedPod := podWithContainerID("pod-a", "pod-uid", "new-container-id", 3) + pod := 
podWithContainerID("pod-uid", "old-container-id", 0) + updatedPod := podWithContainerID("pod-uid", "new-container-id", 1) + restoredPod := podWithContainerID("pod-uid", "old-container-id", 2) + secondUpdatedPod := podWithContainerID("pod-uid", "new-container-id", 3) c.handlePodAdd(pod) c.handlePodUpdate(pod, updatedPod) @@ -575,8 +575,8 @@ func TestPodDeleteContainerIDAssociation(t *testing.T) { oldID := newPodIdentifier(ResourceSource, "container.id", "old-container-id") newID := newPodIdentifier(ResourceSource, "container.id", "new-container-id") - pod := podWithContainerID("pod-a", "pod-uid", "old-container-id", 0) - updatedPod := podWithContainerID("pod-a", "pod-uid", "new-container-id", 1) + pod := podWithContainerID("pod-uid", "old-container-id", 0) + updatedPod := podWithContainerID("pod-uid", "new-container-id", 1) c.handlePodAdd(pod) c.handlePodUpdate(pod, updatedPod) @@ -608,8 +608,8 @@ func TestPodUpdateWithoutUIDDoesNotUseReverseIndexCleanup(t *testing.T) { oldID := newPodIdentifier(ResourceSource, "container.id", "old-container-id") newID := newPodIdentifier(ResourceSource, "container.id", "new-container-id") - pod := podWithContainerID("pod-a", "", "old-container-id", 0) - updatedPod := podWithContainerID("pod-a", "", "new-container-id", 1) + pod := podWithContainerID("", "old-container-id", 0) + updatedPod := podWithContainerID("", "new-container-id", 1) c.handlePodAdd(pod) c.handlePodUpdate(pod, updatedPod) From 810533933a17689f0ea727cee8837ccbdf55e71d Mon Sep 17 00:00:00 2001 From: Martin Levesque Date: Thu, 21 May 2026 16:30:06 +0000 Subject: [PATCH 8/9] Apply Tyler's review --- .../k8sattributesprocessor/internal/kube/client.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index 0352f1a4bdd9a..ea1814bf1503c 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ 
b/processor/k8sattributesprocessor/internal/kube/client.go @@ -59,7 +59,7 @@ type WatchClient struct { Pods map[PodIdentifier]*Pod // podIdentifiers tracks the PodIdentifier keys associated with each pod UID so // identifiers that disappear on pod update can be cleaned after the delete grace period. - podIdentifiers map[string]map[PodIdentifier]podIdentifierState + podIdentifiers map[podUID]map[PodIdentifier]podIdentifierState Rules ExtractionRules Filters Filters Associations []Association @@ -109,9 +109,12 @@ const podTemplateHashLabel = "pod-template-hash" var errCannotRetrieveImage = errors.New("cannot retrieve image name") +type podUID = string + type podIdentifierState struct { // staleSinceUnixNano is zero when the identifier is currently active. - // Non-zero values identify since when the stale period represented by a delayed delete request started. + // Non-zero values identify when the identifier became stale. + // They must match the delete request timestamp so an old request cannot shorten a newer stale period. staleSinceUnixNano int64 } @@ -155,7 +158,7 @@ func New( } c.Pods = map[PodIdentifier]*Pod{} - c.podIdentifiers = map[string]map[PodIdentifier]podIdentifierState{} + c.podIdentifiers = map[podUID]map[PodIdentifier]podIdentifierState{} c.Namespaces = map[string]*Namespace{} c.Nodes = map[string]*Node{} c.ReplicaSets = map[string]*ReplicaSet{} @@ -1672,6 +1675,7 @@ func (c *WatchClient) forgetPod(pod *api_v1.Pod) int { } // markStalePodIdentifiersForDeletionLocked returns delete requests for indexed identifiers missing from the current pod state and marks them as stale by setting the staleSinceUnixNano field. +// The caller must hold c.m. 
func (c *WatchClient) markStalePodIdentifiersForDeletionLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { indexedIdentifiers := c.podIdentifiers[podUID] if len(indexedIdentifiers) == 0 { @@ -1702,6 +1706,7 @@ func (c *WatchClient) markStalePodIdentifiersForDeletionLocked(podUID string, id } // addPodIdentifierLocked records that id is currently associated with podUID. +// The caller must hold c.m. func (c *WatchClient) addPodIdentifierLocked(podUID string, id PodIdentifier) { if podUID == "" { return @@ -1715,6 +1720,7 @@ func (c *WatchClient) addPodIdentifierLocked(podUID string, id PodIdentifier) { } // removePodIdentifierLocked removes id from podUID's reverse index entry. +// The caller must hold c.m. func (c *WatchClient) removePodIdentifierLocked(podUID string, id PodIdentifier) { if podUID == "" { return @@ -1727,6 +1733,7 @@ func (c *WatchClient) removePodIdentifierLocked(podUID string, id PodIdentifier) } // buildDeletionRequestsForPodLocked gathers identifiers known for podUID, then builds delayed delete requests for them. +// The caller must hold c.m. func (c *WatchClient) buildDeletionRequestsForPodLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { indexedIdentifiers := c.podIdentifiers[podUID] if podUID == "" || len(indexedIdentifiers) == 0 { @@ -1762,6 +1769,7 @@ func (c *WatchClient) buildDeletionRequestsForPodLocked(podUID string, identifie } // buildDeletionRequestsForIdentifiersLocked converts identifiers owned by podUID into delayed delete requests. +// The caller must hold c.m. 
func (c *WatchClient) buildDeletionRequestsForIdentifiersLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { requests := make([]deleteRequest, 0, len(identifiers)) now := time.Now() From 13029dd373e80212b478a9fd83fa59102e980c5e Mon Sep 17 00:00:00 2001 From: Martin Levesque Date: Fri, 22 May 2026 08:51:52 +0000 Subject: [PATCH 9/9] feat: rollback to version without time marker --- .../internal/kube/client.go | 94 ++++++++----------- .../internal/kube/client_test.go | 76 ++------------- 2 files changed, 49 insertions(+), 121 deletions(-) diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go index ea1814bf1503c..ba68eec379599 100644 --- a/processor/k8sattributesprocessor/internal/kube/client.go +++ b/processor/k8sattributesprocessor/internal/kube/client.go @@ -59,7 +59,7 @@ type WatchClient struct { Pods map[PodIdentifier]*Pod // podIdentifiers tracks the PodIdentifier keys associated with each pod UID so // identifiers that disappear on pod update can be cleaned after the delete grace period. - podIdentifiers map[podUID]map[PodIdentifier]podIdentifierState + podIdentifiers map[podUID]map[PodIdentifier]struct{} Rules ExtractionRules Filters Filters Associations []Association @@ -111,13 +111,6 @@ var errCannotRetrieveImage = errors.New("cannot retrieve image name") type podUID = string -type podIdentifierState struct { - // staleSinceUnixNano is zero when the identifier is currently active. - // Non-zero values identify when the identifier became stale. - // They must match the delete request timestamp so an old request cannot shorten a newer stale period. 
- staleSinceUnixNano int64 -} - type InformersFactoryList struct { newInformer InformerProvider newNamespaceInformer InformerProviderNamespace @@ -158,7 +151,7 @@ func New( } c.Pods = map[PodIdentifier]*Pod{} - c.podIdentifiers = map[podUID]map[PodIdentifier]podIdentifierState{} + c.podIdentifiers = map[podUID]map[PodIdentifier]struct{}{} c.Namespaces = map[string]*Namespace{} c.Nodes = map[string]*Node{} c.ReplicaSets = map[string]*ReplicaSet{} @@ -762,20 +755,9 @@ func (c *WatchClient) deleteLoop(interval, gracePeriod time.Duration) { } } -// deleteLoopProcessing drains delete requests that are past the grace period. -// -// Delete requests come from two paths. -// - Pod updates queue a request when an -// identifier disappears from the current pod state -// - Pod deletes queue requests for every indexed identifier for that UID, -// including historical identifiers missing from the final pod object. -// -// podIdentifiers stores the per-identifier state used by both paths. Active -// identifiers have a zero staleSinceUnixNano. Identifiers pending deletion keep -// the same timestamp as their delete request ts. At delete time, the timestamps -// must match; otherwise the queued request is obsolete because the identifier -// became active again or entered a newer stale period before the grace period -// expired. +// deleteLoopProcessing removes pod associations from Pods and podIdentifiers once +// their gracePeriod has elapsed. +// Note: if a podIdentifier is deleted, re-added and deleted again within the gracePeriod, the gracePeriod might not be respected. 
func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) { var cutoff int now := time.Now() @@ -795,24 +777,12 @@ func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) { deleted := false for i := range toDelete { d := toDelete[i] - identifiers := c.podIdentifiers[d.podUID] - identifierState, indexed := identifiers[d.id] - if indexed { - // staleSinceUnixNano == 0 means the identifier is currently active. - // Otherwise, we check if the stale period is identical to the delete request timestamp in order to avoid deleting the identifier prematurely. - if identifierState.staleSinceUnixNano == 0 || identifierState.staleSinceUnixNano != d.ts.UnixNano() { - continue - } - c.removePodIdentifierLocked(d.podUID, d.id) - } if p, ok := c.Pods[d.id]; ok { // Sanity check: make sure we are deleting the same pod // and the underlying state (ip<>pod mapping) has not changed. if p.PodUID == d.podUID { delete(c.Pods, d.id) - if !indexed { - c.removePodIdentifierLocked(d.podUID, d.id) - } + c.removePodIdentifierLocked(d.podUID, d.id) deleted = true } } @@ -1674,7 +1644,7 @@ func (c *WatchClient) forgetPod(pod *api_v1.Pod) int { return podTableSize } -// markStalePodIdentifiersForDeletionLocked returns delete requests for indexed identifiers missing from the current pod state and marks them as stale by setting the staleSinceUnixNano field. +// markStalePodIdentifiersForDeletionLocked returns delete requests for indexed identifiers missing from the current pod state. // The caller must hold c.m. 
func (c *WatchClient) markStalePodIdentifiersForDeletionLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { indexedIdentifiers := c.podIdentifiers[podUID] @@ -1687,19 +1657,12 @@ func (c *WatchClient) markStalePodIdentifiersForDeletionLocked(podUID string, id currentIdentifiers[identifiers[i]] = struct{}{} } staleIdentifiers := make([]deleteRequest, 0) - now := time.Now() for id := range indexedIdentifiers { if _, ok := currentIdentifiers[id]; ok { continue } if p, ok := c.Pods[id]; ok && p.PodUID == podUID { - identifierState := indexedIdentifiers[id] - if identifierState.staleSinceUnixNano != 0 { - continue - } - identifierState.staleSinceUnixNano = now.UnixNano() - indexedIdentifiers[id] = identifierState - staleIdentifiers = append(staleIdentifiers, deleteRequest{id: id, podUID: podUID, ts: now}) + staleIdentifiers = append(staleIdentifiers, deleteRequest{id: id, podUID: podUID}) } } return staleIdentifiers @@ -1713,10 +1676,10 @@ func (c *WatchClient) addPodIdentifierLocked(podUID string, id PodIdentifier) { } identifiers := c.podIdentifiers[podUID] if identifiers == nil { - identifiers = map[PodIdentifier]podIdentifierState{} + identifiers = map[PodIdentifier]struct{}{} c.podIdentifiers[podUID] = identifiers } - identifiers[id] = podIdentifierState{} + identifiers[id] = struct{}{} } // removePodIdentifierLocked removes id from podUID's reverse index entry. @@ -1772,18 +1735,18 @@ func (c *WatchClient) buildDeletionRequestsForPodLocked(podUID string, identifie // The caller must hold c.m. 
func (c *WatchClient) buildDeletionRequestsForIdentifiersLocked(podUID string, identifiers []PodIdentifier) []deleteRequest { requests := make([]deleteRequest, 0, len(identifiers)) - now := time.Now() - indexedIdentifiers := c.podIdentifiers[podUID] + seen := make(map[PodIdentifier]struct{}, len(identifiers)) for i := range identifiers { id := identifiers[i] + if _, ok := seen[id]; ok { + continue + } + seen[id] = struct{}{} p, ok := c.Pods[id] if !ok || p.PodUID != podUID { continue } - if indexedIdentifiers != nil { - indexedIdentifiers[id] = podIdentifierState{staleSinceUnixNano: now.UnixNano()} - } - requests = append(requests, deleteRequest{id: id, podUID: podUID, ts: now}) + requests = append(requests, deleteRequest{id: id, podUID: podUID}) } return requests } @@ -1794,10 +1757,33 @@ func (c *WatchClient) appendDeleteRequests(requests []deleteRequest) { return } c.deleteMut.Lock() - c.deleteQueue = append(c.deleteQueue, requests...) + now := time.Now() + for i := range requests { + request := requests[i] + // Stale identifiers can be observed on many pod updates before the delete grace period expires. + // Avoid queueing the same delayed delete repeatedly in that path. + if c.isDeleteQueuedLocked(request.id, request.podUID) { + continue + } + c.deleteQueue = append(c.deleteQueue, deleteRequest{ + id: request.id, + podUID: request.podUID, + ts: now, + }) + } c.deleteMut.Unlock() } +// isDeleteQueuedLocked reports whether podID is already waiting for delayed deletion. 
+func (c *WatchClient) isDeleteQueuedLocked(podID PodIdentifier, podUID string) bool { + for i := range c.deleteQueue { + if c.deleteQueue[i].id == podID && c.deleteQueue[i].podUID == podUID { + return true + } + } + return false +} + func (c *WatchClient) podTableSize() int { c.m.RLock() defer c.m.RUnlock() diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go index f6dcc64cbbaf5..d100605ff8817 100644 --- a/processor/k8sattributesprocessor/internal/kube/client_test.go +++ b/processor/k8sattributesprocessor/internal/kube/client_test.go @@ -500,69 +500,6 @@ func TestPodUpdateCleansStaleContainerIDAssociation(t *testing.T) { assert.Contains(t, c.podIdentifiers["pod-uid"], newID) } -func TestPodUpdateCancelStaleIdentifierDeletion(t *testing.T) { - c, _ := newTestClient(t) - c.Rules = ExtractionRules{ContainerID: true} - c.Associations = []Association{{ - Sources: []AssociationSource{{ - From: ResourceSource, - Name: "container.id", - }}, - }} - - oldID := newPodIdentifier(ResourceSource, "container.id", "old-container-id") - newID := newPodIdentifier(ResourceSource, "container.id", "new-container-id") - pod := podWithContainerID("pod-uid", "old-container-id", 0) - updatedPod := podWithContainerID("pod-uid", "new-container-id", 1) - restoredPod := podWithContainerID("pod-uid", "old-container-id", 2) - secondUpdatedPod := podWithContainerID("pod-uid", "new-container-id", 3) - - c.handlePodAdd(pod) - c.handlePodUpdate(pod, updatedPod) - - require.Len(t, c.deleteQueue, 1) - assert.Equal(t, oldID, c.deleteQueue[0].id) - assert.NotZero(t, c.podIdentifiers["pod-uid"][oldID].staleSinceUnixNano) - - c.handlePodUpdate(updatedPod, restoredPod) - - require.Len(t, c.deleteQueue, 2) - assert.Equal(t, oldID, c.deleteQueue[0].id) - assert.Equal(t, newID, c.deleteQueue[1].id) - assert.Zero(t, c.podIdentifiers["pod-uid"][oldID].staleSinceUnixNano) - assert.NotZero(t, 
c.podIdentifiers["pod-uid"][newID].staleSinceUnixNano) - - c.handlePodUpdate(restoredPod, secondUpdatedPod) - - require.Len(t, c.deleteQueue, 3) - assert.Equal(t, oldID, c.deleteQueue[0].id) - assert.Equal(t, newID, c.deleteQueue[1].id) - assert.Equal(t, oldID, c.deleteQueue[2].id) - assert.NotZero(t, c.podIdentifiers["pod-uid"][oldID].staleSinceUnixNano) - assert.Zero(t, c.podIdentifiers["pod-uid"][newID].staleSinceUnixNano) - - gracePeriod := time.Hour - expiredTS := c.deleteQueue[2].ts.Add(-2 * gracePeriod) - c.deleteQueue[0].ts = expiredTS - c.deleteQueue[1].ts = expiredTS - - c.deleteLoopProcessing(gracePeriod) - - assert.Contains(t, c.Pods, oldID) - assert.Contains(t, c.Pods, newID) - assert.Contains(t, c.podIdentifiers["pod-uid"], oldID) - assert.Contains(t, c.podIdentifiers["pod-uid"], newID) - require.Len(t, c.deleteQueue, 1) - assert.Equal(t, oldID, c.deleteQueue[0].id) - - c.deleteLoopProcessing(0) - - assert.NotContains(t, c.Pods, oldID) - assert.Contains(t, c.Pods, newID) - assert.NotContains(t, c.podIdentifiers["pod-uid"], oldID) - assert.Contains(t, c.podIdentifiers["pod-uid"], newID) -} - func TestPodDeleteContainerIDAssociation(t *testing.T) { c, _ := newTestClient(t) c.Rules = ExtractionRules{ContainerID: true} @@ -703,7 +640,7 @@ func TestPodDelete(t *testing.T) { tsBeforeDelete := time.Now() c.handlePodDelete(pod) assert.Len(t, c.Pods, 5) - assert.Len(t, c.deleteQueue, 3) + assert.Len(t, c.deleteQueue, 2) deleteRequest := c.deleteQueue[0] assert.Equal(t, newPodIdentifier("connection", "k8s.pod.ip", "1.1.1.1"), deleteRequest.id) assert.False(t, deleteRequest.ts.Before(tsBeforeDelete)) @@ -718,7 +655,7 @@ func TestPodDelete(t *testing.T) { tsBeforeDelete = time.Now() c.handlePodDelete(cache.DeletedFinalStateUnknown{Obj: pod}) assert.Len(t, c.Pods, 5) - assert.Len(t, c.deleteQueue, 5) + assert.Len(t, c.deleteQueue, 3) deleteRequest = c.deleteQueue[0] assert.Equal(t, newPodIdentifier("connection", "k8s.pod.ip", "2.2.2.2"), deleteRequest.id) 
assert.Equal(t, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", deleteRequest.podUID) @@ -729,6 +666,11 @@ func TestPodDelete(t *testing.T) { assert.Equal(t, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", deleteRequest.podUID) assert.False(t, deleteRequest.ts.Before(tsBeforeDelete)) assert.False(t, deleteRequest.ts.After(time.Now())) + deleteRequest = c.deleteQueue[2] + assert.Equal(t, newPodIdentifier("resource_attribute", "k8s.pod.ip", "2.2.2.2"), deleteRequest.id) + assert.Equal(t, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", deleteRequest.podUID) + assert.False(t, deleteRequest.ts.Before(tsBeforeDelete)) + assert.False(t, deleteRequest.ts.After(time.Now())) } func TestNamespaceDelete(t *testing.T) { @@ -813,7 +755,7 @@ func TestDeleteLoop(t *testing.T) { c.handlePodDelete(pod) assert.Len(t, c.Pods, 2) - assert.Len(t, c.deleteQueue, 3) + assert.Len(t, c.deleteQueue, 2) gracePeriod := time.Millisecond * 500 go c.deleteLoop(time.Millisecond, gracePeriod) @@ -823,7 +765,7 @@ func TestDeleteLoop(t *testing.T) { assert.Len(t, c.Pods, 2) c.m.Unlock() c.deleteMut.Lock() - assert.Len(t, c.deleteQueue, 3) + assert.Len(t, c.deleteQueue, 2) c.deleteMut.Unlock() time.Sleep(gracePeriod + (time.Millisecond * 50))