Coverage for lintro / tools / implementations / pytest / collection.py: 45%

111 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Utility functions for pytest tool implementation. 

2 

3This module contains helper functions extracted from tool_pytest.py to improve 

4maintainability and reduce file size. Functions are organized by category: 

5- JUnit XML processing 

6- Environment and system utilities 

7- Flaky test detection 

8""" 

9 

10import json 

11import os 

12import xml.etree.ElementTree # nosec B405 - only used for exception type, parsing uses defusedxml 

13from pathlib import Path 

14 

15from loguru import logger 

16 

17from lintro.enums.pytest_enums import PytestParallelPreset 

18from lintro.parsers.pytest.pytest_issue import PytestIssue 

19 

# Constants for flaky test detection.
# Cache file lives inside pytest's own cache directory (relative to cwd) so it
# is cleaned up alongside pytest's other cached state.
PYTEST_FLAKY_CACHE_FILE: str = ".pytest_cache/lintro_flaky_tests.json"
PYTEST_FLAKY_MIN_RUNS: int = 3  # Minimum recorded runs before a test can be classified as flaky
PYTEST_FLAKY_FAILURE_RATE: float = 0.3  # Consider flaky if fails >= 30% but < 100% of runs

24 

25 

26def extract_all_test_results_from_junit(junitxml_path: str) -> dict[str, str] | None: 

27 """Extract all test results from JUnit XML file. 

28 

29 Args: 

30 junitxml_path: Path to JUnit XML file. 

31 

32 Returns: 

33 dict[str, str] | None: Dictionary mapping node_id to status 

34 (PASSED/FAILED/ERROR), or None if file doesn't exist or can't be parsed. 

