Coverage for lintro/ai/pipeline.py: 73%

212 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""AI fix pipeline: generate, classify, apply, validate, post-summary.""" 

2 

3from __future__ import annotations 

4 

5import sys 

6from pathlib import Path 

7from typing import TYPE_CHECKING 

8 

9from loguru import logger as loguru_logger 

10 

11from lintro.ai.apply import apply_fixes 

12from lintro.ai.audit import write_audit_log 

13from lintro.ai.budget import CostBudget 

14from lintro.ai.display import render_fixes, render_summary, render_validation 

15from lintro.ai.enums import ConfidenceLevel 

16from lintro.ai.fix import generate_fixes_from_params 

17from lintro.ai.fix_params import FixGenParams 

18from lintro.ai.interactive import review_fixes_interactive 

19from lintro.ai.metadata import ( 

20 attach_fix_suggestions_metadata, 

21 attach_fixed_count_metadata, 

22 attach_telemetry_metadata, 

23 attach_validation_counts_metadata, 

24) 

25from lintro.ai.refinement import refine_unverified_fixes 

26from lintro.ai.risk import is_safe_style_fix 

27from lintro.ai.summary import generate_post_fix_summary 

28from lintro.ai.telemetry import AITelemetry 

29from lintro.ai.undo import save_undo_patch 

30from lintro.ai.validation import ValidationResult, validate_applied_fixes, verify_fixes 

31from lintro.enums.output_format import OutputFormat 

32 

33if TYPE_CHECKING: 

34 from lintro.ai.config import AIConfig 

35 from lintro.ai.models import AIFixSuggestion 

36 from lintro.ai.providers.base import BaseAIProvider 

37 from lintro.models.core.tool_result import ToolResult 

38 from lintro.parsers.base_issue import BaseIssue 

39 from lintro.utils.console.logger import ThreadSafeConsoleLogger 

40 

41 

def _confidence_numeric(value: ConfidenceLevel | str) -> int:
    """Map a confidence value to its numeric rank (high=3, medium=2, low=1).

    Args:
        value: A ``ConfidenceLevel`` member or its string value.

    Returns:
        The level's ``numeric_order``; unrecognized values fall back to the
        lowest rank (1) so they can never be promoted past the threshold.
    """
    try:
        level = ConfidenceLevel(value)
    except ValueError:
        # Unknown or malformed confidence strings are treated as low.
        return 1
    return level.numeric_order

48 

49 

def _generate_all_suggestions(
    by_tool: dict[str, tuple[ToolResult, list[BaseIssue]]],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    workspace_root: Path,
    is_json: bool,
    telemetry: AITelemetry,
    budget: CostBudget | None,
) -> list[AIFixSuggestion]:
    """Iterate tools, generate fix suggestions, and collect telemetry.

    Args:
        by_tool: Mapping of tool name to its (result, issues) pair.
        provider: AI provider used to generate suggestions.
        ai_config: AI configuration (budgets, retries, cache, etc.).
        logger: Console logger for user-facing progress output.
        is_json: True when output format is JSON; suppresses console chatter.
        workspace_root: Workspace root used to resolve issue file paths.
        telemetry: Accumulator mutated in place with token/cost/call counts.
        budget: Optional cost budget; checked before and recorded after each
            tool's generation pass.

    Returns:
        All generated suggestions across tools, in tool-iteration order.
    """
    all_suggestions: list[AIFixSuggestion] = []
    # Global cap on the number of issues submitted across ALL tools.
    remaining_budget = ai_config.max_fix_attempts

    total_fix_issues = sum(len(issues) for _, issues in by_tool.values())
    if not is_json and total_fix_issues > 0:
        logger.console_output(
            f" AI: generating fixes for {total_fix_issues} issues...",
        )

    def _progress_callback(completed: int, total: int) -> None:
        # Progress lines only make sense for human-readable output.
        if not is_json:
            logger.console_output(
                f" AI: generating fixes... {completed}/{total}",
            )

    for tool_name, (result, issues) in by_tool.items():
        if remaining_budget <= 0:
            break
        if not issues:
            continue
        if budget is not None:
            # May raise if the configured cost ceiling has been reached.
            budget.check()

        loguru_logger.debug(
            f"AI fix: {tool_name} has {len(issues)} issues, "
            f"budget={remaining_budget}",
        )

        fix_params = FixGenParams(
            tool_name=tool_name,
            workspace_root=workspace_root,
            # Cap this tool's submissions at whatever budget is left.
            max_issues=remaining_budget,
            max_workers=ai_config.max_parallel_calls,
            max_tokens=ai_config.max_tokens,
            max_retries=ai_config.max_retries,
            timeout=ai_config.api_timeout,
            context_lines=ai_config.context_lines,
            base_delay=ai_config.retry_base_delay,
            max_delay=ai_config.retry_max_delay,
            backoff_factor=ai_config.retry_backoff_factor,
            max_prompt_tokens=ai_config.max_prompt_tokens,
            enable_cache=ai_config.enable_cache,
            cache_ttl=ai_config.cache_ttl,
            cache_max_entries=ai_config.cache_max_entries,
            progress_callback=_progress_callback,
            fallback_models=ai_config.fallback_models,
            sanitize_mode=ai_config.sanitize_mode,
        )
        suggestions = generate_fixes_from_params(issues, provider, fix_params)
        # Backfill tool_name on suggestions the generator left unattributed.
        for suggestion in suggestions:
            if not suggestion.tool_name:
                suggestion.tool_name = tool_name
        # len(issues[:remaining_budget]) == min(len(issues), remaining_budget),
        # i.e. the number of issues actually submitted for this tool.
        remaining_budget -= len(issues[:remaining_budget])

        if ai_config.verbose:
            loguru_logger.info(
                f"AI fix: {tool_name} generated {len(suggestions)} suggestions",
            )

        # Count API calls by unique token footprint (batch = 1 call, not N).
        # Cache hits (0 tokens) are not counted as API calls.
        seen_calls: set[tuple[int, int]] = set()
        for s in suggestions:
            telemetry.total_input_tokens += s.input_tokens
            telemetry.total_output_tokens += s.output_tokens
            telemetry.total_cost_usd += s.cost_estimate
            if s.input_tokens > 0 or s.output_tokens > 0:
                seen_calls.add((s.input_tokens, s.output_tokens))
        # NOTE(review): distinct calls with identical (input, output) token
        # counts collapse into one entry here — assumed acceptable.
        telemetry.total_api_calls += len(seen_calls)

        if budget is not None:
            budget.record(sum(s.cost_estimate for s in suggestions))

        if ai_config.verbose:
            tool_cost = sum(s.cost_estimate for s in suggestions)
            loguru_logger.info(
                f"AI fix: {tool_name} cost=${tool_cost:.6f}, "
                f"cumulative=${telemetry.total_cost_usd:.6f}",
            )

        all_suggestions.extend(suggestions)

        if suggestions:
            # Expose suggestions on the tool's result for downstream reporting.
            attach_fix_suggestions_metadata(result, suggestions)

    return all_suggestions

147 

148 

def _filter_by_confidence(
    all_suggestions: list[AIFixSuggestion],
    ai_config: AIConfig,
) -> list[AIFixSuggestion]:
    """Drop suggestions below the configured confidence threshold.

    Args:
        all_suggestions: Suggestions produced by the generation step.
        ai_config: AI configuration supplying ``min_confidence`` and
            ``verbose``.

    Returns:
        The suggestions whose confidence rank meets the threshold. When
        verbose, also logs confidence and risk breakdowns for the survivors.
    """
    minimum = _confidence_numeric(ai_config.min_confidence)
    kept = [
        suggestion
        for suggestion in all_suggestions
        if _confidence_numeric(suggestion.confidence) >= minimum
    ]

    if ai_config.verbose and kept:
        by_confidence: dict[str, int] = {}
        by_risk: dict[str, int] = {}
        for suggestion in kept:
            by_confidence[suggestion.confidence] = (
                by_confidence.get(suggestion.confidence, 0) + 1
            )
            label = suggestion.risk_level or "unclassified"
            by_risk[label] = by_risk.get(label, 0) + 1
        loguru_logger.info(
            f"AI fix: confidence breakdown: {by_confidence}",
        )
        loguru_logger.info(
            f"AI fix: risk breakdown: {by_risk}",
        )

    return kept

174 

175 

def _apply_or_review(
    all_suggestions: list[AIFixSuggestion],
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    is_json: bool,
    workspace_root: Path,
) -> tuple[int, int, list[AIFixSuggestion]]:
    """Auto-apply safe fixes, then auto-apply or interactively review the rest.

    Args:
        all_suggestions: Confidence-filtered suggestions to process.
        ai_config: AI configuration controlling auto-apply and search radius.
        logger: Console logger for user-facing messages.
        is_json: True when output is JSON; disables interactive review and
            console messages.
        workspace_root: Workspace root used to resolve file paths.

    Returns (applied, rejected, applied_suggestions).
    """
    applied = 0
    rejected = 0
    applied_suggestions: list[AIFixSuggestion] = []
    # Partition once so deterministic style-only fixes can take the fast path.
    safe_suggestions = [s for s in all_suggestions if is_safe_style_fix(s)]
    risky_suggestions = [s for s in all_suggestions if not is_safe_style_fix(s)]
    safe_failed = 0
    safe_fast_path_applied = False

    # Fast path: auto-apply deterministic style-only fixes when non-interactive.
    if (
        ai_config.auto_apply_safe_fixes
        and safe_suggestions
        and (is_json or not sys.stdin.isatty())
    ):
        safe_fast_path_applied = True
        # Snapshot an undo patch before touching any files.
        save_undo_patch(safe_suggestions, workspace_root)
        applied_safe = apply_fixes(
            safe_suggestions,
            workspace_root=workspace_root,
            auto_apply=True,
            search_radius=ai_config.fix_search_radius,
        )
        applied_suggestions.extend(applied_safe)
        applied += len(applied_safe)
        # Safe fixes that apply_fixes could not land count as failures.
        safe_failed = len(safe_suggestions) - len(applied_safe)
        if not is_json:
            msg = (
                f" AI: auto-applied {len(applied_safe)}/{len(safe_suggestions)}"
                f" safe-style fixes"
            )
            if safe_failed:
                msg += f" ({safe_failed} failed)"
            logger.console_output(msg)

    if ai_config.auto_apply:
        # Skip safe fixes already handled by the fast path above.
        auto_apply_candidates = (
            risky_suggestions if safe_fast_path_applied else all_suggestions
        )
        save_undo_patch(auto_apply_candidates, workspace_root)
        auto_applied = apply_fixes(
            auto_apply_candidates,
            workspace_root=workspace_root,
            auto_apply=True,
            search_radius=ai_config.fix_search_radius,
        )
        applied_suggestions.extend(auto_applied)
        applied += len(auto_applied)
        # Rejected = candidates that failed to apply plus failed safe fixes.
        rejected = len(auto_apply_candidates) - len(auto_applied) + safe_failed
        if not is_json:
            logger.console_output(
                f" AI: auto-applied {applied}/{len(all_suggestions)} fixes",
            )
    elif not is_json:
        # Interactive path: let the user accept/reject remaining fixes.
        review_candidates = (
            risky_suggestions if safe_fast_path_applied else all_suggestions
        )
        save_undo_patch(review_candidates, workspace_root)
        accepted_count, rejected_count, interactive_applied = review_fixes_interactive(
            review_candidates,
            validate_after_group=ai_config.validate_after_group,
            workspace_root=workspace_root,
            search_radius=ai_config.fix_search_radius,
        )
        applied += accepted_count
        rejected += rejected_count + safe_failed
        applied_suggestions.extend(interactive_applied)

    return applied, rejected, applied_suggestions

255 

256 

def _verify_and_refine(
    applied_suggestions: list[AIFixSuggestion],
    by_tool: dict[str, tuple[ToolResult, list[BaseIssue]]],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    is_json: bool,
    workspace_root: Path,
    telemetry: AITelemetry,
    budget: CostBudget | None,
) -> ValidationResult | None:
    """Verify applied fixes and attempt refinement for unverified ones.

    Args:
        applied_suggestions: Fixes that were actually applied to files.
        by_tool: Mapping of tool name to its (result, issues) pair.
        provider: AI provider used for refinement calls.
        ai_config: AI configuration (refinement attempts, verbosity).
        logger: Console logger for user-facing messages.
        is_json: True when output is JSON; suppresses console output.
        workspace_root: Workspace root used to resolve file paths.
        telemetry: Accumulator mutated in place with refinement tokens/costs.
        budget: Optional cost budget; refinement cost is recorded against it.

    Returns the ValidationResult (or None). The result's counters are
    mutated in place when refinement verifies additional fixes.
    """
    validation = verify_fixes(
        applied_suggestions=applied_suggestions,
        by_tool=by_tool,
    )
    if not is_json and validation and (validation.verified or validation.unverified):
        val_output = render_validation(validation)
        if val_output:
            logger.console_output(val_output)

    # Only attempt refinement when something failed verification and the
    # config allows at least one refinement pass.
    if (
        validation
        and validation.unverified > 0
        and ai_config.max_refinement_attempts >= 1
    ):
        refined, refinement_cost = refine_unverified_fixes(
            applied_suggestions=applied_suggestions,
            validation=validation,
            provider=provider,
            ai_config=ai_config,
            workspace_root=workspace_root,
        )
        if refined:
            # Each refined suggestion corresponds to one additional API call.
            telemetry.total_api_calls += len(refined)
            for s in refined:
                telemetry.total_input_tokens += s.input_tokens
                telemetry.total_output_tokens += s.output_tokens
                telemetry.total_cost_usd += s.cost_estimate
            if budget is not None:
                budget.record(refinement_cost)

            re_validation = validate_applied_fixes(refined)
            if re_validation:
                # Fold newly-verified fixes into the aggregate counters.
                validation.verified += re_validation.verified
                validation.unverified -= re_validation.verified
                # Merge per-tool counters from refinement re-validation
                for tool, count in re_validation.verified_by_tool.items():
                    validation.verified_by_tool[tool] = (
                        validation.verified_by_tool.get(tool, 0) + count
                    )
                    # Decrease unverified for the same tool accordingly
                    if tool in validation.unverified_by_tool:
                        validation.unverified_by_tool[tool] = max(
                            0,
                            validation.unverified_by_tool[tool] - count,
                        )
                if not is_json and re_validation.verified:
                    logger.console_output(
                        f" AI: refinement verified "
                        f"{re_validation.verified} additional fix(es)",
                    )
        if ai_config.verbose:
            loguru_logger.info(
                f"AI fix: refinement cost=${refinement_cost:.6f}",
            )

    return validation

328 

329 

def run_fix_pipeline(
    *,
    fix_issues: list[tuple[ToolResult, BaseIssue]],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    output_format: str,
    workspace_root: Path,
    budget: CostBudget | None = None,
) -> tuple[int, int, list[AIFixSuggestion]]:
    """Generate and optionally apply AI fix suggestions across all tools.

    This is the main fix pipeline that:
    1. Groups issues by tool and generates fix suggestions
    2. Classifies fixes as safe-style or behavioral-risk
    3. Auto-applies safe fixes in non-interactive mode
    4. Presents risky fixes for interactive review
    5. Re-runs tools to verify fixes
    6. Generates post-fix summary

    Args:
        fix_issues: List of (tool_result, issue) pairs to fix.
        provider: AI provider instance.
        ai_config: AI configuration.
        logger: Console logger for output.
        output_format: Output format string.
        workspace_root: Workspace root path.
        budget: Optional cost budget tracker.

    Returns:
        Tuple of (fixes_applied, fixes_failed, applied_suggestions).
    """
    telemetry = AITelemetry()

    # Group the flat (result, issue) pairs by tool name, keeping one
    # ToolResult per tool alongside its list of issues.
    by_tool: dict[str, tuple[ToolResult, list[BaseIssue]]] = {}
    for result, issue in fix_issues:
        if result.name not in by_tool:
            by_tool[result.name] = (result, [])
        by_tool[result.name][1].append(issue)

    is_json = output_format.lower() == OutputFormat.JSON

    # Step 1: Generate suggestions across all tools
    all_suggestions = _generate_all_suggestions(
        by_tool,
        provider,
        ai_config,
        logger,
        workspace_root,
        is_json,
        telemetry,
        budget,
    )

    # Step 2: Apply confidence threshold filter
    total_generated = len(all_suggestions)
    all_suggestions = _filter_by_confidence(all_suggestions, ai_config)
    filtered_out = total_generated - len(all_suggestions)

    if not all_suggestions:
        # Nothing survived filtering: record telemetry and exit early.
        telemetry.successful_fixes = 0
        telemetry.failed_fixes = total_generated
        write_audit_log(workspace_root, [], 0, telemetry.total_cost_usd)
        attach_telemetry_metadata(
            [r for r, _ in by_tool.values()],
            telemetry,
        )
        return (0, 0, [])

    # Dry-run mode: display fixes but do not apply them
    if ai_config.dry_run:
        if not is_json:
            rendered = render_fixes(
                all_suggestions,
                show_cost=ai_config.show_cost_estimate,
            )
            if rendered:
                logger.console_output(rendered)
            logger.console_output(
                " AI: dry-run mode — fixes displayed but not applied",
            )
        return (0, 0, all_suggestions)

    # Step 3: Apply or review fixes
    applied, rejected, applied_suggestions = _apply_or_review(
        all_suggestions,
        ai_config,
        logger,
        is_json,
        workspace_root,
    )

    telemetry.successful_fixes = applied
    # Filtered-out suggestions also count as failures for telemetry.
    telemetry.failed_fixes = (len(all_suggestions) - applied) + filtered_out

    # Step 4: Verify and refine applied fixes
    validation = None
    if applied_suggestions:
        validation = _verify_and_refine(
            applied_suggestions,
            by_tool,
            provider,
            ai_config,
            logger,
            is_json,
            workspace_root,
            telemetry,
            budget,
        )

    # Attach metadata to tool results
    applied_by_tool: dict[str, int] = {}
    for suggestion in applied_suggestions:
        if not suggestion.tool_name:
            continue
        applied_by_tool[suggestion.tool_name] = (
            applied_by_tool.get(suggestion.tool_name, 0) + 1
        )
    for tool_name, (result, _issues) in by_tool.items():
        attach_fixed_count_metadata(
            result=result,
            fixed_count=applied_by_tool.get(tool_name, 0),
        )
        attach_validation_counts_metadata(
            result,
            verified_count=(
                validation.verified_by_tool.get(tool_name, 0) if validation else 0
            ),
            unverified_count=(
                validation.unverified_by_tool.get(tool_name, 0) if validation else 0
            ),
        )

    # Generate post-fix summary — only when validation confirms outcomes
    if (applied > 0 or rejected > 0) and not is_json and validation is not None:
        applied_for_summary = applied
        # Prefer the verified count over the raw applied count when
        # validation produced any outcome at all.
        if validation.verified or validation.unverified:
            applied_for_summary = validation.verified

        # When fixes were applied, by_tool already holds de-duplicated
        # per-tool results (one ToolResult per tool). When no fixes were
        # applied, we derive unique results from the original fix_issues
        # because by_tool may be empty or stale.
        if applied_suggestions:
            unique_results: list[ToolResult] | None = [
                result for result, _ in by_tool.values()
            ]
        else:
            unique_results = _unique_results_from_fix_issues(fix_issues)

        if unique_results:
            post_summary = generate_post_fix_summary(
                applied=applied_for_summary,
                rejected=rejected,
                remaining_results=unique_results,
                provider=provider,
                max_tokens=ai_config.max_tokens,
                workspace_root=workspace_root,
                timeout=ai_config.api_timeout,
                max_retries=ai_config.max_retries,
                base_delay=ai_config.retry_base_delay,
                max_delay=ai_config.retry_max_delay,
                backoff_factor=ai_config.retry_backoff_factor,
                fallback_models=ai_config.fallback_models,
            )
            if post_summary:
                output = render_summary(
                    post_summary,
                    show_cost=ai_config.show_cost_estimate,
                )
                if output:
                    logger.console_output(output)

    write_audit_log(
        workspace_root,
        applied_suggestions,
        rejected,
        telemetry.total_cost_usd,
    )
    attach_telemetry_metadata(
        [r for r, _ in by_tool.values()],
        telemetry,
    )

    return (applied, len(all_suggestions) - applied, applied_suggestions)

515 

516 

517def _unique_results_from_fix_issues( 

518 fix_issues: list[tuple[ToolResult, BaseIssue]], 

519) -> list[ToolResult]: 

520 """Return unique tool results preserving first-seen order.""" 

521 remaining_results = [result for result, _ in fix_issues] 

522 seen: set[str] = set() 

523 unique_results: list[ToolResult] = [] 

524 for result in remaining_results: 

525 if result.name in seen: 

526 continue 

527 seen.add(result.name) 

528 unique_results.append(result) 

529 return unique_results