Coverage for lintro / tools / implementations / pytest / collection.py: 45%
111 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""Utility functions for pytest tool implementation.
3This module contains helper functions extracted from tool_pytest.py to improve
4maintainability and reduce file size. Functions are organized by category:
5- JUnit XML processing
6- Environment and system utilities
7- Flaky test detection
8"""
10import json
11import os
12import xml.etree.ElementTree # nosec B405 - only used for exception type, parsing uses defusedxml
13from pathlib import Path
15from loguru import logger
17from lintro.enums.pytest_enums import PytestParallelPreset
18from lintro.parsers.pytest.pytest_issue import PytestIssue
# Constants for flaky test detection
# Cache file (relative to the working directory) that persists per-test
# pass/fail/error counts across runs.
PYTEST_FLAKY_CACHE_FILE: str = ".pytest_cache/lintro_flaky_tests.json"
PYTEST_FLAKY_MIN_RUNS: int = 3  # Minimum runs before detecting flaky tests
PYTEST_FLAKY_FAILURE_RATE: float = 0.3  # Consider flaky if fails >= 30% but < 100%
26def extract_all_test_results_from_junit(junitxml_path: str) -> dict[str, str] | None:
27 """Extract all test results from JUnit XML file.
29 Args:
30 junitxml_path: Path to JUnit XML file.
32 Returns:
33 dict[str, str] | None: Dictionary mapping node_id to status
34 (PASSED/FAILED/ERROR), or None if file doesn't exist or can't be parsed.
35 """
36 xml_path = Path(junitxml_path)
37 if not xml_path.exists():
38 return None
40 try:
41 from defusedxml import ElementTree
43 tree = ElementTree.parse(xml_path)
44 root = tree.getroot()
45 if root is None:
46 return None
48 test_results: dict[str, str] = {}
50 for testcase in root.findall(".//testcase"):
51 file_path = testcase.get("file", "")
52 class_name = testcase.get("classname", "")
53 test_name = testcase.get("name", "")
54 if file_path:
55 if class_name:
56 node_id = f"{file_path}::{class_name}::{test_name}"
57 else:
58 node_id = f"{file_path}::{test_name}"
59 else:
60 node_id = f"{class_name}::{test_name}" if class_name else test_name
62 # Determine status
63 if testcase.find("failure") is not None:
64 status = "FAILED"
65 elif testcase.find("error") is not None:
66 status = "ERROR"
67 elif testcase.find("skipped") is not None:
68 status = "SKIPPED"
69 else:
70 status = "PASSED"
72 test_results[node_id] = status
74 return test_results
75 except (
76 ImportError,
77 OSError,
78 xml.etree.ElementTree.ParseError,
79 KeyError,
80 AttributeError,
81 ) as e:
82 logger.debug(f"Failed to parse JUnit XML for all tests: {e}")
83 return None
def get_cpu_count() -> int:
    """Get the number of available CPU cores.

    Uses os.cpu_count() directly instead of importing multiprocessing,
    whose cpu_count() is a thin wrapper around it. os.cpu_count() returns
    None (rather than raising) when the count cannot be determined, so a
    simple fallback replaces the previous try/except.

    Returns:
        int: Number of CPU cores, minimum 1.
    """
    return max(1, os.cpu_count() or 1)
def get_parallel_workers_from_preset(
    preset: str,
    test_count: int | None = None,
) -> str:
    """Translate a parallelism preset into a pytest-xdist worker count.

    Args:
        preset: Preset name (auto, small, medium, large) or number as string.
        test_count: Optional test count for dynamic presets.

    Returns:
        str: Worker count string for pytest-xdist (-n flag).

    Raises:
        ValueError: If preset is invalid.
    """
    normalized = preset.lower()

    # Fixed presets map straight to a worker count. Comparison uses ==
    # (not a dict lookup) so the str-vs-enum equality semantics of
    # PytestParallelPreset members are preserved.
    fixed_presets = (
        (PytestParallelPreset.AUTO, "auto"),
        (PytestParallelPreset.SMALL, "2"),
        (PytestParallelPreset.MEDIUM, "4"),
    )
    for member, workers in fixed_presets:
        if normalized == member:
            return workers

    if normalized == PytestParallelPreset.LARGE:
        # Cap large suites at 8 workers, bounded by the available cores.
        return str(min(8, get_cpu_count()))

    if normalized.isdigit():
        # Caller already supplied an explicit worker count.
        return preset

    raise ValueError(
        f"Invalid parallel preset: {preset}. "
        "Must be one of: auto, small, medium, large, or a number",
    )
def is_ci_environment() -> bool:
    """Detect if running in a CI/CD environment.

    Checks for common CI environment variables:
    - CI (generic CI indicator)
    - GITHUB_ACTIONS (GitHub Actions)
    - GITLAB_CI (GitLab CI)
    - JENKINS_URL (Jenkins)
    - CIRCLE_CI / CIRCLECI (CircleCI)
    - TRAVIS (Travis CI)
    - AZURE_HTTP_USER_AGENT (Azure DevOps)
    - TEAMCITY_VERSION (TeamCity)
    - BUILDKITE (Buildkite)
    - DRONE (Drone CI)

    Returns:
        bool: True if running in CI environment, False otherwise.
    """
    ci_indicators = (
        "CI",
        "GITHUB_ACTIONS",
        "GITLAB_CI",
        "JENKINS_URL",
        "CIRCLE_CI",
        "CIRCLECI",
        "TRAVIS",
        "AZURE_HTTP_USER_AGENT",
        "TEAMCITY_VERSION",
        "BUILDKITE",
        "DRONE",
    )
    for name in ci_indicators:
        # Unset, "false", and "0" all count as "not CI"; anything else is a hit.
        if os.environ.get(name, "").lower() not in ("", "false", "0"):
            return True
    return False
def get_flaky_cache_path() -> Path:
    """Get the path to the flaky test cache file.

    Creates the cache directory if needed. parents=True makes this
    consistent with save_flaky_test_history() and keeps it working if
    PYTEST_FLAKY_CACHE_FILE is ever nested more than one level deep
    (the previous bare mkdir(exist_ok=True) would raise FileNotFoundError
    in that case).

    Returns:
        Path: Path to the cache file.
    """
    cache_path = Path(PYTEST_FLAKY_CACHE_FILE)
    cache_path.parent.mkdir(parents=True, exist_ok=True)
    return cache_path
def load_flaky_test_history() -> dict[str, dict[str, int]]:
    """Read persisted flaky-test status counts from the cache file.

    Returns:
        dict[str, dict[str, int]]: Mapping of test node_id to status counts,
            in the form {node_id: {"passed": n, "failed": n, "error": n}}.
            Empty when the cache is absent or unreadable.
    """
    path = get_flaky_cache_path()
    if not path.exists():
        return {}

    try:
        with open(path, encoding="utf-8") as fh:
            history: dict[str, dict[str, int]] = json.load(fh)
    except (json.JSONDecodeError, OSError) as e:
        # Best effort: a corrupt/unreadable cache simply resets history.
        logger.debug(f"Failed to load flaky test history: {e}")
        return {}
    return history
def save_flaky_test_history(history: dict[str, dict[str, int]]) -> None:
    """Persist flaky test status counts to the cache file.

    Args:
        history: Dictionary mapping test node_id to status counts.
    """
    path = get_flaky_cache_path()
    try:
        # Guard against the cache directory having been removed since
        # get_flaky_cache_path() created it.
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(history, fh, indent=2)
    except OSError as e:
        # Best effort: failing to persist history is not fatal.
        logger.debug(f"Failed to save flaky test history: {e}")
def compute_updated_flaky_test_history(
    issues: list[PytestIssue],
    all_test_results: dict[str, str] | None = None,
) -> dict[str, dict[str, int]]:
    """Fold the current run's test results into the flaky-test history.

    Args:
        issues: List of parsed test issues (failures/errors).
        all_test_results: Optional dictionary mapping node_id to status for
            all tests (e.g. extracted from JUnit XML). If None, only
            failures/errors from issues are tracked.

    Returns:
        Dictionary mapping test node IDs to their pass/fail/error counts.
        Format: {node_id: {"passed": count, "failed": count, "error": count}}
    """
    history = load_flaky_test_history()
    # Map a reported status onto the history key it increments; statuses
    # outside this table (e.g. SKIPPED) are intentionally not counted.
    status_to_key = {"PASSED": "passed", "FAILED": "failed", "ERROR": "error"}

    if all_test_results:
        # Preferred path: full results track passes as well as failures.
        for node_id, status in all_test_results.items():
            counts = history.setdefault(
                node_id, {"passed": 0, "failed": 0, "error": 0}
            )
            key = status_to_key.get(status)
            if key is not None:
                counts[key] += 1
        return history

    # Fallback: only failures/errors are visible, so passes go uncounted
    # (simpler but less accurate).
    for issue in issues:
        # Skip Mock objects in tests - only process real PytestIssue objects.
        if not isinstance(issue, PytestIssue):
            continue
        if not (issue.node_id and isinstance(issue.node_id, str)):
            continue
        counts = history.setdefault(
            issue.node_id, {"passed": 0, "failed": 0, "error": 0}
        )
        key = status_to_key.get(issue.test_status)
        # Issues never record passes; only failed/error are incremented.
        if key is not None and key != "passed":
            counts[key] += 1
    return history