Skip to content
27 changes: 27 additions & 0 deletions .chloggen/fix-k8sattributes-stale-container-ids.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: processor/k8s_attributes

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Prevent unbounded memory growth by cleaning up stale pod identifiers, including container.id entries left behind after container restarts

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [48398]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
209 changes: 184 additions & 25 deletions processor/k8sattributesprocessor/internal/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ type WatchClient struct {

// A map containing Pod related data, used to associate them with resources.
// Key can be either an IP address or Pod UID
Pods map[PodIdentifier]*Pod
Rules ExtractionRules
Filters Filters
Associations []Association
Exclude Excludes
Pods map[PodIdentifier]*Pod
// podIdentifiers tracks the PodIdentifier keys associated with each pod UID so
// identifiers that disappear on pod update can be cleaned after the delete grace period.
podIdentifiers map[podUID]map[PodIdentifier]struct{}
Rules ExtractionRules
Filters Filters
Associations []Association
Exclude Excludes

// A map containing Namespace related data, used to associate them with resources.
// Key is namespace name
Expand Down Expand Up @@ -106,6 +109,8 @@ const podTemplateHashLabel = "pod-template-hash"

var errCannotRetrieveImage = errors.New("cannot retrieve image name")

type podUID = string

type InformersFactoryList struct {
newInformer InformerProvider
newNamespaceInformer InformerProviderNamespace
Expand Down Expand Up @@ -146,6 +151,7 @@ func New(
}

c.Pods = map[PodIdentifier]*Pod{}
c.podIdentifiers = map[podUID]map[PodIdentifier]struct{}{}
c.Namespaces = map[string]*Namespace{}
c.Nodes = map[string]*Node{}
c.ReplicaSets = map[string]*ReplicaSet{}
Expand Down Expand Up @@ -390,18 +396,19 @@ func (c *WatchClient) Stop() {
}

func (c *WatchClient) handlePodAdd(obj any) {
var podTableSize int
if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() {
c.telemetryBuilder.OtelsvcK8sPodAdded.Add(context.Background(), 1)
}
if metadata.ProcessorK8sattributesTelemetryEnableNewFormatMetricsFeatureGate.IsEnabled() {
c.telemetryBuilder.K8sWatcherPodAdded.Add(context.Background(), 1)
}
if pod, ok := obj.(*api_v1.Pod); ok {
c.addOrUpdatePod(pod)
podTableSize = c.addOrUpdatePod(pod)
} else {
c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj))
podTableSize = c.podTableSize()
}
podTableSize := len(c.Pods)
if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() {
c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize))
}
Expand All @@ -411,6 +418,7 @@ func (c *WatchClient) handlePodAdd(obj any) {
}

func (c *WatchClient) handlePodUpdate(_, newPod any) {
var podTableSize int
if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() {
c.telemetryBuilder.OtelsvcK8sPodUpdated.Add(context.Background(), 1)
}
Expand All @@ -419,11 +427,11 @@ func (c *WatchClient) handlePodUpdate(_, newPod any) {
}
if pod, ok := newPod.(*api_v1.Pod); ok {
// TODO: update or remove based on whether container is ready/unready?.
c.addOrUpdatePod(pod)
podTableSize = c.addOrUpdatePod(pod)
} else {
c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", newPod))
podTableSize = c.podTableSize()
}
podTableSize := len(c.Pods)
if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() {
c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize))
}
Expand All @@ -433,18 +441,19 @@ func (c *WatchClient) handlePodUpdate(_, newPod any) {
}

func (c *WatchClient) handlePodDelete(obj any) {
var podTableSize int
if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() {
c.telemetryBuilder.OtelsvcK8sPodDeleted.Add(context.Background(), 1)
}
if metadata.ProcessorK8sattributesTelemetryEnableNewFormatMetricsFeatureGate.IsEnabled() {
c.telemetryBuilder.K8sWatcherPodDeleted.Add(context.Background(), 1)
}
if pod, ok := ignoreDeletedFinalStateUnknown(obj).(*api_v1.Pod); ok {
c.forgetPod(pod)
podTableSize = c.forgetPod(pod)
} else {
c.logger.Error("object received was not of type api_v1.Pod", zap.Any("received", obj))
podTableSize = c.podTableSize()
}
podTableSize := len(c.Pods)
if !metadata.ProcessorK8sattributesTelemetryDisableOldFormatMetricsFeatureGate.IsEnabled() {
c.telemetryBuilder.OtelsvcK8sPodTableSize.Record(context.Background(), int64(podTableSize))
}
Expand Down Expand Up @@ -746,6 +755,9 @@ func (c *WatchClient) deleteLoop(interval, gracePeriod time.Duration) {
}
}

