Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added .nojekyll
Empty file.
6 changes: 3 additions & 3 deletions bandwidth-operator/dist/chart/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ apiVersion: v2
name: vaclab-bandwidth-operator
description: Vaclab Kubernetes operator for bandwidth-aware workload scheduling in Vaclab environments
type: application
version: 0.1.8
appVersion: "0.1.8"
icon: "https://example.com/icon.png"
version: 0.2.1
appVersion: "0.2.1"
icon: "https://raw.githubusercontent.com/vacp2p/vaclab-2/feat/add_lab_components/extras/vac-logo-light-no-bg.png"
keywords:
- kubernetes
- operator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ spec:
{{- end }}
command:
- /manager
image: katakuri100/vaclab-bandwidth-operator:v0.1.8
image: katakuri100/vaclab-bandwidth-operator:v0.2.1
imagePullPolicy: IfNotPresent
livenessProbe:
httpGet:
Expand Down
45 changes: 28 additions & 17 deletions bandwidth-operator/internal/controller/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ func (r *BandwidthReconciler) setDefaultBandwidthValues(bandwidth *networkingv1.
func (r *BandwidthReconciler) UpdateBandwidthStatus(ctx context.Context, bandwidth *networkingv1.Bandwidth) error {
// UpdatedAt is set by caller only when status actually changes
err := r.Status().Update(ctx, bandwidth)
// If conflict, return without error to allow retry
if errors.IsConflict(err) {
return nil
}
// Return conflict errors so controller can requeue
return err
}

Expand Down Expand Up @@ -173,22 +170,36 @@ func (r *BandwidthReconciler) createBandwidthResourcesForAllNodes(ctx context.Co
return err
}

// Now update the status after creation
// Now update the status after creation with retry on conflict
// Re-get the object to get the latest version
if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &bw); err != nil {
return err
}

bw.Status.Status = networkingv1.Created
bw.Status.Capacity = bw.Spec.Capacity
bw.Status.UpdatedAt = metav1.NewTime(time.Now())
maxRetries := 3
for attempt := 0; attempt < maxRetries; attempt++ {
if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &bw); err != nil {
if attempt == maxRetries-1 {
// Give up on Get failures, let reconcile loop handle it
break
}
time.Sleep(100 * time.Millisecond)
continue
}

if err := r.UpdateBandwidthStatus(ctx, &bw); err != nil {
return err
bw.Status.Status = networkingv1.Created
bw.Status.Capacity = bw.Spec.Capacity
bw.Status.UpdatedAt = metav1.NewTime(time.Now())

if err := r.Status().Update(ctx, &bw); err != nil {
if errors.IsConflict(err) && attempt < maxRetries-1 {
// Retry on conflict
time.Sleep(100 * time.Millisecond)
continue
}
// On last attempt or non-conflict error, let reconcile loop handle it
break
}
// Success
r.generateEvent(networkingv1.EventVaclabNodeBandwidthCreated, &bw)
break
}

// Generate event
r.generateEvent(networkingv1.EventVaclabNodeBandwidthCreated, &bw)
}

