Coverage for lintro / tools / definitions / cargo_audit.py: 97%

66 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Cargo-audit tool definition. 

2 

3Cargo-audit is a security tool for Rust projects that scans Cargo.lock 

4for dependencies with known security vulnerabilities from the RustSec 

5advisory database. 

6""" 

7 

8from __future__ import annotations 

9 

10import subprocess # nosec B404 - used safely with shell disabled 

11from dataclasses import dataclass 

12from pathlib import Path 

13from typing import Any 

14 

15from lintro._tool_versions import get_min_version 

16from lintro.enums.doc_url_template import DocUrlTemplate 

17from lintro.enums.tool_name import ToolName 

18from lintro.enums.tool_type import ToolType 

19from lintro.models.core.tool_result import ToolResult 

20from lintro.parsers.cargo_audit.cargo_audit_parser import parse_cargo_audit_output 

21from lintro.plugins.base import BaseToolPlugin 

22from lintro.plugins.protocol import ToolDefinition 

23from lintro.plugins.registry import register_tool 

24 

25# Constants for cargo-audit configuration 

26CARGO_AUDIT_DEFAULT_TIMEOUT: int = 120 # Network operations can be slow 

27CARGO_AUDIT_DEFAULT_PRIORITY: int = 95 # Security scans run late 

28CARGO_AUDIT_FILE_PATTERNS: list[str] = ["Cargo.lock"] 

29 

30 

31def _find_cargo_root(paths: list[str]) -> Path | None: 

32 """Return the nearest directory containing Cargo.lock for given paths. 

33 

34 Args: 

35 paths: List of file paths to search from. 

36 

37 Returns: 

38 Path to directory containing Cargo.lock, or None if not found. 

39 """ 

40 for raw_path in paths: 

41 current = Path(raw_path).resolve() 

42 # If it's a file, start from its parent 

43 if current.is_file(): 

44 current = current.parent 

45 # Search upward for Cargo.lock 

46 for candidate in [current, *list(current.parents)]: 

47 lock_file = candidate / "Cargo.lock" 

48 if lock_file.exists(): 

49 return candidate 

50 

51 return None 

52 

53 

54@register_tool 

55@dataclass 

56class CargoAuditPlugin(BaseToolPlugin): 

57 """Cargo-audit plugin for Lintro. 

58 

59 Provides security vulnerability scanning for Rust dependencies using 

60 cargo-audit to check against the RustSec advisory database. 

61 """ 

62 

63 @property 

64 def definition(self) -> ToolDefinition: 

65 """Return the tool definition. 

66 

67 Returns: 

68 ToolDefinition with cargo-audit configuration. 

69 """ 

70 return ToolDefinition( 

71 name="cargo_audit", 

72 description="Security vulnerability scanner for Rust dependencies", 

73 can_fix=False, 

74 tool_type=ToolType.SECURITY, 

75 file_patterns=CARGO_AUDIT_FILE_PATTERNS, 

76 priority=CARGO_AUDIT_DEFAULT_PRIORITY, 

77 conflicts_with=[], 

78 native_configs=[".cargo/audit.toml"], 

79 version_command=["cargo", "audit", "--version"], 

80 min_version=get_min_version(ToolName.CARGO_AUDIT), 

81 default_options={ 

82 "timeout": CARGO_AUDIT_DEFAULT_TIMEOUT, 

83 }, 

84 default_timeout=CARGO_AUDIT_DEFAULT_TIMEOUT, 

85 ) 

86 

87 def set_options(self, **kwargs: Any) -> None: 

88 """Set tool-specific options. 

89 

90 Args: 

91 **kwargs: Options to set, including timeout. 

92 

93 Raises: 

94 ValueError: If timeout is negative or not a number. 

95 """ 

96 if "timeout" in kwargs: 

97 timeout = kwargs["timeout"] 

98 if timeout is not None: 

99 if isinstance(timeout, bool) or not isinstance(timeout, (int, float)): 

100 raise ValueError("timeout must be a number") 

101 if timeout < 0: 

102 raise ValueError("timeout must be non-negative") 

103 super().set_options(**kwargs) 

104 

105 def _build_command(self) -> list[str]: 

106 """Build the cargo-audit command. 

107 

108 Returns: 

109 Command list for running cargo-audit with JSON output. 

110 """ 

111 return ["cargo", "audit", "--json"] 

112 

113 def doc_url(self, code: str) -> str | None: 

114 """Return RustSec advisory URL for the given advisory ID. 

115 

116 Args: 

117 code: RUSTSEC advisory ID (e.g., "RUSTSEC-2021-0124"). 

118 

119 Returns: 

120 URL to the RustSec advisory page, or None if code is empty. 

121 """ 

122 if not code: 

123 return None 

124 return DocUrlTemplate.CARGO_AUDIT.format(code=code) 

125 

126 def check(self, paths: list[str], options: dict[str, object]) -> ToolResult: 

127 """Check Rust dependencies for security vulnerabilities. 

128 

129 Args: 

130 paths: List of paths to check. 

131 options: Additional options for the check. 

132 

133 Returns: 

134 ToolResult with security scan results. 

135 """ 

136 ctx = self._prepare_execution(paths, options) 

137 if ctx.should_skip: 

138 # early_result is guaranteed non-None when should_skip is True 

139 return ctx.early_result # type: ignore[return-value] 

140 

141 # Find Cargo.lock root from filtered file list 

142 cargo_root = _find_cargo_root(ctx.files) 

143 if cargo_root is None: 

144 return ToolResult( 

145 name=self.definition.name, 

146 success=True, 

147 output="No Cargo.lock found; skipping cargo-audit.", 

148 issues_count=0, 

149 ) 

150 

151 cmd = self._build_command() 

152 try: 

153 success, output = self._run_subprocess( 

154 cmd, 

155 timeout=ctx.timeout, 

156 cwd=str(cargo_root), 

157 ) 

158 except subprocess.TimeoutExpired: 

159 return ToolResult( 

160 name=self.definition.name, 

161 success=False, 

162 output=f"cargo-audit timed out after {ctx.timeout}s", 

163 issues_count=0, 

164 ) 

165 

166 issues = parse_cargo_audit_output(output) 

167 

168 # Determine overall success: subprocess must succeed AND no issues found. 

169 # cargo-audit returns non-zero if vulnerabilities found, but also if 

170 # execution fails. If subprocess failed with no issues parsed, it's 

171 # an execution error (not a clean scan). 

172 overall_success = success and len(issues) == 0 

173 

174 # Show output when there are issues OR when subprocess failed without 

175 # issues (execution error case) 

176 should_show_output = bool(issues) or not success 

177 

178 return ToolResult( 

179 name=self.definition.name, 

180 success=overall_success, 

181 output=output if should_show_output else None, 

182 issues_count=len(issues), 

183 issues=issues if issues else None, 

184 ) 

185 

186 def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult: 

187 """Cargo-audit cannot automatically fix vulnerabilities. 

188 

189 Args: 

190 paths: List of paths (unused). 

191 options: Additional options (unused). 

192 

193 Returns: 

194 ToolResult: Never returns, always raises NotImplementedError. 

195 

196 Raises: 

197 NotImplementedError: Always, as cargo-audit cannot auto-fix. 

198 """ 

199 raise NotImplementedError( 

200 "cargo-audit cannot automatically fix vulnerabilities. " 

201 "Update dependencies manually using `cargo update`.", 

202 )