Coverage for lintro / utils / output / manager.py: 99%

110 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Output manager for timestamped run directories. 

2 

3This module provides the OutputManager class for managing output 

4directories and result files for Lintro runs. 

5""" 

6 

7from __future__ import annotations 

8 

9import csv 

10import datetime 

11import json 

12import os 

13import shutil 

14import tempfile 

15from pathlib import Path 

16from typing import TYPE_CHECKING 

17 

18from loguru import logger 

19 

20from lintro.utils.output.constants import ( 

21 DEFAULT_BASE_DIR, 

22 DEFAULT_KEEP_LAST, 

23 DEFAULT_RUN_PREFIX, 

24 DEFAULT_TEMP_PREFIX, 

25 DEFAULT_TIMESTAMP_FORMAT, 

26) 

27from lintro.utils.output.helpers import html_escape, markdown_escape 

28 

29if TYPE_CHECKING: 

30 from lintro.models.core.tool_result import ToolResult 

31 

32 

33class OutputManager: 

34 """Manages output directories and result files for Lintro runs. 

35 

36 This class creates a timestamped directory under .lintro/run-{timestamp}/ 

37 and provides methods to write all required output formats. 

38 """ 

39 

40 def __init__( 

41 self, 

42 base_dir: str = DEFAULT_BASE_DIR, 

43 keep_last: int = DEFAULT_KEEP_LAST, 

44 ) -> None: 

45 """Initialize the OutputManager. 

46 

47 Args: 

48 base_dir: str: Base directory for output (default: .lintro). 

49 keep_last: int: Number of runs to keep (default: 10). 

50 """ 

51 # Allow override via environment variable 

52 env_base_dir: str | None = os.environ.get("LINTRO_LOG_DIR") 

53 if env_base_dir: 

54 self.base_dir = Path(env_base_dir) 

55 else: 

56 self.base_dir = Path(base_dir) 

57 self.keep_last = keep_last 

58 self.run_dir = self._create_run_dir() 

59 

60 def _create_run_dir(self) -> Path: 

61 """Create a new timestamped run directory. 

62 

63 Returns: 

64 Path: Path to the created run directory. 

65 """ 

66 timestamp: str = datetime.datetime.now().strftime(DEFAULT_TIMESTAMP_FORMAT) 

67 run_dir: Path = self.base_dir / f"{DEFAULT_RUN_PREFIX}{timestamp}" 

68 try: 

69 run_dir.mkdir(parents=True, exist_ok=True) 

70 except PermissionError: 

71 # Fallback to temp directory if not writable 

72 temp_base: Path = Path(tempfile.gettempdir()) / DEFAULT_TEMP_PREFIX 

73 run_dir = temp_base / f"{DEFAULT_RUN_PREFIX}{timestamp}" 

74 run_dir.mkdir(parents=True, exist_ok=True) 

75 logger.warning( 

76 f"Cannot write to {self.base_dir} (permission denied), " 

77 f"using fallback: {run_dir}", 

78 ) 

79 return run_dir 

80 

81 def write_console_log( 

82 self, 

83 content: str, 

84 ) -> None: 

85 """Write the console log to console.log in the run directory. 

86 

87 Args: 

88 content: str: The console output as a string. 

89 """ 

90 (self.run_dir / "console.log").write_text(content, encoding="utf-8") 

91 

92 def write_json( 

93 self, 

94 data: object, 

95 filename: str = "results.json", 

96 ) -> None: 

97 """Write data as JSON to the run directory. 

98 

99 Args: 

100 data: object: The data to serialize as JSON. 

101 filename: str: The output filename (default: results.json). 

102 """ 

103 with open(self.run_dir / filename, "w", encoding="utf-8") as f: 

104 json.dump(data, f, indent=2, ensure_ascii=False) 

105 

106 def write_markdown( 

107 self, 

108 content: str, 

109 filename: str = "report.md", 

110 ) -> None: 

111 """Write Markdown content to the run directory. 

112 

113 Args: 

114 content: str: Markdown content as a string. 

115 filename: str: The output filename (default: report.md). 

116 """ 

117 (self.run_dir / filename).write_text(content, encoding="utf-8") 

118 

119 def write_html( 

120 self, 

121 content: str, 

122 filename: str = "report.html", 

123 ) -> None: 

124 """Write HTML content to the run directory. 

125 

126 Args: 

127 content: str: HTML content as a string. 

128 filename: str: The output filename (default: report.html). 

129 """ 

130 (self.run_dir / filename).write_text(content, encoding="utf-8") 

131 

132 def write_csv( 

133 self, 

134 rows: list[list[str]], 

135 header: list[str], 

136 filename: str = "summary.csv", 

137 ) -> None: 

138 """Write CSV data to the run directory. 

139 

140 Args: 

141 rows: list[list[str]]: List of rows (each row is a list of strings). 

142 header: list[str]: List of column headers. 

143 filename: str: The output filename (default: summary.csv). 

144 """ 

145 with open(self.run_dir / filename, "w", encoding="utf-8", newline="") as f: 

146 writer = csv.writer(f) 

147 writer.writerow(header) 

148 writer.writerows(rows) 

149 

150 def write_reports_from_results( 

151 self, 

152 results: list[ToolResult], 

153 ) -> None: 

154 """Generate and write Markdown, HTML, and CSV reports from tool results. 

155 

