Coverage for lintro / parsers / osv_scanner / osv_scanner_parser.py: 83%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Parser for OSV-Scanner JSON output.""" 

2 

3from __future__ import annotations 

4 

5import json 

6from typing import Any 

7 

8from loguru import logger 

9 

10from lintro.parsers.base_parser import extract_str_field, validate_str_field 

11from lintro.parsers.osv_scanner.osv_scanner_issue import OsvScannerIssue 

12 

13# Severity ranking for selecting the highest severity from a vulnerability's 

14# database entries. These raw strings are later normalized to SeverityLevel 

15# (ERROR/WARNING/INFO) by BaseIssue.get_severity() via _SEVERITY_ALIASES. 

16# We need the finer-grained ranking here because the canonical enum collapses 

17# CRITICAL and HIGH into the same ERROR level. 

18_SEVERITY_RANK: dict[str, int] = { 

19 "CRITICAL": 4, 

20 "HIGH": 3, 

21 "MEDIUM": 2, 

22 "LOW": 1, 

23} 

24 

25 

26def _highest_severity(group: dict[str, Any]) -> str: 

27 """Extract the severity from a vulnerability group. 

28 

29 OSV-Scanner v2 groups vulnerabilities and may include CVSS severity 

30 in the group's max_severity field. 

31 

32 Args: 

33 group: A single group dictionary from OSV-Scanner output. 

34 

35 Returns: 

36 Severity string from the group, defaults to "MEDIUM". 

37 """ 

38 max_sev = group.get("max_severity") 

39 if isinstance(max_sev, str): 

40 sev_upper = max_sev.upper() 

41 if sev_upper in _SEVERITY_RANK: 

42 return sev_upper 

43 return "MEDIUM" 

44 

45 

46def _extract_fixed_version( 

47 vuln_detail: dict[str, Any], 

48 package_name: str, 

49 package_ecosystem: str, 

50) -> str: 

51 """Extract the fixed version from a vulnerability's affected data. 

52 

53 Args: 

54 vuln_detail: The full vulnerability object from OSV database. 

55 package_name: Package name to match. 

56 package_ecosystem: Ecosystem to match. 

57 

58 Returns: 

59 Fixed version string, or empty string if not found. 

60 """ 

61 affected = vuln_detail.get("affected", []) 

62 if not isinstance(affected, list): 

63 return "" 

64 

65 for entry in affected: 

66 if not isinstance(entry, dict): 

67 continue 

68 pkg = entry.get("package", {}) 

69 if not isinstance(pkg, dict): 

70 continue 

71 if pkg.get("name") != package_name: 

72 continue 

73 if pkg.get("ecosystem", "").upper() != package_ecosystem.upper(): 

74 continue 

75 ranges = entry.get("ranges", []) 

76 if not isinstance(ranges, list): 

77 continue 

78 for r in ranges: 

79 if not isinstance(r, dict): 

80 continue 

81 events = r.get("events", []) 

82 if not isinstance(events, list): 

83 continue 

84 for event in events: 

85 if isinstance(event, dict) and "fixed" in event: 

86 return str(event["fixed"]) 

87 return "" 

88 

89 

90def _parse_single_result(result: dict[str, Any]) -> list[OsvScannerIssue]: 

91 """Parse a single OSV-Scanner result into issues. 

92 

93 Each result corresponds to a package source (lockfile) and may contain 

94 multiple vulnerability groups, each with multiple vulnerability IDs. 

95 

96 Args: 

97 result: Dictionary containing a single OSV-Scanner result. 

98 

99 Returns: 

100 List of OsvScannerIssue objects parsed from this result. 

