a_sync.primitives package

Submodules

a_sync.primitives._debug module

This module provides a mixin class that supplies helpful information to developers during debugging.

The mixin ensures that rich debug logs are automagically emitted from subclass instances whenever debug logging is enabled.

class a_sync.primitives._debug._DebugDaemonMixin[source]

Bases: _LoggerMixin

A mixin class that provides debugging capabilities using a daemon task.

This mixin ensures that rich debug logs are automagically emitted from subclass instances whenever debug logging is enabled.
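
For orientation, here is a minimal sketch of a subclass. The class name, the `_waiters` bookkeeping, and the sleep interval are hypothetical; only `_debug_daemon`, `_ensure_debug_daemon`, and `logger` come from the mixin.

```
import asyncio

from a_sync.primitives._debug import _DebugDaemonMixin


class InstrumentedThing(_DebugDaemonMixin):
    """Hypothetical subclass used only to illustrate the mixin API."""

    def __init__(self) -> None:
        self._waiters = []  # hypothetical bookkeeping for this sketch

    async def _debug_daemon(self) -> None:
        # Periodically emit a debug log while anything is still waiting on us.
        while self._waiters:
            self.logger.debug("%s still has %s waiters", self, len(self._waiters))
            await asyncio.sleep(60)

    async def wait(self) -> None:
        # The mixin only starts the daemon when debug logging is enabled.
        self._ensure_debug_daemon()
        ...
```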

abstract async _debug_daemon(fut, fn, *args, **kwargs)[source]

Abstract method to define the debug daemon’s behavior.

Parameters:
  • fut (Future) – The future associated with the daemon.

  • fn – The function to be debugged.

  • *args – Positional arguments for the function.

  • **kwargs – Keyword arguments for the function.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)[source]

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_start_debug_daemon(*args, **kwargs)[source]

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)[source]

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

_daemon
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

a_sync.primitives._loggable module

This module provides a mixin class to add debug logging capabilities to other classes.

class a_sync.primitives._loggable._LoggerMixin[source]

Bases: object

A mixin class that adds logging capabilities to other classes.

This mixin provides a cached property for accessing a logger instance and a property to check if debug logging is enabled.
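
A minimal sketch of typical use (the class and method here are hypothetical; only `logger` and `debug_logs_enabled` come from the mixin):

```
from a_sync.primitives._loggable import _LoggerMixin


class Worker(_LoggerMixin):
    """Hypothetical class used only to illustrate the mixin API."""

    def process(self, item) -> None:
        if self.debug_logs_enabled:
            # Skip expensive formatting unless debug logging is actually on.
            self.logger.debug("processing %r", item)
        ...  # do the real work
```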

property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

a_sync.primitives.queue module

This module provides various queue implementations for managing asynchronous tasks, including standard FIFO queues, priority queues, and processing queues. These queues support advanced features like waiting for multiple items, handling priority tasks, and processing tasks with multiple workers.
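
For orientation, a rough sketch of the two most common shapes described above: a plain FIFO Queue, and a ProcessingQueue whose workers run a (hypothetical) coroutine and hand back a future for each job.

```
import asyncio

from a_sync.primitives.queue import ProcessingQueue, Queue


async def double(x: int) -> int:
    # Hypothetical work function for the ProcessingQueue.
    await asyncio.sleep(0)
    return x * 2


async def main() -> None:
    # Standard FIFO queue.
    fifo = Queue()
    await fifo.put("hello")
    print(await fifo.get())

    # ProcessingQueue: four workers pull jobs and run `double`;
    # put() returns a future for the eventual result.
    pq = ProcessingQueue(double, num_workers=4)
    fut = await pq.put(21)
    print(await fut)  # 42


asyncio.run(main())
```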

class a_sync.primitives.queue.PriorityProcessingQueue[source]

Bases: _PriorityQueueMixin[T], ProcessingQueue[T, V]

__call__(*args, **kwargs)

Call self as a function.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

__init__(func, num_workers, *, return_data=True, name='', loop=None)
Parameters:
Return type:

None

_create_future()
Return type:

Future[V]

_ensure_workers()
Return type:

None

_format()
_get(heappop=<built-in function heappop>)[source]
_get_loop()
_init(maxsize)
_put(item, heappush=<built-in function heappush>)
_wakeup_next(waiters)
close()
Return type:

None

empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(priority, *args, **kwargs)[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Parameters:
  • priority (Any)

  • args (~P)

  • kwargs (~P)

Return type:

Future[V]
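
For example, since the underlying heap pops the smallest priority first, lower numbers are served sooner. A sketch; the work function is hypothetical:

```
import asyncio

from a_sync.primitives.queue import PriorityProcessingQueue


async def fetch(url: str) -> str:
    # Hypothetical work function.
    await asyncio.sleep(0)
    return f"fetched {url}"


async def main() -> None:
    queue = PriorityProcessingQueue(fetch, num_workers=2)
    low = await queue.put(10, "https://example.com/low")
    high = await queue.put(1, "https://example.com/high")  # smaller number, served first
    print(await high)
    print(await low)


asyncio.run(main())
```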

put_nowait(priority, *args, **kwargs)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Parameters:
  • priority (Any)

  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_closed: bool = False
_finished
_getters
_loop = None
_maxsize
_putters
_unfinished_tasks
_worker_coro
property _workers: Task[NoReturn]
func
property maxsize

Number of items allowed in the queue.

property name: str
num_workers
class a_sync.primitives.queue.ProcessingQueue[source]

Bases: _Queue[Tuple[P, asyncio.Future[V]]], Generic[P, V]

__call__(*args, **kwargs)[source]

Call self as a function.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

__init__(func, num_workers, *, return_data=True, name='', loop=None)[source]
Parameters:
Return type:

None

async __worker_coro()
Return type:

NoReturn

_create_future()[source]
Return type:

Future[V]

_ensure_workers()[source]
Return type:

None

_format()
_get()
_get_loop()
_init(maxsize)
_put(item)
_wakeup_next(waiters)
close()[source]
Return type:

None

empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(*args, **kwargs)[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

put_nowait(*args, **kwargs)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_closed: bool = False
_finished
_getters
_loop = None
_maxsize
_putters
_unfinished_tasks
_worker_coro
property _workers: Task[NoReturn]
func
property maxsize

Number of items allowed in the queue.

property name: str
num_workers
class a_sync.primitives.queue.Queue[source]

Bases: _Queue[T]

__init__(maxsize=0, *, loop=<object object>)[source]
_format()[source]
_get()[source]
_get_loop()
_init(maxsize)[source]
_put(item)[source]
_wakeup_next(waiters)[source]
empty()[source]

Return True if the queue is empty, False otherwise.

full()[source]

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()[source]

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

Return type:

T

async get_all()[source]

Returns 1 or more items.

Return type:

List[T]

get_all_nowait()[source]

Returns 1 or more items, or raises asyncio.QueueEmpty.

Return type:

List[T]

async get_multi(i, can_return_less=False)[source]
Parameters:
  • i (int)

  • can_return_less (bool)

Return type:

List[T]

get_multi_nowait(i, can_return_less=False)[source]

Just like asyncio.Queue.get_nowait, but will return i items instead of 1. Set can_return_less to True if you want to receive up to i items.

Parameters:
  • i (int)

  • can_return_less (bool)

Return type:

List[T]
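
A brief sketch of the batch getters:

```
import asyncio

from a_sync.primitives.queue import Queue


async def main() -> None:
    q = Queue()
    for i in range(5):
        q.put_nowait(i)

    print(q.get_multi_nowait(3))                        # [0, 1, 2]
    print(q.get_multi_nowait(5, can_return_less=True))  # [3, 4]


asyncio.run(main())
```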

get_nowait()[source]

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

Return type:

T

async join()[source]

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(item)[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Parameters:

item (T)

Return type:

None

put_nowait(item)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Parameters:

item (T)

Return type:

None

qsize()[source]

Number of items in the queue.

task_done()[source]

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.
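
The standard asyncio producer/consumer pattern applies unchanged; a brief sketch:

```
import asyncio

from a_sync.primitives.queue import Queue


async def worker(q: Queue) -> None:
    while True:
        item = await q.get()
        try:
            ...  # process `item`
        finally:
            q.task_done()  # pair every get() with a task_done()


async def main() -> None:
    q = Queue()
    workers = [asyncio.create_task(worker(q)) for _ in range(3)]
    for i in range(10):
        q.put_nowait(i)
    await q.join()  # unblocks once every item has been marked done
    for w in workers:
        w.cancel()


asyncio.run(main())
```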

_finished
_getters
_loop = None
_maxsize
_putters
_unfinished_tasks
property maxsize

Number of items allowed in the queue.

class a_sync.primitives.queue.SmartProcessingQueue[source]

Bases: _VariablePriorityQueueMixin[T], ProcessingQueue[Concatenate[T, P], V]

A PriorityProcessingQueue subclass that will execute jobs with the most waiters first
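
A rough usage sketch (the work function is hypothetical): each put() returns a SmartFuture, and jobs whose futures have the most waiters are executed first.

```
import asyncio

from a_sync.primitives.queue import SmartProcessingQueue


async def fetch_block(number: int) -> dict:
    # Hypothetical work function.
    await asyncio.sleep(0)
    return {"number": number}


async def main() -> None:
    queue = SmartProcessingQueue(fetch_block, num_workers=8)
    futs = [await queue.put(n) for n in (1, 2, 3)]
    # Workers pick the job with the most waiters first.
    print(await asyncio.gather(*futs))


asyncio.run(main())
```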

__call__(*args, **kwargs)

Call self as a function.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

__init__(func, num_workers, *, name='', loop=None)[source]
Parameters:
Return type:

None

async __worker_coro()
Return type:

NoReturn

_create_future(key)[source]
Parameters:

key (Tuple[Tuple[Any], Tuple[Tuple[str, Any]]])

Return type:

Future[V]

_ensure_workers()
Return type:

None

_format()
_get()[source]

Resort the heap to consider any changes in priorities and pop the smallest value

_get_key(*args, **kwargs)
Return type:

Tuple[Tuple[Any], Tuple[Tuple[str, Any]]]

_get_loop()
_init(maxsize)
_put(item, heappush=<built-in function heappush>)
_wakeup_next(waiters)
close()
Return type:

None

empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(*args, **kwargs)[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

SmartFuture[V]

put_nowait(*args, **kwargs)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

SmartFuture[V]

qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_closed: bool = False
_finished
_futs: WeakValueDictionary[Tuple[Tuple[Any], Tuple[Tuple[str, Any]]], SmartFuture[T]]
_getters
_loop = None
_maxsize
_no_futs = False
_putters
_unfinished_tasks
_worker_coro
property _workers: Task[NoReturn]
func
property maxsize

Number of items allowed in the queue.

property name: str
num_workers
class a_sync.primitives.queue.VariablePriorityQueue[source]

Bases: _VariablePriorityQueueMixin[T], PriorityQueue

A PriorityQueue subclass that allows priorities to be updated (or computed) on the fly
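
A sketch of the idea, assuming items whose comparison order can change after they are queued (the Job class is hypothetical): because _get() re-heapifies before popping, the updated priority is respected.

```
import asyncio

from a_sync.primitives.queue import VariablePriorityQueue


class Job:
    # Hypothetical item whose priority may change while it sits in the queue.
    def __init__(self, priority: int) -> None:
        self.priority = priority

    def __lt__(self, other: "Job") -> bool:
        return self.priority < other.priority


async def main() -> None:
    q = VariablePriorityQueue()
    a, b = Job(5), Job(1)
    q.put_nowait(a)
    q.put_nowait(b)
    a.priority = 0               # updated after being queued
    print((await q.get()) is a)  # True: the heap is resorted before popping


asyncio.run(main())
```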

__init__(maxsize=0, *, loop=<object object>)
_format()
_get(heapify=<built-in function heapify>, heappop=<built-in function heappop>)

Resort the heap to consider any changes in priorities and pop the smallest value

_get_key(*args, **kwargs)
Return type:

Tuple[Tuple[Any], Tuple[Tuple[str, Any]]]

_get_loop()
_init(maxsize)
_put(item, heappush=<built-in function heappush>)
_wakeup_next(waiters)
empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(item)

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

put_nowait(item)

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_loop = None
property maxsize

Number of items allowed in the queue.

class a_sync.primitives.queue._PriorityQueueMixin[source]

Bases: Generic[T]

_get(heappop=<built-in function heappop>)[source]
_init(maxsize)[source]
_put(item, heappush=<built-in function heappush>)[source]
class a_sync.primitives.queue._Queue[source]

Bases: Queue[T]

__init__(maxsize=0, *, loop=<object object>)
_format()
_get()
_get_loop()
_init(maxsize)
_put(item)
_wakeup_next(waiters)
empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(item)

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

put_nowait(item)

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_finished
_getters
_loop = None
_maxsize
_putters
_unfinished_tasks
property maxsize

Number of items allowed in the queue.

class a_sync.primitives.queue._SmartFutureRef[source]

Bases: ReferenceType, Generic[T]

__call__(*args, **kwargs)

Call self as a function.

__init__(*args, **kwargs)
class a_sync.primitives.queue._VariablePriorityQueueMixin[source]

Bases: _PriorityQueueMixin[T]

_get(heapify=<built-in function heapify>, heappop=<built-in function heappop>)[source]

Resort the heap to consider any changes in priorities and pop the smallest value

_get_key(*args, **kwargs)[source]
Return type:

Tuple[Tuple[Any], Tuple[Tuple[str, Any]]]

_init(maxsize)
_put(item, heappush=<built-in function heappush>)
a_sync.primitives.queue._validate_args(i, can_return_less)[source]

Validates the arguments for methods that retrieve multiple items from the queue.

Parameters:
  • i (int) – The number of items to retrieve.

  • can_return_less (bool) – Whether the method is allowed to return fewer than i items.

Raises:
  • TypeError – If i is not an integer or can_return_less is not a boolean.

  • ValueError – If i is not greater than 1.

Return type:

None
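
Its contract, illustrated:

```
from a_sync.primitives.queue import _validate_args

_validate_args(3, can_return_less=True)  # OK: returns None

for i, can_return_less in ((2.5, True), (3, "yes"), (1, False)):
    try:
        _validate_args(i, can_return_less)
    except (TypeError, ValueError) as exc:
        print(type(exc).__name__, exc)
```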

Module contents

While not the focus of this lib, this module includes some new primitives and some modified versions of standard asyncio primitives.

class a_sync.primitives.CounterLock[source]

Bases: _DebugDaemonMixin

An async primitive that blocks until the internal counter has reached a specific value.

A coroutine can await counter.wait_for(3) and it will block until the internal counter >= 3. If some other task executes counter.value = 5 or counter.set(5), the first coroutine will unblock as 5 >= 3.

The internal counter can only increase.
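
A brief sketch of the behavior described above:

```
import asyncio

from a_sync.primitives import CounterLock


async def main() -> None:
    counter = CounterLock(name="blocks")

    async def waiter() -> None:
        await counter.wait_for(3)  # blocks until the counter reaches 3
        print("unblocked at", counter.value)

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)
    counter.set(5)  # 5 >= 3, so the waiter unblocks
    await task


asyncio.run(main())
```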

__init__(start_value=0, name=None)[source]

Initializes the CounterLock with a starting value and an optional name.

Parameters:
  • start_value (int) – The initial value of the counter.

  • name (optional) – An optional name for the counter, used in debug logs.

async _debug_daemon()[source]

Periodically logs debug information about the counter state and waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

set(value)[source]

Sets the counter to the specified value.

Parameters:

value (int) – The value to set the counter to. Must be >= the current value.

Raises:

ValueError – If the new value is less than the current value.

Return type:

None

async wait_for(value)[source]

Waits until the counter reaches or exceeds the specified value.

Parameters:

value (int) – The value to wait for.

Returns:

True when the counter reaches or exceeds the specified value.

Return type:

bool

_daemon
_events: DefaultDict[int, Event]

A defaultdict that maps each awaited value to an asyncio.Event that manages the waiters for that value.

_name

An optional name for the counter, used in debug logs.

_value

The current value of the counter.

property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

is_ready

A lambda function that indicates whether the counter has already reached or surpassed a given value.

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

property value: int

Gets the current value of the counter.

Returns:

The current value of the counter.

class a_sync.primitives.Event[source]

Bases: Event, _DebugDaemonMixin

An asyncio.Event with additional debug logging to help detect deadlocks.

This event class extends asyncio.Event by adding debug logging capabilities. It logs detailed information about the event state and waiters, which can be useful for diagnosing and debugging potential deadlocks.
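
Usage mirrors asyncio.Event; a brief sketch:

```
import asyncio

from a_sync.primitives import Event


async def main() -> None:
    event = Event(name="data-ready")  # the name appears in debug logs

    async def consumer() -> None:
        await event.wait()  # blocks until set() is called
        print("event is set:", event.is_set())

    task = asyncio.create_task(consumer())
    await asyncio.sleep(0)
    event.set()
    await task


asyncio.run(main())
```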

__init__(name='', debug_daemon_interval=300, *, loop=None)[source]

Initializes the Event.

Parameters:
  • name (str) – An optional name for the event, used in debug logs.

  • debug_daemon_interval (int) – The interval in seconds for the debug daemon to log information.

  • loop (Optional[asyncio.AbstractEventLoop]) – The event loop to use.

async _debug_daemon()[source]

Periodically logs debug information about the event state and waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

clear()[source]

Reset the internal flag to false. Subsequently, coroutines calling wait() will block until set() is called to set the internal flag to true again.

is_set()[source]

Return True if and only if the internal flag is true.

set()[source]

Set the internal flag to true. All coroutines waiting for it to become true are awakened. Coroutines that call wait() once the flag is true will not block at all.

async wait()[source]

Wait until the event is set.

Returns:

True when the event is set.

Return type:

Literal[True]

_daemon
_debug_daemon_interval
_loop: AbstractEventLoop = None
_value: bool
_waiters: Deque[Future[None]]
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

class a_sync.primitives.PrioritySemaphore[source]

Bases: _AbstractPrioritySemaphore[int | float | Decimal, _PrioritySemaphoreContextManager]

_context_manager_class

alias of _PrioritySemaphoreContextManager

__call__(fn)

Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:

```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```

like this:

```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__getitem__(priority)
Parameters:

priority (PT | None)

Return type:

_AbstractPrioritySemaphoreContextManager[PT]

__init__(value=1, *, name=None)

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value (int) – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

Return type:

None

_count_waiters()
Return type:

Dict[PT, int]

async _debug_daemon()

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

_wake_up_next()

Wake up the first waiter that isn’t done.

Return type:

None

async acquire()

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)

Wrap a coroutine function to ensure it runs with the semaphore.

Example

Now you can rewrite this pattern:

```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```

like this:

```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()

Returns True if semaphore cannot be acquired immediately.

Return type:

bool

release()

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_capacity
_context_managers: Dict[PT, _AbstractPrioritySemaphoreContextManager[PT]]
_daemon
_decorated: Set[str]
_loop = None
_potential_lost_waiters: List[Future[None]]
_top_priority = -1

It’s kinda like a regular Semaphore but you must give each waiter a priority:

```
priority_semaphore = PrioritySemaphore(10)

async with priority_semaphore[priority]:
    await do_stuff()
```

You can aenter and aexit this semaphore without a priority and it will process those first. Like so:

```
priority_semaphore = PrioritySemaphore(10)

async with priority_semaphore:
    await do_stuff()
```

_value: int
_waiters: List[_AbstractPrioritySemaphoreContextManager[PT]]
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

name: str | None
class a_sync.primitives.ProcessingQueue[source]

Bases: _Queue[Tuple[P, asyncio.Future[V]]], Generic[P, V]

__call__(*args, **kwargs)[source]

Call self as a function.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

__init__(func, num_workers, *, return_data=True, name='', loop=None)[source]
Parameters:
Return type:

None

async __worker_coro()
Return type:

NoReturn

_create_future()[source]
Return type:

Future[V]

_ensure_workers()[source]
Return type:

None

_format()
_get()
_get_loop()
_init(maxsize)
_put(item)
_wakeup_next(waiters)
close()[source]
Return type:

None

empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(*args, **kwargs)[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

put_nowait(*args, **kwargs)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_closed: bool = False
_finished
_getters
_loop = None
_maxsize
_putters
_unfinished_tasks
_worker_coro
property _workers: Task[NoReturn]
func
property maxsize

Number of items allowed in the queue.

property name: str
num_workers
class a_sync.primitives.Queue[source]

Bases: _Queue[T]

__init__(maxsize=0, *, loop=<object object>)[source]
_format()[source]
_get()[source]
_get_loop()
_init(maxsize)[source]
_put(item)[source]
_wakeup_next(waiters)[source]
empty()[source]

Return True if the queue is empty, False otherwise.

full()[source]

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()[source]

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

Return type:

T

async get_all()[source]

Returns 1 or more items.

Return type:

List[T]

get_all_nowait()[source]

Returns 1 or more items, or raises asyncio.QueueEmpty.

Return type:

List[T]

async get_multi(i, can_return_less=False)[source]
Parameters:
  • i (int)

  • can_return_less (bool)

Return type:

List[T]

get_multi_nowait(i, can_return_less=False)[source]

Just like asyncio.Queue.get_nowait, but will return i items instead of 1. Set can_return_less to True if you want to receive up to i items.

Parameters:
  • i (int)

  • can_return_less (bool)

Return type:

List[T]

get_nowait()[source]

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

Return type:

T

async join()[source]

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(item)[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Parameters:

item (T)

Return type:

None

put_nowait(item)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Parameters:

item (T)

Return type:

None

qsize()[source]

Number of items in the queue.

task_done()[source]

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_finished
_getters
_loop = None
_maxsize
_putters
_unfinished_tasks
property maxsize

Number of items allowed in the queue.

class a_sync.primitives.Semaphore[source]

Bases: Semaphore, _DebugDaemonMixin

A semaphore with additional debugging capabilities.

This semaphore includes debug logging.

Also, it can be used to decorate coroutine functions so you can rewrite this pattern:

```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```

like this:

```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```

__call__(fn)[source]

Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:

```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```

like this:

```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__init__(value, name=None, **kwargs)[source]

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value (int) – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

Return type:

None

async _debug_daemon()[source]

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

_wake_up_next()[source]

Wake up the first waiter that isn’t done.

async acquire()[source]

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)[source]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

Now you can rewrite this pattern:

```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```

like this:

```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()[source]

Returns True if semaphore cannot be acquired immediately.

release()[source]

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_daemon
_decorated: Set[str]
_loop = None
_value
_waiters
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

name
class a_sync.primitives.SmartProcessingQueue[source]

Bases: _VariablePriorityQueueMixin[T], ProcessingQueue[Concatenate[T, P], V]

A PriorityProcessingQueue subclass that will execute jobs with the most waiters first

__call__(*args, **kwargs)

Call self as a function.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

Future[V]

__init__(func, num_workers, *, name='', loop=None)[source]
Parameters:
Return type:

None

async __worker_coro()
Return type:

NoReturn

_create_future(key)[source]
Parameters:

key (Tuple[Tuple[Any], Tuple[Tuple[str, Any]]])

Return type:

Future[V]

_ensure_workers()
Return type:

None

_format()
_get()[source]

Resort the heap to consider any changes in priorities and pop the smallest value

_get_key(*args, **kwargs)
Return type:

Tuple[Tuple[Any], Tuple[Tuple[str, Any]]]

_get_loop()
_init(maxsize)
_put(item, heappush=<built-in function heappush>)
_wakeup_next(waiters)
close()
Return type:

None

empty()

Return True if the queue is empty, False otherwise.

full()

Return True if there are maxsize items in the queue.

Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.

async get()

Remove and return an item from the queue.

If queue is empty, wait until an item is available.

get_nowait()

Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

async join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

async put(*args, **kwargs)[source]

Put an item into the queue.

Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

SmartFuture[V]

put_nowait(*args, **kwargs)[source]

Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Parameters:
  • args (~P)

  • kwargs (~P)

Return type:

SmartFuture[V]

qsize()

Number of items in the queue.

task_done()

Indicate that a formerly enqueued task is complete.

Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises ValueError if called more times than there were items placed in the queue.

_closed: bool = False
_finished
_futs: WeakValueDictionary[Tuple[Tuple[Any], Tuple[Tuple[str, Any]]], SmartFuture[T]]
_getters
_loop = None
_maxsize
_no_futs = False
_putters
_unfinished_tasks
_worker_coro
property _workers: Task[NoReturn]
func
property maxsize

Number of items allowed in the queue.

property name: str
num_workers
class a_sync.primitives.ThreadsafeSemaphore[source]

Bases: Semaphore

While it's a bit weird to run multiple event loops, sometimes either you or a lib you're using must do so. When used in threaded applications, this semaphore will not work as intended, but at least your program will function. You may need to reduce the semaphore value for multi-threaded applications.

TL;DR: it's a janky fix for an edge-case problem and will otherwise function as a normal a_sync.Semaphore (which is just an asyncio.Semaphore with extra bells and whistles).
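
A rough sketch of what that looks like in practice, with one event loop per thread sharing a single ThreadsafeSemaphore instance:

```
import asyncio
import threading

from a_sync.primitives import ThreadsafeSemaphore

sem = ThreadsafeSemaphore(2, name="shared")


async def limited(i: int) -> None:
    # Each thread's event loop is intended to get its own underlying
    # semaphore (see the `semaphore` property below).
    async with sem:
        await asyncio.sleep(0.01)
        print(threading.current_thread().name, "finished", i)


def run_in_thread() -> None:
    async def main() -> None:
        await asyncio.gather(*(limited(i) for i in range(3)))

    asyncio.run(main())


threads = [threading.Thread(target=run_in_thread) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```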

__call__(fn)

Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:

```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```

like this:

```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__init__(value, name=None)[source]

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value (int | None) – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

Return type:

None

async _debug_daemon()

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

_wake_up_next()

Wake up the first waiter that isn’t done.

async acquire()

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)

Wrap a coroutine function to ensure it runs with the semaphore.

Example

Now you can rewrite this pattern:

```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```

like this:

```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()

Returns True if semaphore cannot be acquired immediately.

release()

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_daemon
_decorated: Set[str]
_loop = None
_value
_waiters
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

dummy
property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

name
property semaphore: Semaphore

Returns the appropriate semaphore for the current thread.

NOTE: We can’t cache this property because we need to check the current thread every time we access it.

semaphores: DefaultDict[Thread, Semaphore]
property use_dummy: bool