Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
flag "github.com/spf13/pflag"
"golang.org/x/sys/unix"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/kubernetes"
"k8s.io/component-base/logs"
logsapi "k8s.io/component-base/logs/api/v1"
Expand Down Expand Up @@ -149,14 +150,22 @@ func main() {
meta.EnableEcs(http.DefaultTransport)

cfg, err := options.GetRestConfig()
var kubeClient kubernetes.Interface
var k8sVersion *k8sversion.Version
if err != nil {
klog.Warningf("newGlobalConfig: build kubeconfig failed: %v", err)
} else {
kubeClient, err := kubernetes.NewForConfig(cfg)
kubeClient, err = kubernetes.NewForConfig(cfg)
if err != nil {
klog.Warningf("Error building kubernetes clientset: %v", err)
} else {
meta.EnableKubernetes(kubeClient)

// Detect Kubernetes version for feature compatibility
k8sVersion, err = detectKubernetesVersion(kubeClient)
if err != nil {
klog.Warningf("Failed to detect kubernetes version: %v", err)
}
}
}

Expand Down Expand Up @@ -209,7 +218,7 @@ func main() {
case TypePluginOSS:
go func(endPoint string) {
defer wg.Done()
driver := oss.NewDriver(endPoint, meta, serviceType, csiCfg)
driver := oss.NewDriver(endPoint, meta, serviceType, csiCfg, k8sVersion)
driver.Run()
}(endPointName)
case TypePluginDISK:
Expand Down Expand Up @@ -327,3 +336,19 @@ func getCSIPluginConfig() (config utils.Config) {
config.ConfigMap = cm.Data
return
}

// detectKubernetesVersion asks the API server, through the discovery client
// of clientset, for its version and returns it parsed as a semantic version.
//
// The value that gets parsed is the GitVersion field reported by the server
// (for example "v1.31.0", or a vendor build such as "v1.31.0-eks-...").
// A nil version and a wrapped error are returned when either the discovery
// request or the parsing fails.
func detectKubernetesVersion(clientset kubernetes.Interface) (*k8sversion.Version, error) {
	info, err := clientset.Discovery().ServerVersion()
	if err != nil {
		return nil, fmt.Errorf("failed to get server version: %w", err)
	}

	gitVersion := info.GitVersion
	parsed, parseErr := k8sversion.ParseSemantic(gitVersion)
	if parseErr != nil {
		return nil, fmt.Errorf("failed to parse version %q: %w", gitVersion, parseErr)
	}
	return parsed, nil
}
46 changes: 36 additions & 10 deletions pkg/mounter/fuse_pod_manager/fuse_pod_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
informercorev1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
Expand Down Expand Up @@ -192,14 +192,16 @@ func ExtractFuseContainerConfig(csiCfg utils.Config, name string) (config FuseCo
}

type FusePodManager struct {
client kubernetes.Interface
client kubernetes.Interface
constrainResourceVersion bool
FuseMounterType
}

func NewFusePodManager(fuseType FuseMounterType, client kubernetes.Interface) *FusePodManager {
func NewFusePodManager(fuseType FuseMounterType, client kubernetes.Interface, constrainResourceVersion bool) *FusePodManager {
return &FusePodManager{
client: client,
FuseMounterType: fuseType,
client: client,
constrainResourceVersion: constrainResourceVersion,
FuseMounterType: fuseType,
}
}

Expand Down Expand Up @@ -337,10 +339,35 @@ func (fpm *FusePodManager) Delete(c *FusePodContext) error {
logger := klog.FromContext(ctx).WithValues("namespace", c.Namespace)

_, listOptions := fpm.labelsAndListOptionsFor(c, "")
informer := informercorev1.NewFilteredPodInformer(fpm.client, c.Namespace, 0, nil, func(options *metav1.ListOptions) {
options.FieldSelector = listOptions.FieldSelector
options.LabelSelector = listOptions.LabelSelector
})

podClient := fpm.client.CoreV1().Pods(c.Namespace)

// Determine ResourceVersion strategy based on constrainResourceVersion flag
// true: Constrain RV="0" to read from watch cache (K8s < 1.31, avoids etcd pressure)
// false: Use RV="" for consistent reads (K8s >= 1.31, Consistent Reads from Cache)
resourceVersion := "0"
if !fpm.constrainResourceVersion {
resourceVersion = ""
}

// Create custom ListWatch with ResourceVersion override
// Important: We must override ResourceVersion in the ListFunc because
// the Reflector will set it to relistResourceVersion() which returns "0"
// for initial list. We override it based on our strategy.
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
options.ResourceVersion = resourceVersion
options.FieldSelector = listOptions.FieldSelector
options.LabelSelector = listOptions.LabelSelector
return podClient.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = listOptions.FieldSelector
options.LabelSelector = listOptions.LabelSelector
return podClient.Watch(ctx, options)
},
}
informer := cache.NewSharedIndexInformer(lw, &corev1.Pod{}, 0, nil)
deleteNotify := make(chan struct{}, 1)
deleteNotify <- struct{}{}
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand All @@ -362,7 +389,6 @@ func (fpm *FusePodManager) Delete(c *FusePodContext) error {
return fmt.Errorf("failed to wait for caches to sync: %w", ctx.Err())
}

podClient := fpm.client.CoreV1().Pods(c.Namespace)
for _, obj := range informer.GetStore().List() {
pod := obj.(*corev1.Pod)
if pod.DeletionTimestamp == nil {
Expand Down
59 changes: 40 additions & 19 deletions pkg/mounter/fuse_pod_manager/fuse_pod_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestCreate(t *testing.T) {
pod.ResourceVersion = "1"
return false, pod, nil
})
fpm := NewFusePodManager(testFuse{}, client)
fpm := NewFusePodManager(testFuse{}, client, false)

go func() {
// simulate kubelet
Expand Down Expand Up @@ -124,24 +124,45 @@ func TestCreate(t *testing.T) {
}

func TestDelete(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
client := fake.NewSimpleClientset(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-fuse-pod",
Namespace: "test-fuse",
Labels: map[string]string{FuseVolumeIdLabelKey: "test-volume"},
tests := []struct {
name string
constrainRV bool
}{
{
name: "Delete with constrainRV=true (K8s < 1.31)",
constrainRV: true,
},
})
{
name: "Delete with constrainRV=false (K8s >= 1.31)",
constrainRV: false,
},
}

fpm := NewFusePodManager(testFuse{}, client)
err := fpm.Delete(&FusePodContext{
Context: ctx,
Namespace: "test-fuse",
VolumeId: "test-volume",
NodeName: "test-node",
})
require.NoError(t, err)
pods, err := client.CoreV1().Pods("test-fuse").List(ctx, metav1.ListOptions{})
require.NoError(t, err)
assert.Empty(t, pods.Items)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)

client := fake.NewSimpleClientset(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-fuse-pod",
Namespace: "test-fuse",
Labels: map[string]string{FuseVolumeIdLabelKey: "test-volume"},
},
})

fpm := NewFusePodManager(testFuse{}, client, tt.constrainRV)
err := fpm.Delete(&FusePodContext{
Context: ctx,
Namespace: "test-fuse",
VolumeId: "test-volume",
NodeName: "test-node",
})
require.NoError(t, err)

// Verify pod was deleted
pods, err := client.CoreV1().Pods("test-fuse").List(ctx, metav1.ListOptions{})
require.NoError(t, err)
assert.Empty(t, pods.Items)
})
}
}
4 changes: 2 additions & 2 deletions pkg/mounter/fuse_pod_manager/oss/oss_fuse_pod_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type OSSFusePodManager struct {
OSSFuseMounterType
}

