Coverage for lintro / tools / definitions / osv_scanner.py: 93%

114 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""OSV-Scanner tool definition. 

2 

3OSV-Scanner is Google's vulnerability scanner that uses the Open Source 

4Vulnerabilities (OSV) database. It supports scanning lockfiles and SBOMs 

5for known vulnerabilities across multiple ecosystems including PyPI, npm, 

6Go, Rust, Ruby, PHP, .NET, Java, and more. 

7""" 

8 

9from __future__ import annotations 

10 

11import os 

12import subprocess # nosec B404 - used safely with shell disabled 

13from dataclasses import dataclass 

14from pathlib import Path 

15from typing import Any 

16 

17from loguru import logger 

18 

19from lintro._tool_versions import get_min_version 

20from lintro.enums.doc_url_template import DocUrlTemplate 

21from lintro.enums.tool_name import ToolName 

22from lintro.enums.tool_type import ToolType 

23from lintro.models.core.tool_result import ToolResult 

24from lintro.parsers.osv_scanner import ( 

25 classify_suppressions, 

26 parse_osv_scanner_output, 

27 parse_suppressions, 

28) 

29from lintro.plugins.base import BaseToolPlugin 

30from lintro.plugins.execution_preparation import ( 

31 get_effective_timeout, 

32 verify_tool_version, 

33) 

34from lintro.plugins.protocol import ToolDefinition 

35from lintro.plugins.registry import register_tool 

36from lintro.tools.core.option_validators import validate_bool, validate_positive_int 

37 

# Constants
# Default timeout (seconds) used both as the "timeout" default option and as
# the ToolDefinition's default_timeout.
OSV_SCANNER_DEFAULT_TIMEOUT: int = 120  # Network operations can be slow
# Value reported as the plugin's `priority` in its ToolDefinition.
OSV_SCANNER_DEFAULT_PRIORITY: int = 90  # High priority for security tool

41 

42 

@register_tool
@dataclass
class OsvScannerPlugin(BaseToolPlugin):
    """OSV-Scanner vulnerability scanning plugin.

    This plugin integrates OSV-Scanner with Lintro for scanning lockfiles
    for known vulnerabilities across multiple ecosystems.

    Unlike other tool plugins, osv-scanner handles its own file discovery
    via --recursive, so file_patterns is empty and check() bypasses the
    standard file discovery pipeline.
    """

    @property
    def definition(self) -> ToolDefinition:
        """Return the tool definition.

        Returns:
            ToolDefinition containing tool metadata.
        """
        return ToolDefinition(
            name="osv_scanner",
            description=(
                "Google's vulnerability scanner using the OSV database "
                "for multi-ecosystem dependency scanning"
            ),
            can_fix=False,
            tool_type=ToolType.SECURITY,
            # Empty on purpose: osv-scanner discovers lockfiles itself.
            file_patterns=[],
            priority=OSV_SCANNER_DEFAULT_PRIORITY,
            conflicts_with=[],
            native_configs=[".osv-scanner.toml"],
            version_command=["osv-scanner", "--version"],
            min_version=get_min_version(ToolName.OSV_SCANNER),
            default_options={
                "timeout": OSV_SCANNER_DEFAULT_TIMEOUT,
                "check_suppressions": True,
            },
            default_timeout=OSV_SCANNER_DEFAULT_TIMEOUT,
        )

    def set_options(self, **kwargs: Any) -> None:
        """Validate and set tool-specific options.

        Only ``timeout`` and ``check_suppressions`` are validated here; every
        keyword argument is then forwarded unchanged to the base class.

        Args:
            **kwargs: Options to set, including timeout and
                check_suppressions.
        """
        if "timeout" in kwargs:
            validate_positive_int(kwargs["timeout"], "timeout")
        if "check_suppressions" in kwargs:
            validate_bool(kwargs["check_suppressions"], "check_suppressions")
        super().set_options(**kwargs)

    def _build_command(self, scan_root: Path) -> list[str]:
        """Build the osv-scanner scan command.

        Uses --recursive to let osv-scanner discover lockfiles itself,
        rather than maintaining a separate list of file patterns.
        Passes --config explicitly because osv-scanner's auto-discovery
        does not work reliably in --recursive mode.

        Args:
            scan_root: Root directory to scan recursively.

        Returns:
            Command list for running osv-scanner with JSON output.
        """
        cmd = [
            *self._get_executable_command("osv-scanner"),
            "scan",
            "--recursive",
            "--format",
            "json",
        ]

        config = self._find_config_file(scan_root)
        if config is not None:
            cmd.extend(["--config", str(config)])

        # The scan target must come last, after all flags.
        cmd.append(str(scan_root))
        return cmd

    def _build_probe_command(self, scan_root: Path) -> list[str]:
        """Build an osv-scanner command that ignores all suppressions.

        Passes the platform null device (``os.devnull``) as --config so that
        .osv-scanner.toml is not applied and the scan reports all
        vulnerabilities, including suppressed ones.
        This "probe" output is used to detect stale suppressions.

        Args:
            scan_root: Root directory to scan recursively.

        Returns:
            Command list for running osv-scanner without suppressions.
        """
        return [
            *self._get_executable_command("osv-scanner"),
            "scan",
            "--recursive",
            "--format",
            "json",
            "--config",
            os.devnull,
            str(scan_root),
        ]

    @staticmethod
    def _find_config_file(scan_root: Path) -> Path | None:
        """Find .osv-scanner.toml by walking up from the scan root.

        Looks for the file in the (resolved) scan root and then in each
        parent directory up to the filesystem root; the nearest match wins.

        Args:
            scan_root: Directory to start searching from.

        Returns:
            Path to the config file, or None if not found.
        """
        current = scan_root.resolve()
        for directory in [current, *current.parents]:
            config = directory / ".osv-scanner.toml"
            if config.is_file():
                return config
        return None

    def _resolve_scan_root(self, paths: list[str]) -> Path:
        """Resolve the scan root from input paths.

        Each path is resolved to an absolute path; anything that is not an
        existing directory is replaced by its parent directory.

        Args:
            paths: Input file or directory paths. Must be non-empty
                (check() returns early on an empty list before calling this).

        Returns:
            Common ancestor directory for all paths. If no common ancestor
            can be computed, the directory of the first path is returned.
        """
        resolved: list[Path] = []
        for raw_path in paths:
            p = Path(raw_path).resolve()
            resolved.append(p if p.is_dir() else p.parent)

        if len(resolved) == 1:
            return resolved[0]

        try:
            return Path(os.path.commonpath([str(p) for p in resolved]))
        except ValueError:
            # commonpath() raises ValueError when the paths share no common
            # prefix (e.g. different drives); fall back to the first one.
            return resolved[0]

    def doc_url(self, code: str) -> str | None:
        """Return OSV vulnerability database URL for the given ID.

        Args:
            code: Vulnerability ID (e.g., "GHSA-xxxx", "CVE-xxxx", "PYSEC-xxxx").

        Returns:
            URL to the OSV vulnerability page, or None if code is empty.
        """
        if not code:
            return None
        return DocUrlTemplate.OSV.format(code=code)

    @staticmethod
    def _is_no_op_output(output: str) -> bool:
        """Tell whether scanner output says there was nothing to scan.

        Args:
            output: Raw output captured from the osv-scanner run.

        Returns:
            True if the output contains "no package sources found" or reports
            exactly "0 packages" (case-insensitive), False otherwise.
        """
        # Local import keeps this helper self-contained.
        import re

        lowered = output.lower()
        if "no package sources found" in lowered:
            return True
        # A plain substring test for "0 packages" would also match
        # "10 packages" or "120 packages" and hide real execution errors, so
        # require that the zero is not preceded by another digit.
        return re.search(r"(?<!\d)0 packages", lowered) is not None

    def check(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """Scan for known vulnerabilities using osv-scanner --recursive.

        Bypasses the standard file discovery pipeline since osv-scanner
        discovers lockfiles itself. Only does version checking and
        options merging before running the scan.

        Args:
            paths: List of file or directory paths to scan.
            options: Runtime options that override defaults.

        Returns:
            ToolResult with scan results. ``success`` is True only when the
            subprocess succeeded (or had nothing to scan) and no issues were
            parsed from its output.
        """
        if not paths:
            return ToolResult(
                name=self.definition.name,
                success=True,
                output="No paths to check.",
                issues_count=0,
            )

        # Version check
        version_result = verify_tool_version(self.definition)
        if version_result is not None:
            return version_result

        # Merge options: runtime options take precedence over stored ones.
        merged_options = dict(self.options)
        merged_options.update(options)
        timeout = get_effective_timeout(
            timeout=None,
            options=merged_options,
            default_timeout=self.definition.default_timeout,
        )

        scan_root = self._resolve_scan_root(paths)
        cmd = self._build_command(scan_root)
        logger.debug(
            f"[osv-scanner] Running: {' '.join(cmd[:10])}... (cwd={scan_root})",
        )

        try:
            # osv-scanner returns non-zero when vulnerabilities exist
            success, output = self._run_subprocess(
                cmd,
                timeout=timeout,
                cwd=str(scan_root),
            )
        except subprocess.TimeoutExpired:
            return ToolResult(
                name=self.definition.name,
                success=False,
                output=f"OSV-Scanner timed out after {timeout}s",
                issues_count=0,
            )

        issues = parse_osv_scanner_output(output)

        # Treat "no package sources found" as a successful no-op, not an error.
        # osv-scanner returns non-zero when it finds no lockfiles to scan.
        if (
            not success
            and len(issues) == 0
            and output
            and self._is_no_op_output(output)
        ):
            success = True

        # Determine overall success: subprocess must succeed AND no issues
        # found. A non-zero exit with 0 parsed issues indicates an execution
        # error (e.g. network failure), not a clean scan.
        overall_success = success and len(issues) == 0

        # Show output when there are issues OR when subprocess failed without
        # issues (execution error case)
        should_show_output = bool(issues) or not success

        # Suppression staleness check
        suppression_metadata = self._check_suppression_staleness(
            scan_root=scan_root,
            timeout=timeout,
            options=merged_options,
        )

        return ToolResult(
            name=self.definition.name,
            success=overall_success,
            output=output if should_show_output else None,
            issues_count=len(issues),
            issues=issues if issues else None,
            ai_metadata=suppression_metadata,
        )

    def _check_suppression_staleness(
        self,
        scan_root: Path,
        timeout: float,
        options: dict[str, object],
    ) -> dict[str, Any] | None:
        """Run a probe scan to classify suppression entries.

        Skipped when check_suppressions is disabled or no config file
        with suppressions exists. Also skipped when the probe scan times
        out, or fails without yielding any parseable issue.

        Args:
            scan_root: Root directory for the scan.
            timeout: Timeout for subprocess execution.
            options: Merged runtime options.

        Returns:
            Metadata dict with suppression classifications, or None.
        """
        check = options.get(
            "check_suppressions",
            self.options.get("check_suppressions", True),
        )
        if not check:
            return None

        config_path = self._find_config_file(scan_root)
        if config_path is None:
            return None

        entries = parse_suppressions(config_path)
        if not entries:
            return None

        # Run osv-scanner without suppressions to see all vulnerabilities
        probe_cmd = self._build_probe_command(scan_root)
        try:
            probe_success, probe_output = self._run_subprocess(
                probe_cmd,
                timeout=timeout,
                cwd=str(scan_root),
            )
        except subprocess.TimeoutExpired:
            logger.debug("[osv-scanner] Probe scan timed out, skipping staleness check")
            return None

        probe_issues = parse_osv_scanner_output(probe_output)

        # If probe failed and returned no parseable issues, skip classification
        # to avoid incorrectly marking all suppressions as stale.
        if not probe_success and not probe_issues:
            logger.debug(
                "[osv-scanner] Probe scan failed with no parseable output, "
                "skipping staleness check",
            )
            return None

        probe_vuln_ids = {issue.vuln_id for issue in probe_issues}

        classified = classify_suppressions(entries, probe_vuln_ids)

        return {
            "suppressions": [
                {
                    "id": c.entry.id,
                    "ignore_until": c.entry.ignore_until.isoformat(),
                    "reason": c.entry.reason,
                    "status": c.status.value,
                }
                for c in classified
            ],
        }

    def fix(self, paths: list[str], options: dict[str, object]) -> ToolResult:
        """OSV-Scanner cannot fix vulnerabilities, only report them.

        Args:
            paths: List of file or directory paths to fix.
            options: Tool-specific options.

        Returns:
            ToolResult: Never returns, always raises NotImplementedError.

        Raises:
            NotImplementedError: OSV-Scanner does not support fixing issues.
        """
        raise NotImplementedError(
            "OSV-Scanner cannot automatically fix vulnerabilities. "
            "Update affected packages to their fixed versions.",
        )

384 "OSV-Scanner cannot automatically fix vulnerabilities. " 

385 "Update affected packages to their fixed versions.", 

386 )