# Coverage for lintro/ai/integrations/github_pr.py: 66% (210 statements)
# coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

"""GitHub PR review comment integration for AI findings.

Posts AI summaries and fix suggestions as inline PR review comments
using the GitHub REST API via ``urllib.request``.
"""

6 

7from __future__ import annotations 

8 

9import json 

10import os 

11import urllib.error 

12import urllib.parse 

13import urllib.request 

14from collections.abc import Sequence 

15from pathlib import Path 

16from typing import Any 

17 

18from loguru import logger 

19 

20from lintro.ai.enums import ConfidenceLevel 

21from lintro.ai.models import AIFixSuggestion, AISummary 

22from lintro.ai.paths import OUTSIDE_WORKSPACE_SENTINEL, to_provider_path 

23 

24 

class GitHubPRReporter:
    """Post AI findings as GitHub PR review comments.

    Requires the following environment variables:
    - ``GITHUB_TOKEN``: GitHub API token with ``pull-requests: write``
    - ``GITHUB_REPOSITORY``: Owner/repo (e.g. ``"octocat/hello-world"``)

    The PR number is detected from ``GITHUB_REF`` (``refs/pull/<n>/merge``)
    or can be provided directly.
    """

    def __init__(
        self,
        *,
        token: str | None = None,
        repo: str | None = None,
        pr_number: int | None = None,
        api_base: str = "https://api.github.com",
        workspace_root: Path | None = None,
    ) -> None:
        """Initialize the GitHub PR reporter.

        Args:
            token: GitHub API token. Falls back to ``GITHUB_TOKEN`` env var.
            repo: Repository in ``owner/repo`` format. Falls back to
                ``GITHUB_REPOSITORY`` env var.
            pr_number: PR number. Falls back to parsing ``GITHUB_REF``.
            api_base: GitHub API base URL.
            workspace_root: Workspace root for deriving repo-relative paths.
                Falls back to ``GITHUB_WORKSPACE`` env var, then the
                git repository root via ``git rev-parse``.
        """
        self.token = token if token is not None else os.environ.get("GITHUB_TOKEN", "")
        self.repo = (
            repo if repo is not None else os.environ.get("GITHUB_REPOSITORY", "")
        )
        self.pr_number = pr_number if pr_number is not None else _detect_pr_number()
        # Strip any trailing slash so the URL f-strings below never emit "//".
        self.api_base = api_base.rstrip("/")

        self.workspace_root: Path | None
        if workspace_root is not None:
            self.workspace_root = workspace_root
        else:
            gh_ws = os.environ.get("GITHUB_WORKSPACE", "")
            self.workspace_root = Path(gh_ws) if gh_ws else _detect_repo_root()

    def is_available(self) -> bool:
        """Check whether all required context is present.

        Returns:
            True if token, repo, and PR number are all set.
        """
        return bool(self.token and self.repo and self.pr_number)

    def post_review_comments(
        self,
        suggestions: Sequence[AIFixSuggestion],
        summary: AISummary | None = None,
    ) -> bool:
        """Post AI findings as PR review comments.

        Posts a top-level comment with the AI summary (if present),
        then individual inline review comments for each fix suggestion.

        Args:
            suggestions: AI fix suggestions to post as inline comments.
            summary: Optional AI summary to post as a top-level comment.

        Returns:
            True if all comments were posted successfully.
        """
        if not self.is_available():
            logger.warning(
                "GitHub PR context not available — skipping review comments",
            )
            return False

        success = True

        if summary and summary.overview:
            body = _format_summary_comment(summary)
            if not self._post_issue_comment(body):
                success = False

        if suggestions and not self._post_review(suggestions):
            success = False

        return success

    def _post_review(self, suggestions: Sequence[AIFixSuggestion]) -> bool:
        """Post inline review comments for fix suggestions.

        Suggestions whose file/line can be mapped to the PR diff are posted
        as inline review comments. Any suggestion that cannot be mapped
        (file not in diff, or line outside changed hunks) is posted as a
        standalone issue comment so one unmappable entry cannot cause a 422
        that rejects the entire review batch.

        Args:
            suggestions: Fix suggestions to post.

        Returns:
            True if all comments were posted successfully.
        """
        diff_lines = self._fetch_pr_diff_lines()
        comments: list[dict[str, Any]] = []
        fallback_suggestions: list[AIFixSuggestion] = []

        for s in suggestions:
            # Resolve repo-relative path
            if self.workspace_root is not None:
                raw_path = to_provider_path(s.file, self.workspace_root)
            else:
                raw_path = s.file
            # Normalize to forward slashes, as the GitHub API expects.
            rel = raw_path.removeprefix("./").replace("\\", "/") if raw_path else ""
            # Skip empty, outside-workspace sentinel, and parent-relative paths.
            # Note: absence of "/" does not imply out-of-workspace — repo-root
            # files like "README.md" or "pyproject.toml" are valid.
            if not rel or rel == OUTSIDE_WORKSPACE_SENTINEL or rel.startswith(".."):
                continue
            body = _format_inline_comment(s)
            has_line = isinstance(s.line, int) and s.line > 0

            # Suggestions without a valid line or not in the PR diff fall back
            # to standalone issue comments instead of inline review comments.
            if (
                not has_line
                or diff_lines is None
                or s.line not in diff_lines.get(rel, set())
            ):
                fallback_suggestions.append(s)
                continue

            comment: dict[str, Any] = {
                "path": rel,
                "body": body,
                "line": s.line,
                "side": "RIGHT",
            }
            comments.append(comment)

        success = True

        if comments:
            # One review submission carries all inline comments at once.
            payload = {
                "event": "COMMENT",
                "body": "Lintro AI review",
                "comments": comments,
            }
            url = f"{self.api_base}/repos/{self.repo}/pulls/{self.pr_number}/reviews"
            if not self._api_request("POST", url, payload):
                success = False

        # Post unmappable suggestions as standalone issue comments
        for s in fallback_suggestions:
            body = _format_inline_comment(s)
            location = f"`{s.file}:{s.line}`" if s.line else f"`{s.file}`"
            if not self._post_issue_comment(f"{location}\n\n{body}"):
                success = False

        return success

    def _fetch_pr_diff_lines(self) -> dict[str, set[int]] | None:
        """Fetch changed lines per file from the PR diff.

        Paginates through all pages of the ``GET /pulls/{pr}/files``
        endpoint (up to 100 files per page) so large PRs are fully covered.

        Returns:
            Mapping of ``{file_path: {line_numbers...}}`` for right-side
            (added/modified) lines, or ``None`` if the diff cannot be fetched.
        """
        base_url = f"{self.api_base}/repos/{self.repo}/pulls/{self.pr_number}/files"
        parsed = urllib.parse.urlparse(base_url)
        if parsed.scheme != "https":
            return None

        all_files: list[dict[str, Any]] = []
        page = 1
        while True:
            url = f"{base_url}?per_page=100&page={page}"
            req = urllib.request.Request(
                url,
                method="GET",
                headers={
                    "Authorization": f"Bearer {self.token}",
                    "Accept": "application/vnd.github+json",
                    "X-GitHub-Api-Version": "2022-11-28",
                },
            )
            try:
                with urllib.request.urlopen(  # noqa: S310 — HTTPS-only validated above # nosemgrep: dynamic-urllib-use-detected # nosec B310
                    req,
                    timeout=30,
                ) as resp:
                    files_page = json.loads(resp.read().decode())
            except (urllib.error.URLError, json.JSONDecodeError, OSError):
                # Best-effort: callers treat None as "no diff filtering".
                logger.debug(
                    "Failed to fetch PR diff; skipping diff-position filtering",
                )
                return None

            if not files_page:
                break
            all_files.extend(files_page)
            # A short page means this was the last one.
            if len(files_page) < 100:
                break
            page += 1

        result: dict[str, set[int]] = {}
        for f in all_files:
            filename = f.get("filename", "")
            patch = f.get("patch", "")
            # Binary or very large files have no "patch" field — skip them.
            if not filename or not patch:
                continue
            result[filename] = _parse_patch_lines(patch)
        return result

    def _post_issue_comment(self, body: str) -> bool:
        """Post a top-level issue comment on the PR.

        Args:
            body: Comment body in Markdown.

        Returns:
            True if posted successfully.
        """
        url = f"{self.api_base}/repos/{self.repo}/issues/{self.pr_number}/comments"
        return self._api_request("POST", url, {"body": body})

    def _api_request(
        self,
        method: str,
        url: str,
        payload: dict[str, Any],
    ) -> bool:
        """Make an authenticated GitHub API request.

        Args:
            method: HTTP method.
            url: Full API URL.
            payload: JSON payload.

        Returns:
            True if the request succeeded (2xx status).
        """
        data = json.dumps(payload).encode()
        req = urllib.request.Request(
            url,
            data=data,
            method=method,
            headers={
                "Authorization": f"Bearer {self.token}",
                "Accept": "application/vnd.github+json",
                "Content-Type": "application/json",
                "X-GitHub-Api-Version": "2022-11-28",
            },
        )
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme != "https":
            logger.warning("Refusing non-HTTPS URL: {}", url)
            return False

        try:
            with urllib.request.urlopen(  # noqa: S310 — HTTPS-only validated above # nosemgrep: dynamic-urllib-use-detected # nosec B310
                req,
                timeout=30,
            ) as resp:
                status: int = resp.status
                return 200 <= status < 300
        except urllib.error.HTTPError as e:
            # Capture a truncated response body for diagnostics.
            try:
                body = e.read().decode("utf-8", "replace")[:500]
            except (AttributeError, UnicodeDecodeError, ValueError, OSError):
                body = "<unreadable>"
            logger.warning(
                "GitHub API request failed: {} {} -> {}: {}",
                method,
                url,
                e.code,
                body,
            )
            return False
        except urllib.error.URLError as e:
            logger.warning("GitHub API request error: {}", e.reason)
            return False

