Coverage for lintro / parsers / semgrep / semgrep_parser.py: 90%

63 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Semgrep output parser for security and code quality findings.""" 

2 

3from __future__ import annotations 

4 

5import json 

6from typing import Any 

7 

8from loguru import logger 

9 

10from lintro.parsers.base_parser import ( 

11 extract_dict_field, 

12 extract_int_field, 

13 extract_str_field, 

14 validate_str_field, 

15) 

16from lintro.parsers.semgrep.semgrep_issue import SemgrepIssue 

17 

18 

def _parse_single_result(result: dict[str, Any]) -> SemgrepIssue | None:
    """Convert one raw Semgrep finding into a ``SemgrepIssue``.

    Args:
        result: Dictionary describing a single Semgrep finding.

    Returns:
        The populated issue, or None when ``check_id``, ``path`` or a
        non-zero start line cannot be obtained from ``result``.
    """
    # Both identifiers are validated up front so each can log its own warning.
    rule_id = validate_str_field(
        value=result.get("check_id"),
        field_name="check_id",
        log_warning=True,
    )
    file_path = validate_str_field(
        value=result.get("path"),
        field_name="path",
        log_warning=True,
    )
    if not (rule_id and file_path):
        logger.warning("Skipping issue with missing check_id or path")
        return None

    # The start position lives in a nested "start" mapping.
    start_pos = extract_dict_field(data=result, candidates=["start"])
    start_line = extract_int_field(data=start_pos, candidates=["line"], default=0)
    start_col = extract_int_field(data=start_pos, candidates=["col"], default=0)

    # A start line is mandatory; 0 is the "not found" default used above.
    has_start_line = start_line is not None and start_line != 0
    if not has_start_line:
        logger.warning("Skipping issue with missing or invalid line number")
        return None

    # The end position is optional and falls back to 0 for both coordinates.
    end_pos = extract_dict_field(data=result, candidates=["end"])
    stop_line = extract_int_field(data=end_pos, candidates=["line"], default=0)
    stop_col = extract_int_field(data=end_pos, candidates=["col"], default=0)

    # Message and severity are nested under "extra".
    extra_info = extract_dict_field(data=result, candidates=["extra"])
    text = extract_str_field(data=extra_info, candidates=["message"], default="")
    level = extract_str_field(
        data=extra_info,
        candidates=["severity"],
        default="WARNING",
    )

    # Rule metadata is nested one level deeper, under "extra.metadata".
    meta = extract_dict_field(data=extra_info, candidates=["metadata"])
    if not isinstance(meta, dict):
        meta = {}
    rule_category = extract_str_field(data=meta, candidates=["category"], default="")

    # "cwe" may be a single string, a list, or absent/another type (-> None).
    raw_cwe = meta.get("cwe")
    cwe_ids: list[str] | None
    if isinstance(raw_cwe, str):
        cwe_ids = [raw_cwe]
    elif isinstance(raw_cwe, list):
        cwe_ids = [str(entry) for entry in raw_cwe if entry is not None]
    else:
        cwe_ids = None

    # Severity is normalised to upper case; an empty value becomes "WARNING".
    if level:
        normalized_severity = level.upper()
    else:
        normalized_severity = "WARNING"

    return SemgrepIssue(
        file=file_path,
        line=start_line,
        column=start_col or 0,
        message=text,
        check_id=rule_id,
        end_line=stop_line or 0,
        end_column=stop_col or 0,
        severity=normalized_severity,
        category=rule_category,
        cwe=cwe_ids,
        metadata=meta or None,
    )

92 

93 

def parse_semgrep_output(output: str | None) -> list[SemgrepIssue]:
    """Parse Semgrep JSON output into SemgrepIssue objects.

    The top-level JSON value must be an object whose ``results`` key (if
    present) holds a list. Each dict entry of that list is handed to
    ``_parse_single_result``; entries that are not dicts, that yield None,
    or that raise KeyError/TypeError/ValueError are skipped.

    Args:
        output: JSON string from Semgrep output, or None.

    Returns:
        List of parsed security/code quality issues. Returns empty list for
        None, empty/whitespace-only string, invalid JSON, or unexpected data
        structure.
    """
    if output is None or not output.strip():
        return []

    try:
        data = json.loads(output)
    except json.JSONDecodeError as e:
        logger.warning(f"Failed to parse Semgrep JSON output: {e}")
        return []

    if not isinstance(data, dict):
        # loguru interpolates positional args with str.format-style "{}"
        # placeholders; a "%s" placeholder would be emitted literally.
        logger.warning(
            "Semgrep output must be a JSON object, got {}",
            type(data).__name__,
        )
        return []

    results = data.get("results", [])
    if not isinstance(results, list):
        logger.warning(
            "Semgrep results must be a list, got {}",
            type(results).__name__,
        )
        return []

    issues: list[SemgrepIssue] = []

    for result in results:
        if not isinstance(result, dict):
            logger.debug("Skipping non-dict item in Semgrep results")
            continue

        # One malformed finding must not discard the rest of the report.
        try:
            issue = _parse_single_result(result=result)
            if issue is not None:
                issues.append(issue)
        except (KeyError, TypeError, ValueError) as e:
            logger.warning(f"Failed to parse Semgrep issue: {e}")
            continue

    return issues

143 return issues