Coverage for lintro / utils / tool_executor.py: 77%

298 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Helper functions for tool execution. 

2 

3Clean, straightforward approach using Loguru with rich formatting: 

41. OutputManager - handles structured output files only 

52. ThreadSafeConsoleLogger - handles console display with thread-safe message 

6 tracking for parallel execution 

73. No tee, no stream redirection, no complex state management 

8 

9Supports parallel execution when enabled via configuration. 

10""" 

11 

12from __future__ import annotations 

13 

14from typing import TYPE_CHECKING, Any 

15 

16from lintro.enums.action import Action, normalize_action 

17from lintro.models.core.tool_result import ToolResult 

18from lintro.tools import tool_manager 

19from lintro.utils.config import load_post_checks_config 

20from lintro.utils.execution.exit_codes import ( 

21 DEFAULT_EXIT_CODE_FAILURE, 

22 DEFAULT_EXIT_CODE_SUCCESS, 

23 DEFAULT_REMAINING_COUNT, 

24 aggregate_tool_results, 

25 determine_exit_code, 

26) 

27from lintro.utils.execution.parallel_executor import run_tools_parallel 

28from lintro.utils.execution.tool_configuration import ( 

29 configure_tool_for_execution, 

30 get_tool_display_name, 

31 get_tools_to_run, 

32) 

33from lintro.utils.output import OutputManager 

34from lintro.utils.post_checks import execute_post_checks 

35from lintro.utils.unified_config import UnifiedConfigManager 

36 

37if TYPE_CHECKING: 

38 from collections.abc import Callable 

39 

40 from lintro.plugins.base import BaseToolPlugin 

41 

42# Re-export constants for backwards compatibility 

43__all__ = [ 

44 "DEFAULT_EXIT_CODE_FAILURE", 

45 "DEFAULT_EXIT_CODE_SUCCESS", 

46 "DEFAULT_REMAINING_COUNT", 

47 "run_lint_tools_simple", 

48] 

49 

50 

51def _get_remaining_count(result: ToolResult) -> int: 

52 """Get remaining issue count from a ToolResult. 

53 

54 Falls back to issues_count when remaining_issues_count is not set, 

55 then to 0 if neither is available. 

56 

57 Args: 

58 result: The tool result to inspect. 

59 

60 Returns: 

61 int: Number of remaining issues. 

62 """ 

63 if result.remaining_issues_count is not None: 

64 return result.remaining_issues_count 

65 if result.issues_count is not None: 

66 return result.issues_count 

67 return 0 

68 

69 

70def _run_fix_with_retry( 

71 tool: BaseToolPlugin, 

72 paths: list[str], 

73 options: dict[str, object], 

74 max_retries: int, 

75) -> ToolResult: 

76 """Run tool.fix() with convergence retries. 

77 

78 Some formatters (e.g. prettier with proseWrap) are non-idempotent and 

79 need multiple write→verify cycles to stabilize. This function retries 

80 fix() up to ``max_retries`` times, keeping the initial issue count from 

81 the first pass and the remaining count from the last pass. 

82 

83 Args: 

84 tool: The tool plugin to execute. 

85 paths: List of file paths to process. 

86 options: Runtime options for the tool. 

87 max_retries: Maximum number of fix→verify cycles. 

88 

89 Returns: 

90 ToolResult: Merged result across all passes. 

