Coverage for lintro / tools / definitions / bandit.py: 74%

189 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Bandit tool definition. 

2 

3Bandit is a security linter designed to find common security issues in Python code. 

4It processes Python files, builds an AST, and runs security plugins against the 

5AST nodes to identify potential vulnerabilities. 

6""" 

7 

8from __future__ import annotations 

9 

10import json 

11import subprocess # nosec B404 - used safely with shell disabled 

12from dataclasses import dataclass 

13from typing import Any 

14 

15from loguru import logger 

16 

17from lintro.enums.bandit_levels import ( 

18 BanditConfidenceLevel, 

19 BanditSeverityLevel, 

20 normalize_bandit_confidence_level, 

21 normalize_bandit_severity_level, 

22) 

23from lintro.enums.doc_url_template import DocUrlTemplate 

24from lintro.enums.tool_type import ToolType 

25from lintro.models.core.tool_result import ToolResult 

26from lintro.parsers.bandit.bandit_parser import parse_bandit_output 

27from lintro.plugins.base import BaseToolPlugin 

28from lintro.plugins.protocol import ToolDefinition 

29from lintro.plugins.registry import register_tool 

30from lintro.utils.config import load_bandit_config 

31 

32# Constants for Bandit configuration 

33BANDIT_DEFAULT_TIMEOUT: int = 30 

34BANDIT_DEFAULT_PRIORITY: int = 90 # High priority for security tool 

35BANDIT_FILE_PATTERNS: list[str] = ["*.py", "*.pyi"] 

36BANDIT_OUTPUT_FORMAT: str = "json" 

37 

38 

39def _extract_bandit_json(raw_text: str) -> dict[str, Any]: 

40 """Extract Bandit's JSON object from mixed stdout/stderr text. 

41 

42 Bandit may print informational lines and a progress bar alongside the 

43 JSON report. This helper locates the first opening brace and the last 

44 closing brace and attempts to parse the enclosed JSON object. 

45 

46 Args: 

47 raw_text: Combined stdout+stderr text from Bandit. 

48 

49 Returns: 

50 Parsed JSON object. 

51 

52 Raises: 

53 json.JSONDecodeError: If JSON cannot be parsed. 

54 ValueError: If no JSON object boundaries are found. 

55 """ 

56 if not raw_text or not raw_text.strip(): 

57 raise json.JSONDecodeError("Empty output", raw_text or "", 0) 

58 

59 text: str = raw_text.strip() 

60 

61 # Quick path: if the entire text is JSON 

62 if text.startswith("{") and text.endswith("}"): 

63 result: dict[str, Any] = json.loads(text) 

64 return result 

65 

66 start: int = text.find("{") 

67 end: int = text.rfind("}") 

68 if start == -1 or end == -1 or end < start: 

69 raise ValueError("Could not locate JSON object in Bandit output") 

70 

71 json_str: str = text[start : end + 1] 

72 parsed: dict[str, Any] = json.loads(json_str) 

73 return parsed 

74 

75 

76@register_tool 

77@dataclass 

78class BanditPlugin(BaseToolPlugin): 

79 """Bandit security linter plugin. 

80 

81 This plugin integrates Bandit with Lintro for finding common security 

82 issues in Python code. 

83 """ 

84 

85 @property 

86 def definition(self) -> ToolDefinition: 

87 """Return the tool definition. 

88 

89 Returns: 

90 ToolDefinition containing tool metadata. 

91 """ 

92 return ToolDefinition( 

93 name="bandit", 

94 description=( 

95 "Security linter that finds common security issues in Python code" 

96 ), 

97 can_fix=False, 

98 tool_type=ToolType.SECURITY, 

99 file_patterns=BANDIT_FILE_PATTERNS, 

100 priority=BANDIT_DEFAULT_PRIORITY, 

101 conflicts_with=[], 

102 native_configs=["pyproject.toml", ".bandit", "bandit.yaml"], 

103 version_command=["bandit", "--version"], 

104 min_version="1.7.0", 

105 default_options={ 

106 "timeout": BANDIT_DEFAULT_TIMEOUT, 

107 "severity": None, 

108 "confidence": None, 

109 "tests": None, 

110 "skips": None, 

111 "profile": None, 

112 "configfile": None, 

113 "baseline": None, 

114 "ignore_nosec": False, 

115 "aggregate": "vuln", 

116 "verbose": False, 

117 "quiet": False, 

118 }, 

119 default_timeout=BANDIT_DEFAULT_TIMEOUT, 

120 ) 

121 

122 def __post_init__(self) -> None: 

123 """Initialize the tool with configuration from pyproject.toml.""" 

124 super().__post_init__() 

125 self._apply_native_config() 

126 

127 def reset_options(self) -> None: 

128 """Reset options and re-apply native [tool.bandit] config. 

