Coverage for lintro / parsers / cargo_deny / cargo_deny_parser.py: 82%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Parser for cargo-deny JSON output.""" 

2 

3from __future__ import annotations 

4 

5import json 

6from typing import Any 

7 

8from loguru import logger 

9 

10from lintro.parsers.cargo_deny.cargo_deny_issue import CargoDenyIssue 

11 

12 

13def _extract_crate_info(labels: list[dict[str, Any]]) -> tuple[str | None, str | None]: 

14 """Extract crate name and version from diagnostic labels. 

15 

16 Args: 

17 labels: List of label objects from the diagnostic. 

18 

19 Returns: 

20 Tuple of (crate_name, crate_version). 

21 """ 

22 for label in labels: 

23 message = label.get("message", "") 

24 if isinstance(message, str) and message.startswith("crate "): 

25 # Format: "crate foo" or "crate foo@1.0.0" 

26 crate_info = message[6:].strip() # Remove "crate " prefix 

27 if not crate_info: 

28 return None, None 

29 if "@" in crate_info: 

30 name, version = crate_info.split("@", 1) 

31 return name.strip() or None, version.strip() or None 

32 return crate_info, None 

33 return None, None 

34 

35 

36def _parse_diagnostic(item: dict[str, Any]) -> CargoDenyIssue | None: 

37 """Parse a diagnostic message from cargo-deny output. 

38 

39 Args: 

40 item: A diagnostic JSON object. 

41 

42 Returns: 

43 CargoDenyIssue or None if parsing fails. 

44 """ 

45 try: 

46 fields = item.get("fields", {}) 

47 if not isinstance(fields, dict): 

48 return None 

49 

50 severity = fields.get("severity", "") 

51 code = fields.get("code", "") 

52 message = fields.get("message", "") 

53 labels = fields.get("labels", []) 

54 

55 if not isinstance(severity, str) or not severity: 

56 return None 

57 

58 # Normalize severity to lowercase 

59 severity = severity.lower() 

60 

61 # Extract crate info from labels 

62 crate_name, crate_version = None, None 

63 if isinstance(labels, list): 

64 crate_name, crate_version = _extract_crate_info(labels) 

65 

66 return CargoDenyIssue( 

67 file="Cargo.toml", # cargo-deny operates at project level 

68 line=0, # No line information available 

69 column=0, 

70 code=str(code) if code else "", 

71 severity=severity, 

72 message=str(message) if message else "", 

73 crate_name=crate_name, 

74 crate_version=crate_version, 

75 ) 

76 except (KeyError, TypeError, ValueError) as e: 

77 logger.debug(f"Failed to parse cargo-deny diagnostic: {e}") 

78 return None 

79 

80 

81def _parse_advisory(item: dict[str, Any]) -> CargoDenyIssue | None: 

82 """Parse an advisory message from cargo-deny output. 

83 

84 Args: 

85 item: An advisory JSON object. 

86 

87 Returns: 

88 CargoDenyIssue or None if parsing fails. 

89 """ 

90 try: 

91 fields = item.get("fields", {}) 

92 if not isinstance(fields, dict): 

93 return None 

94 

95 advisory = fields.get("advisory", {}) 

96 if not isinstance(advisory, dict): 

97 return None 

98 

99 advisory_id = advisory.get("id", "") 

100 advisory_severity = advisory.get("severity", "") 

101 

102 # Extract patched versions 

103 versions = fields.get("versions", {}) 

104 patched = None 

105 if isinstance(versions, dict): 

106 patched_list = versions.get("patched", []) 

107 if isinstance(patched_list, list): 

108 patched = [str(v) for v in patched_list if v] 

109 

110 # Get crate information from the package field 

111 package = fields.get("package", {}) 

112 crate_name = None 

113 crate_version = None 

114 if isinstance(package, dict): 

115 crate_name = package.get("name") 

116 crate_version = package.get("version") 

117 

118 return CargoDenyIssue( 

119 file="Cargo.toml", 

120 line=0, 

121 column=0, 

122 code=str(advisory_id) if advisory_id else "ADVISORY", 

123 severity="error", # Advisories are treated as errors 

124 crate_name=str(crate_name) if crate_name else None, 

125 crate_version=str(crate_version) if crate_version else None, 

126 advisory_id=str(advisory_id) if advisory_id else None, 

127 advisory_severity=str(advisory_severity) if advisory_severity else None, 

128 patched_versions=patched, 

129 ) 

130 except (KeyError, TypeError, ValueError) as e: 

131 logger.debug(f"Failed to parse cargo-deny advisory: {e}") 

132 return None 

133 

134 

135def parse_cargo_deny_output(output: str) -> list[CargoDenyIssue]: 

136 """Parse cargo-deny JSON Lines output into CargoDenyIssue objects. 

137 

138 cargo-deny outputs JSON Lines format (one JSON object per line) when 

139 using --format json. Each line can be: 

140 - A diagnostic message with type "diagnostic" 

141 - An advisory message with type "advisory" 

142 

143 Args: 

144 output: Raw stdout emitted by cargo deny check --format json. 

145 

146 Returns: 

147 A list of CargoDenyIssue instances. Returns an empty list when 

148 no issues are found or output cannot be parsed. 

149 """ 

150 if not output or not output.strip(): 

151 return [] 

152 

153 issues: list[CargoDenyIssue] = [] 

154 

155 for line in output.splitlines(): 

156 line = line.strip() 

157 if not line or not line.startswith("{"): 

158 continue 

159 

160 try: 

161 data = json.loads(line) 

162 if not isinstance(data, dict): 

163 continue 

164 

165 item_type = data.get("type", "") 

166 

167 if item_type == "diagnostic": 

168 parsed = _parse_diagnostic(data) 

169 if parsed is not None: 

170 issues.append(parsed) 

171 elif item_type == "advisory": 

172 parsed = _parse_advisory(data) 

173 if parsed is not None: 

174 issues.append(parsed) 

175 # Ignore other types like "summary", "build", etc. 

176 

177 except json.JSONDecodeError as exc: 

178 logger.debug(f"Failed to decode cargo-deny JSON line: {exc}: {line}") 

179 continue 

180 

181 return issues