Coverage for lintro / tools / definitions / cargo_deny.py: 72%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""cargo-deny tool definition. 

2 

3cargo-deny is a Rust tool that checks licenses, advisories, bans, and duplicate 

4dependencies in Cargo projects. It requires a Cargo.toml file and optionally 

5uses deny.toml for configuration. 

6""" 

7 

8from __future__ import annotations 

9 

10import os 

11import subprocess # nosec B404 - used safely with shell disabled 

12from dataclasses import dataclass 

13from pathlib import Path 

14from typing import Any 

15 

16from lintro.enums.doc_url_template import DocUrlTemplate 

17from lintro.enums.tool_type import ToolType 

18from lintro.models.core.tool_result import ToolResult 

19from lintro.parsers.cargo_deny.cargo_deny_parser import parse_cargo_deny_output 

20from lintro.plugins.base import BaseToolPlugin 

21from lintro.plugins.protocol import ToolDefinition 

22from lintro.plugins.registry import register_tool 

23from lintro.tools.core.option_validators import ( 

24 filter_none_options, 

25 validate_positive_int, 

26) 

27from lintro.tools.core.timeout_utils import ( 

28 create_timeout_result, 

29 run_subprocess_with_timeout, 

30) 

31 

32# Constants for cargo-deny configuration 

33CARGO_DENY_DEFAULT_TIMEOUT: int = 60 

34CARGO_DENY_DEFAULT_PRIORITY: int = 90 # High priority for security tool 

35CARGO_DENY_FILE_PATTERNS: list[str] = ["Cargo.toml", "deny.toml"] 

36 

37 

38def _find_cargo_root(paths: list[str]) -> Path | None: 

39 """Return the nearest directory containing Cargo.toml for given paths. 

40 

41 Args: 

42 paths: List of file paths to search from. 

43 

44 Returns: 

45 Path to Cargo.toml directory, or None if not found. 

46 """ 

47 roots: list[Path] = [] 

48 for raw_path in paths: 

49 current = Path(raw_path).resolve() 

50 # If it's a file, start from its parent 

51 if current.is_file(): 

52 current = current.parent 

53 # Search upward for Cargo.toml 

54 for candidate in [current, *list(current.parents)]: 

55 manifest = candidate / "Cargo.toml" 

56 if manifest.exists(): 

57 roots.append(candidate) 

58 break 

59 

60 if not roots: 

61 return None 

62 

63 # Prefer a single root; if multiple, use common path when valid 

64 unique_roots = set(roots) 

65 if len(unique_roots) == 1: 

66 return roots[0] 

67 

68 try: 

69 common = Path(os.path.commonpath([str(r) for r in unique_roots])) 

70 except ValueError: 

71 return None 

72 

73 manifest = common / "Cargo.toml" 

74 return common if manifest.exists() else None 

75 

76 

77def _build_cargo_deny_command() -> list[str]: 

78 """Build the cargo deny check command. 

79 

80 Returns: 

81 List of command arguments. 

