Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1219,10 +1219,10 @@ public void copyToLocal(String srcPath, String destPath, Map<String, String> pro
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while copy {} to local {} ", srcPath, destPath, e);
throw new StarRocksException("Failed to copy " + srcPath + "to local " + destPath, e);
throw new StarRocksException("Failed to copy " + srcPath + " to local " + destPath + ": " + e.getMessage(), e);
} catch (Exception e) {
LOG.error("Exception while copy {} to local {} ", srcPath, destPath, e);
throw new StarRocksException("Failed to copy " + srcPath + "to local " + destPath, e);
throw new StarRocksException("Failed to copy " + srcPath + " to local " + destPath + ": " + e.getMessage(), e);
}
}

Expand All @@ -1235,10 +1235,10 @@ public void copyFromLocal(String srcPath, String destPath, Map<String, String> p
} catch (InterruptedIOException e) {
Thread.interrupted(); // clear interrupted flag
LOG.error("Interrupted while copy local {} to {} ", srcPath, destPath, e);
throw new StarRocksException("Failed to copy local " + srcPath + " to " + destPath, e);
throw new StarRocksException("Failed to copy local " + srcPath + " to " + destPath + ": " + e.getMessage(), e);
} catch (Exception e) {
LOG.error("Exception while copy local {} to {} ", srcPath, destPath, e);
throw new StarRocksException("Failed to copy local " + srcPath + " to " + destPath, e);
throw new StarRocksException("Failed to copy local " + srcPath + " to " + destPath + ": " + e.getMessage(), e);
}
}

Expand Down
134 changes: 134 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/fs/hdfs/HdfsFsManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -559,6 +562,137 @@ public void testExpirationCheckerClosesOnFallbackThreadWhenPoolFull() throws Exc
Mockito.verify(fs, Mockito.timeout(2000).times(1)).closeFileSystem();
}

/**
* Regression for cluster snapshot upload error reporting: copyToLocal wraps the
* underlying Hadoop exception in a StarRocksException, and the wrapper message
* must include the cause's message so callers that surface only
* StarRocksException#getMessage (e.g. ClusterSnapshotJob.error_message) still
* see the real reason (AccessDenied, NoSuchBucket, timeout, etc.).
*/
@Test
public void testCopyToLocalIncludesCauseMessage() throws Exception {
HdfsFsManager mgr = Mockito.spy(fileSystemManager);
FileSystem fs = Mockito.mock(FileSystem.class);
HdfsFs hdfs = Mockito.mock(HdfsFs.class);
Mockito.when(hdfs.getDFSFileSystem()).thenReturn(fs);
Mockito.doReturn(hdfs).when(mgr)
.getFileSystem(Mockito.anyString(), Mockito.anyMap(), Mockito.isNull());

String causeMsg = "AccessDenied: anonymous is not authorized to perform s3:GetObject";
Mockito.doThrow(new IOException(causeMsg))
.when(fs).copyToLocalFile(Mockito.anyBoolean(), Mockito.any(Path.class),
Mockito.any(Path.class), Mockito.anyBoolean());

StarRocksException ex = Assertions.assertThrows(StarRocksException.class,
() -> mgr.copyToLocal("s3a://bucket/key", "/tmp/dest", new HashMap<>()));
Assertions.assertTrue(ex.getMessage().contains(causeMsg),
"wrapper message must include cause: " + ex.getMessage());
Assertions.assertTrue(ex.getMessage().contains("Failed to copy s3a://bucket/key to local /tmp/dest"),
"wrapper message must include both paths: " + ex.getMessage());
Assertions.assertNotNull(ex.getCause(), "cause must still be attached");
}

/**
* Companion to {@link #testCopyToLocalIncludesCauseMessage()} for copyFromLocal.
* Uses a non-existent local source so FileUtil.copy throws naturally, then checks
* that the wrapper message preserves the underlying message.
*/
@Test
public void testCopyFromLocalIncludesCauseMessage() throws Exception {
HdfsFsManager mgr = Mockito.spy(fileSystemManager);
FileSystem fs = Mockito.mock(FileSystem.class);
HdfsFs hdfs = Mockito.mock(HdfsFs.class);
Mockito.when(hdfs.getDFSFileSystem()).thenReturn(fs);
Mockito.doReturn(hdfs).when(mgr)
.getFileSystem(Mockito.anyString(), Mockito.anyMap(), Mockito.isNull());

String nonExistentSrc = "/this/path/should/not/exist/" + System.nanoTime();
StarRocksException ex = Assertions.assertThrows(StarRocksException.class,
() -> mgr.copyFromLocal(nonExistentSrc, "s3a://bucket/key", new HashMap<>()));
Assertions.assertNotNull(ex.getCause(), "cause must still be attached");
Assertions.assertNotNull(ex.getCause().getMessage(), "cause must carry a message");
Assertions.assertTrue(ex.getMessage().contains(ex.getCause().getMessage()),
"wrapper message must include cause's message: " + ex.getMessage());
Assertions.assertTrue(ex.getMessage().contains("Failed to copy local " + nonExistentSrc
+ " to s3a://bucket/key"),
"wrapper message must include both paths: " + ex.getMessage());
}

