Coverage for lintro / tools / definitions / black.py: 81%
140 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Black tool definition.
3Black is an opinionated Python code formatter. It enforces a consistent style
4by parsing Python code and re-printing it with its own rules, ensuring uniformity
5across projects.
6"""
8from __future__ import annotations
10import subprocess # nosec B404 - used safely with shell disabled
11from dataclasses import dataclass
12from typing import Any
14from loguru import logger
16from lintro.enums.tool_type import ToolType
17from lintro.models.core.tool_result import ToolResult
18from lintro.parsers.black.black_issue import BlackIssue
19from lintro.parsers.black.black_parser import parse_black_output
20from lintro.plugins.base import BaseToolPlugin
21from lintro.plugins.protocol import ToolDefinition
22from lintro.plugins.registry import register_tool
23from lintro.tools.core.option_validators import (
24 filter_none_options,
25 validate_bool,
26 validate_int,
27 validate_str,
28)
30# Constants for Black configuration
31BLACK_DEFAULT_TIMEOUT: int = 30
32BLACK_DEFAULT_PRIORITY: int = 90 # Prefer Black ahead of Ruff formatting
33BLACK_FILE_PATTERNS: list[str] = ["*.py", "*.pyi"]
36@register_tool
37@dataclass
38class BlackPlugin(BaseToolPlugin):
39 """Black Python formatter plugin.
41 This plugin integrates Black with Lintro for formatting Python files.
42 """
44 @property
45 def definition(self) -> ToolDefinition:
46 """Return the tool definition.
48 Returns:
49 ToolDefinition containing tool metadata.
50 """
51 return ToolDefinition(
52 name="black",
53 description="Opinionated Python code formatter",
54 can_fix=True,
55 tool_type=ToolType.FORMATTER,
56 file_patterns=BLACK_FILE_PATTERNS,
57 priority=BLACK_DEFAULT_PRIORITY,
58 conflicts_with=[],
59 native_configs=["pyproject.toml"],
60 version_command=["black", "--version"],
61 min_version="24.0.0",
62 default_options={
63 "timeout": BLACK_DEFAULT_TIMEOUT,
64 "line_length": None,
65 "target_version": None,
66 "fast": False,
67 "preview": False,
68 "diff": False,
69 },
70 default_timeout=BLACK_DEFAULT_TIMEOUT,
71 )
73 def set_options(
74 self,
75 line_length: int | None = None,
76 target_version: str | None = None,
77 fast: bool | None = None,
78 preview: bool | None = None,
79 diff: bool | None = None,
80 **kwargs: Any,
81 ) -> None:
82 """Set Black-specific options with validation.
84 Args:
85 line_length: Optional line length override.
86 target_version: String per Black CLI (e.g., "py313").
87 fast: Use --fast mode (skip safety checks).
88 preview: Enable preview style.
89 diff: Show diffs in output when formatting.
90 **kwargs: Additional base options.
91 """
92 validate_int(line_length, "line_length")
93 validate_str(target_version, "target_version")
94 validate_bool(fast, "fast")
95 validate_bool(preview, "preview")
96 validate_bool(diff, "diff")
98 options = filter_none_options(
99 line_length=line_length,
100 target_version=target_version,
101 fast=fast,
102 preview=preview,
103 diff=diff,
104 )
105 super().set_options(**options, **kwargs)
107 def _build_common_args(self) -> list[str]:
108 """Build common CLI arguments for Black.
110 Returns:
111 CLI arguments for Black.
112 """
113 args: list[str] = []
115 # Try Lintro config injection first
116 config_args = self._build_config_args()
117 if config_args:
118 args.extend(config_args)
119 else:
120 if self.options.get("line_length"):
121 args.extend(["--line-length", str(self.options["line_length"])])
122 if self.options.get("target_version"):
123 args.extend(["--target-version", str(self.options["target_version"])])
125 if self.options.get("fast"):
126 args.append("--fast")
127 if self.options.get("preview"):
128 args.append("--preview")
129 return args
131 def _check_line_length_violations(
132 self,
133 files: list[str],
134 cwd: str | None,
135 ) -> list[BlackIssue]:
136 """Check for line length violations using the shared line-length checker.
138 Args:
139 files: List of file paths to check.
140 cwd: Working directory for the check.
142 Returns:
143 List of line length violations converted to BlackIssue objects.
144 """
145 if not files:
146 return []
148 from lintro.tools.core.line_length_checker import check_line_length_violations
150 line_length_opt = self.options.get("line_length")
151 timeout_opt = self.options.get("timeout", BLACK_DEFAULT_TIMEOUT)
152 line_length_val: int | None = None
153 if isinstance(line_length_opt, int):
154 line_length_val = line_length_opt
155 elif line_length_opt is not None:
156 line_length_val = int(str(line_length_opt))
157 if isinstance(timeout_opt, int):
158 timeout_val = timeout_opt
159 elif timeout_opt is not None:
160 timeout_val = int(str(timeout_opt))
161 else:
162 timeout_val = BLACK_DEFAULT_TIMEOUT
164 violations = check_line_length_violations(
165 files=files,
166 cwd=cwd,
167 line_length=line_length_val,
168 timeout=timeout_val,
169 )
171 black_issues: list[BlackIssue] = []
172 for violation in violations:
173 message = (
174 f"Line {violation.line} exceeds line length limit "
175 f"({violation.message})"
176 )
177 black_issues.append(
178 BlackIssue(
179 file=violation.file,
180 line=violation.line,
181 column=violation.column,
182 code=violation.code,
183 message=message,
184 severity="error",
185 fixable=False,
186 ),
187 )
189 return black_issues
191 def _handle_timeout_error(
192 self,
193 timeout_val: int,
194 initial_count: int | None = None,
195 cwd: str | None = None,
196 ) -> ToolResult:
197 """Handle timeout errors consistently.
199 Args:
200 timeout_val: The timeout value that was exceeded.
201 initial_count: Optional initial issues count for fix operations.
202 cwd: Working directory for the tool result.
204 Returns:
205 Standardized timeout error result.
206 """
207 timeout_msg = (
208 f"Black execution timed out ({timeout_val}s limit exceeded).\n\n"
209 "This may indicate:\n"
210 " - Large codebase taking too long to process\n"
211 " - Need to increase timeout via --tool-options black:timeout=N"
212 )
213 if initial_count is not None:
214 return ToolResult(
215 name=self.definition.name,
216 success=False,
217 output=timeout_msg,
218 issues_count=0,
219 issues=[],
220 initial_issues_count=initial_count,
221 cwd=cwd,
222 )
223 return ToolResult(
224 name=self.definition.name,
225 success=False,
226 output=timeout_msg,
227 issues_count=1,
228 issues=[],
229 cwd=cwd,
230 )
232 def check(self, paths: list[str], options: dict[str, object]) -> ToolResult:
233 """Check files using Black without applying changes.
235 Args:
236 paths: List of file or directory paths to check.
237 options: Runtime options that override defaults.
239 Returns:
240 ToolResult with check results.
241 """
242 # Merge runtime options
243 merged_options = dict(self.options)
244 merged_options.update(options)
246 # Use shared preparation for version check, path validation, file discovery
247 ctx = self._prepare_execution(paths, options)
248 if ctx.should_skip:
249 return ctx.early_result # type: ignore[return-value]
251 cmd: list[str] = self._get_executable_command(tool_name="black") + ["--check"]
252 cmd.extend(self._build_common_args())
253 cmd.extend(ctx.rel_files)
255 logger.debug(f"[BlackPlugin] Running: {' '.join(cmd)} (cwd={ctx.cwd})")
256 try:
257 success, output = self._run_subprocess(
258 cmd=cmd,
259 timeout=ctx.timeout,
260 cwd=ctx.cwd,
261 )
262 except subprocess.TimeoutExpired:
263 return self._handle_timeout_error(ctx.timeout, cwd=ctx.cwd)
265 black_issues = parse_black_output(output=output)
267 # Check for line length violations that Black cannot wrap
268 line_length_issues = self._check_line_length_violations(
269 files=ctx.rel_files,
270 cwd=ctx.cwd,
271 )
273 all_issues = black_issues + line_length_issues
274 count = len(all_issues)
276 return ToolResult(
277 name=self.definition.name,
278 success=(success and count == 0),
279 output=None if count == 0 else output,
280 issues_count=count,
281 issues=all_issues,
282 cwd=ctx.cwd,
283 )
285 def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult:
286 """Format files using Black.
288 Args:
289 paths: List of file or directory paths to format.
290 options: Runtime options that override defaults.
292 Returns:
293 ToolResult with fix results.
294 """
295 # Merge runtime options
296 merged_options = dict(self.options)
297 merged_options.update(options)
299 # Use shared preparation for version check, path validation, file discovery
300 ctx = self._prepare_execution(
301 paths,
302 options,
303 no_files_message="No files to format.",
304 )
305 if ctx.should_skip:
306 return ctx.early_result # type: ignore[return-value]
308 # Build reusable check command
309 check_cmd: list[str] = self._get_executable_command(tool_name="black") + [
310 "--check",
311 ]
312 check_cmd.extend(self._build_common_args())
313 check_cmd.extend(ctx.rel_files)
315 if self.options.get("diff"):
316 initial_issues = []
317 initial_line_length_issues = []
318 initial_count = 0
319 else:
320 try:
321 _, check_output = self._run_subprocess(
322 cmd=check_cmd,
323 timeout=ctx.timeout,
324 cwd=ctx.cwd,
325 )
326 except subprocess.TimeoutExpired:
327 return self._handle_timeout_error(
328 ctx.timeout,
329 initial_count=0,
330 cwd=ctx.cwd,
331 )
332 initial_issues = parse_black_output(output=check_output)
333 initial_line_length_issues = self._check_line_length_violations(
334 files=ctx.rel_files,
335 cwd=ctx.cwd,
336 )
337 initial_issues = initial_issues + initial_line_length_issues
338 initial_count = len(initial_issues)
340 # Apply formatting
341 fix_cmd_base: list[str] = self._get_executable_command(tool_name="black")
342 fix_cmd: list[str] = list(fix_cmd_base)
343 if self.options.get("diff"):
344 fix_cmd.append("--diff")
345 fix_cmd.extend(self._build_common_args())
346 fix_cmd.extend(ctx.rel_files)
348 logger.debug(f"[BlackPlugin] Fixing: {' '.join(fix_cmd)} (cwd={ctx.cwd})")
349 try:
350 _, fix_output = self._run_subprocess(
351 cmd=fix_cmd,
352 timeout=ctx.timeout,
353 cwd=ctx.cwd,
354 )
355 except subprocess.TimeoutExpired:
356 return self._handle_timeout_error(
357 ctx.timeout,
358 initial_count=initial_count,
359 cwd=ctx.cwd,
360 )
362 # Final check for remaining differences
363 try:
364 final_success, final_output = self._run_subprocess(
365 cmd=check_cmd,
366 timeout=ctx.timeout,
367 cwd=ctx.cwd,
368 )
369 except subprocess.TimeoutExpired:
370 return self._handle_timeout_error(
371 ctx.timeout,
372 initial_count=initial_count,
373 cwd=ctx.cwd,
374 )
375 remaining_issues = parse_black_output(output=final_output)
377 # Check for line length violations that Black cannot wrap
378 line_length_issues = self._check_line_length_violations(
379 files=ctx.rel_files,
380 cwd=ctx.cwd,
381 )
383 all_remaining_issues = remaining_issues + line_length_issues
384 remaining_count = len(all_remaining_issues)
386 fixed_issues_parsed = parse_black_output(output=fix_output)
387 fixed_count = max(0, initial_count - remaining_count)
389 # Build summary
390 summary: list[str] = []
391 if fixed_count > 0:
392 summary.append(f"Fixed {fixed_count} issue(s)")
393 if remaining_count > 0:
394 summary.append(
395 f"Found {remaining_count} issue(s) that cannot be auto-fixed",
396 )
397 final_summary = "\n".join(summary) if summary else "No fixes applied."
399 all_issues = (fixed_issues_parsed or []) + all_remaining_issues
401 return ToolResult(
402 name=self.definition.name,
403 success=(remaining_count == 0),
404 output=final_summary,
405 issues_count=remaining_count,
406 issues=all_issues,
407 initial_issues_count=initial_count,
408 fixed_issues_count=fixed_count,
409 remaining_issues_count=remaining_count,
410 initial_issues=initial_issues if initial_issues else None,
411 cwd=ctx.cwd,
412 )