diff --git a/pkg/azurefile/controllerserver.go b/pkg/azurefile/controllerserver.go index 16846ab1ab..4c287dd759 100644 --- a/pkg/azurefile/controllerserver.go +++ b/pkg/azurefile/controllerserver.go @@ -107,7 +107,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) if acquired := d.volumeLocks.TryAcquire(volName); !acquired { // logging the job status if it's volume cloning if req.GetVolumeContentSource() != nil { - jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{}) + jobState, percent, _, err := d.azcopy.GetAzcopyJob(volName, []string{}) return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err) } return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName) @@ -1230,35 +1230,61 @@ func (d *Driver) copyFileShareByAzcopy(ctx context.Context, srcFileShareName, ds azcopyCopyOptions = append(azcopyCopyOptions, "--from-to=FileNFSFileNFS") } - jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) - klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) + jobState, percent, jobid, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) + klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, jobid: %s, error: %v", jobState, percent, jobid, err) switch jobState { - case util.AzcopyJobError, util.AzcopyJobCompleted, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped: + case util.AzcopyMultipleJobsFound: + klog.Warningf("Multiple copy job exist for copy of fileshare %s to %s", srcFileShareName, dstFileShareName) return err + case util.AzcopyJobCompleted: + klog.V(2).Infof("Already copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName) + return err + case util.AzcopyJobError, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped: + // Resume job that are failed with errors + klog.Errorf("Copy of fileshare %s to %s completed with status: %s, copy percent: %s%%, jobid: %s, error: %v. Will try to resume...", srcFileShareName, dstFileShareName, jobState, percent, jobid, err) + resumeAzcopyJob := func() error { + klog.V(2).Infof("Resuming azcopy job with id: %s for copying fileshare %s to %s", jobid, srcFileShareName, dstFileShareName) + if resumeOut, resumeErr := d.execAzcopyResume(jobid, authAzcopyEnv); resumeErr != nil { + klog.Errorf("Failed to resume azcopy job with id: %s for copying fileshare %s to %s, error: %v, output: %s", jobid, srcFileShareName, dstFileShareName, resumeErr, string(resumeOut)) + return fmt.Errorf("failed to resume azcopy job: %v, output: %v", resumeErr, string(resumeOut)) + } + return nil + } + timeoutFunc := func() error { + jobState, percent, jobid, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) + return fmt.Errorf("azcopy job resume status: %s, timeout waiting for resume fileshare %s:%s to %s:%s complete, current copy percent: %s%%, jobid: %s", + jobState, srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent, jobid, + ) + } + err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, resumeAzcopyJob, timeoutFunc) case util.AzcopyJobRunning: err = wait.PollUntilContextTimeout(ctx, 20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, true, func(context.Context) (bool, error) { - jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) - klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) + jobState, percent, jobid, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) + klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, jobid: %s, error: %v", jobState, percent, jobid, err) if err != nil { return false, err } if jobState == util.AzcopyJobRunning { return false, nil + } else if jobState == util.AzcopyJobCompleted { + return true, nil } - return true, nil + return true, fmt.Errorf("copy job in unexpected status: %s", jobState) }) case util.AzcopyJobNotFound: klog.V(2).Infof("copy fileshare %s:%s to %s:%s", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName) execAzcopyJob := func() error { if out, err := d.execAzcopyCopy(srcPathAuth, dstPath, azcopyCopyOptions, authAzcopyEnv); err != nil { + // Just log the error here since azcopy allows to resume the failed jobs and the job will be resumed in the next re-concile loop + klog.Errorf("copy fileshare %s:%s to %s:%s failed with error: %v, output: %s", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, err, string(out)) return fmt.Errorf("exec error: %v, output: %v", err, string(out)) } return nil } timeoutFunc := func() error { - jobState, percent, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) - return fmt.Errorf("azcopy job status: %s, timeout waiting for copy fileshare %s:%s to %s:%s complete, current copy percent: %s%%", jobState, srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent) + jobState, percent, jobid, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) + return fmt.Errorf("azcopy job status: %s, timeout waiting for copy fileshare %s:%s to %s:%s complete, current copy percent: %s%%, jobid: %s", jobState, srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent, jobid) } err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execAzcopyJob, timeoutFunc) } @@ -1290,6 +1316,22 @@ func (d *Driver) execAzcopyCopy(srcPath, dstPath string, azcopyCopyOptions, auth return cmd.CombinedOutput() } +// Only resume the job was completed with failed with errors or skipped +// Note: resume is blocking operation like copy +func (d *Driver) execAzcopyResume(jobID string, authAzcopyEnv []string) ([]byte, error) { + var azcopyResumeOptions []string + // Use --trusted-microsoft-suffixes option to avoid failure caused by + if d.requiredAzCopyToTrust { + azcopyResumeOptions = append(azcopyResumeOptions, fmt.Sprintf("--trusted-microsoft-suffixes=%s", d.getStorageEndPointSuffix())) + } + cmd := exec.Command("azcopy", "jobs", "resume", jobID) + cmd.Args = append(cmd.Args, azcopyResumeOptions...) + if len(authAzcopyEnv) > 0 { + cmd.Env = append(os.Environ(), authAzcopyEnv...) + } + return cmd.CombinedOutput() +} + // ControllerExpandVolume controller expand volume func (d *Driver) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { volumeID := req.GetVolumeId() diff --git a/pkg/util/util.go b/pkg/util/util.go index 6b9ffd1f81..b4edc8ec03 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -41,6 +41,7 @@ const ( AzcopyJobCompletedWithErrors AzcopyJobState = "CompletedWithErrors" AzcopyJobCompletedWithSkipped AzcopyJobState = "CompletedWithSkipped" AzcopyJobCompletedWithErrorsAndSkipped AzcopyJobState = "CompletedWithErrorsAndSkipped" + AzcopyMultipleJobsFound AzcopyJobState = "MultipleJobsFound" ) // RoundUpBytes rounds up the volume size in bytes up to multiplications of GiB @@ -98,7 +99,7 @@ type Azcopy struct { } // GetAzcopyJob get the azcopy job status if job existed -func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (AzcopyJobState, string, error) { +func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (AzcopyJobState, string, string, error) { cmdStr := fmt.Sprintf("azcopy jobs list | grep %s -B 3", dstFileshare) // cmd output example: // JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9 @@ -114,18 +115,18 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc // if grep command returns nothing, the exec will return exit status 1 error, so filter this error if err != nil && err.Error() != "exit status 1" { klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, AzcopyJobError) - return AzcopyJobError, "", fmt.Errorf("couldn't list jobs in azcopy %v", err) + return AzcopyJobError, "", "", fmt.Errorf("couldn't list jobs in azcopy %v", err) } jobid, jobState, err := parseAzcopyJobList(out) if err != nil || jobState == AzcopyJobError { klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, jobState) - return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job list in azcopy %v", err) + return AzcopyJobError, "", jobid, fmt.Errorf("couldn't parse azcopy job list in azcopy %v", err) } if jobState == AzcopyJobCompleted || jobState == AzcopyJobCompletedWithErrors || jobState == AzcopyJobCompletedWithSkipped || jobState == AzcopyJobCompletedWithErrorsAndSkipped { - return jobState, "100.0", err + return jobState, "100.0", jobid, err } if jobid == "" { - return jobState, "", err + return jobState, "", jobid, err } cmdPercentStr := fmt.Sprintf("azcopy jobs show %s | grep Percent", jobid) // cmd out example: @@ -133,14 +134,14 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc summary, err := ac.ExecCmd.RunCommand(cmdPercentStr, authAzcopyEnv) if err != nil { klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, AzcopyJobError) - return AzcopyJobError, "", fmt.Errorf("couldn't show jobs summary in azcopy %v", err) + return AzcopyJobError, "", jobid, fmt.Errorf("couldn't show jobs summary in azcopy %v", err) } jobState, percent, err := parseAzcopyJobShow(summary) if err != nil || jobState == AzcopyJobError { klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, jobState) - return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job show in azcopy %v", err) + return AzcopyJobError, "", jobid, fmt.Errorf("couldn't parse azcopy job show in azcopy %v", err) } - return jobState, percent, nil + return jobState, percent, jobid, nil } func (ac *Azcopy) CleanJobs() (string, error) { @@ -154,9 +155,16 @@ func parseAzcopyJobList(joblist string) (string, AzcopyJobState, error) { if len(jobSegments) < 2 { return jobid, AzcopyJobNotFound, nil } + // If there are multiple jobs just return error + if len(jobSegments) > 2 { + return jobid, AzcopyMultipleJobsFound, fmt.Errorf("multiple copy jobs found ") + } jobSegments = jobSegments[1:] for _, job := range jobSegments { segments := strings.Split(job, "\n") + if jobid == "" { + jobid = segments[0] + } if len(segments) < 4 { return jobid, AzcopyJobError, fmt.Errorf("error parsing jobs list: %s", job) } diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 6b0ba34370..4259df82f8 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -197,9 +197,9 @@ func TestGetAzcopyJob(t *testing.T) { } azcopyFunc := &Azcopy{ExecCmd: m} - jobState, percent, err := azcopyFunc.GetAzcopyJob(dstFileshare, []string{}) + jobState, percent, jobid, err := azcopyFunc.GetAzcopyJob(dstFileshare, []string{}) if jobState != test.expectedJobState || percent != test.expectedPercent || !reflect.DeepEqual(err, test.expectedErr) { - t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, err: %v, expected jobState: %v, percent: %v, err: %v", test.desc, jobState, percent, err, test.expectedJobState, test.expectedPercent, test.expectedErr) + t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, jobid: %v, err: %v, expected jobState: %v, percent: %v, jobid: %v, err: %v", test.desc, jobState, percent, jobid, err, test.expectedJobState, test.expectedPercent, jobid, test.expectedErr) } } }