Coverage for lintro / parsers / osv_scanner / osv_scanner_parser.py: 83%
115 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Parser for OSV-Scanner JSON output."""
3from __future__ import annotations
5import json
6from typing import Any
8from loguru import logger
10from lintro.parsers.base_parser import extract_str_field, validate_str_field
11from lintro.parsers.osv_scanner.osv_scanner_issue import OsvScannerIssue
13# Severity ranking for selecting the highest severity from a vulnerability's
14# database entries. These raw strings are later normalized to SeverityLevel
15# (ERROR/WARNING/INFO) by BaseIssue.get_severity() via _SEVERITY_ALIASES.
16# We need the finer-grained ranking here because the canonical enum collapses
17# CRITICAL and HIGH into the same ERROR level.
18_SEVERITY_RANK: dict[str, int] = {
19 "CRITICAL": 4,
20 "HIGH": 3,
21 "MEDIUM": 2,
22 "LOW": 1,
23}
26def _highest_severity(group: dict[str, Any]) -> str:
27 """Extract the severity from a vulnerability group.
29 OSV-Scanner v2 groups vulnerabilities and may include CVSS severity
30 in the group's max_severity field.
32 Args:
33 group: A single group dictionary from OSV-Scanner output.
35 Returns:
36 Severity string from the group, defaults to "MEDIUM".
37 """
38 max_sev = group.get("max_severity")
39 if isinstance(max_sev, str):
40 sev_upper = max_sev.upper()
41 if sev_upper in _SEVERITY_RANK:
42 return sev_upper
43 return "MEDIUM"
46def _extract_fixed_version(
47 vuln_detail: dict[str, Any],
48 package_name: str,
49 package_ecosystem: str,
50) -> str:
51 """Extract the fixed version from a vulnerability's affected data.
53 Args:
54 vuln_detail: The full vulnerability object from OSV database.
55 package_name: Package name to match.
56 package_ecosystem: Ecosystem to match.
58 Returns:
59 Fixed version string, or empty string if not found.
60 """
61 affected = vuln_detail.get("affected", [])
62 if not isinstance(affected, list):
63 return ""
65 for entry in affected:
66 if not isinstance(entry, dict):
67 continue
68 pkg = entry.get("package", {})
69 if not isinstance(pkg, dict):
70 continue
71 if pkg.get("name") != package_name:
72 continue
73 if pkg.get("ecosystem", "").upper() != package_ecosystem.upper():
74 continue
75 ranges = entry.get("ranges", [])
76 if not isinstance(ranges, list):
77 continue
78 for r in ranges:
79 if not isinstance(r, dict):
80 continue
81 events = r.get("events", [])
82 if not isinstance(events, list):
83 continue
84 for event in events:
85 if isinstance(event, dict) and "fixed" in event:
86 return str(event["fixed"])
87 return ""
90def _parse_single_result(result: dict[str, Any]) -> list[OsvScannerIssue]:
91 """Parse a single OSV-Scanner result into issues.
93 Each result corresponds to a package source (lockfile) and may contain
94 multiple vulnerability groups, each with multiple vulnerability IDs.
96 Args:
97 result: Dictionary containing a single OSV-Scanner result.
99 Returns:
100 List of OsvScannerIssue objects parsed from this result.
101 """
102 source = result.get("source", {})
103 if not isinstance(source, dict):
104 return []
105 source_path = extract_str_field(
106 data=source,
107 candidates=["path"],
108 default="lockfile",
109 )
111 packages = result.get("packages", [])
112 if not isinstance(packages, list):
113 return []
115 issues: list[OsvScannerIssue] = []
117 for pkg_entry in packages:
118 if not isinstance(pkg_entry, dict):
119 continue
121 package = pkg_entry.get("package", {})
122 if not isinstance(package, dict):
123 continue
125 pkg_name = validate_str_field(
126 package.get("name"),
127 "package_name",
128 log_warning=True,
129 )
130 if not pkg_name:
131 continue
133 pkg_version = extract_str_field(
134 data=package,
135 candidates=["version"],
136 default="",
137 )
138 pkg_ecosystem = extract_str_field(
139 data=package,
140 candidates=["ecosystem"],
141 default="",
142 )
144 groups = pkg_entry.get("groups", [])
145 if not isinstance(groups, list):
146 groups = []
148 vulnerabilities = pkg_entry.get("vulnerabilities", [])
149 if not isinstance(vulnerabilities, list):
150 vulnerabilities = []
152 # Build a lookup for vulnerability details
153 vuln_details: dict[str, dict[str, Any]] = {}
154 for v in vulnerabilities:
155 if isinstance(v, dict) and "id" in v:
156 vuln_details[v["id"]] = v
158 # Each group represents a set of related vulnerability IDs
159 for group in groups:
160 if not isinstance(group, dict):
161 continue
163 vuln_ids = group.get("ids", [])
164 if not isinstance(vuln_ids, list) or not vuln_ids:
165 continue
167 # Use the first ID as the primary
168 primary_id = str(vuln_ids[0])
169 severity = _highest_severity(group)
171 # Try all IDs in the group to find vulnerability details —
172 # the primary ID may not be in the vulnerabilities array
173 # (e.g. a CVE alias when only the GHSA entry has details).
174 detail: dict[str, Any] = {}
175 for vid in vuln_ids:
176 detail = vuln_details.get(str(vid), {})
177 if detail:
178 break
179 fixed = _extract_fixed_version(detail, pkg_name, pkg_ecosystem)
181 issues.append(
182 OsvScannerIssue(
183 file=source_path,
184 line=0,
185 column=0,
186 message="", # __post_init__ builds the message
187 vuln_id=primary_id,
188 severity=severity,
189 package_name=pkg_name,
190 package_version=pkg_version,
191 package_ecosystem=pkg_ecosystem,
192 fixed_version=fixed,
193 ),
194 )
196 return issues
def parse_osv_scanner_output(output: str | None) -> list[OsvScannerIssue]:
    """Parse OSV-Scanner JSON output into OsvScannerIssue objects.

    Only the first JSON value in ``output`` is decoded; any text that
    follows it is ignored. Results that cannot be parsed are logged and
    skipped so that one malformed entry does not discard the others.

    Args:
        output: JSON string from OSV-Scanner output, or None.

    Returns:
        List of parsed vulnerability issues. Returns empty list for
        None, empty string, invalid JSON, or unexpected data structure.
    """
    if output is None or not output.strip():
        return []

    try:
        # Use raw_decode to ignore trailing stderr text that
        # _run_subprocess appends after the JSON stdout.
        decoder = json.JSONDecoder()
        data, _ = decoder.raw_decode(output.lstrip())
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        logger.warning("Failed to parse OSV-Scanner JSON output: {}", e)
        return []

    if not isinstance(data, dict):
        logger.warning(
            "OSV-Scanner output must be a JSON object, got {}",
            type(data).__name__,
        )
        return []

    results = data.get("results", [])
    if not isinstance(results, list):
        logger.warning(
            "OSV-Scanner results must be a list, got {}",
            type(results).__name__,
        )
        return []

    issues: list[OsvScannerIssue] = []

    for result in results:
        if not isinstance(result, dict):
            logger.debug("Skipping non-dict item in OSV-Scanner results")
            continue

        try:
            issues.extend(_parse_single_result(result=result))
        except (AttributeError, KeyError, TypeError, ValueError) as e:
            # AttributeError is included because malformed nested values
            # (e.g. a null where a string is expected) surface as attribute
            # access on the wrong type while walking the result.
            logger.warning("Failed to parse OSV-Scanner result: {}", e)
            continue

    return issues