a_sync.asyncio package

Submodules

a_sync.asyncio.as_completed module

This module extends Python’s asyncio.as_completed() with additional functionality.

class a_sync.asyncio.as_completed.tqdm_asyncio

Bases: object

static as_completed(*args, **kwargs)
async a_sync.asyncio.as_completed._exc_wrap(awaitable: Awaitable[T]) T | Exception

Wraps an awaitable to catch exceptions and return them instead of raising.

Parameters:

awaitable (Awaitable[T]) – The awaitable to wrap.

Returns:

The result of the awaitable or the exception if one is raised.

Return type:

T | Exception

a_sync.asyncio.as_completed.as_completed(fs, *, timeout: float | None = None, return_exceptions: bool = False, aiter: bool = False, tqdm: bool = False, **tqdm_kwargs: Any)

Concurrently awaits a list of awaitable objects or mappings of awaitables and returns an iterator of results.

This function extends Python’s asyncio.as_completed(), providing additional features for mixed use cases of individual awaitable objects and mappings of awaitables.

Differences from asyncio.as_completed(): - Uses type hints for use with static type checkers. - Supports either individual awaitables or a k:v mapping of awaitables. - Can be used as an async iterator which yields the result values using ASyncIterator. - Provides progress reporting using tqdm if ‘tqdm’ is set to True.

Note

The return_exceptions parameter is used to wrap awaitables with exceptions if set to True, allowing exceptions to be returned as results instead of being raised.

Parameters:
  • fs – The awaitables to await concurrently. It can be a list of individual awaitables or a mapping of awaitables.

  • timeout (float | None) – The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout).

  • return_exceptions (bool) – If True, exceptions are wrapped and returned as results instead of raising them. Defaults to False.

  • aiter (bool) – If True, returns an async iterator of results using ASyncIterator. Defaults to False.

  • tqdm (bool) – If True, enables progress reporting using tqdm. Defaults to False.

  • **tqdm_kwargs (Any) – Additional keyword arguments for tqdm if progress reporting is enabled.

Examples

Awaiting individual awaitables:

>>> awaitables = [async_function1(), async_function2()]
>>> for coro in as_completed(awaitables):
...     val = await coro
...     ...
>>> async for val in as_completed(awaitables, aiter=True):
...     ...

Awaiting mappings of awaitables:

>>> mapping = {'key1': async_function1(), 'key2': async_function2()}
>>> for coro in as_completed(mapping):
...     k, v = await coro
...     ...
>>> async for k, v in as_completed(mapping, aiter=True):
...     ...

See also

a_sync.asyncio.create_task module

This module extends asyncio.create_task() to support any Awaitable, manage task lifecycle, and enhance error handling.

a_sync.asyncio.create_task.create_task(coro: Awaitable[T], *, unicode name: str = u'', skip_gc_until_done: bint = False, log_destroy_pending: bint = True) 'asyncio.Task[T]'

Extends asyncio.create_task() to support any Awaitable, manage task lifecycle, and enhance error handling.

This function accepts any Awaitable, ensuring broader compatibility. If the Awaitable is not a coroutine, it is awaited directly using a private helper function __await, which can handle non-coroutine Awaitable objects.

Note

The __await function is designed to handle non-coroutine Awaitables by awaiting them directly.

Parameters:
  • coro (Awaitable[T]) – An Awaitable object from which to create the task.

  • name (str) – Optional name for the task, aiding in debugging.

  • skip_gc_until_done (bint) – If True, the task is kept alive until it completes, preventing garbage collection. Exceptions are wrapped in PersistedTaskException for special handling within the __persisted_task_exc_wrap function.

  • log_destroy_pending (bint) – If False, asyncio’s default error log when a pending task is destroyed is suppressed.

Return type:

asyncio.Task[T]

Examples

Create a simple task with a coroutine:

>>> async def my_coroutine():
...     return "Hello, World!"
>>> task = create_task(my_coroutine())

Create a task with a non-coroutine Awaitable:

>>> from concurrent.futures import Future
>>> future = Future()
>>> task = create_task(future)

a_sync.asyncio.gather module

This module provides an enhanced version of asyncio.gather().

class a_sync.asyncio.gather.tqdm_asyncio

Bases: object

async static gather(*args, **kwargs)
async a_sync.asyncio.gather._exc_wrap(awaitable: Awaitable[T]) T | Exception

Wraps an awaitable to catch exceptions and return them instead of raising.

Parameters:

awaitable (Awaitable[T]) – The awaitable to wrap.

Returns:

The result of the awaitable or the exception if one is raised.

Return type:

T | Exception

