Coverage for lintro / ai / refinement.py: 88%

85 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Multi-turn fix refinement for unverified AI fixes. 

2 

3When validation shows that an applied fix did not resolve the original 

4issue, this module reverts the fix, generates a refined suggestion using 

5the refinement prompt (which includes the previous attempt and the 

6validation error), applies the refined fix, and returns it for 

7re-validation. 

8""" 

9 

10from __future__ import annotations 

11 

12import functools 

13import re 

14from pathlib import Path 

15from typing import TYPE_CHECKING 

16 

17from loguru import logger 

18 

19from lintro.ai.apply import apply_fixes 

20from lintro.ai.fix import _call_provider 

21from lintro.ai.fix_context import extract_context, read_file_safely 

22from lintro.ai.fix_parsing import parse_fix_response 

23from lintro.ai.paths import resolve_workspace_file, to_provider_path 

24from lintro.ai.prompts import FIX_SYSTEM, REFINEMENT_PROMPT_TEMPLATE 

25from lintro.ai.retry import with_retry 

26from lintro.ai.sanitize import make_boundary_marker, sanitize_code_content 

27from lintro.ai.secrets import redact_secrets 

28 

29if TYPE_CHECKING: 

30 from lintro.ai.config import AIConfig 

31 from lintro.ai.models import AIFixSuggestion 

32 from lintro.ai.providers.base import BaseAIProvider 

33 from lintro.ai.validation import ValidationResult 

34 

35 

36def _revert_fix( 

37 suggestion: AIFixSuggestion, 

38 workspace_root: Path, 

39 *, 

40 search_radius: int = 5, 

41) -> bool: 

42 """Revert an applied fix by replacing suggested_code back with original_code. 

43 

44 Args: 

45 suggestion: The applied suggestion to revert. 

46 workspace_root: Workspace root for path resolution. 

47 search_radius: Max lines above/below the target line to search. 

48 

49 Returns: 

50 True if the revert succeeded. 

51 """ 

52 from lintro.ai.models import AIFixSuggestion as _Suggestion 

53 

54 # Build a "reverse" suggestion: swap original and suggested code 

55 reverse = _Suggestion( 

56 file=suggestion.file, 

57 line=suggestion.line, 

58 code=suggestion.code, 

59 original_code=suggestion.suggested_code, 

60 suggested_code=suggestion.original_code, 

61 ) 

62 applied = apply_fixes( 

63 [reverse], 

64 workspace_root=workspace_root, 

65 auto_apply=True, 

66 search_radius=search_radius, 

67 ) 

68 return len(applied) > 0 

69 

70 

71def refine_unverified_fixes( 

72 *, 

73 applied_suggestions: list[AIFixSuggestion], 

74 validation: ValidationResult, 

75 provider: BaseAIProvider, 

76 ai_config: AIConfig, 

77 workspace_root: Path, 

78) -> tuple[list[AIFixSuggestion], float]: 

79 """Attempt one refinement round for unverified fixes. 

80 

81 For each unverified fix: 

82 1. Revert the original suggestion 

83 2. Generate a refined fix using the refinement prompt 

84 3. Apply the refined fix 

85 

86 Args: 

87 applied_suggestions: All suggestions that were applied. 

88 validation: Validation result identifying unverified fixes. 

89 provider: AI provider instance. 

90 ai_config: AI configuration. 

91 workspace_root: Workspace root path. 

92 

93 Returns: 

94 Tuple of (list of successfully refined suggestions, total cost). 

95 

96 Raises: 

97 KeyboardInterrupt: Re-raised immediately. 

98 SystemExit: Re-raised immediately. 