35 """ 

36 xml_path = Path(junitxml_path) 

37 if not xml_path.exists(): 

38 return None 

39 

40 try: 

41 from defusedxml import ElementTree 

42 

43 tree = ElementTree.parse(xml_path) 

44 root = tree.getroot() 

45 if root is None: 

46 return None 

47 

48 test_results: dict[str, str] = {} 

49 

50 for testcase in root.findall(".//testcase"): 

51 file_path = testcase.get("file", "") 

52 class_name = testcase.get("classname", "") 

53 test_name = testcase.get("name", "") 

54 if file_path: 

55 if class_name: 

56 node_id = f"{file_path}::{class_name}::{test_name}" 

57 else: 

58 node_id = f"{file_path}::{test_name}" 

59 else: 

60 node_id = f"{class_name}::{test_name}" if class_name else test_name 

61 

62 # Determine status 

63 if testcase.find("failure") is not None: 

64 status = "FAILED" 

65 elif testcase.find("error") is not None: 

66 status = "ERROR" 

67 elif testcase.find("skipped") is not None: 

68 status = "SKIPPED" 

69 else: 

70 status = "PASSED" 

71 

72 test_results[node_id] = status 

73 

74 return test_results 

75 except ( 

76 ImportError, 

77 OSError, 

78 xml.etree.ElementTree.ParseError, 

79 KeyError, 

80 AttributeError, 

81 ) as e: 

82 logger.debug(f"Failed to parse JUnit XML for all tests: {e}") 

83 return None 

84 

85 

def get_cpu_count() -> int:
    """Get the number of available CPU cores.

    Returns:
        int: Number of CPU cores, minimum 1.
    """
    try:
        import multiprocessing

        count = multiprocessing.cpu_count()
    except (OSError, ValueError, NotImplementedError):
        # cpu_count() can fail on unusual platforms; fall back to serial.
        return 1
    return count if count > 1 else 1

98 

99 

def get_parallel_workers_from_preset(
    preset: str,
    test_count: int | None = None,
) -> str:
    """Convert parallel preset to worker count.

    Args:
        preset: Preset name (auto, small, medium, large) or number as string.
        test_count: Optional test count for dynamic presets.

    Returns:
        str: Worker count string for pytest-xdist (-n flag).

    Raises:
        ValueError: If preset is invalid.
    """
    normalized = preset.lower()

    if normalized == PytestParallelPreset.AUTO:
        return "auto"
    if normalized == PytestParallelPreset.SMALL:
        return "2"
    if normalized == PytestParallelPreset.MEDIUM:
        return "4"
    if normalized == PytestParallelPreset.LARGE:
        # Large suites get up to 8 workers, capped at the available CPU count.
        return str(min(8, get_cpu_count()))
    if normalized.isdigit():
        # Caller supplied an explicit worker count; pass it through unchanged.
        return preset
    raise ValueError(
        f"Invalid parallel preset: {preset}. "
        "Must be one of: auto, small, medium, large, or a number",
    )

136 

137 

def is_ci_environment() -> bool:
    """Detect if running in a CI/CD environment.

    Checks for common CI environment variables:
    - CI (generic CI indicator)
    - GITHUB_ACTIONS (GitHub Actions)
    - GITLAB_CI (GitLab CI)
    - JENKINS_URL (Jenkins)
    - CIRCLE_CI / CIRCLECI (CircleCI)
    - TRAVIS (Travis CI)
    - AZURE_HTTP_USER_AGENT (Azure DevOps)
    - TEAMCITY_VERSION (TeamCity)
    - BUILDKITE (Buildkite)
    - DRONE (Drone CI)

    Returns:
        bool: True if running in CI environment, False otherwise.
    """
    indicators = (
        "CI",
        "GITHUB_ACTIONS",
        "GITLAB_CI",
        "JENKINS_URL",
        "CIRCLE_CI",
        "CIRCLECI",
        "TRAVIS",
        "AZURE_HTTP_USER_AGENT",
        "TEAMCITY_VERSION",
        "BUILDKITE",
        "DRONE",
    )
    for name in indicators:
        # Treat unset, "false", and "0" (case-insensitive) as "not CI".
        if os.environ.get(name, "").lower() not in ("", "false", "0"):
            return True
    return False

173 

174 

def get_flaky_cache_path() -> Path:
    """Get the path to the flaky test cache file.

    Ensures the cache file's parent directory exists as a side effect, so
    callers can read from or write to the returned path directly.

    Returns:
        Path: Path to the cache file.
    """
    cache_path = Path(PYTEST_FLAKY_CACHE_FILE)
    # parents=True matches save_flaky_test_history's mkdir call and keeps this
    # robust if the cache file is ever relocated to a nested directory.
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    return cache_path

184 

185 

def load_flaky_test_history() -> dict[str, dict[str, int]]:
    """Load flaky test history from cache file.

    Returns:
        dict[str, dict[str, int]]: Dictionary mapping test node_id to status
            counts, in the form
            {node_id: {"passed": count, "failed": count, "error": count}}.
            Empty when no cache exists or it cannot be read.
    """
    cache_path = get_flaky_cache_path()
    if not cache_path.exists():
        return {}

    try:
        raw = cache_path.read_text(encoding="utf-8")
        history: dict[str, dict[str, int]] = json.loads(raw)
    except (json.JSONDecodeError, OSError) as e:
        # Corrupt or unreadable cache is non-fatal; start with a fresh history.
        logger.debug(f"Failed to load flaky test history: {e}")
        return {}
    return history

204 

205 

def save_flaky_test_history(history: dict[str, dict[str, int]]) -> None:
    """Save flaky test history to cache file.

    Args:
        history: Dictionary mapping test node_id to status counts.
    """
    cache_path = get_flaky_cache_path()
    try:
        # Guard against the parent directory having been removed since lookup.
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(history, indent=2)
        cache_path.write_text(payload, encoding="utf-8")
    except OSError as e:
        # Persisting history is best-effort; a failed write must not break runs.
        logger.debug(f"Failed to save flaky test history: {e}")

220 

221 

def compute_updated_flaky_test_history(
    issues: list[PytestIssue],
    all_test_results: dict[str, str] | None = None,
) -> dict[str, dict[str, int]]:
    """Update flaky test history with current test results.

    Args:
        issues: List of parsed test issues (failures/errors).
        all_test_results: Optional dictionary mapping node_id to status for all
            tests. If None, only tracks failures from issues.

    Returns:
        Dictionary mapping test node IDs to their pass/fail/error counts.
        Format: {node_id: {"passed": count, "failed": count, "error": count}}
    """
    history = load_flaky_test_history()

    if all_test_results:
        # Preferred path: full per-test outcomes (e.g. parsed from JUnit XML).
        outcome_keys = {"FAILED": "failed", "ERROR": "error", "PASSED": "passed"}
        for node_id, status in all_test_results.items():
            counts = history.setdefault(
                node_id,
                {"passed": 0, "failed": 0, "error": 0},
            )
            key = outcome_keys.get(status)
            if key is not None:
                counts[key] += 1
        return history

    # Fallback: only failures/errors from issues are visible (less accurate).
    for issue in issues:
        # Skip Mock objects in tests - only process real PytestIssue objects.
        if not isinstance(issue, PytestIssue):
            continue
        if not (issue.node_id and isinstance(issue.node_id, str)):
            continue

        counts = history.setdefault(
            issue.node_id,
            {"passed": 0, "failed": 0, "error": 0},
        )
        if issue.test_status == "FAILED":
            counts["failed"] += 1
        elif issue.test_status == "ERROR":
            counts["error"] += 1

    return history