Coverage for lintro / parsers / semgrep / semgrep_parser.py: 90%
63 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Semgrep output parser for security and code quality findings."""
3from __future__ import annotations
5import json
6from typing import Any
8from loguru import logger
10from lintro.parsers.base_parser import (
11 extract_dict_field,
12 extract_int_field,
13 extract_str_field,
14 validate_str_field,
15)
16from lintro.parsers.semgrep.semgrep_issue import SemgrepIssue
def _parse_single_result(result: dict[str, Any]) -> SemgrepIssue | None:
    """Convert one entry of Semgrep's ``results`` array into a SemgrepIssue.

    Args:
        result: Dictionary holding a single Semgrep finding.

    Returns:
        The populated SemgrepIssue, or None when the finding lacks a usable
        ``check_id``, ``path`` or start line.
    """
    # Both identifiers are mandatory; the validator logs its own warning.
    rule_id = validate_str_field(
        value=result.get("check_id"),
        field_name="check_id",
        log_warning=True,
    )
    file_path = validate_str_field(
        value=result.get("path"),
        field_name="path",
        log_warning=True,
    )
    if not (rule_id and file_path):
        logger.warning("Skipping issue with missing check_id or path")
        return None

    # Start position lives in a nested "start" object.
    start_pos = extract_dict_field(data=result, candidates=["start"])
    start_line = extract_int_field(data=start_pos, candidates=["line"], default=0)
    start_col = extract_int_field(data=start_pos, candidates=["col"], default=0)
    if start_line is None or start_line == 0:
        # A finding without a start line cannot be reported anywhere useful.
        logger.warning("Skipping issue with missing or invalid line number")
        return None

    # End position lives in a nested "end" object.
    end_pos = extract_dict_field(data=result, candidates=["end"])
    stop_line = extract_int_field(data=end_pos, candidates=["line"], default=0)
    stop_col = extract_int_field(data=end_pos, candidates=["col"], default=0)

    # Message and severity sit under "extra".
    extra_info = extract_dict_field(data=result, candidates=["extra"])
    text = extract_str_field(data=extra_info, candidates=["message"], default="")
    level = extract_str_field(
        data=extra_info,
        candidates=["severity"],
        default="WARNING",
    )

    # Rule metadata is nested one level deeper, under extra["metadata"].
    meta = extract_dict_field(data=extra_info, candidates=["metadata"])
    if not isinstance(meta, dict):
        meta = {}
    rule_category = extract_str_field(data=meta, candidates=["category"], default="")

    # CWE may arrive as a list, a single string, or be absent altogether.
    raw_cwe = meta.get("cwe")
    cwe_ids: list[str] | None
    if isinstance(raw_cwe, str):
        cwe_ids = [raw_cwe]
    elif isinstance(raw_cwe, list):
        cwe_ids = [str(entry) for entry in raw_cwe if entry is not None]
    else:
        cwe_ids = None

    return SemgrepIssue(
        file=file_path,
        line=start_line,
        column=start_col or 0,
        message=text,
        check_id=rule_id,
        end_line=stop_line or 0,
        end_column=stop_col or 0,
        severity=level.upper() if level else "WARNING",
        category=rule_category,
        cwe=cwe_ids,
        metadata=meta or None,
    )
def parse_semgrep_output(output: str | None) -> list[SemgrepIssue]:
    """Parse Semgrep JSON output into SemgrepIssue objects.

    Args:
        output: JSON string from Semgrep output, or None.

    Returns:
        List of parsed security/code quality issues. Returns empty list for
        None, empty string, invalid JSON, or unexpected data structure.
        Individual results that are not dictionaries, or that fail to parse,
        are skipped rather than aborting the whole run.
    """
    if output is None or not output.strip():
        return []

    try:
        data = json.loads(output)
    except json.JSONDecodeError as e:
        logger.warning(f"Failed to parse Semgrep JSON output: {e}")
        return []

    if not isinstance(data, dict):
        # loguru formats with str.format-style "{}" placeholders; a "%s"
        # placeholder would be emitted literally and the type name dropped.
        logger.warning(
            "Semgrep output must be a JSON object, got {}",
            type(data).__name__,
        )
        return []

    results = data.get("results", [])
    if not isinstance(results, list):
        logger.warning(
            "Semgrep results must be a list, got {}",
            type(results).__name__,
        )
        return []

    issues: list[SemgrepIssue] = []

    for result in results:
        if not isinstance(result, dict):
            logger.debug("Skipping non-dict item in Semgrep results")
            continue

        try:
            issue = _parse_single_result(result=result)
            if issue is not None:
                issues.append(issue)
        except (KeyError, TypeError, ValueError) as e:
            # One malformed finding must not discard the remaining ones.
            logger.warning(f"Failed to parse Semgrep issue: {e}")
            continue

    return issues