Source code for a_sync.asyncio.as_completed

"""
This module extends Python's :func:`asyncio.as_completed` with additional functionality.
"""

import asyncio

try:
    from tqdm.asyncio import tqdm_asyncio
except ImportError as e:

    class tqdm_asyncio:  # type: ignore [no-redef]
        @staticmethod
        def as_completed(*args, **kwargs):
            raise ImportError("You must have tqdm installed to use this feature")


from a_sync._typing import *
from a_sync.iter import ASyncIterator


@overload
def as_completed(
    fs: Iterable[Awaitable[T]],
    *,
    timeout: Optional[float] = None,
    return_exceptions: bool = False,
    aiter: Literal[False] = False,
    tqdm: bool = False,
    **tqdm_kwargs: Any
) -> Iterator[Coroutine[Any, Any, T]]: ...
@overload
def as_completed(
    fs: Iterable[Awaitable[T]],
    *,
    timeout: Optional[float] = None,
    return_exceptions: bool = False,
    aiter: Literal[True] = True,
    tqdm: bool = False,
    **tqdm_kwargs: Any
) -> ASyncIterator[T]: ...
@overload
def as_completed(
    fs: Mapping[K, Awaitable[V]],
    *,
    timeout: Optional[float] = None,
    return_exceptions: bool = False,
    aiter: Literal[False] = False,
    tqdm: bool = False,
    **tqdm_kwargs: Any
) -> Iterator[Coroutine[Any, Any, Tuple[K, V]]]: ...
@overload
def as_completed(
    fs: Mapping[K, Awaitable[V]],
    *,
    timeout: Optional[float] = None,
    return_exceptions: bool = False,
    aiter: Literal[True] = True,
    tqdm: bool = False,
    **tqdm_kwargs: Any
) -> ASyncIterator[Tuple[K, V]]: ...
[docs] def as_completed( fs, *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: bool = False, tqdm: bool = False, **tqdm_kwargs: Any ): """ Concurrently awaits a list of awaitable objects or mappings of awaitables and returns an iterator of results. This function extends Python's :func:`asyncio.as_completed`, providing additional features for mixed use cases of individual awaitable objects and mappings of awaitables. Differences from :func:`asyncio.as_completed`: - Uses type hints for use with static type checkers. - Supports either individual awaitables or a k:v mapping of awaitables. - Can be used as an async iterator which yields the result values using :class:`ASyncIterator`. - Provides progress reporting using :mod:`tqdm` if 'tqdm' is set to True. Note: The `return_exceptions` parameter is used to wrap awaitables with exceptions if set to True, allowing exceptions to be returned as results instead of being raised. Args: fs: The awaitables to await concurrently. It can be a list of individual awaitables or a mapping of awaitables. timeout: The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout). return_exceptions: If True, exceptions are wrapped and returned as results instead of raising them. Defaults to False. aiter: If True, returns an async iterator of results using :class:`ASyncIterator`. Defaults to False. tqdm: If True, enables progress reporting using :mod:`tqdm`. Defaults to False. **tqdm_kwargs: Additional keyword arguments for :mod:`tqdm` if progress reporting is enabled. Examples: Awaiting individual awaitables: >>> awaitables = [async_function1(), async_function2()] >>> for coro in as_completed(awaitables): ... val = await coro ... ... >>> async for val in as_completed(awaitables, aiter=True): ... ... Awaiting mappings of awaitables: >>> mapping = {'key1': async_function1(), 'key2': async_function2()} >>> for coro in as_completed(mapping): ... k, v = await coro ... ... >>> async for k, v in as_completed(mapping, aiter=True): ... ... See Also: - :func:`asyncio.as_completed` - :class:`ASyncIterator` """ if isinstance(fs, Mapping): return as_completed_mapping( fs, timeout=timeout, return_exceptions=return_exceptions, aiter=aiter, tqdm=tqdm, **tqdm_kwargs ) if return_exceptions: fs = [_exc_wrap(f) for f in fs] return ( ASyncIterator( __yield_as_completed(fs, timeout=timeout, tqdm=tqdm, **tqdm_kwargs) ) if aiter else ( tqdm_asyncio.as_completed(fs, timeout=timeout, **tqdm_kwargs) if tqdm else asyncio.as_completed(fs, timeout=timeout) ) )
@overload def as_completed_mapping( mapping: Mapping[K, Awaitable[V]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: Literal[True] = True, tqdm: bool = False, **tqdm_kwargs: Any ) -> ASyncIterator[Tuple[K, V]]: ... @overload def as_completed_mapping( mapping: Mapping[K, Awaitable[V]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: Literal[False] = False, tqdm: bool = False, **tqdm_kwargs: Any ) -> Iterator[Coroutine[Any, Any, Tuple[K, V]]]: ...
[docs] def as_completed_mapping( mapping: Mapping[K, Awaitable[V]], *, timeout: Optional[float] = None, return_exceptions: bool = False, aiter: bool = False, tqdm: bool = False, **tqdm_kwargs: Any ) -> Union[Iterator[Coroutine[Any, Any, Tuple[K, V]]], ASyncIterator[Tuple[K, V]]]: """ Concurrently awaits a mapping of awaitable objects and returns an iterator or async iterator of results. This function is designed to await a mapping of awaitable objects, where each key-value pair represents a unique awaitable. It enables concurrent execution and gathers results into an iterator or an async iterator. Args: mapping: A dictionary-like object where keys are of type K and values are awaitable objects of type V. timeout: The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout). return_exceptions: If True, exceptions are wrapped and returned as results instead of raising them. Defaults to False. aiter: If True, returns an async iterator of results using :class:`ASyncIterator`. Defaults to False. tqdm: If True, enables progress reporting using :mod:`tqdm`. Defaults to False. **tqdm_kwargs: Additional keyword arguments for :mod:`tqdm` if progress reporting is enabled. Example: >>> mapping = {'key1': async_function1(), 'key2': async_function2()} >>> for coro in as_completed_mapping(mapping): ... k, v = await coro ... ... >>> async for k, v in as_completed_mapping(mapping, aiter=True): ... ... See Also: - :func:`as_completed` - :class:`ASyncIterator` """ return as_completed( [ __mapping_wrap(k, v, return_exceptions=return_exceptions) for k, v in mapping.items() ], timeout=timeout, aiter=aiter, tqdm=tqdm, **tqdm_kwargs )
async def _exc_wrap(awaitable: Awaitable[T]) -> Union[T, Exception]: """Wraps an awaitable to catch exceptions and return them instead of raising. Args: awaitable: The awaitable to wrap. Returns: The result of the awaitable or the exception if one is raised. """ try: return await awaitable except Exception as e: return e async def __yield_as_completed( futs: Iterable[Awaitable[T]], *, timeout: Optional[float] = None, return_exceptions: bool = False, tqdm: bool = False, **tqdm_kwargs: Any ) -> AsyncIterator[T]: """Yields results from awaitables as they complete. Args: futs: The awaitables to await. timeout: The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout). return_exceptions: If True, exceptions are wrapped and returned as results instead of raising them. Defaults to False. tqdm: If True, enables progress reporting using :mod:`tqdm`. Defaults to False. **tqdm_kwargs: Additional keyword arguments for :mod:`tqdm` if progress reporting is enabled. """ for fut in as_completed( futs, timeout=timeout, return_exceptions=return_exceptions, tqdm=tqdm, **tqdm_kwargs ): yield await fut @overload async def __mapping_wrap( k: K, v: Awaitable[V], return_exceptions: Literal[True] = True ) -> Union[V, Exception]: ... @overload async def __mapping_wrap( k: K, v: Awaitable[V], return_exceptions: Literal[False] = False ) -> V: ... async def __mapping_wrap( k: K, v: Awaitable[V], return_exceptions: bool = False ) -> Union[V, Exception]: """Wraps a key-value pair of awaitable to catch exceptions and return them with the key. Args: k: The key associated with the awaitable. v: The awaitable to wrap. return_exceptions: If True, exceptions are wrapped and returned as results instead of raising them. Defaults to False. Returns: A tuple of the key and the result of the awaitable or the exception if one is raised. """ try: return k, await v except Exception as e: if return_exceptions: return k, e raise __all__ = ["as_completed", "as_completed_mapping"]