Coverage for lintro / parsers / cargo_deny / cargo_deny_parser.py: 82%
87 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Parser for cargo-deny JSON output."""
3from __future__ import annotations
5import json
6from typing import Any
8from loguru import logger
10from lintro.parsers.cargo_deny.cargo_deny_issue import CargoDenyIssue
def _extract_crate_info(labels: list[dict[str, Any]]) -> tuple[str | None, str | None]:
    """Extract crate name and version from diagnostic labels.

    The first label whose message starts with ``"crate "`` decides the
    result; labels after it are not consulted.

    Args:
        labels: List of label objects from the diagnostic. Entries that are
            not dicts are skipped instead of raising.

    Returns:
        Tuple of (crate_name, crate_version). An element is None when the
        corresponding part is absent or empty; (None, None) is returned when
        no label carries crate information.
    """
    prefix = "crate "
    for label in labels:
        # Labels originate from external JSON. A non-dict entry has no
        # ``.get`` and would raise AttributeError, which the callers in this
        # module do not catch, so skip it.
        if not isinstance(label, dict):
            continue

        message = label.get("message", "")
        if not isinstance(message, str) or not message.startswith(prefix):
            continue

        # Format: "crate foo" or "crate foo@1.0.0"
        crate_info = message[len(prefix):].strip()
        if not crate_info:
            return None, None

        name, separator, version = crate_info.partition("@")
        if separator:
            return name.strip() or None, version.strip() or None
        return crate_info, None

    return None, None
def _parse_diagnostic(item: dict[str, Any]) -> CargoDenyIssue | None:
    """Build a CargoDenyIssue from one diagnostic record.

    Args:
        item: A diagnostic JSON object; its "fields" entry holds the data.

    Returns:
        The parsed CargoDenyIssue, or None when "fields" is not a dict, when
        "severity" is missing, empty or not a string, or when a KeyError,
        TypeError or ValueError occurs while parsing.
    """
    try:
        payload = item.get("fields", {})
        if not isinstance(payload, dict):
            return None

        raw_severity = payload.get("severity", "")
        if not (isinstance(raw_severity, str) and raw_severity):
            return None

        raw_code = payload.get("code", "")
        raw_message = payload.get("message", "")
        raw_labels = payload.get("labels", [])

        # Crate details are only looked up when labels is really a list.
        if isinstance(raw_labels, list):
            crate_name, crate_version = _extract_crate_info(raw_labels)
        else:
            crate_name, crate_version = None, None

        return CargoDenyIssue(
            file="Cargo.toml",  # cargo-deny operates at project level
            line=0,  # No line information available
            column=0,
            code=str(raw_code) if raw_code else "",
            severity=raw_severity.lower(),  # severities are stored lowercase
            message=str(raw_message) if raw_message else "",
            crate_name=crate_name,
            crate_version=crate_version,
        )
    except (KeyError, TypeError, ValueError) as e:
        logger.debug(f"Failed to parse cargo-deny diagnostic: {e}")
        return None
def _parse_advisory(item: dict[str, Any]) -> CargoDenyIssue | None:
    """Build a CargoDenyIssue from one advisory record.

    Args:
        item: An advisory JSON object; its "fields" entry holds the data.

    Returns:
        The parsed CargoDenyIssue, or None when "fields" or its "advisory"
        entry is not a dict, or when a KeyError, TypeError or ValueError
        occurs while parsing.
    """
    try:
        payload = item.get("fields", {})
        if not isinstance(payload, dict):
            return None

        advisory = payload.get("advisory", {})
        if not isinstance(advisory, dict):
            return None

        advisory_id = advisory.get("id", "")
        advisory_severity = advisory.get("severity", "")

        # Patched versions stay None unless "versions" is a dict whose
        # "patched" entry is a list; falsy entries are dropped.
        patched = None
        versions = payload.get("versions", {})
        if isinstance(versions, dict):
            candidates = versions.get("patched", [])
            if isinstance(candidates, list):
                patched = [str(entry) for entry in candidates if entry]

        # Crate information comes from the "package" field when it is a dict.
        package = payload.get("package", {})
        if isinstance(package, dict):
            pkg_name = package.get("name")
            pkg_version = package.get("version")
        else:
            pkg_name = None
            pkg_version = None

        return CargoDenyIssue(
            file="Cargo.toml",
            line=0,
            column=0,
            # Fall back to a generic code when the advisory carries no id.
            code=str(advisory_id) if advisory_id else "ADVISORY",
            severity="error",  # Advisories are treated as errors
            crate_name=str(pkg_name) if pkg_name else None,
            crate_version=str(pkg_version) if pkg_version else None,
            advisory_id=str(advisory_id) if advisory_id else None,
            advisory_severity=str(advisory_severity) if advisory_severity else None,
            patched_versions=patched,
        )
    except (KeyError, TypeError, ValueError) as e:
        logger.debug(f"Failed to parse cargo-deny advisory: {e}")
        return None
def parse_cargo_deny_output(output: str) -> list[CargoDenyIssue]:
    """Convert cargo-deny JSON Lines output into CargoDenyIssue objects.

    Each non-empty line is expected to hold one JSON object. Objects whose
    "type" is "diagnostic" or "advisory" are handed to the matching parser;
    every other type (e.g. "summary") is ignored. Lines that do not start
    with "{" are skipped, and lines that fail to decode are logged at debug
    level and skipped.

    Args:
        output: Raw stdout emitted by cargo deny check --format json.

    Returns:
        A list of CargoDenyIssue instances, in input order. The list is
        empty when the output is blank or yields no parseable issues.
    """
    if not (output and output.strip()):
        return []

    collected: list[CargoDenyIssue] = []

    for raw_line in output.splitlines():
        line = raw_line.strip()
        # An empty string never starts with "{", so this also drops blanks.
        if not line.startswith("{"):
            continue

        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            logger.debug(f"Failed to decode cargo-deny JSON line: {exc}: {line}")
            continue

        if not isinstance(record, dict):
            continue

        record_type = record.get("type", "")
        if record_type == "diagnostic":
            parse_record = _parse_diagnostic
        elif record_type == "advisory":
            parse_record = _parse_advisory
        else:
            # Ignore other types like "summary", "build", etc.
            continue

        issue = parse_record(record)
        if issue is not None:
            collected.append(issue)

    return collected