Coverage for lintro / plugins / base.py: 97%

131 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Base implementation for Lintro plugins. 

2 

3This module provides the BaseToolPlugin class, which implements common 

4functionality for all Lintro tool plugins. 

5 

6Example: 

7 >>> from lintro.plugins.base import BaseToolPlugin 

8 >>> from lintro.plugins.protocol import ToolDefinition 

9 >>> from lintro.plugins.registry import register_tool 

10 >>> 

11 >>> @register_tool 

12 ... class MyPlugin(BaseToolPlugin): 

13 ... @property 

14 ... def definition(self) -> ToolDefinition: 

15 ... return ToolDefinition(name="my-tool", description="My tool") 

16 ... 

17 ... def check(self, paths, options): 

18 ... # Implementation 

19 ... pass 

20""" 

21 

22from __future__ import annotations 

23 

24from abc import ABC, abstractmethod 

25from dataclasses import dataclass, field 

26from typing import TYPE_CHECKING, Any 

27 

28import click 

29from loguru import logger 

30 

31from lintro.config.lintro_config import LintroConfig 

32from lintro.models.core.tool_result import ToolResult 

33from lintro.plugins.execution_preparation import ( 

34 DEFAULT_TIMEOUT, 

35 build_config_args, 

36 get_defaults_config_args, 

37 get_effective_timeout, 

38 get_enforce_cli_args, 

39 get_enforced_settings, 

40 get_executable_command, 

41 get_lintro_config, 

42 prepare_execution, 

43 should_use_lintro_config, 

44 verify_tool_version, 

45) 

46from lintro.plugins.file_discovery import ( 

47 DEFAULT_EXCLUDE_PATTERNS, 

48 discover_files, 

49 get_cwd, 

50 setup_exclude_patterns, 

51 validate_paths, 

52) 

53from lintro.plugins.protocol import ToolDefinition 

54from lintro.plugins.subprocess_executor import ( 

55 run_subprocess, 

56 run_subprocess_streaming, 

57 validate_subprocess_command, 

58) 

59 

60if TYPE_CHECKING: 

61 from collections.abc import Callable 

62 

63 from lintro.plugins.file_processor import AggregatedResult, FileProcessingResult 

64 

65 

66@dataclass 

67class ExecutionContext: 

68 """Context for tool execution containing prepared files and metadata. 

69 

70 This dataclass encapsulates the common preparation steps needed before 

71 running a tool, eliminating duplicate boilerplate across tool implementations. 

72 

73 Attributes: 

74 files: List of absolute file paths to process. 

75 rel_files: List of file paths relative to cwd. 

76 cwd: Working directory for command execution. 

77 early_result: If set, return this result immediately. 

78 timeout: Timeout value for subprocess execution. 

79 """ 

80 

81 files: list[str] = field(default_factory=list) 

82 rel_files: list[str] = field(default_factory=list) 

83 cwd: str | None = None 

84 early_result: ToolResult | None = None 

85 timeout: int = DEFAULT_TIMEOUT 

86 

87 @property 

88 def should_skip(self) -> bool: 

89 """Check if execution should be skipped due to early result. 

90 

91 Returns: 

92 True if early_result is set and execution should be skipped. 

93 """ 

94 return self.early_result is not None 

95 

96 

97@dataclass 

98class BaseToolPlugin(ABC): 

99 """Base class providing common functionality for tool plugins. 

100 

101 This class implements the boilerplate that most tools need: 

102 - Subprocess execution with safety validation 

103 - File discovery and filtering 

104 - Version checking 

105 - Config injection support 

106 - Working directory computation 

107 

108 Subclasses must implement: 

109 - definition property: Return a ToolDefinition with tool metadata 

110 - check() method: Check files for issues 

111 

112 Optionally override: 

113 - fix() method: Fix issues (only if definition.can_fix=True) 

114 - set_options() method: Custom option validation 

115 

116 Attributes: 

117 options: Current tool options (merged from defaults and runtime). 

118 exclude_patterns: Patterns to exclude from file discovery. 

119 include_venv: Whether to include virtual environment files. 

