diff --git a/gpt_engineer/core/default/file_store.py b/gpt_engineer/core/default/file_store.py index 3700941b25..7836ae9fba 100644 --- a/gpt_engineer/core/default/file_store.py +++ b/gpt_engineer/core/default/file_store.py @@ -1,11 +1,14 @@ +import logging import tempfile from pathlib import Path -from typing import Union +from typing import Union, Set from gpt_engineer.core.files_dict import FilesDict from gpt_engineer.core.linting import Linting +logger = logging.getLogger(__name__) + class FileStore: """ @@ -28,20 +31,107 @@ class FileStore: - FilesDict: For handling collections of files. """ - def __init__(self, path: Union[str, Path, None] = None): + def __init__(self, path: Union[str, Path, None] = None, allowed_extensions: Set[str] = None): if path is None: path = Path(tempfile.mkdtemp(prefix="gpt-engineer-")) - self.working_dir = Path(path) + self.working_dir = Path(path).resolve() self.working_dir.mkdir(parents=True, exist_ok=True) self.id = self.working_dir.name.split("-")[-1] + + # Security: Optional file extension allowlist + # If None, all extensions are allowed (backward compatibility) + self.allowed_extensions = allowed_extensions + + def _validate_path(self, file_name: str) -> Path: + """ + Validates and sanitizes a file path to prevent security vulnerabilities. + + Args: + file_name: The requested file name/path from user input + + Returns: + Path: A validated, resolved path within the working directory + + Raises: + ValueError: If the path is invalid, contains dangerous patterns, + or would escape the working directory + """ + # Remove dangerous characters and normalize the path + sanitized = str(file_name).strip() + + # Block obvious path traversal attempts + dangerous_patterns = ['../', '..\\', '~/', '/etc/', '/usr/', '/var/', + '/tmp/', '/root/', '/home/', 'C:\\', 'c:\\'] + for pattern in dangerous_patterns: + if pattern.lower() in sanitized.lower(): + logger.warning(f"Blocked dangerous path pattern: {sanitized}") + raise ValueError(f"Path contains dangerous pattern: {pattern}") + + # Create path relative to working directory and resolve it + try: + # Ensure it's treated as relative to working_dir + if Path(sanitized).is_absolute(): + logger.warning(f"Blocked absolute path: {sanitized}") + raise ValueError("Absolute paths are not allowed") + + candidate_path = (self.working_dir / sanitized).resolve() + + # Critical security check: ensure resolved path is within working_dir + if not str(candidate_path).startswith(str(self.working_dir)): + logger.warning(f"Path traversal attempt blocked: {sanitized} -> {candidate_path}") + raise ValueError("Path would escape working directory") + + except (OSError, ValueError) as e: + logger.warning(f"Invalid path rejected: {sanitized} - {e}") + raise ValueError(f"Invalid file path: {sanitized}") + + # Optional: Check file extension allowlist + if self.allowed_extensions is not None: + file_suffix = candidate_path.suffix.lower() + if file_suffix not in self.allowed_extensions: + logger.warning(f"File extension not allowed: {file_suffix} for {sanitized}") + raise ValueError(f"File extension '{file_suffix}' is not allowed") + + return candidate_path def push(self, files: FilesDict): - for name, content in files.items(): - path = self.working_dir / name - path.parent.mkdir(parents=True, exist_ok=True) - with open(path, "w") as f: - f.write(content) + """ + Pushes files to the working directory with security validation. + + Args: + files: FilesDict containing file names and contents + + Returns: + self: For method chaining + + Raises: + ValueError: If any file path fails security validation + """ + successful_writes = [] + + try: + for name, content in files.items(): + # Security validation - this will raise ValueError if path is unsafe + validated_path = self._validate_path(name) + + # Ensure parent directories exist + validated_path.parent.mkdir(parents=True, exist_ok=True) + + # Write file with validated path + with open(validated_path, "w", encoding="utf-8") as f: + f.write(content) + + successful_writes.append(validated_path) + logger.debug(f"Successfully wrote file: {name} -> {validated_path}") + + except Exception as e: + # If any file fails, log the error but continue processing others + # This maintains backward compatibility while improving security + logger.error(f"Failed to write file '{name}': {e}") + # Re-raise the exception to alert calling code + raise + return self def linting(self, files: FilesDict) -> FilesDict: diff --git a/tests/core/test_file_store_security.py b/tests/core/test_file_store_security.py new file mode 100644 index 0000000000..ccd51b9dbb --- /dev/null +++ b/tests/core/test_file_store_security.py @@ -0,0 +1,167 @@ +""" +Security tests for FileStore class. + +This module tests the security fixes for path traversal vulnerabilities +in the FileStore.push() method. +""" +import pytest +import tempfile +from pathlib import Path + +from gpt_engineer.core.default.file_store import FileStore +from gpt_engineer.core.files_dict import FilesDict + + +class TestFileStoreSecurity: + """Test security features of FileStore.""" + + def test_normal_file_creation(self): + """Test that normal files are created successfully.""" + with tempfile.TemporaryDirectory() as temp_dir: + store = FileStore(temp_dir) + files = FilesDict({ + "test.py": "print('hello world')", + "subdir/another.py": "import os" + }) + + store.push(files) + + # Verify files were created + assert (Path(temp_dir) / "test.py").exists() + assert (Path(temp_dir) / "subdir" / "another.py").exists() + + def test_path_traversal_blocked_dotdot(self): + """Test that path traversal with ../ is blocked.""" + with tempfile.TemporaryDirectory() as temp_dir: + store = FileStore(temp_dir) + files = FilesDict({ + "../../../etc/passwd": "malicious content" + }) + + with pytest.raises(ValueError, match="dangerous pattern"): + store.push(files) + + def test_path_traversal_blocked_absolute(self): + """Test that absolute paths are blocked.""" + with tempfile.TemporaryDirectory() as temp_dir: + store = FileStore(temp_dir) + files = FilesDict({ + "/etc/passwd": "malicious content" + }) + + with pytest.raises(ValueError, match="Absolute paths are not allowed"): + store.push(files) + + def test_path_traversal_blocked_home(self): + """Test that paths to home directory are blocked.""" + with tempfile.TemporaryDirectory() as temp_dir: + store = FileStore(temp_dir) + files = FilesDict({ + "~/malicious.sh": "rm -rf /" + }) + + with pytest.raises(ValueError, match="dangerous pattern"): + store.push(files) + + def test_path_traversal_blocked_system_dirs(self): + """Test that system directory paths are blocked.""" + with tempfile.TemporaryDirectory() as temp_dir: + store = FileStore(temp_dir) + + dangerous_paths = [ + "/usr/bin/malicious", + "/var/log/hijack.log", + "/root/.ssh/authorized_keys", + "C:\\Windows\\System32\\evil.exe" + ] + + for dangerous_path in dangerous_paths: + files = FilesDict({dangerous_path: "malicious content"}) + with pytest.raises(ValueError, match="dangerous pattern"): + store.push(files) + + def test_file_extension_allowlist(self): + """Test that file extension allowlist works correctly.""" + with tempfile.TemporaryDirectory() as temp_dir: + # Only allow .py and .txt files + store = FileStore(temp_dir, allowed_extensions={".py", ".txt"}) + + # This should work + good_files = FilesDict({ + "script.py": "print('hello')", + "readme.txt": "This is a readme" + }) + store.push(good_files) + + # This should be blocked + bad_files = FilesDict({ + "malware.exe": "binary content" + }) + + with pytest.raises(ValueError, match="not allowed"): + store.push(bad_files) + + def test_file_extension_allowlist_none_allows_all(self): + """Test that None allowlist allows all extensions (backward compatibility).""" + with tempfile.TemporaryDirectory() as temp_dir: + store = FileStore(temp_dir, allowed_extensions=None) + + files = FilesDict({ + "script.py": "print('hello')", + "data.json": '{"key": "value"}', + "image.png": "binary data", + "executable": "no extension" + }) + + # Should not raise any errors + store.push(files) + + def test_complex_path_traversal_attempts(self): + """Test various sophisticated path traversal attempts.""" + with tempfile.TemporaryDirectory() as temp_dir: + store = FileStore(temp_dir) + + tricky_paths = [ + "subdir/../../../etc/passwd", # Mixed legitimate and traversal + "normal/../../../../../root/.bashrc", # Deep traversal + ".././../etc/hosts", # Variations of dots + "subdir\\..\\..\\..\\Windows\\System32\\evil.dll", # Windows style + ] + + for tricky_path in tricky_paths: + files = FilesDict({tricky_path: "malicious content"}) + with pytest.raises(ValueError): + store.push(files) + + def test_path_stays_within_working_dir_even_with_symlinks(self): + """Test that resolved paths stay within working directory.""" + with tempfile.TemporaryDirectory() as temp_dir: + store = FileStore(temp_dir) + + # Create a legitimate subdirectory + subdir = Path(temp_dir) / "subdir" + subdir.mkdir() + + # Try to use relative path that would resolve outside working_dir + files = FilesDict({ + "subdir/../../etc/passwd": "malicious content" + }) + + with pytest.raises(ValueError, match="escape working directory"): + store.push(files) + + def test_empty_and_whitespace_paths(self): + """Test handling of empty and whitespace-only paths.""" + with tempfile.TemporaryDirectory() as temp_dir: + store = FileStore(temp_dir) + + problematic_paths = [ + "", # Empty string + " ", # Whitespace only + "\t\n", # Tabs and newlines + ] + + for bad_path in problematic_paths: + files = FilesDict({bad_path: "content"}) + with pytest.raises(ValueError): + store.push(files) \ No newline at end of file