Coverage for tests / unit / parsers / streaming / test_text_lines.py: 100%

26 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Tests for stream_text_lines function.""" 

2 

3from __future__ import annotations 

4 

5from typing import TYPE_CHECKING 

6 

7from assertpy import assert_that 

8 

9from lintro.parsers.streaming import stream_text_lines 

10 

11if TYPE_CHECKING: 

12 from collections.abc import Callable 

13 

14 from tests.unit.parsers.streaming.conftest import SimpleIssue 

15 

16 

17def test_parses_string( 

18 parse_error_line: Callable[[str], SimpleIssue | None], 

19) -> None: 

20 """Parse lines from string. 

21 

22 Args: 

23 parse_error_line: Fixture providing parser function for error lines. 

24 """ 

25 output = "INFO: ok\nERROR: bad thing\nINFO: fine\nERROR: another\n" 

26 results = list(stream_text_lines(output, parse_error_line)) 

27 

28 assert_that(results).is_length(2) 

29 assert_that(results[0].message).is_equal_to("bad thing") 

30 assert_that(results[1].message).is_equal_to("another") 

31 

32 

33def test_parses_iterable( 

34 parse_error_line: Callable[[str], SimpleIssue | None], 

35) -> None: 

36 """Parse lines from iterable. 

37 

38 Args: 

39 parse_error_line: Fixture providing parser function for error lines. 

40 """ 

41 lines = ["ERROR: first", "OK", "ERROR: second"] 

42 results = list(stream_text_lines(lines, parse_error_line)) 

43 

44 assert_that(results).is_length(2) 

45 

46 

47def test_strips_ansi_by_default( 

48 identity_line_parser: Callable[[str], SimpleIssue], 

49) -> None: 

50 """Strip ANSI codes by default. 

51 

52 Args: 

53 identity_line_parser: Fixture providing identity parser for lines. 

54 """ 

55 output = "\x1b[31mError\x1b[0m: message\n" 

56 results = list(stream_text_lines(output, identity_line_parser)) 

57 

58 assert_that(results[0].message).is_equal_to("Error: message") 

59 

60 

61def test_preserves_ansi_when_disabled( 

62 identity_line_parser: Callable[[str], SimpleIssue], 

63) -> None: 

64 """Preserve ANSI codes when strip_ansi=False. 

65 

66 Args: 

67 identity_line_parser: Fixture providing identity parser for lines. 

68 """ 

69 output = "\x1b[31mError\x1b[0m\n" 

70 results = list(stream_text_lines(output, identity_line_parser, strip_ansi=False)) 

71 

72 assert_that(results[0].message).contains("\x1b[31m") 

73 

74 

75def test_skips_empty_lines( 

76 identity_line_parser: Callable[[str], SimpleIssue], 

77) -> None: 

78 """Skip empty lines. 

79 

80 Args: 

81 identity_line_parser: Fixture providing identity parser for lines. 

82 """ 

83 output = "line1\n\n\nline2\n" 

84 results = list(stream_text_lines(output, identity_line_parser)) 

85 

86 assert_that(results).is_length(2)