Coverage for lintro / plugins / subprocess_executor.py: 88%

105 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Subprocess execution utilities for tool plugins. 

2 

3This module provides safe subprocess execution with validation and streaming. 

4""" 

5 

6from __future__ import annotations 

7 

8import contextlib 

9import os 

10import subprocess # nosec B404 - subprocess used safely with shell=False 

11import sys 

12import threading 

13from typing import TYPE_CHECKING 

14 

15from loguru import logger 

16 

17if TYPE_CHECKING: 

18 from collections.abc import Callable 

19 

20# Cache for compiled binary detection 

21_IS_COMPILED_BINARY: bool | None = None 

22 

23 

24def is_compiled_binary() -> bool: 

25 """Detect if lintro is running as a Nuitka-compiled binary. 

26 

27 When compiled with Nuitka, sys.executable points to the lintro binary itself, 

28 not a Python interpreter. We detect this by checking if we can import Nuitka 

29 runtime modules or by checking the executable name. 

30 

31 Returns: 

32 True if running as a compiled binary, False otherwise. 

33 """ 

34 global _IS_COMPILED_BINARY 

35 

36 if _IS_COMPILED_BINARY is not None: 

37 return _IS_COMPILED_BINARY 

38 

39 # Method 1: Check for Nuitka's __compiled__ marker 

40 try: 

41 # Nuitka sets __compiled__ at module level 

42 import __main__ 

43 

44 if getattr(__main__, "__compiled__", False): 

45 _IS_COMPILED_BINARY = True 

46 return True 

47 except (ImportError, AttributeError): 

48 pass 

49 

50 # Method 2: Check if sys.executable looks like our binary (not python) 

51 exe_name = os.path.basename(sys.executable).lower() 

52 if exe_name in ("lintro", "lintro.exe", "lintro.bin"): 

53 _IS_COMPILED_BINARY = True 

54 return True 

55 

56 # Method 3: Check if we're running from a Nuitka dist folder 

57 exe_dir = os.path.dirname(sys.executable) 

58 if "nuitka" in exe_dir.lower() or "__nuitka" in exe_dir.lower(): 

59 _IS_COMPILED_BINARY = True 

60 return True 

61 

62 _IS_COMPILED_BINARY = False 

63 return False 

64 

65 

66# Shell metacharacters that could enable command injection or unexpected behavior. 

67# Using frozenset for immutability and O(1) membership testing. 

68UNSAFE_SHELL_CHARS: frozenset[str] = frozenset( 

69 { 

70 # Command chaining and piping 

71 ";", # Command separator 

72 "&", # Background execution / AND operator 

73 "|", # Pipe 

74 # Redirection 

75 ">", # Output redirection 

76 "<", # Input redirection 

77 # Command substitution and expansion 

78 "`", # Backtick command substitution 

79 "$", # Variable expansion / command substitution 

80 # Escape and control characters 

81 "\\", # Escape character 

82 "\n", # Newline (command separator in some contexts) 

83 "\r", # Carriage return 

84 # Glob and pattern matching 

85 "*", # Glob wildcard (match any) 

86 "?", # Glob wildcard (match single char) 

87 "[", # Character class start 

88 "]", # Character class end 

89 # Brace and subshell expansion 

90 "{", # Brace expansion start 

91 "}", # Brace expansion end 

92 "(", # Subshell start 

93 ")", # Subshell end 

94 # Other shell special characters 

95 "~", # Home directory expansion 

96 "!", # History expansion 

97 }, 

98) 

99 

100 

101def validate_subprocess_command(cmd: list[str]) -> None: 

102 """Validate a subprocess command for safety. 

103 

104 Since lintro uses shell=False for all subprocess calls, command arguments 

105 are passed directly to the executable without shell interpretation. This 

106 means characters like $, *, {, } in arguments are safe - they won't be 

107 expanded by the shell. 

108 

109 We only validate the command name (first element) to ensure it doesn't 

110 contain shell metacharacters that could indicate a path traversal or 

111 injection attempt. 

112 

113 Args: 

114 cmd: Command and arguments to validate. 

115 

116 Raises: 

