Coverage for lintro / tools / core / tool_installer.py: 68%

177 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Tool installation planning and execution. 

2 

3Handles installing, upgrading, and managing external tools used by lintro. 

4Delegates to the appropriate package manager (pip, npm, cargo, rustup, or 

5install-tools.sh for binary downloads) based on the tool's install type. 

6 

7Usage: 

8 from lintro.tools.core.tool_installer import ToolInstaller 

9 from lintro.tools.core.tool_registry import ToolRegistry 

10 from lintro.tools.core.install_context import RuntimeContext 

11 

12 registry = ToolRegistry.load() 

13 context = RuntimeContext.detect() 

14 installer = ToolInstaller(registry, context) 

15 

16 plan = installer.plan(tools=["hadolint", "gitleaks"]) 

17 results = installer.execute(plan) 

18""" 

19 

20from __future__ import annotations 

21 

22import shlex 

23import shutil 

24import subprocess 

25import time 

26from pathlib import Path 

27 

28from loguru import logger 

29 

30from lintro.tools.core.install_context import RuntimeContext 

31from lintro.tools.core.install_plan import InstallPlan, InstallResult 

32from lintro.tools.core.install_strategies import get_strategy 

33from lintro.tools.core.tool_registry import ManifestTool, ToolRegistry 

34from lintro.tools.core.version_parsing import ( 

35 compare_versions, 

36 extract_version_from_output, 

37) 

38 

# Public API of this module.
#
# ``InstallPlan`` and ``InstallResult`` are imported above from
# ``lintro.tools.core.install_plan``; they are re-exported here so existing
# ``from lintro.tools.core.tool_installer import InstallPlan`` continues to work.
__all__ = [
    "InstallPlan",
    "InstallResult",
    "ToolInstaller",
]

46 

47 

class ToolInstaller:
    """Plans and executes tool installations.

    Uses the RuntimeContext to generate appropriate install commands for the
    current platform and installation method.

    Typical flow: call :meth:`plan` to classify tools into "already ok",
    "to install", "to upgrade", "outdated" and "skipped", then pass the
    resulting plan to :meth:`execute` to run the install/upgrade commands.
    """

    def __init__(
        self,
        registry: ToolRegistry,
        context: RuntimeContext,
    ) -> None:
        """Initialize the installer with registry and context.

        Args:
            registry: Registry used to look up tools by name or profile.
            context: Runtime context whose ``environment`` is handed to the
                install strategies.
        """
        self._registry = registry
        self._context = context

    def plan(
        self,
        tools: list[str] | None = None,
        *,
        profile: str | None = None,
        upgrade: bool = False,
        detected_langs: list[str] | None = None,
    ) -> InstallPlan:
        """Create an installation plan.

        Tool selection precedence: explicit ``tools`` (even an empty list),
        then ``profile``, then every tool in the registry.

        Args:
            tools: Specific tool names to install. If None, uses profile.
            profile: Profile name to resolve tools from.
            upgrade: If True, upgrade already-installed tools.
            detected_langs: Detected languages for profile resolution.

        Returns:
            InstallPlan describing what will happen.
        """
        plan = InstallPlan()

        # Determine which tools to consider
        if tools is not None:
            requested = list(dict.fromkeys(tools))  # deduplicate, preserve order
            unknown = [name for name in requested if name not in self._registry]
            if unknown:
                # Unknown names are reported but do not abort planning.
                logger.warning(
                    "Unknown tools (not in registry): {}",
                    ", ".join(unknown),
                )
            tool_list = [
                self._registry.get(name)
                for name in requested
                if name in self._registry
            ]
        elif profile:
            tool_list = self._registry.tools_for_profile(
                profile,
                detected_langs,
            )
        else:
            tool_list = self._registry.all_tools()

        for tool in tool_list:
            self._plan_tool(plan, tool, upgrade=upgrade)

        return plan

    @staticmethod
    def _is_manual_hint(hint: str) -> bool:
        """Check if an install hint is a human-only message, not an executable command.

        A hint is considered manual when it starts with ``"See "``,
        ``"Install "`` or ``"Upgrade "``, or when it contains a URL.

        Args:
            hint: Install/upgrade command string.

        Returns:
            True if the hint requires manual action.
        """
        return (
            hint.startswith(("See ", "Install ", "Upgrade "))
            or "https://" in hint
            or "http://" in hint
        )

    def _resolve_hint(self, tool: ManifestTool, hint: str) -> str | None:
        """Turn a strategy hint into something ``_run_install`` can execute.

        Args:
            tool: Tool the hint belongs to.
            hint: Install/upgrade hint produced by the install strategy.

        Returns:
            The hint itself when it is an executable command, the
            ``via install-tools.sh`` marker when the hint is manual but the
            helper script can handle the tool, or None when only manual
            action is possible.
        """
        if not self._is_manual_hint(hint):
            return hint
        if self._has_install_script(tool):
            return f"via install-tools.sh ({tool.name})"
        return None

    def _plan_tool(
        self,
        plan: InstallPlan,
        tool: ManifestTool,
        *,
        upgrade: bool,
    ) -> None:
        """Plan installation for a single tool.

        Appends the tool to exactly one of the plan's lists
        (``already_ok``, ``to_upgrade``, ``outdated``, ``to_install`` or
        ``skipped``).

        Args:
            plan: Plan to add to.
            tool: Tool to plan for.
            upgrade: Whether to upgrade if already installed.
        """
        # Check current installation status first — tool may already be on PATH
        # even if its package manager isn't available
        installed_version = self._get_installed_version(tool)

        if installed_version:
            if self._version_meets_minimum(installed_version, tool.version):
                plan.already_ok.append(tool)
                return
            if not upgrade:
                # Too old, but the caller did not ask for upgrades: report only.
                plan.outdated.append((tool, installed_version))
                return

            skip_reason = self._check_prerequisites(tool)
            if skip_reason:
                plan.skipped.append((tool, skip_reason))
                return
            hint = self._get_install_command(tool, upgrade=True)
            command = self._resolve_hint(tool, hint)
            if command is None:
                plan.skipped.append((tool, f"manual upgrade required: {hint}"))
                return
            plan.to_upgrade.append((tool, installed_version, command))
            return

        # Only check prerequisites when we need to install
        skip_reason = self._check_prerequisites(tool)
        if skip_reason:
            plan.skipped.append((tool, skip_reason))
            return

        hint = self._get_install_command(tool)
        command = self._resolve_hint(tool, hint)
        if command is None:
            plan.skipped.append((tool, f"manual install required: {hint}"))
            return
        plan.to_install.append((tool, command))

    def _check_prerequisites(self, tool: ManifestTool) -> str | None:
        """Check if prerequisites for installing a tool are met.

        Delegates to the install strategy for the tool's install_type. When
        no strategy is registered for that type, no prerequisites are assumed.

        Args:
            tool: Tool to check.

        Returns:
            Skip reason string, or None if prerequisites are met.
        """
        strategy = get_strategy(tool.install_type)
        if strategy is None:
            return None
        return strategy.check_prerequisites(self._context.environment, tool.name)

    def _get_installed_version(self, tool: ManifestTool) -> str | None:
        """Get the currently installed version of a tool.

        Runs the tool's ``version_command`` (10 second timeout) and parses
        the combined stdout/stderr.

        Args:
            tool: Tool to check.

        Returns:
            Version string or None if not installed (no version command,
            executable not on PATH, non-zero exit, timeout or OS error).
        """
        if not tool.version_command:
            return None

        # Wrapper commands (sh/bash/cargo) are not looked up on PATH here;
        # if they are missing, subprocess raises OSError, handled below.
        main_cmd = tool.version_command[0]
        if main_cmd not in ("sh", "bash", "cargo") and not shutil.which(main_cmd):
            return None

        try:
            result = subprocess.run(
                tool.version_command,
                capture_output=True,
                text=True,
                timeout=10,
                check=False,
            )
        except (subprocess.TimeoutExpired, OSError):
            return None

        if result.returncode != 0:
            return None
        # Some tools print their version to stderr, so scan both streams.
        output = result.stdout + result.stderr
        return extract_version_from_output(output, tool.name)

    @staticmethod
    def _version_meets_minimum(installed: str, minimum: str) -> bool:
        """Check if installed version meets the minimum requirement.

        Delegates to version_parsing.compare_versions which uses the
        packaging library for robust PEP 440 version comparison.

        Args:
            installed: Installed version string.
            minimum: Minimum required version string.

        Returns:
            True if installed >= minimum. False when the comparison raises
            ValueError, so an unparseable version counts as not meeting the
            minimum.
        """
        try:
            return compare_versions(installed, minimum) >= 0
        except ValueError as exc:
            logger.debug(
                f"Version comparison failed for {installed!r} vs {minimum!r}: {exc}",
            )
            return False

    def _get_install_command(
        self,
        tool: ManifestTool,
        *,
        upgrade: bool = False,
    ) -> str:
        """Get the install command string for a tool.

        Delegates to the install strategy for the tool's install_type.

        Args:
            tool: Tool to generate command for.
            upgrade: If True, generate an upgrade command.

        Returns:
            Shell command string, or a human-readable "... manually" hint
            when no strategy exists or Homebrew does not manage the package.
        """
        strategy = get_strategy(tool.install_type)
        if strategy is None:
            return (
                f"Upgrade {tool.name} manually"
                if upgrade
                else f"Install {tool.name} manually"
            )

        _args = (
            self._context.environment,
            tool.name,
            tool.version,
            tool.install_package,
            tool.install_component,
        )
        if not upgrade:
            return strategy.install_hint(*_args)

        hint = strategy.upgrade_hint(*_args)
        # For brew upgrades, validate that brew actually manages this
        # package — if not, use the non-brew install command instead
        # (strategies may prefer brew when available, so we can't just
        # call install_hint which might also suggest brew).
        if hint.startswith("brew upgrade"):
            parts = hint.split()
            # The formula is the last token; a bare "brew upgrade" carries
            # no formula, so fall back to the tool name instead of treating
            # the word "upgrade" as a package.
            brew_pkg = parts[-1] if len(parts) > 2 else tool.name
            if not self._is_brew_managed(brew_pkg):
                pkg = tool.install_package or tool.name
                hint = f"Upgrade {pkg} manually (not managed by Homebrew)"
        return hint

    @staticmethod
    def _is_brew_managed(package: str) -> bool:
        """Check if a package is installed via Homebrew.

        Args:
            package: Homebrew formula name.

        Returns:
            True if brew manages this package. False when brew is not on
            PATH, the query exits non-zero, times out or cannot be run.
        """
        if not shutil.which("brew"):
            return False
        try:
            result = subprocess.run(
                ["brew", "list", "--formula", package],
                capture_output=True,
                timeout=10,
                check=False,
            )
        except (subprocess.TimeoutExpired, OSError):
            return False
        return result.returncode == 0

    def execute(self, plan: InstallPlan) -> list[InstallResult]:
        """Execute an installation plan.

        Installs are run first, then upgrades, each in plan order.

        Args:
            plan: The plan to execute.

        Returns:
            List of results for each install/upgrade action.
        """
        results: list[InstallResult] = []

        for tool, command in plan.to_install:
            results.append(self._run_install(tool, command))

        for tool, _current_ver, command in plan.to_upgrade:
            results.append(self._run_install(tool, command))

        return results

    def _run_install(
        self,
        tool: ManifestTool,
        command: str,
    ) -> InstallResult:
        """Run an install command for a tool.

        Never raises for a bad command: malformed or empty command strings,
        timeouts and OS errors are all reported as failed results so one
        broken hint cannot abort the rest of the plan.

        Args:
            tool: Tool being installed.
            command: Shell command string.

        Returns:
            InstallResult.
        """
        logger.info(f"Installing {tool.name}: {command}")
        start = time.monotonic()

        try:
            # Script-backed installs: the planner sets "via install-tools.sh"
            # when a helper script is available for binary tools
            if command.startswith("via install-tools.sh"):
                result = self._install_via_script(tool)
                if result:
                    return result
                return InstallResult(
                    tool=tool,
                    success=False,
                    message="install-tools.sh not found",
                    duration_seconds=time.monotonic() - start,
                )

            # Non-executable hints: try install script for binary tools,
            # otherwise report as manual
            if self._is_manual_hint(command):
                if tool.install_type == "binary":
                    result = self._install_via_script(tool)
                    if result:
                        return result
                return InstallResult(
                    tool=tool,
                    success=False,
                    message=f"Manual install required: {command}",
                    duration_seconds=0.0,
                )

            # shlex.split raises ValueError on e.g. an unbalanced quote.
            try:
                argv = shlex.split(command)
            except ValueError as exc:
                return InstallResult(
                    tool=tool,
                    success=False,
                    message=f"Invalid install command: {exc}",
                    duration_seconds=time.monotonic() - start,
                )
            # A blank command splits to an empty argv, which cannot be run.
            if not argv:
                return InstallResult(
                    tool=tool,
                    success=False,
                    message="Empty install command",
                    duration_seconds=time.monotonic() - start,
                )

            # Otherwise run the command directly (no shell involved)
            proc = subprocess.run(
                argv,
                capture_output=True,
                text=True,
                timeout=300,
                check=False,
            )
            duration = time.monotonic() - start

            if proc.returncode == 0:
                return InstallResult(
                    tool=tool,
                    success=True,
                    message="Installed successfully",
                    duration_seconds=duration,
                )
            return InstallResult(
                tool=tool,
                success=False,
                # Only the first 200 characters of stderr are reported.
                message=f"Command failed (exit {proc.returncode}): {proc.stderr[:200]}",
                duration_seconds=duration,
            )
        except subprocess.TimeoutExpired:
            return InstallResult(
                tool=tool,
                success=False,
                message="Installation timed out (5 min)",
                duration_seconds=time.monotonic() - start,
            )
        except OSError as e:
            return InstallResult(
                tool=tool,
                success=False,
                message=f"OS error: {e}",
                duration_seconds=time.monotonic() - start,
            )

    @staticmethod
    def _find_install_script() -> Path | None:
        """Locate install-tools.sh relative to this module.

        Single source of truth for the script location, shared by
        ``_has_install_script`` and ``_install_via_script``.

        Returns:
            Path to ``scripts/utils/install-tools.sh`` four directory levels
            above this file, or None if that file does not exist.
        """
        script = (
            Path(__file__).parent.parent.parent.parent
            / "scripts"
            / "utils"
            / "install-tools.sh"
        )
        return script if script.exists() else None

    @staticmethod
    def _has_install_script(tool: ManifestTool) -> bool:
        """Check if an install script exists for a binary tool.

        Reuses the same script lookup as _install_via_script.

        Args:
            tool: Tool to check.

        Returns:
            True if a script can handle this tool, i.e. the tool is of
            install type ``"binary"``, ``bash`` is on PATH and the script
            file exists.
        """
        if tool.install_type != "binary":
            return False
        if not shutil.which("bash"):
            return False
        return ToolInstaller._find_install_script() is not None

    def _install_via_script(self, tool: ManifestTool) -> InstallResult | None:
        """Try to install a binary tool via install-tools.sh.

        Args:
            tool: Binary tool to install.

        Returns:
            InstallResult if script was found and executed, None otherwise.
        """
        # Look for install-tools.sh relative to the lintro package
        script = self._find_install_script()
        if not script:
            logger.debug(
                "install-tools.sh not found for binary install "
                "(only available in dev/Homebrew installs, not pip)",
            )
            return None

        # The tool name is passed to the script with underscores
        # replaced by hyphens.
        tool_arg = tool.name.replace("_", "-")
        cmd = ["bash", str(script), "--tools", tool_arg]

        start = time.monotonic()
        try:
            proc = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                check=False,
                timeout=300,
            )
        except (subprocess.TimeoutExpired, OSError) as exc:
            return InstallResult(
                tool=tool,
                success=False,
                message=f"install-tools.sh execution failed: {exc}",
                duration_seconds=time.monotonic() - start,
            )

        duration = time.monotonic() - start
        if proc.returncode == 0:
            return InstallResult(
                tool=tool,
                success=True,
                message="Installed via install-tools.sh",
                duration_seconds=duration,
            )
        return InstallResult(
            tool=tool,
            success=False,
            message=f"install-tools.sh failed: {proc.stderr[:200]}",
            duration_seconds=duration,
        )