Coverage for lintro / tools / definitions / black.py: 81%

140 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Black tool definition. 

2 

3Black is an opinionated Python code formatter. It enforces a consistent style 

4by parsing Python code and re-printing it with its own rules, ensuring uniformity 

5across projects. 

6""" 

7 

8from __future__ import annotations 

9 

10import subprocess # nosec B404 - used safely with shell disabled 

11from dataclasses import dataclass 

12from typing import Any 

13 

14from loguru import logger 

15 

16from lintro.enums.tool_type import ToolType 

17from lintro.models.core.tool_result import ToolResult 

18from lintro.parsers.black.black_issue import BlackIssue 

19from lintro.parsers.black.black_parser import parse_black_output 

20from lintro.plugins.base import BaseToolPlugin 

21from lintro.plugins.protocol import ToolDefinition 

22from lintro.plugins.registry import register_tool 

23from lintro.tools.core.option_validators import ( 

24 filter_none_options, 

25 validate_bool, 

26 validate_int, 

27 validate_str, 

28) 

29 

# Constants for Black configuration.
# Default per-invocation timeout, in seconds, for Black subprocess calls.
BLACK_DEFAULT_TIMEOUT: int = 30
# Prefer Black ahead of Ruff formatting.
BLACK_DEFAULT_PRIORITY: int = 90
# Glob patterns for the files handed to Black (sources and type stubs).
BLACK_FILE_PATTERNS: list[str] = ["*.py", "*.pyi"]

34 

35 

@register_tool
@dataclass
class BlackPlugin(BaseToolPlugin):
    """Black Python formatter plugin.

    This plugin integrates Black with Lintro for formatting Python files.
    ``check`` runs ``black --check`` and reports files Black would change;
    ``fix`` runs Black to rewrite files and then re-checks them. Both also
    report over-long lines found by the shared line-length checker.
    """

    @property
    def definition(self) -> ToolDefinition:
        """Return the tool definition.

        Returns:
            ToolDefinition containing tool metadata.
        """
        return ToolDefinition(
            name="black",
            description="Opinionated Python code formatter",
            can_fix=True,
            tool_type=ToolType.FORMATTER,
            file_patterns=BLACK_FILE_PATTERNS,
            priority=BLACK_DEFAULT_PRIORITY,
            conflicts_with=[],
            native_configs=["pyproject.toml"],
            version_command=["black", "--version"],
            min_version="24.0.0",
            default_options={
                "timeout": BLACK_DEFAULT_TIMEOUT,
                "line_length": None,
                "target_version": None,
                "fast": False,
                "preview": False,
                "diff": False,
            },
            default_timeout=BLACK_DEFAULT_TIMEOUT,
        )

    def set_options(
        self,
        line_length: int | None = None,
        target_version: str | None = None,
        fast: bool | None = None,
        preview: bool | None = None,
        diff: bool | None = None,
        **kwargs: Any,
    ) -> None:
        """Set Black-specific options with validation.

        Each Black-specific value is validated first; values left as ``None``
        are dropped by ``filter_none_options`` before everything is forwarded
        to the base implementation.

        Args:
            line_length: Optional line length override.
            target_version: String per Black CLI (e.g., "py313").
            fast: Use --fast mode (skip safety checks).
            preview: Enable preview style.
            diff: Show diffs in output when formatting.
            **kwargs: Additional base options.
        """
        validate_int(line_length, "line_length")
        validate_str(target_version, "target_version")
        validate_bool(fast, "fast")
        validate_bool(preview, "preview")
        validate_bool(diff, "diff")

        options = filter_none_options(
            line_length=line_length,
            target_version=target_version,
            fast=fast,
            preview=preview,
            diff=diff,
        )
        super().set_options(**options, **kwargs)

    def _build_common_args(self) -> list[str]:
        """Build the CLI arguments shared by the check and fix commands.

        Returns:
            CLI arguments for Black.
        """
        args: list[str] = []

        # Try Lintro config injection first; the explicit line-length and
        # target-version flags are only used when it yields nothing.
        config_args = self._build_config_args()
        if config_args:
            args.extend(config_args)
        else:
            if self.options.get("line_length"):
                args.extend(["--line-length", str(self.options["line_length"])])
            if self.options.get("target_version"):
                args.extend(["--target-version", str(self.options["target_version"])])

        # These flags are added regardless of where the config came from.
        if self.options.get("fast"):
            args.append("--fast")
        if self.options.get("preview"):
            args.append("--preview")
        return args

    def _check_line_length_violations(
        self,
        files: list[str],
        cwd: str | None,
    ) -> list[BlackIssue]:
        """Check for line length violations using the shared line-length checker.

        Args:
            files: List of file paths to check.
            cwd: Working directory for the check.

        Returns:
            List of line length violations converted to BlackIssue objects.
        """
        if not files:
            return []

        # Imported at call time rather than at module import.
        from lintro.tools.core.line_length_checker import check_line_length_violations

        line_length_opt = self.options.get("line_length")
        timeout_opt = self.options.get("timeout", BLACK_DEFAULT_TIMEOUT)

        # Options may hold non-int values, so coerce them via str() first.
        line_length_val: int | None = None
        if isinstance(line_length_opt, int):
            line_length_val = line_length_opt
        elif line_length_opt is not None:
            line_length_val = int(str(line_length_opt))

        if isinstance(timeout_opt, int):
            timeout_val = timeout_opt
        elif timeout_opt is not None:
            timeout_val = int(str(timeout_opt))
        else:
            timeout_val = BLACK_DEFAULT_TIMEOUT

        violations = check_line_length_violations(
            files=files,
            cwd=cwd,
            line_length=line_length_val,
            timeout=timeout_val,
        )

        # Reported as non-fixable errors: check() and fix() run this to find
        # over-long lines that Black cannot wrap.
        return [
            BlackIssue(
                file=violation.file,
                line=violation.line,
                column=violation.column,
                code=violation.code,
                message=(
                    f"Line {violation.line} exceeds line length limit "
                    f"({violation.message})"
                ),
                severity="error",
                fixable=False,
            )
            for violation in violations
        ]

    def _handle_timeout_error(
        self,
        timeout_val: int,
        initial_count: int | None = None,
        cwd: str | None = None,
    ) -> ToolResult:
        """Handle timeout errors consistently.

        Args:
            timeout_val: The timeout value that was exceeded.
            initial_count: Optional initial issues count for fix operations.
            cwd: Working directory for the tool result.

        Returns:
            Standardized timeout error result.
        """
        # NOTE(review): bullet indentation below is reproduced as it appears
        # in the extracted source; confirm against the original file.
        timeout_msg = (
            f"Black execution timed out ({timeout_val}s limit exceeded).\n\n"
            "This may indicate:\n"
            " - Large codebase taking too long to process\n"
            " - Need to increase timeout via --tool-options black:timeout=N"
        )
        if initial_count is not None:
            # Fix path: carry the pre-fix issue count through to the result.
            return ToolResult(
                name=self.definition.name,
                success=False,
                output=timeout_msg,
                issues_count=0,
                issues=[],
                initial_issues_count=initial_count,
                cwd=cwd,
            )
        # NOTE(review): this path reports issues_count=1 with an empty issues
        # list while the fix path reports 0; preserved as-is, confirm intent.
        return ToolResult(
            name=self.definition.name,
            success=False,
            output=timeout_msg,
            issues_count=1,
            issues=[],
            cwd=cwd,
        )

    def check(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Check files using Black without applying changes.

        NOTE(review): ``options`` is only passed to ``_prepare_execution``;
        Black's CLI flags are built from ``self.options``. Confirm whether
        runtime overrides should also affect the flags.

        Args:
            paths: List of file or directory paths to check.
            options: Runtime options that override defaults.

        Returns:
            ToolResult with check results.
        """
        # Use shared preparation for version check, path validation, file discovery
        ctx = self._prepare_execution(paths, options)
        if ctx.should_skip:
            return ctx.early_result  # type: ignore[return-value]

        cmd: list[str] = self._get_executable_command(tool_name="black") + ["--check"]
        cmd.extend(self._build_common_args())
        cmd.extend(ctx.rel_files)

        logger.debug(f"[BlackPlugin] Running: {' '.join(cmd)} (cwd={ctx.cwd})")
        try:
            success, output = self._run_subprocess(
                cmd=cmd,
                timeout=ctx.timeout,
                cwd=ctx.cwd,
            )
        except subprocess.TimeoutExpired:
            return self._handle_timeout_error(ctx.timeout, cwd=ctx.cwd)

        black_issues = parse_black_output(output=output)

        # Check for line length violations that Black cannot wrap
        line_length_issues = self._check_line_length_violations(
            files=ctx.rel_files,
            cwd=ctx.cwd,
        )

        all_issues = black_issues + line_length_issues
        count = len(all_issues)
        passed = success and count == 0

        return ToolResult(
            name=self.definition.name,
            success=passed,
            # Keep Black's raw output whenever the run did not pass, so a
            # failing subprocess with no parsed issues is still diagnosable.
            output=None if passed else output,
            issues_count=count,
            issues=all_issues,
            cwd=ctx.cwd,
        )

    def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Format files using Black.

        Runs an initial ``--check`` pass to count issues (skipped when the
        ``diff`` option is set), then the formatting command, then a final
        ``--check`` pass to count what remains.

        NOTE(review): ``options`` is only passed to ``_prepare_execution``;
        Black's CLI flags and the ``diff`` switch are read from
        ``self.options``. Confirm whether runtime overrides should apply.

        Args:
            paths: List of file or directory paths to format.
            options: Runtime options that override defaults.

        Returns:
            ToolResult with fix results.
        """
        # Use shared preparation for version check, path validation, file discovery
        ctx = self._prepare_execution(
            paths,
            options,
            no_files_message="No files to format.",
        )
        if ctx.should_skip:
            return ctx.early_result  # type: ignore[return-value]

        # Build reusable check command (used before and after formatting)
        check_cmd: list[str] = self._get_executable_command(tool_name="black") + [
            "--check",
        ]
        check_cmd.extend(self._build_common_args())
        check_cmd.extend(ctx.rel_files)

        diff_mode = bool(self.options.get("diff"))

        # In diff mode the initial check is skipped, so the initial count is 0.
        initial_issues: list[BlackIssue] = []
        if not diff_mode:
            try:
                _, check_output = self._run_subprocess(
                    cmd=check_cmd,
                    timeout=ctx.timeout,
                    cwd=ctx.cwd,
                )
            except subprocess.TimeoutExpired:
                return self._handle_timeout_error(
                    ctx.timeout,
                    initial_count=0,
                    cwd=ctx.cwd,
                )
            initial_issues = parse_black_output(output=check_output)
            initial_issues = initial_issues + self._check_line_length_violations(
                files=ctx.rel_files,
                cwd=ctx.cwd,
            )
        initial_count = len(initial_issues)

        # Apply formatting. Copy the command list so the list returned by
        # _get_executable_command is never mutated.
        fix_cmd: list[str] = list(self._get_executable_command(tool_name="black"))
        if diff_mode:
            fix_cmd.append("--diff")
        fix_cmd.extend(self._build_common_args())
        fix_cmd.extend(ctx.rel_files)

        logger.debug(f"[BlackPlugin] Fixing: {' '.join(fix_cmd)} (cwd={ctx.cwd})")
        try:
            _, fix_output = self._run_subprocess(
                cmd=fix_cmd,
                timeout=ctx.timeout,
                cwd=ctx.cwd,
            )
        except subprocess.TimeoutExpired:
            return self._handle_timeout_error(
                ctx.timeout,
                initial_count=initial_count,
                cwd=ctx.cwd,
            )

        # Final check for remaining differences. Only the parsed output is
        # used; the subprocess success flag is not consulted here.
        try:
            _, final_output = self._run_subprocess(
                cmd=check_cmd,
                timeout=ctx.timeout,
                cwd=ctx.cwd,
            )
        except subprocess.TimeoutExpired:
            return self._handle_timeout_error(
                ctx.timeout,
                initial_count=initial_count,
                cwd=ctx.cwd,
            )
        remaining_issues = parse_black_output(output=final_output)

        # Check for line length violations that Black cannot wrap
        line_length_issues = self._check_line_length_violations(
            files=ctx.rel_files,
            cwd=ctx.cwd,
        )

        all_remaining_issues = remaining_issues + line_length_issues
        remaining_count = len(all_remaining_issues)

        fixed_issues_parsed = parse_black_output(output=fix_output)
        # Clamp at zero: the remaining count can exceed the initial count,
        # e.g. in diff mode where the initial check is skipped.
        fixed_count = max(0, initial_count - remaining_count)

        # Build summary
        summary: list[str] = []
        if fixed_count > 0:
            summary.append(f"Fixed {fixed_count} issue(s)")
        if remaining_count > 0:
            summary.append(
                f"Found {remaining_count} issue(s) that cannot be auto-fixed",
            )
        final_summary = "\n".join(summary) if summary else "No fixes applied."

        all_issues = (fixed_issues_parsed or []) + all_remaining_issues

        return ToolResult(
            name=self.definition.name,
            success=(remaining_count == 0),
            output=final_summary,
            issues_count=remaining_count,
            issues=all_issues,
            initial_issues_count=initial_count,
            fixed_issues_count=fixed_count,
            remaining_issues_count=remaining_count,
            initial_issues=initial_issues if initial_issues else None,
            cwd=ctx.cwd,
        )