129 

130 Overrides the base ``reset_options()`` so that native bandit 

131 configuration from pyproject.toml (skips, tests, severity, etc.) 

132 is preserved after the reset. Without this, the base reset 

133 restores ``definition.default_options`` which has ``skips: None``, 

134 silently dropping the user's configured skip list. 

135 """ 

136 super().reset_options() 

137 self._apply_native_config() 

138 

139 def _apply_native_config(self) -> None: 

140 """Load and apply native [tool.bandit] config from pyproject.toml.""" 

141 try: 

142 bandit_config = load_bandit_config() 

143 except Exception as e: 

144 logger.warning(f"[bandit] Failed to load native config: {e}") 

145 return 

146 

147 # Apply exclude_dirs 

148 if "exclude_dirs" in bandit_config: 

149 exclude_dirs = bandit_config["exclude_dirs"] 

150 if isinstance(exclude_dirs, list): 

151 for exclude_dir in exclude_dirs: 

152 pattern = f"{exclude_dir}/*" 

153 if pattern not in self.exclude_patterns: 

154 self.exclude_patterns.append(pattern) 

155 recursive_pattern = f"{exclude_dir}/**/*" 

156 if recursive_pattern not in self.exclude_patterns: 

157 self.exclude_patterns.append(recursive_pattern) 

158 

159 # Apply other options from configuration 

160 config_mapping = { 

161 "tests": "tests", 

162 "skips": "skips", 

163 "profile": "profile", 

164 "configfile": "configfile", 

165 "baseline": "baseline", 

166 "ignore_nosec": "ignore_nosec", 

167 "aggregate": "aggregate", 

168 "severity": "severity", 

169 "confidence": "confidence", 

170 } 

171 

172 valid_aggregates = {"vuln", "file"} 

173 

174 for config_key, option_key in config_mapping.items(): 

175 if config_key in bandit_config: 

176 value = bandit_config[config_key] 

177 try: 

178 if config_key == "severity" and value is not None: 

179 value = normalize_bandit_severity_level(value).value 

180 elif config_key == "confidence" and value is not None: 

181 value = normalize_bandit_confidence_level(value).value 

182 elif config_key in ("skips", "tests") and isinstance( 

183 value, 

184 list, 

185 ): 

186 value = ",".join(value) 

187 elif config_key == "aggregate" and value not in valid_aggregates: 

188 logger.warning( 

189 f"[bandit] Invalid native aggregate value: {value!r}", 

190 ) 

191 continue 

192 except (ValueError, TypeError) as e: 

193 logger.warning( 

194 f"[bandit] Invalid native config for {config_key}: {e}", 

195 ) 

196 continue 

197 self.options[option_key] = value 

198 

199 def set_options( 

200 self, 

201 severity: str | None = None, 

202 confidence: str | None = None, 

203 tests: str | None = None, 

204 skips: str | None = None, 

205 profile: str | None = None, 

206 configfile: str | None = None, 

207 baseline: str | None = None, 

208 ignore_nosec: bool | None = None, 

209 aggregate: str | None = None, 

210 verbose: bool | None = None, 

211 quiet: bool | None = None, 

212 **kwargs: Any, 

213 ) -> None: 

214 """Set Bandit-specific options. 

215 

216 Args: 

217 severity: Minimum severity level (LOW, MEDIUM, HIGH). 

218 confidence: Minimum confidence level (LOW, MEDIUM, HIGH). 

219 tests: Comma-separated list of test IDs to run. 

220 skips: Comma-separated list of test IDs to skip. 

221 profile: Profile to use. 

222 configfile: Path to config file. 

223 baseline: Path to baseline report for comparison. 

224 ignore_nosec: Ignore # nosec comments. 

225 aggregate: Aggregate by vulnerability or file. 

226 verbose: Verbose output. 

227 quiet: Quiet mode. 

228 **kwargs: Other tool options. 

229 

230 Raises: 

231 ValueError: If an option value is invalid. 

