Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .chloggen/fix-4074-initialize-scrape-target-labels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
component: target allocator

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Initialize scrape target labels the same way Prometheus does to prevent hash inconsistencies.

# One or more tracking issues related to the change
issues: [4074]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Target labels are now populated with scrape config defaults (job, __metrics_path__, __scheme__,
__scrape_interval__, __scrape_timeout__) before hashing, matching Prometheus's PopulateDiscoveredLabels
behavior. This eliminates hash collisions for targets with the same address but different scrape
configurations.
2 changes: 1 addition & 1 deletion cmd/otel-allocator/internal/prehook/relabel.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (tf *relabelConfigTargetFilter) Apply(targets []*target.Item) []*target.Ite
if keepTarget {
// Compute hash immediately while we have the builder, skipping meta labels.
// This avoids materializing the filtered labels.
hash := target.HashFromBuilder(builder, tItem.JobName)
hash := target.HashFromBuilder(builder)
targets[writeIndex] = target.NewItem(
tItem.JobName,
tItem.TargetURL,
Expand Down
2 changes: 1 addition & 1 deletion cmd/otel-allocator/internal/prehook/relabel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func MakeTargetFromProm(rCfgs []*relabel.Config, rawTarget *target.Item) (*targe
}

// Compute the hash from the builder, skipping meta labels
hash := target.HashFromBuilder(lb, rawTarget.JobName)
hash := target.HashFromBuilder(lb)
newTarget := target.NewItem(
rawTarget.JobName,
rawTarget.TargetURL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ <h1>Collector: test-collector2</h1>
<a href="/debug/job?job_id=test-job">test-job</a>
</td>
<td style="vertical-align: top;">
<a href="/debug/target?target_hash=14478377932637680246">test-url</a>
<a href="/debug/target?target_hash=4436829824815111996">test-url</a>
</td>
<td style="vertical-align: top;">

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ <h1>Collector: test-collector2</h1>
<a href="/debug/job?job_id=test-job">test-job</a>
</td>
<td style="vertical-align: top;">
<a href="/debug/target?target_hash=14478377932637680246">test-url</a>
<a href="/debug/target?target_hash=4436829824815111996">test-url</a>
</td>
<td style="vertical-align: top;">

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ <h1>OpenTelemetry Target Allocator</h1>
<a href="/debug/targets">Targets</a>
</td>
<td style="vertical-align: top;">
3
2
</td>
</tr>
</table>
Expand Down Expand Up @@ -76,10 +76,10 @@ <h1>OpenTelemetry Target Allocator</h1>
<a href="/debug/collector?collector_id=test-collector2">test-collector2</a>
</td>
<td style="vertical-align: top;">
1
0
</td>
<td style="vertical-align: top;">
1
0
</td>
</tr>
</table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ <h1>OpenTelemetry Target Allocator - Targets</h1>
</thead>
<tr style="background: #eee">
<td style="vertical-align: top;">
<a href="/debug/job?job_id=test-job2">test-job2</a>
<a href="/debug/job?job_id=test-job">test-job</a>
</td>
<td style="vertical-align: top;">
<a href="/debug/target?target_hash=7138104773299337929">test-url3</a>
<a href="/debug/target?target_hash=4436829824815111996">test-url</a>
</td>
<td style="vertical-align: top;">
<a href="/debug/collector?collector_id=test-collector1">test-collector1</a>
Expand All @@ -45,24 +45,10 @@ <h1>OpenTelemetry Target Allocator - Targets</h1>
</tr>
<tr >
<td style="vertical-align: top;">
<a href="/debug/job?job_id=test-job">test-job</a>
</td>
<td style="vertical-align: top;">
<a href="/debug/target?target_hash=9613545242644869502">test-url2</a>
</td>
<td style="vertical-align: top;">
<a href="/debug/collector?collector_id=test-collector2">test-collector2</a>
</td>
<td style="vertical-align: top;">

</td>
</tr>
<tr style="background: #eee">
<td style="vertical-align: top;">
<a href="/debug/job?job_id=test-job">test-job</a>
<a href="/debug/job?job_id=test-job2">test-job2</a>
</td>
<td style="vertical-align: top;">
<a href="/debug/target?target_hash=14478377932637680246">test-url</a>
<a href="/debug/target?target_hash=15568808877678109432">test-url3</a>
</td>
<td style="vertical-align: top;">
<a href="/debug/collector?collector_id=test-collector1">test-collector1</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ <h1>OpenTelemetry Target Allocator - Targets</h1>
<a href="/debug/job?job_id=test-job">test-job</a>
</td>
<td style="vertical-align: top;">
<a href="/debug/target?target_hash=14478377932637680246">test-url</a>
<a href="/debug/target?target_hash=4436829824815111996">test-url</a>
</td>
<td style="vertical-align: top;">
<a href="/debug/collector?collector_id=test-collector1">test-collector1</a>
Expand Down
142 changes: 105 additions & 37 deletions cmd/otel-allocator/internal/target/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"context"
"hash"
"hash/fnv"
"maps"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -36,6 +34,7 @@ type Discoverer struct {
close chan struct{}
mtxScrape sync.Mutex // Guards the fields below.
configsMap map[allocatorWatcher.EventSource][]*promconfig.ScrapeConfig
jobToScrapeConfig map[string]*promconfig.ScrapeConfig
hook discoveryHook
scrapeConfigsHash hash.Hash
scrapeConfigsUpdater scrapeConfigsUpdater
Expand Down Expand Up @@ -109,7 +108,7 @@ func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, scrapeConf
// If the hash has changed, updated stored hash and send the new config.
// Otherwise, skip updating scrape configs.
if m.scrapeConfigsUpdater != nil && m.scrapeConfigsHash != hash {
err := m.scrapeConfigsUpdater.UpdateScrapeConfigResponse(jobToScrapeConfig)
err = m.scrapeConfigsUpdater.UpdateScrapeConfigResponse(jobToScrapeConfig)
if err != nil {
return err
}
Expand All @@ -120,7 +119,20 @@ func (m *Discoverer) ApplyConfig(source allocatorWatcher.EventSource, scrapeConf
if m.hook != nil {
m.hook.SetConfig(relabelCfg)
}
return m.manager.ApplyConfig(discoveryCfg)

err = m.manager.ApplyConfig(discoveryCfg)
if err != nil {
return err
}

// Store scrape configs only after all operations succeed to avoid
// inconsistent state on partial failure. Guard with mtxScrape since
// Reload() reads this field concurrently under the same lock.
m.mtxScrape.Lock()
m.jobToScrapeConfig = jobToScrapeConfig
m.mtxScrape.Unlock()

return nil
}

func (m *Discoverer) Run() error {
Expand Down Expand Up @@ -184,14 +196,16 @@ func (m *Discoverer) Reload() {
}
targets := make([]*Item, targetCount)

jobToScrapeConfig := m.jobToScrapeConfig
targetsAssigned := 0
for jobName, groups := range m.targetSets {
wg.Add(1)
scrapeConfig := jobToScrapeConfig[jobName]
// Run the sync in parallel as these take a while and at high load can't catch up.
go func(jobName string, groups []*targetgroup.Group, intoTargets []*Item) {
m.processTargetGroups(jobName, groups, intoTargets)
go func(jobName string, groups []*targetgroup.Group, intoTargets []*Item, cfg *promconfig.ScrapeConfig) {
m.processTargetGroups(jobName, groups, intoTargets, cfg)
wg.Done()
}(jobName, groups, targets[targetsAssigned:])
}(jobName, groups, targets[targetsAssigned:], scrapeConfig)
for _, group := range groups {
targetsAssigned += len(group.Targets)
}
Expand All @@ -201,13 +215,15 @@ func (m *Discoverer) Reload() {
m.processTargetsCallBack(targets)
}

// processTargetGroups processes the target groups and returns a map of targets.
func (m *Discoverer) processTargetGroups(jobName string, groups []*targetgroup.Group, intoTargets []*Item) {
// the builder for group labels
// processTargetGroups processes the target groups and populates labels the same way
// Prometheus does (via scrape.PopulateDiscoveredLabels) before creating target items.
// This ensures the label set includes scrape config defaults (job, metrics_path,
// scheme, scrape_interval, scrape_timeout) so that target hashes are consistent
// with what Prometheus computes.
func (m *Discoverer) processTargetGroups(jobName string, groups []*targetgroup.Group, intoTargets []*Item, cfg *promconfig.ScrapeConfig) {
lb := labels.NewBuilder(labels.EmptyLabels())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i believe we had this in the past, but it became a major performance issue, have you run any performance benchmarks to ensure we don't have a regression?

groupBuilder := labels.NewScratchBuilder(labelBuilderPreallocSize)

// a slice for sorting target label names, we allocate it here to avoid doing it in the hot loop
targetLabelNames := make([]string, 0, labelBuilderPreallocSize)
defaults := newDiscoveredLabelDefaults(cfg)

begin := time.Now()
defer func() {
Expand All @@ -216,39 +232,91 @@ func (m *Discoverer) processTargetGroups(jobName string, groups []*targetgroup.G
var count float64
index := 0
for _, tg := range groups {
groupBuilder.Reset()
for ln, lv := range tg.Labels {
groupBuilder.Add(string(ln), string(lv))
}
groupBuilder.Sort()
groupLabels := populateGroupLabels(&groupBuilder, tg.Labels)
for _, t := range tg.Targets {
count++
// ScratchBuilder is a struct containing a slice of labels. By assigning to a new variable, we get a copy
// of the struct, with a new slice pointing to the same underlying array. As long as we don't mutate the
// original slice and only append to it, we can avoid copying the group labels.
targetBuilder := groupBuilder
targetLabelNames = targetLabelNames[:0]

// We can't sort the whole builder slice, because that would modify the underlying groupBuilder. Instead,
// we sort the labels in a separate slice. As a result, the group labels and the target labels are sorted
// subslices of the builder slice, which is in itself not sorted. This is fine, as we don't care what the
// order of labels is - just that it's consistent, so the hash is always the same.
for ln := range maps.Keys(t) {
targetLabelNames = append(targetLabelNames, string(ln))
}
slices.Sort(targetLabelNames)
for _, ln := range targetLabelNames {
lv := t[model.LabelName(ln)]
targetBuilder.Add(ln, string(lv))
}
item := NewItem(jobName, string(t[model.AddressLabel]), targetBuilder.Labels(), "")
lset := populateDiscoveredLabels(lb, defaults, t, groupLabels)
item := NewItem(jobName, string(t[model.AddressLabel]), lset, "")
intoTargets[index] = item
index++
}
}
m.targetsDiscovered.Record(context.Background(), count, metric.WithAttributes(attribute.String("job.name", jobName)))
}

func populateGroupLabels(groupBuilder *labels.ScratchBuilder, tgLabels model.LabelSet) labels.Labels {
groupBuilder.Reset()
for ln, lv := range tgLabels {
if lv != "" {
groupBuilder.Add(string(ln), string(lv))
}
}
groupBuilder.Sort()
return groupBuilder.Labels()
}

type discoveredLabelDefaults struct {
jobName string
scrapeInterval string
scrapeTimeout string
metricsPath string
scheme string
params []labels.Label
}

func newDiscoveredLabelDefaults(cfg *promconfig.ScrapeConfig) discoveredLabelDefaults {
defaults := discoveredLabelDefaults{
jobName: cfg.JobName,
scrapeInterval: cfg.ScrapeInterval.String(),
scrapeTimeout: cfg.ScrapeTimeout.String(),
metricsPath: cfg.MetricsPath,
scheme: cfg.Scheme,
}

if len(cfg.Params) > 0 {
defaults.params = make([]labels.Label, 0, len(cfg.Params))
for k, v := range cfg.Params {
if len(v) > 0 {
defaults.params = append(defaults.params, labels.Label{
Name: model.ParamLabelPrefix + k,
Value: v[0],
})
}
}
}

return defaults
}

// populateDiscoveredLabels matches scrape.PopulateDiscoveredLabels. Keep this
// local hot-path helper in sync with Prometheus and covered by the comparison
// test, because calling the Prometheus helper directly repeats group-label and
// scrape-default work for every target.
func populateDiscoveredLabels(lb *labels.Builder, defaults discoveredLabelDefaults, tLabels model.LabelSet, groupLabels labels.Labels) labels.Labels {
lb.Reset(groupLabels)
for ln, lv := range tLabels {
lb.Set(string(ln), string(lv))
}

setDefaultLabel(lb, model.JobLabel, defaults.jobName)
setDefaultLabel(lb, model.ScrapeIntervalLabel, defaults.scrapeInterval)
setDefaultLabel(lb, model.ScrapeTimeoutLabel, defaults.scrapeTimeout)
setDefaultLabel(lb, model.MetricsPathLabel, defaults.metricsPath)
setDefaultLabel(lb, model.SchemeLabel, defaults.scheme)

for _, param := range defaults.params {
setDefaultLabel(lb, param.Name, param.Value)
}

return lb.Labels()
}

func setDefaultLabel(lb *labels.Builder, name, value string) {
if lb.Get(name) == "" {
lb.Set(name, value)
}
}

// Run receives and saves target set updates and triggers the scraping loops reloading.
// Reloading happens in the background so that it doesn't block receiving targets updates.
func (m *Discoverer) run(tsets <-chan map[string][]*targetgroup.Group) error {
Expand Down
Loading
Loading