99 """ 

100 # Identify unverified suggestions from validation details. 

101 # Detail format: "[code] file:line - issue still present" 

102 detail_re = re.compile(r"^\[([^\]]+)\]\s+(.+?):(\d+)\s+[-\u2014]\s+") 

103 unverified_keys: set[tuple[str, str, int]] = set() 

104 for detail in validation.details: 

105 if "issue still present" not in detail: 

106 continue 

107 m = detail_re.match(detail) 

108 if m: 

109 unverified_keys.add((m.group(2), m.group(1), int(m.group(3)))) 

110 else: 

111 logger.debug( 

112 "Skipping unparseable validation detail: {} (no regex match)", 

113 detail, 

114 ) 

115 

116 if not unverified_keys: 

117 return [], 0.0 

118 

119 bound_call = functools.partial( 

120 _call_provider, 

121 fallback_models=ai_config.fallback_models or [], 

122 ) 

123 retrying_call = with_retry( 

124 max_retries=ai_config.max_retries, 

125 base_delay=ai_config.retry_base_delay, 

126 max_delay=ai_config.retry_max_delay, 

127 backoff_factor=ai_config.retry_backoff_factor, 

128 )(bound_call) 

129 

130 refined: list[AIFixSuggestion] = [] 

131 total_cost = 0.0 

132 

133 for suggestion in applied_suggestions: 

134 if (suggestion.file, suggestion.code, suggestion.line) not in unverified_keys: 

135 continue 

136 

137 logger.debug( 

138 f"Refinement: reverting {suggestion.file}:{suggestion.line} " 

139 f"[{suggestion.code}]", 

140 ) 

141 

142 # Step 1: Revert the fix 

143 if not _revert_fix( 

144 suggestion, 

145 workspace_root, 

146 search_radius=ai_config.fix_search_radius, 

147 ): 

148 logger.debug( 

149 f"Refinement: revert failed for " 

150 f"{suggestion.file}:{suggestion.line}", 

151 ) 

152 continue 

153 

154 # Validate the file path is within the workspace root 

155 resolved_path = resolve_workspace_file(suggestion.file, workspace_root) 

156 if resolved_path is None: 

157 logger.debug( 

158 f"Refinement: file {suggestion.file} is outside " 

159 f"workspace root {workspace_root}, skipping", 

160 ) 

161 continue 

162 

163 # Step 2: Read current file content and build refinement prompt 

164 file_content = read_file_safely(str(resolved_path)) 

165 if file_content is None: 

166 continue 

167 

168 context, context_start, context_end = extract_context( 

169 file_content, 

170 suggestion.line, 

171 context_lines=ai_config.context_lines, 

172 ) 

173 

174 previous_suggestion = redact_secrets( 

175 sanitize_code_content( 

176 f"original_code: {suggestion.original_code}\n" 

177 f"suggested_code: {suggestion.suggested_code}\n" 

178 f"explanation: {suggestion.explanation}", 

179 ), 

180 ) 

181 

182 # Find the matching validation detail for the error message 

183 error_detail = "" 

184 code_tag = f"[{suggestion.code}]" 

185 line_tag = f":{suggestion.line} " 

186 file_tag = suggestion.file 

187 for detail in validation.details: 

188 if code_tag in detail and line_tag in detail and file_tag in detail: 

189 error_detail = detail 

190 break 

191 

192 boundary = make_boundary_marker() 

193 safe_file = sanitize_code_content( 

194 to_provider_path(suggestion.file, workspace_root), 

195 ) 

196 safe_error = redact_secrets( 

197 sanitize_code_content( 

198 error_detail or "Issue still present after fix", 

199 ), 

200 ) 

201 prompt = REFINEMENT_PROMPT_TEMPLATE.format( 

202 tool_name=sanitize_code_content(suggestion.tool_name or "unknown"), 

203 code=suggestion.code, 

204 file=safe_file, 

205 line=suggestion.line, 

206 previous_suggestion=previous_suggestion, 

207 new_error=safe_error, 

208 context_start=context_start, 

209 context_end=context_end, 

210 code_context=redact_secrets(sanitize_code_content(context)), 

211 boundary=boundary, 

212 ) 

213 

214 # Step 3: Generate refined fix 

215 try: 

216 response = retrying_call( 

217 provider, 

218 prompt, 

219 FIX_SYSTEM, 

220 ai_config.max_tokens, 

221 ai_config.api_timeout, 

222 ) 

223 

224 refined_suggestion = parse_fix_response( 

225 response.content, 

226 suggestion.file, 

227 suggestion.line, 

228 suggestion.code, 

229 ) 

230 

231 if refined_suggestion is None: 

232 logger.debug( 

233 f"Refinement: no valid suggestion for " 

234 f"{suggestion.file}:{suggestion.line}", 

235 ) 

236 continue 

237 

238 refined_suggestion.tool_name = suggestion.tool_name 

239 refined_suggestion.input_tokens = response.input_tokens 

240 refined_suggestion.output_tokens = response.output_tokens 

241 refined_suggestion.cost_estimate = response.cost_estimate 

242 total_cost += response.cost_estimate 

243 

244 # Step 4: Apply the refined fix 

245 applied = apply_fixes( 

246 [refined_suggestion], 

247 workspace_root=workspace_root, 

248 auto_apply=True, 

249 search_radius=ai_config.fix_search_radius, 

250 ) 

251 if applied: 

252 refined.extend(applied) 

253 logger.debug( 

254 f"Refinement: applied refined fix for " 

255 f"{suggestion.file}:{suggestion.line}", 

256 ) 

257 else: 

258 logger.debug( 

259 f"Refinement: refined fix failed to apply for " 

260 f"{suggestion.file}:{suggestion.line}", 

261 ) 

262 

263 except (KeyboardInterrupt, SystemExit): 

264 raise 

265 except Exception as exc: 

266 logger.debug( 

267 f"Refinement failed for {suggestion.file}:{suggestion.line} " 

268 f"({type(exc).__name__}: {exc})", 

269 exc_info=True, 

270 ) 

271 

272 return refined, total_cost