91 """ 

92 from loguru import logger 

93 

94 result = tool.fix(paths, options) 

95 

96 if max_retries <= 1: 

97 return result 

98 

99 initial_issues_count = getattr(result, "initial_issues_count", None) 

100 first_pass_initial_issues = getattr(result, "initial_issues", None) 

101 remaining = _get_remaining_count(result) 

102 

103 for attempt in range(2, max_retries + 1): 

104 if remaining == 0: 

105 break 

106 

107 logger.debug( 

108 f"Fix retry {attempt}/{max_retries} for " 

109 f"{getattr(getattr(tool, 'definition', None), 'name', 'unknown')} " 

110 f"({remaining} remaining issues)", 

111 ) 

112 result = tool.fix(paths, options) 

113 remaining = _get_remaining_count(result) 

114 

115 # Merge: keep initial_issues_count and initial_issues from first pass, 

116 # rest from last pass 

117 if initial_issues_count is not None: 

118 fixed = max(0, initial_issues_count - remaining) 

119 result = ToolResult( 

120 name=result.name, 

121 success=result.success, 

122 output=result.output, 

123 issues_count=remaining, 

124 issues=result.issues, 

125 initial_issues_count=initial_issues_count, 

126 fixed_issues_count=fixed, 

127 remaining_issues_count=remaining, 

128 formatted_output=result.formatted_output, 

129 initial_issues=first_pass_initial_issues, 

130 cwd=result.cwd, 

131 ) 

132 elif first_pass_initial_issues is not None: 

133 # Preserve initial_issues even when initial_issues_count is absent 

134 fixed = max(0, len(first_pass_initial_issues) - remaining) 

135 result = ToolResult( 

136 name=result.name, 

137 success=result.success, 

138 output=result.output, 

139 issues_count=remaining, 

140 issues=result.issues, 

141 initial_issues_count=len(first_pass_initial_issues), 

142 fixed_issues_count=fixed, 

143 remaining_issues_count=remaining, 

144 formatted_output=result.formatted_output, 

145 initial_issues=first_pass_initial_issues, 

146 cwd=result.cwd, 

147 ) 

148 

149 return result 

150 

151 

152def _warn_ai_fix_disabled( 

153 *, 

154 action: Action, 

155 ai_fix: bool, 

156 ai_enabled: bool, 

157 logger: Any, 

158 output_format: str = "", 

159) -> None: 

160 """Warn when users request AI fixes but AI is disabled in config.""" 

161 if action != Action.CHECK or not ai_fix or ai_enabled: 

162 return 

163 # Suppress plain-text warnings for machine-readable output formats 

164 if output_format.lower() in ("json", "sarif"): 

165 return 

166 logger.console_output( 

167 "AI fixes requested with --fix, but ai.enabled is false in " 

168 ".lintro-config.yaml; skipping AI enhancements.", 

169 ) 

170 

171 

172def _display_fix_result( 

173 result: ToolResult, 

174 *, 

175 output_format: str, 

176 raw_output: bool, 

177 console_output_func: Callable[..., None], 

178 success_func: Callable[..., None], 

179 action: Action, 

180) -> None: 

181 """Display fix result with initial issue details when available. 

182 

183 When a tool fixes issues, this shows WHAT was fixed (via initial_issues) 

184 before showing the count summary. Falls back to the standard display 

185 when initial_issues is not populated. 

186 

187 Args: 

188 result: The tool result to display. 

189 output_format: Output format for formatting issues. 

190 raw_output: Whether to show raw tool output. 

191 console_output_func: Function to output text to console. 

192 success_func: Function to display success message. 

193 action: The action being performed. 