311 

312 

313def _detect_repo_root() -> Path | None: 

314 """Detect the git repository root via ``git rev-parse``. 

315 

316 Returns: 

317 Repository root path, or ``None`` if detection fails. 

318 """ 

319 import shutil 

320 import subprocess 

321 

322 if not shutil.which("git"): 

323 return None 

324 

325 try: 

326 result = subprocess.run( 

327 ["git", "rev-parse", "--show-toplevel"], 

328 capture_output=True, 

329 text=True, 

330 check=True, 

331 timeout=5, 

332 ) 

333 toplevel = result.stdout.strip() 

334 return Path(toplevel) if toplevel else None 

335 except (subprocess.SubprocessError, FileNotFoundError, OSError): 

336 return None 

337 

338 

339def _parse_patch_lines(patch: str) -> set[int]: 

340 """Extract right-side (new) line numbers from a unified diff patch. 

341 

342 Args: 

343 patch: The ``patch`` field from the GitHub files API. 

344 

345 Returns: 

346 Set of line numbers on the right side of the diff. 

347 """ 

348 import re 

349 

350 lines: set[int] = set() 

351 current_line = 0 

352 for raw_line in patch.split("\n"): 

353 hunk_match = re.match(r"^@@ -\d+(?:,\d+)? \+(\d+)", raw_line) 

