Coverage for lintro / tools / core / runtime_discovery.py: 88%
129 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Runtime tool discovery for compiled binary mode.
3This module provides functions to discover external tools (ruff, black, mypy, etc.)
4at runtime by searching the system PATH. This is essential for the compiled binary
5distribution where lintro itself is standalone but depends on external tools.
7When lintro runs as a compiled binary:
81. It cannot bundle tools like ruff, mypy, black (they are separate executables)
92. Users must install these tools separately (via pip, brew, etc.)
103. This module discovers which tools are available and their locations
12Usage:
13 from lintro.tools.core.runtime_discovery import discover_tool, discover_all_tools
15 # Discover a single tool
16 tool = discover_tool("ruff")
17 if tool.available:
18 print(f"Found ruff at {tool.path} (version {tool.version})")
20 # Discover all configured tools
21 tools = discover_all_tools()
22 for name, info in tools.items():
23 print(f"{name}: {'available' if info.available else 'missing'}")
24"""
26from __future__ import annotations
28import json
29import re
30import shutil
31import subprocess
32import threading
33from dataclasses import dataclass, field
35from loguru import logger
37# Default timeout for version checks (seconds)
38VERSION_CHECK_TIMEOUT: int = 5
41@dataclass
42class _ToolProbeInfo:
43 """Internal probe metadata for a single tool."""
45 version_command: tuple[str, ...]
46 executable: str | None = None
49def _get_tool_probe_info() -> dict[str, _ToolProbeInfo] | None:
50 """Get tool probe info (version commands + preferred executable) from the registry.
52 Returns:
53 Dict mapping tool names to probe info,
54 or None if the registry is unavailable (early startup).
55 """
56 try:
57 from lintro.tools.core.tool_registry import ToolRegistry
59 registry = ToolRegistry.load()
60 return {
61 tool.name: _ToolProbeInfo(
62 version_command=tool.version_command,
63 executable=tool.install_bin,
64 )
65 for tool in registry.all_tools(include_dev=True)
66 if tool.version_command
67 }
68 except (
69 ImportError,
70 FileNotFoundError,
71 KeyError,
72 ValueError,
73 json.JSONDecodeError,
74 ) as exc:
75 logger.debug("Registry unavailable, tool discovery limited: {}", exc)
76 return None
79@dataclass
80class DiscoveredTool:
81 """Information about a discovered external tool.
83 Attributes:
84 name: The canonical name of the tool (e.g., "ruff", "black").
85 path: Full path to the tool executable, or empty string if not found.
86 version: Version string if available, None otherwise.
87 available: True if the tool was found and is executable, False by default.
88 error_message: Error message if discovery failed, None otherwise.
89 """
91 name: str
92 path: str = ""
93 version: str | None = None
94 available: bool = False
95 error_message: str | None = None
98@dataclass
99class ToolDiscoveryCache:
100 """Cache for discovered tools to avoid repeated PATH lookups.
102 Attributes:
103 tools: Dictionary mapping tool names to their discovery info.
104 is_populated: True if the cache has been populated.
105 """
107 tools: dict[str, DiscoveredTool] = field(default_factory=dict)
108 is_populated: bool = False
111# Global cache instance and lock for thread safety
112_discovery_cache = ToolDiscoveryCache()
113_discovery_cache_lock = threading.Lock()
116def _extract_version(output: str) -> str | None:
117 """Extract version number from tool output.
119 Args:
120 output: Raw output from tool's version command.
122 Returns:
123 Extracted version string or None if not found.
124 """
125 # Common version patterns:
126 # - "ruff 0.1.0"
127 # - "black, version 23.0.0"
128 # - "mypy 1.0.0"
129 # - "v1.2.3"
130 patterns = [
131 r"(\d+\.\d+\.\d+)", # Semantic version (1.2.3)
132 r"v(\d+\.\d+\.\d+)", # Prefixed version (v1.2.3)
133 r"version\s+(\d+\.\d+\.\d+)", # "version 1.2.3"
134 ]
136 for pattern in patterns:
137 match = re.search(pattern, output, re.IGNORECASE)
138 if match:
139 return match.group(1)
141 return None
144def discover_tool(
145 tool_name: str,
146 use_cache: bool = True,
147 *,
148 _probe_info: dict[str, _ToolProbeInfo] | None = None,
149) -> DiscoveredTool:
150 """Discover a single tool in the system PATH.
152 Thread-safe: uses lock to protect cache access.
154 Args:
155 tool_name: Name of the tool to discover (e.g., "ruff", "black").
156 use_cache: Whether to use cached results if available.
157 _probe_info: Pre-loaded probe info map (avoids redundant registry
158 lookups when called from discover_all_tools).
160 Returns:
161 DiscoveredTool with information about the tool.
162 """
163 # Check cache first (thread-safe)
164 with _discovery_cache_lock:
165 if use_cache and tool_name in _discovery_cache.tools:
166 return _discovery_cache.tools[tool_name]
168 logger.debug(f"Discovering tool: {tool_name}")
170 # Get probe metadata from the registry (version_command + preferred executable)
171 probe_info_map = _probe_info if _probe_info is not None else _get_tool_probe_info()
172 if probe_info_map is None:
173 # Registry unavailable — don't cache guessed probes so we retry later
174 logger.debug(f"Registry unavailable, skipping discovery for {tool_name}")
175 return DiscoveredTool(
176 name=tool_name,
177 available=False,
178 error_message="registry unavailable",
179 )
181 probe = probe_info_map.get(tool_name)
182 version_cmd = probe.version_command if probe else [tool_name, "--version"]
184 # Prefer the registry-provided executable (install_bin) over deriving
185 # from version_cmd[0], which misreports tools invoked via wrappers
186 # (e.g., cargo subcommands, node -e probes).
187 if probe and probe.executable:
188 executable = probe.executable
189 else:
190 executable = version_cmd[0] if version_cmd else tool_name
191 # For shell/interpreter-wrapped probes (e.g., ["sh","-c","..."],
192 # ["node","-e","..."], ["python","-c","..."]), resolve the inner
193 # command's executable instead of caching the wrapper path.
194 _wrappers = ("sh", "bash", "zsh", "node", "python", "python3", "ruby", "perl")
195 if (
196 executable in _wrappers
197 and len(version_cmd) >= 3
198 and version_cmd[1] in ("-c", "-e")
199 ):
200 inner_tokens = version_cmd[2].split()
201 if inner_tokens:
202 executable = inner_tokens[0]
204 # Find the executable in PATH (outside lock - this is IO-bound)
205 path = shutil.which(executable)
207 if not path:
208 result = DiscoveredTool(
209 name=tool_name,
210 path="",
211 available=False,
212 error_message=f"{executable} not found in PATH",
213 )
214 with _discovery_cache_lock:
215 _discovery_cache.tools[tool_name] = result
216 logger.debug(f"Tool {tool_name} ({executable}) not found in PATH")
217 return result
219 # Run the full version probe — only mark available if it succeeds
220 version: str | None = None
221 probe_ok = False
223 try:
224 proc_result = subprocess.run(
225 version_cmd,
226 capture_output=True,
227 text=True,
228 timeout=VERSION_CHECK_TIMEOUT,
229 )
230 if proc_result.returncode == 0:
231 version = _extract_version(proc_result.stdout or proc_result.stderr)
232 probe_ok = True
233 except subprocess.TimeoutExpired:
234 logger.debug(f"Version check for {tool_name} timed out")
235 except (OSError, subprocess.SubprocessError) as e:
236 logger.debug(f"Failed to get version for {tool_name}: {e}")
238 discovered = DiscoveredTool(
239 name=tool_name,
240 path=path,
241 version=version,
242 available=probe_ok,
243 error_message=None if probe_ok else f"{tool_name} version probe failed",
244 )
246 with _discovery_cache_lock:
247 _discovery_cache.tools[tool_name] = discovered
248 logger.debug(f"Discovered {tool_name} at {path} (version: {version})")
250 return discovered
253def discover_all_tools(use_cache: bool = True) -> dict[str, DiscoveredTool]:
254 """Discover all configured external tools.
256 Thread-safe: uses lock to protect cache access.
258 Args:
259 use_cache: Whether to use cached results if available.
261 Returns:
262 Dictionary mapping tool names to their discovery info.
263 """
264 with _discovery_cache_lock:
265 if use_cache and _discovery_cache.is_populated:
266 return _discovery_cache.tools.copy()
268 probe_info_map = _get_tool_probe_info()
269 if probe_info_map is None:
270 # Registry unavailable — clear stale discoveries so downstream
271 # get_available_tools()/get_unavailable_tools() don't see outdated entries
272 with _discovery_cache_lock:
273 _discovery_cache.tools.clear()
274 _discovery_cache.is_populated = False
275 return {}
277 for tool_name in probe_info_map:
278 discover_tool(tool_name, use_cache=False, _probe_info=probe_info_map)
280 with _discovery_cache_lock:
281 # Prune stale entries for tools no longer in the registry
282 stale = _discovery_cache.tools.keys() - probe_info_map.keys()
283 for key in stale:
284 del _discovery_cache.tools[key]
285 _discovery_cache.is_populated = True
286 return _discovery_cache.tools.copy()
289def clear_discovery_cache() -> None:
290 """Clear the tool discovery cache.
292 Thread-safe: uses lock to protect cache access.
293 Call this if tools may have been installed/uninstalled since last check.
294 """
295 global _discovery_cache
296 with _discovery_cache_lock:
297 _discovery_cache = ToolDiscoveryCache()
298 logger.debug("Tool discovery cache cleared")
301def is_tool_available(tool_name: str) -> bool:
302 """Check if a tool is available in the system PATH.
304 Args:
305 tool_name: Name of the tool to check.
307 Returns:
308 True if the tool is available, False otherwise.
309 """
310 return discover_tool(tool_name).available
313def get_tool_path(tool_name: str) -> str | None:
314 """Get the full path to a tool executable.
316 Args:
317 tool_name: Name of the tool.
319 Returns:
320 Full path to the executable, or None if not found.
321 """
322 tool = discover_tool(tool_name)
323 return tool.path if tool.available else None
326def get_unavailable_tools() -> list[str]:
327 """Get a list of tools that are not available.
329 Returns:
330 List of tool names that were not found in PATH.
331 """
332 discover_all_tools()
333 return [name for name, tool in _discovery_cache.tools.items() if not tool.available]
336def get_available_tools() -> list[str]:
337 """Get a list of tools that are available.
339 Returns:
340 List of tool names that were found in PATH.
341 """
342 discover_all_tools()
343 return [name for name, tool in _discovery_cache.tools.items() if tool.available]
346def format_tool_status_table() -> str:
347 """Format a table showing the status of all tools.
349 Returns:
350 Formatted string table showing tool availability.
351 """
352 tools = discover_all_tools()
354 lines = [
355 "Tool Discovery Status",
356 "=" * 60,
357 f"{'Tool':<15} {'Status':<12} {'Version':<15} {'Path'}",
358 "-" * 60,
359 ]
361 for name, tool in sorted(tools.items()):
362 status = "Available" if tool.available else "Missing"
363 version = tool.version or "-"
364 path = tool.path or tool.error_message or "-"
365 lines.append(f"{name:<15} {status:<12} {version:<15} {path}")
367 lines.append("-" * 60)
369 available = sum(1 for t in tools.values() if t.available)
370 total = len(tools)
371 lines.append(f"Available: {available}/{total} tools")
373 return "\n".join(lines)