Coverage for lintro / utils / execution / parallel_executor.py: 100%

41 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Parallel tool execution utilities. 

2 

3This module provides functions for running tools in parallel using async execution. 

4""" 

5 

6from __future__ import annotations 

7 

8import asyncio 

9import sys 

10from typing import TYPE_CHECKING 

11 

12from rich.progress import ( 

13 BarColumn, 

14 Progress, 

15 SpinnerColumn, 

16 TaskProgressColumn, 

17 TextColumn, 

18) 

19 

20from lintro.enums.action import Action 

21from lintro.models.core.tool_result import ToolResult 

22from lintro.tools import tool_manager 

23from lintro.utils.execution.tool_configuration import configure_tool_for_execution 

24from lintro.utils.unified_config import UnifiedConfigManager 

25 

26if TYPE_CHECKING: 

27 from lintro.plugins.base import BaseToolPlugin 

28 

29 

30def run_tools_parallel( 

31 tools_to_run: list[str], 

32 paths: list[str], 

33 action: Action, 

34 config_manager: UnifiedConfigManager, 

35 tool_option_dict: dict[str, dict[str, object]], 

36 exclude: str | None, 

37 include_venv: bool, 

38 post_tools: set[str], 

39 max_workers: int, 

40 incremental: bool = False, 

41 auto_install: bool = False, 

42 max_fix_retries: int = 3, 

43) -> list[ToolResult]: 

44 """Run tools in parallel using async executor. 

45 

46 Args: 

47 tools_to_run: List of tool names to run. 

48 paths: List of file paths to process. 

49 action: Action to perform. 

50 config_manager: Unified config manager. 

51 tool_option_dict: Parsed tool options from CLI. 

52 exclude: Exclude patterns. 

53 include_venv: Whether to include venv. 

54 post_tools: Set of post-check tool names. 

55 max_workers: Maximum parallel workers. 

56 incremental: Whether to only check changed files. 

57 auto_install: Whether to auto-install Node.js deps if missing. 

58 max_fix_retries: Maximum fix→verify convergence cycles. 

59 

60 Returns: 

61 List of ToolResult objects. 

62 """ 

63 from loguru import logger 

64 

65 from lintro.utils.async_tool_executor import ( 

66 AsyncToolExecutor, 

67 get_parallel_batches, 

68 ) 

69 

70 # Group tools into batches that can run in parallel 

71 batches = get_parallel_batches(tools_to_run, tool_manager) 

72 logger.debug(f"Parallel execution batches: {batches}") 

73 

74 all_results: list[ToolResult] = [] 

75 executor = AsyncToolExecutor(max_workers=max_workers) 

76 total_tools = len(tools_to_run) 

77 

78 # Disable progress when not in a TTY 

79 disable_progress = not sys.stdout.isatty() 

80 

81 try: 

82 with Progress( 

83 SpinnerColumn(), 

84 TextColumn("[progress.description]{task.description}"), 

85 BarColumn(), 

86 TaskProgressColumn(), 

87 transient=True, 

88 disable=disable_progress, 

89 ) as progress: 

90 task = progress.add_task( 

91 f"Running {total_tools} tools...", 

92 total=total_tools, 

93 ) 

94 completed_count = 0 

95 

96 for batch in batches: 

97 # Prepare tools in batch 

98 tools_with_instances: list[tuple[str, BaseToolPlugin]] = [] 

99 

100 for tool_name in batch: 

101 tool = tool_manager.get_tool(tool_name) 

102 

103 # Configure tool using shared helper 

104 configure_tool_for_execution( 

105 tool=tool, 

106 tool_name=tool_name, 

107 config_manager=config_manager, 

108 tool_option_dict=tool_option_dict, 

109 exclude=exclude, 

110 include_venv=include_venv, 

111 incremental=incremental, 

112 action=action, 

113 post_tools=post_tools, 

114 auto_install=auto_install, 

115 ) 

116 

117 tools_with_instances.append((tool_name, tool)) 

118 

119 # Update progress description for this batch 

120 batch_names = ", ".join(batch) 

121 progress.update( 

122 task, 

123 description=f"Running: {batch_names}", 

124 ) 

125 

126 # Create callback to update progress on completion 

127 def on_tool_complete( 

128 name: str, 

129 result: ToolResult, 

130 ) -> None: 

131 """Update progress when a tool completes. 

132 

133 Args: 

134 name: Name of the completed tool. 

135 result: Result from the tool execution. 

136 """ 

137 nonlocal completed_count 

138 completed_count += 1 

139 status = "✓" if result.success else "✗" 

140 desc = f"{status} {name} done ({completed_count}/{total_tools})" 

141 progress.update( 

142 task, 

143 completed=completed_count, 

144 description=desc, 

145 ) 

146 

147 # Run batch in parallel with progress callback 

148 batch_results = asyncio.run( 

149 executor.run_tools_parallel( 

150 tools=tools_with_instances, 

151 paths=paths, 

152 action=action, 

153 on_result=on_tool_complete, 

154 max_fix_retries=max_fix_retries, 

155 ), 

156 ) 

157 

158 # Collect results 

159 for _, result in batch_results: 

160 all_results.append(result) 

161 

162 finally: 

163 executor.shutdown() 

164 

165 return all_results