194 """ 

195 from lintro.utils.output import format_tool_output 

196 from lintro.utils.result_formatters import print_tool_result 

197 

198 # When in fix mode and initial_issues is populated and ALL issues 

199 # were fixed (remaining == 0), show what was fixed. When some issues 

200 # remain, the initial_issues list is misleading because not all of 

201 # them were actually resolved. 

202 remaining = getattr(result, "remaining_issues_count", None) 

203 if remaining is None: 

204 remaining = getattr(result, "issues_count", None) 

205 if ( 

206 action == Action.FIX 

207 and result.initial_issues 

208 and not raw_output 

209 and remaining == 0 

210 ): 

211 # Format the initial issues as a table 

212 issues_display = format_tool_output( 

213 tool_name=result.name, 

214 output="", 

215 output_format=output_format, 

216 issues=list(result.initial_issues), 

217 ) 

218 if issues_display and issues_display.strip(): 

219 console_output_func(text=issues_display) 

220 

221 # Show the count summary below the table 

222 print_tool_result( 

223 console_output_func=console_output_func, 

224 success_func=success_func, 

225 tool_name=result.name, 

226 output=result.output or "", 

227 issues_count=result.issues_count, 

228 raw_output_for_meta=result.output, 

229 action=action, 

230 success=result.success, 

231 ai_metadata=result.ai_metadata, 

232 ) 

233 return 

234 

235 # Standard display path (no initial_issues available) 

236 display_output: str | None = None 

237 if result.formatted_output: 

238 display_output = result.formatted_output 

239 elif result.issues or result.output: 

240 display_output = format_tool_output( 

241 tool_name=result.name, 

242 output=result.output or "", 

243 output_format=output_format, 

244 issues=list(result.issues) if result.issues else None, 

245 ) 

246 if result.output and raw_output: 

247 display_output = result.output 

248 

249 if display_output and display_output.strip(): 

250 print_tool_result( 

251 console_output_func=console_output_func, 

252 success_func=success_func, 

253 tool_name=result.name, 

254 output=display_output, 

255 issues_count=result.issues_count, 

256 raw_output_for_meta=result.output, 

257 action=action, 

258 success=result.success, 

259 ai_metadata=result.ai_metadata, 

260 ) 

261 elif ( 

262 result.issues_count == 0 

263 and result.success 

264 and not getattr(result, "fixed_issues_count", 0) 

265 ): 

266 print_tool_result( 

267 console_output_func=console_output_func, 

268 success_func=success_func, 

269 tool_name=result.name, 

270 output="", 

271 issues_count=0, 

272 action=action, 

273 success=result.success, 

274 ai_metadata=result.ai_metadata, 

275 ) 

276 

277 

278_ARTIFACT_EXTENSIONS: dict[str, str] = { 

279 "json": "results.json", 

280 "csv": "results.csv", 

281 "markdown": "results.md", 

282 "html": "results.html", 

283 "sarif": "results.sarif.json", 

284 "plain": "results.txt", 

285} 

286 

287 

288def _write_artifacts( 

289 all_results: list[ToolResult], 

290 lintro_config: Any, 

291 logger: Any, 

292 action: Action, 

293 total_issues: int, 

294 total_fixed: int, 

295 *, 

296 warn_func: Any = None, 

297) -> None: 

298 """Write side-channel artifact files alongside primary output. 

299 

300 Emits artifact files into ``.lintro/artifacts/<format>/`` for each 

301 format listed in ``execution.artifacts``. SARIF is also auto-emitted 

302 when ``GITHUB_ACTIONS=true`` is detected (for Code Scanning). 

303 

304 Supported formats match ``OutputFormat``: json, csv, markdown, 

305 html, sarif, plain. 

306 

307 Args: 

308 all_results: Completed tool results. 

309 lintro_config: Loaded LintroConfig instance. 

310 logger: Console logger for warning output. 

311 action: The action performed (check, fmt, test). 

312 total_issues: Total number of issues found. 

313 total_fixed: Total number of issues fixed. 

314 warn_func: Optional callback for emitting warnings. When ``None``, 

315 falls back to ``logger.console_output``. 

316 """ 

317 import os 

318 from pathlib import Path 

319 

320 from lintro.enums.output_format import normalize_output_format 

321 from lintro.utils.output.file_writer import write_output_file 

322 

323 artifacts: list[str] = [a.lower() for a in lintro_config.execution.artifacts] 

324 is_gha = os.environ.get("GITHUB_ACTIONS") == "true" 

325 

326 # Auto-emit SARIF in GitHub Actions for Code Scanning integration. 

327 if is_gha and "sarif" not in artifacts: 

328 artifacts.append("sarif") 

329 

330 if not artifacts: 

331 return 

332 

333 _emit = warn_func if warn_func is not None else logger.console_output 

334 

335 for artifact in artifacts: 

336 filename = _ARTIFACT_EXTENSIONS.get(artifact) 

337 if filename is None: 

338 _emit(f"Warning: Unknown artifact format '{artifact}', skipping") 

339 continue 

340 

341 artifact_path = Path(".lintro") / "artifacts" / artifact / filename 

342 try: 

343 fmt = normalize_output_format(artifact) 

344 write_output_file( 

345 output_path=str(artifact_path), 

346 output_format=fmt, 

347 all_results=all_results, 

348 action=action, 

349 total_issues=total_issues, 

350 total_fixed=total_fixed, 

351 ) 

352 except (OSError, ValueError, TypeError) as e: 

353 _emit(f"Warning: Failed to write {artifact} artifact: {e}") 

354 

355 

356def _enrich_issues_with_doc_urls( 

357 tool: BaseToolPlugin, 

358 result: ToolResult, 

359) -> None: 

360 """Populate doc_url on each issue using the plugin's doc_url method. 