156 Args: 

157 results: list["ToolResult"]: List of ToolResult objects from a Lintro run. 

158 """ 

159 self._write_markdown_report(results=results) 

160 self._write_html_report(results=results) 

161 self._write_csv_summary(results=results) 

162 

163 def _write_markdown_report( 

164 self, 

165 results: list[ToolResult], 

166 ) -> None: 

167 """Write a Markdown report summarizing all tool results and issues. 

168 

169 Args: 

170 results: list["ToolResult"]: List of ToolResult objects from the linting 

171 run. 

172 """ 

173 lines: list[str] = ["# Lintro Report", ""] 

174 lines.append("## Summary\n") 

175 lines.append("| Tool | Issues |") 

176 lines.append("|------|--------|") 

177 for r in results: 

178 lines.append(f"| {r.name} | {r.issues_count} |") 

179 lines.append("") 

180 for r in results: 

181 lines.append(f"### {r.name} ({r.issues_count} issues)") 

182 if hasattr(r, "issues") and r.issues: 

183 lines.append("| File | Line | Code | Message |") 

184 lines.append("|------|------|------|---------|") 

185 for issue in r.issues: 

186 file: str = markdown_escape(getattr(issue, "file", "") or "") 

187 line = getattr(issue, "line", None) or 0 

188 code: str = markdown_escape(getattr(issue, "code", "") or "") 

189 msg: str = markdown_escape(getattr(issue, "message", "") or "") 

190 lines.append(f"| {file} | {line} | {code} | {msg} |") 

191 lines.append("") 

192 else: 

193 lines.append("No issues found.\n") 

194 self.write_markdown(content="\n".join(lines)) 

195 

196 def _write_html_report( 

197 self, 

198 results: list[ToolResult], 

199 ) -> None: 

200 """Write an HTML report summarizing all tool results and issues. 

201 

202 Args: 

203 results: list["ToolResult"]: List of ToolResult objects from the linting 

204 run. 

205 """ 

206 html_content: list[str] = [ 

207 "<html><head><title>Lintro Report</title></head><body>", 

208 ] 

209 html_content.append("<h1>Lintro Report</h1>") 

210 html_content.append("<h2>Summary</h2>") 

211 html_content.append("<table border='1'><tr><th>Tool</th><th>Issues</th></tr>") 

212 for r in results: 

213 html_content.append( 

214 f"<tr><td>{html_escape(r.name)}</td><td>{r.issues_count}</td></tr>", 

215 ) 

216 html_content.append("</table>") 

217 for r in results: 

218 html_content.append( 

219 f"<h3>{html_escape(r.name)} ({r.issues_count} issues)</h3>", 

220 ) 

221 if hasattr(r, "issues") and r.issues: 

222 html_content.append( 

223 "<table border='1'><tr><th>File</th><th>Line</th><th>Code</th>" 

224 "<th>Message</th></tr>", 

225 ) 

226 for issue in r.issues: 

227 file: str = html_escape(getattr(issue, "file", "") or "") 

228 line = getattr(issue, "line", None) or 0 

229 code: str = html_escape(getattr(issue, "code", "") or "") 

230 msg: str = html_escape(getattr(issue, "message", "") or "") 

231 html_content.append( 

232 f"<tr><td>{file}</td><td>{line}</td><td>{code}</td>" 

233 f"<td>{msg}</td></tr>", 

234 ) 

235 html_content.append("</table>") 

236 else: 

237 html_content.append("<p>No issues found.</p>") 

238 html_content.append("</body></html>") 

239 self.write_html(content="\n".join(html_content)) 

240 

241 def _write_csv_summary( 

242 self, 

243 results: list[ToolResult], 

244 ) -> None: 

245 """Write a CSV summary of all tool results and issues. 

246 

247 Args: 

248 results: list["ToolResult"]: List of ToolResult objects from the linting 

249 run. 

250 """ 

251 rows: list[list[str]] = [] 

252 header: list[str] = ["tool", "issues_count", "file", "line", "code", "message"] 

253 for r in results: 

254 if hasattr(r, "issues") and r.issues: 

255 for issue in r.issues: 

256 rows.append( 

257 [ 

258 r.name, 

259 str(r.issues_count), 

260 getattr(issue, "file", "") or "", 

261 str(getattr(issue, "line", None) or 0), 

262 getattr(issue, "code", "") or "", 

263 getattr(issue, "message", "") or "", 

264 ], 

265 ) 

266 else: 

267 rows.append([r.name, str(r.issues_count), "", "", "", ""]) 

268 self.write_csv(rows=rows, header=header) 

269 

270 def cleanup_old_runs(self) -> None: 

271 """Remove old run directories, keeping only the most recent N runs.""" 

272 if not self.base_dir.exists(): 

273 return 

274 runs: list[Path] = sorted( 

275 [ 

276 d 

277 for d in self.base_dir.iterdir() 

278 if d.is_dir() and d.name.startswith(DEFAULT_RUN_PREFIX) 

279 ], 

280 key=lambda d: d.name, 

281 reverse=True, 

282 ) 

283 for old_run in runs[self.keep_last :]: 

284 shutil.rmtree(old_run) 

285 

286 def get_run_dir(self) -> Path: 

287 """Get the current run directory. 

288 

289 Returns: 

290 Path: Path to the current run directory. 

291 """ 

292 return self.run_dir