Coverage for lintro / ai / fix_context.py: 86%

94 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Context building and file reading for AI fix generation. 

2 

3Provides utilities for reading source files, extracting code context 

4windows, validating issues, checking the suggestion cache, and 

5constructing fix prompts with appropriate context sizing. 

6""" 

7 

8from __future__ import annotations 

9 

10import threading 

11from pathlib import Path 

12from typing import TYPE_CHECKING 

13 

14from loguru import logger 

15 

16from lintro.ai.cache import get_cached_suggestion 

17from lintro.ai.enums.sanitize_mode import SanitizeMode 

18from lintro.ai.paths import resolve_workspace_file, to_provider_path 

19from lintro.ai.prompts import FIX_PROMPT_TEMPLATE 

20from lintro.ai.sanitize import ( 

21 detect_injection_patterns, 

22 make_boundary_marker, 

23 sanitize_code_content, 

24) 

25from lintro.ai.secrets import redact_secrets 

26from lintro.ai.token_budget import estimate_tokens 

27 

28if TYPE_CHECKING: 

29 from lintro.ai.models import AIFixSuggestion 

30 from lintro.parsers.base_issue import BaseIssue 

31 

# Default number of lines kept on each side of the issue line when a
# context window is extracted.
CONTEXT_LINES = 15

# Cap on the number of file-cache entries, to bound memory usage.
# When the cap is reached the oldest-inserted entry is dropped (FIFO,
# relying on dict insertion order) instead of tracking recency as a true
# LRU would. FIFO is simpler and sufficient here because fix generation
# processes issues file-by-file, so recently inserted entries are
# overwhelmingly the ones accessed next.
_MAX_CACHE_ENTRIES = 100

# Line-count limit for attempting to send the whole file as context.
FULL_FILE_THRESHOLD = 500

# Smallest context size to keep when trimming to fit the token budget.
MIN_CONTEXT_LINES = 3

47 

48 

def read_file_safely(file_path: str) -> str | None:
    """Read a UTF-8 text file, returning None instead of raising.

    Args:
        file_path: Path to the file.

    Returns:
        File contents as a string, or None if the file cannot be read
        (missing or inaccessible, not valid UTF-8, or a malformed path).
    """
    try:
        return Path(file_path).read_text(encoding="utf-8")
    except (OSError, ValueError):
        # ValueError covers UnicodeDecodeError (its subclass) and also
        # malformed paths such as one with an embedded NUL byte, which
        # open() rejects with a plain ValueError rather than an OSError.
        logger.debug(f"Could not read file: {file_path}")
        return None

63 

64 

def extract_context(
    content: str,
    line: int,
    context_lines: int = CONTEXT_LINES,
) -> tuple[str, int, int]:
    """Extract a code context window around a specific line.

    Args:
        content: Full file content.
        line: 1-based line number. Values outside the file are clamped
            to the nearest valid line.
        context_lines: Number of lines to include before and after
            ``line``. Negative values are treated as 0.

    Returns:
        Tuple of (context_string, start_line, end_line), where the line
        numbers are 1-based and inclusive. For empty content the result
        is ``("", 1, 0)``.
    """
    lines = content.splitlines()
    total = len(lines)

    # A negative radius would push ``start`` past ``end``, yielding an
    # empty window and an inverted (start > end) range; treat it as 0 so
    # the issue line itself is always included.
    radius = max(0, context_lines)

    # Clamp line to valid range [1, total] so out-of-range values
    # still produce a useful context window.
    clamped_line = max(1, min(line, total)) if total > 0 else 1
    start = max(0, clamped_line - 1 - radius)
    end = min(total, clamped_line + radius)

    # ``start`` is a 0-based slice index; report it 1-based.
    return "\n".join(lines[start:end]), start + 1, end

91 

92 

def validate_and_read_file(
    issue: BaseIssue,
    file_cache: dict[str, str | None],
    cache_lock: threading.Lock,
    workspace_root: Path,
    cache_max_entries: int = _MAX_CACHE_ENTRIES,
) -> tuple[str, str] | None:
    """Validate the issue and read its file content.

    Thread-safe — all access to the shared file cache happens while
    holding ``cache_lock``.

    Args:
        issue: Issue to validate; needs a truthy ``file`` and ``line``.
        file_cache: Shared cache mapping resolved file path to content.
            Failed reads are stored as None. Mutated in place.
        cache_lock: Lock guarding ``file_cache``.
        workspace_root: Root against which the issue file is resolved.
        cache_max_entries: Cache size at which the oldest-inserted
            entry is evicted before a new one is added.

    Returns:
        (issue_file, file_content) with the resolved path as a string,
        or None if the issue lacks a file/line, the file does not
        resolve under the workspace root, or the file cannot be read.
    """
    if not issue.file or not issue.line:
        logger.debug(
            f"Skipping issue without file/line: "
            f"file={issue.file!r} line={issue.line}",
        )
        return None

    resolved_file = resolve_workspace_file(issue.file, workspace_root)
    if resolved_file is None:
        logger.debug(
            f"Skipping issue outside workspace root: "
            f"file={issue.file!r}, root={workspace_root}",
        )
        return None
    issue_file = str(resolved_file)

    with cache_lock:
        if issue_file not in file_cache:
            # Only evict when there is something to evict: with
            # cache_max_entries <= 0 and an empty cache, next() on the
            # empty iterator would raise StopIteration.
            if file_cache and len(file_cache) >= cache_max_entries:
                # FIFO eviction: dicts iterate in insertion order.
                del file_cache[next(iter(file_cache))]
            file_cache[issue_file] = read_file_safely(issue_file)
        file_content = file_cache[issue_file]

    if file_content is None:
        logger.debug(f"Cannot read file: {issue_file!r}")
        return None

    return issue_file, file_content

134 

135 

def check_cache(
    workspace_root: Path,
    file_content: str,
    code: str,
    issue: BaseIssue,
    tool_name: str,
    cache_ttl: int,
) -> AIFixSuggestion | None:
    """Look up the suggestion dedup cache for this issue.

    Args:
        workspace_root: Workspace root passed to the cache lookup.
        file_content: Content of the file the issue was reported in.
        code: Issue/rule code.
        issue: Issue whose ``line`` and ``message`` are part of the lookup.
        tool_name: Tool name stamped onto a returned hit.
        cache_ttl: Passed to the cache lookup as ``ttl``.

    Returns:
        The cached suggestion, with ``tool_name`` set and its token and
        cost fields zeroed, or None when nothing is cached.
    """
    hit = get_cached_suggestion(
        workspace_root,
        file_content,
        code,
        issue.line,
        issue.message,
        ttl=cache_ttl,
    )
    if hit is None:
        return None

    logger.debug(
        f"Cache hit for {issue.file}:{issue.line} ({code})",
    )
    # Re-stamp the hit for the current tool and reset its usage figures.
    hit.tool_name = tool_name
    hit.input_tokens = 0
    hit.output_tokens = 0
    hit.cost_estimate = 0.0
    return hit

163 

164 

def build_fix_context(
    issue: BaseIssue,
    issue_file: str,
    file_content: str,
    tool_name: str,
    code: str,
    workspace_root: Path,
    context_lines: int,
    max_prompt_tokens: int,
    full_file_threshold: int,
    sanitize_mode: SanitizeMode = SanitizeMode.WARN,
) -> str | None:
    """Sanitize content and build the fix prompt with appropriate context.

    Tries full-file context for small files, falls back to windowed
    context that progressively shrinks to fit the token budget.

    Args:
        issue: Issue being fixed; ``file``, ``line`` and ``message`` are read.
        issue_file: Path of the issue's file, converted with
            ``to_provider_path`` for the prompt.
        file_content: Full content of that file.
        tool_name: Name of the reporting tool, inserted into the prompt.
        code: Issue/rule code, inserted into the prompt.
        workspace_root: Workspace root used for the path conversion.
        context_lines: Initial number of lines before/after the issue
            line for windowed context.
        max_prompt_tokens: Token budget the prompt should fit within.
        full_file_threshold: Maximum line count for which full-file
            context is attempted.
        sanitize_mode: How detected injection patterns are handled:
            OFF skips detection, BLOCK rejects the issue, anything else
            logs a warning and continues.

    Returns:
        The formatted prompt, or None when BLOCK mode rejects the issue.
        If the prompt is still over budget at ``MIN_CONTEXT_LINES`` it
        is returned anyway.
    """
    if sanitize_mode != SanitizeMode.OFF:
        file_injections = detect_injection_patterns(file_content)
        msg_injections = detect_injection_patterns(issue.message)
        injections = file_injections + msg_injections
        if injections:
            if sanitize_mode == SanitizeMode.BLOCK:
                logger.warning(
                    f"Blocking fix for {issue.file}: prompt injection "
                    f"patterns detected in file/diagnostic: "
                    f"{', '.join(injections)}",
                )
                return None
            # SanitizeMode.WARN (default)
            logger.warning(
                f"Potential prompt injection patterns detected in "
                f"{issue.file} (file/diagnostic): {', '.join(injections)}",
            )

    # Sanitization is deferred until after the BLOCK check so rejected
    # issues do not pay for it.
    safe_message = redact_secrets(sanitize_code_content(issue.message))
    # Identical for every candidate prompt, so compute it once rather
    # than on each shrink iteration.
    provider_path = to_provider_path(issue_file, workspace_root)

    def render(first_line: int, last_line: int, snippet: str) -> str:
        """Format the fix prompt for one candidate context window."""
        return FIX_PROMPT_TEMPLATE.format(
            tool_name=tool_name,
            code=code,
            file=provider_path,
            line=issue.line,
            message=safe_message,
            context_start=first_line,
            context_end=last_line,
            code_context=snippet,
            # A new boundary marker is requested for every prompt.
            boundary=make_boundary_marker(),
        )

    total_lines = len(file_content.splitlines())
    if total_lines <= full_file_threshold:
        # Whole-file sanitization only happens when the whole file is
        # actually a candidate for the prompt.
        sanitized_content = redact_secrets(sanitize_code_content(file_content))
        full_prompt = render(1, total_lines, sanitized_content)
        if estimate_tokens(full_prompt) <= max_prompt_tokens:
            logger.debug(
                f"Using full file context ({total_lines} lines) for "
                f"{issue.file}:{issue.line}",
            )
            return full_prompt

    effective_context_lines = context_lines
    while True:
        context, context_start, context_end = extract_context(
            file_content,
            issue.line,
            context_lines=effective_context_lines,
        )
        sanitized_context = redact_secrets(sanitize_code_content(context))
        prompt = render(context_start, context_end, sanitized_context)
        prompt_tokens = estimate_tokens(prompt)
        if prompt_tokens <= max_prompt_tokens:
            return prompt
        if effective_context_lines <= MIN_CONTEXT_LINES:
            # Cannot shrink further; send the over-budget prompt as is.
            logger.debug(
                f"Fix prompt still over budget at minimum context "
                f"({prompt_tokens} > {max_prompt_tokens}) for "
                f"{issue.file}:{issue.line}; sending anyway",
            )
            return prompt
        old_ctx = effective_context_lines
        # Halve the window, never going below the minimum, so the loop
        # always reaches the terminating branch above.
        effective_context_lines = max(
            MIN_CONTEXT_LINES,
            effective_context_lines // 2,
        )
        logger.debug(
            f"Fix prompt over budget for {issue.file}:{issue.line} "
            f"reducing context_lines {old_ctx} -> {effective_context_lines}",
        )