Skip to content
2 changes: 1 addition & 1 deletion test/e2e/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestGrpc(t *testing.T) {

f.BuildAndRun("grpcclient", "-addr", "127.0.0.1:50051", "-name", "OpenTelemetry")
f.Run("grpcclient", "-addr", "127.0.0.1:50051", "-stream")
testutil.WaitForSpanFlush(t)
f.WaitForSpans(4) // 2 traces (unary + stream) × 2 spans (client + server)

f.RequireTraceCount(2) // unary + stream
f.RequireSpansPerTrace(2) // client + server per trace
Expand Down
2 changes: 1 addition & 1 deletion test/integration/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestGRPCServer(t *testing.T) {

client := NewGRPCClient(t, "localhost:50051")
tc.exercise(t, client)
testutil.WaitForSpanFlush(t)
f.WaitForSpans(1)

span := f.RequireSingleSpan()
testutil.RequireGRPCServerSemconv(t, span, "greeter.Greeter", tc.method, 0)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/grpc_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestGRPCServerTelemetryFlushOnSignal(t *testing.T) {

require.NoError(t, cmd.Process.Signal(os.Interrupt))
waitForProcessExit(t, cmd, 10*time.Second)
testutil.WaitForSpanFlush(t)
f.WaitForSpans(1)

spans := testutil.AllSpans(f.Traces())
require.NotEmpty(t, spans, "expected spans to be flushed on SIGINT shutdown")
Expand Down
2 changes: 1 addition & 1 deletion test/integration/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestHTTPServer(t *testing.T) {
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)
testutil.WaitForSpanFlush(t)
f.WaitForSpans(1)

span := f.RequireSingleSpan()
testutil.RequireHTTPServerSemconv(
Expand Down
17 changes: 17 additions & 0 deletions test/testutil/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,23 @@ func (c *Collector) GetTraces() ptrace.Traces {
return c.traces
}

// SpanCount returns the total number of collected spans under the lock.
func (c *Collector) SpanCount() int {
c.mu.Lock()
defer c.mu.Unlock()
return countSpans(c.traces)
}

func countSpans(td ptrace.Traces) int {
n := 0
for i := range td.ResourceSpans().Len() {
for j := range td.ResourceSpans().At(i).ScopeSpans().Len() {
n += td.ResourceSpans().At(i).ScopeSpans().At(j).Spans().Len()
}
}
return n
}

// StartCollector starts an in-memory OTLP HTTP server that collects traces
func StartCollector(t *testing.T) *Collector {
c := &Collector{traces: ptrace.NewTraces()}
Expand Down
6 changes: 6 additions & 0 deletions test/testutil/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ func (f *TestFixture) RequireSpansPerTrace(expected int) {
}
}

// WaitForSpans polls until at least minSpans spans arrive in the collector.
func (f *TestFixture) WaitForSpans(minSpans int) {
f.t.Helper()
WaitForSpans(f.t, f.collector, minSpans)
}

// RequireSingleSpan asserts exactly 1 trace with 1 span and returns it.
func (f *TestFixture) RequireSingleSpan() ptrace.Span {
f.RequireTraceCount(1)
Expand Down
22 changes: 19 additions & 3 deletions test/testutil/readiness.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
const (
defaultReadinessTimeout = 10 * time.Second
defaultReadinessInterval = 100 * time.Millisecond
defaultSpanPollTimeout = 3 * time.Second
defaultSpanPollInterval = 25 * time.Millisecond
)

// WaitForTCP waits until a TCP connection can be established.
Expand All @@ -29,8 +31,22 @@ func WaitForTCP(t *testing.T, addr string) {
}, defaultReadinessTimeout, defaultReadinessInterval, "timeout waiting for TCP readiness at %s", addr)
}

// WaitForSpanFlush waits for spans to be flushed to collector.
func WaitForSpanFlush(t *testing.T) {
// WaitForSpans polls the collector until at least minSpans spans are received or the timeout expires.
func WaitForSpans(t *testing.T, c *Collector, minSpans int) {
t.Helper()
time.Sleep(200 * time.Millisecond)
if !pollForSpans(c, minSpans, defaultSpanPollTimeout) {
t.Fatalf("timeout waiting for %d span(s), collector has %d", minSpans, c.SpanCount())
}
}

// pollForSpans returns true if at least minSpans spans arrive within timeout.
func pollForSpans(c *Collector, minSpans int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if c.SpanCount() >= minSpans {
return true
}
time.Sleep(defaultSpanPollInterval)
}
return false
}
61 changes: 61 additions & 0 deletions test/testutil/readiness_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package testutil

import (
"bytes"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
)

// sendSpan posts a single span to the collector via its HTTP endpoint.
func sendSpan(t *testing.T, c *Collector) {
t.Helper()
td := ptrace.NewTraces()
rs := td.ResourceSpans().AppendEmpty()
ss := rs.ScopeSpans().AppendEmpty()
ss.Spans().AppendEmpty()

var m ptrace.ProtoMarshaler
data, err := m.MarshalTraces(td)
require.NoError(t, err)

resp, err := http.Post(c.URL+"/v1/traces", "application/x-protobuf", bytes.NewReader(data))
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusOK, resp.StatusCode)
}

func TestWaitForSpans_returnsWhenSpansArrive(t *testing.T) {
c := StartCollector(t)
sendSpan(t, c)

start := time.Now()
WaitForSpans(t, c, 1)
assert.Less(t, time.Since(start), 100*time.Millisecond)
}

func TestWaitForSpans_pollsUntilSpansArrive(t *testing.T) {
c := StartCollector(t)

go func() {
time.Sleep(100 * time.Millisecond)
sendSpan(t, c)
}()

WaitForSpans(t, c, 1)
assert.Equal(t, 1, c.SpanCount())
}

func TestWaitForSpans_timesOut(t *testing.T) {
c := StartCollector(t)
// No spans sent — polling should report failure within the short timeout.
ok := pollForSpans(c, 1, 50*time.Millisecond)
assert.False(t, ok)
}
Loading