Coverage for lintro / tools / core / runtime_discovery.py: 88%

129 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Runtime tool discovery for compiled binary mode. 

2 

3This module provides functions to discover external tools (ruff, black, mypy, etc.) 

4at runtime by searching the system PATH. This is essential for the compiled binary 

5distribution where lintro itself is standalone but depends on external tools. 

6 

7When lintro runs as a compiled binary: 

81. It cannot bundle tools like ruff, mypy, black (they are separate executables) 

92. Users must install these tools separately (via pip, brew, etc.) 

103. This module discovers which tools are available and their locations 

11 

12Usage: 

13 from lintro.tools.core.runtime_discovery import discover_tool, discover_all_tools 

14 

15 # Discover a single tool 

16 tool = discover_tool("ruff") 

17 if tool.available: 

18 print(f"Found ruff at {tool.path} (version {tool.version})") 

19 

20 # Discover all configured tools 

21 tools = discover_all_tools() 

22 for name, info in tools.items(): 

23 print(f"{name}: {'available' if info.available else 'missing'}") 

24""" 

25 

26from __future__ import annotations 

27 

28import json 

29import re 

30import shutil 

31import subprocess 

32import threading 

33from dataclasses import dataclass, field 

34 

35from loguru import logger 

36 

# Default timeout for version checks (seconds).
# Passed as ``timeout=`` to the subprocess call that runs a tool's version
# command, so a tool that hangs is reported as a timed-out probe.
VERSION_CHECK_TIMEOUT: int = 5

39 

40 

41@dataclass 

42class _ToolProbeInfo: 

43 """Internal probe metadata for a single tool.""" 

44 

45 version_command: tuple[str, ...] 

46 executable: str | None = None 

47 

48 

def _get_tool_probe_info() -> dict[str, _ToolProbeInfo] | None:
    """Build the per-tool probe table from the tool registry.

    Only registry tools that declare a version command are included. Each
    entry pairs that command with the registry's ``install_bin`` value as the
    preferred executable.

    Returns:
        Mapping of tool name to probe info, or None when the registry cannot
        be imported or loaded (for example during early startup).
    """
    try:
        # Imported lazily so a missing or broken registry only disables
        # discovery instead of failing at module import time.
        from lintro.tools.core.tool_registry import ToolRegistry

        probes: dict[str, _ToolProbeInfo] = {}
        for entry in ToolRegistry.load().all_tools(include_dev=True):
            if not entry.version_command:
                continue
            entry_name = entry.name
            probes[entry_name] = _ToolProbeInfo(
                version_command=entry.version_command,
                executable=entry.install_bin,
            )
        return probes
    except (
        ImportError,
        FileNotFoundError,
        KeyError,
        ValueError,
        json.JSONDecodeError,
    ) as exc:
        logger.debug("Registry unavailable, tool discovery limited: {}", exc)
        return None

77 

78 

@dataclass
class DiscoveredTool:
    """Outcome of looking up one external tool.

    Attributes:
        name: Canonical tool name (e.g., "ruff", "black").
        path: Full path of the executable; empty string when it was not found.
        version: Parsed version string, or None when none was obtained.
        available: Whether the tool was found and is usable; False by default.
        error_message: Reason discovery failed, or None when it did not fail.
    """

    name: str
    path: str = field(default="")
    version: str | None = field(default=None)
    available: bool = field(default=False)
    error_message: str | None = field(default=None)

96 

97 

@dataclass
class ToolDiscoveryCache:
    """Holds discovery results so PATH lookups are not repeated.

    Attributes:
        tools: Discovery info keyed by tool name.
        is_populated: Whether a full discovery pass has filled the cache.
    """

    tools: dict[str, DiscoveredTool] = field(default_factory=dict)
    is_populated: bool = field(default=False)

109 

110 

# Global cache instance and lock for thread safety.
# The cache is module-wide state shared by the discovery functions in this
# module; the lock is meant to guard reads and writes of that cache.
_discovery_cache = ToolDiscoveryCache()
_discovery_cache_lock = threading.Lock()

114 

115 

116def _extract_version(output: str) -> str | None: 

117 """Extract version number from tool output. 

118 

119 Args: 

120 output: Raw output from tool's version command. 

121 

122 Returns: 

123 Extracted version string or None if not found. 

