Coverage for lintro / tools / definitions / bandit.py: 74%
189 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Bandit tool definition.
3Bandit is a security linter designed to find common security issues in Python code.
4It processes Python files, builds an AST, and runs security plugins against the
5AST nodes to identify potential vulnerabilities.
6"""
8from __future__ import annotations
10import json
11import subprocess # nosec B404 - used safely with shell disabled
12from dataclasses import dataclass
13from typing import Any
15from loguru import logger
17from lintro.enums.bandit_levels import (
18 BanditConfidenceLevel,
19 BanditSeverityLevel,
20 normalize_bandit_confidence_level,
21 normalize_bandit_severity_level,
22)
23from lintro.enums.doc_url_template import DocUrlTemplate
24from lintro.enums.tool_type import ToolType
25from lintro.models.core.tool_result import ToolResult
26from lintro.parsers.bandit.bandit_parser import parse_bandit_output
27from lintro.plugins.base import BaseToolPlugin
28from lintro.plugins.protocol import ToolDefinition
29from lintro.plugins.registry import register_tool
30from lintro.utils.config import load_bandit_config
# Constants for Bandit configuration

# Default timeout in seconds; used both as the "timeout" default option and as
# the tool definition's default_timeout.
BANDIT_DEFAULT_TIMEOUT: int = 30
BANDIT_DEFAULT_PRIORITY: int = 90  # High priority for security tool
# Glob patterns handed to the tool definition as the files Bandit applies to.
BANDIT_FILE_PATTERNS: list[str] = ["*.py", "*.pyi"]
# Passed to Bandit via "-f"; the plugin parses Bandit's stdout as JSON.
BANDIT_OUTPUT_FORMAT: str = "json"
def _extract_bandit_json(raw_text: str) -> dict[str, Any]:
    """Pull the JSON report out of Bandit output that may contain extra text.

    Bandit can emit informational lines or a progress bar around its JSON
    report. The report is taken to be everything from the first ``{`` to the
    last ``}`` of the whitespace-stripped text (which is the whole text when
    the output is pure JSON).

    Args:
        raw_text: Text captured from Bandit.

    Returns:
        The decoded JSON object.

    Raises:
        json.JSONDecodeError: If the text is empty/blank or the delimited
            span is not valid JSON.
        ValueError: If no ``{`` ... ``}`` span can be located.
    """
    stripped = (raw_text or "").strip()
    if not stripped:
        raise json.JSONDecodeError("Empty output", raw_text or "", 0)

    open_idx = stripped.find("{")
    close_idx = stripped.rfind("}")
    # A missing brace yields -1; a missing or earlier "}" sorts before "{".
    if open_idx < 0 or close_idx < open_idx:
        raise ValueError("Could not locate JSON object in Bandit output")

    payload: dict[str, Any] = json.loads(stripped[open_idx : close_idx + 1])
    return payload
@register_tool
@dataclass
class BanditPlugin(BaseToolPlugin):
    """Bandit security linter plugin.

    This plugin integrates Bandit with Lintro for finding common security
    issues in Python code.
    """

    @property
    def definition(self) -> ToolDefinition:
        """Return the tool definition.

        Returns:
            ToolDefinition containing tool metadata.
        """
        return ToolDefinition(
            name="bandit",
            description=(
                "Security linter that finds common security issues in Python code"
            ),
            can_fix=False,
            tool_type=ToolType.SECURITY,
            file_patterns=BANDIT_FILE_PATTERNS,
            priority=BANDIT_DEFAULT_PRIORITY,
            conflicts_with=[],
            native_configs=["pyproject.toml", ".bandit", "bandit.yaml"],
            version_command=["bandit", "--version"],
            min_version="1.7.0",
            default_options={
                "timeout": BANDIT_DEFAULT_TIMEOUT,
                "severity": None,
                "confidence": None,
                "tests": None,
                "skips": None,
                "profile": None,
                "configfile": None,
                "baseline": None,
                "ignore_nosec": False,
                "aggregate": "vuln",
                "verbose": False,
                "quiet": False,
            },
            default_timeout=BANDIT_DEFAULT_TIMEOUT,
        )

    def __post_init__(self) -> None:
        """Initialize the tool with configuration from pyproject.toml."""
        super().__post_init__()
        self._apply_native_config()

    def reset_options(self) -> None:
        """Reset options and re-apply native [tool.bandit] config.

        Overrides the base ``reset_options()`` so that native bandit
        configuration from pyproject.toml (skips, tests, severity, etc.)
        is preserved after the reset. Without this, the base reset
        restores ``definition.default_options`` which has ``skips: None``,
        silently dropping the user's configured skip list.
        """
        super().reset_options()
        self._apply_native_config()

    def _apply_native_config(self) -> None:
        """Load and apply native [tool.bandit] config from pyproject.toml.

        Loading is best-effort: any failure is logged as a warning and the
        current options are left untouched. Individual invalid values are
        logged and skipped without affecting the remaining keys.
        """
        try:
            bandit_config = load_bandit_config()
        except Exception as e:
            # Deliberately broad: a broken native config must not prevent the
            # plugin from being constructed or reset.
            logger.warning(f"[bandit] Failed to load native config: {e}")
            return

        # Translate each excluded directory into a direct and a recursive
        # exclude pattern, skipping patterns that are already registered.
        if "exclude_dirs" in bandit_config:
            exclude_dirs = bandit_config["exclude_dirs"]
            if isinstance(exclude_dirs, list):
                for exclude_dir in exclude_dirs:
                    for pattern in (f"{exclude_dir}/*", f"{exclude_dir}/**/*"):
                        if pattern not in self.exclude_patterns:
                            self.exclude_patterns.append(pattern)

        # Native config key -> plugin option key.
        config_mapping = {
            "tests": "tests",
            "skips": "skips",
            "profile": "profile",
            "configfile": "configfile",
            "baseline": "baseline",
            "ignore_nosec": "ignore_nosec",
            "aggregate": "aggregate",
            "severity": "severity",
            "confidence": "confidence",
        }

        valid_aggregates = {"vuln", "file"}

        for config_key, option_key in config_mapping.items():
            if config_key not in bandit_config:
                continue
            value = bandit_config[config_key]
            try:
                if config_key == "severity" and value is not None:
                    value = normalize_bandit_severity_level(value).value
                elif config_key == "confidence" and value is not None:
                    value = normalize_bandit_confidence_level(value).value
                elif config_key in ("skips", "tests") and isinstance(value, list):
                    # Options store test IDs as one comma-separated string.
                    value = ",".join(value)
                elif config_key == "aggregate" and value not in valid_aggregates:
                    logger.warning(
                        f"[bandit] Invalid native aggregate value: {value!r}",
                    )
                    continue
            except (ValueError, TypeError) as e:
                # Covers rejected severity/confidence values, non-string list
                # items in skips/tests, and unhashable aggregate values.
                logger.warning(
                    f"[bandit] Invalid native config for {config_key}: {e}",
                )
                continue
            self.options[option_key] = value

    def set_options(
        self,
        severity: str | None = None,
        confidence: str | None = None,
        tests: str | None = None,
        skips: str | None = None,
        profile: str | None = None,
        configfile: str | None = None,
        baseline: str | None = None,
        ignore_nosec: bool | None = None,
        aggregate: str | None = None,
        verbose: bool | None = None,
        quiet: bool | None = None,
        **kwargs: Any,
    ) -> None:
        """Set Bandit-specific options.

        Only arguments that are not ``None`` are forwarded to the base
        implementation, so omitted options keep their current values.

        Args:
            severity: Minimum severity level (LOW, MEDIUM, HIGH).
            confidence: Minimum confidence level (LOW, MEDIUM, HIGH).
            tests: Comma-separated list of test IDs to run.
            skips: Comma-separated list of test IDs to skip.
            profile: Profile to use.
            configfile: Path to config file.
            baseline: Path to baseline report for comparison.
            ignore_nosec: Ignore # nosec comments.
            aggregate: Aggregate by vulnerability or file.
            verbose: Verbose output.
            quiet: Quiet mode.
            **kwargs: Other tool options.

        Raises:
            ValueError: If an option value is invalid.
        """
        if severity is not None:
            severity = normalize_bandit_severity_level(severity).value

        if confidence is not None:
            confidence = normalize_bandit_confidence_level(confidence).value

        if aggregate is not None:
            valid_aggregates = ["vuln", "file"]
            if aggregate not in valid_aggregates:
                raise ValueError(f"aggregate must be one of {valid_aggregates}")

        options: dict[str, Any] = {
            "severity": severity,
            "confidence": confidence,
            "tests": tests,
            "skips": skips,
            "profile": profile,
            "configfile": configfile,
            "baseline": baseline,
            "ignore_nosec": ignore_nosec,
            "aggregate": aggregate,
            "verbose": verbose,
            "quiet": quiet,
        }
        options = {k: v for k, v in options.items() if v is not None}
        super().set_options(**options, **kwargs)

    def _build_check_command(self, files: list[str]) -> list[str]:
        """Build the bandit check command.

        The command is assembled from ``self.options``; the JSON output
        format and the quiet flag are always included.

        Args:
            files: List of files to check.

        Returns:
            List of command arguments.
        """
        cmd: list[str] = self._get_executable_command("bandit") + ["-r"]

        # Bandit expresses minimum severity by repeating "l".
        severity_opt = self.options.get("severity")
        if severity_opt is not None:
            severity = normalize_bandit_severity_level(str(severity_opt))
            if severity == BanditSeverityLevel.LOW:
                cmd.append("-l")
            elif severity == BanditSeverityLevel.MEDIUM:
                cmd.append("-ll")
            elif severity == BanditSeverityLevel.HIGH:
                cmd.append("-lll")

        # Likewise, minimum confidence is expressed by repeating "i".
        confidence_opt = self.options.get("confidence")
        if confidence_opt is not None:
            confidence = normalize_bandit_confidence_level(str(confidence_opt))
            if confidence == BanditConfidenceLevel.LOW:
                cmd.append("-i")
            elif confidence == BanditConfidenceLevel.MEDIUM:
                cmd.append("-ii")
            elif confidence == BanditConfidenceLevel.HIGH:
                cmd.append("-iii")

        tests_opt = self.options.get("tests")
        if tests_opt is not None:
            cmd.extend(["-t", str(tests_opt)])

        skips_opt = self.options.get("skips")
        if skips_opt is not None:
            cmd.extend(["-s", str(skips_opt)])

        profile_opt = self.options.get("profile")
        if profile_opt is not None:
            cmd.extend(["-p", str(profile_opt)])

        configfile_opt = self.options.get("configfile")
        if configfile_opt is not None:
            cmd.extend(["-c", str(configfile_opt)])

        baseline_opt = self.options.get("baseline")
        if baseline_opt is not None:
            cmd.extend(["-b", str(baseline_opt)])

        if self.options.get("ignore_nosec"):
            cmd.append("--ignore-nosec")

        aggregate_opt = self.options.get("aggregate")
        if aggregate_opt is not None:
            cmd.extend(["-a", str(aggregate_opt)])

        if self.options.get("verbose"):
            cmd.append("-v")

        if self.options.get("quiet"):
            cmd.append("-q")

        # Output format
        cmd.extend(["-f", BANDIT_OUTPUT_FORMAT])

        # Add quiet flag to suppress log messages that interfere with JSON
        # parsing (unless the "quiet" option already added it above).
        if "-q" not in cmd:
            cmd.append("-q")

        # Add files
        cmd.extend(files)

        return cmd

    def doc_url(self, code: str) -> str | None:
        """Return Bandit documentation URL for the given code.

        Returns the plugins index page. Individual plugin page slugs do not
        follow a deterministic pattern so we link to the index instead.

        Args:
            code: Bandit code (e.g., "B101").

        Returns:
            URL to the Bandit plugins index page, or None if code is empty.
        """
        if code:
            return DocUrlTemplate.BANDIT
        return None

    def check(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Check files with Bandit for security issues.

        Args:
            paths: List of file or directory paths to check.
            options: Runtime options that override defaults.

        Returns:
            ToolResult with check results. ``success`` is False when Bandit
            times out, cannot be launched, exits non-zero without output,
            reports entries under ``errors``, or produces unparseable output.
        """
        # Use shared preparation for version check, path validation, file discovery
        # NOTE(review): the command below is built from ``self.options``; the
        # runtime ``options`` are only handed to ``_prepare_execution`` here -
        # confirm that is where they are meant to take effect.
        ctx = self._prepare_execution(paths, options)
        if ctx.should_skip:
            return ctx.early_result  # type: ignore[return-value]

        cmd: list[str] = self._build_check_command(files=ctx.files)
        logger.debug(f"[bandit] Running: {' '.join(cmd[:10])}...")

        try:
            # Run subprocess directly to capture stdout and stderr separately.
            # Bandit outputs JSON to stdout, but stderr may contain info/warning
            # messages that would corrupt JSON parsing if combined.
            # cwd is intentionally not set, to avoid running from inside
            # Python package directories.
            result = subprocess.run(  # nosec B603 - cmd is validated
                cmd,
                capture_output=True,
                text=True,
                timeout=ctx.timeout,
            )
        except subprocess.TimeoutExpired:
            timeout_msg = (
                f"Bandit execution timed out ({ctx.timeout}s limit exceeded).\n\n"
                "This may indicate:\n"
                " - Large codebase taking too long to process\n"
                " - Need to increase timeout via --tool-options bandit:timeout=N"
            )
            return ToolResult(
                name=self.definition.name,
                success=False,
                output=timeout_msg,
                issues_count=0,
            )
        except (OSError, ValueError, RuntimeError) as e:
            # Bandit never ran, so there is no process output to interpret.
            # Report the failure directly instead of passing the error text
            # through the "no files" / JSON handling below.
            logger.error(f"Failed to run Bandit: {e}")
            return ToolResult(
                name=self.definition.name,
                success=False,
                output=f"Bandit failed: {e}",
                issues_count=0,
            )

        # Use only stdout for JSON parsing
        output: str = (result.stdout or "").strip()
        stderr_output: str = (result.stderr or "").strip()
        # Log stderr for debugging if present
        if stderr_output:
            logger.debug(f"[bandit] stderr: {stderr_output[:500]}")

        # Handle "no files found" case - bandit outputs this to stderr, not stdout
        no_files_marker = "No .py/.pyi files found"
        if no_files_marker in output or no_files_marker in stderr_output:
            logger.debug("[bandit] No Python files found to check")
            return ToolResult(
                name=self.definition.name,
                success=True,
                output="No .py/.pyi files found to check.",
                issues_count=0,
            )

        # Handle empty output (no JSON content to parse)
        if not output:
            # If Bandit exited non-zero with no output, treat as failure
            if result.returncode != 0:
                return ToolResult(
                    name=self.definition.name,
                    success=False,
                    output=stderr_output or "Bandit failed with non-zero exit code",
                    issues_count=0,
                )
            logger.debug("[bandit] Empty output received")
            return ToolResult(
                name=self.definition.name,
                success=True,
                output="Bandit ran successfully and found no issues",
                issues_count=0,
            )

        # Parse the JSON output
        try:
            bandit_data = _extract_bandit_json(raw_text=output)
            issues = parse_bandit_output(bandit_data)
        except (json.JSONDecodeError, ValueError) as e:
            logger.error(f"Failed to parse bandit output: {e}")
            return ToolResult(
                name=self.definition.name,
                success=False,
                output=(output or f"Failed to parse bandit output: {str(e)}"),
                issues_count=0,
            )

        # Truthiness rather than len(): a missing, null, or empty "errors"
        # entry all mean that Bandit reported no errors.
        execution_success = not bandit_data.get("errors")

        return ToolResult(
            name=self.definition.name,
            success=execution_success,
            output=None,
            issues_count=len(issues),
            issues=issues,
        )

    def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Bandit cannot fix issues, only report them.

        Args:
            paths: List of file or directory paths to fix.
            options: Tool-specific options.

        Returns:
            ToolResult: Never returns, always raises NotImplementedError.

        Raises:
            NotImplementedError: Bandit does not support fixing issues.
        """
        raise NotImplementedError(
            "Bandit cannot automatically fix security issues. Run 'lintro check' to "
            "see issues.",
        )