Coverage for lintro / ai / orchestrator.py: 73%

153 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""AI orchestration for check/fix actions. 

2 

3Thin coordinator that delegates to pipeline and rerun services. 

4""" 

5 

6from __future__ import annotations 

7 

8import dataclasses 

9import os 

10from pathlib import Path 

11from typing import TYPE_CHECKING 

12 

13from loguru import logger as loguru_logger 

14 

15from lintro.ai import require_ai 

16from lintro.ai.budget import CostBudget 

17from lintro.ai.display import render_summary, render_summary_annotations 

18from lintro.ai.display.shared import is_github_actions 

19from lintro.ai.filters import filter_issues 

20from lintro.ai.integrations.github_pr import GitHubPRReporter 

21from lintro.ai.metadata import attach_summary_metadata 

22from lintro.ai.models import AIResult 

23from lintro.ai.paths import resolve_workspace_file, resolve_workspace_root 

24from lintro.ai.pipeline import run_fix_pipeline 

25from lintro.ai.providers import get_provider 

26from lintro.ai.summary import generate_summary 

27from lintro.enums.action import Action 

28from lintro.enums.output_format import OutputFormat 

29 

30if TYPE_CHECKING: 

31 from lintro.ai.config import AIConfig 

32 from lintro.ai.models.fix_suggestion import AIFixSuggestion 

33 from lintro.ai.models.summary import AISummary 

34 from lintro.ai.providers.base import BaseAIProvider 

35 from lintro.config.lintro_config import LintroConfig 

36 from lintro.models.core.tool_result import ToolResult 

37 from lintro.parsers.base_issue import BaseIssue 

38 from lintro.utils.console.logger import ThreadSafeConsoleLogger 

39 

40 

def run_ai_enhancement(
    *,
    action: Action,
    all_results: list[ToolResult],
    lintro_config: LintroConfig,
    logger: ThreadSafeConsoleLogger,
    output_format: str,
    ai_fix: bool = False,
) -> AIResult:
    """Coordinate the AI step that follows a check or fix run.

    Resolves the AI configuration, workspace root and provider, then hands
    off to the check or fix handler depending on ``action``.

    Args:
        action: The action being performed (CHECK or FIX).
        all_results: Tool results from the linting run.
        lintro_config: Full lintro configuration; its ``ai`` section and
            ``config_path`` are read here.
        logger: Thread-safe console logger used for user-facing output.
        output_format: Output format name; compared case-insensitively
            against ``OutputFormat.JSON``.
        ai_fix: Whether to generate AI fix suggestions during a check.

    Returns:
        The handler's AIResult for CHECK/FIX, an empty AIResult for any
        other action, or ``AIResult(error=True)`` when a failure was
        swallowed.

    Raises:
        KeyboardInterrupt: Re-raised immediately.
        SystemExit: Re-raised immediately.
        Exception: Re-raised when ``fail_on_ai_error`` is True.
    """
    try:
        require_ai()

        settings = lintro_config.ai
        root = resolve_workspace_root(lintro_config.config_path)
        ai_provider = get_provider(settings)
        json_mode = output_format.lower() == OutputFormat.JSON

        # Verbose mode surfaces the effective provider/model/workspace.
        if settings.verbose:
            loguru_logger.info(
                f"AI: provider={settings.provider.value}, "
                f"model={settings.model or 'default'}, "
                f"workspace_root={root}",
            )

        if action == Action.CHECK:
            return _run_ai_check(
                all_results=all_results,
                provider=ai_provider,
                ai_config=settings,
                logger=logger,
                is_json=json_mode,
                ai_fix=ai_fix,
                workspace_root=root,
                output_format=output_format,
            )
        if action == Action.FIX:
            return _run_ai_fix(
                all_results=all_results,
                provider=ai_provider,
                ai_config=settings,
                logger=logger,
                is_json=json_mode,
                workspace_root=root,
            )
        # Any other action has nothing for the AI layer to do.
        return AIResult()
    except (KeyboardInterrupt, SystemExit):
        raise
    except Exception as exc:
        if getattr(lintro_config.ai, "fail_on_ai_error", False):
            raise
        loguru_logger.opt(exception=exc).debug(
            f"AI enhancement failed ({type(exc).__name__}): {exc}",
        )
        # Recomputed here because the failure may have happened before the
        # flag was bound inside the try block.
        json_mode = output_format.lower() == OutputFormat.JSON
        if not json_mode:
            logger.console_output(
                f" AI: enhancement unavailable ({type(exc).__name__})",
            )
        return AIResult(error=True)

119 

120 

def _run_ai_check(
    *,
    all_results: list[ToolResult],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    is_json: bool,
    ai_fix: bool,
    workspace_root: Path,
    output_format: str = "auto",
) -> AIResult:
    """Produce the AI summary for a check run and, optionally, AI fixes.

    Args:
        all_results: Tool results from the linting run.
        provider: AI provider used for summary and fix generation.
        ai_config: AI configuration (limits, retries, display flags).
        logger: Console logger for rendered output.
        is_json: When True, rendered summary text is not printed.
        ai_fix: When False, the function returns right after the summary.
        workspace_root: Root used to resolve issue file paths.
        output_format: Format hint forwarded to ``render_summary``.

    Returns:
        An empty AIResult when ``ai_fix`` is False, otherwise the result of
        ``_collect_and_fix`` for the issues that resolved to files.
    """
    cost_budget = CostBudget(max_cost_usd=ai_config.max_cost_usd)

    summary = generate_summary(
        all_results,
        provider,
        max_tokens=ai_config.max_tokens,
        workspace_root=workspace_root,
        timeout=ai_config.api_timeout,
        max_retries=ai_config.max_retries,
        base_delay=ai_config.retry_base_delay,
        max_delay=ai_config.retry_max_delay,
        backoff_factor=ai_config.retry_backoff_factor,
        fallback_models=ai_config.fallback_models,
    )

    if summary:
        if not is_json:
            rendered = render_summary(
                summary,
                show_cost=ai_config.show_cost_estimate,
                output_format=output_format,
            )
            if rendered:
                logger.console_output(rendered)
            # On GitHub Actions, also emit annotations for the summary.
            if is_github_actions():
                annotation_text = render_summary_annotations(summary)
                if annotation_text:
                    logger.console_output(annotation_text)

        # The summary call counts against the same budget as the fixes.
        cost_budget.record(summary.cost_estimate)
        for tool_result in all_results:
            if tool_result.issues and not tool_result.skipped:
                attach_summary_metadata(tool_result, summary)

        # Post the summary as a PR comment when enabled.
        if ai_config.github_pr_comments:
            _post_pr_comments(
                summary=summary,
                logger=logger,
                workspace_root=workspace_root,
                is_json=is_json,
            )

    if not ai_fix:
        return AIResult()

    pending: list[tuple[ToolResult, BaseIssue]] = []
    for tool_result in all_results:
        loguru_logger.debug(
            f"AI fix (chk): {tool_result.name} "
            f"issues={len(tool_result.issues) if tool_result.issues else 0}",
        )
        if not tool_result.issues or tool_result.skipped:
            continue
        for issue in filter_issues(list(tool_result.issues), ai_config):
            if not issue.file:
                continue
            resolved = _resolve_issue_path(
                file=issue.file,
                workspace_root=workspace_root,
                cwd=tool_result.cwd,
            )
            if resolved is None:
                continue
            # Work on a copy so the issue objects held by
            # ToolResult.issues (read by downstream renderers) stay intact.
            pending.append(
                (tool_result, dataclasses.replace(issue, file=str(resolved))),
            )

    return _collect_and_fix(
        fix_issues=pending,
        provider=provider,
        ai_config=ai_config,
        logger=logger,
        is_json=is_json,
        workspace_root=workspace_root,
        budget=cost_budget,
    )

211 

212 

def _run_ai_fix(
    *,
    all_results: list[ToolResult],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    is_json: bool,
    workspace_root: Path,
) -> AIResult:
    """Generate AI fixes for issues left over after the native fix step.

    Args:
        all_results: Tool results from the fix run.
        provider: AI provider used for fix generation.
        ai_config: AI configuration (filters, limits, budget).
        logger: Console logger for user-facing output.
        is_json: Whether plain-text console output is suppressed.
        workspace_root: Root used to resolve issue file paths.

    Returns:
        The AIResult produced by ``_collect_and_fix``.
    """
    cost_budget = CostBudget(max_cost_usd=ai_config.max_cost_usd)

    pending: list[tuple[ToolResult, BaseIssue]] = []
    for tool_result in all_results:
        loguru_logger.debug(
            f"AI: {tool_result.name} skipped={tool_result.skipped} "
            f"issues={type(tool_result.issues).__name__} "
            f"len={len(tool_result.issues) if tool_result.issues else 0} "
            f"remaining={tool_result.remaining_issues_count}",
        )
        if tool_result.skipped:
            continue

        leftover = _remaining_issues_for_fix_result(tool_result)
        if not leftover:
            continue

        for issue in filter_issues(leftover, ai_config):
            if not issue.file:
                continue
            resolved = _resolve_issue_path(
                file=issue.file,
                workspace_root=workspace_root,
                cwd=tool_result.cwd,
            )
            if resolved is None:
                continue
            # Copy the issue with its resolved path instead of mutating it.
            pending.append(
                (tool_result, dataclasses.replace(issue, file=str(resolved))),
            )

    return _collect_and_fix(
        fix_issues=pending,
        provider=provider,
        ai_config=ai_config,
        logger=logger,
        is_json=is_json,
        workspace_root=workspace_root,
        budget=cost_budget,
    )

260 

261 

def _collect_and_fix(
    *,
    fix_issues: list[tuple[ToolResult, BaseIssue]],
    provider: BaseAIProvider,
    ai_config: AIConfig,
    logger: ThreadSafeConsoleLogger,
    is_json: bool,
    workspace_root: Path,
    budget: CostBudget,
) -> AIResult:
    """Run the fix pipeline and build an AIResult.

    Shared by ``_run_ai_check`` and ``_run_ai_fix``.

    Args:
        fix_issues: ``(tool result, issue)`` pairs to send to the pipeline.
            The pipeline is skipped entirely when this list is empty.
        provider: AI provider used for fix generation.
        ai_config: AI configuration (``max_fix_attempts`` and
            ``github_pr_comments`` are read here).
        logger: Console logger for user-facing output.
        is_json: When True, the fix-limit notice is not printed and the
            pipeline is run with ``OutputFormat.JSON`` instead of
            ``OutputFormat.PLAIN``.
        workspace_root: Workspace root forwarded to the pipeline and the
            PR reporter.
        budget: Cost budget shared with the pipeline.

    Returns:
        AIResult carrying applied/failed fix counts, the number of issues
        not fixed (never negative) and whether the budget is exhausted.
    """
    fixes_applied = 0
    fixes_failed = 0
    fix_suggestions: list[AIFixSuggestion] = []
    if fix_issues:
        fixes_applied, fixes_failed, fix_suggestions = run_fix_pipeline(
            fix_issues=fix_issues,
            provider=provider,
            ai_config=ai_config,
            logger=logger,
            output_format=OutputFormat.JSON if is_json else OutputFormat.PLAIN,
            workspace_root=workspace_root,
            budget=budget,
        )

    if not is_json:
        _log_fix_limit_message(
            logger=logger,
            total_issues=len(fix_issues),
            max_fix_attempts=ai_config.max_fix_attempts,
        )

    if fix_suggestions and ai_config.github_pr_comments:
        _post_pr_comments(
            suggestions=fix_suggestions,
            logger=logger,
            workspace_root=workspace_root,
            is_json=is_json,
        )

    unfixed = len(fix_issues) - fixes_applied

    # A remaining value of None is treated as "not exceeded".  Any
    # non-positive remainder counts as exhausted: an exact ``== 0.0`` test
    # would miss an overspent (negative) budget.
    # NOTE(review): assumes CostBudget.remaining may go below zero — confirm.
    remaining_budget = budget.remaining
    budget_exceeded = remaining_budget is not None and remaining_budget <= 0.0

    return AIResult(
        fixes_applied=fixes_applied,
        fixes_failed=fixes_failed,
        unfixed_issues=max(0, unfixed),
        budget_exceeded=budget_exceeded,
    )

314 

315 

def _remaining_issues_for_fix_result(result: ToolResult) -> list[BaseIssue]:
    """Select the issues that are still open after the native fix step.

    ``result.issues`` may hold both the initially detected issues and the
    ones that remain; ``result.remaining_issues_count`` says how many of
    them are still open.  Only that many issues, taken from the end of the
    list, are returned so AI suggestions are not generated for issues that
    were already fixed.

    Args:
        result: Tool result exposing ``issues`` and
            ``remaining_issues_count``.

    Returns:
        A new list: empty when there are no issues or the count is zero or
        negative, every issue when the count is ``None`` or at least the
        list length, otherwise the last ``remaining_issues_count`` issues.
    """
    if not result.issues:
        return []

    all_issues = list(result.issues)
    total = len(all_issues)
    remaining = result.remaining_issues_count

    # No count reported: treat every issue as remaining.
    if remaining is None:
        return all_issues
    if remaining <= 0:
        return []
    if remaining > total:
        # An over-large count is clamped, which means "all issues".
        loguru_logger.warning(
            f"remaining_issues_count ({remaining}) exceeds "
            f"issues length ({total}); clamping to {total}",
        )
        return all_issues
    if remaining == total:
        return all_issues

    # Convention: tools append issues in detection order, so the remaining
    # ones sit at the tail of the list.
    loguru_logger.debug(
        f"Tail-slicing {remaining} remaining issues from {total} total",
    )
    return all_issues[-remaining:]

348 

349 

def _resolve_issue_path(
    *,
    file: str,
    workspace_root: Path,
    cwd: str | None,
) -> Path | None:
    """Turn an issue's file reference into a usable workspace path.

    A relative ``file`` is first joined onto ``cwd`` (when one is given),
    then passed to ``resolve_workspace_file``.

    Args:
        file: File path as reported on the issue.
        workspace_root: Root the path is resolved against.
        cwd: Optional working directory used for relative paths.

    Returns:
        The resolved path, or ``None`` when ``resolve_workspace_file``
        rejects the path or the result is not an existing file.
    """
    join_with_cwd = bool(cwd) and not os.path.isabs(file)
    candidate = os.path.join(cwd, file) if join_with_cwd else file

    resolved = resolve_workspace_file(candidate, workspace_root)
    if resolved is None:
        loguru_logger.debug(
            f"Skipping issue outside workspace root: "
            f"file={candidate!r} root={workspace_root}",
        )
        return None

    if resolved.is_file():
        return resolved

    loguru_logger.debug(
        f"Skipping non-existent file: {resolved}",
    )
    return None

380 

381 

def _post_pr_comments(
    *,
    summary: AISummary | None = None,
    suggestions: list[AIFixSuggestion] | None = None,
    logger: ThreadSafeConsoleLogger,
    workspace_root: Path | None = None,
    is_json: bool = False,
) -> None:
    """Send AI findings to the pull request via ``GitHubPRReporter``.

    Does nothing (apart from a debug log) when the reporter reports that it
    is unavailable.  When posting is reported as unsuccessful, a console
    notice is printed, or a debug log is written instead if ``is_json`` is
    set.

    Args:
        summary: Optional AI summary.
        suggestions: Optional fix suggestions; ``None`` is sent as an
            empty list.
        logger: Console logger.
        workspace_root: Workspace root handed to the reporter.
        is_json: Whether the plain-text failure notice is suppressed.
    """
    pr_reporter = GitHubPRReporter(workspace_root=workspace_root)
    if not pr_reporter.is_available():
        loguru_logger.debug(
            "GitHub PR reporter not available — missing token, repo, or PR number",
        )
        return

    posted = pr_reporter.post_review_comments(
        suggestions=suggestions or [],
        summary=summary,
    )
    if posted:
        loguru_logger.debug("GitHub PR review comments posted successfully")
        return

    if is_json:
        # Keep machine-readable output free of plain-text notices.
        loguru_logger.debug("GitHub PR review comments partially failed")
        return

    logger.console_output(" AI: failed to post some PR review comments")

417 

418 

def _log_fix_limit_message(
    *,
    logger: ThreadSafeConsoleLogger,
    total_issues: int,
    max_fix_attempts: int,
) -> None:
    """Tell the user when the fix limit left some issues unanalyzed.

    Nothing is printed when ``total_issues`` does not exceed
    ``max_fix_attempts``.

    Args:
        logger: Console logger that receives the notice.
        total_issues: Number of issues that were candidates for AI fixes.
        max_fix_attempts: Configured cap on analyzed issues.
    """
    if max_fix_attempts >= total_issues:
        return

    over_limit = total_issues - max_fix_attempts
    notice = (
        f"\n AI: analyzed {max_fix_attempts} of {total_issues} issues "
        f"({over_limit} skipped due to limit)\n"
        " Increase ai.max_fix_attempts in .lintro-config.yaml to analyze more"
    )
    logger.console_output(notice)