101 """ 

102 source = result.get("source", {}) 

103 if not isinstance(source, dict): 

104 return [] 

105 source_path = extract_str_field( 

106 data=source, 

107 candidates=["path"], 

108 default="lockfile", 

109 ) 

110 

111 packages = result.get("packages", []) 

112 if not isinstance(packages, list): 

113 return [] 

114 

115 issues: list[OsvScannerIssue] = [] 

116 

117 for pkg_entry in packages: 

118 if not isinstance(pkg_entry, dict): 

119 continue 

120 

121 package = pkg_entry.get("package", {}) 

122 if not isinstance(package, dict): 

123 continue 

124 

125 pkg_name = validate_str_field( 

126 package.get("name"), 

127 "package_name", 

128 log_warning=True, 

129 ) 

130 if not pkg_name: 

131 continue 

132 

133 pkg_version = extract_str_field( 

134 data=package, 

135 candidates=["version"], 

136 default="", 

137 ) 

138 pkg_ecosystem = extract_str_field( 

139 data=package, 

140 candidates=["ecosystem"], 

141 default="", 

142 ) 

143 

144 groups = pkg_entry.get("groups", []) 

145 if not isinstance(groups, list): 

146 groups = [] 

147 

148 vulnerabilities = pkg_entry.get("vulnerabilities", []) 

149 if not isinstance(vulnerabilities, list): 

150 vulnerabilities = [] 

151 

152 # Build a lookup for vulnerability details 

153 vuln_details: dict[str, dict[str, Any]] = {} 

154 for v in vulnerabilities: 

155 if isinstance(v, dict) and "id" in v: 

156 vuln_details[v["id"]] = v 

157 

158 # Each group represents a set of related vulnerability IDs 

159 for group in groups: 

160 if not isinstance(group, dict): 

161 continue 

162 

163 vuln_ids = group.get("ids", []) 

164 if not isinstance(vuln_ids, list) or not vuln_ids: 

165 continue 

166 

167 # Use the first ID as the primary 

168 primary_id = str(vuln_ids[0]) 

169 severity = _highest_severity(group) 

170 

171 # Try all IDs in the group to find vulnerability details — 

172 # the primary ID may not be in the vulnerabilities array 

173 # (e.g. a CVE alias when only the GHSA entry has details). 

174 detail: dict[str, Any] = {} 

175 for vid in vuln_ids: 

176 detail = vuln_details.get(str(vid), {}) 

177 if detail: 

178 break 

179 fixed = _extract_fixed_version(detail, pkg_name, pkg_ecosystem) 

180 

181 issues.append( 

182 OsvScannerIssue( 

183 file=source_path, 

184 line=0, 

185 column=0, 

186 message="", # __post_init__ builds the message 

187 vuln_id=primary_id, 

188 severity=severity, 

189 package_name=pkg_name, 

190 package_version=pkg_version, 

191 package_ecosystem=pkg_ecosystem, 

192 fixed_version=fixed, 

193 ), 

194 ) 

195 

196 return issues 

197 

198 

def parse_osv_scanner_output(output: str | None) -> list[OsvScannerIssue]:
    """Parse OSV-Scanner JSON output into OsvScannerIssue objects.

    Only the first JSON value in ``output`` is decoded; any text following
    it is ignored. A result that cannot be parsed is logged and skipped so
    that the remaining results are still reported.

    Args:
        output: JSON string from OSV-Scanner output, or None.

    Returns:
        List of parsed vulnerability issues. Returns empty list for
        None, empty string, invalid JSON, or unexpected data structure.
    """
    if output is None or not output.strip():
        return []

    try:
        # Use raw_decode to ignore trailing stderr text that
        # _run_subprocess appends after the JSON stdout.
        data, _ = json.JSONDecoder().raw_decode(output.lstrip())
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        logger.warning("Failed to parse OSV-Scanner JSON output: {}", e)
        return []

    if not isinstance(data, dict):
        logger.warning(
            "OSV-Scanner output must be a JSON object, got {}",
            type(data).__name__,
        )
        return []

    results = data.get("results", [])
    if not isinstance(results, list):
        logger.warning(
            "OSV-Scanner results must be a list, got {}",
            type(results).__name__,
        )
        return []

    issues: list[OsvScannerIssue] = []

    for result in results:
        if not isinstance(result, dict):
            logger.debug("Skipping non-dict item in OSV-Scanner results")
            continue

        try:
            issues.extend(_parse_single_result(result=result))
        except (AttributeError, KeyError, TypeError, ValueError) as e:
            # AttributeError covers fields that have an unexpected type
            # (e.g. null where a string is expected) deeper in the result.
            logger.warning("Failed to parse OSV-Scanner result: {}", e)

    return issues