Coverage for lintro / tools / core / tool_registry.py: 84%

140 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Unified tool registry loaded from manifest.json. 

2 

3This module is the single source of truth for all tool metadata. It replaces 

4three parallel tool-to-version-command dicts that previously existed in: 

5- doctor.py (TOOL_COMMANDS) 

6- version_requirements.py (tool_commands) 

7- runtime_discovery.py (TOOL_VERSION_COMMANDS) 

8 

9Usage: 

10 from lintro.tools.core.tool_registry import ToolRegistry 

11 

12 registry = ToolRegistry.load() 

13 tool = registry.get("ruff") 

14 print(tool.version_command) # ("ruff", "--version")

15""" 

16 

17from __future__ import annotations 

18 

19import json 

20import logging 

21from functools import lru_cache 

22from pathlib import Path 

23from typing import Any 

24 

25from lintro.tools.core.manifest_models import ManifestTool, ProfileDefinition 

26 

# Public names of this module. ``ManifestTool`` and ``ProfileDefinition`` are
# re-exported here so that existing
# ``from lintro.tools.core.tool_registry import ManifestTool`` imports keep
# working.
__all__ = [
    "CATEGORY_LABELS",
    "ManifestTool",
    "ProfileDefinition",
    "ToolRegistry",
]

35 

# The stdlib logger is used so that importing this module early does not pull
# in any external dependency.
_logger = logging.getLogger(__name__)

# manifest.json sits two directory levels above this file's own location.
_MANIFEST_PATH = Path(__file__).parent.parent.joinpath("manifest.json")

# Human-readable display labels, keyed by tool category.
CATEGORY_LABELS: dict[str, str] = {
    "bundled": "Bundled Python tools",
    "npm": "npm tools",
    "external": "External tools",
}

47 

48 

class ToolRegistry:
    """Single source of truth for all tool metadata.

    Loaded from manifest.json v2. Provides typed access to tool metadata,
    version commands, language mappings, and profiles.
    """

    def __init__(
        self,
        tools: dict[str, ManifestTool],
        language_map: dict[str, list[str]],
        profiles: dict[str, ProfileDefinition],
    ) -> None:
        """Initialize the registry with parsed manifest data.

        Args:
            tools: Mapping of tool name to its parsed manifest entry.
            language_map: Mapping of language/ecosystem name to tool names.
            profiles: Mapping of profile name to its definition.
        """
        self._tools = tools
        self._language_map = language_map
        self._profiles = profiles

    @classmethod
    @lru_cache(maxsize=1)
    def load(cls) -> ToolRegistry:
        """Load the registry from manifest.json.

        Cached via lru_cache — call clear_cache() to force a reload.

        Returns:
            ToolRegistry: Loaded and cached registry instance.
        """
        return cls._load_from_path(_MANIFEST_PATH)

    @classmethod
    def clear_cache(cls) -> None:
        """Clear the cached registry so the next load() re-reads manifest.json.

        Useful for tests and development workflows where the manifest may
        change between calls.
        """
        cls.load.cache_clear()

    @classmethod
    def _load_from_path(cls, path: Path) -> ToolRegistry:
        """Load registry from a specific manifest path.

        Args:
            path: Path to manifest.json.

        Returns:
            ToolRegistry: Loaded registry.

        Raises:
            ValueError: If the manifest is malformed or has an invalid version.
                Errors raised by reading the file are not caught here.
        """
        data = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            raise ValueError(
                f"manifest root must be a JSON object, got {type(data).__name__}",
            )
        raw_version = data.get("version", 1)
        try:
            manifest_version = int(raw_version)
        except (TypeError, ValueError):
            raise ValueError(
                f"manifest 'version' must be an integer, got {raw_version!r}",
            ) from None

        # Validate and parse tools
        raw_tools = data.get("tools", [])
        if not isinstance(raw_tools, list):
            raise ValueError(
                f"manifest 'tools' must be a list, got {type(raw_tools).__name__}",
            )
        tools: dict[str, ManifestTool] = {}
        for entry in raw_tools:
            tool = cls._parse_tool_entry(entry, manifest_version)
            # Invalid entries are reported as None and skipped.
            if tool is not None:
                tools[tool.name] = tool

        # Validate and parse language_map (v2 only)
        language_map: dict[str, list[str]] = data.get("language_map", {})
        if not isinstance(language_map, dict):
            kind = type(language_map).__name__
            raise ValueError(
                f"manifest 'language_map' must be a dict, got {kind}",
            )

        # Validate and parse profiles (v2 only)
        raw_profiles = data.get("profiles", {})
        if not isinstance(raw_profiles, dict):
            kind = type(raw_profiles).__name__
            raise ValueError(
                f"manifest 'profiles' must be a dict, got {kind}",
            )
        profiles: dict[str, ProfileDefinition] = {}
        for name, pdata in raw_profiles.items():
            # Reject non-object profiles with the documented ValueError
            # instead of failing later with an AttributeError on ``.get``.
            if not isinstance(pdata, dict):
                raise ValueError(
                    f"manifest profile {name!r} must be a dict, "
                    f"got {type(pdata).__name__}",
                )
            profiles[name] = ProfileDefinition(
                name=name,
                description=pdata.get("description", ""),
                strategy=pdata.get("strategy", "explicit"),
                tools=cls._as_tuple(pdata.get("tools")),
                exclude_types=cls._as_tuple(pdata.get("exclude_types")),
            )

        _logger.debug(
            "Loaded %d tools, %d profiles from manifest v%d",
            len(tools),
            len(profiles),
            manifest_version,
        )
        return cls(tools=tools, language_map=language_map, profiles=profiles)

    @staticmethod
    def _as_tuple(value: Any) -> tuple[Any, ...]:
        """Coerce a manifest list field into a tuple.

        Args:
            value: Raw value read from the manifest.

        Returns:
            The items of a list/tuple as a tuple; a one-element tuple for a
            non-empty bare string (so it is not split into characters); an
            empty tuple for anything else, including None.
        """
        if isinstance(value, (list, tuple)):
            return tuple(value)
        if isinstance(value, str) and value:
            return (value,)
        return ()

    @staticmethod
    def _parse_tool_entry(
        entry: dict[str, Any],
        manifest_version: int,
    ) -> ManifestTool | None:
        """Parse a single tool entry from manifest data.

        Args:
            entry: Raw dict from manifest JSON.
            manifest_version: Manifest schema version.

        Returns:
            ManifestTool or None if entry is invalid.
        """
        if not isinstance(entry, dict):
            _logger.warning("Skipping tool entry that is not an object: %r", entry)
            return None

        name = entry.get("name")
        version = entry.get("version")
        if not name or not version:
            return None

        # A missing or null install section is treated as empty; any other
        # non-object value is ignored so the lookups below cannot fail.
        install = entry.get("install") or {}
        if not isinstance(install, dict):
            _logger.warning(
                "Tool %r has invalid install section %r, ignoring it",
                name,
                install,
            )
            install = {}
        install_type = install.get("type", "binary")

        # v2: version_command at top level; v1 compat: fall back to install
        if manifest_version >= 2:
            version_command = entry.get("version_command", [])
        else:
            version_command = entry.get("version_command") or install.get(
                "version_command",
                [],
            )
        # The same validation applies to both schema versions: the command
        # must be a list of non-blank strings.
        if not isinstance(version_command, list) or not all(
            isinstance(t, str) and t.strip() for t in version_command
        ):
            _logger.warning(
                "Tool %r has invalid version_command %r, treating as absent",
                name,
                version_command,
            )
            version_command = []

        # v1 compat: derive category from install type if not present
        category = entry.get("category")
        if not category:
            category_map = {
                "pip": "bundled",
                "npm": "npm",
                "binary": "external",
                "cargo": "external",
                "rustup": "external",
            }
            category = category_map.get(install_type, "external")

        return ManifestTool(
            name=name,
            version=str(version),
            install_type=install_type,
            install_package=install.get("package"),
            install_bin=install.get("bin"),
            install_component=install.get("component"),
            tier=entry.get("tier", "tools"),
            category=category,
            version_command=tuple(version_command),
            languages=ToolRegistry._as_tuple(entry.get("languages")),
            tags=ToolRegistry._as_tuple(entry.get("tags")),
        )

    # ── Query methods ──────────────────────────────────────────────

    def get(self, name: str) -> ManifestTool:
        """Get a tool by name.

        Args:
            name: Tool name (e.g., "ruff").

        Returns:
            ManifestTool.

        Raises:
            KeyError: If tool is not in the registry.
        """
        if name not in self._tools:
            raise KeyError(
                f"Tool {name!r} not in registry. "
                f"Known tools: {sorted(self._tools)}",
            )
        return self._tools[name]

    def get_or_none(self, name: str) -> ManifestTool | None:
        """Get a tool by name, returning None if not found.

        Args:
            name: Tool name.

        Returns:
            ManifestTool or None.
        """
        return self._tools.get(name)

    def all_tools(self, *, include_dev: bool = False) -> list[ManifestTool]:
        """Get all tools, optionally including dev-tier tools.

        Args:
            include_dev: If True, include tools with tier="dev".

        Returns:
            List of ManifestTool sorted by name.
        """
        selected = [
            tool
            for tool in self._tools.values()
            if include_dev or tool.tier != "dev"
        ]
        return sorted(selected, key=lambda t: t.name)

    def tools_for_languages(self, langs: list[str]) -> list[ManifestTool]:
        """Get tools recommended for the given languages/ecosystems.

        Uses the language_map to resolve language names to tool lists,
        then returns the union of all matching tools.

        Args:
            langs: List of language/ecosystem names (e.g., ["python", "docker"]).

        Returns:
            Deduplicated, sorted list of ManifestTool.
        """
        tool_names: set[str] = set()
        for lang in langs:
            # Requested names are lower-cased before the lookup.
            lang_lower = lang.lower()
            if lang_lower in self._language_map:
                tool_names.update(self._language_map[lang_lower])

        # Always include security tools; yaml/markdown/toml only when detected
        if "security" in self._language_map:
            tool_names.update(self._language_map["security"])

        # Names that are mapped but not present in the registry are dropped.
        return sorted(
            [self._tools[n] for n in tool_names if n in self._tools],
            key=lambda t: t.name,
        )

    def tools_for_profile(
        self,
        profile_name: str,
        detected_langs: list[str] | None = None,
    ) -> list[ManifestTool]:
        """Resolve a profile to a concrete tool list.

        Args:
            profile_name: Profile name (e.g., "minimal", "recommended").
            detected_langs: Detected languages (for "auto-detect" strategy).

        Returns:
            List of ManifestTool for the profile. Empty if the profile has an
            unknown strategy or if resolving it would recurse back into a
            profile that is already being resolved.

        Raises:
            KeyError: If profile is not defined.
        """
        return self._resolve_profile(profile_name, detected_langs, frozenset())

    def _resolve_profile(
        self,
        profile_name: str,
        detected_langs: list[str] | None,
        visiting: frozenset[str],
    ) -> list[ManifestTool]:
        """Resolve one profile while tracking profiles already in progress.

        Args:
            profile_name: Profile to resolve.
            detected_langs: Detected languages (for "auto-detect" strategy).
            visiting: Names of profiles currently being resolved further up
                the call chain; used to break reference cycles.

        Returns:
            List of ManifestTool for the profile.

        Raises:
            KeyError: If profile is not defined.
        """
        if profile_name not in self._profiles:
            raise KeyError(
                f"Profile {profile_name!r} not found. "
                f"Available: {sorted(self._profiles)}",
            )

        profile = self._profiles[profile_name]
        in_progress = visiting | {profile_name}

        if profile.strategy == "explicit":
            return sorted(
                [self._tools[n] for n in profile.tools if n in self._tools],
                key=lambda t: t.name,
            )

        if profile.strategy == "auto-detect":
            if detected_langs:
                return self.tools_for_languages(detected_langs)
            # Fall back to minimal if no languages detected, guarding
            # against infinite recursion if minimal is also auto-detect or
            # is already being resolved.
            fallback = self._profiles.get("minimal")
            if (
                fallback is None
                or fallback.strategy == "auto-detect"
                or "minimal" in in_progress
            ):
                return []
            return self._resolve_profile("minimal", None, in_progress)

        if profile.strategy == "all":
            return self.all_tools(include_dev=True)

        if profile.strategy == "filter":
            # A filter profile builds on "recommended"; if that profile is
            # the one being resolved (directly or indirectly), stop here
            # rather than recursing without bound.
            if "recommended" in in_progress:
                return []
            # Start with recommended tools, then exclude tools whose tags
            # are a subset of the exclude set (pure formatters are excluded,
            # but tools that are both linter+formatter are kept).
            exclude = set(profile.exclude_types)
            base = self._resolve_profile("recommended", detected_langs, in_progress)
            return [t for t in base if not t.tags or not set(t.tags).issubset(exclude)]

        _logger.warning(
            "Unknown profile strategy %r for profile %r",
            profile.strategy,
            profile.name,
        )
        return []

    def tools_by_category(self) -> dict[str, list[ManifestTool]]:
        """Group all tools by their display category.

        Returns:
            Dict mapping category name to list of tools.
            Keys are ordered: bundled, npm, external, followed by any other
            categories. Tools within a category are sorted by name.
        """
        groups: dict[str, list[ManifestTool]] = {}
        for tool in sorted(self._tools.values(), key=lambda t: t.name):
            groups.setdefault(tool.category, []).append(tool)

        # Return in display order
        ordered: dict[str, list[ManifestTool]] = {
            cat: groups[cat] for cat in ("bundled", "npm", "external") if cat in groups
        }
        # Add any remaining categories
        for cat, tools in groups.items():
            ordered.setdefault(cat, tools)
        return ordered

    def version_command(self, name: str) -> tuple[str, ...]:
        """Get the version check command for a tool.

        Args:
            name: Tool name.

        Returns:
            Command tuple (e.g., ("ruff", "--version")).

        Raises:
            KeyError: If tool is not in the registry.
        """
        return self.get(name).version_command

    @property
    def profile_names(self) -> list[str]:
        """Get all available profile names, sorted."""
        return sorted(self._profiles)

    @property
    def profiles(self) -> dict[str, ProfileDefinition]:
        """Get all profile definitions as a new dict."""
        return dict(self._profiles)

    @property
    def language_map(self) -> dict[str, list[str]]:
        """Get the language-to-tools mapping.

        The returned dict and its lists are copies, so mutating them does
        not change the registry.
        """
        return {
            lang: list(names) if isinstance(names, list) else names
            for lang, names in self._language_map.items()
        }

    def __len__(self) -> int:
        """Return the number of tools in the registry."""
        return len(self._tools)

    def __contains__(self, name: str) -> bool:
        """Check if a tool is in the registry."""
        return name in self._tools