Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,8 @@ kubectl delete --wait=false --filename=demo/basic-resourceclaimtemplate.yaml \
--filename=demo/basic-shared-claim-across-containers.yaml \
--filename=demo/basic-shared-claim-across-pods.yaml \
--filename=demo/basic-resourceclaim-opaque-config.yaml \
--filename=demo/admin-access.yaml
--filename=demo/admin-access.yaml \
--filename=demo/binding-conditions/binding-conditions.yaml
```

And wait for them to terminate:
Expand All @@ -417,6 +418,7 @@ basic-shared-claim-across-containers pod1 1/1 Terminating 0 3
basic-shared-claim-across-pods pod0 1/1 Terminating 0 31m
basic-resourceclaim-opaque-config pod0 4/4 Terminating 0 31m
admin-access pod0 1/1 Terminating 0 31m
binding-conditions pod0 1/1 Terminating 0 31m
...
```

Expand Down
114 changes: 114 additions & 0 deletions cmd/dra-example-controller/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright The Kubernetes Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"context"
"fmt"
"sync"

resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Plugin processes a ResourceClaim that has been allocated for this driver.
// Each plugin is responsible for updating the claim status if needed.
// New functionality can be added by implementing this interface and
// registering the plugin in main().
type Plugin interface {
// Name returns the name of the plugin.
Name() string
Reconcile(ctx context.Context, c client.Client, claim *resourceapi.ResourceClaim) error
}

// ClaimReconciler watches ResourceClaims and runs registered Plugins on
// claims allocated for its driver.
type ClaimReconciler struct {
client client.Client
driverName string
plugins []Plugin
}

func NewClaimReconciler(mgr ctrl.Manager, driverName string, plugins []Plugin) *ClaimReconciler {
return &ClaimReconciler{
client: mgr.GetClient(),
driverName: driverName,
plugins: plugins,
}
}

func (r *ClaimReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&resourceapi.ResourceClaim{}).
Complete(r)
}

func (r *ClaimReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var claim resourceapi.ResourceClaim
if err := r.client.Get(ctx, req.NamespacedName, &claim); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, fmt.Errorf("get claim: %w", err)
}

if !r.isRelevant(&claim) {
return ctrl.Result{}, nil
}

var wg sync.WaitGroup
errChan := make(chan error, len(r.plugins))

for _, p := range r.plugins {
wg.Add(1)
go func(plugin Plugin) {
defer wg.Done()
if err := plugin.Reconcile(ctx, r.client, &claim); err != nil {
errChan <- fmt.Errorf("%s: %w", plugin.Name(), err)
}
}(p)
}

wg.Wait()
close(errChan)

var errs []error
for err := range errChan {
errs = append(errs, err)
}

if len(errs) > 0 {
return ctrl.Result{}, fmt.Errorf("plugins failed: %v", errs)
}

return ctrl.Result{}, nil
}

// isRelevant returns true if the claim has any allocation results for this driver.
func (r *ClaimReconciler) isRelevant(claim *resourceapi.ResourceClaim) bool {
if claim.Status.Allocation == nil {
return false
}
for _, result := range claim.Status.Allocation.Devices.Results {
if result.Driver == r.driverName {
return true
}
}
return false
}
118 changes: 118 additions & 0 deletions cmd/dra-example-controller/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright The Kubernetes Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"flag"
"fmt"
"os"
"sort"
"strings"

resourceapi "k8s.io/api/resource/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

"sigs.k8s.io/dra-example-driver/cmd/dra-example-controller/plugins"
)

var scheme = runtime.NewScheme()

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(resourceapi.AddToScheme(scheme))
}

// PluginFactory creates a Plugin for the given driver name.
type PluginFactory func(driverName string) Plugin

// pluginRegistry maps plugin names to their factory functions.
// Add new plugins here.
var pluginRegistry = map[string]PluginFactory{
plugins.BindingConditions: func(driverName string) Plugin {
return plugins.NewBindingConditionsPlugin(driverName)
},
}

// pluginNames returns sorted plugin names for help text.
func pluginNames() []string {
names := make([]string, 0, len(pluginRegistry))
for name := range pluginRegistry {
names = append(names, name)
}
sort.Strings(names)
return names
}

// enablePlugins is a flag.Value that collects --enable-plugin values.
// Values can be specified as comma-separated or by repeating the flag.
type enablePlugins []string

func (e *enablePlugins) String() string { return strings.Join(*e, ",") }
func (e *enablePlugins) Set(v string) error {
for _, name := range strings.Split(v, ",") {
name = strings.TrimSpace(name)
if name == "" {
continue
}
if _, ok := pluginRegistry[name]; !ok {
return fmt.Errorf("unknown plugin %q (available: %s)", name, strings.Join(pluginNames(), ", "))
}
*e = append(*e, name)
}
return nil
}

