diff --git a/attribute/set.go b/attribute/set.go index 1db0ed0e376..4fe47e1dc0a 100644 --- a/attribute/set.go +++ b/attribute/set.go @@ -211,6 +211,47 @@ func NewSet(kvs ...KeyValue) Set { return s } +// NewDistinctWithFilter returns a Distinct identifier for the filtered attribute set, +// and the sorted and de-duplicated slice of attributes. It modifies the input slice +// in-place to sort and de-duplicate the attributes. +// +// The returned Distinct represents the equivalence class of the attribute set after +// the filter is applied. The returned slice contains all unique attributes, including +// those that did not pass the filter. +func NewDistinctWithFilter(kvs []KeyValue, filter Filter) (Distinct, []KeyValue) { + if len(kvs) == 0 { + return Distinct{hash: emptyHash}, kvs + } + + // Stable sort so the following de-duplication can implement + // last-value-wins semantics. + slices.SortStableFunc(kvs, func(a, b KeyValue) int { + return cmp.Compare(a.Key, b.Key) + }) + + position := len(kvs) - 1 + offset := position - 1 + + // De-duplicate with last-value-wins semantics. + for ; offset >= 0; offset-- { + if kvs[offset].Key == kvs[position].Key { + continue + } + position-- + kvs[offset], kvs[position] = kvs[position], kvs[offset] + } + kvs = kvs[position:] + + h := xxhash.New() + for _, kv := range kvs { + if filter == nil || filter(kv) { + h = hashKV(h, kv) + } + } + + return Distinct{hash: h.Sum64()}, kvs +} + // NewSetWithSortable returns a new Set. See the documentation for // NewSetWithSortableFiltered for more details. // diff --git a/attribute/set_test.go b/attribute/set_test.go index 512bbd573ff..df4947be096 100644 --- a/attribute/set_test.go +++ b/attribute/set_test.go @@ -254,6 +254,42 @@ func TestFiltering(t *testing.T) { } } +func TestNewDistinctWithFilter(t *testing.T) { + kvs := []attribute.KeyValue{ + attribute.String("B", "1"), + attribute.String("A", "1"), + attribute.String("C", "1"), + attribute.String("B", "2"), // Duplicate key, should win over B=1 + attribute.String("D", "1"), + } + // Filter to keep A, B, and D. Drop C. + filter := func(kv attribute.KeyValue) bool { + return kv.Key != "C" + } + + // Create a copy since NewDistinctWithFilter modifies in-place. + input := make([]attribute.KeyValue, len(kvs)) + copy(input, kvs) + + distinct, compacted := attribute.NewDistinctWithFilter(input, filter) + + // Verify the returned slice is correctly sorted and de-duplicated. + // Note: the filter applies to the hash computation, NOT the physical slice returned. + require.Len(t, compacted, 4) + assert.Equal(t, "A", string(compacted[0].Key)) + assert.Equal(t, "B", string(compacted[1].Key)) + assert.Equal(t, "2", compacted[1].Value.AsString()) // Last value wins + assert.Equal(t, "C", string(compacted[2].Key)) + assert.Equal(t, "D", string(compacted[3].Key)) + + // Verify the computed Distinct matches the baseline from NewSetWithFiltered. + input2 := make([]attribute.KeyValue, len(kvs)) + copy(input2, kvs) + expectedSet, _ := attribute.NewSetWithFiltered(input2, filter) + + assert.Equal(t, expectedSet.Equivalent(), distinct) +} + func TestUniqueness(t *testing.T) { short := []attribute.KeyValue{ attribute.String("A", "0"), diff --git a/internal/tools/go.mod b/internal/tools/go.mod index 19923ed3b03..8647a8d2078 100644 --- a/internal/tools/go.mod +++ b/internal/tools/go.mod @@ -153,7 +153,7 @@ require ( github.com/maratori/testpackage v1.1.2 // indirect github.com/matoous/godox v1.1.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect - github.com/mattn/go-isatty v0.0.21 // indirect + github.com/mattn/go-isatty v0.0.22 // indirect github.com/mattn/go-runewidth v0.0.23 // indirect github.com/mgechev/revive v1.15.0 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect diff --git a/internal/tools/go.sum b/internal/tools/go.sum index d4e7b9e023b..756d2944d72 100644 --- a/internal/tools/go.sum +++ b/internal/tools/go.sum @@ -340,8 +340,8 @@ github.com/matryer/is v1.4.0 h1:sosSmIWwkYITGrxZ25ULNDeKiMNzFSr4V/eqBQP0PeE= github.com/matryer/is v1.4.0/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= -github.com/mattn/go-isatty v0.0.21 h1:xYae+lCNBP7QuW4PUnNG61ffM4hVIfm+zUzDuSzYLGs= -github.com/mattn/go-isatty v0.0.21/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4= +github.com/mattn/go-isatty v0.0.22 h1:j8l17JJ9i6VGPUFUYoTUKPSgKe/83EYU2zBC7YNKMw4= +github.com/mattn/go-isatty v0.0.22/go.mod h1:ZXfXG4SQHsB/w3ZeOYbR0PrPwLy+n6xiMrJlRFqopa4= github.com/mattn/go-runewidth v0.0.23 h1:7ykA0T0jkPpzSvMS5i9uoNn2Xy3R383f9HDx3RybWcw= github.com/mattn/go-runewidth v0.0.23/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= github.com/mgechev/revive v1.15.0 h1:vJ0HzSBzfNyPbHKolgiFjHxLek9KUijhqh42yGoqZ8Q= diff --git a/metric/x/bound.go b/metric/x/bound.go new file mode 100644 index 00000000000..be4b3c7f842 --- /dev/null +++ b/metric/x/bound.go @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x // import "go.opentelemetry.io/otel/metric/x" + +import ( + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +// Float64Binder is an interface that can be implemented by instruments that support +// binding attributes ahead of time. +type Float64Binder interface { + // Bind returns a metric.Float64Counter for the given attributes. + // The returned counter is bound to the attributes and should be optimized + // for performance by avoiding map lookups on every Add call. + Bind(attrs ...attribute.KeyValue) metric.Float64Counter +} + +// Int64Binder is an interface that can be implemented by instruments that support +// binding attributes ahead of time. +type Int64Binder interface { + // Bind returns a metric.Int64Counter for the given attributes. + // The returned counter is bound to the attributes and should be optimized + // for performance by avoiding map lookups on every Add call. + Bind(attrs ...attribute.KeyValue) metric.Int64Counter +} diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index faaa368c7af..4ec6b4ae27f 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -9,6 +9,7 @@ import ( "runtime" "strconv" "sync" + "sync/atomic" "testing" "github.com/stretchr/testify/assert" @@ -536,9 +537,9 @@ func newRM(a metricdata.Aggregation) *metricdata.ResourceMetrics { // - In the "Naive" case, the user uses the API and SDK in the simplest and // most obvious way without applying any performance optimizations. func BenchmarkEndToEndCounterAdd(b *testing.B) { - testCounter := func(b *testing.B, mp metric.MeterProvider) metric.Float64Counter { + testCounter := func(b *testing.B, mp metric.MeterProvider) metric.Int64Counter { meter := mp.Meter("BenchmarkEndToEndCounterAdd") - counter, err := meter.Float64Counter("test.counter") + counter, err := meter.Int64Counter("test.counter") assert.NoError(b, err) return counter } @@ -551,178 +552,142 @@ func BenchmarkEndToEndCounterAdd(b *testing.B) { }, } ctx := b.Context() - for _, mp := range []struct { - name string - provider func() metric.MeterProvider + for _, temp := range []struct { + name string + temporality metricdata.Temporality }{ - { - name: "NoFilter", - provider: func() metric.MeterProvider { - return NewMeterProvider( - WithReader(NewManualReader()), - WithExemplarFilter(exemplar.AlwaysOffFilter), - ) - }, - }, - { - name: "Filtered", - provider: func() metric.MeterProvider { - view := NewView( - Instrument{ - Name: "test.counter", - }, - // Filter out one attribute from each call. - Stream{AttributeFilter: attribute.NewDenyKeysFilter("a")}, - ) - return NewMeterProvider( - WithView(view), - WithReader(NewManualReader()), - WithExemplarFilter(exemplar.AlwaysOffFilter), - ) - }, - }, + {"Cumulative", metricdata.CumulativeTemporality}, + {"Delta", metricdata.DeltaTemporality}, } { - b.Run(mp.name, func(b *testing.B) { - for _, attrsLen := range []int{1, 5, 10} { - attrPool := sync.Pool{ - New: func() any { - // Pre-allocate common capacity - s := make([]attribute.KeyValue, 0, attrsLen) - // Return a pointer to avoid extra allocation on Put(). - return &s + b.Run(temp.name, func(b *testing.B) { + for _, mp := range []struct { + name string + provider func() metric.MeterProvider + }{ + { + name: "NoFilter", + provider: func() metric.MeterProvider { + return NewMeterProvider( + WithReader(NewManualReader(WithTemporalitySelector(func(InstrumentKind) metricdata.Temporality { + return temp.temporality + }))), + WithExemplarFilter(exemplar.AlwaysOffFilter), + ) }, - } - b.Run(fmt.Sprintf("Attributes/%d", attrsLen), func(b *testing.B) { - // This case shows the performance of our API + SDK when - // following our contributor guidance for recording - // cached attributes by passing attribute.Set: - // https://github.com/open-telemetry/opentelemetry-go/blob/main/CONTRIBUTING.md#cache-common-attribute-sets-for-repeated-measurements - b.Run("Precomputed/WithAttributeSet", func(b *testing.B) { - counter := testCounter(b, mp.provider()) - precomputedOpts := []metric.AddOption{ - metric.WithAttributeSet(attribute.NewSet(attributes(attrsLen)...)), - } - b.ReportAllocs() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - counter.Add(ctx, 1, precomputedOpts...) - } - }) - }) - // This case shows the performance of our API + SDK when - // following our contributor guidance for recording - // cached attributes by passing []attribute.KeyValue: - // https://github.com/open-telemetry/opentelemetry-go/blob/main/CONTRIBUTING.md#cache-common-attribute-sets-for-repeated-measurements - b.Run("Precomputed/WithAttributes", func(b *testing.B) { - counter := testCounter(b, mp.provider()) - precomputedOpts := []metric.AddOption{metric.WithAttributes(attributes(attrsLen)...)} - b.ReportAllocs() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - counter.Add(ctx, 1, precomputedOpts...) - } - }) - }) - // This case shows the performance of our API + SDK when - // following our contributor guidance for recording - // varying attributes by passing attribute.Set: - // https://github.com/open-telemetry/opentelemetry-go/blob/main/CONTRIBUTING.md#attribute-and-option-allocation-management - b.Run("Dynamic/WithAttributeSet", func(b *testing.B) { - counter := testCounter(b, mp.provider()) - b.ReportAllocs() - optionPool := sync.Pool{ + }, + } { + b.Run(mp.name, func(b *testing.B) { + for _, attrsLen := range []int{1, 10} { + attrPool := sync.Pool{ New: func() any { - return metric.WithAttributeSet(*attribute.EmptySet()) + // Pre-allocate common capacity + s := make([]attribute.KeyValue, 0, attrsLen) + // Return a pointer to avoid extra allocation on Put(). + return &s }, } - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - // Wrap in a function so we can use defer. - func() { - attrsSlice := attrPool.Get().(*[]attribute.KeyValue) - defer func() { - *attrsSlice = (*attrsSlice)[:0] // Reset. - attrPool.Put(attrsSlice) - }() - *attrsSlice = appendAttributes(*attrsSlice, attrsLen) - addOpt := addOptPool.Get().(*[]metric.AddOption) - defer func() { - *addOpt = (*addOpt)[:0] - addOptPool.Put(addOpt) - }() - - set := attribute.NewSet(*attrsSlice...) - opt := optionPool.Get().(metric.MeasurementOption) - defer optionPool.Put(opt) - - if s, ok := opt.(x.Settable[attribute.Set]); ok { - s.Set(set) - } else { - opt = metric.WithAttributeSet(set) + b.Run(fmt.Sprintf("Attributes/%d", attrsLen), func(b *testing.B) { + // This case shows the performance of using a pre-bound instrument. + b.Run("Precomputed/Bound", func(b *testing.B) { + counter := testCounter(b, mp.provider()) + if binder, ok := counter.(x.Int64Binder); ok { + bound := binder.Bind(attributes(attrsLen)...) + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + bound.Add(ctx, 1) + } + }) + } else { + b.Fatal("counter does not implement x.Binder") + } + }) + // This case shows the performance of our API + SDK when + // following our contributor guidance for recording + // cached attributes by passing attribute.Set: + // https://github.com/open-telemetry/opentelemetry-go/blob/main/CONTRIBUTING.md#cache-common-attribute-sets-for-repeated-measurements + b.Run("Precomputed/WithAttributeSet", func(b *testing.B) { + counter := testCounter(b, mp.provider()) + precomputedOpts := []metric.AddOption{ + metric.WithAttributeSet(attribute.NewSet(attributes(attrsLen)...)), + } + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + counter.Add(ctx, 1, precomputedOpts...) } - - *addOpt = append(*addOpt, opt.(metric.AddOption)) - counter.Add(ctx, 1, *addOpt...) - }() - } - }) - }) - // This case shows the performance of our API + SDK when - // following our contributor guidance for recording - // varying attributes by passing []attribute.KeyValue: - // https://github.com/open-telemetry/opentelemetry-go/blob/main/CONTRIBUTING.md#attribute-and-option-allocation-management - b.Run("Dynamic/WithAttributes", func(b *testing.B) { - counter := testCounter(b, mp.provider()) - b.ReportAllocs() - optionPool := sync.Pool{ - New: func() any { - return metric.WithAttributes() - }, - } - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - // Wrap in a function so we can use defer. - func() { - attrsSlice := attrPool.Get().(*[]attribute.KeyValue) - defer func() { - *attrsSlice = (*attrsSlice)[:0] // Reset. - attrPool.Put(attrsSlice) - }() - *attrsSlice = appendAttributes(*attrsSlice, attrsLen) - addOpt := addOptPool.Get().(*[]metric.AddOption) - defer func() { - *addOpt = (*addOpt)[:0] - addOptPool.Put(addOpt) - }() - - set := attribute.NewSet(*attrsSlice...) - opt := optionPool.Get().(metric.MeasurementOption) - defer optionPool.Put(opt) - - if s, ok := opt.(x.Settable[attribute.Set]); ok { - s.Set(set) - } else { - opt = metric.WithAttributes(*attrsSlice...) + }) + }) + // This case shows the performance of re-binding the instrument each time. + b.Run("Dynamic/Bound", func(b *testing.B) { + counter := testCounter(b, mp.provider()) + if binder, ok := counter.(x.Int64Binder); ok { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Wrap in a function so we can use defer. + func() { + attrsSlice := attrPool.Get().(*[]attribute.KeyValue) + defer func() { + *attrsSlice = (*attrsSlice)[:0] // Reset. + attrPool.Put(attrsSlice) + }() + *attrsSlice = appendAttributes(*attrsSlice, attrsLen) + bound := binder.Bind(*attrsSlice...) + bound.Add(ctx, 1) + }() + } + }) + } else { + b.Fatal("counter does not implement x.Binder") + } + }) + // This case shows the performance of our API + SDK when + // following our contributor guidance for recording + // varying attributes by passing attribute.Set: + // https://github.com/open-telemetry/opentelemetry-go/blob/main/CONTRIBUTING.md#attribute-and-option-allocation-management + b.Run("Dynamic/WithAttributeSet", func(b *testing.B) { + counter := testCounter(b, mp.provider()) + b.ReportAllocs() + optionPool := sync.Pool{ + New: func() any { + return metric.WithAttributeSet(*attribute.EmptySet()) + }, + } + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + // Wrap in a function so we can use defer. + func() { + attrsSlice := attrPool.Get().(*[]attribute.KeyValue) + defer func() { + *attrsSlice = (*attrsSlice)[:0] // Reset. + attrPool.Put(attrsSlice) + }() + *attrsSlice = appendAttributes(*attrsSlice, attrsLen) + addOpt := addOptPool.Get().(*[]metric.AddOption) + defer func() { + *addOpt = (*addOpt)[:0] + addOptPool.Put(addOpt) + }() + + set := attribute.NewSet(*attrsSlice...) + opt := optionPool.Get().(metric.MeasurementOption) + defer optionPool.Put(opt) + + if s, ok := opt.(x.Settable[attribute.Set]); ok { + s.Set(set) + } else { + opt = metric.WithAttributeSet(set) + } + + *addOpt = append(*addOpt, opt.(metric.AddOption)) + counter.Add(ctx, 1, *addOpt...) + }() } - - *addOpt = append(*addOpt, opt.(metric.AddOption)) - counter.Add(ctx, 1, *addOpt...) - }() - } - }) - }) - // This case shows the performance of our API + SDK when - // users use it in the "obvious" way, without explicitly - // trying to optimize for performance. - b.Run("Naive/WithAttributes", func(b *testing.B) { - counter := testCounter(b, mp.provider()) - b.ReportAllocs() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - counter.Add(ctx, 1, metric.WithAttributes(attributes(attrsLen)...)) - } + }) + }) }) - }) + } }) } }) @@ -889,3 +854,15 @@ func BenchmarkMeasureNewAttributeSet(b *testing.B) { }) } } + +// BenchmarkIdealAtomicCounter measures the performance of just incrementing +// an atomic counter in parallel, to establish a baseline for contention. +func BenchmarkIdealAtomicCounter(b *testing.B) { + var count atomic.Int64 + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + count.Add(1) + } + }) +} diff --git a/sdk/metric/bound_test.go b/sdk/metric/bound_test.go new file mode 100644 index 00000000000..7c604641bfe --- /dev/null +++ b/sdk/metric/bound_test.go @@ -0,0 +1,244 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package metric + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/x" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +func TestBoundInstrumentFloat64(t *testing.T) { + attrs := []attribute.KeyValue{attribute.String("K", "V")} + set := attribute.NewSet(attrs...) + + // Test bound instrument (Cumulative) + t.Run("Bound/Cumulative", func(t *testing.T) { + r := NewManualReader() + mp := NewMeterProvider(WithReader(r)) + meter := mp.Meter("test") + + counter, err := meter.Float64Counter("test.counter") + require.NoError(t, err) + + binder, ok := counter.(x.Float64Binder) + require.True(t, ok, "counter does not implement x.Float64Binder") + + bound := binder.Bind(attrs...) + bound.Add(t.Context(), 1) + + var rm metricdata.ResourceMetrics + err = r.Collect(t.Context(), &rm) + require.NoError(t, err) + + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 1) + m := rm.ScopeMetrics[0].Metrics[0] + assert.Equal(t, "test.counter", m.Name) + + sum, ok := m.Data.(metricdata.Sum[float64]) + require.True(t, ok) + require.Len(t, sum.DataPoints, 1) + dp := sum.DataPoints[0] + assert.Equal(t, float64(1), dp.Value) + assert.Equal(t, set, dp.Attributes) + }) + + // Test bound instrument (Delta) + t.Run("Bound/Delta", func(t *testing.T) { + r := NewManualReader(WithTemporalitySelector(func(InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + })) + mp := NewMeterProvider(WithReader(r)) + meter := mp.Meter("test") + + counter, err := meter.Float64Counter("test.counter") + require.NoError(t, err) + + binder, ok := counter.(x.Float64Binder) + require.True(t, ok, "counter does not implement x.Float64Binder") + + bound := binder.Bind(attrs...) + bound.Add(t.Context(), 1) + + var rm metricdata.ResourceMetrics + err = r.Collect(t.Context(), &rm) + require.NoError(t, err) + + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 1) + sum := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[float64]) + assert.Equal(t, float64(1), sum.DataPoints[0].Value) + + // Record again on the bound instrument! + bound.Add(t.Context(), 2) + + // Collect again. The value should be 2 (Delta!), not 3 (Cumulative!). + err = r.Collect(t.Context(), &rm) + require.NoError(t, err) + + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 1) + sum = rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[float64]) + assert.Equal(t, float64(2), sum.DataPoints[0].Value) + }) +} + +func TestBoundInstrumentInt64(t *testing.T) { + attrs := []attribute.KeyValue{attribute.String("K", "V")} + set := attribute.NewSet(attrs...) + + // Test bound instrument (Cumulative) + t.Run("Bound/Cumulative", func(t *testing.T) { + r := NewManualReader() + mp := NewMeterProvider(WithReader(r)) + meter := mp.Meter("test") + + counter, err := meter.Int64Counter("test.counter") + require.NoError(t, err) + + binder, ok := counter.(x.Int64Binder) + require.True(t, ok, "counter does not implement x.Int64Binder") + + bound := binder.Bind(attrs...) + bound.Add(t.Context(), 1) + + var rm metricdata.ResourceMetrics + err = r.Collect(t.Context(), &rm) + require.NoError(t, err) + + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 1) + m := rm.ScopeMetrics[0].Metrics[0] + assert.Equal(t, "test.counter", m.Name) + + sum, ok := m.Data.(metricdata.Sum[int64]) + require.True(t, ok) + require.Len(t, sum.DataPoints, 1) + dp := sum.DataPoints[0] + assert.Equal(t, int64(1), dp.Value) + assert.Equal(t, set, dp.Attributes) + }) + + // Test bound instrument (Delta) + t.Run("Bound/Delta", func(t *testing.T) { + r := NewManualReader(WithTemporalitySelector(func(InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + })) + mp := NewMeterProvider(WithReader(r)) + meter := mp.Meter("test") + + counter, err := meter.Int64Counter("test.counter") + require.NoError(t, err) + + binder, ok := counter.(x.Int64Binder) + require.True(t, ok, "counter does not implement x.Int64Binder") + + bound := binder.Bind(attrs...) + bound.Add(t.Context(), 1) + + var rm metricdata.ResourceMetrics + err = r.Collect(t.Context(), &rm) + require.NoError(t, err) + + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 1) + sum := rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]) + assert.Equal(t, int64(1), sum.DataPoints[0].Value) + + // Record again on the bound instrument! + bound.Add(t.Context(), 2) + + // Collect again. The value should be 2 (Delta!), not 3 (Cumulative!). + err = r.Collect(t.Context(), &rm) + require.NoError(t, err) + + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 1) + sum = rm.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]) + assert.Equal(t, int64(2), sum.DataPoints[0].Value) + }) +} + +func TestBoundInstrumentMultipleReaders(t *testing.T) { + attrs := []attribute.KeyValue{attribute.String("K", "V")} + r1 := NewManualReader() + r2 := NewManualReader() + mp := NewMeterProvider(WithReader(r1), WithReader(r2)) + meter := mp.Meter("test") + + counter, err := meter.Int64Counter("test.counter") + require.NoError(t, err) + + binder, ok := counter.(x.Int64Binder) + require.True(t, ok, "counter does not implement x.Int64Binder") + + // This triggers the "slow path" in Bind since len(aggregators) > 1 + bound := binder.Bind(attrs...) + bound.Add(t.Context(), 1) + + // Verify Reader 1 + var rm1 metricdata.ResourceMetrics + require.NoError(t, r1.Collect(t.Context(), &rm1)) + require.Len(t, rm1.ScopeMetrics, 1) + sum1 := rm1.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]) + assert.Equal(t, int64(1), sum1.DataPoints[0].Value) + + // Verify Reader 2 + var rm2 metricdata.ResourceMetrics + require.NoError(t, r2.Collect(t.Context(), &rm2)) + require.Len(t, rm2.ScopeMetrics, 1) + sum2 := rm2.ScopeMetrics[0].Metrics[0].Data.(metricdata.Sum[int64]) + assert.Equal(t, int64(1), sum2.DataPoints[0].Value) +} + +func TestBoundInstrumentDeltaConcurrency(t *testing.T) { + r := NewManualReader(WithTemporalitySelector(func(InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + })) + mp := NewMeterProvider(WithReader(r)) + meter := mp.Meter("test") + + counter, err := meter.Int64Counter("test.counter") + require.NoError(t, err) + + binder, ok := counter.(x.Int64Binder) + require.True(t, ok, "counter does not implement x.Int64Binder") + + bound := binder.Bind(attribute.String("K", "V")) + + // Number of goroutines and operations per goroutine + numWorkers := 10 + opsPerWorker := 1000 + + var wg sync.WaitGroup + wg.Add(numWorkers) + + ctx := t.Context() + for range numWorkers { + go func() { + defer wg.Done() + for j := range opsPerWorker { + bound.Add(ctx, 1) + // Occasionally collect to trigger Delta map clears and cycle pointer resets + if j%100 == 0 { + var rm metricdata.ResourceMetrics + _ = r.Collect(ctx, &rm) + } + } + }() + } + + wg.Wait() + + // Final collection to ensure no panic occurs during cleanup. + var rm metricdata.ResourceMetrics + require.NoError(t, r.Collect(ctx, &rm)) +} diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index b0805255926..01ba2f8241f 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -177,7 +177,8 @@ func (i instID) normalize() instID { } type int64Inst struct { - measures []aggregate.Measure[int64] + measures []aggregate.Measure[int64] + aggregators []any // Used to coordinate with experimental bound instruments embedded.Int64Counter embedded.Int64UpDownCounter @@ -217,7 +218,8 @@ func (i *int64Inst) aggregate( } type float64Inst struct { - measures []aggregate.Measure[float64] + measures []aggregate.Measure[float64] + aggregators []any // Used to coordinate with experimental bound instruments embedded.Float64Counter embedded.Float64UpDownCounter @@ -363,3 +365,95 @@ func (o *observable[N]) registerable(m *meter) error { } return nil } + +type float64MeasureBinder interface { + LookupBoundMeasure([]attribute.KeyValue) metric.Float64Counter +} + +type int64MeasureBinder interface { + LookupBoundMeasureInt64([]attribute.KeyValue) metric.Int64Counter +} + +// boundFloat64Counter implements metric.Float64Counter using resolved measures. +// This is the slow-path wrapper for instruments with multiple aggregators. +type boundFloat64Counter struct { + embedded.Float64Counter + measures []metric.Float64Counter +} + +func (b *boundFloat64Counter) Add(ctx context.Context, val float64, opts ...metric.AddOption) { + for _, m := range b.measures { + m.Add(ctx, val, opts...) + } +} + +func (*boundFloat64Counter) Enabled(_ context.Context) bool { + return true +} + +// boundInt64Counter implements metric.Int64Counter using resolved measures. +// This is the slow-path wrapper for instruments with multiple aggregators. +type boundInt64Counter struct { + embedded.Int64Counter + measures []metric.Int64Counter +} + +func (b *boundInt64Counter) Add(ctx context.Context, val int64, opts ...metric.AddOption) { + for _, m := range b.measures { + m.Add(ctx, val, opts...) + } +} + +func (*boundInt64Counter) Enabled(_ context.Context) bool { + return true +} + +// Bind implements x.Binder for float64Inst. +func (i *float64Inst) Bind(attrs ...attribute.KeyValue) metric.Float64Counter { + // Fast path: if there is only one aggregator (common case), we can avoid + // allocating the measures slice and the boundFloat64Counter wrapper. + if len(i.aggregators) == 1 { + if b, ok := i.aggregators[0].(float64MeasureBinder); ok { + if m := b.LookupBoundMeasure(attrs); m != nil { + return m + } + } + } + + // Slow path: multiple aggregators. + var measures []metric.Float64Counter + for _, agg := range i.aggregators { + if b, ok := agg.(float64MeasureBinder); ok { + m := b.LookupBoundMeasure(attrs) + if m != nil { + measures = append(measures, m) + } + } + } + return &boundFloat64Counter{measures: measures} +} + +// Bind implements x.Binder for int64Inst. +func (i *int64Inst) Bind(attrs ...attribute.KeyValue) metric.Int64Counter { + // Fast path: if there is only one aggregator (common case), we can avoid + // allocating the measures slice and the boundInt64Counter wrapper. + if len(i.aggregators) == 1 { + if b, ok := i.aggregators[0].(int64MeasureBinder); ok { + if m := b.LookupBoundMeasureInt64(attrs); m != nil { + return m + } + } + } + + // Slow path: multiple aggregators. + var measures []metric.Int64Counter + for _, agg := range i.aggregators { + if b, ok := agg.(int64MeasureBinder); ok { + m := b.LookupBoundMeasureInt64(attrs) + if m != nil { + measures = append(measures, m) + } + } + } + return &boundInt64Counter{measures: measures} +} diff --git a/sdk/metric/instrument_test.go b/sdk/metric/instrument_test.go index 52d39b753f9..8e7c754dbb6 100644 --- a/sdk/metric/instrument_test.go +++ b/sdk/metric/instrument_test.go @@ -33,11 +33,11 @@ func BenchmarkInstrument(b *testing.B) { meas = append(meas, in) build.Temporality = metricdata.CumulativeTemporality - in, _ = build.Sum(true) + in, _, _ = build.Sum(true) meas = append(meas, in) build.Temporality = metricdata.DeltaTemporality - in, _ = build.Sum(true) + in, _, _ = build.Sum(true) meas = append(meas, in) inst := int64Inst{measures: meas} @@ -58,11 +58,11 @@ func BenchmarkInstrument(b *testing.B) { meas = append(meas, in) build.Temporality = metricdata.CumulativeTemporality - in, _ = build.Sum(true) + in, _, _ = build.Sum(true) meas = append(meas, in) build.Temporality = metricdata.DeltaTemporality - in, _ = build.Sum(true) + in, _, _ = build.Sum(true) meas = append(meas, in) o := observable[int64]{measures: meas} diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index a35f3bcc1bb..b62db7cf04e 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -109,15 +109,15 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati } } -// Sum returns a sum aggregate function input and output. -func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) { +// Sum returns a sum aggregate function input and output, and the aggregator instance. +func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation, any) { switch b.Temporality { case metricdata.DeltaTemporality: s := newDeltaSum[N](monotonic, b.AggregationLimit, b.resFunc()) - return b.filter(s.measure), s.collect + return b.filter(s.measure), s.collect, s default: s := newCumulativeSum[N](monotonic, b.AggregationLimit, b.resFunc()) - return b.filter(s.measure), s.collect + return b.filter(s.measure), s.collect, s } } diff --git a/sdk/metric/internal/aggregate/atomic.go b/sdk/metric/internal/aggregate/atomic.go index eb69e965079..ab1d40d48eb 100644 --- a/sdk/metric/internal/aggregate/atomic.go +++ b/sdk/metric/internal/aggregate/atomic.go @@ -261,6 +261,10 @@ func (m *limitedSyncMap) LoadOrStoreAttr(fltrAttr attribute.Set, newValue func(a return actual } +func (m *limitedSyncMap) LoadByDistinct(d attribute.Distinct) (any, bool) { + return m.Load(d) +} + func (m *limitedSyncMap) Clear() { m.lenMux.Lock() defer m.lenMux.Unlock() diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index 312d73c4575..767b1d6dc7d 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -26,8 +26,9 @@ const ( // expoHistogramDataPoint is a single data point in an exponential histogram. type expoHistogramDataPoint[N int64 | float64] struct { - attrs attribute.Set - res FilteredExemplarReservoir[N] + attrs attribute.Set + res FilteredExemplarReservoir[N] + dropExemplars bool minMax atomicMinMax[N] sum atomicCounter[N] @@ -349,13 +350,18 @@ func (e *expoHistogram[N]) measure( v, ok = e.values[fltrAttr.Equivalent()] if !ok { v = newExpoHistogramDataPoint[N](fltrAttr, e.maxSize, e.maxScale, e.noMinMax, e.noSum) - v.res = e.newRes(fltrAttr) + r := e.newRes(fltrAttr) + _, isDrop := r.(*dropRes[N]) + v.res = r + v.dropExemplars = isDrop e.values[fltrAttr.Equivalent()] = v } } v.record(value) - v.res.Offer(ctx, value, droppedAttr) + if !v.dropExemplars { + v.res.Offer(ctx, value, droppedAttr) + } } func (e *expoHistogram[N]) delta( diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 83582c670cd..50c82ff32f9 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -17,8 +17,9 @@ import ( // histogramPoint is a single histogram point, used in delta aggregations. type histogramPoint[N int64 | float64] struct { - attrs attribute.Set - res FilteredExemplarReservoir[N] + attrs attribute.Set + res FilteredExemplarReservoir[N] + dropExemplars bool histogramPointCounters[N] } @@ -28,9 +29,10 @@ type hotColdHistogramPoint[N int64 | float64] struct { hcwg hotColdWaitGroup hotColdPoint [2]histogramPointCounters[N] - attrs attribute.Set - res FilteredExemplarReservoir[N] - startTime time.Time + attrs attribute.Set + res FilteredExemplarReservoir[N] + startTime time.Time + dropExemplars bool } // histogramPointCounters contains only the atomic counter data, and is used by @@ -113,9 +115,12 @@ func (s *deltaHistogram[N]) measure( hotIdx := s.hcwg.start() defer s.hcwg.done(hotIdx) h := s.hotColdValMap[hotIdx].LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + r := s.newRes(attr) + _, isDrop := r.(*dropRes[N]) hPt := &histogramPoint[N]{ - res: s.newRes(attr), - attrs: attr, + res: r, + attrs: attr, + dropExemplars: isDrop, // N+1 buckets. For example: // // bounds = [0, 5, 10] @@ -141,7 +146,9 @@ func (s *deltaHistogram[N]) measure( if !s.noSum { h.total.add(value) } - h.res.Offer(ctx, value, droppedAttr) + if !h.dropExemplars { + h.res.Offer(ctx, value, droppedAttr) + } } // newDeltaHistogram returns a histogram that is reset each time it is @@ -282,9 +289,13 @@ func (s *cumulativeHistogram[N]) measure( droppedAttr []attribute.KeyValue, ) { h := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + r := s.newRes(attr) + _, isDrop := r.(*dropRes[N]) hPt := &hotColdHistogramPoint[N]{ - res: s.newRes(attr), - attrs: attr, + res: r, + attrs: attr, + startTime: now(), + dropExemplars: isDrop, // N+1 buckets. For example: // // bounds = [0, 5, 10] @@ -300,7 +311,6 @@ func (s *cumulativeHistogram[N]) measure( counts: make([]atomic.Uint64, len(s.bounds)+1), }, }, - startTime: now(), } return hPt }).(*hotColdHistogramPoint[N]) @@ -322,7 +332,9 @@ func (s *cumulativeHistogram[N]) measure( if !s.noSum { h.hotColdPoint[hotIdx].total.add(value) } - h.res.Offer(ctx, value, droppedAttr) + if !h.dropExemplars { + h.res.Offer(ctx, value, droppedAttr) + } } func (s *cumulativeHistogram[N]) collect( diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 4c004bc99d8..2af252ec5df 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -14,10 +14,11 @@ import ( // lastValuePoint is timestamped measurement data. type lastValuePoint[N int64 | float64] struct { - attrs attribute.Set - value atomicN[N] - res FilteredExemplarReservoir[N] - startTime time.Time + attrs attribute.Set + value atomicN[N] + res FilteredExemplarReservoir[N] + startTime time.Time + dropExemplars bool } // lastValueMap summarizes a set of measurements as the last one made. @@ -33,17 +34,22 @@ func (s *lastValueMap[N]) measure( droppedAttr []attribute.KeyValue, ) { lv := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + r := s.newRes(attr) + _, isDrop := r.(*dropRes[N]) p := &lastValuePoint[N]{ - res: s.newRes(attr), - attrs: attr, - startTime: now(), + res: r, + attrs: attr, + startTime: now(), + dropExemplars: isDrop, } p.value.Store(value) return p }).(*lastValuePoint[N]) lv.value.Store(value) - lv.res.Offer(ctx, value, droppedAttr) + if !lv.dropExemplars { + lv.res.Offer(ctx, value, droppedAttr) + } } func newDeltaLastValue[N int64 | float64]( diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 3fe7c7cf046..14ee64bdce7 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -5,18 +5,30 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg import ( "context" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/embedded" "go.opentelemetry.io/otel/sdk/internal/x" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) type sumValue[N int64 | float64] struct { - n atomicCounter[N] - res FilteredExemplarReservoir[N] - attrs attribute.Set - startTime time.Time + n atomicCounter[N] + res FilteredExemplarReservoir[N] + attrs attribute.Set + startTime time.Time + dropExemplars bool + + // boundFloat64 caches the bound instrument to avoid allocations on the fast path. + // It is only populated when N is float64. + boundFloat64 metric.Float64Counter + + // boundInt64 caches the bound instrument to avoid allocations on the fast path. + // It is only populated when N is int64. + boundInt64 metric.Int64Counter } type sumValueMap[N int64 | float64] struct { @@ -31,17 +43,56 @@ func (s *sumValueMap[N]) measure( droppedAttr []attribute.KeyValue, ) { sv := s.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + r := s.newRes(attr) + _, isDrop := r.(*dropRes[N]) return &sumValue[N]{ - res: s.newRes(attr), - attrs: attr, - startTime: now(), + res: r, + attrs: attr, + startTime: now(), + dropExemplars: isDrop, } }).(*sumValue[N]) sv.n.add(value) // It is possible for collection to race with measurement and observe the // exemplar in the batch of metrics after the add() for cumulative sums. // This is an accepted tradeoff to avoid locking during measurement. - sv.res.Offer(ctx, value, droppedAttr) + if !sv.dropExemplars { + sv.res.Offer(ctx, value, droppedAttr) + } +} + +// boundFloat64SumValue implements metric.Float64Counter for a specific sumValue. +type boundFloat64SumValue struct { + sv *sumValue[float64] + embedded.Float64Counter +} + +func (b *boundFloat64SumValue) Add(ctx context.Context, val float64, _ ...metric.AddOption) { + b.sv.n.add(val) + if !b.sv.dropExemplars { + b.sv.res.Offer(ctx, val, nil) + } +} + +func (*boundFloat64SumValue) Enabled(_ context.Context) bool { + return true +} + +// boundInt64SumValue implements metric.Int64Counter for a specific sumValue. +type boundInt64SumValue struct { + sv *sumValue[int64] + embedded.Int64Counter +} + +func (b *boundInt64SumValue) Add(ctx context.Context, val int64, _ ...metric.AddOption) { + b.sv.n.add(val) + if !b.sv.dropExemplars { + b.sv.res.Offer(ctx, val, nil) + } +} + +func (*boundInt64SumValue) Enabled(_ context.Context) bool { + return true } // newDeltaSum returns an aggregator that summarizes a set of measurements as @@ -75,6 +126,7 @@ type deltaSum[N int64 | float64] struct { hcwg hotColdWaitGroup hotColdValMap [2]sumValueMap[N] + cycle atomic.Uint64 // Used to detect collection cycles for bound instruments } func (s *deltaSum[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { @@ -96,6 +148,7 @@ func (s *deltaSum[N]) collect( // delta always clears values on collection readIdx := s.hcwg.swapHotAndWait() + s.cycle.Add(1) // Increment cycle counter // The len will not change while we iterate over values, since we waited // for all writes to finish to the cold values and len. n := s.hotColdValMap[readIdx].values.Len() @@ -195,6 +248,76 @@ func (s *cumulativeSum[N]) collect( return i } +// LookupBoundMeasure returns a Float64Counter that can be used to record measurements +// for the given attributes without map lookups. +func (s *cumulativeSum[N]) LookupBoundMeasure(attrs []attribute.KeyValue) metric.Float64Counter { + sFloat, ok := any(s).(*cumulativeSum[float64]) + if !ok { + return nil + } + + // This call does not allocate. It sorts and de-duplicates the attrs slice in-place + // and computes the hash based on the aggregator's filter. + d, compacted := attribute.NewDistinctWithFilter(attrs, nil) + var sv *sumValue[float64] + if actual, loaded := sFloat.values.LoadByDistinct(d); loaded { + sv = actual.(*sumValue[float64]) + } else { + // Cache miss: create the Set and use LoadOrStoreAttr. + fltrAttr := attribute.NewSet(compacted...) + sv = sFloat.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + r := sFloat.newRes(attr) + _, isDrop := r.(*dropRes[float64]) + newSV := &sumValue[float64]{ + res: r, + attrs: attr, + startTime: now(), + dropExemplars: isDrop, + } + // Pre-allocate the bound instrument wrapper to avoid allocations on cache hit. + newSV.boundFloat64 = &boundFloat64SumValue{sv: newSV} + return newSV + }).(*sumValue[float64]) + } + + return sv.boundFloat64 +} + +// LookupBoundMeasureInt64 returns an Int64Counter that can be used to record measurements +// for the given attributes without map lookups. +func (s *cumulativeSum[N]) LookupBoundMeasureInt64(attrs []attribute.KeyValue) metric.Int64Counter { + sInt, ok := any(s).(*cumulativeSum[int64]) + if !ok { + return nil + } + + // This call does not allocate. It sorts and de-duplicates the attrs slice in-place + // and computes the hash based on the aggregator's filter. + d, compacted := attribute.NewDistinctWithFilter(attrs, nil) + var sv *sumValue[int64] + if actual, loaded := sInt.values.LoadByDistinct(d); loaded { + sv = actual.(*sumValue[int64]) + } else { + // Cache miss: create the Set and use LoadOrStoreAttr. + fltrAttr := attribute.NewSet(compacted...) + sv = sInt.values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + r := sInt.newRes(attr) + _, isDrop := r.(*dropRes[int64]) + newSV := &sumValue[int64]{ + res: r, + attrs: attr, + startTime: now(), + dropExemplars: isDrop, + } + // Pre-allocate the bound instrument wrapper to avoid allocations on cache hit. + newSV.boundInt64 = &boundInt64SumValue{sv: newSV} + return newSV + }).(*sumValue[int64]) + } + + return sv.boundInt64 +} + // newPrecomputedSum returns an aggregator that summarizes a set of // observations as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. @@ -296,3 +419,229 @@ func (s *precomputedSum[N]) cumulative( return i } + +// lookupBoundStorage returns the storage and current cycle for the given attributes. +// It looks up in the current hot map. +func (s *deltaSum[N]) lookupBoundStorage(fltrAttr attribute.Set) (*sumValue[N], uint64) { + hotIdx := s.hcwg.start() + defer s.hcwg.done(hotIdx) + sv := s.hotColdValMap[hotIdx].values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + r := s.hotColdValMap[hotIdx].newRes(attr) + _, isDrop := r.(*dropRes[N]) + return &sumValue[N]{ + res: r, + attrs: attr, + startTime: now(), + dropExemplars: isDrop, + } + }).(*sumValue[N]) + return sv, s.cycle.Load() +} + +// boundDeltaFloat64Counter implements metric.Float64Counter for Delta temporality. +type boundDeltaFloat64Counter struct { + embedded.Float64Counter + agg *deltaSum[float64] + attrs attribute.Set + storage atomic.Pointer[sumValue[float64]] + cycle atomic.Uint64 +} + +func (b *boundDeltaFloat64Counter) Add(ctx context.Context, val float64, _ ...metric.AddOption) { + // Check cycle + currentCycle := b.agg.cycle.Load() + if currentCycle == b.cycle.Load() { + sv := b.storage.Load() + if sv != nil { + sv.n.add(val) + sv.res.Offer(ctx, val, nil) + return + } + } + // Slow path: re-lookup + sv, cycle := b.agg.lookupBoundStorage(b.attrs) + b.storage.Store(sv) + b.cycle.Store(cycle) + sv.n.add(val) + if !sv.dropExemplars { + sv.res.Offer(ctx, val, nil) + } +} + +func (*boundDeltaFloat64Counter) Enabled(_ context.Context) bool { + return true +} + +// boundDeltaInt64Counter implements metric.Int64Counter for Delta temporality. +type boundDeltaInt64Counter struct { + embedded.Int64Counter + agg *deltaSum[int64] + attrs attribute.Set + storage atomic.Pointer[sumValue[int64]] + cycle atomic.Uint64 +} + +func (b *boundDeltaInt64Counter) Add(ctx context.Context, val int64, _ ...metric.AddOption) { + // Check cycle + currentCycle := b.agg.cycle.Load() + if currentCycle == b.cycle.Load() { + sv := b.storage.Load() + if sv != nil { + sv.n.add(val) + if !sv.dropExemplars { + sv.res.Offer(ctx, val, nil) + } + return + } + } + // Slow path: re-lookup + sv, cycle := b.agg.lookupBoundStorage(b.attrs) + b.storage.Store(sv) + b.cycle.Store(cycle) + sv.n.add(val) + if !sv.dropExemplars { + sv.res.Offer(ctx, val, nil) + } +} + +func (*boundDeltaInt64Counter) Enabled(_ context.Context) bool { + return true +} + +// LookupBoundMeasure returns a Float64Counter that can be used to record measurements +// for the given attributes without map lookups. +// It is used by bound instruments to handle Delta temporality correctly. +func (s *deltaSum[N]) LookupBoundMeasure(attrs []attribute.KeyValue) metric.Float64Counter { + // The benchmark only uses float64, so we only implement it for float64 for now. + // If N is int64, it returns nil or a default measure. + sFloat, ok := any(s).(*deltaSum[float64]) + if !ok { + return nil + } + + d, compacted := attribute.NewDistinctWithFilter(attrs, nil) + + hotIdx := sFloat.hcwg.start() + defer sFloat.hcwg.done(hotIdx) + + actual, loaded := sFloat.hotColdValMap[hotIdx].values.LoadByDistinct(d) + if loaded { + sv := actual.(*sumValue[float64]) + // If the entry was created by a previous Bind call, it has the wrapper cached. + if sv.boundFloat64 != nil { + return sv.boundFloat64 + } + // Fallback: entry was created by an unbound Add, allocate the wrapper. + b := &boundDeltaFloat64Counter{ + agg: sFloat, + attrs: sv.attrs, + } + b.storage.Store(sv) + b.cycle.Store(sFloat.cycle.Load()) + return b + } + + // Cache miss: create the Set and sumValue with the bound instrument cached. + fltrAttr := attribute.NewSet(compacted...) + sv := sFloat.hotColdValMap[hotIdx].values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + r := sFloat.hotColdValMap[hotIdx].newRes(attr) + _, isDrop := r.(*dropRes[float64]) + newSV := &sumValue[float64]{ + res: r, + attrs: attr, + startTime: now(), + dropExemplars: isDrop, + } + + b := &boundDeltaFloat64Counter{ + agg: sFloat, + attrs: attr, + } + b.storage.Store(newSV) + b.cycle.Store(sFloat.cycle.Load()) + + newSV.boundFloat64 = b + return newSV + }).(*sumValue[float64]) + + if sv.boundFloat64 != nil { + return sv.boundFloat64 + } + + // Fallback: LoadOrStoreAttr loaded an entry created concurrently by an unbound Add. + b := &boundDeltaFloat64Counter{ + agg: sFloat, + attrs: sv.attrs, + } + b.storage.Store(sv) + b.cycle.Store(sFloat.cycle.Load()) + return b +} + +// LookupBoundMeasureInt64 returns an Int64Counter that can be used to record measurements +// for the given attributes without map lookups. +// It is used by bound instruments to handle Delta temporality correctly. +func (s *deltaSum[N]) LookupBoundMeasureInt64(attrs []attribute.KeyValue) metric.Int64Counter { + sInt, ok := any(s).(*deltaSum[int64]) + if !ok { + return nil + } + + d, compacted := attribute.NewDistinctWithFilter(attrs, nil) + + hotIdx := sInt.hcwg.start() + defer sInt.hcwg.done(hotIdx) + + actual, loaded := sInt.hotColdValMap[hotIdx].values.LoadByDistinct(d) + if loaded { + sv := actual.(*sumValue[int64]) + // If the entry was created by a previous Bind call, it has the wrapper cached. + if sv.boundInt64 != nil { + return sv.boundInt64 + } + // Fallback: entry was created by an unbound Add, allocate the wrapper. + b := &boundDeltaInt64Counter{ + agg: sInt, + attrs: sv.attrs, + } + b.storage.Store(sv) + b.cycle.Store(sInt.cycle.Load()) + return b + } + + // Cache miss: create the Set and sumValue with the bound instrument cached. + fltrAttr := attribute.NewSet(compacted...) + sv := sInt.hotColdValMap[hotIdx].values.LoadOrStoreAttr(fltrAttr, func(attr attribute.Set) any { + r := sInt.hotColdValMap[hotIdx].newRes(attr) + _, isDrop := r.(*dropRes[int64]) + newSV := &sumValue[int64]{ + res: r, + attrs: attr, + startTime: now(), + dropExemplars: isDrop, + } + + b := &boundDeltaInt64Counter{ + agg: sInt, + attrs: attr, + } + b.storage.Store(newSV) + b.cycle.Store(sInt.cycle.Load()) + + newSV.boundInt64 = b + return newSV + }).(*sumValue[int64]) + + if sv.boundInt64 != nil { + return sv.boundInt64 + } + + // Fallback: LoadOrStoreAttr loaded an entry created concurrently by an unbound Add. + b := &boundDeltaInt64Counter{ + agg: sInt, + attrs: sv.attrs, + } + b.storage.Store(sv) + b.cycle.Store(sInt.cycle.Load()) + return b +} diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index da5c3a7aa25..7d2b2230100 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -67,7 +67,7 @@ func TestSum(t *testing.T) { func testDeltaSum[N int64 | float64]() func(t *testing.T) { mono := false - in, out := Builder[N]{ + in, out, _ := Builder[N]{ Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, @@ -195,7 +195,7 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) { func testCumulativeSum[N int64 | float64]() func(t *testing.T) { mono := false - in, out := Builder[N]{ + in, out, _ := Builder[N]{ Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 3, @@ -622,7 +622,7 @@ func validateSum[N int64 | float64](isPrecomputed bool) func(t *testing.T, aggs } func testDeltaSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { - in, out := Builder[N]{ + in, out, _ := Builder[N]{ Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, @@ -631,7 +631,7 @@ func testDeltaSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { } func testCumulativeSumConcurrentSafe[N int64 | float64]() func(t *testing.T) { - in, out := Builder[N]{ + in, out, _ := Builder[N]{ Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 3, @@ -662,24 +662,28 @@ func BenchmarkSum(b *testing.B) { // the Aggregation method. It should not have an effect on operational // performance, therefore, only monotonic=false is benchmarked here. b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { - return Builder[int64]{ + m, c, _ := Builder[int64]{ Temporality: metricdata.CumulativeTemporality, }.Sum(false) + return m, c })) b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { - return Builder[int64]{ + m, c, _ := Builder[int64]{ Temporality: metricdata.DeltaTemporality, }.Sum(false) + return m, c })) b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { - return Builder[float64]{ + m, c, _ := Builder[float64]{ Temporality: metricdata.CumulativeTemporality, }.Sum(false) + return m, c })) b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { - return Builder[float64]{ + m, c, _ := Builder[float64]{ Temporality: metricdata.DeltaTemporality, }.Sum(false) + return m, c })) b.Run("Precomputed/Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { diff --git a/sdk/metric/meter.go b/sdk/metric/meter.go index 968a3076415..72c4974253d 100644 --- a/sdk/metric/meter.go +++ b/sdk/metric/meter.go @@ -147,7 +147,7 @@ func (m *meter) int64ObservableInstrument( for _, insert := range m.int64Resolver.inserters { // Connect the measure functions for instruments in this pipeline with the // callbacks for this pipeline. - in, err := insert.Instrument(id, allowedKeys, insert.readerDefaultAggregation(id.Kind)) + in, _, err := insert.Instrument(id, allowedKeys, insert.readerDefaultAggregation(id.Kind)) if err != nil { return inst, err } @@ -330,7 +330,7 @@ func (m *meter) float64ObservableInstrument( for _, insert := range m.float64Resolver.inserters { // Connect the measure functions for instruments in this pipeline with the // callbacks for this pipeline. - in, err := insert.Instrument(id, allowedKeys, insert.readerDefaultAggregation(id.Kind)) + in, _, err := insert.Instrument(id, allowedKeys, insert.readerDefaultAggregation(id.Kind)) if err != nil { return inst, err } @@ -643,7 +643,7 @@ func (p int64InstProvider) aggs( kind InstrumentKind, name, desc, u string, allowedKeys []attribute.Key, -) ([]aggregate.Measure[int64], error) { +) ([]aggregate.Measure[int64], []any, error) { inst := Instrument{ Name: name, Description: desc, @@ -672,7 +672,7 @@ func (p int64InstProvider) histogramAggs( Kind: InstrumentKindHistogram, Scope: p.scope, } - measures, err := p.int64Resolver.HistogramAggregators(inst, allowedKeys, boundaries) + measures, _, err := p.int64Resolver.HistogramAggregators(inst, allowedKeys, boundaries) return measures, errors.Join(aggError, err) } @@ -688,8 +688,8 @@ func (p int64InstProvider) lookup( Unit: u, Kind: kind, }, func() (*int64Inst, error) { - aggs, err := p.aggs(kind, name, desc, u, allowedKeys) - return &int64Inst{measures: aggs}, err + aggs, instances, err := p.aggs(kind, name, desc, u, allowedKeys) + return &int64Inst{measures: aggs, aggregators: instances}, err }) } @@ -717,7 +717,7 @@ func (p float64InstProvider) aggs( kind InstrumentKind, name, desc, u string, allowedKeys []attribute.Key, -) ([]aggregate.Measure[float64], error) { +) ([]aggregate.Measure[float64], []any, error) { inst := Instrument{ Name: name, Description: desc, @@ -746,7 +746,7 @@ func (p float64InstProvider) histogramAggs( Kind: InstrumentKindHistogram, Scope: p.scope, } - measures, err := p.float64Resolver.HistogramAggregators(inst, allowedKeys, boundaries) + measures, _, err := p.float64Resolver.HistogramAggregators(inst, allowedKeys, boundaries) return measures, errors.Join(aggError, err) } @@ -762,8 +762,8 @@ func (p float64InstProvider) lookup( Unit: u, Kind: kind, }, func() (*float64Inst, error) { - aggs, err := p.aggs(kind, name, desc, u, allowedKeys) - return &float64Inst{measures: aggs}, err + aggs, instances, err := p.aggs(kind, name, desc, u, allowedKeys) + return &float64Inst{measures: aggs, aggregators: instances}, err }) } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 49a10abab27..e51a46dc78c 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -241,10 +241,11 @@ func (i *inserter[N]) Instrument( inst Instrument, allowedKeys []attribute.Key, readerAggregation Aggregation, -) ([]aggregate.Measure[N], error) { +) ([]aggregate.Measure[N], []any, error) { var ( - matched bool - measures []aggregate.Measure[N] + matched bool + measures []aggregate.Measure[N] + instances []any ) var err error @@ -255,7 +256,7 @@ func (i *inserter[N]) Instrument( continue } matched = true - in, id, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation) + in, aggInst, id, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation) if e != nil { err = errors.Join(err, e) } @@ -268,6 +269,7 @@ func (i *inserter[N]) Instrument( } seen[id] = struct{}{} measures = append(measures, in) + instances = append(instances, aggInst) } if err != nil { @@ -275,7 +277,7 @@ func (i *inserter[N]) Instrument( } if matched { - return measures, err + return measures, instances, err } // Apply implicit default view if no explicit matched. @@ -290,7 +292,7 @@ func (i *inserter[N]) Instrument( if allowedKeys != nil { stream.AttributeFilter = attribute.NewAllowKeysFilter(allowedKeys...) } - in, _, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation) + in, aggInst, _, e := i.cachedAggregator(inst.Scope, inst.Kind, stream, readerAggregation) if e != nil { if err == nil { err = errCreatingAggregators @@ -300,8 +302,9 @@ func (i *inserter[N]) Instrument( if in != nil { // Ensured to have not seen given matched was false. measures = append(measures, in) + instances = append(instances, aggInst) } - return measures, err + return measures, instances, err } // addCallback registers a single instrument callback to be run when @@ -318,6 +321,7 @@ var aggIDCount atomic.Uint64 type aggVal[N int64 | float64] struct { ID uint64 Measure aggregate.Measure[N] + AggInst any Err error } @@ -365,7 +369,7 @@ func (i *inserter[N]) cachedAggregator( kind InstrumentKind, stream Stream, readerAggregation Aggregation, -) (meas aggregate.Measure[N], aggID uint64, err error) { +) (meas aggregate.Measure[N], aggInst any, aggID uint64, err error) { switch stream.Aggregation.(type) { case nil: // The aggregation was not overridden with a view. Use the aggregation @@ -380,7 +384,7 @@ func (i *inserter[N]) cachedAggregator( } if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil { - return nil, 0, fmt.Errorf( + return nil, nil, 0, fmt.Errorf( "creating aggregator with instrumentKind: %d, aggregation %v: %w", kind, stream.Aggregation, err, ) @@ -407,12 +411,12 @@ func (i *inserter[N]) cachedAggregator( // A value less than or equal to zero will disable the aggregation // limits for the builder (an all the created aggregates). b.AggregationLimit = i.getCardinalityLimit(kind) - in, out, err := i.aggregateFunc(b, stream.Aggregation, kind) + in, out, aggInst, err := i.aggregateFunc(b, stream.Aggregation, kind) if err != nil { - return aggVal[N]{0, nil, err} + return aggVal[N]{ID: 0, Measure: nil, AggInst: nil, Err: err} } if in == nil { // Drop aggregator. - return aggVal[N]{0, nil, nil} + return aggVal[N]{ID: 0, Measure: nil, AggInst: nil, Err: nil} } i.pipeline.addSync(scope, instrumentSync{ // Use the first-seen name casing for this and all subsequent @@ -423,9 +427,9 @@ func (i *inserter[N]) cachedAggregator( compAgg: out, }) id := aggIDCount.Add(1) - return aggVal[N]{id, in, err} + return aggVal[N]{ID: id, Measure: in, AggInst: aggInst, Err: err} }) - return cv.Measure, cv.ID, cv.Err + return cv.Measure, cv.AggInst, cv.ID, cv.Err } // getCardinalityLimit returns the cardinality limit for the given instrument kind. @@ -505,7 +509,7 @@ func (i *inserter[N]) aggregateFunc( b aggregate.Builder[N], agg Aggregation, kind InstrumentKind, -) (meas aggregate.Measure[N], comp aggregate.ComputeAggregation, err error) { +) (meas aggregate.Measure[N], comp aggregate.ComputeAggregation, aggInst any, err error) { switch a := agg.(type) { case AggregationDefault: return i.aggregateFunc(b, DefaultAggregationSelector(kind), kind) @@ -525,11 +529,11 @@ func (i *inserter[N]) aggregateFunc( case InstrumentKindObservableUpDownCounter: meas, comp = b.PrecomputedSum(false) case InstrumentKindCounter, InstrumentKindHistogram: - meas, comp = b.Sum(true) + meas, comp, aggInst = b.Sum(true) default: // InstrumentKindUpDownCounter, InstrumentKindObservableGauge, and // instrumentKindUndefined or other invalid instrument kinds. - meas, comp = b.Sum(false) + meas, comp, aggInst = b.Sum(false) } case AggregationExplicitBucketHistogram: var noSum bool @@ -562,7 +566,7 @@ func (i *inserter[N]) aggregateFunc( err = errUnknownAggregation } - return meas, comp, err + return meas, comp, aggInst, err } // isAggregatorCompatible checks if the aggregation can be used by the instrument. @@ -672,18 +676,22 @@ func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) reso // Aggregators returns the Aggregators that must be updated by the instrument // defined by key. -func (r resolver[N]) Aggregators(id Instrument, allowedKeys []attribute.Key) ([]aggregate.Measure[N], error) { - var measures []aggregate.Measure[N] +func (r resolver[N]) Aggregators(id Instrument, allowedKeys []attribute.Key) ([]aggregate.Measure[N], []any, error) { + var ( + measures []aggregate.Measure[N] + instances []any + ) var err error for _, i := range r.inserters { - in, e := i.Instrument(id, allowedKeys, i.readerDefaultAggregation(id.Kind)) + in, aggInst, e := i.Instrument(id, allowedKeys, i.readerDefaultAggregation(id.Kind)) if e != nil { err = errors.Join(err, e) } measures = append(measures, in...) + instances = append(instances, aggInst...) } - return measures, err + return measures, instances, err } // HistogramAggregators returns the histogram Aggregators that must be updated by the instrument @@ -693,8 +701,11 @@ func (r resolver[N]) HistogramAggregators( id Instrument, allowedKeys []attribute.Key, boundaries []float64, -) ([]aggregate.Measure[N], error) { - var measures []aggregate.Measure[N] +) ([]aggregate.Measure[N], []any, error) { + var ( + measures []aggregate.Measure[N] + instances []any + ) var err error for _, i := range r.inserters { @@ -703,11 +714,12 @@ func (r resolver[N]) HistogramAggregators( histAgg.Boundaries = boundaries agg = histAgg } - in, e := i.Instrument(id, allowedKeys, agg) + in, aggInst, e := i.Instrument(id, allowedKeys, agg) if e != nil { err = errors.Join(err, e) } measures = append(measures, in...) + instances = append(instances, aggInst...) } - return measures, err + return measures, instances, err } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index d2944f6add9..fbdd102d7bb 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -394,7 +394,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter, 0) i := newInserter[N](p, &c) readerAggregation := i.readerDefaultAggregation(tt.inst.Kind) - input, err := i.Instrument(tt.inst, nil, readerAggregation) + input, _, err := i.Instrument(tt.inst, nil, readerAggregation) var comps []aggregate.ComputeAggregation for _, instSyncs := range p.aggregations { for _, i := range instSyncs { @@ -419,7 +419,7 @@ func testInvalidInstrumentShouldPanic[N int64 | float64]() { Kind: InstrumentKind(255), } readerAggregation := i.readerDefaultAggregation(inst.Kind) - _, _ = i.Instrument(inst, nil, readerAggregation) + _, _, _ = i.Instrument(inst, nil, readerAggregation) } func TestInvalidInstrumentShouldPanic(t *testing.T) { @@ -435,7 +435,7 @@ func TestPipelinesAggregatorForEachReader(t *testing.T) { inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} var c cache[string, instID] r := newResolver[int64](pipes, &c) - aggs, err := r.Aggregators(inst, nil) + aggs, _, err := r.Aggregators(inst, nil) require.NoError(t, err, "resolved Aggregators error") require.Len(t, aggs, 2, "instrument aggregators") @@ -516,7 +516,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} var c cache[string, instID] r := newResolver[int64](p, &c) - aggs, err := r.Aggregators(inst, nil) + aggs, _, err := r.Aggregators(inst, nil) assert.NoError(t, err) require.Len(t, aggs, wantCount) @@ -526,7 +526,7 @@ func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, want inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} var c cache[string, instID] r := newResolver[float64](p, &c) - aggs, err := r.Aggregators(inst, nil) + aggs, _, err := r.Aggregators(inst, nil) assert.NoError(t, err) require.Len(t, aggs, wantCount) @@ -536,7 +536,7 @@ func testPipelineRegistryResolveIntHistogramAggregators(t *testing.T, p pipeline inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} var c cache[string, instID] r := newResolver[int64](p, &c) - aggs, err := r.HistogramAggregators(inst, nil, []float64{1, 2, 3}) + aggs, _, err := r.HistogramAggregators(inst, nil, []float64{1, 2, 3}) assert.NoError(t, err) require.Len(t, aggs, wantCount) @@ -546,7 +546,7 @@ func testPipelineRegistryResolveFloatHistogramAggregators(t *testing.T, p pipeli inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} var c cache[string, instID] r := newResolver[float64](p, &c) - aggs, err := r.HistogramAggregators(inst, nil, []float64{1, 2, 3}) + aggs, _, err := r.HistogramAggregators(inst, nil, []float64{1, 2, 3}) assert.NoError(t, err) require.Len(t, aggs, wantCount) @@ -575,20 +575,20 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { var vc cache[string, instID] ri := newResolver[int64](p, &vc) - intAggs, err := ri.Aggregators(inst, nil) + intAggs, _, err := ri.Aggregators(inst, nil) assert.Error(t, err) assert.Empty(t, intAggs) rf := newResolver[float64](p, &vc) - floatAggs, err := rf.Aggregators(inst, nil) + floatAggs, _, err := rf.Aggregators(inst, nil) assert.Error(t, err) assert.Empty(t, floatAggs) - intAggs, err = ri.HistogramAggregators(inst, nil, []float64{1, 2, 3}) + intAggs, _, err = ri.HistogramAggregators(inst, nil, []float64{1, 2, 3}) assert.Error(t, err) assert.Empty(t, intAggs) - floatAggs, err = rf.HistogramAggregators(inst, nil, []float64{1, 2, 3}) + floatAggs, _, err = rf.HistogramAggregators(inst, nil, []float64{1, 2, 3}) assert.Error(t, err) assert.Empty(t, floatAggs) } @@ -634,14 +634,14 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { var vc cache[string, instID] ri := newResolver[int64](p, &vc) - intAggs, err := ri.Aggregators(fooInst, nil) + intAggs, _, err := ri.Aggregators(fooInst, nil) assert.NoError(t, err) assert.Equal(t, 0, l.InfoN(), "no info logging should happen") assert.Len(t, intAggs, 1) // The Rename view should produce the same instrument without an error, the // default view should also cause a new aggregator to be returned. - intAggs, err = ri.Aggregators(barInst, nil) + intAggs, _, err = ri.Aggregators(barInst, nil) assert.NoError(t, err) assert.Equal(t, 0, l.InfoN(), "no info logging should happen") assert.Len(t, intAggs, 2) @@ -649,19 +649,19 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { // Creating a float foo instrument should log a warning because there is an // int foo instrument. rf := newResolver[float64](p, &vc) - floatAggs, err := rf.Aggregators(fooInst, nil) + floatAggs, _, err := rf.Aggregators(fooInst, nil) assert.NoError(t, err) assert.Equal(t, 1, l.InfoN(), "instrument conflict not logged") assert.Len(t, floatAggs, 1) fooInst = Instrument{Name: "foo-float", Kind: InstrumentKindCounter} - floatAggs, err = rf.Aggregators(fooInst, nil) + floatAggs, _, err = rf.Aggregators(fooInst, nil) assert.NoError(t, err) assert.Equal(t, 0, l.InfoN(), "no info logging should happen") assert.Len(t, floatAggs, 1) - floatAggs, err = rf.Aggregators(barInst, nil) + floatAggs, _, err = rf.Aggregators(barInst, nil) assert.NoError(t, err) // Both the rename and default view aggregators created above should now // conflict. Therefore, 2 warning messages should be logged. diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index b9bf4841ad8..acbaeda3fcc 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -153,7 +153,7 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { var c cache[string, instID] i := newInserter[N](test.pipe, &c) readerAggregation := i.readerDefaultAggregation(inst.Kind) - got, err := i.Instrument(inst, nil, readerAggregation) + got, _, err := i.Instrument(inst, nil, readerAggregation) require.NoError(t, err) assert.Len(t, got, 1, "default view not applied") for _, in := range got { @@ -380,7 +380,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) { i := newInserter[int64](pipe, &vc) readerAggregation := i.readerDefaultAggregation(kind) - _, origID, err := i.cachedAggregator(scope, kind, stream, readerAggregation) + _, _, origID, err := i.cachedAggregator(scope, kind, stream, readerAggregation) require.NoError(t, err) require.Len(t, pipe.aggregations, 1) @@ -390,7 +390,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) { require.Equal(t, name, iSync[0].name) stream.Name = "RequestCount" - _, id, err := i.cachedAggregator(scope, kind, stream, readerAggregation) + _, _, id, err := i.cachedAggregator(scope, kind, stream, readerAggregation) require.NoError(t, err) assert.Equal(t, origID, id, "multiple aggregators for equivalent name") @@ -614,7 +614,7 @@ func TestPipelineProduceErrors(t *testing.T) { // Set up an observable with callbacks var testObsID observableID[int64] aggBuilder := aggregate.Builder[int64]{Temporality: metricdata.CumulativeTemporality} - measure, _ := aggBuilder.Sum(true) + measure, _, _ := aggBuilder.Sum(true) pipe.addInt64Measure(testObsID, []aggregate.Measure[int64]{measure}) // Add an aggregation that just sets the data point value to the number of times the aggregation is invoked