Coverage for tests/unit/parsers/test_semgrep_parser.py: 100%

162 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Unit tests for Semgrep output parsing and tool JSON extraction.""" 

2 

3from __future__ import annotations 

4 

5import json 

6from pathlib import Path 

7from types import SimpleNamespace 

8from typing import Any 

9 

10import pytest 

11from assertpy import assert_that 

12 

13from lintro.models.core.tool_result import ToolResult 

14from lintro.parsers.semgrep.semgrep_parser import parse_semgrep_output 

15from lintro.plugins import ToolRegistry 

16from lintro.tools.core.version_parsing import get_minimum_versions 

17 

18 

def test_parse_semgrep_valid_output() -> None:
    """Check that a single well-formed Semgrep finding populates every field."""
    rule_id = "python.lang.security.audit.dangerous-subprocess-use"
    finding = {
        "check_id": rule_id,
        "path": "app.py",
        "start": {"line": 15, "col": 5},
        "end": {"line": 15, "col": 45},
        "extra": {
            "message": "Detected subprocess call with shell=True",
            "severity": "WARNING",
            "metadata": {
                "category": "security",
                "cwe": ["CWE-78"],
            },
        },
    }
    raw = json.dumps({"results": [finding], "errors": []})

    parsed = parse_semgrep_output(output=raw)

    assert_that(len(parsed)).is_equal_to(1)
    found = parsed[0]

    # Location of the finding, as given by its "path", "start" and "end" keys.
    assert_that(found.file).is_equal_to("app.py")
    assert_that(found.line).is_equal_to(15)
    assert_that(found.column).is_equal_to(5)
    assert_that(found.end_line).is_equal_to(15)
    assert_that(found.end_column).is_equal_to(45)

    # Rule identity and classification.
    assert_that(found.check_id).is_equal_to(rule_id)
    assert_that(found.severity).is_equal_to("WARNING")
    assert_that(found.category).is_equal_to("security")
    assert_that(found.cwe).is_equal_to(["CWE-78"])

    # Only a fragment of the message is pinned, not the full text.
    assert_that(found.message).contains("shell=True")

56 

57 

def test_parse_semgrep_multiple_issues() -> None:
    """Verify that two findings produce two issues, in the order given."""
    sql_injection = {
        "check_id": "python.lang.security.audit.sql-injection",
        "path": "a.py",
        "start": {"line": 10, "col": 1},
        "end": {"line": 10, "col": 50},
        "extra": {
            "message": "SQL injection detected",
            "severity": "ERROR",
            "metadata": {"category": "security"},
        },
    }
    hardcoded_password = {
        "check_id": "python.lang.security.audit.hardcoded-password",
        "path": "b.py",
        "start": {"line": 5, "col": 1},
        "end": {"line": 5, "col": 30},
        "extra": {
            "message": "Hardcoded password detected",
            "severity": "WARNING",
            "metadata": {"category": "security"},
        },
    }
    raw = json.dumps(
        {"results": [sql_injection, hardcoded_password], "errors": []},
    )

    parsed = parse_semgrep_output(output=raw)

    assert_that(len(parsed)).is_equal_to(2)
    first, second = parsed[0], parsed[1]
    assert_that(first.file).is_equal_to("a.py")
    assert_that(first.severity).is_equal_to("ERROR")
    assert_that(second.file).is_equal_to("b.py")
    assert_that(second.severity).is_equal_to("WARNING")

94 

95 

def test_parse_semgrep_empty_results() -> None:
    """An explicitly empty ``results`` array must yield no issues."""
    raw = json.dumps({"results": []})
    parsed = parse_semgrep_output(output=raw)
    assert_that(parsed).is_equal_to([])

100 

101 

def test_parse_semgrep_none_output() -> None:
    """Passing ``None`` as the output must yield an empty issue list."""
    parsed = parse_semgrep_output(output=None)
    assert_that(parsed).is_equal_to([])

106 

107 

def test_parse_semgrep_empty_string_output() -> None:
    """Passing an empty string as the output must yield an empty issue list."""
    parsed = parse_semgrep_output(output="")
    assert_that(parsed).is_equal_to([])

112 

113 

def test_parse_semgrep_missing_results_key() -> None:
    """A JSON object without a ``results`` key is treated as having no issues."""
    raw = json.dumps({})
    parsed = parse_semgrep_output(output=raw)
    assert_that(parsed).is_equal_to([])

118 

119 

def test_parse_semgrep_handles_malformed_issue_gracefully() -> None:
    """Entries that cannot be turned into issues are dropped, not raised on.

    Note: Warnings are logged via loguru (visible in test output when run with
    the -s flag) but are not asserted here due to loguru's capture complexity.
    """
    bad_entries = [
        None,
        42,
        # Dict-shaped, but the start line is not a number.
        {"check_id": "test", "path": "x.py", "start": {"line": "NaN"}},
    ]
    raw = json.dumps({"results": bad_entries})

    parsed = parse_semgrep_output(output=raw)

    assert_that(parsed).is_equal_to([])

135 

136 

def test_parse_semgrep_cwe_as_string() -> None:
    """A scalar ``cwe`` metadata string is expected back as a one-item list."""
    metadata = {
        "category": "correctness",
        "cwe": "CWE-123",
    }
    finding = {
        "check_id": "test.rule",
        "path": "test.py",
        "start": {"line": 1, "col": 1},
        "end": {"line": 1, "col": 10},
        "extra": {
            "message": "Test issue",
            "severity": "INFO",
            "metadata": metadata,
        },
    }
    raw = json.dumps({"results": [finding]})

    parsed = parse_semgrep_output(output=raw)

    assert_that(len(parsed)).is_equal_to(1)
    assert_that(parsed[0].cwe).is_equal_to(["CWE-123"])

161 

162 

def test_parse_semgrep_missing_optional_fields() -> None:
    """A finding without columns, severity or metadata still parses.

    The assertions pin the fallback values this test expects for the omitted
    fields: column ``0``, severity ``"WARNING"``, empty category, no CWE.
    """
    # No "col" entries, no "severity" and no "metadata" are provided.
    sparse_finding = {
        "check_id": "test.rule",
        "path": "test.py",
        "start": {"line": 1},
        "end": {"line": 1},
        "extra": {
            "message": "Test issue",
        },
    }
    raw = json.dumps({"results": [sparse_finding]})

    parsed = parse_semgrep_output(output=raw)

    assert_that(len(parsed)).is_equal_to(1)
    found = parsed[0]
    assert_that(found.column).is_equal_to(0)
    assert_that(found.end_column).is_equal_to(0)
    assert_that(found.severity).is_equal_to("WARNING")
    assert_that(found.category).is_equal_to("")
    assert_that(found.cwe).is_none()

187 

188 

def test_parse_semgrep_invalid_json() -> None:
    """Text that is not JSON must yield an empty list rather than an exception."""
    parsed = parse_semgrep_output(output="not valid json")
    assert_that(parsed).is_equal_to([])

193 

194 

def test_parse_semgrep_non_object_json() -> None:
    """Valid JSON whose top level is an array must yield an empty list.

    The warning the parser is expected to log is not asserted here.
    """
    raw = json.dumps([1, 2, 3])
    parsed = parse_semgrep_output(output=raw)
    assert_that(parsed).is_equal_to([])

199 

200 

def test_parse_semgrep_non_list_results() -> None:
    """A ``results`` value that is not a list must yield an empty list.

    The warning the parser is expected to log is not asserted here.
    """
    raw = json.dumps({"results": "not a list"})
    parsed = parse_semgrep_output(output=raw)
    assert_that(parsed).is_equal_to([])

205 

206 

def test_semgrep_check_parses_mixed_output_json(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """SemgrepTool.check should parse JSON amidst mixed stdout/stderr text.

    ``subprocess.run`` is replaced with a stub, so no real Semgrep process is
    started. The stub's stdout wraps the JSON document in extra log lines.

    Args:
        monkeypatch: Pytest monkeypatch fixture.
        tmp_path: Temporary directory path fixture.
    """
    p = tmp_path / "a.py"
    p.write_text("print('hello')\n")
    sample = {
        "errors": [],
        "results": [
            {
                "check_id": "test.rule",
                "path": str(p),
                "start": {"line": 1, "col": 1},
                "end": {"line": 1, "col": 15},
                "extra": {
                    "message": "Test issue detected.",
                    "severity": "INFO",
                    "metadata": {"category": "correctness"},
                },
            },
        ],
    }
    # Non-JSON noise before and after the document the tool has to extract.
    mixed_stdout = "Running semgrep...\n" + json.dumps(sample) + "\n"
    mixed_stderr = "[main] INFO done\n"

    def fake_run(
        cmd: list[str],
        capture_output: bool,
        text: bool,
        timeout: int,
        **kwargs: Any,
    ) -> SimpleNamespace:
        # Handle version check calls
        if "--version" in cmd:
            return SimpleNamespace(stdout="semgrep 1.151.0", stderr="", returncode=0)
        # Handle actual check calls
        return SimpleNamespace(
            stdout=mixed_stdout,
            stderr=mixed_stderr,
            returncode=0,
        )

    monkeypatch.setattr("subprocess.run", fake_run)
    tool = ToolRegistry.get("semgrep")
    assert_that(tool).is_not_none()
    result: ToolResult = tool.check([str(p)], {})
    # Assert on the values themselves (not on pre-computed booleans) so that a
    # failure reports the actual object instead of a bare "Expected True".
    assert_that(result).is_instance_of(ToolResult)
    assert_that(result.name).is_equal_to("semgrep")
    # Identity check keeps the original strictness: ``success`` must be the
    # ``True`` singleton, not merely truthy.
    assert_that(result.success).is_same_as(True)
    assert_that(result.issues_count).is_equal_to(1)

263 

264 

def test_semgrep_check_handles_nonzero_rc_with_errors_array(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """A nonzero exit with a JSON ``errors`` array fails but still parses.

    Args:
        monkeypatch: Pytest monkeypatch fixture.
        tmp_path: Temporary directory path fixture.
    """
    target = tmp_path / "c.py"
    target.write_text("print('x')\n")
    target_path = str(target)

    reported_errors = [
        {"path": target_path, "message": "config error"},
    ]
    reported_results = [
        {
            "check_id": "test.rule",
            "path": target_path,
            "start": {"line": 1, "col": 1},
            "end": {"line": 1, "col": 10},
            "extra": {
                "message": "Test issue detected.",
                "severity": "WARNING",
                "metadata": {"category": "security"},
            },
        },
    ]
    payload = {"errors": reported_errors, "results": reported_results}

    def stub_run(
        cmd: list[str],
        capture_output: bool,
        text: bool,
        timeout: int,
        **kwargs: Any,
    ) -> SimpleNamespace:
        is_version_probe = "--version" in cmd
        if not is_version_probe:
            # The scan itself: valid JSON on stdout, but exit status 1.
            return SimpleNamespace(
                stdout=json.dumps(payload),
                stderr="",
                returncode=1,
            )
        # Version probe issued before the scan.
        return SimpleNamespace(stdout="semgrep 1.151.0", stderr="", returncode=0)

    monkeypatch.setattr("subprocess.run", stub_run)
    tool = ToolRegistry.get("semgrep")
    assert_that(tool).is_not_none()

    outcome: ToolResult = tool.check([target_path], {})

    assert_that(outcome.success).is_false()
    assert_that(outcome.issues_count).is_equal_to(1)

315 

316 

def test_semgrep_check_handles_unparseable_output(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """On unparseable output, SemgrepTool.check should fail gracefully.

    ``subprocess.run`` is replaced with a stub that returns non-JSON text on
    both streams together with a nonzero exit status.

    Args:
        monkeypatch: Pytest monkeypatch fixture.
        tmp_path: Temporary directory path fixture.
    """
    p = tmp_path / "b.py"
    p.write_text("x=1\n")

    def fake_run(
        cmd: list[str],
        capture_output: bool,
        text: bool,
        timeout: int,
        **kwargs: Any,
    ) -> SimpleNamespace:
        # Handle version check calls
        if "--version" in cmd:
            return SimpleNamespace(stdout="semgrep 1.151.0", stderr="", returncode=0)
        # Handle actual check calls
        return SimpleNamespace(stdout="nonsense", stderr="also nonsense", returncode=1)

    monkeypatch.setattr("subprocess.run", fake_run)
    tool = ToolRegistry.get("semgrep")
    assert_that(tool).is_not_none()
    result: ToolResult = tool.check([str(p)], {})
    # Assert on the values themselves (not on pre-computed booleans) so that a
    # failure reports the actual object instead of a bare "Expected True".
    assert_that(result).is_instance_of(ToolResult)
    assert_that(result.name).is_equal_to("semgrep")
    # Identity check keeps the original strictness: ``success`` must be the
    # ``False`` singleton, not merely falsy (e.g. ``None``).
    assert_that(result.success).is_same_as(False)
    assert_that(result.issues_count).is_equal_to(0)

351 

352 

def test_semgrep_issue_display_row() -> None:
    """Check the display row built from a ``SemgrepIssue``.

    The row is expected to carry line and column as strings and to expose the
    rule id under the ``"code"`` key.
    """
    from lintro.parsers.semgrep.semgrep_issue import SemgrepIssue

    issue_fields = {
        "file": "test.py",
        "line": 10,
        "column": 5,
        "message": "Test message",
        "check_id": "test.rule.id",
        "severity": "ERROR",
    }
    display = SemgrepIssue(**issue_fields).to_display_row()

    assert_that(display["file"]).is_equal_to("test.py")
    # Numeric positions are compared against their string form.
    assert_that(display["line"]).is_equal_to("10")
    assert_that(display["column"]).is_equal_to("5")
    assert_that(display["code"]).is_equal_to("test.rule.id")
    assert_that(display["severity"]).is_equal_to("ERROR")

371 

372 

def test_semgrep_tool_definition() -> None:
    """Test that Semgrep tool definition has correct values.

    The minimum version is compared against ``get_minimum_versions()`` rather
    than a literal, so the test does not hard-code a version number.
    """
    from lintro.enums.tool_type import ToolType

    tool = ToolRegistry.get("semgrep")
    assert_that(tool).is_not_none()
    defn = tool.definition
    assert_that(defn.name).is_equal_to("semgrep")
    assert_that(defn.can_fix).is_false()
    assert_that(defn.tool_type).is_equal_to(ToolType.LINTER | ToolType.SECURITY)
    assert_that(defn.min_version).is_equal_to(get_minimum_versions()["semgrep"])
    # Assert on the collection itself so that a failure names the missing
    # pattern(s) instead of reporting a bare "Expected True".
    assert_that(defn.file_patterns).contains("*.py", "*.js", "*.go")

387 

388 

def test_semgrep_set_options_validates_severity() -> None:
    """Check that ``set_options`` stores a valid severity and rejects a bad one."""
    semgrep = ToolRegistry.get("semgrep")
    assert_that(semgrep).is_not_none()

    # Accepted value: stored under the "severity" option.
    semgrep.set_options(severity="WARNING")
    stored = semgrep.options.get("severity")
    assert_that(stored).is_equal_to("WARNING")

    # Rejected value: a ValueError whose message matches the pattern below.
    with pytest.raises(ValueError, match="Invalid Semgrep severity"):
        semgrep.set_options(severity="INVALID")

401 

402 

def test_semgrep_set_options_validates_jobs() -> None:
    """Check that ``set_options`` stores a valid job count and rejects bad ones."""
    semgrep = ToolRegistry.get("semgrep")
    assert_that(semgrep).is_not_none()

    # Accepted value: stored under the "jobs" option.
    semgrep.set_options(jobs=4)
    stored = semgrep.options.get("jobs")
    assert_that(stored).is_equal_to(4)

    # Zero and negative counts must both be refused with the same message.
    for rejected in (0, -1):
        with pytest.raises(ValueError, match="jobs must be a positive integer"):
            semgrep.set_options(jobs=rejected)