Coverage for lintro / parsers / streaming.py: 91%
87 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Streaming parser utilities for incremental output processing.
3This module provides generator-based parsing for tool output that can yield
4issues as they are parsed, rather than buffering the entire output first.
6Supports:
7- JSON Lines: Each line is a complete JSON object (naturally streamable)
8- Line-based text: Each line can be parsed independently
9- JSON arrays: Requires buffering but provides consistent interface
11Usage:
12 # Stream JSON Lines output
13 for issue in stream_json_lines(output, parse_func):
14 process(issue)
16 # Stream text output
17 for issue in stream_text_lines(output, parse_line_func):
18 process(issue)
19"""
21from __future__ import annotations
23import json
24from collections.abc import Callable, Generator, Iterable
25from typing import TypeVar
27from loguru import logger
29from lintro.parsers.base_issue import BaseIssue
31IssueT = TypeVar("IssueT", bound=BaseIssue)
34def stream_json_lines(
35 output: str | Iterable[str],
36 parse_item: Callable[[dict[str, object]], IssueT | None],
37 tool_name: str = "tool",
38) -> Generator[IssueT, None, None]:
39 r"""Stream JSON Lines output, yielding parsed issues incrementally.
41 JSON Lines format has one JSON object per line, making it naturally
42 streamable. Each line is parsed independently as soon as it's received.
44 Args:
45 output: Either a string containing newline-separated JSON objects,
46 or an iterable of lines (e.g., from subprocess stdout).
47 parse_item: Function that parses a single JSON object dict into an
48 issue. Should return None for items that should be skipped.
49 tool_name: Name of the tool for logging purposes.
51 Yields:
52 IssueT: Parsed issue objects as they are processed.
54 Examples:
55 >>> def parse(item):
56 ... return MyIssue(file=item.get("file", ""))
57 >>> output = '{"file": "a.py"}\\n{"file": "b.py"}\\n'
58 >>> list(stream_json_lines(output, parse)) # doctest: +SKIP
59 [MyIssue(file='a.py'), MyIssue(file='b.py')]
60 """
61 lines: Iterable[str]
62 lines = output.splitlines() if isinstance(output, str) else output
64 for line in lines:
65 line_str = line.strip() if isinstance(line, str) else str(line).strip()
67 if not line_str:
68 continue
70 # Skip lines that don't look like JSON objects
71 if not line_str.startswith("{"):
72 continue
74 try:
75 item = json.loads(line_str)
76 if not isinstance(item, dict):
77 logger.debug(f"Skipping non-dict JSON in {tool_name}: {type(item)}")
78 continue
80 parsed = parse_item(item)
81 if parsed is not None:
82 yield parsed
84 except json.JSONDecodeError as e:
85 logger.debug(f"Failed to parse {tool_name} JSON line: {e}")
86 continue
87 except (KeyError, TypeError, ValueError) as e:
88 logger.debug(f"Failed to parse {tool_name} item: {e}")
89 continue
92def stream_text_lines(
93 output: str | Iterable[str],
94 parse_line: Callable[[str], IssueT | None],
95 strip_ansi: bool = True,
96) -> Generator[IssueT, None, None]:
97 r"""Stream text output, parsing each line independently.
99 For tools that output one issue per line in a text format, this allows
100 processing each line as soon as it's received.
102 Args:
103 output: Either a string containing newlines, or an iterable of lines.
104 parse_line: Function that parses a single line into an issue.
105 Should return None for lines that don't contain issues.
106 strip_ansi: Whether to strip ANSI escape codes before parsing.
108 Yields:
109 IssueT: Parsed issue objects as they are processed.
111 Examples:
112 >>> def parse(line):
113 ... if "error" in line:
114 ... return MyIssue(message=line)
115 ... return None
116 >>> output = "info: ok\\nerror: bad\\n"
117 >>> list(stream_text_lines(output, parse)) # doctest: +SKIP
118 [MyIssue(message='error: bad')]
119 """
120 from lintro.parsers.base_parser import strip_ansi_codes
122 lines: Iterable[str]
123 lines = output.splitlines() if isinstance(output, str) else output
125 for line in lines:
126 line_str = line if isinstance(line, str) else str(line)
128 if strip_ansi:
129 line_str = strip_ansi_codes(line_str)
131 line_str = line_str.rstrip()
132 if not line_str:
133 continue
135 try:
136 parsed = parse_line(line_str)
137 if parsed is not None:
138 yield parsed
139 except (KeyError, TypeError, ValueError) as e:
140 logger.debug(f"Failed to parse line: {e}")
141 continue
144def stream_json_array_fallback(
145 output: str,
146 parse_item: Callable[[dict[str, object]], IssueT | None],
147 tool_name: str = "tool",
148) -> Generator[IssueT, None, None]:
149 """Parse a JSON array and yield items incrementally.
151 For tools that output a JSON array (not JSON Lines), this function
152 parses the full array but still yields items one at a time for
153 consistent streaming interface.
155 Falls back to JSON Lines parsing if array parsing fails.
157 Args:
158 output: String containing a JSON array or JSON Lines.
159 parse_item: Function that parses a single JSON object dict into an
160 issue. Should return None for items that should be skipped.
161 tool_name: Name of the tool for logging purposes.
163 Yields:
164 IssueT: Parsed issue objects.
165 """
166 if not output or output.strip() in ("[]", "{}"):
167 return
169 # Try JSON array first
170 try:
171 # Handle possible trailing non-JSON data
172 json_end = output.rfind("]")
173 if json_end != -1:
174 json_part = output[: json_end + 1]
175 data = json.loads(json_part)
176 else:
177 data = json.loads(output)
179 if isinstance(data, list):
180 for item in data:
181 if not isinstance(item, dict):
182 logger.debug(f"Skipping non-dict item in {tool_name}")
183 continue
184 try:
185 parsed = parse_item(item)
186 if parsed is not None:
187 yield parsed
188 except (KeyError, TypeError, ValueError) as e:
189 logger.debug(f"Failed to parse {tool_name} item: {e}")
190 continue
191 return
193 except json.JSONDecodeError:
194 logger.debug(f"{tool_name} array parsing failed, trying JSON Lines")
196 # Fallback to JSON Lines
197 yield from stream_json_lines(output, parse_item, tool_name)
class StreamingParser:
    """Reusable base for tool-specific streaming parsers.

    Concrete subclasses override ``parse_item`` (for JSON-based output)
    and/or ``parse_line`` (for plain-text output); the ``stream_*``
    helpers then route each decoded item or line through the override
    and yield the resulting issues.

    Attributes:
        tool_name (str): Name of the tool for logging.

    Examples:
        >>> class MyStreamingParser(StreamingParser):
        ...     def parse_item(self, item):
        ...         return MyIssue(file=item.get("file", ""))
        >>> parser = MyStreamingParser("mytool")
        >>> for issue in parser.stream_json_lines(output):
        ...     print(issue)
    """

    tool_name: str

    def __init__(self, tool_name: str = "tool") -> None:
        """Store the tool name used in log messages.

        Args:
            tool_name: Name of the tool for logging purposes.
        """
        self.tool_name = tool_name

    def parse_item(self, item: dict[str, object]) -> BaseIssue | None:
        """Convert one decoded JSON object into an issue.

        Subclasses that consume JSON output must override this.

        Args:
            item: Dictionary from JSON parsing.

        Returns:
            BaseIssue | None: Parsed issue, or None to skip the item.

        Raises:
            NotImplementedError: Always, unless overridden in a subclass.
        """
        raise NotImplementedError("Subclass must implement parse_item")

    def parse_line(self, line: str) -> BaseIssue | None:
        """Convert one line of text output into an issue.

        Subclasses that consume text output must override this.

        Args:
            line: Text line to parse.

        Returns:
            BaseIssue | None: Parsed issue, or None to skip the line.

        Raises:
            NotImplementedError: Always, unless overridden in a subclass.
        """
        raise NotImplementedError("Subclass must implement parse_line")

    def stream_json_lines(
        self,
        output: str | Iterable[str],
    ) -> Generator[BaseIssue, None, None]:
        """Stream JSON Lines output through this parser's ``parse_item``.

        Args:
            output: String or iterable of lines to parse.

        Yields:
            BaseIssue: Parsed issues.
        """
        for issue in stream_json_lines(output, self.parse_item, self.tool_name):
            yield issue

    def stream_text_lines(
        self,
        output: str | Iterable[str],
        strip_ansi: bool = True,
    ) -> Generator[BaseIssue, None, None]:
        """Stream text output through this parser's ``parse_line``.

        Args:
            output: String or iterable of lines to parse.
            strip_ansi: Whether to strip ANSI codes.

        Yields:
            BaseIssue: Parsed issues.
        """
        for issue in stream_text_lines(output, self.parse_line, strip_ansi):
            yield issue

    def stream_json_array(
        self,
        output: str,
    ) -> Generator[BaseIssue, None, None]:
        """Stream a JSON array (with JSON Lines fallback) through ``parse_item``.

        Args:
            output: String containing JSON array or JSON Lines.

        Yields:
            BaseIssue: Parsed issues.
        """
        for issue in stream_json_array_fallback(
            output, self.parse_item, self.tool_name
        ):
            yield issue
def collect_streaming_results(
    generator: Generator[IssueT, None, None],
) -> list[IssueT]:
    """Drain a streaming parser into a plain list.

    Utility bridging the streaming generators to the traditional
    list-based interface used by existing code.

    Args:
        generator: Generator yielding parsed issues.

    Returns:
        List of all parsed issues, in yield order.

    Examples:
        >>> results = collect_streaming_results(parser.stream_json_lines(output))
    """
    results: list[IssueT] = []
    results.extend(generator)
    return results