Coverage for lintro / plugins / base.py: 97%
131 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Base implementation for Lintro plugins.
3This module provides the BaseToolPlugin class, which implements common
4functionality for all Lintro tool plugins.
6Example:
7 >>> from lintro.plugins.base import BaseToolPlugin
8 >>> from lintro.plugins.protocol import ToolDefinition
9 >>> from lintro.plugins.registry import register_tool
10 >>>
11 >>> @register_tool
12 ... class MyPlugin(BaseToolPlugin):
13 ... @property
14 ... def definition(self) -> ToolDefinition:
15 ... return ToolDefinition(name="my-tool", description="My tool")
16 ...
17 ... def check(self, paths, options):
18 ... # Implementation
19 ... pass
20"""
22from __future__ import annotations
24from abc import ABC, abstractmethod
25from dataclasses import dataclass, field
26from typing import TYPE_CHECKING, Any
28import click
29from loguru import logger
31from lintro.config.lintro_config import LintroConfig
32from lintro.models.core.tool_result import ToolResult
33from lintro.plugins.execution_preparation import (
34 DEFAULT_TIMEOUT,
35 build_config_args,
36 get_defaults_config_args,
37 get_effective_timeout,
38 get_enforce_cli_args,
39 get_enforced_settings,
40 get_executable_command,
41 get_lintro_config,
42 prepare_execution,
43 should_use_lintro_config,
44 verify_tool_version,
45)
46from lintro.plugins.file_discovery import (
47 DEFAULT_EXCLUDE_PATTERNS,
48 discover_files,
49 get_cwd,
50 setup_exclude_patterns,
51 validate_paths,
52)
53from lintro.plugins.protocol import ToolDefinition
54from lintro.plugins.subprocess_executor import (
55 run_subprocess,
56 run_subprocess_streaming,
57 validate_subprocess_command,
58)
60if TYPE_CHECKING:
61 from collections.abc import Callable
63 from lintro.plugins.file_processor import AggregatedResult, FileProcessingResult
66@dataclass
67class ExecutionContext:
68 """Context for tool execution containing prepared files and metadata.
70 This dataclass encapsulates the common preparation steps needed before
71 running a tool, eliminating duplicate boilerplate across tool implementations.
73 Attributes:
74 files: List of absolute file paths to process.
75 rel_files: List of file paths relative to cwd.
76 cwd: Working directory for command execution.
77 early_result: If set, return this result immediately.
78 timeout: Timeout value for subprocess execution.
79 """
81 files: list[str] = field(default_factory=list)
82 rel_files: list[str] = field(default_factory=list)
83 cwd: str | None = None
84 early_result: ToolResult | None = None
85 timeout: int = DEFAULT_TIMEOUT
87 @property
88 def should_skip(self) -> bool:
89 """Check if execution should be skipped due to early result.
91 Returns:
92 True if early_result is set and execution should be skipped.
93 """
94 return self.early_result is not None
97@dataclass
98class BaseToolPlugin(ABC):
99 """Base class providing common functionality for tool plugins.
101 This class implements the boilerplate that most tools need:
102 - Subprocess execution with safety validation
103 - File discovery and filtering
104 - Version checking
105 - Config injection support
106 - Working directory computation
108 Subclasses must implement:
109 - definition property: Return a ToolDefinition with tool metadata
110 - check() method: Check files for issues
112 Optionally override:
113 - fix() method: Fix issues (only if definition.can_fix=True)
114 - set_options() method: Custom option validation
116 Attributes:
117 options: Current tool options (merged from defaults and runtime).
118 exclude_patterns: Patterns to exclude from file discovery.
119 include_venv: Whether to include virtual environment files.
120 """
122 options: dict[str, object] = field(default_factory=dict, init=False)
123 exclude_patterns: list[str] = field(default_factory=list, init=False)
124 include_venv: bool = field(default=False, init=False)
126 def __post_init__(self) -> None:
127 """Initialize plugin with defaults from definition."""
128 # Initialize options from definition defaults
129 self.options = dict(self.definition.default_options)
131 # Set up exclude patterns
132 self._setup_defaults()
134 @property
135 @abstractmethod
136 def definition(self) -> ToolDefinition:
137 """Return the tool definition.
139 Must be implemented by subclasses.
141 Returns:
142 ToolDefinition containing all tool metadata.
143 """
144 ...
146 @property
147 def name(self) -> str:
148 """Return the tool name from definition.
150 Returns:
151 str: Tool name.
152 """
153 return self.definition.name
155 # -------------------------------------------------------------------------
156 # Public API
157 # -------------------------------------------------------------------------
159 def reset_options(self) -> None:
160 """Reset options back to definition defaults.
162 Clears accumulated state from prior ``set_options()`` calls so
163 the same plugin instance can be reused across runs without
164 leaking mutated configuration.
165 """
166 self.options = dict(self.definition.default_options)
167 self.exclude_patterns = []
168 self.include_venv = False
169 self._setup_defaults()
171 def set_options(self, **kwargs: Any) -> None:
172 """Set tool-specific options.
174 Args:
175 **kwargs: Tool-specific options.
177 Raises:
178 ValueError: If an option value is invalid.
179 """
180 from lintro.enums.tool_option_key import ToolOptionKey
182 for key, value in kwargs.items():
183 if key == ToolOptionKey.TIMEOUT.value:
184 if value is not None and not isinstance(value, (int, float)):
185 raise ValueError("Timeout must be a number or None")
186 kwargs[key] = float(value) if value is not None else None
187 if key == ToolOptionKey.EXCLUDE_PATTERNS.value and not isinstance(
188 value,
189 list,
190 ):
191 raise ValueError("Exclude patterns must be a list")
192 if key == ToolOptionKey.INCLUDE_VENV.value and not isinstance(value, bool):
193 raise ValueError("Include venv must be a boolean")
195 self.options.update(kwargs)
197 # Update specific attributes — merge CLI patterns with existing
198 # defaults and .lintro-ignore patterns instead of replacing them
199 if ToolOptionKey.EXCLUDE_PATTERNS.value in kwargs:
200 patterns = kwargs[ToolOptionKey.EXCLUDE_PATTERNS.value]
201 if isinstance(patterns, list):
202 seen = set(self.exclude_patterns)
203 for p in patterns:
204 if p not in seen:
205 self.exclude_patterns.append(p)
206 seen.add(p)
207 if ToolOptionKey.INCLUDE_VENV.value in kwargs:
208 self.include_venv = bool(kwargs[ToolOptionKey.INCLUDE_VENV.value])
210 def doc_url(self, _code: str) -> str | None:
211 """Return a documentation URL for the given rule code.
213 Override in subclasses to provide tool-specific documentation links.
215 Args:
216 _code: The rule/error code (e.g., "E501", "SC2086").
218 Returns:
219 Documentation URL string, or None if no docs are available.
220 """
221 return None
223 @abstractmethod
224 def check(self, paths: list[str], options: dict[str, object]) -> ToolResult:
225 """Check files for issues.
227 Args:
228 paths: List of file or directory paths to check.
229 options: Tool-specific options that override defaults.
231 Returns:
232 ToolResult containing check results and any issues found.
233 """
234 ...
236 def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult:
237 """Fix issues in files.
239 Default implementation raises NotImplementedError if can_fix=False.
240 Override in subclasses that support fixing.
242 Args:
243 paths: List of file or directory paths to fix.
244 options: Tool-specific options that override defaults.
246 Returns:
247 ToolResult containing fix results and any remaining issues.
249 Raises:
250 NotImplementedError: If the tool doesn't support fixing.
251 """
252 if not self.definition.can_fix:
253 raise NotImplementedError(
254 f"{self.definition.name} does not support fixing issues",
255 )
256 raise NotImplementedError("Subclass must implement fix()")
258 # -------------------------------------------------------------------------
259 # Protected Methods - For use by subclasses
260 # -------------------------------------------------------------------------
262 def _setup_defaults(self) -> None:
263 """Set up default options and patterns."""
264 self.exclude_patterns = setup_exclude_patterns(self.exclude_patterns)
266 # Set default timeout if not specified
267 if "timeout" not in self.options:
268 self.options["timeout"] = self.definition.default_timeout
270 def _discover_files(
271 self,
272 paths: list[str],
273 show_progress: bool = True,
274 ) -> list[str]:
275 """Discover files matching the tool's patterns.
277 Args:
278 paths: Input paths to search.
279 show_progress: Whether to show a progress spinner during discovery.
281 Returns:
282 List of matching file paths.
283 """
284 return discover_files(
285 paths=paths,
286 definition=self.definition,
287 exclude_patterns=self.exclude_patterns,
288 include_venv=self.include_venv,
289 show_progress=show_progress,
290 )
292 def _run_subprocess(
293 self,
294 cmd: list[str],
295 timeout: int | float | None = None,
296 cwd: str | None = None,
297 env: dict[str, str] | None = None,
298 ) -> tuple[bool, str]:
299 """Run a subprocess command safely.
301 Args:
302 cmd: Command and arguments to run.
303 timeout: Timeout in seconds (defaults to tool's timeout).
304 cwd: Working directory for command execution.
305 env: Environment variables for the subprocess.
307 Returns:
308 Tuple of (success, output) where success indicates return code 0.
309 """
310 effective_timeout = self._get_effective_timeout(timeout)
311 return run_subprocess(cmd, effective_timeout, cwd, env)
313 def _run_subprocess_streaming(
314 self,
315 cmd: list[str],
316 timeout: int | float | None = None,
317 cwd: str | None = None,
318 env: dict[str, str] | None = None,
319 line_handler: Callable[[str], None] | None = None,
320 ) -> tuple[bool, str]:
321 """Run a subprocess command with optional line-by-line streaming.
323 This method allows real-time output processing by calling the line_handler
324 callback for each line of output as it is produced by the subprocess.
326 Args:
327 cmd: Command and arguments to run.
328 timeout: Timeout in seconds (defaults to tool's timeout).
329 cwd: Working directory for command execution.
330 env: Environment variables for the subprocess.
331 line_handler: Optional callback called for each line of output.
333 Returns:
334 Tuple of (success, output) where success indicates return code 0.
335 """
336 effective_timeout = self._get_effective_timeout(timeout)
337 return run_subprocess_streaming(cmd, effective_timeout, cwd, env, line_handler)
339 def _get_effective_timeout(self, timeout: int | float | None = None) -> float:
340 """Get the effective timeout value.
342 Args:
343 timeout: Override timeout value, or None to use default.
345 Returns:
346 Timeout value in seconds.
347 """
348 return get_effective_timeout(
349 timeout,
350 self.options,
351 self.definition.default_timeout,
352 )
354 def _validate_subprocess_command(self, cmd: list[str]) -> None:
355 """Validate a subprocess command for safety.
357 Args:
358 cmd: Command and arguments to validate.
359 """
360 validate_subprocess_command(cmd)
362 def _validate_paths(self, paths: list[str]) -> None:
363 """Validate that paths exist and are accessible.
365 Args:
366 paths: Paths to validate.
367 """
368 validate_paths(paths)
370 def _get_cwd(self, paths: list[str]) -> str | None:
371 """Get common parent directory for paths.
373 Args:
374 paths: Paths to compute common parent for.
376 Returns:
377 Common parent directory path, or None if not applicable.
378 """
379 return get_cwd(paths)
381 def _prepare_execution(
382 self,
383 paths: list[str],
384 options: dict[str, object],
385 *,
386 no_files_message: str = "No files to check.",
387 ) -> ExecutionContext:
388 """Prepare execution context with common boilerplate steps.
390 This method consolidates repeated patterns:
391 1. Merge options with defaults
392 2. Validate input paths
393 3. Discover files matching patterns (returns early if none found)
394 4. Verify tool version requirements (skipped when no files match)
395 5. Compute working directory and relative paths
396 6. Calculate timeout based on provided options
398 Args:
399 paths: Input paths to process.
400 options: Runtime options to merge with defaults.
401 no_files_message: Message when no files are found.
403 Returns:
404 ExecutionContext with files, cwd, and optional early_result.
406 Example:
407 ctx = self._prepare_execution(paths, options)
408 if ctx.should_skip:
409 return ctx.early_result
411 cmd = self._build_command(ctx.rel_files)
412 success, output = self._run_subprocess(cmd, cwd=ctx.cwd)
413 """
414 logger.debug(f"[{self.name}] Preparing execution for {len(paths)} input paths")
416 result = prepare_execution(
417 paths=paths,
418 options=options,
419 definition=self.definition,
420 exclude_patterns=self.exclude_patterns,
421 include_venv=self.include_venv,
422 current_options=self.options,
423 no_files_message=no_files_message,
424 )
426 if "early_result" in result:
427 early_result = result["early_result"]
428 logger.debug(f"[{self.name}] Early exit: {early_result.output}")
429 return ExecutionContext(early_result=early_result)
431 files = result.get("files", [])
432 timeout = result.get("timeout", DEFAULT_TIMEOUT)
433 logger.debug(f"[{self.name}] Ready: {len(files)} files, timeout={timeout}s")
435 return ExecutionContext(
436 files=files,
437 rel_files=result.get("rel_files", []),
438 cwd=result.get("cwd"),
439 timeout=timeout,
440 )
442 def _process_files_with_progress(
443 self,
444 files: list[str],
445 processor: Callable[[str], FileProcessingResult],
446 timeout: int,
447 *,
448 label: str = "Processing files",
449 progress_threshold: int = 2,
450 ) -> AggregatedResult:
451 """Process files with optional progress bar.
453 This method handles the common pattern of iterating through files,
454 calling a processor function for each file, and aggregating results.
455 It shows a progress bar when processing multiple files.
457 Args:
458 files: List of file paths to process.
459 processor: Callable that processes a single file and returns
460 FileProcessingResult. The processor should handle its own
461 exceptions and return appropriate FileProcessingResult.
462 timeout: Timeout for each file operation (included in output).
463 label: Label for progress bar.
464 progress_threshold: Minimum files to show progress bar.
466 Returns:
467 AggregatedResult with all file processing results.
469 Example:
470 def process_file(path: str) -> FileProcessingResult:
471 try:
472 success, output = self._run_subprocess(cmd + [path])
473 issues = parse_output(output)
474 return FileProcessingResult(
475 success=success,
476 output=output,
477 issues=issues,
478 )
479 except subprocess.TimeoutExpired:
480 return FileProcessingResult(
481 success=False,
482 output="",
483 issues=[],
484 skipped=True,
485 )
487 result = self._process_files_with_progress(
488 files=ctx.files,
489 processor=process_file,
490 timeout=ctx.timeout,
491 )
492 """
493 from lintro.plugins.file_processor import AggregatedResult
495 aggregated = AggregatedResult()
497 if len(files) >= progress_threshold:
498 with click.progressbar(
499 files,
500 label=label,
501 bar_template="%(label)s %(info)s",
502 ) as bar:
503 for file_path in bar:
504 result = processor(file_path)
505 aggregated.add_file_result(file_path, result)
506 else:
507 for file_path in files:
508 result = processor(file_path)
509 aggregated.add_file_result(file_path, result)
511 return aggregated
513 def _get_executable_command(self, tool_name: str) -> list[str]:
514 """Get the command prefix to execute a tool.
516 Delegates to CommandBuilderRegistry for language-specific logic.
517 This satisfies ISP by keeping BaseToolPlugin language-agnostic.
519 Args:
520 tool_name: Name of the tool executable.
522 Returns:
523 Command prefix list.
524 """
525 return get_executable_command(tool_name)
527 def _verify_tool_version(self) -> ToolResult | None:
528 """Verify that the tool meets minimum version requirements.
530 Returns:
531 None if version check passes, or a skip result if it fails.
532 """
533 return verify_tool_version(self.definition)
535 # -------------------------------------------------------------------------
536 # Lintro Config Support
537 # -------------------------------------------------------------------------
539 def _get_lintro_config(self) -> LintroConfig:
540 """Get the current Lintro configuration.
542 Returns:
543 The current LintroConfig instance.
544 """
545 return get_lintro_config()
547 def _get_enforced_settings(self) -> dict[str, object]:
548 """Get enforced settings as a dictionary.
550 Returns:
551 Dictionary of enforced settings.
552 """
553 return get_enforced_settings(lintro_config=self._get_lintro_config())
555 def _get_enforce_cli_args(self) -> list[str]:
556 """Get CLI arguments for enforced settings.
558 Returns:
559 List of CLI arguments for enforced settings.
560 """
561 return get_enforce_cli_args(
562 tool_name=self.definition.name,
563 lintro_config=self._get_lintro_config(),
564 )
566 def _get_defaults_config_args(self) -> list[str]:
567 """Get CLI arguments for defaults config injection.
569 Returns:
570 List of CLI arguments for defaults config.
571 """
572 return get_defaults_config_args(
573 tool_name=self.definition.name,
574 lintro_config=self._get_lintro_config(),
575 )
577 def _should_use_lintro_config(self) -> bool:
578 """Check if Lintro config should be used for this tool.
580 Returns:
581 True if Lintro config should be used.
582 """
583 return should_use_lintro_config(tool_name=self.definition.name)
585 def _build_config_args(self) -> list[str]:
586 """Build combined CLI arguments for config injection.
588 Returns:
589 List of combined CLI arguments for config.
590 """
591 return build_config_args(
592 tool_name=self.definition.name,
593 lintro_config=self._get_lintro_config(),
594 )
597__all__ = [
598 "DEFAULT_EXCLUDE_PATTERNS",
599 "DEFAULT_TIMEOUT",
600 "BaseToolPlugin",
601 "ExecutionContext",
602]