117 ValueError: If command is invalid or the command name contains 

118 unsafe characters. 

119 """ 

120 if not cmd or not isinstance(cmd, list): 

121 raise ValueError("Command must be a non-empty list of strings") 

122 

123 for arg in cmd: 

124 if not isinstance(arg, str): 

125 raise ValueError("All command arguments must be strings") 

126 

127 # Only validate the command name (first element) for shell metacharacters. 

128 # Arguments are safe with shell=False as they're passed literally. 

129 if any(ch in cmd[0] for ch in UNSAFE_SHELL_CHARS): 

130 raise ValueError("Unsafe character detected in command name") 

131 

132 

133def run_subprocess( 

134 cmd: list[str], 

135 timeout: float, 

136 cwd: str | None = None, 

137 env: dict[str, str] | None = None, 

138) -> tuple[bool, str]: 

139 """Run a subprocess command safely. 

140 

141 Args: 

142 cmd: Command and arguments to run. 

143 timeout: Timeout in seconds. 

144 cwd: Working directory for command execution. 

145 env: Environment variables for the subprocess. These are merged with 

146 os.environ to preserve PATH and other essential variables. 

147 

148 Returns: 

149 Tuple of (success, output) where success indicates return code 0. 

150 

151 Raises: 

152 subprocess.TimeoutExpired: If command times out. 

153 FileNotFoundError: If command executable is not found. 

154 """ 

155 validate_subprocess_command(cmd) 

156 

157 cmd_str = " ".join(cmd[:5]) + ("..." if len(cmd) > 5 else "") 

158 logger.debug(f"Running subprocess: {cmd_str} (timeout={timeout}s, cwd={cwd})") 

159 

160 # Merge custom env with os.environ to preserve PATH, HOME, etc. 

161 # Custom env values override os.environ when there are conflicts. 

162 effective_env: dict[str, str] | None = None 

163 if env is not None: 

164 effective_env = {**os.environ, **env} 

165 

166 try: 

167 result = subprocess.run( # nosec B603 - args list, shell=False 

168 cmd, 

169 capture_output=True, 

170 text=True, 

171 timeout=timeout, 

172 cwd=cwd, 

173 env=effective_env, 

174 ) 

175 

176 if result.returncode != 0: 

177 stderr_preview = (result.stderr or "")[:500] 

178 if stderr_preview: 

179 logger.debug( 

180 f"Subprocess {cmd[0]} exited with code {result.returncode}, " 

181 f"stderr: {stderr_preview}", 

182 ) 

183 

184 return result.returncode == 0, result.stdout + result.stderr 

185 except subprocess.TimeoutExpired as e: 

186 logger.warning(f"Subprocess {cmd[0]} timed out after {timeout}s") 

187 # Preserve partial output from the original exception 

188 partial_output = "" 

189 if e.output: 

190 partial_output = ( 

191 e.output 

192 if isinstance(e.output, str) 

193 else e.output.decode(errors="replace") 

194 ) 

195 if e.stderr: 

196 stderr = ( 

197 e.stderr 

198 if isinstance(e.stderr, str) 

199 else e.stderr.decode(errors="replace") 

200 ) 

201 partial_output = partial_output + stderr if partial_output else stderr 

202 raise subprocess.TimeoutExpired( 

203 cmd=cmd, 

204 timeout=timeout, 

205 output=partial_output, 

206 ) from e 

207 except FileNotFoundError as e: 

208 logger.warning( 

209 f"Command not found: {cmd[0]}. Ensure it is installed and in PATH.", 

210 ) 

211 raise FileNotFoundError( 

212 f"Command not found: {cmd[0]}. " 

213 f"Please ensure it is installed and in your PATH.", 

214 ) from e 

215 

216 

217def run_subprocess_streaming( 

218 cmd: list[str], 

219 timeout: float, 

220 cwd: str | None = None, 

221 env: dict[str, str] | None = None, 

222 line_handler: Callable[[str], None] | None = None, 

223) -> tuple[bool, str]: 

224 """Run a subprocess command with optional line-by-line streaming. 

225 

