Coverage for lintro / ai / validation.py: 92%
127 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
"""Fix validation via tool re-run.

After AI fixes are applied, re-runs the relevant linting tools on the
affected files to confirm that the issues were actually resolved.

The unified ``verify_fixes`` function combines tool re-execution (previously
in ``rerun.py``) with per-fix verification so that each tool is invoked only
once rather than twice.
"""
11from __future__ import annotations
13import subprocess
14from collections import Counter, defaultdict
15from collections.abc import Sequence
16from dataclasses import dataclass, field
17from pathlib import Path
18from typing import TYPE_CHECKING
20from loguru import logger
22if TYPE_CHECKING:
23 from lintro.ai.models import AIFixSuggestion
24 from lintro.models.core.tool_result import ToolResult
25 from lintro.parsers.base_issue import BaseIssue
28IssueMatchKey = tuple[str, str, int | None]
@dataclass
class ValidationResult:
    """Result of validating applied fixes by re-running tools.

    Attributes:
        verified: Number of fixes whose issues no longer appear.
        unverified: Number of fixes whose issues still appear.
        new_issues: Number of remaining issues not matched to any applied fix.
        details: Per-file validation details.
        verified_by_tool: Verified fix counts grouped by tool name.
        unverified_by_tool: Unverified fix counts grouped by tool name.
    """

    verified: int = 0
    unverified: int = 0
    new_issues: int = 0
    # Mutable defaults go through ``default_factory`` so that instances never
    # share the same list/dict object.
    details: list[str] = field(default_factory=list)
    verified_by_tool: dict[str, int] = field(default_factory=dict)
    unverified_by_tool: dict[str, int] = field(default_factory=dict)
def verify_fixes(
    *,
    applied_suggestions: Sequence[AIFixSuggestion],
    by_tool: dict[str, tuple[ToolResult, list[BaseIssue]]],
) -> ValidationResult | None:
    """Unified post-fix verification: re-run tools and validate fixes.

    Runs each tool once and uses the results for two purposes:

    1. Update the original ``ToolResult`` objects with fresh remaining-issue
       counts (what ``rerun.apply_rerun_results`` previously did).
    2. Check each applied suggestion against the fresh results to determine
       whether the fix was verified (what ``validate_applied_fixes`` does).

    This replaces the previous pattern of calling ``rerun_tools`` +
    ``apply_rerun_results`` followed by ``validate_applied_fixes``, which
    caused each tool to be invoked twice.

    Args:
        applied_suggestions: Suggestions that were successfully applied.
        by_tool: Dict mapping tool name to (ToolResult, issues) pairs,
            used for cwd-aware tool re-execution and ToolResult updates.

    Returns:
        ValidationResult summarizing what was verified, or None if
        validation could not run (no applied suggestions, or the re-run
        returned ``None``).
    """
    if not applied_suggestions:
        return None

    # Imported at call time, exactly as the original does.
    from lintro.ai.rerun import apply_rerun_results, rerun_tools

    # Step 1: Re-run tools (cwd-aware) to get fresh ToolResults.
    fresh_results = rerun_tools(by_tool)
    if fresh_results is None:
        return None

    apply_rerun_results(by_tool=by_tool, rerun_results=fresh_results)

    # Build a lookup from tool name -> fresh remaining issues for validation.
    # Issues are merged rather than overwritten: should the re-run ever yield
    # more than one result carrying the same tool name, dropping the earlier
    # ones would make still-present issues look fixed.
    fresh_issues_by_tool: dict[str, list[object]] = {}
    for fresh in fresh_results:
        bucket = fresh_issues_by_tool.setdefault(fresh.name, [])
        if fresh.issues is not None:
            bucket.extend(fresh.issues)

    # Step 2: Validate each applied suggestion using the fresh results.
    return _validate_suggestions(applied_suggestions, fresh_issues_by_tool)
def validate_applied_fixes(
    applied_suggestions: Sequence[AIFixSuggestion],
) -> ValidationResult | None:
    """Re-run tools on files modified by AI fixes to verify correctness.

    Groups applied suggestions by tool, runs each tool's check on the
    affected files, and checks whether the originally reported issues
    are still present.

    This standalone version is used when ``by_tool`` context is not
    available (e.g. interactive per-group validation and post-refinement
    re-validation).

    Args:
        applied_suggestions: Suggestions that were successfully applied.

    Returns:
        ValidationResult summarizing what was verified, or None if
        validation could not run (e.g. no tools available).
    """
    if not applied_suggestions:
        return None

    # Group suggestions by tool name; a missing/empty name goes to "unknown".
    by_tool_suggestions: dict[str, list[AIFixSuggestion]] = defaultdict(list)
    for suggestion in applied_suggestions:
        by_tool_suggestions[suggestion.tool_name or "unknown"].append(suggestion)

    # Run each tool once and collect its fresh issues.
    fresh_issues_by_tool: dict[str, list[object]] = {}
    for tool_name, suggestions in by_tool_suggestions.items():
        if tool_name == "unknown":
            # No tool to re-run for these suggestions.
            continue

        # De-duplicate while keeping first-seen order so the tool receives
        # the files in a deterministic order (a set would not guarantee it).
        file_paths = list(dict.fromkeys(s.file for s in suggestions))
        remaining_issues = _run_tool_check(tool_name, file_paths)

        if remaining_issues is None:
            logger.debug(f"Validation skipped for {tool_name}: tool check failed")
            continue

        fresh_issues_by_tool[tool_name] = remaining_issues

    # An entry is stored for every tool that ran (even with zero issues), so
    # an empty mapping means no tool could be run at all.
    if not fresh_issues_by_tool:
        return None

    return _validate_suggestions(applied_suggestions, fresh_issues_by_tool)