232 """ 

233 if severity is not None: 

234 severity = normalize_bandit_severity_level(severity).value 

235 

236 if confidence is not None: 

237 confidence = normalize_bandit_confidence_level(confidence).value 

238 

239 if aggregate is not None: 

240 valid_aggregates = ["vuln", "file"] 

241 if aggregate not in valid_aggregates: 

242 raise ValueError(f"aggregate must be one of {valid_aggregates}") 

243 

244 options: dict[str, Any] = { 

245 "severity": severity, 

246 "confidence": confidence, 

247 "tests": tests, 

248 "skips": skips, 

249 "profile": profile, 

250 "configfile": configfile, 

251 "baseline": baseline, 

252 "ignore_nosec": ignore_nosec, 

253 "aggregate": aggregate, 

254 "verbose": verbose, 

255 "quiet": quiet, 

256 } 

257 options = {k: v for k, v in options.items() if v is not None} 

258 super().set_options(**options, **kwargs) 

259 

260 def _build_check_command(self, files: list[str]) -> list[str]: 

261 """Build the bandit check command. 

262 

263 Args: 

264 files: List of files to check. 

265 

266 Returns: 

267 List of command arguments. 

268 """ 

269 cmd: list[str] = self._get_executable_command("bandit") + ["-r"] 

270 

271 severity_opt = self.options.get("severity") 

272 if severity_opt is not None: 

273 severity = normalize_bandit_severity_level(str(severity_opt)) 

274 if severity == BanditSeverityLevel.LOW: 

275 cmd.append("-l") 

276 elif severity == BanditSeverityLevel.MEDIUM: 

277 cmd.extend(["-ll"]) 

278 elif severity == BanditSeverityLevel.HIGH: 

279 cmd.extend(["-lll"]) 

280 

281 confidence_opt = self.options.get("confidence") 

282 if confidence_opt is not None: 

283 confidence = normalize_bandit_confidence_level(str(confidence_opt)) 

284 if confidence == BanditConfidenceLevel.LOW: 

285 cmd.append("-i") 

286 elif confidence == BanditConfidenceLevel.MEDIUM: 

287 cmd.extend(["-ii"]) 

288 elif confidence == BanditConfidenceLevel.HIGH: 

289 cmd.extend(["-iii"]) 

290 

291 tests_opt = self.options.get("tests") 

292 if tests_opt is not None: 

293 cmd.extend(["-t", str(tests_opt)]) 

294 

295 skips_opt = self.options.get("skips") 

296 if skips_opt is not None: 

297 cmd.extend(["-s", str(skips_opt)]) 

298 

299 profile_opt = self.options.get("profile") 

300 if profile_opt is not None: 

301 cmd.extend(["-p", str(profile_opt)]) 

302 

303 configfile_opt = self.options.get("configfile") 

304 if configfile_opt is not None: 

305 cmd.extend(["-c", str(configfile_opt)]) 

306 

307 baseline_opt = self.options.get("baseline") 

308 if baseline_opt is not None: 

309 cmd.extend(["-b", str(baseline_opt)]) 

310 

311 if self.options.get("ignore_nosec"): 

312 cmd.append("--ignore-nosec") 

313 

314 aggregate_opt = self.options.get("aggregate") 

315 if aggregate_opt is not None: 

316 cmd.extend(["-a", str(aggregate_opt)]) 

317 

318 if self.options.get("verbose"): 

319 cmd.append("-v") 

320 

321 if self.options.get("quiet"): 

322 cmd.append("-q") 

323 

324 # Output format 

325 cmd.extend(["-f", BANDIT_OUTPUT_FORMAT]) 

326 

327 # Add quiet flag to suppress log messages that interfere with JSON parsing 

328 if "-q" not in cmd: 

329 cmd.append("-q") 

330 

331 # Add files 

332 cmd.extend(files) 

333 

334 return cmd 

335 

336 def doc_url(self, code: str) -> str | None: 

337 """Return Bandit documentation URL for the given code. 

338 

339 Returns the plugins index page. Individual plugin page slugs do not 

340 follow a deterministic pattern so we link to the index instead. 

341 

342 Args: 

343 code: Bandit code (e.g., "B101"). 

344 

345 Returns: 

346 URL to the Bandit plugins index page, or None if code is empty. 

347 """ 

348 if code: 

349 return DocUrlTemplate.BANDIT 

350 return None 

351 

352 def check(self, paths: list[str], options: dict[str, object]) -> ToolResult: 

353 """Check files with Bandit for security issues. 

354 

355 Args: 

356 paths: List of file or directory paths to check. 

357 options: Runtime options that override defaults. 

358 

359 Returns: 

360 ToolResult with check results. 