124 """ 

125 # Common version patterns: 

126 # - "ruff 0.1.0" 

127 # - "black, version 23.0.0" 

128 # - "mypy 1.0.0" 

129 # - "v1.2.3" 

130 patterns = [ 

131 r"(\d+\.\d+\.\d+)", # Semantic version (1.2.3) 

132 r"v(\d+\.\d+\.\d+)", # Prefixed version (v1.2.3) 

133 r"version\s+(\d+\.\d+\.\d+)", # "version 1.2.3" 

134 ] 

135 

136 for pattern in patterns: 

137 match = re.search(pattern, output, re.IGNORECASE) 

138 if match: 

139 return match.group(1) 

140 

141 return None 

142 

143 

# Shells/interpreters whose "-c"/"-e" argument carries the real probe command.
_WRAPPER_EXECUTABLES = (
    "sh",
    "bash",
    "zsh",
    "node",
    "python",
    "python3",
    "ruby",
    "perl",
)


def _resolve_probe_executable(
    tool_name: str,
    probe: _ToolProbeInfo | None,
    version_cmd: tuple[str, ...],
) -> str:
    """Pick the executable name whose PATH location represents the tool."""
    # Prefer the registry-provided executable over deriving it from
    # version_cmd[0], which misreports tools invoked via wrappers
    # (e.g., cargo subcommands, node -e probes).
    if probe and probe.executable:
        return probe.executable

    executable = version_cmd[0] if version_cmd else tool_name
    # For shell/interpreter-wrapped probes (e.g., ["sh", "-c", "..."]),
    # resolve the inner command's executable instead of the wrapper itself.
    if (
        executable in _WRAPPER_EXECUTABLES
        and len(version_cmd) >= 3
        and version_cmd[1] in ("-c", "-e")
    ):
        inner_tokens = version_cmd[2].split()
        if inner_tokens:
            executable = inner_tokens[0]
    return executable


def _run_version_probe(
    tool_name: str,
    version_cmd: tuple[str, ...],
) -> tuple[bool, str | None]:
    """Run the version command and return ``(succeeded, parsed_version)``."""
    try:
        proc_result = subprocess.run(
            version_cmd,
            capture_output=True,
            text=True,
            timeout=VERSION_CHECK_TIMEOUT,
        )
    except subprocess.TimeoutExpired:
        logger.debug(f"Version check for {tool_name} timed out")
        return False, None
    except (OSError, subprocess.SubprocessError) as e:
        logger.debug(f"Failed to get version for {tool_name}: {e}")
        return False, None

    if proc_result.returncode != 0:
        logger.debug(
            f"Version check for {tool_name} exited with code "
            f"{proc_result.returncode}",
        )
        return False, None

    # Parse stdout first, then stderr: some tools print their version on
    # stderr, and unrelated text on stdout must not hide it. Empty or
    # non-text streams are skipped.
    for stream in (proc_result.stdout, proc_result.stderr):
        if isinstance(stream, str) and stream:
            version = _extract_version(stream)
            if version:
                return True, version
    return True, None


def discover_tool(
    tool_name: str,
    use_cache: bool = True,
    *,
    _probe_info: dict[str, _ToolProbeInfo] | None = None,
) -> DiscoveredTool:
    """Discover a single tool in the system PATH.

    The tool's executable is located with ``shutil.which`` and its version
    command is then run; the tool is only reported as available when that
    command exits with status 0. Results (found or not) are stored in the
    module-level cache, except when the registry is unavailable.

    Thread-safe: uses lock to protect cache access.

    Args:
        tool_name: Name of the tool to discover (e.g., "ruff", "black").
        use_cache: Whether to use cached results if available.
        _probe_info: Pre-loaded probe info map (avoids redundant registry
            lookups when called from discover_all_tools).

    Returns:
        DiscoveredTool with information about the tool.
    """
    # Check cache first (thread-safe)
    with _discovery_cache_lock:
        if use_cache and tool_name in _discovery_cache.tools:
            return _discovery_cache.tools[tool_name]

    logger.debug(f"Discovering tool: {tool_name}")

    # Get probe metadata from the registry (version_command + preferred executable)
    probe_info_map = _probe_info if _probe_info is not None else _get_tool_probe_info()
    if probe_info_map is None:
        # Registry unavailable — don't cache guessed probes so we retry later
        logger.debug(f"Registry unavailable, skipping discovery for {tool_name}")
        return DiscoveredTool(
            name=tool_name,
            available=False,
            error_message="registry unavailable",
        )

    probe = probe_info_map.get(tool_name)
    # Fall back to "<tool> --version" for unknown tools and for probes with an
    # empty command, which subprocess could not execute.
    if probe and probe.version_command:
        version_cmd = probe.version_command
    else:
        version_cmd = (tool_name, "--version")

    executable = _resolve_probe_executable(tool_name, probe, version_cmd)

    # Find the executable in PATH (outside lock - this is IO-bound)
    path = shutil.which(executable)

    if not path:
        result = DiscoveredTool(
            name=tool_name,
            path="",
            available=False,
            error_message=f"{executable} not found in PATH",
        )
        with _discovery_cache_lock:
            _discovery_cache.tools[tool_name] = result
        logger.debug(f"Tool {tool_name} ({executable}) not found in PATH")
        return result

    # Run the full version probe — only mark available if it succeeds
    probe_ok, version = _run_version_probe(tool_name, version_cmd)

    discovered = DiscoveredTool(
        name=tool_name,
        path=path,
        version=version,
        available=probe_ok,
        error_message=None if probe_ok else f"{tool_name} version probe failed",
    )

    with _discovery_cache_lock:
        _discovery_cache.tools[tool_name] = discovered
    logger.debug(f"Discovered {tool_name} at {path} (version: {version})")

    return discovered

251 

252 

def discover_all_tools(use_cache: bool = True) -> dict[str, DiscoveredTool]:
    """Run discovery for every tool the registry knows about.

    Thread-safe: cache reads and writes happen under the module lock.

    Args:
        use_cache: When True and the cache was already populated by an
            earlier full pass, return the cached results without probing.

    Returns:
        A copy of the tool-name -> DiscoveredTool mapping; an empty dict when
        the registry is unavailable.
    """
    with _discovery_cache_lock:
        cache_is_usable = use_cache and _discovery_cache.is_populated
        if cache_is_usable:
            return dict(_discovery_cache.tools)

    registry_probes = _get_tool_probe_info()
    if registry_probes is None:
        # Without a registry, drop old results so later availability queries
        # do not report outdated entries.
        with _discovery_cache_lock:
            _discovery_cache.tools.clear()
            _discovery_cache.is_populated = False
        return {}

    # Each call re-probes and writes its result into the shared cache.
    for registered_name in registry_probes:
        discover_tool(registered_name, use_cache=False, _probe_info=registry_probes)

    with _discovery_cache_lock:
        cached = _discovery_cache.tools
        # Remove entries for tools that are no longer in the registry.
        orphaned = [name for name in cached if name not in registry_probes]
        for name in orphaned:
            del cached[name]
        _discovery_cache.is_populated = True
        return dict(cached)

287 

288 

def clear_discovery_cache() -> None:
    """Reset the module-level discovery cache to an empty, unpopulated one.

    Thread-safe: the cache is swapped while holding the module lock.
    Useful when tools may have been installed or removed since the last check.
    """
    global _discovery_cache
    empty_cache = ToolDiscoveryCache()
    with _discovery_cache_lock:
        _discovery_cache = empty_cache
    logger.debug("Tool discovery cache cleared")

299 

300 

def is_tool_available(tool_name: str) -> bool:
    """Report whether discovery marks a tool as available.

    Args:
        tool_name: Name of the tool to check.

    Returns:
        The ``available`` flag of the tool's discovery result.
    """
    discovered = discover_tool(tool_name)
    return discovered.available

311 

312 

def get_tool_path(tool_name: str) -> str | None:
    """Return the executable path of a tool when it is available.

    Args:
        tool_name: Name of the tool.

    Returns:
        Full path to the executable, or None when discovery does not mark
        the tool as available.
    """
    discovered = discover_tool(tool_name)
    if not discovered.available:
        return None
    return discovered.path

324 

325 

def get_unavailable_tools() -> list[str]:
    """Get a list of tools that are not available.

    Returns:
        Names of tools whose discovery result is not marked available, i.e.
        the executable was not found in PATH or its version probe failed.
    """
    # Use the snapshot returned by discover_all_tools() instead of iterating
    # the shared cache without its lock, which could race with a concurrent
    # discovery or cache reset.
    snapshot = discover_all_tools()
    return [name for name, tool in snapshot.items() if not tool.available]

334 

335 

def get_available_tools() -> list[str]:
    """Get a list of tools that are available.

    Returns:
        Names of tools whose discovery result is marked available, i.e. the
        executable was found in PATH and its version probe succeeded.
    """
    # Use the snapshot returned by discover_all_tools() instead of iterating
    # the shared cache without its lock, which could race with a concurrent
    # discovery or cache reset.
    snapshot = discover_all_tools()
    return [name for name, tool in snapshot.items() if tool.available]

344 

345 

def format_tool_status_table() -> str:
    """Render the discovery results as a plain-text status table.

    Returns:
        A newline-joined table with one row per tool (sorted by name) showing
        status, version and path, followed by an availability summary line.
    """
    discovered = discover_all_tools()
    thin_rule = "-" * 60

    table = [
        "Tool Discovery Status",
        "=" * 60,
        f"{'Tool':<15} {'Status':<12} {'Version':<15} {'Path'}",
        thin_rule,
    ]

    for name in sorted(discovered):
        tool = discovered[name]
        if tool.available:
            status = "Available"
        else:
            status = "Missing"
        version = tool.version if tool.version else "-"
        # Show the failure reason in the path column when no path is known.
        path = tool.path or tool.error_message or "-"
        table.append(f"{name:<15} {status:<12} {version:<15} {path}")

    table.append(thin_rule)

    available = len([entry for entry in discovered.values() if entry.available])
    total = len(discovered)
    table.append(f"Available: {available}/{total} tools")

    return "\n".join(table)