Coverage for lintro / ai / apply.py: 90%
68 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
"""Apply AI-generated fix suggestions to source files.

Handles line-targeted replacement within a configurable search radius.
"""
from __future__ import annotations

import os
import shutil
import tempfile
from collections.abc import Sequence
from pathlib import Path

from loguru import logger

from lintro.ai.models import AIFixSuggestion
from lintro.ai.paths import resolve_workspace_file
def _search_order(
    target_idx: int,
    line_count: int,
    search_radius: int,
) -> list[int]:
    """Return candidate start indices, closest to ``target_idx`` first.

    For each distance 1..search_radius the line above is tried before the
    line below; indices outside ``[0, line_count)`` are skipped.
    """
    order = [target_idx]
    for offset in range(1, search_radius + 1):
        above = target_idx - offset
        below = target_idx + offset
        if above >= 0:
            order.append(above)
        if below < line_count:
            order.append(below)
    return order


def _atomic_write(path: Path, text: str) -> None:
    """Replace ``path`` with ``text`` (UTF-8) via a sibling temp file + rename.

    The temp file is removed if anything fails before the rename, and the
    exception is re-raised.
    """
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        # os.fdopen transfers fd ownership to the file object. If os.fdopen
        # itself raises, fd is still raw and must be closed manually to
        # avoid a leak.
        try:
            fobj = os.fdopen(fd, "wb")
        except BaseException:
            os.close(fd)
            raise
        with fobj:
            fobj.write(text.encode("utf-8"))
        # mkstemp creates the file readable/writable by the owner only, so a
        # plain rename would silently drop the original permission bits
        # (e.g. the executable bit on scripts). Best-effort: a failure here
        # must not turn a successful fix into a failed one.
        try:
            shutil.copymode(path, tmp)
        except OSError as exc:
            logger.debug(f"Could not preserve file mode for {path}: {exc}")
        Path(tmp).replace(path)
    except BaseException:
        Path(tmp).unlink(missing_ok=True)
        raise


def _apply_fix(
    suggestion: AIFixSuggestion,
    *,
    workspace_root: Path,
    auto_apply: bool = False,
    search_radius: int = 5,
) -> bool:
    """Apply a single fix suggestion to the file.

    Uses line-number-targeted replacement to avoid matching the wrong
    occurrence when the same code pattern appears elsewhere in the file.
    If the original code is not found within the search radius of the
    target line, the fix fails (returns False).

    The file is rewritten atomically (temp file + rename) and the original
    permission bits are carried over to the new file on a best-effort basis.

    NOTE(review): the file is read with universal-newline decoding and
    written back as raw bytes, so a CRLF file comes out with LF endings.
    Confirm this is intended.

    Args:
        suggestion: Fix suggestion to apply.
        workspace_root: Root directory limiting writable paths.
        auto_apply: Reserved for future use; kept for API compatibility.
        search_radius: Max lines above/below the target line to search
            for the original code pattern.

    Returns:
        True if the fix was applied successfully. False when the path cannot
        be resolved, the file cannot be read or decoded as UTF-8, the line
        number is invalid or unspecified (0), the original code is not found
        near the target line, or an OSError occurs while writing.

    Raises:
        BaseException: Any non-OSError failure during the atomic write is
            re-raised after the temp file has been cleaned up.
    """
    try:
        resolved = resolve_workspace_file(suggestion.file, workspace_root)
        if resolved is None:
            return False
        path = resolved

        try:
            content = path.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            # Not an OSError, so it would otherwise escape and abort the
            # whole batch in apply_fixes because of one undecodable file.
            logger.debug(
                f"Cannot decode {suggestion.file} as UTF-8, skipping fix",
            )
            return False
        lines = content.splitlines(keepends=True)

        original_lines = suggestion.original_code.splitlines(keepends=True)
        if not original_lines:
            return False

        # Ensure last line has consistent newline for comparison
        if not original_lines[-1].endswith("\n"):
            original_lines[-1] += "\n"

        # Validate line number before doing arithmetic.
        if not isinstance(suggestion.line, int) or suggestion.line < 0:
            logger.debug(
                f"Invalid line {suggestion.line!r} for {suggestion.file}, "
                f"skipping fix",
            )
            return False

        # line == 0 means "unspecified" — no line-targeted search possible.
        # An empty file has nothing to match; bail out explicitly instead of
        # letting the clamp below produce a negative index.
        if suggestion.line == 0 or not lines:
            return False

        # Clamp to last line when the AI reports a stale/out-of-range
        # number so the search radius still gets a chance.
        target_idx = min(suggestion.line - 1, len(lines) - 1)  # 0-based
        span = len(original_lines)

        # Search outward from the target line (closest match wins).
        for start in _search_order(target_idx, len(lines), search_radius):
            end = start + span
            if end > len(lines):
                continue

            window = lines[start:end]
            # Normalize trailing newline on last window line for comparison
            # (only the final line of the file can lack one).
            normalized_window = list(window)
            if not normalized_window[-1].endswith("\n"):
                normalized_window[-1] += "\n"
            if normalized_window != original_lines:
                continue

            suggested_lines = suggestion.suggested_code.splitlines(
                keepends=True,
            )
            # Preserve trailing newline consistency
            if (
                suggested_lines
                and window[-1].endswith("\n")
                and not suggested_lines[-1].endswith("\n")
            ):
                suggested_lines[-1] += "\n"

            new_lines = lines[:start] + suggested_lines + lines[end:]
            _atomic_write(path, "".join(new_lines))
            return True

        return False

    except OSError:
        return False
def apply_fixes(
    suggestions: Sequence[AIFixSuggestion],
    *,
    workspace_root: Path,
    auto_apply: bool = False,
    search_radius: int | None = None,
) -> list[AIFixSuggestion]:
    """Try every suggestion in order and collect the ones that were applied.

    Args:
        suggestions: Fix suggestions to attempt, in order.
        workspace_root: Root directory forwarded to ``_apply_fix``.
        auto_apply: Forwarded unchanged to ``_apply_fix``.
        search_radius: Forwarded to ``_apply_fix`` only when not None, so
            that ``_apply_fix``'s own default applies otherwise.

    Returns:
        The suggestions for which ``_apply_fix`` returned a truthy result,
        in their original order.
    """
    # Only pass search_radius through when the caller set it explicitly.
    radius_kwargs: dict[str, int] = (
        {} if search_radius is None else {"search_radius": search_radius}
    )

    applied: list[AIFixSuggestion] = []
    for candidate in suggestions:
        succeeded = _apply_fix(
            candidate,
            workspace_root=workspace_root,
            auto_apply=auto_apply,
            **radius_kwargs,
        )
        if succeeded:
            applied.append(candidate)
    return applied