Coverage for lintro / tools / definitions / rustfmt.py: 89%
110 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Rustfmt tool definition.
3Rustfmt is Rust's official code formatter. It enforces a consistent style
4by parsing Rust code and re-printing it with its own rules. It runs via
5`cargo fmt` and requires a Cargo.toml file in the project.
6"""
8from __future__ import annotations
10import os
11import subprocess # nosec B404 - used safely with shell disabled
12from dataclasses import dataclass
13from pathlib import Path
14from typing import Any
16from loguru import logger
18from lintro._tool_versions import get_min_version
19from lintro.enums.tool_name import ToolName
20from lintro.enums.tool_type import ToolType
21from lintro.models.core.tool_result import ToolResult
22from lintro.parsers.rustfmt.rustfmt_parser import parse_rustfmt_output
23from lintro.plugins.base import BaseToolPlugin
24from lintro.plugins.protocol import ToolDefinition
25from lintro.plugins.registry import register_tool
26from lintro.tools.core.option_validators import (
27 filter_none_options,
28 validate_positive_int,
29)
30from lintro.tools.core.timeout_utils import (
31 create_timeout_result,
32 run_subprocess_with_timeout,
33)
35# Constants for Rustfmt configuration
36RUSTFMT_DEFAULT_TIMEOUT: int = 60
37RUSTFMT_DEFAULT_PRIORITY: int = 80 # Formatter, runs after linters
38RUSTFMT_FILE_PATTERNS: list[str] = ["*.rs"]
41def _find_cargo_root(paths: list[str]) -> Path | None:
42 """Return the nearest directory containing Cargo.toml for given paths.
44 Args:
45 paths: List of file paths to search from.
47 Returns:
48 Path to Cargo.toml directory, or None if not found.
49 """
50 roots: list[Path] = []
51 for raw_path in paths:
52 current = Path(raw_path).resolve()
53 # If it's a file, start from its parent
54 if current.is_file():
55 current = current.parent
56 # Search upward for Cargo.toml
57 for candidate in [current, *list(current.parents)]:
58 manifest = candidate / "Cargo.toml"
59 if manifest.exists():
60 roots.append(candidate)
61 break
63 if not roots:
64 return None
66 # Prefer a single root; if multiple, use common path when valid
67 unique_roots = set(roots)
68 if len(unique_roots) == 1:
69 return roots[0]
71 try:
72 common = Path(os.path.commonpath([str(r) for r in unique_roots]))
73 except ValueError:
74 logger.warning(
75 "Multiple Cargo roots found on different drives; cannot determine "
76 "common workspace root. Skipping rustfmt.",
77 )
78 return None
80 manifest = common / "Cargo.toml"
81 if manifest.exists():
82 return common
84 logger.warning(
85 "Multiple Cargo roots found ({}) without a common workspace Cargo.toml. "
86 "Consider creating a workspace or running rustfmt on each crate separately.",
87 ", ".join(str(r) for r in unique_roots),
88 )
89 return None
92def _build_rustfmt_check_command() -> list[str]:
93 """Build the cargo fmt check command.
95 Returns:
96 List of command arguments.
97 """
98 return ["cargo", "fmt", "--all", "--", "--check"]
101def _build_rustfmt_fix_command() -> list[str]:
102 """Build the cargo fmt fix command.
104 Returns:
105 List of command arguments.
106 """
107 return ["cargo", "fmt", "--all"]
110@register_tool
111@dataclass
112class RustfmtPlugin(BaseToolPlugin):
113 """Rustfmt Rust formatter plugin.
115 This plugin integrates Rust's rustfmt formatter with Lintro for formatting
116 Rust code consistently.
117 """
119 @property
120 def definition(self) -> ToolDefinition:
121 """Return the tool definition.
123 Returns:
124 ToolDefinition containing tool metadata.
125 """
126 return ToolDefinition(
127 name="rustfmt",
128 description="Rust's official code formatter",
129 can_fix=True,
130 tool_type=ToolType.FORMATTER,
131 file_patterns=RUSTFMT_FILE_PATTERNS,
132 priority=RUSTFMT_DEFAULT_PRIORITY,
133 conflicts_with=[],
134 native_configs=["rustfmt.toml", ".rustfmt.toml"],
135 version_command=["rustfmt", "--version"],
136 min_version=get_min_version(ToolName.RUSTFMT),
137 default_options={
138 "timeout": RUSTFMT_DEFAULT_TIMEOUT,
139 },
140 default_timeout=RUSTFMT_DEFAULT_TIMEOUT,
141 )
143 def set_options(
144 self,
145 timeout: int | None = None,
146 **kwargs: Any,
147 ) -> None:
148 """Set Rustfmt-specific options.
150 Args:
151 timeout: Timeout in seconds (default: 60).
152 **kwargs: Additional options.
153 """
154 validate_positive_int(timeout, "timeout")
156 options = filter_none_options(timeout=timeout)
157 super().set_options(**options, **kwargs)
159 def check(self, paths: list[str], options: dict[str, object]) -> ToolResult:
160 """Run `cargo fmt -- --check` and parse formatting issues.
162 Args:
163 paths: List of file or directory paths to check.
164 options: Runtime options that override defaults.
166 Returns:
167 ToolResult with check results.
168 """
169 # Use shared preparation for version check, path validation, file discovery
170 ctx = self._prepare_execution(
171 paths,
172 options,
173 no_files_message="No Rust files found to check.",
174 )
175 if ctx.should_skip:
176 return ctx.early_result # type: ignore[return-value]
178 cargo_root = _find_cargo_root(ctx.files)
179 if cargo_root is None:
180 return ToolResult(
181 name=self.definition.name,
182 success=True,
183 output="No Cargo.toml found; skipping rustfmt.",
184 issues_count=0,
185 )
187 cmd = _build_rustfmt_check_command()
189 try:
190 success_cmd, output = run_subprocess_with_timeout(
191 tool=self,
192 cmd=cmd,
193 timeout=ctx.timeout,
194 cwd=str(cargo_root),
195 tool_name="rustfmt",
196 )
197 except subprocess.TimeoutExpired:
198 timeout_result = create_timeout_result(
199 tool=self,
200 timeout=ctx.timeout,
201 cmd=cmd,
202 tool_name="rustfmt",
203 )
204 return ToolResult(
205 name=self.definition.name,
206 success=timeout_result.success,
207 output=timeout_result.output,
208 issues_count=timeout_result.issues_count,
209 issues=timeout_result.issues,
210 )
212 issues = parse_rustfmt_output(output=output)
213 issues_count = len(issues)
215 # Preserve output when command failed, even if no issues were parsed
216 should_show_output = issues_count > 0 or not success_cmd
218 return ToolResult(
219 name=self.definition.name,
220 success=bool(success_cmd) and issues_count == 0,
221 output=output if should_show_output else None,
222 issues_count=issues_count,
223 issues=issues,
224 )
226 def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult:
227 """Run `cargo fmt --all` then re-check for remaining issues.
229 Args:
230 paths: List of file or directory paths to fix.
231 options: Runtime options that override defaults.
233 Returns:
234 ToolResult with fix results.
235 """
236 # Use shared preparation for version check, path validation, file discovery
237 ctx = self._prepare_execution(
238 paths,
239 options,
240 no_files_message="No Rust files found to fix.",
241 )
242 if ctx.should_skip:
243 return ctx.early_result # type: ignore[return-value]
245 cargo_root = _find_cargo_root(ctx.files)
246 if cargo_root is None:
247 return ToolResult(
248 name=self.definition.name,
249 success=True,
250 output="No Cargo.toml found; skipping rustfmt.",
251 issues_count=0,
252 initial_issues_count=0,
253 fixed_issues_count=0,
254 remaining_issues_count=0,
255 )
257 check_cmd = _build_rustfmt_check_command()
259 # First, count issues before fixing
260 try:
261 _, output_check = run_subprocess_with_timeout(
262 tool=self,
263 cmd=check_cmd,
264 timeout=ctx.timeout,
265 cwd=str(cargo_root),
266 tool_name="rustfmt",
267 )
268 except subprocess.TimeoutExpired:
269 # Timeout on initial check - can't determine issue counts
270 timeout_result = create_timeout_result(
271 tool=self,
272 timeout=ctx.timeout,
273 cmd=check_cmd,
274 tool_name="rustfmt",
275 )
276 return ToolResult(
277 name=self.definition.name,
278 success=timeout_result.success,
279 output=timeout_result.output,
280 issues_count=timeout_result.issues_count,
281 issues=timeout_result.issues,
282 initial_issues_count=0,
283 fixed_issues_count=0,
284 remaining_issues_count=0,
285 )
287 initial_issues = parse_rustfmt_output(output=output_check)
288 initial_count = len(initial_issues)
290 # Run fix
291 fix_cmd = _build_rustfmt_fix_command()
292 try:
293 fix_success, fix_output = run_subprocess_with_timeout(
294 tool=self,
295 cmd=fix_cmd,
296 timeout=ctx.timeout,
297 cwd=str(cargo_root),
298 tool_name="rustfmt",
299 )
300 except subprocess.TimeoutExpired:
301 timeout_result = create_timeout_result(
302 tool=self,
303 timeout=ctx.timeout,
304 cmd=fix_cmd,
305 tool_name="rustfmt",
306 )
307 return ToolResult(
308 name=self.definition.name,
309 success=timeout_result.success,
310 output=timeout_result.output,
311 issues_count=initial_count,
312 issues=initial_issues,
313 initial_issues_count=initial_count,
314 fixed_issues_count=0,
315 remaining_issues_count=initial_count,
316 )
318 # If fix command failed, return early with the fix output
319 if not fix_success:
320 return ToolResult(
321 name=self.definition.name,
322 success=False,
323 output=fix_output,
324 issues_count=initial_count,
325 issues=initial_issues,
326 initial_issues_count=initial_count,
327 fixed_issues_count=0,
328 remaining_issues_count=initial_count,
329 )
331 # Re-check after fix to count remaining issues
332 try:
333 verify_success, output_after = run_subprocess_with_timeout(
334 tool=self,
335 cmd=check_cmd,
336 timeout=ctx.timeout,
337 cwd=str(cargo_root),
338 tool_name="rustfmt",
339 )
340 except subprocess.TimeoutExpired:
341 timeout_result = create_timeout_result(
342 tool=self,
343 timeout=ctx.timeout,
344 cmd=check_cmd,
345 tool_name="rustfmt",
346 )
347 return ToolResult(
348 name=self.definition.name,
349 success=timeout_result.success,
350 output=timeout_result.output,
351 issues_count=initial_count,
352 issues=initial_issues,
353 initial_issues_count=initial_count,
354 fixed_issues_count=0,
355 remaining_issues_count=initial_count,
356 )
358 remaining_issues = parse_rustfmt_output(output=output_after)
359 remaining_count = len(remaining_issues)
360 fixed_count = max(0, initial_count - remaining_count)
362 # Success requires both: verification passed AND no remaining issues
363 overall_success = verify_success and remaining_count == 0
365 return ToolResult(
366 name=self.definition.name,
367 success=overall_success,
368 output=output_after if not overall_success else None,
369 issues_count=remaining_count,
370 issues=remaining_issues,
371 initial_issues_count=initial_count,
372 fixed_issues_count=fixed_count,
373 remaining_issues_count=remaining_count,
374 )