a_sync.utils package

Submodules

a_sync.utils.iterators module

This module provides utility functions for handling and merging asynchronous iterators. It includes functions to exhaust async iterators, merge multiple async iterators into a single async iterator, and manage the processing flow of items in an asynchronous context.

async a_sync.utils.iterators.as_yielded(*iterators)

Merges multiple async iterators into a single async iterator that yields items as they become available from any of the source iterators.

This function consolidates several asynchronous data streams into a single iteration context, so that items from all sources can be fetched concurrently and handled in one async for loop.

Merging is performed by exhaust_iterators(), which processes the source iterators concurrently and places their items into a queue. As a result, items are yielded in the order in which they become available, not in the order in which the source iterators were passed.

Internally, the function relies on asyncio tasks and a queue. It handles early termination and exceptions, and it uses the private _Done sentinel class to signal that processing is complete.
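The queue-and-sentinel approach described above can be sketched with plain asyncio. This is only an illustration under stated assumptions, not the library's implementation: merge_as_yielded, _pump, _Finished, and ticker are hypothetical names, with _Finished standing in for the private _Done class.

```python
import asyncio
from typing import Any, AsyncIterator


class _Finished:
    """Hypothetical stand-in for the library's private _Done sentinel."""


async def _pump(iterator: AsyncIterator[Any], queue: asyncio.Queue) -> None:
    # Forward every item from one source iterator into the shared queue.
    async for item in iterator:
        queue.put_nowait(item)


async def merge_as_yielded(*iterators: AsyncIterator[Any]) -> AsyncIterator[Any]:
    """Yield items from all iterators in the order they become available."""
    queue: asyncio.Queue = asyncio.Queue()
    # One task per source, so the sources are consumed concurrently.
    pumps = [asyncio.ensure_future(_pump(it, queue)) for it in iterators]

    async def _watch() -> None:
        try:
            await asyncio.gather(*pumps)
        finally:
            # Always signal completion, even if a source raised.
            queue.put_nowait(_Finished())

    watcher = asyncio.ensure_future(_watch())
    try:
        while True:
            item = await queue.get()
            if isinstance(item, _Finished):
                break
            yield item
        # Re-raise any exception that came from a source iterator.
        await watcher
    finally:
        # On early termination or error, stop whatever is still running.
        for task in (*pumps, watcher):
            task.cancel()


async def ticker(name: str, delay: float, count: int) -> AsyncIterator[str]:
    # Hypothetical source that yields a labelled item after each delay.
    for i in range(count):
        await asyncio.sleep(delay)
        yield f"{name}{i}"


async def demo() -> list:
    merged = merge_as_yielded(ticker("a", 0.01, 3), ticker("b", 0.025, 2))
    return [item async for item in merged]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The interleaving of the two sources depends on timing, so only the set of items, not their exact order, should be relied on.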

Parameters:

*iterators (AsyncIterator[T]) – The AsyncIterator objects to merge, passed as positional arguments.

Return type:

AsyncIterator[T]

Example

>>> async def example():
...     async for item in as_yielded(iterator1, iterator2):
...         print(item)

async a_sync.utils.iterators.exhaust_iterator(iterator, *, queue=None)

Asynchronously iterates over items from the given async iterator and optionally places them into a queue.

This utility exhausts an async iterator and can forward each item to a queue-like object, which must provide a put_nowait method. It is useful for producer-consumer patterns, where one part of an application produces items asynchronously and another part consumes them.
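To make the "queue-like" requirement concrete, here is a minimal sketch of the pattern using only the standard library. The names drain, ListSink, and numbers are hypothetical and are not part of a_sync; the sketch assumes that any object exposing put_nowait is an acceptable target.

```python
import asyncio
from typing import Any, AsyncIterator, List, Optional


async def drain(iterator: AsyncIterator[Any], queue: Optional[Any] = None) -> None:
    """Hypothetical stand-in: consume `iterator`, forwarding items if a queue is given."""
    async for item in iterator:
        if queue is not None:
            # put_nowait does not wait; a bounded asyncio.Queue raises QueueFull when full.
            queue.put_nowait(item)


class ListSink:
    """Hypothetical queue-like object: all it needs is a put_nowait method."""

    def __init__(self) -> None:
        self.items: List[Any] = []

    def put_nowait(self, item: Any) -> None:
        self.items.append(item)


async def numbers(count: int) -> AsyncIterator[int]:
    # Hypothetical source iterator.
    for i in range(count):
        await asyncio.sleep(0)
        yield i


async def demo():
    # Forward into a real asyncio.Queue.
    queue: asyncio.Queue = asyncio.Queue()
    await drain(numbers(3), queue=queue)
    from_queue = []
    while not queue.empty():
        from_queue.append(queue.get_nowait())

    # Forward into a custom queue-like sink.
    sink = ListSink()
    await drain(numbers(2), queue=sink)

    # With no queue, the items are simply consumed.
    await drain(numbers(5))
    return from_queue, sink.items


if __name__ == "__main__":
    print(asyncio.run(demo()))
```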

Parameters:
  • iterator (AsyncIterator[T]) – The async iterator to exhaust.

  • queue (Optional[asyncio.Queue]) – An optional queue-like object where iterated items will be placed. The queue should support the put_nowait method. If None, items are simply consumed.

Return type:

None

Example

>>> async def example():
...     await exhaust_iterator(some_async_iterator, queue=my_queue)

async a_sync.utils.iterators.exhaust_iterators(iterators, *, queue=None, join=False)

Asynchronously iterates over multiple async iterators concurrently and optionally places their items into a queue.

This function uses asyncio.gather() to exhaust multiple async iterators concurrently. It is useful when items from several async sources need to be processed or collected together.
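The gather-based pattern, together with the join option and the ValueError documented for this function, might look roughly like the sketch below. It is written against plain asyncio under stated assumptions: drain_all, _drain, and letters are hypothetical names, and the point at which the ValueError is raised is a guess rather than the library's confirmed behavior.

```python
import asyncio
from typing import Any, AsyncIterator, Iterable, Optional


async def _drain(iterator: AsyncIterator[Any], queue: Optional[asyncio.Queue]) -> None:
    # Consume one iterator, forwarding items when a queue is present.
    async for item in iterator:
        if queue is not None:
            queue.put_nowait(item)


async def drain_all(
    iterators: Iterable[AsyncIterator[Any]],
    *,
    queue: Optional[asyncio.Queue] = None,
    join: bool = False,
) -> None:
    """Hypothetical stand-in: exhaust all iterators concurrently."""
    if join and queue is None:
        # Assumption: reject the invalid combination before doing any work.
        raise ValueError("join=True requires a queue")
    await asyncio.gather(*(_drain(it, queue) for it in iterators))
    if join:
        # Wait until a consumer has called task_done() for every item.
        await queue.join()


async def letters(text: str) -> AsyncIterator[str]:
    # Hypothetical source iterator.
    for ch in text:
        await asyncio.sleep(0)
        yield ch


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    seen = []

    async def consumer() -> None:
        while True:
            item = await queue.get()
            seen.append(item)
            queue.task_done()

    worker = asyncio.ensure_future(consumer())
    await drain_all([letters("ab"), letters("cd")], queue=queue, join=True)
    # join=True guarantees every item has been processed by this point.
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return sorted(seen)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because join waits on the queue, a consumer must call task_done() for each item; without one, the call would never return.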

Parameters:
  • iterators – A sequence of async iterators to be exhausted concurrently.

  • queue (Optional[Queue]) – An optional queue-like object where items from all iterators will be placed. If None, items are simply consumed.

  • join (Optional[bool]) – If a queue was provided and join is True, this coroutine will continue to run until all queue items have been processed.

Raises:

ValueError – If join is True but no queue is provided.

Return type:

None

Example

>>> async def example():
...     await exhaust_iterators([iterator1, iterator2], queue=my_queue, join=True)

Module contents

This module initializes the utility functions for the a_sync library, including functions for handling asynchronous iterators and implementing asynchronous versions of the built-in any and all functions.

async a_sync.utils.as_yielded(*iterators)

Merges multiple async iterators into a single async iterator that yields items as they become available from any of the source iterators.

This function consolidates several asynchronous data streams into a single iteration context, so that items from all sources can be fetched concurrently and handled in one async for loop.

Merging is performed by exhaust_iterators(), which processes the source iterators concurrently and places their items into a queue. As a result, items are yielded in the order in which they become available, not in the order in which the source iterators were passed.

Internally, the function relies on asyncio tasks and a queue. It handles early termination and exceptions, and it uses the private _Done sentinel class to signal that processing is complete.

Parameters:

*iterators (AsyncIterator[T]) – The AsyncIterator objects to merge, passed as positional arguments.

Return type:

AsyncIterator[T]

Example

>>> async def example():
...     async for item in as_yielded(iterator1, iterator2):
...         print(item)

async a_sync.utils.exhaust_iterator(iterator, *, queue=None)

Asynchronously iterates over items from the given async iterator and optionally places them into a queue.

This utility exhausts an async iterator and can forward each item to a queue-like object, which must provide a put_nowait method. It is useful for producer-consumer patterns, where one part of an application produces items asynchronously and another part consumes them.

Parameters:
  • iterator (AsyncIterator[T]) – The async iterator to exhaust.

  • queue (Optional[asyncio.Queue]) – An optional queue-like object where iterated items will be placed. The queue should support the put_nowait method. If None, items are simply consumed.

Return type:

None

Example

>>> async def example():
...     await exhaust_iterator(some_async_iterator, queue=my_queue)

async a_sync.utils.exhaust_iterators(iterators, *, queue=None, join=False)

Asynchronously iterates over multiple async iterators concurrently and optionally places their items into a queue.

This function uses asyncio.gather() to exhaust multiple async iterators concurrently. It is useful when items from several async sources need to be processed or collected together.

Parameters:
  • iterators – A sequence of async iterators to be exhausted concurrently.

  • queue (Optional[Queue]) – An optional queue-like object where items from all iterators will be placed. If None, items are simply consumed.

  • join (Optional[bool]) – If a queue was provided and join is True, this coroutine will continue to run until all queue items have been processed.

Raises:

ValueError – If join is True but no queue is provided.

Return type:

None

Example

>>> async def example():
...     await exhaust_iterators([iterator1, iterator2], queue=my_queue, join=True)