82 """ 

83 return [ 

84 "cargo", 

85 "deny", 

86 "check", 

87 "--format", 

88 "json", 

89 ] 

90 

91 

@register_tool
@dataclass
class CargoDenyPlugin(BaseToolPlugin):
    """Lintro plugin that runs cargo-deny against a Cargo project.

    cargo-deny reports on license compliance, security advisories, banned
    dependencies, and duplicate dependencies. The plugin is report-only:
    ``check`` runs the tool and ``fix`` always raises.
    """

    @property
    def definition(self) -> ToolDefinition:
        """Describe this tool to Lintro.

        Returns:
            ToolDefinition holding the tool name, capabilities, file
            patterns, version requirements, and default options.
        """
        summary = "Checks licenses, advisories, bans, and duplicate dependencies"
        defaults = {"timeout": CARGO_DENY_DEFAULT_TIMEOUT}
        return ToolDefinition(
            name="cargo_deny",
            description=summary,
            can_fix=False,
            tool_type=ToolType.SECURITY | ToolType.INFRASTRUCTURE,
            file_patterns=CARGO_DENY_FILE_PATTERNS,
            priority=CARGO_DENY_DEFAULT_PRIORITY,
            conflicts_with=[],
            native_configs=["deny.toml"],
            version_command=["cargo", "deny", "--version"],
            min_version="0.14.0",
            default_options=defaults,
            default_timeout=CARGO_DENY_DEFAULT_TIMEOUT,
        )

    def set_options(
        self,
        timeout: int | None = None,
        **kwargs: Any,
    ) -> None:
        """Validate and store cargo-deny options.

        Args:
            timeout: Timeout in seconds; omitted from the stored options
                when None.
            **kwargs: Additional options forwarded unchanged to the base
                class.
        """
        validate_positive_int(timeout, "timeout")

        # Only explicitly provided values are passed on to the base class.
        provided = filter_none_options(timeout=timeout)
        super().set_options(**provided, **kwargs)

    def doc_url(self, code: str) -> str | None:
        """Return the cargo-deny documentation URL for a diagnostic code.

        Args:
            code: cargo-deny code (e.g., "L001", "A001"). Only its
                truthiness is inspected; the same URL is returned for
                every non-empty code.

        Returns:
            The cargo-deny documentation URL, or None if code is empty.
        """
        return DocUrlTemplate.CARGO_DENY if code else None

    def check(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Run cargo-deny from the Cargo project root and collect issues.

        Args:
            paths: List of file or directory paths to check.
            options: Runtime options that override defaults.

        Returns:
            ToolResult whose ``success`` is True only when no issues were
            parsed from the tool output. An early result is returned when
            preparation asks to skip, when no Cargo.toml can be located,
            or when the run times out.
        """
        # Shared preparation; the context exposes should_skip, early_result,
        # files and timeout, all of which are used below.
        prepared = self._prepare_execution(
            paths,
            options,
            no_files_message="No Cargo files found to check.",
        )
        if prepared.should_skip:
            return prepared.early_result  # type: ignore[return-value]

        project_dir = _find_cargo_root(prepared.files)
        if project_dir is None:
            return ToolResult(
                name=self.definition.name,
                success=True,
                output="No Cargo.toml found; skipping cargo-deny.",
                issues_count=0,
            )

        command = _build_cargo_deny_command()

        try:
            # The first element of the helper's return value is discarded;
            # the verdict below is derived from the parsed issues alone.
            # NOTE(review): if cargo-deny fails without emitting parseable
            # diagnostics this presumably reports success - confirm against
            # the contract of run_subprocess_with_timeout.
            _, raw_output = run_subprocess_with_timeout(
                tool=self,
                cmd=command,
                timeout=prepared.timeout,
                cwd=str(project_dir),
                tool_name="cargo-deny",
            )
        except subprocess.TimeoutExpired:
            timed_out = create_timeout_result(
                tool=self,
                timeout=prepared.timeout,
                cmd=command,
                tool_name="cargo-deny",
            )
            # Re-wrap the timeout details in a ToolResult for the caller.
            return ToolResult(
                name=self.definition.name,
                success=timed_out.success,
                output=timed_out.output,
                issues_count=timed_out.issues_count,
                issues=timed_out.issues,
            )

        found = parse_cargo_deny_output(output=raw_output)
        total = len(found)

        # Raw tool output is not forwarded; only the parsed issues are.
        return ToolResult(
            name=self.definition.name,
            success=not total,
            output=None,
            issues_count=total,
            issues=found,
        )

    def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Reject fix requests; cargo-deny only reports issues.

        Args:
            paths: List of file or directory paths to fix (unused).
            options: Tool-specific options (unused).

        Returns:
            Never returns; always raises.

        Raises:
            NotImplementedError: cargo-deny does not support fixing issues.
        """
        message = (
            "cargo-deny cannot automatically fix issues. Run 'lintro check' to "
            "see issues and resolve them manually."
        )
        raise NotImplementedError(message)
237 "cargo-deny cannot automatically fix issues. Run 'lintro check' to " 

238 "see issues and resolve them manually.", 

239 )