diff --git a/test/e2e/grpc_test.go b/test/e2e/grpc_test.go index e9bea8552..ab65b626b 100644 --- a/test/e2e/grpc_test.go +++ b/test/e2e/grpc_test.go @@ -21,7 +21,7 @@ func TestGrpc(t *testing.T) { f.BuildAndRun("grpcclient", "-addr", "127.0.0.1:50051", "-name", "OpenTelemetry") f.Run("grpcclient", "-addr", "127.0.0.1:50051", "-stream") - testutil.WaitForSpanFlush(t) + f.WaitForSpans(4) // 2 traces (unary + stream) × 2 spans (client + server) f.RequireTraceCount(2) // unary + stream f.RequireSpansPerTrace(2) // client + server per trace diff --git a/test/integration/grpc_server_test.go b/test/integration/grpc_server_test.go index 75d94affc..d1596ac6a 100644 --- a/test/integration/grpc_server_test.go +++ b/test/integration/grpc_server_test.go @@ -47,7 +47,7 @@ func TestGRPCServer(t *testing.T) { client := NewGRPCClient(t, "localhost:50051") tc.exercise(t, client) - testutil.WaitForSpanFlush(t) + f.WaitForSpans(1) span := f.RequireSingleSpan() testutil.RequireGRPCServerSemconv(t, span, "greeter.Greeter", tc.method, 0) diff --git a/test/integration/grpc_shutdown_test.go b/test/integration/grpc_shutdown_test.go index bece77b64..b5e1bc75c 100644 --- a/test/integration/grpc_shutdown_test.go +++ b/test/integration/grpc_shutdown_test.go @@ -40,7 +40,7 @@ func TestGRPCServerTelemetryFlushOnSignal(t *testing.T) { require.NoError(t, cmd.Process.Signal(os.Interrupt)) waitForProcessExit(t, cmd, 10*time.Second) - testutil.WaitForSpanFlush(t) + f.WaitForSpans(1) spans := testutil.AllSpans(f.Traces()) require.NotEmpty(t, spans, "expected spans to be flushed on SIGINT shutdown") diff --git a/test/integration/http_server_test.go b/test/integration/http_server_test.go index 6ac5e9903..7ee764441 100644 --- a/test/integration/http_server_test.go +++ b/test/integration/http_server_test.go @@ -44,7 +44,7 @@ func TestHTTPServer(t *testing.T) { require.NoError(t, err) defer resp.Body.Close() require.Equal(t, http.StatusOK, resp.StatusCode) - testutil.WaitForSpanFlush(t) + f.WaitForSpans(1) span := f.RequireSingleSpan() testutil.RequireHTTPServerSemconv( diff --git a/test/testutil/collector.go b/test/testutil/collector.go index cd3fe2e4a..2614f5bd2 100644 --- a/test/testutil/collector.go +++ b/test/testutil/collector.go @@ -27,6 +27,23 @@ func (c *Collector) GetTraces() ptrace.Traces { return c.traces } +// SpanCount returns the total number of collected spans under the lock. +func (c *Collector) SpanCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return countSpans(c.traces) +} + +func countSpans(td ptrace.Traces) int { + n := 0 + for i := range td.ResourceSpans().Len() { + for j := range td.ResourceSpans().At(i).ScopeSpans().Len() { + n += td.ResourceSpans().At(i).ScopeSpans().At(j).Spans().Len() + } + } + return n +} + // StartCollector starts an in-memory OTLP HTTP server that collects traces func StartCollector(t *testing.T) *Collector { c := &Collector{traces: ptrace.NewTraces()} diff --git a/test/testutil/fixture.go b/test/testutil/fixture.go index 8170d6b18..90403da8e 100644 --- a/test/testutil/fixture.go +++ b/test/testutil/fixture.go @@ -142,6 +142,12 @@ func (f *TestFixture) RequireSpansPerTrace(expected int) { } } +// WaitForSpans polls until at least minSpans spans arrive in the collector. +func (f *TestFixture) WaitForSpans(minSpans int) { + f.t.Helper() + WaitForSpans(f.t, f.collector, minSpans) +} + // RequireSingleSpan asserts exactly 1 trace with 1 span and returns it. func (f *TestFixture) RequireSingleSpan() ptrace.Span { f.RequireTraceCount(1) diff --git a/test/testutil/readiness.go b/test/testutil/readiness.go index fe7552b9d..c6ea03e92 100644 --- a/test/testutil/readiness.go +++ b/test/testutil/readiness.go @@ -14,6 +14,8 @@ import ( const ( defaultReadinessTimeout = 10 * time.Second defaultReadinessInterval = 100 * time.Millisecond + defaultSpanPollTimeout = 3 * time.Second + defaultSpanPollInterval = 25 * time.Millisecond ) // WaitForTCP waits until a TCP connection can be established. @@ -29,8 +31,22 @@ func WaitForTCP(t *testing.T, addr string) { }, defaultReadinessTimeout, defaultReadinessInterval, "timeout waiting for TCP readiness at %s", addr) } -// WaitForSpanFlush waits for spans to be flushed to collector. -func WaitForSpanFlush(t *testing.T) { +// WaitForSpans polls the collector until at least minSpans spans are received or the timeout expires. +func WaitForSpans(t *testing.T, c *Collector, minSpans int) { t.Helper() - time.Sleep(200 * time.Millisecond) + if !pollForSpans(c, minSpans, defaultSpanPollTimeout) { + t.Fatalf("timeout waiting for %d span(s), collector has %d", minSpans, c.SpanCount()) + } +} + +// pollForSpans returns true if at least minSpans spans arrive within timeout. +func pollForSpans(c *Collector, minSpans int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + if c.SpanCount() >= minSpans { + return true + } + time.Sleep(defaultSpanPollInterval) + } + return false } diff --git a/test/testutil/readiness_test.go b/test/testutil/readiness_test.go new file mode 100644 index 000000000..e27480cee --- /dev/null +++ b/test/testutil/readiness_test.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package testutil + +import ( + "bytes" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// sendSpan posts a single span to the collector via its HTTP endpoint. +func sendSpan(t *testing.T, c *Collector) { + t.Helper() + td := ptrace.NewTraces() + rs := td.ResourceSpans().AppendEmpty() + ss := rs.ScopeSpans().AppendEmpty() + ss.Spans().AppendEmpty() + + var m ptrace.ProtoMarshaler + data, err := m.MarshalTraces(td) + require.NoError(t, err) + + resp, err := http.Post(c.URL+"/v1/traces", "application/x-protobuf", bytes.NewReader(data)) + require.NoError(t, err) + defer resp.Body.Close() + require.Equal(t, http.StatusOK, resp.StatusCode) +} + +func TestWaitForSpans_returnsWhenSpansArrive(t *testing.T) { + c := StartCollector(t) + sendSpan(t, c) + + start := time.Now() + WaitForSpans(t, c, 1) + assert.Less(t, time.Since(start), 100*time.Millisecond) +} + +func TestWaitForSpans_pollsUntilSpansArrive(t *testing.T) { + c := StartCollector(t) + + go func() { + time.Sleep(100 * time.Millisecond) + sendSpan(t, c) + }() + + WaitForSpans(t, c, 1) + assert.Equal(t, 1, c.SpanCount()) +} + +func TestWaitForSpans_timesOut(t *testing.T) { + c := StartCollector(t) + // No spans sent — polling should report failure within the short timeout. + ok := pollForSpans(c, 1, 50*time.Millisecond) + assert.False(t, ok) +}