Coverage for lintro / parsers / sqlfluff / sqlfluff_parser.py: 95%
39 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Parser for SQLFluff JSON output.
3This module provides functions to parse SQLFluff JSON output into
4SqlfluffIssue objects.
6SQLFluff JSON output has a nested structure with a "violations" array per file:
7[
8 {
9 "filepath": "query.sql",
10 "violations": [
11 {
12 "start_line_no": 1,
13 "start_line_pos": 1,
14 "end_line_no": 1,
15 "end_line_pos": 6,
16 "code": "L010",
17 "description": "Keywords must be upper case.",
18 "name": "capitalisation.keywords"
19 }
20 ]
21 }
22]
23"""
25from __future__ import annotations
27import json
29from loguru import logger
31from lintro.parsers.base_parser import (
32 extract_int_field,
33 extract_str_field,
34 safe_parse_items,
35)
36from lintro.parsers.sqlfluff.sqlfluff_issue import SqlfluffIssue
39def _parse_sqlfluff_violation(
40 violation: dict[str, object],
41 filepath: str,
42) -> SqlfluffIssue | None:
43 """Parse a single SQLFluff violation into a SqlfluffIssue object.
45 Args:
46 violation: Dictionary containing violation data from SQLFluff JSON output.
47 filepath: File path for the violation.
49 Returns:
50 SqlfluffIssue object if parsing succeeds, None otherwise.
51 """
52 line = extract_int_field(violation, ["start_line_no", "line"], default=0) or 0
53 column = extract_int_field(violation, ["start_line_pos", "column"], default=0) or 0
54 end_line = extract_int_field(violation, ["end_line_no"], default=None)
55 end_column = extract_int_field(violation, ["end_line_pos"], default=None)
57 code = extract_str_field(violation, ["code"]) or ""
58 rule_name = extract_str_field(violation, ["name", "rule_name"]) or ""
59 message = extract_str_field(violation, ["description", "message"]) or ""
61 return SqlfluffIssue(
62 file=filepath,
63 line=line,
64 column=column,
65 code=code,
66 rule_name=rule_name,
67 message=message,
68 end_line=end_line,
69 end_column=end_column,
70 )
73def parse_sqlfluff_output(output: str | None) -> list[SqlfluffIssue]:
74 """Parse SQLFluff JSON output into SqlfluffIssue objects.
76 Supports SQLFluff's nested JSON structure where each file entry contains
77 a "violations" array.
79 Args:
80 output: Raw output from `sqlfluff lint --format=json`.
82 Returns:
83 list[SqlfluffIssue]: Parsed issues.
84 """
85 if not output or output.strip() in ("", "[]", "{}"):
86 return []
88 try:
89 data = json.loads(output)
90 except (json.JSONDecodeError, TypeError) as e:
91 logger.debug(f"SQLFluff JSON parsing failed: {e}")
92 return []
94 if not isinstance(data, list):
95 logger.debug(f"SQLFluff output is not a list: {type(data).__name__}")
96 return []
98 issues: list[SqlfluffIssue] = []
100 for file_entry in data:
101 if not isinstance(file_entry, dict):
102 logger.debug("Skipping non-dict file entry in SQLFluff output")
103 continue
105 filepath = extract_str_field(file_entry, ["filepath", "file"])
106 violations = file_entry.get("violations")
108 if not isinstance(violations, list):
109 # No violations for this file
110 continue
112 # Parse violations for this file
113 def parse_with_filepath(
114 violation: dict[str, object],
115 fp: str = filepath, # Capture by value to avoid B023
116 ) -> SqlfluffIssue | None:
117 return _parse_sqlfluff_violation(violation=violation, filepath=fp)
119 file_issues = safe_parse_items(
120 items=violations,
121 parse_func=parse_with_filepath,
122 tool_name="sqlfluff",
123 )
124 issues.extend(file_issues)
126 return issues