Coverage for lintro/ai/fix_context.py: 86%
94 statements
« prev ^ index » next — coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Context building and file reading for AI fix generation.
3Provides utilities for reading source files, extracting code context
4windows, validating issues, checking the suggestion cache, and
5constructing fix prompts with appropriate context sizing.
6"""
8from __future__ import annotations
10import threading
11from pathlib import Path
12from typing import TYPE_CHECKING
14from loguru import logger
16from lintro.ai.cache import get_cached_suggestion
17from lintro.ai.enums.sanitize_mode import SanitizeMode
18from lintro.ai.paths import resolve_workspace_file, to_provider_path
19from lintro.ai.prompts import FIX_PROMPT_TEMPLATE
20from lintro.ai.sanitize import (
21 detect_injection_patterns,
22 make_boundary_marker,
23 sanitize_code_content,
24)
25from lintro.ai.secrets import redact_secrets
26from lintro.ai.token_budget import estimate_tokens
28if TYPE_CHECKING:
29 from lintro.ai.models import AIFixSuggestion
30 from lintro.parsers.base_issue import BaseIssue
# Context window around the issue line (lines before/after).
CONTEXT_LINES = 15

# Maximum file cache entries to limit memory usage.
# Eviction is FIFO (oldest insertion order, via dict ordering) rather
# than true LRU. FIFO is simpler and sufficient here because fix
# generation processes issues file-by-file, so recently inserted
# entries are overwhelmingly the ones accessed next.
_MAX_CACHE_ENTRIES = 100

# Only attempt full-file context for files under this many lines.
FULL_FILE_THRESHOLD = 500

# Minimum context lines to keep when trimming for token budget.
MIN_CONTEXT_LINES = 3
def read_file_safely(file_path: str) -> str | None:
    """Return a file's text content, or None when it cannot be read.

    Args:
        file_path: Path to the file.

    Returns:
        File contents as a string, or None if unreadable.
    """
    path = Path(file_path)
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # Unreadable or non-UTF-8 files are skipped rather than fatal.
        logger.debug(f"Could not read file: {file_path}")
        return None
    return text
def extract_context(
    content: str,
    line: int,
    context_lines: int = CONTEXT_LINES,
) -> tuple[str, int, int]:
    """Extract a code context window around a specific line.

    Args:
        content: Full file content.
        line: 1-based line number.
        context_lines: Number of lines before and after.

    Returns:
        Tuple of (context_string, start_line, end_line).
    """
    all_lines = content.splitlines()
    line_count = len(all_lines)

    # Out-of-range line numbers are clamped into [1, line_count] so the
    # caller still receives a usable window.
    anchor = min(max(line, 1), line_count) if line_count else 1
    begin = max(anchor - 1 - context_lines, 0)
    finish = min(anchor + context_lines, line_count)

    window = "\n".join(all_lines[begin:finish])
    return window, begin + 1, finish
def validate_and_read_file(
    issue: BaseIssue,
    file_cache: dict[str, str | None],
    cache_lock: threading.Lock,
    workspace_root: Path,
    cache_max_entries: int = _MAX_CACHE_ENTRIES,
) -> tuple[str, str] | None:
    """Validate the issue and read its file content.

    Returns (issue_file, file_content) or None if validation fails.
    Thread-safe — uses a lock for the shared file cache.
    """
    # An issue without a file or line cannot be located, so skip it.
    if not issue.file or not issue.line:
        logger.debug(
            f"Skipping issue without file/line: "
            f"file={issue.file!r} line={issue.line}",
        )
        return None

    # Refuse paths that escape the workspace root.
    resolved_file = resolve_workspace_file(issue.file, workspace_root)
    if resolved_file is None:
        logger.debug(
            f"Skipping issue outside workspace root: "
            f"file={issue.file!r}, root={workspace_root}",
        )
        return None
    issue_file = str(resolved_file)

    with cache_lock:
        try:
            # The cache may legitimately hold None for unreadable files,
            # so distinguish "missing" from "cached None" via KeyError.
            file_content = file_cache[issue_file]
        except KeyError:
            if len(file_cache) >= cache_max_entries:
                # FIFO eviction: drop the oldest insertion (dict order).
                file_cache.pop(next(iter(file_cache)))
            file_content = read_file_safely(issue_file)
            file_cache[issue_file] = file_content

    if file_content is None:
        logger.debug(f"Cannot read file: {issue_file!r}")
        return None

    return issue_file, file_content
def check_cache(
    workspace_root: Path,
    file_content: str,
    code: str,
    issue: BaseIssue,
    tool_name: str,
    cache_ttl: int,
) -> AIFixSuggestion | None:
    """Check the suggestion dedup cache and return a hit if found."""
    hit = get_cached_suggestion(
        workspace_root,
        file_content,
        code,
        issue.line,
        issue.message,
        ttl=cache_ttl,
    )
    if hit is None:
        return None

    logger.debug(
        f"Cache hit for {issue.file}:{issue.line} ({code})",
    )
    # Stamp the current tool and zero out usage — a cache hit costs
    # no provider tokens or money.
    hit.tool_name = tool_name
    hit.input_tokens = 0
    hit.output_tokens = 0
    hit.cost_estimate = 0.0
    return hit
def build_fix_context(
    issue: BaseIssue,
    issue_file: str,
    file_content: str,
    tool_name: str,
    code: str,
    workspace_root: Path,
    context_lines: int,
    max_prompt_tokens: int,
    full_file_threshold: int,
    sanitize_mode: SanitizeMode = SanitizeMode.WARN,
) -> str | None:
    """Sanitize content and build the fix prompt with appropriate context.

    Tries full-file context for small files, falls back to windowed
    context that progressively shrinks to fit the token budget.

    Args:
        issue: Diagnostic being fixed; ``issue.line`` anchors the window.
        issue_file: Resolved path of the file containing the issue.
        file_content: Full text of that file.
        tool_name: Name of the tool that reported the issue.
        code: Tool-specific rule/diagnostic code.
        workspace_root: Root used to relativize the path for the provider.
        context_lines: Initial window size (lines before/after the issue).
        max_prompt_tokens: Token budget the rendered prompt should fit in.
        full_file_threshold: Max line count eligible for full-file context.
        sanitize_mode: How to react to detected prompt-injection patterns.

    Returns:
        The rendered prompt string, or None when ``sanitize_mode`` is
        BLOCK and injection patterns were detected.
    """
    # Sanitize and redact before anything reaches the prompt template.
    sanitized_content = redact_secrets(sanitize_code_content(file_content))
    safe_message = redact_secrets(sanitize_code_content(issue.message))
    if sanitize_mode != SanitizeMode.OFF:
        # Scan the raw (pre-sanitization) text so detection isn't
        # defeated by the sanitization pass above.
        file_injections = detect_injection_patterns(file_content)
        msg_injections = detect_injection_patterns(issue.message)
        injections = file_injections + msg_injections
        if injections:
            if sanitize_mode == SanitizeMode.BLOCK:
                logger.warning(
                    f"Blocking fix for {issue.file}: prompt injection "
                    f"patterns detected in file/diagnostic: "
                    f"{', '.join(injections)}",
                )
                return None
            # SanitizeMode.WARN (default): log and continue.
            logger.warning(
                f"Potential prompt injection patterns detected in "
                f"{issue.file} (file/diagnostic): {', '.join(injections)}",
            )

    # First attempt: send the whole (sanitized) file when it is small
    # enough and the resulting prompt fits the token budget.
    total_lines = len(file_content.splitlines())
    if total_lines <= full_file_threshold:
        boundary = make_boundary_marker()
        full_prompt = FIX_PROMPT_TEMPLATE.format(
            tool_name=tool_name,
            code=code,
            file=to_provider_path(issue_file, workspace_root),
            line=issue.line,
            message=safe_message,
            context_start=1,
            context_end=total_lines,
            code_context=sanitized_content,
            boundary=boundary,
        )
        if estimate_tokens(full_prompt) <= max_prompt_tokens:
            logger.debug(
                f"Using full file context ({total_lines} lines) for "
                f"{issue.file}:{issue.line}",
            )
            return full_prompt

    # Fallback: windowed context around the issue line, halving the
    # window each iteration until the prompt fits (or the window hits
    # MIN_CONTEXT_LINES, at which point the prompt is sent regardless).
    effective_context_lines = context_lines
    while True:
        # Window is cut from the raw content, then sanitized — matching
        # the full-file path, which sanitizes the text it sends.
        context, context_start, context_end = extract_context(
            file_content,
            issue.line,
            context_lines=effective_context_lines,
        )
        # Fresh boundary marker per attempt (markers are single-use).
        boundary = make_boundary_marker()
        sanitized_context = redact_secrets(sanitize_code_content(context))
        prompt = FIX_PROMPT_TEMPLATE.format(
            tool_name=tool_name,
            code=code,
            file=to_provider_path(issue_file, workspace_root),
            line=issue.line,
            message=safe_message,
            context_start=context_start,
            context_end=context_end,
            code_context=sanitized_context,
            boundary=boundary,
        )
        prompt_tokens = estimate_tokens(prompt)
        if prompt_tokens <= max_prompt_tokens:
            return prompt
        if effective_context_lines <= MIN_CONTEXT_LINES:
            # Cannot shrink further; send the over-budget prompt anyway
            # rather than dropping the fix entirely.
            logger.debug(
                f"Fix prompt still over budget at minimum context "
                f"({prompt_tokens} > {max_prompt_tokens}) for "
                f"{issue.file}:{issue.line}; sending anyway",
            )
            return prompt
        old_ctx = effective_context_lines
        effective_context_lines = max(
            MIN_CONTEXT_LINES,
            effective_context_lines // 2,
        )
        logger.debug(
            f"Fix prompt over budget for {issue.file}:{issue.line} "
            f"reducing context_lines {old_ctx} -> {effective_context_lines}",
        )