def _validate_suggestions(
    applied_suggestions: Sequence[AIFixSuggestion],
    fresh_issues_by_tool: dict[str, list[object]],
) -> ValidationResult:
    """Core validation logic: compare applied suggestions against fresh issues.

    A suggestion is counted as *unverified* when a matching issue (same
    normalized file and code, compatible line) is still present in the fresh
    results, and as *verified* otherwise. Remaining issues that no suggestion
    consumed are counted as new issues. Suggestions without a tool name, or
    whose tool has no entry in ``fresh_issues_by_tool``, are skipped.

    Args:
        applied_suggestions: Suggestions that were applied.
        fresh_issues_by_tool: Mapping of tool name to remaining issues
            from a fresh tool run.

    Returns:
        ValidationResult with per-fix verification status.
    """
    # Group suggestions by tool; a missing/empty name goes to "unknown".
    by_tool_suggestions: dict[str, list[AIFixSuggestion]] = defaultdict(list)
    for suggestion in applied_suggestions:
        by_tool_suggestions[suggestion.tool_name or "unknown"].append(suggestion)

    result = ValidationResult()

    for tool_name, suggestions in by_tool_suggestions.items():
        if tool_name == "unknown":
            continue

        remaining_issues = fresh_issues_by_tool.get(tool_name)
        if remaining_issues is None:
            # Tool was not run or check failed -- skip validation
            continue

        # Build a multiset for accurate one-to-one matching: two identical
        # remaining issues can be consumed by at most two suggestions.
        remaining_counts: Counter[IssueMatchKey] = Counter(
            (
                _normalize_file_path(getattr(issue, "file", "")),
                getattr(issue, "code", "") or "",
                _normalize_line(getattr(issue, "line", None)),
            )
            for issue in remaining_issues
        )

        # Check each applied suggestion against remaining issues
        for s in suggestions:
            still_present = _consume_matching_remaining_issue(
                remaining_counts=remaining_counts,
                file_path=_normalize_file_path(s.file),
                # Normalize like the remaining-issue side above; otherwise a
                # suggestion with ``code=None`` can never match a code-less
                # issue and would be reported as verified.
                code=s.code or "",
                line=_normalize_line(s.line),
            )
            if still_present:
                result.unverified += 1
                result.unverified_by_tool[tool_name] = (
                    result.unverified_by_tool.get(tool_name, 0) + 1
                )
                # Only the file's base name is shown in the detail line.
                base_name = Path(s.file).name
                result.details.append(
                    f"[{s.code}] {base_name}:{s.line} — issue still present",
                )
            else:
                result.verified += 1
                result.verified_by_tool[tool_name] = (
                    result.verified_by_tool.get(tool_name, 0) + 1
                )

        # Count new issues: remaining issues not consumed by any suggestion
        result.new_issues += sum(
            count for count in remaining_counts.values() if count > 0
        )

    return result
222def _normalize_line(line: object) -> int | None:
223 """Normalize line values for reliable issue matching.
225 ``BaseIssue.line`` is typed as ``int`` (default 0), so the ``str``
226 branch is unnecessary. The ``bool`` guard remains because ``bool``
227 is a subclass of ``int`` in Python.
228 """
229 if isinstance(line, bool):
230 return None
231 if isinstance(line, int):
232 return line if line > 0 else None
233 return None
236def _consume_matching_remaining_issue(
237 *,
238 remaining_counts: Counter[IssueMatchKey],
239 file_path: str,
240 code: str,
241 line: int | None,
242) -> bool:
243 """Consume a matching remaining issue if present.
245 Matching order:
246 1. Exact file/code/line.
247 2. File/code where the remaining issue has no line number.
248 3. For line-less suggestions, file/code with any line.
249 """
250 if line is not None:
251 exact_key = (file_path, code, line)
252 if remaining_counts.get(exact_key, 0) > 0:
253 remaining_counts[exact_key] -= 1
254 return True
256 unknown_line_key = (file_path, code, None)
257 if remaining_counts.get(unknown_line_key, 0) > 0:
258 remaining_counts[unknown_line_key] -= 1
259 return True
261 if line is None:
262 match_key = next(
263 (
264 k
265 for k, v in remaining_counts.items()
266 if v > 0 and k[0] == file_path and k[1] == code
267 ),
268 None,
269 )
270 if match_key is not None:
271 remaining_counts[match_key] -= 1
272 return True
274 return False
277def _normalize_file_path(file_path: str) -> str:
278 """Normalize file paths for reliable issue matching."""
279 if not file_path:
280 return ""
281 try:
282 return str(Path(file_path).resolve())
283 except OSError:
284 return str(Path(file_path).absolute())
def _run_tool_check(
    tool_name: str,
    file_paths: list[str],
) -> list[object] | None:
    """Run a tool's check on specific files.

    Args:
        tool_name: Name of the tool to run (e.g. "ruff").
        file_paths: Absolute paths to files to check.

    Returns:
        List of issues found, or None if the tool is not available
        or the check failed.
    """
    try:
        # Imported at call time, exactly as the original does.
        from lintro.tools import tool_manager

        tool = tool_manager.get_tool(tool_name)
    except (KeyError, ImportError):
        logger.debug(f"Validation: tool {tool_name!r} not available")
        return None

    try:
        tool_result = tool.check(file_paths, {})
    except (OSError, subprocess.SubprocessError, ValueError, RuntimeError):
        # loguru does not understand the stdlib ``exc_info=True`` keyword (it
        # is only treated as a formatting/extra argument), so the traceback
        # has to be requested through ``opt(exception=True)``.
        logger.opt(exception=True).debug(
            f"Validation: {tool_name} check failed",
        )
        return None

    # A result without an issue list is reported as "no issues".
    if tool_result.issues is None:
        return []
    return list(tool_result.issues)