Coverage for lintro / ai / orchestrator.py: 73%
153 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""AI orchestration for check/fix actions.
3Thin coordinator that delegates to pipeline and rerun services.
4"""
6from __future__ import annotations
8import dataclasses
9import os
10from pathlib import Path
11from typing import TYPE_CHECKING
13from loguru import logger as loguru_logger
15from lintro.ai import require_ai
16from lintro.ai.budget import CostBudget
17from lintro.ai.display import render_summary, render_summary_annotations
18from lintro.ai.display.shared import is_github_actions
19from lintro.ai.filters import filter_issues
20from lintro.ai.integrations.github_pr import GitHubPRReporter
21from lintro.ai.metadata import attach_summary_metadata
22from lintro.ai.models import AIResult
23from lintro.ai.paths import resolve_workspace_file, resolve_workspace_root
24from lintro.ai.pipeline import run_fix_pipeline
25from lintro.ai.providers import get_provider
26from lintro.ai.summary import generate_summary
27from lintro.enums.action import Action
28from lintro.enums.output_format import OutputFormat
30if TYPE_CHECKING:
31 from lintro.ai.config import AIConfig
32 from lintro.ai.models.fix_suggestion import AIFixSuggestion
33 from lintro.ai.models.summary import AISummary
34 from lintro.ai.providers.base import BaseAIProvider
35 from lintro.config.lintro_config import LintroConfig
36 from lintro.models.core.tool_result import ToolResult
37 from lintro.parsers.base_issue import BaseIssue
38 from lintro.utils.console.logger import ThreadSafeConsoleLogger
def run_ai_enhancement(
    *,
    action: Action,
    all_results: list[ToolResult],
    lintro_config: LintroConfig,
    logger: ThreadSafeConsoleLogger,
    output_format: str,
    ai_fix: bool = False,
) -> AIResult:
    """Coordinate AI-powered enhancement for the check and fix actions.

    Args:
        action: The action being performed (CHECK or FIX).
        all_results: Tool results from the linting run.
        lintro_config: Full lintro configuration.
        logger: Thread-safe console logger.
        output_format: Output format (e.g. "terminal", "json").
        ai_fix: Whether to generate AI fix suggestions.

    Returns:
        AIResult with structured outcome data for exit code decisions.

    Raises:
        KeyboardInterrupt: Re-raised immediately.
        SystemExit: Re-raised immediately.
        Exception: Re-raised when ``fail_on_ai_error`` is True.
    """
    try:
        require_ai()

        ai_config = lintro_config.ai
        workspace_root = resolve_workspace_root(lintro_config.config_path)
        provider = get_provider(ai_config)
        json_mode = output_format.lower() == OutputFormat.JSON

        # Verbose mode surfaces which provider/model/workspace was selected.
        if ai_config.verbose:
            loguru_logger.info(
                f"AI: provider={ai_config.provider.value}, "
                f"model={ai_config.model or 'default'}, "
                f"workspace_root={workspace_root}",
            )

        if action == Action.CHECK:
            return _run_ai_check(
                all_results=all_results,
                provider=provider,
                ai_config=ai_config,
                logger=logger,
                is_json=json_mode,
                ai_fix=ai_fix,
                workspace_root=workspace_root,
                output_format=output_format,
            )
        if action == Action.FIX:
            return _run_ai_fix(
                all_results=all_results,
                provider=provider,
                ai_config=ai_config,
                logger=logger,
                is_json=json_mode,
                workspace_root=workspace_root,
            )
        return AIResult()
    except (KeyboardInterrupt, SystemExit):
        raise
    except Exception as exc:
        # AI problems must not break the underlying lint run unless the
        # user explicitly opted into hard failures.
        if getattr(lintro_config.ai, "fail_on_ai_error", False):
            raise
        loguru_logger.opt(exception=exc).debug(
            f"AI enhancement failed ({type(exc).__name__}): {exc}",
        )
        json_mode = output_format.lower() == OutputFormat.JSON
        if not json_mode:
            logger.console_output(
                f" AI: enhancement unavailable ({type(exc).__name__})",
            )
        return AIResult(error=True)
def _run_ai_check(
    *,
    all_results: list[ToolResult],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    is_json: bool,
    ai_fix: bool,
    workspace_root: Path,
    output_format: str = "auto",
) -> AIResult:
    """Generate the AI summary and, when requested, AI fix suggestions.

    Args:
        all_results: Tool results from the check run.
        provider: Resolved AI provider instance.
        ai_config: AI section of the lintro configuration.
        logger: Thread-safe console logger.
        is_json: Whether machine-readable output was requested.
        ai_fix: Whether to also run the AI fix pipeline.
        workspace_root: Root directory bounding fixable files.
        output_format: Output format hint for summary rendering.

    Returns:
        AIResult describing the check-mode enhancement outcome.
    """
    budget = CostBudget(max_cost_usd=ai_config.max_cost_usd)

    summary = generate_summary(
        all_results,
        provider,
        max_tokens=ai_config.max_tokens,
        workspace_root=workspace_root,
        timeout=ai_config.api_timeout,
        max_retries=ai_config.max_retries,
        base_delay=ai_config.retry_base_delay,
        max_delay=ai_config.retry_max_delay,
        backoff_factor=ai_config.retry_backoff_factor,
        fallback_models=ai_config.fallback_models,
    )

    if summary:
        if not is_json:
            rendered = render_summary(
                summary,
                show_cost=ai_config.show_cost_estimate,
                output_format=output_format,
            )
            if rendered:
                logger.console_output(rendered)
            # Mirror the summary insights as GitHub Actions annotations.
            if is_github_actions():
                annotations = render_summary_annotations(summary)
                if annotations:
                    logger.console_output(annotations)
        budget.record(summary.cost_estimate)
        for result in all_results:
            if result.issues and not result.skipped:
                attach_summary_metadata(result, summary)
        # Optionally mirror the summary onto the pull request.
        if ai_config.github_pr_comments:
            _post_pr_comments(
                summary=summary,
                logger=logger,
                workspace_root=workspace_root,
                is_json=is_json,
            )

    if not ai_fix:
        return AIResult()

    candidates: list[tuple[ToolResult, BaseIssue]] = []
    for result in all_results:
        loguru_logger.debug(
            f"AI fix (chk): {result.name} "
            f"issues={len(result.issues) if result.issues else 0}",
        )
        if not result.issues or result.skipped:
            continue
        for issue in filter_issues(list(result.issues), ai_config):
            if not issue.file:
                continue
            target = _resolve_issue_path(
                file=issue.file,
                workspace_root=workspace_root,
                cwd=result.cwd,
            )
            if target is None:
                continue
            # Shallow-copy so the shared issue on ToolResult.issues (read by
            # downstream renderers) is never mutated.
            candidates.append(
                (result, dataclasses.replace(issue, file=str(target))),
            )

    return _collect_and_fix(
        fix_issues=candidates,
        provider=provider,
        ai_config=ai_config,
        logger=logger,
        is_json=is_json,
        workspace_root=workspace_root,
        budget=budget,
    )
def _run_ai_fix(
    *,
    all_results: list[ToolResult],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    is_json: bool,
    workspace_root: Path,
) -> AIResult:
    """Run AI fix suggestions after the native fix (format) step.

    Args:
        all_results: Tool results from the fix run.
        provider: Resolved AI provider instance.
        ai_config: AI section of the lintro configuration.
        logger: Thread-safe console logger.
        is_json: Whether machine-readable output was requested.
        workspace_root: Root directory bounding fixable files.

    Returns:
        AIResult describing the fix pipeline outcome.
    """
    budget = CostBudget(max_cost_usd=ai_config.max_cost_usd)

    candidates: list[tuple[ToolResult, BaseIssue]] = []
    for result in all_results:
        loguru_logger.debug(
            f"AI: {result.name} skipped={result.skipped} "
            f"issues={type(result.issues).__name__} "
            f"len={len(result.issues) if result.issues else 0} "
            f"remaining={result.remaining_issues_count}",
        )
        if result.skipped:
            continue
        remaining = _remaining_issues_for_fix_result(result)
        if not remaining:
            continue
        for issue in filter_issues(remaining, ai_config):
            if not issue.file:
                continue
            target = _resolve_issue_path(
                file=issue.file,
                workspace_root=workspace_root,
                cwd=result.cwd,
            )
            if target is None:
                continue
            # Copy so the shared issue object on the result stays untouched.
            candidates.append(
                (result, dataclasses.replace(issue, file=str(target))),
            )

    return _collect_and_fix(
        fix_issues=candidates,
        provider=provider,
        ai_config=ai_config,
        logger=logger,
        is_json=is_json,
        workspace_root=workspace_root,
        budget=budget,
    )
def _collect_and_fix(
    *,
    fix_issues: list[tuple[ToolResult, BaseIssue]],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    is_json: bool,
    workspace_root: Path,
    budget: CostBudget,
) -> AIResult:
    """Run the fix pipeline and build an AIResult.

    Shared by ``_run_ai_check`` and ``_run_ai_fix``.

    Args:
        fix_issues: (result, issue) pairs with workspace-resolved paths.
        provider: Resolved AI provider instance.
        ai_config: AI section of the lintro configuration.
        logger: Thread-safe console logger.
        is_json: Whether machine-readable output was requested.
        workspace_root: Root directory bounding fixable files.
        budget: Cost budget shared with any earlier summary step.

    Returns:
        AIResult describing fixes applied/failed and budget state.
    """
    fixes_applied = 0
    fixes_failed = 0
    fix_suggestions: list[AIFixSuggestion] = []
    if fix_issues:
        fixes_applied, fixes_failed, fix_suggestions = run_fix_pipeline(
            fix_issues=fix_issues,
            provider=provider,
            ai_config=ai_config,
            logger=logger,
            output_format=OutputFormat.JSON if is_json else OutputFormat.PLAIN,
            workspace_root=workspace_root,
            budget=budget,
        )

        if not is_json:
            _log_fix_limit_message(
                logger=logger,
                total_issues=len(fix_issues),
                max_fix_attempts=ai_config.max_fix_attempts,
            )

    if fix_suggestions and ai_config.github_pr_comments:
        _post_pr_comments(
            suggestions=fix_suggestions,
            logger=logger,
            workspace_root=workspace_root,
            is_json=is_json,
        )

    unfixed = len(fix_issues) - fixes_applied
    remaining_budget = budget.remaining
    return AIResult(
        fixes_applied=fixes_applied,
        fixes_failed=fixes_failed,
        unfixed_issues=max(0, unfixed),
        # Fix: treat an overshoot as exhausted too. The previous strict
        # ``remaining == 0.0`` equality missed the case where recorded costs
        # exceed the cap and ``remaining`` ends up below zero (assumes
        # CostBudget does not clamp — TODO confirm against CostBudget).
        budget_exceeded=(
            remaining_budget is not None and remaining_budget <= 0.0
        ),
    )
316def _remaining_issues_for_fix_result(result: ToolResult) -> list[BaseIssue]:
317 """Return only issues still remaining after native fix step.
319 In format mode, many tools include both initially detected and remaining
320 issues in ``result.issues``. AI fix generation should only analyze the
321 remaining tail to avoid stale suggestions that cannot apply.
322 """
323 if not result.issues:
324 return []
326 issues = list(result.issues)
327 remaining_count = result.remaining_issues_count
329 if remaining_count is None:
330 return issues
331 if remaining_count <= 0:
332 return []
333 if remaining_count > len(issues):
334 loguru_logger.warning(
335 f"remaining_issues_count ({remaining_count}) exceeds "
336 f"issues length ({len(issues)}); clamping to {len(issues)}",
337 )
338 remaining_count = len(issues)
339 if remaining_count >= len(issues):
340 return issues
342 # Convention: the remaining issues occupy the tail of the list.
343 # Tools append all detected issues in order, so the last N are remaining.
344 loguru_logger.debug(
345 f"Tail-slicing {remaining_count} remaining issues from {len(issues)} total",
346 )
347 return issues[-remaining_count:]
def _resolve_issue_path(
    *,
    file: str,
    workspace_root: Path,
    cwd: str | None,
) -> Path | None:
    """Map an issue's reported file to an absolute workspace-local path.

    Args:
        file: File path as reported by the tool (absolute or relative).
        workspace_root: Root directory that bounds resolvable files.
        cwd: Working directory the tool ran in, used to absolutize
            relative paths.

    Returns:
        The resolved path, or ``None`` when the file falls outside the
        workspace root or does not exist on disk.
    """
    if cwd and not os.path.isabs(file):
        candidate = os.path.join(cwd, file)
    else:
        candidate = file

    resolved = resolve_workspace_file(candidate, workspace_root)
    if resolved is None:
        loguru_logger.debug(
            f"Skipping issue outside workspace root: "
            f"file={candidate!r} root={workspace_root}",
        )
        return None

    if resolved.is_file():
        return resolved

    loguru_logger.debug(
        f"Skipping non-existent file: {resolved}",
    )
    return None
def _post_pr_comments(
    *,
    summary: AISummary | None = None,
    suggestions: list[AIFixSuggestion] | None = None,
    logger: ThreadSafeConsoleLogger,
    workspace_root: Path | None = None,
    is_json: bool = False,
) -> None:
    """Post AI findings as GitHub PR review comments.

    Logs a warning and continues gracefully on failure.

    Args:
        summary: Optional AI summary.
        suggestions: Optional fix suggestions.
        logger: Console logger.
        workspace_root: Workspace root for repo-relative paths.
        is_json: Whether output is JSON/SARIF (suppresses plain text).
    """
    reporter = GitHubPRReporter(workspace_root=workspace_root)
    if not reporter.is_available():
        loguru_logger.debug(
            "GitHub PR reporter not available — missing token, repo, or PR number",
        )
        return

    posted_ok = reporter.post_review_comments(
        suggestions=suggestions or [],
        summary=summary,
    )
    if posted_ok:
        loguru_logger.debug("GitHub PR review comments posted successfully")
        return
    # Partial failure: keep machine-readable output clean, tell humans.
    if is_json:
        loguru_logger.debug("GitHub PR review comments partially failed")
    else:
        logger.console_output(" AI: failed to post some PR review comments")
419def _log_fix_limit_message(
420 *,
421 logger: ThreadSafeConsoleLogger,
422 total_issues: int,
423 max_fix_attempts: int,
424) -> None:
425 """Log a message when some issues were skipped due to the fix limit."""
426 if total_issues <= max_fix_attempts:
427 return
428 skipped = total_issues - max_fix_attempts
429 logger.console_output(
430 f"\n AI: analyzed {max_fix_attempts} of "
431 f"{total_issues} issues "
432 f"({skipped} skipped due to limit)\n"
433 f" Increase ai.max_fix_attempts in .lintro-config.yaml "
434 f"to analyze more",
435 )