Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [FEATURE] Querier: Implement Resource Based Throttling in Querier. #7442
* [ENHANCEMENT] Upgrade prometheus alertmanager version to v0.32.1. #7462
* [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489
* [ENHANCEMENT] Memberlist: Add `-memberlist.packet-read-timeout`, `-memberlist.max-packet-size`, and `-memberlist.max-concurrent-connections` flags to bound inbound gossip TCP connections, preventing slow-read, OOM, and connection-flood attacks on the gossip port. #7518
* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
* [ENHANCEMENT] Distributor: Introduce dynamic `Symbols` slice capacity pooling. #7398 #7401
Expand Down
12 changes: 12 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4693,6 +4693,18 @@ The `memberlist_config` configures the Gossip memberlist.
# CLI flag: -memberlist.packet-write-timeout
[packet_write_timeout: <duration> | default = 5s]

# Timeout for reading packet data from inbound connections. 0 = no limit.
# CLI flag: -memberlist.packet-read-timeout
[packet_read_timeout: <duration> | default = 5s]

# Maximum size in bytes of an inbound gossip packet. 0 = no limit.
# CLI flag: -memberlist.max-packet-size
[max_packet_size: <int> | default = 1048576]

# Maximum number of concurrent inbound TCP connections. 0 = no limit.
# CLI flag: -memberlist.max-concurrent-connections
[max_concurrent_connections: <int> | default = 100]

# Enable TLS on the memberlist transport layer.
# CLI flag: -memberlist.tls-enabled
[tls_enabled: <boolean> | default = false]
Expand Down
131 changes: 125 additions & 6 deletions pkg/ring/kv/memberlist/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"golang.org/x/sync/semaphore"

"github.com/cortexproject/cortex/pkg/util/flagext"
cortextls "github.com/cortexproject/cortex/pkg/util/tls"
Expand Down Expand Up @@ -50,6 +51,15 @@ type TCPTransportConfig struct {
// Timeout for writing packet data. Zero = no timeout.
PacketWriteTimeout time.Duration `yaml:"packet_write_timeout"`

// Timeout for reading inbound packet data. Zero = no timeout.
PacketReadTimeout time.Duration `yaml:"packet_read_timeout"`

// Maximum size in bytes of a single inbound packet. Zero = no limit.
MaxPacketSize int64 `yaml:"max_packet_size"`

// Maximum number of concurrent inbound TCP connections. Zero = no limit.
MaxConcurrentConnections int `yaml:"max_concurrent_connections"`

// Transport logs a lot of messages at debug level, so it deserves an extra flag for turning it on.
TransportDebug bool `yaml:"-"`

Expand All @@ -72,6 +82,9 @@ func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix s
f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.")
f.DurationVar(&cfg.PacketDialTimeout, prefix+"memberlist.packet-dial-timeout", 5*time.Second, "Timeout used when connecting to other nodes to send packet.")
f.DurationVar(&cfg.PacketWriteTimeout, prefix+"memberlist.packet-write-timeout", 5*time.Second, "Timeout for writing 'packet' data.")
f.DurationVar(&cfg.PacketReadTimeout, prefix+"memberlist.packet-read-timeout", 5*time.Second, "Timeout for reading packet data from inbound connections. 0 = no limit.")
f.Int64Var(&cfg.MaxPacketSize, prefix+"memberlist.max-packet-size", 1*1024*1024 /*1MB*/, "Maximum size in bytes of an inbound gossip packet. 0 = no limit.")
f.IntVar(&cfg.MaxConcurrentConnections, prefix+"memberlist.max-concurrent-connections", 100, "Maximum number of concurrent inbound TCP connections. 0 = no limit.")
Comment thread
friedrichg marked this conversation as resolved.
f.BoolVar(&cfg.TransportDebug, prefix+"memberlist.transport-debug", false, "Log debug transport messages. Note: global log.level must be at debug level as well.")

f.BoolVar(&cfg.TLSEnabled, prefix+"memberlist.tls-enabled", false, "Enable TLS on the memberlist transport layer.")
Expand All @@ -90,6 +103,9 @@ type TCPTransport struct {
tcpListeners []net.Listener
tlsConfig *tls.Config

// connSemaphore limits the number of concurrent inbound TCP connections.
connSemaphore *semaphore.Weighted

shutdown atomic.Int32

advertiseMu sync.RWMutex
Expand All @@ -107,6 +123,9 @@ type TCPTransport struct {
sentPacketsBytes prometheus.Counter
sentPacketsErrors prometheus.Counter
unknownConnections prometheus.Counter
rejectedConnections prometheus.Counter
activeConnections prometheus.Gauge
packetReceiveDuration prometheus.Histogram
}

// NewTCPTransport returns a new tcp-based transport with the given configuration. On
Expand All @@ -125,6 +144,10 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor
connCh: make(chan net.Conn),
}

if config.MaxConcurrentConnections > 0 {
t.connSemaphore = semaphore.NewWeighted(int64(config.MaxConcurrentConnections))
}

var err error
if config.TLSEnabled {
t.tlsConfig, err = config.TLS.GetTLSConfig()
Expand Down Expand Up @@ -222,7 +245,27 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) {
// No error, reset loop delay
loopDelay = 0

go t.handleConnection(conn)
// Enforce the concurrent connection limit via the semaphore.
if t.connSemaphore != nil {
if !t.connSemaphore.TryAcquire(1) {
t.rejectedConnections.Inc()
level.Debug(t.logger).Log("msg", "max concurrent connections reached, closing connection", "remote", conn.RemoteAddr())
_ = conn.Close()
continue
}
}

t.activeConnections.Inc()
go func() {
// handleConnection returns true when it wrapped the conn in a
// semaphoreConn and transferred ownership of the slot to that
// wrapper (stream path). In that case we must not release here.
semTransferred := t.handleConnection(conn)
if t.connSemaphore != nil && !semTransferred {
t.connSemaphore.Release(1)
}
t.activeConnections.Dec()
Comment thread
friedrichg marked this conversation as resolved.
Outdated
}()
}
}

Expand All @@ -235,7 +278,7 @@ func (t *TCPTransport) debugLog() log.Logger {
return noopLogger
}

func (t *TCPTransport) handleConnection(conn net.Conn) {
func (t *TCPTransport) handleConnection(conn net.Conn) (semTransferred bool) {
t.debugLog().Log("msg", "New connection", "addr", conn.RemoteAddr())

closeConn := true
Expand All @@ -245,6 +288,15 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
}
}()

// Apply a read deadline for the entire packet receive so that a slow or
// adversarial peer cannot hold the goroutine open indefinitely.
if t.cfg.PacketReadTimeout > 0 {
if err := conn.SetReadDeadline(time.Now().Add(t.cfg.PacketReadTimeout)); err != nil {
level.Warn(t.logger).Log("msg", "failed to set read deadline", "err", err, "remote", conn.RemoteAddr())
return
}
}

// let's read first byte, and determine what to do about this connection
msgType := []byte{0}
_, err := io.ReadFull(conn, msgType)
Expand All @@ -256,13 +308,28 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
if messageType(msgType[0]) == stream {
t.incomingStreams.Inc()

// hand over this connection to memberlist
// Stream connections are handed off to memberlist which manages them
// independently – clear the deadline so memberlist can use its own
// timeouts, then pass the connection over.
if t.cfg.PacketReadTimeout > 0 {
_ = conn.SetReadDeadline(time.Time{})
}

// hand over this connection to memberlist.
// If the semaphore is active, wrap the conn so that the slot is held
// for the real lifetime of the stream. The memberlist will close it.
closeConn = false
t.connCh <- conn
if t.connSemaphore != nil {
t.connCh <- &semaphoreConn{Conn: conn, sem: t.connSemaphore}
semTransferred = true
} else {
t.connCh <- conn
}
} else if messageType(msgType[0]) == packet {
// it's a memberlist "packet", which contains an address and data.
t.receivedPackets.Inc()

packetStart := time.Now()
// before reading packet, read the address
addrLengthBuf := []byte{0}
_, err := io.ReadFull(conn, addrLengthBuf)
Expand All @@ -280,14 +347,26 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
return
}

// read the rest to buffer -- this is the "packet" itself
buf, err := io.ReadAll(conn)
var reader io.Reader = conn
if t.cfg.MaxPacketSize > 0 {
// Read one byte beyond the limit so we can detect oversized packets.
reader = io.LimitReader(conn, t.cfg.MaxPacketSize+1)
}
buf, err := io.ReadAll(reader)
t.packetReceiveDuration.Observe(time.Since(packetStart).Seconds())
if err != nil {
t.receivedPacketsErrors.Inc()
level.Warn(t.logger).Log("msg", "error while reading packet data", "err", err, "remote", conn.RemoteAddr())
return
}

// Reject oversized packets
if t.cfg.MaxPacketSize > 0 && int64(len(buf)) > t.cfg.MaxPacketSize {
t.receivedPacketsErrors.Inc()
level.Debug(t.logger).Log("msg", "packet too large, dropping", "size", len(buf), "max", t.cfg.MaxPacketSize, "remote", conn.RemoteAddr())
return
}

if len(buf) < md5.Size {
t.receivedPacketsErrors.Inc()
level.Warn(t.logger).Log("msg", "not enough data received", "data_length", len(buf), "remote", conn.RemoteAddr())
Expand Down Expand Up @@ -318,6 +397,7 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
t.unknownConnections.Inc()
level.Error(t.logger).Log("msg", "unknown message type", "msgType", msgType, "remote", conn.RemoteAddr())
}
return
}

