|
1 | 1 | import asyncio |
2 | | -from typing import Any, Callable |
| 2 | +from abc import ABC, abstractmethod |
| 3 | +from collections.abc import Awaitable, Callable, Coroutine |
| 4 | +from typing import Any |
3 | 5 |
|
4 | 6 |
|
5 | | -def sync_to_async_func(sync_func: Callable) -> Callable: |
6 | | - """ |
7 | | - 同期関数を非同期関数として使えるように変換する |
8 | | - """ |
| 7 | +def sync_to_async_func[R](sync_func: Callable[..., R]) -> Callable[..., Awaitable[R]]: |
| 8 | + """Convert a synchronous callable into an asynchronous callable.""" |
9 | 9 |
|
10 | | - async def wrapper(*args: Any, **kwargs: Any) -> Any: |
| 10 | + async def wrapper(*args: object, **kwargs: object) -> R: |
11 | 11 | return await asyncio.to_thread(sync_func, *args, **kwargs) |
12 | 12 |
|
13 | 13 | wrapper.__name__ = sync_func.__name__ |
14 | 14 | wrapper.__doc__ = sync_func.__doc__ |
15 | 15 | return wrapper |
16 | 16 |
|
17 | 17 |
|
18 | | -def async_to_sync_func(async_func: Callable) -> Callable: |
19 | | - """ |
20 | | - 非同期関数を同期関数として使えるように変換する |
21 | | - """ |
| 18 | +def async_to_sync_func[R](async_func: Callable[..., Coroutine[Any, Any, R]]) -> Callable[..., R]: |
| 19 | + """Convert an asynchronous callable into a synchronous callable.""" |
22 | 20 |
|
23 | | - def wrapper(*args: Any, **kwargs: Any) -> Any: |
| 21 | + def wrapper(*args: object, **kwargs: object) -> R: |
24 | 22 | return asyncio.run(async_func(*args, **kwargs)) |
25 | 23 |
|
26 | 24 | wrapper.__name__ = async_func.__name__ |
27 | 25 | wrapper.__doc__ = async_func.__doc__ |
28 | 26 | return wrapper |
29 | 27 |
|
30 | 28 |
|
31 | | -async def run_async_function_with_semaphore( |
32 | | - async_func: Callable, concurrency_sema: asyncio.Semaphore | None, *args: Any, **kwargs: Any |
33 | | -) -> Any: |
34 | | - """ |
35 | | - 指定した関数 func を、セマフォで同時実行数を制限して呼び出す関数。 |
36 | | - concurrency_sema が None の場合は制限しない。 |
37 | | - """ |
| 29 | +async def run_async_function_with_semaphore[R]( |
| 30 | + async_func: Callable[..., Awaitable[R]], |
| 31 | + concurrency_sema: asyncio.Semaphore | None, |
| 32 | + *args: object, |
| 33 | + **kwargs: object, |
| 34 | +) -> R: |
| 35 | + """Execute async_func with an optional semaphore limiting concurrency.""" |
38 | 36 | if concurrency_sema is not None: |
39 | 37 | async with concurrency_sema: |
40 | 38 | return await async_func(*args, **kwargs) |
41 | | - else: |
42 | | - return await async_func(*args, **kwargs) |
| 39 | + return await async_func(*args, **kwargs) |
| 40 | + |
| 41 | + |
| 42 | +class AsyncResource[R](ABC): |
| 43 | + """Base class for async resources protected by a semaphore.""" |
| 44 | + |
| 45 | + def __init__(self, concurrency: int = 1) -> None: |
| 46 | + self.semaphore = asyncio.Semaphore(concurrency) |
| 47 | + |
| 48 | + async def task(self, *args: object, **kwargs: object) -> R: |
| 49 | + async with self.semaphore: |
| 50 | + return await self.call(*args, **kwargs) |
| 51 | + |
| 52 | + @abstractmethod |
| 53 | + async def call(self, *args: object, **kwargs: object) -> R: |
| 54 | + """Execute the concrete asynchronous operation.""" |
| 55 | + raise NotImplementedError |
0 commit comments