120 """ 

121 

122 options: dict[str, object] = field(default_factory=dict, init=False) 

123 exclude_patterns: list[str] = field(default_factory=list, init=False) 

124 include_venv: bool = field(default=False, init=False) 

125 

126 def __post_init__(self) -> None: 

127 """Initialize plugin with defaults from definition.""" 

128 # Initialize options from definition defaults 

129 self.options = dict(self.definition.default_options) 

130 

131 # Set up exclude patterns 

132 self._setup_defaults() 

133 

134 @property 

135 @abstractmethod 

136 def definition(self) -> ToolDefinition: 

137 """Return the tool definition. 

138 

139 Must be implemented by subclasses. 

140 

141 Returns: 

142 ToolDefinition containing all tool metadata. 

143 """ 

144 ... 

145 

146 @property 

147 def name(self) -> str: 

148 """Return the tool name from definition. 

149 

150 Returns: 

151 str: Tool name. 

152 """ 

153 return self.definition.name 

154 

155 # ------------------------------------------------------------------------- 

156 # Public API 

157 # ------------------------------------------------------------------------- 

158 

159 def reset_options(self) -> None: 

160 """Reset options back to definition defaults. 

161 

162 Clears accumulated state from prior ``set_options()`` calls so 

163 the same plugin instance can be reused across runs without 

164 leaking mutated configuration. 

165 """ 

166 self.options = dict(self.definition.default_options) 

167 self.exclude_patterns = [] 

168 self.include_venv = False 

169 self._setup_defaults() 

170 

171 def set_options(self, **kwargs: Any) -> None: 

172 """Set tool-specific options. 

173 

174 Args: 

175 **kwargs: Tool-specific options. 

176 

177 Raises: 

178 ValueError: If an option value is invalid. 

179 """ 

180 from lintro.enums.tool_option_key import ToolOptionKey 

181 

182 for key, value in kwargs.items(): 

183 if key == ToolOptionKey.TIMEOUT.value: 

184 if value is not None and not isinstance(value, (int, float)): 

185 raise ValueError("Timeout must be a number or None") 

186 kwargs[key] = float(value) if value is not None else None 

187 if key == ToolOptionKey.EXCLUDE_PATTERNS.value and not isinstance( 

188 value, 

189 list, 

190 ): 

191 raise ValueError("Exclude patterns must be a list") 

192 if key == ToolOptionKey.INCLUDE_VENV.value and not isinstance(value, bool): 

193 raise ValueError("Include venv must be a boolean") 

194 

195 self.options.update(kwargs) 

196 

197 # Update specific attributes — merge CLI patterns with existing 

198 # defaults and .lintro-ignore patterns instead of replacing them 

199 if ToolOptionKey.EXCLUDE_PATTERNS.value in kwargs: 

200 patterns = kwargs[ToolOptionKey.EXCLUDE_PATTERNS.value] 

201 if isinstance(patterns, list): 

202 seen = set(self.exclude_patterns) 

203 for p in patterns: 

204 if p not in seen: 

205 self.exclude_patterns.append(p) 

206 seen.add(p) 

207 if ToolOptionKey.INCLUDE_VENV.value in kwargs: 

208 self.include_venv = bool(kwargs[ToolOptionKey.INCLUDE_VENV.value]) 

209 

210 def doc_url(self, _code: str) -> str | None: 

211 """Return a documentation URL for the given rule code. 

212 

213 Override in subclasses to provide tool-specific documentation links. 

214 

215 Args: 

216 _code: The rule/error code (e.g., "E501", "SC2086"). 

217 

218 Returns: 

219 Documentation URL string, or None if no docs are available. 

220 """ 

221 return None 

222 

223 @abstractmethod 

224 def check(self, paths: list[str], options: dict[str, object]) -> ToolResult: 

225 """Check files for issues. 

226 

227 Args: 

228 paths: List of file or directory paths to check. 

229 options: Tool-specific options that override defaults. 

230 

231 Returns: 

232 ToolResult containing check results and any issues found. 