/**
* Covers the {@link InterruptedIOException} branch of copyToLocal: the cause's
* message must still be visible in the wrapper, and the catch clears the
* thread's interrupt flag so callers don't observe spurious interruption.
*/
@Test
public void testCopyToLocalInterruptedIncludesCauseMessage() throws Exception {
HdfsFsManager mgr = Mockito.spy(fileSystemManager);
FileSystem fs = Mockito.mock(FileSystem.class);
HdfsFs hdfs = Mockito.mock(HdfsFs.class);
Mockito.when(hdfs.getDFSFileSystem()).thenReturn(fs);
Mockito.doReturn(hdfs).when(mgr)
.getFileSystem(Mockito.anyString(), Mockito.anyMap(), Mockito.isNull());

String causeMsg = "interrupted during S3 GET";
Mockito.doThrow(new InterruptedIOException(causeMsg))
.when(fs).copyToLocalFile(Mockito.anyBoolean(), Mockito.any(Path.class),
Mockito.any(Path.class), Mockito.anyBoolean());

try {
StarRocksException ex = Assertions.assertThrows(StarRocksException.class,
() -> mgr.copyToLocal("s3a://bucket/key", "/tmp/dest", new HashMap<>()));
Assertions.assertTrue(ex.getMessage().contains(causeMsg),
"wrapper message must include cause: " + ex.getMessage());
Assertions.assertTrue(ex.getMessage().contains(
"Failed to copy s3a://bucket/key to local /tmp/dest"),
"wrapper message must include both paths: " + ex.getMessage());
Assertions.assertInstanceOf(InterruptedIOException.class, ex.getCause(),
"cause must be the InterruptedIOException");
} finally {
// Defensive: the catch calls Thread.interrupted() to clear the flag.
Thread.interrupted();
}
}

/**
* Covers the {@link InterruptedIOException} branch of copyFromLocal. Uses a
* real local source file so FileUtil.copy reaches the destination
* FileSystem.create(...) call, where we inject the interrupt.
*/
@Test
public void testCopyFromLocalInterruptedIncludesCauseMessage() throws Exception {
HdfsFsManager mgr = Mockito.spy(fileSystemManager);
FileSystem fs = Mockito.mock(FileSystem.class);
HdfsFs hdfs = Mockito.mock(HdfsFs.class);
Mockito.when(hdfs.getDFSFileSystem()).thenReturn(fs);
Mockito.doReturn(hdfs).when(mgr)
.getFileSystem(Mockito.anyString(), Mockito.anyMap(), Mockito.isNull());

File srcFile = Files.createTempFile("hdfs-fs-mgr-test-", ".bin").toFile();
try {
Files.write(srcFile.toPath(), new byte[] {1, 2, 3});

String causeMsg = "interrupted during S3 PUT";
// FileUtil.copy ends up calling FileSystem.create(Path) — stub that overload directly
// (the (Path, boolean) overload isn't reached because Mockito short-circuits at the mocked
// create(Path) method without delegating to its real implementation).
Mockito.doThrow(new InterruptedIOException(causeMsg))
.when(fs).create(Mockito.any(Path.class));

StarRocksException ex = Assertions.assertThrows(StarRocksException.class,
() -> mgr.copyFromLocal(srcFile.getAbsolutePath(), "s3a://bucket/key", new HashMap<>()));
Assertions.assertTrue(ex.getMessage().contains(causeMsg),
"wrapper message must include cause: " + ex.getMessage());
Assertions.assertTrue(ex.getMessage().contains(
"Failed to copy local " + srcFile.getAbsolutePath() + " to s3a://bucket/key"),
"wrapper message must include both paths: " + ex.getMessage());
Assertions.assertInstanceOf(InterruptedIOException.class, ex.getCause(),
"cause must be the InterruptedIOException");
} finally {
srcFile.delete();
Thread.interrupted();
}
}

/**
* Fallback thread watchdog: when the close pool is full and the fallback daemon thread
* hangs, the watchdog interrupts it after the configured timeout so stuck threads
Expand Down
Loading