async a_sync.asyncio.gather.gather(*awaitables: Awaitable[T] | Mapping[K, Awaitable[V]], return_exceptions: bool = False, exclude_if: Excluder[T] | None = None, tqdm: bool = False, **tqdm_kwargs: Any) List[T] | Dict[K, V]

Concurrently awaits a list of awaitable objects or a k:v mapping of awaitables, and returns the results.

This function extends Python’s asyncio.gather(), providing additional features for handling either individual awaitable objects or a single mapping of awaitables.

Differences from asyncio.gather(): - Uses type hints for use with static type checkers. - Supports gathering either individual awaitables or a k:v mapping of awaitables. - Provides progress reporting using tqdm if ‘tqdm’ is set to True. - Allows exclusion of results based on a condition using the ‘exclude_if’ parameter. Note: This is only applied when the input is not a mapping.

Parameters:
  • *awaitables (Awaitable[T] | Mapping[K, Awaitable[V]]) – The awaitables to await concurrently. It can be a list of individual awaitables or a single mapping of awaitables.

  • return_exceptions (bool, optional) – If True, exceptions are returned as results instead of raising them. Defaults to False.

  • exclude_if (Optional[Excluder[T]], optional) – A callable that takes a result and returns True if the result should be excluded from the final output. Defaults to None. Note: This is only applied when the input is not a mapping.

  • tqdm (bool, optional) – If True, enables progress reporting using tqdm. Defaults to False.

  • **tqdm_kwargs – Additional keyword arguments for tqdm if progress reporting is enabled.

Return type:

List[T] | Dict[K, V]

Examples

Awaiting individual awaitables:

>>> results = await gather(thing1(), thing2())
>>> results
['result', 123]

Awaiting a mapping of awaitables:

>>> mapping = {'key1': thing1(), 'key2': thing2()}
>>> results = await gather(mapping)
>>> results
{'key1': 'result', 'key2': 123}

See also

asyncio.gather()

async a_sync.asyncio.gather.gather_mapping(mapping: Mapping[K, Awaitable[V]], return_exceptions: bool = False, exclude_if: Excluder[V] | None = None, tqdm: bool = False, **tqdm_kwargs: Any) Dict[K, V]

Concurrently awaits a mapping of awaitable objects and returns a dictionary of results.

This function is designed to await a mapping of awaitable objects, where each key-value pair represents a unique awaitable. It enables concurrent execution and gathers results into a dictionary.

Parameters:
  • mapping (Mapping[K, Awaitable[V]]) – A dictionary-like object where keys are of type K and values are awaitable objects of type V.

  • return_exceptions (bool, optional) – If True, exceptions are returned as results instead of raising them. Defaults to False.

  • exclude_if (Optional[Excluder[V]], optional) – A callable that takes a result and returns True if the result should be excluded from the final output. Defaults to None. Note: This is not applied when the input is a mapping.

  • tqdm (bool, optional) – If True, enables progress reporting using tqdm. Defaults to False.

  • **tqdm_kwargs – Additional keyword arguments for tqdm if progress reporting is enabled.

Return type:

Dict[K, V]

Example

The ‘results’ dictionary will contain the awaited results, where keys match the keys in the ‘mapping’ and values contain the results of the corresponding awaitables.

>>> mapping = {'task1': async_function1(), 'task2': async_function2(), 'task3': async_function3()}
>>> results = await gather_mapping(mapping)
>>> results
{'task1': "result", 'task2': 123, 'task3': None}

See also

asyncio.gather()

Module contents

This package provides custom utilities and extensions to the built-in asyncio package.

These utilities include enhanced versions of common asyncio functions, offering additional features and improved functionality for asynchronous programming.

Modules: - create_task(): Extends asyncio.create_task to support any Awaitable, manage task lifecycle, and enhance error handling. - gather(): Provides an enhanced version of asyncio.gather with additional features like progress reporting and exclusion of results based on a condition. - as_completed(): Extends asyncio.as_completed with additional functionality such as progress reporting using tqdm. - get_event_loop(): Utility to get the current event loop, creating a new one if none exists.

See also

  • asyncio: The standard asyncio library documentation for more details on the original functions.

a_sync.asyncio.as_completed(fs, *, timeout: float | None = None, return_exceptions: bool = False, aiter: bool = False, tqdm: bool = False, **tqdm_kwargs: Any)

Concurrently awaits a list of awaitable objects or mappings of awaitables and returns an iterator of results.

This function extends Python’s asyncio.as_completed(), providing additional features for mixed use cases of individual awaitable objects and mappings of awaitables.

Differences from asyncio.as_completed(): - Uses type hints for use with static type checkers. - Supports either individual awaitables or a k:v mapping of awaitables. - Can be used as an async iterator which yields the result values using ASyncIterator. - Provides progress reporting using tqdm if ‘tqdm’ is set to True.

