diff --git a/konnectivity-client/pkg/client/client.go b/konnectivity-client/pkg/client/client.go index 845a38c11..06b5f78ef 100644 --- a/konnectivity-client/pkg/client/client.go +++ b/konnectivity-client/pkg/client/client.go @@ -179,21 +179,44 @@ func CreateSingleUseGrpcTunnelWithContext(createCtx, tunnelCtx context.Context, return nil, err } - grpcClient := client.NewProxyServiceClient(c) + proxyClient := client.NewProxyServiceClient(c) - stream, err := grpcClient.Proxy(tunnelCtx) + tunnel, err := newSingleStreamTunnel(tunnelCtx, proxyClient, c.Close) if err != nil { c.Close() return nil, err } - tunnel := newUnstartedTunnel(stream, c) + return tunnel, nil +} + +// newSingleStreamTunnel creates a Tunnel backed by a single Proxy stream on +// proxyClient. closeFn is invoked from the serve() goroutine on exit (via the +// clientConn.Close interface), in place of the previous unconditional +// grpcConn.Close(). +// +// tunnelCtx bounds both Proxy() RPC establishment and the resulting stream's +// serve goroutine, matching the existing CreateSingleUseGrpcTunnelWithContext +// semantics. Callers wanting to bound only establishment must do so themselves. +func newSingleStreamTunnel(tunnelCtx context.Context, proxyClient client.ProxyServiceClient, closeFn func() error) (Tunnel, error) { + stream, err := proxyClient.Proxy(tunnelCtx) + if err != nil { + return nil, err + } + + tunnel := newUnstartedTunnel(stream, closerFunc(closeFn)) go tunnel.serve(tunnelCtx) return tunnel, nil } +// closerFunc adapts a func() error to the clientConn interface (which only +// requires Close() error). +type closerFunc func() error + +func (c closerFunc) Close() error { return c() } + func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *grpcTunnel { t := grpcTunnel{ stream: stream,