233 """ 

234 ... 

235 

236 def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult: 

237 """Fix issues in files. 

238 

239 Default implementation raises NotImplementedError if can_fix=False. 

240 Override in subclasses that support fixing. 

241 

242 Args: 

243 paths: List of file or directory paths to fix. 

244 options: Tool-specific options that override defaults. 

245 

246 Returns: 

247 ToolResult containing fix results and any remaining issues. 

248 

249 Raises: 

250 NotImplementedError: If the tool doesn't support fixing. 

251 """ 

252 if not self.definition.can_fix: 

253 raise NotImplementedError( 

254 f"{self.definition.name} does not support fixing issues", 

255 ) 

256 raise NotImplementedError("Subclass must implement fix()") 

257 

258 # ------------------------------------------------------------------------- 

259 # Protected Methods - For use by subclasses 

260 # ------------------------------------------------------------------------- 

261 

262 def _setup_defaults(self) -> None: 

263 """Set up default options and patterns.""" 

264 self.exclude_patterns = setup_exclude_patterns(self.exclude_patterns) 

265 

266 # Set default timeout if not specified 

267 if "timeout" not in self.options: 

268 self.options["timeout"] = self.definition.default_timeout 

269 

270 def _discover_files( 

271 self, 

272 paths: list[str], 

273 show_progress: bool = True, 

274 ) -> list[str]: 

275 """Discover files matching the tool's patterns. 

276 

277 Args: 

278 paths: Input paths to search. 

279 show_progress: Whether to show a progress spinner during discovery. 

280 

281 Returns: 

282 List of matching file paths. 

283 """ 

284 return discover_files( 

285 paths=paths, 

286 definition=self.definition, 

287 exclude_patterns=self.exclude_patterns, 

288 include_venv=self.include_venv, 

289 show_progress=show_progress, 

290 ) 

291 

292 def _run_subprocess( 

293 self, 

294 cmd: list[str], 

295 timeout: int | float | None = None, 

296 cwd: str | None = None, 

297 env: dict[str, str] | None = None, 

298 ) -> tuple[bool, str]: 

299 """Run a subprocess command safely. 

300 

301 Args: 

302 cmd: Command and arguments to run. 