226 This function allows real-time output processing by calling the line_handler 

227 callback for each line of output as it is produced by the subprocess. 

228 

229 The timeout is enforced during both output reading and process completion, 

230 preventing indefinite blocking on slow or hanging processes. 

231 

232 Args: 

233 cmd: Command and arguments to run. 

234 timeout: Timeout in seconds. 

235 cwd: Working directory for command execution. 

236 env: Environment variables for the subprocess. These are merged with 

237 os.environ to preserve PATH and other essential variables. 

238 line_handler: Optional callback called for each line of output. 

239 

240 Returns: 

241 Tuple of (success, output) where success indicates return code 0. 

242 

243 Raises: 

244 subprocess.TimeoutExpired: If command times out. 

245 FileNotFoundError: If command executable is not found. 

246 """ 

247 validate_subprocess_command(cmd) 

248 

249 cmd_str = " ".join(cmd[:5]) + ("..." if len(cmd) > 5 else "") 

250 logger.debug( 

251 f"Running subprocess (streaming): {cmd_str} (timeout={timeout}s, cwd={cwd})", 

252 ) 

253 

254 # Merge custom env with os.environ to preserve PATH, HOME, etc. 

255 # Custom env values override os.environ when there are conflicts. 

256 effective_env: dict[str, str] | None = None 

257 if env is not None: 

258 effective_env = {**os.environ, **env} 

259 

260 try: 

261 # Use Popen for streaming output # nosec B603 

262 process = subprocess.Popen( 

263 cmd, 

264 stdout=subprocess.PIPE, 

265 stderr=subprocess.STDOUT, 

266 text=True, 

267 cwd=cwd, 

268 env=effective_env, 

269 bufsize=1, # Line buffering 

270 ) 

271 

272 output_lines: list[str] = [] 

273 

274 def read_output() -> None: 

275 """Read output lines in a separate thread.""" 

276 if process.stdout: 

277 for line in process.stdout: 

278 stripped = line.rstrip("\n") 

279 output_lines.append(stripped) 

280 if line_handler: 

281 line_handler(stripped) 

282 

283 # Use a thread to read output so we can enforce timeout 

284 reader_thread = threading.Thread(target=read_output, daemon=True) 

285 reader_thread.start() 

286 reader_thread.join(timeout=timeout) 

287 

288 if reader_thread.is_alive(): 

289 # Timeout occurred during reading - kill the process 

290 logger.warning( 

291 f"Subprocess {cmd[0]} timed out after {timeout}s (reading output)", 

292 ) 

293 process.kill() 

294 # Brief timeout for cleanup; ignore if process doesn't die cleanly 

295 with contextlib.suppress(subprocess.TimeoutExpired): 

296 process.wait(timeout=1.0) 

297 raise subprocess.TimeoutExpired( 

298 cmd=cmd, 

299 timeout=timeout, 

300 output="\n".join(output_lines), 

301 ) 

302 

303 # Reading completed, now wait for process to finish 

304 try: 

305 returncode = process.wait(timeout=timeout) 

306 except subprocess.TimeoutExpired as e: 

307 logger.warning( 

308 f"Subprocess {cmd[0]} timed out after {timeout}s (during wait)", 

309 ) 

310 process.kill() 

311 process.wait(timeout=1.0) 

312 raise subprocess.TimeoutExpired( 

313 cmd=cmd, 

314 timeout=timeout, 

315 output="\n".join(output_lines), 

316 ) from e 

317 

318 if returncode != 0: 

319 output_preview = "\n".join(output_lines)[:500] 

320 if output_preview: 

321 logger.debug( 

322 f"Subprocess {cmd[0]} exited with code {returncode}, " 

323 f"output: {output_preview}", 

324 ) 

325 

326 return returncode == 0, "\n".join(output_lines) 

327 

328 except FileNotFoundError as e: 

329 logger.warning( 

330 f"Command not found: {cmd[0]}. Ensure it is installed and in PATH.", 

331 ) 

332 raise FileNotFoundError( 

333 f"Command not found: {cmd[0]}. " 

334 f"Please ensure it is installed and in your PATH.", 

335 ) from e