Implement filesystem sandbox for Node.js and Python scripts

Co-authored-by: whyour <22700758+whyour@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot] 2025-11-17 12:35:03 +00:00
parent 3c2d782ec8
commit 5267cd03e0
6 changed files with 658 additions and 0 deletions

233
shell/preload/sandbox.js Normal file
View File

@ -0,0 +1,233 @@
const Module = require('module');
const path = require('path');
const fs = require('fs');
// Get the QL_DIR and data directory paths
const qlDir = process.env.QL_DIR || path.join(__dirname, '../../');
let dataDir = process.env.QL_DATA_DIR || path.join(qlDir, 'data');
// Remove trailing slash if present
dataDir = dataDir.replace(/\/$/, '');
// Normalize paths to avoid bypassing with relative paths or symlinks
const normalizedQlDir = fs.existsSync(qlDir) ? fs.realpathSync(qlDir) : path.resolve(qlDir);
const normalizedDataDir = fs.existsSync(dataDir) ? fs.realpathSync(dataDir) : path.resolve(dataDir);
// Protected directories - no write access allowed
const protectedPaths = [
path.join(normalizedQlDir, 'back'),
path.join(normalizedQlDir, 'src'),
path.join(normalizedQlDir, 'shell'),
path.join(normalizedQlDir, 'sample'),
path.join(normalizedQlDir, 'node_modules'),
path.join(normalizedDataDir, 'config'),
path.join(normalizedDataDir, 'db'),
];
// Allowed write directories - scripts can write here
const allowedWritePaths = [
path.join(normalizedDataDir, 'scripts'),
path.join(normalizedDataDir, 'log'),
path.join(normalizedDataDir, 'repo'),
path.join(normalizedDataDir, 'raw'),
path.join(normalizedQlDir, '.tmp'),
'/tmp',
];
// Check if sandboxing is enabled (default: true)
const sandboxEnabled = process.env.QL_DISABLE_SANDBOX !== 'true';
function isPathProtected(targetPath) {
if (!sandboxEnabled) {
return false;
}
try {
// Resolve to absolute path and follow symlinks
const resolvedPath = fs.realpathSync.native ?
fs.realpathSync.native(targetPath) :
path.resolve(targetPath);
// Check if path is in a protected directory
for (const protectedPath of protectedPaths) {
if (resolvedPath.startsWith(protectedPath + path.sep) || resolvedPath === protectedPath) {
// Check if it's in an allowed subdirectory
const isInAllowedPath = allowedWritePaths.some(allowedPath =>
resolvedPath.startsWith(allowedPath + path.sep) || resolvedPath === allowedPath
);
if (!isInAllowedPath) {
return true;
}
}
}
// Also check if trying to write outside data/scripts without being in allowed paths
const isInQlDir = resolvedPath.startsWith(normalizedQlDir + path.sep) || resolvedPath === normalizedQlDir;
const isInDataDir = resolvedPath.startsWith(normalizedDataDir + path.sep) || resolvedPath === normalizedDataDir;
if (isInQlDir || isInDataDir) {
const isInAllowedPath = allowedWritePaths.some(allowedPath =>
resolvedPath.startsWith(allowedPath + path.sep) || resolvedPath === allowedPath
);
if (!isInAllowedPath) {
return true;
}
}
return false;
} catch (err) {
// If path doesn't exist yet, check parent directory
const parentPath = path.dirname(targetPath);
if (parentPath !== targetPath) {
return isPathProtected(parentPath);
}
return false;
}
}
function createSecurityError(operation, targetPath) {
const err = new Error(
`Security Error: Script attempted to ${operation} protected path: ${targetPath}\n` +
`Scripts are only allowed to write to: ${allowedWritePaths.join(', ')}`
);
err.code = 'EACCES';
return err;
}
// Store original fs methods
const originalFS = {};
const writeOperations = [
'writeFile', 'writeFileSync',
'appendFile', 'appendFileSync',
'mkdir', 'mkdirSync',
'rmdir', 'rmdirSync',
'unlink', 'unlinkSync',
'rm', 'rmSync',
'rename', 'renameSync',
'copyFile', 'copyFileSync',
'chmod', 'chmodSync',
'chown', 'chownSync',
'link', 'linkSync',
'symlink', 'symlinkSync',
'truncate', 'truncateSync',
'utimes', 'utimesSync',
];
// Wrap fs methods
for (const method of writeOperations) {
if (fs[method]) {
originalFS[method] = fs[method];
}
}
function wrapFsMethod(method, isSync) {
return function(...args) {
const targetPath = args[0];
if (isPathProtected(targetPath)) {
const err = createSecurityError(method, targetPath);
if (isSync) {
throw err;
} else {
const callback = args[args.length - 1];
if (typeof callback === 'function') {
process.nextTick(() => callback(err));
return;
}
throw err;
}
}
// For rename/copy operations, check destination too
if ((method.startsWith('rename') || method.startsWith('copy')) && args[1]) {
if (isPathProtected(args[1])) {
const err = createSecurityError(method, args[1]);
if (isSync) {
throw err;
} else {
const callback = args[args.length - 1];
if (typeof callback === 'function') {
process.nextTick(() => callback(err));
return;
}
throw err;
}
}
}
return originalFS[method].apply(fs, args);
};
}
// Apply wrappers
if (sandboxEnabled) {
for (const method of writeOperations) {
if (fs[method]) {
const isSync = method.endsWith('Sync');
fs[method] = wrapFsMethod(method, isSync);
}
}
// Wrap createWriteStream
originalFS.createWriteStream = fs.createWriteStream;
fs.createWriteStream = function(targetPath, options) {
if (isPathProtected(targetPath)) {
throw createSecurityError('createWriteStream', targetPath);
}
return originalFS.createWriteStream.call(fs, targetPath, options);
};
// Wrap promises API if it exists
if (fs.promises) {
const promisesOriginal = {};
const promisesMethods = [
'writeFile', 'appendFile', 'mkdir', 'rmdir', 'unlink', 'rm',
'rename', 'copyFile', 'chmod', 'chown', 'link', 'symlink',
'truncate', 'utimes',
];
for (const method of promisesMethods) {
if (fs.promises[method]) {
promisesOriginal[method] = fs.promises[method];
fs.promises[method] = async function(...args) {
const targetPath = args[0];
if (isPathProtected(targetPath)) {
throw createSecurityError(method, targetPath);
}
// For rename/copy operations, check destination too
if ((method === 'rename' || method === 'copyFile') && args[1]) {
if (isPathProtected(args[1])) {
throw createSecurityError(method, args[1]);
}
}
return promisesOriginal[method].apply(fs.promises, args);
};
}
}
}
}
// Prevent requiring the original fs module to bypass sandbox
const originalRequire = Module.prototype.require;
Module.prototype.require = function(id) {
const module = originalRequire.apply(this, arguments);
// Return wrapped fs module
if (id === 'fs' || id === 'node:fs') {
return fs;
}
return module;
};
module.exports = {
sandboxEnabled,
isPathProtected,
protectedPaths,
allowedWritePaths,
};