Note

The return_exceptions parameter is used to wrap awaitables with exceptions if set to True, allowing exceptions to be returned as results instead of being raised.

Parameters:
  • fs – The awaitables to await concurrently. It can be a list of individual awaitables or a mapping of awaitables.

  • timeout (float | None) – The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout).

  • return_exceptions (bool) – If True, exceptions are wrapped and returned as results instead of raising them. Defaults to False.

  • aiter (bool) – If True, returns an async iterator of results using ASyncIterator. Defaults to False.

  • tqdm (bool) – If True, enables progress reporting using tqdm. Defaults to False.

  • **tqdm_kwargs (Any) – Additional keyword arguments for tqdm if progress reporting is enabled.

Examples

Awaiting individual awaitables:

>>> awaitables = [async_function1(), async_function2()]
>>> for coro in as_completed(awaitables):
...     val = await coro
...     ...
>>> async for val in as_completed(awaitables, aiter=True):
...     ...

Awaiting mappings of awaitables:

>>> mapping = {'key1': async_function1(), 'key2': async_function2()}
>>> for coro in as_completed(mapping):
...     k, v = await coro
...     ...
>>> async for k, v in as_completed(mapping, aiter=True):
...     ...

See also

a_sync.asyncio.create_task(coro: Awaitable[T], *, unicode name: str = u'', skip_gc_until_done: bint = False, log_destroy_pending: bint = True) 'asyncio.Task[T]'

Extends asyncio.create_task() to support any Awaitable, manage task lifecycle, and enhance error handling.

This function accepts any Awaitable, ensuring broader compatibility. If the Awaitable is not a coroutine, it is awaited directly using a private helper function __await, which can handle non-coroutine Awaitable objects.

Note

The __await function is designed to handle non-coroutine Awaitables by awaiting them directly.

Parameters:
  • coro (Awaitable[T]) – An Awaitable object from which to create the task.

  • name (str) – Optional name for the task, aiding in debugging.

  • skip_gc_until_done (bint) – If True, the task is kept alive until it completes, preventing garbage collection. Exceptions are wrapped in PersistedTaskException for special handling within the __persisted_task_exc_wrap function.

  • log_destroy_pending (bint) – If False, asyncio’s default error log when a pending task is destroyed is suppressed.

Return type:

asyncio.Task[T]

Examples

Create a simple task with a coroutine:

>>> async def my_coroutine():
...     return "Hello, World!"
>>> task = create_task(my_coroutine())

Create a task with a non-coroutine Awaitable:

>>> from concurrent.futures import Future
>>> future = Future()
>>> task = create_task(future)
async a_sync.asyncio.gather(*awaitables: Awaitable[T] | Mapping[K, Awaitable[V]], return_exceptions: bool = False, exclude_if: Excluder[T] | None = None, tqdm: bool = False, **tqdm_kwargs: Any) List[T] | Dict[K, V]

Concurrently awaits a list of awaitable objects or a k:v mapping of awaitables, and returns the results.

This function extends Python’s asyncio.gather(), providing additional features for handling either individual awaitable objects or a single mapping of awaitables.

Differences from asyncio.gather(): - Uses type hints for use with static type checkers. - Supports gathering either individual awaitables or a k:v mapping of awaitables. - Provides progress reporting using tqdm if ‘tqdm’ is set to True. - Allows exclusion of results based on a condition using the ‘exclude_if’ parameter. Note: This is only applied when the input is not a mapping.

Parameters:
  • *awaitables (Awaitable[T] | Mapping[K, Awaitable[V]]) – The awaitables to await concurrently. It can be a list of individual awaitables or a single mapping of awaitables.

  • return_exceptions (bool, optional) – If True, exceptions are returned as results instead of raising them. Defaults to False.

  • exclude_if (Optional[Excluder[T]], optional) – A callable that takes a result and returns True if the result should be excluded from the final output. Defaults to None. Note: This is only applied when the input is not a mapping.

  • tqdm (bool, optional) – If True, enables progress reporting using tqdm. Defaults to False.

  • **tqdm_kwargs – Additional keyword arguments for tqdm if progress reporting is enabled.

Return type:

List[T] | Dict[K, V]

Examples

Awaiting individual awaitables:

>>> results = await gather(thing1(), thing2())
>>> results
['result', 123]

Awaiting a mapping of awaitables:

>>> mapping = {'key1': thing1(), 'key2': thing2()}
>>> results = await gather(mapping)
>>> results
{'key1': 'result', 'key2': 123}

See also

asyncio.gather()

a_sync.asyncio.get_event_loop()