354 if hunk_match: 

355 current_line = int(hunk_match.group(1)) 

356 continue 

357 if raw_line.startswith("-"): 

358 # Deleted line — doesn't advance right-side counter 

359 continue 

360 if raw_line.startswith("+"): 

361 lines.add(current_line) 

362 # Both context lines and additions advance the right-side counter 

363 current_line += 1 

364 return lines 

365 

366 

367def _detect_pr_number() -> int | None: 

368 """Detect PR number from the GitHub event payload or ``GITHUB_REF``. 

369 

370 Tries ``GITHUB_EVENT_PATH`` first (works for ``pull_request_target`` 

371 workflows), then falls back to parsing ``GITHUB_REF`` 

372 (``refs/pull/<number>/merge``). 

373 

374 Returns: 

375 PR number if detected, else None. 

376 """ 

377 # Try event payload first (covers pull_request_target workflows) 

378 event_path = os.environ.get("GITHUB_EVENT_PATH", "") 

379 if event_path: 

380 try: 

381 with open(event_path) as f: 

382 event = json.load(f) 

383 number = event.get("number") 

384 if isinstance(number, int) and number > 0: 

385 return number 

386 except (OSError, json.JSONDecodeError, TypeError, AttributeError): 

387 pass 

388 

389 # Fall back to GITHUB_REF parsing 

