Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions keps/2724-topology-aware-scheduling/kep.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ feature-gates:
- name: TASReplaceNodeOnNodeTaints
- name: TASMultiLayerTopology
- name: TASRespectNodeAffinityPreferred
- name: TASHandleOverlappingFlavors
disable-supported: true

# The following PRR answers are required at beta release
Expand Down
24 changes: 22 additions & 2 deletions pkg/cache/scheduler/clusterqueue_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package scheduler
import (
"iter"
"maps"
"slices"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -198,13 +199,32 @@ func (c *ClusterQueueSnapshot) FindTopologyAssignmentsForWorkload(
option(opts)
}

// One shared assumedUsage map per TopologyName for this workload. PodSets
// landing on different sibling flavors (same Topology, hostname leaf)
// reserve against the same map, preventing intra-workload self-overlap on
// a shared physical node. Cache-write-time aggregation does not cover
// in-flight reservations because pending workloads have not hit addUsage.
Comment thread
tenzen-y marked this conversation as resolved.
sharedAssumed := make(map[kueue.TopologyReference]map[utiltas.TopologyDomainID]resources.Requests)

result := make(TASAssignmentsResult)
for tasFlavor, flavorTASRequests := range tasRequestsByFlavor {
// Iterate flavors in deterministic order so the shared assumedUsage map
// produces reproducible placements across scheduling cycles. Go map
// iteration order is randomized otherwise.
for _, tasFlavor := range slices.Sorted(maps.Keys(tasRequestsByFlavor)) {
flavorTASRequests := tasRequestsByFlavor[tasFlavor]
// We assume the `tasFlavor` is already in the snapshot as this was
// already checked earlier during flavor assignment, and the set of
// flavors is immutable in snapshot.
tasFlavorCache := c.TASFlavors[tasFlavor]
flvResult := tasFlavorCache.FindTopologyAssignmentsForFlavor(flavorTASRequests, options...)
flvOpts := options
if tasFlavorCache.isLowestLevelNode {
topo := tasFlavorCache.topologyName
if sharedAssumed[topo] == nil {
sharedAssumed[topo] = make(map[utiltas.TopologyDomainID]resources.Requests)
}
flvOpts = append(slices.Clone(options), WithSharedAssumedUsage(sharedAssumed[topo]))
}
flvResult := tasFlavorCache.FindTopologyAssignmentsForFlavor(flavorTASRequests, flvOpts...)
maps.Copy(result, flvResult)
}
return result
Expand Down
37 changes: 35 additions & 2 deletions pkg/cache/scheduler/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ import (
"sigs.k8s.io/kueue/pkg/cache/hierarchy"
queueafs "sigs.k8s.io/kueue/pkg/cache/queue/afs"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/resources"
afs "sigs.k8s.io/kueue/pkg/util/admissionfairsharing"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -192,8 +194,15 @@ func (c *Cache) Snapshot(ctx context.Context, options ...SnapshotOption) (*Snaps
}
tasSnapshots := make(map[kueue.ResourceFlavorReference]*TASFlavorSnapshot)
if features.Enabled(features.TopologyAwareScheduling) {
for flavor, cache := range c.tasCache.Clone() {
tasSnapshots[flavor] = cache.snapshot(log, c.tasCache.nodesCache.find(cache.flavor.NodeLabels, cache.topology.Levels))
sharedSnapshotUsage := c.snapshotTopologyUsages()
flavorClone := c.tasCache.Clone()
for flavor, cache := range flavorClone {
nodes := c.tasCache.nodesCache.find(cache.flavor.NodeLabels, cache.topology.Levels)
var sharedUsage map[utiltas.TopologyDomainID]resources.Requests
if cache.topology.Usage != nil {
Comment thread
tenzen-y marked this conversation as resolved.
sharedUsage = sharedSnapshotUsage[cache.flavor.TopologyName]
}
tasSnapshots[flavor] = cache.snapshot(log, nodes, sharedUsage)
}
}
for _, cq := range cqNames {
Expand Down Expand Up @@ -238,6 +247,30 @@ func skipInactiveCQReason(cq *clusterQueue) inactiveCQReason {
return ""
}

// snapshotTopologyUsages takes snapshots from topologies[T].Usage
// once per eligible Topology so every sibling-flavor snapshot for T aliases the same fresh map.
// In-cycle mutations (preemption, fair-sharing) therefore propagate
// cross-flavor but do not leak back to the live cache.
func (c *Cache) snapshotTopologyUsages() map[kueue.TopologyReference]map[utiltas.TopologyDomainID]resources.Requests {
c.tasCache.RLock()
defer c.tasCache.RUnlock()

topologyUsages := make(map[kueue.TopologyReference]map[utiltas.TopologyDomainID]resources.Requests)
for topologyName, info := range c.tasCache.topologies {
if info.Usage == nil {
continue
}
clonedUsage := make(map[utiltas.TopologyDomainID]resources.Requests, info.Usage.Len())

info.Usage.Range(func(domainID utiltas.TopologyDomainID, req resources.Requests) bool {
clonedUsage[domainID] = req.Clone()
return true
})
topologyUsages[topologyName] = clonedUsage
}
return topologyUsages
}

// snapshotClusterQueue creates a copy of ClusterQueue that includes
// references to immutable objects and deep copies of changing ones.
func (c *Cache) snapshotClusterQueue(
Expand Down
4 changes: 4 additions & 0 deletions pkg/cache/scheduler/tas_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2"
"sigs.k8s.io/kueue/pkg/resources"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
utiltas "sigs.k8s.io/kueue/pkg/util/tas"
)

Expand Down Expand Up @@ -95,6 +96,9 @@ func (t *tasCache) AddTopology(topology *kueue.Topology) {
tInfo := topologyInformation{
Levels: utiltas.Levels(topology),
}
if len(tInfo.Levels) > 0 && utiltas.IsLowestLevelHostname(tInfo.Levels) {
tInfo.Usage = utilmaps.NewSyncMap[utiltas.TopologyDomainID, resources.Requests](0)
}
t.topologies[name] = tInfo
for fName, flavorInfo := range t.flavors {
if flavorInfo.TopologyName == name {
Expand Down
192 changes: 170 additions & 22 deletions pkg/cache/scheduler/tas_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ import (
tasindexer "sigs.k8s.io/kueue/pkg/controller/tas/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/resources"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
"sigs.k8s.io/kueue/pkg/util/tas"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
utiltestingapi "sigs.k8s.io/kueue/pkg/util/testing/v1beta2"
testingnode "sigs.k8s.io/kueue/pkg/util/testingjobs/node"
testingpod "sigs.k8s.io/kueue/pkg/util/testingjobs/pod"
"sigs.k8s.io/kueue/pkg/workload"
)

// PodSetTestCase defines a test case for a single podset in the consolidated test.
Expand Down Expand Up @@ -382,12 +384,14 @@ func TestFindTopologyAssignments(t *testing.T) {
}

cases := map[string]struct {
featureGates map[featuregate.Feature]bool
nodes []corev1.Node
pods []corev1.Pod
levels []string
nodeLabels map[string]string
podSets []PodSetTestCase
featureGates map[featuregate.Feature]bool
nodes []corev1.Node
pods []corev1.Pod
levels []string
nodeLabels map[string]string
sharedUsage map[tas.TopologyDomainID]resources.Requests
priorFlavorUsage []workload.TopologyDomainRequests
podSets []PodSetTestCase
}{
"minimize the number of used racks before optimizing the number of nodes; BestFit": {
// Solution by optimizing the number of racks then nodes: [r3]: [x1,x6,x2,x4]
Expand Down Expand Up @@ -7074,6 +7078,56 @@ func TestFindTopologyAssignments(t *testing.T) {
},
},
},
"sibling-flavor usage on the shared hostname reduces remaining capacity": {
levels: []string{tasBlockLabel, corev1.LabelHostname},
nodes: defaultNodes,
nodeLabels: map[string]string{},
priorFlavorUsage: []workload.TopologyDomainRequests{
{
Values: []string{"x3"},
SinglePodRequests: resources.Requests{corev1.ResourceCPU: 1000},
Count: 1,
},
},
podSets: []PodSetTestCase{
{
podSetName: "main",
topologyRequest: &kueue.PodSetTopologyRequest{
Required: ptr.To(corev1.LabelHostname),
},
requests: resources.Requests{corev1.ResourceCPU: 1000},
count: 1,
wantAssignment: &tas.TopologyAssignment{
Levels: []string{corev1.LabelHostname},
Domains: []tas.TopologyDomainAssignment{{Count: 1, Values: []string{"x1"}}},
},
},
},
},
"non-hostname-leaf topology: nil sharedUsage preserves per-flavor behavior": {
// Topology lowest level is rack (not hostname). In the new model
// topologyInformation.Usage is nil for such topologies, so
// NewTASFlavorCache gives each flavor a private map. The harness
// leaves tc.sharedUsage nil. The snapshot() call then takes the
// per-flavor branch and iterates c.usage (empty here).
levels: []string{tasBlockLabel, tasRackLabel},
nodes: defaultNodes,
nodeLabels: map[string]string{},
podSets: []PodSetTestCase{
{
podSetName: "main",
topologyRequest: &kueue.PodSetTopologyRequest{
Required: ptr.To(tasRackLabel),
},
requests: resources.Requests{corev1.ResourceCPU: 1000},
count: 1,
wantAssignment: &tas.TopologyAssignment{
Levels: []string{tasBlockLabel, tasRackLabel},
Domains: []tas.TopologyDomainAssignment{{Count: 1, Values: []string{"b1", "r1"}}},
},
},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand All @@ -7100,6 +7154,9 @@ func TestFindTopologyAssignments(t *testing.T) {
topologyInformation := topologyInformation{
Levels: tc.levels,
}
if len(topologyInformation.Levels) > 0 && tas.IsLowestLevelHostname(topologyInformation.Levels) {
topologyInformation.Usage = utilmaps.NewSyncMap[tas.TopologyDomainID, resources.Requests](0)
}
flavorInformation := flavorInformation{
TopologyName: "default",
NodeLabels: tc.nodeLabels,
Expand All @@ -7108,7 +7165,22 @@ func TestFindTopologyAssignments(t *testing.T) {
tasCache.Update(&pod, log)
}
tasFlavorCache := tasCache.NewTASFlavorCache(topologyInformation, flavorInformation)
snapshot := tasFlavorCache.snapshot(log, tasCache.nodesCache.find(tasFlavorCache.flavor.NodeLabels, tasFlavorCache.topology.Levels))
// Drive shared-usage state via a sibling TASFlavorCache + real
// addUsage call, then clone like Cache.Snapshot does in prod.
if tc.priorFlavorUsage != nil {
siblingFlavorCache := tasCache.NewTASFlavorCache(topologyInformation, flavorInformation)
siblingFlavorCache.addUsage("prior-wl", tc.priorFlavorUsage)
usageLength := topologyInformation.Usage.Len()
if usageLength != 0 {
clonedUsage := make(map[tas.TopologyDomainID]resources.Requests, usageLength)
topologyInformation.Usage.Range(func(domainID tas.TopologyDomainID, req resources.Requests) bool {
clonedUsage[domainID] = req.Clone()
return true
})
tc.sharedUsage = clonedUsage
}
}
snapshot := tasFlavorCache.snapshot(log, tasCache.nodesCache.find(tasFlavorCache.flavor.NodeLabels, tasFlavorCache.topology.Levels), tc.sharedUsage)
flavorTASRequests := make([]TASPodSetRequests, 0, len(tc.podSets))
wantResult := make(TASAssignmentsResult)
for _, ps := range tc.podSets {
Expand Down Expand Up @@ -7168,16 +7240,18 @@ func TestFindTopologyAssignmentsMultiLayerReplacement(t *testing.T) {
podSetName := kueue.PodSetReference("main")

cases := map[string]struct {
levels []string
nodes []corev1.Node
pods []corev1.Pod
existingTA *kueue.TopologyAssignment
admissionCount int32
unhealthyNode string
topologyRequest *kueue.PodSetTopologyRequest
count int32
wantAssignment *tas.TopologyAssignment
wantReason string
levels []string
nodes []corev1.Node
pods []corev1.Pod
existingTA *kueue.TopologyAssignment
admissionCount int32
unhealthyNode string
topologyRequest *kueue.PodSetTopologyRequest
count int32
sharedUsage map[tas.TopologyDomainID]resources.Requests
priorFlavorUsage []workload.TopologyDomainRequests
wantAssignment *tas.TopologyAssignment
wantReason string
}{
"replace unhealthy node in incomplete rack slice": {
// b1
Expand Down Expand Up @@ -7441,6 +7515,62 @@ func TestFindTopologyAssignmentsMultiLayerReplacement(t *testing.T) {
// Without fix: sliceSize=1, scatters x4(1)+x5(1) → wrongly succeeds.
wantReason: `topology "default" doesn't allow to fit any of 1 slice(s). Total nodes: 4; excluded: topologyDomain: 2`,
},
"sibling-flavor sharedUsage on replacement candidate blocks the replacement": {
// b1
// / \
// r1 r2
// / \ / \
// x1 x2 x3 x4
// ^(NotReady)
//
// x4 has 1 CPU allocatable. priorFlavorUsage drives a sibling
// TASFlavorCache's addUsage that consumes 1 CPU on x4 through
// the real write path. The harness clones the resulting shared
// map into tc.sharedUsage. x4 is the only candidate in r2 for
// replacing x3, and its effective capacity is 0 CPU, so
// replacement must fail with the no-capacity reason.
nodes: []corev1.Node{
*testingnode.MakeNode("b1-r1-x1").
Label(tasBlockLabel, "b1").Label(tasRackLabel, "r1").Label(corev1.LabelHostname, "x1").
StatusAllocatable(corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourcePods: resource.MustParse("10")}).
Ready().Obj(),
*testingnode.MakeNode("b1-r1-x2").
Label(tasBlockLabel, "b1").Label(tasRackLabel, "r1").Label(corev1.LabelHostname, "x2").
StatusAllocatable(corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourcePods: resource.MustParse("10")}).
Ready().Obj(),
*testingnode.MakeNode("b1-r2-x3").
Label(tasBlockLabel, "b1").Label(tasRackLabel, "r2").Label(corev1.LabelHostname, "x3").
StatusAllocatable(corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourcePods: resource.MustParse("10")}).
NotReady().Obj(),
*testingnode.MakeNode("b1-r2-x4").
Label(tasBlockLabel, "b1").Label(tasRackLabel, "r2").Label(corev1.LabelHostname, "x4").
StatusAllocatable(corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourcePods: resource.MustParse("10")}).
Ready().Obj(),
},
existingTA: utiltestingapi.MakeTopologyAssignment([]string{corev1.LabelHostname}).
Domain(tas.TopologyDomainAssignment{Count: 1, Values: []string{"x1"}}).
Domain(tas.TopologyDomainAssignment{Count: 1, Values: []string{"x2"}}).
Domain(tas.TopologyDomainAssignment{Count: 1, Values: []string{"x3"}}).
Domain(tas.TopologyDomainAssignment{Count: 1, Values: []string{"x4"}}).
Obj(),
admissionCount: 4,
unhealthyNode: "x3",
topologyRequest: &kueue.PodSetTopologyRequest{
Required: ptr.To(tasBlockLabel),
PodsetSliceRequiredTopologyConstraints: []kueue.PodsetSliceRequiredTopologyConstraint{
{Topology: tasRackLabel, Size: 2},
},
},
count: 4,
priorFlavorUsage: []workload.TopologyDomainRequests{
{
Values: []string{"x4"},
SinglePodRequests: resources.Requests{corev1.ResourceCPU: 1000},
Count: 1,
},
},
wantReason: `topology "default" doesn't allow to fit any of 1 pod(s). Total nodes: 3; excluded: resource "cpu": 1, topologyDomain: 2`,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -7490,12 +7620,30 @@ func TestFindTopologyAssignmentsMultiLayerReplacement(t *testing.T) {
if tcLevels == nil {
tcLevels = defaultLevels
}
tasFlavorCache := tasCache.NewTASFlavorCache(
topologyInformation{Levels: tcLevels},
flavorInformation{TopologyName: "default"},
)
topologyInformation := topologyInformation{Levels: tcLevels}
if len(topologyInformation.Levels) > 0 && tas.IsLowestLevelHostname(topologyInformation.Levels) {
topologyInformation.Usage = utilmaps.NewSyncMap[tas.TopologyDomainID, resources.Requests](0)
}
flavorInformation := flavorInformation{TopologyName: "default"}
tasFlavorCache := tasCache.NewTASFlavorCache(topologyInformation, flavorInformation)

// Drive shared-usage state via a sibling TASFlavorCache + real
// addUsage call, then clone like Cache.Snapshot does in prod.
if tc.priorFlavorUsage != nil {
siblingFlavorCache := tasCache.NewTASFlavorCache(topologyInformation, flavorInformation)
siblingFlavorCache.addUsage("prior-wl", tc.priorFlavorUsage)
usageLength := topologyInformation.Usage.Len()
if usageLength != 0 {
clonedUsage := make(map[tas.TopologyDomainID]resources.Requests, usageLength)
topologyInformation.Usage.Range(func(domainID tas.TopologyDomainID, req resources.Requests) bool {
clonedUsage[domainID] = req.Clone()
return true
})
tc.sharedUsage = clonedUsage
}
}

snapshot := tasFlavorCache.snapshot(log, tasCache.nodesCache.find(tasFlavorCache.flavor.NodeLabels, tasFlavorCache.topology.Levels))
snapshot := tasFlavorCache.snapshot(log, tasCache.nodesCache.find(tasFlavorCache.flavor.NodeLabels, tasFlavorCache.topology.Levels), tc.sharedUsage)
result := snapshot.FindTopologyAssignmentsForFlavor(flavorTASRequests, WithWorkload(wl))

psResult, ok := result[podSetName]
Expand Down
Loading