Coverage for tests / unit / utils / async_tool_executor / test_init.py: 100%

17 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-03 18:53 +0000

1"""Tests for AsyncToolExecutor initialization.""" 

2 

3from __future__ import annotations 

4 

5import os 

6 

7from assertpy import assert_that 

8 

9from lintro.utils.async_tool_executor import AsyncToolExecutor 

10 

11 

def test_executor_initializes_with_default_workers() -> None:
    """Verify the default ``max_workers`` is derived from the CPU count.

    An executor built with no arguments is expected to report a worker
    count equal to ``os.cpu_count()`` (or 4 when that is unavailable),
    clamped to the range 1..32, and to expose a non-None ``_executor``.
    """
    executor = AsyncToolExecutor()

    try:
        # Mirror the expected clamping step by step: fallback, cap, floor.
        detected_cpus = os.cpu_count() or 4
        capped_cpus = min(detected_cpus, 32)
        expected = max(1, capped_cpus)

        assert_that(executor.max_workers).is_equal_to(expected)
        assert_that(executor._executor).is_not_none()
    finally:
        # Always shut the executor down, even when an assertion fails.
        executor.shutdown()

22 

23 

def test_executor_initializes_with_custom_workers() -> None:
    """Verify an explicit ``max_workers`` argument is kept as given.

    An executor built with ``max_workers=8`` is expected to report 8
    workers and to expose a non-None ``_executor``.
    """
    requested_workers = 8
    executor = AsyncToolExecutor(max_workers=requested_workers)

    try:
        assert_that(executor.max_workers).is_equal_to(requested_workers)
        assert_that(executor._executor).is_not_none()
    finally:
        # Always shut the executor down, even when an assertion fails.
        executor.shutdown()