return nil
Expand Down
198 changes: 94 additions & 104 deletions bandwidth-operator/internal/controller/setup_bandwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,31 @@ func (r *BandwidthReconciler) SetupBandwidthResource(ctx context.Context, bandwi
})
}

} else {
// pod not created yet, in pendning state, freshly reserved by the scheduler
// during reserve stage, should have requested bandwidth defined in the request and pod uid
// should be taken into account for capacity calculation because scheduler will remove it in case of scheduling failure
if !strings.EqualFold(req.PodUid, "") && (req.Bandwidth.UlMbps > 0 || req.Bandwidth.DlMbps > 0) {
usedUlNetwork += req.Bandwidth.UlMbps
usedDlNetwork += req.Bandwidth.DlMbps
usedUlLocal += req.Bandwidth.UlMbps
usedDlLocal += req.Bandwidth.DlMbps
reservationInfo = append(reservationInfo, networkingv1.ReservationInfo{
PodName: req.PodName,
PodUid: req.PodUid,
Bandwidth: networkingv1.Capacity{
Network: networkingv1.BandwidthDefinition{
UlMbps: req.Bandwidth.UlMbps,
DlMbps: req.Bandwidth.DlMbps,
},
Local: networkingv1.BandwidthDefinition{
UlMbps: req.Bandwidth.UlMbps,
DlMbps: req.Bandwidth.DlMbps,
},
},
Namespace: req.Namespace,
})
}
}
}
usedCapacity.Local = networkingv1.BandwidthDefinition{
Expand Down Expand Up @@ -191,91 +216,6 @@ func (r *BandwidthReconciler) SetupBandwidthResource(ctx context.Context, bandwi
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}

// used only when manually changing the spec of an existing Bandwidth resource
// only max capacity changes are allowed manually
func (r *BandwidthReconciler) CheckAndUpdateBandwidth(ctx context.Context, bandwidth *networkingv1.Bandwidth) (ctrl.Result, error) {
log := log.FromContext(ctx)
log.Info("watching vaclab Bandwidth resource for updates")

nodeName := bandwidth.Spec.Node
bwName := bandwidth.Name
//make sure node exists and bw resource is valid
var node corev1.Node
nodeErr := r.Get(ctx, types.NamespacedName{Name: nodeName}, &node)
if nodeErr != nil {
if apierrors.IsNotFound(nodeErr) {
// Node gone: need to stop and clean up any existing bandwidth resource
log.Info("Node not found, cleaning up Bandwidth resource", "node", nodeName)
var bw networkingv1.Bandwidth
if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &bw); err == nil {
_ = r.Delete(ctx, &bw) // ignore error
}
bandwidth.Status.Status = networkingv1.Error
bandwidth.Status.ErrorReason = "node not found"
r.generateEvent(networkingv1.EventVaclabNodeBandwidthFailed, bandwidth)
return ctrl.Result{}, nil
}
bandwidth.Status.Status = networkingv1.Error
bandwidth.Status.ErrorReason = "unable to fetch node information"
log.Info("Node not found, cleaning up Bandwidth resource", "node", nodeName)
var bw networkingv1.Bandwidth
if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &bw); err == nil {
_ = r.Delete(ctx, &bw) // ignore error
}
r.generateEvent(networkingv1.EventVaclabNodeBandwidthFailed, bandwidth)
return ctrl.Result{}, nodeErr
}
if !strings.EqualFold(bwName, node.Name) {
log.Error(nil, "Node name mismatch", "expected", nodeName, "found", node.Name)
bandwidth.Status.Status = networkingv1.Error
bandwidth.Status.ErrorReason = "name mismatch between nodeName and bandwidthName"
log.Info("Node not found, cleaning up Bandwidth resource", "node", nodeName)
var bw networkingv1.Bandwidth
if err := r.Get(ctx, types.NamespacedName{Name: nodeName}, &bw); err == nil {
_ = r.Delete(ctx, &bw) // ignore error
}
r.generateEvent(networkingv1.EventVaclabNodeBandwidthFailed, bandwidth)
return ctrl.Result{}, nil
}

//currentSpec := bandwidth.Spec.Requests
state := bandwidth.Status
if len(state.Reservations) != len(bandwidth.Spec.Requests) {
// changes in requests are not allowed
log.Error(nil, "Changes in bandwidth requests are not allowed after creation", "node", nodeName)
return ctrl.Result{}, nil
}

//expectedCapacity := bandwidth.Status.Capacity
if state.Capacity != bandwidth.Spec.Capacity {
state.Capacity = bandwidth.Spec.Capacity
state.Remaining = networkingv1.Capacity{
Local: networkingv1.BandwidthDefinition{
UlMbps: bandwidth.Spec.Capacity.Local.UlMbps - state.Used.Local.UlMbps,
DlMbps: bandwidth.Spec.Capacity.Local.DlMbps - state.Used.Local.DlMbps,
},
Network: networkingv1.BandwidthDefinition{
UlMbps: bandwidth.Spec.Capacity.Network.UlMbps - state.Used.Network.UlMbps,
DlMbps: bandwidth.Spec.Capacity.Network.DlMbps - state.Used.Network.DlMbps,
},
}
}
bandwidth.Status = state
bandwidth.Status.UpdatedAt = metav1.NewTime(time.Now())

if err := r.UpdateBandwidthStatus(ctx, bandwidth); err != nil {
log.Error(err, "unable to update bandwidth resource status", "node", nodeName)
return ctrl.Result{}, err
}
r.Update(ctx, bandwidth)
// Requeue to check the status later
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

/*func (r *BandwidthReconciler) SyncFromSpecAndPods(ctx context.Context, bw *networkingv1.Bandwidth) (ctrl.Result, error) {
return r.syncFromSpecAndPodsWithVisited(ctx, bw, make(map[string]bool))
}*/

