Coverage for lintro / ai / providers / stream_result.py: 100%

21 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""AI provider streaming result wrapper. 

2 

3Contains the ``AIStreamResult`` dataclass that wraps a token iterator 

4and provides finalized metadata after the stream is exhausted. 

5""" 

6 

7from __future__ import annotations 

8 

9from collections.abc import Callable, Iterator 

10from dataclasses import dataclass, field 

11 

12from lintro.ai.providers.response import AIResponse 

13 

14 

@dataclass
class AIStreamResult:
    """Stream wrapper pairing a token iterator with deferred final metadata.

    Iterate the instance to receive text chunks; once the stream is
    drained, ``response()`` exposes the finalized ``AIResponse`` produced
    by the provider callback.
    """

    _chunks: Iterator[str]
    _on_done: Callable[[], AIResponse]
    # Flipped to True once iteration finishes (or is abandoned), guarding
    # against a second drain attempt in ``collect``.
    _consumed: bool = field(default=False, init=False)

    def __iter__(self) -> Iterator[str]:
        """Yield text chunks from the underlying iterator."""
        try:
            for token in self._chunks:
                yield token
        finally:
            # Mark consumed even on early exit (break / GeneratorExit),
            # since the underlying iterator can no longer be restarted.
            self._consumed = True

    def response(self) -> AIResponse:
        """Return the finalized AIResponse.

        Only valid after iteration completes.

        Returns:
            The finalized AIResponse with usage metadata.
        """
        return self._on_done()

    def collect(self) -> AIResponse:
        """Consume all tokens and return the complete AIResponse.

        May only be called once -- a second call raises ``RuntimeError``
        because the underlying iterator has already been exhausted.

        Returns:
            AIResponse with concatenated content and usage metadata.

        Raises:
            RuntimeError: If the stream has already been consumed.
        """
        if self._consumed:
            raise RuntimeError("AIStreamResult already consumed")
        # Draining via ``self`` (not ``self._chunks``) ensures the
        # consumed flag is set through __iter__'s finally clause.
        text = "".join(self)
        final = self._on_done()
        return AIResponse(
            content=text,
            model=final.model,
            input_tokens=final.input_tokens,
            output_tokens=final.output_tokens,
            cost_estimate=final.cost_estimate,
            provider=final.provider,
        )