Coverage for lintro / tools / core / tool_installer.py: 68%
177 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Tool installation planning and execution.
3Handles installing, upgrading, and managing external tools used by lintro.
4Delegates to the appropriate package manager (pip, npm, cargo, rustup, or
5install-tools.sh for binary downloads) based on the tool's install type.
7Usage:
8 from lintro.tools.core.tool_installer import ToolInstaller
9 from lintro.tools.core.tool_registry import ToolRegistry
10 from lintro.tools.core.install_context import RuntimeContext
12 registry = ToolRegistry.load()
13 context = RuntimeContext.detect()
14 installer = ToolInstaller(registry, context)
16 plan = installer.plan(tools=["hadolint", "gitleaks"])
17 results = installer.execute(plan)
18"""
20from __future__ import annotations
22import shlex
23import shutil
24import subprocess
25import time
26from pathlib import Path
28from loguru import logger
30from lintro.tools.core.install_context import RuntimeContext
31from lintro.tools.core.install_plan import InstallPlan, InstallResult
32from lintro.tools.core.install_strategies import get_strategy
33from lintro.tools.core.tool_registry import ManifestTool, ToolRegistry
34from lintro.tools.core.version_parsing import (
35 compare_versions,
36 extract_version_from_output,
37)
# Public API of this module. ``InstallPlan`` and ``InstallResult`` are imported
# from ``lintro.tools.core.install_plan`` and re-exported here so that existing
# ``from lintro.tools.core.tool_installer import InstallPlan`` statements
# continue to work.
__all__ = [
    "InstallPlan",
    "InstallResult",
    "ToolInstaller",
]
class ToolInstaller:
    """Plans and executes tool installations.

    Uses the RuntimeContext to generate appropriate install commands for the
    current platform and installation method.
    """

    # Prefix the planner puts on a command to mark a script-backed install;
    # _run_install keys off the same prefix to route to install-tools.sh.
    _SCRIPT_COMMAND_PREFIX = "via install-tools.sh"

    def __init__(
        self,
        registry: ToolRegistry,
        context: RuntimeContext,
    ) -> None:
        """Initialize the installer with registry and context.

        Args:
            registry: Registry used to look up tools by name or profile.
            context: Runtime context whose ``environment`` is passed to the
                install strategies.
        """
        self._registry = registry
        self._context = context

    def plan(
        self,
        tools: list[str] | None = None,
        *,
        profile: str | None = None,
        upgrade: bool = False,
        detected_langs: list[str] | None = None,
    ) -> InstallPlan:
        """Create an installation plan.

        Args:
            tools: Specific tool names to install. If None, uses profile.
            profile: Profile name to resolve tools from.
            upgrade: If True, upgrade already-installed tools.
            detected_langs: Detected languages for profile resolution.

        Returns:
            InstallPlan describing what will happen.
        """
        plan = InstallPlan()

        # Determine which tools to consider
        if tools is not None:
            known: list[str] = []
            unknown: list[str] = []
            # dict.fromkeys deduplicates while preserving request order
            for name in dict.fromkeys(tools):
                (known if name in self._registry else unknown).append(name)
            if unknown:
                logger.warning(
                    "Unknown tools (not in registry): {}",
                    ", ".join(unknown),
                )
            tool_list = [self._registry.get(name) for name in known]
        elif profile:
            tool_list = self._registry.tools_for_profile(
                profile,
                detected_langs,
            )
        else:
            tool_list = self._registry.all_tools()

        for tool in tool_list:
            self._plan_tool(plan, tool, upgrade=upgrade)

        return plan

    @staticmethod
    def _is_manual_hint(hint: str) -> bool:
        """Check if an install hint is a human-only message, not an executable command.

        Args:
            hint: Install/upgrade command string.

        Returns:
            True if the hint requires manual action.
        """
        return (
            hint.startswith(("See ", "Install ", "Upgrade "))
            or "https://" in hint
            or "http://" in hint
        )

    def _plan_tool(
        self,
        plan: InstallPlan,
        tool: ManifestTool,
        *,
        upgrade: bool,
    ) -> None:
        """Plan installation for a single tool.

        Args:
            plan: Plan to add to.
            tool: Tool to plan for.
            upgrade: Whether to upgrade if already installed.
        """
        # Check current installation status first — tool may already be on PATH
        # even if its package manager isn't available
        installed_version = self._get_installed_version(tool)

        if installed_version:
            if self._version_meets_minimum(installed_version, tool.version):
                plan.already_ok.append(tool)
            elif upgrade:
                command = self._plan_command(plan, tool, upgrade=True)
                if command is not None:
                    plan.to_upgrade.append((tool, installed_version, command))
            else:
                plan.outdated.append((tool, installed_version))
            return

        command = self._plan_command(plan, tool, upgrade=False)
        if command is not None:
            plan.to_install.append((tool, command))

    def _plan_command(
        self,
        plan: InstallPlan,
        tool: ManifestTool,
        *,
        upgrade: bool,
    ) -> str | None:
        """Resolve the command to run for a tool, recording a skip if none.

        Prerequisites are only checked here, i.e. once it is known that the
        tool actually needs to be installed or upgraded.

        Args:
            plan: Plan whose ``skipped`` list receives the skip reason.
            tool: Tool to resolve a command for.
            upgrade: If True, resolve an upgrade command instead of an install.

        Returns:
            Command string to execute, or None if the tool was added to
            ``plan.skipped``.
        """
        skip_reason = self._check_prerequisites(tool)
        if skip_reason:
            plan.skipped.append((tool, skip_reason))
            return None

        hint = self._get_install_command(tool, upgrade=upgrade)
        if not self._is_manual_hint(hint):
            return hint

        # A manual hint can still be automated when the helper script exists
        if self._has_install_script(tool):
            return f"{self._SCRIPT_COMMAND_PREFIX} ({tool.name})"

        action = "upgrade" if upgrade else "install"
        plan.skipped.append((tool, f"manual {action} required: {hint}"))
        return None

    def _check_prerequisites(self, tool: ManifestTool) -> str | None:
        """Check if prerequisites for installing a tool are met.

        Delegates to the install strategy for the tool's install_type.

        Args:
            tool: Tool to check.

        Returns:
            Skip reason string, or None if prerequisites are met (also None
            when no strategy is registered for the install type).
        """
        strategy = get_strategy(tool.install_type)
        if strategy is None:
            return None
        return strategy.check_prerequisites(self._context.environment, tool.name)

    def _get_installed_version(self, tool: ManifestTool) -> str | None:
        """Get the currently installed version of a tool.

        Args:
            tool: Tool to check.

        Returns:
            Version string or None if not installed.
        """
        if not tool.version_command:
            return None

        # sh/bash/cargo are run without a PATH lookup; any other executable
        # must be resolvable or the tool is treated as not installed.
        main_cmd = tool.version_command[0]
        if main_cmd not in ("sh", "bash", "cargo") and not shutil.which(main_cmd):
            return None

        try:
            result = subprocess.run(
                tool.version_command,
                capture_output=True,
                text=True,
                timeout=10,
                check=False,
            )
        except (subprocess.TimeoutExpired, OSError):
            return None

        if result.returncode != 0:
            return None
        # Search both streams for the version string
        return extract_version_from_output(result.stdout + result.stderr, tool.name)

    @staticmethod
    def _version_meets_minimum(installed: str, minimum: str) -> bool:
        """Check if installed version meets the minimum requirement.

        Delegates to version_parsing.compare_versions.

        Args:
            installed: Installed version string.
            minimum: Minimum required version string.

        Returns:
            True if installed >= minimum; False if it is lower or the
            comparison raised ValueError.
        """
        try:
            return compare_versions(installed, minimum) >= 0
        except ValueError as exc:
            logger.debug(
                "Version comparison failed for {!r} vs {!r}: {}",
                installed,
                minimum,
                exc,
            )
            return False

    def _get_install_command(
        self,
        tool: ManifestTool,
        *,
        upgrade: bool = False,
    ) -> str:
        """Get the install command string for a tool.

        Delegates to the install strategy for the tool's install_type.

        Args:
            tool: Tool to generate command for.
            upgrade: If True, generate an upgrade command.

        Returns:
            Shell command string, or a human-readable manual hint when no
            automated command is available.
        """
        strategy = get_strategy(tool.install_type)
        if strategy is None:
            verb = "Upgrade" if upgrade else "Install"
            return f"{verb} {tool.name} manually"

        strategy_args = (
            self._context.environment,
            tool.name,
            tool.version,
            tool.install_package,
            tool.install_component,
        )
        if not upgrade:
            return strategy.install_hint(*strategy_args)

        hint = strategy.upgrade_hint(*strategy_args)
        # For brew upgrades, validate that brew actually manages this
        # package — if not, fall back to a manual-upgrade hint (strategies
        # may prefer brew when available, so install_hint might also
        # suggest brew and cannot be used as the fallback).
        if hint.startswith("brew upgrade"):
            brew_pkg = hint.split()[-1]
            if not self._is_brew_managed(brew_pkg):
                pkg = tool.install_package or tool.name
                hint = f"Upgrade {pkg} manually (not managed by Homebrew)"
        return hint

    @staticmethod
    def _is_brew_managed(package: str) -> bool:
        """Check if a package is installed via Homebrew.

        Args:
            package: Homebrew formula name.

        Returns:
            True if ``brew list --formula <package>`` exits with status 0;
            False if brew is missing, the call fails, or it times out.
        """
        if not shutil.which("brew"):
            return False
        try:
            result = subprocess.run(
                ["brew", "list", "--formula", package],
                capture_output=True,
                timeout=10,
                check=False,
            )
        except (subprocess.TimeoutExpired, OSError):
            return False
        return result.returncode == 0

    def execute(self, plan: InstallPlan) -> list[InstallResult]:
        """Execute an installation plan.

        Installs are run first, then upgrades, each in plan order.

        Args:
            plan: The plan to execute.

        Returns:
            List of results for each install/upgrade action.
        """
        results: list[InstallResult] = [
            self._run_install(tool, command) for tool, command in plan.to_install
        ]
        results.extend(
            self._run_install(tool, command)
            for tool, _current_ver, command in plan.to_upgrade
        )
        return results

    @staticmethod
    def _split_command(command: str) -> tuple[list[str], str | None]:
        """Tokenize a command string into an argv list.

        Args:
            command: Shell-style command string.

        Returns:
            ``(argv, None)`` on success, or ``([], message)`` when the string
            cannot be tokenized (e.g. unbalanced quotes) or contains no tokens.
        """
        try:
            argv = shlex.split(command)
        except ValueError as exc:
            return [], f"Invalid install command {command!r}: {exc}"
        if not argv:
            return [], "Empty install command"
        return argv, None

    def _run_install(
        self,
        tool: ManifestTool,
        command: str,
    ) -> InstallResult:
        """Run an install command for a tool.

        Args:
            tool: Tool being installed.
            command: Shell command string.

        Returns:
            InstallResult. Failures (non-zero exit, timeout, OS error,
            unparseable or empty command) are reported in the result rather
            than raised, so one bad tool does not abort the whole plan.
        """
        logger.info("Installing {}: {}", tool.name, command)
        start = time.monotonic()

        try:
            # Script-backed installs: the planner sets "via install-tools.sh"
            # when a helper script is available for binary tools
            if command.startswith(self._SCRIPT_COMMAND_PREFIX):
                result = self._install_via_script(tool)
                if result:
                    return result
                return InstallResult(
                    tool=tool,
                    success=False,
                    message="install-tools.sh not found",
                    duration_seconds=time.monotonic() - start,
                )

            # Non-executable hints: try install script for binary tools,
            # otherwise report as manual
            if self._is_manual_hint(command):
                if tool.install_type == "binary":
                    result = self._install_via_script(tool)
                    if result:
                        return result
                return InstallResult(
                    tool=tool,
                    success=False,
                    message=f"Manual install required: {command}",
                    duration_seconds=0.0,
                )

            # Otherwise run the command directly. Tokenization errors and
            # empty commands are reported instead of escaping as
            # ValueError/IndexError.
            argv, parse_error = self._split_command(command)
            if parse_error is not None:
                return InstallResult(
                    tool=tool,
                    success=False,
                    message=parse_error,
                    duration_seconds=time.monotonic() - start,
                )

            proc = subprocess.run(
                argv,
                capture_output=True,
                text=True,
                timeout=300,
                check=False,
            )
            duration = time.monotonic() - start

            if proc.returncode == 0:
                return InstallResult(
                    tool=tool,
                    success=True,
                    message="Installed successfully",
                    duration_seconds=duration,
                )
            return InstallResult(
                tool=tool,
                success=False,
                # Only the first 200 characters of stderr are kept
                message=f"Command failed (exit {proc.returncode}): {proc.stderr[:200]}",
                duration_seconds=duration,
            )
        except subprocess.TimeoutExpired:
            return InstallResult(
                tool=tool,
                success=False,
                message="Installation timed out (5 min)",
                duration_seconds=time.monotonic() - start,
            )
        except OSError as e:
            return InstallResult(
                tool=tool,
                success=False,
                message=f"OS error: {e}",
                duration_seconds=time.monotonic() - start,
            )

    @staticmethod
    def _find_install_script() -> Path | None:
        """Locate install-tools.sh relative to this module.

        Returns:
            Path to the script if it exists, None otherwise.
        """
        candidates = (
            Path(__file__).parent.parent.parent.parent
            / "scripts"
            / "utils"
            / "install-tools.sh",
        )
        return next((path for path in candidates if path.exists()), None)

    @staticmethod
    def _has_install_script(tool: ManifestTool) -> bool:
        """Check if an install script exists for a binary tool.

        Uses the same script lookup as _install_via_script.

        Args:
            tool: Tool to check.

        Returns:
            True if the tool is a binary install, bash is on PATH, and the
            script file exists.
        """
        if tool.install_type != "binary":
            return False
        if not shutil.which("bash"):
            return False
        return ToolInstaller._find_install_script() is not None

    def _install_via_script(self, tool: ManifestTool) -> InstallResult | None:
        """Try to install a binary tool via install-tools.sh.

        Args:
            tool: Binary tool to install.

        Returns:
            InstallResult if script was found and executed, None otherwise.
        """
        script = self._find_install_script()
        if script is None:
            logger.debug(
                "install-tools.sh not found for binary install "
                "(only available in dev/Homebrew installs, not pip)",
            )
            return None

        # The script is invoked with the hyphenated form of the tool name
        tool_arg = tool.name.replace("_", "-")
        cmd = ["bash", str(script), "--tools", tool_arg]

        start = time.monotonic()
        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=False,
                timeout=300,
            )
        except (subprocess.TimeoutExpired, OSError) as exc:
            return InstallResult(
                tool=tool,
                success=False,
                message=f"install-tools.sh execution failed: {exc}",
                duration_seconds=time.monotonic() - start,
            )

        duration = time.monotonic() - start
        if proc.returncode == 0:
            return InstallResult(
                tool=tool,
                success=True,
                message="Installed via install-tools.sh",
                duration_seconds=duration,
            )
        return InstallResult(
            tool=tool,
            success=False,
            message=f"install-tools.sh failed: {proc.stderr[:200]}",
            duration_seconds=duration,
        )