// syncSingleNodeBandwidth updates a single bandwidth resource without cascading to related nodes
// Used for background updates triggered by sibling pod changes
func (r *BandwidthReconciler) syncSingleNodeBandwidth(ctx context.Context, bw *networkingv1.Bandwidth) error {
Expand Down Expand Up @@ -335,12 +275,34 @@ func (r *BandwidthReconciler) syncSingleNodeBandwidth(ctx context.Context, bw *n

for _, req := range bwRequests {
pod, exists := podsWithBw[req.PodUid]
foundUnknown := false
if !exists {
log.V(1).Info("dropping request for missing pod", "podUid", req.PodUid, "podName", req.PodName)
continue
}
// Try to get pod status using client and pod UID
// keep the request if pod is pending (scheduled but not created yet)
var unknownPod corev1.Pod
if err := r.Get(ctx, types.NamespacedName{Name: req.PodName, Namespace: req.Namespace}, &unknownPod); err != nil || string(unknownPod.UID) != req.PodUid {
log.V(1).Info("dropping request for missing pod", "podUid", req.PodUid, "podName", req.PodName)
continue
}

ul, dl := r.GetBandwidthFromAnnotation(pod)
if unknownPod.DeletionTimestamp != nil || unknownPod.Status.Phase == corev1.PodSucceeded || unknownPod.Status.Phase == corev1.PodFailed {
log.V(1).Info("dropping request for terminating/completed pod", "podUid", req.PodUid, "podName", req.PodName)
continue
}

if unknownPod.Status.Phase != corev1.PodPending {
log.V(1).Info("dropping request for pod without bandwidth annotation", "podUid", req.PodUid, "podName", req.PodName)
continue
}
foundUnknown = true
pod = unknownPod
}
var ul, dl int64
if foundUnknown {
ul, dl = req.Bandwidth.UlMbps, req.Bandwidth.DlMbps
} else {
ul, dl = r.GetBandwidthFromAnnotation(pod)
}
usedUlLocal += ul
usedDlLocal += dl

Expand Down Expand Up @@ -519,28 +481,44 @@ func (r *BandwidthReconciler) SyncFromSpecAndPods(ctx context.Context, bw *netwo

for _, req := range bwRequests {
pod, exists := podsWithBw[req.PodUid]
isPendingPod := false
if !exists {
// Pod gone or no bw annotation anymore => drop request
// Check if this pod had siblings on other nodes - they need updates
if oldReq, hadRequest := existingReqs[req.PodUid]; hadRequest {
log.Info("dropping request for missing pod", "podUid", req.PodUid, "podName", req.PodName)
// If pod was part of a workload with siblings, mark related nodes as affected
// We need to fetch sibling info to know which nodes to notify
// Use a best-effort approach: get pod from old request data if possible
var oldPod corev1.Pod
if err := r.Get(ctx, types.NamespacedName{Name: oldReq.PodName, Namespace: oldReq.Namespace}, &oldPod); err == nil {
_, nodes, _ := r.allSiblingsOnCurrentNode(ctx, &oldPod, nodeName)
for _, n := range nodes {
if !strings.EqualFold(n, nodeName) {
affectedRelatedNodes[n] = true
var pendingPod corev1.Pod
if err := r.Get(ctx, types.NamespacedName{Name: req.PodName, Namespace: req.Namespace}, &pendingPod); err == nil && string(pendingPod.UID) == req.PodUid {
if pendingPod.Status.Phase == corev1.PodPending {
// Pod is pending - keep the request as is
isPendingPod = true
pod = pendingPod
}
}
if !isPendingPod {
// Pod gone or no bw annotation anymore => drop request
// Check if this pod had siblings on other nodes - they need updates
if oldReq, hadRequest := existingReqs[req.PodUid]; hadRequest {
log.Info("dropping request for missing pod", "podUid", req.PodUid, "podName", req.PodName)
// If pod was part of a workload with siblings, mark related nodes as affected
// We need to fetch sibling info to know which nodes to notify
// Use a best-effort approach: get pod from old request data if possible
var oldPod corev1.Pod
if err := r.Get(ctx, types.NamespacedName{Name: oldReq.PodName, Namespace: oldReq.Namespace}, &oldPod); err == nil {
_, nodes, _ := r.allSiblingsOnCurrentNode(ctx, &oldPod, nodeName)
for _, n := range nodes {
if !strings.EqualFold(n, nodeName) {
affectedRelatedNodes[n] = true
}
}
}
}
continue
}
continue
}

ul, dl := r.GetBandwidthFromAnnotation(pod)
var ul, dl int64
if isPendingPod {
ul, dl = req.Bandwidth.UlMbps, req.Bandwidth.DlMbps
} else {
ul, dl = r.GetBandwidthFromAnnotation(pod)
}
// local bandwidth is always used
// since pods are always connected to virtual bridge
usedUlLocal += ul
Expand Down Expand Up @@ -707,6 +685,18 @@ func (r *BandwidthReconciler) SyncFromSpecAndPods(ctx context.Context, bw *netwo
return ctrl.Result{}, err
}
log.Info("bandwidth spec updated", "node", nodeName, "requests", len(cleanRequests))

// Re-fetch to get updated resourceVersion after spec update
if err := r.Get(ctx, client.ObjectKeyFromObject(bw), bw); err != nil {
return ctrl.Result{}, err
}
// Re-apply status fields since we just fetched fresh object
bw.Status.Used = usedCapacity
bw.Status.Remaining = remainingCapacity
bw.Status.Reservations = reservationInfo
bw.Status.Capacity = bw.Spec.Capacity
bw.Status.Status = networkingv1.Created
bw.Status.ErrorReason = ""
}

// Only update status if it actually changed
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Loading