361 """ 

362 # Merge runtime options 

363 merged_options = dict(self.options) 

364 merged_options.update(options) 

365 

366 # Use shared preparation for version check, path validation, file discovery 

367 ctx = self._prepare_execution(paths, options) 

368 if ctx.should_skip: 

369 return ctx.early_result # type: ignore[return-value] 

370 

371 # Use absolute paths to avoid running from inside Python package directories. 

372 # When bandit runs from inside lintro/, it may trigger imports that corrupt 

373 # the JSON output with loguru messages. 

374 cmd: list[str] = self._build_check_command(files=ctx.files) 

375 logger.debug(f"[bandit] Running: {' '.join(cmd[:10])}...") 

376 

377 output: str 

378 stderr_output: str = "" 

379 execution_failure: bool = False 

380 try: 

381 # Run subprocess directly to capture stdout and stderr separately. 

382 # Bandit outputs JSON to stdout, but stderr may contain info/warning 

383 # messages that would corrupt JSON parsing if combined. 

384 result = subprocess.run( # nosec B603 - cmd is validated 

385 cmd, 

386 capture_output=True, 

387 text=True, 

388 timeout=ctx.timeout, 

389 # Don't set cwd - use absolute paths instead to avoid 

390 # running from inside Python package directories 

391 ) 

392 # Use only stdout for JSON parsing 

393 output = (result.stdout or "").strip() 

394 stderr_output = (result.stderr or "").strip() 

395 # Log stderr for debugging if present 

396 if stderr_output: 

397 logger.debug(f"[bandit] stderr: {stderr_output[:500]}") 

398 except subprocess.TimeoutExpired: 

399 timeout_msg = ( 

400 f"Bandit execution timed out ({ctx.timeout}s limit exceeded).\n\n" 

401 "This may indicate:\n" 

402 " - Large codebase taking too long to process\n" 

403 " - Need to increase timeout via --tool-options bandit:timeout=N" 

404 ) 

405 return ToolResult( 

406 name=self.definition.name, 

407 success=False, 

408 output=timeout_msg, 

409 issues_count=0, 

410 ) 

411 except (OSError, ValueError, RuntimeError) as e: 

412 logger.error(f"Failed to run Bandit: {e}") 

413 output = f"Bandit failed: {e}" 

414 execution_failure = True 

415 

416 # Parse the JSON output 

417 try: 

418 # Handle "no files found" case - bandit outputs this to stderr, not stdout 

419 if ( 

420 "No .py/.pyi files found" in output 

421 or "No .py/.pyi files found" in stderr_output 

422 ): 

423 logger.debug("[bandit] No Python files found to check") 

424 return ToolResult( 

425 name=self.definition.name, 

426 success=True, 

427 output="No .py/.pyi files found to check.", 

428 issues_count=0, 

429 ) 

430 

431 if ("{" not in output or "}" not in output) and execution_failure: 

432 return ToolResult( 

433 name=self.definition.name, 

434 success=False, 

435 output=output, 

436 issues_count=0, 

437 ) 

438 

439 # Handle empty output (no JSON content to parse) 

440 if not output: 

441 # If Bandit exited non-zero with no output, treat as failure 

442 if result.returncode != 0: 

443 return ToolResult( 

444 name=self.definition.name, 

445 success=False, 

446 output=stderr_output or "Bandit failed with non-zero exit code", 

447 issues_count=0, 

448 ) 

449 logger.debug("[bandit] Empty output received") 

450 return ToolResult( 

451 name=self.definition.name, 

452 success=True, 

453 output="Bandit ran successfully and found no issues", 

454 issues_count=0, 

455 ) 

456 

457 bandit_data = _extract_bandit_json(raw_text=output) 

458 issues = parse_bandit_output(bandit_data) 

459 issues_count = len(issues) 

460 

461 execution_success = ( 

462 len(bandit_data.get("errors", [])) == 0 and not execution_failure 

463 ) 

464 

465 return ToolResult( 

466 name=self.definition.name, 

467 success=execution_success, 

468 output=output if execution_failure else None, 

469 issues_count=issues_count, 

470 issues=issues, 

471 ) 

472 

473 except (json.JSONDecodeError, ValueError) as e: 

474 logger.error(f"Failed to parse bandit output: {e}") 

475 return ToolResult( 

476 name=self.definition.name, 

477 success=False, 

478 output=(output or f"Failed to parse bandit output: {str(e)}"), 

479 issues_count=0, 

480 ) 

481 

482 def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult: 

483 """Bandit cannot fix issues, only report them. 

484 

485 Args: 

486 paths: List of file or directory paths to fix. 

487 options: Tool-specific options. 

488 

489 Returns: 

490 ToolResult: Never returns, always raises NotImplementedError. 

491 

492 Raises: 

493 NotImplementedError: Bandit does not support fixing issues. 

494 """ 

495 raise NotImplementedError( 

496 "Bandit cannot automatically fix security issues. Run 'lintro check' to " 

497 "see issues.", 

498 )