390 ref = os.environ.get("GITHUB_REF", "") 

391 if ref.startswith("refs/pull/") and ref.endswith("/merge"): 

392 try: 

393 return int(ref.split("/")[2]) 

394 except (IndexError, ValueError): 

395 return None 

396 return None 

397 

398 

399def _format_summary_comment(summary: AISummary) -> str: 

400 """Format an AI summary as a Markdown PR comment. 

401 

402 Args: 

403 summary: AI summary to format. 

404 

405 Returns: 

406 Markdown-formatted comment body. 

407 """ 

408 lines: list[str] = [ 

409 "## Lintro AI Summary", 

410 "", 

411 summary.overview, 

412 ] 

413 

414 if summary.key_patterns: 

415 lines.append("") 

416 lines.append("### Key Patterns") 

417 lines.extend(f"- {pattern}" for pattern in summary.key_patterns) 

418 

419 if summary.priority_actions: 

420 lines.append("") 

421 lines.append("### Priority Actions") 

422 lines.extend( 

423 f"{i}. {action}" for i, action in enumerate(summary.priority_actions, 1) 

424 ) 

425 

426 if summary.triage_suggestions: 

427 lines.append("") 

428 lines.append("### Triage — Consider Suppressing") 

429 lines.extend(f"- {suggestion}" for suggestion in summary.triage_suggestions) 

430 

431 if summary.estimated_effort: 

432 lines.append("") 

433 lines.append(f"*Estimated effort: {summary.estimated_effort}*") 

434 

435 return "\n".join(lines) 

436 

437 

438def _format_inline_comment(suggestion: AIFixSuggestion) -> str: 

439 """Format an AI fix suggestion as an inline review comment. 

440 

441 Args: 

442 suggestion: Fix suggestion to format. 

443 

444 Returns: 

445 Markdown-formatted inline comment body. 

446 """ 

447 lines: list[str] = [] 

448 

449 code_label = f"**{suggestion.code}**" if suggestion.code else "" 

450 tool_label = f" ({suggestion.tool_name})" if suggestion.tool_name else "" 

451 if code_label: 

452 lines.append(f"{code_label}{tool_label}") 

453 lines.append("") 

454 

455 if suggestion.explanation: 

456 lines.append(suggestion.explanation) 

457 lines.append("") 

458 

459 if suggestion.diff: 

460 sanitized = suggestion.diff.replace("```", "``\u200b`") 

461 lines.append("```diff") 

462 lines.append(sanitized) 

463 lines.append("```") 

464 lines.append("") 

465 

466 if suggestion.suggested_code: 

467 sanitized_code = suggestion.suggested_code.replace("```", "``\u200b`") 

468 lines.append("```suggestion") 

469 lines.append(sanitized_code) 

470 lines.append("```") 

471 lines.append("") 

472 

473 confidence = suggestion.confidence or ConfidenceLevel.MEDIUM 

474 risk = suggestion.risk_level or "unknown" 

475 lines.append(f"Confidence: {confidence} | Risk: {risk}") 

476 

477 return "\n".join(lines)