326
shell/preload/sandbox.py Normal file
View File

@ -0,0 +1,326 @@
import os
import sys
import builtins
from pathlib import Path
# Get the QL_DIR and data directory paths
ql_dir = os.environ.get('QL_DIR', os.path.join(os.path.dirname(__file__), '../..'))
data_dir = os.environ.get('QL_DATA_DIR', os.path.join(ql_dir, 'data'))
# Normalize paths to avoid bypassing with relative paths or symlinks
try:
normalized_ql_dir = os.path.realpath(ql_dir)
normalized_data_dir = os.path.realpath(data_dir)
except:
normalized_ql_dir = os.path.abspath(ql_dir)
normalized_data_dir = os.path.abspath(data_dir)
# Protected directories - no write access allowed
protected_paths = [
os.path.join(normalized_ql_dir, 'back'),
os.path.join(normalized_ql_dir, 'src'),
os.path.join(normalized_ql_dir, 'shell'),
os.path.join(normalized_ql_dir, 'sample'),
os.path.join(normalized_ql_dir, 'node_modules'),
os.path.join(normalized_data_dir, 'config'),
os.path.join(normalized_data_dir, 'db'),
]
# Allowed write directories - scripts can write here
allowed_write_paths = [
os.path.join(normalized_data_dir, 'scripts'),
os.path.join(normalized_data_dir, 'log'),
os.path.join(normalized_data_dir, 'repo'),
os.path.join(normalized_data_dir, 'raw'),
os.path.join(normalized_ql_dir, '.tmp'),
'/tmp',
]
# Check if sandboxing is enabled (default: true)
sandbox_enabled = os.environ.get('QL_DISABLE_SANDBOX') != 'true'
def is_path_protected(target_path):
"""Check if a path is protected from write operations"""
if not sandbox_enabled:
return False
try:
# Resolve to absolute path and follow symlinks
resolved_path = os.path.realpath(target_path)
# Check if path is in a protected directory
for protected_path in protected_paths:
if resolved_path.startswith(protected_path + os.sep) or resolved_path == protected_path:
# Check if it's in an allowed subdirectory
is_in_allowed_path = any(
resolved_path.startswith(allowed_path + os.sep) or resolved_path == allowed_path
for allowed_path in allowed_write_paths
)
if not is_in_allowed_path:
return True
# Also check if trying to write inside ql_dir or data_dir without being in allowed paths
is_in_ql_dir = resolved_path.startswith(normalized_ql_dir + os.sep) or resolved_path == normalized_ql_dir
is_in_data_dir = resolved_path.startswith(normalized_data_dir + os.sep) or resolved_path == normalized_data_dir
if is_in_ql_dir or is_in_data_dir:
is_in_allowed_path = any(
resolved_path.startswith(allowed_path + os.sep) or resolved_path == allowed_path
for allowed_path in allowed_write_paths
)
if not is_in_allowed_path:
return True
return False
except:
# If path doesn't exist yet, check parent directory
parent_path = os.path.dirname(target_path)
if parent_path != target_path:
return is_path_protected(parent_path)
return False
def create_security_error(operation, target_path):
"""Create a security error for unauthorized file operations"""
return PermissionError(
f"Security Error: Script attempted to {operation} protected path: {target_path}\n"
f"Scripts are only allowed to write to: {', '.join(allowed_write_paths)}"
)
# Store original functions
original_open = builtins.open
original_os_remove = os.remove
original_os_unlink = os.unlink
original_os_rmdir = os.rmdir
original_os_mkdir = os.mkdir
original_os_makedirs = os.makedirs
original_os_rename = os.rename
original_os_replace = os.replace
original_os_chmod = os.chmod
original_os_chown = os.chown if hasattr(os, 'chown') else None
original_os_link = os.link if hasattr(os, 'link') else None
original_os_symlink = os.symlink if hasattr(os, 'symlink') else None
original_os_truncate = os.truncate if hasattr(os, 'truncate') else None
original_os_utime = os.utime if hasattr(os, 'utime') else None
# Wrap open() to check write operations
def sandboxed_open(file, mode='r', *args, **kwargs):
"""Wrapped open() that checks for protected paths on write operations"""
if sandbox_enabled and isinstance(mode, str) and any(m in mode for m in ['w', 'a', 'x', '+']):
if is_path_protected(file):
raise create_security_error('open for writing', file)
return original_open(file, mode, *args, **kwargs)
# Wrap os functions
def sandboxed_remove(path, *args, **kwargs):
if sandbox_enabled and is_path_protected(path):
raise create_security_error('remove', path)
return original_os_remove(path, *args, **kwargs)
def sandboxed_unlink(path, *args, **kwargs):
if sandbox_enabled and is_path_protected(path):
raise create_security_error('unlink', path)
return original_os_unlink(path, *args, **kwargs)
def sandboxed_rmdir(path, *args, **kwargs):
if sandbox_enabled and is_path_protected(path):
raise create_security_error('rmdir', path)
return original_os_rmdir(path, *args, **kwargs)
def sandboxed_mkdir(path, *args, **kwargs):
if sandbox_enabled and is_path_protected(path):
raise create_security_error('mkdir', path)
return original_os_mkdir(path, *args, **kwargs)
def sandboxed_makedirs(name, *args, **kwargs):
if sandbox_enabled and is_path_protected(name):
raise create_security_error('makedirs', name)
return original_os_makedirs(name, *args, **kwargs)
def sandboxed_rename(src, dst, *args, **kwargs):
if sandbox_enabled:
if is_path_protected(src):
raise create_security_error('rename (source)', src)
if is_path_protected(dst):
raise create_security_error('rename (destination)', dst)
return original_os_rename(src, dst, *args, **kwargs)
def sandboxed_replace(src, dst, *args, **kwargs):
if sandbox_enabled:
if is_path_protected(src):
raise create_security_error('replace (source)', src)
if is_path_protected(dst):
raise create_security_error('replace (destination)', dst)
return original_os_replace(src, dst, *args, **kwargs)
def sandboxed_chmod(path, *args, **kwargs):
if sandbox_enabled and is_path_protected(path):
raise create_security_error('chmod', path)
return original_os_chmod(path, *args, **kwargs)
def sandboxed_chown(path, *args, **kwargs):
if sandbox_enabled and is_path_protected(path):
raise create_security_error('chown', path)
return original_os_chown(path, *args, **kwargs)
def sandboxed_link(src, dst, *args, **kwargs):
if sandbox_enabled:
if is_path_protected(dst):
raise create_security_error('link', dst)
return original_os_link(src, dst, *args, **kwargs)
def sandboxed_symlink(src, dst, *args, **kwargs):
if sandbox_enabled:
if is_path_protected(dst):
raise create_security_error('symlink', dst)
return original_os_symlink(src, dst, *args, **kwargs)
def sandboxed_truncate(path, *args, **kwargs):
if sandbox_enabled and is_path_protected(path):
raise create_security_error('truncate', path)
return original_os_truncate(path, *args, **kwargs)
def sandboxed_utime(path, *args, **kwargs):
if sandbox_enabled and is_path_protected(path):
raise create_security_error('utime', path)
return original_os_utime(path, *args, **kwargs)
# Apply sandbox wrappers
if sandbox_enabled:
builtins.open = sandboxed_open
os.remove = sandboxed_remove
os.unlink = sandboxed_unlink
os.rmdir = sandboxed_rmdir
os.mkdir = sandboxed_mkdir
os.makedirs = sandboxed_makedirs
os.rename = sandboxed_rename
os.replace = sandboxed_replace
os.chmod = sandboxed_chmod
if original_os_chown:
os.chown = sandboxed_chown
if original_os_link:
os.link = sandboxed_link
if original_os_symlink:
os.symlink = sandboxed_symlink
if original_os_truncate:
os.truncate = sandboxed_truncate
if original_os_utime:
os.utime = sandboxed_utime
# Wrap shutil if it's imported
try:
import shutil
original_shutil_rmtree = shutil.rmtree
original_shutil_copy = shutil.copy
original_shutil_copy2 = shutil.copy2
original_shutil_copytree = shutil.copytree
original_shutil_move = shutil.move
def sandboxed_rmtree(path, *args, **kwargs):
if is_path_protected(path):
raise create_security_error('rmtree', path)
return original_shutil_rmtree(path, *args, **kwargs)
def sandboxed_copy(src, dst, *args, **kwargs):
if is_path_protected(dst):
raise create_security_error('copy', dst)
return original_shutil_copy(src, dst, *args, **kwargs)
def sandboxed_copy2(src, dst, *args, **kwargs):
if is_path_protected(dst):
raise create_security_error('copy2', dst)
return original_shutil_copy2(src, dst, *args, **kwargs)
def sandboxed_copytree(src, dst, *args, **kwargs):
if is_path_protected(dst):
raise create_security_error('copytree', dst)
return original_shutil_copytree(src, dst, *args, **kwargs)
def sandboxed_move(src, dst, *args, **kwargs):
if is_path_protected(src):
raise create_security_error('move (source)', src)
if is_path_protected(dst):
raise create_security_error('move (destination)', dst)
return original_shutil_move(src, dst, *args, **kwargs)
shutil.rmtree = sandboxed_rmtree
shutil.copy = sandboxed_copy
shutil.copy2 = sandboxed_copy2
shutil.copytree = sandboxed_copytree
shutil.move = sandboxed_move
except ImportError:
pass
# Wrap pathlib.Path if available
try:
original_path_write_text = Path.write_text
original_path_write_bytes = Path.write_bytes
original_path_touch = Path.touch
original_path_mkdir = Path.mkdir
original_path_rmdir = Path.rmdir
original_path_unlink = Path.unlink
original_path_rename = Path.rename
original_path_replace = Path.replace
original_path_chmod = Path.chmod
def sandboxed_path_write_text(self, *args, **kwargs):
if is_path_protected(str(self)):
raise create_security_error('Path.write_text', str(self))
return original_path_write_text(self, *args, **kwargs)
def sandboxed_path_write_bytes(self, *args, **kwargs):
if is_path_protected(str(self)):
raise create_security_error('Path.write_bytes', str(self))
return original_path_write_bytes(self, *args, **kwargs)
def sandboxed_path_touch(self, *args, **kwargs):
if is_path_protected(str(self)):
raise create_security_error('Path.touch', str(self))
return original_path_touch(self, *args, **kwargs)
def sandboxed_path_mkdir(self, *args, **kwargs):
if is_path_protected(str(self)):
raise create_security_error('Path.mkdir', str(self))
return original_path_mkdir(self, *args, **kwargs)
def sandboxed_path_rmdir(self, *args, **kwargs):
if is_path_protected(str(self)):
raise create_security_error('Path.rmdir', str(self))
return original_path_rmdir(self, *args, **kwargs)
def sandboxed_path_unlink(self, *args, **kwargs):
if is_path_protected(str(self)):
raise create_security_error('Path.unlink', str(self))
return original_path_unlink(self, *args, **kwargs)
def sandboxed_path_rename(self, target, *args, **kwargs):
if is_path_protected(str(self)):
raise create_security_error('Path.rename (source)', str(self))
if is_path_protected(str(target)):
raise create_security_error('Path.rename (target)', str(target))
return original_path_rename(self, target, *args, **kwargs)
def sandboxed_path_replace(self, target, *args, **kwargs):
if is_path_protected(str(self)):
raise create_security_error('Path.replace (source)', str(self))
if is_path_protected(str(target)):
raise create_security_error('Path.replace (target)', str(target))
return original_path_replace(self, target, *args, **kwargs)
def sandboxed_path_chmod(self, *args, **kwargs):
if is_path_protected(str(self)):
raise create_security_error('Path.chmod', str(self))
return original_path_chmod(self, *args, **kwargs)
Path.write_text = sandboxed_path_write_text
Path.write_bytes = sandboxed_path_write_bytes
Path.touch = sandboxed_path_touch
Path.mkdir = sandboxed_path_mkdir
Path.rmdir = sandboxed_path_rmdir
Path.unlink = sandboxed_path_unlink
Path.rename = sandboxed_path_rename
Path.replace = sandboxed_path_replace
Path.chmod = sandboxed_path_chmod
except:
pass