type addr string
Expand All @@ -330,6 +410,20 @@ func (a addr) String() string {
return string(a)
}

// semaphoreConn decorates a net.Conn so that closing the connection also
// gives back one slot of the concurrent-connection semaphore. The stream
// path hands such a wrapper to its consumer, which keeps the slot occupied
// until that consumer closes the connection.
type semaphoreConn struct {
	// Conn is the wrapped connection; every method except Close is
	// promoted from it unchanged.
	net.Conn

	// sem is the semaphore from which one unit was acquired for this
	// connection.
	sem *semaphore.Weighted

	// once guards the release so that repeated Close calls return the
	// slot a single time only.
	once sync.Once
}

// Close returns the semaphore slot on the first call (later calls skip the
// release) and then closes the wrapped connection, reporting whatever error
// the underlying Close produces. The slot is released before the underlying
// connection is closed.
func (c *semaphoreConn) Close() error {
	releaseSlot := func() {
		c.sem.Release(1)
	}
	c.once.Do(releaseSlot)

	return c.Conn.Close()
}

func (t *TCPTransport) getConnection(addr string, timeout time.Duration) (net.Conn, error) {
if t.cfg.TLSEnabled {
return tls.DialWithDialer(&net.Dialer{Timeout: timeout}, "tcp", addr, t.tlsConfig)
Expand Down Expand Up @@ -634,4 +728,29 @@ func (t *TCPTransport) registerMetrics(registerer prometheus.Registerer) {
Name: "unknown_connections_total",
Help: "Number of unknown TCP connections (not a packet or stream)",
})

t.rejectedConnections = promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: t.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "rejected_connections_total",
Help: "Number of inbound TCP connections rejected because the concurrent connection limit was reached",
})

t.activeConnections = promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
Namespace: t.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "active_connections",
Help: "Current number of active inbound TCP connections.",
})

t.packetReceiveDuration = promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: t.cfg.MetricsNamespace,
Subsystem: subsystem,
Name: "packet_receive_duration_seconds",
Help: "Duration (in seconds) of inbound packet-type message reads.",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 1 * time.Hour,
})
}
Loading
Loading