diff --git a/pkg/azurefile/azurefile.go b/pkg/azurefile/azurefile.go index f1051b665b..7830274ca8 100644 --- a/pkg/azurefile/azurefile.go +++ b/pkg/azurefile/azurefile.go @@ -173,8 +173,11 @@ const ( runtimeClassHandlerField = "runtimeclasshandler" defaultRuntimeClassHandler = "kata-cc" mountWithManagedIdentityField = "mountwithmanagedidentity" + mountWithOAuthTokenField = "mountwithoauthtoken" mountWithWITokenField = "mountwithworkloadidentitytoken" + defaultSecretOAuthToken = "oauthtoken" + accountNotProvisioned = "StorageAccountIsNotProvisioned" // this is a workaround fix for 429 throttling issue, will update cloud provider for better fix later tooManyRequests = "TooManyRequests" @@ -292,6 +295,8 @@ type Driver struct { secretCacheMap azcache.Resource // a map storing all volumes using data plane API dataPlaneAPIVolMap sync.Map + // a map storing OAuth token SHA per server to avoid unnecessary credential cache refresh + oauthTokenSHAMap sync.Map // a timed cache storing all storage accounts that are using data plane API temporarily dataPlaneAPIAccountCache azcache.Resource // a timed cache storing account search history (solve account list throttling issue) @@ -824,7 +829,7 @@ func (d *Driver) GetAccountInfo(ctx context.Context, volumeID string, secrets, r var protocol, accountKey, secretName, pvcNamespace string // getAccountKeyFromSecret indicates whether get account key only from k8s secret - var getAccountKeyFromSecret, getLatestAccountKey, mountWithManagedIdentity, mountWithWIToken bool + var getAccountKeyFromSecret, getLatestAccountKey, mountWithManagedIdentity, mountWithOAuthToken, mountWithWIToken bool var clientID, tenantID, tokenFilePath, serviceAccountToken string for k, v := range reqContext { @@ -861,6 +866,10 @@ func (d *Driver) GetAccountInfo(ctx context.Context, volumeID string, secrets, r if mountWithManagedIdentity, err = strconv.ParseBool(v); err != nil { return rgName, accountName, accountKey, fileShareName, diskName, subsID, tenantID, 
tokenFilePath, fmt.Errorf("invalid %s: %s in volume context", mountWithManagedIdentityField, v) } + case mountWithOAuthTokenField: + if mountWithOAuthToken, err = strconv.ParseBool(v); err != nil { + return rgName, accountName, accountKey, fileShareName, diskName, subsID, tenantID, tokenFilePath, fmt.Errorf("invalid %s: %s in volume context", mountWithOAuthTokenField, v) + } case mountWithWITokenField: if mountWithWIToken, err = strconv.ParseBool(v); err != nil { return rgName, accountName, accountKey, fileShareName, diskName, subsID, tenantID, tokenFilePath, fmt.Errorf("invalid %s: %s in volume context", mountWithWITokenField, v) @@ -905,6 +914,21 @@ func (d *Driver) GetAccountInfo(ctx context.Context, volumeID string, secrets, r return rgName, accountName, accountKey, fileShareName, diskName, subsID, tenantID, tokenFilePath, nil } + if mountWithOAuthToken { + klog.V(2).Infof("mountWithOAuthToken is true, use OAuth token from secret for mount") + // Read accountName from secret if not already set + if accountName == "" && secretName != "" { + name, _, _, err := d.GetStorageAccountFromSecret(ctx, secretName, secretNamespace) + if err != nil { + return rgName, accountName, accountKey, fileShareName, diskName, subsID, tenantID, tokenFilePath, fmt.Errorf("failed to get account name from secret for mountWithOAuthToken: %v", err) + } + if name != "" { + accountName = name + } + } + return rgName, accountName, accountKey, fileShareName, diskName, subsID, tenantID, tokenFilePath, nil + } + if mountWithWIToken { if clientID == "" { clientID = d.cloud.Config.AzureAuthConfig.UserAssignedIdentityID @@ -957,7 +981,7 @@ func (d *Driver) GetAccountInfo(ctx context.Context, volumeID string, secrets, r if secretName != "" { var name string // 2. 
if not found in cache, get account key from kubernetes secret - name, accountKey, err = d.GetStorageAccountFromSecret(ctx, secretName, secretNamespace) + name, accountKey, _, err = d.GetStorageAccountFromSecret(ctx, secretName, secretNamespace) if name != "" { accountName = name } @@ -1354,7 +1378,7 @@ func (d *Driver) GetStorageAccesskey(ctx context.Context, accountOptions *storag if secretName == "" { secretName = fmt.Sprintf(secretNameTemplate, accountName) } - _, accountKey, err := d.GetStorageAccountFromSecret(ctx, secretName, secretNamespace) + _, accountKey, _, err := d.GetStorageAccountFromSecret(ctx, secretName, secretNamespace) if err != nil { klog.V(2).Infof("could not get account(%s) key from secret(%s), error: %v, use cluster identity to get account key instead", accountOptions.Name, secretName, err) accountKey, err = d.GetStorageAccesskeyWithSubsID(ctx, accountOptions.SubscriptionID, accountOptions.Name, accountOptions.ResourceGroup, accountOptions.GetLatestAccountKey) @@ -1378,21 +1402,22 @@ func (d *Driver) GetStorageAccesskeyWithSubsID(ctx context.Context, subsID, acco return d.cloud.GetStorageAccesskey(ctx, accountClient, account, resourceGroup, getLatestAccountKey) } -// GetStorageAccountFromSecret get storage account key from k8s secret -// return <accountName, accountKey, error> -func (d *Driver) GetStorageAccountFromSecret(ctx context.Context, secretName, secretNamespace string) (string, string, error) { +// GetStorageAccountFromSecret get storage account key and OAuth token from k8s secret +// return <accountName, accountKey, oauthToken, error> +func (d *Driver) GetStorageAccountFromSecret(ctx context.Context, secretName, secretNamespace string) (string, string, string, error) { if d.kubeClient == nil { - return "", "", fmt.Errorf("could not get account key from secret(%s): KubeClient is nil", secretName) + return "", "", "", fmt.Errorf("could not get credentials from secret(%s): KubeClient is nil", secretName) } secret, err := d.kubeClient.CoreV1().Secrets(secretNamespace).Get(ctx, secretName, metav1.GetOptions{}) if 
err != nil { - return "", "", fmt.Errorf("could not get secret(%v): %v", secretName, err) + return "", "", "", fmt.Errorf("could not get secret(%v): %v", secretName, err) } - accountName := strings.TrimSpace(string(secret.Data[defaultSecretAccountName][:])) - accountKey := strings.TrimSpace(string(secret.Data[defaultSecretAccountKey][:])) - return accountName, accountKey, nil + accountName := strings.TrimSpace(string(secret.Data[defaultSecretAccountName])) + accountKey := strings.TrimSpace(string(secret.Data[defaultSecretAccountKey])) + oauthToken := strings.TrimSpace(string(secret.Data[defaultSecretOAuthToken])) + return accountName, accountKey, oauthToken, nil } // getSubnetResourceID get default subnet resource ID from cloud provider config diff --git a/pkg/azurefile/controllerserver.go b/pkg/azurefile/controllerserver.go index 0e07c13f3b..6ef6f742c6 100644 --- a/pkg/azurefile/controllerserver.go +++ b/pkg/azurefile/controllerserver.go @@ -126,7 +126,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) } var sku, subsID, resourceGroup, location, account, fileShareName, diskName, fsType, secretName string var secretNamespace, pvcNamespace, protocol, customTags, storageEndpointSuffix, networkEndpointType, shareAccessTier, accountAccessTier, rootSquashType, tagValueDelimiter string - var createAccount, useSeretCache, matchTags, selectRandomMatchingAccount, getLatestAccountKey, encryptInTransit, mountWithManagedIdentity, mountWithWIToken bool + var createAccount, useSeretCache, matchTags, selectRandomMatchingAccount, getLatestAccountKey, encryptInTransit, mountWithManagedIdentity, mountWithWIToken, mountWithOAuthToken bool var vnetResourceGroup, vnetName, vnetLinkName, publicNetworkAccess, subnetName, shareNamePrefix, fsGroupChangePolicy, useDataPlaneAPI, privateDNSZoneResourceGroup string var requireInfraEncryption, disableDeleteRetentionPolicy, enableLFS, isMultichannelEnabled, allowSharedKeyAccess, allowCrossTenantReplication 
*bool var provisionedBandwidthMibps, provisionedIops *int32 @@ -327,20 +327,33 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) if err != nil { return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in storage class", mountWithWITokenField, v) } + case mountWithOAuthTokenField: + mountWithOAuthToken, err = strconv.ParseBool(v) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "invalid %s: %s in storage class", mountWithOAuthTokenField, v) + } default: return nil, status.Errorf(codes.InvalidArgument, "invalid parameter %q in storage class", k) } } - if mountWithManagedIdentity && mountWithWIToken { - return nil, status.Error(codes.InvalidArgument, "mountwithmanagedidentity and mountwithworkloadidentitytoken cannot be both true in storage class") + if (mountWithManagedIdentity && mountWithWIToken) || (mountWithManagedIdentity && mountWithOAuthToken) || (mountWithWIToken && mountWithOAuthToken) { + return nil, status.Errorf(codes.InvalidArgument, "only one of %s, %s, and %s can be true in storage class", mountWithManagedIdentityField, mountWithOAuthTokenField, mountWithWITokenField) + } + + if mountWithOAuthToken && secretName == "" { + return nil, status.Errorf(codes.InvalidArgument, "%s is required when %s is true", secretNameField, mountWithOAuthTokenField) + } + + if mountWithOAuthToken && (protocol == nfs || fsType == nfs) { + return nil, status.Errorf(codes.InvalidArgument, "%s is not supported with NFS protocol", mountWithOAuthTokenField) } - // When using managed identity or workload identity token for mount, - // the account key should not be stored in the secret since mount - // authentication uses identity-based tokens, not account keys. 
- if mountWithManagedIdentity || mountWithWIToken { + var requiresSmbOAuth *bool + if mountWithManagedIdentity || mountWithWIToken || mountWithOAuthToken { storeAccountKey = false + klog.V(2).Info("enabling smb oauth for identity-based mount") + requiresSmbOAuth = to.Ptr(true) } if matchTags && account != "" { @@ -572,12 +585,6 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) } } - var requiresSmbOAuth *bool - if mountWithManagedIdentity || mountWithWIToken { - klog.V(2).Info("enabling smb oauth for managed identity or work identity token based mount") - requiresSmbOAuth = to.Ptr(true) - } - accountOptions := &storage.AccountOptions{ Name: account, Type: sku, diff --git a/pkg/azurefile/controllerserver_test.go b/pkg/azurefile/controllerserver_test.go index 02c0d2904f..3bd93268d4 100644 --- a/pkg/azurefile/controllerserver_test.go +++ b/pkg/azurefile/controllerserver_test.go @@ -1374,7 +1374,7 @@ var _ = ginkgo.Describe("TestCreateVolume", func() { }, } - expectedErr := status.Errorf(codes.InvalidArgument, "%s and %s cannot be both true in storage class", mountWithManagedIdentityField, mountWithWITokenField) + expectedErr := status.Errorf(codes.InvalidArgument, "only one of %s, %s, and %s can be true in storage class", mountWithManagedIdentityField, mountWithOAuthTokenField, mountWithWITokenField) _, err := d.CreateVolume(ctx, req) gomega.Expect(err).To(gomega.Equal(expectedErr)) }) diff --git a/pkg/azurefile/nodeserver.go b/pkg/azurefile/nodeserver.go index 46527d3271..fa72c0650b 100644 --- a/pkg/azurefile/nodeserver.go +++ b/pkg/azurefile/nodeserver.go @@ -17,6 +17,7 @@ limitations under the License. 
package azurefile import ( + "crypto/sha256" "encoding/json" "fmt" "io" @@ -97,11 +98,14 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu // ephemeral volume if strings.EqualFold(context[ephemeralField], trueValue) { setKeyValueInMap(context, secretNamespaceField, context[podNamespaceField]) - // When Managed Identity is used for ephemeral volumes then reject the request. + // When Managed Identity or OAuth token is used for ephemeral volumes then reject the request. // Allowing access for inline volume with identity will open up risk of arbitrary pods accessing fileshares with node identity permissions. if strings.EqualFold(getValueInMap(context, mountWithManagedIdentityField), trueValue) { return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("mountWithManagedIdentity cannot be used for ephemeral volumes, please use either %s or secret based authentication", mountWithWITokenField)) } + if strings.EqualFold(getValueInMap(context, mountWithOAuthTokenField), trueValue) { + return nil, status.Error(codes.InvalidArgument, "mountWithOAuthToken cannot be used for ephemeral volumes, please use secret based authentication") + } useWIToken := strings.EqualFold(getValueInMap(context, mountWithWITokenField), trueValue) if !d.allowInlineVolumeKeyAccessWithIdentity && !useWIToken { // only get storage account from secret when not using managed identity or workload identity @@ -184,6 +188,24 @@ func (d *Driver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu mountOptions = append(mountOptions, "ro") } + // mountWithOAuthToken: validate and refresh credential cache BEFORE ensureMountPoint so that + // a stale mount (token expired → "required key not available") can recover. + // ensureMountPoint does ReadDir to validate the mount, which will fail if the + // kernel credential cache has an expired token, leading to an unmount attempt + // that fails with "target is busy". 
Refreshing first gives the kernel a valid + // token before the ReadDir check. + var oauthServer string + isMountWithOAuthToken := context != nil && strings.EqualFold(getValueInMap(context, mountWithOAuthTokenField), trueValue) + if isMountWithOAuthToken { + var err error + oauthServer, err = d.setCredentialCacheWithOAuthToken(ctx, volumeID, context) + if err != nil { + st := status.Convert(err) + return nil, status.Errorf(st.Code(), "NodePublishVolume: %s", st.Message()) + } + klog.V(2).Infof("NodePublishVolume: refreshed OAuth token credential cache for volume(%s) server(%s)", volumeID, oauthServer) + } + mnt, err := d.ensureMountPoint(target, os.FileMode(mountPermissions)) if err != nil { return nil, status.Errorf(codes.Internal, "Could not mount target %s: %v", target, err) @@ -293,7 +315,7 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe // don't respect fsType from req.GetVolumeCapability().GetMount().GetFsType() // since it's ext4 by default on Linux var fsType, server, protocol, ephemeralVolMountOptions, storageEndpointSuffix, folderName, clientID string - var ephemeralVol, createFolderIfNotExist, encryptInTransit, mountWithManagedIdentity, mountWithWIToken bool + var ephemeralVol, createFolderIfNotExist, encryptInTransit, mountWithManagedIdentity, mountWithOAuthToken, mountWithWIToken bool volumeMetadataReplaceMap := map[string]string{} mountPermissions := d.mountPermissions @@ -354,6 +376,11 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe if err != nil { return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("Volume context property %q must be a boolean value: %v", k, err)) } + case mountWithOAuthTokenField: + mountWithOAuthToken, err = strconv.ParseBool(v) + if err != nil { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("Volume context property %q must be a boolean value: %v", k, err)) + } case mountWithWITokenField: mountWithWIToken, err = strconv.ParseBool(v) if 
err != nil { @@ -380,8 +407,14 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe return nil, status.Errorf(codes.InvalidArgument, "fsGroupChangePolicy(%s) is not supported, supported fsGroupChangePolicy list: %v", fsGroupChangePolicy, supportedFSGroupChangePolicyList) } - if mountWithManagedIdentity && mountWithWIToken { - return nil, status.Error(codes.InvalidArgument, "mountWithManagedIdentity and mountWithWIToken cannot be both true") + if (mountWithManagedIdentity && mountWithWIToken) || (mountWithManagedIdentity && mountWithOAuthToken) || (mountWithWIToken && mountWithOAuthToken) { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("only one of %q, %q, and %q can be true", mountWithManagedIdentityField, mountWithOAuthTokenField, mountWithWITokenField)) + } + + if mountWithOAuthToken { + if err := validateMountWithOAuthToken(protocol, fsType, context); err != nil { + return nil, err + } } lockKey := fmt.Sprintf("%s-%s", volumeID, targetPath) @@ -451,10 +484,19 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe klog.V(2).Infof("using workload identity token for volume %s with mount options: %v", volumeID, sensitiveMountOptions) if tokenFilePath != "" { // always set credential cache when token file is provided even mount does not happen - if out, err := setCredentialCache(server, clientID, tenantID, tokenFilePath); err != nil { + if out, err := setCredentialCache(server, clientID, tenantID, tokenFilePath, ""); err != nil { return nil, status.Errorf(codes.Internal, "setCredentialCache failed for %s with error: %v, output: %s", server, err, out) } } + } else if mountWithOAuthToken && runtime.GOOS != "windows" { + sensitiveMountOptions = []string{"sec=krb5,cruid=0,upcall_target=mount"} + secretName := getValueInMap(context, secretNameField) + secretNamespace := getSecretNamespace(context) + klog.V(2).Infof("using OAuth token from secret(%s/%s) for volume %s", secretNamespace, secretName, 
volumeID) + // always refresh credential cache when mountWithOAuthToken is set, even if mount does not happen + if _, err := d.setCredentialCacheWithOAuthToken(ctx, volumeID, context); err != nil { + return nil, err + } } else { if accountName == "" || accountKey == "" { return nil, status.Errorf(codes.Internal, "accountName(%s) or accountKey is empty", accountName) @@ -516,7 +558,7 @@ func (d *Driver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRe } else { execFunc := func() error { if mountWithManagedIdentity && protocol != nfs && runtime.GOOS != "windows" { - if out, err := setCredentialCache(server, clientID, tenantID, tokenFilePath); err != nil { + if out, err := setCredentialCache(server, clientID, tenantID, tokenFilePath, ""); err != nil { return fmt.Errorf("setCredentialCache failed for %s with error: %v, output: %s", server, err, out) } } @@ -834,6 +876,83 @@ func (d *Driver) ensureMountPoint(target string, perm os.FileMode) (bool, error) return !notMnt, nil } +// validateMountWithOAuthToken validates that the volume context is compatible +// with mountWithOAuthToken. Shared by NodeStageVolume and NodePublishVolume. 
+func validateMountWithOAuthToken(protocol, fsType string, volumeContext map[string]string) error { + if runtime.GOOS == "windows" { + return status.Error(codes.InvalidArgument, "mountWithOAuthToken is not supported on Windows") + } + if protocol == nfs || fsType == nfs { + return status.Error(codes.InvalidArgument, "mountWithOAuthToken is not supported with NFS protocol") + } + if strings.TrimSpace(getValueInMap(volumeContext, secretNameField)) == "" { + return status.Error(codes.InvalidArgument, "secretName is required when mountWithOAuthToken is true") + } + if strings.EqualFold(getValueInMap(volumeContext, createFolderIfNotExistField), trueValue) { + return status.Error(codes.InvalidArgument, "createFolderIfNotExist is not supported with mountWithOAuthToken") + } + return nil +} + +func (d *Driver) setCredentialCacheWithOAuthToken(ctx context.Context, volumeID string, volumeContext map[string]string) (string, error) { + secretName := getValueInMap(volumeContext, secretNameField) + if secretName == "" { + return "", status.Errorf(codes.InvalidArgument, "secretName is required when %s is true", mountWithOAuthTokenField) + } + + // Fetch the secret once upfront to avoid duplicate API calls + secretNamespace := getSecretNamespace(volumeContext) + secretAccountName, _, oauthToken, err := d.GetStorageAccountFromSecret(ctx, secretName, secretNamespace) + if err != nil { + return "", status.Errorf(codes.Internal, "failed to get secret %s/%s: %v", secretNamespace, secretName, err) + } + + // Resolve server name + server := getValueInMap(volumeContext, serverNameField) + if server == "" { + accountName := getValueInMap(volumeContext, storageAccountField) + if accountName == "" { + _, parsedAccountName, _, _, _, _, parseErr := GetFileShareInfo(volumeID) + if parseErr == nil && parsedAccountName != "" { + accountName = parsedAccountName + } + } + if accountName == "" { + accountName = secretAccountName + } + storageEndpointSuffix := getValueInMap(volumeContext, 
storageEndpointSuffixField) + if storageEndpointSuffix == "" { + storageEndpointSuffix = d.getStorageEndPointSuffix() + } + if accountName != "" { + server = fmt.Sprintf("%s.file.%s", accountName, storageEndpointSuffix) + } + } + if server == "" { + return "", status.Errorf(codes.InvalidArgument, "server is empty for volume(%s) with %s: set %q or %q in volume context, or provide account name in secret %q", volumeID, mountWithOAuthTokenField, serverNameField, storageAccountField, secretNameField) + } + + if oauthToken == "" { + return "", status.Errorf(codes.InvalidArgument, "%s not found in secret %s/%s", defaultSecretOAuthToken, secretNamespace, secretName) + } + + // check if token has changed by comparing SHA256 hash + tokenSHA := fmt.Sprintf("%x", sha256.Sum256([]byte(oauthToken))) + if cachedSHA, ok := d.oauthTokenSHAMap.Load(server); ok && cachedSHA.(string) == tokenSHA { + klog.V(4).Infof("setCredentialCacheWithOAuthToken: OAuth token unchanged for server %s, skipping refresh", server) + return server, nil + } + + if output, err := setCredentialCache(server, "", "", "", oauthToken); err != nil { + klog.Errorf("setCredentialCache failed for %s with output: %s, error: %v", server, strings.ReplaceAll(string(output), oauthToken, ""), err) + return "", status.Errorf(codes.Internal, "setCredentialCache failed for %s: %v", server, err) + } + + d.oauthTokenSHAMap.Store(server, tokenSHA) + klog.V(2).Infof("setCredentialCacheWithOAuthToken: refreshed credential cache for server %s using secret %s/%s", server, secretNamespace, secretName) + return server, nil +} + func (d *Driver) mountWithProxy(ctx context.Context, source, target, fsType string, options, sensitiveMountOptions []string) error { conn, err := grpc.NewClient(d.azurefileProxyEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { diff --git a/pkg/azurefile/nodeserver_test.go b/pkg/azurefile/nodeserver_test.go index f12c5cd32c..58f0ed3a78 100644 --- 
a/pkg/azurefile/nodeserver_test.go +++ b/pkg/azurefile/nodeserver_test.go @@ -18,6 +18,7 @@ package azurefile import ( "context" + "crypto/sha256" "errors" "fmt" "net" @@ -35,7 +36,10 @@ import ( "go.uber.org/mock/gomock" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" mount "k8s.io/mount-utils" "k8s.io/utils/exec" testingexec "k8s.io/utils/exec/testing" @@ -234,6 +238,24 @@ func TestNodePublishVolume(t *testing.T) { WindowsError: status.Error(codes.InvalidArgument, fmt.Sprintf("mountWithManagedIdentity cannot be used for ephemeral volumes, please use either %s or secret based authentication", mountWithWITokenField)), }, }, + { + desc: "[Error] Ephemeral volume with mountWithOAuthToken should return error", + req: &csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}, + VolumeId: "csi-94637b24200724b604b0e2c92e0fcdfabb0e109f656857c5a3c9585777c8ed85", + TargetPath: targetTest, + Readonly: true, + VolumeContext: map[string]string{ + ephemeralField: "true", + storageAccountField: "teststorageaccount", + shareNameField: "testshare", + mountWithOAuthTokenField: "true", + }, + }, + expectedErr: testutil.TestError{ + DefaultError: status.Error(codes.InvalidArgument, "mountWithOAuthToken cannot be used for ephemeral volumes, please use secret based authentication"), + WindowsError: status.Error(codes.InvalidArgument, "mountWithOAuthToken cannot be used for ephemeral volumes, please use secret based authentication"), + }, + }, { desc: "[Error] Ephemeral volume with mountWithWIToken should preserve storageAccount", req: &csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}, @@ -324,6 +346,39 @@ func TestNodePublishVolume(t *testing.T) { cleanup: func() { }, }, + { + desc: "[Error] mountWithOAuthToken with missing secretName", + 
req: &csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}, + VolumeId: "vol_1", + TargetPath: targetTest, + StagingTargetPath: sourceTest, + VolumeContext: map[string]string{ + mountWithOAuthTokenField: "true", + }, + }, + expectedErr: testutil.TestError{ + DefaultError: status.Errorf(codes.InvalidArgument, "NodePublishVolume: secretName is required when %s is true", mountWithOAuthTokenField), + }, + }, + { + desc: "[Error] mountWithOAuthToken with server but secret fetch fails", + req: &csi.NodePublishVolumeRequest{VolumeCapability: &csi.VolumeCapability{AccessMode: &volumeCap}, + VolumeId: "vol_1", + TargetPath: targetTest, + StagingTargetPath: sourceTest, + VolumeContext: map[string]string{ + mountWithOAuthTokenField: "true", + serverNameField: "testaccount.file.core.windows.net", + secretNameField: "test-secret", + }, + }, + expectedErr: testutil.TestError{ + DefaultError: status.Errorf(codes.Internal, "NodePublishVolume: failed to get secret %s/%s: %v", "default", "test-secret", fmt.Errorf("could not get credentials from secret(%s): KubeClient is nil", "test-secret")), + }, + setup: func() { + d.kubeClient = nil + }, + }, } // Setup @@ -856,7 +911,7 @@ func TestNodeStageVolume(t *testing.T) { }, }, { - desc: "[Error] mountWithManagedIdentity and mountWithWIToken cannot be both true", + desc: "[Error] only one of mountWithManagedIdentity, mountWithOAuthToken, and mountWithWorkloadIdentityToken can be true", req: &csi.NodeStageVolumeRequest{VolumeId: "vol_1##", StagingTargetPath: sourceTest, VolumeCapability: &stdVolCap, VolumeContext: map[string]string{ @@ -868,7 +923,55 @@ func TestNodeStageVolume(t *testing.T) { }, Secrets: secrets}, expectedErr: testutil.TestError{ - DefaultError: status.Error(codes.InvalidArgument, "mountWithManagedIdentity and mountWithWIToken cannot be both true"), + DefaultError: status.Error(codes.InvalidArgument, fmt.Sprintf("only one of %q, %q, and %q can be true", 
mountWithManagedIdentityField, mountWithOAuthTokenField, mountWithWITokenField)), + }, + }, + { + desc: "[Error] mountWithOAuthToken not supported with NFS", + req: &csi.NodeStageVolumeRequest{VolumeId: "vol_1##", StagingTargetPath: sourceTest, + VolumeCapability: &stdVolCap, + VolumeContext: map[string]string{ + shareNameField: "test_sharename", + storageAccountField: "test_accountname", + mountWithOAuthTokenField: "true", + protocolField: "nfs", + }, + Secrets: secrets}, + expectedErr: testutil.TestError{ + DefaultError: status.Error(codes.InvalidArgument, "mountWithOAuthToken is not supported with NFS protocol"), + WindowsError: status.Error(codes.InvalidArgument, "mountWithOAuthToken is not supported on Windows"), + }, + }, + { + desc: "[Error] mountWithOAuthToken missing secretName", + req: &csi.NodeStageVolumeRequest{VolumeId: "vol_1##", StagingTargetPath: sourceTest, + VolumeCapability: &stdVolCap, + VolumeContext: map[string]string{ + shareNameField: "test_sharename", + storageAccountField: "test_accountname", + mountWithOAuthTokenField: "true", + }, + Secrets: secrets}, + expectedErr: testutil.TestError{ + DefaultError: status.Error(codes.InvalidArgument, "secretName is required when mountWithOAuthToken is true"), + WindowsError: status.Error(codes.InvalidArgument, "mountWithOAuthToken is not supported on Windows"), + }, + }, + { + desc: "[Error] mountWithOAuthToken with createFolderIfNotExist", + req: &csi.NodeStageVolumeRequest{VolumeId: "vol_1##", StagingTargetPath: sourceTest, + VolumeCapability: &stdVolCap, + VolumeContext: map[string]string{ + shareNameField: "test_sharename", + storageAccountField: "test_accountname", + mountWithOAuthTokenField: "true", + secretNameField: "test-secret", + createFolderIfNotExistField: "true", + }, + Secrets: secrets}, + expectedErr: testutil.TestError{ + DefaultError: status.Error(codes.InvalidArgument, "createFolderIfNotExist is not supported with mountWithOAuthToken"), + WindowsError: 
status.Error(codes.InvalidArgument, "mountWithOAuthToken is not supported on Windows"), }, }, { @@ -1345,3 +1448,85 @@ func makeFakeOutput(output string, err error) testingexec.FakeAction { return []byte(o), nil, err } } + +func TestSetCredentialCacheWithOAuthToken(t *testing.T) { + tests := []struct { + desc string + volumeContext map[string]string + setupDriver func(d *Driver) + expectedErr string + expectSkip bool + }{ + { + desc: "missing secretName", + volumeContext: map[string]string{ + mountWithOAuthTokenField: "true", + }, + expectedErr: "secretName is required", + }, + { + desc: "kubeClient is nil", + volumeContext: map[string]string{ + secretNameField: "test-secret", + serverNameField: "testaccount.file.core.windows.net", + }, + setupDriver: func(d *Driver) { + d.kubeClient = nil + }, + expectedErr: "KubeClient is nil", + }, + { + desc: "oauthtoken missing in secret", + volumeContext: map[string]string{ + secretNameField: "test-secret", + serverNameField: "testaccount.file.core.windows.net", + }, + setupDriver: func(d *Driver) { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "test-secret", Namespace: "default"}, + Data: map[string][]byte{defaultSecretAccountName: []byte("testaccount")}, + } + d.kubeClient = fake.NewSimpleClientset(secret) + }, + expectedErr: fmt.Sprintf("%s not found in secret", defaultSecretOAuthToken), + }, + { + desc: "skip refresh when token SHA unchanged", + volumeContext: map[string]string{ + secretNameField: "test-secret", + serverNameField: "testaccount.file.core.windows.net", + }, + setupDriver: func(d *Driver) { + token := "test-oauth-token-value" + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "test-secret", Namespace: "default"}, + Data: map[string][]byte{ + defaultSecretAccountName: []byte("testaccount"), + defaultSecretOAuthToken: []byte(token), + }, + } + d.kubeClient = fake.NewSimpleClientset(secret) + // pre-populate SHA cache + tokenSHA := fmt.Sprintf("%x", 
sha256.Sum256([]byte(token))) + d.oauthTokenSHAMap.Store("testaccount.file.core.windows.net", tokenSHA) + }, + expectSkip: true, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + d := NewFakeDriver() + if test.setupDriver != nil { + test.setupDriver(d) + } + _, err := d.setCredentialCacheWithOAuthToken(context.Background(), "vol_1", test.volumeContext) + if test.expectSkip { + assert.NoError(t, err) + } else if test.expectedErr != "" { + assert.Error(t, err) + assert.Contains(t, err.Error(), test.expectedErr) + } + }) + } +} diff --git a/pkg/azurefile/utils.go b/pkg/azurefile/utils.go index 61f915c41b..8ffcc01379 100644 --- a/pkg/azurefile/utils.go +++ b/pkg/azurefile/utils.go @@ -304,6 +304,18 @@ func getValueInMap(m map[string]string, key string) string { return "" } +// getSecretNamespace resolves the secret namespace from volume context, +// falling back to PVC namespace, then to "default". +func getSecretNamespace(volumeContext map[string]string) string { + if ns := getValueInMap(volumeContext, secretNamespaceField); ns != "" { + return ns + } + if ns := getValueInMap(volumeContext, pvcNamespaceKey); ns != "" { + return ns + } + return defaultNamespace +} + // replaceWithMap replace key with value for str func replaceWithMap(str string, m map[string]string) string { for k, v := range m { @@ -419,27 +431,42 @@ func getDefaultBandwidth(requestGiB int, storageAccountType string) *int32 { return &bandwidth } -func setCredentialCache(server, clientID, tenantID, tokenFile string) ([]byte, error) { +func setCredentialCache(server, clientID, tenantID, tokenFile, token string) ([]byte, error) { if server == "" { return nil, fmt.Errorf("server must be provided") } - if clientID == "" { - return nil, fmt.Errorf("clientID must be provided") + if token != "" && tokenFile != "" { + return nil, fmt.Errorf("token and tokenFile are mutually exclusive, only one can be provided") } + serverURL := "https://" + server var args []string - if 
tokenFile != "" { + switch { + case token != "": + // direct token mode: azfilesauthmanager set https:// + args = []string{"set", serverURL, token} + case tokenFile != "": + if clientID == "" { + return nil, fmt.Errorf("clientID must be provided when tokenFile is set") + } if tenantID == "" { return nil, fmt.Errorf("tenantID must be provided when tokenFile is provided") } - args = []string{"set", "https://" + server, "--workload-identity", "--tenant-id", tenantID, "--client-id", clientID, "--token-file", tokenFile} - } else { - args = []string{"set", "https://" + server, "--imds-client-id", clientID} + args = []string{"set", serverURL, "--workload-identity", "--tenant-id", tenantID, "--client-id", clientID, "--token-file", tokenFile} + default: + if clientID == "" { + return nil, fmt.Errorf("clientID must be provided") + } + args = []string{"set", serverURL, "--imds-client-id", clientID} } cmd := exec.Command("azfilesauthmanager", args...) cmd.Env = append(os.Environ(), cmd.Env...) - klog.V(2).Infof("Executing command: %q", cmd.String()) + if token != "" { + klog.V(2).Infof("Executing command: azfilesauthmanager set %s ", serverURL) + } else { + klog.V(2).Infof("Executing command: %q", cmd.String()) + } return cmd.CombinedOutput() } diff --git a/pkg/azurefile/utils_test.go b/pkg/azurefile/utils_test.go index 3e579d848f..8502252e39 100644 --- a/pkg/azurefile/utils_test.go +++ b/pkg/azurefile/utils_test.go @@ -1443,7 +1443,7 @@ func TestSetCredentialCache(t *testing.T) { clientID: "", tenantID: "test-tenant-id", tokenFile: "test-token-file", - expectedError: "clientID must be provided", + expectedError: "clientID must be provided when tokenFile is set", }, { desc: "empty tenantID with tokenFile", @@ -1480,7 +1480,7 @@ func TestSetCredentialCache(t *testing.T) { } for _, test := range tests { - _, err := setCredentialCache(test.server, test.clientID, test.tenantID, test.tokenFile) + _, err := setCredentialCache(test.server, test.clientID, test.tenantID, 
test.tokenFile, "") if test.expectedError != "" { if err == nil { t.Errorf("test[%s]: expected error containing %q, got nil", test.desc, test.expectedError) @@ -1491,6 +1491,53 @@ func TestSetCredentialCache(t *testing.T) { // Note: We don't test successful execution as it requires azfilesauthmanager binary // The actual command execution will fail, but we've validated the argument construction } + + // Test direct token mode + tokenTests := []struct { + desc string + server string + token string + tokenFile string + expectedError string + }{ + { + desc: "token mode: empty server", + server: "", + token: "test-oauth-token", + expectedError: "server must be provided", + }, + { + desc: "token mode: valid token bypasses clientID check", + server: "test.file.core.windows.net", + token: "test-oauth-token", + expectedError: "", // Will fail due to missing azfilesauthmanager, but must NOT fail with clientID/tenantID error + }, + { + desc: "both token and tokenFile should fail", + server: "test.file.core.windows.net", + token: "test-oauth-token", + tokenFile: "/tmp/token", + expectedError: "token and tokenFile are mutually exclusive", + }, + } + + for _, test := range tokenTests { + _, err := setCredentialCache(test.server, "", "", test.tokenFile, test.token) + if test.expectedError != "" { + if err == nil { + t.Errorf("test[%s]: expected error containing %q, got nil", test.desc, test.expectedError) + } else if !strings.Contains(err.Error(), test.expectedError) { + t.Errorf("test[%s]: expected error containing %q, got %v", test.desc, test.expectedError, err) + } + } else if err != nil { + // Token mode should not return clientID/tenantID validation errors + errMsg := err.Error() + if strings.Contains(errMsg, "clientID must be provided when tokenFile is set") || strings.Contains(errMsg, "tenantID must be provided") { + t.Errorf("test[%s]: token mode should bypass clientID/tenantID validation, got: %v", test.desc, err) + } + // Other errors (e.g., azfilesauthmanager not 
found) are expected in test environment + } + } } func int32Ptr(i int32) *int32 { diff --git a/test/e2e/dynamic_provisioning_test.go b/test/e2e/dynamic_provisioning_test.go index 5590ceba6b..4ec43473e2 100644 --- a/test/e2e/dynamic_provisioning_test.go +++ b/test/e2e/dynamic_provisioning_test.go @@ -1913,6 +1913,7 @@ var _ = ginkgo.Describe("Dynamic Provisioning", func() { ginkgo.Skip("test case is only available for capz test") } gomega.Expect(miRoleSetupSucceeded).To(gomega.BeTrue(), "MI role assignment failed, cannot run managed identity mount test") + pods := []testsuites.PodDetails{ { Cmd: "echo 'hello world' > /mnt/test-1/data && grep 'hello world' /mnt/test-1/data", @@ -1939,6 +1940,48 @@ var _ = ginkgo.Describe("Dynamic Provisioning", func() { test.Run(ctx, cs, ns) }) + ginkgo.It("should create a volume on demand with mountWithOAuthToken [file.csi.azure.com]", func(ctx ginkgo.SpecContext) { + skipIfUsingInTreeVolumePlugin() + skipIfTestingInWindowsCluster() + if !isCapzTest { + ginkgo.Skip("mountWithOAuthToken test requires CAPZ environment") + } + // Setup OAuth token inline to ensure fresh token at mount time + err := setupOAuthToken(ctx, suiteCreds, suiteAzureClient) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "OAuth token setup failed") + + pods := []testsuites.PodDetails{ + { + Cmd: "echo 'hello world' > /mnt/test-1/data && grep 'hello world' /mnt/test-1/data", + Volumes: []testsuites.VolumeDetails{ + { + ClaimSize: "100Gi", + MountOptions: []string{ + "sec=krb5", + "dir_mode=0777", + "file_mode=0777", + }, + VolumeMount: testsuites.VolumeMountDetails{ + NameGenerate: "test-volume-", + MountPathGenerate: "/mnt/test-", + }, + }, + }, + }, + } + test := testsuites.DynamicallyProvisionedCmdVolumeTest{ + CSIDriver: testDriver, + Pods: pods, + StorageClassParameters: map[string]string{ + "skuName": "Premium_LRS", + "mountWithOAuthToken": "true", + "secretName": oauthSecretName, + "secretNamespace": oauthSecretNamespace, + }, + } + test.Run(ctx, cs, 
ns) + }) + ginkgo.It("should create a volume on demand with workload identity token mount [file.csi.azure.com]", ginkgo.Serial, func(ctx ginkgo.SpecContext) { skipIfUsingInTreeVolumePlugin() skipIfTestingInWindowsCluster() diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index 32c36fb94a..9cda503377 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -48,6 +48,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" "k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework/config" "sigs.k8s.io/azurefile-csi-driver/pkg/azurefile" @@ -82,6 +83,8 @@ var ( supportEncryptInTransitwithNFS bool miRoleSetupSucceeded bool wiSetupSucceeded bool + suiteCreds *credentials.Credentials + suiteAzureClient *azure.Client // wiClientID is set during BeforeSuite after WI configuration succeeds. wiClientID string @@ -119,6 +122,8 @@ var _ = ginkgo.BeforeSuite(func(ctx ginkgo.SpecContext) { gomega.Expect(err).NotTo(gomega.HaveOccurred()) azureClient, err := azure.GetAzureClient(creds.Cloud, creds.SubscriptionID, creds.AADClientID, creds.TenantID, creds.AADClientSecret, creds.AADFederatedTokenFile) gomega.Expect(err).NotTo(gomega.HaveOccurred()) + suiteCreds = creds + suiteAzureClient = azureClient _, err = azureClient.EnsureResourceGroup(ctx, creds.ResourceGroup, creds.Location, nil) gomega.Expect(err).NotTo(gomega.HaveOccurred()) @@ -237,6 +242,9 @@ var _ = ginkgo.BeforeSuite(func(ctx ginkgo.SpecContext) { err := azurefileDriver.Run(context.Background()) gomega.Expect(err).NotTo(gomega.HaveOccurred()) }() + + // Setup OAuth token for mountWithOAuthToken e2e test (CAPZ only) + // Moved to test case itself to ensure fresh token at mount time } }) @@ -368,7 +376,7 @@ func checkAccountCreationLeak(_ context.Context) { } ginkgo.By(fmt.Sprintf("GetAccountNumByResourceGroup(%s) returns %d accounts", creds.ResourceGroup, 
accountNum)) - accountLimitInTest := 17 + accountLimitInTest := 20 gomega.Expect(accountNum >= accountLimitInTest).To(gomega.BeFalse()) } @@ -547,6 +555,9 @@ func discoverOIDCIssuer(ctx context.Context, cs clientset.Interface) (string, er // waitForOIDCJWKS polls the OIDC issuer's JWKS endpoint until it returns a // valid response containing at least one signing key. +// valid response containing at least one signing key. This guards against the +// race where AAD tries to validate a SA token before the JWKS document is +// published (AADSTS7000272). func waitForOIDCJWKS(issuerURL string, timeout time.Duration) error { jwksURL := strings.TrimSuffix(issuerURL, "/") + "/openid/v1/jwks" log.Printf("Waiting up to %v for OIDC JWKS to be available at %s", timeout, jwksURL) @@ -561,7 +572,7 @@ func waitForOIDCJWKS(issuerURL string, timeout time.Duration) error { deadline := time.Now().Add(timeout) var lastErr error for time.Now().Before(deadline) { - resp, err := httpClient.Get(jwksURL) //nolint:gosec + resp, err := httpClient.Get(jwksURL) //nolint:gosec // URL is constructed from cluster OIDC issuer, not user input if err != nil { lastErr = fmt.Errorf("GET %s: %v", jwksURL, err) log.Printf("JWKS not ready: %v, retrying...", lastErr) @@ -905,3 +916,185 @@ func waitForAADTokenExchange(ctx context.Context, cs clientset.Interface, client } func int64Ptr(i int64) *int64 { return &i } + +const ( + oauthSecretName = "azure-oauth-token-secret" + oauthSecretNamespace = "default" +) + +// setupOAuthToken obtains an OAuth token from the node's managed identity and stores it +// in a Kubernetes Secret for use by mountWithOAuthToken e2e tests. 
+func setupOAuthToken(ctx context.Context, creds *credentials.Credentials, azureClient *azure.Client) error { + // Step 1: Get node managed identity info (uses Azure SDK with SP creds, works from Prow) + identityInfo, err := azureClient.GetNodeIdentityInfo(ctx, creds.ResourceGroup) + if err != nil { + return fmt.Errorf("failed to get node identity info: %v", err) + } + log.Printf("Found node identity: clientID=%s, principalID=%s", identityInfo.ClientID, identityInfo.PrincipalID) + + // Step 2: Assign Storage File Data SMB MI Admin role to identity + err = azureClient.AssignRoleToIdentity(ctx, creds.ResourceGroup, identityInfo.PrincipalID, wiStorageFileDataSMBMIAdmin) + if err != nil { + return fmt.Errorf("failed to assign Storage File Data SMB MI Admin role: %v", err) + } + log.Println("Storage File Data SMB MI Admin role assigned to node identity") + + // Step 3: Get OAuth token by running a pod on the workload cluster node. + // The test runner (Prow pod) cannot access IMDS (169.254.169.254) because it runs + // outside Azure VMs. Instead, we schedule a pod on an agent node that curls IMDS + // to obtain the token, then read it from the pod logs. 
+ kubeconfig := os.Getenv(kubeconfigEnvVar) + restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + return fmt.Errorf("failed to build kubeconfig: %v", err) + } + cs, err := clientset.NewForConfig(restConfig) + if err != nil { + return fmt.Errorf("failed to create kubernetes client: %v", err) + } + + token, err := getOAuthTokenFromNode(ctx, cs, identityInfo.ClientID) + if err != nil { + return fmt.Errorf("failed to get storage OAuth token from node: %v", err) + } + log.Println("Obtained storage OAuth token from agent node via IMDS") + + // Step 4: Create Kubernetes Secret with the OAuth token + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: oauthSecretName, + Namespace: oauthSecretNamespace, + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "oauthtoken": []byte(token), + }, + } + _, err = cs.CoreV1().Secrets(oauthSecretNamespace).Create(ctx, secret, metav1.CreateOptions{}) + if err != nil { + if apierrors.IsAlreadyExists(err) { + existing, getErr := cs.CoreV1().Secrets(oauthSecretNamespace).Get(ctx, oauthSecretName, metav1.GetOptions{}) + if getErr != nil { + return fmt.Errorf("failed to get existing OAuth token secret: %v", getErr) + } + secret.ResourceVersion = existing.ResourceVersion + _, err = cs.CoreV1().Secrets(oauthSecretNamespace).Update(ctx, secret, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update OAuth token secret: %v", err) + } + } else { + return fmt.Errorf("failed to create OAuth token secret: %v", err) + } + } + log.Printf("Created/updated OAuth token secret %s/%s", oauthSecretNamespace, oauthSecretName) + + return nil +} + +// getOAuthTokenFromNode deploys a pod on a workload cluster agent node that uses IMDS +// to obtain an Azure Storage OAuth token, then reads the token from the pod logs. 
+func getOAuthTokenFromNode(ctx context.Context, cs clientset.Interface, clientID string) (string, error) { + namespace := "default" + + // IMDS curl command that outputs only the access_token value + curlCmd := fmt.Sprintf( + `curl -s -H "Metadata: true" "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&client_id=%s&resource=https://storage.azure.com/" | grep -o '"access_token":"[^"]*"' | cut -d'"' -f4`, + clientID, + ) + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "oauth-token-fetcher-", + Namespace: namespace, + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + // Use hostNetwork to ensure IMDS is accessible + HostNetwork: true, + Containers: []corev1.Container{ + { + Name: "token-fetcher", + Image: "mcr.microsoft.com/cbl-mariner/base/core:2.0", + Command: []string{"/bin/sh", "-c", curlCmd}, + }, + }, + // Schedule on agent nodes only (not control plane — no managed identity there) + NodeSelector: map[string]string{ + "kubernetes.io/os": "linux", + }, + Affinity: &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: "node-role.kubernetes.io/control-plane", + Operator: corev1.NodeSelectorOpDoesNotExist, + }, + }, + }, + }, + }, + }, + }, + }, + } + + created, err := cs.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) + if err != nil { + return "", fmt.Errorf("failed to create token fetcher pod: %v", err) + } + podName := created.Name + defer func() { + _ = cs.CoreV1().Pods(namespace).Delete(context.Background(), podName, metav1.DeleteOptions{}) + }() + + // Wait for pod to complete (up to 5 minutes) + log.Printf("Waiting for token fetcher pod %s/%s to complete...", namespace, podName) + waitCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + err = 
waitForPodComplete(waitCtx, cs, namespace, podName) + if err != nil { + return "", fmt.Errorf("token fetcher pod did not complete: %v", err) + } + + // Read token from pod logs + logBytes, err := cs.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{}).Do(ctx).Raw() + if err != nil { + return "", fmt.Errorf("failed to get token fetcher pod logs: %v", err) + } + + token := strings.TrimSpace(string(logBytes)) + if token == "" { + return "", fmt.Errorf("token fetcher pod returned empty token") + } + + return token, nil +} + +// waitForPodComplete polls until the pod reaches Succeeded or Failed phase. +func waitForPodComplete(ctx context.Context, cs clientset.Interface, namespace, name string) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + pod, err := cs.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return err + } + + switch pod.Status.Phase { + case corev1.PodSucceeded: + return nil + case corev1.PodFailed: + return fmt.Errorf("pod failed with reason: %s", pod.Status.Reason) + } + + time.Sleep(3 * time.Second) + } +} diff --git a/test/utils/azurefile_log.sh b/test/utils/azurefile_log.sh index 94b3f0a3b7..6430c47a15 100755 --- a/test/utils/azurefile_log.sh +++ b/test/utils/azurefile_log.sh @@ -31,7 +31,7 @@ cleanup() { trap cleanup ERR echo "print out all nodes status ..." -kubectl get nodes -o wide +kubectl get nodes -o wide --show-labels echo "======================================================================================" echo "print out all default namespace pods status ..."