Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 237 additions & 3 deletions konnectivity-client/pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,32 @@ type Tunnel interface {
Done() <-chan struct{}
}

// ReusableTunnel is a Tunnel whose DialContext may be called many times.
// Each returned net.Conn is independent. The caller is responsible for
// calling Close to release the underlying gRPC ClientConn.
//
// A ReusableTunnel owns a single underlying *grpc.ClientConn whose lifetime
// matches the tunnel's; per-dial work happens on new gRPC Proxy streams over
// that shared connection rather than new gRPC ClientConn connections.
//
// Done semantics differ subtly from a single-use Tunnel: Done is closed only
// after Close has been called and all in-flight per-dial child streams have
// drained. It does not fire on remote unreachability alone; callers detect
// transport failure via DialContext errors (typed via GetDialFailureReason)
// and decide whether to Close and rebuild.
type ReusableTunnel interface {
Tunnel
// Close releases the tunnel's resources, including the underlying
// gRPC ClientConn. In-flight DialContext calls are cancelled.
//
// Close blocks until all per-dial child streams have drained and Done
// has fired. Concurrent Close calls all observe the same post-condition:
// every caller returns only after teardown is complete. The first
// caller's return value carries any error from closing the underlying
// ClientConn; subsequent callers return nil.
Close() error
}

type dialResult struct {
err *dialFailure
connid int64
Expand Down Expand Up @@ -179,21 +205,229 @@ func CreateSingleUseGrpcTunnelWithContext(createCtx, tunnelCtx context.Context,
return nil, err
}

grpcClient := client.NewProxyServiceClient(c)
proxyClient := client.NewProxyServiceClient(c)

stream, err := grpcClient.Proxy(tunnelCtx)
tunnel, err := newSingleStreamTunnel(tunnelCtx, proxyClient, c.Close)
if err != nil {
c.Close()
return nil, err
}

tunnel := newUnstartedTunnel(stream, c)
return tunnel, nil
}

// newSingleStreamTunnel creates a Tunnel backed by a single Proxy stream on
// proxyClient. closeFn is invoked from the serve() goroutine on exit (via the
// clientConn.Close interface), in place of the previous unconditional
// grpcConn.Close().
//
// tunnelCtx bounds both Proxy() RPC establishment and the resulting stream's
// serve goroutine, matching the existing CreateSingleUseGrpcTunnelWithContext
// semantics. Callers wanting to bound only establishment must do so themselves.
func newSingleStreamTunnel(tunnelCtx context.Context, proxyClient client.ProxyServiceClient, closeFn func() error) (Tunnel, error) {
stream, err := proxyClient.Proxy(tunnelCtx)
if err != nil {
return nil, err
}

tunnel := newUnstartedTunnel(stream, closerFunc(closeFn))

go tunnel.serve(tunnelCtx)

return tunnel, nil
}

// closerFunc adapts a func() error to the clientConn interface (which only
// requires Close() error).
type closerFunc func() error

func (c closerFunc) Close() error { return c() }

// reusableGrpcTunnel implements ReusableTunnel with one shared *grpc.ClientConn
// and one Proxy stream per DialContext call.
type reusableGrpcTunnel struct {
cc clientConn
proxyClient client.ProxyServiceClient

// tunnelCtx is the parent of every per-dial stream context. Cancelling it
// (during Close) aborts all in-flight streams.
tunnelCtx context.Context
tunnelCancel context.CancelFunc

// acceptMu guards `closing` and synchronizes "is the tunnel accepting
// children?" with children.Add(1). Close flips closing under the same
// mutex before calling children.Wait, eliminating the Add-vs-Wait race
// (sync.WaitGroup requires that positive Adds happen-before any Wait
// when the counter is zero).
acceptMu sync.Mutex
closing bool

children sync.WaitGroup
done chan struct{}
}

// CreateGRPCTunnel creates a reusable Tunnel to a konnectivity proxy at
// address. The returned ReusableTunnel owns a single underlying
// *grpc.ClientConn whose lifetime matches the tunnel's. Callers control
// transport (UDS, TCP, mTLS, ...) via grpc.DialOptions.
//
// konnectivity-client takes no opinion on remote-failure detection,
// reconnect, or load balancing. Callers detect transport failure via
// DialContext errors (typed via GetDialFailureReason) and decide whether
// to Close and rebuild. For TLS cert reload or load balancing across proxy
// replicas, configure the underlying gRPC ClientConn with appropriate
// DialOptions, such as dynamic TransportCredentials or
// grpc.WithDefaultServiceConfig.
//
// MaxConcurrentStreams on both client and server bounds the per-tunnel
// parallelism; phase 1 does not change defaults.
func CreateGRPCTunnel(ctx context.Context, address string, opts ...grpc.DialOption) (ReusableTunnel, error) {
cc, err := grpc.DialContext(ctx, address, opts...)
if err != nil {
return nil, err
}
return newReusableGrpcTunnel(cc, client.NewProxyServiceClient(cc)), nil
}

// newReusableGrpcTunnel constructs a reusableGrpcTunnel from an already-dialed
// connection and its corresponding ProxyServiceClient. Split out from
// CreateGRPCTunnel so tests can inject a fake clientConn and a fake
// ProxyServiceClient backed by the existing fake-stream harness.
func newReusableGrpcTunnel(cc clientConn, proxyClient client.ProxyServiceClient) *reusableGrpcTunnel {
tunnelCtx, cancel := context.WithCancel(context.Background())
return &reusableGrpcTunnel{
cc: cc,
proxyClient: proxyClient,
tunnelCtx: tunnelCtx,
tunnelCancel: cancel,
done: make(chan struct{}),
}
}