View File

@ -1,3 +1,6 @@
// Load sandbox first to protect filesystem
require('./sandbox.js');
const { execSync } = require('child_process');
const client = require('./client.js');
require(`./env.js`);

View File

@ -1,3 +1,6 @@
# Load sandbox first to protect filesystem
import sandbox
import os
import re
import subprocess

View File

@ -0,0 +1,43 @@
// Minimal test of sandbox module
const path = require('path');
// Set up environment
process.env.QL_DIR = path.join(__dirname, '../..');
process.env.QL_DATA_DIR = path.join(__dirname, '../../data');
// Load sandbox
require('./sandbox.js');
const fs = require('fs');
console.log("Testing filesystem sandbox...\n");
// Test 1: Try to write to config directory (should fail)
console.log("Test 1: Attempting to write to config/test.txt (should fail)...");
try {
const configPath = path.join(__dirname, '../../data/config/test.txt');
fs.writeFileSync(configPath, 'test');
console.log("❌ FAILED: Was able to write to protected config directory!");
process.exit(1);
} catch (error) {
if (error.code === 'EACCES' && error.message.includes('Security Error')) {
console.log("✅ PASSED: Correctly blocked write to config directory");
console.log("Error message:", error.message.split('\n')[0]);
} else {
console.log("❓ UNEXPECTED ERROR:", error.message);
}
}
// Test 2: Write to scripts directory (should succeed)
console.log("\nTest 2: Attempting to write to scripts directory (should succeed)...");
try {
const scriptsPath = path.join(__dirname, '../../data/scripts/test_output.txt');
fs.writeFileSync(scriptsPath, 'This is a test file');
console.log("✅ PASSED: Successfully wrote to scripts directory");
fs.unlinkSync(scriptsPath);
} catch (error) {
console.log("❌ FAILED: Could not write to allowed scripts directory:", error.message);
process.exit(1);
}
console.log("\n✅ Basic sandbox tests passed!");

