Coverage for lintro/utils/file_cache.py: 81%
107 statements
« prev ^ index » next — coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""File fingerprint caching for incremental checks.
3This module provides functionality to cache file metadata (mtime, size) to enable
4incremental linting - only checking files that have changed since the last run.
5"""
7from __future__ import annotations
9import json
10import tempfile
11from dataclasses import asdict, dataclass, field
12from pathlib import Path
13from typing import Any
15from loguru import logger
17# Cache directory location
18CACHE_DIR = Path.home() / ".lintro" / "cache"
@dataclass
class FileFingerprint:
    """Fingerprint of a file for change detection.

    Attributes:
        path: Absolute path to the file.
        mtime: Last modification time (seconds since epoch).
        size: File size in bytes.
    """

    path: str
    mtime: float
    size: int

    def to_dict(self) -> dict[str, Any]:
        """Serialize this fingerprint into a plain dictionary.

        Returns:
            Dictionary representation of the fingerprint.
        """
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> FileFingerprint:
        """Build a fingerprint from a previously serialized dictionary.

        Args:
            data: Dictionary with path, mtime, and size keys.

        Returns:
            FileFingerprint instance created from the dictionary.

        Raises:
            KeyError: If any required key is missing from ``data``.
        """
        path, mtime, size = data["path"], data["mtime"], data["size"]
        return cls(path=path, mtime=mtime, size=size)
@dataclass
class ToolCache:
    """Cache of file fingerprints for a specific tool.

    Attributes:
        tool_name: Name of the tool this cache is for.
        fingerprints: Dictionary mapping file paths to their fingerprints.
    """

    tool_name: str
    fingerprints: dict[str, FileFingerprint] = field(default_factory=dict)

    def get_changed_files(self, files: list[str]) -> list[str]:
        """Return only files that have changed since last run.

        A file is considered changed if:
        - It's new (not in cache)
        - Its mtime has changed
        - Its size has changed

        Files that no longer exist are skipped entirely; files whose
        metadata cannot be read are conservatively reported as changed.

        Args:
            files: List of absolute file paths to check.

        Returns:
            List of file paths that have changed.
        """
        changed: list[str] = []

        for file_path in files:
            path = Path(file_path)
            if not path.exists():
                continue

            try:
                stat = path.stat()
            except OSError as e:
                # Can't read metadata; err on the side of re-checking.
                logger.debug(f"Could not stat {file_path}: {e}")
                changed.append(file_path)
                continue

            cached = self.fingerprints.get(file_path)

            if cached is None:
                # New file not in cache
                changed.append(file_path)
            elif cached.mtime != stat.st_mtime or cached.size != stat.st_size:
                # File has been modified
                changed.append(file_path)
            # else: file unchanged, skip it

        return changed

    def update(self, files: list[str]) -> None:
        """Update cache with current file states.

        Deleted files are evicted from the cache; unreadable files keep
        their previous fingerprint (failure is logged at debug level).

        Args:
            files: List of file paths to update in cache.
        """
        for file_path in files:
            path = Path(file_path)
            if not path.exists():
                # Remove from cache if file no longer exists
                self.fingerprints.pop(file_path, None)
                continue

            try:
                stat = path.stat()
                self.fingerprints[file_path] = FileFingerprint(
                    path=file_path,
                    mtime=stat.st_mtime,
                    size=stat.st_size,
                )
            except OSError as e:
                logger.debug(f"Could not update cache for {file_path}: {e}")

    def save(self) -> None:
        """Persist cache to disk using atomic write.

        Uses temp file + rename pattern to prevent corruption if write fails.
        All persistence failures are logged as warnings rather than raised,
        so a broken cache never aborts the caller.
        """
        cache_file = CACHE_DIR / f"{self.tool_name}.json"

        try:
            # Directory creation can fail too (e.g. permissions, read-only
            # filesystem), so it belongs inside the same best-effort error
            # handling as the write itself.
            cache_file.parent.mkdir(parents=True, exist_ok=True)

            data = {
                "tool_name": self.tool_name,
                "fingerprints": {
                    path: fp.to_dict() for path, fp in self.fingerprints.items()
                },
            }
            # Write to temp file first, then atomically rename
            # This prevents corruption if the write is interrupted
            with tempfile.NamedTemporaryFile(
                mode="w",
                encoding="utf-8",
                dir=cache_file.parent,
                suffix=".tmp",
                delete=False,
            ) as tmp_file:
                json.dump(data, tmp_file, indent=2)
                tmp_path = Path(tmp_file.name)

            # Atomic rename (on POSIX systems)
            tmp_path.replace(cache_file)

            logger.debug(
                f"Saved cache for {self.tool_name} ({len(self.fingerprints)} files)",
            )
        except (OSError, TypeError, ValueError) as e:
            logger.warning(f"Could not save cache for {self.tool_name}: {e}")
            # Clean up temp file if it exists
            if "tmp_path" in locals() and tmp_path.exists():
                tmp_path.unlink(missing_ok=True)

    @classmethod
    def load(cls, tool_name: str) -> ToolCache:
        """Load cache from disk.

        Args:
            tool_name: Name of the tool to load cache for.

        Returns:
            Loaded cache, or empty cache if file doesn't exist or is
            unreadable/corrupt (failure is logged at debug level).
        """
        cache_file = CACHE_DIR / f"{tool_name}.json"

        if not cache_file.exists():
            return cls(tool_name=tool_name)

        try:
            with cache_file.open("r", encoding="utf-8") as f:
                data = json.load(f)

            fingerprints = {
                path: FileFingerprint.from_dict(fp_data)
                for path, fp_data in data.get("fingerprints", {}).items()
            }

            cache = cls(tool_name=tool_name, fingerprints=fingerprints)
            logger.debug(f"Loaded cache for {tool_name} ({len(fingerprints)} files)")
            return cache
        except (OSError, json.JSONDecodeError, KeyError, TypeError) as e:
            logger.debug(f"Could not load cache for {tool_name}: {e}")
            return cls(tool_name=tool_name)

    def clear(self) -> None:
        """Clear all cached fingerprints."""
        self.fingerprints.clear()
        logger.debug(f"Cleared cache for {self.tool_name}")
def clear_all_caches() -> None:
    """Clear all tool caches."""
    if not CACHE_DIR.exists():
        logger.debug("No cache directory to clear")
        return

    for cache_file in CACHE_DIR.glob("*.json"):
        try:
            cache_file.unlink()
        except OSError as e:
            logger.warning(f"Could not delete {cache_file}: {e}")
        else:
            logger.debug(f"Deleted cache file: {cache_file}")
    logger.info("Cleared all incremental check caches")
def get_cache_stats() -> dict[str, int]:
    """Get statistics about cached files.

    Returns:
        Dictionary with tool names and their cached file counts.
    """
    if not CACHE_DIR.exists():
        return {}

    stats: dict[str, int] = {}
    for cache_file in CACHE_DIR.glob("*.json"):
        try:
            data = json.loads(cache_file.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            # Best-effort: skip unreadable or corrupt cache files.
            continue
        tool_name = data.get("tool_name", cache_file.stem)
        stats[tool_name] = len(data.get("fingerprints", {}))
    return stats