func NewOSSFusePodManager(fuseType OSSFuseMounterType, client kubernetes.Interface) *OSSFusePodManager {
manager := fpm.NewFusePodManager(fuseType, client)
func NewOSSFusePodManager(fuseType OSSFuseMounterType, client kubernetes.Interface, constrainResourceVersion bool) *OSSFusePodManager {
manager := fpm.NewFusePodManager(fuseType, client, constrainResourceVersion)
return &OSSFusePodManager{
*manager,
fuseType,
Expand Down
45 changes: 43 additions & 2 deletions pkg/mounter/fuse_pod_manager/oss/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package oss

import (
"maps"
"os"
"strconv"

"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/mounter"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
k8sver "k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)

var (
Expand Down Expand Up @@ -65,13 +69,50 @@ func GetAllRegisteredFuseTypes() []string {
return types
}

// ShouldConstrainResourceVersion reports whether fuse pod delete operations
// should pin the list ResourceVersion to "0" (true) or leave it as "" (false).
//
// Priority: ENV override > K8s version detection.
//   - ENV OSS_FUSE_CONSTRAIN_RV parses as a bool: that value is returned as is.
//     A value that does not parse is logged and ignored.
//   - K8s unknown (k8sVersion is nil): true (conservative approach, RV="0").
//   - K8s >= 1.31: false (Consistent Reads from Cache supported, no need to constrain).
//   - K8s < 1.31: true (RV="0" uses the watch cache and avoids etcd pressure).
func ShouldConstrainResourceVersion(k8sVersion *k8sver.Version) bool {
	// An explicit operator setting wins over anything detected from the cluster.
	if envVal := os.Getenv("OSS_FUSE_CONSTRAIN_RV"); envVal != "" {
		constrained, err := strconv.ParseBool(envVal)
		if err == nil {
			if constrained {
				klog.Warningf("ENV OSS_FUSE_CONSTRAIN_RV=true, constraining ResourceVersion to '0' for fuse pod delete operations")
			} else {
				klog.Infof("ENV OSS_FUSE_CONSTRAIN_RV=false, using ResourceVersion='' for fuse pod delete operations")
			}
			return constrained
		}
		klog.Warningf("ENV OSS_FUSE_CONSTRAIN_RV has invalid value %q, ignoring", envVal)
	}

	// Fall back to K8s version detection.
	switch {
	case k8sVersion == nil:
		// A nil version is a normal condition for callers that have no API
		// server version available, so it is reported as a warning like the
		// other conservative fallbacks rather than as an error.
		klog.Warningf("K8s version unknown, constraining ResourceVersion to '0' for fuse pod delete operations (conservative approach)")
		return true
	case k8sVersion.AtLeast(k8sver.MajorMinor(1, 31)):
		klog.Infof("K8s version %s >= 1.31, not constraining ResourceVersion (Consistent Reads from Cache supported)", k8sVersion.String())
		return false
	default:
		klog.Warningf("K8s version %s < 1.31, constraining ResourceVersion to '0' for fuse pod delete operations (avoid etcd pressure)", k8sVersion.String())
		return true
	}
}

// GetAllOSSFusePodManagers creates a map of all registered OSS fuse pod managers
// configmap can be nil if not available (e.g., in CSI agent mode)
// client can be nil if not needed (e.g., in CSI agent mode)
func GetAllOSSFusePodManagers(csiCfg utils.Config, m metadata.MetadataProvider, client kubernetes.Interface) map[string]*OSSFusePodManager {
func GetAllOSSFusePodManagers(csiCfg utils.Config, m metadata.MetadataProvider, client kubernetes.Interface, k8sVersion *k8sver.Version) map[string]*OSSFusePodManager {
constrainRV := ShouldConstrainResourceVersion(k8sVersion)

fusePodManagers := make(map[string]*OSSFusePodManager, len(fstypeToFactory))
for fstype, factory := range fstypeToFactory {
fusePodManagers[fstype] = NewOSSFusePodManager(factory(csiCfg, m), client)
fusePodManagers[fstype] = NewOSSFusePodManager(factory(csiCfg, m), client, constrainRV)
}
return fusePodManagers
}
Expand Down
68 changes: 67 additions & 1 deletion pkg/mounter/fuse_pod_manager/oss/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
k8sver "k8s.io/apimachinery/pkg/util/version"
)

func TestGetFuseMounterPath(t *testing.T) {
Expand Down Expand Up @@ -71,6 +72,71 @@ func TestGetAllRegisteredFuseTypes(t *testing.T) {
assert.True(t, typeMap[testType], "test type should be registered")
}

// TestShouldConstrainResourceVersion checks the decision table of
// ShouldConstrainResourceVersion: the version-based default, the conservative
// fallback for an unknown version, and the OSS_FUSE_CONSTRAIN_RV override.
func TestShouldConstrainResourceVersion(t *testing.T) {
	tests := []struct {
		name           string
		k8sVersion     *k8sver.Version
		envValue       string // value of OSS_FUSE_CONSTRAIN_RV; "" means no override
		expectedResult bool
	}{
		{
			name:           "K8s 1.31 should not constrain RV",
			k8sVersion:     k8sver.MajorMinor(1, 31),
			expectedResult: false,
		},
		{
			name:           "K8s 1.32 should not constrain RV",
			k8sVersion:     k8sver.MajorMinor(1, 32),
			expectedResult: false,
		},
		{
			name:           "K8s 1.30 should constrain RV",
			k8sVersion:     k8sver.MajorMinor(1, 30),
			expectedResult: true,
		},
		{
			name:           "K8s 1.28 should constrain RV",
			k8sVersion:     k8sver.MajorMinor(1, 28),
			expectedResult: true,
		},
		{
			name:           "nil version should constrain RV",
			k8sVersion:     nil,
			expectedResult: true,
		},
		{
			name:           "ENV=true should override K8s 1.31",
			k8sVersion:     k8sver.MajorMinor(1, 31),
			envValue:       "true",
			expectedResult: true,
		},
		{
			name:           "ENV=false should override K8s 1.30",
			k8sVersion:     k8sver.MajorMinor(1, 30),
			envValue:       "false",
			expectedResult: false,
		},
		{
			name:           "ENV=false should override nil version",
			k8sVersion:     nil,
			envValue:       "false",
			expectedResult: false,
		},
		{
			name:           "ENV invalid value should be ignored",
			k8sVersion:     k8sver.MajorMinor(1, 30),
			envValue:       "invalid",
			expectedResult: true,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Always pin the variable, even to "", so a value inherited from
			// the developer's or CI environment cannot leak into the
			// version-based cases. The function treats an empty value as
			// "no override", and t.Setenv restores the previous state when
			// the subtest ends.
			t.Setenv("OSS_FUSE_CONSTRAIN_RV", tt.envValue)

			result := ShouldConstrainResourceVersion(tt.k8sVersion)
			assert.Equal(t, tt.expectedResult, result)
		})
	}
}

func TestGetAllOSSFusePodManagers(t *testing.T) {
fakeMeta := metadata.NewMetadata()

Expand All @@ -82,7 +148,7 @@ func TestGetAllOSSFusePodManagers(t *testing.T) {
RegisterFuseMounter(testType, testFactory)

// Test with nil configmap and nil client (CSI agent mode)
managers := GetAllOSSFusePodManagers(utils.Config{}, fakeMeta, nil)
managers := GetAllOSSFusePodManagers(utils.Config{}, fakeMeta, nil, nil)

// Should have at least the test manager
assert.GreaterOrEqual(t, len(managers), 1, "Should have at least 1 fuse pod manager")
Expand Down
2 changes: 2 additions & 0 deletions pkg/oss/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
k8sver "k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)
Expand All @@ -55,6 +56,7 @@ type controllerServer struct {
fusePodManagers map[string]*ossfpm.OSSFusePodManager
legacyPods sets.Set[podLoc]
legacyPodsMu sync.Mutex
k8sVersion *k8sver.Version
common.GenericControllerServer
}

Expand Down
Loading