Coverage for lintro / utils / output / file_writer.py: 93%

149 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""File writing and output formatting functions. 

2 

3This module provides functions for writing tool results to files 

4and formatting tool output for display. 

5""" 

6 

7# mypy: ignore-errors 

8# Note: mypy errors are suppressed because lintro runs mypy from file's directory, 

9# breaking package resolution. When run properly (mypy lintro/...), this file passes. 

10 

11from __future__ import annotations 

12 

13import csv 

14import datetime 

15import html 

16import json 

17from collections.abc import Sequence 

18from pathlib import Path 

19from typing import TYPE_CHECKING, Any 

20 

21# Import parser_registration to auto-register all parsers 

22import lintro.utils.output.parser_registration # noqa: F401 

23from lintro.enums.action import Action 

24from lintro.enums.output_format import OutputFormat, normalize_output_format 

25from lintro.enums.tool_name import ToolName 

26from lintro.formatters.formatter import format_issues, format_issues_with_sections 

27from lintro.parsers.base_issue import BaseIssue 

28from lintro.utils.output.helpers import sanitize_csv_value 

29from lintro.utils.output.parser_registration import ParserError 

30from lintro.utils.output.parser_registry import ParserRegistry 

31 

32try: 

33 import tabulate as _tabulate_module # noqa: F401 

34 

35 TABULATE_AVAILABLE = True 

36 del _tabulate_module 

37except ImportError: 

38 TABULATE_AVAILABLE = False 

39 

40if TYPE_CHECKING: 

41 from lintro.models.core.tool_result import ToolResult 

42 

43 

def build_doc_url_map(all_results: Sequence[Any]) -> dict[str, str]:
    """Collect a rule-code -> documentation-URL lookup from tool results.

    Every issue of every result is visited in order. An issue contributes
    an entry only when both its ``code`` and its ``doc_url`` are non-empty
    after string conversion. When the same code appears more than once,
    the URL seen last replaces the earlier one. The SARIF writer uses this
    mapping to populate ``helpUri``.

    Args:
        all_results: Sequence of ToolResult-like objects. Results without
            an ``issues`` attribute, or with an empty one, are skipped.

    Returns:
        Mapping of rule codes to documentation URLs (may be empty).
    """
    url_by_code: dict[str, str] = {}
    for tool_result in all_results:
        found_issues = getattr(tool_result, "issues", None)
        if not found_issues:
            continue
        for entry in found_issues:
            rule_code = str(getattr(entry, "code", "") or "")
            rule_url = str(getattr(entry, "doc_url", "") or "")
            if not (rule_code and rule_url):
                continue
            url_by_code[rule_code] = rule_url
    return url_by_code

65 

66 

def _markdown_cell(value: object) -> str:
    """Render a value as text that is safe inside one Markdown table cell.

    Pipes are backslash-escaped because they delimit cells, and line breaks
    are replaced by a space because a table row must stay on a single line.
    """
    text = str(value).replace("|", r"\|")
    return text.replace("\r\n", " ").replace("\r", " ").replace("\n", " ")


def _issue_to_json(issue: Any) -> dict[str, Any]:
    """Convert one issue into its JSON dictionary (doc_url only when set)."""
    entry: dict[str, Any] = {
        "file": getattr(issue, "file", "") or "",
        "line": getattr(issue, "line", None) or 0,
        "code": getattr(issue, "code", "") or "",
        "message": getattr(issue, "message", "") or "",
    }
    if getattr(issue, "doc_url", ""):
        entry["doc_url"] = issue.doc_url
    return entry


def _result_to_json(result: Any) -> dict[str, Any]:
    """Convert one tool result into its JSON dictionary."""
    result_data: dict[str, Any] = {
        "tool": result.name,
        "success": getattr(result, "success", True),
        "issues_count": getattr(result, "issues_count", 0),
        "output": getattr(result, "output", ""),
    }
    ai_metadata = getattr(result, "ai_metadata", None)
    if isinstance(ai_metadata, dict) and ai_metadata:
        # Imported lazily so the AI package is only loaded when a result
        # actually carries AI metadata.
        from lintro.ai.metadata import normalize_ai_metadata

        normalized = normalize_ai_metadata(ai_metadata)
        if normalized:
            result_data["ai_metadata"] = normalized
    issues = getattr(result, "issues", None)
    if issues:
        result_data["issues"] = [_issue_to_json(issue) for issue in issues]
    return result_data


def _write_json_report(
    output_file: Path,
    all_results: list[ToolResult],
    action: Action,
    total_issues: int,
    total_fixed: int,
) -> None:
    """Write the JSON report (structure similar to stdout JSON mode)."""
    json_data: dict[str, Any] = {
        "timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
        "action": action.value,
        "summary": {
            "total_issues": total_issues,
            "total_fixed": total_fixed,
            "tools_run": len(all_results),
        },
        "results": [_result_to_json(result) for result in all_results],
    }
    output_file.write_text(
        json.dumps(json_data, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )


def _write_csv_report(output_file: Path, all_results: list[ToolResult]) -> None:
    """Write the CSV report: one row per issue, or one row per clean tool."""
    header: list[str] = [
        "tool",
        "issues_count",
        "file",
        "line",
        "code",
        "message",
        "doc_url",
    ]
    rows: list[list[str]] = []
    for result in all_results:
        tool_cell = sanitize_csv_value(result.name)
        count_cell = sanitize_csv_value(str(getattr(result, "issues_count", 0)))
        issues = getattr(result, "issues", None)
        if not issues:
            # Tools without issues still get a row so they show up in the file.
            rows.append([tool_cell, count_cell, "", "", "", "", ""])
            continue
        for issue in issues:
            rows.append(
                [
                    tool_cell,
                    count_cell,
                    sanitize_csv_value(str(getattr(issue, "file", "") or "")),
                    sanitize_csv_value(str(getattr(issue, "line", None) or 0)),
                    sanitize_csv_value(str(getattr(issue, "code", "") or "")),
                    sanitize_csv_value(str(getattr(issue, "message", "") or "")),
                    sanitize_csv_value(str(getattr(issue, "doc_url", "") or "")),
                ],
            )
    # newline="" lets the csv module control line endings itself.
    with output_file.open("w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(rows)


def _write_markdown_report(output_file: Path, all_results: list[ToolResult]) -> None:
    """Write the Markdown report: a summary table plus one table per tool."""
    lines: list[str] = ["# Lintro Report", ""]
    lines.append("## Summary\n")
    lines.append("| Tool | Issues |")
    lines.append("|------|--------|")
    for result in all_results:
        lines.append(
            f"| {_markdown_cell(result.name)}"
            f" | {getattr(result, 'issues_count', 0)} |",
        )
    lines.append("")
    for result in all_results:
        issues_count = getattr(result, "issues_count", 0)
        lines.append(f"### {result.name} ({issues_count} issues)")
        issues = getattr(result, "issues", None)
        if not issues:
            lines.append("No issues found.\n")
            continue
        lines.append("| File | Line | Code | Message | Docs |")
        lines.append("|------|------|------|---------|------|")
        for issue in issues:
            file_val = _markdown_cell(getattr(issue, "file", "") or "")
            line_val = getattr(issue, "line", None) or 0
            code_val = _markdown_cell(getattr(issue, "code", "") or "")
            msg_val = _markdown_cell(getattr(issue, "message", "") or "")
            doc_url = str(getattr(issue, "doc_url", "") or "")
            doc_val = _markdown_cell(f"[docs]({doc_url})") if doc_url else ""
            lines.append(
                f"| {file_val} | {line_val} | {code_val}"
                f" | {msg_val} | {doc_val} |",
            )
        # Blank line terminates the table before the next heading.
        lines.append("")
    output_file.write_text("\n".join(lines), encoding="utf-8")


def _write_html_report(output_file: Path, all_results: list[ToolResult]) -> None:
    """Write the HTML report; every dynamic value is HTML-escaped."""
    html_lines: list[str] = [
        "<html><head><title>Lintro Report</title></head><body>",
    ]
    html_lines.append("<h1>Lintro Report</h1>")
    html_lines.append("<h2>Summary</h2>")
    html_lines.append("<table border='1'><tr><th>Tool</th><th>Issues</th></tr>")
    for result in all_results:
        safe_name = html.escape(result.name)
        html_lines.append(
            f"<tr><td>{safe_name}</td>"
            f"<td>{getattr(result, 'issues_count', 0)}</td></tr>",
        )
    html_lines.append("</table>")
    for result in all_results:
        issues_count = getattr(result, "issues_count", 0)
        html_lines.append(
            f"<h3>{html.escape(result.name)} ({issues_count} issues)</h3>",
        )
        issues = getattr(result, "issues", None)
        if not issues:
            html_lines.append("<p>No issues found.</p>")
            continue
        html_lines.append(
            "<table border='1'><tr><th>File</th><th>Line</th>"
            "<th>Code</th><th>Message</th><th>Docs</th></tr>",
        )
        for issue in issues:
            f_val = html.escape(str(getattr(issue, "file", "") or ""))
            l_val = html.escape(str(getattr(issue, "line", None) or 0))
            c_val = html.escape(str(getattr(issue, "code", "") or ""))
            m_val = html.escape(str(getattr(issue, "message", "") or ""))
            doc_url = str(getattr(issue, "doc_url", "") or "")
            d_val = (
                f'<a href="{html.escape(doc_url, quote=True)}">docs</a>'
                if doc_url
                else ""
            )
            html_lines.append(
                f"<tr><td>{f_val}</td><td>{l_val}</td>"
                f"<td>{c_val}</td><td>{m_val}</td><td>{d_val}</td></tr>",
            )
        html_lines.append("</table>")
    html_lines.append("</body></html>")
    output_file.write_text("\n".join(html_lines), encoding="utf-8")


def _write_sarif_report(output_file: Path, all_results: list[ToolResult]) -> None:
    """Write the SARIF report by delegating to the SARIF writer."""
    # Imported lazily so the SARIF/AI modules are only loaded for this format.
    from lintro.ai.output.sarif import write_sarif
    from lintro.ai.output.sarif_bridge import (
        suggestions_from_results,
        summary_from_results,
    )

    suggestions = suggestions_from_results(all_results)
    summary = summary_from_results(all_results)

    write_sarif(
        suggestions,
        summary,
        output_path=output_file,
        # An empty mapping is passed as None.
        doc_urls=build_doc_url_map(all_results) or None,
    )


def _write_text_report(
    output_file: Path,
    all_results: list[ToolResult],
    action: Action,
    total_issues: int,
    total_fixed: int,
) -> None:
    """Write the plain-text report used for the plain and grid formats."""
    lines = [f"Lintro {action.value.capitalize()} Report", "=" * 40, ""]
    for result in all_results:
        issues_count = getattr(result, "issues_count", 0)
        lines.append(f"{result.name}: {issues_count} issues")
        output_text = getattr(result, "output", "")
        if output_text and output_text.strip():
            lines.append(output_text.strip())
        # Blank separator line after each tool's section.
        lines.append("")
    lines.append(f"Total Issues: {total_issues}")
    if action == Action.FIX:
        lines.append(f"Total Fixed: {total_fixed}")
    output_file.write_text("\n".join(lines), encoding="utf-8")


def write_output_file(
    *,
    output_path: str,
    output_format: OutputFormat,
    all_results: list[ToolResult],
    action: Action,
    total_issues: int,
    total_fixed: int,
) -> None:
    """Write results to user-specified output file.

    Missing parent directories of ``output_path`` are created. The report
    layout is selected by ``output_format``: JSON, CSV, Markdown, HTML and
    SARIF each have a dedicated writer; any other format falls back to the
    plain-text report. All files are written as UTF-8.

    Args:
        output_path: str: Path to the output file.
        output_format: OutputFormat: Format for the output.
        all_results: list: List of ToolResult objects.
        action: Action: The action performed (check, fmt, test).
        total_issues: int: Total number of issues found.
        total_fixed: int: Total number of issues fixed.
    """
    output_file = Path(output_path)
    output_file.parent.mkdir(parents=True, exist_ok=True)

    if output_format == OutputFormat.JSON:
        _write_json_report(
            output_file,
            all_results,
            action,
            total_issues,
            total_fixed,
        )
    elif output_format == OutputFormat.CSV:
        _write_csv_report(output_file, all_results)
    elif output_format == OutputFormat.MARKDOWN:
        _write_markdown_report(output_file, all_results)
    elif output_format == OutputFormat.HTML:
        _write_html_report(output_file, all_results)
    elif output_format == OutputFormat.SARIF:
        _write_sarif_report(output_file, all_results)
    else:
        # Plain or Grid format - write formatted text output
        _write_text_report(
            output_file,
            all_results,
            action,
            total_issues,
            total_fixed,
        )

300 

301 

def format_tool_output(
    tool_name: str,
    output: str,
    output_format: str | OutputFormat = "grid",
    issues: Sequence[BaseIssue] | None = None,
) -> str:
    """Render a tool's result as display text in the requested format.

    Resolution order:

    1. Pytest output is returned untouched (it is already formatted by
       build_output_with_failures in pytest_output_processor.py).
    2. When parsed issues are supplied they are rendered by the unified
       formatter, grouped by fixability when the tool has a fixability
       predicate, tabulate is importable and the format is not JSON.
    3. Otherwise the raw output is parsed with the registered parser and
       the resulting issues are rendered; if nothing is parsed the raw
       output is returned unchanged.

    Args:
        tool_name: str: Name of the tool that generated the output.
        output: str: Raw output from the tool.
        output_format: str | OutputFormat: Output format (plain, grid,
            markdown, html, json, csv).
        issues: Sequence[BaseIssue] | None: Parsed issue objects (optional).

    Returns:
        str: Formatted output string. "No issues found." when there are no
            issues and the raw output is empty or whitespace-only; an error
            message followed by the raw output when parsing fails.
    """
    fmt = normalize_output_format(output_format)

    # Pytest output arrives pre-formatted, so it is passed straight through.
    if tool_name == ToolName.PYTEST:
        return output or ""

    if issues:
        # Registry lookup for the tool's fixability predicate (O(1)).
        fixable_predicate = ParserRegistry.get_fixability_predicate(tool_name)

        if fmt == "json" or fixable_predicate is None or not TABULATE_AVAILABLE:
            # No fixable grouping possible: render all issues in one list.
            return format_issues(issues=issues, output_format=fmt)

        return format_issues_with_sections(
            issues=issues,
            output_format=fmt,
            group_by_fixable=True,
            tool_name=tool_name,
        )

    if not (output and output.strip()):
        return "No issues found."

    # No pre-parsed issues: run the raw text through the registered parser.
    try:
        reparsed = ParserRegistry.parse(tool_name, output)
    except ParserError as exc:
        # Surface the failure together with the raw text to aid debugging.
        return f"Error: {exc}\n\nRaw output:\n{output}"

    if not reparsed:
        # Nothing recognised by the parser: hand back the raw output.
        return output

    return format_issues(issues=reparsed, output_format=fmt)