Coverage for lintro / utils / output / manager.py: 99%
110 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Output manager for timestamped run directories.
3This module provides the OutputManager class for managing output
4directories and result files for Lintro runs.
5"""
7from __future__ import annotations
9import csv
10import datetime
11import json
12import os
13import shutil
14import tempfile
15from pathlib import Path
16from typing import TYPE_CHECKING
18from loguru import logger
20from lintro.utils.output.constants import (
21 DEFAULT_BASE_DIR,
22 DEFAULT_KEEP_LAST,
23 DEFAULT_RUN_PREFIX,
24 DEFAULT_TEMP_PREFIX,
25 DEFAULT_TIMESTAMP_FORMAT,
26)
27from lintro.utils.output.helpers import html_escape, markdown_escape
29if TYPE_CHECKING:
30 from lintro.models.core.tool_result import ToolResult
33class OutputManager:
34 """Manages output directories and result files for Lintro runs.
36 This class creates a timestamped directory under .lintro/run-{timestamp}/
37 and provides methods to write all required output formats.
38 """
40 def __init__(
41 self,
42 base_dir: str = DEFAULT_BASE_DIR,
43 keep_last: int = DEFAULT_KEEP_LAST,
44 ) -> None:
45 """Initialize the OutputManager.
47 Args:
48 base_dir: str: Base directory for output (default: .lintro).
49 keep_last: int: Number of runs to keep (default: 10).
50 """
51 # Allow override via environment variable
52 env_base_dir: str | None = os.environ.get("LINTRO_LOG_DIR")
53 if env_base_dir:
54 self.base_dir = Path(env_base_dir)
55 else:
56 self.base_dir = Path(base_dir)
57 self.keep_last = keep_last
58 self.run_dir = self._create_run_dir()
60 def _create_run_dir(self) -> Path:
61 """Create a new timestamped run directory.
63 Returns:
64 Path: Path to the created run directory.
65 """
66 timestamp: str = datetime.datetime.now().strftime(DEFAULT_TIMESTAMP_FORMAT)
67 run_dir: Path = self.base_dir / f"{DEFAULT_RUN_PREFIX}{timestamp}"
68 try:
69 run_dir.mkdir(parents=True, exist_ok=True)
70 except PermissionError:
71 # Fallback to temp directory if not writable
72 temp_base: Path = Path(tempfile.gettempdir()) / DEFAULT_TEMP_PREFIX
73 run_dir = temp_base / f"{DEFAULT_RUN_PREFIX}{timestamp}"
74 run_dir.mkdir(parents=True, exist_ok=True)
75 logger.warning(
76 f"Cannot write to {self.base_dir} (permission denied), "
77 f"using fallback: {run_dir}",
78 )
79 return run_dir
81 def write_console_log(
82 self,
83 content: str,
84 ) -> None:
85 """Write the console log to console.log in the run directory.
87 Args:
88 content: str: The console output as a string.
89 """
90 (self.run_dir / "console.log").write_text(content, encoding="utf-8")
92 def write_json(
93 self,
94 data: object,
95 filename: str = "results.json",
96 ) -> None:
97 """Write data as JSON to the run directory.
99 Args:
100 data: object: The data to serialize as JSON.
101 filename: str: The output filename (default: results.json).
102 """
103 with open(self.run_dir / filename, "w", encoding="utf-8") as f:
104 json.dump(data, f, indent=2, ensure_ascii=False)
106 def write_markdown(
107 self,
108 content: str,
109 filename: str = "report.md",
110 ) -> None:
111 """Write Markdown content to the run directory.
113 Args:
114 content: str: Markdown content as a string.
115 filename: str: The output filename (default: report.md).
116 """
117 (self.run_dir / filename).write_text(content, encoding="utf-8")
119 def write_html(
120 self,
121 content: str,
122 filename: str = "report.html",
123 ) -> None:
124 """Write HTML content to the run directory.
126 Args:
127 content: str: HTML content as a string.
128 filename: str: The output filename (default: report.html).
129 """
130 (self.run_dir / filename).write_text(content, encoding="utf-8")
132 def write_csv(
133 self,
134 rows: list[list[str]],
135 header: list[str],
136 filename: str = "summary.csv",
137 ) -> None:
138 """Write CSV data to the run directory.
140 Args:
141 rows: list[list[str]]: List of rows (each row is a list of strings).
142 header: list[str]: List of column headers.
143 filename: str: The output filename (default: summary.csv).
144 """
145 with open(self.run_dir / filename, "w", encoding="utf-8", newline="") as f:
146 writer = csv.writer(f)
147 writer.writerow(header)
148 writer.writerows(rows)
150 def write_reports_from_results(
151 self,
152 results: list[ToolResult],
153 ) -> None:
154 """Generate and write Markdown, HTML, and CSV reports from tool results.
156 Args:
157 results: list["ToolResult"]: List of ToolResult objects from a Lintro run.
158 """
159 self._write_markdown_report(results=results)
160 self._write_html_report(results=results)
161 self._write_csv_summary(results=results)
163 def _write_markdown_report(
164 self,
165 results: list[ToolResult],
166 ) -> None:
167 """Write a Markdown report summarizing all tool results and issues.
169 Args:
170 results: list["ToolResult"]: List of ToolResult objects from the linting
171 run.
172 """
173 lines: list[str] = ["# Lintro Report", ""]
174 lines.append("## Summary\n")
175 lines.append("| Tool | Issues |")
176 lines.append("|------|--------|")
177 for r in results:
178 lines.append(f"| {r.name} | {r.issues_count} |")
179 lines.append("")
180 for r in results:
181 lines.append(f"### {r.name} ({r.issues_count} issues)")
182 if hasattr(r, "issues") and r.issues:
183 lines.append("| File | Line | Code | Message |")
184 lines.append("|------|------|------|---------|")
185 for issue in r.issues:
186 file: str = markdown_escape(getattr(issue, "file", "") or "")
187 line = getattr(issue, "line", None) or 0
188 code: str = markdown_escape(getattr(issue, "code", "") or "")
189 msg: str = markdown_escape(getattr(issue, "message", "") or "")
190 lines.append(f"| {file} | {line} | {code} | {msg} |")
191 lines.append("")
192 else:
193 lines.append("No issues found.\n")
194 self.write_markdown(content="\n".join(lines))
196 def _write_html_report(
197 self,
198 results: list[ToolResult],
199 ) -> None:
200 """Write an HTML report summarizing all tool results and issues.
202 Args:
203 results: list["ToolResult"]: List of ToolResult objects from the linting
204 run.
205 """
206 html_content: list[str] = [
207 "<html><head><title>Lintro Report</title></head><body>",
208 ]
209 html_content.append("<h1>Lintro Report</h1>")
210 html_content.append("<h2>Summary</h2>")
211 html_content.append("<table border='1'><tr><th>Tool</th><th>Issues</th></tr>")
212 for r in results:
213 html_content.append(
214 f"<tr><td>{html_escape(r.name)}</td><td>{r.issues_count}</td></tr>",
215 )
216 html_content.append("</table>")
217 for r in results:
218 html_content.append(
219 f"<h3>{html_escape(r.name)} ({r.issues_count} issues)</h3>",
220 )
221 if hasattr(r, "issues") and r.issues:
222 html_content.append(
223 "<table border='1'><tr><th>File</th><th>Line</th><th>Code</th>"
224 "<th>Message</th></tr>",
225 )
226 for issue in r.issues:
227 file: str = html_escape(getattr(issue, "file", "") or "")
228 line = getattr(issue, "line", None) or 0
229 code: str = html_escape(getattr(issue, "code", "") or "")
230 msg: str = html_escape(getattr(issue, "message", "") or "")
231 html_content.append(
232 f"<tr><td>{file}</td><td>{line}</td><td>{code}</td>"
233 f"<td>{msg}</td></tr>",
234 )
235 html_content.append("</table>")
236 else:
237 html_content.append("<p>No issues found.</p>")
238 html_content.append("</body></html>")
239 self.write_html(content="\n".join(html_content))
241 def _write_csv_summary(
242 self,
243 results: list[ToolResult],
244 ) -> None:
245 """Write a CSV summary of all tool results and issues.
247 Args:
248 results: list["ToolResult"]: List of ToolResult objects from the linting
249 run.
250 """
251 rows: list[list[str]] = []
252 header: list[str] = ["tool", "issues_count", "file", "line", "code", "message"]
253 for r in results:
254 if hasattr(r, "issues") and r.issues:
255 for issue in r.issues:
256 rows.append(
257 [
258 r.name,
259 str(r.issues_count),
260 getattr(issue, "file", "") or "",
261 str(getattr(issue, "line", None) or 0),
262 getattr(issue, "code", "") or "",
263 getattr(issue, "message", "") or "",
264 ],
265 )
266 else:
267 rows.append([r.name, str(r.issues_count), "", "", "", ""])
268 self.write_csv(rows=rows, header=header)
270 def cleanup_old_runs(self) -> None:
271 """Remove old run directories, keeping only the most recent N runs."""
272 if not self.base_dir.exists():
273 return
274 runs: list[Path] = sorted(
275 [
276 d
277 for d in self.base_dir.iterdir()
278 if d.is_dir() and d.name.startswith(DEFAULT_RUN_PREFIX)
279 ],
280 key=lambda d: d.name,
281 reverse=True,
282 )
283 for old_run in runs[self.keep_last :]:
284 shutil.rmtree(old_run)
286 def get_run_dir(self) -> Path:
287 """Get the current run directory.
289 Returns:
290 Path: Path to the current run directory.
291 """
292 return self.run_dir