diff --git a/pkg/disk/bdf.go b/pkg/disk/bdf.go index d8e0276bb8..82a8569f70 100644 --- a/pkg/disk/bdf.go +++ b/pkg/disk/bdf.go @@ -19,7 +19,6 @@ import ( "time" "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" utilsio "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/io" "github.com/pkg/errors" @@ -44,12 +43,6 @@ const ( iohubSrviovDriver = "iohub_sriov" virtioPciDriver = "virtio-pci" - // InstanceStatusStopped ecs stopped status - InstanceStatusStopped = "Stopped" - // DiskBdfTagKey disk bdf tag - DiskBdfTagKey = "bdf.csi.aliyun.com" - // DiskBdfCheckTagKey disk bdf check tag - DiskBdfCheckTagKey = "check.bdf.csi.aliyun.com" // Vfhp Reconcile period VfhpReconcilePeriod = 600 ) @@ -355,53 +348,6 @@ func clearBdfInfo(diskID, bdf string) (err error) { return nil } -func forceDetachAllowed(ecsClient cloud.ECSInterface, disk *ecs.Disk) (allowed bool, err error) { - // The following case allow detach: - // 1. no depend bdf - // 2. instance status is stopped - - // case 1 - describeDisksRequest := ecs.CreateDescribeDisksRequest() - describeDisksRequest.RegionId = GlobalConfigVar.Region - describeDisksRequest.DiskIds = "[\"" + disk.DiskId + "\"]" - diskResponse, err := ecsClient.DescribeDisks(describeDisksRequest) - if err != nil { - klog.Warningf("forceDetachAllowed: error with DescribeDisks: %s, %s", disk.DiskId, err.Error()) - return false, errors.Wrapf(err, "DescribeInstances, instanceId=%s", disk.InstanceId) - } - disks := diskResponse.Disks.Disk - klog.Infof("forceDetachAllowed: diskResponse: %+v", diskResponse) - if len(disks) == 0 { - klog.Warningf("forceDetachAllowed: no disk found: %s", disk.DiskId) - return false, errors.Wrapf(err, "forceDetachAllowed: Get disk empty, ID=%s", disk.DiskId) - } - bdfTagExist := false - for _, tag := range disks[0].Tags.Tag { - if tag.TagKey == DiskBdfTagKey { - bdfTagExist = true - } - } - if !bdfTagExist { - return true, nil - } - - request := ecs.CreateDescribeInstancesRequest() - request.RegionId = disk.RegionId - request.InstanceIds = "[\"" + disk.InstanceId + "\"]" - instanceResponse, err := ecsClient.DescribeInstances(request) - klog.Infof("forceDetachAllowed: instanceResponse: %+v", instanceResponse) - if err != nil { - return false, errors.Wrapf(err, "DescribeInstances, instanceId=%s", disk.InstanceId) - } - if len(instanceResponse.Instances.Instance) == 0 { - return false, errors.Errorf("Describe Instance with empty response: %s", disk.InstanceId) - } - inst := instanceResponse.Instances.Instance[0] - klog.Infof("forceDetachAllowed: Instance status is %s", inst.Status) - // case 2 - return inst.Status == InstanceStatusStopped, nil -} - var vfOnce = new(sync.Once) // isVF means ecsClient running in VF mode diff --git a/pkg/disk/cloud.go b/pkg/disk/cloud.go index 05c42f220d..3b40f3d028 100644 --- a/pkg/disk/cloud.go +++ b/pkg/disk/cloud.go @@ -1,5 +1,3 @@ -//go:build !windows - /* Copyright 2019 The Kubernetes Authors. @@ -41,6 +39,7 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/batcher" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/waitstatus" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" + perrors "github.com/pkg/errors" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" v1 "k8s.io/api/core/v1" @@ -61,10 +60,28 @@ const ( DISK_RESIZE_PROCESSING_TIMEOUT = 30 * time.Second ) +// Disk status returned in ecs.DescribeDisks +const ( + DiskStatusInuse = "In_use" + DiskStatusAttaching = "Attaching" + DiskStatusDetaching = "Detaching" + DiskStatusAvailable = "Available" +) + const ( SnapshotStatusAccomplished = "accomplished" DiskMultiAttachDisabled = "Disabled" DiskMultiAttachEnabled = "Enabled" + + // InstanceStatusStopped ecs stopped status + InstanceStatusStopped = "Stopped" +) + +const ( + // DiskAttachedKey attached key + DiskAttachedKey = "k8s.aliyun.com" + // DiskAttachedValue attached value + DiskAttachedValue = "true" ) type DiskAttachDetach struct { @@ -77,8 +94,7 @@ type DiskAttachDetach struct { detachThrottler *throttle.Throttler detaching sync.Map - dev *DeviceManager - devMap *devMap + repo *diskRepo } type DiskCreateDelete struct { @@ -88,109 +104,6 @@ type DiskCreateDelete struct { deleteThrottler *throttle.Throttler } -// GetRootBlockDevice get device name -func (ad *DiskAttachDetach) GetRootBlockDevice(logger klog.Logger, diskID string) (string, error) { - device, err := ad.dev.GetRootBlockBySerial(strings.TrimPrefix(diskID, "d-")) - if err == nil { - return device, nil - } - device, err2 := ad.devMap.Get(logger, diskID) - if device == "" { - return "", errors.Join(err, err2) // err2 may be nil, which is OK - } - klog.Infof("GetRootBlockDevice: got disk %s device name %s from devMap", diskID, device) - return device, nil -} - -func (ad *DiskAttachDetach) GetVolumeDeviceName(logger klog.Logger, diskID string) (string, error) { - root, err := ad.GetRootBlockDevice(logger, diskID) - if err != nil { - return "", err - } - return ad.dev.adaptDevicePartition(root) -} - -func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, error) { - after, err := ad.dev.ListBlocks() - if err != nil { - return nil, fmt.Errorf("cannot list devices after attach: %w", err) - } - - var disks []string - for d := range after.Difference(before) { - serial, err := ad.dev.GetDeviceSerial(d) - if err != nil { - return nil, fmt.Errorf("get device serial for disk %s failed: %w", d, err) - } - if serial == "" { - disks = append(disks, "/dev/"+d) - } - } - return disks, nil -} - -func (ad *DiskAttachDetach) findDevice(ctx context.Context, diskID, serial string, before sets.Set[string]) (string, error) { - logger := klog.FromContext(ctx) - var bdf, device string - var err error - for { - if serial != "" { - device, err = ad.dev.WaitRootBlock(ctx, serial) - if err == nil { - logger.V(2).Info("found disk by serial", "serial", serial, "device", device) - break - } - err = fmt.Errorf("disk attached but not found by serial %s: %w", serial, err) - } else if before != nil { - var disks []string - disks, err = ad.possibleDisks(before) - if err != nil { - return "", fmt.Errorf("failed to find disk without serial: %v", err) - } - if len(disks) == 1 { - device = disks[0] - err := ad.devMap.Add(diskID, device) - if err != nil { - return "", fmt.Errorf("failed to populate devMap: %v", err) - } - logger.V(2).Info("found device by diff", "device", device) - break - } else { - // device count is not expected, should retry (later by detaching and attaching again) - err = fmt.Errorf("disk attached, but got %d devices, will retry later", len(disks)) - } - } - - if !IsVFNode() { - return "", err - } - if bdf != "" { - // second attempt after bindBdfDisk - var errBDF error - device, errBDF = GetDeviceByBdf(bdf, true) - if errBDF != nil { - return "", fmt.Errorf("%v. failed to find by BDF: %v", err, errBDF) - } - logger.V(2).Info("found device by BDF", "BDF", bdf, "device", device) - break - } - // On VF node, try bind driver - bdf, err = bindBdfDisk(diskID) - if err != nil { - if err := unbindBdfDisk(diskID); err != nil { - return "", fmt.Errorf("NodeStageVolume: failed to detach bdf: %v", err) - } - return "", fmt.Errorf("NodeStageVolume: failed to attach bdf: %v", err) - } - if bdf == "" { - // avoid infinite loop - return "", fmt.Errorf("BDF not found") - } - // continue and retry finding device - } - return device, nil -} - type attachAction int const ( @@ -236,9 +149,56 @@ func chooseAttachAction(disk *ecs.Disk, instanceID string) (attachAction, error) return attachNormally, nil } +func forceDetachAllowed(ecsClient cloud.ECSInterface, disk *ecs.Disk) (allowed bool, err error) { + // The following case allow detach: + // 1. no depend bdf + // 2. instance status is stopped + + // case 1 + describeDisksRequest := ecs.CreateDescribeDisksRequest() + describeDisksRequest.RegionId = GlobalConfigVar.Region + describeDisksRequest.DiskIds = "[\"" + disk.DiskId + "\"]" + diskResponse, err := ecsClient.DescribeDisks(describeDisksRequest) + if err != nil { + klog.Warningf("forceDetachAllowed: error with DescribeDisks: %s, %s", disk.DiskId, err.Error()) + return false, perrors.Wrapf(err, "DescribeInstances, instanceId=%s", disk.InstanceId) + } + disks := diskResponse.Disks.Disk + klog.Infof("forceDetachAllowed: diskResponse: %+v", diskResponse) + if len(disks) == 0 { + klog.Warningf("forceDetachAllowed: no disk found: %s", disk.DiskId) + return false, perrors.Wrapf(err, "forceDetachAllowed: Get disk empty, ID=%s", disk.DiskId) + } + bdfTagExist := false + for _, tag := range disks[0].Tags.Tag { + if tag.TagKey == DiskBdfTagKey { + bdfTagExist = true + } + } + if !bdfTagExist { + return true, nil + } + + request := ecs.CreateDescribeInstancesRequest() + request.RegionId = disk.RegionId + request.InstanceIds = "[\"" + disk.InstanceId + "\"]" + instanceResponse, err := ecsClient.DescribeInstances(request) + klog.Infof("forceDetachAllowed: instanceResponse: %+v", instanceResponse) + if err != nil { + return false, perrors.Wrapf(err, "DescribeInstances, instanceId=%s", disk.InstanceId) + } + if len(instanceResponse.Instances.Instance) == 0 { + return false, perrors.Errorf("Describe Instance with empty response: %s", disk.InstanceId) + } + inst := instanceResponse.Instances.Instance[0] + klog.Infof("forceDetachAllowed: Instance status is %s", inst.Status) + // case 2 + return inst.Status == InstanceStatusStopped, nil +} + // Attach Alibaba Cloud disk. // Returns device path if fromNode, disk serial number otherwise. -func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID string, fromNode bool) (string, error) { +func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID string) (string, error) { logger := klog.FromContext(ctx) logger.V(2).Info("Starting Do AttachDisk") @@ -251,7 +211,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin return "", status.Errorf(codes.NotFound, "AttachDisk: csi can't find disk: %s in region: %s, Please check if the cloud disk exists, if the region is correct, or if the csi permissions are correct", diskID, GlobalConfigVar.Region) } - if !fromNode && disk.SerialNumber == "" { + if ad.repo == nil && disk.SerialNumber == "" { if GlobalConfigVar.ADControllerEnable { return "", status.Errorf(codes.InvalidArgument, "Disk %s does not have serial number but AD controller is enabled, we cannot attach this disk. "+ @@ -310,14 +270,14 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin switch action { case alreadyAttached: - if !fromNode { + if ad.repo == nil { logger.V(2).Info("already attached, skipping") return disk.SerialNumber, nil } if disk.SerialNumber != "" { - return ad.dev.WaitRootBlock(ctx, disk.SerialNumber) + return ad.repo.WaitRootBlock(ctx, disk.SerialNumber) } - device, err := ad.devMap.Get(logger, diskID) + device, err := ad.repo.GetAttached(logger, diskID) if err != nil { return "", err } @@ -346,8 +306,8 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin // Step 3: Attach Disk, list device before attach disk var before sets.Set[string] - if fromNode && disk.SerialNumber == "" { - before, err = DefaultDeviceManager.ListBlocks() + if ad.repo != nil && disk.SerialNumber == "" { + before, err = ad.repo.ListBlocks() if err != nil { return "", status.Errorf(codes.Aborted, "AttachDisk: Can't list devices before attach: %v", err) } @@ -391,8 +351,8 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin } // step 5: diff device with previous files under /dev - if fromNode { - device, err := ad.findDevice(ctx, diskID, disk.SerialNumber, before) + if ad.repo != nil { + device, err := ad.repo.findDevice(ctx, diskID, disk.SerialNumber, before) if err != nil { return "", status.Error(codes.Aborted, err.Error()) } diff --git a/pkg/disk/cloud_test.go b/pkg/disk/cloud_test.go index 0231ef227a..b8871fdf4f 100644 --- a/pkg/disk/cloud_test.go +++ b/pkg/disk/cloud_test.go @@ -616,7 +616,6 @@ func testAttachDetach(t *testing.T) (context.Context, *cloud.MockECSInterface, * batcher: b, attachThrottler: defaultThrottler(), detachThrottler: defaultThrottler(), - dev: DefaultDeviceManager, } } @@ -736,7 +735,7 @@ func TestAttachDisk(t *testing.T) { }) c.EXPECT().DescribeDisks(gomock.Any()).Return(diskResp(tc.after), nil).After(attachCall) } - serial, err := ad.attachDisk(ctx, "d-testdiskid", "i-testinstanceid", false) + serial, err := ad.attachDisk(ctx, "d-testdiskid", "i-testinstanceid") assert.Equal(t, tc.forceAttach, force) if tc.expectErr { diff --git a/pkg/disk/constants.go b/pkg/disk/constants.go index 163163a72c..b078e21d4c 100644 --- a/pkg/disk/constants.go +++ b/pkg/disk/constants.go @@ -117,4 +117,9 @@ const ( SNAPSHOT_MIN_RETENTION_DAYS = 1 PUBLISH_CONTEXT_SERIAL = "serialNumber" + + // DiskBdfTagKey disk bdf tag + DiskBdfTagKey = "bdf.csi.aliyun.com" + // DiskBdfCheckTagKey disk bdf check tag + DiskBdfCheckTagKey = "check.bdf.csi.aliyun.com" ) diff --git a/pkg/disk/controllerserver.go b/pkg/disk/controllerserver.go index c83cf0be00..5e4c4a05f7 100644 --- a/pkg/disk/controllerserver.go +++ b/pkg/disk/controllerserver.go @@ -1,5 +1,3 @@ -//go:build !windows - /* Copyright 2019 The Kubernetes Authors. @@ -348,7 +346,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs klog.Infof("ControllerPublishVolume: start attach disk: %s to node: %s", req.VolumeId, req.NodeId) - serial, err := cs.ad.attachDisk(ctx, req.VolumeId, req.NodeId, false) + serial, err := cs.ad.attachDisk(ctx, req.VolumeId, req.NodeId) if err != nil { klog.Errorf("ControllerPublishVolume: attach disk: %s to node: %s with error: %s", req.VolumeId, req.NodeId, err.Error()) return nil, err diff --git a/pkg/disk/csi_agent.go b/pkg/disk/csi_agent.go index 222eaf1ffe..883c751344 100644 --- a/pkg/disk/csi_agent.go +++ b/pkg/disk/csi_agent.go @@ -37,8 +37,10 @@ func NewCSIAgent() *CSIAgent { podCGroup: podCgroup, locks: utils.NewVolumeLocks(), ad: DiskAttachDetach{ - dev: DefaultDeviceManager, - devMap: &devMap{}, // Nobody will add to this map. + repo: &diskRepo{ + dev: DefaultDeviceManager, + devMap: &devMap{}, // Nobody will add to this map. + }, }, }, } diff --git a/pkg/disk/disk.go b/pkg/disk/disk.go index 8ef4c4760a..3be91ce3e8 100644 --- a/pkg/disk/disk.go +++ b/pkg/disk/disk.go @@ -1,5 +1,3 @@ -//go:build !windows - /* Copyright 2019 The Kubernetes Authors. @@ -117,7 +115,6 @@ func NewDriver(m metadata.MetadataProvider, endpoint string, serviceType utils.S if serviceType&utils.Node != 0 { GlobalConfigVar.NodeID = metadata.MustGet(m, metadata.InstanceID) - DefaultDeviceManager.DisableSerial = IsVFNode() } else { GlobalConfigVar.NodeID = "not-retrieved" // make csi-common happy } @@ -142,7 +139,7 @@ func NewDriver(m metadata.MetadataProvider, endpoint string, serviceType utils.S servers.ControllerServer = NewControllerServer(csiCfg, client, m) } if serviceType&utils.Node != 0 { - servers.NodeServer = NewNodeServer(client, m) + servers.NodeServer = NewNodeServer(csiCfg, client, m) } if features.FunctionalMutableFeatureGate.Enabled(features.EnableVolumeGroupSnapshots) { servers.GroupControllerServer = NewGroupControllerServer() @@ -237,7 +234,6 @@ func GlobalConfigSet(m metadata.MetadataProvider, csiCfg utils.Config) { } else { RegionalDiskTopologyKey = v1.LabelTopologyRegion } - DefaultDeviceManager.EnableDiskPartition = csiCfg.GetBool("disk-partition-enable", "DISK_PARTITION_ENABLE", true) klog.Infof("Starting with GlobalConfigVar: ADControllerEnable(%t), DiskTagEnable(%t), DiskBdfEnable(%t), MetricEnable(%t), DetachDisabled(%t), DetachBeforeDelete(%t), ClusterID(%s)", GlobalConfigVar.ADControllerEnable, GlobalConfigVar.DiskTagEnable, diff --git a/pkg/disk/disk_repo_unix.go b/pkg/disk/disk_repo_unix.go new file mode 100644 index 0000000000..5835e5b9a2 --- /dev/null +++ b/pkg/disk/disk_repo_unix.go @@ -0,0 +1,137 @@ +//go:build unix + +package disk + +import ( + "context" + "errors" + "fmt" + "strings" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" +) + +type diskRepo struct { + dev *DeviceManager + devMap *devMap +} + +func (ad *diskRepo) WaitRootBlock(ctx context.Context, serial string) (string, error) { + return ad.dev.WaitRootBlock(ctx, serial) +} + +func (ad *diskRepo) GetAttached(logger klog.Logger, diskID string) (string, error) { + return ad.devMap.Get(logger, diskID) +} + +func (ad *diskRepo) DeleteAttached(diskID string) { + ad.devMap.Delete(diskID) +} + +func (ad *diskRepo) ListBlocks() (sets.Set[string], error) { + return ad.dev.ListBlocks() +} + +// GetRootBlockDevice get device name +func (ad *diskRepo) GetRootBlockDevice(logger klog.Logger, diskID string) (string, error) { + device, err := ad.dev.GetRootBlockBySerial(strings.TrimPrefix(diskID, "d-")) + if err == nil { + return device, nil + } + device, err2 := ad.devMap.Get(logger, diskID) + if device == "" { + return "", errors.Join(err, err2) // err2 may be nil, which is OK + } + klog.Infof("GetRootBlockDevice: got disk %s device name %s from devMap", diskID, device) + return device, nil +} + +func (ad *diskRepo) GetVolumeDeviceName(logger klog.Logger, diskID string) (string, error) { + root, err := ad.GetRootBlockDevice(logger, diskID) + if err != nil { + return "", err + } + return ad.dev.adaptDevicePartition(root) +} + +func (ad *diskRepo) possibleDisks(before sets.Set[string]) ([]string, error) { + after, err := ad.dev.ListBlocks() + if err != nil { + return nil, fmt.Errorf("cannot list devices after attach: %w", err) + } + + var disks []string + for d := range after.Difference(before) { + serial, err := ad.dev.GetDeviceSerial(d) + if err != nil { + return nil, fmt.Errorf("get device serial for disk %s failed: %w", d, err) + } + if serial == "" { + disks = append(disks, "/dev/"+d) + } + } + return disks, nil +} + +func (ad *diskRepo) findDevice(ctx context.Context, diskID, serial string, before sets.Set[string]) (string, error) { + logger := klog.FromContext(ctx) + var bdf, device string + var err error + for { + if serial != "" { + device, err = ad.dev.WaitRootBlock(ctx, serial) + if err == nil { + logger.V(2).Info("found disk by serial", "serial", serial, "device", device) + break + } + err = fmt.Errorf("disk attached but not found by serial %s: %w", serial, err) + } else if before != nil { + var disks []string + disks, err = ad.possibleDisks(before) + if err != nil { + return "", fmt.Errorf("failed to find disk without serial: %v", err) + } + if len(disks) == 1 { + device = disks[0] + err := ad.devMap.Add(diskID, device) + if err != nil { + return "", fmt.Errorf("failed to populate devMap: %v", err) + } + logger.V(2).Info("found device by diff", "device", device) + break + } else { + // device count is not expected, should retry (later by detaching and attaching again) + err = fmt.Errorf("disk attached, but got %d devices, will retry later", len(disks)) + } + } + + if !IsVFNode() { + return "", err + } + if bdf != "" { + // second attempt after bindBdfDisk + var errBDF error + device, errBDF = GetDeviceByBdf(bdf, true) + if errBDF != nil { + return "", fmt.Errorf("%v. failed to find by BDF: %v", err, errBDF) + } + logger.V(2).Info("found device by BDF", "BDF", bdf, "device", device) + break + } + // On VF node, try bind driver + bdf, err = bindBdfDisk(diskID) + if err != nil { + if err := unbindBdfDisk(diskID); err != nil { + return "", fmt.Errorf("NodeStageVolume: failed to detach bdf: %v", err) + } + return "", fmt.Errorf("NodeStageVolume: failed to attach bdf: %v", err) + } + if bdf == "" { + // avoid infinite loop + return "", fmt.Errorf("BDF not found") + } + // continue and retry finding device + } + return device, nil +} diff --git a/pkg/disk/disk_repo_windows.go b/pkg/disk/disk_repo_windows.go new file mode 100644 index 0000000000..eeb95b41a0 --- /dev/null +++ b/pkg/disk/disk_repo_windows.go @@ -0,0 +1,30 @@ +package disk + +import ( + "context" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" +) + +type diskRepo struct{} + +func (ad *diskRepo) WaitRootBlock(ctx context.Context, serial string) (string, error) { + panic("not implemented") +} + +func (ad *diskRepo) GetAttached(logger klog.Logger, diskID string) (string, error) { + panic("not implemented") +} + +func (ad *diskRepo) DeleteAttached(diskID string) { + panic("not implemented") +} + +func (ad *diskRepo) ListBlocks() (sets.Set[string], error) { + panic("not implemented") +} + +func (ad *diskRepo) findDevice(ctx context.Context, diskID, serial string, before sets.Set[string]) (string, error) { + panic("not implemented") +} diff --git a/pkg/disk/disk_windows.go b/pkg/disk/disk_windows.go index a02bbed0b7..5244a445ad 100644 --- a/pkg/disk/disk_windows.go +++ b/pkg/disk/disk_windows.go @@ -1,20 +1,6 @@ package disk -import ( - "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata" - "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" -) - -type DISK struct{} - -func NewDriver(m metadata.MetadataProvider, endpoint string, serviceType utils.ServiceType, csiCfg utils.Config) *DISK { - panic("Disk driver is not supported on Windows yet") -} - -func (d *DISK) Run() { - panic("Disk driver is not supported on Windows yet") -} +import "github.com/container-storage-interface/spec/lib/go/csi" type CSIAgent struct { csi.UnimplementedNodeServer diff --git a/pkg/disk/group_volume_snapshot_utils.go b/pkg/disk/group_volume_snapshot_utils.go index 3691861750..cbfc8ff39b 100644 --- a/pkg/disk/group_volume_snapshot_utils.go +++ b/pkg/disk/group_volume_snapshot_utils.go @@ -1,5 +1,3 @@ -//go:build !windows - package disk import ( diff --git a/pkg/disk/group_volume_snapshot_utils_test.go b/pkg/disk/group_volume_snapshot_utils_test.go index 6aea283eaa..ff9603b83c 100644 --- a/pkg/disk/group_volume_snapshot_utils_test.go +++ b/pkg/disk/group_volume_snapshot_utils_test.go @@ -1,5 +1,3 @@ -//go:build !windows - package disk import ( diff --git a/pkg/disk/groupcontrollerserver.go b/pkg/disk/groupcontrollerserver.go index 90e716c5f4..cfc0f8f765 100644 --- a/pkg/disk/groupcontrollerserver.go +++ b/pkg/disk/groupcontrollerserver.go @@ -1,5 +1,3 @@ -//go:build !windows - package disk import ( diff --git a/pkg/disk/identityserver.go b/pkg/disk/identityserver.go index f5915f2eb0..64f2dbfd0e 100644 --- a/pkg/disk/identityserver.go +++ b/pkg/disk/identityserver.go @@ -1,5 +1,3 @@ -//go:build !windows - /* Copyright 2019 The Kubernetes Authors. diff --git a/pkg/disk/nodeserver.go b/pkg/disk/nodeserver.go index befd3b1a46..43a9531c90 100644 --- a/pkg/disk/nodeserver.go +++ b/pkg/disk/nodeserver.go @@ -69,14 +69,6 @@ type nodeServer struct { common.GenericNodeServer } -// Disk status returned in ecs.DescribeDisks -const ( - DiskStatusInuse = "In_use" - DiskStatusAttaching = "Attaching" - DiskStatusDetaching = "Detaching" - DiskStatusAvailable = "Available" -) - const ( // DiskStatusAttached disk attached status DiskStatusAttached = "attached" @@ -88,16 +80,8 @@ const ( OmitFilesystemCheck = "omitfsck" // MkfsOptions tag MkfsOptions = "mkfsOptions" - // DiskAttachedKey attached key - DiskAttachedKey = "k8s.aliyun.com" - // DiskAttachedValue attached value - DiskAttachedValue = "true" // RundSocketDir dir RundSocketDir = "/host/etc/kubernetes/volumes/rund/" - // CreateDiskARN ARN parameter of the CreateDisk interface - CreateDiskARN = "alibabacloud.com/createdisk-arn" - // PVC annotation key of KMS key ID, override the storage class parameter kmsKeyId - KMSKeyID = "alibabacloud.com/kms-key-id" // DefaultMaxVolumesPerNode define default max ebs one node DefaultMaxVolumesPerNode = 15 // NOUUID is xfs fs mount opts @@ -162,7 +146,10 @@ func parseVolumeCountEnv() (int, error) { } // NewNodeServer creates node server -func NewNodeServer(ecs cloud.ECSInterface, m metadata.MetadataProvider) csi.NodeServer { +func NewNodeServer(csiCfg utils.Config, ecs cloud.ECSInterface, m metadata.MetadataProvider) csi.NodeServer { + DefaultDeviceManager.DisableSerial = IsVFNode() + DefaultDeviceManager.EnableDiskPartition = csiCfg.GetBool("disk-partition-enable", "DISK_PARTITION_ENABLE", true) + // Create Directory err := os.MkdirAll(RundSocketDir, os.FileMode(0755)) if err != nil { @@ -222,8 +209,10 @@ func NewNodeServer(ecs cloud.ECSInterface, m metadata.MetadataProvider) csi.Node attachThrottler: defaultThrottler(), detachThrottler: defaultThrottler(), - dev: DefaultDeviceManager, - devMap: devMap, + repo: &diskRepo{ + dev: DefaultDeviceManager, + devMap: devMap, + }, }, locks: utils.NewVolumeLocks(), GenericNodeServer: common.GenericNodeServer{ @@ -378,7 +367,7 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } // check device name available - expectName, err := ns.ad.GetVolumeDeviceName(logger, req.VolumeId) + expectName, err := ns.ad.repo.GetVolumeDeviceName(logger, req.VolumeId) if err != nil { return nil, status.Errorf(codes.Internal, "get device name: %v", err) } @@ -554,7 +543,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol // for capability with old controller serial = strings.TrimPrefix(req.VolumeId, "d-") } - device, err = ns.ad.findDevice(ctx, req.VolumeId, serial, nil) + device, err = ns.ad.repo.findDevice(ctx, req.VolumeId, serial, nil) if err != nil { if GlobalConfigVar.ADControllerEnable || isMultiAttach { return nil, status.Errorf(defaultErrCode, "ADController Enabled, but disk can't be found: %v", err) @@ -564,7 +553,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } } if device == "" { - device, err = ns.ad.attachDisk(ctx, req.GetVolumeId(), ns.NodeID, true) + device, err = ns.ad.attachDisk(ctx, req.GetVolumeId(), ns.NodeID) if err != nil { fullErrorMessage := utils.FindSuggestionByErrorMessage(err.Error(), utils.DiskAttachDetach) logger.Error(err, "Attach volume failed", "suggestion", fullErrorMessage) @@ -749,7 +738,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag } // All device related errors are not fatal, just log it - device, err := ns.ad.dev.GetRootBlockBySerial(strings.TrimPrefix(req.VolumeId, "d-")) + device, err := ns.ad.repo.dev.GetRootBlockBySerial(strings.TrimPrefix(req.VolumeId, "d-")) if err != nil { if errors.Is(err, os.ErrNotExist) { // devices without serial should already have xattr set on NodeStageVolume @@ -784,7 +773,7 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag logger.Error(err, "Detach failed") return nil, err } - ns.ad.devMap.Delete(req.VolumeId) + ns.ad.repo.DeleteAttached(req.VolumeId) return &csi.NodeUnstageVolumeResponse{}, nil } @@ -920,7 +909,7 @@ func (ns *nodeServer) localExpandVolume(ctx context.Context, req *csi.NodeExpand diskID := req.GetVolumeId() logger := klog.FromContext(ctx) - devicePath, err := ns.ad.GetVolumeDeviceName(logger, diskID) + devicePath, err := ns.ad.repo.GetVolumeDeviceName(logger, diskID) if err != nil { if errors.Is(err, os.ErrNotExist) { return nil, status.Errorf(codes.NotFound, "can't get devicePath for: %s", diskID) @@ -1209,7 +1198,7 @@ func (ns *nodeServer) mountRunvVolumes(logger klog.Logger, volumeId, sourcePath, if err := ns.unmountStageTarget(logger, sourcePath); err != nil { return status.Errorf(codes.InvalidArgument, "runv: unmountStageTarget %s: %v", sourcePath, err) } - deviceName, err := ns.ad.GetRootBlockDevice(logger, volumeId) + deviceName, err := ns.ad.repo.GetRootBlockDevice(logger, volumeId) if err != nil { return status.Errorf(codes.InvalidArgument, "runv: cannot get local deviceName: %v", err) } @@ -1245,7 +1234,7 @@ func (ns *nodeServer) mountRunvVolumes(logger klog.Logger, volumeId, sourcePath, func (ns *nodeServer) mountRunDVolumes(logger klog.Logger, volumeId, pvName, sourcePath, targetPath, fsType, mkfsOptions string, isRawBlock, pvmMode bool, mountFlags []string) (bool, error) { logger.V(2).Info("Mount in RunD csi 3.0/2.0 protocol") - deviceName, err := ns.ad.GetRootBlockDevice(logger, volumeId) + deviceName, err := ns.ad.repo.GetRootBlockDevice(logger, volumeId) if err != nil { logger.V(1).Info("RunD volume device not found", "err", err) // maybe OK, we can find the device by xdragon-bdf below. @@ -1426,7 +1415,7 @@ func (ns *nodeServer) checkMountedOfRunvAndRund(logger klog.Logger, volumeId, ta } } - device, err := ns.ad.GetRootBlockDevice(logger, volumeId) + device, err := ns.ad.repo.GetRootBlockDevice(logger, volumeId) if err != nil { // In VFIO mode, an empty device is an expected condition, so the resulting error should be ignored. logger.V(1).Info("GetVolumeDeviceName failed", "error", err) diff --git a/pkg/disk/nodeserver_windows.go b/pkg/disk/nodeserver_windows.go new file mode 100644 index 0000000000..3fffa50e79 --- /dev/null +++ b/pkg/disk/nodeserver_windows.go @@ -0,0 +1,12 @@ +package disk + +import ( + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" +) + +func NewNodeServer(csiCfg utils.Config, ecs cloud.ECSInterface, m metadata.MetadataProvider) csi.NodeServer { + panic("disk driver is not supported on Windows") +} diff --git a/pkg/disk/utils.go b/pkg/disk/utils.go index 8eb1b3269e..0d4f8c579b 100644 --- a/pkg/disk/utils.go +++ b/pkg/disk/utils.go @@ -1,5 +1,3 @@ -//go:build !windows - /* Copyright 2019 The Kubernetes Authors. @@ -45,25 +43,20 @@ import ( "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" alicred_old "github.com/aliyun/credentials-go/credentials" "github.com/container-storage-interface/spec/lib/go/csi" - "github.com/containerd/ttrpc" volumeSnapshotV1 "github.com/kubernetes-csi/external-snapshotter/client/v8/apis/volumesnapshot/v1" snapClientset "github.com/kubernetes-csi/external-snapshotter/client/v8/clientset/versioned" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/common" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/credentials" - proto "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/proto" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" utilshttp "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/http" - utilsio "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/io" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/version" - perrors "github.com/pkg/errors" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" - k8smount "k8s.io/mount-utils" ) var ( @@ -75,6 +68,10 @@ var ( const ( DISK_TAG_PREFIX = "diskTags/" instanceTypeInfoAnnotation = "alibabacloud.com/instance-type-info" + // PVC annotation key of KMS key ID, override the storage class parameter kmsKeyId + KMSKeyID = "alibabacloud.com/kms-key-id" + // CreateDiskARN ARN parameter of the CreateDisk interface + CreateDiskARN = "alibabacloud.com/createdisk-arn" ) // LingjunConfigFile is used to detect Lingjun node on the node side @@ -722,71 +719,6 @@ func validateCapabilities(capabilities []*csi.VolumeCapability) (bool, error) { return multiAttachRequired, nil } -func getMountedVolumeDevice(mnts []k8smount.MountInfo, targetPath string) string { - for _, mnt := range mnts { - if mnt.MountPoint == targetPath { - return mnt.Root - } - } - return "" -} - -func isDeviceMountedAt(mnts []k8smount.MountInfo, device, targetPath string) bool { - for _, mnt := range mnts { - if mnt.MountPoint == targetPath && mnt.Source == device { - return true - } - } - return false -} - -const mountInfoPath = "/proc/self/mountinfo" - -func CheckDeviceAvailable(devicePath, volumeID, targetPath string) error { - return checkDeviceAvailable(mountInfoPath, devicePath, volumeID, targetPath) -} - -func checkDeviceAvailable(mountinfoPath, devicePath, volumeID, targetPath string) error { - if devicePath == "" { - return fmt.Errorf("devicePath is empty, cannot used for Volume") - } - - mnts, err := k8smount.ParseMountInfo(mountinfoPath) - if err != nil { - return err - } - - // TODO: remove in next version - // Since devicePath has been change to symlink, we will never run into the following logics. - // block volume - if devicePath == "devtmpfs" { - device := getMountedVolumeDevice(mnts, targetPath) - newVolumeID, err := GetVolumeIDByDevice(device) - if err != nil { - return nil - } - if newVolumeID != volumeID { - return fmt.Errorf("device [%s] associate with volumeID: [%s] rather than volumeID: [%s]", device, newVolumeID, volumeID) - } - - return nil - } - - if !utils.IsFileExisting(devicePath) { - return fmt.Errorf("devicePath(%s) is empty, cannot used for Volume", devicePath) - } - - // check the device is used for system - if devicePath == "/dev/vda" || devicePath == "/dev/vda1" { - klog.Warningf("checkDeviceAvailable: devicePath(%s) may be system device: %s", devicePath, volumeID) - } - - if isDeviceMountedAt(mnts, devicePath, utils.KubeletRootDir) { - return fmt.Errorf("devicePath(%s) is used as DataDisk for kubelet, cannot used for Volume", devicePath) - } - return nil -} - func getBlockDeviceCapacity(devicePath string) int64 { file, err := os.Open(devicePath) @@ -1166,117 +1098,6 @@ func getNodeTypeFromEFLOOpenAPI(efloClient cloud.EFLOInterface, lingjunID string return *resp.Body.NodeType, nil } -func getVolumeCountFromOpenAPI(getNode func() (*v1.Node, error), c cloud.ECSInterface, efloC cloud.EFLOInterface, m metadata.MetadataProvider, dev utilsio.DiskLister, lingjunID string) (int, error) { - // An attached disk is not managed by us if: - // 1. it is not in node.Status.VolumesInUse or node.Status.VolumesAttached; and - // 2. it does not have the xattr set. - // 1 may fail because the info in node.Status is removed before ControllerUnpublishVolume. - // 2 may fail because the disk may be just attached and not have the xattr set yet. - // Combine 1 and 2 to get the accurate "not managed" disk list. - // We should exclude these disks from available count. - // e.g. static/dynamic PVs are managed, OS disk or manually attached disks are not managed. - - disks, err := listDiskXattrs(dev) - if err != nil { - return 0, fmt.Errorf("failed to list devices: %w", err) - } - managedDisks := sets.KeySet(disks) - - // To ensure all the managed attachedDisks also present in managedDisks, - // ECS OpenAPI should goes after ListDisks because the just detached disk should - // disappear from ListDisks after OpenAPI; - // ECS OpenAPI should goes before getNode because the just attached disk should - // appear in node before OpenAPI; - attachedDisks, err := getAttachedCloudDisks(c, m) - if err != nil { - return 0, err - } - klog.Infof("getVolumeCount: found %d attached disks", len(attachedDisks)) - - node, err := getNode() - if err != nil { - return 0, err - } - - availableCount, err := getAvailableDiskCount(node, c, efloC, m, lingjunID) - if err != nil { - return 0, err - } - - prefix := fmt.Sprintf("kubernetes.io/csi/%s^", driverName) - getDiskId := func(n v1.UniqueVolumeName) string { - if strings.HasPrefix(string(n), prefix) { - return string(n[len(prefix):]) - } - return "" - } - - for _, volume := range node.Status.VolumesInUse { - if disk := getDiskId(volume); disk != "" { - managedDisks.Insert(disk) - } - } - for _, volume := range node.Status.VolumesAttached { - if disk := getDiskId(volume.Name); disk != "" { - managedDisks.Insert(disk) - } - } - - for _, disk := range attachedDisks { - if !managedDisks.Has(disk) { - klog.Infof("getVolumeCount: disk %s is not managed by us", disk) - availableCount-- - } - } - - return availableCount, nil -} - -// checkRundVolumeExpand -func checkRundVolumeExpand(req *csi.NodeExpandVolumeRequest) (bool, error) { - klog.Infof("checkRundVolumeExpand: volumePath: %s", req.VolumePath) - pvName := utils.GetPvNameFormPodMnt(req.VolumePath) - if pvName == "" { - klog.Errorf("checkRundVolumeExpand: cannot get pvname from volumePath %s", req.VolumePath) - return false, perrors.Errorf("cannot get pvname from volumePath %s for volume %s", req.VolumePath, req.VolumeId) - } - var grpcVolume string - socketFile := filepath.Join(RundSocketDir, pvName) - if utils.IsFileExisting(socketFile) { - grpcVolume = pvName - } else { - socketFile = filepath.Join(RundSocketDir, req.VolumeId) - if !utils.IsFileExisting(socketFile) { - klog.Infof("checkRundVolumeExpand: socketfile: %s not exists, trying runc expanding", socketFile) - return false, nil - } - grpcVolume = req.VolumeId - } - klog.Infof("checkRundVolumeExpand: rund socket dir: %s exists", socketFile) - - // connect to rund server with timeout - clientConn, err := net.DialTimeout("unix", socketFile, 1*time.Second) - if err != nil { - klog.Errorf("checkRundExpand: volume %s, volumepath %s, connect to rund server with error: %s", req.VolumeId, req.VolumePath, err.Error()) - return true, perrors.Errorf("checkRundExpand: volume %s, volumepath %s, connect to rund server with error: %s", req.VolumeId, req.VolumePath, err.Error()) - } - defer clientConn.Close() - - // send volume spec to rund to expand volume fs - volumeSize := strconv.FormatInt(req.GetCapacityRange().GetRequiredBytes(), 10) - client := proto.NewExtendedStatusClient(ttrpc.NewClient(clientConn)) - resp, err := client.ExpandVolume(context.Background(), &proto.ExpandVolumeRequest{ - Volume: grpcVolume, - }) - if err != nil { - klog.Errorf("checkRundExpand: volume %s, volumepath %s, connect to rund server with error response: %s", req.VolumeId, req.VolumePath, err.Error()) - return true, perrors.Errorf("checkRundExpand: volume %s, volumepath %s, connect to rund server with error response: %s", req.VolumeId, req.VolumePath, err.Error()) - } - - klog.Infof("RundVolumeExpand: Expand VolumeFS(%s) to(%s) successful with response: %s", pvName, volumeSize, resp) - return true, nil -} - func checkOption(opt string) bool { switch opt { case "enable", "true", "yes": diff --git a/pkg/disk/utils_test.go b/pkg/disk/utils_test.go index 6dbd9c15b0..9670d2fd54 100644 --- a/pkg/disk/utils_test.go +++ b/pkg/disk/utils_test.go @@ -1,5 +1,3 @@ -//go:build !windows - /* Copyright 2019 The Kubernetes Authors. @@ -22,7 +20,6 @@ import ( "context" "errors" "os" - "path" "path/filepath" "testing" @@ -34,14 +31,12 @@ import ( "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata" "github.com/stretchr/testify/assert" - "golang.org/x/sys/unix" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" "k8s.io/klog/v2/ktesting" - "k8s.io/mount-utils" "k8s.io/utils/ptr" ) @@ -260,24 +255,6 @@ const DescribeDisksResponse = `{ } }` -type MockDisks struct { - disks []string - base string -} - -func (m *MockDisks) ListDisks() ([]string, error) { - return m.disks, nil -} - -func (m *MockDisks) AddDisk(t testing.TB, path string, diskID []byte) { - p := filepath.Join(m.base, path) - m.disks = append(m.disks, p) - assert.NoError(t, os.WriteFile(p, []byte{}, 0644)) - if diskID != nil { - assert.NoError(t, unix.Setxattr(p, DiskXattrName, diskID, 0)) - } -} - func TestGetAvailableDiskCount(t *testing.T) { resp := ecs.CreateDescribeInstanceTypesResponse() cloud.UnmarshalAcsResponse([]byte(DescribeInstanceTypesResponse), resp) @@ -476,39 +453,6 @@ func TestGetAvailableDiskCountFromOpenAPI(t *testing.T) { } } -const longDiskID = "d-some-very-looooooooooooooooog-value-that-cause-getxattr-to-fail" - -func TestGetVolumeCountFromOpenAPI(t *testing.T) { - testDiskXattr(t) - - ctrl := gomock.NewController(t) - c := cloud.NewMockECSInterface(ctrl) - efloc := cloud.NewMockEFLOInterface(ctrl) - - describeDisksResponse := ecs.CreateDescribeDisksResponse() - cloud.UnmarshalAcsResponse([]byte(DescribeDisksResponse), describeDisksResponse) - c.EXPECT().DescribeDisks(gomock.Any()).Return(describeDisksResponse, nil) - - describeInstanceTypesResponse := ecs.CreateDescribeInstanceTypesResponse() - cloud.UnmarshalAcsResponse([]byte(DescribeInstanceTypesResponse), describeInstanceTypesResponse) - c.EXPECT().DescribeInstanceTypes(gomock.Any()).Return(describeInstanceTypesResponse, nil) - - dev := MockDisks{base: t.TempDir() + "/dev"} - assert.NoError(t, os.MkdirAll(dev.base, 0755)) - - // add xattr to CSI attached disk - dev.AddDisk(t, "node-for-testingdetachingdisk", []byte("d-testingdetachingdisk")) - // manually attached disk has no xattr - dev.AddDisk(t, "node-for-2zeh74nnxxrobxz49eug", nil) - // an arbitrary error for getxattr, we should ignore it - dev.AddDisk(t, "node-for-testinglocaldisk", []byte(longDiskID)) - - getNode := func() (*corev1.Node, error) { return testNode(), nil } - count, err := getVolumeCountFromOpenAPI(getNode, c, efloc, testMetadata, &dev, "") - assert.NoError(t, err) - assert.Equal(t, 7, count) // 7 = 9 available disks - 1 system disk (d-2ze49fivxwkwxl36o1d3) - 1 manually attached (d-2zeh74nnxxrobxz49eug) -} - func TestGetAvailableDiskTypes(t *testing.T) { descJson := `{ "RequestId": "6ECCECF5-945D-58FB-9BA9-312DBEE3F611", @@ -686,86 +630,6 @@ func TestPatchForNodeExisting(t *testing.T) { assert.Nil(t, patch) } -func writeMountinfo(t *testing.T, mountInfo string) string { - mountInfoPath := path.Join(os.TempDir(), "mountinfo") - err := os.WriteFile(mountInfoPath, []byte(mountInfo), 0o644) - assert.NoError(t, err) - return mountInfoPath -} - -func parseMountinfo(t *testing.T, mountInfo string) []mount.MountInfo { - mountInfoPath := writeMountinfo(t, mountInfo) - mnts, err := mount.ParseMountInfo(mountInfoPath) - assert.NoError(t, err) - return mnts -} - -func TestGetMountedVolumeDevice(t *testing.T) { - cases := []struct { - name string - mountInfo []mount.MountInfo - device string - }{ - { - name: "mounted", - mountInfo: parseMountinfo(t, "707 97 0:5 /vdc /path/to/volumeDevice rw,nosuid shared:21 - devtmpfs devtmpfs rw,size=7901960k,nr_inodes=1975490,mode=755"), - device: "/vdc", - }, - { - name: "not mounted", - mountInfo: parseMountinfo(t, "707 97 0:5 /vdc /path/to/another_volumeDevice rw,nosuid shared:21 - devtmpfs devtmpfs rw,size=7901960k,nr_inodes=1975490,mode=755"), - device: "", - }, - } - - for _, test := range cases { - t.Run(test.name, func(t *testing.T) { - t.Parallel() - device := getMountedVolumeDevice(test.mountInfo, "/path/to/volumeDevice") - assert.Equal(t, test.device, device) - }) - } -} - -func TestIsDeviceMountedAt(t *testing.T) { - cases := []struct { - name string - mountInfo []mount.MountInfo - mounted bool - }{ - { - name: "mounted", - mountInfo: parseMountinfo(t, "291 97 253:16 / /path/to/mountpoint rw,relatime shared:160 - ext4 /dev/vdb rw"), - mounted: true, - }, - { - name: "wrong device", - mountInfo: parseMountinfo(t, "291 97 253:16 / /path/to/mountpoint rw,relatime shared:160 - ext4 /dev/vdc rw"), - mounted: false, - }, - { - name: "wrong path", - mountInfo: parseMountinfo(t, "291 97 253:16 / /path/to/another/mountpoint rw,relatime shared:160 - ext4 /dev/vdb rw"), - mounted: false, - }, - } - - for _, test := range cases { - t.Run(test.name, func(t *testing.T) { - t.Parallel() - mounted := isDeviceMountedAt(test.mountInfo, "/dev/vdb", "/path/to/mountpoint") - assert.Equal(t, test.mounted, mounted) - }) - } -} - -func TestCheckDeviceAvailableError(t *testing.T) { - err := checkDeviceAvailable("/not/exist", "/dev/vdc", "d-2zedmdfyiz2num45yx60", "/path/to/mountpoint") - if !errors.Is(err, os.ErrNotExist) { - t.Errorf("expected os.ErrNotExist, got %v", err) - } -} - func TestDiskSize(t *testing.T) { size := DiskSize{22014345216} assert.Equal(t, "20.502 GiB (0x520284000)", size.String()) diff --git a/pkg/disk/utils_unix.go b/pkg/disk/utils_unix.go new file mode 100644 index 0000000000..81399cfb42 --- /dev/null +++ b/pkg/disk/utils_unix.go @@ -0,0 +1,206 @@ +//go:build !windows + +package disk + +import ( + "context" + "fmt" + "net" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/containerd/ttrpc" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud/metadata" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/proto" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils" + utilsio "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/io" + perrors "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + k8smount "k8s.io/mount-utils" +) + +func getVolumeCountFromOpenAPI(getNode func() (*v1.Node, error), c cloud.ECSInterface, efloC cloud.EFLOInterface, m metadata.MetadataProvider, dev utilsio.DiskLister, lingjunID string) (int, error) { + // An attached disk is not managed by us if: + // 1. it is not in node.Status.VolumesInUse or node.Status.VolumesAttached; and + // 2. it does not have the xattr set. + // 1 may fail because the info in node.Status is removed before ControllerUnpublishVolume. + // 2 may fail because the disk may be just attached and not have the xattr set yet. + // Combine 1 and 2 to get the accurate "not managed" disk list. + // We should exclude these disks from available count. + // e.g. static/dynamic PVs are managed, OS disk or manually attached disks are not managed. + + disks, err := listDiskXattrs(dev) + if err != nil { + return 0, fmt.Errorf("failed to list devices: %w", err) + } + managedDisks := sets.KeySet(disks) + + // To ensure all the managed attachedDisks also present in managedDisks, + // ECS OpenAPI should goes after ListDisks because the just detached disk should + // disappear from ListDisks after OpenAPI; + // ECS OpenAPI should goes before getNode because the just attached disk should + // appear in node before OpenAPI; + attachedDisks, err := getAttachedCloudDisks(c, m) + if err != nil { + return 0, err + } + klog.Infof("getVolumeCount: found %d attached disks", len(attachedDisks)) + + node, err := getNode() + if err != nil { + return 0, err + } + + availableCount, err := getAvailableDiskCount(node, c, efloC, m, lingjunID) + if err != nil { + return 0, err + } + + prefix := fmt.Sprintf("kubernetes.io/csi/%s^", driverName) + getDiskId := func(n v1.UniqueVolumeName) string { + if strings.HasPrefix(string(n), prefix) { + return string(n[len(prefix):]) + } + return "" + } + + for _, volume := range node.Status.VolumesInUse { + if disk := getDiskId(volume); disk != "" { + managedDisks.Insert(disk) + } + } + for _, volume := range node.Status.VolumesAttached { + if disk := getDiskId(volume.Name); disk != "" { + managedDisks.Insert(disk) + } + } + + for _, disk := range attachedDisks { + if !managedDisks.Has(disk) { + klog.Infof("getVolumeCount: disk %s is not managed by us", disk) + availableCount-- + } + } + + return availableCount, nil +} + +// checkRundVolumeExpand +func checkRundVolumeExpand(req *csi.NodeExpandVolumeRequest) (bool, error) { + klog.Infof("checkRundVolumeExpand: volumePath: %s", req.VolumePath) + pvName := utils.GetPvNameFormPodMnt(req.VolumePath) + if pvName == "" { + klog.Errorf("checkRundVolumeExpand: cannot get pvname from volumePath %s", req.VolumePath) + return false, perrors.Errorf("cannot get pvname from volumePath %s for volume %s", req.VolumePath, req.VolumeId) + } + var grpcVolume string + socketFile := filepath.Join(RundSocketDir, pvName) + if utils.IsFileExisting(socketFile) { + grpcVolume = pvName + } else { + socketFile = filepath.Join(RundSocketDir, req.VolumeId) + if !utils.IsFileExisting(socketFile) { + klog.Infof("checkRundVolumeExpand: socketfile: %s not exists, trying runc expanding", socketFile) + return false, nil + } + grpcVolume = req.VolumeId + } + klog.Infof("checkRundVolumeExpand: rund socket dir: %s exists", socketFile) + + // connect to rund server with timeout + clientConn, err := net.DialTimeout("unix", socketFile, 1*time.Second) + if err != nil { + klog.Errorf("checkRundExpand: volume %s, volumepath %s, connect to rund server with error: %s", req.VolumeId, req.VolumePath, err.Error()) + return true, perrors.Errorf("checkRundExpand: volume %s, volumepath %s, connect to rund server with error: %s", req.VolumeId, req.VolumePath, err.Error()) + } + defer func() { + if err := clientConn.Close(); err != nil { + klog.ErrorS(err, "close rund socket") + } + }() + + // send volume spec to rund to expand volume fs + volumeSize := strconv.FormatInt(req.GetCapacityRange().GetRequiredBytes(), 10) + client := proto.NewExtendedStatusClient(ttrpc.NewClient(clientConn)) + resp, err := client.ExpandVolume(context.Background(), &proto.ExpandVolumeRequest{ + Volume: grpcVolume, + }) + if err != nil { + klog.Errorf("checkRundExpand: volume %s, volumepath %s, connect to rund server with error response: %s", req.VolumeId, req.VolumePath, err.Error()) + return true, perrors.Errorf("checkRundExpand: volume %s, volumepath %s, connect to rund server with error response: %s", req.VolumeId, req.VolumePath, err.Error()) + } + + klog.Infof("RundVolumeExpand: Expand VolumeFS(%s) to(%s) successful with response: %s", pvName, volumeSize, resp) + return true, nil +} + +func getMountedVolumeDevice(mnts []k8smount.MountInfo, targetPath string) string { + for _, mnt := range mnts { + if mnt.MountPoint == targetPath { + return mnt.Root + } + } + return "" +} + +func isDeviceMountedAt(mnts []k8smount.MountInfo, device, targetPath string) bool { + for _, mnt := range mnts { + if mnt.MountPoint == targetPath && mnt.Source == device { + return true + } + } + return false +} + +const mountInfoPath = "/proc/self/mountinfo" + +func CheckDeviceAvailable(devicePath, volumeID, targetPath string) error { + return checkDeviceAvailable(mountInfoPath, devicePath, volumeID, targetPath) +} + +func checkDeviceAvailable(mountinfoPath, devicePath, volumeID, targetPath string) error { + if devicePath == "" { + return fmt.Errorf("devicePath is empty, cannot used for Volume") + } + + mnts, err := k8smount.ParseMountInfo(mountinfoPath) + if err != nil { + return err + } + + // TODO: remove in next version + // Since devicePath has been change to symlink, we will never run into the following logics. + // block volume + if devicePath == "devtmpfs" { + device := getMountedVolumeDevice(mnts, targetPath) + newVolumeID, err := GetVolumeIDByDevice(device) + if err != nil { + return nil + } + if newVolumeID != volumeID { + return fmt.Errorf("device [%s] associate with volumeID: [%s] rather than volumeID: [%s]", device, newVolumeID, volumeID) + } + + return nil + } + + if !utils.IsFileExisting(devicePath) { + return fmt.Errorf("devicePath(%s) is empty, cannot used for Volume", devicePath) + } + + // check the device is used for system + if devicePath == "/dev/vda" || devicePath == "/dev/vda1" { + klog.Warningf("checkDeviceAvailable: devicePath(%s) may be system device: %s", devicePath, volumeID) + } + + if isDeviceMountedAt(mnts, devicePath, utils.KubeletRootDir) { + return fmt.Errorf("devicePath(%s) is used as DataDisk for kubelet, cannot used for Volume", devicePath) + } + return nil +} diff --git a/pkg/disk/utils_unix_test.go b/pkg/disk/utils_unix_test.go new file mode 100644 index 0000000000..c8c0fb8961 --- /dev/null +++ b/pkg/disk/utils_unix_test.go @@ -0,0 +1,150 @@ +//go:build unix + +package disk + +import ( + "errors" + "os" + "path" + "path/filepath" + "testing" + + "github.com/aliyun/alibaba-cloud-sdk-go/services/ecs" + gomock "github.com/golang/mock/gomock" + "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud" + "github.com/stretchr/testify/assert" + "golang.org/x/sys/unix" + corev1 "k8s.io/api/core/v1" + "k8s.io/mount-utils" +) + +type MockDisks struct { + disks []string + base string +} + +func (m *MockDisks) ListDisks() ([]string, error) { + return m.disks, nil +} + +func (m *MockDisks) AddDisk(t testing.TB, path string, diskID []byte) { + p := filepath.Join(m.base, path) + m.disks = append(m.disks, p) + assert.NoError(t, os.WriteFile(p, []byte{}, 0644)) + if diskID != nil { + assert.NoError(t, unix.Setxattr(p, DiskXattrName, diskID, 0)) + } +} + +const longDiskID = "d-some-very-looooooooooooooooog-value-that-cause-getxattr-to-fail" + +func TestGetVolumeCountFromOpenAPI(t *testing.T) { + testDiskXattr(t) + + ctrl := gomock.NewController(t) + c := cloud.NewMockECSInterface(ctrl) + efloc := cloud.NewMockEFLOInterface(ctrl) + + describeDisksResponse := ecs.CreateDescribeDisksResponse() + cloud.UnmarshalAcsResponse([]byte(DescribeDisksResponse), describeDisksResponse) + c.EXPECT().DescribeDisks(gomock.Any()).Return(describeDisksResponse, nil) + + describeInstanceTypesResponse := ecs.CreateDescribeInstanceTypesResponse() + cloud.UnmarshalAcsResponse([]byte(DescribeInstanceTypesResponse), describeInstanceTypesResponse) + c.EXPECT().DescribeInstanceTypes(gomock.Any()).Return(describeInstanceTypesResponse, nil) + + dev := MockDisks{base: t.TempDir() + "/dev"} + assert.NoError(t, os.MkdirAll(dev.base, 0755)) + + // add xattr to CSI attached disk + dev.AddDisk(t, "node-for-testingdetachingdisk", []byte("d-testingdetachingdisk")) + // manually attached disk has no xattr + dev.AddDisk(t, "node-for-2zeh74nnxxrobxz49eug", nil) + // an arbitrary error for getxattr, we should ignore it + dev.AddDisk(t, "node-for-testinglocaldisk", []byte(longDiskID)) + + getNode := func() (*corev1.Node, error) { return testNode(), nil } + count, err := getVolumeCountFromOpenAPI(getNode, c, efloc, testMetadata, &dev, "") + assert.NoError(t, err) + assert.Equal(t, 7, count) // 7 = 9 available disks - 1 system disk (d-2ze49fivxwkwxl36o1d3) - 1 manually attached (d-2zeh74nnxxrobxz49eug) +} + +func writeMountinfo(t *testing.T, mountInfo string) string { + mountInfoPath := path.Join(os.TempDir(), "mountinfo") + err := os.WriteFile(mountInfoPath, []byte(mountInfo), 0o644) + assert.NoError(t, err) + return mountInfoPath +} + +func parseMountinfo(t *testing.T, mountInfo string) []mount.MountInfo { + mountInfoPath := writeMountinfo(t, mountInfo) + mnts, err := mount.ParseMountInfo(mountInfoPath) + assert.NoError(t, err) + return mnts +} + +func TestGetMountedVolumeDevice(t *testing.T) { + cases := []struct { + name string + mountInfo []mount.MountInfo + device string + }{ + { + name: "mounted", + mountInfo: parseMountinfo(t, "707 97 0:5 /vdc /path/to/volumeDevice rw,nosuid shared:21 - devtmpfs devtmpfs rw,size=7901960k,nr_inodes=1975490,mode=755"), + device: "/vdc", + }, + { + name: "not mounted", + mountInfo: parseMountinfo(t, "707 97 0:5 /vdc /path/to/another_volumeDevice rw,nosuid shared:21 - devtmpfs devtmpfs rw,size=7901960k,nr_inodes=1975490,mode=755"), + device: "", + }, + } + + for _, test := range cases { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + device := getMountedVolumeDevice(test.mountInfo, "/path/to/volumeDevice") + assert.Equal(t, test.device, device) + }) + } +} + +func TestIsDeviceMountedAt(t *testing.T) { + cases := []struct { + name string + mountInfo []mount.MountInfo + mounted bool + }{ + { + name: "mounted", + mountInfo: parseMountinfo(t, "291 97 253:16 / /path/to/mountpoint rw,relatime shared:160 - ext4 /dev/vdb rw"), + mounted: true, + }, + { + name: "wrong device", + mountInfo: parseMountinfo(t, "291 97 253:16 / /path/to/mountpoint rw,relatime shared:160 - ext4 /dev/vdc rw"), + mounted: false, + }, + { + name: "wrong path", + mountInfo: parseMountinfo(t, "291 97 253:16 / /path/to/another/mountpoint rw,relatime shared:160 - ext4 /dev/vdb rw"), + mounted: false, + }, + } + + for _, test := range cases { + t.Run(test.name, func(t *testing.T) { + t.Parallel() + mounted := isDeviceMountedAt(test.mountInfo, "/dev/vdb", "/path/to/mountpoint") + assert.Equal(t, test.mounted, mounted) + }) + } +} + +func TestCheckDeviceAvailableError(t *testing.T) { + err := checkDeviceAvailable("/not/exist", "/dev/vdc", "d-2zedmdfyiz2num45yx60", "/path/to/mountpoint") + if !errors.Is(err, os.ErrNotExist) { + t.Errorf("expected os.ErrNotExist, got %v", err) + } +} diff --git a/pkg/utils/mounter_linux.go b/pkg/utils/mounter_unix.go similarity index 99% rename from pkg/utils/mounter_linux.go rename to pkg/utils/mounter_unix.go index d557a627dd..d424f4e8db 100644 --- a/pkg/utils/mounter_linux.go +++ b/pkg/utils/mounter_unix.go @@ -1,3 +1,5 @@ +//go:build unix + package utils import (