// deleteLoopProcessing removes pod associations from Pods and podIdentifiers once
// their gracePeriod as elapsed.
// Note: if a podIdentifier is deleted, re-added and deleted again within the gracePeriod, the gracePeriod might not be respected.
func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) {
var cutoff int
now := time.Now()
Expand All @@ -770,6 +782,7 @@ func (c *WatchClient) deleteLoopProcessing(gracePeriod time.Duration) {
// and the underlying state (ip<>pod mapping) has not changed.
if p.PodUID == d.podUID {
delete(c.Pods, d.id)
c.removePodIdentifierLocked(d.podUID, d.id)
deleted = true
}
}
Expand Down Expand Up @@ -1586,13 +1599,15 @@ func (c *WatchClient) getIdentifiersFromAssoc(pod *Pod) []PodIdentifier {
return ids
}

func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) int {
newPod := c.podFromAPI(pod)
identifiers := c.getIdentifiersFromAssoc(newPod)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call use to happen inside the lock. Is it safe to move it outside?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this should be safe. getIdentifiersFromAssoc only reads c.Associations from the WatchClient, and that config is not mutated at runtime (only used here). Everything else it uses comes from the local newPod.

staleIdentifiers := make([]deleteRequest, 0)

c.m.Lock()
defer c.m.Unlock()

identifiers := c.getIdentifiersFromAssoc(newPod)
if newPod.PodUID != "" {
staleIdentifiers = c.markStalePodIdentifiersForDeletionLocked(newPod.PodUID, identifiers)
}
for i := range identifiers {
id := identifiers[i]
// compare initial scheduled timestamp for existing pod and new pod with same identifier
Expand All @@ -1603,34 +1618,178 @@ func (c *WatchClient) addOrUpdatePod(pod *api_v1.Pod) {
if pod.Status.StartTime.Before(p.StartTime) {
continue
}
c.removePodIdentifierLocked(p.PodUID, id)
}
c.Pods[id] = newPod
c.addPodIdentifierLocked(newPod.PodUID, id)
}
podTableSize := len(c.Pods)
c.m.Unlock()

c.appendDeleteRequests(staleIdentifiers)
return podTableSize
}

