Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions konnectivity-client/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,21 +179,44 @@ func CreateSingleUseGrpcTunnelWithContext(createCtx, tunnelCtx context.Context,
return nil, err
}

grpcClient := client.NewProxyServiceClient(c)
proxyClient := client.NewProxyServiceClient(c)

stream, err := grpcClient.Proxy(tunnelCtx)
tunnel, err := newSingleStreamTunnel(tunnelCtx, proxyClient, c.Close)
if err != nil {
c.Close()
return nil, err
}

tunnel := newUnstartedTunnel(stream, c)
return tunnel, nil
}

// newSingleStreamTunnel creates a Tunnel backed by a single Proxy stream on
// proxyClient. closeFn is invoked from the serve() goroutine on exit (via the
// clientConn.Close interface), in place of the previous unconditional
// grpcConn.Close().
//
// tunnelCtx bounds both Proxy() RPC establishment and the resulting stream's
// serve goroutine, matching the existing CreateSingleUseGrpcTunnelWithContext
// semantics. Callers wanting to bound only establishment must do so themselves.
func newSingleStreamTunnel(tunnelCtx context.Context, proxyClient client.ProxyServiceClient, closeFn func() error) (Tunnel, error) {
stream, err := proxyClient.Proxy(tunnelCtx)
if err != nil {
return nil, err
}

tunnel := newUnstartedTunnel(stream, closerFunc(closeFn))

go tunnel.serve(tunnelCtx)

return tunnel, nil
}

// closerFunc adapts a func() error to the clientConn interface (which only
// requires Close() error).
type closerFunc func() error

func (c closerFunc) Close() error { return c() }

func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *grpcTunnel {
t := grpcTunnel{
stream: stream,
Expand Down
Loading