View File

@ -0,0 +1,50 @@
#!/usr/bin/env python3
# Minimal test of Python sandbox module
import os
import sys
from pathlib import Path
# Set up environment
script_dir = Path(__file__).parent.resolve()
ql_dir = script_dir.parent.parent
os.environ['QL_DIR'] = str(ql_dir)
os.environ['QL_DATA_DIR'] = str(ql_dir / 'data')
# Add preload to path
sys.path.insert(0, str(script_dir))
# Load sandbox
import sandbox
print("Testing Python filesystem sandbox...\n")
# Test 1: Try to write to config directory (should fail)
print("Test 1: Attempting to write to config/test.txt (should fail)...")
try:
config_path = ql_dir / 'data' / 'config' / 'test.txt'
with open(config_path, 'w') as f:
f.write('test')
print("❌ FAILED: Was able to write to protected config directory!")
sys.exit(1)
except PermissionError as e:
if 'Security Error' in str(e):
print("✅ PASSED: Correctly blocked write to config directory")
print(f"Error message: {str(e).split(chr(10))[0]}")
else:
print(f"❓ UNEXPECTED ERROR: {e}")
except Exception as e:
print(f"❓ UNEXPECTED ERROR: {e}")
# Test 2: Write to scripts directory (should succeed)
print("\nTest 2: Attempting to write to scripts directory (should succeed)...")
try:
scripts_path = ql_dir / 'data' / 'scripts' / 'test_output.txt'
with open(scripts_path, 'w') as f:
f.write('This is a test file')
print("✅ PASSED: Successfully wrote to scripts directory")
os.remove(scripts_path)
except Exception as e:
print(f"❌ FAILED: Could not write to allowed scripts directory: {e}")
sys.exit(1)
print("\n✅ Basic Python sandbox tests passed!")