Coverage for lintro / tools / definitions / semgrep.py: 91%

137 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Semgrep tool definition. 

2 

3Semgrep is a fast, open-source static analysis tool for finding bugs and 

4enforcing code standards. It supports 30+ languages using pattern-based rules 

5and is commonly used for security scanning and code quality enforcement. 

6""" 

7 

8from __future__ import annotations 

9 

10import json 

11import subprocess # nosec B404 - used safely with shell disabled 

12from dataclasses import dataclass 

13from typing import Any 

14from urllib.parse import quote as url_quote 

15 

16from loguru import logger 

17 

18from lintro._tool_versions import get_min_version 

19from lintro.enums.doc_url_template import DocUrlTemplate 

20from lintro.enums.semgrep_enums import SemgrepSeverity, normalize_semgrep_severity 

21from lintro.enums.tool_name import ToolName 

22from lintro.enums.tool_type import ToolType 

23from lintro.models.core.tool_result import ToolResult 

24from lintro.parsers.semgrep.semgrep_parser import parse_semgrep_output 

25from lintro.plugins.base import BaseToolPlugin 

26from lintro.plugins.protocol import ToolDefinition 

27from lintro.plugins.registry import register_tool 

28from lintro.tools.core.option_validators import ( 

29 filter_none_options, 

30 validate_list, 

31 validate_str, 

32) 

33 

# Defaults shared by the Semgrep tool definition and command builder.

# Overall run timeout in seconds; Semgrep needs more time on larger codebases.
SEMGREP_DEFAULT_TIMEOUT: int = 300

# High priority because Semgrep is used as a security tool.
SEMGREP_DEFAULT_PRIORITY: int = 85

# Glob patterns (one per supported file extension) for files Semgrep scans.
SEMGREP_FILE_PATTERNS: list[str] = [
    f"*.{extension}"
    for extension in (
        "py",
        "js",
        "ts",
        "jsx",
        "tsx",
        "go",
        "java",
        "rb",
        "php",
        "c",
        "cpp",
        "rs",
    )
]

# Output format name; the command builder turns it into the ``--json`` flag.
SEMGREP_OUTPUT_FORMAT: str = "json"

# Value passed to ``--config`` when the caller does not choose a ruleset.
SEMGREP_DEFAULT_CONFIG: str = "auto"

53 

54 

def _extract_semgrep_json(raw_text: str) -> dict[str, Any]:
    """Extract Semgrep's JSON report from mixed stdout/stderr text.

    Semgrep may print informational lines alongside the JSON report, and
    those lines can themselves contain braces. The text spanning the first
    ``{`` to the last ``}`` is tried first. If that span is not valid JSON,
    every ``{`` is tried as a possible start of the report and the longest
    JSON object that decodes is returned.

    Args:
        raw_text: Combined stdout+stderr text from Semgrep.

    Returns:
        Parsed JSON object.

    Raises:
        json.JSONDecodeError: If the output is empty, or contains braces but
            no JSON object can be decoded from it.
        ValueError: If no JSON object boundaries are found.
    """
    if not raw_text or not raw_text.strip():
        raise json.JSONDecodeError("Empty output", raw_text or "", 0)

    text: str = raw_text.strip()

    start: int = text.find("{")
    end: int = text.rfind("}")
    if start == -1 or end == -1 or end < start:
        raise ValueError("Could not locate JSON object in Semgrep output")

    # Common case: everything between the outermost braces is the report
    # (this also covers output that is nothing but JSON).
    try:
        parsed: dict[str, Any] = json.loads(text[start : end + 1])
        return parsed
    except json.JSONDecodeError as exc:
        # Keep the error: it is re-raised if the fallback finds nothing.
        span_error = exc

    # Fallback: log lines around the report contain braces of their own
    # (e.g. "Scanning {3} files"). Decode from each "{" in turn and keep the
    # longest object, since the report dwarfs any incidental "{}" snippet.
    decoder = json.JSONDecoder()
    best: dict[str, Any] | None = None
    best_length = 0
    position = start
    while position != -1:
        try:
            candidate, stop = decoder.raw_decode(text, position)
        except json.JSONDecodeError:
            position = text.find("{", position + 1)
            continue
        if isinstance(candidate, dict) and stop - position > best_length:
            best, best_length = candidate, stop - position
        # Resume after the decoded object so nested braces are not rescanned.
        position = text.find("{", stop)

    if best is None:
        raise span_error
    return best

90 

91 

@register_tool
@dataclass
class SemgrepPlugin(BaseToolPlugin):
    """Semgrep static analysis and security scanning plugin.

    This plugin integrates Semgrep with Lintro for finding security
    vulnerabilities and enforcing code standards across multiple languages.
    Semgrep only reports findings; ``fix`` always raises.
    """

    @property
    def definition(self) -> ToolDefinition:
        """Return the tool definition.

        Returns:
            ToolDefinition containing tool metadata.
        """
        return ToolDefinition(
            name="semgrep",
            description=(
                "Fast, open-source static analysis tool for finding bugs "
                "and enforcing code standards"
            ),
            can_fix=False,
            tool_type=ToolType.LINTER | ToolType.SECURITY,
            file_patterns=SEMGREP_FILE_PATTERNS,
            priority=SEMGREP_DEFAULT_PRIORITY,
            conflicts_with=[],
            native_configs=[".semgrep.yaml", ".semgrep.yml", ".semgrep/"],
            version_command=["semgrep", "--version"],
            min_version=get_min_version(ToolName.SEMGREP),
            default_options={
                "timeout": SEMGREP_DEFAULT_TIMEOUT,
                "config": SEMGREP_DEFAULT_CONFIG,
                "exclude": None,
                "include": None,
                "severity": None,
                "timeout_threshold": None,
                "jobs": None,
                "verbose": False,
                "quiet": False,
            },
            default_timeout=SEMGREP_DEFAULT_TIMEOUT,
        )

    def set_options(
        self,
        config: str | None = None,
        exclude: list[str] | None = None,
        include: list[str] | None = None,
        severity: str | SemgrepSeverity | None = None,
        timeout_threshold: int | None = None,
        jobs: int | None = None,
        verbose: bool | None = None,
        quiet: bool | None = None,
        **kwargs: Any,
    ) -> None:
        """Set Semgrep-specific options.

        Options left as ``None`` are dropped before being forwarded to the
        base class, so they do not override existing values.

        Args:
            config: Config string (auto, p/python, p/javascript, path to YAML).
            exclude: Patterns to exclude from scanning.
            include: Patterns to include in scanning.
            severity: Minimum severity level (INFO, WARNING, ERROR).
            timeout_threshold: Per-file timeout in seconds.
            jobs: Number of parallel jobs.
            verbose: Verbose output.
            quiet: Quiet mode.
            **kwargs: Other tool options.

        Raises:
            ValueError: If an option value is invalid.
        """
        validate_str(config, "config")
        validate_list(exclude, "exclude")
        validate_list(include, "include")

        # Store the severity as its enum member name so it can be passed
        # straight to ``--severity`` later.
        severity_str: str | None = None
        if severity is not None:
            severity_str = normalize_semgrep_severity(severity).name

        if timeout_threshold is not None and (
            not isinstance(timeout_threshold, int) or timeout_threshold < 0
        ):
            raise ValueError("timeout_threshold must be a non-negative integer")

        if jobs is not None and (not isinstance(jobs, int) or jobs < 1):
            raise ValueError("jobs must be a positive integer")

        options = filter_none_options(
            config=config,
            exclude=exclude,
            include=include,
            severity=severity_str,
            timeout_threshold=timeout_threshold,
            jobs=jobs,
            verbose=verbose,
            quiet=quiet,
        )
        super().set_options(**options, **kwargs)

    def _build_check_command(self, files: list[str]) -> list[str]:
        """Build the semgrep check command.

        Args:
            files: List of files to check.

        Returns:
            List of command arguments.
        """
        cmd: list[str] = self._get_executable_command("semgrep") + ["scan"]

        # Output format - always use JSON for reliable parsing
        cmd.append(f"--{SEMGREP_OUTPUT_FORMAT}")

        # Config option (required for semgrep to know what rules to use)
        config_opt = self.options.get("config", SEMGREP_DEFAULT_CONFIG)
        if config_opt is not None:
            cmd.extend(["--config", str(config_opt)])

        # Exclude patterns (one flag per pattern; non-list values are ignored)
        exclude_opt = self.options.get("exclude")
        if isinstance(exclude_opt, list):
            for pattern in exclude_opt:
                cmd.extend(["--exclude", str(pattern)])

        # Include patterns (one flag per pattern; non-list values are ignored)
        include_opt = self.options.get("include")
        if isinstance(include_opt, list):
            for pattern in include_opt:
                cmd.extend(["--include", str(pattern)])

        # Severity filter
        severity_opt = self.options.get("severity")
        if severity_opt is not None:
            cmd.extend(["--severity", str(severity_opt)])

        # Per-file timeout (distinct from the overall subprocess timeout)
        timeout_threshold_opt = self.options.get("timeout_threshold")
        if timeout_threshold_opt is not None:
            cmd.extend(["--timeout", str(timeout_threshold_opt)])

        # Parallel jobs
        jobs_opt = self.options.get("jobs")
        if jobs_opt is not None:
            cmd.extend(["--jobs", str(jobs_opt)])

        # Verbose/quiet flags
        if self.options.get("verbose"):
            cmd.append("--verbose")

        if self.options.get("quiet"):
            cmd.append("--quiet")

        # Add files/directories to scan
        cmd.extend(files)

        return cmd

    def doc_url(self, code: str) -> str | None:
        """Return Semgrep registry URL for the given rule ID.

        Registry rule IDs use dotted notation (e.g.,
        ``python.lang.security.insecure-random``). Custom rules loaded
        from local files typically contain ``/`` and are not in the
        public registry.

        Args:
            code: Semgrep rule ID.

        Returns:
            URL to the Semgrep registry page, or None if the rule is
            empty or appears to be a local/custom rule.
        """
        if not code:
            return None
        if "." in code and "/" not in code:
            # Percent-encode everything (safe="") so the ID is one URL segment.
            return DocUrlTemplate.SEMGREP.format(code=url_quote(code, safe=""))
        return None

    @staticmethod
    def _is_partial_parsing(err: Any) -> bool:
        """Check if an entry of Semgrep's ``errors`` list is a PartialParsing warning.

        Semgrep's error type can be either a string or a list where
        the first element is the error type name.

        Args:
            err: One entry from the ``errors`` list of the JSON report.

        Returns:
            True if the entry is a dict with level ``warn`` and type
            ``PartialParsing``, False otherwise.
        """
        if not isinstance(err, dict) or err.get("level") != "warn":
            return False
        err_type = err.get("type")
        if isinstance(err_type, str):
            return err_type == "PartialParsing"
        if isinstance(err_type, list) and len(err_type) > 0:
            return str(err_type[0]) == "PartialParsing"
        return False

    def check(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Check files with Semgrep for security issues and code quality.

        Args:
            paths: List of file or directory paths to check.
            options: Runtime options that override defaults.

        Returns:
            ToolResult with check results.
        """
        # Use shared preparation for version check, path validation, file discovery
        ctx = self._prepare_execution(paths=paths, options=options)
        if ctx.should_skip:
            return ctx.early_result  # type: ignore[return-value]

        cmd: list[str] = self._build_check_command(files=ctx.rel_files)
        logger.debug(f"[semgrep] Running: {' '.join(cmd[:10])}... (cwd={ctx.cwd})")

        output: str
        execution_failure: bool = False
        try:
            # Note: semgrep returns non-zero exit code when findings exist,
            # so we intentionally ignore the success return value
            _, combined = self._run_subprocess(
                cmd=cmd,
                timeout=ctx.timeout,
                cwd=ctx.cwd,
            )
            output = (combined or "").strip()
        except subprocess.TimeoutExpired:
            timeout_msg = (
                f"Semgrep execution timed out ({ctx.timeout}s limit exceeded).\n\n"
                "This may indicate:\n"
                "  - Large codebase taking too long to process\n"
                "  - Need to increase timeout via --tool-options semgrep:timeout=N"
            )
            return ToolResult(
                name=self.definition.name,
                success=False,
                output=timeout_msg,
                issues_count=0,
            )
        except (OSError, ValueError, RuntimeError) as e:
            logger.error(f"Failed to run Semgrep: {e}")
            output = f"Semgrep failed: {e}"
            execution_failure = True

        # Parse the JSON output
        try:
            # A failed run whose message has no braces cannot contain a
            # report, so return the failure without attempting to parse.
            if ("{" not in output or "}" not in output) and execution_failure:
                return ToolResult(
                    name=self.definition.name,
                    success=False,
                    output=output,
                    issues_count=0,
                )

            semgrep_data = _extract_semgrep_json(raw_text=output)
            # Re-serialize so the parser sees only the JSON report, without
            # any surrounding log lines.
            json_output = json.dumps(semgrep_data)
            issues = parse_semgrep_output(output=json_output)

            # Check for errors in the response
            # Partial parsing errors (e.g., TypeScript 4.9+ 'satisfies' keyword)
            # are warnings, not fatal errors. Only fail on actual errors.
            # ``errors`` may be missing, null, or not a list; treat all of
            # those as "no errors" instead of crashing while iterating.
            raw_errors = semgrep_data.get("errors")
            errors: list[Any] = raw_errors if isinstance(raw_errors, list) else []

            # Entries without a level default to "error"; entries that are
            # not dicts carry no level at all and are likewise counted fatal.
            fatal_errors = [
                e
                for e in errors
                if not isinstance(e, dict) or e.get("level", "error") == "error"
            ]
            parsing_warnings = [e for e in errors if self._is_partial_parsing(e)]

            # Log parsing warnings but don't fail
            if parsing_warnings:
                logger.warning(
                    "[semgrep] {} file(s) partially parsed (may use unsupported "
                    "syntax like TypeScript 4.9+ 'satisfies')",
                    len(parsing_warnings),
                )

            failed = execution_failure or len(fatal_errors) > 0

            return ToolResult(
                name=self.definition.name,
                success=not failed,
                # Raw output is only surfaced when something went wrong.
                output=output if failed else None,
                issues_count=len(issues),
                issues=issues,
            )

        except (json.JSONDecodeError, ValueError) as e:
            logger.error(f"Failed to parse semgrep output: {e}")
            return ToolResult(
                name=self.definition.name,
                success=False,
                output=(output or f"Failed to parse semgrep output: {str(e)}"),
                issues_count=0,
            )

    def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Semgrep cannot fix issues, only report them.

        Args:
            paths: List of file or directory paths to fix.
            options: Tool-specific options.

        Returns:
            ToolResult: Never returns, always raises NotImplementedError.

        Raises:
            NotImplementedError: Semgrep does not support fixing issues.
        """
        raise NotImplementedError(
            "Semgrep cannot automatically fix security issues. Run 'lintro check' to "
            "see issues.",
        )