Coverage for lintro / tools / definitions / semgrep.py: 91%
137 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Semgrep tool definition.
3Semgrep is a fast, open-source static analysis tool for finding bugs and
4enforcing code standards. It supports 30+ languages using pattern-based rules
5and is commonly used for security scanning and code quality enforcement.
6"""
8from __future__ import annotations
10import json
11import subprocess # nosec B404 - used safely with shell disabled
12from dataclasses import dataclass
13from typing import Any
14from urllib.parse import quote as url_quote
16from loguru import logger
18from lintro._tool_versions import get_min_version
19from lintro.enums.doc_url_template import DocUrlTemplate
20from lintro.enums.semgrep_enums import SemgrepSeverity, normalize_semgrep_severity
21from lintro.enums.tool_name import ToolName
22from lintro.enums.tool_type import ToolType
23from lintro.models.core.tool_result import ToolResult
24from lintro.parsers.semgrep.semgrep_parser import parse_semgrep_output
25from lintro.plugins.base import BaseToolPlugin
26from lintro.plugins.protocol import ToolDefinition
27from lintro.plugins.registry import register_tool
28from lintro.tools.core.option_validators import (
29 filter_none_options,
30 validate_list,
31 validate_str,
32)
# Constants for Semgrep configuration.

# Default execution timeout, used both as the "timeout" default option and as
# the tool definition's default_timeout. The timeout message in check() reports
# the effective timeout with an "s" suffix, i.e. the value is in seconds.
SEMGREP_DEFAULT_TIMEOUT: int = 300  # Semgrep needs more time on larger codebases
# Value passed as the tool definition's ``priority``.
SEMGREP_DEFAULT_PRIORITY: int = 85  # High priority for security tool
# Glob patterns passed as the tool definition's ``file_patterns``.
SEMGREP_FILE_PATTERNS: list[str] = [
    "*.py",
    "*.js",
    "*.ts",
    "*.jsx",
    "*.tsx",
    "*.go",
    "*.java",
    "*.rb",
    "*.php",
    "*.c",
    "*.cpp",
    "*.rs",
]
# Output format name; the command builder turns it into the ``--json`` flag.
SEMGREP_OUTPUT_FORMAT: str = "json"
# Value used for ``--config`` when the "config" option is absent.
SEMGREP_DEFAULT_CONFIG: str = "auto"
def _extract_semgrep_json(raw_text: str) -> dict[str, Any]:
    """Extract Semgrep's JSON object from mixed stdout/stderr text.

    Semgrep may print informational lines alongside the JSON report. The
    helper first parses the span between the first opening brace and the
    last closing brace (which is the whole text when the output is pure
    JSON). If that span is not valid JSON - typically because a log line
    before or after the report contains a brace of its own - it falls back
    to decoding every object that starts at the beginning of a line and
    returns the longest one.

    Only line-initial braces are considered in the fallback so that an
    object nested inside a truncated report is never mistaken for the
    report itself.

    Args:
        raw_text: Combined stdout+stderr text from Semgrep.

    Returns:
        Parsed JSON object.

    Raises:
        json.JSONDecodeError: If the text is empty or no JSON object can be
            decoded from it.
        ValueError: If no JSON object boundaries are found.
    """
    if not raw_text or not raw_text.strip():
        raise json.JSONDecodeError("Empty output", raw_text or "", 0)

    text: str = raw_text.strip()

    start: int = text.find("{")
    end: int = text.rfind("}")
    if start == -1 or end == -1 or end < start:
        raise ValueError("Could not locate JSON object in Semgrep output")

    # First attempt: everything between the outermost braces. For pure JSON
    # output this slice is the complete text.
    try:
        parsed: dict[str, Any] = json.loads(text[start : end + 1])
    except json.JSONDecodeError as exc:
        # Kept so the original failure is reported if recovery also fails.
        slice_error = exc
    else:
        return parsed

    # Fallback: surrounding noise contained braces. Decode each object that
    # begins a line and keep the longest successfully decoded one.
    decoder = json.JSONDecoder()
    best: dict[str, Any] | None = None
    best_span: int = 0
    pos: int = start
    while pos != -1:
        if pos == 0 or text[pos - 1] == "\n":
            try:
                candidate, consumed = decoder.raw_decode(text[pos:])
            except json.JSONDecodeError:
                pass
            else:
                if consumed > best_span:
                    best, best_span = candidate, consumed
                # Skip past the decoded object; its inner braces are not
                # separate candidates.
                pos = text.find("{", pos + consumed)
                continue
        pos = text.find("{", pos + 1)

    if best is None:
        raise slice_error
    return best
@register_tool
@dataclass
class SemgrepPlugin(BaseToolPlugin):
    """Semgrep static analysis and security scanning plugin.

    This plugin integrates Semgrep with Lintro for finding security
    vulnerabilities and enforcing code standards across multiple languages.
    It builds a ``semgrep scan --json`` command, runs it, and converts the
    JSON report into a ToolResult. Semgrep is report-only here: ``fix``
    always raises.
    """

    @property
    def definition(self) -> ToolDefinition:
        """Return the tool definition.

        Returns:
            ToolDefinition containing tool metadata.
        """
        return ToolDefinition(
            name="semgrep",
            description=(
                "Fast, open-source static analysis tool for finding bugs "
                "and enforcing code standards"
            ),
            can_fix=False,
            tool_type=ToolType.LINTER | ToolType.SECURITY,
            file_patterns=SEMGREP_FILE_PATTERNS,
            priority=SEMGREP_DEFAULT_PRIORITY,
            conflicts_with=[],
            native_configs=[".semgrep.yaml", ".semgrep.yml", ".semgrep/"],
            version_command=["semgrep", "--version"],
            min_version=get_min_version(ToolName.SEMGREP),
            default_options={
                "timeout": SEMGREP_DEFAULT_TIMEOUT,
                "config": SEMGREP_DEFAULT_CONFIG,
                "exclude": None,
                "include": None,
                "severity": None,
                "timeout_threshold": None,
                "jobs": None,
                "verbose": False,
                "quiet": False,
            },
            default_timeout=SEMGREP_DEFAULT_TIMEOUT,
        )

    def set_options(
        self,
        config: str | None = None,
        exclude: list[str] | None = None,
        include: list[str] | None = None,
        severity: str | SemgrepSeverity | None = None,
        timeout_threshold: int | None = None,
        jobs: int | None = None,
        verbose: bool | None = None,
        quiet: bool | None = None,
        **kwargs: Any,
    ) -> None:
        """Set Semgrep-specific options.

        Options left as None are dropped before being forwarded to the base
        class, so they do not overwrite previously configured values.

        Args:
            config: Config string (auto, p/python, p/javascript, path to YAML).
            exclude: Patterns to exclude from scanning.
            include: Patterns to include in scanning.
            severity: Minimum severity level (INFO, WARNING, ERROR).
            timeout_threshold: Per-file timeout in seconds.
            jobs: Number of parallel jobs.
            verbose: Verbose output.
            quiet: Quiet mode.
            **kwargs: Other tool options.

        Raises:
            ValueError: If an option value is invalid.
        """
        validate_str(config, "config")
        validate_list(exclude, "exclude")
        validate_list(include, "include")

        severity_str: str | None = None
        if severity is not None:
            severity_str = normalize_semgrep_severity(severity).name

        # bool is a subclass of int, so it is rejected explicitly; otherwise
        # jobs=True would be rendered as "--jobs True" on the command line.
        if timeout_threshold is not None and (
            isinstance(timeout_threshold, bool)
            or not isinstance(timeout_threshold, int)
            or timeout_threshold < 0
        ):
            raise ValueError("timeout_threshold must be a non-negative integer")

        if jobs is not None and (
            isinstance(jobs, bool) or not isinstance(jobs, int) or jobs < 1
        ):
            raise ValueError("jobs must be a positive integer")

        options = filter_none_options(
            config=config,
            exclude=exclude,
            include=include,
            severity=severity_str,
            timeout_threshold=timeout_threshold,
            jobs=jobs,
            verbose=verbose,
            quiet=quiet,
        )
        super().set_options(**options, **kwargs)

    def _build_check_command(self, files: list[str]) -> list[str]:
        """Build the semgrep check command.

        Flags are emitted in a fixed order: output format, config, exclude
        patterns, include patterns, severity, per-file timeout, jobs,
        verbose, quiet, and finally the scan targets.

        Args:
            files: List of files to check.

        Returns:
            List of command arguments.
        """
        cmd: list[str] = self._get_executable_command("semgrep") + ["scan"]

        # Output format - always use JSON for reliable parsing
        cmd.append(f"--{SEMGREP_OUTPUT_FORMAT}")

        # Config option (required for semgrep to know what rules to use)
        config_opt = self.options.get("config", SEMGREP_DEFAULT_CONFIG)
        if config_opt is not None:
            cmd.extend(["--config", str(config_opt)])

        # Repeatable pattern flags; non-list values are ignored
        for pattern_option in ("exclude", "include"):
            patterns = self.options.get(pattern_option)
            if isinstance(patterns, list):
                for pattern in patterns:
                    cmd.extend([f"--{pattern_option}", str(pattern)])

        # Single-value flags: severity filter, per-file timeout, parallel jobs
        for option_name, flag in (
            ("severity", "--severity"),
            ("timeout_threshold", "--timeout"),
            ("jobs", "--jobs"),
        ):
            value = self.options.get(option_name)
            if value is not None:
                cmd.extend([flag, str(value)])

        # Verbose/quiet flags
        if self.options.get("verbose"):
            cmd.append("--verbose")
        if self.options.get("quiet"):
            cmd.append("--quiet")

        # Add files/directories to scan
        cmd.extend(files)

        return cmd

    def doc_url(self, code: str) -> str | None:
        """Return Semgrep registry URL for the given rule ID.

        Registry rule IDs use dotted notation (e.g.,
        ``python.lang.security.insecure-random``). Custom rules loaded
        from local files typically contain ``/`` and are not in the
        public registry.

        Args:
            code: Semgrep rule ID.

        Returns:
            URL to the Semgrep registry page, or None if the rule is
            empty or appears to be a local/custom rule.
        """
        if not code:
            return None
        if "." in code and "/" not in code:
            return DocUrlTemplate.SEMGREP.format(code=url_quote(code, safe=""))
        return None

    @staticmethod
    def _is_fatal_error(err: object) -> bool:
        """Tell whether a report ``errors`` entry should fail the run.

        Args:
            err: One entry of the report's ``errors`` field.

        Returns:
            True if the entry's level is ``error`` (the default when the
            level is missing), or if the entry is not a JSON object and so
            cannot be classified.
        """
        if not isinstance(err, dict):
            # Unknown shape: fail closed rather than silently ignore it.
            return True
        return bool(err.get("level", "error") == "error")

    @staticmethod
    def _is_partial_parsing(err: object) -> bool:
        """Check if error is a PartialParsing warning.

        Semgrep's error type can be either a string or a list where
        the first element is the error type name.

        Args:
            err: One entry of the report's ``errors`` field.

        Returns:
            True if the entry is a ``warn``-level PartialParsing entry.
        """
        if not isinstance(err, dict) or err.get("level") != "warn":
            return False
        err_type = err.get("type")
        if isinstance(err_type, str):
            return err_type == "PartialParsing"
        if isinstance(err_type, list) and len(err_type) > 0:
            return str(err_type[0]) == "PartialParsing"
        return False

    def check(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Check files with Semgrep for security issues and code quality.

        The run is reported as failed when the subprocess could not be
        executed, when it timed out, when its output cannot be parsed, or
        when the report contains at least one fatal error entry. Partial
        parsing warnings are logged but do not fail the run.

        Args:
            paths: List of file or directory paths to check.
            options: Runtime options that override defaults.

        Returns:
            ToolResult with check results.
        """
        # Use shared preparation for version check, path validation, file discovery
        ctx = self._prepare_execution(paths=paths, options=options)
        if ctx.should_skip:
            return ctx.early_result  # type: ignore[return-value]

        cmd: list[str] = self._build_check_command(files=ctx.rel_files)
        logger.debug(f"[semgrep] Running: {' '.join(cmd[:10])}... (cwd={ctx.cwd})")

        output: str
        execution_failure: bool = False
        try:
            # Note: semgrep returns non-zero exit code when findings exist,
            # so we intentionally ignore the success return value
            _, combined = self._run_subprocess(
                cmd=cmd,
                timeout=ctx.timeout,
                cwd=ctx.cwd,
            )
            output = (combined or "").strip()
        except subprocess.TimeoutExpired:
            timeout_msg = (
                f"Semgrep execution timed out ({ctx.timeout}s limit exceeded).\n\n"
                "This may indicate:\n"
                "  - Large codebase taking too long to process\n"
                "  - Need to increase timeout via --tool-options semgrep:timeout=N"
            )
            return ToolResult(
                name=self.definition.name,
                success=False,
                output=timeout_msg,
                issues_count=0,
            )
        except (OSError, ValueError, RuntimeError) as e:
            logger.error(f"Failed to run Semgrep: {e}")
            output = f"Semgrep failed: {e}"
            execution_failure = True

        # A failed run whose message cannot contain a JSON object has nothing
        # to parse; report the failure text as-is.
        if execution_failure and ("{" not in output or "}" not in output):
            return ToolResult(
                name=self.definition.name,
                success=False,
                output=output,
                issues_count=0,
            )

        # Parse the JSON output
        try:
            semgrep_data = _extract_semgrep_json(raw_text=output)
            issues = parse_semgrep_output(output=json.dumps(semgrep_data))
        except (json.JSONDecodeError, ValueError) as e:
            logger.error(f"Failed to parse semgrep output: {e}")
            return ToolResult(
                name=self.definition.name,
                success=False,
                output=(output or f"Failed to parse semgrep output: {str(e)}"),
                issues_count=0,
            )

        # Check for errors in the response
        # Partial parsing errors (e.g., TypeScript 4.9+ 'satisfies' keyword)
        # are warnings, not fatal errors. Only fail on actual errors.
        # A null or non-list "errors" field is normalized to a list so that
        # a malformed report yields a failure result instead of an exception.
        raw_errors = semgrep_data.get("errors") or []
        if not isinstance(raw_errors, list):
            raw_errors = [raw_errors]

        fatal_errors = [e for e in raw_errors if self._is_fatal_error(e)]
        parsing_warnings = [e for e in raw_errors if self._is_partial_parsing(e)]

        # Log parsing warnings but don't fail
        if parsing_warnings:
            logger.warning(
                "[semgrep] {} file(s) partially parsed (may use unsupported "
                "syntax like TypeScript 4.9+ 'satisfies')",
                len(parsing_warnings),
            )

        has_fatal_errors = execution_failure or len(fatal_errors) > 0

        return ToolResult(
            name=self.definition.name,
            success=not has_fatal_errors,
            # Raw output is only surfaced when something went wrong.
            output=output if has_fatal_errors else None,
            issues_count=len(issues),
            issues=issues,
        )

    def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Semgrep cannot fix issues, only report them.

        Args:
            paths: List of file or directory paths to fix.
            options: Tool-specific options.

        Returns:
            ToolResult: Never returns, always raises NotImplementedError.

        Raises:
            NotImplementedError: Semgrep does not support fixing issues.
        """
        raise NotImplementedError(
            "Semgrep cannot automatically fix security issues. Run 'lintro check' to "
            "see issues.",
        )