From 36744732ef183eacce8ea3afbcf554737109ed0a Mon Sep 17 00:00:00 2001 From: ddl-rliu Date: Tue, 12 May 2026 17:03:42 -0700 Subject: [PATCH 1/2] Retry storage operation on transient idle connection error Workflows with 100+ large (100MB+) file outputs lead to failures in the sidecar. These failures appear to be due to transient errors during the upload phase. Large file outputs (anything > 5MB) lead to a MultipartUpload, and flytecopilot attempts to parallelize file uploads. However, it appears the parallelism causes idle connections from a set of completed uploads to interfere with in-progress connections for other uploads. Since the failures are transient, implement a fix where copilot simply initiates a retry (up to 5 max retries) if a transient error is seen during a flytecopilot raw file upload. This fix is consistent with a similar issue seen previously in Flyte storage writes: https://github.com/flyteorg/flyteadmin/pull/325 --- flytecopilot/data/retry.go | 68 +++++++++++++++++++++++++++++++++ flytecopilot/data/retry_test.go | 57 +++++++++++++++++++++++++++ flytecopilot/data/utils.go | 19 ++++----- 3 files changed, 135 insertions(+), 9 deletions(-) create mode 100644 flytecopilot/data/retry.go create mode 100644 flytecopilot/data/retry_test.go diff --git a/flytecopilot/data/retry.go b/flytecopilot/data/retry.go new file mode 100644 index 00000000000..5633464dbbd --- /dev/null +++ b/flytecopilot/data/retry.go @@ -0,0 +1,68 @@ +package data + +import ( + "context" + stderrors "errors" + "strings" + "time" + + "github.com/flyteorg/flyte/flytestdlib/logger" +) + +const ( + uploadFileRetryMaxAttemptIndex = 5 + uploadFileRetryDelay = 2 * time.Second +) + +const ( + errMsgHTTPServerClosedIdleConn = "http: server closed idle connection" + errMsgUseOfClosedNetworkConn = "use of closed network connection" + errMsgEOF = "EOF" + errMsgWriteBrokenPipe = "write: broken pipe" +) + +// See flyteadmin/pkg/async.RetryOnSpecificErrors +func retryOnSpecificErrors(ctx context.Context, attempts int, delay time.Duration, f func() error, isErrorRetryable func(error) bool) error { + var err error + for attempt := 0; attempt <= attempts; attempt++ { + err = f() + if err == nil { + return nil + } + if !isErrorRetryable(err) { + return err + } + if attempt == attempts { + return err + } + logger.Warningf(ctx, "Failed [%v] on attempt %d of %d; retrying after %v", err, attempt, attempts, delay) + select { + case <-time.After(delay): + case <-ctx.Done(): + return ctx.Err() + } + } + return err +} + +func isTransientStorageWriteError(err error) bool { + if err == nil { + return false + } + if errorChainContainsSubstring(err, errMsgHTTPServerClosedIdleConn) || + errorChainContainsSubstring(err, errMsgUseOfClosedNetworkConn) || + errorChainContainsSubstring(err, errMsgEOF) || + errorChainContainsSubstring(err, errMsgWriteBrokenPipe) { + return true + } + return false +} + +func errorChainContainsSubstring(err error, needle string) bool { + for cur := err; cur != nil; cur = stderrors.Unwrap(cur) { + if strings.Contains(cur.Error(), needle) { + return true + } + } + return false +} diff --git a/flytecopilot/data/retry_test.go b/flytecopilot/data/retry_test.go new file mode 100644 index 00000000000..4d16f464192 --- /dev/null +++ b/flytecopilot/data/retry_test.go @@ -0,0 +1,57 @@ +package data + +import ( + "context" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +var s3LikeError = errors.Wrap(errors.New( + `Put "https://example.s3.us-west-2.amazonaws.com/key?partNumber=11&uploadId=x": http: server closed idle connection`), + `PutObject, putting object: MultipartUpload: upload multipart failed +caused by: RequestError: send request failed +caused by: Put "https://example.s3.us-west-2.amazonaws.com/key?partNumber=11&uploadId=x"`) + +func TestRetryOnSpecificErrors_SucceedsFirstTry(t *testing.T) { + ctx := context.Background() + calls := 0 + err := retryOnSpecificErrors(ctx, 3, time.Millisecond, func() error { + calls++ + return nil + }, isTransientStorageWriteError) + assert.NoError(t, err) + assert.Equal(t, 1, calls) +} + +func TestRetryOnSpecificErrors_RetriesThenSucceeds(t *testing.T) { + ctx := context.Background() + calls := 0 + err := retryOnSpecificErrors(ctx, 5, time.Millisecond, func() error { + calls++ + if calls < 3 { + return s3LikeError + } + return nil + }, isTransientStorageWriteError) + assert.NoError(t, err) + assert.Equal(t, 3, calls) +} + +func TestRetryOnSpecificErrors_NonRetryableStops(t *testing.T) { + ctx := context.Background() + calls := 0 + want := errors.New("permanent") + err := retryOnSpecificErrors(ctx, 5, time.Millisecond, func() error { + calls++ + return want + }, isTransientStorageWriteError) + assert.ErrorIs(t, err, want) + assert.Equal(t, 1, calls) +} + +func TestIsTransientStorageWriteError(t *testing.T) { + assert.True(t, isTransientStorageWriteError(s3LikeError)) +} diff --git a/flytecopilot/data/utils.go b/flytecopilot/data/utils.go index 923c75f908b..582138e9178 100644 --- a/flytecopilot/data/utils.go +++ b/flytecopilot/data/utils.go @@ -46,17 +46,18 @@ func IsFileReadable(fpath string, ignoreExtension bool) (string, os.FileInfo, er // Uploads a file to the data store. func UploadFileToStorage(ctx context.Context, filePath string, toPath storage.DataReference, size int64, store *storage.DataStore) error { - f, err := os.Open(filePath) - if err != nil { - return err - } - defer func() { - err := f.Close() + return retryOnSpecificErrors(ctx, uploadFileRetryMaxAttemptIndex, uploadFileRetryDelay, func() error { + f, err := os.Open(filePath) if err != nil { - logger.Errorf(ctx, "failed to close blob file at path [%s]", filePath) + return err } - }() - return store.WriteRaw(ctx, toPath, size, storage.Options{}, f) + defer func() { + if cerr := f.Close(); cerr != nil { + logger.Errorf(ctx, "failed to close blob file at path [%s]", filePath) + } + }() + return store.WriteRaw(ctx, toPath, size, storage.Options{}, f) + }, isTransientStorageWriteError) } func DownloadFileFromStorage(ctx context.Context, ref storage.DataReference, store *storage.DataStore) (io.ReadCloser, error) { From 6fd96aee766236d7fa44f285ab0505b0a25977ff Mon Sep 17 00:00:00 2001 From: ddl-rliu Date: Thu, 14 May 2026 15:16:44 -0700 Subject: [PATCH 2/2] Retry storage operation on all errors As part of the previous fix, flytecopilot simply initiates a retry (up to 5 max retries) if a transient error is seen during a flytecopilot raw file upload. This corresponds to the following list of four specific transient network errors: - "http: server closed idle connection" - "use of closed network connection" - "EOF" - "write: broken pipe" These four network errors were discovered during the local reproduction of the issue. However, during a live run, a new transient network error "WebidentityErr: failed to retrieve credentials ... status code: 408" was observed (Note the status code 408 in particular). In order to update the retry logic to be more robust against unanticipated transient errors, we relax the retry logic to retry on all errors. --- flytecopilot/data/retry.go | 31 ++----------------------------- flytecopilot/data/retry_test.go | 20 ++------------------ flytecopilot/data/utils.go | 2 +- 3 files changed, 5 insertions(+), 48 deletions(-) diff --git a/flytecopilot/data/retry.go b/flytecopilot/data/retry.go index 5633464dbbd..a8e1e9109f3 100644 --- a/flytecopilot/data/retry.go +++ b/flytecopilot/data/retry.go @@ -2,8 +2,6 @@ package data import ( "context" - stderrors "errors" - "strings" "time" "github.com/flyteorg/flyte/flytestdlib/logger" @@ -14,13 +12,6 @@ const ( uploadFileRetryDelay = 2 * time.Second ) -const ( - errMsgHTTPServerClosedIdleConn = "http: server closed idle connection" - errMsgUseOfClosedNetworkConn = "use of closed network connection" - errMsgEOF = "EOF" - errMsgWriteBrokenPipe = "write: broken pipe" -) - // See flyteadmin/pkg/async.RetryOnSpecificErrors func retryOnSpecificErrors(ctx context.Context, attempts int, delay time.Duration, f func() error, isErrorRetryable func(error) bool) error { var err error @@ -45,24 +36,6 @@ func retryOnSpecificErrors(ctx context.Context, attempts int, delay time.Duratio return err } -func isTransientStorageWriteError(err error) bool { - if err == nil { - return false - } - if errorChainContainsSubstring(err, errMsgHTTPServerClosedIdleConn) || - errorChainContainsSubstring(err, errMsgUseOfClosedNetworkConn) || - errorChainContainsSubstring(err, errMsgEOF) || - errorChainContainsSubstring(err, errMsgWriteBrokenPipe) { - return true - } - return false -} - -func errorChainContainsSubstring(err error, needle string) bool { - for cur := err; cur != nil; cur = stderrors.Unwrap(cur) { - if strings.Contains(cur.Error(), needle) { - return true - } - } - return false +func retryOnAllErrors(err error) bool { + return true } diff --git a/flytecopilot/data/retry_test.go b/flytecopilot/data/retry_test.go index 4d16f464192..9be78ad0364 100644 --- a/flytecopilot/data/retry_test.go +++ b/flytecopilot/data/retry_test.go @@ -21,7 +21,7 @@ func TestRetryOnSpecificErrors_SucceedsFirstTry(t *testing.T) { err := retryOnSpecificErrors(ctx, 3, time.Millisecond, func() error { calls++ return nil - }, isTransientStorageWriteError) + }, retryOnAllErrors) assert.NoError(t, err) assert.Equal(t, 1, calls) } @@ -35,23 +35,7 @@ func TestRetryOnSpecificErrors_RetriesThenSucceeds(t *testing.T) { return s3LikeError } return nil - }, isTransientStorageWriteError) + }, retryOnAllErrors) assert.NoError(t, err) assert.Equal(t, 3, calls) } - -func TestRetryOnSpecificErrors_NonRetryableStops(t *testing.T) { - ctx := context.Background() - calls := 0 - want := errors.New("permanent") - err := retryOnSpecificErrors(ctx, 5, time.Millisecond, func() error { - calls++ - return want - }, isTransientStorageWriteError) - assert.ErrorIs(t, err, want) - assert.Equal(t, 1, calls) -} - -func TestIsTransientStorageWriteError(t *testing.T) { - assert.True(t, isTransientStorageWriteError(s3LikeError)) -} diff --git a/flytecopilot/data/utils.go b/flytecopilot/data/utils.go index 582138e9178..0b229118a2e 100644 --- a/flytecopilot/data/utils.go +++ b/flytecopilot/data/utils.go @@ -57,7 +57,7 @@ func UploadFileToStorage(ctx context.Context, filePath string, toPath storage.Da } }() return store.WriteRaw(ctx, toPath, size, storage.Options{}, f) - }, isTransientStorageWriteError) + }, retryOnAllErrors) } func DownloadFileFromStorage(ctx context.Context, ref storage.DataReference, store *storage.DataStore) (io.ReadCloser, error) {