361 

362 Skips issues that already have a doc_url set. 

363 

364 Args: 

365 tool: Plugin instance that may provide a doc_url method. 

366 result: ToolResult whose issues will be enriched in-place. 

367 """ 

368 if not hasattr(tool, "doc_url") or not result.issues: 

369 return 

370 for issue in result.issues: 

371 if getattr(issue, "doc_url", ""): 

372 continue 

373 # Resolve the code attribute via DISPLAY_FIELD_MAP so tools that 

374 # store their identifier under a different name (e.g. advisory_id, 

375 # vuln_id, rule_id) are handled correctly. 

376 field_map = getattr(issue, "DISPLAY_FIELD_MAP", {}) 

377 code_attr = field_map.get("code", "code") 

378 code = str(getattr(issue, code_attr, "") or "") 

379 if code: 

380 url = tool.doc_url(code) 

381 if url: 

382 issue.doc_url = url 

383 

384 

385def run_lint_tools_simple( 

386 *, 

387 action: str | Action, 

388 paths: list[str], 

389 tools: str | None, 

390 tool_options: str | None, 

391 exclude: str | None, 

392 include_venv: bool, 

393 group_by: str, 

394 output_format: str, 

395 verbose: bool, 

396 raw_output: bool = False, 

397 output_file: str | None = None, 

398 incremental: bool = False, 

399 debug: bool = False, 

400 stream: bool = False, 

401 no_log: bool = False, 

402 auto_install: bool = False, 

403 yes: bool = False, 

404 ai_fix: bool = False, 

405 ignore_conflicts: bool = False, 

406) -> int: 

407 """Simplified runner using Loguru-based logging with rich formatting. 

408 

409 Clean approach with beautiful output: 

410 - ThreadSafeConsoleLogger handles ALL console output with thread-safe 

411 message tracking 

412 - OutputManager handles structured output files 

413 - No tee, no complex state management 

414 

415 Args: 

416 action: Action to perform ("check", "fmt", "test"). 

417 paths: List of paths to check. 

418 tools: Comma-separated list of tools to run. 

419 tool_options: Additional tool options. 

420 exclude: Patterns to exclude. 

421 include_venv: Whether to include virtual environments. 

422 group_by: How to group results. 

423 output_format: Output format for results. 

424 verbose: Whether to enable verbose output. 

425 raw_output: Whether to show raw tool output instead of formatted output. 

426 output_file: Optional file path to write results to. 

427 incremental: Whether to only check files changed since last run. 

428 debug: Whether to show DEBUG messages on console. 

429 stream: Whether to stream output in real-time (not yet implemented). 

430 no_log: Whether to disable file logging (not yet implemented). 

431 auto_install: Whether to auto-install Node.js deps if node_modules missing. 

432 yes: Skip confirmation prompt and proceed immediately. 

433 ai_fix: Enable AI fix suggestions with interactive review (check only). 

434 ignore_conflicts: Whether to ignore tool configuration conflicts. 

435 

436 Returns: 

437 Exit code (0 for success, 1 for failures). 

438 

439 Raises: 

440 TypeError: If a programming error occurs during tool execution. 

441 AttributeError: If a programming error occurs during tool execution. 

442 Exception: Re-raised from AI hook when ``ai.fail_on_ai_error`` is enabled. 

