Coverage for lintro / parsers / ruff / ruff_parser.py: 80%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Parser for ruff output (lint and format). 

2 

3This module provides functions to parse both: 

4- ruff check --output-format json (linting issues) 

5- ruff format --check (plain text: files needing formatting) 

6 

7Supports both batch and streaming parsing modes. 

8""" 

9 

10from __future__ import annotations 

11 

12import json 

13from collections.abc import Generator, Iterable 

14 

15from loguru import logger 

16 

17from lintro.parsers.base_parser import ( 

18 extract_dict_field, 

19 extract_int_field, 

20 extract_str_field, 

21 safe_parse_items, 

22) 

23from lintro.parsers.ruff.ruff_issue import RuffIssue 

24from lintro.parsers.streaming import stream_json_array_fallback 

25 

26 

def _parse_ruff_item(item: dict[str, object]) -> RuffIssue | None:
    """Convert one decoded Ruff diagnostic into a RuffIssue.

    Several key spellings are accepted for each field: "filename"/"file",
    "location"/"start", "end_location"/"end", "row"/"line", "column"/"col"
    and "code"/"rule".

    Args:
        item: Dictionary holding a single issue from Ruff's JSON output.

    Returns:
        The populated RuffIssue. The code path below always builds an issue;
        the ``None`` in the annotation is kept for the caller-facing contract.
    """
    path = extract_str_field(item, ["filename", "file"])
    start = extract_dict_field(item, ["location", "start"])
    stop = extract_dict_field(item, ["end_location", "end"])

    # Start position falls back to 0; the end position falls back to the
    # start position when the end location is missing or falsy.
    start_row = extract_int_field(start, ["row", "line"], default=0) or 0
    start_col = extract_int_field(start, ["column", "col"], default=0) or 0
    stop_row = (
        extract_int_field(stop, ["row", "line"], default=start_row) or start_row
    )
    stop_col = (
        extract_int_field(stop, ["column", "col"], default=start_col) or start_col
    )

    rule_code = extract_str_field(item, ["code", "rule"])
    text = extract_str_field(item, ["message"])

    # Only a string URL is kept; any other value is treated as absent.
    raw_url = item.get("url")
    issue_url: str | None = None
    if isinstance(raw_url, str):
        issue_url = raw_url

    # A non-empty "fix" mapping marks the issue as fixable.
    fix_info = extract_dict_field(item, ["fix"])
    applicability: str | None = None
    if fix_info:
        raw_applicability = fix_info.get("applicability")
        if isinstance(raw_applicability, str):
            applicability = raw_applicability

    return RuffIssue(
        file=path,
        line=start_row,
        column=start_col,
        code=rule_code,
        message=text,
        url=issue_url,
        end_line=stop_row,
        end_column=stop_col,
        fixable=bool(fix_info),
        fix_applicability=applicability,
    )

69 

70 

def parse_ruff_output(output: str) -> list[RuffIssue]:
    """Parse Ruff JSON or JSON Lines output into `RuffIssue` objects.

    Supports multiple Ruff schema variants across versions by accepting:
    - JSON array of issue objects (optionally followed by non-JSON text)
    - JSON Lines (one object per line)

    Field name variations handled:
    - location: "location" or "start" with keys "row"|"line" and
        "column"|"col"
    - end location: "end_location" or "end" with keys "row"|"line" and
        "column"|"col"
    - filename: "filename" (preferred) or "file"

    Args:
        output: Raw output from `ruff check --output-format json`.

    Returns:
        list[RuffIssue]: Parsed issues. Empty when the output is empty, is an
        empty JSON container, or contains nothing parseable.
    """
    if not output or output.strip() in ("[]", "{}"):
        return []

    # First try a JSON array. ``raw_decode`` consumes exactly one JSON value
    # from the start of the text and ignores whatever follows, so trailing
    # non-JSON data is tolerated even when it contains a "]" character
    # (cutting the text at the last "]" would produce invalid JSON there).
    try:
        # raw_decode does not skip leading whitespace, hence the lstrip().
        ruff_data, _end = json.JSONDecoder().raw_decode(output.lstrip())
        if isinstance(ruff_data, list):
            return safe_parse_items(ruff_data, _parse_ruff_item, "ruff")
    except (json.JSONDecodeError, TypeError) as e:
        # Fall back to JSON Lines parsing below
        logger.debug(f"Ruff array JSON parsing failed, falling back to JSON Lines: {e}")

    # Fallback: parse JSON Lines (each line is a JSON object)
    items: list[object] = []
    for line in output.splitlines():
        line_str = line.strip()
        # Only lines that look like a JSON object are candidates.
        if not line_str or not line_str.startswith("{"):
            continue
        try:
            item = json.loads(line_str)
        except json.JSONDecodeError as e:
            logger.debug(f"Failed to parse ruff JSON line '{line_str[:50]}': {e}")
            continue
        if isinstance(item, dict):
            items.append(item)

    return safe_parse_items(items, _parse_ruff_item, "ruff")

124 

125 

def parse_ruff_format_check_output(output: str) -> list[str]:
    """Collect the files that `ruff format --check` reports as unformatted.

    Args:
        output: The raw output from `ruff format --check`

    Returns:
        List of file paths that would be reformatted, in output order.
    """
    from lintro.parsers.base_parser import strip_ansi_codes

    if not output:
        return []

    # Ruff prints either 'Would reformat: path/to/file.py' or
    # 'Would reformat path/to/file.py'. The colon form is checked first so
    # the colon never ends up in the extracted path.
    prefixes = ("Would reformat: ", "Would reformat ")

    needs_formatting = []
    for raw_line in output.splitlines():
        # Strip ANSI color codes for stable parsing across environments
        text = strip_ansi_codes(raw_line).strip()
        for prefix in prefixes:
            if text.startswith(prefix):
                needs_formatting.append(text[len(prefix) :])
                break
    return needs_formatting

150 

151 

152# --------------------------------------------------------------------------- 

153# Streaming parser variants 

154# --------------------------------------------------------------------------- 

155 

156 

def stream_ruff_output(
    output: str | Iterable[str],
) -> Generator[RuffIssue, None, None]:
    """Yield Ruff issues one at a time from JSON output.

    A complete string is handed to ``stream_json_array_fallback``; any other
    iterable is treated as lines and handed to ``stream_json_lines``. Issues
    are yielded as the underlying helper produces them instead of being
    collected into a list as parse_ruff_output() does.

    Args:
        output: Raw output from `ruff check --output-format json`, either
            as a complete string or as an iterable of lines.

    Yields:
        RuffIssue: Parsed issues one at a time.

    Examples:
        >>> for issue in stream_ruff_output(ruff_output):
        ...     print(f"{issue.file}:{issue.line}: {issue.message}")
    """
    if not isinstance(output, str):
        # Iterable of lines - stream JSON Lines directly
        from lintro.parsers.streaming import stream_json_lines

        yield from stream_json_lines(output, _parse_ruff_item, "ruff")
        return

    # Whole string: the fallback parser handles both JSON array and JSON Lines
    yield from stream_json_array_fallback(output, _parse_ruff_item, "ruff")

185 

186 

def stream_ruff_format_output(
    output: str | Iterable[str],
) -> Generator[str, None, None]:
    """Yield the paths that `ruff format --check` says need reformatting.

    Args:
        output: Raw output from `ruff format --check`, either as a complete
            string or as an iterable of lines.

    Yields:
        str: File paths that would be reformatted.

    Examples:
        >>> for filepath in stream_ruff_format_output(format_output):
        ...     print(f"Needs formatting: {filepath}")
    """
    from lintro.parsers.base_parser import strip_ansi_codes

    # The colon form is tested first so the colon is never left in the path.
    prefixes = ("Would reformat: ", "Would reformat ")

    source: Iterable[str]
    if isinstance(output, str):
        source = output.splitlines()
    else:
        source = output

    for entry in source:
        # ANSI color codes are removed before matching the prefixes.
        text = strip_ansi_codes(entry).strip()
        for prefix in prefixes:
            if text.startswith(prefix):
                yield text[len(prefix) :]
                break