Coverage for lintro / ai / apply.py: 90%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Apply AI-generated fix suggestions to source files. 

2 

3Handles line-targeted replacement within a configurable search radius. 

4""" 

5 

6from __future__ import annotations 

7 

8import os 

9import tempfile 

10from collections.abc import Sequence 

11from pathlib import Path 

12 

13from loguru import logger 

14 

15from lintro.ai.models import AIFixSuggestion 

16from lintro.ai.paths import resolve_workspace_file 

17 

18 

def _line_ending(line: str) -> str:
    """Return the CRLF, LF or CR terminator that ends ``line``, or ``""``."""
    for ending in ("\r\n", "\n", "\r"):
        if line.endswith(ending):
            return ending
    return ""


def _with_line_ending(line: str, eol: str) -> str:
    """Return ``line`` with its terminator (if it has one) replaced by ``eol``."""
    ending = _line_ending(line)
    if not ending or ending == eol:
        return line
    return line[: -len(ending)] + eol


def _comparable(lines: Sequence[str]) -> list[str]:
    """Normalize ``lines`` to LF endings and make sure the last one is terminated."""
    normalized = [_with_line_ending(line, "\n") for line in lines]
    if normalized and not normalized[-1].endswith("\n"):
        normalized[-1] += "\n"
    return normalized


def _search_order(target_idx: int, line_count: int, radius: int) -> list[int]:
    """List candidate start indices, closest to ``target_idx`` first."""
    order = [target_idx]
    for offset in range(1, radius + 1):
        above = target_idx - offset
        below = target_idx + offset
        if above >= 0:
            order.append(above)
        if below < line_count:
            order.append(below)
    return order


def _replace_file_atomically(path: Path, data: bytes) -> None:
    """Write ``data`` to a sibling temp file, then rename it over ``path``."""
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        # os.fdopen transfers fd ownership to the file object. If
        # os.fdopen itself raises, fd is still raw and must be closed
        # manually to avoid a leak.
        try:
            fobj = os.fdopen(fd, "wb")
        except BaseException:
            os.close(fd)
            raise
        with fobj:
            fobj.write(data)
        # mkstemp creates the file readable/writable by the owner only;
        # copy the target's permission bits so the rename does not drop
        # e.g. the executable bit. Best effort: some filesystems refuse
        # chmod, and that should not block the fix itself.
        try:
            os.chmod(tmp, path.stat().st_mode & 0o7777)
        except OSError as exc:
            logger.debug(f"Could not preserve permissions of {path}: {exc}")
        Path(tmp).replace(path)
    except BaseException:
        Path(tmp).unlink(missing_ok=True)
        raise


def _apply_fix(
    suggestion: AIFixSuggestion,
    *,
    workspace_root: Path,
    auto_apply: bool = False,
    search_radius: int = 5,
) -> bool:
    """Apply a single fix suggestion to the file.

    Uses line-number-targeted replacement to avoid matching the wrong
    occurrence when the same code pattern appears elsewhere in the file.
    If the original code is not found within the search radius of the
    target line, the fix fails (returns False).

    Lines outside the replaced span are written back unchanged, the
    replacement adopts the newline style of the matched span, and the
    file's permission bits are carried over to the rewritten file.

    Args:
        suggestion: Fix suggestion to apply.
        workspace_root: Root directory limiting writable paths.
        auto_apply: Reserved for future use; kept for API compatibility.
        search_radius: Max lines above/below the target line to search
            for the original code pattern.

    Returns:
        True if the fix was applied successfully. False when the path
        cannot be resolved, the file cannot be read or decoded as UTF-8,
        the line number is invalid or unspecified (0), the original code
        is empty or not found, or an OSError occurs while writing.

    Raises:
        BaseException: Non-OSError failures during the atomic write are
            re-raised after the temporary file has been removed.
    """
    try:
        path = resolve_workspace_file(suggestion.file, workspace_root)
        if path is None:
            return False

        # Decode the raw bytes instead of using read_text(): text mode
        # translates CRLF/CR to LF, which would silently convert the
        # whole file to LF when it is written back.
        try:
            content = path.read_bytes().decode("utf-8")
        except UnicodeDecodeError:
            logger.debug(
                f"Cannot decode {suggestion.file} as UTF-8, skipping fix",
            )
            return False
        lines = content.splitlines(keepends=True)

        # Compare with normalized line endings so a CRLF file still
        # matches original code that was reported with LF endings.
        expected = _comparable(suggestion.original_code.splitlines(keepends=True))
        if not expected:
            return False

        # Validate line number before doing arithmetic.
        if not isinstance(suggestion.line, int) or suggestion.line < 0:
            logger.debug(
                f"Invalid line {suggestion.line!r} for {suggestion.file}, "
                f"skipping fix",
            )
            return False

        # line == 0 means "unspecified" — no line-targeted search is
        # possible. An empty file has nothing to match against either.
        if suggestion.line == 0 or not lines:
            return False

        # Clamp to the last line when the AI reports a stale/out-of-range
        # number so the search radius still gets a chance.
        target_idx = min(suggestion.line - 1, len(lines) - 1)  # 0-based

        for start in _search_order(target_idx, len(lines), search_radius):
            end = start + len(expected)
            if end > len(lines):
                continue

            window = lines[start:end]
            if _comparable(window) != expected:
                continue

            # Write the replacement in the newline style of the matched
            # span, falling back to the file's first line, then LF.
            eol = _line_ending(window[0]) or _line_ending(lines[0]) or "\n"
            replacement = [
                _with_line_ending(line, eol)
                for line in suggestion.suggested_code.splitlines(keepends=True)
            ]
            # Preserve trailing newline consistency with the replaced span.
            if (
                replacement
                and _line_ending(window[-1])
                and not _line_ending(replacement[-1])
            ):
                replacement[-1] += eol

            new_content = "".join(lines[:start] + replacement + lines[end:])
            _replace_file_atomically(path, new_content.encode("utf-8"))
            return True

        return False

    except OSError:
        return False

137 

138 

def apply_fixes(
    suggestions: Sequence[AIFixSuggestion],
    *,
    workspace_root: Path,
    auto_apply: bool = False,
    search_radius: int | None = None,
) -> list[AIFixSuggestion]:
    """Try every suggestion in order and collect the ones that were applied.

    Args:
        suggestions: Fix suggestions to attempt, in order.
        workspace_root: Root directory forwarded to ``_apply_fix``.
        auto_apply: Forwarded unchanged to ``_apply_fix``.
        search_radius: Forwarded to ``_apply_fix`` only when not None, so
            that function's own default applies otherwise.

    Returns:
        The suggestions for which ``_apply_fix`` returned a truthy value,
        in their original order.
    """
    # Only pass search_radius through when the caller actually set it.
    optional_kwargs: dict[str, int] = (
        {} if search_radius is None else {"search_radius": search_radius}
    )

    applied: list[AIFixSuggestion] = []
    for candidate in suggestions:
        succeeded = _apply_fix(
            candidate,
            workspace_root=workspace_root,
            auto_apply=auto_apply,
            **optional_kwargs,
        )
        if succeeded:
            applied.append(candidate)
    return applied