Coverage for lintro/utils/file_cache.py: 81%

107 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""File fingerprint caching for incremental checks. 

2 

3This module provides functionality to cache file metadata (mtime, size) to enable 

4incremental linting - only checking files that have changed since the last run. 

5""" 

6 

7from __future__ import annotations 

8 

9import json 

10import tempfile 

11from dataclasses import asdict, dataclass, field 

12from pathlib import Path 

13from typing import Any 

14 

15from loguru import logger 

16 

# Cache directory location: per-user (under the home directory), shared by
# all tools; each tool persists its fingerprints as <tool_name>.json here.
CACHE_DIR = Path.home() / ".lintro" / "cache"

19 

20 

@dataclass
class FileFingerprint:
    """Snapshot of a file's metadata used to detect modifications.

    Two fingerprints of the same path compare unequal when either the
    modification time or the size differs, which is how incremental
    checking decides whether a file must be re-linted.

    Attributes:
        path: Absolute path to the file.
        mtime: Last modification time (seconds since epoch).
        size: File size in bytes.
    """

    path: str
    mtime: float
    size: int

    def to_dict(self) -> dict[str, Any]:
        """Serialize this fingerprint to a plain dictionary.

        Returns:
            Dictionary representation of the fingerprint.
        """
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> FileFingerprint:
        """Build a fingerprint from a previously serialized dictionary.

        Args:
            data: Dictionary with path, mtime, and size keys.

        Returns:
            FileFingerprint instance created from the dictionary.
        """
        return cls(path=data["path"], mtime=data["mtime"], size=data["size"])

58 

59 

@dataclass
class ToolCache:
    """Cache of file fingerprints for a specific tool.

    Attributes:
        tool_name: Name of the tool this cache is for.
        fingerprints: Dictionary mapping file paths to their fingerprints.
    """

    tool_name: str
    fingerprints: dict[str, FileFingerprint] = field(default_factory=dict)

    def get_changed_files(self, files: list[str]) -> list[str]:
        """Return only files that have changed since last run.

        A file is considered changed if:
        - It's new (not in cache)
        - Its mtime has changed
        - Its size has changed

        Files that no longer exist are skipped entirely; files that exist
        but cannot be stat'd are conservatively reported as changed.

        Args:
            files: List of absolute file paths to check.

        Returns:
            List of file paths that have changed.
        """
        changed: list[str] = []

        for file_path in files:
            path = Path(file_path)
            if not path.exists():
                continue

            try:
                stat = path.stat()
            except OSError as e:
                # Can't read metadata: assume changed so the tool re-checks it.
                logger.debug(f"Could not stat {file_path}: {e}")
                changed.append(file_path)
                continue

            cached = self.fingerprints.get(file_path)

            if cached is None:
                # New file not in cache
                changed.append(file_path)
            elif cached.mtime != stat.st_mtime or cached.size != stat.st_size:
                # File has been modified
                changed.append(file_path)
            # else: file unchanged, skip it

        return changed

    def update(self, files: list[str]) -> None:
        """Update cache with current file states.

        Deleted files are evicted from the cache; files that cannot be
        stat'd are logged and left untouched.

        Args:
            files: List of file paths to update in cache.
        """
        for file_path in files:
            path = Path(file_path)
            if not path.exists():
                # Remove from cache if file no longer exists
                self.fingerprints.pop(file_path, None)
                continue

            try:
                stat = path.stat()
                self.fingerprints[file_path] = FileFingerprint(
                    path=file_path,
                    mtime=stat.st_mtime,
                    size=stat.st_size,
                )
            except OSError as e:
                logger.debug(f"Could not update cache for {file_path}: {e}")

    def save(self) -> None:
        """Persist cache to disk using atomic write.

        Uses temp file + rename pattern to prevent corruption if write fails.
        All I/O and serialization errors are logged and suppressed; this
        method never raises.
        """
        cache_file = CACHE_DIR / f"{self.tool_name}.json"
        cache_file.parent.mkdir(parents=True, exist_ok=True)

        tmp_path: Path | None = None
        try:
            data = {
                "tool_name": self.tool_name,
                "fingerprints": {
                    path: fp.to_dict() for path, fp in self.fingerprints.items()
                },
            }
            # Write to temp file first, then atomically rename
            # This prevents corruption if the write is interrupted
            with tempfile.NamedTemporaryFile(
                mode="w",
                encoding="utf-8",
                dir=cache_file.parent,
                suffix=".tmp",
                delete=False,
            ) as tmp_file:
                # Bind the temp path BEFORE dumping so the cleanup branch can
                # remove the file even when serialization fails mid-write
                # (previously it was bound only after a successful dump,
                # leaking the temp file on TypeError/ValueError).
                tmp_path = Path(tmp_file.name)
                json.dump(data, tmp_file, indent=2)

            # Atomic rename (on POSIX systems)
            tmp_path.replace(cache_file)

            logger.debug(
                f"Saved cache for {self.tool_name} ({len(self.fingerprints)} files)",
            )
        except (OSError, TypeError, ValueError) as e:
            logger.warning(f"Could not save cache for {self.tool_name}: {e}")
            # Clean up temp file if it exists
            if tmp_path is not None:
                tmp_path.unlink(missing_ok=True)

    @classmethod
    def load(cls, tool_name: str) -> ToolCache:
        """Load cache from disk.

        Args:
            tool_name: Name of the tool to load cache for.

        Returns:
            Loaded cache, or empty cache if the file doesn't exist or is
            unreadable/corrupt (a bad cache just forces a full run).
        """
        cache_file = CACHE_DIR / f"{tool_name}.json"

        if not cache_file.exists():
            return cls(tool_name=tool_name)

        try:
            with cache_file.open("r", encoding="utf-8") as f:
                data = json.load(f)

            fingerprints = {
                path: FileFingerprint.from_dict(fp_data)
                for path, fp_data in data.get("fingerprints", {}).items()
            }

            cache = cls(tool_name=tool_name, fingerprints=fingerprints)
            logger.debug(f"Loaded cache for {tool_name} ({len(fingerprints)} files)")
            return cache
        except (OSError, json.JSONDecodeError, KeyError, TypeError) as e:
            logger.debug(f"Could not load cache for {tool_name}: {e}")
            return cls(tool_name=tool_name)

    def clear(self) -> None:
        """Clear all cached fingerprints (in memory only; the on-disk file
        is untouched until the next save)."""
        self.fingerprints.clear()
        logger.debug(f"Cleared cache for {self.tool_name}")

209 

210 

def clear_all_caches() -> None:
    """Delete every persisted tool cache file from the cache directory."""
    if not CACHE_DIR.exists():
        logger.debug("No cache directory to clear")
        return

    for cache_file in CACHE_DIR.glob("*.json"):
        try:
            cache_file.unlink()
        except OSError as e:
            logger.warning(f"Could not delete {cache_file}: {e}")
        else:
            logger.debug(f"Deleted cache file: {cache_file}")
    logger.info("Cleared all incremental check caches")

223 

224 

def get_cache_stats() -> dict[str, int]:
    """Get statistics about cached files.

    Unreadable or malformed cache files are silently skipped.

    Returns:
        Dictionary with tool names and their cached file counts.
    """
    if not CACHE_DIR.exists():
        return {}

    stats: dict[str, int] = {}
    for cache_file in CACHE_DIR.glob("*.json"):
        try:
            data = json.loads(cache_file.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            continue
        # Fall back to the filename stem when the payload lacks a tool name.
        tool_name = data.get("tool_name", cache_file.stem)
        stats[tool_name] = len(data.get("fingerprints", {}))
    return stats