func (c *WatchClient) forgetPod(pod *api_v1.Pod) {
func (c *WatchClient) forgetPod(pod *api_v1.Pod) int {
podToRemove := c.podFromAPI(pod)
identifiers := c.getIdentifiersFromAssoc(podToRemove)
podUID := string(pod.UID)

c.m.Lock()
deleteRequests := c.buildDeletionRequestsForPodLocked(podUID, identifiers)
podTableSize := len(c.Pods)
c.m.Unlock()

c.appendDeleteRequests(deleteRequests)
return podTableSize
}

// markStalePodIdentifiersForDeletionLocked returns delete requests for indexed identifiers missing from the current pod state.
// The caller must hold c.m.
func (c *WatchClient) markStalePodIdentifiersForDeletionLocked(podUID string, identifiers []PodIdentifier) []deleteRequest {
indexedIdentifiers := c.podIdentifiers[podUID]
if len(indexedIdentifiers) == 0 {
return nil
}
// Construct a set of current identifiers in order to diff against the indexed identifiers.
currentIdentifiers := make(map[PodIdentifier]struct{}, len(identifiers))
for i := range identifiers {
currentIdentifiers[identifiers[i]] = struct{}{}
}
staleIdentifiers := make([]deleteRequest, 0)
for id := range indexedIdentifiers {
if _, ok := currentIdentifiers[id]; ok {
continue
}
if p, ok := c.Pods[id]; ok && p.PodUID == podUID {
staleIdentifiers = append(staleIdentifiers, deleteRequest{id: id, podUID: podUID})
}
}
return staleIdentifiers
}

// addPodIdentifierLocked records that id is currently associated with podUID.
// The caller must hold c.m.
func (c *WatchClient) addPodIdentifierLocked(podUID string, id PodIdentifier) {
Comment thread
TylerHelmuth marked this conversation as resolved.
if podUID == "" {
return
}
identifiers := c.podIdentifiers[podUID]
if identifiers == nil {
identifiers = map[PodIdentifier]struct{}{}
c.podIdentifiers[podUID] = identifiers
}
identifiers[id] = struct{}{}
}

// removePodIdentifierLocked removes id from podUID's reverse index entry.
// The caller must hold c.m.
func (c *WatchClient) removePodIdentifierLocked(podUID string, id PodIdentifier) {
if podUID == "" {
return
}
identifiers := c.podIdentifiers[podUID]
delete(identifiers, id)
if len(identifiers) == 0 {
delete(c.podIdentifiers, podUID)
}
}

// buildDeletionRequestsForPodLocked gathers identifiers known for podUID, then builds delayed delete requests for them.
// The caller must hold c.m.
func (c *WatchClient) buildDeletionRequestsForPodLocked(podUID string, identifiers []PodIdentifier) []deleteRequest {
indexedIdentifiers := c.podIdentifiers[podUID]
if podUID == "" || len(indexedIdentifiers) == 0 {
// Pods without a UID are not tracked in the reverse index; preserve the existing identifier-based cleanup.
return c.buildDeletionRequestsForIdentifiersLocked(podUID, identifiers)
}

// Use the reverse index so deletes include identifiers that disappeared from the final pod object.
identifiersToDelete := make([]PodIdentifier, 0, len(indexedIdentifiers))
seen := map[PodIdentifier]struct{}{}

// Prefer identifiers recomputed from the delete event.
for i := range identifiers {
id := identifiers[i]
p, ok := c.GetPod(id)
if _, ok := indexedIdentifiers[id]; !ok {
continue
}
if p, ok := c.Pods[id]; ok && p.PodUID == podUID {
identifiersToDelete = append(identifiersToDelete, id)
seen[id] = struct{}{}
}
}
// Include indexed identifiers that are missing from the delete event, such as historical container IDs.
for id := range indexedIdentifiers {
if _, ok := seen[id]; ok {
continue
}
if p, ok := c.Pods[id]; ok && p.PodUID == podUID {
identifiersToDelete = append(identifiersToDelete, id)
}
}
return c.buildDeletionRequestsForIdentifiersLocked(podUID, identifiersToDelete)
}

if ok && p.PodUID == string(pod.UID) {
c.appendDeleteQueue(id, p.PodUID)
// buildDeletionRequestsForIdentifiersLocked converts identifiers owned by podUID into delayed delete requests.
// The caller must hold c.m.
func (c *WatchClient) buildDeletionRequestsForIdentifiersLocked(podUID string, identifiers []PodIdentifier) []deleteRequest {
requests := make([]deleteRequest, 0, len(identifiers))
seen := make(map[PodIdentifier]struct{}, len(identifiers))
for i := range identifiers {
id := identifiers[i]
if _, ok := seen[id]; ok {
continue
}
seen[id] = struct{}{}
p, ok := c.Pods[id]
if !ok || p.PodUID != podUID {
continue
}
requests = append(requests, deleteRequest{id: id, podUID: podUID})
}
return requests
}

func (c *WatchClient) appendDeleteQueue(podID PodIdentifier, podUID string) {
// appendDeleteRequests appends requests to the delayed delete queue.
func (c *WatchClient) appendDeleteRequests(requests []deleteRequest) {
if len(requests) == 0 {
return
}
c.deleteMut.Lock()
c.deleteQueue = append(c.deleteQueue, deleteRequest{
id: podID,
podUID: podUID,
ts: time.Now(),
})
now := time.Now()
for i := range requests {
request := requests[i]
// Stale identifiers can be observed on many pod updates before the delete grace period expires.
// Avoid queueing the same delayed delete repeatedly in that path.
if c.isDeleteQueuedLocked(request.id, request.podUID) {
continue
}
c.deleteQueue = append(c.deleteQueue, deleteRequest{
id: request.id,
podUID: request.podUID,
ts: now,
})
}
c.deleteMut.Unlock()
}

// isDeleteQueuedLocked reports whether podID is already waiting for delayed deletion.
func (c *WatchClient) isDeleteQueuedLocked(podID PodIdentifier, podUID string) bool {
for i := range c.deleteQueue {
if c.deleteQueue[i].id == podID && c.deleteQueue[i].podUID == podUID {
return true
}
}
return false
}

func (c *WatchClient) podTableSize() int {
c.m.RLock()
defer c.m.RUnlock()
return len(c.Pods)
}

func (c *WatchClient) shouldIgnorePod(pod *api_v1.Pod) bool {
// Check if user requested the pod to be ignored through annotations
if v, ok := pod.Annotations[ignoreAnnotation]; ok {
Expand Down
Loading
Loading