Coverage for lintro / parsers / cargo_audit / cargo_audit_parser.py: 84%
70 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Parser for cargo-audit JSON output."""
3from __future__ import annotations
5import json
6from typing import Any
8from loguru import logger
10from lintro.parsers.base_parser import validate_str_field
11from lintro.parsers.cargo_audit.cargo_audit_issue import CargoAuditIssue
def _extract_cargo_audit_json(raw_text: str) -> dict[str, Any]:
    """Extract cargo-audit's JSON object from output text.

    cargo-audit emits a single top-level JSON object, but the captured text
    may carry extra content before or after it. Rather than slicing from the
    first '{' to the last '}' (which breaks as soon as the surrounding text
    contains a brace of its own), the object is decoded in place with
    ``json.JSONDecoder.raw_decode``, which stops at the end of the first
    complete JSON value and ignores whatever follows.

    Decoding is attempted at the first '{' in the text and then, if that
    fails, at every later '{' that begins a line. Only line-initial braces
    are retried so that a nested fragment of a truncated payload is not
    mistaken for the top-level object.

    Args:
        raw_text: Raw stdout/stderr text from cargo-audit.

    Returns:
        dict[str, Any]: Parsed JSON object.

    Raises:
        json.JSONDecodeError: If the text is empty or no candidate position
            decodes to a JSON object (the first decode error is re-raised).
        ValueError: If no JSON object boundaries are found.
    """
    if not raw_text or not raw_text.strip():
        raise json.JSONDecodeError("Empty output", raw_text or "", 0)

    text: str = raw_text.strip()

    start: int = text.find("{")
    end: int = text.rfind("}")
    if start == -1 or end == -1 or end < start:
        raise ValueError("Could not locate JSON object in cargo-audit output")

    decoder = json.JSONDecoder()
    first_error: json.JSONDecodeError | None = None
    position: int = start
    while True:
        try:
            # A value that starts at '{' can only decode to a JSON object.
            parsed: dict[str, Any] = decoder.raw_decode(text, position)[0]
        except json.JSONDecodeError as exc:
            if first_error is None:
                first_error = exc
        else:
            return parsed

        # Next candidate: a '{' that is the first character of a later line.
        line_break: int = text.find("\n{", position)
        if line_break == -1:
            break
        position = line_break + 1

    # The loop body ran at least once and only falls through after a failure,
    # so first_error is always set here.
    raise first_error
def _normalize_severity(severity: str | None) -> str:
    """Normalize severity level to uppercase standard format.

    The value comes straight from decoded JSON, so it is not guaranteed to be
    a string. Anything that is not a string (``None``, numbers, lists,
    objects) is reported as ``"UNKNOWN"`` instead of raising, so one
    malformed advisory cannot abort parsing of the whole report.

    Args:
        severity: Raw severity value from cargo-audit.

    Returns:
        Normalized severity string (UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL).
    """
    if not isinstance(severity, str):
        return "UNKNOWN"

    normalized = severity.upper().strip()

    # Map common variations (including RustSec's "none" severity level).
    # Empty or unrecognized strings fall through to "UNKNOWN".
    severity_map = {
        "LOW": "LOW",
        "MEDIUM": "MEDIUM",
        "HIGH": "HIGH",
        "CRITICAL": "CRITICAL",
        "INFO": "LOW",
        "INFORMATIONAL": "LOW",
        "MODERATE": "MEDIUM",
        "SEVERE": "HIGH",
        "NONE": "LOW",
    }

    return severity_map.get(normalized, "UNKNOWN")
def parse_cargo_audit_output(
    output: str | None,
) -> list[CargoAuditIssue]:
    """Parse cargo-audit JSON output into CargoAuditIssue objects.

    The vulnerabilities are read from ``data["vulnerabilities"]["list"]``.
    Each entry is expected to hold an ``advisory`` object (id, title,
    description, severity, url) and a ``package`` object (name, version).
    Parsing is best-effort: unparseable output yields an empty list, and a
    malformed entry is skipped with a warning rather than failing the run.

    Args:
        output: Raw JSON output from cargo-audit --json command.

    Returns:
        List of parsed security vulnerability issues. Empty when the output
        is missing, is not valid JSON, or does not have the expected shape.
    """
    if not output:
        return []

    try:
        data = _extract_cargo_audit_json(output)
    except (json.JSONDecodeError, ValueError) as e:
        logger.warning(f"Failed to parse cargo-audit JSON output: {e}")
        return []

    if not isinstance(data, dict):
        logger.warning("cargo-audit output is not a dictionary")
        return []

    # Extract vulnerabilities from the nested structure
    vulnerabilities = data.get("vulnerabilities", {})
    if not isinstance(vulnerabilities, dict):
        return []

    vuln_list = vulnerabilities.get("list", [])
    if not isinstance(vuln_list, list):
        return []

    issues: list[CargoAuditIssue] = []

    for vuln in vuln_list:
        if not isinstance(vuln, dict):
            continue

        try:
            # Extract advisory information
            advisory = vuln.get("advisory", {})
            if not isinstance(advisory, dict):
                continue

            advisory_id = validate_str_field(
                advisory.get("id"),
                "advisory_id",
                log_warning=True,
            )

            # Skip if no advisory ID
            if not advisory_id:
                logger.warning("Skipping vulnerability with empty advisory ID")
                continue

            title = validate_str_field(advisory.get("title"), "title")
            description = validate_str_field(advisory.get("description"), "description")
            severity = _normalize_severity(advisory.get("severity"))
            url = validate_str_field(advisory.get("url"), "url")

            # Extract package information; a missing or malformed package
            # object degrades to empty lookups instead of dropping the issue.
            package = vuln.get("package", {})
            if not isinstance(package, dict):
                package = {}

            package_name = validate_str_field(package.get("name"), "package_name")
            package_version = validate_str_field(
                package.get("version"),
                "package_version",
            )

            issue = CargoAuditIssue(
                file="Cargo.lock",
                line=0,  # cargo-audit doesn't provide line numbers
                column=0,
                advisory_id=advisory_id,
                package_name=package_name,
                package_version=package_version,
                severity=severity,
                title=title,
                description=description,
                url=url,
            )
            issues.append(issue)

        # AttributeError is included because field values come from untrusted
        # JSON: a non-string value reaching a string method (e.g. a numeric
        # "severity") must skip this entry, not abort the whole parse.
        except (AttributeError, KeyError, TypeError, ValueError) as e:
            logger.warning(f"Failed to parse cargo-audit vulnerability: {e}")
            continue

    return issues