Coverage for lintro / parsers / streaming.py: 91%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Streaming parser utilities for incremental output processing. 

2 

3This module provides generator-based parsing for tool output that can yield 

4issues as they are parsed, rather than buffering the entire output first. 

5 

6Supports: 

7- JSON Lines: Each line is a complete JSON object (naturally streamable) 

8- Line-based text: Each line can be parsed independently 

9- JSON arrays: Requires buffering but provides consistent interface 

10 

11Usage: 

12 # Stream JSON Lines output 

13 for issue in stream_json_lines(output, parse_func): 

14 process(issue) 

15 

16 # Stream text output 

17 for issue in stream_text_lines(output, parse_line_func): 

18 process(issue) 

19""" 

20 

21from __future__ import annotations 

22 

23import json 

24from collections.abc import Callable, Generator, Iterable 

25from typing import TypeVar 

26 

27from loguru import logger 

28 

29from lintro.parsers.base_issue import BaseIssue 

30 

31IssueT = TypeVar("IssueT", bound=BaseIssue) 

32 

33 

34def stream_json_lines( 

35 output: str | Iterable[str], 

36 parse_item: Callable[[dict[str, object]], IssueT | None], 

37 tool_name: str = "tool", 

38) -> Generator[IssueT, None, None]: 

39 r"""Stream JSON Lines output, yielding parsed issues incrementally. 

40 

41 JSON Lines format has one JSON object per line, making it naturally 

42 streamable. Each line is parsed independently as soon as it's received. 

43 

44 Args: 

45 output: Either a string containing newline-separated JSON objects, 

46 or an iterable of lines (e.g., from subprocess stdout). 

47 parse_item: Function that parses a single JSON object dict into an 

48 issue. Should return None for items that should be skipped. 

49 tool_name: Name of the tool for logging purposes. 

50 

51 Yields: 

52 IssueT: Parsed issue objects as they are processed. 

53 

54 Examples: 

55 >>> def parse(item): 

56 ... return MyIssue(file=item.get("file", "")) 

57 >>> output = '{"file": "a.py"}\\n{"file": "b.py"}\\n' 

58 >>> list(stream_json_lines(output, parse)) # doctest: +SKIP 

59 [MyIssue(file='a.py'), MyIssue(file='b.py')] 

