a_sync package

Submodules

a_sync.ENVIRONMENT_VARIABLES module

a_sync.ENVIRONMENT_VARIABLES.DEBUG_CLASS_NAME = <EnvironmentVariable[name=`EZASYNC_DEBUG_CLASS_NAME`, type=str, default_value=, current_value=, using_default=True]>

The name of the class to debug.

If you’re only interested in debugging a specific class, set this to the class name.

Examples

To debug a class named MyClass, set the environment variable:

export EZASYNC_DEBUG_CLASS_NAME=MyClass

See also

DEBUG_MODE for enabling debug mode on all classes.

Type:

str

a_sync.ENVIRONMENT_VARIABLES.DEBUG_MODE = <EnvironmentVariable[name=`EZASYNC_DEBUG_MODE`, type=bool, default_value=False, current_value=False, using_default=True]>

Enables debug mode on all classes.

Set this environment variable to True to enable debug mode on all classes. If DEBUG_CLASS_NAME is set to a non-empty string, DEBUG_MODE will default to True.

Examples

To enable debug mode globally, set the environment variable:

export EZASYNC_DEBUG_MODE=True

If you have set DEBUG_CLASS_NAME to a specific class, DEBUG_MODE will automatically be True unless DEBUG_CLASS_NAME is an empty string.
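The defaulting rule described above can be sketched with plain os.environ logic. The resolve_debug_mode function below is a hypothetical illustration of that rule, not the library's EnvironmentVariable implementation:

```python
import os

def resolve_debug_mode(environ=os.environ) -> bool:
    # DEBUG_MODE defaults to True whenever DEBUG_CLASS_NAME is a non-empty string.
    debug_class_name = environ.get("EZASYNC_DEBUG_CLASS_NAME", "")
    default = bool(debug_class_name)
    raw = environ.get("EZASYNC_DEBUG_MODE")
    if raw is None:
        return default
    return raw.lower() in ("1", "true", "yes")

print(resolve_debug_mode({"EZASYNC_DEBUG_CLASS_NAME": "MyClass"}))  # True
print(resolve_debug_mode({}))  # False
```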

See also

DEBUG_CLASS_NAME for debugging a specific class.

Type:

bool

a_sync._smart module

This module defines smart future and task utilities for the a_sync library. These utilities provide enhanced functionality for managing asynchronous tasks and futures, including a custom task factory for creating SmartTask instances and a shielding mechanism to protect tasks from cancellation.
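The custom task factory hooks into asyncio's standard loop.set_task_factory mechanism. The sketch below demonstrates that underlying mechanism with a plain asyncio.Task subclass (TrackedTask is a hypothetical stand-in, not the library's SmartTask):

```python
import asyncio

class TrackedTask(asyncio.Task):
    """Stand-in for a SmartTask-style asyncio.Task subclass (hypothetical)."""

def tracked_task_factory(loop, coro, **kwargs):
    # A task factory receives (loop, coro) and must return a Task-compatible object.
    return TrackedTask(coro, loop=loop, **kwargs)

async def work() -> int:
    await asyncio.sleep(0)
    return 42

async def main() -> int:
    loop = asyncio.get_running_loop()
    loop.set_task_factory(tracked_task_factory)
    task = asyncio.ensure_future(work())
    assert isinstance(task, TrackedTask)  # created through our factory
    return await task

print(asyncio.run(main()))  # 42
```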

class a_sync._smart.SmartFuture[source]

Bases: _SmartFutureMixin[T], Future

A smart future that tracks waiters and integrates with a smart processing queue.

Inherits from both _SmartFutureMixin and asyncio.Future, providing additional functionality for tracking waiters and integrating with a smart processing queue.

Example

Creating and awaiting a SmartFuture:

```python
future = SmartFuture()
await future
```

__init__(*, queue, key=None, loop=None)[source]

Initialize the SmartFuture with an optional queue and key.

Parameters:
  • queue (SmartProcessingQueue[Any, Any, T] | None) – Optional; a smart processing queue.

  • key (Tuple[Tuple[Any], Tuple[Tuple[str, Any]]] | None) – Optional; a key identifying the future.

  • loop (AbstractEventLoop | None) – Optional; the event loop.

Return type:

None

Example

```python
future = SmartFuture(queue=my_queue, key=my_key)
```

See also

  • SmartProcessingQueue

__iter__()

Implement iter(self).

_make_cancelled_error()

Create the CancelledError to raise if the Future is cancelled.

This should only be called once when handling a cancellation since it erases the context exception value.

_repr_info()
_self_done_cleanup_callback()

Callback to clean up waiters and remove the future from the queue when done.

This method clears all waiters and removes the future from the associated queue.

Parameters:

self (SmartFuture | SmartTask)

Return type:

None

_waiter_done_cleanup_callback(waiter)

Callback to clean up waiters when a waiter task is done.

Removes the waiter from _waiters, and _queue._futs if applicable.

Parameters:

waiter – The completed waiter task to remove.

Return type:

None

Example

Automatically called when a waiter task completes.

add_done_callback()

Add a callback to be run when the future becomes done.

The callback is called with a single argument - the future object. If the future is already done when this is called, the callback is scheduled with call_soon.
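Since this behavior is inherited from asyncio.Future, the call_soon scheduling can be demonstrated with the standard class directly:

```python
import asyncio

async def main() -> list:
    events = []
    fut = asyncio.get_running_loop().create_future()
    fut.add_done_callback(lambda f: events.append(f.result()))
    fut.set_result("done")
    assert events == []       # the callback was only scheduled with call_soon
    await asyncio.sleep(0)    # let the loop run pending callbacks
    return events

print(asyncio.run(main()))  # ['done']
```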

cancel(msg=None)

Cancel the future and schedule callbacks.

If the future is already done or cancelled, return False. Otherwise, change the future’s state to cancelled, schedule the callbacks and return True.

cancelled()

Return True if the future was cancelled.

done()

Return True if the future is done.

Done means either that a result / exception are available, or that the future was cancelled.

exception()

Return the exception that was set on this future.

The exception (or None if no exception was set) is returned only if the future is done. If the future has been cancelled, raises CancelledError. If the future isn’t done yet, raises InvalidStateError.

get_loop()

Return the event loop the Future is bound to.

remove_done_callback(fn, /)

Remove all instances of a callback from the “call when done” list.

Returns the number of callbacks removed.

result()

Return the result this future represents.

If the future has been cancelled, raises CancelledError. If the future’s result isn’t yet available, raises InvalidStateError. If the future is done and has an exception set, this exception is raised.
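These state transitions, inherited from asyncio.Future, can be exercised directly with the standard class:

```python
import asyncio

async def main() -> tuple:
    loop = asyncio.get_running_loop()

    fut = loop.create_future()
    try:
        fut.result()  # not done yet
        raise AssertionError("unreachable")
    except asyncio.InvalidStateError:
        pass
    fut.set_result(7)

    cancelled = loop.create_future()
    cancelled.cancel()
    try:
        cancelled.result()  # cancelled futures raise CancelledError
        raise AssertionError("unreachable")
    except asyncio.CancelledError:
        pass

    return fut.done(), fut.result(), cancelled.cancelled()

print(asyncio.run(main()))  # (True, 7, True)
```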

set_exception(exception, /)

Mark the future done and set an exception.

If the future is already done when this method is called, raises InvalidStateError.

set_result(result, /)

Mark the future done and set its result.

If the future is already done when this method is called, raises InvalidStateError.

_asyncio_future_blocking
_callbacks
_cancel_message
_exception
_key: _Key = None
_log_traceback
_loop
_queue: 'SmartProcessingQueue[Any, Any, T]' | None = None
_result
_source_traceback
_state
_waiters: weakref.WeakSet[SmartTask[T]]
property num_waiters: int

Get the number of waiters currently awaiting the future or task.

This property checks if the future or task is done to ensure accurate counting of waiters, as the callback may not have run yet.

Example

```python
future = SmartFuture()
print(future.num_waiters)
```

class a_sync._smart.SmartTask[source]

Bases: _SmartFutureMixin[T], Task

A smart task that tracks waiters and integrates with a smart processing queue.

Inherits from both _SmartFutureMixin and asyncio.Task, providing additional functionality for tracking waiters and integrating with a smart processing queue.

Example

Creating and awaiting a SmartTask:

```python
task = SmartTask(coro=my_coroutine())
await task
```

__init__(coro, *, loop=None, name=None)[source]

Initialize the SmartTask with a coroutine and optional event loop.

Parameters:
  • coro (Awaitable[T]) – The coroutine to run in the task.

  • loop (AbstractEventLoop | None) – Optional; the event loop.

  • name (str | None) – Optional; the name of the task.

Return type:

None

Example

```python
task = SmartTask(coro=my_coroutine(), name="my_task")
```

__iter__()

Implement iter(self).

_make_cancelled_error()

Create the CancelledError to raise if the Task is cancelled.

This should only be called once when handling a cancellation since it erases the context exception value.

_repr_info()
_self_done_cleanup_callback()

Callback to clean up waiters and remove the future from the queue when done.

This method clears all waiters and removes the future from the associated queue.

Parameters:

self (SmartFuture | SmartTask)

Return type:

None

_waiter_done_cleanup_callback(waiter)

Callback to clean up waiters when a waiter task is done.

Removes the waiter from _waiters, and _queue._futs if applicable.

Parameters:

waiter – The completed waiter task to remove.

Return type:

None

Example

Automatically called when a waiter task completes.

add_done_callback()

Add a callback to be run when the future becomes done.

The callback is called with a single argument - the future object. If the future is already done when this is called, the callback is scheduled with call_soon.

cancel(msg=None)

Request that this task cancel itself.

This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop. The coroutine then has a chance to clean up or even deny the request using try/except/finally.

Unlike Future.cancel, this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception.

Immediately after this method is called, Task.cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called).
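The cooperative cancellation semantics described above can be seen with a plain asyncio.Task, which this class inherits from:

```python
import asyncio

async def stubborn() -> str:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # The coroutine gets a chance to clean up -- or even swallow the request.
        return "cleaned up"

async def main() -> tuple:
    task = asyncio.create_task(stubborn())
    await asyncio.sleep(0)  # let the task start
    task.cancel()
    result = await task     # cancellation was caught, so we get a value back
    return result, task.cancelled()

print(asyncio.run(main()))  # ('cleaned up', False)
```

Because the coroutine caught CancelledError and returned normally, Task.cancelled() reports False even though cancel() was called.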

cancelled()

Return True if the future was cancelled.

done()

Return True if the future is done.

Done means either that a result / exception are available, or that the future was cancelled.

exception()

Return the exception that was set on this future.

The exception (or None if no exception was set) is returned only if the future is done. If the future has been cancelled, raises CancelledError. If the future isn’t done yet, raises InvalidStateError.

get_coro()
get_loop()

Return the event loop the Future is bound to.

get_name()
get_stack(*, limit=None)

Return the list of stack frames for this task’s coroutine.

If the coroutine is not done, this returns the stack where it is suspended. If the coroutine has completed successfully or was cancelled, this returns an empty list. If the coroutine was terminated by an exception, this returns the list of traceback frames.

The frames are always ordered from oldest to newest.

The optional limit gives the maximum number of frames to return; by default all available frames are returned. Its meaning differs depending on whether a stack or a traceback is returned: the newest frames of a stack are returned, but the oldest frames of a traceback are returned. (This matches the behavior of the traceback module.)

For reasons beyond our control, only one stack frame is returned for a suspended coroutine.

print_stack(*, limit=None, file=None)

Print the stack or traceback for this task’s coroutine.

This produces output similar to that of the traceback module, for the frames retrieved by get_stack(). The limit argument is passed to get_stack(). The file argument is an I/O stream to which the output is written; by default output is written to sys.stderr.

remove_done_callback(fn, /)

Remove all instances of a callback from the “call when done” list.

Returns the number of callbacks removed.

result()

Return the result this future represents.

If the future has been cancelled, raises CancelledError. If the future’s result isn’t yet available, raises InvalidStateError. If the future is done and has an exception set, this exception is raised.

set_exception(exception, /)

Mark the future done and set an exception.

If the future is already done when this method is called, raises InvalidStateError.

set_name(value, /)
set_result(result, /)

Mark the future done and set its result.

If the future is already done when this method is called, raises InvalidStateError.

_asyncio_future_blocking
_callbacks
_cancel_message
_coro
_exception
_fut_waiter
_key: _Key
_log_destroy_pending
_log_traceback
_loop
_must_cancel
_queue: 'SmartProcessingQueue[Any, Any, T]' | None = None
_result
_source_traceback
_state
_waiters: Set['asyncio.Task[T]']
property num_waiters: int

Get the number of waiters currently awaiting the future or task.

This property checks if the future or task is done to ensure accurate counting of waiters, as the callback may not have run yet.

Example

```python
future = SmartFuture()
print(future.num_waiters)
```

a_sync._smart.create_future(*, queue=None, key=None, loop=None)[source]

Create a SmartFuture instance.

Parameters:
  • queue (SmartProcessingQueue[Any, Any, T] | None) – Optional; a smart processing queue.

  • key (Tuple[Tuple[Any], Tuple[Tuple[str, Any]]] | None) – Optional; a key identifying the future.

  • loop (AbstractEventLoop | None) – Optional; the event loop.

Returns:

A SmartFuture instance.

Return type:

SmartFuture[V]

Example

Creating a SmartFuture using the factory function:

```python
future = create_future(queue=my_queue, key=my_key)
```

a_sync._smart.set_smart_task_factory(loop=None)[source]

Set the event loop’s task factory to smart_task_factory() so all tasks will be SmartTask instances.

Parameters:

loop (AbstractEventLoop | None) – Optional; the event loop. If None, the current event loop is used.

Return type:

None

Example

Setting the smart task factory for the current event loop:

```python
set_smart_task_factory()
```

a_sync._smart.shield(arg, *, loop=None)[source]

Wait for a future, shielding it from cancellation.

The statement

res = await shield(something())

is exactly equivalent to the statement

res = await something()

except that if the coroutine containing it is cancelled, the task running in something() is not cancelled. From the POV of something(), the cancellation did not happen. But its caller is still cancelled, so the yield-from expression still raises CancelledError. Note: If something() is cancelled by other means this will still cancel shield().

If you want to completely ignore cancellation (not recommended) you can combine shield() with a try/except clause, as follows:

```python
try:
    res = await shield(something())
except CancelledError:
    res = None
```
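The shielding semantics can be demonstrated end to end with the standard library's asyncio.shield, whose behavior this function mirrors:

```python
import asyncio

async def inner(results: list) -> str:
    await asyncio.sleep(0.01)
    results.append("inner finished")
    return "ok"

async def outer(results: list):
    try:
        await asyncio.shield(inner(results))
    except asyncio.CancelledError:
        # The caller is still cancelled, but the shielded task keeps running.
        results.append("outer cancelled")
        raise

async def main() -> list:
    results = []
    task = asyncio.create_task(outer(results))
    await asyncio.sleep(0)     # let outer reach the shielded await
    task.cancel()              # cancels outer, not the shielded inner task
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.02)  # give inner time to finish
    return results

print(asyncio.run(main()))  # ['outer cancelled', 'inner finished']
```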

Parameters:
  • arg (Awaitable[T]) – The awaitable to shield from cancellation.

  • loop (AbstractEventLoop | None) – Optional; the event loop. Deprecated since Python 3.8.

Returns:

A SmartFuture or asyncio.Future instance.

Return type:

SmartFuture[T] | Future

Example

Using shield to protect a coroutine from cancellation:

```python
result = await shield(my_coroutine())
```

a_sync._smart.smart_task_factory(loop, coro)[source]

Task factory function that an event loop calls to create new tasks.

This factory function utilizes ez-a-sync’s custom SmartTask implementation.

Parameters:
  • loop (AbstractEventLoop) – The event loop.

  • coro (Awaitable[T]) – The coroutine to run in the task.

Returns:

A SmartTask instance running the provided coroutine.

Return type:

SmartTask[T]

Example

Using the smart task factory to create a SmartTask:

```python
loop = asyncio.get_event_loop()
task = smart_task_factory(loop, my_coroutine())
```

a_sync._typing module

This module provides type definitions and type-related utilities for the a_sync library.

It includes various type aliases and protocols used throughout the library to enhance type checking and provide better IDE support.

Examples

The following examples demonstrate how to use some of the type aliases and protocols defined in this module.

Example of a function that can return either an awaitable or a direct value:

```python
from a_sync._typing import MaybeAwaitable
from typing import Awaitable

async def process_data(data: MaybeAwaitable[int]) -> int:
    if isinstance(data, Awaitable):
        return await data
    return data

# Usage
import asyncio

async def main():
    result = await process_data(asyncio.sleep(1, result=42))
    print(result)  # Output: 42

    result = await process_data(42)
    print(result)  # Output: 42

asyncio.run(main())
```

Example of defining a coroutine function type using CoroFn with ParamSpec:

```python
from a_sync._typing import CoroFn
from typing_extensions import ParamSpec
from typing import Awaitable

P = ParamSpec("P")

async def async_function(x: int) -> str:
    return str(x)

coro_fn: CoroFn[[int], str] = async_function
```

Example of defining a synchronous function type using SyncFn with ParamSpec:

```python
from a_sync._typing import SyncFn
from typing_extensions import ParamSpec

P = ParamSpec("P")

def sync_function(x: int) -> str:
    return str(x)

sync_fn: SyncFn[[int], str] = sync_function
```

class a_sync._typing.AsyncUnboundMethod[source]

Bases: Protocol[I, P, T]

Protocol for unbound asynchronous methods.

An unbound method is a method that hasn’t been bound to an instance of a class yet. It’s essentially the function object itself, before it’s accessed through an instance.

__init__(*args, **kwargs)
class a_sync._typing.CoroBoundMethod[source]

Bases: Protocol[I, P, T]

Protocol for coroutine bound methods.

Example

```python
class MyClass:
    async def my_method(self, x: int) -> str:
        return str(x)

instance = MyClass()
bound_method: CoroBoundMethod[MyClass, [int], str] = instance.my_method
```

__init__(*args, **kwargs)
__call__: Callable[[P], Awaitable[T]] = <method-wrapper '__call__' of _ProtocolMeta object>
class a_sync._typing.I

A TypeVar that is used to represent instances of a common class.

alias of TypeVar('I')

__init__(name, *constraints, bound=None, covariant=False, contravariant=False)
class a_sync._typing.ModifierKwargs[source]

Bases: TypedDict

TypedDict for keyword arguments that modify the behavior of asynchronous operations.

__getitem__()

x.__getitem__(y) <==> x[y]

__init__(*args, **kwargs)
__iter__()

Implement iter(self).

clear() → None. Remove all items from D.
copy() → a shallow copy of D
fromkeys(value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items() → a set-like object providing a view on D's items
keys() → a set-like object providing a view on D's keys
pop(k[, d]) → v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) → None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values() → an object providing a view on D's values
cache_type: Literal['memory', None]
cache_typed: bool
default: Literal['sync', 'async', None]
executor: Executor
ram_cache_maxsize: int | None
ram_cache_ttl: int | float | Decimal | None
runs_per_minute: int | None
semaphore: Semaphore | int | None
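The fields above can be exercised with a structurally identical TypedDict. ModifierKwargsSketch below is a local stand-in (the real ModifierKwargs lives in a_sync._typing, and its semaphore field also accepts an a_sync Semaphore):

```python
from concurrent.futures import Executor
from decimal import Decimal
from typing import Literal, Optional, TypedDict, Union

class ModifierKwargsSketch(TypedDict, total=False):
    # Field names and types mirror the attributes listed above.
    cache_type: Literal["memory", None]
    cache_typed: bool
    default: Literal["sync", "async", None]
    executor: Executor
    ram_cache_maxsize: Optional[int]
    ram_cache_ttl: Union[int, float, Decimal, None]
    runs_per_minute: Optional[int]
    semaphore: Optional[int]  # the real field also accepts a Semaphore object

modifiers: ModifierKwargsSketch = {
    "default": "async",
    "cache_type": "memory",
    "ram_cache_ttl": 60,
    "semaphore": 10,
}
print(modifiers["default"], modifiers["ram_cache_ttl"])  # async 60
```

Since total=False, every key is optional, matching how modifier kwargs are passed piecemeal.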
class a_sync._typing.SyncBoundMethod[source]

Bases: Protocol[I, P, T]

Protocol for synchronous bound methods.

Example

```python
class MyClass:
    def my_method(self, x: int) -> str:
        return str(x)

instance = MyClass()
bound_method: SyncBoundMethod[MyClass, [int], str] = instance.my_method
```

__init__(*args, **kwargs)
__call__: Callable[[P], T] = <method-wrapper '__call__' of _ProtocolMeta object>
class a_sync._typing.SyncUnboundMethod[source]

Bases: Protocol[I, P, T]

Protocol for unbound synchronous methods.

An unbound method is a method that hasn’t been bound to an instance of a class yet. It’s essentially the function object itself, before it’s accessed through an instance.

__init__(*args, **kwargs)
a_sync._typing.AnyBoundMethod

Type alias for any bound method, whether synchronous or asynchronous.

alias of CoroBoundMethod[Any, P, T] | SyncBoundMethod[Any, P, T]

a_sync._typing.AnyFn

Type alias for any function, whether synchronous or asynchronous.

alias of Callable[[P], Awaitable[T]] | Callable[[P], T]

a_sync._typing.AnyGetterFunction

Type alias for any getter function, whether synchronous or asynchronous.

alias of Callable[[I], Awaitable[T]] | Callable[[I], T]

a_sync._typing.AnyIterable

Type alias for any iterable, whether synchronous or asynchronous.

alias of AsyncIterable[K] | Iterable[K]

a_sync._typing.AnyIterableOrAwaitableIterable

Type alias for any iterable, whether synchronous or asynchronous, or an awaitable that resolves to any iterable, whether synchronous or asynchronous.

alias of AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]]
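A union like AnyIterable is typically consumed by branching on the async-iteration protocol. The sketch below builds the same union from typing primitives directly, rather than importing the library's alias:

```python
import asyncio
from typing import AsyncIterable, Iterable, TypeVar, Union

K = TypeVar("K")
AnyIterableSketch = Union[AsyncIterable[K], Iterable[K]]

async def collect(iterable) -> list:
    # Branch on the async-iteration protocol; fall back to plain iteration.
    if hasattr(iterable, "__aiter__"):
        return [item async for item in iterable]
    return list(iterable)

async def agen():
    for i in range(3):
        yield i

async def main() -> tuple:
    return await collect([1, 2, 3]), await collect(agen())

print(asyncio.run(main()))  # ([1, 2, 3], [0, 1, 2])
```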

a_sync._typing.AnyUnboundMethod

Type alias for any unbound method, whether synchronous or asynchronous.

alias of AsyncUnboundMethod[I, P, T] | SyncUnboundMethod[I, P, T]

a_sync._typing.AsyncDecorator

Type alias for decorators for coroutine functions.

alias of Callable[[Callable[[P], Awaitable[T]]], Callable[[P], Awaitable[T]]]

a_sync._typing.AsyncDecoratorOrCoroFn

Type alias for either an asynchronous decorator or a coroutine function.

alias of Callable[[Callable[[P], Awaitable[T]]], Callable[[P], Awaitable[T]]] | Callable[[P], Awaitable[T]]

a_sync._typing.AsyncGetterFunction

Type alias for asynchronous getter functions.

alias of Callable[[I], Awaitable[T]]

a_sync._typing.CacheType

Type alias for cache types.

alias of Literal['memory', None]

a_sync._typing.CoroFn

Type alias for any function that returns an awaitable.

alias of Callable[[P], Awaitable[T]]

a_sync._typing.DefaultMode

Type alias for default modes of operation.

alias of Literal['sync', 'async', None]

a_sync._typing.MaybeAwaitable

Type alias for values that may or may not be awaitable. Useful for functions that can return either an awaitable or a direct value.

alias of Awaitable[T] | T

a_sync._typing.MaybeCoro

Type alias for values that may or may not be coroutines.

alias of Coroutine[Any, Any, T] | T

a_sync._typing.Numeric

Type alias for numeric values of types int, float, or Decimal.

alias of int | float | Decimal

a_sync._typing.P = ~P

A ParamSpec used everywhere in the lib.

a_sync._typing.SemaphoreSpec

Type alias for semaphore specifications.

alias of Semaphore | int | None

a_sync._typing.SyncFn

Type alias for synchronous functions.

alias of Callable[[P], T]

a_sync._typing.SyncGetterFunction

Type alias for synchronous getter functions.

alias of Callable[[I], T]

a_sync.aliases module

a_sync.exceptions module

This module defines custom exceptions for the a_sync library.

exception a_sync.exceptions.ASyncFlagException[source]

Bases: ValueError

Base exception class for flag-related errors in the a_sync library.

A-Sync uses ‘flags’ to indicate whether objects or function calls will be sync or async. You can use any of the provided flags, which include ‘sync’ and ‘asynchronous’, whichever makes most sense for your use case.

Examples

>>> try:
...     raise ASyncFlagException("An error occurred with flags.")
... except ASyncFlagException as e:
...     print(e)
An error occurred with flags.

See also

  • VIABLE_FLAGS

desc(target)[source]

Returns a description of the target for the flag error message.

Parameters:

target – The target object or string to describe.

Return type:

str

Examples

>>> exception = ASyncFlagException()
>>> exception.desc("kwargs")
"flags present in 'kwargs'"
>>> exception.desc("some_target")
'flag attributes defined on some_target'
viable_flags = {'asynchronous', 'sync'}

The set of viable flags: {'sync', 'asynchronous'}.

exception a_sync.exceptions.ASyncRuntimeError[source]

Bases: RuntimeError

Raised for runtime errors in asynchronous operations.

Examples

>>> try:
...     raise ASyncRuntimeError(RuntimeError("Some runtime error"))
... except ASyncRuntimeError as e:
...     print(e)
Some runtime error
__init__(e)[source]

Initializes the ASyncRuntimeError exception.

Parameters:

e (RuntimeError) – The original runtime error.

exception a_sync.exceptions.EmptySequenceError[source]

Bases: ValueError

Raised when an operation is attempted on an empty sequence but items are required.

Examples

>>> try:
...     raise EmptySequenceError("Sequence is empty")
... except EmptySequenceError as e:
...     print(e)
Sequence is empty
exception a_sync.exceptions.FlagNotDefined[source]

Bases: ASyncFlagException

Raised when a flag is not defined on an object.

Examples

>>> class SomeClass:
...     pass
...
>>> try:
...     raise FlagNotDefined(SomeClass, "some_flag")
... except FlagNotDefined as e:
...     print(e)
<class '__main__.SomeClass'> flag some_flag is not defined.
__init__(obj, flag)[source]

Initializes the FlagNotDefined exception.

Parameters:
  • obj (Type) – The object where the flag is not defined.

  • flag (str) – The undefined flag.

desc(target)

Returns a description of the target for the flag error message.

Parameters:

target – The target object or string to describe.

Return type:

str

Examples

>>> exception = ASyncFlagException()
>>> exception.desc("kwargs")
"flags present in 'kwargs'"
>>> exception.desc("some_target")
'flag attributes defined on some_target'
viable_flags = {'asynchronous', 'sync'}

The set of viable flags: {'sync', 'asynchronous'}.

exception a_sync.exceptions.FunctionNotAsync[source]

Bases: ImproperFunctionType

Raised when a function expected to be async is not.

Examples

>>> def some_function():
...     pass
...
>>> try:
...     raise FunctionNotAsync(some_function)
... except FunctionNotAsync as e:
...     print(e)
`coro_fn` must be a coroutine function defined with `async def`. You passed <function some_function at 0x...>.
__init__(fn)[source]

Initializes the FunctionNotAsync exception.

Parameters:

fn – The function that is not async.

exception a_sync.exceptions.FunctionNotSync[source]

Bases: ImproperFunctionType

Raised when a function expected to be sync is actually async.

Examples

>>> async def some_async_function():
...     pass
...
>>> try:
...     raise FunctionNotSync(some_async_function)
... except FunctionNotSync as e:
...     print(e)
`func` must be a coroutine function defined with `def`. You passed <function some_async_function at 0x...>.
__init__(fn)[source]

Initializes the FunctionNotSync exception.

Parameters:

fn – The function that is not sync.

exception a_sync.exceptions.ImproperFunctionType[source]

Bases: ValueError

Raised when a function that should be sync is async or vice-versa.

exception a_sync.exceptions.InvalidFlag[source]

Bases: ASyncFlagException

Raised when an invalid flag is encountered.

Examples

>>> try:
...     raise InvalidFlag("invalid_flag")
... except InvalidFlag as e:
...     print(e)
'flag' must be one of: {'sync', 'asynchronous'}. You passed invalid_flag.
This code should not be reached and likely indicates an issue with a custom subclass definition.

See also

  • VIABLE_FLAGS

__init__(flag)[source]

Initializes the InvalidFlag exception.

Parameters:

flag (str | None) – The invalid flag.

desc(target)

Returns a description of the target for the flag error message.

Parameters:

target – The target object or string to describe.

Return type:

str

Examples

>>> exception = ASyncFlagException()
>>> exception.desc("kwargs")
"flags present in 'kwargs'"
>>> exception.desc("some_target")
'flag attributes defined on some_target'
viable_flags = {'asynchronous', 'sync'}

The set of viable flags: {'sync', 'asynchronous'}.

exception a_sync.exceptions.InvalidFlagValue[source]

Bases: ASyncFlagException

Raised when a flag has an invalid value.

Examples

>>> try:
...     raise InvalidFlagValue("some_flag", "not_a_boolean")
... except InvalidFlagValue as e:
...     print(e)
'some_flag' should be boolean. You passed not_a_boolean.
__init__(flag, flag_value)[source]

Initializes the InvalidFlagValue exception.

Parameters:
  • flag (str) – The flag with an invalid value.

  • flag_value (Any) – The invalid value of the flag.

desc(target)

Returns a description of the target for the flag error message.

Parameters:

target – The target object or string to describe.

Return type:

str

Examples

>>> exception = ASyncFlagException()
>>> exception.desc("kwargs")
"flags present in 'kwargs'"
>>> exception.desc("some_target")
'flag attributes defined on some_target'
viable_flags = {'asynchronous', 'sync'}

The set of viable flags: {'sync', 'asynchronous'}.

exception a_sync.exceptions.MappingError[source]

Bases: Exception

Base class for errors related to TaskMapping.

Examples

>>> from a_sync import TaskMapping
>>> try:
...     raise MappingError(TaskMapping(), "Some mapping error")
... except MappingError as e:
...     print(e)
Some mapping error:
<TaskMapping object at 0x...>
{}
__init__(mapping, msg='')[source]

Initializes the MappingError exception.

Parameters:
  • mapping (TaskMapping) – The TaskMapping where the error occurred.

  • msg (str) – An optional message describing the error.

See also

  • TaskMapping

_msg: str
exception a_sync.exceptions.MappingIsEmptyError[source]

Bases: MappingError

Raised when a TaskMapping is empty and an operation requires it to have items.

Examples

>>> from a_sync import TaskMapping
>>> try:
...     raise MappingIsEmptyError(TaskMapping())
... except MappingIsEmptyError as e:
...     print(e)
TaskMapping does not contain anything to yield:
<TaskMapping object at 0x...>
{}
__init__(mapping, msg='')

Initializes the MappingError exception.

Parameters:
  • mapping (TaskMapping) – The TaskMapping where the error occurred.

  • msg (str) – An optional message describing the error.

See also

  • TaskMapping

_msg: str = 'TaskMapping does not contain anything to yield'
exception a_sync.exceptions.MappingNotEmptyError[source]

Bases: MappingError

Raised when a TaskMapping is not empty and an operation requires it to be empty.

Examples

>>> from a_sync import TaskMapping
>>> task_mapping = TaskMapping()
>>> task_mapping['key'] = 'value'
>>> try:
...     raise MappingNotEmptyError(task_mapping)
... except MappingNotEmptyError as e:
...     print(e)
TaskMapping already contains some data. In order to use `map`, you need a fresh one:
<TaskMapping object at 0x...>
{'key': 'value'}
__init__(mapping, msg='')

Initializes the MappingError exception.

Parameters:
  • mapping (TaskMapping) – The TaskMapping where the error occurred.

  • msg (str) – An optional message describing the error.

See also

  • TaskMapping

_msg: str = 'TaskMapping already contains some data. In order to use `map`, you need a fresh one'
exception a_sync.exceptions.NoFlagsFound[source]

Bases: ASyncFlagException

Raised when no viable flags are found in the target.

Examples

>>> try:
...     raise NoFlagsFound("some_target")
... except NoFlagsFound as e:
...     print(e)
There are no viable a_sync flag attributes defined on some_target:
Viable flags: {'sync', 'asynchronous'}
This is likely an issue with a custom subclass definition.
__init__(target, kwargs_keys=None)[source]

Initializes the NoFlagsFound exception.

Parameters:
  • target – The target object where flags were expected.

  • kwargs_keys – Optional; keys in the kwargs if applicable.

desc(target)

Returns a description of the target for the flag error message.

Parameters:

target – The target object or string to describe.

Return type:

str

Examples

>>> exception = ASyncFlagException()
>>> exception.desc("kwargs")
"flags present in 'kwargs'"
>>> exception.desc("some_target")
'flag attributes defined on some_target'
viable_flags = {'asynchronous', 'sync'}

The set of viable flags: {'sync', 'asynchronous'}.

exception a_sync.exceptions.PersistedTaskException[source]

Bases: Exception

Raised when an exception persists in an asyncio Task.

Examples

>>> import asyncio
>>> async def some_task():
...     raise ValueError("Some error")
...
>>> task = asyncio.create_task(some_task())
>>> try:
...     raise PersistedTaskException(ValueError("Some error"), task)
... except PersistedTaskException as e:
...     print(e)
ValueError: Some error
__init__(exc, task)[source]

Initializes the PersistedTaskException exception.

Parameters:
  • exc (E) – The exception that persisted.

  • task (Task) – The asyncio Task where the exception occurred.

Return type:

None

exception a_sync.exceptions.SyncModeInAsyncContextError[source]

Bases: ASyncRuntimeError

Raised when synchronous code is used within an asynchronous context.

Examples

>>> try:
...     raise SyncModeInAsyncContextError()
... except SyncModeInAsyncContextError as e:
...     print(e)
The event loop is already running, which means you're trying to use an `ASyncFunction` synchronously from within an async context.
Check your traceback to determine which, then try calling asynchronously instead with one of the following kwargs:
{'sync', 'asynchronous'}
__init__(err='')[source]

Initializes the SyncModeInAsyncContextError exception.

Parameters:

err (str)

exception a_sync.exceptions.TooManyFlags[source]

Bases: ASyncFlagException

Raised when multiple flags are found, but only one was expected.

Examples

>>> try:
...     raise TooManyFlags("some_target", ["flag1", "flag2"])
... except TooManyFlags as e:
...     print(e)
There are multiple a_sync flag attributes defined on some_target and there should only be one.
Present flags: ['flag1', 'flag2']
This is likely an issue with a custom subclass definition.
__init__(target, present_flags)[source]

Initializes the TooManyFlags exception.

Parameters:
  • target – The target object where flags were found.

  • present_flags – The flags that were found.

desc(target)

Returns a description of the target for the flag error message.

Parameters:

target – The target object or string to describe.

Return type:

str

Examples

>>> exception = ASyncFlagException()
>>> exception.desc("kwargs")
"flags present in 'kwargs'"
>>> exception.desc("some_target")
'flag attributes defined on some_target'
viable_flags = {'asynchronous', 'sync'}

{'sync', 'asynchronous'}.

Type:

The set of viable flags

a_sync.executor module

This module provides several executor classes that facilitate running synchronous functions asynchronously using asyncio.

With these executors, you can run sync functions in your executor with await executor.run(fn, *args, **kwargs). The executor.submit(fn, *args, **kwargs) method works similarly to the concurrent.futures implementation but returns an asyncio.Future instead of a concurrent.futures.Future.
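For comparison, here is the stdlib pattern that executor.run abbreviates, sketched with a plain concurrent.futures.ThreadPoolExecutor (blocking_add is a hypothetical stand-in for any synchronous function):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_add(a: int, b: int) -> int:
    # A stand-in for any CPU- or IO-bound synchronous function.
    return a + b

async def main() -> int:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as pool:
        # a_sync's executor.run(fn, *args, **kwargs) abbreviates this
        # pattern, and additionally supports keyword arguments.
        return await loop.run_in_executor(pool, blocking_add, 1, 2)

print(asyncio.run(main()))  # 3
```

With an a_sync executor the same call collapses to `await executor.run(blocking_add, 1, 2)`.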

Executor Classes:

See also

class a_sync.executor.AsyncProcessPoolExecutor[source]

Bases: _AsyncExecutorMixin, ProcessPoolExecutor

A concurrent.futures.ProcessPoolExecutor subclass providing asynchronous run() and submit() methods that support kwargs, with support for synchronous mode.

Examples

>>> executor = AsyncProcessPoolExecutor(max_workers=4)
>>> future = executor.submit(some_function, arg1, arg2, kwarg1='kwarg1')
>>> result = await future
__init__(max_workers=None, mp_context=None, initializer=None, initargs=())[source]

Initializes the AsyncProcessPoolExecutor.

Parameters:
  • max_workers (int | None) – The maximum number of workers. Defaults to None.

  • mp_context (BaseContext | None) – The multiprocessing context. Defaults to None.

  • initializer (Callable[[...], object] | None) – An initializer callable. Defaults to None.

  • initargs (Tuple[Any, ...]) – Arguments for the initializer. Defaults to ().

Return type:

None

Examples

>>> executor = AsyncProcessPoolExecutor(max_workers=4)
>>> future = executor.submit(some_function, arg1, arg2)
>>> result = await future
_adjust_process_count()
async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item.

Parameters:
  • fut (Future) – The future being debugged.

  • fn – The function being executed.

  • *args – Positional arguments for the function.

  • **kwargs – Keyword arguments for the function.

Return type:

None

See also

_ensure_debug_daemon(self, *args, **kwargs) → 'asyncio.Future[None]'

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_launch_processes()
_spawn_process()
_start_debug_daemon(self, *args, **kwargs) → 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_start_executor_manager_thread()
_stop_debug_daemon(self, t: asyncio.Task | None = None) → None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time.

Returns:

An iterator equivalent to map(func, *iterables), but the calls may be evaluated out-of-order.

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

In synchronous mode, the function is executed directly in the current thread. In asynchronous mode, the function is submitted to the executor and awaited.

Parameters:
  • fn (Callable[[~P], T]) – The function to run.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Examples

>>> async def example():
...     result = await executor.run(some_function, arg1, arg2, kwarg1=value1)
...     print(result)

See also

  • submit() for submitting functions to the executor.

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times; however, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.
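Since shutdown() is inherited unchanged from concurrent.futures, a minimal sketch with the stdlib ThreadPoolExecutor (Python 3.9+ for cancel_futures) shows that pending work is cancelled while the already-running item completes:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()

def block():
    started.set()
    time.sleep(0.2)

ex = ThreadPoolExecutor(max_workers=1)
running = ex.submit(block)          # occupies the only worker
queued = ex.submit(time.sleep, 10)  # still pending in the queue
started.wait()                      # ensure the first job is actually running
ex.shutdown(wait=True, cancel_futures=True)

assert queued.cancelled()           # pending future was cancelled
assert running.result() is None     # running job finished normally
```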

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T]) – The function to submit.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Return type:

Future[T]

Examples

>>> future = executor.submit(some_function, arg1, arg2, kwarg1=value1)
>>> result = await future
>>> print(result)

See also

  • run() for running functions with the executor.

_broken
_call_queue
_initargs
_initializer
_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_max_workers: int
_mp_context
_pending_work_items
_processes
_queue_count
_queue_management_thread
_queue_management_thread_wakeup
_result_queue
_shutdown_lock
_shutdown_thread
_work_ids
_workers: str = 'processes'

The type of workers used, set to “processes”.

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

property sync_mode: bool

Indicates if the executor is in synchronous mode (max_workers == 0).

Examples

>>> if executor.sync_mode:
...     print("Executor is in synchronous mode.")
property worker_count_current: int

Returns the current number of workers.

Examples

>>> print(f"Current worker count: {executor.worker_count_current}")
class a_sync.executor.AsyncThreadPoolExecutor[source]

Bases: _AsyncExecutorMixin, ThreadPoolExecutor

A concurrent.futures.ThreadPoolExecutor subclass providing asynchronous run() and submit() methods that support kwargs, with support for synchronous mode.

Examples

>>> executor = AsyncThreadPoolExecutor(max_workers=10, thread_name_prefix="MyThread")
>>> future = executor.submit(some_function, arg1, arg2, kwarg1='kwarg1')
>>> result = await future
__init__(max_workers=None, thread_name_prefix='', initializer=None, initargs=())[source]

Initializes the AsyncThreadPoolExecutor.

Parameters:
  • max_workers (int | None) – The maximum number of workers. Defaults to None.

  • thread_name_prefix (str) – Prefix for thread names. Defaults to ‘’.

  • initializer (Callable[[...], object] | None) – An initializer callable. Defaults to None.

  • initargs (Tuple[Any, ...]) – Arguments for the initializer. Defaults to ().

Return type:

None

Examples

>>> executor = AsyncThreadPoolExecutor(max_workers=10, thread_name_prefix="MyThread")
>>> future = executor.submit(some_function, arg1, arg2)
>>> result = await future
_adjust_thread_count()
async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item.

Parameters:
  • fut (Future) – The future being debugged.

  • fn – The function being executed.

  • *args – Positional arguments for the function.

  • **kwargs – Keyword arguments for the function.

Return type:

None

See also

_ensure_debug_daemon(self, *args, **kwargs) → 'asyncio.Future[None]'

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_initializer_failed()
_start_debug_daemon(self, *args, **kwargs) → 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) → None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

Returns:

An iterator equivalent to map(func, *iterables), but the calls may be evaluated out-of-order.

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.
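Because map() is inherited unchanged from concurrent.futures, its chunking and ordering semantics can be sketched with the stdlib ThreadPoolExecutor (square is a hypothetical example function):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x: int) -> int:
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    # Results are yielded in input order, even though the calls
    # themselves may run (and finish) out of order.
    results = list(pool.map(square, range(5)))

print(results)  # [0, 1, 4, 9, 16]
```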

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

In synchronous mode, the function is executed directly in the current thread. In asynchronous mode, the function is submitted to the executor and awaited.

Parameters:
  • fn (Callable[[~P], T]) – The function to run.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Examples

>>> async def example():
...     result = await executor.run(some_function, arg1, arg2, kwarg1=value1)
...     print(result)

See also

  • submit() for submitting functions to the executor.

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times; however, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T]) – The function to submit.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Return type:

Future[T]

Examples

>>> future = executor.submit(some_function, arg1, arg2, kwarg1=value1)
>>> result = await future
>>> print(result)

See also

  • run() for running functions with the executor.

_broken
_counter = <method-wrapper '__next__' of itertools.count object>
_idle_semaphore
_initargs
_initializer
_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_max_workers: int
_shutdown
_shutdown_lock
_thread_name_prefix
_threads
_work_queue
_workers: str = 'threads'

The type of workers used, set to “threads”.

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

property sync_mode: bool

Indicates if the executor is in synchronous mode (max_workers == 0).

Examples

>>> if executor.sync_mode:
...     print("Executor is in synchronous mode.")
property worker_count_current: int

Returns the current number of workers.

Examples

>>> print(f"Current worker count: {executor.worker_count_current}")
class a_sync.executor.PruningThreadPoolExecutor[source]

Bases: AsyncThreadPoolExecutor

This AsyncThreadPoolExecutor implementation prunes inactive threads after timeout seconds without a work item. Pruned threads will be automatically recreated as needed for future workloads. Up to max_workers threads can be active at any one time. The executor ensures that at least one active thread remains to prevent deadlocks.

Note

The _worker function includes a check (len(executor) > 1) to ensure that at least one thread remains active. This prevents the executor from having zero active threads, which could lead to deadlocks.

Examples

>>> executor = PruningThreadPoolExecutor(max_workers=5, timeout=300)
>>> future = executor.submit(some_function, arg1, arg2, kwarg1='kwarg1')
>>> result = await future
__init__(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), timeout=600)[source]

Initializes the PruningThreadPoolExecutor.

Parameters:
  • max_workers – The maximum number of workers. Defaults to None.

  • thread_name_prefix – Prefix for thread names. Defaults to ‘’.

  • initializer – An initializer callable. Defaults to None.

  • initargs – Arguments for the initializer. Defaults to ().

  • timeout – Timeout duration for pruning inactive threads. Defaults to TEN_MINUTES.

Examples

>>> executor = PruningThreadPoolExecutor(max_workers=5, timeout=300)
>>> future = executor.submit(some_function, arg1, arg2)
>>> result = await future
_adjust_thread_count()[source]

Adjusts the number of threads based on workload and idle threads.

See also

  • _worker() for the worker function that handles thread pruning.

async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item.

Parameters:
  • fut (Future) – The future being debugged.

  • fn – The function being executed.

  • *args – Positional arguments for the function.

  • **kwargs – Keyword arguments for the function.

Return type:

None

See also

_ensure_debug_daemon(self, *args, **kwargs) → 'asyncio.Future[None]'

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_initializer_failed()
_start_debug_daemon(self, *args, **kwargs) → 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) → None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

Returns:

An iterator equivalent to map(func, *iterables), but the calls may be evaluated out-of-order.

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

In synchronous mode, the function is executed directly in the current thread. In asynchronous mode, the function is submitted to the executor and awaited.

Parameters:
  • fn (Callable[[~P], T]) – The function to run.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Examples

>>> async def example():
...     result = await executor.run(some_function, arg1, arg2, kwarg1=value1)
...     print(result)

See also

  • submit() for submitting functions to the executor.

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times; however, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T]) – The function to submit.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Return type:

Future[T]

Examples

>>> future = executor.submit(some_function, arg1, arg2, kwarg1=value1)
>>> result = await future
>>> print(result)

See also

  • run() for running functions with the executor.

_adjusting_lock

Lock used to adjust the number of threads.

_broken
_counter = <method-wrapper '__next__' of itertools.count object>
_idle_semaphore
_initargs
_initializer
_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_max_workers: int
_shutdown
_shutdown_lock
_thread_name_prefix
_threads
_timeout

Timeout duration for pruning inactive threads.

_work_queue
_workers: str = 'threads'

The type of workers used, set to “threads”.

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

property sync_mode: bool

Indicates if the executor is in synchronous mode (max_workers == 0).

Examples

>>> if executor.sync_mode:
...     print("Executor is in synchronous mode.")
property worker_count_current: int

Returns the current number of workers.

Examples

>>> print(f"Current worker count: {executor.worker_count_current}")

a_sync.future module

The future.py module provides functionality for handling asynchronous futures, including a decorator for converting callables into ASyncFuture objects and utilities for managing asynchronous computations.

Functions:
future(callable: Union[Callable[P, Awaitable[T]], Callable[P, T]] = None, **kwargs: Unpack[ModifierKwargs]) -> Callable[P, Union[T, “ASyncFuture[T]”]]:

A decorator to convert a callable into an ASyncFuture, with optional modifiers.

_gather_check_and_materialize(*things: Unpack[MaybeAwaitable[T]]) -> List[T]:

Gathers and materializes a list of awaitable or non-awaitable items.

_check_and_materialize(thing: T) -> T:

Checks if an item is awaitable and materializes it.

_materialize(meta: “ASyncFuture[T]”) -> T:

Materializes the result of an ASyncFuture.

Classes:

ASyncFuture: Represents an asynchronous future result.

TODO: include a simple mathematics example and one complex example with numerous variables and operations.
TODO: include attribute access examples.
TODO: describe a bit more about both of the above TODOs somewhere in this module-level docstring.
TODO: describe why a user would want to use these (to write cleaner code that doesn't require as many ugly gathers).
TODO: include comparisons between the 'new way' with this future class and the 'old way' with gathers.
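To illustrate the motivation, here is the 'old way' with an explicit gather, runnable as-is; the 'new way' with ASyncFuture (documented in this module) is sketched in the comments as hypothetical usage (fetch_price and fetch_fee are made-up example coroutines):

```python
import asyncio

async def fetch_price() -> float:
    return 10.0

async def fetch_fee() -> float:
    return 0.5

# The 'old way': gather every awaitable up front, then do the math.
async def total_cost() -> float:
    price, fee = await asyncio.gather(fetch_price(), fetch_fee())
    return price + fee

# The 'new way' this module aims for would move the arithmetic onto
# the futures themselves, deferring the gather (hypothetical usage):
#     total = ASyncFuture(fetch_price()) + ASyncFuture(fetch_fee())
#     await total

print(asyncio.run(total_cost()))  # 10.5
```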

class a_sync.future.ASyncFuture[source]

Bases: Future, Awaitable[T]

A class representing an asynchronous future result.

Inherits from both concurrent.futures.Future and Awaitable[T], allowing it to be used in both synchronous and asynchronous contexts.

The ASyncFuture class provides additional functionality for arithmetic operations, comparisons, and conversions, making it versatile for various use cases.

Example

>>> async def async_fn():
...     return 42
>>> future = ASyncFuture(async_fn())
>>> await future
42

Note

Arithmetic operations are implemented, allowing for mathematical operations on future results. You no longer have to choose between optimized async code and clean, readable code.

Example

>>> future1 = ASyncFuture(asyncio.sleep(1, result=10))
>>> future2 = ASyncFuture(asyncio.sleep(1, result=5))
>>> future3 = ASyncFuture(asyncio.sleep(1, result=10))
>>> future4 = ASyncFuture(asyncio.sleep(1, result=2))
>>> result = (future1 + future2) / future3 ** future4
>>> await result
0.15
Attribute Access:

The ASyncFuture allows attribute access on the materialized result.

Example

>>> class Example:
...     def __init__(self, value):
...         self.value = value
>>> future = ASyncFuture(asyncio.sleep(1, result=Example(42)))
>>> future.value
42

See also

future() for creating ASyncFuture instances.

__getitem__(key)[source]

Allows item access on the materialized result.

Parameters:

key – The key to access.

Returns:

The item from the materialized result.

Return type:

Any

Example

>>> future = ASyncFuture(asyncio.sleep(1, result={'key': 'value'}))
>>> future['key']
'value'
__init__(awaitable, dependencies=[])[source]

Initializes an ASyncFuture with an awaitable and optional dependencies.

Parameters:
  • awaitable (Awaitable[T]) – The awaitable object.

  • dependencies (List[ASyncFuture]) – A list of dependencies. Defaults to [].

Return type:

None

Example

>>> async def async_fn():
...     return 42
>>> future = ASyncFuture(async_fn())
>>> await future
42
__iter__()[source]

Returns an iterator for the materialized result.

Example

>>> future = ASyncFuture(asyncio.sleep(1, result=[1, 2, 3]))
>>> for item in future:
...     print(item)
1
2
3
__next__()[source]

Returns the next item from the materialized result.

Example

>>> future = ASyncFuture(asyncio.sleep(1, result=iter([1, 2, 3])))
>>> next(future)
1
_invoke_callbacks()
add_done_callback(fn)

Attaches a callable that will be called when the future finishes.

Parameters:

fn – A callable that will be called with this future as its only argument when the future completes or is cancelled. The callable will always be called by a thread in the same process in which it was added. If the future has already completed or been cancelled then the callable will be called immediately. These callables are called in the order that they were added.
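ASyncFuture inherits add_done_callback from concurrent.futures.Future, so the documented behavior can be sketched with the plain stdlib Future:

```python
from concurrent.futures import Future

f = Future()
seen = []
f.add_done_callback(lambda fut: seen.append(fut.result()))

f.set_running_or_notify_cancel()
f.set_result(7)          # completing the future fires the callback
assert seen == [7]

# A callback added after completion is invoked immediately.
f.add_done_callback(lambda fut: seen.append(fut.result() * 2))
assert seen == [7, 14]
```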

cancel()

Cancel the future if possible.

Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.

cancelled()

Return True if the future was cancelled.

done()

Return True if the future was cancelled or finished executing.

exception(timeout=None)

Return the exception raised by the call that the future represents.

Parameters:

timeout – The number of seconds to wait for the exception if the future isn’t done. If None, then there is no limit on the wait time.

Returns:

The exception raised by the call that the future represents or None if the call completed without raising.

Raises:
  • CancelledError – If the future was cancelled.

  • TimeoutError – If the future didn’t finish executing before the given timeout.
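Since this behavior is inherited from concurrent.futures.Future, a minimal sketch with the stdlib Future shows the difference between exception() and result():

```python
from concurrent.futures import Future

f = Future()
f.set_exception(ValueError("boom"))

# exception() returns the stored exception without raising it...
assert isinstance(f.exception(timeout=0), ValueError)

# ...whereas result() re-raises it.
try:
    f.result(timeout=0)
except ValueError as e:
    assert str(e) == "boom"
```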

running()

Return True if the future is currently executing.

set_exception(exception)

Sets the result of the future as being the given exception.

Should only be used by Executor implementations and unit tests.

set_result(result)

Sets the return value of work associated with the future.

Should only be used by Executor implementations and unit tests.

set_running_or_notify_cancel()

Mark the future as running or process any cancel notifications.

Should only be used by Executor implementations and unit tests.

If the future has been cancelled (cancel() was called and returned True) then any threads waiting on the future completing (through calls to as_completed() or wait()) are notified and False is returned.

If the future was not cancelled then it is put in the running state (future calls to running() will return True) and True is returned.

This method should be called by Executor implementations before executing the work associated with this future. If this method returns False then the work should not be executed.

Returns:

False if the Future was cancelled, True otherwise.

Raises:

RuntimeError – if this method was already called or if set_result() or set_exception() was called.
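The two documented outcomes can be sketched with the plain concurrent.futures.Future, from which this method is inherited:

```python
from concurrent.futures import Future

# Normal path: the future transitions to RUNNING and should be executed.
f = Future()
assert f.set_running_or_notify_cancel()  # returns True
assert f.running()
f.set_result(42)

# Cancelled path: the executor should skip the work item.
g = Future()
assert g.cancel()                        # still pending, so cancel succeeds
assert not g.set_running_or_notify_cancel()  # returns False
assert g.cancelled()
```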

property result: Callable[[], T] | Any

If this future is not done, this property works like concurrent.futures.Future.result: it will block, await the awaitable, and return the result when ready. If this future is done and the result has an attribute named result, it will return getattr(future_result, 'result'). If this future is done and the result does NOT have such an attribute, it will again work like concurrent.futures.Future.result.

Example

>>> future = ASyncFuture(asyncio.sleep(1, result=42))
>>> future.result()
42
a_sync.future.future(callable=None, **kwargs)[source]

A decorator function to convert a callable into an ASyncFuture, with optional modifiers.

This function wraps the provided callable in an _ASyncFutureWrappedFn instance, which returns an ASyncFuture when called. The ASyncFuture allows the result of the callable to be awaited or accessed synchronously.

Parameters:
  • callable (Callable[[~P], Awaitable[T]] | Callable[[~P], T] | None) – The callable to convert. Defaults to None.

  • **kwargs (Unpack[ModifierKwargs]) – Additional keyword arguments for the modifier.

Returns:

A callable that returns either the result or an ASyncFuture.

Return type:

Callable[[~P], T | ASyncFuture[T]]

Example

>>> @future
... async def async_function():
...     return 42
>>> result = async_function()
>>> isinstance(result, ASyncFuture)
True

See also

ASyncFuture

a_sync.iter module

class a_sync.iter.ASyncFilter

Bases: _ASyncView[T]

An async filter class that filters items of an async iterable based on a provided function.

This class inherits from _ASyncView and provides the functionality to asynchronously iterate over items, applying the filter function to each item to determine if it should be included in the result. The filter function can be either synchronous or asynchronous.

Example:
>>> async def is_even(x):
...     return x % 2 == 0
>>> filtered_iterable = ASyncFilter(is_even, some_async_iterable)
>>> async for item in filtered_iterable:
...     print(item)
See Also:

When awaited, a list of all T objects will be returned.

Example

>>> my_object = ASyncFilter(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], T)
True
__aiter__(self) Self

Return the {cls} for aiteration.

Return type:

Self

async __anext__(self) T
Return type:

T

__init__(self, function: ViewFn[T], iterable: AnyIterable[T]) None

Initializes the {cls} with a function and an iterable.

Parameters:
Return type:

None

__iter__(self) Self

Return the {cls} for iteration.

Note

Synchronous iteration uses asyncio.BaseEventLoop.run_until_complete() to fetch {obj}. This raises a RuntimeError if the event loop is already running. This RuntimeError will be caught and a more descriptive SyncModeInAsyncContextError will be raised in its place.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Return type:

Self

__next__(self) T

Synchronously fetch the next item from the {cls}.

Note

This method uses asyncio.BaseEventLoop.run_until_complete() to fetch {obj}. This raises a RuntimeError if the event loop is already running. This RuntimeError will be caught and a more descriptive SyncModeInAsyncContextError will be raised in its place.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Raises:
Return type:

T

async _check(self, obj: T) bool

Checks if an object passes the filter function.

Parameters:

obj (T) – The object to check.

Returns:

True if the object passes the filter, False otherwise.

Return type:

bool

filter(self, function: ViewFn[T]) 'ASyncFilter[T]'

Filters the {obj} yielded by the {cls} based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered {obj} from the {cls}.

Return type:

ASyncFilter[T]

sort(self, *, key: SortKey[T] = None, reverse: bool = False) 'ASyncSorter[T]'

Sort the {obj} yielded by the {cls}.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the {obj} yielded from this {cls}, but sorted.

Return type:

ASyncSorter[T]
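filter() and sort() compose, and each accepts a sync or async function. As a rough stdlib sketch of the same idea (plain async generators, not the library's implementation; is_small is a hypothetical async predicate):

```python
import asyncio

async def source():
    # stand-in for any async iterable
    for x in (5, 2, 8, 1):
        yield x

async def is_small(x):
    # async predicate, analogous to a ViewFn
    return x < 8

async def main():
    # filter with the async predicate, then sort ascending
    kept = [x async for x in source() if await is_small(x)]
    return sorted(kept)

print(asyncio.run(main()))  # [1, 2, 5]
```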

classmethod wrap(cls, wrapped)

Class method to wrap either an AsyncIterator or an async generator function.

property materialized: List[T]

Synchronously iterate through the {cls} and return all {obj}.

Returns:

A list of the {obj} yielded by the {cls}.

class a_sync.iter.ASyncGeneratorFunction

Bases: Generic[P, T]

Encapsulates an asynchronous generator function, providing a mechanism to use it as an asynchronous iterator with enhanced capabilities. This class wraps an async generator function, allowing it to be called with parameters and return an ASyncIterator object. It is particularly useful for situations where an async generator function needs to be used in a manner that is consistent with both synchronous and asynchronous execution contexts.

The ASyncGeneratorFunction class supports dynamic binding to instances, enabling it to be used as a method on class instances. When accessed as a descriptor, it automatically handles the binding to the instance, thereby allowing the wrapped async generator function to be invoked with instance context (‘self’) automatically provided. This feature is invaluable for designing classes that need to expose asynchronous generators as part of their interface while maintaining the ease of use and calling semantics similar to regular methods.

By providing a unified interface to asynchronous generator functions, this class facilitates the creation of APIs that are flexible and easy to use in a wide range of asynchronous programming scenarios. It abstracts away the complexities involved in managing asynchronous generator lifecycles and invocation semantics, making it easier for developers to integrate asynchronous iteration patterns into their applications.

Example

>>> async def my_async_gen():
...     yield 1
...     yield 2
>>> async_gen_func = ASyncGeneratorFunction(my_async_gen)
>>> for item in async_gen_func():
...     print(item)
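The descriptor binding described above can be sketched in plain Python (illustrative only, and a fraction of what ASyncGeneratorFunction does; BoundAsyncGen and Fetcher are hypothetical names):

```python
import asyncio
import functools

class BoundAsyncGen:
    # Minimal descriptor: binds the instance so the wrapped async
    # generator function can be called like a normal method.
    def __init__(self, fn):
        self.fn = fn

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return functools.partial(self.fn, instance)

class Fetcher:
    def __init__(self, items):
        self.items = items

    @BoundAsyncGen
    async def stream(self):
        for item in self.items:
            yield item

async def main():
    # 'self' is supplied automatically by the descriptor
    return [x async for x in Fetcher([1, 2, 3]).stream()]

print(asyncio.run(main()))  # [1, 2, 3]
```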
__call__(self, *args: P.args, **kwargs: P.kwargs) ASyncIterator[T]

Calls the wrapped async generator function with the given arguments and keyword arguments, returning an ASyncIterator.

Parameters:
  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Return type:

ASyncIterator[T]

__init__(self, async_gen_func: AsyncGenFunc[P, T], instance: Any = None) None

Initializes the ASyncGeneratorFunction with the given async generator function and optionally an instance.

Parameters:
  • async_gen_func (Callable[[~P], AsyncGenerator[T, None] | AsyncIterator[T]]) – The async generator function to wrap.

  • instance (optional) – The object to bind to the function, if applicable.

Return type:

None

_cache_handle: TimerHandle
class a_sync.iter.ASyncIterable

Bases: _AwaitableAsyncIterableMixin[T], Iterable[T]

A hybrid Iterable/AsyncIterable implementation designed to offer dual compatibility with both synchronous and asynchronous iteration protocols.

This class allows objects to be iterated over using either a standard for loop or an async for loop, making it versatile in scenarios where the mode of iteration (synchronous or asynchronous) needs to be flexible or is determined at runtime.

The class achieves this by implementing both __iter__ and __aiter__ methods, enabling it to return appropriate iterator objects that can handle synchronous and asynchronous iteration, respectively. However, note that synchronous iteration relies on the ASyncIterator class, which uses asyncio.get_event_loop().run_until_complete to fetch items. This can raise a RuntimeError if the event loop is already running, and in such cases, a SyncModeInAsyncContextError is raised from the RuntimeError.

Example:
>>> async_iterable = ASyncIterable(some_async_iterable)
>>> async for item in async_iterable:
...     print(item)
>>> for item in async_iterable:
...     print(item)
See Also:

When awaited, a list of all T objects will be returned.

Example

>>> my_object = ASyncIterable(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], T)
True
__aiter__(self) AsyncIterator[T]

Return an async iterator that yields {obj} from the {cls}.

Return type:

AsyncIterator[T]

__init__(self, async_iterable: AsyncIterable[T])

Initializes the ASyncIterable with an async iterable.

Parameters:

async_iterable (AsyncIterable[T]) – The async iterable to wrap.

__iter__(self) Iterator[T]

Return an iterator that yields {obj} from the {cls}.

Note

Synchronous iteration leverages ASyncIterator, which uses asyncio.BaseEventLoop.run_until_complete() to fetch items. ASyncIterator.__next__() raises a SyncModeInAsyncContextError if the event loop is already running.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Return type:

Iterator[T]

filter(self, function: ViewFn[T]) 'ASyncFilter[T]'

Filters the {obj} yielded by the {cls} based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered {obj} from the {cls}.

Return type:

ASyncFilter[T]

sort(self, *, key: SortKey[T] = None, reverse: bool = False) 'ASyncSorter[T]'

Sort the {obj} yielded by the {cls}.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the {obj} yielded from this {cls}, but sorted.

Return type:

ASyncSorter[T]

classmethod wrap(cls, wrapped: AsyncIterable[T]) 'ASyncIterable[T]'

Class method to wrap an AsyncIterable for backward compatibility.

Parameters:

wrapped (AsyncIterable[T])

Return type:

ASyncIterable[T]

property materialized: List[T]

Synchronously iterate through the {cls} and return all {obj}.

Returns:

A list of the {obj} yielded by the {cls}.

class a_sync.iter.ASyncIterator

Bases: _AwaitableAsyncIterableMixin[T], Iterator[T]

A hybrid Iterator/AsyncIterator implementation that bridges the gap between synchronous and asynchronous iteration. This class provides a unified interface for iteration that can seamlessly operate in both synchronous (for loop) and asynchronous (async for loop) contexts. It allows the wrapping of asynchronous iterable objects or async generator functions, making them usable in synchronous code without explicitly managing event loops or asynchronous context switches.

By implementing both __next__ and __anext__ methods, ASyncIterator enables objects to be iterated using standard iteration protocols while internally managing the complexities of asynchronous iteration. This design simplifies the use of asynchronous iterables in environments or frameworks that are not inherently asynchronous, such as standard synchronous functions or older codebases being gradually migrated to asynchronous IO.

Note:

Synchronous iteration with ASyncIterator uses asyncio.get_event_loop().run_until_complete, which can raise a RuntimeError if the event loop is already running. In such cases, a SyncModeInAsyncContextError is raised from the RuntimeError, indicating that synchronous iteration is not possible in an already running event loop.

Example:
>>> async_iterator = ASyncIterator(some_async_iterator)
>>> async for item in async_iterator:
...     print(item)
>>> for item in async_iterator:
...     print(item)
See Also:

When awaited, a list of all T objects will be returned.

Example

>>> my_object = ASyncIterator(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], T)
True
__aiter__(self) Self

Return the {cls} for aiteration.

Return type:

Self

__anext__(self) Coroutine[Any, Any, T]

Asynchronously fetch the next item from the {cls}.

Raises:

StopAsyncIteration – Once all {obj} have been fetched from the {cls}.

Return type:

Coroutine[Any, Any, T]

__init__(self, async_iterator: AsyncIterator[T])

Initializes the ASyncIterator with an async iterator.

Parameters:

async_iterator (AsyncIterator[T]) – The async iterator to wrap.

__iter__(self) Self

Return the {cls} for iteration.

Note

Synchronous iteration uses asyncio.BaseEventLoop.run_until_complete() to fetch {obj}. This raises a RuntimeError if the event loop is already running. This RuntimeError will be caught and a more descriptive SyncModeInAsyncContextError will be raised in its place.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Return type:

Self

__next__(self) T

Synchronously fetch the next item from the {cls}.

Note

This method uses asyncio.BaseEventLoop.run_until_complete() to fetch {obj}. This raises a RuntimeError if the event loop is already running. This RuntimeError will be caught and a more descriptive SyncModeInAsyncContextError will be raised in its place.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Raises:
Return type:

T

filter(self, function: ViewFn[T]) 'ASyncFilter[T]'

Filters the {obj} yielded by the {cls} based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered {obj} from the {cls}.

Return type:

ASyncFilter[T]

sort(self, *, key: SortKey[T] = None, reverse: bool = False) 'ASyncSorter[T]'

Sort the {obj} yielded by the {cls}.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the {obj} yielded from this {cls}, but sorted.

Return type:

ASyncSorter[T]

classmethod wrap(cls, wrapped)

Class method to wrap either an AsyncIterator or an async generator function.

property materialized: List[T]

Synchronously iterate through the {cls} and return all {obj}.

Returns:

A list of the {obj} yielded by the {cls}.

class a_sync.iter.ASyncSorter

Bases: _ASyncView[T]

An async sorter class that sorts items of an async iterable based on a provided key function.

This class inherits from _ASyncView and provides the functionality to asynchronously iterate over items, applying the key function to each item for sorting. The key function can be either synchronous or asynchronous. Note that the ASyncSorter instance can only be consumed once.

Example:
>>> sorted_iterable = ASyncSorter(some_async_iterable, key=lambda x: x.value)
>>> async for item in sorted_iterable:
...     print(item)
See Also:

When awaited, a list of all T objects will be returned.

Example

>>> my_object = ASyncSorter(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], T)
True
__aiter__(self) AsyncIterator[T]

Return an async iterator for the {cls}.

Raises:

RuntimeError – If the ASyncSorter instance has already been consumed.

Returns:

An async iterator that will yield the sorted {obj}.

Return type:

AsyncIterator[T]

__anext__(self) T
Return type:

T

__init__(self, iterable: AsyncIterable[T], *, key: SortKey[T] = None, reverse: bool = False) None

Initializes the ASyncSorter with an iterable and an optional sorting configuration (key function, and reverse flag).

Parameters:
  • iterable (AsyncIterable[T]) – The async iterable to sort.

  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If none is provided, elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the list elements will be sorted in reverse order. Defaults to False.

Return type:

None

__iter__(self) Self

Return the {cls} for iteration.

Note

Synchronous iteration uses asyncio.BaseEventLoop.run_until_complete() to fetch {obj}. This raises a RuntimeError if the event loop is already running. This RuntimeError will be caught and a more descriptive SyncModeInAsyncContextError will be raised in its place.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Return type:

Self

__next__(self) T

Synchronously fetch the next item from the {cls}.

Note

This method uses asyncio.BaseEventLoop.run_until_complete() to fetch {obj}. This raises a RuntimeError if the event loop is already running. This RuntimeError will be caught and a more descriptive SyncModeInAsyncContextError will be raised in its place.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Raises:
Return type:

T

filter(self, function: ViewFn[T]) 'ASyncFilter[T]'

Filters the {obj} yielded by the {cls} based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered {obj} from the {cls}.

Return type:

ASyncFilter[T]

sort(self, *, key: SortKey[T] = None, reverse: bool = False) 'ASyncSorter[T]'

Sort the {obj} yielded by the {cls}.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the {obj} yielded from this {cls}, but sorted.

Return type:

ASyncSorter[T]

classmethod wrap(cls, wrapped)

Class method to wrap either an AsyncIterator or an async generator function.

_consumed: bool = False
property materialized: List[T]

Synchronously iterate through the {cls} and return all {obj}.

Returns:

A list of the {obj} yielded by the {cls}.

reversed: bool = False

a_sync.task module

This module provides asynchronous task management utilities, specifically focused on creating and handling mappings of tasks.

The main components include:
  • TaskMapping: A class for managing and asynchronously generating tasks based on input iterables.

  • TaskMappingKeys: A view to asynchronously iterate over the keys of a TaskMapping.

  • TaskMappingValues: A view to asynchronously iterate over the values of a TaskMapping.

  • TaskMappingItems: A view to asynchronously iterate over the items (key-value pairs) of a TaskMapping.

class a_sync.task.TaskMapping[source]

Bases: DefaultDict[K, asyncio.Task[V]], AsyncIterable[Tuple[K, V]]

A mapping of keys to asynchronous tasks with additional functionality.

TaskMapping is a specialized dictionary that maps keys to asyncio Tasks. It provides convenient methods for creating, managing, and iterating over these tasks asynchronously.

Tasks are created automatically for each key using a provided function. You cannot manually set items in a TaskMapping using dictionary-like syntax.

Example

>>> async def fetch_data(url: str) -> str:
...     async with aiohttp.ClientSession() as session:
...         async with session.get(url) as response:
...             return await response.text()
...
>>> tasks = TaskMapping(fetch_data, ['http://example.com', 'https://www.python.org'], name='url_fetcher', concurrency=5)
>>> async for key, result in tasks:
...     print(f"Data for {key}: {result}")
...
Data for http://example.com: <!doctype html>...
Data for https://www.python.org: <!doctype html>...

Note

You cannot manually set items in a TaskMapping using dictionary-like syntax. Tasks are created and managed internally.

async __aiter__(pop=False)[source]

Asynchronously iterate through all key-task pairs, yielding the key-result pair as each task completes.

Parameters:

pop (bool)

Return type:

AsyncIterator[Tuple[K, V]]

__getitem__(item)[source]

x.__getitem__(y) <==> x[y]

Parameters:

item (K)

Return type:

Task[V]

__init__(wrapped_func=None, *iterables, name='', concurrency=None, **wrapped_func_kwargs)[source]

Initialize a TaskMapping instance.

Parameters:
  • wrapped_func (Callable[[Concatenate[K, ~P]], Awaitable[V]] | None) – A callable that takes a key and additional parameters and returns an Awaitable.

  • *iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]]) – Any number of iterables whose elements will be used as keys for task generation.

  • name (str) – An optional name for the tasks created by this mapping.

  • concurrency (int | None) – Maximum number of tasks to run concurrently.

  • **wrapped_func_kwargs (~P) – Additional keyword arguments to be passed to wrapped_func.

Return type:

None

Example

>>> async def process_item(item: int) -> int:
...     await asyncio.sleep(1)
...     return item * 2
>>> task_map = TaskMapping(process_item, [1, 2, 3], concurrency=2)

__iter__()

Implement iter(self).

_if_pop_check_destroyed(pop)[source]
Parameters:

pop (bool)

Return type:

None

async _if_pop_clear(pop)[source]
Parameters:

pop (bool)

Return type:

None

_raise_if_empty(msg='')[source]
Parameters:

msg (str)

Return type:

None

_raise_if_not_empty()[source]
Return type:

None

_start_tasks_for_iterables(*iterables)[source]

Start new tasks for each key in the provided iterables.

Parameters:

iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]])

Return type:

AsyncIterator[Tuple[K, Task[V]]]

_tasks_for_iterables(*iterables)[source]

Ensure tasks are running for each key in the provided iterables.

Parameters:

iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]])

Return type:

AsyncIterator[Tuple[K, Task[V]]]

async _wait_for_next_key()[source]
Return type:

None

clear(cancel=False)[source]

Remove all tasks from the mapping. If cancel is True, the tasks are cancelled before being removed.

Parameters:

cancel (bool)

Return type:

None

async close()[source]
Return type:

None

copy() a shallow copy of D.
fromkeys(iterable, value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items() a set-like object providing a view on D's items[source]
Parameters:

pop (bool)

Return type:

TaskMappingItems[K, V]

keys() a set-like object providing a view on D's keys[source]
Parameters:

pop (bool)

Return type:

TaskMappingKeys[K, V]

map(*iterables, pop=True, yields='both')[source]

Asynchronously map iterables to tasks and yield their results.

Parameters:
  • *iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]]) – Iterables to map over.

  • pop (bool) – Whether to remove tasks from the internal storage once they are completed.

  • yields (Literal['keys', 'both']) – Whether to yield just 'keys' or 'both' (key-value pairs).

Yields:

Depending on yields, either keys, values, or tuples of key-value pairs representing the results of completed tasks.

Return type:

AsyncIterator[Tuple[K, V]]

Example

>>> async def process_item(item: int) -> int:
...     await asyncio.sleep(1)
...     return item * 2
>>> task_map = TaskMapping(process_item)
>>> async for key, result in task_map.map([1, 2, 3]):
...     print(f"Processed {key}: {result}")

pop(*args, cancel=False)[source]

Pop a task from the TaskMapping.

Parameters:
  • *args (K) – One key to pop.

  • cancel (bool) – Whether to cancel the task when popping it.

Return type:

Task[V] | Future[V]

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) None.  Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values() an object providing a view on D's values[source]
Parameters:

pop (bool)

Return type:

TaskMappingValues[K, V]

yield_completed(pop=True)[source]

Asynchronously yield tuples of key-value pairs representing the results of any completed tasks.

Parameters:

pop (bool) – Whether to remove tasks from the internal storage once they are completed.

Yields:

Tuples of key-value pairs representing the results of completed tasks.

Return type:

AsyncIterator[Tuple[K, V]]

Example

>>> async def process_item(item: int) -> int:
...     await asyncio.sleep(1)
...     return item * 2
>>> task_map = TaskMapping(process_item, [1, 2, 3])
>>> async for key, result in task_map.yield_completed():
...     print(f"Completed {key}: {result}")

_destroyed: bool = False

Boolean indicating whether this mapping has been consumed and is no longer usable for aggregations.

property _init_loader: Task[None] | None

An asyncio Task used to preload values from the iterables.

_init_loader_next: Callable[[], Awaitable[Tuple[Tuple[K, Task[V]]]]] | None = None

A coroutine function that blocks until the _init_loader starts one or more new tasks, then returns a Tuple[Tuple[K, asyncio.Task[V]]] containing the new tasks and the keys that started them.

_name: str | None = None

Optional name for tasks created by this mapping.

_next: CythonEvent = None

An asyncio Event that indicates the next result is ready.

property _queue: ProcessingQueue
_wrapped_func

The function used to create tasks for each key.

_wrapped_func_kwargs: Dict[str, Any] = {}

Additional keyword arguments passed to _wrapped_func.

all[source]

Return True if all task results in the mapping are truthy.

Parameters:

pop (bool)

Return type:

bool

any[source]

Return True if any task result in the mapping is truthy.

Parameters:

pop (bool)

Return type:

bool

concurrency: int | None = None

The max number of tasks that will run at one time.

default_factory

Factory for default value called by __missing__().

gather[source]

Wait for all tasks to complete and return a dictionary of the results.

Parameters:
Return type:

Dict[K, V]
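Roughly, gather awaits every task and maps each key to its result. A stdlib sketch of the equivalent behavior (process_item is a hypothetical worker function, not part of the library):

```python
import asyncio

async def process_item(item: int) -> int:
    await asyncio.sleep(0)
    return item * 2

async def main():
    keys = [1, 2, 3]
    # one task per key, then {key: result} once all complete
    tasks = {k: asyncio.create_task(process_item(k)) for k in keys}
    return {k: await t for k, t in tasks.items()}

print(asyncio.run(main()))  # {1: 2, 2: 4, 3: 6}
```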

max[source]

Return the maximum result from the tasks in the mapping.

Parameters:

pop (bool)

Return type:

V

min[source]

Return the minimum result from the tasks in the mapping.

Parameters:

pop (bool)

Return type:

V

sum[source]

Return the sum of the results from the tasks in the mapping.

Parameters:

pop (bool)

Return type:

V

class a_sync.task.TaskMappingItems[source]

Bases: _TaskMappingView[Tuple[K, V], K, V], Generic[K, V]

Asynchronous view to iterate over the items (key-value pairs) of a TaskMapping.

async _TaskMappingView__await()
Return type:

List[T]

async __aiter__()[source]
Return type:

AsyncIterator[Tuple[K, V]]

__init__(self)
Parameters:
Return type:

None

__iter__()
Return type:

Iterator[T]

_get_from_item(item)
aiterbykeys(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

aiterbyvalues(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

_pop: bool = False
class a_sync.task.TaskMappingKeys[source]

Bases: _TaskMappingView[K, K, V], Generic[K, V]

Asynchronous view to iterate over the keys of a TaskMapping.

async _TaskMappingView__await()
Return type:

List[T]

async __aiter__()[source]
Return type:

AsyncIterator[K]

__init__(self)
Parameters:
Return type:

None

__iter__()
Return type:

Iterator[T]

_get_from_item(item)
aiterbykeys(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

aiterbyvalues(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

_pop: bool = False
class a_sync.task.TaskMappingValues[source]

Bases: _TaskMappingView[V, K, V], Generic[K, V]

Asynchronous view to iterate over the values of a TaskMapping.

async _TaskMappingView__await()
Return type:

List[T]

async __aiter__()[source]
Return type:

AsyncIterator[V]

__init__(self)
Parameters:
Return type:

None

__iter__()
Return type:

Iterator[T]

_get_from_item(item)
aiterbykeys(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

aiterbyvalues(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

_pop: bool = False

Module contents

This module initializes the a_sync library by importing and organizing various components, utilities, and classes. It provides a convenient and unified interface for asynchronous programming with a focus on flexibility and efficiency.

The a_sync library offers decorators and base classes to facilitate writing both synchronous and asynchronous code. It includes the @a_sync() decorator and the ASyncGenericBase class, which allow for creating functions and classes that can operate in both synchronous and asynchronous contexts. Additionally, it provides enhanced asyncio primitives, such as queues and locks, with extra functionality.

Modules and components included:

Alias for backward compatibility:
  • ASyncBase is an alias for ASyncGenericBase, which will eventually be removed, probably in version 0.1.0.

Examples

Using the @a_sync decorator:

>>> from a_sync import a_sync
>>> @a_sync
... async def my_function():
...     return "Hello, World!"
>>> result = await my_function()
>>> print(result)
Hello, World!

Using ASyncGenericBase for dual-context classes:

>>> from a_sync import ASyncGenericBase
>>> class MyClass(ASyncGenericBase):
...     async def my_method(self):
...         return "Hello from MyClass"
>>> obj = MyClass()
>>> result = await obj.my_method()
>>> print(result)
Hello from MyClass

See also

class a_sync.ASyncCachedPropertyDescriptor

Bases: _ASyncPropertyDescriptorBase[I, T], AsyncCachedPropertyDescriptor

A descriptor class for dual-function sync/async cached properties.

This class extends the API of ASyncPropertyDescriptor to provide caching functionality, storing the computed value after the first access.

__init__(self, _fget: AsyncGetterFunction[I, T], _fset=None, _fdel=None, field_name=None, **modifiers: Unpack[ModifierKwargs]) None

Initializes the ASyncCachedPropertyDescriptor.

Parameters:
  • _fget (Callable[[I], Awaitable[T]]) – The function to be wrapped.

  • _fset – Optional setter function for the property.

  • _fdel – Optional deleter function for the property.

  • field_name – Optional name for the field. If not provided, the function’s name will be used.

  • **modifiers (Unpack[ModifierKwargs]) – Additional modifier arguments.

Return type:

None

async _all(*instances, concurrency=None, name='', **kwargs)

Check if all results are truthy.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

bool

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method._all([1, 2, 3])
```

async _any(*instances, concurrency=None, name='', **kwargs)

Check if any result is truthy.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

bool

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method._any([-1, 0, 1])
```

_asyncify(self, func: SyncFn[P, T]) CoroFn[P, T]

Converts a synchronous function to an asynchronous one and applies async modifiers.

Parameters:

func (Callable[[~P], T]) – The synchronous function to be converted.

Returns:

The asynchronous version of the function with applied modifiers.

Return type:

Callable[[~P], Awaitable[T]]

See also

  • ModifierManager.apply_async_modifiers()
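A minimal stdlib sketch of what "asyncifying" a sync function can look like (illustrative only; the real _asyncify also applies the configured async modifiers, and the default-executor choice here is an assumption):

```python
import asyncio
import functools

def asyncify(func):
    # Wrap a sync function so awaiting it runs the work in the
    # event loop's default executor.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, functools.partial(func, *args, **kwargs)
        )
    return wrapper

@asyncify
def add(a, b):
    return a + b

print(asyncio.run(add(1, 2)))  # 3
```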

_check_method_name(method, method_type)
_check_method_sync(method, method_type)
async _max(*instances, concurrency=None, name='', **kwargs)

Find the maximum result.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._max([3, 1, 2])
```

async _min(*instances, concurrency=None, name='', **kwargs)

Find the minimum result.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._min([3, 1, 2])
```

async _sum(*instances, concurrency=None, name='', **kwargs)

Calculate the sum of results.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to sum.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._sum([1, 2, 3])
```

already_loaded(instance)
del_cache_value(instance)
deleter(method)
async get(self, instance: I, owner: Type[I] | None = None) T

Asynchronously retrieves the property value.

Parameters:
  • instance (I) – The instance from which the property is accessed.

  • owner (Type[I] | None) – The owner class of the property.

Returns:

The property value.

Return type:

T

get_cache(instance)
get_cache_value(instance)
get_instance_state(instance)
get_loader(self, instance: I) Callable[[], T]

Retrieves the loader function for the property.

Parameters:

instance (I) – The instance from which the property is accessed.

Returns:

A callable that loads the property value.

Return type:

Callable[[], T]

get_lock(self, instance: I) 'asyncio.Task[T]'

Retrieves the lock for the property.

Parameters:

instance (I) – The instance from which the property is accessed.

Returns:

An asyncio Task representing the lock.

Return type:

Task[T]
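The "lock" here is not a mutex but a shared Task. A minimal stdlib sketch of that pattern follows; the names (`load`, `_locks`) are illustrative, not part of the a_sync API:

```python
import asyncio

# Sketch of the "lock as task" idea behind get_lock (assumed behavior):
# the first getter creates a Task running the loader; later getters
# receive the same Task, so the loader runs exactly once per instance.
_locks = {}
calls = 0

async def load():
    global calls
    calls += 1
    await asyncio.sleep(0)  # stand-in for real async work
    return 42

def get_lock(instance):
    task = _locks.get(instance)
    if task is None:
        task = _locks[instance] = asyncio.ensure_future(load())
    return task

async def main():
    obj = "instance-key"
    first = await get_lock(obj)
    second = await get_lock(obj)  # re-awaits the already-finished Task
    return first, second, calls

result = asyncio.run(main())
print(result)
```

Because a finished Task can be awaited again, every concurrent getter observes the same cached value while the loader body executes only once.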

has_cache_value(instance)
map(self, instances: AnyIterable[I], owner: Optional[Type[I]] = None, concurrency: Optional[int] = None, name: str = '') 'TaskMapping[I, T]'

Maps the property across multiple instances.

Parameters:
  • instances (AnyIterable[I]) – An iterable of instances.

  • owner (Optional[Type[I]]) – The owner class of the property.

  • concurrency (Optional[int]) – Optional concurrency limit.

  • name (str) – Optional name for the task mapping.

Returns:

A TaskMapping object.

Return type:

TaskMapping[I, T]
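The mapping concept can be sketched with plain asyncio; the class and method names below are illustrative stand-ins, not the TaskMapping API:

```python
import asyncio

# Stdlib sketch of mapping an async getter across many instances
# concurrently, collecting the results keyed per instance.
class Point:
    def __init__(self, x):
        self.x = x

    async def norm_squared(self):
        await asyncio.sleep(0)  # stand-in for real async work
        return self.x * self.x

async def main():
    instances = [Point(1), Point(2), Point(3)]
    values = await asyncio.gather(*(p.norm_squared() for p in instances))
    return dict(zip((p.x for p in instances), values))

result = asyncio.run(main())
print(result)
```

TaskMapping adds concurrency limits and lazy task creation on top of this basic gather-per-instance shape.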

not_loaded(instance)
pop_lock(self, instance: I) None

Removes the lock for the property.

Parameters:

instance (I) – The instance from which the property is accessed.

Return type:

None

set_cache_value(instance, value)
setter(method)
property _TaskMapping: Type[TaskMapping]

This silly helper just fixes a circular import

property _await: Callable[[Awaitable[T]], T]

Applies sync modifiers to the _helpers._await function and caches it.

Returns:

The modified _await function.

See also

  • ModifierManager.apply_sync_modifiers()

property all: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]

Create an ASyncFunction that checks if all results are truthy.

Returns:

An ASyncFunction object.

Examples

class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method.all([1, 2, 3])

property any: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]

Create an ASyncFunction that checks if any result is truthy.

Returns:

An ASyncFunction object.

Examples

class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method.any([-1, 0, 1])

property default: Literal['sync', 'async', None]

Gets the default execution mode (sync, async, or None) for the function.

Returns:

The default execution mode.

See also

  • ModifierManager.default

field_name

The name of the field the ASyncDescriptor is bound to.

hidden_method_descriptor
hidden_method_name
property max: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the maximum result.

Returns:

An ASyncFunction object.

Examples

class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.max([3, 1, 2])

property min: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the minimum result.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.min([3, 1, 2])
```

modifiers
property sum: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the sum of results.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.sum([1, 2, 3])
```

wrapped
class a_sync.ASyncGenericSingleton[source]

Bases: ASyncGenericBase

A base class for creating singleton-esque ASync classes.

This class combines the functionality of ASyncGenericBase with a singleton pattern, ensuring that only one instance of the class exists per execution mode (sync/async). It uses a custom metaclass ASyncSingletonMeta to manage instance creation and caching.

Subclasses of ASyncGenericSingleton will have two instances instead of one:

  • one synchronous instance

  • one asynchronous instance

This allows for proper behavior in both synchronous and asynchronous contexts while maintaining the singleton pattern within each context.

Note

This class can be instantiated directly, but it is intended to be subclassed to define specific asynchronous behavior. Subclasses should define the necessary properties and methods to specify the asynchronous behavior, as outlined in ASyncGenericBase.

Example

Create a subclass of ASyncGenericSingleton to define specific behavior:

class MyAsyncSingleton(ASyncGenericSingleton):
    @property
    def __a_sync_flag_name__(self):
        return "asynchronous"

    @property
    def __a_sync_flag_value__(self):
        return self.asynchronous

    @classmethod
    def __a_sync_default_mode__(cls):
        return False

    @a_sync
    def my_method(self):
        ...  # method implementation

# These will return the same synchronous instance
sync_instance1 = MyAsyncSingleton(sync=True)
sync_instance2 = MyAsyncSingleton(sync=True)

# These will return the same asynchronous instance
async_instance1 = MyAsyncSingleton(asynchronous=True)
async_instance2 = MyAsyncSingleton(asynchronous=True)

assert sync_instance1 is sync_instance2
assert async_instance1 is async_instance2
assert sync_instance1 is not async_instance1

See also

  • ASyncGenericBase for base functionality.

  • ASyncSingletonMeta for the metaclass managing the singleton behavior.

__init__(self)
class a_sync.ASyncIterable

Bases: _AwaitableAsyncIterableMixin[T], Iterable[T]

A hybrid Iterable/AsyncIterable implementation designed to offer dual compatibility with both synchronous and asynchronous iteration protocols.

This class allows objects to be iterated over using either a standard for loop or an async for loop, making it versatile in scenarios where the mode of iteration (synchronous or asynchronous) needs to be flexible or is determined at runtime.

The class achieves this by implementing both __iter__ and __aiter__ methods, enabling it to return appropriate iterator objects that can handle synchronous and asynchronous iteration, respectively. However, note that synchronous iteration relies on the ASyncIterator class, which uses asyncio.get_event_loop().run_until_complete to fetch items. This can raise a RuntimeError if the event loop is already running, and in such cases, a SyncModeInAsyncContextError is raised from the RuntimeError.

Example:
>>> async_iterable = ASyncIterable(some_async_iterable)
>>> async for item in async_iterable:
...     print(item)
>>> for item in async_iterable:
...     print(item)
When awaited, a list of all T objects will be returned.

Example

>>> my_object = ASyncIterable(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], T)
True
__aiter__(self) AsyncIterator[T]

Return an async iterator that yields {obj} from the {cls}.

Return type:

AsyncIterator[T]

__init__(self, async_iterable: AsyncIterable[T])

Initializes the ASyncIterable with an async iterable.

Parameters:

async_iterable (AsyncIterable[T]) – The async iterable to wrap.

__iter__(self) Iterator[T]

Return an iterator that yields {obj} from the {cls}.

Note

Synchronous iteration leverages ASyncIterator, which uses asyncio.BaseEventLoop.run_until_complete() to fetch items. ASyncIterator.__next__() raises a SyncModeInAsyncContextError if the event loop is already running.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Return type:

Iterator[T]

filter(self, function: ViewFn[T]) 'ASyncFilter[T]'

Filters the {obj} yielded by the {cls} based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered {obj} from the {cls}.

Return type:

ASyncFilter[T]
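A minimal stdlib stand-in for this filtering behavior is sketched below; `afilter` is an illustrative name, and ASyncFilter additionally supports synchronous iteration:

```python
import asyncio

async def numbers():
    for n in range(6):
        yield n

async def is_even(n):
    return n % 2 == 0

async def afilter(function, aiterable):
    # Minimal stand-in for ASyncFilter: yield items whose predicate
    # result is truthy. The predicate may be sync or async, so await
    # its result when it is a coroutine.
    async for item in aiterable:
        keep = function(item)
        if asyncio.iscoroutine(keep):
            keep = await keep
        if keep:
            yield item

async def main():
    return [n async for n in afilter(is_even, numbers())]

result = asyncio.run(main())
print(result)
```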

sort(self, *, key: SortKey[T] = None, reverse: bool = False) 'ASyncSorter[T]'

Sort the {obj} yielded by the {cls}.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the {obj} yielded from this {cls}, but sorted.

Return type:

ASyncSorter[T]
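The sorting behavior can be sketched with the stdlib; `asort` is an illustrative stand-in, and note that sorting inherently requires consuming the whole source first:

```python
import asyncio

async def numbers():
    for n in (3, 1, 2):
        yield n

async def asort(aiterable, *, key=None, reverse=False):
    # Minimal stand-in for ASyncSorter: the async iterable must be
    # fully consumed before any sorted item can be yielded.
    items = [item async for item in aiterable]
    for item in sorted(items, key=key, reverse=reverse):
        yield item

async def main():
    return [n async for n in asort(numbers(), reverse=True)]

result = asyncio.run(main())
print(result)
```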

classmethod wrap(cls, wrapped: AsyncIterable[T]) 'ASyncIterable[T]'

Class method to wrap an AsyncIterable for backward compatibility.

Parameters:

wrapped (AsyncIterable[T])

Return type:

ASyncIterable[T]

property materialized: List[T]

Synchronously iterate through the {cls} and return all {obj}.

Returns:

A list of the {obj} yielded by the {cls}.

class a_sync.ASyncIterator

Bases: _AwaitableAsyncIterableMixin[T], Iterator[T]

A hybrid Iterator/AsyncIterator implementation that bridges the gap between synchronous and asynchronous iteration. This class provides a unified interface for iteration that can seamlessly operate in both synchronous (for loop) and asynchronous (async for loop) contexts. It allows the wrapping of asynchronous iterable objects or async generator functions, making them usable in synchronous code without explicitly managing event loops or asynchronous context switches.

By implementing both __next__ and __anext__ methods, ASyncIterator enables objects to be iterated using standard iteration protocols while internally managing the complexities of asynchronous iteration. This design simplifies the use of asynchronous iterables in environments or frameworks that are not inherently asynchronous, such as standard synchronous functions or older codebases being gradually migrated to asynchronous IO.

Note:

Synchronous iteration with ASyncIterator uses asyncio.get_event_loop().run_until_complete, which can raise a RuntimeError if the event loop is already running. In such cases, a SyncModeInAsyncContextError is raised from the RuntimeError, indicating that synchronous iteration is not possible in an already running event loop.

Example:
>>> async_iterator = ASyncIterator(some_async_iterator)
>>> async for item in async_iterator:
...     print(item)
>>> for item in async_iterator:
...     print(item)
When awaited, a list of all T objects will be returned.

Example

>>> my_object = ASyncIterator(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], T)
True
__aiter__(self) Self

Return the {cls} for aiteration.

Return type:

Self

__anext__(self) Coroutine[Any, Any, T]

Asynchronously fetch the next item from the {cls}.

Raises:

StopAsyncIteration – Once all {obj} have been fetched from the {cls}.

Return type:

Coroutine[Any, Any, T]

__init__(self, async_iterator: AsyncIterator[T])

Initializes the ASyncIterator with an async iterator.

Parameters:

async_iterator (AsyncIterator[T]) – The async iterator to wrap.

__iter__(self) Self

Return the {cls} for iteration.

Note

Synchronous iteration uses asyncio.BaseEventLoop.run_until_complete() to fetch {obj}. This raises a RuntimeError if the event loop is already running. This RuntimeError will be caught and a more descriptive SyncModeInAsyncContextError will be raised in its place.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Return type:

Self

__next__(self) T

Synchronously fetch the next item from the {cls}.

Note

This method uses asyncio.BaseEventLoop.run_until_complete() to fetch {obj}. This raises a RuntimeError if the event loop is already running. This RuntimeError will be caught and a more descriptive SyncModeInAsyncContextError will be raised in its place.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Raises:

StopIteration – Once all {obj} have been fetched from the {cls}.

Return type:

T

filter(self, function: ViewFn[T]) 'ASyncFilter[T]'

Filters the {obj} yielded by the {cls} based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered {obj} from the {cls}.

Return type:

ASyncFilter[T]

sort(self, *, key: SortKey[T] = None, reverse: bool = False) 'ASyncSorter[T]'

Sort the {obj} yielded by the {cls}.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the {obj} yielded from this {cls}, but sorted.

Return type:

ASyncSorter[T]

classmethod wrap(cls, wrapped)

Class method to wrap either an AsyncIterator or an async generator function.
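The dispatch between the two accepted inputs can be sketched with the stdlib; the behavior shown is an assumption (a generator function is called to produce an iterator, an existing iterator passes through):

```python
import asyncio
import inspect

def wrap(wrapped):
    # Sketch of the assumed dispatch: call an async generator
    # *function* to obtain an iterator; pass an existing async
    # iterator through unchanged.
    if inspect.isasyncgenfunction(wrapped):
        return wrapped()
    return wrapped

async def gen():
    yield 1
    yield 2

async def main():
    from_function = [x async for x in wrap(gen)]   # generator function
    from_iterator = [x async for x in wrap(gen())]  # iterator object
    return from_function, from_iterator

result = asyncio.run(main())
print(result)
```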

property materialized: List[T]

Synchronously iterate through the {cls} and return all {obj}.

Returns:

A list of the {obj} yielded by the {cls}.

class a_sync.ASyncPropertyDescriptor

Bases: _ASyncPropertyDescriptorBase[I, T], AsyncPropertyDescriptor

Descriptor class for asynchronous properties.

__init__(self, _fget: AsyncGetterFunction[I, T], field_name: str | None = None, **modifiers: Unpack[ModifierKwargs]) None

Initializes the _ASyncPropertyDescriptorBase.

Parameters:
  • _fget (Callable[[I], Awaitable[T]]) – The function to be wrapped.

  • field_name (str | None) – Optional name for the field. If not provided, the function’s name will be used.

  • **modifiers (Unpack[ModifierKwargs]) – Additional modifier arguments.

Return type:

None

async _all(*instances, concurrency=None, name='', **kwargs)

Check if all results are truthy.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

bool

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method._all([1, 2, 3])
```

async _any(*instances, concurrency=None, name='', **kwargs)

Check if any result is truthy.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

bool

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method._any([-1, 0, 1])
```

_asyncify(self, func: SyncFn[P, T]) CoroFn[P, T]

Converts a synchronous function to an asynchronous one and applies async modifiers.

Parameters:

func (Callable[[~P], T]) – The synchronous function to be converted.

Returns:

The asynchronous version of the function with applied modifiers.

Return type:

Callable[[~P], Awaitable[T]]

See also

  • ModifierManager.apply_async_modifiers()

async _max(*instances, concurrency=None, name='', **kwargs)

Find the maximum result.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._max([3, 1, 2])
```

async _min(*instances, concurrency=None, name='', **kwargs)

Find the minimum result.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._min([3, 1, 2])
```

async _sum(*instances, concurrency=None, name='', **kwargs)

Calculate the sum of results.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to sum.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._sum([1, 2, 3])
```

awaitable_only(instance)
async get(self, instance: I, owner: Type[I] | None = None) T

Asynchronously retrieves the property value.

Parameters:
  • instance (I) – The instance from which the property is accessed.

  • owner (Type[I] | None) – The owner class of the property.

Returns:

The property value.

Return type:

T

get_loader(instance)
map(self, instances: AnyIterable[I], owner: Optional[Type[I]] = None, concurrency: Optional[int] = None, name: str = '') 'TaskMapping[I, T]'

Maps the property across multiple instances.

Parameters:
  • instances (AnyIterable[I]) – An iterable of instances.

  • owner (Optional[Type[I]]) – The owner class of the property.

  • concurrency (Optional[int]) – Optional concurrency limit.

  • name (str) – Optional name for the task mapping.

Returns:

A TaskMapping object.

Return type:

TaskMapping[I, T]

property _TaskMapping: Type[TaskMapping]

This silly helper just fixes a circular import

property _await: Callable[[Awaitable[T]], T]

Applies sync modifiers to the _helpers._await function and caches it.

Returns:

The modified _await function.

See also

  • ModifierManager.apply_sync_modifiers()

property all: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]

Create an ASyncFunction that checks if all results are truthy.

Returns:

An ASyncFunction object.

Examples

class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method.all([1, 2, 3])

property any: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]

Create an ASyncFunction that checks if any result is truthy.

Returns:

An ASyncFunction object.

Examples

class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method.any([-1, 0, 1])

property default: Literal['sync', 'async', None]

Gets the default execution mode (sync, async, or None) for the function.

Returns:

The default execution mode.

See also

  • ModifierManager.default

field_name

The name of the field the ASyncDescriptor is bound to.

hidden_method_descriptor
hidden_method_name
property max: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the maximum result.

Returns:

An ASyncFunction object.

Examples

class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.max([3, 1, 2])

property min: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the minimum result.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.min([3, 1, 2])
```

modifiers
property sum: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the sum of results.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:

    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.sum([1, 2, 3])
```

wrapped
class a_sync.AsyncProcessPoolExecutor[source]

Bases: _AsyncExecutorMixin, ProcessPoolExecutor

A :class:`concurrent.futures.ProcessPoolExecutor` subclass providing asynchronous run and submit methods that support kwargs, with support for synchronous mode.

Examples

>>> executor = AsyncProcessPoolExecutor(max_workers=4)
>>> future = executor.submit(some_function, arg1, arg2, kwarg1='kwarg1')
>>> result = await future
__init__(max_workers=None, mp_context=None, initializer=None, initargs=())[source]

Initializes the AsyncProcessPoolExecutor.

Parameters:
  • max_workers (int | None) – The maximum number of workers. Defaults to None.

  • mp_context (BaseContext | None) – The multiprocessing context. Defaults to None.

  • initializer (Callable[[...], object] | None) – An initializer callable. Defaults to None.

  • initargs (Tuple[Any, ...]) – Arguments for the initializer. Defaults to ().

Return type:

None

Examples

>>> executor = AsyncProcessPoolExecutor(max_workers=4)
>>> future = executor.submit(some_function, arg1, arg2)
>>> result = await future
_adjust_process_count()
async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item.

Parameters:
  • fut (Future) – The future being debugged.

  • fn – The function being executed.

  • *args – Positional arguments for the function.

  • **kwargs – Keyword arguments for the function.

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_launch_processes()
_spawn_process()
_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_start_executor_manager_thread()
_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time.

Returns:

An iterator equivalent to map(func, *iterables), but the calls may be evaluated out-of-order.

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

In synchronous mode, the function is executed directly in the current thread. In asynchronous mode, the function is submitted to the executor and awaited.

Parameters:
  • fn (Callable[[~P], T]) – The function to run.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Examples

>>> async def example():
>>>     result = await executor.run(some_function, arg1, arg2, kwarg1=value1)
>>>     print(result)

See also

  • submit() for submitting functions to the executor.

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.
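These semantics are inherited from concurrent.futures.Executor, so they can be illustrated with a stdlib thread pool (used here only to keep the snippet self-contained and runnable):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Illustration of shutdown(cancel_futures=True): work that has not
# started yet is cancelled, while the already-running future
# completes normally.
started = threading.Event()

def slow():
    started.set()
    time.sleep(0.2)
    return "done"

executor = ThreadPoolExecutor(max_workers=1)
running = executor.submit(slow)
started.wait()  # ensure the first job is actually running
queued = [executor.submit(time.sleep, 0.2) for _ in range(3)]  # pending
executor.shutdown(wait=True, cancel_futures=True)
print(running.result(), [f.cancelled() for f in queued])
```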

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T]) – The function to submit.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Return type:

Future[T]

Examples

>>> future = executor.submit(some_function, arg1, arg2, kwarg1=value1)
>>> result = await future
>>> print(result)

See also

  • run() for running functions with the executor.

_broken
_call_queue
_initargs
_initializer
_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_max_workers: int
_mp_context
_pending_work_items
_processes
_queue_count
_queue_management_thread
_queue_management_thread_wakeup
_result_queue
_shutdown_lock
_shutdown_thread
_work_ids
_workers: str = 'processes'

The type of workers used, set to “processes”.

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

property sync_mode: bool

Indicates if the executor is in synchronous mode (max_workers == 0).

Examples

>>> if executor.sync_mode:
>>>     print("Executor is in synchronous mode.")
property worker_count_current: int

Returns the current number of workers.

Examples

>>> print(f"Current worker count: {executor.worker_count_current}")
class a_sync.AsyncThreadPoolExecutor[source]

Bases: _AsyncExecutorMixin, ThreadPoolExecutor

A :class:`concurrent.futures.ThreadPoolExecutor` subclass providing asynchronous run and submit methods that support kwargs, with support for synchronous mode.

Examples

>>> executor = AsyncThreadPoolExecutor(max_workers=10, thread_name_prefix="MyThread")
>>> future = executor.submit(some_function, arg1, arg2, kwarg1='kwarg1')
>>> result = await future
__init__(max_workers=None, thread_name_prefix='', initializer=None, initargs=())[source]

Initializes the AsyncThreadPoolExecutor.

Parameters:
  • max_workers (int | None) – The maximum number of workers. Defaults to None.

  • thread_name_prefix (str) – Prefix for thread names. Defaults to ‘’.

  • initializer (Callable[[...], object] | None) – An initializer callable. Defaults to None.

  • initargs (Tuple[Any, ...]) – Arguments for the initializer. Defaults to ().

Return type:

None

Examples

>>> executor = AsyncThreadPoolExecutor(max_workers=10, thread_name_prefix="MyThread")
>>> future = executor.submit(some_function, arg1, arg2)
>>> result = await future
_adjust_thread_count()
async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item.

Parameters:
  • fut (Future) – The future being debugged.

  • fn – The function being executed.

  • *args – Positional arguments for the function.

  • **kwargs – Keyword arguments for the function.

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_initializer_failed()
_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

Returns:

An iterator equivalent to map(func, *iterables), except the calls may be evaluated out-of-order.

Return type:

Iterator

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

In synchronous mode, the function is executed directly in the current thread. In asynchronous mode, the function is submitted to the executor and awaited.

Parameters:
  • fn (Callable[[~P], T]) – The function to run.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Examples

>>> async def example():
...     result = await executor.run(some_function, arg1, arg2, kwarg1=value1)
...     print(result)

See also

  • submit() for submitting functions to the executor.

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T]) – The function to submit.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Return type:

Future[T]

Examples

>>> future = executor.submit(some_function, arg1, arg2, kwarg1=value1)
>>> result = await future
>>> print(result)

See also

  • run() for running functions with the executor.

_broken
_counter = <method-wrapper '__next__' of itertools.count object>
_idle_semaphore
_initargs
_initializer
_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_max_workers: int
_shutdown
_shutdown_lock
_thread_name_prefix
_threads
_work_queue
_workers: str = 'threads'

The type of workers used, set to “threads”.

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

property sync_mode: bool

Indicates if the executor is in synchronous mode (max_workers == 0).

Examples

>>> if executor.sync_mode:
...     print("Executor is in synchronous mode.")
property worker_count_current: int

Returns the current number of workers.

Examples

>>> print(f"Current worker count: {executor.worker_count_current}")
class a_sync.CounterLock

Bases: _DebugDaemonMixin

CounterLock(int start_value: int = 0, unicode name=u'')

An async primitive that uses an internal counter to manage task synchronization.

A coroutine can await counter.wait_for(3) and it will wait until the internal counter >= 3. If some other task executes counter.value = 5 or counter.set(5), the first coroutine will proceed as 5 >= 3.

The internal counter can only be set to a value greater than the current value.

See also

CounterLockCluster for managing multiple CounterLock instances.
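The counter contract described above can be sketched with plain asyncio primitives. This MiniCounterLock is illustrative only, not the library implementation: the real CounterLock is a Cython class whose set() is synchronous and which integrates a debug daemon, while this sketch makes set() a coroutine so it can use asyncio.Condition. It shows the wait_for / set coordination between two tasks.

```python
import asyncio


class MiniCounterLock:
    def __init__(self, start_value: int = 0) -> None:
        self._value = start_value
        self._condition = asyncio.Condition()

    @property
    def value(self) -> int:
        return self._value

    async def set(self, value: int) -> None:
        # The counter may only move forward.
        if value <= self._value:
            raise ValueError("new value must be strictly greater than current value")
        async with self._condition:
            self._value = value
            self._condition.notify_all()

    async def wait_for(self, value: int) -> bool:
        # Block until the internal counter reaches `value`.
        async with self._condition:
            await self._condition.wait_for(lambda: self._value >= value)
        return True


async def demo() -> int:
    counter = MiniCounterLock(start_value=0)

    async def waiter() -> int:
        await counter.wait_for(3)   # proceeds once counter >= 3
        return counter.value

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)          # let the waiter start blocking
    await counter.set(5)            # 5 >= 3, so the waiter wakes up
    return await task


print(asyncio.run(demo()))  # prints 5
```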

__init__()

Initializes the CounterLock with a starting value and an optional name.

Parameters:
  • start_value – The initial value of the counter.

  • name – An optional name for the counter, used in debug logs.

Examples

>>> counter = CounterLock(start_value=0, name="example_counter")
>>> counter.value
0
async _debug_daemon(self) None

Periodically logs debug information about the counter state and waiters.

This method is used internally to provide debugging information when debug logging is enabled.

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

is_ready(self, long long v) bool

A function that indicates whether the current counter value is greater than or equal to a given value.

set(self, long long value) void

Sets the counter to the specified value.

This method internally uses the value property to enforce that the new value must be strictly greater than the current value.

Parameters:

value – The value to set the counter to. Must be strictly greater than the current value.

Raises:

ValueError – If the new value is less than or equal to the current value.

Examples

>>> counter = CounterLock(start_value=0)
>>> counter.set(5)
>>> counter.value
5

See also

CounterLock.value() for direct value assignment.

async wait_for(self, long long value) bint

Waits until the counter reaches or exceeds the specified value.

This method will ensure the debug daemon is running if the counter is not ready.

Parameters:

value – The value to wait for.

Return type:

bint

Examples

>>> counter = CounterLock(start_value=0)
>>> await counter.wait_for(5)  # This will block until counter.value >= 5

See also

CounterLock.set() to set the counter value.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_name

str

Type:

CounterLock._name

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

value

int

Gets the current value of the counter.

Examples

>>> counter = CounterLock(start_value=0)
>>> counter.value
0
Type:

CounterLock.value

a_sync.Event

alias of CythonEvent

class a_sync.PrioritySemaphore

Bases: _AbstractPrioritySemaphore

PrioritySemaphore(int value: int = 1, name: Optional[str] = None, *) -> None

Semaphore that uses numeric priorities for waiters.

This class extends _AbstractPrioritySemaphore and provides a concrete implementation using numeric priorities. The _context_manager_class is set to _PrioritySemaphoreContextManager, and the _top_priority is set to -1, which is the highest priority.

Examples:

The primary way to use this semaphore is by specifying a priority.

>>> priority_semaphore = PrioritySemaphore(10)
>>> async with priority_semaphore[priority]:
...     await do_stuff()

You can also enter and exit this semaphore without specifying a priority, and it will use the top priority by default:

>>> priority_semaphore = PrioritySemaphore(10)
>>> async with priority_semaphore:
...     await do_stuff()
See Also:

_AbstractPrioritySemaphore for the base class implementation.
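The priority-acquisition idea can be sketched with a heap of waiters keyed by numeric priority (lower numbers are woken first, matching the top priority of -1 above). This MiniPrioritySemaphore is illustrative only: the real class adds per-priority context managers, debug daemons, and a lost-waiter recovery path. All names here are hypothetical.

```python
import asyncio
import heapq
import itertools


class MiniPrioritySemaphore:
    def __init__(self, value: int = 1) -> None:
        self._value = value
        self._waiters: list = []
        self._counter = itertools.count()  # tie-breaker for equal priorities

    async def acquire(self, priority: int = -1) -> bool:
        if self._value > 0 and not self._waiters:
            self._value -= 1
            return True
        fut = asyncio.get_running_loop().create_future()
        heapq.heappush(self._waiters, (priority, next(self._counter), fut))
        await fut
        return True

    def release(self) -> None:
        if self._waiters:
            # Hand the slot directly to the most important waiter.
            _, _, fut = heapq.heappop(self._waiters)
            fut.set_result(True)
        else:
            self._value += 1


async def demo() -> list:
    sem = MiniPrioritySemaphore(1)
    order = []

    async def worker(name: str, priority: int) -> None:
        await sem.acquire(priority)
        order.append(name)
        sem.release()

    await sem.acquire()  # hold the only slot so workers must queue
    tasks = [
        asyncio.create_task(worker("low", 10)),
        asyncio.create_task(worker("high", 1)),
    ]
    await asyncio.sleep(0)  # let both workers enqueue
    sem.release()           # priority 1 beats priority 10
    await asyncio.gather(*tasks)
    return order


print(asyncio.run(demo()))  # prints ['high', 'low']
```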

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__getitem__()

Gets the context manager for a given priority.

Parameters:

priority – The priority for which to get the context manager. If None, uses the top priority.

Returns:

The context manager associated with the given priority.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> context_manager = semaphore[priority]
__init__(*args, **kwargs)
async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) None

Wakes up the next waiter in line.

This method handles the waking of waiters based on priority. It includes an emergency procedure to handle potential lost waiters, ensuring that no waiter is left indefinitely waiting.

The emergency procedure is a temporary measure to address potential issues with lost waiters.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore._wake_up_next()
Return type:

None

async acquire(self) Literal[True]

Acquires the semaphore with the top priority.

This method overrides Semaphore.acquire() to handle priority-based logic.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> await semaphore.acquire()
Return type:

Literal[True]

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Checks if the semaphore is locked.

Returns:

True if the semaphore cannot be acquired immediately, False otherwise.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore.locked()
release(self) void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

int

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

a_sync.ProcessPoolExecutor

alias of AsyncProcessPoolExecutor

class a_sync.ProcessingQueue[source]

Bases: _Queue[Tuple[P, asyncio.Future[V]]], Generic[P, V]

A queue designed for processing tasks asynchronously with multiple workers.

Each item in the queue is processed by a worker, and tasks can return results via asynchronous futures. This queue is ideal for scenarios where tasks need to be processed concurrently with a fixed number of workers.

Example

>>> async def process_task(data): return data.upper()
>>> queue = ProcessingQueue(func=process_task, num_workers=5)
>>> fut = await queue.put(item='task')
>>> print(await fut)
TASK
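The pattern behind this class can be sketched in a few lines: each submission pairs its arguments with a future, and a fixed pool of workers pulls pairs off an asyncio.Queue and fulfils the futures. This is illustrative only and does not mirror the library's internals (for instance, the real queue keeps its workers alive and supports return_data=False); the function name mini_processing_queue is hypothetical.

```python
import asyncio


async def mini_processing_queue(func, num_workers, jobs):
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    async def worker() -> None:
        # Each worker repeatedly takes (args, future) pairs and
        # resolves the future with the task's result or exception.
        while True:
            args, fut = await queue.get()
            try:
                fut.set_result(await func(*args))
            except Exception as e:
                fut.set_exception(e)
            queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(num_workers)]
    futures = []
    for job in jobs:
        fut = loop.create_future()
        queue.put_nowait((job, fut))   # submission returns an awaitable future
        futures.append(fut)
    results = await asyncio.gather(*futures)
    for w in workers:
        w.cancel()                     # the real queue keeps workers running
    return results


async def process_task(data: str) -> str:
    return data.upper()


print(asyncio.run(mini_processing_queue(process_task, 2, [("task",), ("work",)])))
# prints ['TASK', 'WORK']
```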
__call__(*args, **kwargs)[source]

Submits a task to the queue.

Example

>>> fut = queue(*args, **kwargs)
>>> print(fut)
Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

__init__(func, num_workers, *, return_data=True, name='', loop=None)[source]

Initializes a processing queue with the given worker function and worker count.

Parameters:
  • func (Callable[[~P], Awaitable[V]]) – The task function to process.

  • num_workers (int) – Number of workers to process tasks.

  • return_data (bool) – Whether tasks should return data via futures. Defaults to True.

  • name (str) – Name of the queue. Defaults to an empty string.

  • loop (AbstractEventLoop | None) – Optional event loop for the queue.

Return type:

None

Example

>>> queue = ProcessingQueue(func=my_task_func, num_workers=3, name='myqueue')
_create_future()[source]

Creates a future for the task.

Return type:

Future[V]

_ensure_workers()[source]

Ensures that the worker tasks are running.

Return type:

None

_format()
_get()
_get_loop()
_init(maxsize)
_put(item)
_wakeup_next(waiters)
close()[source]

Closes the queue, preventing further task submissions.

Example

>>> queue.close()
Return type:

None

empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(*args, **kwargs)[source]

Asynchronously submits a task to the queue.

Parameters:
  • args (~P) – Positional arguments for the task.

  • kwargs (~P) – Keyword arguments for the task.

Returns:

The future result of the task.

Return type:

Future[V]

Example

>>> fut = await queue.put(item='task')
>>> print(await fut)
put_nowait(*args, **kwargs)[source]

Immediately submits a task to the queue without waiting.

Parameters:
  • args (~P) – Positional arguments for the task.

  • kwargs (~P) – Keyword arguments for the task.

Returns:

The future result of the task.

Return type:

Future[V]

Example

>>> fut = queue.put_nowait(item='task')
>>> print(await fut)
qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_closed: bool = False

Indicates whether the queue is closed.

_finished
_getters
_loop = None
_maxsize
_name

Optional name for the queue.

_no_futs

Indicates whether tasks will return data via futures.

_putters
_queue
_unfinished_tasks
_worker_coro
property _workers: Task[NoReturn]

Creates and manages the worker tasks for the queue.

func

The function that each worker will process.

property maxsize

Number of items allowed in the queue.

property name: str

Returns the name of the queue, or its representation.

Example

>>> print(queue.name)
num_workers

The number of worker tasks for processing.

class a_sync.PruningThreadPoolExecutor[source]

Bases: AsyncThreadPoolExecutor

This AsyncThreadPoolExecutor implementation prunes inactive threads after ‘timeout’ seconds without a work item. Pruned threads will be automatically recreated as needed for future workloads. Up to ‘max_threads’ can be active at any one time. The executor ensures that at least one active thread remains to prevent locks.

Note

The _worker function includes a check (len(executor) > 1) to ensure that at least one thread remains active. This prevents the executor from having zero active threads, which could lead to deadlocks.

Examples

>>> executor = PruningThreadPoolExecutor(max_workers=5, timeout=300)
>>> future = executor.submit(some_function, arg1, arg2, kwarg1='kwarg1')
>>> result = await future
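The pruning behaviour can be sketched as a worker loop that blocks on the work queue with a timeout and exits once it has been idle that long, while a guard keeps at least one worker alive (mirroring the len(executor) > 1 check noted above). This is illustrative only, not the library's _worker implementation; all names are hypothetical.

```python
import queue
import threading
import time


def pruning_worker(work_queue, workers, timeout):
    while True:
        try:
            item = work_queue.get(timeout=timeout)
        except queue.Empty:
            with workers["lock"]:
                if workers["count"] > 1:   # never prune the last thread
                    workers["count"] -= 1
                    return                 # idle thread exits; recreated on demand
            continue
        if item is None:
            return
        item()  # run the work item


def demo() -> int:
    work_queue: queue.Queue = queue.Queue()
    workers = {"count": 2, "lock": threading.Lock()}
    threads = [
        threading.Thread(target=pruning_worker, args=(work_queue, workers, 0.05))
        for _ in range(2)
    ]
    for t in threads:
        t.start()
    time.sleep(0.3)        # both go idle; one prunes itself, one survives
    work_queue.put(None)   # shut down the surviving worker
    for t in threads:
        t.join(timeout=1)
    return workers["count"]


print(demo())  # prints 1
```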
__init__(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), timeout=600)[source]

Initializes the PruningThreadPoolExecutor.

Parameters:
  • max_workers – The maximum number of workers. Defaults to None.

  • thread_name_prefix – Prefix for thread names. Defaults to ‘’.

  • initializer – An initializer callable. Defaults to None.

  • initargs – Arguments for the initializer. Defaults to ().

  • timeout – Timeout duration for pruning inactive threads. Defaults to TEN_MINUTES.

Examples

>>> executor = PruningThreadPoolExecutor(max_workers=5, timeout=300)
>>> future = executor.submit(some_function, arg1, arg2)
>>> result = await future
_adjust_thread_count()[source]

Adjusts the number of threads based on workload and idle threads.

See also

  • _worker() for the worker function that handles thread pruning.

async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item.

Parameters:
  • fut (Future) – The future being debugged.

  • fn – The function being executed.

  • *args – Positional arguments for the function.

  • **kwargs – Keyword arguments for the function.

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_initializer_failed()
_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

Returns:

An iterator equivalent to map(func, *iterables), except the calls may be evaluated out-of-order.

Return type:

Iterator

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

In synchronous mode, the function is executed directly in the current thread. In asynchronous mode, the function is submitted to the executor and awaited.

Parameters:
  • fn (Callable[[~P], T]) – The function to run.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Examples

>>> async def example():
...     result = await executor.run(some_function, arg1, arg2, kwarg1=value1)
...     print(result)

See also

  • submit() for submitting functions to the executor.

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T]) – The function to submit.

  • *args (~P) – Positional arguments for the function.

  • **kwargs (~P) – Keyword arguments for the function.

Return type:

Future[T]

Examples

>>> future = executor.submit(some_function, arg1, arg2, kwarg1=value1)
>>> result = await future
>>> print(result)

See also

  • run() for running functions with the executor.

_adjusting_lock

Lock used to adjust the number of threads.

_broken
_counter = <method-wrapper '__next__' of itertools.count object>
_idle_semaphore
_initargs
_initializer
_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_max_workers: int
_shutdown
_shutdown_lock
_thread_name_prefix
_threads
_timeout

Timeout duration for pruning inactive threads.

_work_queue
_workers: str = 'threads'

The type of workers used, set to “threads”.

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

property sync_mode: bool

Indicates if the executor is in synchronous mode (max_workers == 0).

Examples

>>> if executor.sync_mode:
...     print("Executor is in synchronous mode.")
property worker_count_current: int

Returns the current number of workers.

Examples

>>> print(f"Current worker count: {executor.worker_count_current}")
class a_sync.Queue[source]

Bases: _Queue[T]

A generic asynchronous queue that extends the functionality of asyncio.Queue.

This implementation supports retrieving multiple items at once and handling task processing in both FIFO and LIFO order. It provides enhanced type hinting support and additional methods for bulk operations.

Example

>>> queue = Queue()
>>> await queue.put(item='task1')
>>> await queue.put(item='task2')
>>> result = await queue.get()
>>> print(result)
task1
>>> all_tasks = await queue.get_all()
>>> print(all_tasks)
['task2']
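The bulk-retrieval idea behind get_all() can be sketched against a plain asyncio.Queue: wait for the first item, then drain whatever else is immediately available. This helper is illustrative only; the library implements the behaviour on its own Queue class, and the name get_all here is a hypothetical free function.

```python
import asyncio


async def get_all(queue: asyncio.Queue) -> list:
    items = [await queue.get()]       # block until at least one item exists
    while True:
        try:
            items.append(queue.get_nowait())  # drain without waiting
        except asyncio.QueueEmpty:
            break
    return items


async def demo() -> list:
    q: asyncio.Queue = asyncio.Queue()
    for item in ("task1", "task2", "task3"):
        q.put_nowait(item)
    return await get_all(q)


print(asyncio.run(demo()))  # prints ['task1', 'task2', 'task3']
```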
__init__(maxsize=0, *, loop=<object object>)[source]
_format()[source]
_get()[source]
_get_loop()
_init(maxsize)[source]
_put(item)[source]
_wakeup_next(waiters)[source]
empty()[source]

Return True if the queue is empty, False otherwise.

full()[source]

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()[source]

Asynchronously retrieves and removes the next item from the queue.

If the queue is empty, this method will block until an item is available.

Example

>>> result = await queue.get()
>>> print(result)
Return type:

T

async get_all()[source]

Asynchronously retrieves and removes all available items from the queue.

If the queue is empty, this method will wait until at least one item is available before returning.

Example

>>> tasks = await queue.get_all()
>>> print(tasks)
Return type:

List[T]

get_all_nowait()[source]

Retrieves and removes all available items from the queue without waiting.

This method does not wait for items to be available and will raise an exception if the queue is empty.

Raises:

QueueEmpty – If the queue is empty.

Return type:

List[T]

Example

>>> tasks = queue.get_all_nowait()
>>> print(tasks)
async get_multi(i, can_return_less=False)[source]

Asynchronously retrieves up to i items from the queue.

Parameters:
  • i (int) – The number of items to retrieve.

  • can_return_less (bool) – If True, may return fewer than i items if queue is emptied.

Raises:

QueueEmpty – If no items are available and fewer items cannot be returned.

Return type:

List[T]

Example

>>> tasks = await queue.get_multi(i=2, can_return_less=True)
>>> print(tasks)
get_multi_nowait(i, can_return_less=False)[source]

Retrieves up to i items from the queue without waiting.

Parameters:
  • i (int) – The number of items to retrieve.

  • can_return_less (bool) – If True, may return fewer than i items if queue is emptied.

Raises:

QueueEmpty – If no items are available and fewer items cannot be returned.

Return type:

List[T]

Example

>>> tasks = queue.get_multi_nowait(i=3, can_return_less=True)
>>> print(tasks)
get_nowait()[source]

Retrieves and removes the next item from the queue without blocking.

This method does not wait for an item to be available and will raise an exception if the queue is empty.

Raises:

QueueEmpty – If the queue is empty.

Return type:

T

Example

>>> result = queue.get_nowait()
>>> print(result)
async join()[source]

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
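a_sync.Queue inherits these join()/task_done() semantics from asyncio.Queue, so the pattern can be sketched with the stdlib class (a minimal illustration; the worker and values are made up):

```python
import asyncio

async def main():
    queue = asyncio.Queue()  # a_sync.Queue shares these join()/task_done() semantics
    results = []

    async def worker():
        while True:
            item = await queue.get()
            results.append(item * 2)
            queue.task_done()  # one task_done() per get()

    for i in range(3):
        queue.put_nowait(i)  # each put increments the unfinished-task count

    w = asyncio.ensure_future(worker())
    await queue.join()  # unblocks once task_done() was called for every item
    w.cancel()
    return results

print(asyncio.run(main()))  # [0, 2, 4]
```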

async put(item)[source]

Asynchronously adds an item to the queue.

If the queue is full, this method will block until space is available.

Parameters:

item (T) – The item to add to the queue.

Return type:

None

Example

>>> await queue.put(item='task')
put_nowait(item)[source]

Adds an item to the queue without blocking.

This method does not wait for space to be available and will raise an exception if the queue is full.

Parameters:

item (T) – The item to add to the queue.

Raises:

QueueFull – If the queue is full.

Return type:

None

Example

>>> queue.put_nowait(item='task')
qsize()[source]

Number of items in the queue.

task_done()[source]

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_finished
_getters
_loop = None
_maxsize
_putters
_queue
_unfinished_tasks
property maxsize

Number of items allowed in the queue.

class a_sync.Semaphore

Bases: _DebugDaemonMixin

Semaphore(value: int = 1, name=None, loop=None, **kwargs) -> None

A semaphore with additional debugging capabilities inherited from _DebugDaemonMixin.

This semaphore includes debug logging capabilities that are activated when the semaphore has waiters. It allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

You can write this pattern:

```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```

like this:

```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```

See also

_DebugDaemonMixin for more details on debugging capabilities.
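The decorator behaviour can be sketched against the stdlib semaphore (a hypothetical stand-in, not a_sync's implementation): __call__ wraps the coroutine function so every invocation runs inside async with.

```python
import asyncio
import functools

class DecoratorSemaphore(asyncio.Semaphore):
    """Hypothetical sketch of the semaphore-as-decorator mechanism described above."""

    def __call__(self, fn):
        @functools.wraps(fn)
        async def wrapped(*args, **kwargs):
            async with self:  # acquire before the call, release after
                return await fn(*args, **kwargs)
        return wrapped

async def main():
    sem = DecoratorSemaphore(2)

    @sem
    async def limited(i):
        await asyncio.sleep(0)
        return i

    # at most two limited() coroutines hold the semaphore at any moment
    return await asyncio.gather(*(limited(i) for i in range(4)))

print(asyncio.run(main()))  # [0, 1, 2, 3]
```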

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__init__()

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

async _debug_daemon(self) → None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) → 'asyncio.Future[None]'

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_start_debug_daemon(self, *args, **kwargs) → 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) → None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) → void

Wake up the first waiter that isn’t done.

acquire(self)

Acquire the semaphore, ensuring that debug logging is enabled if there are waiters.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

If the semaphore value is zero or less, the debug daemon is started to log the state of the semaphore.

Returns:

True when the semaphore is successfully acquired.

decorate(self, fn: CoroFn[P, T]) → CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) → bool

Returns True if semaphore cannot be acquired immediately.

release(self) → void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

int

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

class a_sync.SmartProcessingQueue[source]

Bases: _VariablePriorityQueueMixin[T], ProcessingQueue[Concatenate[T, P], V]

A processing queue that will execute jobs with the most waiters first, supporting dynamic priorities.

This queue is designed to handle tasks with dynamic priorities, ensuring that tasks with the most waiters are prioritized. It is ideal for scenarios where task execution order is influenced by the number of waiters.

Example

>>> async def process_task(data): return data.upper()
>>> queue = SmartProcessingQueue(func=process_task, num_workers=5)
>>> fut = await queue.put(item='task')
>>> print(await fut)
TASK

See also

ProcessingQueue
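The "most waiters first" ordering can be sketched with a heap keyed on the negated waiter count (a simplified stand-in; the real queue recomputes priorities dynamically as waiters come and go):

```python
import heapq

# Sketch of "most waiters first": heap entries are ordered by negated
# waiter count, so the task with the most waiters is popped first.
heap = []
for name, waiters in [("a", 1), ("b", 5), ("c", 3)]:
    heapq.heappush(heap, (-waiters, name))

order = [heapq.heappop(heap)[1] for _ in range(len(heap))]
print(order)  # ['b', 'c', 'a']
```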

__call__(*args, **kwargs)

Submits a task to the queue.

Example

>>> fut = queue(*args, **kwargs)
>>> print(fut)
Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

__init__(func, num_workers, *, name='', loop=None)[source]

Initializes a smart processing queue with the given worker function.

Parameters:
  • func (Callable[[Concatenate[T, ~P]], Awaitable[V]]) – The worker function.

  • num_workers (int) – Number of worker tasks.

  • name (str) – Optional name for the queue.

  • loop (AbstractEventLoop | None) – Optional event loop.

Return type:

None

Example

>>> queue = SmartProcessingQueue(func=my_task_func, num_workers=3, name='smart_queue')
_create_future(key)[source]

Creates a smart future for the task.

Parameters:

key (Tuple[Tuple[Any], Tuple[Tuple[str, Any]]])

Return type:

Future[V]

_ensure_workers()

Ensures that the worker tasks are running.

Return type:

None

_format()
_get()[source]

Retrieves the task with the highest priority from the queue.

Returns:

The priority, task arguments, keyword arguments, and future of the task.

Example

>>> task = queue._get()
>>> print(task)
_get_key(*args, **kwargs)

Generates a unique key for task identification based on arguments.

Parameters:
  • args – Positional arguments for the task.

  • kwargs – Keyword arguments for the task.

Returns:

The generated key for the task.

Return type:

Tuple[Tuple[Any], Tuple[Tuple[str, Any]]]

Example

>>> key = queue._get_key(*args, **kwargs)
>>> print(key)
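A plausible sketch of such a key, built from the positional arguments and the keyword-argument items (the exact layout used internally may differ):

```python
def get_key(*args, **kwargs):
    # Hypothetical sketch: positional args plus kwargs items form a
    # hashable key matching the Tuple[Tuple[Any], Tuple[Tuple[str, Any]]] shape.
    return (tuple(args), tuple(kwargs.items()))

print(get_key(1, 2, b=3))  # ((1, 2), (('b', 3),))
```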
_get_loop()
_init(maxsize)

Initializes the priority queue.

Example

>>> queue._init(maxsize=10)
_put(item, heappush=<built-in function heappush>)

Adds an item to the priority queue based on its priority.

Example

>>> queue._put(item='task')
_wakeup_next(waiters)
close()

Closes the queue, preventing further task submissions.

Example

>>> queue.close()
Return type:

None

empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(*args, **kwargs)[source]

Asynchronously adds a task with smart future handling to the queue.

Parameters:
  • args (~P) – Positional arguments for the task.

  • kwargs (~P) – Keyword arguments for the task.

Returns:

The future representing the task’s result.

Return type:

SmartFuture[V]

Example

>>> fut = await queue.put(item='task')
>>> print(await fut)
put_nowait(*args, **kwargs)[source]

Immediately adds a task with smart future handling to the queue without waiting.

Parameters:
  • args (~P) – Positional arguments for the task.

  • kwargs (~P) – Keyword arguments for the task.

Returns:

The future representing the task’s result.

Return type:

SmartFuture[V]

Example

>>> fut = queue.put_nowait(item='task')
>>> print(await fut)
qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_closed: bool = False

Indicates whether the queue is closed.

_finished
_futs: weakref.WeakValueDictionary[_smart._Key[T], _smart.SmartFuture[T]]

Weak reference dictionary for managing smart futures.

_getters
_loop = None
_maxsize
_name

Optional name for the queue.

_no_futs = False

Whether smart futures are used.

_putters
_queue
_unfinished_tasks
_worker_coro
property _workers: Task[NoReturn]

Creates and manages the worker tasks for the queue.

func

The function that each worker will process.

property maxsize

Number of items allowed in the queue.

property name: str

Returns the name of the queue, or its representation.

Example

>>> print(queue.name)
num_workers

The number of worker tasks for processing.

class a_sync.TaskMapping[source]

Bases: DefaultDict[K, asyncio.Task[V]], AsyncIterable[Tuple[K, V]]

A mapping of keys to asynchronous tasks with additional functionality.

TaskMapping is a specialized dictionary that maps keys to asyncio Tasks. It provides convenient methods for creating, managing, and iterating over these tasks asynchronously.

Tasks are created automatically for each key using a provided function. You cannot manually set items in a TaskMapping using dictionary-like syntax.

Example

>>> async def fetch_data(url: str) -> str:
...     async with aiohttp.ClientSession() as session:
...         async with session.get(url) as response:
...             return await response.text()
...
>>> tasks = TaskMapping(fetch_data, ['http://example.com', 'https://www.python.org'], name='url_fetcher', concurrency=5)
>>> async for key, result in tasks:
...     print(f"Data for {key}: {result}")
...
Data for http://example.com: ...
Data for https://www.python.org: ...

Note

You cannot manually set items in a TaskMapping using dictionary-like syntax. Tasks are created and managed internally.
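The aiohttp example above requires network access; the iteration pattern TaskMapping provides, one task per key with results collected as a mapping, can be shown with a pure-asyncio stand-in:

```python
import asyncio

async def double(key: int) -> int:
    # Illustrative worker function standing in for a real wrapped_func.
    await asyncio.sleep(0)
    return key * 2

async def main():
    # Plain-dict stand-in for the TaskMapping pattern: one task per key,
    # with results gathered into a key -> result mapping.
    tasks = {k: asyncio.ensure_future(double(k)) for k in [1, 2, 3]}
    return dict(zip(tasks, await asyncio.gather(*tasks.values())))

print(asyncio.run(main()))  # {1: 2, 2: 4, 3: 6}
```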

async __aiter__(pop=False)[source]

Asynchronously iterate through all key-task pairs, yielding the key-result pair as each task completes.

Parameters:

pop (bool)

Return type:

AsyncIterator[Tuple[K, V]]

__getitem__(item)[source]

x.__getitem__(y) <==> x[y]

Parameters:

item (K)

Return type:

Task[V]

__init__(wrapped_func=None, *iterables, name='', concurrency=None, **wrapped_func_kwargs)[source]

Initialize a TaskMapping instance.

Parameters:
  • wrapped_func (Callable[[Concatenate[K, ~P]], Awaitable[V]] | None) – A callable that takes a key and additional parameters and returns an Awaitable.

  • *iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]]) – Any number of iterables whose elements will be used as keys for task generation.

  • name (str) – An optional name for the tasks created by this mapping.

  • concurrency (int | None) – Maximum number of tasks to run concurrently.

  • **wrapped_func_kwargs (~P) – Additional keyword arguments to be passed to wrapped_func.

Return type:

None

Example

async def process_item(item: int) -> int:
    await asyncio.sleep(1)
    return item * 2

task_map = TaskMapping(process_item, [1, 2, 3], concurrency=2)

__iter__()

Implement iter(self).

_if_pop_check_destroyed(pop)[source]
Parameters:

pop (bool)

Return type:

None

async _if_pop_clear(pop)[source]
Parameters:

pop (bool)

Return type:

None

_raise_if_empty(msg='')[source]
Parameters:

msg (str)

Return type:

None

_raise_if_not_empty()[source]
Return type:

None

_start_tasks_for_iterables(*iterables)[source]

Start new tasks for each key in the provided iterables.

Parameters:

iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]])

Return type:

AsyncIterator[Tuple[K, Task[V]]]

_tasks_for_iterables(*iterables)[source]

Ensure tasks are running for each key in the provided iterables.

Parameters:

iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]])

Return type:

AsyncIterator[Tuple[K, Task[V]]]

async _wait_for_next_key()[source]
Return type:

None

clear(cancel=False)[source]

Remove all tasks from the mapping. If cancel is True, any still-pending tasks are cancelled before they are removed.

Parameters:

cancel (bool)

Return type:

None

async close()[source]
Return type:

None

copy() → a shallow copy of D.
fromkeys(value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items() → a set-like object providing a view on D's items[source]
Parameters:

pop (bool)

Return type:

TaskMappingValues[K, V]

keys() → a set-like object providing a view on D's keys[source]
Parameters:

pop (bool)

Return type:

TaskMappingKeys[K, V]

map(*iterables, pop=True, yields='both')[source]

Asynchronously map iterables to tasks and yield their results.

Parameters:
  • *iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]]) – Iterables to map over.

  • pop (bool) – Whether to remove tasks from the internal storage once they are completed.

  • yields (Literal['keys', 'both']) – Whether to yield ‘keys’, ‘values’, or ‘both’ (key-value pairs).

Yields:

Depending on yields, either keys, values, or tuples of key-value pairs representing the results of completed tasks.

Return type:

AsyncIterator[Tuple[K, V]]

Example

async def process_item(item: int) -> int:
    await asyncio.sleep(1)
    return item * 2

task_map = TaskMapping(process_item)
async for key, result in task_map.map([1, 2, 3]):
    print(f"Processed {key}: {result}")

pop(*args, cancel=False)[source]

Pop a task from the TaskMapping.

Parameters:
  • *args (K) – One key to pop.

  • cancel (bool) – Whether to cancel the task when popping it.

Return type:

Task[V] | Future[V]

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) → None.  Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values() → an object providing a view on D's values[source]
Parameters:

pop (bool)

Return type:

TaskMappingValues[K, V]

yield_completed(pop=True)[source]

Asynchronously yield tuples of key-value pairs representing the results of any completed tasks.

Parameters:

pop (bool) – Whether to remove tasks from the internal storage once they are completed.

Yields:

Tuples of key-value pairs representing the results of completed tasks.

Return type:

AsyncIterator[Tuple[K, V]]

Example

async def process_item(item: int) -> int:
    await asyncio.sleep(1)
    return item * 2

task_map = TaskMapping(process_item, [1, 2, 3])
async for key, result in task_map.yield_completed():
    print(f"Completed {key}: {result}")

_destroyed: bool = False

Boolean indicating whether this mapping has been consumed and is no longer usable for aggregations.

property _init_loader: Task[None] | None

An asyncio Task used to preload values from the iterables.

_init_loader_next: Callable[[], Awaitable[Tuple[Tuple[K, Task[V]]]]] | None = None

A coroutine function that blocks until the _init_loader starts one or more new tasks, then returns a Tuple[Tuple[K, asyncio.Task[V]]] containing the new tasks and the keys that started them.

_name: str | None = None

Optional name for tasks created by this mapping.

_next: CythonEvent = None

An asyncio Event that indicates the next result is ready.

property _queue: ProcessingQueue
_wrapped_func

The function used to create tasks for each key.

_wrapped_func_kwargs: Dict[str, Any] = {}

Additional keyword arguments passed to _wrapped_func.

all[source]

Return True if all results from the tasks in the mapping are truthy.
Parameters:

pop (bool)

Return type:

bool

any[source]

Return True if any result from the tasks in the mapping is truthy.
Parameters:

pop (bool)

Return type:

bool

concurrency: int | None = None

The max number of tasks that will run at one time.

default_factory

Factory for default value called by __missing__().

gather[source]

Wait for all tasks to complete and return a dictionary of the results.

Return type:

Dict[K, V]

max[source]

Return the maximum result from the tasks in the mapping.
Parameters:

pop (bool)

Return type:

V

min[source]

Return the minimum result from the tasks in the mapping.

Parameters:

pop (bool)

Return type:

V

sum[source]

Return the sum of the results from the tasks in the mapping.

Parameters:

pop (bool)

Return type:

V

a_sync.ThreadPoolExecutor

alias of AsyncThreadPoolExecutor

class a_sync.ThreadsafeSemaphore

Bases: Semaphore

ThreadsafeSemaphore(value: Optional[int], name: Optional[str] = None) -> None

A semaphore that works in a multi-threaded environment.

This semaphore ensures that the program functions correctly even when used with multiple event loops. It provides a workaround for edge cases involving multiple threads and event loops by using a separate semaphore for each thread.

Example

semaphore = ThreadsafeSemaphore(5)

async def limited():
    async with semaphore:
        return 1

See also

Semaphore for the base class implementation.
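The per-thread strategy can be sketched as follows (a hypothetical stand-in, not the library's implementation): a mapping from thread ident to a thread-specific asyncio.Semaphore, so each event loop waits on its own primitive.

```python
import asyncio
import threading
from collections import defaultdict

class PerThreadSemaphore:
    """Hypothetical stand-in for the per-thread strategy described above:
    each thread (and thus each event loop) gets its own asyncio.Semaphore."""

    def __init__(self, value: int) -> None:
        self._value = value
        self._semaphores = defaultdict(lambda: asyncio.Semaphore(self._value))

    @property
    def semaphore(self) -> asyncio.Semaphore:
        # Keyed by thread ident; deliberately not cached, because the
        # current thread must be checked on every access.
        return self._semaphores[threading.get_ident()]

async def limited(sem: PerThreadSemaphore) -> int:
    async with sem.semaphore:
        return 1

print(asyncio.run(limited(PerThreadSemaphore(5))))  # 1
```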

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__init__()

Initialize the threadsafe semaphore with a given value and optional name.

Parameters:
  • value – The initial value for the semaphore, should be an integer.

  • name (optional) – An optional name for the semaphore.

async _debug_daemon(self) → None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) → 'asyncio.Future[None]'

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_start_debug_daemon(self, *args, **kwargs) → 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) → None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) → void

Wake up the first waiter that isn’t done.

acquire(self)

Acquire the semaphore, ensuring that debug logging is enabled if there are waiters.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

If the semaphore value is zero or less, the debug daemon is started to log the state of the semaphore.

Returns:

True when the semaphore is successfully acquired.

decorate(self, fn: CoroFn[P, T]) → CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) → bool

Returns True if semaphore cannot be acquired immediately.

release(self) → void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

int

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

semaphore

Semaphore

Returns the appropriate semaphore for the current thread.

NOTE: We can’t cache this property because we need to check the current thread every time we access it.

Example

semaphore = ThreadsafeSemaphore(5)

async def limited():
    async with semaphore.semaphore:
        return 1

Type:

ThreadsafeSemaphore.semaphore

class a_sync.cached_property

Bases: ASyncCachedPropertyDescriptor[I, T]

Descriptor for defining cached properties that can be accessed both synchronously and asynchronously.

__init__(self, _fget: AsyncGetterFunction[I, T], _fset=None, _fdel=None, field_name=None, **modifiers: Unpack[ModifierKwargs]) → None

Initializes the ASyncCachedPropertyDescriptor.

Parameters:
  • _fget (Callable[[I], Awaitable[T]]) – The function to be wrapped.

  • _fset – Optional setter function for the property.

  • _fdel – Optional deleter function for the property.

  • field_name – Optional name for the field. If not provided, the function’s name will be used.

  • **modifiers (Unpack[ModifierKwargs]) – Additional modifier arguments.

Return type:

None

async _all(*instances, concurrency=None, name='', **kwargs)

Check if all results are truthy.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

bool

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method._all([1, 2, 3])
```

async _any(*instances, concurrency=None, name='', **kwargs)

Check if any result is truthy.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

bool

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method._any([-1, 0, 1])
```

_asyncify(self, func: SyncFn[P, T]) → CoroFn[P, T]

Converts a synchronous function to an asynchronous one and applies async modifiers.

Parameters:

func (Callable[[~P], T]) – The synchronous function to be converted.

Returns:

The asynchronous version of the function with applied modifiers.

Return type:

Callable[[~P], Awaitable[T]]

See also

  • ModifierManager.apply_async_modifiers()

_check_method_name(method, method_type)
_check_method_sync(method, method_type)
async _max(*instances, concurrency=None, name='', **kwargs)

Find the maximum result.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._max([3, 1, 2])
```

async _min(*instances, concurrency=None, name='', **kwargs)

Find the minimum result.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._min([3, 1, 2])
```

async _sum(*instances, concurrency=None, name='', **kwargs)

Calculate the sum of results.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to sum.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._sum([1, 2, 3])
```

already_loaded(instance)
del_cache_value(instance)
deleter(method)
async get(self, instance: I, owner: Type[I] | None = None) T

Asynchronously retrieves the property value.

Parameters:
  • instance (I) – The instance from which the property is accessed.

  • owner (Type[I] | None) – The owner class of the property.

Returns:

The property value.

Return type:

T

get_cache(instance)
get_cache_value(instance)
get_instance_state(instance)
get_loader(self, instance: I) Callable[[], T]

Retrieves the loader function for the property.

Parameters:

instance (I) – The instance from which the property is accessed.

Returns:

A callable that loads the property value.

Return type:

Callable[[], T]

get_lock(self, instance: I) 'asyncio.Task[T]'

Retrieves the lock for the property.

Parameters:

instance (I) – The instance from which the property is accessed.

Returns:

An asyncio Task representing the lock.

Return type:

Task[T]
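The task-as-lock idea behind get_lock can be sketched with plain asyncio: the first caller for a key schedules a Task, and every later caller awaits that same Task, so the underlying loader runs exactly once. The names here (get_loader_task, _inflight) are hypothetical illustrations, not part of a_sync's API.

```python
import asyncio

_inflight = {}  # key -> asyncio.Task acting as both lock and result cache

def get_loader_task(key, coro_fn):
    # First caller schedules the load; later callers reuse the same Task,
    # so concurrent awaiters share one in-flight computation.
    task = _inflight.get(key)
    if task is None:
        task = asyncio.ensure_future(coro_fn())
        _inflight[key] = task
    return task

async def main():
    calls = 0

    async def load():
        nonlocal calls
        calls += 1
        await asyncio.sleep(0)
        return "value"

    # Two concurrent awaiters, but the loader body runs exactly once.
    results = await asyncio.gather(
        get_loader_task("k", load), get_loader_task("k", load)
    )
    return calls, results

calls, results = asyncio.run(main())
print(calls, results)
```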

has_cache_value(instance)
map(self, instances: AnyIterable[I], owner: Optional[Type[I]] = None, concurrency: Optional[int] = None, name: str = '') 'TaskMapping[I, T]'

Maps the property across multiple instances.

Parameters:
  • instances (AnyIterable[I]) – An iterable of instances.

  • owner (Optional[Type[I]]) – The owner class of the property.

  • concurrency (Optional[int]) – Optional concurrency limit.

  • name (str) – Optional name for the task mapping.

Returns:

A TaskMapping object.

Return type:

TaskMapping[I, T]
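The behavior of map (fan a property loader out across many instances, optionally capped by a concurrency limit) can be approximated with stdlib asyncio. This is a minimal sketch: map_property is a hypothetical name, and it returns a plain dict rather than the TaskMapping the real method produces.

```python
import asyncio

async def map_property(loader, instances, concurrency=None):
    # Cap concurrency with a semaphore when requested; otherwise run all at once.
    sem = asyncio.Semaphore(concurrency) if concurrency else None

    async def run(instance):
        if sem is None:
            return instance, await loader(instance)
        async with sem:
            return instance, await loader(instance)

    pairs = await asyncio.gather(*(run(i) for i in instances))
    return dict(pairs)

async def main():
    async def square(i):
        await asyncio.sleep(0)
        return i * i

    return await map_property(square, [1, 2, 3], concurrency=2)

mapped = asyncio.run(main())
print(mapped)
```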

not_loaded(instance)
pop_lock(self, instance: I) None

Removes the lock for the property.

Parameters:

instance (I) – The instance from which the property is accessed.

Return type:

None

set_cache_value(instance, value)
setter(method)
property _TaskMapping: Type[TaskMapping]

Helper property that resolves a circular import by deferring the TaskMapping import.

property _await: Callable[[Awaitable[T]], T]

Applies sync modifiers to the _helpers._await function and caches it.

Returns:

The modified _await function.

See also

  • ModifierManager.apply_sync_modifiers()

property all: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]

Create an ASyncFunction that checks if all results are truthy.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method.all([1, 2, 3])
```

property any: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]

Create an ASyncFunction that checks if any result is truthy.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method.any([-1, 0, 1])
```

property default: Literal['sync', 'async', None]

Gets the default execution mode (sync, async, or None) for the function.

Returns:

The default execution mode.

See also

  • ModifierManager.default

field_name

The name of the field the ASyncDescriptor is bound to.

hidden_method_descriptor
hidden_method_name
property max: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the maximum result.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.max([3, 1, 2])
```

property min: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the minimum result.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.min([3, 1, 2])
```

modifiers
property sum: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the sum of results.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.sum([1, 2, 3])
```

wrapped
a_sync.filter

alias of ASyncFilter

a_sync.map

alias of TaskMapping

class a_sync.property

Bases: ASyncPropertyDescriptor[I, T]

Descriptor for defining properties that can be accessed both synchronously and asynchronously.

__init__(self, _fget: AsyncGetterFunction[I, T], field_name: str | None = None, **modifiers: Unpack[ModifierKwargs]) None

Initializes the _ASyncPropertyDescriptorBase.

Parameters:
  • _fget (Callable[[I], Awaitable[T]]) – The function to be wrapped.

  • field_name (str | None) – Optional name for the field. If not provided, the function’s name will be used.

  • **modifiers (Unpack[ModifierKwargs]) – Additional modifier arguments.

Return type:

None

async _all(*instances, concurrency=None, name='', **kwargs)

Check if all results are truthy.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

bool

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method._all([1, 2, 3])
```

async _any(*instances, concurrency=None, name='', **kwargs)

Check if any result is truthy.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

bool

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method._any([-1, 0, 1])
```

_asyncify(self, func: SyncFn[P, T]) CoroFn[P, T]

Converts a synchronous function to an asynchronous one and applies async modifiers.

Parameters:

func (Callable[[~P], T]) – The synchronous function to be converted.

Returns:

The asynchronous version of the function with applied modifiers.

Return type:

Callable[[~P], Awaitable[T]]

See also

  • ModifierManager.apply_async_modifiers()

async _max(*instances, concurrency=None, name='', **kwargs)

Find the maximum result.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._max([3, 1, 2])
```

async _min(*instances, concurrency=None, name='', **kwargs)

Find the minimum result.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to check.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._min([3, 1, 2])
```

async _sum(*instances, concurrency=None, name='', **kwargs)

Calculate the sum of results.

Parameters:
  • *instances (AsyncIterable[I] | Iterable[I]) – Iterable of instances to sum.

  • concurrency (int | None) – Optional maximum number of concurrent tasks.

  • name (str) – Optional name for the task.

  • **kwargs (~P) – Additional keyword arguments.

Return type:

T

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method._sum([1, 2, 3])
```

awaitable_only(instance)
async get(self, instance: I, owner: Type[I] | None = None) T

Asynchronously retrieves the property value.

Parameters:
  • instance (I) – The instance from which the property is accessed.

  • owner (Type[I] | None) – The owner class of the property.

Returns:

The property value.

Return type:

T

get_loader(instance)
map(self, instances: AnyIterable[I], owner: Optional[Type[I]] = None, concurrency: Optional[int] = None, name: str = '') 'TaskMapping[I, T]'

Maps the property across multiple instances.

Parameters:
  • instances (AnyIterable[I]) – An iterable of instances.

  • owner (Optional[Type[I]]) – The owner class of the property.

  • concurrency (Optional[int]) – Optional concurrency limit.

  • name (str) – Optional name for the task mapping.

Returns:

A TaskMapping object.

Return type:

TaskMapping[I, T]

property _TaskMapping: Type[TaskMapping]

Helper property that resolves a circular import by deferring the TaskMapping import.

property _await: Callable[[Awaitable[T]], T]

Applies sync modifiers to the _helpers._await function and caches it.

Returns:

The modified _await function.

See also

  • ModifierManager.apply_sync_modifiers()

property all: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]

Create an ASyncFunction that checks if all results are truthy.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method.all([1, 2, 3])
```

property any: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]

Create an ASyncFunction that checks if any result is truthy.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x > 0

instance = MyClass()
result = await instance.my_method.any([-1, 0, 1])
```

property default: Literal['sync', 'async', None]

Gets the default execution mode (sync, async, or None) for the function.

Returns:

The default execution mode.

See also

  • ModifierManager.default

field_name

The name of the field the ASyncDescriptor is bound to.

hidden_method_descriptor
hidden_method_name
property max: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the maximum result.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.max([3, 1, 2])
```

property min: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the minimum result.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.min([3, 1, 2])
```

modifiers
property sum: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]

Create an ASyncFunction that returns the sum of results.

Returns:

An ASyncFunction object.

Examples

```python
class MyClass:
    @ASyncDescriptor
    def my_method(self, x):
        return x

instance = MyClass()
result = await instance.my_method.sum([1, 2, 3])
```

wrapped
a_sync.sorted

alias of ASyncSorter

a_sync.a_sync(coro_fn=None, default=None, **modifiers)[source]

A versatile decorator that enables both synchronous and asynchronous execution of functions.

This decorator allows a function to be called either synchronously or asynchronously, depending on the context and parameters. It provides a powerful way to write code that can be used in both synchronous and asynchronous environments.

Parameters:
  • coro_fn (Callable[[~P], Awaitable[T]] | Callable[[~P], T] | None) – The function to be decorated. Can be either a coroutine function or a regular function.

  • default (Literal['sync', 'async', None] | None) – Determines the default execution mode. Can be ‘async’, ‘sync’, or None. If None, the mode is inferred from the decorated function type.

  • **modifiers (Unpack[ModifierKwargs]) – Additional keyword arguments to modify the behavior of the decorated function. See ModifierKwargs for available options.

Return type:

ASyncDecorator | ASyncFunction[~P, T]

Modifiers:

The following modifiers can be used to customize the behavior of the decorator:

  • cache_type: Can be None or ‘memory’. ‘memory’ is an LRU cache which can be modified with the ‘cache_typed’, ‘ram_cache_maxsize’, and ‘ram_cache_ttl’ modifiers.

  • cache_typed: Set to True if you want types considered for cache keys. For example, with cache_typed=True, Decimal(0) and 0 will be considered separate keys.

  • ram_cache_maxsize: The max size for your LRU cache. None if the cache is unbounded. If you set this value without specifying a cache type, ‘memory’ will automatically be applied.

  • ram_cache_ttl: The TTL for items in your LRU cache. Defaults to None (items never expire). If you set this value without specifying a cache type, ‘memory’ will automatically be applied.

  • runs_per_minute: Setting this value enables a rate limiter for the decorated function.

  • semaphore: Drop in a Semaphore for your async defined functions.

  • executor: The executor for the synchronous function. Defaults to the library’s config.default_sync_executor.

Examples

The decorator can be used in several ways.

  1. As a simple decorator:
    >>> @a_sync
    ... async def some_async_fn():
    ...     return True
    >>> await some_async_fn()
    True
    >>> some_async_fn(sync=True)
    True
    
    >>> @a_sync
    ... def some_sync_fn():
    ...     return True
    >>> some_sync_fn()
    True
    >>> some_sync_fn(sync=False)
    <coroutine object some_sync_fn at 0x7fb4f5fb49c0>
    
  2. As a decorator with default mode specified:
    >>> @a_sync(default='sync')
    ... async def some_fn():
    ...     return True
    ...
    >>> some_fn()
    True
    >>> some_fn(sync=False)
    <coroutine object some_fn at 0x7fb4f5fb49c0>
    
    >>> @a_sync('async')
    ... def some_fn():
    ...     return True
    ...
    >>> some_fn()
    <coroutine object some_fn at 0x7fb4f5fb49c0>
    >>> some_fn(asynchronous=False)
    True
    
  3. As a decorator with modifiers:
    >>> @a_sync(cache_type='memory', runs_per_minute=60)
    ... async def some_fn():
    ...    return True
    ...
    >>> some_fn(sync=True)
    True
    
  4. Applied directly to a function:
    >>> some_fn = a_sync(some_existing_function, default='sync')
    >>> some_fn()
    "some return value"
    
The decorated function can then be called either synchronously or asynchronously:
>>> result = some_fn()  # Synchronous call
>>> result = await some_fn()  # Asynchronous call
The execution mode can also be explicitly specified during the call:
>>> result = some_fn(sync=True)  # Force synchronous execution
>>> result = await some_fn(sync=False)  # Force asynchronous execution

This decorator is particularly useful for libraries that need to support both synchronous and asynchronous usage, or for gradually migrating synchronous code to asynchronous without breaking existing interfaces.
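The dual-mode dispatch described above can be sketched in a few lines of stdlib Python. This is a deliberately simplified illustration under the assumption that the wrapped function is a coroutine function called from synchronous top-level code; the real decorator also handles plain sync functions, modifiers, and execution inside an already-running event loop. The name a_sync_sketch is hypothetical.

```python
import asyncio
import functools

def a_sync_sketch(default="async"):
    # Hypothetical mini-version of the a_sync decorator for coroutine functions.
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, sync=None, **kwargs):
            coro = fn(*args, **kwargs)
            # Per-call `sync` flag wins; otherwise fall back to the default mode.
            run_sync = sync if sync is not None else default == "sync"
            # Sync mode drives the coroutine to completion; async mode hands
            # the caller the coroutine to await.
            return asyncio.run(coro) if run_sync else coro
        return wrapper
    return decorator

@a_sync_sketch(default="sync")
async def some_fn():
    return True

r1 = some_fn()                         # runs synchronously by default
r2 = asyncio.run(some_fn(sync=False))  # opt back in to async per call
print(r1, r2)
```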

Note

If the coro_fn argument is passed as ‘async’ or ‘sync’, it is treated as the default argument, and coro_fn is set to None.

See also

ASyncFunction, ASyncDecorator

async a_sync.all(*awaitables)[source]

Asynchronously evaluates whether all of the given awaitables evaluate to True.

This function takes multiple awaitable objects and returns True if all of them evaluate to True. It cancels the remaining awaitables once a False result is found.

Parameters:

*awaitables – A variable length list of awaitable objects.

Returns:

True if all elements are truthy or the iterable is empty, False otherwise.

Return type:

bool

Example

>>> async def is_even(x):
...    return x % 2 == 0
...
>>> numbers = [2, 4, 6, 8]
>>> result = await all(*[is_even(x) for x in numbers])
>>> result
True
>>> numbers = [2, 3, 4, 6]
>>> result = await all(*[is_even(x) for x in numbers])
>>> result
False
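The cancel-on-first-False behavior can be approximated with stdlib asyncio, as a rough sketch of the semantics rather than a_sync's actual implementation (a_all is a hypothetical name):

```python
import asyncio

async def a_all(*awaitables):
    # Run all awaitables concurrently; bail out and cancel the rest
    # as soon as any result is falsy.
    tasks = [asyncio.ensure_future(a) for a in awaitables]
    try:
        for fut in asyncio.as_completed(tasks):
            if not await fut:
                return False
        return True
    finally:
        for task in tasks:
            task.cancel()  # no-op for tasks that already finished

async def is_even(x):
    await asyncio.sleep(0)
    return x % 2 == 0

all_even = asyncio.run(a_all(*(is_even(x) for x in [2, 4, 6])))
not_all_even = asyncio.run(a_all(*(is_even(x) for x in [2, 3, 4])))
print(all_even, not_all_even)
```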
async a_sync.any(*awaitables)[source]

Asynchronously evaluates whether any of the given awaitables evaluates to True.

This function returns True if any element in the asynchronous iterable is truthy. It short-circuits on the first truthy value. If the iterable is empty, it returns False.

Parameters:

*awaitables – A variable length list of awaitable objects.

Returns:

True if any element is truthy, False if all are falsy or the iterable is empty.

Return type:

bool

Example

>>> async def is_odd(x):
...     await asyncio.sleep(0.1)  # Simulate some async work
...     return x % 2 != 0
...
>>> numbers = [2, 4, 6, 7]
>>> result = await any(*[is_odd(x) for x in numbers])
>>> result
True
>>> numbers = [2, 4, 6, 8]
>>> result = await any(*[is_odd(x) for x in numbers])
>>> result
False

Note

This function will stop iterating as soon as it encounters a truthy value.

a_sync.as_completed(fs, *, timeout: float | None = None, return_exceptions: bool = False, aiter: bool = False, tqdm: bool = False, **tqdm_kwargs: Any)

Concurrently awaits a list of awaitable objects or mappings of awaitables and returns an iterator of results.

This function extends Python’s asyncio.as_completed(), providing additional features for mixed use cases of individual awaitable objects and mappings of awaitables.

Differences from asyncio.as_completed():

  • Uses type hints for use with static type checkers.

  • Supports either individual awaitables or a k:v mapping of awaitables.

  • Can be used as an async iterator which yields the result values, using ASyncIterator.

  • Provides progress reporting using tqdm if ‘tqdm’ is set to True.

Note

The return_exceptions parameter is used to wrap awaitables with exceptions if set to True, allowing exceptions to be returned as results instead of being raised.

Parameters:
  • fs – The awaitables to await concurrently. It can be a list of individual awaitables or a mapping of awaitables.

  • timeout (float | None) – The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout).

  • return_exceptions (bool) – If True, exceptions are wrapped and returned as results instead of raising them. Defaults to False.

  • aiter (bool) – If True, returns an async iterator of results using ASyncIterator. Defaults to False.

  • tqdm (bool) – If True, enables progress reporting using tqdm. Defaults to False.

  • **tqdm_kwargs (Any) – Additional keyword arguments for tqdm if progress reporting is enabled.

Examples

Awaiting individual awaitables:

>>> awaitables = [async_function1(), async_function2()]
>>> for coro in as_completed(awaitables):
...     val = await coro
...     ...
>>> async for val in as_completed(awaitables, aiter=True):
...     ...

Awaiting mappings of awaitables:

>>> mapping = {'key1': async_function1(), 'key2': async_function2()}
>>> for coro in as_completed(mapping):
...     k, v = await coro
...     ...
>>> async for k, v in as_completed(mapping, aiter=True):
...     ...
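The mapping variant above can be approximated with stdlib asyncio by tagging each awaitable with its key before handing the batch to asyncio.as_completed. This is a sketch of the idea only; as_completed_mapping and _tag are hypothetical names, and the real function also supports timeouts, exception wrapping, and tqdm reporting.

```python
import asyncio

async def _tag(key, awaitable):
    # Attach the mapping key to the awaited value.
    return key, await awaitable

def as_completed_mapping(mapping):
    # Yield futures resolving to (key, value) pairs in completion order.
    return asyncio.as_completed([_tag(k, v) for k, v in mapping.items()])

async def main():
    async def work(x):
        await asyncio.sleep(0.01 * x)
        return x * 10

    results = {}
    for fut in as_completed_mapping({"a": work(2), "b": work(1)}):
        key, value = await fut
        results[key] = value
    return results

results = asyncio.run(main())
print(results)
```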
async a_sync.as_yielded(*iterators)[source]

Merges multiple async iterators into a single async iterator that yields items as they become available from any of the source iterators.

This function is designed to streamline the handling of multiple asynchronous data streams by consolidating them into a single asynchronous iteration context. It enables concurrent fetching and processing of items from multiple sources, improving efficiency and simplifying code structure when dealing with asynchronous operations.

The merging process is facilitated by the exhaust_iterators() function, which concurrently processes the source iterators and places their items into a queue. This mechanism ensures that the merged stream of items is delivered in an order determined by the availability of items from the source iterators, rather than their original sequence.

The function handles exceptions and ensures robustness and reliability by using asyncio tasks and queues. It manages edge cases such as early termination and exception management. The _Done sentinel class is used internally to signal the completion of processing.

Parameters:

*iterators (AsyncIterator[T]) – Variable length list of AsyncIterator objects to be merged.

Return type:

AsyncIterator[T]

Note

This implementation leverages asyncio tasks and queues to efficiently manage the asynchronous iteration and merging process. It handles edge cases such as early termination and exception management, ensuring robustness and reliability. The _Done sentinel class is used internally to signal the completion of processing.

Example

>>> async def example():
...     async for item in as_yielded(iterator1, iterator2):
...         print(item)
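The queue-and-sentinel merging strategy described above can be sketched with stdlib asyncio. This is an illustrative approximation (as_yielded_sketch and _DONE are hypothetical names), omitting the real function's exception propagation and early-termination handling.

```python
import asyncio

_DONE = object()  # sentinel marking one exhausted source iterator

async def as_yielded_sketch(*iterators):
    queue = asyncio.Queue()

    async def drain(iterator):
        # Push every item into the shared queue, then signal exhaustion.
        async for item in iterator:
            await queue.put(item)
        await queue.put(_DONE)

    tasks = [asyncio.create_task(drain(it)) for it in iterators]
    remaining = len(iterators)
    try:
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
    finally:
        for task in tasks:
            task.cancel()

async def gen(values, delay):
    for value in values:
        await asyncio.sleep(delay)
        yield value

async def main():
    return [item async for item in as_yielded_sketch(gen([1, 2], 0.01), gen([10], 0.005))]

merged = asyncio.run(main())
print(sorted(merged))  # arrival order depends on timing, so sort for display
```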

See also

  • exhaust_iterator()

  • exhaust_iterators()

a_sync.create_task(coro: Awaitable[T], *, name: str = '', skip_gc_until_done: bool = False, log_destroy_pending: bool = True) 'asyncio.Task[T]'

Extends asyncio.create_task() to support any Awaitable, manage task lifecycle, and enhance error handling.

This function accepts any Awaitable, ensuring broader compatibility. If the Awaitable is not a coroutine, it is awaited directly using a private helper function __await, which can handle non-coroutine Awaitable objects.

Note

The __await function is designed to handle non-coroutine Awaitables by awaiting them directly.

Parameters:
  • coro (Awaitable[T]) – An Awaitable object from which to create the task.

  • name (str) – Optional name for the task, aiding in debugging.

  • skip_gc_until_done (bool) – If True, the task is kept alive until it completes, preventing garbage collection. Exceptions are wrapped in PersistedTaskException for special handling within the __persisted_task_exc_wrap function.

  • log_destroy_pending (bool) – If False, asyncio’s default error log when a pending task is destroyed is suppressed.

Return type:

asyncio.Task[T]

Examples

Create a simple task with a coroutine:

>>> async def my_coroutine():
...     return "Hello, World!"
>>> task = create_task(my_coroutine())

Create a task with a non-coroutine Awaitable:

>>> from concurrent.futures import Future
>>> future = Future()
>>> task = create_task(future)
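The awaitable-adapter mechanism described above can be sketched with stdlib asyncio: a tiny coroutine wraps any non-coroutine awaitable so it becomes acceptable to asyncio.create_task. smart_create_task and _await here are hypothetical names for illustration, not a_sync's internals.

```python
import asyncio

async def _await(awaitable):
    # Adapt a non-coroutine awaitable (e.g. a Future) into a coroutine.
    return await awaitable

def smart_create_task(awaitable, *, name=None):
    coro = awaitable if asyncio.iscoroutine(awaitable) else _await(awaitable)
    return asyncio.create_task(coro, name=name)

async def main():
    future = asyncio.get_running_loop().create_future()
    # A Future is awaitable but not a coroutine, so plain
    # asyncio.create_task would reject it.
    task = smart_create_task(future, name="wrapped-future")
    future.set_result("Hello, World!")
    return await task

result = asyncio.run(main())
print(result)
```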
async a_sync.gather(*awaitables: Awaitable[T] | Mapping[K, Awaitable[V]], return_exceptions: bool = False, exclude_if: Excluder[T] | None = None, tqdm: bool = False, **tqdm_kwargs: Any) List[T] | Dict[K, V]

Concurrently awaits a list of awaitable objects or a k:v mapping of awaitables, and returns the results.

This function extends Python’s asyncio.gather(), providing additional features for handling either individual awaitable objects or a single mapping of awaitables.

Differences from asyncio.gather():

  • Uses type hints for use with static type checkers.

  • Supports gathering either individual awaitables or a k:v mapping of awaitables.

  • Provides progress reporting using tqdm if ‘tqdm’ is set to True.

  • Allows exclusion of results based on a condition using the ‘exclude_if’ parameter. Note: this is only applied when the input is not a mapping.

Parameters:
  • *awaitables (Awaitable[T] | Mapping[K, Awaitable[V]]) – The awaitables to await concurrently. It can be a list of individual awaitables or a single mapping of awaitables.

  • return_exceptions (bool, optional) – If True, exceptions are returned as results instead of raising them. Defaults to False.

  • exclude_if (Optional[Excluder[T]], optional) – A callable that takes a result and returns True if the result should be excluded from the final output. Defaults to None. Note: This is only applied when the input is not a mapping.

  • tqdm (bool, optional) – If True, enables progress reporting using tqdm. Defaults to False.

  • **tqdm_kwargs – Additional keyword arguments for tqdm if progress reporting is enabled.

Return type:

List[T] | Dict[K, V]

Examples

Awaiting individual awaitables:

>>> results = await gather(thing1(), thing2())
>>> results
['result', 123]

Awaiting a mapping of awaitables:

>>> mapping = {'key1': thing1(), 'key2': thing2()}
>>> results = await gather(mapping)
>>> results
{'key1': 'result', 'key2': 123}
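The mapping form above can be approximated in stdlib asyncio by splitting keys from values, gathering the values concurrently, and zipping the keys back on. A minimal sketch (gather_mapping is a hypothetical name; the real function also handles return_exceptions, exclude_if, and tqdm):

```python
import asyncio

async def gather_mapping(mapping):
    # Await all values concurrently while preserving their keys.
    keys = list(mapping)
    values = await asyncio.gather(*mapping.values())
    return dict(zip(keys, values))

async def main():
    async def double(x):
        await asyncio.sleep(0)
        return x * 2

    return await gather_mapping({"key1": double(1), "key2": double(2)})

gathered = asyncio.run(main())
print(gathered)
```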

See also

asyncio.gather()