Coverage for lintro / parsers / ruff / ruff_parser.py: 80%
76 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Parser for ruff output (lint and format).
3This module provides functions to parse both:
4- ruff check --output-format json (linting issues)
5- ruff format --check (plain text: files needing formatting)
7Supports both batch and streaming parsing modes.
8"""
10from __future__ import annotations
12import json
13from collections.abc import Generator, Iterable
15from loguru import logger
17from lintro.parsers.base_parser import (
18 extract_dict_field,
19 extract_int_field,
20 extract_str_field,
21 safe_parse_items,
22)
23from lintro.parsers.ruff.ruff_issue import RuffIssue
24from lintro.parsers.streaming import stream_json_array_fallback
def _parse_ruff_item(item: dict[str, object]) -> RuffIssue | None:
    """Build a RuffIssue from one decoded Ruff JSON diagnostic.

    Alternate key spellings are accepted: "filename"/"file" for the path,
    "location"/"start" and "end_location"/"end" for positions,
    "row"/"line" and "column"/"col" inside a position, and "code"/"rule"
    for the rule identifier.

    Args:
        item: Dictionary holding a single issue from Ruff's JSON output.

    Returns:
        The RuffIssue assembled from the item. The annotation permits None,
        although no branch in this function returns it.
    """
    path = extract_str_field(item, ["filename", "file"])
    start = extract_dict_field(item, ["location", "start"])
    stop = extract_dict_field(item, ["end_location", "end"])

    # Missing or falsy start coordinates collapse to 0.
    start_row = extract_int_field(start, ["row", "line"], default=0) or 0
    start_col = extract_int_field(start, ["column", "col"], default=0) or 0
    # Missing or falsy end coordinates fall back to the start position.
    stop_row = extract_int_field(stop, ["row", "line"], default=start_row) or start_row
    stop_col = (
        extract_int_field(stop, ["column", "col"], default=start_col) or start_col
    )

    rule_code = extract_str_field(item, ["code", "rule"])
    text = extract_str_field(item, ["message"])

    # Only a string URL is kept; any other value becomes None.
    raw_url = item.get("url")
    link: str | None = None
    if isinstance(raw_url, str):
        link = raw_url

    # A non-empty "fix" mapping marks the issue as fixable.
    fix_info = extract_dict_field(item, ["fix"])
    applicability: str | None = None
    if fix_info:
        raw_applicability = fix_info.get("applicability")
        if isinstance(raw_applicability, str):
            applicability = raw_applicability

    return RuffIssue(
        file=path,
        line=start_row,
        column=start_col,
        code=rule_code,
        message=text,
        url=link,
        end_line=stop_row,
        end_column=stop_col,
        fixable=bool(fix_info),
        fix_applicability=applicability,
    )
def parse_ruff_output(output: str) -> list[RuffIssue]:
    """Parse Ruff JSON or JSON Lines output into `RuffIssue` objects.

    Supports multiple Ruff schema variants across versions by accepting:
    - JSON array of issue objects (optionally followed by non-JSON text)
    - JSON Lines (one object per line)

    Field name variations handled:
    - location: "location" or "start" with keys "row"|"line" and
      "column"|"col"
    - end location: "end_location" or "end" with keys "row"|"line" and
      "column"|"col"
    - filename: "filename" (preferred) or "file"

    Args:
        output: Raw output from `ruff check --output-format json`.

    Returns:
        list[RuffIssue]: Parsed issues. Empty when the output is empty, is an
        empty JSON array/object, or contains nothing parseable.
    """
    if not output or output.strip() in ("[]", "{}"):
        return []

    # First try a JSON array. raw_decode() stops at the end of the first
    # complete JSON value, so trailing non-JSON text is ignored even when that
    # text itself contains "]". Slicing at the last "]" in the output would
    # pull such text into the JSON and make the whole array unparseable.
    candidate = output.lstrip()
    if candidate.startswith("["):
        try:
            ruff_data, _ = json.JSONDecoder().raw_decode(candidate)
            if isinstance(ruff_data, list):
                return safe_parse_items(ruff_data, _parse_ruff_item, "ruff")
        except (json.JSONDecodeError, TypeError) as e:
            # Fall back to JSON Lines parsing below
            logger.debug(
                f"Ruff array JSON parsing failed, falling back to JSON Lines: {e}",
            )

    # Fallback: parse JSON Lines (each line is a JSON object)
    items: list[object] = []
    for line in output.splitlines():
        line_str = line.strip()
        # Only lines that look like a JSON object are worth decoding.
        if not line_str.startswith("{"):
            continue
        try:
            item = json.loads(line_str)
        except json.JSONDecodeError as e:
            logger.debug(f"Failed to parse ruff JSON line '{line_str[:50]}': {e}")
            continue
        if isinstance(item, dict):
            items.append(item)

    return safe_parse_items(items, _parse_ruff_item, "ruff")
def parse_ruff_format_check_output(output: str) -> list[str]:
    """Collect the files that `ruff format --check` reports as needing formatting.

    Args:
        output: The raw output from `ruff format --check`.

    Returns:
        File paths taken from the "Would reformat" lines, in output order.
        Empty when the output is empty or has no such lines.
    """
    from lintro.parsers.base_parser import strip_ansi_codes

    if not output:
        return []

    # Ruff prints either 'Would reformat: path/to/file.py' or
    # 'Would reformat path/to/file.py'; the colon form is tried first.
    prefixes = ("Would reformat: ", "Would reformat ")

    needing_format = []
    for raw in output.splitlines():
        # Strip ANSI color codes so matching is stable across environments.
        text = strip_ansi_codes(raw).strip()
        for prefix in prefixes:
            if text.startswith(prefix):
                needing_format.append(text[len(prefix) :])
                break
    return needing_format
152# ---------------------------------------------------------------------------
153# Streaming parser variants
154# ---------------------------------------------------------------------------
def stream_ruff_output(
    output: str | Iterable[str],
) -> Generator[RuffIssue, None, None]:
    """Yield Ruff issues one at a time instead of building a full list.

    A complete string is handed to the array/JSON Lines fallback streamer;
    any other iterable is treated as lines and handed to the JSON Lines
    streamer.

    Args:
        output: Raw output from `ruff check --output-format json`, either
            as a complete string or as an iterable of lines.

    Yields:
        RuffIssue: Parsed issues one at a time.

    Examples:
        >>> for issue in stream_ruff_output(ruff_output):
        ...     print(f"{issue.file}:{issue.line}: {issue.message}")
    """
    if not isinstance(output, str):
        # Iterable of lines - stream JSON Lines directly
        from lintro.parsers.streaming import stream_json_lines

        yield from stream_json_lines(output, _parse_ruff_item, "ruff")
        return

    # Whole string: the fallback streamer handles both JSON array and JSON Lines
    yield from stream_json_array_fallback(output, _parse_ruff_item, "ruff")
def stream_ruff_format_output(
    output: str | Iterable[str],
) -> Generator[str, None, None]:
    """Yield file paths from `ruff format --check` output one at a time.

    Args:
        output: Raw output from `ruff format --check`, either as a complete
            string or as an iterable of lines.

    Yields:
        str: File paths that would be reformatted.

    Examples:
        >>> for filepath in stream_ruff_format_output(format_output):
        ...     print(f"Needs formatting: {filepath}")
    """
    from lintro.parsers.base_parser import strip_ansi_codes

    source_lines: Iterable[str]
    if isinstance(output, str):
        source_lines = output.splitlines()
    else:
        source_lines = output

    for entry in source_lines:
        # Strip ANSI color codes and surrounding whitespace before matching.
        cleaned = strip_ansi_codes(entry).strip()
        # The colon form is checked first; the bare form is the fallback.
        if cleaned.startswith("Would reformat: "):
            skip = len("Would reformat: ")
        elif cleaned.startswith("Would reformat "):
            skip = len("Would reformat ")
        else:
            continue
        yield cleaned[skip:]