60 """ 

61 lines: Iterable[str] 

62 lines = output.splitlines() if isinstance(output, str) else output 

63 

64 for line in lines: 

65 line_str = line.strip() if isinstance(line, str) else str(line).strip() 

66 

67 if not line_str: 

68 continue 

69 

70 # Skip lines that don't look like JSON objects 

71 if not line_str.startswith("{"): 

72 continue 

73 

74 try: 

75 item = json.loads(line_str) 

76 if not isinstance(item, dict): 

77 logger.debug(f"Skipping non-dict JSON in {tool_name}: {type(item)}") 

78 continue 

79 

80 parsed = parse_item(item) 

81 if parsed is not None: 

82 yield parsed 

83 

84 except json.JSONDecodeError as e: 

85 logger.debug(f"Failed to parse {tool_name} JSON line: {e}") 

86 continue 

87 except (KeyError, TypeError, ValueError) as e: 

88 logger.debug(f"Failed to parse {tool_name} item: {e}") 

89 continue 

90 

91 

def stream_text_lines(
    output: str | Iterable[str],
    parse_line: Callable[[str], IssueT | None],
    strip_ansi: bool = True,
) -> Generator[IssueT, None, None]:
    r"""Stream text output, parsing each line independently.

    For tools that output one issue per line in a text format, this allows
    processing each line as soon as it's received.

    Args:
        output: Either a string containing newlines, or an iterable of lines.
        parse_line: Function that parses a single line into an issue.
            Should return None for lines that don't contain issues.
        strip_ansi: Whether to strip ANSI escape codes before parsing.

    Yields:
        IssueT: Parsed issue objects as they are processed.

    Examples:
        >>> def parse(line):
        ...     if "error" in line:
        ...         return MyIssue(message=line)
        ...     return None
        >>> output = "info: ok\\nerror: bad\\n"
        >>> list(stream_text_lines(output, parse))  # doctest: +SKIP
        [MyIssue(message='error: bad')]
    """
    # Only pull in the ANSI helper when stripping is actually requested;
    # callers passing strip_ansi=False avoid the import entirely.
    if strip_ansi:
        from lintro.parsers.base_parser import strip_ansi_codes

    lines: Iterable[str]
    lines = output.splitlines() if isinstance(output, str) else output

    for line in lines:
        line_str = line if isinstance(line, str) else str(line)

        if strip_ansi:
            line_str = strip_ansi_codes(line_str)

        # Trailing whitespace never carries issue data; empty lines are noise.
        line_str = line_str.rstrip()
        if not line_str:
            continue

        # One unparseable line is logged and skipped, not fatal to the stream.
        try:
            parsed = parse_line(line_str)
            if parsed is not None:
                yield parsed
        except (KeyError, TypeError, ValueError) as e:
            logger.debug(f"Failed to parse line: {e}")
            continue

142 

143 

def stream_json_array_fallback(
    output: str,
    parse_item: Callable[[dict[str, object]], IssueT | None],
    tool_name: str = "tool",
) -> Generator[IssueT, None, None]:
    """Yield issues from a JSON array, falling back to JSON Lines.

    Tools that emit a single JSON array cannot stream line by line, but
    this wrapper still yields the decoded items one at a time so callers
    get the same streaming interface either way. If the payload does not
    decode as an array, the text is re-parsed as JSON Lines instead.

    Args:
        output: String containing a JSON array or JSON Lines.
        parse_item: Callback converting one decoded JSON object (a dict)
            into an issue; returning None skips the item.
        tool_name: Tool label used in debug log messages.

    Yields:
        IssueT: Parsed issue objects.
    """
    # Nothing to do for empty output or empty JSON containers.
    if not output or output.strip() in ("[]", "{}"):
        return

    try:
        # Trim anything after the final closing bracket — some tools
        # append non-JSON trailer text after the array.
        bracket = output.rfind("]")
        payload = output if bracket == -1 else output[: bracket + 1]
        data = json.loads(payload)

        if isinstance(data, list):
            for item in data:
                if not isinstance(item, dict):
                    logger.debug(f"Skipping non-dict item in {tool_name}")
                    continue
                try:
                    parsed = parse_item(item)
                except (KeyError, TypeError, ValueError) as e:
                    logger.debug(f"Failed to parse {tool_name} item: {e}")
                    continue
                if parsed is not None:
                    yield parsed
            return

    except json.JSONDecodeError:
        logger.debug(f"{tool_name} array parsing failed, trying JSON Lines")

    # Not a JSON array: reinterpret the output as JSON Lines.
    yield from stream_json_lines(output, parse_item, tool_name)

198 

199 

class StreamingParser:
    """Base class for building streaming tool-output parsers.

    Subclasses override parse_item (for JSON input) and/or parse_line
    (for text input) to describe how a single record becomes an issue;
    the stream_* methods then handle iteration, skipping, and logging.

    Attributes:
        tool_name (str): Tool label used in debug log messages.

    Examples:
        >>> class MyStreamingParser(StreamingParser):
        ...     def parse_item(self, item):
        ...         return MyIssue(file=item.get("file", ""))
        >>> parser = MyStreamingParser("mytool")
        >>> for issue in parser.stream_json_lines(output):
        ...     print(issue)
    """

    tool_name: str

    def __init__(self, tool_name: str = "tool") -> None:
        """Create a streaming parser.

        Args:
            tool_name: Tool label used in debug log messages.
        """
        self.tool_name = tool_name

    def parse_item(self, item: dict[str, object]) -> BaseIssue | None:
        """Convert one decoded JSON object into an issue.

        Subclasses supporting JSON input must override this.

        Args:
            item: Dictionary from JSON parsing.

        Returns:
            BaseIssue | None: The issue, or None to skip this item.

        Raises:
            NotImplementedError: If not overridden in subclass.
        """
        raise NotImplementedError("Subclass must implement parse_item")

    def parse_line(self, line: str) -> BaseIssue | None:
        """Convert one text line into an issue.

        Subclasses supporting text input must override this.

        Args:
            line: Text line to parse.

        Returns:
            BaseIssue | None: The issue, or None to skip this line.

        Raises:
            NotImplementedError: If not overridden in subclass.
        """
        raise NotImplementedError("Subclass must implement parse_line")

    def stream_json_lines(
        self,
        output: str | Iterable[str],
    ) -> Generator[BaseIssue, None, None]:
        """Stream JSON Lines output through this parser's parse_item.

        Args:
            output: String or iterable of lines to parse.

        Yields:
            BaseIssue: Parsed issues.
        """
        yield from stream_json_lines(output, self.parse_item, self.tool_name)

    def stream_text_lines(
        self,
        output: str | Iterable[str],
        strip_ansi: bool = True,
    ) -> Generator[BaseIssue, None, None]:
        """Stream text output through this parser's parse_line.

        Args:
            output: String or iterable of lines to parse.
            strip_ansi: Whether to strip ANSI codes.

        Yields:
            BaseIssue: Parsed issues.
        """
        yield from stream_text_lines(output, self.parse_line, strip_ansi)

    def stream_json_array(
        self,
        output: str,
    ) -> Generator[BaseIssue, None, None]:
        """Stream a JSON array, falling back to JSON Lines on failure.

        Args:
            output: String containing JSON array or JSON Lines.

        Yields:
            BaseIssue: Parsed issues.
        """
        yield from stream_json_array_fallback(output, self.parse_item, self.tool_name)

303 

304 

def collect_streaming_results(
    generator: Iterable[IssueT],
) -> list[IssueT]:
    """Collect all results from a streaming parser into a list.

    Utility function to convert streaming parser output to the traditional
    list-based interface used by existing code. Accepts any iterable of
    issues (generators included), since materializing only needs iteration.

    Args:
        generator: Iterable (typically a generator) yielding parsed issues.

    Returns:
        list[IssueT]: All parsed issues, in the order produced.

    Examples:
        >>> results = collect_streaming_results(parser.stream_json_lines(output))
    """
    return list(generator)