Coverage for lintro / utils / execution / parallel_executor.py: 100%
41 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Parallel tool execution utilities.
3This module provides functions for running tools in parallel using async execution.
4"""
6from __future__ import annotations
8import asyncio
9import sys
10from typing import TYPE_CHECKING
12from rich.progress import (
13 BarColumn,
14 Progress,
15 SpinnerColumn,
16 TaskProgressColumn,
17 TextColumn,
18)
20from lintro.enums.action import Action
21from lintro.models.core.tool_result import ToolResult
22from lintro.tools import tool_manager
23from lintro.utils.execution.tool_configuration import configure_tool_for_execution
24from lintro.utils.unified_config import UnifiedConfigManager
26if TYPE_CHECKING:
27 from lintro.plugins.base import BaseToolPlugin
def run_tools_parallel(
    tools_to_run: list[str],
    paths: list[str],
    action: Action,
    config_manager: UnifiedConfigManager,
    tool_option_dict: dict[str, dict[str, object]],
    exclude: str | None,
    include_venv: bool,
    post_tools: set[str],
    max_workers: int,
    incremental: bool = False,
    auto_install: bool = False,
    max_fix_retries: int = 3,
) -> list[ToolResult]:
    """Run tools in parallel using async executor.

    Tools are grouped into batches (via ``get_parallel_batches``); each batch
    is executed concurrently by an ``AsyncToolExecutor`` while a Rich progress
    bar reports per-tool completion. Progress display is disabled when stdout
    is not a TTY (e.g. CI logs or piped output).

    Args:
        tools_to_run: List of tool names to run.
        paths: List of file paths to process.
        action: Action to perform.
        config_manager: Unified config manager.
        tool_option_dict: Parsed tool options from CLI.
        exclude: Exclude patterns.
        include_venv: Whether to include venv.
        post_tools: Set of post-check tool names.
        max_workers: Maximum parallel workers.
        incremental: Whether to only check changed files.
        auto_install: Whether to auto-install Node.js deps if missing.
        max_fix_retries: Maximum fix→verify convergence cycles.

    Returns:
        List of ToolResult objects.
    """
    from loguru import logger

    from lintro.utils.async_tool_executor import (
        AsyncToolExecutor,
        get_parallel_batches,
    )

    # Group tools into batches that can run in parallel
    batches = get_parallel_batches(tools_to_run, tool_manager)
    logger.debug(f"Parallel execution batches: {batches}")

    all_results: list[ToolResult] = []
    executor = AsyncToolExecutor(max_workers=max_workers)
    total_tools = len(tools_to_run)

    # Disable progress when not in a TTY
    disable_progress = not sys.stdout.isatty()

    try:
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            TaskProgressColumn(),
            transient=True,
            disable=disable_progress,
        ) as progress:
            task = progress.add_task(
                f"Running {total_tools} tools...",
                total=total_tools,
            )
            completed_count = 0

            # The completion callback is loop-invariant, so define it once
            # here instead of re-creating the closure on every batch.
            def on_tool_complete(
                name: str,
                result: ToolResult,
            ) -> None:
                """Update progress when a tool completes.

                Args:
                    name: Name of the completed tool.
                    result: Result from the tool execution.
                """
                nonlocal completed_count
                completed_count += 1
                status = "✓" if result.success else "✗"
                desc = f"{status} {name} done ({completed_count}/{total_tools})"
                progress.update(
                    task,
                    completed=completed_count,
                    description=desc,
                )

            for batch in batches:
                # Prepare configured tool instances for this batch
                tools_with_instances: list[tuple[str, BaseToolPlugin]] = []

                for tool_name in batch:
                    tool = tool_manager.get_tool(tool_name)

                    # Configure tool using shared helper
                    configure_tool_for_execution(
                        tool=tool,
                        tool_name=tool_name,
                        config_manager=config_manager,
                        tool_option_dict=tool_option_dict,
                        exclude=exclude,
                        include_venv=include_venv,
                        incremental=incremental,
                        action=action,
                        post_tools=post_tools,
                        auto_install=auto_install,
                    )

                    tools_with_instances.append((tool_name, tool))

                # Update progress description for this batch
                batch_names = ", ".join(batch)
                progress.update(
                    task,
                    description=f"Running: {batch_names}",
                )

                # Run batch in parallel with progress callback
                batch_results = asyncio.run(
                    executor.run_tools_parallel(
                        tools=tools_with_instances,
                        paths=paths,
                        action=action,
                        on_result=on_tool_complete,
                        max_fix_retries=max_fix_retries,
                    ),
                )

                # Collect results; batch_results yields (name, result) pairs.
                all_results.extend(result for _, result in batch_results)
    finally:
        # Always release executor resources, even when a batch raises.
        executor.shutdown()

    return all_results