a_sync package

Subpackages

Submodules

a_sync.ENVIRONMENT_VARIABLES module

a_sync._smart module

class a_sync._smart.SmartFuture[source]

Bases: _SmartFutureMixin[T], Future

__init__(*, queue, key=None, loop=None)[source]
Parameters:
Return type:

None

__iter__()

Implement iter(self).

__new__(**kwargs)
_make_cancelled_error()

Create the CancelledError to raise if the Future is cancelled.

This should only be called once when handling a cancellation since it erases the context exception value.

_repr_info()
_self_done_cleanup_callback()
Parameters:

self (SmartFuture | SmartTask)

Return type:

None

_waiter_done_cleanup_callback(waiter)

Removes the waiter from _waiters and, if applicable, from _queue._futs.

Parameters:
Return type:

None

add_done_callback()

Add a callback to be run when the future becomes done.

The callback is called with a single argument - the future object. If the future is already done when this is called, the callback is scheduled with call_soon.

cancel(msg=None)

Cancel the future and schedule callbacks.

If the future is already done or cancelled, return False. Otherwise, change the future’s state to cancelled, schedule the callbacks and return True.

cancelled()

Return True if the future was cancelled.

done()

Return True if the future is done.

Done means either that a result / exception are available, or that the future was cancelled.

exception()

Return the exception that was set on this future.

The exception (or None if no exception was set) is returned only if the future is done. If the future has been cancelled, raises CancelledError. If the future isn’t done yet, raises InvalidStateError.

get_loop()

Return the event loop the Future is bound to.

remove_done_callback(fn, /)

Remove all instances of a callback from the “call when done” list.

Returns the number of callbacks removed.

result()

Return the result this future represents.

If the future has been cancelled, raises CancelledError. If the future’s result isn’t yet available, raises InvalidStateError. If the future is done and has an exception set, this exception is raised.

set_exception(exception, /)

Mark the future done and set an exception.

If the future is already done when this method is called, raises InvalidStateError.

set_result(result, /)

Mark the future done and set its result.

If the future is already done when this method is called, raises InvalidStateError.

_asyncio_future_blocking
_callbacks
_cancel_message
_exception
_key: _Key = None
_log_traceback
_loop
_queue: 'SmartProcessingQueue[Any, Any, T]' | None = None
_result
_source_traceback
_state
_waiters: weakref.WeakSet[SmartTask[T]]
property num_waiters: int
class a_sync._smart.SmartTask[source]

Bases: _SmartFutureMixin[T], Task

__init__(coro, *, loop=None, name=None)[source]
Parameters:
  • coro (Awaitable[T])

  • loop (AbstractEventLoop | None)

  • name (str | None)

Return type:

None

__iter__()

Implement iter(self).

__new__(**kwargs)
_make_cancelled_error()

Create the CancelledError to raise if the Task is cancelled.

This should only be called once when handling a cancellation since it erases the context exception value.

_repr_info()
_self_done_cleanup_callback()
Parameters:

self (SmartFuture | SmartTask)

Return type:

None

_waiter_done_cleanup_callback(waiter)

Removes the waiter from _waiters and, if applicable, from _queue._futs.

Parameters:
Return type:

None

add_done_callback()

Add a callback to be run when the future becomes done.

The callback is called with a single argument - the future object. If the future is already done when this is called, the callback is scheduled with call_soon.

cancel(msg=None)

Request that this task cancel itself.

This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop. The coroutine then has a chance to clean up or even deny the request using try/except/finally.

Unlike Future.cancel, this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception.

Immediately after this method is called, Task.cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called).

cancelled()

Return True if the future was cancelled.

done()

Return True if the future is done.

Done means either that a result / exception are available, or that the future was cancelled.

exception()

Return the exception that was set on this future.

The exception (or None if no exception was set) is returned only if the future is done. If the future has been cancelled, raises CancelledError. If the future isn’t done yet, raises InvalidStateError.

get_coro()
get_loop()

Return the event loop the Future is bound to.

get_name()
get_stack(*, limit=None)

Return the list of stack frames for this task’s coroutine.

If the coroutine is not done, this returns the stack where it is suspended. If the coroutine has completed successfully or was cancelled, this returns an empty list. If the coroutine was terminated by an exception, this returns the list of traceback frames.

The frames are always ordered from oldest to newest.

The optional limit gives the maximum number of frames to return; by default all available frames are returned. Its meaning differs depending on whether a stack or a traceback is returned: the newest frames of a stack are returned, but the oldest frames of a traceback are returned. (This matches the behavior of the traceback module.)

For reasons beyond our control, only one stack frame is returned for a suspended coroutine.

print_stack(*, limit=None, file=None)

Print the stack or traceback for this task’s coroutine.

This produces output similar to that of the traceback module, for the frames retrieved by get_stack(). The limit argument is passed to get_stack(). The file argument is an I/O stream to which the output is written; by default output is written to sys.stderr.

remove_done_callback(fn, /)

Remove all instances of a callback from the “call when done” list.

Returns the number of callbacks removed.

result()

Return the result this future represents.

If the future has been cancelled, raises CancelledError. If the future’s result isn’t yet available, raises InvalidStateError. If the future is done and has an exception set, this exception is raised.

set_exception(exception, /)

Mark the future done and set an exception.

If the future is already done when this method is called, raises InvalidStateError.

set_name(value, /)
set_result(result, /)

Mark the future done and set its result.

If the future is already done when this method is called, raises InvalidStateError.

_asyncio_future_blocking
_callbacks
_cancel_message
_coro
_exception
_fut_waiter
_key: _Key
_log_destroy_pending
_log_traceback
_loop
_must_cancel
_queue: 'SmartProcessingQueue[Any, Any, T]' | None = None
_result
_source_traceback
_state
_waiters: Set['asyncio.Task[T]']
property num_waiters: int
a_sync._smart.create_future(*, queue=None, key=None, loop=None)[source]
Parameters:
Return type:

SmartFuture[V]

a_sync._smart.set_smart_task_factory(loop=None)[source]
Parameters:

loop (AbstractEventLoop | None)

Return type:

None

a_sync._smart.shield(arg, *, loop=None)[source]

Wait for a future, shielding it from cancellation.

The statement

res = await shield(something())

is exactly equivalent to the statement

res = await something()

except that if the coroutine containing it is cancelled, the task running in something() is not cancelled. From the POV of something(), the cancellation did not happen. But its caller is still cancelled, so the yield-from expression still raises CancelledError. Note: If something() is cancelled by other means this will still cancel shield().

If you want to completely ignore cancellation (not recommended) you can combine shield() with a try/except clause, as follows:

try:
    res = await shield(something())
except CancelledError:
    res = None

Parameters:
  • arg (Awaitable[T])

  • loop (AbstractEventLoop | None)

Return type:

SmartFuture[T]

a_sync._smart.smart_task_factory(loop, coro)[source]
Parameters:
  • loop (AbstractEventLoop)

  • coro (Awaitable[T])

Return type:

SmartTask[T]
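
A minimal usage sketch for the task factory helpers above, assuming that calling set_smart_task_factory() with no argument installs the factory on the currently running loop and that asyncio.create_task then produces SmartTask objects:

import asyncio

from a_sync._smart import SmartTask, set_smart_task_factory

async def work() -> int:
    await asyncio.sleep(0)
    return 42

async def main() -> None:
    # Install the smart task factory on the running loop (assumed default target).
    set_smart_task_factory()
    task = asyncio.create_task(work())
    assert isinstance(task, SmartTask)  # assumption: the factory applies to create_task
    print(await task)

asyncio.run(main())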

a_sync._typing module

class a_sync._typing.AsyncUnboundMethod[source]

Bases: Protocol[I, P, T]

__init__(*args, **kwargs)
class a_sync._typing.CoroBoundMethod[source]

Bases: Protocol[I, P, T]

__init__(*args, **kwargs)
__call__: Callable[[P], Awaitable[T]] = <method-wrapper '__call__' of _ProtocolMeta object>
class a_sync._typing.ModifierKwargs[source]

Bases: TypedDict

__getitem__()

x.__getitem__(y) <==> x[y]

__init__(*args, **kwargs)
__iter__()

Implement iter(self).

__new__(**kwargs)
clear() → None. Remove all items from D.
copy() → a shallow copy of D
fromkeys(value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items() → a set-like object providing a view on D's items
keys() → a set-like object providing a view on D's keys
pop(k[, d]) → v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) → None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values() → an object providing a view on D's values
cache_type: Literal['memory', None]
cache_typed: bool
default: Literal['sync', 'async', None]
executor: Executor
ram_cache_maxsize: int | None
ram_cache_ttl: int | float | Decimal | None
runs_per_minute: int | None
semaphore: Semaphore | int | None
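
As a rough sketch of how these keys fit together, a ModifierKwargs mapping could be built like this (assuming keys that are not needed may simply be omitted):

from a_sync._typing import ModifierKwargs

modifiers = ModifierKwargs(
    default="async",          # prefer asynchronous execution unless overridden
    cache_type="memory",      # enable the in-memory cache
    ram_cache_maxsize=1_000,  # keep at most 1000 cached entries
    ram_cache_ttl=60,         # expire cached entries after 60 seconds
    runs_per_minute=120,      # rate-limit wrapped calls
)
print(modifiers["default"])
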
class a_sync._typing.SyncBoundMethod[source]

Bases: Protocol[I, P, T]

__init__(*args, **kwargs)
__call__: Callable[[P], T] = <method-wrapper '__call__' of _ProtocolMeta object>
class a_sync._typing.SyncUnboundMethod[source]

Bases: Protocol[I, P, T]

__init__(*args, **kwargs)

a_sync.aliases module

a_sync.exceptions module

exception a_sync.exceptions.ASyncFlagException[source]

Bases: ValueError

__init__(*args, **kwargs)
__new__(**kwargs)
desc(target)[source]
Return type:

str

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
property viable_flags: Set[str]
exception a_sync.exceptions.ASyncRuntimeError[source]

Bases: RuntimeError

__init__(e)[source]
Parameters:

e (RuntimeError)

__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception a_sync.exceptions.EmptySequenceError[source]

Bases: ValueError

__init__(*args, **kwargs)
__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception a_sync.exceptions.FlagNotDefined[source]

Bases: ASyncFlagException

__init__(obj, flag)[source]
Parameters:
__new__(**kwargs)
desc(target)
Return type:

str

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
property viable_flags: Set[str]
exception a_sync.exceptions.FunctionNotAsync[source]

Bases: ImproperFunctionType

__init__(fn)[source]
__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception a_sync.exceptions.FunctionNotSync[source]

Bases: ImproperFunctionType

__init__(fn)[source]
__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception a_sync.exceptions.ImproperFunctionType[source]

Bases: ValueError

__init__(*args, **kwargs)
__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception a_sync.exceptions.InvalidFlag[source]

Bases: ASyncFlagException

__init__(flag)[source]
Parameters:

flag (str | None)

__new__(**kwargs)
desc(target)
Return type:

str

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
property viable_flags: Set[str]
exception a_sync.exceptions.InvalidFlagValue[source]

Bases: ASyncFlagException

__init__(flag, flag_value)[source]
Parameters:
__new__(**kwargs)
desc(target)
Return type:

str

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
property viable_flags: Set[str]
exception a_sync.exceptions.KwargsUnsupportedError[source]

Bases: ValueError

__init__()[source]
__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception a_sync.exceptions.MappingError[source]

Bases: Exception

__init__(mapping, msg='')[source]
Parameters:
__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

_msg: str
args
exception a_sync.exceptions.MappingIsEmptyError[source]

Bases: MappingError

__init__(mapping, msg='')
Parameters:
__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

_msg: str = 'TaskMapping does not contain anything to yield'
args
exception a_sync.exceptions.MappingNotEmptyError[source]

Bases: MappingError

__init__(mapping, msg='')
Parameters:
__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

_msg: str = 'TaskMapping already contains some data. In order to use `map`, you need a fresh one'
args
exception a_sync.exceptions.NoFlagsFound[source]

Bases: ASyncFlagException

__init__(target, kwargs_keys=None)[source]
__new__(**kwargs)
desc(target)
Return type:

str

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
property viable_flags: Set[str]
exception a_sync.exceptions.PersistedTaskException[source]

Bases: Exception

__init__(exc, task)[source]
Parameters:
  • exc (E)

  • task (Task)

Return type:

None

__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception a_sync.exceptions.SyncModeInAsyncContextError[source]

Bases: ASyncRuntimeError

__init__()[source]
__new__(**kwargs)
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
exception a_sync.exceptions.TooManyFlags[source]

Bases: ASyncFlagException

__init__(target, present_flags)[source]
__new__(**kwargs)
desc(target)
Return type:

str

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

args
property viable_flags: Set[str]

a_sync.executor module

With these executors, you can run synchronous functions in your executor with await executor.run(fn, *args).

executor.submit(fn, *args) works the same as the concurrent.futures implementation, but returns an asyncio.Future instead of a concurrent.futures.Future.
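
A short sketch of both call styles, using an AsyncThreadPoolExecutor and an illustrative blocking function (the function name and worker count are just examples):

import asyncio
import time

from a_sync.executor import AsyncThreadPoolExecutor

def blocking_io(seconds: float) -> str:
    time.sleep(seconds)  # an ordinary synchronous function
    return f"slept {seconds}s"

async def main() -> None:
    executor = AsyncThreadPoolExecutor(max_workers=4)
    # Await the sync function directly through the executor.
    print(await executor.run(blocking_io, 0.1))
    # Or submit it and await the asyncio-compatible future it returns.
    fut = executor.submit(blocking_io, 0.2)
    print(await fut)

asyncio.run(main())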

class a_sync.executor.AsyncProcessPoolExecutor[source]

Bases: _AsyncExecutorMixin, ProcessPoolExecutor

__init__(max_workers=None, mp_context=None, initializer=None, initargs=())[source]
Parameters:
  • max_workers (int | None)

  • mp_context (BaseContext | None)

  • initializer (Callable[[...], object] | None)

  • initargs (Tuple[Any, ...])

Return type:

None

_adjust_process_count()
async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item

Parameters:

fut (Future)

Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_launch_processes()
_spawn_process()
_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_start_executor_manager_thread()
_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time.

Returns:

map(func, *iterables) but the calls may be evaluated out-of-order.

Return type:

An iterator equivalent to

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

Oh, and you can also use kwargs!

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

T

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

Future[T]

_broken
_call_queue
_daemon
_initargs
_initializer
_max_workers: int
_mp_context
_pending_work_items
_processes
_queue_count
_queue_management_thread
_queue_management_thread_wakeup
_result_queue
_shutdown_lock
_shutdown_thread
_work_ids
_workers: str = 'processes'
property debug_logs_enabled: bool
property logger: Logger
property sync_mode: bool
property worker_count_current: int
class a_sync.executor.AsyncThreadPoolExecutor[source]

Bases: _AsyncExecutorMixin, ThreadPoolExecutor

__init__(max_workers=None, thread_name_prefix='', initializer=None, initargs=())[source]
Parameters:
Return type:

None

_adjust_thread_count()
async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item

Parameters:

fut (Future)

Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_initializer_failed()
_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

Returns:

map(func, *iterables) but the calls may be evaluated out-of-order.

Return type:

An iterator equivalent to

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

Oh, and you can also use kwargs!

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

T

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

Future[T]

_broken
_counter = <method-wrapper '__next__' of itertools.count object>
_daemon
_idle_semaphore
_initargs
_initializer
_max_workers: int
_shutdown
_shutdown_lock
_thread_name_prefix
_threads
_work_queue
_workers: str = 'threads'
property debug_logs_enabled: bool
property logger: Logger
property sync_mode: bool
property worker_count_current: int
class a_sync.executor.PruningThreadPoolExecutor[source]

Bases: AsyncThreadPoolExecutor

This ThreadPoolExecutor implementation prunes inactive threads after ‘timeout’ seconds without a work item. Pruned threads will be automatically recreated as needed for future workloads. Up to ‘max_threads’ threads can be active at any one time.
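
For example (a sketch; the worker count and timeout are arbitrary), an executor that tears down threads idle for more than 60 seconds:

from a_sync.executor import PruningThreadPoolExecutor

# Threads idle for more than 60 seconds are pruned and re-created on demand,
# with at most 8 threads active at once.
executor = PruningThreadPoolExecutor(max_workers=8, timeout=60)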

__init__(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), timeout=600)[source]
_adjust_thread_count()[source]
async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item

Parameters:

fut (Future)

Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_initializer_failed()
_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

Returns:

map(func, *iterables) but the calls may be evaluated out-of-order.

Return type:

An iterator equivalent to

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

Oh, and you can also use kwargs!

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

T

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

Future[T]

_adjusting_lock
_broken
_counter = <method-wrapper '__next__' of itertools.count object>
_daemon
_idle_semaphore
_initargs
_initializer
_max_workers: int
_shutdown
_shutdown_lock
_thread_name_prefix
_threads
_timeout
_work_queue
_workers: str = 'threads'
property debug_logs_enabled: bool
property logger: Logger
property sync_mode: bool
property worker_count_current: int

a_sync.future module

class a_sync.future.ASyncFuture[source]

Bases: Future, Awaitable[T]

async __add(other)
Return type:

Any

async __await()
Return type:

T

async __contains(item)
Parameters:

item (Any)

Return type:

bool

async __eq(other)
Return type:

bool

async __floordiv(other)
Return type:

Any

async __ge(other)
Return type:

bool

__getitem__(key)[source]
Return type:

Any

async __gt(other)
Return type:

bool

async __iadd(other)
Return type:

Any

async __ifloordiv(other)
Return type:

Any

async __imul(other)
Return type:

Any

__init__(awaitable, dependencies=[])[source]

Initializes the future. Should not be called by clients.

Parameters:
Return type:

None

async __ipow(other)
Return type:

Any

async __isub(other)
Return type:

Any

__iter__()[source]
async __itruediv(other)
Return type:

Any

async __le(other)
Return type:

bool

__list_dependencies(other)
Return type:

List[ASyncFuture]

async __lt(other)
Return type:

bool

async __mul(other)
Return type:

Any

__next__()[source]
async __pow(other)
Return type:

Any

async __radd(other)
Return type:

Any

async __rfloordiv(other)
Return type:

Any

async __rmul(other)
Return type:

Any

async __rpow(other)
Return type:

Any

async __rsub(other)
Return type:

Any

async __rtruediv(other)
Return type:

Any

async __sub(other)
Return type:

Any

async __truediv(other)
Return type:

Any

_invoke_callbacks()
add_done_callback(fn)

Attaches a callable that will be called when the future finishes.

Parameters:

fn – A callable that will be called with this future as its only argument when the future completes or is cancelled. The callable will always be called by a thread in the same process in which it was added. If the future has already completed or been cancelled then the callable will be called immediately. These callables are called in the order that they were added.

cancel()

Cancel the future if possible.

Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed.

cancelled()

Return True if the future was cancelled.

done()

Return True if the future was cancelled or finished executing.

exception(timeout=None)

Return the exception raised by the call that the future represents.

Parameters:

timeout – The number of seconds to wait for the exception if the future isn’t done. If None, then there is no limit on the wait time.

Returns:

The exception raised by the call that the future represents or None if the call completed without raising.

Raises:
  • CancelledError – If the future was cancelled.

  • TimeoutError – If the future didn’t finish executing before the given timeout.

running()

Return True if the future is currently executing.

set_exception(exception)

Sets the result of the future as being the given exception.

Should only be used by Executor implementations and unit tests.

set_result(result)

Sets the return value of work associated with the future.

Should only be used by Executor implementations and unit tests.

set_running_or_notify_cancel()

Mark the future as running or process any cancel notifications.

Should only be used by Executor implementations and unit tests.

If the future has been cancelled (cancel() was called and returned True) then any threads waiting on the future completing (though calls to as_completed() or wait()) are notified and False is returned.

If the future was not cancelled then it is put in the running state (future calls to running() will return True) and True is returned.

This method should be called by Executor implementations before executing the work associated with this future. If this method returns False then the work should not be executed.

Returns:

False if the Future was cancelled, True otherwise.

Raises:

RuntimeError – if this method was already called or if set_result() or set_exception() was called.

__dependants: List[ASyncFuture]
__dependencies
__task
property result: Callable[[], T] | Any

If this future is not done, it will work like cf.Future.result: it will block, await the awaitable, and return the result when ready. If this future is done and the result has a ‘result’ attribute, it will return getattr(future_result, ‘result’). If this future is done and the result does NOT have a ‘result’ attribute, it will again work like cf.Future.result.

a_sync.future.future(callable=None, **kwargs)[source]
Parameters:
Return type:

Callable[[~P], T | ASyncFuture[T]]
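
A hedged sketch of decorator-style usage, based only on the signature above; it assumes that calling a decorated coroutine function from inside a running event loop returns an ASyncFuture that can be awaited (the fetch_price function is purely illustrative):

import asyncio

from a_sync.future import future

@future
async def fetch_price(token: str) -> float:  # illustrative coroutine function
    await asyncio.sleep(0)
    return 1.0

async def main() -> None:
    fut = fetch_price("0xToken")  # assumed to return an ASyncFuture[float]
    print(await fut)

asyncio.run(main())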

a_sync.iter module

class a_sync.iter.ASyncFilter[source]

Bases: _ASyncView[T]

__aiter__()

Returns an async iterator for the wrapped async iterable.

Return type:

AsyncIterator[T]

async __anext__()[source]

Asynchronously fetches the next item from the async iterator.

Return type:

T

__init__(function, iterable)

Initializes the ASyncIterator with an async iterator.

Parameters:
Return type:

None

__iter__()
__next__()

Return the next item from the iterator. When exhausted, raise StopIteration

Return type:

T

async _check(obj)[source]
Parameters:

obj (T)

Return type:

bool

filter(function)
Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool])

Return type:

ASyncFilter[T]

sort(*, key=None, reverse=False)
Parameters:
Return type:

ASyncSorter[T]

classmethod wrap(wrapped)

Class method to wrap either an AsyncIterator or an async generator function.

_materialized

Asynchronously iterates through all contents of Self and returns a list containing the results.

property materialized: List[T]

Iterates through all contents of Self and returns a list containing the results.

class a_sync.iter.ASyncGeneratorFunction[source]

Bases: Generic[P, T]

Description:

Encapsulates an asynchronous generator function, providing a mechanism to use it as an asynchronous iterator with enhanced capabilities. This class wraps an async generator function, allowing it to be called with parameters and return an ASyncIterator object. It is particularly useful for situations where an async generator function needs to be used in a manner that is consistent with both synchronous and asynchronous execution contexts.

The ASyncGeneratorFunction class supports dynamic binding to instances, enabling it to be used as a method on class instances. When accessed as a descriptor, it automatically handles the binding to the instance, thereby allowing the wrapped async generator function to be invoked with instance context (‘self’) automatically provided. This feature is invaluable for designing classes that need to expose asynchronous generators as part of their interface while maintaining the ease of use and calling semantics similar to regular methods.

By providing a unified interface to asynchronous generator functions, this class facilitates the creation of APIs that are flexible and easy to use in a wide range of asynchronous programming scenarios. It abstracts away the complexities involved in managing asynchronous generator lifecycles and invocation semantics, making it easier for developers to integrate asynchronous iteration patterns into their applications.
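
A minimal sketch of wrapping a standalone async generator function; it assumes the returned ASyncIterator can be consumed with async for (and, outside a running event loop, with a plain for loop):

import asyncio

from a_sync.iter import ASyncGeneratorFunction

async def count_to(n: int):
    for i in range(n):
        yield i

counter = ASyncGeneratorFunction(count_to)

async def consume() -> None:
    async for i in counter(3):  # __call__ returns an ASyncIterator[int]
        print(i)

asyncio.run(consume())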

__call__(*args, **kwargs)[source]

Calls the wrapped async generator function with the given arguments and keyword arguments, returning an ASyncIterator.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

ASyncIterator[T]

__cancel_cache_handle(instance)
Parameters:

instance (object)

Return type:

None

__get_cache_handle(instance)
Parameters:

instance (object)

Return type:

TimerHandle

__init__(async_gen_func, instance=None)[source]

Initializes the ASyncGeneratorFunction with the given async generator function and optionally an instance.

Parameters:
Return type:

None

_cache_handle: TimerHandle

An asyncio handle used to pop the bound method from instance.__dict__ 5 minutes after its last use.

field_name

The name of the async generator function.

class a_sync.iter.ASyncIterable[source]

Bases: _AwaitableAsyncIterableMixin[T], Iterable[T]

Description:

A hybrid Iterable/AsyncIterable implementation designed to offer dual compatibility with both synchronous and asynchronous iteration protocols. This class allows objects to be iterated over using either a standard for loop or an async for loop, making it versatile in scenarios where the mode of iteration (synchronous or asynchronous) needs to be flexible or is determined at runtime.

The class achieves this by implementing both __iter__ and __aiter__ methods, enabling it to return appropriate iterator objects that can handle synchronous and asynchronous iteration, respectively. This dual functionality is particularly useful in codebases that are transitioning between synchronous and asynchronous code, or in libraries that aim to support both synchronous and asynchronous usage patterns without requiring the user to manage different types of iterable objects.
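
A sketch of the dual iteration modes described above; the synchronous loop is assumed to work only when no event loop is already running in the current thread:

import asyncio

from a_sync.iter import ASyncIterable

async def numbers():
    for i in range(3):
        yield i

async def consume_async() -> None:
    async for item in ASyncIterable(numbers()):
        print(item)

asyncio.run(consume_async())

# The same wrapper type also supports plain synchronous iteration.
for item in ASyncIterable(numbers()):
    print(item)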

__aiter__()

Returns an async iterator for the wrapped async iterable.

Return type:

AsyncIterator[T]

__init__(async_iterable)[source]

Initializes the ASyncIterable with an async iterable.

Parameters:

async_iterable (AsyncIterable[T])

__iter__()[source]

Returns an iterator for the wrapped async iterable.

Return type:

Iterator[T]

filter(function)
Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool])

Return type:

ASyncFilter[T]

sort(*, key=None, reverse=False)
Parameters:
Return type:

ASyncSorter[T]

classmethod wrap(wrapped)[source]

Class method to wrap an AsyncIterable for backward compatibility.

Parameters:

wrapped (AsyncIterable[T])

Return type:

ASyncIterable[T]

_materialized

Asynchronously iterates through all contents of Self and returns a list containing the results.

property materialized: List[T]

Iterates through all contents of Self and returns a list containing the results.

class a_sync.iter.ASyncIterator[source]

Bases: _AwaitableAsyncIterableMixin[T], Iterator[T]

Description:

A hybrid Iterator/AsyncIterator implementation that bridges the gap between synchronous and asynchronous iteration. This class provides a unified interface for iteration that can seamlessly operate in both synchronous (for loop) and asynchronous (async for loop) contexts. It allows the wrapping of asynchronous iterable objects or async generator functions, making them usable in synchronous code without explicitly managing event loops or asynchronous context switches.

By implementing both __next__ and __anext__ methods, ASyncIterator enables objects to be iterated using standard iteration protocols while internally managing the complexities of asynchronous iteration. This design simplifies the use of asynchronous iterables in environments or frameworks that are not inherently asynchronous, such as standard synchronous functions or older codebases being gradually migrated to asynchronous IO.

This class is particularly useful for library developers seeking to provide a consistent iteration interface across synchronous and asynchronous code, reducing the cognitive load on users and promoting code reusability and simplicity.
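
Since the class also mixes in an awaitable async-iterable base, awaiting an instance is assumed to materialize the remaining items into a list; a small sketch:

import asyncio

from a_sync.iter import ASyncIterator

async def letters():
    for ch in "abc":
        yield ch

async def main() -> None:
    # Awaiting the wrapper is assumed to return the materialized List[str].
    result = await ASyncIterator(letters())
    print(result)

asyncio.run(main())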

__aiter__()

Returns an async iterator for the wrapped async iterable.

Return type:

AsyncIterator[T]

async __anext__()[source]

Asynchronously fetches the next item from the async iterator.

Return type:

T

__init__(async_iterator)[source]

Initializes the ASyncIterator with an async iterator.

Parameters:

async_iterator (AsyncIterator[T])

__iter__()
__next__()[source]

Return the next item from the iterator. When exhausted, raise StopIteration

Return type:

T

filter(function)
Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool])

Return type:

ASyncFilter[T]

sort(*, key=None, reverse=False)
Parameters:
Return type:

ASyncSorter[T]

classmethod wrap(wrapped)[source]

Class method to wrap either an AsyncIterator or an async generator function.

_materialized

Asynchronously iterates through all contents of Self and returns a list containing the results.

property materialized: List[T]

Iterates through all contents of Self and returns a list containing the results.

class a_sync.iter.ASyncSorter[source]

Bases: _ASyncView[T]

__aiter__()[source]

Returns an async iterator for the wrapped async iterable.

Return type:

AsyncIterator[T]

__anext__()[source]

Asynchronously fetches the next item from the async iterator.

Return type:

T

__init__(iterable, *, key=None, reverse=False)[source]

Initializes the ASyncIterator with an async iterator.

Parameters:
Return type:

None

__iter__()
__next__()

Return the next item from the iterator. When exhausted, raise StopIteration

Return type:

T

async __sort(reverse)

This method is internal so the original iterator can only ever be consumed once.

Parameters:

reverse (bool)

Return type:

AsyncIterator[T]

filter(function)
Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool])

Return type:

ASyncFilter[T]

sort(*, key=None, reverse=False)
Parameters:
Return type:

ASyncSorter[T]

classmethod wrap(wrapped)

Class method to wrap either an AsyncIterator or an async generator function.

_consumed: bool = False
_materialized

Asynchronously iterates through all contents of Self and returns a list containing the results.

property materialized: List[T]

Iterates through all contents of Self and returns a list containing the results.

reversed: bool = False

a_sync.task module

class a_sync.task.TaskMapping[source]

Bases: DefaultDict[K, asyncio.Task[V]], AsyncIterable[Tuple[K, V]]

A mapping from keys to asyncio Tasks that asynchronously generates and manages tasks based on input iterables.
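
A brief sketch of the basic pattern, using an illustrative coroutine function; one task is created per key and key-result pairs are yielded as each task finishes:

import asyncio

from a_sync.task import TaskMapping

async def double(x: int) -> int:  # illustrative wrapped_func
    await asyncio.sleep(0)
    return x * 2

async def main() -> None:
    mapping = TaskMapping(double, [1, 2, 3], concurrency=2)
    async for key, value in mapping:
        print(key, value)

asyncio.run(main())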

async __aiter__(pop=False)[source]

Asynchronously iterate through all key-task pairs, yielding the key-result pair as each task completes.

Parameters:

pop (bool)

Return type:

AsyncIterator[Tuple[K, V]]

__cleanup(t)
Parameters:

t (Task[None])

Return type:

None

__getitem__(item)[source]

x.__getitem__(y) <==> x[y]

Parameters:

item (K)

Return type:

Task[V]

__init__(wrapped_func=None, *iterables, name='', concurrency=None, **wrapped_func_kwargs)[source]
Parameters:
  • wrapped_func (Callable[[Concatenate[K, ~P]], Awaitable[V]] | None) – A function that takes a key (and optional parameters) and returns an Awaitable.

  • *iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]]) – Any number of iterables whose elements will be used as keys for task generation.

  • name (str) – An optional name for the tasks created by this mapping.

  • **wrapped_func_kwargs (~P) – Keyword arguments that will be passed to wrapped_func.

  • concurrency (int | None)

Return type:

None

__iter__()

Implement iter(self).

__new__(**kwargs)
_if_pop_check_destroyed(pop)[source]
Parameters:

pop (bool)

Return type:

None

async _if_pop_clear(pop)[source]
Parameters:

pop (bool)

Return type:

None

_raise_if_empty(msg='')[source]
Parameters:

msg (str)

Return type:

None

_raise_if_not_empty()[source]
Return type:

None

_tasks_for_iterables(*iterables)[source]

Ensure tasks are running for each key in the provided iterables.

Parameters:

iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]])

Return type:

AsyncIterator[Tuple[K, Task[V]]]

async _wait_for_next_key()[source]
Return type:

None

clear() → None. Remove all items from D.[source]
Parameters:

cancel (bool)

Return type:

None

async close()[source]
Return type:

None

copy() → a shallow copy of D.
fromkeys(value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items() → a set-like object providing a view on D's items[source]
Parameters:

pop (bool)

Return type:

TaskMappingValues[K, V]

keys() → a set-like object providing a view on D's keys[source]
Parameters:

pop (bool)

Return type:

TaskMappingKeys[K, V]

map(*iterables, pop=True, yields='both')[source]

Asynchronously map iterables to tasks and yield their results.

Args:
  • *iterables: Iterables to map over.

  • pop: Whether to remove tasks from the internal storage once they are completed.

  • yields: Whether to yield ‘keys’, ‘values’, or ‘both’ (key-value pairs).

Yields: Depending on yields, either keys, values, or tuples of key-value pairs representing the results of completed tasks.

Parameters:
Return type:

AsyncIterator[Tuple[K, V]]

pop(k[, d]) → v, remove specified key and return the corresponding value.[source]

If the key is not found, return the default if given; otherwise, raise a KeyError.

Parameters:
  • args (K)

  • cancel (bool)

Return type:

Task[V] | Future[V]

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) → None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values() → an object providing a view on D's values[source]
Parameters:

pop (bool)

Return type:

TaskMappingValues[K, V]

yield_completed(pop=True)[source]

Asynchronously yield tuples of key-value pairs representing the results of any completed tasks.

Parameters:

pop (bool) – Whether to remove tasks from the internal storage once they are completed.

Yields:

Tuples of key-value pairs representing the results of completed tasks.

Return type:

AsyncIterator[Tuple[K, V]]

__init_loader_coro: Awaitable[None] | None = None

An optional asyncio Coroutine to be run by the _init_loader

_destroyed: bool = False

Boolean indicating whether this mapping has been consumed and is no longer usable for aggregations.

property _init_loader: Task[None] | None

An asyncio Task used to preload values from the iterables.

_init_loader_next: Callable[[], Awaitable[Tuple[Tuple[K, Task[V]]]]] | None = None

A coroutine function that blocks until the _init_loader starts one or more new tasks, and then returns a Tuple[Tuple[K, asyncio.Task[V]]] with all of the new tasks and the keys that started them.

_name: str | None = None

Optional name for tasks created by this mapping.

_next: Event = None

An asyncio Event that indicates the next result is ready

property _queue: ProcessingQueue
_wrapped_func

The function used to create tasks for each key.

_wrapped_func_kwargs: Dict[str, Any] = {}

Additional keyword arguments passed to _wrapped_func.

all[source]
Parameters:

pop (bool)

Return type:

bool

any[source]
Parameters:

pop (bool)

Return type:

bool

concurrency: int | None = None

The max number of tasks that will run at one time.

default_factory

Factory for default value called by __missing__().

gather[source]

Wait for all tasks to complete and return a dictionary of the results.

Parameters:
Return type:

Dict[K, V]

max[source]
Parameters:

pop (bool)

Return type:

V

min[source]
Parameters:

pop (bool)

Return type:

V

sum[source]
Parameters:

pop (bool)

Return type:

V

class a_sync.task.TaskMappingItems[source]

Bases: _TaskMappingView[Tuple[K, V], K, V], Generic[K, V]

async __aiter__()[source]
Return type:

AsyncIterator[Tuple[K, V]]

__init__(view, task_mapping, pop=False)
Parameters:
Return type:

None

__iter__()
Return type:

Iterator[T]

async _await()
Return type:

List[T]

_get_from_item(item)
aiterbykeys(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

aiterbyvalues(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

_pop: bool = False
class a_sync.task.TaskMappingKeys[source]

Bases: _TaskMappingView[K, K, V], Generic[K, V]

async __aiter__()[source]
Return type:

AsyncIterator[K]

__init__(view, task_mapping, pop=False)
Parameters:
Return type:

None

__iter__()
Return type:

Iterator[T]

__load_existing()
Return type:

Iterator[K]

async __load_init_loader(yielded)
Parameters:

yielded (Set[K])

Return type:

AsyncIterator[K]

async _await()
Return type:

List[T]

_get_from_item(item)
aiterbykeys(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

aiterbyvalues(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

_pop: bool = False
class a_sync.task.TaskMappingValues[source]

Bases: _TaskMappingView[V, K, V], Generic[K, V]

async __aiter__()[source]
Return type:

AsyncIterator[V]

__init__(view, task_mapping, pop=False)
Parameters:
Return type:

None

__iter__()
Return type:

Iterator[T]

async _await()
Return type:

List[T]

_get_from_item(item)
aiterbykeys(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

aiterbyvalues(reverse=False)
Parameters:

reverse (bool)

Return type:

ASyncIterator[T]

_pop: bool = False
a_sync.task.create_task(coro, *, name=None, skip_gc_until_done=False, log_destroy_pending=True)[source]

Extends asyncio.create_task to support any Awaitable, manage task lifecycle, and enhance error handling.

Unlike asyncio.create_task, which requires a coroutine, this function accepts any Awaitable, ensuring broader compatibility. It optionally prevents the task from being garbage-collected until completion and provides enhanced error management by wrapping exceptions in a custom exception.

Parameters:
  • coro (Awaitable[T]) – An Awaitable object from which to create the task.

  • name (str | None) – Optional name for the task, aiding in debugging.

  • skip_gc_until_done (bool) – If True, the task is kept alive until it completes, preventing garbage collection.

  • log_destroy_pending (bool) – If False, asyncio’s default error log when a pending task is destroyed is suppressed.

Returns:

An asyncio.Task object created from the provided Awaitable.

Return type:

Task[T]
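
A small sketch using only the parameters documented above (the worker coroutine is illustrative):

import asyncio

from a_sync.task import create_task

async def do_work() -> str:  # illustrative coroutine
    await asyncio.sleep(0)
    return "done"

async def main() -> None:
    task = create_task(
        do_work(),
        name="worker",
        skip_gc_until_done=True,    # keep the task alive until it completes
        log_destroy_pending=False,  # suppress the destroyed-pending-task log
    )
    print(await task)

asyncio.run(main())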

Module contents

class a_sync.ASyncCachedPropertyDescriptor[source]

Bases: _ASyncPropertyDescriptorBase[I, T], AsyncCachedPropertyDescriptor

__init__(_fget, _fset=None, _fdel=None, field_name=None, **modifiers)[source]
Parameters:
Return type:

None

async _all(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

bool

async _any(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

bool

_asyncify(func)

Applies only async modifiers.

Parameters:

func (Callable[[~P], T])

Return type:

Callable[[~P], Awaitable[T]]

_check_method_name(method, method_type)
_check_method_sync(method, method_type)
async _max(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

async _min(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

async _sum(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

already_loaded(instance)
del_cache_value(instance)
deleter(method)
async get(instance, owner=None)
Parameters:
  • instance (I)

  • owner (Type[I] | None)

Return type:

T

get_cache(instance)
get_cache_value(instance)
get_instance_state(instance)
get_loader(instance)[source]
Parameters:

instance (I)

Return type:

Callable[[], T]

get_lock(instance)[source]
Parameters:

instance (I)

Return type:

Task[T]

has_cache_value(instance)
map(instances, owner=None, concurrency=None, name='')
Parameters:
Return type:

TaskMapping[I, T]

not_loaded(instance)
pop_lock(instance)[source]
Parameters:

instance (I)

Return type:

None

set_cache_value(instance, value)
setter(method)
property _await: Callable[[Awaitable[T]], T]

Applies only sync modifiers.

property all: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]
property any: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]
property default: Literal['sync', 'async', None]
field_name
hidden_method_descriptor: HiddenMethodDescriptor[T]
hidden_method_name
property max: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
property min: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
modifiers: ModifierManager
property sum: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
wrapped
class a_sync.ASyncGenericSingleton[source]

Bases: ASyncGenericBase

Subclass this class if you want Singleton-esque ASync objects. They work kind of like a typical Singleton would, but you will have two instances instead of one:

  • one sync instance

  • one async instance

class a_sync.ASyncIterable[source]

Bases: _AwaitableAsyncIterableMixin[T], Iterable[T]

Description:

A hybrid Iterable/AsyncIterable implementation designed to offer dual compatibility with both synchronous and asynchronous iteration protocols. This class allows objects to be iterated over using either a standard for loop or an async for loop, making it versatile in scenarios where the mode of iteration (synchronous or asynchronous) needs to be flexible or is determined at runtime.

The class achieves this by implementing both __iter__ and __aiter__ methods, enabling it to return appropriate iterator objects that can handle synchronous and asynchronous iteration, respectively. This dual functionality is particularly useful in codebases that are transitioning between synchronous and asynchronous code, or in libraries that aim to support both synchronous and asynchronous usage patterns without requiring the user to manage different types of iterable objects.

__aiter__()

Returns an async iterator for the wrapped async iterable.

Return type:

AsyncIterator[T]

__init__(async_iterable)[source]

Initializes the ASyncIterable with an async iterable.

Parameters:

async_iterable (AsyncIterable[T])

__iter__()[source]

Returns an iterator for the wrapped async iterable.

Return type:

Iterator[T]

filter(function)
Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool])

Return type:

ASyncFilter[T]

sort(*, key=None, reverse=False)
Parameters:
Return type:

ASyncSorter[T]

classmethod wrap(wrapped)[source]

Class method to wrap an AsyncIterable for backward compatibility.

Parameters:

wrapped (AsyncIterable[T])

Return type:

ASyncIterable[T]

_materialized

Asynchronously iterates through all contents of Self and returns a list containing the results.

property materialized: List[T]

Iterates through all contents of Self and returns a list containing the results.

class a_sync.ASyncIterator[source]

Bases: _AwaitableAsyncIterableMixin[T], Iterator[T]

Description:

A hybrid Iterator/AsyncIterator implementation that bridges the gap between synchronous and asynchronous iteration. This class provides a unified interface for iteration that can seamlessly operate in both synchronous (for loop) and asynchronous (async for loop) contexts. It allows the wrapping of asynchronous iterable objects or async generator functions, making them usable in synchronous code without explicitly managing event loops or asynchronous context switches.

By implementing both __next__ and __anext__ methods, ASyncIterator enables objects to be iterated using standard iteration protocols while internally managing the complexities of asynchronous iteration. This design simplifies the use of asynchronous iterables in environments or frameworks that are not inherently asynchronous, such as standard synchronous functions or older codebases being gradually migrated to asynchronous IO.

This class is particularly useful for library developers seeking to provide a consistent iteration interface across synchronous and asynchronous code, reducing the cognitive load on users and promoting code reusability and simplicity.

__aiter__()

Returns an async iterator for the wrapped async iterable.

Return type:

AsyncIterator[T]

async __anext__()[source]

Asynchronously fetches the next item from the async iterator.

Return type:

T

__init__(async_iterator)[source]

Initializes the ASyncIterator with an async iterator.

Parameters:

async_iterator (AsyncIterator[T])

__iter__()
__next__()[source]

Return the next item from the iterator. When exhausted, raise StopIteration

Return type:

T

filter(function)
Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool])

Return type:

ASyncFilter[T]

sort(*, key=None, reverse=False)
Parameters:
Return type:

ASyncSorter[T]

classmethod wrap(wrapped)[source]

Class method to wrap either an AsyncIterator or an async generator function.

_materialized

Asynchronously iterates through all contents of Self and returns a list containing the results.

property materialized: List[T]

Iterates through all contents of Self and returns a list containing the results.

class a_sync.ASyncPropertyDescriptor[source]

Bases: _ASyncPropertyDescriptorBase[I, T], AsyncPropertyDescriptor

__init__(_fget, field_name=None, **modifiers)
Parameters:
Return type:

None

async _all(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

bool

async _any(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

bool

_asyncify(func)

Applies only async modifiers.

Parameters:

func (Callable[[~P], T])

Return type:

Callable[[~P], Awaitable[T]]

async _max(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

async _min(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

async _sum(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

awaitable_only(instance)
async get(instance, owner=None)
Parameters:
  • instance (I)

  • owner (Type[I] | None)

Return type:

T

get_loader(instance)
map(instances, owner=None, concurrency=None, name='')
Parameters:
Return type:

TaskMapping[I, T]

property _await: Callable[[Awaitable[T]], T]

Applies only sync modifiers.

property all: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]
property any: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]
property default: Literal['sync', 'async', None]
field_name
hidden_method_descriptor: HiddenMethodDescriptor[T]
hidden_method_name
property max: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
property min: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
modifiers: ModifierManager
property sum: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
wrapped
class a_sync.AsyncProcessPoolExecutor[source]

Bases: _AsyncExecutorMixin, ProcessPoolExecutor

__init__(max_workers=None, mp_context=None, initializer=None, initargs=())[source]
Parameters:
  • max_workers (int | None)

  • mp_context (BaseContext | None)

  • initializer (Callable[[...], object] | None)

  • initargs (Tuple[Any, ...])

Return type:

None

_adjust_process_count()
async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item

Parameters:

fut (Future)

Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_launch_processes()
_spawn_process()
_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_start_executor_manager_thread()
_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. If set to one, the items in the list will be sent one at a time.

Returns:

map(func, *iterables) but the calls may be evaluated out-of-order.

Return type:

An iterator equivalent to

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

Oh, and you can also use kwargs!

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

T

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

Future[T]

_broken
_call_queue
_daemon
_initargs
_initializer
_max_workers: int
_mp_context
_pending_work_items
_processes
_queue_count
_queue_management_thread
_queue_management_thread_wakeup
_result_queue
_shutdown_lock
_shutdown_thread
_work_ids
_workers: str = 'processes'
property debug_logs_enabled: bool
property logger: Logger
property sync_mode: bool
property worker_count_current: int
class a_sync.AsyncThreadPoolExecutor[source]

Bases: _AsyncExecutorMixin, ThreadPoolExecutor

__init__(max_workers=None, thread_name_prefix='', initializer=None, initargs=())[source]
Parameters:
Return type:

None

_adjust_thread_count()
async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item

Parameters:

fut (Future)

Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_initializer_failed()
_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

Returns:

An iterator equivalent to map(func, *iterables), but the calls may be evaluated out-of-order.

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

Oh, and you can also use kwargs!

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

T

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

Future[T]

_broken
_counter = <method-wrapper '__next__' of itertools.count object>
_daemon
_idle_semaphore
_initargs
_initializer
_max_workers: int
_shutdown
_shutdown_lock
_thread_name_prefix
_threads
_work_queue
_workers: str = 'threads'
property debug_logs_enabled: bool
property logger: Logger
property sync_mode: bool
property worker_count_current: int
class a_sync.CounterLock[source]

Bases: _DebugDaemonMixin

An asyncio primitive that blocks until the internal counter has reached a specific value.

After counter = CounterLock(), a coroutine can await counter.wait_for(3) and it will block until the internal counter >= 3. If some other task then executes counter.value = 5 or counter.set(5), the first coroutine will unblock because 5 >= 3.

The internal counter can only increase.
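
A minimal sketch of the behavior described above:

```
import asyncio
from a_sync import CounterLock

async def main():
    counter = CounterLock(name="demo")

    async def waiter():
        await counter.wait_for(3)  # blocks until the internal counter >= 3
        print("unblocked at", counter.value)

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter start
    counter.set(5)          # 5 >= 3, so the waiter unblocks
    await task

asyncio.run(main())
```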

__init__(start_value=0, name=None)[source]
Parameters:
  • start_value (int)

  • name (str | None)

async _debug_daemon()[source]
Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

set(value)[source]
Parameters:

value (int)

Return type:

None

async wait_for(value)[source]
Parameters:

value (int)

Return type:

bool

_daemon
_events: DefaultDict[int, Event]
_name
_value
property debug_logs_enabled: bool
is_ready
property logger: Logger
property value: int
class a_sync.Event[source]

Bases: Event, _DebugDaemonMixin

asyncio.Event but with some additional debug logging to help detect deadlocks.
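
A minimal usage sketch; constructor arguments follow the signature below, and the debug daemon only logs while waiters are blocked and debug logging is enabled for this logger:

```
import asyncio
from a_sync import Event

async def main():
    event = Event(name="my-event", debug_daemon_interval=60)

    async def waiter():
        await event.wait()  # blocks until set() is called
        print("event is set:", event.is_set())

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)
    event.set()
    await task

asyncio.run(main())
```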

__init__(name='', debug_daemon_interval=300, *, loop=None)[source]
Parameters:
  • name (str)

  • debug_daemon_interval (int)

  • loop (AbstractEventLoop | None)

async _debug_daemon()[source]
Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

clear()[source]

Reset the internal flag to false. Subsequently, coroutines calling wait() will block until set() is called to set the internal flag to true again.

is_set()[source]

Return True if and only if the internal flag is true.

set()[source]

Set the internal flag to true. All coroutines waiting for it to become true are awakened. Coroutine that call wait() once the flag is true will not block at all.

async wait()[source]

Block until the internal flag is true.

If the internal flag is true on entry, return True immediately. Otherwise, block until another coroutine calls set() to set the flag to true, then return True.

Return type:

Literal[True]

_daemon
_debug_daemon_interval
_loop: AbstractEventLoop = None
_value: bool
_waiters: Deque[Future[None]]
property debug_logs_enabled: bool
property logger: Logger
class a_sync.PrioritySemaphore[source]

Bases: _AbstractPrioritySemaphore[int | float | Decimal, _PrioritySemaphoreContextManager]

_context_manager_class

alias of _PrioritySemaphoreContextManager

__call__(fn)

Call self as a function.

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__getitem__(priority)
Parameters:

priority (PT | None)

Return type:

_AbstractPrioritySemaphoreContextManager[PT]

__init__(value=1, *, name=None)

name is used only in debug logs to provide useful context

Parameters:
  • value (int)

  • name (str | None)

Return type:

None

_count_waiters()
Return type:

Dict[PT, int]

async _debug_daemon()
Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

_wake_up_next()

Wake up the first waiter that isn’t done.

Return type:

None

async acquire()

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)
Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()

Returns True if semaphore cannot be acquired immediately.

Return type:

bool

release()

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_capacity
_context_managers: Dict[PT, _AbstractPrioritySemaphoreContextManager[PT]]
_daemon
_decorated: Set[str]
_loop = None
_potential_lost_waiters: List[Future[None]]
_top_priority = -1

It’s kinda like a regular Semaphore but you must give each waiter a priority:

```
priority_semaphore = PrioritySemaphore(10)

async with priority_semaphore[priority]:
    await do_stuff()
```

You can aenter and aexit this semaphore without a priority and it will process those first. Like so:

```
priority_semaphore = PrioritySemaphore(10)

async with priority_semaphore:
    await do_stuff()
```

_value: int
_waiters: List[_AbstractPrioritySemaphoreContextManager[PT]]
property debug_logs_enabled: bool
property logger: Logger
name: str | None
a_sync.ProcessPoolExecutor

alias of AsyncProcessPoolExecutor

class a_sync.ProcessingQueue[source]

Bases: _Queue[Tuple[P, asyncio.Future[V]]], Generic[P, V]

__call__(*args, **kwargs)[source]

Call self as a function.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

__init__(func, num_workers, *, return_data=True, name='', loop=None)[source]
Parameters:
Return type:

None

async __worker_coro()
Return type:

NoReturn

_create_future()[source]
Return type:

Future[V]

_ensure_workers()[source]
Return type:

None

_format()
_get()
_get_loop()
_init(maxsize)
_put(item)
_wakeup_next(waiters)
close()[source]
Return type:

None

empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(*args, **kwargs)[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

put_nowait(*args, **kwargs)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_closed: bool = False
_finished
_getters
_loop = None
_maxsize
_putters
_unfinished_tasks
_worker_coro
property _workers: Task[NoReturn]
func
property maxsize

Number of items allowed in the queue.

property name: str
num_workers
class a_sync.PruningThreadPoolExecutor[source]

Bases: AsyncThreadPoolExecutor

This ThreadPoolExecutor implementation prunes inactive threads after ‘timeout’ seconds without a work item. Pruned threads will be automatically recreated as needed for future workloads. Up to ‘max_threads’ threads can be active at any one time.
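
A minimal sketch of the pruning executor; blocking_io is a hypothetical helper, and the shortened timeout is only for illustration:

```
import asyncio
import time
from a_sync import PruningThreadPoolExecutor

def blocking_io(seconds):
    # hypothetical blocking helper for illustration
    time.sleep(seconds)
    return seconds

async def main():
    # idle threads are pruned after `timeout` seconds and recreated on demand
    executor = PruningThreadPoolExecutor(max_workers=8, timeout=30)
    results = await asyncio.gather(*(executor.run(blocking_io, 0.1) for _ in range(4)))
    print(results)

asyncio.run(main())
```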

__init__(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), timeout=600)[source]
_adjust_thread_count()[source]
async _debug_daemon(fut, fn, *args, **kwargs)

Runs until manually cancelled by the finished work item

Parameters:

fut (Future)

Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_initializer_failed()
_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

map(fn, *iterables, timeout=None, chunksize=1)

Returns an iterator equivalent to map(fn, iter).

Parameters:
  • fn – A callable that will take as many arguments as there are passed iterables.

  • timeout – The maximum number of seconds to wait. If None, then there is no limit on the wait time.

  • chunksize – The size of the chunks the iterable will be broken into before being passed to a child process. This argument is only used by ProcessPoolExecutor; it is ignored by ThreadPoolExecutor.

Returns:

An iterator equivalent to map(func, *iterables), but the calls may be evaluated out-of-order.

Raises:
  • TimeoutError – If the entire result iterator could not be generated before the given timeout.

  • Exception – If fn(*args) raises for any values.

async run(fn, *args, **kwargs)

A shorthand way to call await asyncio.get_event_loop().run_in_executor(this_executor, fn, *args). Doesn’t await this_executor.run(fn, *args) look so much better?

Oh, and you can also use kwargs!

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

T

shutdown(wait=True, *, cancel_futures=False)

Clean-up the resources associated with the Executor.

It is safe to call this method several times. Otherwise, no other methods can be called after this one.

Parameters:
  • wait – If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed.

  • cancel_futures – If True then shutdown will cancel all pending futures. Futures that are completed or running will not be cancelled.

submit(fn, *args, **kwargs)

Submits a job to the executor and returns an asyncio.Future that can be awaited for the result without blocking.

Parameters:
  • fn (Callable[[~P], T])

  • args (~P)

  • kwargs (~P)

Return type:

Future[T]

_adjusting_lock
_broken
_counter = <method-wrapper '__next__' of itertools.count object>
_daemon
_idle_semaphore
_initargs
_initializer
_max_workers: int
_shutdown
_shutdown_lock
_thread_name_prefix
_threads
_timeout
_work_queue
_workers: str = 'threads'
property debug_logs_enabled: bool
property logger: Logger
property sync_mode: bool
property worker_count_current: int
class a_sync.Queue[source]

Bases: _Queue[T]

__init__(maxsize=0, *, loop=<object object>)[source]
_format()[source]
_get()[source]
_get_loop()
_init(maxsize)[source]
_put(item)[source]
_wakeup_next(waiters)[source]
empty()[source]

Return True if the queue is empty, False otherwise.

full()[source]

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()[source]

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

Return type:

T

async get_all()[source]

Returns 1 or more items.

Return type:

List[T]

get_all_nowait()[source]

Returns 1 or more items, or raises asyncio.QueueEmpty.

Return type:

List[T]

async get_multi(i, can_return_less=False)[source]
Parameters:
Return type:

List[T]

get_multi_nowait(i, can_return_less=False)[source]

Just like asyncio.Queue.get_nowait, but will return i items instead of 1. Set can_return_less to True if you want to receive up to i items.

Parameters:
Return type:

List[T]
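
A minimal sketch of get_multi() and get_multi_nowait():

```
import asyncio
from a_sync import Queue

async def main():
    queue = Queue()
    for i in range(5):
        queue.put_nowait(i)

    first_three = await queue.get_multi(3)                    # waits for exactly 3 items
    rest = queue.get_multi_nowait(10, can_return_less=True)   # takes whatever is left
    print(first_three, rest)  # [0, 1, 2] [3, 4]

asyncio.run(main())
```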

get_nowait()[source]

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

Return type:

T

async join()[source]

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(item)[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Parameters:

item (T)

Return type:

None

put_nowait(item)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Parameters:

item (T)

Return type:

None

qsize()[source]

Number of items in the queue.

task_done()[source]

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_finished
_getters
_loop = None
_maxsize
_putters
_unfinished_tasks
property maxsize

Number of items allowed in the queue.

class a_sync.Semaphore[source]

Bases: Semaphore, _DebugDaemonMixin

__call__(fn)[source]

Call self as a function.

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__init__(value, name=None, **kwargs)[source]

name is used only in debug logs to provide useful context

Parameters:

value (int)

Return type:

None

async _debug_daemon()[source]
Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

_wake_up_next()[source]

Wake up the first waiter that isn’t done.

async acquire()[source]

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)[source]
Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()[source]

Returns True if semaphore cannot be acquired immediately.

release()[source]

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_daemon
_decorated: Set[str]
_loop = None
_value
_waiters
property debug_logs_enabled: bool
property logger: Logger
name
class a_sync.SmartProcessingQueue[source]

Bases: _VariablePriorityQueueMixin[T], ProcessingQueue[Concatenate[T, P], V]

A PriorityProcessingQueue subclass that will execute jobs with the most waiters first
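
A minimal sketch, assuming a hypothetical work coroutine double():

```
import asyncio
from a_sync import SmartProcessingQueue

async def double(x):
    # hypothetical work function for illustration
    await asyncio.sleep(0.1)
    return x * 2

async def main():
    queue = SmartProcessingQueue(double, num_workers=2, name="demo")
    # put() returns a SmartFuture for each job; jobs with the most waiters run first
    futs = [await queue.put(i) for i in range(5)]
    print(await asyncio.gather(*futs))
    queue.close()

asyncio.run(main())
```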

__call__(*args, **kwargs)

Call self as a function.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

__init__(func, num_workers, *, name='', loop=None)[source]
Parameters:
Return type:

None

async __worker_coro()
Return type:

NoReturn

_create_future(key)[source]
Parameters:

key (Tuple[Tuple[Any], Tuple[Tuple[str, Any]]])

Return type:

Future[V]

_ensure_workers()
Return type:

None

_format()
_get()[source]

Resort the heap to consider any changes in priorities and pop the smallest value

_get_key(*args, **kwargs)
Return type:

Tuple[Tuple[Any], Tuple[Tuple[str, Any]]]

_get_loop()
_init(maxsize)
_put(item, heappush=<built-in function heappush>)
_wakeup_next(waiters)
close()
Return type:

None

empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(*args, **kwargs)[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

SmartFuture[V]

put_nowait(*args, **kwargs)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

SmartFuture[V]

qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_closed: bool = False
_finished
_futs: WeakValueDictionary[Tuple[Tuple[Any], Tuple[Tuple[str, Any]]], SmartFuture[T]]
_getters
_loop = None
_maxsize
_no_futs = False
_putters
_unfinished_tasks
_worker_coro
property _workers: Task[NoReturn]
func
property maxsize

Number of items allowed in the queue.

property name: str
num_workers
class a_sync.TaskMapping[source]

Bases: DefaultDict[K, asyncio.Task[V]], AsyncIterable[Tuple[K, V]]

A mapping from keys to asyncio Tasks that asynchronously generates and manages tasks based on input iterables.
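
A minimal sketch, assuming a hypothetical coroutine fetch_square():

```
import asyncio
from a_sync import TaskMapping

async def fetch_square(k):
    # hypothetical work function for illustration
    await asyncio.sleep(0.1)
    return k * k

async def main():
    # one task is created per key as the iterable is consumed
    mapping = TaskMapping(fetch_square, range(5), name="squares", concurrency=2)
    async for key, result in mapping:
        print(key, result)

asyncio.run(main())
```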

async __aiter__(pop=False)[source]

Asynchronously iterate through all key-task pairs, yielding the key-result pair as each task completes.

Parameters:

pop (bool)

Return type:

AsyncIterator[Tuple[K, V]]

__cleanup(t)
Parameters:

t (Task[None])

Return type:

None

__getitem__(item)[source]

x.__getitem__(y) <==> x[y]

Parameters:

item (K)

Return type:

Task[V]

__init__(wrapped_func=None, *iterables, name='', concurrency=None, **wrapped_func_kwargs)[source]
Parameters:
  • wrapped_func (Callable[[Concatenate[K, ~P]], Awaitable[V]] | None) – A function that takes a key (and optional parameters) and returns an Awaitable.

  • *iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]]) – Any number of iterables whose elements will be used as keys for task generation.

  • name (str) – An optional name for the tasks created by this mapping.

  • **wrapped_func_kwargs (~P) – Keyword arguments that will be passed to wrapped_func.

  • concurrency (int | None)

  • **wrapped_func_kwargs

Return type:

None

__iter__()

Implement iter(self).

__new__(**kwargs)
_if_pop_check_destroyed(pop)[source]
Parameters:

pop (bool)

Return type:

None

async _if_pop_clear(pop)[source]
Parameters:

pop (bool)

Return type:

None

_raise_if_empty(msg='')[source]
Parameters:

msg (str)

Return type:

None

_raise_if_not_empty()[source]
Return type:

None

_tasks_for_iterables(*iterables)[source]

Ensure tasks are running for each key in the provided iterables.

Parameters:

iterables (AsyncIterable[K] | Iterable[K] | Awaitable[AsyncIterable[K] | Iterable[K]])

Return type:

AsyncIterator[Tuple[K, Task[V]]]

async _wait_for_next_key()[source]
Return type:

None

clear() → None. Remove all items from D.[source]
Parameters:

cancel (bool)

Return type:

None

async close()[source]
Return type:

None

copy() → a shallow copy of D.
fromkeys(value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items() → a set-like object providing a view on D's items[source]
Parameters:

pop (bool)

Return type:

TaskMappingValues[K, V]

keys() → a set-like object providing a view on D's keys[source]
Parameters:

pop (bool)

Return type:

TaskMappingKeys[K, V]

map(*iterables, pop=True, yields='both')[source]

Asynchronously map iterables to tasks and yield their results.

Args:
  • *iterables: Iterables to map over.

  • pop: Whether to remove tasks from the internal storage once they are completed.

  • yields: Whether to yield ‘keys’, ‘values’, or ‘both’ (key-value pairs).

Yields: Depending on yields, either keys, values, or tuples of key-value pairs representing the results of completed tasks.

Parameters:
Return type:

AsyncIterator[Tuple[K, V]]

pop(k[, d]) → v, remove specified key and return the corresponding value.[source]

If the key is not found, return the default if given; otherwise, raise a KeyError.

Parameters:
  • args (K)

  • cancel (bool)

Return type:

Task[V] | Future[V]

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) → None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values() → an object providing a view on D's values[source]
Parameters:

pop (bool)

Return type:

TaskMappingValues[K, V]

yield_completed(pop=True)[source]

Asynchronously yield tuples of key-value pairs representing the results of any completed tasks.

Parameters:

pop (bool) – Whether to remove tasks from the internal storage once they are completed.

Yields:

Tuples of key-value pairs representing the results of completed tasks.

Return type:

AsyncIterator[Tuple[K, V]]

__init_loader_coro: Awaitable[None] | None = None

An optional asyncio Coroutine to be run by the _init_loader

_destroyed: bool = False

Boolean indicating whether this mapping has been consumed and is no longer usable for aggregations.

property _init_loader: Task[None] | None

An asyncio Task used to preload values from the iterables.

_init_loader_next: Callable[[], Awaitable[Tuple[Tuple[K, Task[V]]]]] | None = None

A coroutine function that blocks until the _init_loader starts one or more new tasks, and then returns a Tuple[Tuple[K, asyncio.Task[V]]] with all of the new tasks and the keys that started them.

_name: str | None = None

Optional name for tasks created by this mapping.

_next: Event = None

An asyncio Event that indicates the next result is ready

property _queue: ProcessingQueue
_wrapped_func

The function used to create tasks for each key.

_wrapped_func_kwargs: Dict[str, Any] = {}

Additional keyword arguments passed to _wrapped_func.

all[source]
Parameters:

pop (bool)

Return type:

bool

any[source]
Parameters:

pop (bool)

Return type:

bool

concurrency: int | None = None

The max number of tasks that will run at one time.

default_factory

Factory for default value called by __missing__().

gather[source]

Wait for all tasks to complete and return a dictionary of the results.

Parameters:
Return type:

Dict[K, V]

max[source]
Parameters:

pop (bool)

Return type:

V

min[source]
Parameters:

pop (bool)

Return type:

V

sum[source]
Parameters:

pop (bool)

Return type:

V

a_sync.ThreadPoolExecutor

alias of AsyncThreadPoolExecutor

class a_sync.ThreadsafeSemaphore[source]

Bases: Semaphore

While it’s a bit weird to run multiple event loops, sometimes either you or a lib you’re using must do so. When used in threaded applications, this semaphore will not work as intended, but at least your program will function. You may need to reduce the semaphore value for multi-threaded applications.

TL;DR: it’s a janky fix for an edge-case problem and will otherwise function as a normal a_sync.Semaphore (which is just an asyncio.Semaphore with extra bells and whistles).

__call__(fn)

Call self as a function.

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__init__(value, name=None)[source]

name is used only in debug logs to provide useful context

Parameters:
  • value (int | None)

  • name (str | None)

Return type:

None

async _debug_daemon()
Return type:

None

_ensure_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)
Return type:

Future[None]

_stop_debug_daemon(t=None)
Parameters:

t (Task | None)

Return type:

None

_wake_up_next()

Wake up the first waiter that isn’t done.

async acquire()

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)
Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()

Returns True if semaphore cannot be acquired immediately.

release()

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_daemon
_decorated: Set[str]
_loop = None
_value
_waiters
property debug_logs_enabled: bool
dummy
property logger: Logger
name
property semaphore: Semaphore
semaphores: DefaultDict[Thread, Semaphore]
property use_dummy: bool
class a_sync.cached_property[source]

Bases: ASyncCachedPropertyDescriptor[I, T]

__init__(_fget, _fset=None, _fdel=None, field_name=None, **modifiers)
Parameters:
Return type:

None

async _all(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

bool

async _any(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

bool

_asyncify(func)

Applies only async modifiers.

Parameters:

func (Callable[[~P], T])

Return type:

Callable[[~P], Awaitable[T]]

_check_method_name(method, method_type)
_check_method_sync(method, method_type)
async _max(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

async _min(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

async _sum(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

already_loaded(instance)
del_cache_value(instance)
deleter(method)
async get(instance, owner=None)
Parameters:
  • instance (I)

  • owner (Type[I] | None)

Return type:

T

get_cache(instance)
get_cache_value(instance)
get_instance_state(instance)
get_loader(instance)
Parameters:

instance (I)

Return type:

Callable[[], T]

get_lock(instance)
Parameters:

instance (I)

Return type:

Task[T]

has_cache_value(instance)
map(instances, owner=None, concurrency=None, name='')
Parameters:
Return type:

TaskMapping[I, T]

not_loaded(instance)
pop_lock(instance)
Parameters:

instance (I)

Return type:

None

set_cache_value(instance, value)
setter(method)
property _await: Callable[[Awaitable[T]], T]

Applies only sync modifiers.

property all: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]
property any: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]
property default: Literal['sync', 'async', None]
field_name
hidden_method_descriptor: HiddenMethodDescriptor[T]
hidden_method_name
property max: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
property min: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
modifiers: ModifierManager
property sum: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
wrapped
a_sync.filter

alias of ASyncFilter

a_sync.map

alias of TaskMapping

class a_sync.property[source]

Bases: ASyncPropertyDescriptor[I, T]

__init__(_fget, field_name=None, **modifiers)
Parameters:
Return type:

None

async _all(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

bool

async _any(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

bool

_asyncify(func)

Applies only async modifiers.

Parameters:

func (Callable[[~P], T])

Return type:

Callable[[~P], Awaitable[T]]

async _max(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

async _min(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

async _sum(*instances, concurrency=None, name='', **kwargs)
Parameters:
Return type:

T

awaitable_only(instance)
async get(instance, owner=None)
Parameters:
  • instance (I)

  • owner (Type[I] | None)

Return type:

T

get_loader(instance)
map(instances, owner=None, concurrency=None, name='')
Parameters:
Return type:

TaskMapping[I, T]

property _await: Callable[[Awaitable[T]], T]

Applies only sync modifiers.

property all: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]
property any: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], bool]
property default: Literal['sync', 'async', None]
field_name
hidden_method_descriptor: HiddenMethodDescriptor[T]
hidden_method_name
property max: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
property min: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
modifiers: ModifierManager
property sum: ASyncFunction[Concatenate[AsyncIterable[I] | Iterable[I], P], T]
wrapped
a_sync.sorted

alias of ASyncSorter

a_sync.a_sync(coro_fn=None, default=None, **modifiers)[source]
Parameters:
Return type:

ASyncDecorator | ASyncFunction[~P, T]

async a_sync.all(*awaitables)[source]
Return type:

bool

async a_sync.any(*awaitables)[source]
Return type:

bool

a_sync.as_completed(fs, *, timeout=None, return_exceptions=False, aiter=False, tqdm=False, **tqdm_kwargs)[source]

Concurrently awaits a list of awaitable objects or mappings of awaitables and returns an iterator of results.

This function extends Python’s asyncio.as_completed, providing additional features for mixed use cases of individual awaitable objects and mappings of awaitables.

Differences from asyncio.as_completed:
  • Uses type hints for use with static type checkers.

  • Supports either individual awaitables or a k:v mapping of awaitables.

  • Can be used as an async iterator which yields the result values (example below).

  • Provides progress reporting using tqdm if ‘tqdm’ is set to True.

Parameters:
  • fs (Iterable[Awaitable[T] or Mapping[K, Awaitable[V]]]) – The awaitables to await concurrently. It can be a list of individual awaitables or a mapping of awaitables.

  • timeout (float, optional) – The maximum time, in seconds, to wait for the completion of awaitables. Defaults to None (no timeout).

  • return_exceptions (bool, optional) – If True, exceptions are returned as results instead of raising them. Defaults to False.

  • aiter (bool, optional) – If True, returns an async iterator of results. Defaults to False.

  • tqdm (bool, optional) – If True, enables progress reporting using tqdm. Defaults to False.

  • **tqdm_kwargs – Additional keyword arguments for tqdm if progress reporting is enabled.

Returns:

An iterator of results when awaiting individual awaitables or an async iterator when awaiting mappings.

Return type:

Iterator[Coroutine[Any, Any, T] or ASyncIterator[Tuple[K, V]]]

Examples

Awaiting individual awaitables:

```
awaitables = [async_function1(), async_function2()]

for coro in as_completed(awaitables):
    val = await coro
    ...

async for val in as_completed(awaitables, aiter=True):
    ...
```

Awaiting mappings of awaitables:

```
mapping = {'key1': async_function1(), 'key2': async_function2()}

for coro in as_completed(mapping):
    k, v = await coro
    ...

async for k, v in as_completed(mapping, aiter=True):
    ...
```

async a_sync.as_yielded(*iterators)[source]
Description:

Merges multiple async iterators into a single async iterator that yields items as they become available from any of the source iterators.

This function is designed to streamline the handling of multiple asynchronous data streams by consolidating them into a single asynchronous iteration context. It enables concurrent fetching and processing of items from multiple sources, improving efficiency and simplifying code structure when dealing with asynchronous operations.

The merging process is facilitated by internally managing a queue where items from the source iterators are placed as they are fetched. This mechanism ensures that the merged stream of items is delivered in an order determined by the availability of items from the source iterators, rather than their original sequence.

Parameters:

*iterators (AsyncIterator[T]) – Variable length list of AsyncIterator objects to be merged.

Returns:

An async iterator that yields items from the input async iterators as they become available.

Return type:

AsyncIterator[T]

Note

This implementation leverages asyncio tasks and queues to efficiently manage the asynchronous iteration and merging process. It handles edge cases such as early termination and exception management, ensuring robustness and reliability.
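
A minimal sketch, assuming as_yielded can be iterated directly as shown and using two illustration-only async generators:

```
import asyncio
from a_sync import as_yielded

async def slow_numbers():
    # hypothetical source for illustration
    for i in range(3):
        await asyncio.sleep(0.2)
        yield i

async def fast_letters():
    # hypothetical source for illustration
    for c in "abc":
        await asyncio.sleep(0.1)
        yield c

async def main():
    # items are yielded as they become available from either source
    async for item in as_yielded(slow_numbers(), fast_letters()):
        print(item)

asyncio.run(main())
```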

a_sync.create_task(coro, *, name=None, skip_gc_until_done=False, log_destroy_pending=True)[source]

Extends asyncio.create_task to support any Awaitable, manage task lifecycle, and enhance error handling.

Unlike asyncio.create_task, which requires a coroutine, this function accepts any Awaitable, ensuring broader compatibility. It optionally prevents the task from being garbage-collected until completion and provides enhanced error management by wrapping exceptions in a custom exception.

Parameters:
  • coro (Awaitable[T]) – An Awaitable object from which to create the task.

  • name (str | None) – Optional name for the task, aiding in debugging.

  • skip_gc_until_done (bool) – If True, the task is kept alive until it completes, preventing garbage collection.

  • log_destroy_pending (bool) – If False, asyncio’s default error log when a pending task is destroyed is suppressed.

Returns:

An asyncio.Task object created from the provided Awaitable.

Return type:

Task[T]
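
A minimal sketch of create_task() with the options described above; do_work is a hypothetical coroutine:

```
import asyncio
from a_sync import create_task

async def do_work():
    # hypothetical coroutine for illustration
    await asyncio.sleep(0.1)
    return 42

async def main():
    # keep the task alive until completion and name it for easier debugging
    task = create_task(do_work(), name="worker", skip_gc_until_done=True)
    print(await task)

asyncio.run(main())
```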

async a_sync.gather(*awaitables, return_exceptions=False, exclude_if=None, tqdm=False, **tqdm_kwargs)[source]

Concurrently awaits a list of awaitable objects or mappings of awaitables and returns the results.

This function extends Python’s asyncio.gather, providing additional features for mixed use cases of individual awaitable objects and mappings of awaitables.

Differences from asyncio.gather:
  • Uses type hints for use with static type checkers.

  • Supports gathering either individual awaitables or a k:v mapping of awaitables.

  • Provides progress reporting using tqdm if ‘tqdm’ is set to True.

Parameters:
  • *awaitables (Awaitable[T] | Mapping[K, Awaitable[V]]) – The awaitables to await concurrently. It can be a single awaitable or a mapping of awaitables.

  • return_exceptions (bool, optional) – If True, exceptions are returned as results instead of raising them. Defaults to False.

  • tqdm (bool, optional) – If True, enables progress reporting using tqdm. Defaults to False.

  • **tqdm_kwargs – Additional keyword arguments for tqdm if progress reporting is enabled.

  • exclude_if (Callable[[T], bool] | None)

Returns:

A list of results when awaiting individual awaitables or a dictionary of results when awaiting mappings.

Return type:

Union[List[T], Dict[K, V]]

Examples

Awaiting individual awaitables: results will be a list containing the result of each awaitable in sequential order.

```
results = await gather(thing1(), thing2())
```

Awaiting mappings of awaitables: results will be a dictionary with ‘key1’ mapped to the result of thing1() and ‘key2’ mapped to the result of thing2().

```
mapping = {'key1': thing1(), 'key2': thing2()}
results = await gather(mapping)
```