Coverage for lintro / parsers / cargo_audit / cargo_audit_parser.py: 84%

70 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Parser for cargo-audit JSON output.""" 

2 

3from __future__ import annotations 

4 

5import json 

6from typing import Any 

7 

8from loguru import logger 

9 

10from lintro.parsers.base_parser import validate_str_field 

11from lintro.parsers.cargo_audit.cargo_audit_issue import CargoAuditIssue 

12 

13 

14def _extract_cargo_audit_json(raw_text: str) -> dict[str, Any]: 

15 """Extract cargo-audit's JSON object from output text. 

16 

17 This function finds JSON by locating the first '{' and last '}' in the output. 

18 This approach works because cargo-audit outputs a single top-level JSON object. 

19 It would not work for tools that output multiple JSON objects or have nested 

20 structures where the last '}' doesn't correspond to the opening '{'. 

21 

22 Args: 

23 raw_text: Raw stdout/stderr text from cargo-audit. 

24 

25 Returns: 

26 dict[str, Any]: Parsed JSON object. 

27 

28 Raises: 

29 json.JSONDecodeError: If JSON cannot be parsed. 

30 ValueError: If no JSON object boundaries are found. 

31 """ 

32 if not raw_text or not raw_text.strip(): 

33 raise json.JSONDecodeError("Empty output", raw_text or "", 0) 

34 

35 text: str = raw_text.strip() 

36 

37 # Quick path: if the entire text is JSON 

38 if text.startswith("{") and text.endswith("}"): 

39 result: dict[str, Any] = json.loads(text) 

40 return result 

41 

42 # Fallback: find JSON boundaries. This works because cargo-audit outputs 

43 # a single JSON object, so the first '{' and last '}' delimit it correctly. 

44 start: int = text.find("{") 

45 end: int = text.rfind("}") 

46 if start == -1 or end == -1 or end < start: 

47 raise ValueError("Could not locate JSON object in cargo-audit output") 

48 

49 json_str: str = text[start : end + 1] 

50 parsed: dict[str, Any] = json.loads(json_str) 

51 return parsed 

52 

53 

54def _normalize_severity(severity: str | None) -> str: 

55 """Normalize severity level to uppercase standard format. 

56 

57 Args: 

58 severity: Raw severity string from cargo-audit. 

59 

60 Returns: 

61 Normalized severity string (UNKNOWN, LOW, MEDIUM, HIGH, CRITICAL). 

62 """ 

63 if not severity: 

64 return "UNKNOWN" 

65 

66 normalized = severity.upper().strip() 

67 

68 # Map common variations (including RustSec's "none" severity level) 

69 severity_map = { 

70 "LOW": "LOW", 

71 "MEDIUM": "MEDIUM", 

72 "HIGH": "HIGH", 

73 "CRITICAL": "CRITICAL", 

74 "INFO": "LOW", 

75 "INFORMATIONAL": "LOW", 

76 "MODERATE": "MEDIUM", 

77 "SEVERE": "HIGH", 

78 "NONE": "LOW", 

79 } 

80 

81 return severity_map.get(normalized, "UNKNOWN") 

82 

83 

def parse_cargo_audit_output(
    output: str | None,
) -> list[CargoAuditIssue]:
    """Parse cargo-audit JSON output into CargoAuditIssue objects.

    Malformed input is handled leniently: output that cannot be parsed, or a
    payload whose ``vulnerabilities`` / ``vulnerabilities.list`` entry is
    missing or of the wrong type, yields an empty list, and individual
    entries that are not usable are skipped.

    Args:
        output: Raw JSON output from cargo-audit --json command.

    Returns:
        List of parsed security vulnerability issues.
    """
    if not output:
        return []

    try:
        data = _extract_cargo_audit_json(output)
    except (json.JSONDecodeError, ValueError) as e:
        logger.warning(f"Failed to parse cargo-audit JSON output: {e}")
        return []

    if not isinstance(data, dict):
        logger.warning("cargo-audit output is not a dictionary")
        return []

    # Extract vulnerabilities from the nested structure
    vulnerabilities = data.get("vulnerabilities", {})
    if not isinstance(vulnerabilities, dict):
        return []

    vuln_list = vulnerabilities.get("list", [])
    if not isinstance(vuln_list, list):
        return []

    issues: list[CargoAuditIssue] = []

    for vuln in vuln_list:
        if not isinstance(vuln, dict):
            continue

        try:
            # Extract advisory information
            advisory = vuln.get("advisory", {})
            if not isinstance(advisory, dict):
                continue

            advisory_id = validate_str_field(
                advisory.get("id"),
                "advisory_id",
                log_warning=True,
            )

            # Skip if no advisory ID
            if not advisory_id:
                logger.warning("Skipping vulnerability with empty advisory ID")
                continue

            title = validate_str_field(advisory.get("title"), "title")
            description = validate_str_field(advisory.get("description"), "description")

            # Only forward string severities, so a non-string JSON value
            # (number, object, ...) maps to UNKNOWN rather than risking an
            # AttributeError, which the handler below does not catch and
            # which would abort parsing of every remaining entry.
            raw_severity = advisory.get("severity")
            severity = _normalize_severity(
                raw_severity if isinstance(raw_severity, str) else None,
            )
            url = validate_str_field(advisory.get("url"), "url")

            # Extract package information; a missing or malformed package
            # entry degrades to empty lookups instead of dropping the issue.
            package = vuln.get("package", {})
            if not isinstance(package, dict):
                package = {}

            package_name = validate_str_field(package.get("name"), "package_name")
            package_version = validate_str_field(
                package.get("version"),
                "package_version",
            )

            issue = CargoAuditIssue(
                file="Cargo.lock",
                line=0,  # cargo-audit doesn't provide line numbers
                column=0,
                advisory_id=advisory_id,
                package_name=package_name,
                package_version=package_version,
                severity=severity,
                title=title,
                description=description,
                url=url,
            )
            issues.append(issue)

        except (KeyError, TypeError, ValueError) as e:
            # Best-effort: one bad entry must not discard the others.
            logger.warning(f"Failed to parse cargo-audit vulnerability: {e}")
            continue

    return issues