func main() {
var driverName string
var enabled enablePlugins
flag.StringVar(&driverName, "driver-name", "gpu.example.com", "The driver name to filter ResourceClaims by.")
flag.Var(&enabled, "enable-plugin",
fmt.Sprintf("Enable a plugin (can be specified multiple times). Available: %s", strings.Join(pluginNames(), ", ")))
opts := zap.Options{Development: true}
opts.BindFlags(flag.CommandLine)
flag.Parse()

ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts)))

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
})
if err != nil {
fmt.Fprintf(os.Stderr, "Error creating manager: %v\n", err)
os.Exit(1)
}

// Build the plugin list from flags.
var plugins []Plugin
for _, name := range enabled {
plugins = append(plugins, pluginRegistry[name](driverName))
}

if err := NewClaimReconciler(mgr, driverName, plugins).SetupWithManager(mgr); err != nil {
fmt.Fprintf(os.Stderr, "Error setting up controller: %v\n", err)
os.Exit(1)
}

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
fmt.Fprintf(os.Stderr, "Error running manager: %v\n", err)
os.Exit(1)
}
}
140 changes: 140 additions & 0 deletions cmd/dra-example-controller/plugins/bindingconditions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright The Kubernetes Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package plugins

import (
"context"

resourceapi "k8s.io/api/resource/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// BindingConditionsPlugin satisfies binding conditions for allocated devices
// by marking them as ready. In a real driver this would check actual device
// readiness before setting the condition.
type BindingConditionsPlugin struct {
driverName string
}

func NewBindingConditionsPlugin(driverName string) *BindingConditionsPlugin {
return &BindingConditionsPlugin{driverName: driverName}
}

func (p *BindingConditionsPlugin) Name() string {
return BindingConditions
}

func (p *BindingConditionsPlugin) Reconcile(ctx context.Context, c client.Client, claim *resourceapi.ResourceClaim) error {
if claim.Status.Allocation == nil {
return nil
}

logger := log.FromContext(ctx)
modified := false
now := metav1.Now()

for _, result := range claim.Status.Allocation.Devices.Results {
if result.Driver != p.driverName || len(result.BindingConditions) == 0 {
continue
}
for _, condType := range result.BindingConditions {
if isConditionTrue(claim, result, condType) {
continue
}
setDeviceCondition(claim, result, condType, now)
logger.Info("Set binding condition",
"device", result.Device,
"condition", condType,
)
modified = true
}
}

if !modified {
return nil
}

logger.Info("Updating ResourceClaim status", "name", claim.Name)
return c.Status().Update(ctx, claim)
}

// isConditionTrue checks whether a device already has the given condition set to True.
func isConditionTrue(
claim *resourceapi.ResourceClaim,
result resourceapi.DeviceRequestAllocationResult,
condType string,
) bool {
for _, d := range claim.Status.Devices {
if d.Driver == result.Driver && d.Pool == result.Pool && d.Device == result.Device {
for _, c := range d.Conditions {
if c.Type == condType && c.Status == metav1.ConditionTrue {
return true
}
}
}
}
return false
}

// setDeviceCondition adds or updates a condition for a device in the claim status.
func setDeviceCondition(
claim *resourceapi.ResourceClaim,
result resourceapi.DeviceRequestAllocationResult,
condType string,
now metav1.Time,
) {
// Find existing device status entry.
for i := range claim.Status.Devices {
d := &claim.Status.Devices[i]
if d.Driver == result.Driver && d.Pool == result.Pool && d.Device == result.Device {
// Update or append the condition within this entry.
for j := range d.Conditions {
if d.Conditions[j].Type == condType {
d.Conditions[j].Status = metav1.ConditionTrue
d.Conditions[j].Reason = "Ready"
d.Conditions[j].Message = "Device is ready"
d.Conditions[j].LastTransitionTime = now
return
}
}
d.Conditions = append(d.Conditions, metav1.Condition{
Type: condType,
Status: metav1.ConditionTrue,
Reason: "Ready",
Message: "Device is ready",
LastTransitionTime: now,
})
return
}
}

// No existing entry; create a new one.
claim.Status.Devices = append(claim.Status.Devices, resourceapi.AllocatedDeviceStatus{
Driver: result.Driver,
Pool: result.Pool,
Device: result.Device,
Conditions: []metav1.Condition{{
Type: condType,
Status: metav1.ConditionTrue,
Reason: "Ready",
Message: "Device is ready",
LastTransitionTime: now,
}},
})
}
Loading