func (r *reusableGrpcTunnel) DialContext(requestCtx context.Context, protocol, address string) (net.Conn, error) {
// Pre-validate the protocol before allocating a stream, matching the
// existing grpcTunnel.dialContext check. Observe as DialFailureUnknown
// to match the existing single-use behavior; the existing path also
// returns a plain errors.New here, which GetDialFailureReason classifies
// as Unknown.
if protocol != "tcp" {
metrics.Metrics.ObserveDialFailure(metrics.DialFailureUnknown)
return nil, errors.New("protocol not supported")
}

// Atomically check that we are accepting and reserve a child slot.
// The mutex (not an atomic flag) is required because sync.WaitGroup.Add
// must happen-before any Wait when the counter is zero; checking
// `closing` and calling `children.Add(1)` must be a single critical
// section with respect to Close.
r.acceptMu.Lock()
if r.closing {
r.acceptMu.Unlock()
// Typed so callers (k/k connector) see this as a transport-class
// failure (the tunnel is gone; the caller should rebuild).
metrics.Metrics.ObserveDialFailure(metrics.DialFailureTunnelClosed)
return nil, &dialFailure{"tunnel closed", metrics.DialFailureTunnelClosed}
}
r.children.Add(1)
r.acceptMu.Unlock()

streamCtx, streamCancel := context.WithCancel(r.tunnelCtx)

// Race Proxy() establishment against requestCtx so callers' dial timeouts
// are honored. Cancelling streamCtx aborts the in-flight Proxy() inside
// newSingleStreamTunnel.
type result struct {
t Tunnel
err error
}
resCh := make(chan result, 1)
go func() {
t, err := newSingleStreamTunnel(streamCtx, r.proxyClient, func() error {
streamCancel()
return nil
})
resCh <- result{t, err}
}()

var inner Tunnel
select {
case res := <-resCh:
if res.err != nil {
streamCancel()
r.children.Done()
// Typed dialFailure so callers' shouldInvalidateTunnel can
// distinguish stream-setup failure from per-dial backend errors.
metrics.Metrics.ObserveDialFailure(metrics.DialFailureStreamSetup)
return nil, &dialFailure{res.err.Error(), metrics.DialFailureStreamSetup}
}
inner = res.t
case <-requestCtx.Done():
streamCancel()
// Drain the goroutine. If newSingleStreamTunnel completed
// successfully just before requestCtx fired, an inner Tunnel exists
// whose serve goroutine is winding down; track it so children.Done
// fires only after its serve exits, preserving the Done contract.
res := <-resCh
if res.t != nil {
go func() {
<-res.t.Done()
r.children.Done()
}()
} else {
r.children.Done()
}
// Typed as DialFailureContext (matches existing single-use path)
// so callers treat this as per-dial, not transport-class. Observed
// here because this failure happens before inner.DialContext, so
// the existing per-dial metric path will not see it.
metrics.Metrics.ObserveDialFailure(metrics.DialFailureContext)
return nil, &dialFailure{"dial timeout, context", metrics.DialFailureContext}
}

// Decrement when the per-dial stream's serve loop exits.
go func() {
<-inner.Done()
r.children.Done()
}()

// inner.DialContext honors requestCtx for the DIAL_REQ/DIAL_RSP exchange.
// Errors returned here are typed dialFailures from the existing path
// (DialFailureEndpoint, DialFailureContext, DialFailureTimeout, ...).
//
// Some inner.DialContext error paths (notably Send failure inside the
// existing grpcTunnel.dialContext) do not call closeTunnel themselves.
// We must cancel the stream context on every error to guarantee the
// per-dial stream's serve goroutine exits, so children.Done can fire.
conn, err := inner.DialContext(requestCtx, protocol, address)
if err != nil {
streamCancel()
return nil, err
}
return conn, nil
}

func (r *reusableGrpcTunnel) Done() <-chan struct{} { return r.done }

func (r *reusableGrpcTunnel) Close() error {
r.acceptMu.Lock()
if r.closing {
r.acceptMu.Unlock()
// Already closing. Block until the first caller finishes draining
// so all callers observe the same post-condition: resources
// released, Done fired. Documented on ReusableTunnel.Close.
<-r.done
return nil
}
r.closing = true
r.acceptMu.Unlock()

r.tunnelCancel() // cancel all child stream contexts
r.children.Wait() // safe: no new Add can race because closing is set under acceptMu
err := r.cc.Close()
close(r.done)
return err
}

func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *grpcTunnel {
t := grpcTunnel{
stream: stream,
Expand Down
7 changes: 7 additions & 0 deletions konnectivity-client/pkg/client/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ const (
DialFailureTunnelClosed DialFailureReason = "tunnelclosed"
// DialFailureAlreadyStarted indicates that a single-use tunnel dialer was already used once.
DialFailureAlreadyStarted DialFailureReason = "tunnelstarted"
// DialFailureStreamSetup indicates that opening a new Proxy stream on a
// reusable tunnel's shared gRPC connection failed before any DIAL_REQ was
// sent. Distinct from DialFailureEndpoint (backend dial) and
// DialFailureContext (caller-side cancellation) so callers can selectively
// invalidate the shared transport on stream-setup failures while leaving
// it intact on backend or caller-class failures.
DialFailureStreamSetup DialFailureReason = "streamsetup"
)

type ClientConnectionStatus string
Expand Down
Loading
Loading