Coverage for lintro / utils / streaming_output.py: 94%
100 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Streaming output handler for memory-efficient result processing.
3This module provides functionality to process and output tool results as they
4arrive, instead of buffering all results in memory before output.
5"""
7from __future__ import annotations
9import json
10from collections.abc import Callable
11from dataclasses import dataclass, field
12from typing import TYPE_CHECKING, Any, TextIO
14from loguru import logger
16if TYPE_CHECKING:
17 from lintro.enums.action import Action
18 from lintro.models.core.tool_result import ToolResult
21@dataclass
22class StreamingResultHandler:
23 """Handle tool results as they arrive, without buffering all in memory.
25 This handler supports streaming output in various formats:
26 - Console: Print formatted results immediately
27 - JSONL: Write one JSON object per line
28 - File: Write to file as results arrive
30 Attributes:
31 output_format: Output format (grid, json, jsonl, etc.).
32 action: The action being performed.
33 output_file: Optional file path for output.
34 """
36 output_format: str
37 action: Action
38 output_file: str | None = None
39 _file_handle: TextIO | None = field(default=None, init=False)
40 _totals: dict[str, int] = field(default_factory=dict, init=False)
41 _results_buffer: list[ToolResult] = field(default_factory=list, init=False)
42 _first_jsonl: bool = field(default=True, init=False)
44 def __post_init__(self) -> None:
45 """Initialize totals dictionary."""
46 self._totals = {
47 "issues": 0,
48 "fixed": 0,
49 "remaining": 0,
50 "tools_run": 0,
51 "tools_failed": 0,
52 }
54 def __enter__(self) -> StreamingResultHandler:
55 """Open file handle if output file is specified.
57 Returns:
58 Self for context manager protocol.
59 """
60 if self.output_file:
61 try:
62 self._file_handle = open(self.output_file, "w", encoding="utf-8")
63 if self.output_format.lower() == "jsonl":
64 # JSONL format starts immediately
65 pass
66 elif self.output_format.lower() == "json":
67 # For JSON array format, write opening bracket
68 self._file_handle.write("[\n")
69 except OSError as e:
70 logger.warning(f"Could not open output file {self.output_file}: {e}")
71 self._file_handle = None
72 return self
74 def __exit__(
75 self,
76 exc_type: type[BaseException] | None,
77 exc_val: BaseException | None,
78 exc_tb: Any,
79 ) -> None:
80 """Close file handle.
82 Args:
83 exc_type: Exception type if an exception was raised.
84 exc_val: Exception value if an exception was raised.
85 exc_tb: Exception traceback if an exception was raised.
86 """
87 if self._file_handle:
88 try:
89 if self.output_format.lower() == "json":
90 # Close JSON array
91 self._file_handle.write("\n]")
92 self._file_handle.close()
93 except OSError as e:
94 logger.warning(f"Error closing output file: {e}")
96 def handle_result(
97 self,
98 result: ToolResult,
99 console_output_func: Callable[[str], None] | None = None,
100 raw_output: bool = False,
101 ) -> None:
102 """Process a single tool result immediately.
104 Args:
105 result: The tool result to process.
106 console_output_func: Optional function to print to console.
107 raw_output: Whether to use raw output instead of formatted.
108 """
109 from lintro.enums.action import Action
111 # Update totals
112 self._totals["tools_run"] += 1
113 self._totals["issues"] += result.issues_count
115 if self.action == Action.FIX:
116 fixed = getattr(result, "fixed_issues_count", None)
117 remaining = getattr(result, "remaining_issues_count", None)
118 if fixed is not None:
119 self._totals["fixed"] += fixed
120 if remaining is not None:
121 self._totals["remaining"] += remaining
123 if not result.success:
124 self._totals["tools_failed"] += 1
126 # Buffer for summary/JSON output
127 self._results_buffer.append(result)
129 # Stream output based on format
130 if self.output_format.lower() == "jsonl":
131 self._write_jsonl(result)
132 elif self.output_format.lower() == "json":
133 self._write_json_item(result)
135 def _write_jsonl(self, result: ToolResult) -> None:
136 """Write result as JSON Lines (one object per line).
138 Args:
139 result: The tool result to write.
140 """
141 data = self._result_to_dict(result)
142 json_line = json.dumps(data)
144 if self._file_handle:
145 self._file_handle.write(json_line + "\n")
146 self._file_handle.flush()
148 def _write_json_item(self, result: ToolResult) -> None:
149 """Write result as JSON array item.
151 Args:
152 result: The tool result to write.
153 """
154 if self._file_handle:
155 data = self._result_to_dict(result)
156 if not self._first_jsonl:
157 self._file_handle.write(",\n")
158 json_str = json.dumps(data, indent=2)
159 # Indent each line for pretty printing in array
160 indented = "\n".join(" " + line for line in json_str.split("\n"))
161 self._file_handle.write(indented)
162 self._first_jsonl = False
163 self._file_handle.flush()
165 def _result_to_dict(self, result: ToolResult) -> dict[str, Any]:
166 """Convert ToolResult to dictionary for JSON serialization.
168 Args:
169 result: The tool result to convert.
171 Returns:
172 Dictionary representation of the result.
173 """
174 data: dict[str, Any] = {
175 "tool": result.name,
176 "success": result.success,
177 "issues_count": result.issues_count,
178 }
180 if result.output:
181 data["output"] = result.output
183 if result.initial_issues_count is not None:
184 data["initial_issues_count"] = result.initial_issues_count
185 if result.fixed_issues_count is not None:
186 data["fixed_issues_count"] = result.fixed_issues_count
187 if result.remaining_issues_count is not None:
188 data["remaining_issues_count"] = result.remaining_issues_count
190 # Include issues if available
191 if result.issues:
192 data["issues"] = [
193 {
194 "file": issue.file,
195 "line": issue.line,
196 "column": issue.column,
197 "message": issue.message,
198 }
199 for issue in result.issues
200 ]
202 return data
204 def get_totals(self) -> dict[str, int]:
205 """Get accumulated totals.
207 Returns:
208 Dictionary with total counts.
209 """
210 return self._totals.copy()
212 def get_results(self) -> list[ToolResult]:
213 """Get buffered results for final processing.
215 Returns:
216 List of all processed results.
217 """
218 return self._results_buffer.copy()
220 def get_exit_code(self) -> int:
221 """Calculate exit code based on results.
223 Returns:
224 0 for success, 1 for failures.
225 """
226 from lintro.enums.action import Action
228 # Any tool failure means exit code 1
229 if self._totals["tools_failed"] > 0:
230 return 1
232 # Check for issues based on action
233 if self.action == Action.FIX:
234 if self._totals["remaining"] > 0:
235 return 1
236 else: # check
237 if self._totals["issues"] > 0:
238 return 1
240 return 0
243def create_streaming_handler(
244 output_format: str,
245 action: Action,
246 output_file: str | None = None,
247) -> StreamingResultHandler:
248 """Create a streaming result handler.
250 Args:
251 output_format: Output format (grid, json, jsonl, etc.).
252 action: The action being performed.
253 output_file: Optional file path for output.
255 Returns:
256 Configured StreamingResultHandler instance.
257 """
258 return StreamingResultHandler(
259 output_format=output_format,
260 action=action,
261 output_file=output_file,
262 )