Coverage for lintro / ai / providers / anthropic.py: 57%

76 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Anthropic AI provider implementation. 

2 

3Uses the Anthropic Python SDK to communicate with Claude models. 

4Requires the ``anthropic`` package (installed via ``lintro[ai]``). 

5""" 

6 

7from __future__ import annotations 

8 

9from collections.abc import Iterator 

10from contextlib import contextmanager 

11from typing import Any 

12 

13from loguru import logger 

14 

15from lintro.ai.cost import estimate_cost 

16from lintro.ai.exceptions import ( 

17 AIAuthenticationError, 

18 AIProviderError, 

19 AIRateLimitError, 

20) 

21from lintro.ai.providers.base import AIResponse, AIStreamResult, BaseAIProvider 

22from lintro.ai.providers.constants import ( 

23 DEFAULT_MAX_TOKENS, 

24 DEFAULT_PER_CALL_MAX_TOKENS, 

25 DEFAULT_TIMEOUT, 

26) 

27from lintro.ai.registry import PROVIDERS, AIProvider 

28 

# The SDK is an optional dependency (``lintro[ai]``). Its absence is
# recorded here and surfaced later by ``BaseAIProvider.__init__`` via
# the ``has_sdk`` flag, rather than failing at import time.
try:
    import anthropic
except ImportError:
    _has_anthropic = False
else:
    _has_anthropic = True

# Provider-wide defaults sourced from the central registry so the CLI,
# docs, and this module cannot drift apart.
DEFAULT_MODEL = PROVIDERS.anthropic.default_model
DEFAULT_API_KEY_ENV = PROVIDERS.anthropic.default_api_key_env

39 

40 

class AnthropicProvider(BaseAIProvider):
    """Anthropic Claude provider."""

    @staticmethod
    @contextmanager
    def _map_errors() -> Iterator[None]:
        """Map Anthropic SDK exceptions to AI exceptions.

        Safe to call only when the ``anthropic`` SDK is installed —
        the base class ``__init__`` raises ``AINotAvailableError``
        before any method can be called if the SDK is missing.

        Yields:
            None: Control passes to the wrapped block.

        Raises:
            AIAuthenticationError: If the SDK reports bad credentials.
            AIRateLimitError: If the SDK reports rate limiting.
            AIProviderError: For any other Anthropic API error.
        """
        try:
            yield
        except anthropic.AuthenticationError as e:
            raise AIAuthenticationError(
                f"Anthropic authentication failed: {e}",
            ) from e
        except anthropic.RateLimitError as e:
            raise AIRateLimitError(
                f"Anthropic rate limit exceeded: {e}",
            ) from e
        except anthropic.AnthropicError as e:
            logger.debug(f"Anthropic API error: {e}")
            raise AIProviderError(
                f"Anthropic API error: {e}",
            ) from e

    def __init__(
        self,
        *,
        model: str | None = None,
        api_key_env: str | None = None,
        max_tokens: int = DEFAULT_MAX_TOKENS,
        base_url: str | None = None,
    ) -> None:
        """Initialize the Anthropic provider.

        Args:
            model: Model identifier. Defaults to claude-sonnet-4-6.
            api_key_env: Environment variable for API key.
                Defaults to ANTHROPIC_API_KEY.
            max_tokens: Default max tokens for completions.
            base_url: Custom API base URL for Anthropic-compatible
                endpoints (proxies, self-hosted, etc.).
        """
        super().__init__(
            provider_name=AIProvider.ANTHROPIC,
            has_sdk=_has_anthropic,
            sdk_package="anthropic",
            default_model=DEFAULT_MODEL,
            default_api_key_env=DEFAULT_API_KEY_ENV,
            model=model,
            api_key_env=api_key_env,
            max_tokens=max_tokens,
            base_url=base_url,
        )

    def _create_client(self, *, api_key: str) -> Any:
        """Create the Anthropic SDK client.

        Args:
            api_key: The resolved API key.

        Returns:
            anthropic.Anthropic: The API client.
        """
        kwargs: dict[str, Any] = {"api_key": api_key}
        if self._base_url:
            kwargs["base_url"] = self._base_url
        return anthropic.Anthropic(**kwargs)

    def _build_request_kwargs(
        self,
        prompt: str,
        *,
        system: str | None,
        max_tokens: int,
        timeout: float,
    ) -> dict[str, Any]:
        """Build the kwargs shared by ``complete`` and ``stream_complete``.

        Args:
            prompt: The user prompt.
            system: Optional system prompt; omitted from the request
                when falsy.
            max_tokens: Caller-requested maximum tokens.
            timeout: Request timeout in seconds.

        Returns:
            dict[str, Any]: Keyword arguments for ``messages.create``
            or ``messages.stream``.
        """
        kwargs: dict[str, Any] = {
            "model": self._model,
            # Per-call cap: the lower of the caller's request and the
            # provider-level cap set at init time.
            "max_tokens": min(max_tokens, self._max_tokens),
            "messages": [{"role": "user", "content": prompt}],
            "timeout": timeout,
        }
        if system:
            kwargs["system"] = system
        return kwargs

    def complete(
        self,
        prompt: str,
        *,
        system: str | None = None,
        max_tokens: int = DEFAULT_PER_CALL_MAX_TOKENS,
        timeout: float = DEFAULT_TIMEOUT,
    ) -> AIResponse:
        """Generate a completion using Claude.

        Args:
            prompt: The user prompt.
            system: Optional system prompt.
            max_tokens: Maximum tokens to generate.
            timeout: Request timeout in seconds.

        Returns:
            AIResponse: The model's response with usage metadata.
        """
        client = self._get_client()
        kwargs = self._build_request_kwargs(
            prompt,
            system=system,
            max_tokens=max_tokens,
            timeout=timeout,
        )

        with self._map_errors():
            response = client.messages.create(**kwargs)

        # Concatenate only text blocks; other block types (e.g. tool
        # use) have no ``text`` attribute and are skipped.
        content = "".join(
            block.text for block in response.content if hasattr(block, "text")
        )

        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens
        cost = estimate_cost(self._model, input_tokens, output_tokens)

        return AIResponse(
            content=content,
            model=self._model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost_estimate=cost,
            provider=AIProvider.ANTHROPIC,
        )

    def stream_complete(
        self,
        prompt: str,
        *,
        system: str | None = None,
        max_tokens: int = DEFAULT_PER_CALL_MAX_TOKENS,
        timeout: float = DEFAULT_TIMEOUT,
    ) -> AIStreamResult:
        """Stream a completion from the Anthropic API token-by-token.

        Args:
            prompt: The user prompt.
            system: Optional system prompt.
            max_tokens: Maximum tokens to generate.
            timeout: Request timeout in seconds.

        Returns:
            An AIStreamResult wrapping the token stream.
        """
        client = self._get_client()
        kwargs = self._build_request_kwargs(
            prompt,
            system=system,
            max_tokens=max_tokens,
            timeout=timeout,
        )

        logger.debug(
            f"Anthropic stream request: model={self._model}, "
            f"max_tokens={kwargs['max_tokens']}",
        )

        # Populated by _generate once the stream is fully consumed;
        # a list is used so the closure can assign to it.
        final_response: list[AIResponse] = []

        def _generate() -> Iterator[str]:
            # Note: mid-stream errors surface as exceptions during
            # iteration and are NOT retried because partial content has
            # already been yielded to the caller. Only setup failures
            # (before the first token) are caught by the fallback chain.
            with self._map_errors():
                with client.messages.stream(**kwargs) as stream:
                    yield from stream.text_stream
                    final_message = stream.get_final_message()

            input_tokens = final_message.usage.input_tokens
            output_tokens = final_message.usage.output_tokens
            cost = estimate_cost(self._model, input_tokens, output_tokens)
            final_response.append(
                AIResponse(
                    content="",
                    model=self._model,
                    input_tokens=input_tokens,
                    output_tokens=output_tokens,
                    cost_estimate=cost,
                    provider=AIProvider.ANTHROPIC,
                ),
            )

        def _on_done() -> AIResponse:
            if not final_response:
                raise AIProviderError(
                    "Anthropic stream was not fully consumed",
                )
            return final_response[0]

        return AIStreamResult(
            _chunks=_generate(),
            _on_done=_on_done,
        )