Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions internal/envconfig/envconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ var (
// unconditionally.
XDSEndpointHashKeyBackwardCompat = boolFromEnv("GRPC_XDS_ENDPOINT_HASH_KEY_BACKWARD_COMPAT", false)

// LabelServerGoroutines controls setting [runtime/pprof.Labels] on the
// goroutines spawned for various reasons.
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
// For now, this is limited to the goroutines spawned to handle incoming requests on the server.
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
// Set "GRPC_GO_SERVER_GOROUTINE_LABELS" to "grpc.method=false" to disable this grpc.method label.
// The right-hand-side of the key=value pairs is parsed by
// [strconv.ParseBool], so other values are supported including 1, 0, f, and t.
// This variable is a bit-field
LabelServerGoroutines = goroutineLabelsFromEnv("GRPC_GO_SERVER_GOROUTINE_LABELS", GoroutineLabelServerMethod)

// RingHashSetRequestHashKey is set if the ring hash balancer can get the
// request hash header by setting the "requestHashHeader" field, according
// to gRFC A76. It can be disabled by setting the environment variable
Expand Down Expand Up @@ -166,3 +175,47 @@ func uint64FromEnv(envVar string, def, min, max uint64) uint64 {
}
return v
}

// GoroutineLabels is a bitfield indicating which goroutine labels are enabled
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
type GoroutineLabels uint16

func goroutineLabelsFromEnv(envVar string, def GoroutineLabels) GoroutineLabels {
val := def
v := os.Getenv(envVar)
for s := range strings.SplitSeq(v, ",") {
Comment thread
arjan-bal marked this conversation as resolved.
s = strings.TrimSpace(s)
if len(s) == 0 {
continue
}
pre, post, ok := strings.Cut(s, "=")
if !ok {
// no equals sign
continue
}
post = strings.TrimSpace(post)
pre = strings.TrimSpace(pre)
entVal := GoroutineLabels(0)
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
rhs, parseErr := strconv.ParseBool(post)
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
if parseErr != nil {
continue
}
switch {
case strings.EqualFold(pre, "grpc.method"):
entVal = GoroutineLabelServerMethod
default:
// ignore
}
switch rhs {
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
case false:
val &^= entVal
case true:
val |= entVal
}
}
return val
}

const (
// Set the grpc.method label on new server-side gRPC streams
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
GoroutineLabelServerMethod GoroutineLabels = 1 << iota
)
85 changes: 85 additions & 0 deletions internal/envconfig/envconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,88 @@ func (s) TestBoolFromEnv(t *testing.T) {
})
}
}

