Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1423,6 +1423,7 @@ func (s *Server) processUnaryRPC(ctx context.Context, stream *transport.ServerSt
return nil
}
ctx = NewContextWithServerTransportStream(ctx, stream)
ctx = context.WithValue(ctx, compressKey{}, &compressOptions{})
reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
if appErr != nil {
appStatus, ok := status.FromError(appErr)
Expand Down Expand Up @@ -1470,7 +1471,11 @@ func (s *Server) processUnaryRPC(ctx context.Context, stream *transport.ServerSt
if stream.SendCompress() != sendCompressorName {
comp = encoding.GetCompressor(stream.SendCompress())
}
if err := s.sendResponse(ctx, stream, reply, cp, opts, comp); err != nil {
compV0, compV1 := cp, comp
if opts, ok := ctx.Value(compressKey{}).(*compressOptions); ok && opts.DoNotCompress {
compV0, compV1 = nil, nil
}
Comment on lines +1473 to +1476
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is executed before the server handler is invoked. This would mean that stream.IsCompressionEnabled would always return true. Am I missing something here? Do we really need this check here?

And if we really need this check, why is it missing for the streaming case?

if err := s.sendResponse(ctx, stream, reply, compV0, opts, compV1); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
Expand Down Expand Up @@ -1585,6 +1590,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.Serv
sh.HandleRPC(ctx, statsBegin)
}
ctx = NewContextWithServerTransportStream(ctx, stream)
ctx = context.WithValue(ctx, compressKey{}, &compressOptions{})
ss := &serverStream{
ctx: ctx,
s: stream,
Expand Down
47 changes: 44 additions & 3 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package grpc
import (
"context"
"errors"
"fmt"
"io"
"math"
rand "math/rand/v2"
Expand Down Expand Up @@ -51,6 +52,33 @@ import (

var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))

// SetMessageCompression enables or disables per-message compression on a stream
// if a compressor is specified for the stream (e.g. using UseCompressor or
// SetSendCompressor) and if the encoding type is supported by the receiver.
// By default, message compression is enabled, but is a no-op if compression
// is not enabled on the stream.
//
// This method must not be called concurrently with SendMsg.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we instead have the enableCompression field in transport.ServerStream be an atomic.Bool and remove this restriction that this method not be called concurrently with SendMsg?

I don't expect people to invoke this method repeatedly to cause any performance concerns. Therefore, simplifying the API seems like a better win.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is advisable. If a call to SendMessage is in flight and someone toggles compression during that call, we don't want to non-deterministically have compression for that message. I think it's better to leave the messaging about not calling it concurrently with SendMessage, and it's even better without the atomic so that tsan can detect the race for you.

Sorry for the conflicting advice.

//
// # Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetMessageCompression(ctx context.Context, enable bool) error {
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
opts, ok := ctx.Value(compressKey{}).(*compressOptions)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few things here that I'm not convinced about:

  • We are adding the compressKey for every stream context whether or not condition compression enabling/disabling will be used or not. I'm not sure how much of a performance overhead this would be. @arjan-bal : Do you have any thoughts?
  • We are modifying a value stored in a context here, which breaks the guarantee that contexts are immutable in Go. I agree that we do have this pattern in a few places already in our codebase, but if we can avoid it, that would be great. One option is to avoid it would be not store a pointer to compressOptions, but instead store it by value, and change this function to return a new context that contains compressKey with the given value. This would mean that the caller on the client and the server would have to use the newly returned context going forward. @dfawley : Do you have any objections to change this function to return a context instead?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! I thought about this approach too, but returning a new context doesn't work for streams.
The reason is that serverStream.SendMsg reads from ss.ctx — the context that was captured when the stream was created. Even if SetMessageCompression returns a new context, ss.ctx inside the stream object doesn't change, so SendMsg would never pick it up. The same issue applies to clientStream and addrConnStream.
Since there's no way to update the stream's internal context from the outside, mutating the pointer was the approach I went with. That said, I'm open to other ideas if there's a cleaner way to handle this — happy to adjust if you have something in mind.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you mentioned, I wanted to share my thinking on the compressKey overhead. Since SetMessageCompression is already a no-op without a compressor, I considered adding it only when a compressor is actually set. But the tricky part is when SetSendCompressor is called first — the compressor isn't known at stream creation time. So I kept it as is for now. Happy to explore this further if you think it's worth optimizing.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more thought — to address the mutability concern, I'm planning to
switch from plain bool to atomic.Bool in compressOptions. Would that work?

Copy link
Copy Markdown
Contributor

@arjan-bal arjan-bal Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presently we have ~24 allocs/op for streaming RPCs and ~140 allocs/op for unary RPCs for client and server side combined. Even a single extra heap allocation does show up as a <1% QPS drop in our benchmarks when sending very small messages. Here is how to run the benchmarks for unary RPCs:

$ git checkout master
$ RUN_NAME=unary-before
$ go run benchmark/benchmain/main.go -benchtime=60s -workloads=unary \
   -compression=off -maxConcurrentCalls=200 -trace=off \
   -reqSizeBytes=100 -respSizeBytes=100 -networkMode=Local -resultFile="${RUN_NAME}"

$ git checkout feature-branch
$ RUN_NAME=unary-after
$ go run benchmark/benchmain/main.go -benchtime=60s -workloads=unary \
   -compression=off -maxConcurrentCalls=200 -trace=off \
   -reqSizeBytes=100 -respSizeBytes=100 -networkMode=Local -resultFile="${RUN_NAME}"

# Compare the results
$ go run benchmark/benchresult/main.go unary-before unary-after

We should ensure that users not utilizing the new SetMessageCompression API avoid the extra allocation. Could we treat the absence of compressKey{} as "compression enabled" to match current behavior? This should should avoid heap allocations for existing use-cases.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran the benchmarks and the results confirm your concern:

  • Allocs/op: 143 → 147 (+2.79%)
  • 99th percentile latency: 2.636ms → 3.051ms (+15.74%)

I'll fix this by treating the absence of compressKey as
"compression enabled", so users not using SetMessageCompression
avoid the extra allocation entirely.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the implementation to address the allocation concern.

Server side: moved doNotCompress into transport.ServerStream directly, no context allocation needed.
Client side: compressKey is added to context only when a compressor is set, so users not using SetMessageCompression get zero extra allocations.

Benchmark results (unary, compression=off):

           Title       Before        After Percentage
        TotalOps      8318091      8070904    -2.97%
        Bytes/op      9927.29      9883.28    -0.44%
       Allocs/op       143.22       143.03     0.00%
        50th-Lat   1.210583ms   1.255084ms     3.68%
        90th-Lat   2.083459ms   2.034416ms    -2.35%
        99th-Lat   4.868417ms   4.718292ms    -3.08%
         Avg-Lat   1.440822ms   1.483993ms     3.00%

Benchmark results (unary, compression=gzip):

           Title       Before        After Percentage
        TotalOps      2591595      2578328    -0.51%
        Bytes/op     14332.39     14312.50    -0.14%
       Allocs/op       183.36       185.21    +1.09%
        50th-Lat   4.095041ms     4.1095ms     0.35%
        90th-Lat   6.164916ms   6.204125ms     0.64%
        99th-Lat  15.452875ms   15.49425ms     0.27%
         Avg-Lat   4.624957ms   4.648262ms     0.50%

The +2 allocs in gzip case is expected since compressKey is only added when a compressor is set.

@arjan-bal @easwars
This is one of my first contributions to the project, so any guidance or feedback would be greatly appreciated.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for optimizing this. The results seem good.

if !ok || opts == nil {
return fmt.Errorf("grpc: SetMessageCompression called on an uninitialized or non-stream context")
Comment thread
arjan-bal marked this conversation as resolved.
Outdated
}
opts.DoNotCompress = !enable
return nil
}

type compressOptions struct {
DoNotCompress bool
Comment thread
Dostonlv marked this conversation as resolved.
Outdated
}

type compressKey struct{}

// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC. srv is the service implementation on which the
// RPC was invoked.
Expand Down Expand Up @@ -299,6 +327,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
} else {
ctx, cancel = context.WithCancel(ctx)
}
ctx = context.WithValue(ctx, compressKey{}, &compressOptions{})
defer func() {
if err != nil {
cancel()
Expand Down Expand Up @@ -954,7 +983,11 @@ func (cs *clientStream) SendMsg(m any) (err error) {
}

// load hdr, payload, data
hdr, data, payload, pf, err := prepareMsg(m, cs.codec, cs.compressorV0, cs.compressorV1, cs.cc.dopts.copts.BufferPool)
compV0, compV1 := cs.compressorV0, cs.compressorV1
if opts, ok := cs.ctx.Value(compressKey{}).(*compressOptions); ok && opts.DoNotCompress {
compV0, compV1 = nil, nil
}
hdr, data, payload, pf, err := prepareMsg(m, cs.codec, compV0, compV1, cs.cc.dopts.copts.BufferPool)
if err != nil {
return err
}
Expand Down Expand Up @@ -1461,7 +1494,11 @@ func (as *addrConnStream) SendMsg(m any) (err error) {
}

// load hdr, payload, data
hdr, data, payload, pf, err := prepareMsg(m, as.codec, as.sendCompressorV0, as.sendCompressorV1, as.ac.dopts.copts.BufferPool)
compV0, compV1 := as.sendCompressorV0, as.sendCompressorV1
if opts, ok := as.ctx.Value(compressKey{}).(*compressOptions); ok && opts.DoNotCompress {
compV0, compV1 = nil, nil
}
hdr, data, payload, pf, err := prepareMsg(m, as.codec, compV0, compV1, as.ac.dopts.copts.BufferPool)
if err != nil {
return err
}
Expand Down Expand Up @@ -1741,7 +1778,11 @@ func (ss *serverStream) SendMsg(m any) (err error) {
}

// load hdr, payload, data
hdr, data, payload, pf, err := prepareMsg(m, ss.codec, ss.compressorV0, ss.compressorV1, ss.p.bufferPool)
compV0, compV1 := ss.compressorV0, ss.compressorV1
if opts, ok := ss.ctx.Value(compressKey{}).(*compressOptions); ok && opts.DoNotCompress {
compV0, compV1 = nil, nil
}
hdr, data, payload, pf, err := prepareMsg(m, ss.codec, compV0, compV1, ss.p.bufferPool)
if err != nil {
return err
}
Expand Down
136 changes: 136 additions & 0 deletions test/compressor_test.go
Comment thread
arjan-bal marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"io"
"reflect"
"strings"
"sync/atomic"
"testing"

"google.golang.org/grpc"
Expand All @@ -34,6 +35,7 @@ import (
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"

testgrpc "google.golang.org/grpc/interop/grpc_testing"
Expand Down Expand Up @@ -692,3 +694,137 @@ func (s) TestGzipBadChecksum(t *testing.T) {
t.Errorf("ss.Client.UnaryCall(_) = _, %v\n\twant: _, status(codes.Internal, contains %q)", err, gzip.ErrChecksum)
}
}

type statsHandler struct {
Comment thread
Dostonlv marked this conversation as resolved.
Outdated
stats.Handler
compress atomic.Int32
decompress atomic.Int32
}

func (h *statsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { return ctx }
func (h *statsHandler) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { return ctx }
func (h *statsHandler) HandleConn(context.Context, stats.ConnStats) {}
func (h *statsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
switch st := s.(type) {
case *stats.OutPayload:
if st.CompressedLength < st.Length {
h.compress.Add(1)
}
case *stats.InPayload:
if st.CompressedLength < st.Length {
h.decompress.Add(1)
}
}
}

func (s) TestMessageCompression_Stream(t *testing.T) {
Comment thread
Dostonlv marked this conversation as resolved.
Outdated

Comment thread
Dostonlv marked this conversation as resolved.
Outdated
sh := &statsHandler{}
ss := &stubserver.StubServer{
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
if _, err := stream.Recv(); err != nil {
return err
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: &testpb.Payload{Body: make([]byte, 1000)},
}); err != nil {
return err
}
if _, err := stream.Recv(); err != nil {
return err
}
if err := grpc.SetMessageCompression(stream.Context(), false); err != nil {
return err
}
if err := stream.Send(&testpb.StreamingOutputCallResponse{
Payload: &testpb.Payload{Body: make([]byte, 1000)},
}); err != nil {
return err
}
if _, err := stream.Recv(); err != nil {
return err
}
if err := grpc.SetMessageCompression(stream.Context(), true); err != nil {
return err
}
return stream.Send(&testpb.StreamingOutputCallResponse{
Payload: &testpb.Payload{Body: make([]byte, 1000)},
})
},
}

if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting server: %v", err)
}
defer ss.Stop()
if err := ss.StartClient(grpc.WithStatsHandler(sh)); err != nil {
Comment thread
Dostonlv marked this conversation as resolved.
Outdated
t.Fatalf("Error starting client: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

stream, err := ss.Client.FullDuplexCall(ctx, grpc.UseCompressor("gzip"))
if err != nil {
t.Fatalf("FullDuplexCall failed: %v", err)
}

// 1. Send first compressed message
stream.Send(&testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{Body: make([]byte, 1000)}})
stream.Recv()
if sh.compress.Load() != 1 || sh.decompress.Load() != 1 {
t.Fatalf("After Call 1, expected 1, 1. got %d, %d", sh.compress.Load(), sh.decompress.Load())
Comment thread
Dostonlv marked this conversation as resolved.
Outdated
}

// 2. Disable message compression and send second message
grpc.SetMessageCompression(stream.Context(), false)
stream.Send(&testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{Body: make([]byte, 1000)}})
stream.Recv()
if sh.compress.Load() != 1 || sh.decompress.Load() != 1 {
t.Fatalf("After Call 2, expected 1, 1. got %d, %d", sh.compress.Load(), sh.decompress.Load())
}

// 3. Enable message compression and send third message
grpc.SetMessageCompression(stream.Context(), true)
stream.Send(&testpb.StreamingOutputCallRequest{Payload: &testpb.Payload{Body: make([]byte, 1000)}})
stream.Recv()
if sh.compress.Load() != 2 || sh.decompress.Load() != 2 {
t.Fatalf("After Call 3, expected 2, 2. got %d, %d", sh.compress.Load(), sh.decompress.Load())
}
}

func (s) TestMessageCompression_Unary(t *testing.T) {
sh := &statsHandler{}
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
grpc.SetSendCompressor(ctx, "gzip")
if in.ResponseSize == 0 {
grpc.SetMessageCompression(ctx, false)
}
return &testpb.SimpleResponse{Payload: &testpb.Payload{Body: make([]byte, 10000)}}, nil
},
}

if err := ss.Start(nil); err != nil {
t.Fatalf("Error starting server: %v", err)
}
defer ss.Stop()
if err := ss.StartClient(grpc.WithStatsHandler(sh)); err != nil {
t.Fatalf("Error starting client: %v", err)
}

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

// Call 1: Compression ON
ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 1, Payload: &testpb.Payload{Body: make([]byte, 1000)}}, grpc.UseCompressor("gzip"))
if sh.compress.Load() != 1 || sh.decompress.Load() != 1 {
t.Fatalf("Expected 1/1, got %d/%d", sh.compress.Load(), sh.decompress.Load())
}

// Call 2: Compression OFF (for response, but request is still compressed by UseCompressor)
ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 0, Payload: &testpb.Payload{Body: make([]byte, 1000)}}, grpc.UseCompressor("gzip"))
if sh.compress.Load() != 2 || sh.decompress.Load() != 1 {
t.Fatalf("Expected 2/1, got %d/%d", sh.compress.Load(), sh.decompress.Load())
}
}
Loading