Coverage for lintro/ai/pipeline.py: 73%
212 statements
« prev ^ index » next — coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""AI fix pipeline: generate, classify, apply, validate, post-summary."""
3from __future__ import annotations
5import sys
6from pathlib import Path
7from typing import TYPE_CHECKING
9from loguru import logger as loguru_logger
11from lintro.ai.apply import apply_fixes
12from lintro.ai.audit import write_audit_log
13from lintro.ai.budget import CostBudget
14from lintro.ai.display import render_fixes, render_summary, render_validation
15from lintro.ai.enums import ConfidenceLevel
16from lintro.ai.fix import generate_fixes_from_params
17from lintro.ai.fix_params import FixGenParams
18from lintro.ai.interactive import review_fixes_interactive
19from lintro.ai.metadata import (
20 attach_fix_suggestions_metadata,
21 attach_fixed_count_metadata,
22 attach_telemetry_metadata,
23 attach_validation_counts_metadata,
24)
25from lintro.ai.refinement import refine_unverified_fixes
26from lintro.ai.risk import is_safe_style_fix
27from lintro.ai.summary import generate_post_fix_summary
28from lintro.ai.telemetry import AITelemetry
29from lintro.ai.undo import save_undo_patch
30from lintro.ai.validation import ValidationResult, validate_applied_fixes, verify_fixes
31from lintro.enums.output_format import OutputFormat
33if TYPE_CHECKING:
34 from lintro.ai.config import AIConfig
35 from lintro.ai.models import AIFixSuggestion
36 from lintro.ai.providers.base import BaseAIProvider
37 from lintro.models.core.tool_result import ToolResult
38 from lintro.parsers.base_issue import BaseIssue
39 from lintro.utils.console.logger import ThreadSafeConsoleLogger
def _confidence_numeric(value: ConfidenceLevel | str) -> int:
    """Map a confidence value to its numeric rank (3=high, 2=medium, 1=low)."""
    try:
        level = ConfidenceLevel(value)
    except ValueError:
        # Unknown/invalid values rank lowest so threshold filtering drops them.
        return 1
    return level.numeric_order
def _generate_all_suggestions(
    by_tool: dict[str, tuple[ToolResult, list[BaseIssue]]],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    workspace_root: Path,
    is_json: bool,
    telemetry: AITelemetry,
    budget: CostBudget | None,
) -> list[AIFixSuggestion]:
    """Iterate tools, generate fix suggestions, and collect telemetry.

    Args:
        by_tool: Mapping of tool name to its ToolResult and the issues to fix.
        provider: AI provider used to generate suggestions.
        ai_config: AI configuration (attempt budget, retries, cache, verbosity).
        logger: Console logger for user-facing progress output.
        workspace_root: Workspace root path passed through to fix generation.
        is_json: True when output format is JSON; suppresses console output.
        telemetry: Mutated in place with token, cost, and API-call totals.
        budget: Optional cost budget; checked before and recorded after each
            tool's generation pass.

    Returns:
        All generated suggestions across tools, in tool iteration order.
    """
    all_suggestions: list[AIFixSuggestion] = []
    # Global cap on the number of issues submitted across all tools.
    remaining_budget = ai_config.max_fix_attempts

    total_fix_issues = sum(len(issues) for _, issues in by_tool.values())
    if not is_json and total_fix_issues > 0:
        logger.console_output(
            f" AI: generating fixes for {total_fix_issues} issues...",
        )

    def _progress_callback(completed: int, total: int) -> None:
        # Incremental progress line; suppressed for JSON output.
        if not is_json:
            logger.console_output(
                f" AI: generating fixes... {completed}/{total}",
            )

    for tool_name, (result, issues) in by_tool.items():
        if remaining_budget <= 0:
            break
        if not issues:
            continue
        if budget is not None:
            # Raises if the cost budget is already exhausted.
            budget.check()

        loguru_logger.debug(
            f"AI fix: {tool_name} has {len(issues)} issues, "
            f"budget={remaining_budget}",
        )

        fix_params = FixGenParams(
            tool_name=tool_name,
            workspace_root=workspace_root,
            max_issues=remaining_budget,
            max_workers=ai_config.max_parallel_calls,
            max_tokens=ai_config.max_tokens,
            max_retries=ai_config.max_retries,
            timeout=ai_config.api_timeout,
            context_lines=ai_config.context_lines,
            base_delay=ai_config.retry_base_delay,
            max_delay=ai_config.retry_max_delay,
            backoff_factor=ai_config.retry_backoff_factor,
            max_prompt_tokens=ai_config.max_prompt_tokens,
            enable_cache=ai_config.enable_cache,
            cache_ttl=ai_config.cache_ttl,
            cache_max_entries=ai_config.cache_max_entries,
            progress_callback=_progress_callback,
            fallback_models=ai_config.fallback_models,
            sanitize_mode=ai_config.sanitize_mode,
        )
        suggestions = generate_fixes_from_params(issues, provider, fix_params)
        for suggestion in suggestions:
            # Backfill tool attribution when the generator left it blank.
            if not suggestion.tool_name:
                suggestion.tool_name = tool_name
        # Charge the budget for the issues actually submitted (capped at the
        # remaining budget, matching the max_issues passed above).
        remaining_budget -= len(issues[:remaining_budget])

        if ai_config.verbose:
            loguru_logger.info(
                f"AI fix: {tool_name} generated {len(suggestions)} suggestions",
            )

        # Count API calls by unique token footprint (batch = 1 call, not N).
        # Cache hits (0 tokens) are not counted as API calls.
        seen_calls: set[tuple[int, int]] = set()
        for s in suggestions:
            telemetry.total_input_tokens += s.input_tokens
            telemetry.total_output_tokens += s.output_tokens
            telemetry.total_cost_usd += s.cost_estimate
            if s.input_tokens > 0 or s.output_tokens > 0:
                seen_calls.add((s.input_tokens, s.output_tokens))
        telemetry.total_api_calls += len(seen_calls)

        if budget is not None:
            budget.record(sum(s.cost_estimate for s in suggestions))

        if ai_config.verbose:
            tool_cost = sum(s.cost_estimate for s in suggestions)
            loguru_logger.info(
                f"AI fix: {tool_name} cost=${tool_cost:.6f}, "
                f"cumulative=${telemetry.total_cost_usd:.6f}",
            )

        all_suggestions.extend(suggestions)

        if suggestions:
            attach_fix_suggestions_metadata(result, suggestions)

    return all_suggestions
def _filter_by_confidence(
    all_suggestions: list[AIFixSuggestion],
    ai_config: AIConfig,
) -> list[AIFixSuggestion]:
    """Apply confidence threshold filter and log diagnostics."""
    minimum = _confidence_numeric(ai_config.min_confidence)
    kept: list[AIFixSuggestion] = []
    for suggestion in all_suggestions:
        if _confidence_numeric(suggestion.confidence) >= minimum:
            kept.append(suggestion)

    if ai_config.verbose and kept:
        # Tally surviving suggestions by confidence and risk classification.
        confidence_counts: dict[str, int] = {}
        risk_counts: dict[str, int] = {}
        for suggestion in kept:
            key = suggestion.confidence
            confidence_counts[key] = confidence_counts.get(key, 0) + 1
            risk_label = suggestion.risk_level or "unclassified"
            risk_counts[risk_label] = risk_counts.get(risk_label, 0) + 1
        loguru_logger.info(
            f"AI fix: confidence breakdown: {confidence_counts}",
        )
        loguru_logger.info(
            f"AI fix: risk breakdown: {risk_counts}",
        )

    return kept
def _apply_or_review(
    all_suggestions: list[AIFixSuggestion],
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    is_json: bool,
    workspace_root: Path,
) -> tuple[int, int, list[AIFixSuggestion]]:
    """Auto-apply safe fixes, then auto-apply or interactively review the rest.

    Paths, in order:
      1. Fast path: when non-interactive (JSON output or stdin is not a TTY)
         and ``auto_apply_safe_fixes`` is enabled, safe-style fixes are
         applied without review.
      2. ``auto_apply``: the remaining candidates are applied without review.
      3. Otherwise (non-JSON output): remaining candidates go through
         interactive review.
    An undo patch is saved before each batch is applied.

    Returns (applied, rejected, applied_suggestions).
    """
    applied = 0
    rejected = 0
    applied_suggestions: list[AIFixSuggestion] = []
    # Partition by risk classification; only safe fixes take the fast path.
    safe_suggestions = [s for s in all_suggestions if is_safe_style_fix(s)]
    risky_suggestions = [s for s in all_suggestions if not is_safe_style_fix(s)]
    safe_failed = 0
    safe_fast_path_applied = False

    # Fast path: auto-apply deterministic style-only fixes when non-interactive.
    if (
        ai_config.auto_apply_safe_fixes
        and safe_suggestions
        and (is_json or not sys.stdin.isatty())
    ):
        safe_fast_path_applied = True
        save_undo_patch(safe_suggestions, workspace_root)
        applied_safe = apply_fixes(
            safe_suggestions,
            workspace_root=workspace_root,
            auto_apply=True,
            search_radius=ai_config.fix_search_radius,
        )
        applied_suggestions.extend(applied_safe)
        applied += len(applied_safe)
        # Safe fixes that failed to apply are folded into the rejected count.
        safe_failed = len(safe_suggestions) - len(applied_safe)
        if not is_json:
            msg = (
                f" AI: auto-applied {len(applied_safe)}/{len(safe_suggestions)}"
                f" safe-style fixes"
            )
            if safe_failed:
                msg += f" ({safe_failed} failed)"
            logger.console_output(msg)

    if ai_config.auto_apply:
        # Avoid re-applying safe fixes already handled by the fast path.
        auto_apply_candidates = (
            risky_suggestions if safe_fast_path_applied else all_suggestions
        )
        save_undo_patch(auto_apply_candidates, workspace_root)
        auto_applied = apply_fixes(
            auto_apply_candidates,
            workspace_root=workspace_root,
            auto_apply=True,
            search_radius=ai_config.fix_search_radius,
        )
        applied_suggestions.extend(auto_applied)
        applied += len(auto_applied)
        # Failures from this batch plus the fast-path batch count as rejected.
        rejected = len(auto_apply_candidates) - len(auto_applied) + safe_failed
        if not is_json:
            logger.console_output(
                f" AI: auto-applied {applied}/{len(all_suggestions)} fixes",
            )
    elif not is_json:
        review_candidates = (
            risky_suggestions if safe_fast_path_applied else all_suggestions
        )
        save_undo_patch(review_candidates, workspace_root)
        accepted_count, rejected_count, interactive_applied = review_fixes_interactive(
            review_candidates,
            validate_after_group=ai_config.validate_after_group,
            workspace_root=workspace_root,
            search_radius=ai_config.fix_search_radius,
        )
        applied += accepted_count
        rejected += rejected_count + safe_failed
        applied_suggestions.extend(interactive_applied)

    return applied, rejected, applied_suggestions
def _verify_and_refine(
    applied_suggestions: list[AIFixSuggestion],
    by_tool: dict[str, tuple[ToolResult, list[BaseIssue]]],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    is_json: bool,
    workspace_root: Path,
    telemetry: AITelemetry,
    budget: CostBudget | None,
) -> ValidationResult | None:
    """Verify applied fixes and attempt refinement for unverified ones.

    Verifies the applied fixes against the per-tool results; when some fixes
    remain unverified and refinement is enabled, asks the provider to refine
    them, re-validates the refined fixes, and folds the re-validation counts
    back into the original ValidationResult. Telemetry and the optional cost
    budget are updated in place.

    Returns the ValidationResult (or None).
    """
    validation = verify_fixes(
        applied_suggestions=applied_suggestions,
        by_tool=by_tool,
    )
    if not is_json and validation and (validation.verified or validation.unverified):
        val_output = render_validation(validation)
        if val_output:
            logger.console_output(val_output)

    if (
        validation
        and validation.unverified > 0
        and ai_config.max_refinement_attempts >= 1
    ):
        refined, refinement_cost = refine_unverified_fixes(
            applied_suggestions=applied_suggestions,
            validation=validation,
            provider=provider,
            ai_config=ai_config,
            workspace_root=workspace_root,
        )
        if refined:
            # Each refinement suggestion is counted as one API call.
            telemetry.total_api_calls += len(refined)
            for s in refined:
                telemetry.total_input_tokens += s.input_tokens
                telemetry.total_output_tokens += s.output_tokens
                telemetry.total_cost_usd += s.cost_estimate
            if budget is not None:
                budget.record(refinement_cost)

            re_validation = validate_applied_fixes(refined)
            if re_validation:
                # Newly verified fixes move from unverified to verified.
                validation.verified += re_validation.verified
                validation.unverified -= re_validation.verified
                # Merge per-tool counters from refinement re-validation
                for tool, count in re_validation.verified_by_tool.items():
                    validation.verified_by_tool[tool] = (
                        validation.verified_by_tool.get(tool, 0) + count
                    )
                    # Decrease unverified for the same tool accordingly
                    if tool in validation.unverified_by_tool:
                        validation.unverified_by_tool[tool] = max(
                            0,
                            validation.unverified_by_tool[tool] - count,
                        )
                if not is_json and re_validation.verified:
                    logger.console_output(
                        f" AI: refinement verified "
                        f"{re_validation.verified} additional fix(es)",
                    )
            if ai_config.verbose:
                loguru_logger.info(
                    f"AI fix: refinement cost=${refinement_cost:.6f}",
                )

    return validation
def run_fix_pipeline(
    *,
    fix_issues: list[tuple[ToolResult, BaseIssue]],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    output_format: str,
    workspace_root: Path,
    budget: CostBudget | None = None,
) -> tuple[int, int, list[AIFixSuggestion]]:
    """Generate and optionally apply AI fix suggestions across all tools.

    This is the main fix pipeline that:
    1. Groups issues by tool and generates fix suggestions
    2. Classifies fixes as safe-style or behavioral-risk
    3. Auto-applies safe fixes in non-interactive mode
    4. Presents risky fixes for interactive review
    5. Re-runs tools to verify fixes
    6. Generates post-fix summary

    Args:
        fix_issues: List of (tool_result, issue) pairs to fix.
        provider: AI provider instance.
        ai_config: AI configuration.
        logger: Console logger for output.
        output_format: Output format string.
        workspace_root: Workspace root path.
        budget: Optional cost budget tracker.

    Returns:
        Tuple of (fixes_applied, fixes_failed, applied_suggestions).
    """
    telemetry = AITelemetry()

    # Group issues by tool name, keeping the first ToolResult seen per tool.
    by_tool: dict[str, tuple[ToolResult, list[BaseIssue]]] = {}
    for result, issue in fix_issues:
        if result.name not in by_tool:
            by_tool[result.name] = (result, [])
        by_tool[result.name][1].append(issue)

    is_json = output_format.lower() == OutputFormat.JSON

    # Step 1: Generate suggestions across all tools
    all_suggestions = _generate_all_suggestions(
        by_tool,
        provider,
        ai_config,
        logger,
        workspace_root,
        is_json,
        telemetry,
        budget,
    )

    # Step 2: Apply confidence threshold filter
    total_generated = len(all_suggestions)
    all_suggestions = _filter_by_confidence(all_suggestions, ai_config)
    filtered_out = total_generated - len(all_suggestions)

    if not all_suggestions:
        # Nothing survived filtering: record telemetry and audit, then bail.
        telemetry.successful_fixes = 0
        telemetry.failed_fixes = total_generated
        write_audit_log(workspace_root, [], 0, telemetry.total_cost_usd)
        attach_telemetry_metadata(
            [r for r, _ in by_tool.values()],
            telemetry,
        )
        return (0, 0, [])

    # Dry-run mode: display fixes but do not apply them
    if ai_config.dry_run:
        if not is_json:
            rendered = render_fixes(
                all_suggestions,
                show_cost=ai_config.show_cost_estimate,
            )
            if rendered:
                logger.console_output(rendered)
            logger.console_output(
                " AI: dry-run mode — fixes displayed but not applied",
            )
        return (0, 0, all_suggestions)

    # Step 3: Apply or review fixes
    applied, rejected, applied_suggestions = _apply_or_review(
        all_suggestions,
        ai_config,
        logger,
        is_json,
        workspace_root,
    )

    telemetry.successful_fixes = applied
    # Failures include both unapplied suggestions and confidence-filtered ones.
    telemetry.failed_fixes = (len(all_suggestions) - applied) + filtered_out

    # Step 4: Verify and refine applied fixes
    validation = None
    if applied_suggestions:
        validation = _verify_and_refine(
            applied_suggestions,
            by_tool,
            provider,
            ai_config,
            logger,
            is_json,
            workspace_root,
            telemetry,
            budget,
        )

    # Attach metadata to tool results
    applied_by_tool: dict[str, int] = {}
    for suggestion in applied_suggestions:
        if not suggestion.tool_name:
            continue
        applied_by_tool[suggestion.tool_name] = (
            applied_by_tool.get(suggestion.tool_name, 0) + 1
        )
    for tool_name, (result, _issues) in by_tool.items():
        attach_fixed_count_metadata(
            result=result,
            fixed_count=applied_by_tool.get(tool_name, 0),
        )
        attach_validation_counts_metadata(
            result,
            verified_count=(
                validation.verified_by_tool.get(tool_name, 0) if validation else 0
            ),
            unverified_count=(
                validation.unverified_by_tool.get(tool_name, 0) if validation else 0
            ),
        )

    # Generate post-fix summary — only when validation confirms outcomes
    if (applied > 0 or rejected > 0) and not is_json and validation is not None:
        applied_for_summary = applied
        if validation.verified or validation.unverified:
            # Prefer the verified count when validation produced any signal.
            applied_for_summary = validation.verified

        # When fixes were applied, by_tool already holds de-duplicated
        # per-tool results (one ToolResult per tool). When no fixes were
        # applied, we derive unique results from the original fix_issues
        # because by_tool may be empty or stale.
        if applied_suggestions:
            unique_results: list[ToolResult] | None = [
                result for result, _ in by_tool.values()
            ]
        else:
            unique_results = _unique_results_from_fix_issues(fix_issues)

        if unique_results:
            post_summary = generate_post_fix_summary(
                applied=applied_for_summary,
                rejected=rejected,
                remaining_results=unique_results,
                provider=provider,
                max_tokens=ai_config.max_tokens,
                workspace_root=workspace_root,
                timeout=ai_config.api_timeout,
                max_retries=ai_config.max_retries,
                base_delay=ai_config.retry_base_delay,
                max_delay=ai_config.retry_max_delay,
                backoff_factor=ai_config.retry_backoff_factor,
                fallback_models=ai_config.fallback_models,
            )
            if post_summary:
                output = render_summary(
                    post_summary,
                    show_cost=ai_config.show_cost_estimate,
                )
                if output:
                    logger.console_output(output)

    write_audit_log(
        workspace_root,
        applied_suggestions,
        rejected,
        telemetry.total_cost_usd,
    )
    attach_telemetry_metadata(
        [r for r, _ in by_tool.values()],
        telemetry,
    )

    return (applied, len(all_suggestions) - applied, applied_suggestions)
517def _unique_results_from_fix_issues(
518 fix_issues: list[tuple[ToolResult, BaseIssue]],
519) -> list[ToolResult]:
520 """Return unique tool results preserving first-seen order."""
521 remaining_results = [result for result, _ in fix_issues]
522 seen: set[str] = set()
523 unique_results: list[ToolResult] = []
524 for result in remaining_results:
525 if result.name in seen:
526 continue
527 seen.add(result.name)
528 unique_results.append(result)
529 return unique_results