-
Notifications
You must be signed in to change notification settings - Fork 170
fix(clone): resume errored azcopy jobs during clone creation #3095
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -107,7 +107,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) | |||||
| if acquired := d.volumeLocks.TryAcquire(volName); !acquired { | ||||||
| // logging the job status if it's volume cloning | ||||||
| if req.GetVolumeContentSource() != nil { | ||||||
| jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{}) | ||||||
| jobState, percent, _, err := d.azcopy.GetAzcopyJob(volName, []string{}) | ||||||
| return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err) | ||||||
| } | ||||||
| return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName) | ||||||
|
|
@@ -1230,35 +1230,61 @@ func (d *Driver) copyFileShareByAzcopy(ctx context.Context, srcFileShareName, ds | |||||
| azcopyCopyOptions = append(azcopyCopyOptions, "--from-to=FileNFSFileNFS") | ||||||
| } | ||||||
|
|
||||||
| jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) | ||||||
| klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) | ||||||
| jobState, percent, jobid, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) | ||||||
| klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, jobid: %s, error: %v", jobState, percent, jobid, err) | ||||||
|
|
||||||
| switch jobState { | ||||||
| case util.AzcopyJobError, util.AzcopyJobCompleted, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped: | ||||||
| case util.AzcopyMultipleJobsFound: | ||||||
| klog.Warningf("Multiple copy job exist for copy of fileshare %s to %s", srcFileShareName, dstFileShareName) | ||||||
| return err | ||||||
| case util.AzcopyJobCompleted: | ||||||
| klog.V(2).Infof("Already copied fileshare %s to %s successfully", srcFileShareName, dstFileShareName) | ||||||
| return err | ||||||
| case util.AzcopyJobError, util.AzcopyJobCompletedWithErrors, util.AzcopyJobCompletedWithSkipped, util.AzcopyJobCompletedWithErrorsAndSkipped: | ||||||
| // Resume job that are failed with errors | ||||||
| klog.Errorf("Copy of fileshare %s to %s completed with status: %s, copy percent: %s%%, jobid: %s, error: %v. Will try to resume...", srcFileShareName, dstFileShareName, jobState, percent, jobid, err) | ||||||
| resumeAzcopyJob := func() error { | ||||||
| klog.V(2).Infof("Resuming azcopy job with id: %s for copying fileshare %s to %s", jobid, srcFileShareName, dstFileShareName) | ||||||
| if resumeOut, resumeErr := d.execAzcopyResume(jobid, authAzcopyEnv); resumeErr != nil { | ||||||
| klog.Errorf("Failed to resume azcopy job with id: %s for copying fileshare %s to %s, error: %v, output: %s", jobid, srcFileShareName, dstFileShareName, resumeErr, string(resumeOut)) | ||||||
|
Comment on lines
+1246
to
+1249
|
||||||
| return fmt.Errorf("failed to resume azcopy job: %v, output: %v", resumeErr, string(resumeOut)) | ||||||
| } | ||||||
| return nil | ||||||
| } | ||||||
| timeoutFunc := func() error { | ||||||
| jobState, percent, jobid, _ := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) | ||||||
| return fmt.Errorf("azcopy job resume status: %s, timeout waiting for resume fileshare %s:%s to %s:%s complete, current copy percent: %s%%, jobid: %s", | ||||||
| jobState, srcAccountName, srcFileShareName, dstAccountName, dstFileShareName, percent, jobid, | ||||||
| ) | ||||||
| } | ||||||
| err = util.WaitUntilTimeout(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, resumeAzcopyJob, timeoutFunc) | ||||||
| case util.AzcopyJobRunning: | ||||||
| err = wait.PollUntilContextTimeout(ctx, 20*time.Second, time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, true, func(context.Context) (bool, error) { | ||||||
| jobState, percent, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) | ||||||
| klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) | ||||||
| jobState, percent, jobid, err := d.azcopy.GetAzcopyJob(dstFileShareName, authAzcopyEnv) | ||||||
| klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, jobid: %s, error: %v", jobState, percent, jobid, err) | ||||||
| if err != nil { | ||||||
| return false, err | ||||||
| } | ||||||
| if jobState == util.AzcopyJobRunning { | ||||||
| return false, nil | ||||||
| } else if jobState == util.AzcopyJobCompleted { | ||||||
| return true, nil | ||||||
| } | ||||||
| return true, nil | ||||||
| return true, fmt.Errorf("copy job in unexpected status: %s", jobState) | ||||||
| }) | ||||||
| case util.AzcopyJobNotFound: | ||||||
| klog.V(2).Infof("copy fileshare %s:%s to %s:%s", srcAccountName, srcFileShareName, dstAccountName, dstFileShareName) | ||||||
| execAzcopyJob := func() error { | ||||||
| if out, err := d.execAzcopyCopy(srcPathAuth, dstPath, azcopyCopyOptions, authAzcopyEnv); err != nil { | ||||||
| // Just log the error here since azcopy allows to resume the failed jobs and the job will be resumed in the next re-concile loop | ||||||
|
||||||
| // Just log the error here since azcopy allows to resume the failed jobs and the job will be resumed in the next re-concile loop | |
| // Just log the error here since azcopy allows to resume the failed jobs and the job will be resumed in the next reconcile loop |
Copilot
AI
Apr 18, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment "Use --trusted-microsoft-suffixes option to avoid failure caused by" is incomplete, which makes it unclear what failure mode this option is addressing. Please either complete the sentence (e.g., describe the specific trust/suffix validation issue) or remove the comment if it’s redundant with other documentation.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,6 +41,7 @@ const ( | |
| AzcopyJobCompletedWithErrors AzcopyJobState = "CompletedWithErrors" | ||
| AzcopyJobCompletedWithSkipped AzcopyJobState = "CompletedWithSkipped" | ||
| AzcopyJobCompletedWithErrorsAndSkipped AzcopyJobState = "CompletedWithErrorsAndSkipped" | ||
| AzcopyMultipleJobsFound AzcopyJobState = "MultipleJobsFound" | ||
| ) | ||
|
|
||
| // RoundUpBytes rounds up the volume size in bytes up to multiplications of GiB | ||
|
|
@@ -98,7 +99,7 @@ type Azcopy struct { | |
| } | ||
|
|
||
| // GetAzcopyJob get the azcopy job status if job existed | ||
| func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (AzcopyJobState, string, error) { | ||
| func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (AzcopyJobState, string, string, error) { | ||
| cmdStr := fmt.Sprintf("azcopy jobs list | grep %s -B 3", dstFileshare) | ||
| // cmd output example: | ||
| // JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9 | ||
|
|
@@ -114,33 +115,33 @@ func (ac *Azcopy) GetAzcopyJob(dstFileshare string, authAzcopyEnv []string) (Azc | |
| // if grep command returns nothing, the exec will return exit status 1 error, so filter this error | ||
| if err != nil && err.Error() != "exit status 1" { | ||
| klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, AzcopyJobError) | ||
| return AzcopyJobError, "", fmt.Errorf("couldn't list jobs in azcopy %v", err) | ||
| return AzcopyJobError, "", "", fmt.Errorf("couldn't list jobs in azcopy %v", err) | ||
| } | ||
| jobid, jobState, err := parseAzcopyJobList(out) | ||
| if err != nil || jobState == AzcopyJobError { | ||
| klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, jobState) | ||
| return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job list in azcopy %v", err) | ||
| return AzcopyJobError, "", jobid, fmt.Errorf("couldn't parse azcopy job list in azcopy %v", err) | ||
| } | ||
| if jobState == AzcopyJobCompleted || jobState == AzcopyJobCompletedWithErrors || jobState == AzcopyJobCompletedWithSkipped || jobState == AzcopyJobCompletedWithErrorsAndSkipped { | ||
| return jobState, "100.0", err | ||
| return jobState, "100.0", jobid, err | ||
| } | ||
| if jobid == "" { | ||
| return jobState, "", err | ||
| return jobState, "", jobid, err | ||
| } | ||
|
Comment on lines
120
to
130
|
||
| cmdPercentStr := fmt.Sprintf("azcopy jobs show %s | grep Percent", jobid) | ||
| // cmd out example: | ||
| // Percent Complete (approx): 100.0 | ||
| summary, err := ac.ExecCmd.RunCommand(cmdPercentStr, authAzcopyEnv) | ||
| if err != nil { | ||
| klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, AzcopyJobError) | ||
| return AzcopyJobError, "", fmt.Errorf("couldn't show jobs summary in azcopy %v", err) | ||
| return AzcopyJobError, "", jobid, fmt.Errorf("couldn't show jobs summary in azcopy %v", err) | ||
| } | ||
| jobState, percent, err := parseAzcopyJobShow(summary) | ||
| if err != nil || jobState == AzcopyJobError { | ||
| klog.Warningf("failed to get azcopy job with error: %v, jobState: %v", err, jobState) | ||
| return AzcopyJobError, "", fmt.Errorf("couldn't parse azcopy job show in azcopy %v", err) | ||
| return AzcopyJobError, "", jobid, fmt.Errorf("couldn't parse azcopy job show in azcopy %v", err) | ||
| } | ||
| return jobState, percent, nil | ||
| return jobState, percent, jobid, nil | ||
| } | ||
|
|
||
| func (ac *Azcopy) CleanJobs() (string, error) { | ||
|
|
@@ -154,9 +155,16 @@ func parseAzcopyJobList(joblist string) (string, AzcopyJobState, error) { | |
| if len(jobSegments) < 2 { | ||
| return jobid, AzcopyJobNotFound, nil | ||
| } | ||
| // If there are multiple jobs just return error | ||
| if len(jobSegments) > 2 { | ||
| return jobid, AzcopyMultipleJobsFound, fmt.Errorf("multiple copy jobs found ") | ||
| } | ||
| jobSegments = jobSegments[1:] | ||
| for _, job := range jobSegments { | ||
| segments := strings.Split(job, "\n") | ||
| if jobid == "" { | ||
| jobid = segments[0] | ||
| } | ||
| if len(segments) < 4 { | ||
| return jobid, AzcopyJobError, fmt.Errorf("error parsing jobs list: %s", job) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -197,9 +197,9 @@ func TestGetAzcopyJob(t *testing.T) { | |||||
| } | ||||||
|
|
||||||
| azcopyFunc := &Azcopy{ExecCmd: m} | ||||||
| jobState, percent, err := azcopyFunc.GetAzcopyJob(dstFileshare, []string{}) | ||||||
| jobState, percent, jobid, err := azcopyFunc.GetAzcopyJob(dstFileshare, []string{}) | ||||||
| if jobState != test.expectedJobState || percent != test.expectedPercent || !reflect.DeepEqual(err, test.expectedErr) { | ||||||
| t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, err: %v, expected jobState: %v, percent: %v, err: %v", test.desc, jobState, percent, err, test.expectedJobState, test.expectedPercent, test.expectedErr) | ||||||
| t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, jobid: %v, err: %v, expected jobState: %v, percent: %v, jobid: %v, err: %v", test.desc, jobState, percent, jobid, err, test.expectedJobState, test.expectedPercent, jobid, test.expectedErr) | ||||||
|
||||||
| t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, jobid: %v, err: %v, expected jobState: %v, percent: %v, jobid: %v, err: %v", test.desc, jobState, percent, jobid, err, test.expectedJobState, test.expectedPercent, jobid, test.expectedErr) | |
| t.Errorf("test[%s]: unexpected jobState: %v, percent: %v, jobid: %v, err: %v, expected jobState: %v, percent: %v, err: %v", test.desc, jobState, percent, jobid, err, test.expectedJobState, test.expectedPercent, test.expectedErr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The resume path includes `util.AzcopyJobError`, but `AzcopyJobError` can mean command/list/show/parsing failures (not a resumable AzCopy job). Resuming in this state can mask the real error and also commonly runs with an empty/unknown jobID. Consider handling `AzcopyJobError` by returning the original error (or re-running copy) and only attempting `jobs resume` for the explicit completed-with-errors/skipped states when `jobid` is non-empty.