Coverage for lintro / ai / providers / stream_result.py: 100%
21 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-03 18:53 +0000
1"""AI provider streaming result wrapper.
3Contains the ``AIStreamResult`` dataclass that wraps a token iterator
4and provides finalized metadata after the stream is exhausted.
5"""
7from __future__ import annotations
9from collections.abc import Callable, Iterator
10from dataclasses import dataclass, field
12from lintro.ai.providers.response import AIResponse
@dataclass
class AIStreamResult:
    """Streaming handle pairing a chunk iterator with a deferred response factory.

    Iterating the instance yields the text chunks, ``response()`` invokes the
    stored completion callback, and ``collect()`` does both in a single call.
    """

    _chunks: Iterator[str]
    _on_done: Callable[[], AIResponse]
    _consumed: bool = field(init=False, default=False)

    def __iter__(self) -> Iterator[str]:
        """Stream each text chunk from the wrapped iterator.

        The instance is flagged as consumed as soon as this generator
        finishes, whether through exhaustion, an error raised by the
        wrapped iterator, or the generator being closed early.
        """
        try:
            # ``yield from`` also forwards close()/throw() to the wrapped
            # iterator, which a plain ``for`` loop would not do.
            yield from self._chunks
        finally:
            self._consumed = True

    def response(self) -> AIResponse:
        """Produce the final response by invoking the completion callback.

        Intended to be called once the stream has been fully iterated;
        this method does not verify that itself.

        Returns:
            The AIResponse returned by the completion callback.
        """
        finalize = self._on_done
        return finalize()

    def collect(self) -> AIResponse:
        """Drain the stream and return a single, complete AIResponse.

        The chunks are joined into one string, then the completion callback
        supplies the remaining metadata. Because the wrapped iterator can
        only be drained once, calling this on an already consumed stream is
        an error.

        Returns:
            A new AIResponse whose content is the concatenated chunks and
            whose other fields are copied from the finalized response.

        Raises:
            RuntimeError: If the stream has already been consumed.
        """
        if self._consumed:
            raise RuntimeError("AIStreamResult already consumed")

        text = "".join(self)
        final = self.response()
        return AIResponse(
            content=text,
            model=final.model,
            input_tokens=final.input_tokens,
            output_tokens=final.output_tokens,
            cost_estimate=final.cost_estimate,
            provider=final.provider,
        )