303 timeout: Timeout in seconds (defaults to tool's timeout). 

304 cwd: Working directory for command execution. 

305 env: Environment variables for the subprocess. 

306 

307 Returns: 

308 Tuple of (success, output) where success indicates return code 0. 

309 """ 

310 effective_timeout = self._get_effective_timeout(timeout) 

311 return run_subprocess(cmd, effective_timeout, cwd, env) 

312 

313 def _run_subprocess_streaming( 

314 self, 

315 cmd: list[str], 

316 timeout: int | float | None = None, 

317 cwd: str | None = None, 

318 env: dict[str, str] | None = None, 

319 line_handler: Callable[[str], None] | None = None, 

320 ) -> tuple[bool, str]: 

321 """Run a subprocess command with optional line-by-line streaming. 

322 

323 This method allows real-time output processing by calling the line_handler 

324 callback for each line of output as it is produced by the subprocess. 

325 

326 Args: 

327 cmd: Command and arguments to run. 

328 timeout: Timeout in seconds (defaults to tool's timeout). 

329 cwd: Working directory for command execution. 

330 env: Environment variables for the subprocess. 

331 line_handler: Optional callback called for each line of output. 

332 

333 Returns: 

334 Tuple of (success, output) where success indicates return code 0. 

335 """ 

336 effective_timeout = self._get_effective_timeout(timeout) 

337 return run_subprocess_streaming(cmd, effective_timeout, cwd, env, line_handler) 

338 

339 def _get_effective_timeout(self, timeout: int | float | None = None) -> float: 

340 """Get the effective timeout value. 

341 

342 Args: 

343 timeout: Override timeout value, or None to use default. 

344 

345 Returns: 

346 Timeout value in seconds. 

347 """ 

348 return get_effective_timeout( 

349 timeout, 

350 self.options, 

351 self.definition.default_timeout, 

352 ) 

353 

354 def _validate_subprocess_command(self, cmd: list[str]) -> None: 

355 """Validate a subprocess command for safety. 

356 

357 Args: 

358 cmd: Command and arguments to validate. 

359 """ 

360 validate_subprocess_command(cmd) 

361 

362 def _validate_paths(self, paths: list[str]) -> None: 

363 """Validate that paths exist and are accessible. 

364 

365 Args: 

366 paths: Paths to validate. 

367 """ 

368 validate_paths(paths) 

369 

370 def _get_cwd(self, paths: list[str]) -> str | None: 

371 """Get common parent directory for paths. 

372 

373 Args: 

374 paths: Paths to compute common parent for. 

375 

376 Returns: 

377 Common parent directory path, or None if not applicable. 

378 """ 

379 return get_cwd(paths) 

380 

381 def _prepare_execution( 

382 self, 

383 paths: list[str], 

384 options: dict[str, object], 

385 *, 

386 no_files_message: str = "No files to check.", 

387 ) -> ExecutionContext: 

388 """Prepare execution context with common boilerplate steps. 

389 

390 This method consolidates repeated patterns: 

391 1. Merge options with defaults 

392 2. Validate input paths 

393 3. Discover files matching patterns (returns early if none found) 

394 4. Verify tool version requirements (skipped when no files match) 

395 5. Compute working directory and relative paths 

396 6. Calculate timeout based on provided options 

397 

398 Args: 

399 paths: Input paths to process. 

400 options: Runtime options to merge with defaults. 

401 no_files_message: Message when no files are found. 

402 

403 Returns: 

404 ExecutionContext with files, cwd, and optional early_result. 

405 

406 Example: 

407 ctx = self._prepare_execution(paths, options) 

408 if ctx.should_skip: 

409 return ctx.early_result 

410 

411 cmd = self._build_command(ctx.rel_files) 

412 success, output = self._run_subprocess(cmd, cwd=ctx.cwd) 

413 """ 

414 logger.debug(f"[{self.name}] Preparing execution for {len(paths)} input paths") 

415 

416 result = prepare_execution( 

417 paths=paths, 

418 options=options, 

419 definition=self.definition, 

420 exclude_patterns=self.exclude_patterns, 

421 include_venv=self.include_venv, 

422 current_options=self.options, 

423 no_files_message=no_files_message, 

424 ) 

425 

426 if "early_result" in result: 

427 early_result = result["early_result"] 

428 logger.debug(f"[{self.name}] Early exit: {early_result.output}") 

429 return ExecutionContext(early_result=early_result) 

430 

431 files = result.get("files", []) 

432 timeout = result.get("timeout", DEFAULT_TIMEOUT) 

433 logger.debug(f"[{self.name}] Ready: {len(files)} files, timeout={timeout}s") 

434 

435 return ExecutionContext( 

436 files=files, 

437 rel_files=result.get("rel_files", []), 

438 cwd=result.get("cwd"), 

439 timeout=timeout, 

440 ) 

441 

442 def _process_files_with_progress( 

443 self, 

444 files: list[str], 

445 processor: Callable[[str], FileProcessingResult], 

446 timeout: int, 

447 *, 

448 label: str = "Processing files", 

449 progress_threshold: int = 2, 

450 ) -> AggregatedResult: 

451 """Process files with optional progress bar. 

452 

453 This method handles the common pattern of iterating through files, 

454 calling a processor function for each file, and aggregating results. 

455 It shows a progress bar when processing multiple files. 

456 

457 Args: 

458 files: List of file paths to process. 

459 processor: Callable that processes a single file and returns 

460 FileProcessingResult. The processor should handle its own 

461 exceptions and return appropriate FileProcessingResult. 

462 timeout: Timeout for each file operation (included in output). 

463 label: Label for progress bar. 

464 progress_threshold: Minimum files to show progress bar. 

465 

466 Returns: 

467 AggregatedResult with all file processing results. 

468 

469 Example: 

470 def process_file(path: str) -> FileProcessingResult: 

471 try: 

472 success, output = self._run_subprocess(cmd + [path]) 

473 issues = parse_output(output) 

474 return FileProcessingResult( 

475 success=success, 

476 output=output, 

477 issues=issues, 

478 ) 

479 except subprocess.TimeoutExpired: 

480 return FileProcessingResult( 

481 success=False, 

482 output="", 

483 issues=[], 

484 skipped=True, 

485 ) 

486 

487 result = self._process_files_with_progress( 

488 files=ctx.files, 

489 processor=process_file, 

490 timeout=ctx.timeout, 

491 ) 

492 """ 

493 from lintro.plugins.file_processor import AggregatedResult 

494 

495 aggregated = AggregatedResult() 

496 

497 if len(files) >= progress_threshold: 

498 with click.progressbar( 

499 files, 

500 label=label, 

501 bar_template="%(label)s %(info)s", 

502 ) as bar: 

503 for file_path in bar: 

504 result = processor(file_path) 

505 aggregated.add_file_result(file_path, result) 

506 else: 

507 for file_path in files: 

508 result = processor(file_path) 

509 aggregated.add_file_result(file_path, result) 

510 

511 return aggregated 

512 

513 def _get_executable_command(self, tool_name: str) -> list[str]: 

514 """Get the command prefix to execute a tool. 

515 

516 Delegates to CommandBuilderRegistry for language-specific logic. 

517 This satisfies ISP by keeping BaseToolPlugin language-agnostic. 

518 

519 Args: 

520 tool_name: Name of the tool executable. 

521 

522 Returns: 

523 Command prefix list. 

524 """ 

525 return get_executable_command(tool_name) 

526 

527 def _verify_tool_version(self) -> ToolResult | None: 

528 """Verify that the tool meets minimum version requirements. 

529 

530 Returns: 

531 None if version check passes, or a skip result if it fails. 

532 """ 

533 return verify_tool_version(self.definition) 

534 

535 # ------------------------------------------------------------------------- 

536 # Lintro Config Support 

537 # ------------------------------------------------------------------------- 

538 

539 def _get_lintro_config(self) -> LintroConfig: 

540 """Get the current Lintro configuration. 

541 

542 Returns: 

543 The current LintroConfig instance. 

544 """ 

545 return get_lintro_config() 

546 

547 def _get_enforced_settings(self) -> dict[str, object]: 

548 """Get enforced settings as a dictionary. 

549 

550 Returns: 

551 Dictionary of enforced settings. 

552 """ 

553 return get_enforced_settings(lintro_config=self._get_lintro_config()) 

554 

555 def _get_enforce_cli_args(self) -> list[str]: 

556 """Get CLI arguments for enforced settings. 

557 

558 Returns: 

559 List of CLI arguments for enforced settings. 

560 """ 

561 return get_enforce_cli_args( 

562 tool_name=self.definition.name, 

563 lintro_config=self._get_lintro_config(), 

564 ) 

565 

566 def _get_defaults_config_args(self) -> list[str]: 

567 """Get CLI arguments for defaults config injection. 

568 

569 Returns: 

570 List of CLI arguments for defaults config. 

571 """ 

572 return get_defaults_config_args( 

573 tool_name=self.definition.name, 

574 lintro_config=self._get_lintro_config(), 

575 ) 

576 

577 def _should_use_lintro_config(self) -> bool: 

578 """Check if Lintro config should be used for this tool. 

579 

580 Returns: 

581 True if Lintro config should be used. 

582 """ 

583 return should_use_lintro_config(tool_name=self.definition.name) 

584 

585 def _build_config_args(self) -> list[str]: 

586 """Build combined CLI arguments for config injection. 

587 

588 Returns: 

589 List of combined CLI arguments for config. 

590 """ 

591 return build_config_args( 

592 tool_name=self.definition.name, 

593 lintro_config=self._get_lintro_config(), 

594 ) 

595 

596 

597__all__ = [ 

598 "DEFAULT_EXCLUDE_PATTERNS", 

599 "DEFAULT_TIMEOUT", 

600 "BaseToolPlugin", 

601 "ExecutionContext", 

602]