Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 0 additions & 54 deletions pkg/disk/bdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"time"

"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/cloud"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
utilsio "github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils/io"
"github.com/pkg/errors"
Expand All @@ -44,12 +43,6 @@ const (
iohubSrviovDriver = "iohub_sriov"
virtioPciDriver = "virtio-pci"

// InstanceStatusStopped ecs stopped status
InstanceStatusStopped = "Stopped"
// DiskBdfTagKey disk bdf tag
DiskBdfTagKey = "bdf.csi.aliyun.com"
// DiskBdfCheckTagKey disk bdf check tag
DiskBdfCheckTagKey = "check.bdf.csi.aliyun.com"
// Vfhp Reconcile period
VfhpReconcilePeriod = 600
)
Expand Down Expand Up @@ -355,53 +348,6 @@ func clearBdfInfo(diskID, bdf string) (err error) {
return nil
}

func forceDetachAllowed(ecsClient cloud.ECSInterface, disk *ecs.Disk) (allowed bool, err error) {
// The following case allow detach:
// 1. no depend bdf
// 2. instance status is stopped

// case 1
describeDisksRequest := ecs.CreateDescribeDisksRequest()
describeDisksRequest.RegionId = GlobalConfigVar.Region
describeDisksRequest.DiskIds = "[\"" + disk.DiskId + "\"]"
diskResponse, err := ecsClient.DescribeDisks(describeDisksRequest)
if err != nil {
klog.Warningf("forceDetachAllowed: error with DescribeDisks: %s, %s", disk.DiskId, err.Error())
return false, errors.Wrapf(err, "DescribeInstances, instanceId=%s", disk.InstanceId)
}
disks := diskResponse.Disks.Disk
klog.Infof("forceDetachAllowed: diskResponse: %+v", diskResponse)
if len(disks) == 0 {
klog.Warningf("forceDetachAllowed: no disk found: %s", disk.DiskId)
return false, errors.Wrapf(err, "forceDetachAllowed: Get disk empty, ID=%s", disk.DiskId)
}
bdfTagExist := false
for _, tag := range disks[0].Tags.Tag {
if tag.TagKey == DiskBdfTagKey {
bdfTagExist = true
}
}
if !bdfTagExist {
return true, nil
}

request := ecs.CreateDescribeInstancesRequest()
request.RegionId = disk.RegionId
request.InstanceIds = "[\"" + disk.InstanceId + "\"]"
instanceResponse, err := ecsClient.DescribeInstances(request)
klog.Infof("forceDetachAllowed: instanceResponse: %+v", instanceResponse)
if err != nil {
return false, errors.Wrapf(err, "DescribeInstances, instanceId=%s", disk.InstanceId)
}
if len(instanceResponse.Instances.Instance) == 0 {
return false, errors.Errorf("Describe Instance with empty response: %s", disk.InstanceId)
}
inst := instanceResponse.Instances.Instance[0]
klog.Infof("forceDetachAllowed: Instance status is %s", inst.Status)
// case 2
return inst.Status == InstanceStatusStopped, nil
}

var vfOnce = new(sync.Once)

// isVF means ecsClient running in VF mode
Expand Down
192 changes: 76 additions & 116 deletions pkg/disk/cloud.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build !windows

