Coverage for lintro / tools / definitions / shfmt.py: 94%

120 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Shfmt tool definition. 

2 

3Shfmt is a shell script formatter that supports POSIX, Bash, and mksh shells. 

4It formats shell scripts to ensure consistent style and can detect formatting 

5issues in diff mode. 

6""" 

7 

8from __future__ import annotations 

9 

10import subprocess # nosec B404 - used safely with shell disabled 

11from dataclasses import dataclass 

12from typing import Any 

13 

14from loguru import logger 

15 

16from lintro._tool_versions import get_min_version 

17from lintro.enums.tool_name import ToolName 

18from lintro.enums.tool_type import ToolType 

19from lintro.models.core.tool_result import ToolResult 

20from lintro.parsers.shfmt.shfmt_parser import parse_shfmt_output 

21from lintro.plugins.base import BaseToolPlugin 

22from lintro.plugins.file_processor import FileProcessingResult 

23from lintro.plugins.protocol import ToolDefinition 

24from lintro.plugins.registry import register_tool 

25from lintro.tools.core.option_validators import ( 

26 filter_none_options, 

27 validate_bool, 

28 validate_int, 

29 validate_str, 

30) 

31 

# Module-level defaults consumed by the shfmt tool definition below.

# Used both as the "timeout" default option and as the definition's
# default_timeout.
SHFMT_DEFAULT_TIMEOUT: int = 30

# Priority value handed to the tool definition.
SHFMT_DEFAULT_PRIORITY: int = 50

# Glob patterns selecting the files this tool is applied to.
SHFMT_FILE_PATTERNS: list[str] = ["*.sh", "*.bash", "*.ksh"]

36 

37 

@register_tool
@dataclass
class ShfmtPlugin(BaseToolPlugin):
    """Shfmt shell script formatter plugin.

    This plugin integrates shfmt with Lintro for formatting shell scripts.
    It supports POSIX, Bash, and mksh shells with various formatting options.

    Check mode runs ``shfmt -d`` on each file and parses the resulting output
    into issues; fix mode runs the same check first and only invokes
    ``shfmt -w`` on files for which issues were reported.
    """

    @property
    def definition(self) -> ToolDefinition:
        """Return the tool definition.

        Returns:
            ToolDefinition containing tool metadata.
        """
        return ToolDefinition(
            name="shfmt",
            description=(
                "Shell script formatter supporting POSIX, Bash, and mksh shells"
            ),
            can_fix=True,
            tool_type=ToolType.FORMATTER,
            file_patterns=SHFMT_FILE_PATTERNS,
            priority=SHFMT_DEFAULT_PRIORITY,
            conflicts_with=[],
            native_configs=[".editorconfig"],
            version_command=["shfmt", "--version"],
            min_version=get_min_version(ToolName.SHFMT),
            default_options={
                "timeout": SHFMT_DEFAULT_TIMEOUT,
                # None means "do not pass the flag at all" (see
                # _build_common_args), leaving the decision to shfmt.
                "indent": None,
                "binary_next_line": False,
                "switch_case_indent": False,
                "space_redirects": False,
                "language_dialect": None,
                "simplify": False,
            },
            default_timeout=SHFMT_DEFAULT_TIMEOUT,
        )

    def set_options(
        self,
        indent: int | None = None,
        binary_next_line: bool | None = None,
        switch_case_indent: bool | None = None,
        space_redirects: bool | None = None,
        language_dialect: str | None = None,
        simplify: bool | None = None,
        **kwargs: Any,
    ) -> None:
        """Set shfmt-specific options.

        Options left as ``None`` are dropped before being forwarded to the
        base class, so they do not overwrite previously configured values
        with ``None``.

        Args:
            indent: Indentation size. 0 for tabs, >0 for that many spaces.
            binary_next_line: Binary ops like && and | may start a line.
            switch_case_indent: Indent switch cases.
            space_redirects: Redirect operators followed by space.
            language_dialect: Shell language dialect (bash, posix, mksh, bats).
                Matched case-insensitively and stored lower-cased.
            simplify: Simplify code where possible.
            **kwargs: Other tool options.

        Raises:
            ValueError: If language_dialect is not a valid dialect, or if
                indent is negative.
        """
        validate_int(indent, "indent")
        validate_bool(binary_next_line, "binary_next_line")
        validate_bool(switch_case_indent, "switch_case_indent")
        validate_bool(space_redirects, "space_redirects")
        validate_str(language_dialect, "language_dialect")
        validate_bool(simplify, "simplify")

        # Reject a negative indent up front: only 0 (tabs) and positive
        # values are meaningful, and failing here is clearer than a per-file
        # tool failure later on.
        if isinstance(indent, int) and indent < 0:
            msg = (
                f"Invalid indent: {indent!r}. "
                "Must be 0 (tabs) or a positive number of spaces"
            )
            raise ValueError(msg)

        # Validate language_dialect if provided
        if language_dialect is not None:
            valid_dialects = {"bash", "posix", "mksh", "bats"}
            if language_dialect.lower() not in valid_dialects:
                msg = (
                    f"Invalid language_dialect: {language_dialect!r}. "
                    f"Must be one of: {', '.join(sorted(valid_dialects))}"
                )
                raise ValueError(msg)
            language_dialect = language_dialect.lower()

        options = filter_none_options(
            indent=indent,
            binary_next_line=binary_next_line,
            switch_case_indent=switch_case_indent,
            space_redirects=space_redirects,
            language_dialect=language_dialect,
            simplify=simplify,
        )
        super().set_options(**options, **kwargs)

    def _build_common_args(self) -> list[str]:
        """Build common CLI arguments for shfmt.

        Translates the configured options into flags shared by check and
        fix invocations.

        Returns:
            CLI arguments for shfmt.
        """
        args: list[str] = []

        # Indentation: compared against None because 0 is a valid value
        # (tabs) and must still be passed through.
        indent = self.options.get("indent")
        if indent is not None:
            args.extend(["-i", str(indent)])

        # Binary operations at start of line
        if self.options.get("binary_next_line"):
            args.append("-bn")

        # Switch case indentation
        if self.options.get("switch_case_indent"):
            args.append("-ci")

        # Space after redirect operators
        if self.options.get("space_redirects"):
            args.append("-sr")

        # Language dialect
        language_dialect = self.options.get("language_dialect")
        if language_dialect is not None:
            args.extend(["-ln", str(language_dialect)])

        # Simplify code
        if self.options.get("simplify"):
            args.append("-s")

        return args

    def _build_command(self, mode_flag: str, file_path: str) -> list[str]:
        """Assemble a full shfmt command line for one file.

        Args:
            mode_flag: Mode selector placed right after the executable
                ("-d" for the diff check, "-w" for writing the fix).
            file_path: Path of the shell script to operate on.

        Returns:
            Executable command, mode flag, common arguments, then the path.
        """
        cmd = self._get_executable_command(tool_name="shfmt") + [mode_flag]
        cmd.extend(self._build_common_args())
        cmd.append(file_path)
        return cmd

    def _process_single_file(
        self,
        file_path: str,
        timeout: int,
    ) -> FileProcessingResult:
        """Process a single file in check mode.

        Args:
            file_path: Path to the shell script to check.
            timeout: Timeout in seconds for the shfmt command.

        Returns:
            FileProcessingResult with processing outcome. A timeout yields a
            skipped result; OSError, ValueError and RuntimeError are reported
            through the result's error field instead of being raised.
        """
        cmd = self._build_command("-d", file_path)

        try:
            success, output = self._run_subprocess(
                cmd=cmd,
                timeout=timeout,
            )
            issues = parse_shfmt_output(output=output)
            return FileProcessingResult(
                success=success,
                output=output,
                issues=issues,
            )
        except subprocess.TimeoutExpired:
            return FileProcessingResult(
                success=False,
                output="",
                issues=[],
                skipped=True,
            )
        except (OSError, ValueError, RuntimeError) as e:
            return FileProcessingResult(
                success=False,
                output="",
                issues=[],
                error=str(e),
            )

    def _process_single_file_fix(
        self,
        file_path: str,
        timeout: int,
    ) -> tuple[FileProcessingResult, int, int]:
        """Process a single file in fix mode.

        The file is first checked with "-d". Only when that check reports
        issues is "-w" run to rewrite the file.

        Args:
            file_path: Path to the shell script to fix.
            timeout: Timeout in seconds for the shfmt command.

        Returns:
            Tuple of (FileProcessingResult, initial_issues_count, fixed_issues_count).
        """
        # First check if file needs formatting
        check_cmd = self._build_command("-d", file_path)

        try:
            check_success, check_output = self._run_subprocess(
                cmd=check_cmd,
                timeout=timeout,
            )
            check_issues = parse_shfmt_output(output=check_output)

            if not check_issues:
                if check_success:
                    # No issues found, file is already formatted
                    return (
                        FileProcessingResult(
                            success=True,
                            output="",
                            issues=[],
                        ),
                        0,
                        0,
                    )
                # The check command failed without yielding any parsable
                # issue. Treating that as "already formatted" would hide the
                # failure, so report it instead (check mode propagates the
                # same status).
                failure_detail = (check_output or "").strip()
                return (
                    FileProcessingResult(
                        success=False,
                        output=check_output or "",
                        issues=[],
                        error=failure_detail
                        or "shfmt check failed without reporting a diff",
                    ),
                    0,
                    0,
                )

            # Apply fix with -w flag
            fix_cmd = self._build_command("-w", file_path)

            fix_success, fix_output = self._run_subprocess(
                cmd=fix_cmd,
                timeout=timeout,
            )

            if fix_success:
                # Every issue reported by the check is counted as fixed.
                return (
                    FileProcessingResult(
                        success=True,
                        output="",
                        issues=[],
                    ),
                    len(check_issues),
                    len(check_issues),
                )
            # The rewrite failed: keep the original issues as unresolved and
            # retain the tool output so the cause is not lost.
            return (
                FileProcessingResult(
                    success=False,
                    output=fix_output or "",
                    issues=check_issues,
                ),
                len(check_issues),
                0,
            )

        except subprocess.TimeoutExpired:
            return (
                FileProcessingResult(
                    success=False,
                    output="",
                    issues=[],
                    skipped=True,
                ),
                0,
                0,
            )
        except (OSError, ValueError, RuntimeError) as e:
            return (
                FileProcessingResult(
                    success=False,
                    output="",
                    issues=[],
                    error=str(e),
                ),
                0,
                0,
            )

    def check(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Check files with shfmt.

        Args:
            paths: List of file or directory paths to check.
            options: Runtime options that override defaults.

        Returns:
            ToolResult with check results. It is successful only when every
            file was processed successfully and no issues were reported.
        """
        ctx = self._prepare_execution(paths, options)
        if ctx.should_skip:
            return ctx.early_result  # type: ignore[return-value]

        result = self._process_files_with_progress(
            files=ctx.files,
            processor=lambda f: self._process_single_file(f, ctx.timeout),
            timeout=ctx.timeout,
        )

        return ToolResult(
            name=self.definition.name,
            success=result.all_success and result.total_issues == 0,
            output=result.build_output(timeout=ctx.timeout),
            issues_count=result.total_issues,
            issues=result.all_issues,
        )

    def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Fix formatting issues in files with shfmt.

        Args:
            paths: List of file or directory paths to fix.
            options: Runtime options that override defaults.

        Returns:
            ToolResult with fix results, including initial, fixed and
            remaining issue counts.
        """
        ctx = self._prepare_execution(
            paths,
            options,
            no_files_message="No files to format.",
        )
        if ctx.should_skip:
            return ctx.early_result  # type: ignore[return-value]

        # Track fix-specific metrics
        initial_issues_total = 0
        fixed_issues_total = 0
        fixed_files: list[str] = []

        def process_fix(file_path: str) -> FileProcessingResult:
            """Process a single file for fixing.

            Args:
                file_path: Path to the file to process.

            Returns:
                FileProcessingResult with processing outcome.
            """
            # Only the counters are rebound; fixed_files is mutated in place
            # and therefore needs no nonlocal declaration.
            nonlocal initial_issues_total, fixed_issues_total
            result, initial, fixed = self._process_single_file_fix(
                file_path=file_path,
                timeout=ctx.timeout,
            )
            initial_issues_total += initial
            fixed_issues_total += fixed
            if fixed > 0:
                fixed_files.append(file_path)
            return result

        result = self._process_files_with_progress(
            files=ctx.files,
            processor=process_fix,
            timeout=ctx.timeout,
            label="Formatting files",
        )

        # Calculate remaining issues
        remaining_issues = initial_issues_total - fixed_issues_total

        # Build summary output
        summary_parts: list[str] = []
        if fixed_issues_total > 0:
            summary_parts.append(
                f"Fixed {fixed_issues_total} issue(s) in {len(fixed_files)} file(s)",
            )
        if remaining_issues > 0:
            summary_parts.append(
                f"Found {remaining_issues} issue(s) that could not be fixed",
            )
        if result.execution_failures > 0:
            summary_parts.append(
                f"Failed to process {result.execution_failures} file(s)",
            )

        final_output = "\n".join(summary_parts) if summary_parts else "No fixes needed."

        logger.debug(
            f"[ShfmtPlugin] Fix complete: initial={initial_issues_total}, "
            f"fixed={fixed_issues_total}, remaining={remaining_issues}",
        )

        return ToolResult(
            name=self.definition.name,
            success=result.all_success and remaining_issues == 0,
            output=final_output,
            issues_count=remaining_issues,
            issues=result.all_issues,
            initial_issues_count=initial_issues_total,
            fixed_issues_count=fixed_issues_total,
            remaining_issues_count=remaining_issues,
        )