Coverage for lintro / utils / output / file_writer.py: 93%
149 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""File writing and output formatting functions.
3This module provides functions for writing tool results to files
4and formatting tool output for display.
5"""
7# mypy: ignore-errors
8# Note: mypy errors are suppressed because lintro runs mypy from the file's directory,
9# breaking package resolution. When run properly (mypy lintro/...), this file passes.
11from __future__ import annotations
13import csv
14import datetime
15import html
16import json
17from collections.abc import Sequence
18from pathlib import Path
19from typing import TYPE_CHECKING, Any
21# Import parser_registration to auto-register all parsers
22import lintro.utils.output.parser_registration # noqa: F401
23from lintro.enums.action import Action
24from lintro.enums.output_format import OutputFormat, normalize_output_format
25from lintro.enums.tool_name import ToolName
26from lintro.formatters.formatter import format_issues, format_issues_with_sections
27from lintro.parsers.base_issue import BaseIssue
28from lintro.utils.output.helpers import sanitize_csv_value
29from lintro.utils.output.parser_registration import ParserError
30from lintro.utils.output.parser_registry import ParserRegistry
32try:
33 import tabulate as _tabulate_module # noqa: F401
35 TABULATE_AVAILABLE = True
36 del _tabulate_module
37except ImportError:
38 TABULATE_AVAILABLE = False
40if TYPE_CHECKING:
41 from lintro.models.core.tool_result import ToolResult
def build_doc_url_map(all_results: Sequence[Any]) -> dict[str, str]:
    """Collect documentation URLs for the rule codes present in results.

    Walks every issue of every result and records the issue's ``doc_url``
    under its ``code``. Issues lacking either value are skipped. When the
    same code appears more than once, the URL seen last is kept. The SARIF
    writer uses this mapping to populate helpUri.

    Args:
        all_results: Sequence of ToolResult-like objects. Objects without
            an ``issues`` attribute (or with an empty one) contribute
            nothing.

    Returns:
        Mapping of rule codes to documentation URLs (may be empty).
    """
    url_by_code: dict[str, str] = {}
    for tool_result in all_results:
        found_issues = getattr(tool_result, "issues", None)
        if not found_issues:
            continue
        for entry in found_issues:
            # Missing or None attributes are normalised to "" before the check.
            rule_code = str(getattr(entry, "code", "") or "")
            doc_link = str(getattr(entry, "doc_url", "") or "")
            if rule_code and doc_link:
                url_by_code[rule_code] = doc_link
    return url_by_code
def _issue_attr(issue: Any, name: str, default: Any = "") -> Any:
    """Return ``getattr(issue, name)``, or *default* when missing or falsy."""
    return getattr(issue, name, None) or default


def _markdown_cell(value: Any) -> str:
    """Render *value* as text that is safe inside one Markdown table cell.

    Pipes are backslash-escaped and line breaks are collapsed to single
    spaces, because either one would otherwise end the cell or the row.
    """
    text = str(value or "")
    text = text.replace("\r\n", " ").replace("\r", " ").replace("\n", " ")
    return text.replace("|", r"\|")


def _build_json_report(
    *,
    all_results: list[ToolResult],
    action: Action,
    total_issues: int,
    total_fixed: int,
) -> dict[str, Any]:
    """Build the JSON-serialisable report structure for all results."""
    report: dict[str, Any] = {
        "timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
        "action": action.value,
        "summary": {
            "total_issues": total_issues,
            "total_fixed": total_fixed,
            "tools_run": len(all_results),
        },
        "results": [],
    }
    for result in all_results:
        result_data: dict[str, Any] = {
            "tool": result.name,
            "success": getattr(result, "success", True),
            "issues_count": getattr(result, "issues_count", 0),
            "output": getattr(result, "output", ""),
        }
        ai_metadata = getattr(result, "ai_metadata", None)
        if isinstance(ai_metadata, dict) and ai_metadata:
            # Local import: only needed when a result carries AI metadata.
            from lintro.ai.metadata import normalize_ai_metadata

            normalized = normalize_ai_metadata(ai_metadata)
            if normalized:
                result_data["ai_metadata"] = normalized
        issues = getattr(result, "issues", None)
        if issues:
            issue_records: list[dict[str, Any]] = []
            for issue in issues:
                record: dict[str, Any] = {
                    "file": _issue_attr(issue, "file"),
                    "line": _issue_attr(issue, "line", 0),
                    "code": _issue_attr(issue, "code"),
                    "message": _issue_attr(issue, "message"),
                }
                # doc_url is optional and only emitted when non-empty.
                doc_url = getattr(issue, "doc_url", "")
                if doc_url:
                    record["doc_url"] = doc_url
                issue_records.append(record)
            result_data["issues"] = issue_records
        report["results"].append(result_data)
    return report


def _build_csv_rows(all_results: list[ToolResult]) -> list[list[str]]:
    """Build sanitized CSV data rows: one per issue, or one per clean tool."""
    rows: list[list[str]] = []
    for result in all_results:
        tool_cell = sanitize_csv_value(result.name)
        issues = getattr(result, "issues", None)
        if not issues:
            # Tools without issues still get a row so they show in the report.
            count_cell = sanitize_csv_value(str(getattr(result, "issues_count", 0)))
            rows.append([tool_cell, count_cell, "", "", "", "", ""])
            continue
        for issue in issues:
            rows.append(
                [
                    tool_cell,
                    sanitize_csv_value(str(getattr(result, "issues_count", 0))),
                    sanitize_csv_value(str(_issue_attr(issue, "file"))),
                    sanitize_csv_value(str(_issue_attr(issue, "line", 0))),
                    sanitize_csv_value(str(_issue_attr(issue, "code"))),
                    sanitize_csv_value(str(_issue_attr(issue, "message"))),
                    sanitize_csv_value(str(_issue_attr(issue, "doc_url"))),
                ],
            )
    return rows


def _render_markdown_report(all_results: list[ToolResult]) -> str:
    """Render the Markdown report: a summary table plus one table per tool."""
    lines: list[str] = [
        "# Lintro Report",
        "",
        "## Summary\n",
        "| Tool | Issues |",
        "|------|--------|",
    ]
    for result in all_results:
        lines.append(f"| {result.name} | {getattr(result, 'issues_count', 0)} |")
    lines.append("")
    for result in all_results:
        issues_count = getattr(result, "issues_count", 0)
        lines.append(f"### {result.name} ({issues_count} issues)")
        issues = getattr(result, "issues", None)
        if not issues:
            lines.append("No issues found.\n")
            continue
        lines.append("| File | Line | Code | Message | Docs |")
        lines.append("|------|------|------|---------|------|")
        for issue in issues:
            file_val = _markdown_cell(_issue_attr(issue, "file"))
            line_val = _issue_attr(issue, "line", 0)
            code_val = _markdown_cell(_issue_attr(issue, "code"))
            # Multi-line messages are flattened so the row stays intact.
            msg_val = _markdown_cell(_issue_attr(issue, "message"))
            doc_url = str(_issue_attr(issue, "doc_url"))
            doc_val = _markdown_cell(f"[docs]({doc_url})") if doc_url else ""
            lines.append(
                f"| {file_val} | {line_val} | {code_val}"
                f" | {msg_val} | {doc_val} |",
            )
        lines.append("")
    return "\n".join(lines)


def _render_html_report(all_results: list[ToolResult]) -> str:
    """Render the HTML report; all tool and issue text is HTML-escaped."""
    html_lines: list[str] = [
        "<html><head><title>Lintro Report</title></head><body>",
        "<h1>Lintro Report</h1>",
        "<h2>Summary</h2>",
        "<table border='1'><tr><th>Tool</th><th>Issues</th></tr>",
    ]
    for result in all_results:
        safe_name = html.escape(result.name)
        html_lines.append(
            f"<tr><td>{safe_name}</td>"
            f"<td>{getattr(result, 'issues_count', 0)}</td></tr>",
        )
    html_lines.append("</table>")
    for result in all_results:
        issues_count = getattr(result, "issues_count", 0)
        html_lines.append(
            f"<h3>{html.escape(result.name)} ({issues_count} issues)</h3>",
        )
        issues = getattr(result, "issues", None)
        if not issues:
            html_lines.append("<p>No issues found.</p>")
            continue
        html_lines.append(
            "<table border='1'><tr><th>File</th><th>Line</th>"
            "<th>Code</th><th>Message</th><th>Docs</th></tr>",
        )
        for issue in issues:
            f_val = html.escape(str(_issue_attr(issue, "file")))
            l_val = html.escape(str(_issue_attr(issue, "line", 0)))
            c_val = html.escape(str(_issue_attr(issue, "code")))
            m_val = html.escape(str(_issue_attr(issue, "message")))
            doc_url = str(_issue_attr(issue, "doc_url"))
            d_val = (
                f'<a href="{html.escape(doc_url, quote=True)}">docs</a>'
                if doc_url
                else ""
            )
            html_lines.append(
                f"<tr><td>{f_val}</td><td>{l_val}</td>"
                f"<td>{c_val}</td><td>{m_val}</td><td>{d_val}</td></tr>",
            )
        html_lines.append("</table>")
    html_lines.append("</body></html>")
    return "\n".join(html_lines)


def _render_text_report(
    *,
    all_results: list[ToolResult],
    action: Action,
    total_issues: int,
    total_fixed: int,
) -> str:
    """Render the plain-text report used for plain and grid formats."""
    lines: list[str] = [
        f"Lintro {action.value.capitalize()} Report",
        "=" * 40,
        "",
    ]
    for result in all_results:
        issues_count = getattr(result, "issues_count", 0)
        lines.append(f"{result.name}: {issues_count} issues")
        output_text = getattr(result, "output", "")
        if output_text and output_text.strip():
            lines.append(output_text.strip())
        lines.append("")
    lines.append(f"Total Issues: {total_issues}")
    # The fixed count is only reported for the fix action.
    if action == Action.FIX:
        lines.append(f"Total Fixed: {total_fixed}")
    return "\n".join(lines)


def write_output_file(
    *,
    output_path: str,
    output_format: OutputFormat,
    all_results: list[ToolResult],
    action: Action,
    total_issues: int,
    total_fixed: int,
) -> None:
    """Write results to user-specified output file.

    The parent directory of ``output_path`` is created when missing. JSON,
    CSV, Markdown, HTML and SARIF each have a dedicated layout; any other
    format falls back to a plain-text report. In Markdown tables, pipes in
    issue fields are escaped and line breaks are collapsed to spaces so a
    multi-line message cannot break a table row.

    Args:
        output_path: str: Path to the output file.
        output_format: OutputFormat: Format for the output.
        all_results: list: List of ToolResult objects.
        action: Action: The action performed (check, fmt, test).
        total_issues: int: Total number of issues found.
        total_fixed: int: Total number of issues fixed.
    """
    output_file = Path(output_path)
    output_file.parent.mkdir(parents=True, exist_ok=True)

    if output_format == OutputFormat.JSON:
        # Build JSON structure similar to stdout JSON mode
        json_data = _build_json_report(
            all_results=all_results,
            action=action,
            total_issues=total_issues,
            total_fixed=total_fixed,
        )
        output_file.write_text(
            json.dumps(json_data, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )

    elif output_format == OutputFormat.CSV:
        header: list[str] = [
            "tool",
            "issues_count",
            "file",
            "line",
            "code",
            "message",
            "doc_url",
        ]
        # Rows are built before the file is opened for writing.
        rows = _build_csv_rows(all_results)
        with output_file.open("w", encoding="utf-8", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(rows)

    elif output_format == OutputFormat.MARKDOWN:
        output_file.write_text(
            _render_markdown_report(all_results),
            encoding="utf-8",
        )

    elif output_format == OutputFormat.HTML:
        output_file.write_text(_render_html_report(all_results), encoding="utf-8")

    elif output_format == OutputFormat.SARIF:
        from lintro.ai.output.sarif import write_sarif
        from lintro.ai.output.sarif_bridge import (
            suggestions_from_results,
            summary_from_results,
        )

        suggestions = suggestions_from_results(all_results)
        summary = summary_from_results(all_results)

        write_sarif(
            suggestions,
            summary,
            output_path=output_file,
            doc_urls=build_doc_url_map(all_results) or None,
        )

    else:
        # Plain or Grid format - write formatted text output
        output_file.write_text(
            _render_text_report(
                all_results=all_results,
                action=action,
                total_issues=total_issues,
                total_fixed=total_fixed,
            ),
            encoding="utf-8",
        )
def format_tool_output(
    tool_name: str,
    output: str,
    output_format: str | OutputFormat = "grid",
    issues: Sequence[BaseIssue] | None = None,
) -> str:
    """Render a tool's results as a display string in the requested format.

    Resolution order:

    1. Pytest output is returned untouched (empty string when falsy).
    2. Pre-parsed ``issues`` are rendered by the unified formatter, grouped
       by fixability when the tool has a fixability predicate, the format
       is not JSON and tabulate is importable.
    3. Blank raw output yields ``"No issues found."``.
    4. Otherwise the raw output is parsed through the parser registry; a
       parser error is reported together with the raw output, and output
       that yields no issues is returned as-is.

    Args:
        tool_name: str: Name of the tool that generated the output.
        output: str: Raw output from the tool.
        output_format: str: Output format (plain, grid, markdown, html, json, csv).
        issues: Sequence[BaseIssue] | None: List of parsed issue objects (optional).

    Returns:
        str: Formatted output string.
    """
    resolved_format = normalize_output_format(output_format)

    # Pytest output is already formatted by build_output_with_failures
    # in pytest_output_processor.py, so it is passed through unchanged.
    if tool_name == ToolName.PYTEST:
        return output or ""

    if issues:
        # Registry lookup for the tool's fixability predicate, if any.
        fixable_check = ParserRegistry.get_fixability_predicate(tool_name)
        group_by_fixability = (
            resolved_format != "json"
            and fixable_check is not None
            and TABULATE_AVAILABLE
        )
        if not group_by_fixability:
            # Flat rendering of all issues through the unified formatter.
            return format_issues(issues=issues, output_format=resolved_format)
        return format_issues_with_sections(
            issues=issues,
            output_format=resolved_format,
            group_by_fixable=True,
            tool_name=tool_name,
        )

    if not (output and output.strip()):
        return "No issues found."

    # No parsed issues were supplied: try the tool's registered parser.
    try:
        reparsed = ParserRegistry.parse(tool_name, output)
    except ParserError as parse_failure:
        # Surface the parser error alongside the raw output for debugging.
        return f"Error: {parse_failure}\n\nRaw output:\n{output}"

    if not reparsed:
        # Nothing could be extracted; fall back to the raw output.
        return output
    return format_issues(issues=reparsed, output_format=resolved_format)