443 """ 

444 # Normalize action to enum 

445 action = normalize_action(action) 

446 

447 # Initialize output manager for this run 

448 output_manager = OutputManager() 

449 

450 # Initialize Loguru logging (must happen before any logger.debug() calls) 

451 from lintro.utils.logger_setup import setup_execution_logging 

452 

453 setup_execution_logging(output_manager.run_dir, debug=debug) 

454 

455 # Create simplified logger with rich formatting 

456 from lintro.utils.console import create_logger 

457 

458 logger = create_logger(run_dir=output_manager.run_dir) 

459 

460 # Get tools to run (now returns ToolsToRunResult with skip info) 

461 try: 

462 tools_result = get_tools_to_run( 

463 tools, 

464 action, 

465 ignore_conflicts=ignore_conflicts, 

466 ) 

467 except ValueError as e: 

468 logger.console_output(f"Error: {e}") 

469 return 1 

470 

471 tools_to_run = tools_result.to_run 

472 skipped_tools = tools_result.skipped 

473 

474 if not tools_to_run and not skipped_tools: 

475 logger.console_output("No tools to run.") 

476 return 0 

477 

478 if not tools_to_run and skipped_tools: 

479 skipped_names = ", ".join(st.name for st in skipped_tools) 

480 logger.console_output( 

481 f"All tools were skipped ({len(skipped_tools)}): {skipped_names}", 

482 ) 

483 

484 # Load post-checks config early to exclude those tools from main phase 

485 post_cfg_early = load_post_checks_config() 

486 post_enabled_early = bool(post_cfg_early.get("enabled", False)) 

487 post_tools_early: set[str] = ( 

488 {t.lower() for t in (post_cfg_early.get("tools", []) or [])} 

489 if post_enabled_early 

490 else set() 

491 ) 

492 

493 # Filter out post-check tools from main phase 

494 if post_tools_early: 

495 tools_to_run = [t for t in tools_to_run if t.lower() not in post_tools_early] 

496 

497 # If early post-check filtering removed all tools from the main phase, 

498 # that's okay - post-checks will still run. Just log the situation. 

499 # Track this state so we can return failure if post-checks don't run. 

500 main_phase_empty_due_to_filter = bool(not tools_to_run and post_tools_early) 

501 if main_phase_empty_due_to_filter: 

502 logger.console_output( 

503 text=( 

504 "All selected tools are configured as post-checks - " 

505 "skipping main phase" 

506 ), 

507 ) 

508 

509 # Print main header with output directory information 

510 logger.print_lintro_header() 

511 

512 # Show incremental mode message 

513 if incremental: 

514 logger.console_output( 

515 text="Incremental mode: only checking files changed since last run", 

516 color="cyan", 

517 ) 

518 

519 # Execute tools and collect results 

520 all_results: list[ToolResult] = [] 

521 total_issues = 0 

522 total_fixed = 0 

523 total_remaining = 0 

524 

525 # Parse tool options once for all tools 

526 from lintro.utils.tool_options import parse_tool_options 

527 

528 tool_option_dict = parse_tool_options(tool_options) 

529 

530 # Create UnifiedConfigManager once before the loop 

531 config_manager = UnifiedConfigManager() 

532 

533 # Check if parallel execution is enabled 

534 from lintro.config.config_loader import get_config 

535 

536 lintro_config = get_config() 

537 use_parallel = lintro_config.execution.parallel and len(tools_to_run) > 1 

538 

539 # Determine auto_install: CLI flag > config > container default 

540 from lintro.utils.environment.container_detection import is_container_environment 

541 

542 is_container = is_container_environment() 

543 if auto_install: 

544 effective_auto_install = True 

545 elif lintro_config.execution.auto_install_deps is not None: 

546 effective_auto_install = lintro_config.execution.auto_install_deps 

547 else: 

548 effective_auto_install = is_container 

549 

550 # Pre-execution config summary (suppress in JSON mode) 

551 if output_format.lower() not in {"json", "sarif"} and ( 

552 tools_to_run or skipped_tools 

553 ): 

554 from lintro.utils.console.pre_execution_summary import ( 

555 print_pre_execution_summary, 

556 ) 

557 from lintro.utils.environment import detect_ci_environment 

558 

559 # Collect per-tool auto_install settings 

560 per_tool_auto: dict[str, bool | None] = {} 

561 for name in tools_to_run: 

562 tool_cfg = lintro_config.get_tool_config(name) 

563 if tool_cfg.auto_install is not None: 

564 per_tool_auto[name] = tool_cfg.auto_install 

565 

566 ci_env = detect_ci_environment() 

567 is_ci = ci_env is not None and ci_env.is_ci 

568 print_pre_execution_summary( 

569 tools_to_run=tools_to_run, 

570 skipped_tools=skipped_tools, 

571 effective_auto_install=effective_auto_install, 

572 is_container=is_container, 

573 is_ci=is_ci, 

574 per_tool_auto_install=per_tool_auto if per_tool_auto else None, 

575 ai_config=lintro_config.ai, 

576 ) 

577 

578 # Confirmation prompt — skip when non-interactive 

579 import sys 

580 

581 auto_continue = yes or is_ci or not sys.stdin.isatty() 

582 if not auto_continue: 

583 import click as _click 

584 

585 _click.echo("Proceed? [Y/n] ", nl=False) 

586 try: 

587 answer = _click.getchar() 

588 _click.echo(answer) # echo the keypress 

589 except (EOFError, KeyboardInterrupt): 

590 _click.echo() 

591 answer = "n" 

592 if answer.lower() == "n": 

593 logger.console_output(text="Aborted.", color="yellow") 

594 return int(DEFAULT_EXIT_CODE_SUCCESS) 

595 

596 # Define success_func once before the loop 

597 def success_func(message: str) -> None: 

598 logger.console_output(text=message, color="green") 

599 

600 # Use parallel execution if enabled 

601 if use_parallel: 

602 logger.console_output( 

603 text=f"Running {len(tools_to_run)} tools in parallel " 

604 f"(max {lintro_config.execution.max_workers} workers)", 

605 ) 

606 all_results = run_tools_parallel( 

607 tools_to_run=tools_to_run, 

608 paths=paths, 

609 action=action, 

610 config_manager=config_manager, 

611 tool_option_dict=tool_option_dict, 

612 exclude=exclude, 

613 include_venv=include_venv, 

614 post_tools=post_tools_early, 

615 max_workers=lintro_config.execution.max_workers, 

616 incremental=incremental, 

617 auto_install=effective_auto_install, 

618 max_fix_retries=lintro_config.execution.max_fix_retries, 

619 ) 

620 

621 # Enrich parallel results with doc_url from each plugin 

622 for result in all_results: 

623 try: 

624 tool = tool_manager.get_tool(result.name) 

625 _enrich_issues_with_doc_urls(tool, result) 

626 except (KeyError, ValueError): 

627 pass # Tool not found — skip enrichment 

628 

629 # Calculate totals from parallel results using helper 

630 total_issues, total_fixed, total_remaining = aggregate_tool_results( 

631 all_results, 

632 action, 

633 ) 

634 # Display results for parallel execution 

635 for result in all_results: 

636 # Print tool header like sequential mode does 

637 display_name = get_tool_display_name(result.name) 

638 logger.print_tool_header(tool_name=display_name, action=action) 

639 

640 _display_fix_result( 

641 result, 

642 output_format=output_format, 

643 raw_output=raw_output, 

644 console_output_func=logger.console_output, 

645 success_func=success_func, 

646 action=action, 

647 ) 

648 

649 else: 

650 # Sequential execution (original behavior) 

651 for tool_name in tools_to_run: 

652 try: 

653 tool = tool_manager.get_tool(tool_name) 

654 display_name = get_tool_display_name(tool_name) 

655 

656 # Print tool header before execution 

657 logger.print_tool_header(tool_name=display_name, action=action) 

658 

659 # Configure tool using shared helper 

660 configure_tool_for_execution( 

661 tool=tool, 

662 tool_name=tool_name, 

663 config_manager=config_manager, 

664 tool_option_dict=tool_option_dict, 

665 exclude=exclude, 

666 include_venv=include_venv, 

667 incremental=incremental, 

668 action=action, 

669 post_tools=post_tools_early, 

670 auto_install=effective_auto_install, 

671 lintro_config=lintro_config, 

672 ) 

673 

674 # Execute the tool 

675 if action == Action.FIX: 

676 result = _run_fix_with_retry( 

677 tool=tool, 

678 paths=paths, 

679 options={}, 

680 max_retries=lintro_config.execution.max_fix_retries, 

681 ) 

682 else: 

683 result = tool.check(paths, {}) 

684 

685 # Populate doc_url on each issue from the plugin 

686 _enrich_issues_with_doc_urls(tool, result) 

687 

688 all_results.append(result) 

689 

690 # Update totals 

691 total_issues += getattr(result, "issues_count", 0) 

692 if action == Action.FIX: 

693 fixed_count = getattr(result, "fixed_issues_count", None) 

694 total_fixed += fixed_count if fixed_count is not None else 0 

695 remaining_count = getattr(result, "remaining_issues_count", None) 

696 total_remaining += ( 

697 remaining_count if remaining_count is not None else 0 

698 ) 

699 

700 # Display the result (with initial issue details in fix mode) 

701 _display_fix_result( 

702 result, 

703 output_format=output_format, 

704 raw_output=raw_output, 

705 console_output_func=logger.console_output, 

706 success_func=success_func, 

707 action=action, 

708 ) 

709 

710 except (TypeError, AttributeError): 

711 # Programming errors should be re-raised for debugging 

712 from loguru import logger as loguru_logger 

713 

714 loguru_logger.exception(f"Programming error running {tool_name}") 

715 raise 

716 except (OSError, ValueError, RuntimeError) as e: 

717 from loguru import logger as loguru_logger 

718 

719 # Log full exception with traceback to debug.log via loguru 

720 loguru_logger.exception(f"Error running {tool_name}") 

721 # Show user-friendly error message on console 

722 logger.console_output(f"Error running {tool_name}: {e}") 

723 

724 # Create a failed result for this tool 

725 failed_result = ToolResult( 

726 name=tool_name, 

727 success=False, 

728 output=f"Failed to initialize tool: {e}", 

729 issues_count=0, 

730 ) 

731 all_results.append(failed_result) 

732 

733 # Add skipped tool results for display in summary table 

734 for st in skipped_tools: 

735 all_results.append( 

736 ToolResult( 

737 name=st.name, 

738 skipped=True, 

739 skip_reason=st.reason, 

740 issues_count=0, 

741 ), 

742 ) 

743 

744 # Execute post-checks if configured 

745 total_issues, total_fixed, total_remaining = execute_post_checks( 

746 action=action, 

747 paths=paths, 

748 exclude=exclude, 

749 include_venv=include_venv, 

750 group_by=group_by, 

751 output_format=output_format, 

752 verbose=verbose, 

753 raw_output=raw_output, 

754 logger=logger, 

755 all_results=all_results, 

756 total_issues=total_issues, 

757 total_fixed=total_fixed, 

758 total_remaining=total_remaining, 

759 ) 

760 

761 # AI enhancement via hook pattern 

762 effective_ai_fix = ai_fix or lintro_config.ai.default_fix 

763 _warn_ai_fix_disabled( 

764 action=action, 

765 ai_fix=effective_ai_fix, 

766 ai_enabled=lintro_config.ai.enabled, 

767 logger=logger, 

768 output_format=output_format, 

769 ) 

770 

771 from lintro.ai.hook import AIPostExecutionHook 

772 

773 ai_hook = AIPostExecutionHook(lintro_config, ai_fix=effective_ai_fix) 

774 ai_result = None 

775 if ai_hook.should_run(action): 

776 try: 

777 ai_result = ai_hook.execute( 

778 action, 

779 all_results, 

780 console_logger=logger, 

781 output_format=output_format, 

782 ) 

783 except Exception as exc: 

784 from loguru import logger as loguru_logger 

785 

786 loguru_logger.opt(exception=True).debug(f"AI hook failed: {exc}") 

787 if getattr(lintro_config.ai, "fail_on_ai_error", False): 

788 raise 

789 if output_format.lower() not in ("json", "sarif"): 

790 logger.console_output(f"Warning: AI enhancement failed: {exc}") 

791 from lintro.ai.models import AIResult 

792 

793 ai_result = AIResult(error=True, message=str(exc)) 

794 if ai_result is not None: 

795 total_issues, total_fixed, total_remaining = aggregate_tool_results( 

796 all_results, 

797 action, 

798 ) 

799 

800 # Determine final exit code once — used for both JSON output and return 

801 final_exit_code = int( 

802 determine_exit_code( 

803 action=action, 

804 all_results=all_results, 

805 total_issues=total_issues, 

806 total_remaining=total_remaining, 

807 main_phase_empty_due_to_filter=main_phase_empty_due_to_filter, 

808 ), 

809 ) 

810 

811 # AI-driven exit code adjustments 

812 if ai_result is not None: 

813 ai_config = lintro_config.ai 

814 if ai_config.fail_on_unfixed and ai_result.unfixed_issues > 0: 

815 final_exit_code = 1 

816 if ai_config.fail_on_ai_error and ai_result.error: 

817 final_exit_code = 1 

818 

819 # Display results 

820 if all_results: 

821 if output_format.lower() == "json": 

822 # Output JSON to stdout 

823 import json 

824 

825 from lintro.utils.json_output import create_json_output 

826 

827 json_data = create_json_output( 

828 action=str(action), 

829 results=all_results, 

830 total_issues=total_issues, 

831 total_fixed=total_fixed, 

832 total_remaining=total_remaining, 

833 exit_code=final_exit_code, 

834 ) 

835 print(json.dumps(json_data, indent=2)) 

836 elif output_format.lower() == "sarif": 

837 from lintro.ai.output.sarif import render_fixes_sarif 

838 from lintro.ai.output.sarif_bridge import ( 

839 suggestions_from_results, 

840 summary_from_results, 

841 ) 

842 

843 suggestions = suggestions_from_results(all_results) 

844 summary = summary_from_results(all_results) 

845 sarif_json = render_fixes_sarif(suggestions, summary) 

846 print(sarif_json) 

847 else: 

848 logger.print_execution_summary(action, all_results) 

849 

850 # Route warnings to stderr (loguru) for machine-readable formats so 

851 # plain-text messages don't corrupt JSON/SARIF output on stdout. 

852 _is_machine = output_format.lower() in ("json", "sarif") 

853 

854 def _warn(msg: str) -> None: 

855 if _is_machine: 

856 from loguru import logger as loguru_logger 

857 

858 loguru_logger.warning(msg) 

859 else: 

860 logger.console_output(msg) 

861 

862 # Write report files (markdown, html, csv) 

863 try: 

864 output_manager.write_reports_from_results(all_results) 

865 except (OSError, ValueError, TypeError) as e: 

866 _warn(f"Warning: Failed to write reports: {e}") 

867 # Continue execution - report writing failures should not stop the tool 

868 

869 # Write user-specified output file (--output flag) 

870 if output_file is not None: 

871 try: 

872 from lintro.enums.output_format import ( 

873 OutputFormat, 

874 normalize_output_format, 

875 ) 

876 from lintro.utils.output.file_writer import write_output_file 

877 

878 fmt = normalize_output_format(output_format) 

879 if fmt == OutputFormat.SARIF: 

880 from pathlib import Path 

881 

882 from lintro.ai.output.sarif import write_sarif 

883 from lintro.ai.output.sarif_bridge import ( 

884 suggestions_from_results, 

885 summary_from_results, 

886 ) 

887 from lintro.utils.output.file_writer import build_doc_url_map 

888 

889 suggestions = suggestions_from_results(all_results) 

890 summary = summary_from_results(all_results) 

891 write_sarif( 

892 suggestions, 

893 summary, 

894 output_path=Path(output_file), 

895 doc_urls=build_doc_url_map(all_results) or None, 

896 ) 

897 else: 

898 write_output_file( 

899 output_path=output_file, 

900 output_format=fmt, 

901 all_results=all_results, 

902 action=action, 

903 total_issues=total_issues, 

904 total_fixed=total_fixed, 

905 ) 

906 except (OSError, ValueError, TypeError) as e: 

907 _warn(f"Warning: Failed to write output file: {e}") 

908 

909 # Write side-channel artifact files when configured or when 

910 # running inside GitHub Actions (SARIF auto-emit for Code Scanning). 

911 _write_artifacts( 

912 all_results, 

913 lintro_config, 

914 logger, 

915 action=action, 

916 total_issues=total_issues, 

917 total_fixed=total_fixed, 

918 warn_func=_warn, 

919 ) 

920 

921 # Clean up old run directories to prevent unbounded growth 

922 try: 

923 output_manager.cleanup_old_runs() 

924 except OSError as e: 

925 _warn(f"Warning: Failed to clean up old runs: {e}") 

926 

927 return final_exit_code