Coverage for lintro / cli_utils / command_chainer.py: 100%
79 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Command chaining orchestration for Lintro CLI.
3This module provides the CommandChainer class that handles parsing and
4execution of comma-separated command chains in the Lintro CLI.
6Example:
7 lintro fmt . , chk . , tst
8 This chains format, check, and test commands sequentially.
9"""
11from __future__ import annotations
13from collections.abc import Sequence
14from typing import TYPE_CHECKING
16import click
17from loguru import logger
19if TYPE_CHECKING:
20 pass
23class CommandChainer:
24 """Orchestrates execution of multiple CLI commands in sequence.
26 This class extracts command chaining logic from the main CLI group,
27 making it easier to test and maintain. It handles:
29 - Detection of command chains (comma-separated commands)
30 - Normalization of arguments (splitting joined commands)
31 - Grouping of commands with their arguments
32 - Sequential execution with proper error handling
33 """
35 def __init__(self, group: click.Group, separator: str = ",") -> None:
36 """Initialize the command chainer.
38 Args:
39 group: The Click group containing available commands.
40 separator: The character used to separate commands (default: ",").
42 Attributes:
43 group: The Click group containing available commands.
44 separator: The character used to separate commands.
45 """
46 self.group = group
47 self.separator = separator
48 self._command_names: set[str] | None = None
50 @property
51 def command_names(self) -> set[str]:
52 """Get available command names lazily.
54 Returns:
55 Set of command names and aliases available in the group.
56 """
57 if self._command_names is None:
58 ctx = click.Context(self.group)
59 self._command_names = set(self.group.list_commands(ctx))
60 return self._command_names
62 def should_chain(self, args: Sequence[str]) -> bool:
63 """Check if arguments contain command chaining.
65 Args:
66 args: Command line arguments to check.
68 Returns:
69 True if the arguments contain comma separators indicating chaining.
70 """
71 for arg in args:
72 if arg == self.separator:
73 return True
74 if self.separator in arg:
75 # Check if splitting by comma yields known commands
76 parts = [p.strip() for p in arg.split(self.separator) if p.strip()]
77 if parts and all(p in self.command_names for p in parts):
78 return True
79 return False
81 def normalize_args(self, args: Sequence[str]) -> list[str]:
82 """Normalize comma-adjacent args into separate tokens.
84 Handles cases like:
85 - "fmt,chk" -> ["fmt", ",", "chk"]
86 - "fmt , chk" -> ["fmt", ",", "chk"]
87 - "--tools ruff,bandit" -> ["--tools", "ruff,bandit"] (preserved)
89 Args:
90 args: Raw command line arguments.
92 Returns:
93 Normalized list of arguments with separators as distinct tokens.
94 """
95 normalized: list[str] = []
97 for arg in args:
98 if arg == self.separator:
99 normalized.append(arg)
100 continue
102 if self.separator in arg:
103 # Check if this looks like comma-separated commands
104 raw_parts = [part.strip() for part in arg.split(self.separator)]
105 fragments = [part for part in raw_parts if part]
107 # Only split if all parts are known commands
108 if fragments and all(part in self.command_names for part in fragments):
109 for idx, part in enumerate(fragments):
110 if part:
111 normalized.append(part)
112 if idx < len(fragments) - 1:
113 normalized.append(self.separator)
114 continue
116 # Not comma-separated commands, keep as-is
117 normalized.append(arg)
119 return normalized
121 def group_commands(self, args: list[str]) -> list[list[str]]:
122 """Split arguments into command groups at separators.
124 Args:
125 args: Normalized arguments with separators as distinct tokens.
127 Returns:
128 List of command groups, where each group is a command with its args.
129 """
130 command_groups: list[list[str]] = []
131 current_group: list[str] = []
133 for arg in args:
134 if arg == self.separator:
135 if current_group:
136 command_groups.append(current_group)
137 current_group = []
138 continue
139 current_group.append(arg)
141 if current_group:
142 command_groups.append(current_group)
144 return command_groups
146 def execute_chain(
147 self,
148 ctx: click.Context,
149 command_groups: list[list[str]],
150 ) -> int:
151 """Execute command groups sequentially, return max exit code.
153 Args:
154 ctx: The parent Click context.
155 command_groups: List of command groups to execute.
157 Returns:
158 The maximum exit code from all commands (0 if all succeeded).
159 """
160 exit_codes: list[int] = []
162 for cmd_args in command_groups:
163 if not cmd_args:
164 continue
166 exit_code = self._execute_single_command(ctx, cmd_args)
167 exit_codes.append(exit_code)
169 return max(exit_codes) if exit_codes else 0
171 def _execute_single_command(
172 self,
173 parent_ctx: click.Context,
174 cmd_args: list[str],
175 ) -> int:
176 """Execute a single command with its arguments.
178 Args:
179 parent_ctx: The parent Click context.
180 cmd_args: Command name followed by its arguments.
182 Returns:
183 Exit code from the command execution.
185 Raises:
186 KeyboardInterrupt: Re-raised to allow normal user interruption.
187 """
188 try:
189 # Create a new context for this command
190 ctx_copy = self.group.make_context(
191 parent_ctx.info_name,
192 cmd_args,
193 parent=parent_ctx,
194 allow_extra_args=True,
195 allow_interspersed_args=False,
196 )
198 # Invoke the command
199 with ctx_copy.scope() as subctx:
200 result = self.group.invoke(subctx)
201 return result if isinstance(result, int) else 0
203 except SystemExit as e:
204 return e.code if isinstance(e.code, int) else (0 if e.code is None else 1)
205 except KeyboardInterrupt:
206 # Re-raise KeyboardInterrupt to allow normal interruption
207 raise
208 except Exception as e: # noqa: BLE001 - intentional: allow chain to continue
209 # Catch all other exceptions to allow command chain to continue
210 exit_code = getattr(e, "exit_code", 1)
211 logger.exception(
212 f"Error executing command '{' '.join(cmd_args)}': "
213 f"{type(e).__name__}: {e}",
214 )
215 click.echo(
216 click.style(
217 f"Error executing command '{' '.join(cmd_args)}': "
218 f"{type(e).__name__}: {e}",
219 fg="red",
220 ),
221 err=True,
222 )
223 return exit_code