Coverage for lintro / plugins / subprocess_executor.py: 88%
105 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Subprocess execution utilities for tool plugins.
3This module provides safe subprocess execution with validation and streaming.
4"""
6from __future__ import annotations
8import contextlib
9import os
10import subprocess # nosec B404 - subprocess used safely with shell=False
11import sys
12import threading
13from typing import TYPE_CHECKING
15from loguru import logger
17if TYPE_CHECKING:
18 from collections.abc import Callable
20# Cache for compiled binary detection
21_IS_COMPILED_BINARY: bool | None = None
24def is_compiled_binary() -> bool:
25 """Detect if lintro is running as a Nuitka-compiled binary.
27 When compiled with Nuitka, sys.executable points to the lintro binary itself,
28 not a Python interpreter. We detect this by checking if we can import Nuitka
29 runtime modules or by checking the executable name.
31 Returns:
32 True if running as a compiled binary, False otherwise.
33 """
34 global _IS_COMPILED_BINARY
36 if _IS_COMPILED_BINARY is not None:
37 return _IS_COMPILED_BINARY
39 # Method 1: Check for Nuitka's __compiled__ marker
40 try:
41 # Nuitka sets __compiled__ at module level
42 import __main__
44 if getattr(__main__, "__compiled__", False):
45 _IS_COMPILED_BINARY = True
46 return True
47 except (ImportError, AttributeError):
48 pass
50 # Method 2: Check if sys.executable looks like our binary (not python)
51 exe_name = os.path.basename(sys.executable).lower()
52 if exe_name in ("lintro", "lintro.exe", "lintro.bin"):
53 _IS_COMPILED_BINARY = True
54 return True
56 # Method 3: Check if we're running from a Nuitka dist folder
57 exe_dir = os.path.dirname(sys.executable)
58 if "nuitka" in exe_dir.lower() or "__nuitka" in exe_dir.lower():
59 _IS_COMPILED_BINARY = True
60 return True
62 _IS_COMPILED_BINARY = False
63 return False
66# Shell metacharacters that could enable command injection or unexpected behavior.
67# Using frozenset for immutability and O(1) membership testing.
68UNSAFE_SHELL_CHARS: frozenset[str] = frozenset(
69 {
70 # Command chaining and piping
71 ";", # Command separator
72 "&", # Background execution / AND operator
73 "|", # Pipe
74 # Redirection
75 ">", # Output redirection
76 "<", # Input redirection
77 # Command substitution and expansion
78 "`", # Backtick command substitution
79 "$", # Variable expansion / command substitution
80 # Escape and control characters
81 "\\", # Escape character
82 "\n", # Newline (command separator in some contexts)
83 "\r", # Carriage return
84 # Glob and pattern matching
85 "*", # Glob wildcard (match any)
86 "?", # Glob wildcard (match single char)
87 "[", # Character class start
88 "]", # Character class end
89 # Brace and subshell expansion
90 "{", # Brace expansion start
91 "}", # Brace expansion end
92 "(", # Subshell start
93 ")", # Subshell end
94 # Other shell special characters
95 "~", # Home directory expansion
96 "!", # History expansion
97 },
98)
101def validate_subprocess_command(cmd: list[str]) -> None:
102 """Validate a subprocess command for safety.
104 Since lintro uses shell=False for all subprocess calls, command arguments
105 are passed directly to the executable without shell interpretation. This
106 means characters like $, *, {, } in arguments are safe - they won't be
107 expanded by the shell.
109 We only validate the command name (first element) to ensure it doesn't
110 contain shell metacharacters that could indicate a path traversal or
111 injection attempt.
113 Args:
114 cmd: Command and arguments to validate.
116 Raises:
117 ValueError: If command is invalid or the command name contains
118 unsafe characters.
119 """
120 if not cmd or not isinstance(cmd, list):
121 raise ValueError("Command must be a non-empty list of strings")
123 for arg in cmd:
124 if not isinstance(arg, str):
125 raise ValueError("All command arguments must be strings")
127 # Only validate the command name (first element) for shell metacharacters.
128 # Arguments are safe with shell=False as they're passed literally.
129 if any(ch in cmd[0] for ch in UNSAFE_SHELL_CHARS):
130 raise ValueError("Unsafe character detected in command name")
133def run_subprocess(
134 cmd: list[str],
135 timeout: float,
136 cwd: str | None = None,
137 env: dict[str, str] | None = None,
138) -> tuple[bool, str]:
139 """Run a subprocess command safely.
141 Args:
142 cmd: Command and arguments to run.
143 timeout: Timeout in seconds.
144 cwd: Working directory for command execution.
145 env: Environment variables for the subprocess. These are merged with
146 os.environ to preserve PATH and other essential variables.
148 Returns:
149 Tuple of (success, output) where success indicates return code 0.
151 Raises:
152 subprocess.TimeoutExpired: If command times out.
153 FileNotFoundError: If command executable is not found.
154 """
155 validate_subprocess_command(cmd)
157 cmd_str = " ".join(cmd[:5]) + ("..." if len(cmd) > 5 else "")
158 logger.debug(f"Running subprocess: {cmd_str} (timeout={timeout}s, cwd={cwd})")
160 # Merge custom env with os.environ to preserve PATH, HOME, etc.
161 # Custom env values override os.environ when there are conflicts.
162 effective_env: dict[str, str] | None = None
163 if env is not None:
164 effective_env = {**os.environ, **env}
166 try:
167 result = subprocess.run( # nosec B603 - args list, shell=False
168 cmd,
169 capture_output=True,
170 text=True,
171 timeout=timeout,
172 cwd=cwd,
173 env=effective_env,
174 )
176 if result.returncode != 0:
177 stderr_preview = (result.stderr or "")[:500]
178 if stderr_preview:
179 logger.debug(
180 f"Subprocess {cmd[0]} exited with code {result.returncode}, "
181 f"stderr: {stderr_preview}",
182 )
184 return result.returncode == 0, result.stdout + result.stderr
185 except subprocess.TimeoutExpired as e:
186 logger.warning(f"Subprocess {cmd[0]} timed out after {timeout}s")
187 # Preserve partial output from the original exception
188 partial_output = ""
189 if e.output:
190 partial_output = (
191 e.output
192 if isinstance(e.output, str)
193 else e.output.decode(errors="replace")
194 )
195 if e.stderr:
196 stderr = (
197 e.stderr
198 if isinstance(e.stderr, str)
199 else e.stderr.decode(errors="replace")
200 )
201 partial_output = partial_output + stderr if partial_output else stderr
202 raise subprocess.TimeoutExpired(
203 cmd=cmd,
204 timeout=timeout,
205 output=partial_output,
206 ) from e
207 except FileNotFoundError as e:
208 logger.warning(
209 f"Command not found: {cmd[0]}. Ensure it is installed and in PATH.",
210 )
211 raise FileNotFoundError(
212 f"Command not found: {cmd[0]}. "
213 f"Please ensure it is installed and in your PATH.",
214 ) from e
217def run_subprocess_streaming(
218 cmd: list[str],
219 timeout: float,
220 cwd: str | None = None,
221 env: dict[str, str] | None = None,
222 line_handler: Callable[[str], None] | None = None,
223) -> tuple[bool, str]:
224 """Run a subprocess command with optional line-by-line streaming.
226 This function allows real-time output processing by calling the line_handler
227 callback for each line of output as it is produced by the subprocess.
229 The timeout is enforced during both output reading and process completion,
230 preventing indefinite blocking on slow or hanging processes.
232 Args:
233 cmd: Command and arguments to run.
234 timeout: Timeout in seconds.
235 cwd: Working directory for command execution.
236 env: Environment variables for the subprocess. These are merged with
237 os.environ to preserve PATH and other essential variables.
238 line_handler: Optional callback called for each line of output.
240 Returns:
241 Tuple of (success, output) where success indicates return code 0.
243 Raises:
244 subprocess.TimeoutExpired: If command times out.
245 FileNotFoundError: If command executable is not found.
246 """
247 validate_subprocess_command(cmd)
249 cmd_str = " ".join(cmd[:5]) + ("..." if len(cmd) > 5 else "")
250 logger.debug(
251 f"Running subprocess (streaming): {cmd_str} (timeout={timeout}s, cwd={cwd})",
252 )
254 # Merge custom env with os.environ to preserve PATH, HOME, etc.
255 # Custom env values override os.environ when there are conflicts.
256 effective_env: dict[str, str] | None = None
257 if env is not None:
258 effective_env = {**os.environ, **env}
260 try:
261 # Use Popen for streaming output # nosec B603
262 process = subprocess.Popen(
263 cmd,
264 stdout=subprocess.PIPE,
265 stderr=subprocess.STDOUT,
266 text=True,
267 cwd=cwd,
268 env=effective_env,
269 bufsize=1, # Line buffering
270 )
272 output_lines: list[str] = []
274 def read_output() -> None:
275 """Read output lines in a separate thread."""
276 if process.stdout:
277 for line in process.stdout:
278 stripped = line.rstrip("\n")
279 output_lines.append(stripped)
280 if line_handler:
281 line_handler(stripped)
283 # Use a thread to read output so we can enforce timeout
284 reader_thread = threading.Thread(target=read_output, daemon=True)
285 reader_thread.start()
286 reader_thread.join(timeout=timeout)
288 if reader_thread.is_alive():
289 # Timeout occurred during reading - kill the process
290 logger.warning(
291 f"Subprocess {cmd[0]} timed out after {timeout}s (reading output)",
292 )
293 process.kill()
294 # Brief timeout for cleanup; ignore if process doesn't die cleanly
295 with contextlib.suppress(subprocess.TimeoutExpired):
296 process.wait(timeout=1.0)
297 raise subprocess.TimeoutExpired(
298 cmd=cmd,
299 timeout=timeout,
300 output="\n".join(output_lines),
301 )
303 # Reading completed, now wait for process to finish
304 try:
305 returncode = process.wait(timeout=timeout)
306 except subprocess.TimeoutExpired as e:
307 logger.warning(
308 f"Subprocess {cmd[0]} timed out after {timeout}s (during wait)",
309 )
310 process.kill()
311 process.wait(timeout=1.0)
312 raise subprocess.TimeoutExpired(
313 cmd=cmd,
314 timeout=timeout,
315 output="\n".join(output_lines),
316 ) from e
318 if returncode != 0:
319 output_preview = "\n".join(output_lines)[:500]
320 if output_preview:
321 logger.debug(
322 f"Subprocess {cmd[0]} exited with code {returncode}, "
323 f"output: {output_preview}",
324 )
326 return returncode == 0, "\n".join(output_lines)
328 except FileNotFoundError as e:
329 logger.warning(
330 f"Command not found: {cmd[0]}. Ensure it is installed and in PATH.",
331 )
332 raise FileNotFoundError(
333 f"Command not found: {cmd[0]}. "
334 f"Please ensure it is installed and in your PATH.",
335 ) from e