Coverage for lintro / cli_utils / command_chainer.py: 100%

79 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Command chaining orchestration for Lintro CLI. 

2 

3This module provides the CommandChainer class that handles parsing and 

4execution of comma-separated command chains in the Lintro CLI. 

5 

6Example: 

7 lintro fmt . , chk . , tst 

8 This chains format, check, and test commands sequentially. 

9""" 

10 

11from __future__ import annotations 

12 

13from collections.abc import Sequence 

14from typing import TYPE_CHECKING 

15 

16import click 

17from loguru import logger 

18 

19if TYPE_CHECKING: 

20 pass 

21 

22 

class CommandChainer:
    """Orchestrates execution of multiple CLI commands in sequence.

    This class extracts command chaining logic from the main CLI group,
    making it easier to test and maintain. It handles:

    - Detection of command chains (comma-separated commands)
    - Normalization of arguments (splitting joined commands)
    - Grouping of commands with their arguments
    - Sequential execution with proper error handling
    """

    def __init__(self, group: click.Group, separator: str = ",") -> None:
        """Initialize the command chainer.

        Args:
            group: The Click group containing available commands.
            separator: The character used to separate commands (default: ",").

        Attributes:
            group: The Click group containing available commands.
            separator: The character used to separate commands.
        """
        self.group = group
        self.separator = separator
        # Filled on first access of ``command_names``.
        self._command_names: set[str] | None = None

    @property
    def command_names(self) -> set[str]:
        """Get available command names lazily.

        The set is computed on first access from ``group.list_commands()``
        and cached on the instance afterwards.

        Returns:
            Set of command names reported by the group.
        """
        if self._command_names is None:
            ctx = click.Context(self.group)
            self._command_names = set(self.group.list_commands(ctx))
        return self._command_names

    def _split_joined_commands(self, arg: str) -> list[str] | None:
        """Split a token such as ``"fmt,chk"`` into its stripped pieces.

        Args:
            arg: A single command line token.

        Returns:
            The stripped pieces of ``arg`` split on the separator (empty
            pieces mark a leading, trailing or doubled separator), or None
            when ``arg`` has no separator, has no non-empty piece, or has a
            non-empty piece that is not a known command name.
        """
        if self.separator not in arg:
            return None

        pieces = [piece.strip() for piece in arg.split(self.separator)]
        names = [piece for piece in pieces if piece]
        # Option values such as "ruff,bandit" must stay intact, so a token is
        # only split when every non-empty piece is a known command.
        if not names or not all(name in self.command_names for name in names):
            return None
        return pieces

    def should_chain(self, args: Sequence[str]) -> bool:
        """Check if arguments contain command chaining.

        Args:
            args: Command line arguments to check.

        Returns:
            True if any argument is the bare separator, or is a
            separator-joined list consisting only of known command names.
        """
        return any(
            arg == self.separator or self._split_joined_commands(arg) is not None
            for arg in args
        )

    def normalize_args(self, args: Sequence[str]) -> list[str]:
        """Normalize comma-adjacent args into separate tokens.

        Handles cases like:
            - "fmt,chk" -> ["fmt", ",", "chk"]
            - "fmt , chk" -> ["fmt", ",", "chk"]
            - "fmt," -> ["fmt", ","]
            - ",chk" -> [",", "chk"]
            - "--tools ruff,bandit" -> ["--tools", "ruff,bandit"] (preserved)

        Args:
            args: Raw command line arguments.

        Returns:
            Normalized list of arguments with separators as distinct tokens.
        """
        normalized: list[str] = []

        for arg in args:
            pieces = None if arg == self.separator else self._split_joined_commands(arg)
            if pieces is None:
                # Bare separator, or not a list of commands: keep as-is.
                normalized.append(arg)
                continue

            # Emit one separator per boundary in the original token, including
            # leading/trailing ones. Dropping them would glue this command to
            # the neighbouring argument (["fmt,", "chk"] must not become the
            # single command "fmt chk").
            last_index = len(pieces) - 1
            for index, piece in enumerate(pieces):
                if piece:
                    normalized.append(piece)
                if index < last_index:
                    normalized.append(self.separator)

        return normalized

    def group_commands(self, args: list[str]) -> list[list[str]]:
        """Split arguments into command groups at separators.

        Empty groups (produced by leading, trailing or consecutive
        separators) are skipped.

        Args:
            args: Normalized arguments with separators as distinct tokens.

        Returns:
            List of command groups, where each group is a command with its args.
        """
        command_groups: list[list[str]] = []
        current_group: list[str] = []

        for arg in args:
            if arg != self.separator:
                current_group.append(arg)
                continue
            if current_group:
                command_groups.append(current_group)
                current_group = []

        if current_group:
            command_groups.append(current_group)

        return command_groups

    def execute_chain(
        self,
        ctx: click.Context,
        command_groups: list[list[str]],
    ) -> int:
        """Execute command groups sequentially, return max exit code.

        Every non-empty group is executed, even if an earlier one failed.

        Args:
            ctx: The parent Click context.
            command_groups: List of command groups to execute.

        Returns:
            The maximum exit code from all commands (0 if all succeeded or
            nothing was executed).
        """
        exit_codes = [
            self._execute_single_command(ctx, cmd_args)
            for cmd_args in command_groups
            if cmd_args
        ]
        return max(exit_codes, default=0)

    def _execute_single_command(
        self,
        parent_ctx: click.Context,
        cmd_args: list[str],
    ) -> int:
        """Execute a single command with its arguments.

        Args:
            parent_ctx: The parent Click context.
            cmd_args: Command name followed by its arguments.

        Returns:
            Exit code from the command execution.

        Raises:
            KeyboardInterrupt: Re-raised to allow normal user interruption.
        """
        try:
            # Create a new context for this command
            ctx_copy = self.group.make_context(
                parent_ctx.info_name,
                cmd_args,
                parent=parent_ctx,
                allow_extra_args=True,
                allow_interspersed_args=False,
            )

            # Invoke the command
            with ctx_copy.scope() as subctx:
                result = self.group.invoke(subctx)
                return result if isinstance(result, int) else 0

        except SystemExit as e:
            return e.code if isinstance(e.code, int) else (0 if e.code is None else 1)
        except KeyboardInterrupt:
            # Re-raise KeyboardInterrupt to allow normal interruption
            raise
        except click.exceptions.Exit as e:
            # Raised by ctx.exit() (e.g. --help / --version). This is Click's
            # regular exit signal, not a failure, so it is handled like
            # SystemExit instead of being reported as an error.
            return e.exit_code
        except Exception as e:  # noqa: BLE001 - intentional: allow chain to continue
            # Catch all other exceptions to allow command chain to continue
            exit_code = getattr(e, "exit_code", 1)
            message = (
                f"Error executing command '{' '.join(cmd_args)}': "
                f"{type(e).__name__}: {e}"
            )
            logger.exception(message)
            click.echo(click.style(message, fg="red"), err=True)
            # Guard against a non-integer ``exit_code`` attribute so that
            # execute_chain can always compare the collected codes.
            return exit_code if isinstance(exit_code, int) else 1