Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 90 additions & 35 deletions pkg/server/backend_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,12 @@ type DefaultBackendStorage struct {
// randomly pick a key from a map (in this case, the backends) in
// Golang.
agentIDs []string
random *rand.Rand
// nonDrainingIDs tracks identifiers whose primary backend is not draining.
// Used to avoid scanning all identifiers on every selection.
nonDrainingIDs []string
// nonDrainingIndex tracks index positions in nonDrainingIDs for O(1) removals.
nonDrainingIndex map[string]int
random *rand.Rand
// idTypes contains the valid identifier types for this
// DefaultBackendStorage. The DefaultBackendStorage may only tolerate certain
// types of identifiers when associating to a specific BackendManager,
Expand Down Expand Up @@ -244,26 +249,69 @@ func NewDefaultBackendStorage(idTypes []header.IdentifierType, proxyStrategy pro
metrics.Metrics.SetTotalBackendCount(proxyStrategy, 0)

return &DefaultBackendStorage{
backends: make(map[string][]*Backend),
random: rand.New(rand.NewSource(time.Now().UnixNano())), /* #nosec G404 */
idTypes: idTypes,
proxyStrategy: proxyStrategy,
backends: make(map[string][]*Backend),
nonDrainingIndex: make(map[string]int),
random: rand.New(rand.NewSource(time.Now().UnixNano())), /* #nosec G404 */
idTypes: idTypes,
proxyStrategy: proxyStrategy,
}
}

func containIDType(idTypes []header.IdentifierType, idType header.IdentifierType) bool {
return slices.Contains(idTypes, idType)
}

func (s *DefaultBackendStorage) addNonDrainingIdentifierLocked(identifier string) {
if _, ok := s.nonDrainingIndex[identifier]; ok {
return
}
s.nonDrainingIDs = append(s.nonDrainingIDs, identifier)
s.nonDrainingIndex[identifier] = len(s.nonDrainingIDs) - 1
}

func (s *DefaultBackendStorage) removeNonDrainingIdentifierLocked(identifier string) {
idx, ok := s.nonDrainingIndex[identifier]
if !ok {
return
}
lastIdx := len(s.nonDrainingIDs) - 1
if idx != lastIdx {
lastIdentifier := s.nonDrainingIDs[lastIdx]
s.nonDrainingIDs[idx] = lastIdentifier
s.nonDrainingIndex[lastIdentifier] = idx
}
s.nonDrainingIDs = s.nonDrainingIDs[:lastIdx]
delete(s.nonDrainingIndex, identifier)
}

func (s *DefaultBackendStorage) refreshNonDrainingIdentifierLocked(identifier string) {
backends, ok := s.backends[identifier]
if !ok || len(backends) == 0 {
s.removeNonDrainingIdentifierLocked(identifier)
return
}

// For a given identifier, routing always uses the first backend.
if backends[0].IsDraining() {
s.removeNonDrainingIdentifierLocked(identifier)
return
}
s.addNonDrainingIdentifierLocked(identifier)
}

// addBackend adds a backend.
func (s *DefaultBackendStorage) addBackend(identifier string, idType header.IdentifierType, backend *Backend) {
s.mu.Lock()
defer s.mu.Unlock()
s.addBackendLocked(identifier, idType, backend)
}

func (s *DefaultBackendStorage) addBackendLocked(identifier string, idType header.IdentifierType, backend *Backend) {
if !containIDType(s.idTypes, idType) {
klog.V(3).InfoS("fail to add backend", "backend", identifier, "error", &ErrWrongIDType{idType, s.idTypes})
return
}
klog.V(2).InfoS("Register backend for agent", "agentID", identifier)
s.mu.Lock()
defer s.mu.Unlock()
_, ok := s.backends[identifier]
if ok {
for _, b := range s.backends[identifier] {
Expand All @@ -276,44 +324,56 @@ func (s *DefaultBackendStorage) addBackend(identifier string, idType header.Iden
return
}
s.backends[identifier] = []*Backend{backend}
s.refreshNonDrainingIdentifierLocked(identifier)
metrics.Metrics.SetBackendCountDeprecated(len(s.backends))
metrics.Metrics.SetTotalBackendCount(s.proxyStrategy, len(s.backends))
s.agentIDs = append(s.agentIDs, identifier)
}

// removeBackend removes a backend.
func (s *DefaultBackendStorage) removeBackend(identifier string, idType header.IdentifierType, backend *Backend) {
s.mu.Lock()
defer s.mu.Unlock()
s.removeBackendLocked(identifier, idType, backend)
}

func (s *DefaultBackendStorage) removeBackendLocked(identifier string, idType header.IdentifierType, backend *Backend) {
if !containIDType(s.idTypes, idType) {
klog.ErrorS(&ErrWrongIDType{idType, s.idTypes}, "fail to remove backend")
return
}
klog.V(2).InfoS("Remove connection for agent", "agentID", identifier)
s.mu.Lock()
defer s.mu.Unlock()
backends, ok := s.backends[identifier]
if !ok {
klog.V(1).InfoS("Cannot find agent in backends", "identifier", identifier)
return
}
var found bool
var removedFirst bool
for i, b := range backends {
if b == backend {
s.backends[identifier] = append(s.backends[identifier][:i], s.backends[identifier][i+1:]...)
if i == 0 && len(s.backends[identifier]) != 0 {
klog.V(1).InfoS("This should not happen. Removed connection that is not the first connection", "agentID", identifier)
}
if i == 0 {
removedFirst = true
}
found = true
}
}
if len(s.backends[identifier]) == 0 {
delete(s.backends, identifier)
s.removeNonDrainingIdentifierLocked(identifier)
for i := range s.agentIDs {
if s.agentIDs[i] == identifier {
s.agentIDs[i] = s.agentIDs[len(s.agentIDs)-1]
s.agentIDs = s.agentIDs[:len(s.agentIDs)-1]
break
}
}
} else if removedFirst {
s.refreshNonDrainingIdentifierLocked(identifier)
}
if !found {
klog.V(1).InfoS("Could not find connection matching identifier to remove", "agentID", identifier, "idType", idType)
Expand Down Expand Up @@ -361,35 +421,30 @@ func (s *DefaultBackendStorage) GetRandomBackend() (*Backend, error) {
return nil, &ErrNotFound{}
}

var firstDrainingBackend *Backend

// Start at a random agent and check each agent in sequence
startIdx := s.random.Intn(len(s.agentIDs))
for i := 0; i < len(s.agentIDs); i++ {
// Wrap around using modulo
currentIdx := (startIdx + i) % len(s.agentIDs)
agentID := s.agentIDs[currentIdx]
// always return the first connection to an agent, because the agent
// will close later connections if there are multiple.
backend := s.backends[agentID][0]

if !backend.IsDraining() {
klog.V(3).InfoS("Pick agent as backend", "agentID", agentID)
return backend, nil
// Prefer random selection from known non-draining identifiers.
// This avoids a full scan on every selection.
for len(s.nonDrainingIDs) > 0 {
idx := s.random.Intn(len(s.nonDrainingIDs))
identifier := s.nonDrainingIDs[idx]
backends, ok := s.backends[identifier]
if !ok || len(backends) == 0 {
s.removeNonDrainingIdentifierLocked(identifier)
continue
}

// Keep track of first draining backend as fallback
if firstDrainingBackend == nil {
firstDrainingBackend = backend
backend := backends[0]
// A backend may have transitioned to draining since the pool was updated.
// Remove stale entries lazily and retry.
if backend.IsDraining() {
s.removeNonDrainingIdentifierLocked(identifier)
continue
}
klog.V(3).InfoS("Pick agent as backend", "agentID", identifier)
return backend, nil
}

// All agents are draining, use one as fallback
if firstDrainingBackend != nil {
agentID := firstDrainingBackend.id
klog.V(3).InfoS("No non-draining backends available, using draining backend as fallback", "agentID", agentID)
return firstDrainingBackend, nil
}

return nil, &ErrNotFound{}
agentID := s.agentIDs[s.random.Intn(len(s.agentIDs))]
backend := s.backends[agentID][0]
klog.V(3).InfoS("No non-draining backends available, using draining backend as fallback", "agentID", agentID)
return backend, nil
}
Loading
Loading