func (s) TestGoroutineLabelsFromEnv(t *testing.T) {
var testCases = []struct {
name string
val string
def GoroutineLabels
want GoroutineLabels
}{
{
name: "unset_env_non-zero_default",
val: "",
def: GoroutineLabelServerMethod,
want: GoroutineLabelServerMethod,
}, {
name: "unset_env_zero_default",
val: "",
def: 0,
want: 0,
}, {
name: "force-enable_zero_default",
val: "grpc.method=true",
def: 0,
want: GoroutineLabelServerMethod,
}, {
name: "force-enable_zero_default_with_whitespace",
val: " grpc.method\t= true",
def: 0,
want: GoroutineLabelServerMethod,
}, {
name: "force-enable_zero_default_with_other_garbage",
val: "grpc.method=true,foobar",
def: 0,
want: GoroutineLabelServerMethod,
}, {
name: "force-enable_numeric_zero_default_with_other_garbage",
val: "grpc.method=1,foobar",
def: 0,
want: GoroutineLabelServerMethod,
}, {
name: "force-disable_zero_default",
val: "grpc.method=false",
def: 0,
want: 0,
}, {
name: "force-disable_non-zero_default",
val: "grpc.method=false",
def: GoroutineLabelServerMethod,
want: 0,
}, {
name: "force-disable_non-zero_default_numeric",
val: "grpc.method=0",
def: GoroutineLabelServerMethod,
want: 0,
}, {
name: "unknown_val_no_equal",
val: "grpc.unknown.garbage",
def: GoroutineLabelServerMethod,
want: GoroutineLabelServerMethod,
}, {
name: "unknown_val",
val: "grpc.unknown.garbage=fooble",
def: GoroutineLabelServerMethod,
want: GoroutineLabelServerMethod,
}, {
name: "unparseable_rhs",
val: "grpc.method=quux",
def: GoroutineLabelServerMethod,
want: GoroutineLabelServerMethod,
},
}
for _, tc := range testCases {
t.Run("", func(t *testing.T) {
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
const testVar = "testvar"
if tc.val == "" {
os.Unsetenv(testVar)
} else {
os.Setenv(testVar, tc.val)
}
if got := goroutineLabelsFromEnv(testVar, tc.def); got != tc.want {
t.Errorf("goroutineLabelsFromEnv(%q(=%q), %v) = %v; want %v", testVar, tc.val, tc.def, got, tc.want)
}
})
}

}
7 changes: 7 additions & 0 deletions server.go
Comment thread
arjan-bal marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"net/http"
"reflect"
"runtime"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1785,6 +1786,12 @@ func (s *Server) handleMalformedMethodName(stream *transport.ServerStream, ti *t
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.ServerStream) {
ctx := stream.Context()
ctx = contextWithServer(ctx, s)
if envconfig.LabelServerGoroutines&envconfig.GoroutineLabelServerMethod != 0 {
// This method always runs in its own goroutine, so we can set a
// goroutine label without needing to restore a previous context.
ctx = pprof.WithLabels(ctx, pprof.Labels("grpc.method", stream.Method()))
pprof.SetGoroutineLabels(ctx)
}
var ti *traceInfo
if EnableTracing {
tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
Expand Down
106 changes: 106 additions & 0 deletions test/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ package test
import (
"context"
"io"
"runtime/pprof"
"sync/atomic"
"testing"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/status"

Expand Down Expand Up @@ -70,6 +72,110 @@ func (s) TestServerReturningContextError(t *testing.T) {

}

func pprofCtxCollectLabels(ctx context.Context) map[string]string {
seenLabels := map[string]string{}
pprof.ForLabels(ctx, func(k, val string) bool {
seenLabels[k] = val
return true
})
return seenLabels
}

// TestServerSetGoroutineLabelsInContext verifies that when enabled, the
// grpc.method runtime/pprof goroutine label gets set in the context that's
// passed to the handlers
func (s) TestServerSetGoroutineLabelsInContext(t *testing.T) {
oldGoroutineLabelCfg := envconfig.LabelServerGoroutines
defer func() { envconfig.LabelServerGoroutines = oldGoroutineLabelCfg }()
envconfig.LabelServerGoroutines = envconfig.GoroutineLabelServerMethod
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
ctxLabels := pprofCtxCollectLabels(ctx)
if val, ok := ctxLabels["grpc.method"]; !ok {
t.Errorf("missing \"grpc.method\" label; found labels: %v", ctxLabels)
} else if expVal := "/grpc.testing.TestService/EmptyCall"; val != expVal {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Here and elsewhere s/expVal/wantVal

See: https://google.github.io/styleguide/go/decisions#got-before-want

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

t.Errorf("unexpected value for \"grpc.method\" label %q; want %q", ctxLabels["grpc.method"], expVal)
}
return &testpb.Empty{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
ctxLabels := pprofCtxCollectLabels(stream.Context())
if val, ok := ctxLabels["grpc.method"]; !ok {
t.Errorf("missing \"grpc.method\" label; found labels: %v", ctxLabels)
} else if expVal := "/grpc.testing.TestService/FullDuplexCall"; val != expVal {
t.Errorf("unexpected value for \"grpc.method\" label %q; want %q", ctxLabels["grpc.method"], expVal)
}
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
if err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: The assignment and the conditional can be on the same line. Here and elsewhere in this test.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.
Done.

t.Fatalf("ss.Client.EmptyCall() got error %v; want OK", err)
}

stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("unexpected error starting the stream: %v", err)
}
_, err = stream.Recv()
if err != io.EOF {
t.Fatalf("ss.Client.FullDuplexCall().Recv() got error %v; want io.EOF", err)
}
}

// TestServerSetGoroutineLabelsInContextEnvVarDisabled verifies that when disable, the
// grpc.method runtime/pprof goroutine label does _not_ get set in the context that's
// passed to the handlers
func (s) TestServerSetGoroutineLabelsInContextEnvVarDisabled(t *testing.T) {
oldGoroutineLabelCfg := envconfig.LabelServerGoroutines
defer func() { envconfig.LabelServerGoroutines = oldGoroutineLabelCfg }()
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
// clear the existing value
envconfig.LabelServerGoroutines = 0
ss := &stubserver.StubServer{
EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
ctxLabels := pprofCtxCollectLabels(ctx)
if val, ok := ctxLabels["grpc.method"]; ok {
t.Errorf("\"grpc.method\" label set with value %q; found labels: %v", val, ctxLabels)
}
return &testpb.Empty{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
ctxLabels := pprofCtxCollectLabels(stream.Context())
if val, ok := ctxLabels["grpc.method"]; ok {
t.Errorf("\"grpc.method\" label set with value %q; found labels: %v", val, ctxLabels)
}
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
if err != nil {
t.Fatalf("ss.Client.EmptyCall() got error %v; want OK", err)
}

stream, err := ss.Client.FullDuplexCall(ctx)
if err != nil {
t.Fatalf("unexpected error starting the stream: %v", err)
}
_, err = stream.Recv()
if err != io.EOF {
t.Fatalf("ss.Client.FullDuplexCall().Recv() got error %v; want io.EOF", err)
}
}

func (s) TestChainUnaryServerInterceptor(t *testing.T) {
var (
firstIntKey = ctxKey("firstIntKey")
Expand Down
Loading