Coverage for lintro / utils / streaming_output.py: 94%

100 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Streaming output handler for memory-efficient result processing. 

2 

3This module provides functionality to process and output tool results as they 

4arrive, instead of buffering all results in memory before output. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10from collections.abc import Callable 

11from dataclasses import dataclass, field 

12from typing import TYPE_CHECKING, Any, TextIO 

13 

14from loguru import logger 

15 

16if TYPE_CHECKING: 

17 from lintro.enums.action import Action 

18 from lintro.models.core.tool_result import ToolResult 

19 

20 

@dataclass
class StreamingResultHandler:
    """Handle tool results as they arrive and stream them to an output file.

    Each result passed to ``handle_result`` updates the running totals and is
    appended to an in-memory buffer (see ``get_results``). In addition, when
    an output file was opened by entering the context manager:

    - ``jsonl``: one compact JSON object is written per line.
    - ``json``: results are written as items of a single JSON array; the
      opening bracket is written on enter and the closing one on exit.

    Any other format produces no streamed output from this class; only the
    totals and the buffer are maintained. The format comparison is
    case-insensitive.

    Attributes:
        output_format: Output format (grid, json, jsonl, etc.).
        action: The action being performed.
        output_file: Optional file path for output.
    """

    output_format: str
    action: Action
    output_file: str | None = None
    _file_handle: TextIO | None = field(default=None, init=False)
    _totals: dict[str, int] = field(default_factory=dict, init=False)
    _results_buffer: list[ToolResult] = field(default_factory=list, init=False)
    # True until the first item of the JSON array has been written. Despite
    # the name, this flag is only used by the "json" (array) format.
    _first_jsonl: bool = field(default=True, init=False)

    def __post_init__(self) -> None:
        """Initialize totals dictionary."""
        self._totals = {
            "issues": 0,
            "fixed": 0,
            "remaining": 0,
            "tools_run": 0,
            "tools_failed": 0,
        }

    def _normalized_format(self) -> str:
        """Return the output format lower-cased for comparisons."""
        return self.output_format.lower()

    def __enter__(self) -> StreamingResultHandler:
        """Open file handle if output file is specified.

        For the ``json`` format the opening bracket of the array is written
        immediately. If the file cannot be opened or initialized, a warning
        is logged and the handler continues without a file.

        Returns:
            Self for context manager protocol.
        """
        if not self.output_file:
            return self

        handle: TextIO | None = None
        try:
            handle = open(self.output_file, "w", encoding="utf-8")
            if self._normalized_format() == "json":
                # For JSON array format, write opening bracket
                handle.write("[\n")
        except OSError as e:
            logger.warning(f"Could not open output file {self.output_file}: {e}")
            if handle is not None:
                # The file was opened but the initial write failed: release
                # the descriptor instead of dropping it unclosed.
                try:
                    handle.close()
                except OSError:
                    # The original failure has already been logged above.
                    pass
                handle = None

        self._file_handle = handle
        # The file was truncated on open, so the next JSON item is the first
        # one again even if this handler instance is being reused.
        self._first_jsonl = True
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: Any,
    ) -> None:
        """Finish the output and close the file handle.

        The handle is always closed, even if writing the closing bracket
        fails, and is cleared from the instance so later writes become
        no-ops instead of hitting a closed file.

        Args:
            exc_type: Exception type if an exception was raised.
            exc_val: Exception value if an exception was raised.
            exc_tb: Exception traceback if an exception was raised.
        """
        handle = self._file_handle
        if handle is None:
            return

        self._file_handle = None
        try:
            try:
                if self._normalized_format() == "json":
                    # Close JSON array
                    handle.write("\n]")
            finally:
                handle.close()
        except OSError as e:
            logger.warning(f"Error closing output file: {e}")

    def handle_result(
        self,
        result: ToolResult,
        console_output_func: Callable[[str], None] | None = None,
        raw_output: bool = False,
    ) -> None:
        """Process a single tool result immediately.

        Updates the totals, buffers the result and, for the ``jsonl`` and
        ``json`` formats, writes it to the open output file.

        Args:
            result: The tool result to process.
            console_output_func: Optional function to print to console.
                Accepted for interface compatibility; not used here.
            raw_output: Whether to use raw output instead of formatted.
                Accepted for interface compatibility; not used here.
        """
        from lintro.enums.action import Action

        # Update totals
        self._totals["tools_run"] += 1
        self._totals["issues"] += result.issues_count

        if self.action == Action.FIX:
            # Fix counters may be missing or None on a result; skip those.
            fixed = getattr(result, "fixed_issues_count", None)
            remaining = getattr(result, "remaining_issues_count", None)
            if fixed is not None:
                self._totals["fixed"] += fixed
            if remaining is not None:
                self._totals["remaining"] += remaining

        if not result.success:
            self._totals["tools_failed"] += 1

        # Buffer for summary/JSON output
        self._results_buffer.append(result)

        # Stream output based on format
        output_format = self._normalized_format()
        if output_format == "jsonl":
            self._write_jsonl(result)
        elif output_format == "json":
            self._write_json_item(result)

    def _write_jsonl(self, result: ToolResult) -> None:
        """Write result as JSON Lines (one object per line).

        Does nothing when no output file is open.

        Args:
            result: The tool result to write.
        """
        if self._file_handle is None:
            # No sink: skip serialization entirely.
            return

        json_line = json.dumps(self._result_to_dict(result))
        self._file_handle.write(json_line + "\n")
        self._file_handle.flush()

    def _write_json_item(self, result: ToolResult) -> None:
        """Write result as JSON array item.

        Does nothing when no output file is open.

        Args:
            result: The tool result to write.
        """
        if self._file_handle is None:
            return

        data = self._result_to_dict(result)
        if not self._first_jsonl:
            # Separate this item from the previous one.
            self._file_handle.write(",\n")
        json_str = json.dumps(data, indent=2)
        # Indent each line for pretty printing in array
        indented = "\n".join(" " + line for line in json_str.split("\n"))
        self._file_handle.write(indented)
        self._first_jsonl = False
        self._file_handle.flush()

    def _result_to_dict(self, result: ToolResult) -> dict[str, Any]:
        """Convert ToolResult to dictionary for JSON serialization.

        ``tool``, ``success`` and ``issues_count`` are always present.
        ``output`` and ``issues`` are included only when truthy, and the
        initial/fixed/remaining counters only when they are not None.

        Args:
            result: The tool result to convert.

        Returns:
            Dictionary representation of the result.
        """
        data: dict[str, Any] = {
            "tool": result.name,
            "success": result.success,
            "issues_count": result.issues_count,
        }

        if result.output:
            data["output"] = result.output

        if result.initial_issues_count is not None:
            data["initial_issues_count"] = result.initial_issues_count
        if result.fixed_issues_count is not None:
            data["fixed_issues_count"] = result.fixed_issues_count
        if result.remaining_issues_count is not None:
            data["remaining_issues_count"] = result.remaining_issues_count

        # Include issues if available
        if result.issues:
            data["issues"] = [
                {
                    "file": issue.file,
                    "line": issue.line,
                    "column": issue.column,
                    "message": issue.message,
                }
                for issue in result.issues
            ]

        return data

    def get_totals(self) -> dict[str, int]:
        """Get accumulated totals.

        Returns:
            A copy of the dictionary with total counts.
        """
        return self._totals.copy()

    def get_results(self) -> list[ToolResult]:
        """Get buffered results for final processing.

        Returns:
            A shallow copy of the list of all processed results.
        """
        return self._results_buffer.copy()

    def get_exit_code(self) -> int:
        """Calculate exit code based on results.

        Returns:
            0 for success, 1 for failures.
        """
        from lintro.enums.action import Action

        # Any tool failure means exit code 1
        if self._totals["tools_failed"] > 0:
            return 1

        # For a fix run only issues still remaining count as a failure; for
        # any other action every reported issue does.
        counter = "remaining" if self.action == Action.FIX else "issues"
        return 1 if self._totals[counter] > 0 else 0

241 

242 

def create_streaming_handler(
    output_format: str,
    action: Action,
    output_file: str | None = None,
) -> StreamingResultHandler:
    """Build a StreamingResultHandler from the given settings.

    Convenience factory: the arguments are forwarded unchanged to the
    StreamingResultHandler constructor.

    Args:
        output_format: Output format (grid, json, jsonl, etc.).
        action: The action being performed.
        output_file: Optional file path for output.

    Returns:
        A new StreamingResultHandler configured with the given values.
    """
    handler = StreamingResultHandler(output_format, action, output_file)
    return handler