Coverage for lintro / parsers / sqlfluff / sqlfluff_parser.py: 95%

39 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Parser for SQLFluff JSON output. 

2 

3This module provides functions to parse SQLFluff JSON output into 

4SqlfluffIssue objects. 

5 

6SQLFluff JSON output has a nested structure with a "violations" array per file: 

7[ 

8 { 

9 "filepath": "query.sql", 

10 "violations": [ 

11 { 

12 "start_line_no": 1, 

13 "start_line_pos": 1, 

14 "end_line_no": 1, 

15 "end_line_pos": 6, 

16 "code": "L010", 

17 "description": "Keywords must be upper case.", 

18 "name": "capitalisation.keywords" 

19 } 

20 ] 

21 } 

22] 

23""" 

24 

25from __future__ import annotations 

26 

27import json 

28 

29from loguru import logger 

30 

31from lintro.parsers.base_parser import ( 

32 extract_int_field, 

33 extract_str_field, 

34 safe_parse_items, 

35) 

36from lintro.parsers.sqlfluff.sqlfluff_issue import SqlfluffIssue 

37 

38 

def _parse_sqlfluff_violation(
    violation: dict[str, object],
    filepath: str,
) -> SqlfluffIssue | None:
    """Build a SqlfluffIssue from one entry of a SQLFluff "violations" array.

    Args:
        violation: Dictionary describing a single violation, as found in the
            "violations" list of SQLFluff's JSON output.
        filepath: Path of the file the violation was reported for.

    Returns:
        The constructed SqlfluffIssue. No code path in this function returns
        None; the optional return type is kept for interface compatibility.
    """
    # Start position: a falsy result from the helper is normalised to 0.
    # NOTE(review): the helper presumably tries the listed keys in order
    # ("start_line_no" before "line") - confirm against base_parser.
    start_line = extract_int_field(violation, ["start_line_no", "line"], default=0)
    start_column = extract_int_field(
        violation,
        ["start_line_pos", "column"],
        default=0,
    )

    # End position is optional and is passed through exactly as extracted.
    stop_line = extract_int_field(violation, ["end_line_no"], default=None)
    stop_column = extract_int_field(violation, ["end_line_pos"], default=None)

    # Textual fields: SqlfluffIssue keyword -> candidate keys in the violation.
    # A falsy extraction result becomes the empty string.
    text_field_keys = {
        "code": ["code"],
        "rule_name": ["name", "rule_name"],
        "message": ["description", "message"],
    }
    text_fields = {
        attr: extract_str_field(violation, keys) or ""
        for attr, keys in text_field_keys.items()
    }

    return SqlfluffIssue(
        file=filepath,
        line=start_line or 0,
        column=start_column or 0,
        end_line=stop_line,
        end_column=stop_column,
        **text_fields,
    )

71 

72 

def parse_sqlfluff_output(output: str | None) -> list[SqlfluffIssue]:
    """Parse SQLFluff JSON output into SqlfluffIssue objects.

    Supports SQLFluff's nested JSON structure where the top level is a list
    of file entries and each file entry contains a "violations" array.

    Args:
        output: Raw output from `sqlfluff lint --format=json`.

    Returns:
        list[SqlfluffIssue]: Parsed issues. An empty list is returned when the
        output is empty (or only "[]" / "{}"), is not valid JSON, or does not
        decode to a list. File entries that are not dictionaries, or whose
        "violations" value is not a list, are skipped.
    """
    if not output or output.strip() in ("", "[]", "{}"):
        return []

    try:
        data = json.loads(output)
    except (json.JSONDecodeError, TypeError) as e:
        logger.debug(f"SQLFluff JSON parsing failed: {e}")
        return []

    if not isinstance(data, list):
        logger.debug(f"SQLFluff output is not a list: {type(data).__name__}")
        return []

    issues: list[SqlfluffIssue] = []

    for file_entry in data:
        if not isinstance(file_entry, dict):
            logger.debug("Skipping non-dict file entry in SQLFluff output")
            continue

        # Fall back to "" when no path is present so that the value handed to
        # SqlfluffIssue(file=...) is always a str, matching how the other
        # string fields of a violation are normalised in this module.
        filepath = extract_str_field(file_entry, ["filepath", "file"]) or ""
        violations = file_entry.get("violations")

        if not isinstance(violations, list):
            # No violations for this file
            continue

        # Bind the current file path as a default argument so the callback
        # does not depend on the loop variable after this iteration (B023).
        def parse_with_filepath(
            violation: dict[str, object],
            fp: str = filepath,
        ) -> SqlfluffIssue | None:
            return _parse_sqlfluff_violation(violation=violation, filepath=fp)

        # NOTE(review): safe_parse_items presumably applies parse_func to each
        # item and collects the successful results - confirm in base_parser.
        file_issues = safe_parse_items(
            items=violations,
            parse_func=parse_with_filepath,
            tool_name="sqlfluff",
        )
        issues.extend(file_issues)

    return issues