/*
Copyright 2019 The Kubernetes Authors.

Expand Down Expand Up @@ -41,6 +39,7 @@ import (
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/batcher"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/disk/waitstatus"
"github.com/kubernetes-sigs/alibaba-cloud-csi-driver/pkg/utils"
perrors "github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
Expand All @@ -61,10 +60,28 @@ const (
DISK_RESIZE_PROCESSING_TIMEOUT = 30 * time.Second
)

// Disk status returned in ecs.DescribeDisks
const (
DiskStatusInuse = "In_use"
DiskStatusAttaching = "Attaching"
DiskStatusDetaching = "Detaching"
DiskStatusAvailable = "Available"
)

const (
SnapshotStatusAccomplished = "accomplished"
DiskMultiAttachDisabled = "Disabled"
DiskMultiAttachEnabled = "Enabled"

// InstanceStatusStopped ecs stopped status
InstanceStatusStopped = "Stopped"
)

const (
// DiskAttachedKey attached key
DiskAttachedKey = "k8s.aliyun.com"
// DiskAttachedValue attached value
DiskAttachedValue = "true"
)

type DiskAttachDetach struct {
Expand All @@ -77,8 +94,7 @@ type DiskAttachDetach struct {
detachThrottler *throttle.Throttler
detaching sync.Map

dev *DeviceManager
devMap *devMap
repo *diskRepo
}

type DiskCreateDelete struct {
Expand All @@ -88,109 +104,6 @@ type DiskCreateDelete struct {
deleteThrottler *throttle.Throttler
}

// GetRootBlockDevice get device name
func (ad *DiskAttachDetach) GetRootBlockDevice(logger klog.Logger, diskID string) (string, error) {
device, err := ad.dev.GetRootBlockBySerial(strings.TrimPrefix(diskID, "d-"))
if err == nil {
return device, nil
}
device, err2 := ad.devMap.Get(logger, diskID)
if device == "" {
return "", errors.Join(err, err2) // err2 may be nil, which is OK
}
klog.Infof("GetRootBlockDevice: got disk %s device name %s from devMap", diskID, device)
return device, nil
}

func (ad *DiskAttachDetach) GetVolumeDeviceName(logger klog.Logger, diskID string) (string, error) {
root, err := ad.GetRootBlockDevice(logger, diskID)
if err != nil {
return "", err
}
return ad.dev.adaptDevicePartition(root)
}

func (ad *DiskAttachDetach) possibleDisks(before sets.Set[string]) ([]string, error) {
after, err := ad.dev.ListBlocks()
if err != nil {
return nil, fmt.Errorf("cannot list devices after attach: %w", err)
}

var disks []string
for d := range after.Difference(before) {
serial, err := ad.dev.GetDeviceSerial(d)
if err != nil {
return nil, fmt.Errorf("get device serial for disk %s failed: %w", d, err)
}
if serial == "" {
disks = append(disks, "/dev/"+d)
}
}
return disks, nil
}

func (ad *DiskAttachDetach) findDevice(ctx context.Context, diskID, serial string, before sets.Set[string]) (string, error) {
logger := klog.FromContext(ctx)
var bdf, device string
var err error
for {
if serial != "" {
device, err = ad.dev.WaitRootBlock(ctx, serial)
if err == nil {
logger.V(2).Info("found disk by serial", "serial", serial, "device", device)
break
}
err = fmt.Errorf("disk attached but not found by serial %s: %w", serial, err)
} else if before != nil {
var disks []string
disks, err = ad.possibleDisks(before)
if err != nil {
return "", fmt.Errorf("failed to find disk without serial: %v", err)
}
if len(disks) == 1 {
device = disks[0]
err := ad.devMap.Add(diskID, device)
if err != nil {
return "", fmt.Errorf("failed to populate devMap: %v", err)
}
logger.V(2).Info("found device by diff", "device", device)
break
} else {
// device count is not expected, should retry (later by detaching and attaching again)
err = fmt.Errorf("disk attached, but got %d devices, will retry later", len(disks))
}
}

if !IsVFNode() {
return "", err
}
if bdf != "" {
// second attempt after bindBdfDisk
var errBDF error
device, errBDF = GetDeviceByBdf(bdf, true)
if errBDF != nil {
return "", fmt.Errorf("%v. failed to find by BDF: %v", err, errBDF)
}
logger.V(2).Info("found device by BDF", "BDF", bdf, "device", device)
break
}
// On VF node, try bind driver
bdf, err = bindBdfDisk(diskID)
if err != nil {
if err := unbindBdfDisk(diskID); err != nil {
return "", fmt.Errorf("NodeStageVolume: failed to detach bdf: %v", err)
}
return "", fmt.Errorf("NodeStageVolume: failed to attach bdf: %v", err)
}
if bdf == "" {
// avoid infinite loop
return "", fmt.Errorf("BDF not found")
}
// continue and retry finding device
}
return device, nil
}

type attachAction int

const (
Expand Down Expand Up @@ -236,9 +149,56 @@ func chooseAttachAction(disk *ecs.Disk, instanceID string) (attachAction, error)
return attachNormally, nil
}

func forceDetachAllowed(ecsClient cloud.ECSInterface, disk *ecs.Disk) (allowed bool, err error) {
// The following case allow detach:
// 1. no depend bdf
// 2. instance status is stopped

// case 1
describeDisksRequest := ecs.CreateDescribeDisksRequest()
describeDisksRequest.RegionId = GlobalConfigVar.Region
describeDisksRequest.DiskIds = "[\"" + disk.DiskId + "\"]"
diskResponse, err := ecsClient.DescribeDisks(describeDisksRequest)
if err != nil {
klog.Warningf("forceDetachAllowed: error with DescribeDisks: %s, %s", disk.DiskId, err.Error())
return false, perrors.Wrapf(err, "DescribeInstances, instanceId=%s", disk.InstanceId)
}
disks := diskResponse.Disks.Disk
klog.Infof("forceDetachAllowed: diskResponse: %+v", diskResponse)
if len(disks) == 0 {
klog.Warningf("forceDetachAllowed: no disk found: %s", disk.DiskId)
return false, perrors.Wrapf(err, "forceDetachAllowed: Get disk empty, ID=%s", disk.DiskId)
}
bdfTagExist := false
for _, tag := range disks[0].Tags.Tag {
if tag.TagKey == DiskBdfTagKey {
bdfTagExist = true
}
}
if !bdfTagExist {
return true, nil
}

request := ecs.CreateDescribeInstancesRequest()
request.RegionId = disk.RegionId
request.InstanceIds = "[\"" + disk.InstanceId + "\"]"
instanceResponse, err := ecsClient.DescribeInstances(request)
klog.Infof("forceDetachAllowed: instanceResponse: %+v", instanceResponse)
if err != nil {
return false, perrors.Wrapf(err, "DescribeInstances, instanceId=%s", disk.InstanceId)
}
if len(instanceResponse.Instances.Instance) == 0 {
return false, perrors.Errorf("Describe Instance with empty response: %s", disk.InstanceId)
}
inst := instanceResponse.Instances.Instance[0]
klog.Infof("forceDetachAllowed: Instance status is %s", inst.Status)
// case 2
return inst.Status == InstanceStatusStopped, nil
}

// Attach Alibaba Cloud disk.
// Returns device path if fromNode, disk serial number otherwise.
func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID string, fromNode bool) (string, error) {
func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID string) (string, error) {
logger := klog.FromContext(ctx)
logger.V(2).Info("Starting Do AttachDisk")

Expand All @@ -251,7 +211,7 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
return "", status.Errorf(codes.NotFound, "AttachDisk: csi can't find disk: %s in region: %s, Please check if the cloud disk exists, if the region is correct, or if the csi permissions are correct", diskID, GlobalConfigVar.Region)
}

if !fromNode && disk.SerialNumber == "" {
if ad.repo == nil && disk.SerialNumber == "" {
if GlobalConfigVar.ADControllerEnable {
return "", status.Errorf(codes.InvalidArgument,
"Disk %s does not have serial number but AD controller is enabled, we cannot attach this disk. "+
Expand Down Expand Up @@ -310,14 +270,14 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin

switch action {
case alreadyAttached:
if !fromNode {
if ad.repo == nil {
logger.V(2).Info("already attached, skipping")
return disk.SerialNumber, nil
}
if disk.SerialNumber != "" {
return ad.dev.WaitRootBlock(ctx, disk.SerialNumber)
return ad.repo.WaitRootBlock(ctx, disk.SerialNumber)
}
device, err := ad.devMap.Get(logger, diskID)
device, err := ad.repo.GetAttached(logger, diskID)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -346,8 +306,8 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin

// Step 3: Attach Disk, list device before attach disk
var before sets.Set[string]
if fromNode && disk.SerialNumber == "" {
before, err = DefaultDeviceManager.ListBlocks()
if ad.repo != nil && disk.SerialNumber == "" {
before, err = ad.repo.ListBlocks()
if err != nil {
return "", status.Errorf(codes.Aborted, "AttachDisk: Can't list devices before attach: %v", err)
}
Expand Down Expand Up @@ -391,8 +351,8 @@ func (ad *DiskAttachDetach) attachDisk(ctx context.Context, diskID, nodeID strin
}

// step 5: diff device with previous files under /dev
if fromNode {
device, err := ad.findDevice(ctx, diskID, disk.SerialNumber, before)
if ad.repo != nil {
device, err := ad.repo.findDevice(ctx, diskID, disk.SerialNumber, before)
if err != nil {
return "", status.Error(codes.Aborted, err.Error())
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/disk/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,6 @@ func testAttachDetach(t *testing.T) (context.Context, *cloud.MockECSInterface, *
batcher: b,
attachThrottler: defaultThrottler(),
detachThrottler: defaultThrottler(),
dev: DefaultDeviceManager,
}
}

Expand Down Expand Up @@ -736,7 +735,7 @@ func TestAttachDisk(t *testing.T) {
})
c.EXPECT().DescribeDisks(gomock.Any()).Return(diskResp(tc.after), nil).After(attachCall)
}
serial, err := ad.attachDisk(ctx, "d-testdiskid", "i-testinstanceid", false)
serial, err := ad.attachDisk(ctx, "d-testdiskid", "i-testinstanceid")

assert.Equal(t, tc.forceAttach, force)
if tc.expectErr {
Expand Down
5 changes: 5 additions & 0 deletions pkg/disk/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,9 @@ const (
SNAPSHOT_MIN_RETENTION_DAYS = 1

PUBLISH_CONTEXT_SERIAL = "serialNumber"

// DiskBdfTagKey disk bdf tag
DiskBdfTagKey = "bdf.csi.aliyun.com"
// DiskBdfCheckTagKey disk bdf check tag
DiskBdfCheckTagKey = "check.bdf.csi.aliyun.com"
)
4 changes: 1 addition & 3 deletions pkg/disk/controllerserver.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build !windows

/*
Copyright 2019 The Kubernetes Authors.

Expand Down Expand Up @@ -348,7 +346,7 @@ func (cs *controllerServer) ControllerPublishVolume(ctx context.Context, req *cs

klog.Infof("ControllerPublishVolume: start attach disk: %s to node: %s", req.VolumeId, req.NodeId)

serial, err := cs.ad.attachDisk(ctx, req.VolumeId, req.NodeId, false)
serial, err := cs.ad.attachDisk(ctx, req.VolumeId, req.NodeId)
if err != nil {
klog.Errorf("ControllerPublishVolume: attach disk: %s to node: %s with error: %s", req.VolumeId, req.NodeId, err.Error())
return nil, err
Expand Down
6 changes: 4 additions & 2 deletions pkg/disk/csi_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ func NewCSIAgent() *CSIAgent {
podCGroup: podCgroup,
locks: utils.NewVolumeLocks(),
ad: DiskAttachDetach{
dev: DefaultDeviceManager,
devMap: &devMap{}, // Nobody will add to this map.
repo: &diskRepo{
dev: DefaultDeviceManager,
devMap: &devMap{}, // Nobody will add to this map.
},
},
},
}
Expand Down
Loading