a_sync.primitives package
Subpackages
- a_sync.primitives.locks package
- Submodules
- a_sync.primitives.locks.counter module
CounterLock
CounterLock.__init__()
CounterLock._debug_daemon()
CounterLock._ensure_debug_daemon()
CounterLock._start_debug_daemon()
CounterLock._stop_debug_daemon()
CounterLock.set()
CounterLock.wait_for()
CounterLock._daemon
CounterLock._events
CounterLock._name
CounterLock._value
CounterLock.debug_logs_enabled
CounterLock.is_ready
CounterLock.logger
CounterLock.value
CounterLockCluster
- a_sync.primitives.locks.event module
Event
Event.__init__()
Event._debug_daemon()
Event._ensure_debug_daemon()
Event._get_loop()
Event._start_debug_daemon()
Event._stop_debug_daemon()
Event.clear()
Event.is_set()
Event.set()
Event.wait()
Event._daemon
Event._debug_daemon_interval
Event._loop
Event._value
Event._waiters
Event.debug_logs_enabled
Event.logger
- a_sync.primitives.locks.prio_semaphore module
Priority
PrioritySemaphore
PrioritySemaphore._context_manager_class
PrioritySemaphore.__call__()
PrioritySemaphore.__getitem__()
PrioritySemaphore.__init__()
PrioritySemaphore._count_waiters()
PrioritySemaphore._debug_daemon()
PrioritySemaphore._ensure_debug_daemon()
PrioritySemaphore._get_loop()
PrioritySemaphore._start_debug_daemon()
PrioritySemaphore._stop_debug_daemon()
PrioritySemaphore._wake_up_next()
PrioritySemaphore.acquire()
PrioritySemaphore.decorate()
PrioritySemaphore.locked()
PrioritySemaphore.release()
PrioritySemaphore._capacity
PrioritySemaphore._context_managers
PrioritySemaphore._daemon
PrioritySemaphore._decorated
PrioritySemaphore._loop
PrioritySemaphore._potential_lost_waiters
PrioritySemaphore._top_priority
PrioritySemaphore._value
PrioritySemaphore._waiters
PrioritySemaphore.debug_logs_enabled
PrioritySemaphore.logger
PrioritySemaphore.name
_AbstractPrioritySemaphore
_AbstractPrioritySemaphore.__call__()
_AbstractPrioritySemaphore.__getitem__()
_AbstractPrioritySemaphore.__init__()
_AbstractPrioritySemaphore._count_waiters()
_AbstractPrioritySemaphore._debug_daemon()
_AbstractPrioritySemaphore._ensure_debug_daemon()
_AbstractPrioritySemaphore._get_loop()
_AbstractPrioritySemaphore._start_debug_daemon()
_AbstractPrioritySemaphore._stop_debug_daemon()
_AbstractPrioritySemaphore._wake_up_next()
_AbstractPrioritySemaphore.acquire()
_AbstractPrioritySemaphore.decorate()
_AbstractPrioritySemaphore.locked()
_AbstractPrioritySemaphore.release()
_AbstractPrioritySemaphore._capacity
_AbstractPrioritySemaphore._context_manager_class
_AbstractPrioritySemaphore._context_managers
_AbstractPrioritySemaphore._daemon
_AbstractPrioritySemaphore._decorated
_AbstractPrioritySemaphore._loop
_AbstractPrioritySemaphore._potential_lost_waiters
_AbstractPrioritySemaphore._top_priority
_AbstractPrioritySemaphore._value
_AbstractPrioritySemaphore._waiters
_AbstractPrioritySemaphore.debug_logs_enabled
_AbstractPrioritySemaphore.logger
_AbstractPrioritySemaphore.name
_AbstractPrioritySemaphoreContextManager
_AbstractPrioritySemaphoreContextManager.__call__()
_AbstractPrioritySemaphoreContextManager.__init__()
_AbstractPrioritySemaphoreContextManager._debug_daemon()
_AbstractPrioritySemaphoreContextManager._ensure_debug_daemon()
_AbstractPrioritySemaphoreContextManager._get_loop()
_AbstractPrioritySemaphoreContextManager._repr_no_parent_()
_AbstractPrioritySemaphoreContextManager._start_debug_daemon()
_AbstractPrioritySemaphoreContextManager._stop_debug_daemon()
_AbstractPrioritySemaphoreContextManager._wake_up_next()
_AbstractPrioritySemaphoreContextManager.acquire()
_AbstractPrioritySemaphoreContextManager.decorate()
_AbstractPrioritySemaphoreContextManager.locked()
_AbstractPrioritySemaphoreContextManager.release()
_AbstractPrioritySemaphoreContextManager._daemon
_AbstractPrioritySemaphoreContextManager._decorated
_AbstractPrioritySemaphoreContextManager._loop
_AbstractPrioritySemaphoreContextManager._parent
_AbstractPrioritySemaphoreContextManager._priority
_AbstractPrioritySemaphoreContextManager._priority_name
_AbstractPrioritySemaphoreContextManager._value
_AbstractPrioritySemaphoreContextManager._waiters
_AbstractPrioritySemaphoreContextManager.debug_logs_enabled
_AbstractPrioritySemaphoreContextManager.logger
_AbstractPrioritySemaphoreContextManager.loop
_AbstractPrioritySemaphoreContextManager.name
_AbstractPrioritySemaphoreContextManager.waiters
_PrioritySemaphoreContextManager
_PrioritySemaphoreContextManager.__call__()
_PrioritySemaphoreContextManager.__init__()
_PrioritySemaphoreContextManager._debug_daemon()
_PrioritySemaphoreContextManager._ensure_debug_daemon()
_PrioritySemaphoreContextManager._get_loop()
_PrioritySemaphoreContextManager._repr_no_parent_()
_PrioritySemaphoreContextManager._start_debug_daemon()
_PrioritySemaphoreContextManager._stop_debug_daemon()
_PrioritySemaphoreContextManager._wake_up_next()
_PrioritySemaphoreContextManager.acquire()
_PrioritySemaphoreContextManager.decorate()
_PrioritySemaphoreContextManager.locked()
_PrioritySemaphoreContextManager.release()
_PrioritySemaphoreContextManager._daemon
_PrioritySemaphoreContextManager._decorated
_PrioritySemaphoreContextManager._loop
_PrioritySemaphoreContextManager._parent
_PrioritySemaphoreContextManager._priority
_PrioritySemaphoreContextManager._priority_name
_PrioritySemaphoreContextManager._value
_PrioritySemaphoreContextManager._waiters
_PrioritySemaphoreContextManager.debug_logs_enabled
_PrioritySemaphoreContextManager.logger
_PrioritySemaphoreContextManager.loop
_PrioritySemaphoreContextManager.name
_PrioritySemaphoreContextManager.waiters
- a_sync.primitives.locks.semaphore module
DummySemaphore
Semaphore
Semaphore.__call__()
Semaphore.__init__()
Semaphore._debug_daemon()
Semaphore._ensure_debug_daemon()
Semaphore._get_loop()
Semaphore._start_debug_daemon()
Semaphore._stop_debug_daemon()
Semaphore._wake_up_next()
Semaphore.acquire()
Semaphore.decorate()
Semaphore.locked()
Semaphore.release()
Semaphore._daemon
Semaphore._decorated
Semaphore._loop
Semaphore._value
Semaphore._waiters
Semaphore.debug_logs_enabled
Semaphore.logger
Semaphore.name
ThreadsafeSemaphore
ThreadsafeSemaphore.__call__()
ThreadsafeSemaphore.__init__()
ThreadsafeSemaphore._debug_daemon()
ThreadsafeSemaphore._ensure_debug_daemon()
ThreadsafeSemaphore._get_loop()
ThreadsafeSemaphore._start_debug_daemon()
ThreadsafeSemaphore._stop_debug_daemon()
ThreadsafeSemaphore._wake_up_next()
ThreadsafeSemaphore.acquire()
ThreadsafeSemaphore.decorate()
ThreadsafeSemaphore.locked()
ThreadsafeSemaphore.release()
ThreadsafeSemaphore._daemon
ThreadsafeSemaphore._decorated
ThreadsafeSemaphore._loop
ThreadsafeSemaphore._value
ThreadsafeSemaphore._waiters
ThreadsafeSemaphore.debug_logs_enabled
ThreadsafeSemaphore.dummy
ThreadsafeSemaphore.logger
ThreadsafeSemaphore.name
ThreadsafeSemaphore.semaphore
ThreadsafeSemaphore.semaphores
ThreadsafeSemaphore.use_dummy
- Module contents
Submodules
a_sync.primitives._debug module
This module provides a mixin class used to provide helpful information to developers during the debugging process.
The mixin ensures that rich debug logs are automagically emitted from subclass instances whenever debug logging is enabled.
- class a_sync.primitives._debug._DebugDaemonMixin[source]
Bases:
_LoggerMixin
A mixin class that provides debugging capabilities using a daemon task.
This mixin ensures that rich debug logs are automagically emitted from subclass instances whenever debug logging is enabled.
- abstract async _debug_daemon(fut, fn, *args, **kwargs)[source]
Abstract method to define the debug daemon’s behavior.
- Parameters:
fut (Future) – The future associated with the daemon.
fn – The function to be debugged.
*args – Positional arguments for the function.
**kwargs – Keyword arguments for the function.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)[source]
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _start_debug_daemon(*args, **kwargs)[source]
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)[source]
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- _daemon
a_sync.primitives._loggable module
This module provides a mixin class to add debug logging capabilities to other classes.
- class a_sync.primitives._loggable._LoggerMixin[source]
Bases:
object
A mixin class that adds logging capabilities to other classes.
This mixin provides a cached property for accessing a logger instance and a property to check if debug logging is enabled.
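The pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: the class name `LoggerMixinSketch` and the logger naming scheme are assumptions made for the example.

```python
import logging
from functools import cached_property


class LoggerMixinSketch:
    """Hypothetical stand-in for a logger mixin (not a_sync's own class)."""

    @cached_property
    def logger(self) -> logging.Logger:
        # Assumption: the logger is named after the subclass's module and name.
        cls = type(self)
        return logging.getLogger(f"{cls.__module__}.{cls.__qualname__}")

    @property
    def debug_logs_enabled(self) -> bool:
        # True only when DEBUG records would actually be emitted.
        return self.logger.isEnabledFor(logging.DEBUG)


class Worker(LoggerMixinSketch):
    pass


worker = Worker()
worker.logger.setLevel(logging.INFO)
enabled_at_info = worker.debug_logs_enabled
worker.logger.setLevel(logging.DEBUG)
enabled_at_debug = worker.debug_logs_enabled
```

Caching the logger means the lookup happens once per instance, while the debug check stays a live property so it follows later level changes.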
a_sync.primitives.queue module
This module provides various queue implementations for managing asynchronous tasks, including standard FIFO queues, priority queues, and processing queues. These queues support advanced features like waiting for multiple items, handling priority tasks, and processing tasks with multiple workers.
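To make the idea of a processing queue concrete, here is a minimal sketch built only on asyncio. It assumes a design in which each queued call is paired with a future and a fixed number of worker tasks resolve those futures. `TinyProcessingQueue` and `double` are hypothetical names for illustration; this is not the library's implementation.

```python
import asyncio


class TinyProcessingQueue:
    """Hypothetical stand-in: worker tasks feed queued calls to `func`."""

    def __init__(self, func, num_workers):
        self.func = func
        self.num_workers = num_workers
        self._queue = asyncio.Queue()
        self._workers = []

    def put_nowait(self, *args, **kwargs):
        # Start workers lazily, then enqueue the call with its result future.
        if not self._workers:
            self._workers = [
                asyncio.create_task(self._worker()) for _ in range(self.num_workers)
            ]
        fut = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((args, kwargs, fut))
        return fut

    async def _worker(self):
        while True:
            args, kwargs, fut = await self._queue.get()
            try:
                result = await self.func(*args, **kwargs)
            except Exception as exc:
                if not fut.done():
                    fut.set_exception(exc)
            else:
                if not fut.done():
                    fut.set_result(result)
            finally:
                self._queue.task_done()

    def close(self):
        # Cancel the worker tasks; queued but unprocessed calls stay pending.
        for worker in self._workers:
            worker.cancel()
        self._workers = []


async def double(x):
    await asyncio.sleep(0)
    return x * 2


async def main():
    queue = TinyProcessingQueue(double, num_workers=2)
    futures = [queue.put_nowait(i) for i in range(5)]
    results = await asyncio.gather(*futures)
    queue.close()
    return results


results = asyncio.run(main())
```

Returning a future from `put_nowait` lets the caller await one specific result even though the work is shared by a bounded pool of workers.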
- class a_sync.primitives.queue.PriorityProcessingQueue[source]
Bases:
_PriorityQueueMixin[T], ProcessingQueue[T, V]
- __call__(*args, **kwargs)
Call self as a function.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- __init__(func, num_workers, *, return_data=True, name='', loop=None)
- _create_future()
- Return type:
Future[V]
- _ensure_workers()
- Return type:
None
- _format()
- _get_loop()
- _init(maxsize)
- _put(item, heappush=<built-in function heappush>)
- _wakeup_next(waiters)
- close()
- Return type:
None
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(priority, *args, **kwargs)[source]
Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.
- Parameters:
priority (Any)
args (~P)
kwargs (~P)
- Return type:
Future[V]
- put_nowait(priority, *args, **kwargs)[source]
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- Parameters:
priority (Any)
args (~P)
kwargs (~P)
- Return type:
Future[V]
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _getters
- _loop = None
- _maxsize
- _putters
- _unfinished_tasks
- _worker_coro
- func
- property maxsize
Number of items allowed in the queue.
- num_workers
- class a_sync.primitives.queue.ProcessingQueue[source]
Bases:
_Queue[Tuple[P, asyncio.Future[V]]], Generic[P, V]
- __call__(*args, **kwargs)[source]
Call self as a function.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- _format()
- _get()
- _get_loop()
- _init(maxsize)
- _put(item)
- _wakeup_next(waiters)
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(*args, **kwargs)[source]
Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- put_nowait(*args, **kwargs)[source]
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _getters
- _loop = None
- _maxsize
- _putters
- _unfinished_tasks
- _worker_coro
- func
- property maxsize
Number of items allowed in the queue.
- num_workers
- class a_sync.primitives.queue.Queue[source]
Bases:
_Queue[T]
- _get_loop()
- full()[source]
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()[source]
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- Return type:
T
- get_all_nowait()[source]
Returns 1 or more items, or raises asyncio.QueueEmpty.
- Return type:
List[T]
- get_multi_nowait(i, can_return_less=False)[source]
Just like asyncio.Queue.get_nowait, but will return i items instead of 1. Set can_return_less to True if you want to receive up to i items.
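The behaviour described for get_multi_nowait can be sketched as a free function over a plain asyncio.Queue. This is an assumption-laden illustration, not the library's code: `get_multi_nowait_sketch` is a hypothetical helper, and it assumes that an empty queue raises asyncio.QueueEmpty even when can_return_less is True.

```python
import asyncio


def get_multi_nowait_sketch(queue, i, can_return_less=False):
    """Hypothetical helper: take i items at once from an asyncio.Queue."""
    available = queue.qsize()
    # Check first so a failed call leaves the queue untouched.
    if available == 0 or (available < i and not can_return_less):
        raise asyncio.QueueEmpty
    return [queue.get_nowait() for _ in range(min(i, available))]


queue = asyncio.Queue()
for n in range(3):
    queue.put_nowait(n)

first_two = get_multi_nowait_sketch(queue, 2)
rest = get_multi_nowait_sketch(queue, 5, can_return_less=True)
```

Here `first_two` holds exactly two items, while the second call asks for five and settles for the single item that is left.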
- get_nowait()[source]
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- Return type:
T
- async join()[source]
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(item)[source]
Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.
- Parameters:
item (T)
- Return type:
None
- put_nowait(item)[source]
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- Parameters:
item (T)
- Return type:
None
- task_done()[source]
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
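The interplay of get(), task_done() and join() is easiest to see in a small example. The sketch below uses the standard asyncio.Queue, on the assumption that these inherited methods behave the same way here; `consumer` and `main` are illustrative names.

```python
import asyncio


async def consumer(queue, seen):
    while True:
        item = await queue.get()
        try:
            seen.append(item)
        finally:
            # Exactly one task_done() per successful get().
            queue.task_done()


async def main():
    queue = asyncio.Queue()
    seen = []
    for item in range(3):
        queue.put_nowait(item)

    worker = asyncio.create_task(consumer(queue, seen))
    # join() returns once every item that was put has been marked done.
    await queue.join()

    worker.cancel()
    await asyncio.gather(worker, return_exceptions=True)
    return seen


processed = asyncio.run(main())
```

Calling task_done() in a `finally` block keeps the unfinished-task count accurate even if processing an item raises, so join() cannot hang on a failed item.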
- _finished
- _getters
- _loop = None
- _maxsize
- _putters
- _unfinished_tasks
- property maxsize
Number of items allowed in the queue.
- class a_sync.primitives.queue.SmartProcessingQueue[source]
Bases:
_VariablePriorityQueueMixin[T], ProcessingQueue[Concatenate[T, P], V]
A PriorityProcessingQueue subclass that will execute jobs with the most waiters first.
- __call__(*args, **kwargs)
Call self as a function.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- __init__(func, num_workers, *, name='', loop=None)[source]
- Parameters:
func (Callable[[Concatenate[T, ~P]], Awaitable[V]])
num_workers (int)
name (str)
loop (AbstractEventLoop | None)
- Return type:
None
- _ensure_workers()
- Return type:
None
- _format()
- _get_loop()
- _init(maxsize)
- _put(item, heappush=<built-in function heappush>)
- _wakeup_next(waiters)
- close()
- Return type:
None
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(*args, **kwargs)[source]
Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
SmartFuture[V]
- put_nowait(*args, **kwargs)[source]
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
SmartFuture[V]
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _futs: WeakValueDictionary[Tuple[Tuple[Any], Tuple[Tuple[str, Any]]], SmartFuture[T]]
- _getters
- _loop = None
- _maxsize
- _no_futs = False
- _putters
- _unfinished_tasks
- _worker_coro
- func
- property maxsize
Number of items allowed in the queue.
- num_workers
- class a_sync.primitives.queue.VariablePriorityQueue[source]
Bases:
_VariablePriorityQueueMixin[T], PriorityQueue
A PriorityQueue subclass that allows priorities to be updated (or computed) on the fly.
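One way to support priorities that change after insertion is to re-heapify just before popping. The sketch below shows that technique with heapq. `Job` and `pop_smallest` are hypothetical names, and this is an illustration of the idea rather than the library's code.

```python
import heapq
from dataclasses import dataclass


@dataclass
class Job:
    name: str
    priority: int

    def __lt__(self, other):
        # heapq only needs "<" to order items.
        return self.priority < other.priority


def pop_smallest(heap):
    # Restore the heap invariant in case priorities changed, then pop.
    heapq.heapify(heap)
    return heapq.heappop(heap)


heap = []
a, b, c = Job("a", 1), Job("b", 2), Job("c", 3)
for job in (a, b, c):
    heapq.heappush(heap, job)

c.priority = 0  # priority updated while the job is already queued
first = pop_smallest(heap)
```

Re-heapifying costs O(n) per pop, which is the price paid for letting priorities drift while items sit in the queue.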
- __init__(maxsize=0, *, loop=<object object>)
- _format()
- _get(heapify=<built-in function heapify>, heappop=<built-in function heappop>)
Re-sort the heap to account for any changes in priorities, then pop the smallest value.
- _get_loop()
- _init(maxsize)
- _put(item, heappush=<built-in function heappush>)
- _wakeup_next(waiters)
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(item)
Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.
- put_nowait(item)
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _loop = None
- property maxsize
Number of items allowed in the queue.
- class a_sync.primitives.queue._Queue[source]
Bases:
Queue[T]
- __init__(maxsize=0, *, loop=<object object>)
- _format()
- _get()
- _get_loop()
- _init(maxsize)
- _put(item)
- _wakeup_next(waiters)
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(item)
Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.
- put_nowait(item)
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _getters
- _loop = None
- _maxsize
- _putters
- _unfinished_tasks
- property maxsize
Number of items allowed in the queue.
- class a_sync.primitives.queue._SmartFutureRef[source]
Bases:
ReferenceType, Generic[T]
- __call__(*args, **kwargs)
Call self as a function.
- __init__(*args, **kwargs)
- class a_sync.primitives.queue._VariablePriorityQueueMixin[source]
Bases:
_PriorityQueueMixin[T]
- _get(heapify=<built-in function heapify>, heappop=<built-in function heappop>)[source]
Re-sort the heap to account for any changes in priorities, then pop the smallest value.
- _init(maxsize)
- _put(item, heappush=<built-in function heappush>)
- a_sync.primitives.queue._validate_args(i, can_return_less)[source]
Validates the arguments for methods that retrieve multiple items from the queue.
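- Parameters:
i (int) – The number of items to retrieve.
can_return_less (bool) – Whether fewer than i items may be returned.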
- Raises:
TypeError – If i is not an integer or can_return_less is not a boolean.
ValueError – If i is not greater than 1.
- Return type:
None
Module contents
While not the focus of this lib, this module includes some new primitives and some modified versions of standard asyncio primitives.
- class a_sync.primitives.CounterLock[source]
Bases:
_DebugDaemonMixin
An async primitive that blocks until the internal counter has reached a specific value.
A coroutine can await counter.wait_for(3) and it will block until the internal counter >= 3. If some other task executes counter.value = 5 or counter.set(5), the first coroutine will unblock as 5 >= 3.
The internal counter can only increase.
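The waiting behaviour described above can be sketched with asyncio alone. `TinyCounterLock` is a hypothetical, simplified stand-in written for illustration. It assumes one asyncio.Event per awaited value and leaves out the debug daemon and naming features.

```python
import asyncio
from collections import defaultdict


class TinyCounterLock:
    """Hypothetical stand-in: waiters block until the counter reaches a value."""

    def __init__(self, start_value=0):
        self._value = start_value
        # One Event per awaited value, created on first use.
        self._events = defaultdict(asyncio.Event)

    async def wait_for(self, value):
        if self._value < value:
            await self._events[value].wait()
        return True

    def set(self, value):
        if value < self._value:
            raise ValueError("the counter can only increase")
        self._value = value
        # Wake every waiter whose target has now been reached.
        for key in [k for k in self._events if k <= value]:
            self._events.pop(key).set()


async def main():
    counter = TinyCounterLock()
    waiter = asyncio.create_task(counter.wait_for(3))
    await asyncio.sleep(0)  # let the waiter start blocking
    was_blocked = not waiter.done()
    counter.set(5)  # 5 >= 3, so the waiter is released
    await waiter
    return was_blocked


was_blocked = asyncio.run(main())
```

Keying the events by target value means a single set() call can release several groups of waiters at once.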
- __init__(start_value=0, name=None)[source]
Initializes the CounterLock with a starting value and an optional name.
- Parameters:
start_value (int) – The initial value of the counter.
name (optional) – An optional name for the counter, used in debug logs.
- async _debug_daemon()[source]
Periodically logs debug information about the counter state and waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- set(value)[source]
Sets the counter to the specified value.
- Parameters:
value (int) – The value to set the counter to. Must be >= the current value.
- Raises:
ValueError – If the new value is less than the current value.
- Return type:
None
- _daemon
- _events: DefaultDict[int, Event]
A defaultdict that maps each awaited value to an
asyncio.Event
that manages the waiters for that value.
- _name
An optional name for the counter, used in debug logs.
- _value
The current value of the counter.
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- is_ready
A lambda function that indicates whether a given value has already been surpassed.
- class a_sync.primitives.Event[source]
Bases:
Event, _DebugDaemonMixin
An asyncio.Event with additional debug logging to help detect deadlocks.
This event class extends asyncio.Event by adding debug logging capabilities. It logs detailed information about the event state and waiters, which can be useful for diagnosing and debugging potential deadlocks.
- __init__(name='', debug_daemon_interval=300, *, loop=None)[source]
Initializes the Event.
- Parameters:
name (str) – An optional name for the event, used in debug logs.
debug_daemon_interval (int) – The interval in seconds for the debug daemon to log information.
loop (Optional[asyncio.AbstractEventLoop]) – The event loop to use.
- async _debug_daemon()[source]
Periodically logs debug information about the event state and waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- clear()[source]
Reset the internal flag to false. Subsequently, coroutines calling wait() will block until set() is called to set the internal flag to true again.
- set()[source]
Set the internal flag to true. All coroutines waiting for it to become true are awakened. Coroutines that call wait() once the flag is true will not block at all.
- async wait()[source]
Wait until the event is set.
- Returns:
True when the event is set.
- Return type:
Literal[True]
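The set(), wait() and clear() semantics above follow asyncio.Event. The short sketch below uses the standard asyncio.Event directly, on the assumption that this class keeps the same behaviour and only adds debug logging.

```python
import asyncio


async def main():
    event = asyncio.Event()
    waiter = asyncio.create_task(event.wait())
    await asyncio.sleep(0)  # give the waiter a chance to start blocking

    was_blocked = not waiter.done()
    event.set()  # wakes every coroutine blocked in wait()
    result = await waiter

    event.clear()  # later wait() calls will block again
    return was_blocked, result, event.is_set()


outcome = asyncio.run(main())
```

The waiter stays pending until set() is called, wait() then returns True, and clear() resets the flag for the next round.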
- _daemon
- _debug_daemon_interval
- _loop: AbstractEventLoop = None
- class a_sync.primitives.PrioritySemaphore[source]
Bases:
_AbstractPrioritySemaphore[int | float | Decimal, _PrioritySemaphoreContextManager]
- _context_manager_class
alias of
_PrioritySemaphoreContextManager
- __call__(fn)
Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- __getitem__(priority)
- Parameters:
priority (PT | None)
- Return type:
- __init__(value=1, *, name=None)
Initialize the semaphore with a given value and optional name for debugging.
- Parameters:
value (int) – The initial value for the semaphore.
name (optional) – An optional name used only to provide useful context in debug logs.
- Return type:
None
- async _debug_daemon()
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- _wake_up_next()
Wake up the first waiter that isn’t done.
- Return type:
None
- async acquire()
Acquire a semaphore.
If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.
- Return type:
Literal[True]
- decorate(fn)
Wrap a coroutine function to ensure it runs with the semaphore.
Example
Now you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- release()
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- _capacity
- _context_managers: Dict[PT, _AbstractPrioritySemaphoreContextManager[PT]]
- _daemon
- _loop = None
- _top_priority = -1
It’s like a regular Semaphore, but you must give each waiter a priority:
```python
priority_semaphore = PrioritySemaphore(10)

async with priority_semaphore[priority]:
    await do_stuff()
```
You can also enter and exit this semaphore without a priority, and it will process those waiters first. Like so:
```python
priority_semaphore = PrioritySemaphore(10)

async with priority_semaphore:
    await do_stuff()
```
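To make the priority ordering concrete, here is a minimal sketch of a priority-aware semaphore built only on the standard library. `TinyPrioritySemaphore` is a hypothetical stand-in, not the a_sync implementation. It assumes that lower numbers are served first, and it ignores cancellation for brevity.

```python
import asyncio
import heapq
import itertools


class TinyPrioritySemaphore:
    """Hypothetical illustration only; not the a_sync source."""

    def __init__(self, value):
        self._value = value
        self._waiters = []  # heap of (priority, sequence, future)
        self._sequence = itertools.count()  # tie-breaker keeps equal priorities FIFO

    async def acquire(self, priority):
        if self._value > 0 and not self._waiters:
            self._value -= 1
            return True
        fut = asyncio.get_running_loop().create_future()
        heapq.heappush(self._waiters, (priority, next(self._sequence), fut))
        await fut  # resolved by release(), which hands the slot over directly
        return True

    def release(self):
        while self._waiters:
            _, _, fut = heapq.heappop(self._waiters)
            if not fut.done():
                fut.set_result(None)  # assumption: lowest number wakes first
                return
        self._value += 1


async def demo():
    sem = TinyPrioritySemaphore(1)
    order = []
    await sem.acquire(0)  # take the only slot so the workers must wait

    async def worker(priority):
        await sem.acquire(priority)
        order.append(priority)
        sem.release()

    tasks = [asyncio.create_task(worker(p)) for p in (3, 1, 2)]
    await asyncio.sleep(0)  # let every worker reach its wait
    sem.release()
    await asyncio.gather(*tasks)
    return order


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The workers are queued in the order 3, 1, 2 but are woken in priority order, which is the behaviour the description above is getting at.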
- _waiters: List[_AbstractPrioritySemaphoreContextManager[PT]]
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- class a_sync.primitives.ProcessingQueue[source]
Bases:
_Queue[Tuple[P, asyncio.Future[V]]], Generic[P, V]
- __call__(*args, **kwargs)[source]
Call self as a function.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- _format()
- _get()
- _get_loop()
- _init(maxsize)
- _put(item)
- _wakeup_next(waiters)
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(*args, **kwargs)[source]
Put an item into the queue.
If the queue is full, wait until a free slot is available before adding the item.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- put_nowait(*args, **kwargs)[source]
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _getters
- _loop = None
- _maxsize
- _putters
- _unfinished_tasks
- _worker_coro
- func
- property maxsize
Number of items allowed in the queue.
- num_workers
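The members above (`func`, `num_workers`, and `put`/`put_nowait` returning a `Future[V]`) suggest a worker-pool pattern: arguments are queued together with a future, and background workers resolve that future with the result of `func`. The sketch below illustrates that pattern with the standard library only. `TinyProcessingQueue`, its `close()` method, and `double` are hypothetical names, and the real class may differ in many details.

```python
import asyncio


class TinyProcessingQueue:
    """Hypothetical stand-in for the pattern; not the a_sync implementation."""

    def __init__(self, func, num_workers):
        self.func = func
        self.num_workers = num_workers
        self._queue = asyncio.Queue()
        self._workers = []

    def put_nowait(self, *args, **kwargs):
        # Start workers lazily, once a loop is running.
        if not self._workers:
            self._workers = [
                asyncio.create_task(self._worker()) for _ in range(self.num_workers)
            ]
        fut = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((args, kwargs, fut))
        return fut

    async def _worker(self):
        while True:
            args, kwargs, fut = await self._queue.get()
            try:
                result = await self.func(*args, **kwargs)
                if not fut.done():
                    fut.set_result(result)
            except Exception as exc:
                if not fut.done():
                    fut.set_exception(exc)
            finally:
                self._queue.task_done()

    def close(self):
        for worker in self._workers:
            worker.cancel()


async def double(x):
    await asyncio.sleep(0)
    return x * 2


async def demo():
    queue = TinyProcessingQueue(double, num_workers=2)
    futures = [queue.put_nowait(i) for i in range(5)]
    results = await asyncio.gather(*futures)
    queue.close()
    return results


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Callers only ever see the returned futures, so the number of jobs running at once is bounded by `num_workers` regardless of how many jobs are enqueued.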
- class a_sync.primitives.Queue[source]
Bases:
_Queue[T]
- _get_loop()
- full()[source]
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()[source]
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- Return type:
T
- get_all_nowait()[source]
Returns one or more items, or raises asyncio.QueueEmpty if the queue is empty.
- Return type:
List[T]
- get_multi_nowait(i, can_return_less=False)[source]
Like asyncio.Queue.get_nowait, but returns i items instead of one. Set can_return_less to True to receive up to i items when fewer than i are available.
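As a rough illustration of these semantics, the helper below applies the same idea to a plain `asyncio.Queue`. It is a free function written for this example, not the library's method, and it assumes that `asyncio.QueueEmpty` is raised when fewer than `i` items are available and `can_return_less` is False.

```python
import asyncio


def get_multi_nowait(queue, i, can_return_less=False):
    """Hypothetical helper mirroring the documented behaviour on asyncio.Queue."""
    available = queue.qsize()
    # Assumption: too few items without can_return_less is treated as "empty".
    if available == 0 or (available < i and not can_return_less):
        raise asyncio.QueueEmpty
    return [queue.get_nowait() for _ in range(min(i, available))]


async def demo():
    queue = asyncio.Queue()
    for n in range(3):
        queue.put_nowait(n)
    first = get_multi_nowait(queue, 2)
    rest = get_multi_nowait(queue, 5, can_return_less=True)
    try:
        get_multi_nowait(queue, 1)
        raised = False
    except asyncio.QueueEmpty:
        raised = True
    return first, rest, raised


if __name__ == "__main__":
    print(asyncio.run(demo()))
```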
- get_nowait()[source]
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- Return type:
T
- async join()[source]
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(item)[source]
Put an item into the queue.
If the queue is full, wait until a free slot is available before adding the item.
- Parameters:
item (T)
- Return type:
None
- put_nowait(item)[source]
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- Parameters:
item (T)
- Return type:
None
- task_done()[source]
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
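The interplay between `get()`, `task_done()`, and `join()` can be sketched with a plain `asyncio.Queue`, whose docstrings these methods share. The sketch assumes this class behaves the same way for these three calls.

```python
import asyncio


async def consumer(queue, seen):
    while True:
        item = await queue.get()
        try:
            seen.append(item)  # stand-in for real work
        finally:
            # One task_done() per get(), even if processing fails.
            queue.task_done()


async def demo():
    queue = asyncio.Queue()
    seen = []
    for n in range(3):
        queue.put_nowait(n)  # each put raises the unfinished-task count
    worker = asyncio.create_task(consumer(queue, seen))
    await queue.join()  # resumes once the count drops back to zero
    worker.cancel()
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```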
- _finished
- _getters
- _loop = None
- _maxsize
- _putters
- _unfinished_tasks
- property maxsize
Number of items allowed in the queue.
- class a_sync.primitives.Semaphore[source]
Bases:
Semaphore, _DebugDaemonMixin
A semaphore with additional debugging capabilities.
This semaphore includes debug logging.
Also, it can be used to decorate coroutine functions so you can rewrite this pattern:
```python
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```python
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- __call__(fn)[source]
Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:
```python
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```python
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- __init__(value, name=None, **kwargs)[source]
Initialize the semaphore with a given value and optional name for debugging.
- Parameters:
value (int) – The initial value for the semaphore.
name (optional) – An optional name used only to provide useful context in debug logs.
- Return type:
None
- async _debug_daemon()[source]
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- async acquire()[source]
Acquire a semaphore.
If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.
- Return type:
Literal[True]
- decorate(fn)[source]
Wrap a coroutine function to ensure it runs with the semaphore.
Example
Now you can rewrite this pattern:
```python
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```python
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
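One plausible way such a decorator can work is to wrap the coroutine function so that every call runs inside `async with semaphore`. The sketch below shows that idea on top of `asyncio.Semaphore`. `DecoratingSemaphore` is a hypothetical name, and the actual a_sync implementation may differ.

```python
import asyncio
import functools
import inspect


class DecoratingSemaphore(asyncio.Semaphore):
    """Hypothetical sketch of a decorator-capable semaphore; not the a_sync source."""

    def __call__(self, fn):
        return self.decorate(fn)

    def decorate(self, fn):
        if not inspect.iscoroutinefunction(fn):
            raise TypeError(f"{fn!r} must be a coroutine function")

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            async with self:  # acquire before the call, release afterwards
                return await fn(*args, **kwargs)

        return wrapper


async def demo():
    semaphore = DecoratingSemaphore(2)
    running = 0
    peak = 0

    @semaphore
    async def limited(n):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return n

    results = await asyncio.gather(*(limited(n) for n in range(6)))
    return results, peak


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Six calls are started at once, yet the recorded peak concurrency never exceeds the semaphore value of 2.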
- release()[source]
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
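The counter semantics of `acquire()` and `release()` come from `asyncio.Semaphore`, so they can be illustrated with the standard-library class directly. The sketch assumes that the subclass documented here does not change them.

```python
import asyncio


async def demo():
    semaphore = asyncio.Semaphore(1)
    events = []

    await semaphore.acquire()  # counter goes 1 -> 0 and returns immediately
    events.append(("locked", semaphore.locked()))

    async def waiter():
        await semaphore.acquire()  # counter is 0, so this blocks
        events.append("waiter acquired")
        semaphore.release()

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter start and block
    events.append("releasing")
    semaphore.release()  # wakes the blocked waiter
    await task
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```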
- _daemon
- _loop = None
- _value
- _waiters
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- property logger: Logger
Returns a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
- Returns:
A logger instance for the class.
- Return type:
Logger
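A logger ID built from the module, the class name, and an optional instance name could look like the sketch below. `LoggerMixin` and `NamedThing` are hypothetical. The dotted ID format and the use of a `name` attribute are assumptions made for illustration, not details taken from the a_sync source.

```python
import logging


class LoggerMixin:
    """Hypothetical mixin; the real ID format may differ."""

    @property
    def logger(self):
        cls = type(self)
        logger_id = f"{cls.__module__}.{cls.__qualname__}"
        name = getattr(self, "name", None)
        if name:  # assumption: the instance name is appended as a suffix
            logger_id += f".{name}"
        return logging.getLogger(logger_id)

    @property
    def debug_logs_enabled(self):
        return self.logger.isEnabledFor(logging.DEBUG)


class NamedThing(LoggerMixin):
    def __init__(self, name=None):
        self.name = name


if __name__ == "__main__":
    thing = NamedThing("rpc")
    print(thing.logger.name, thing.debug_logs_enabled)
```

Because the instance name becomes part of the logger ID, debug output can be switched on for one named instance without enabling it for every instance of the class.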
- name
- class a_sync.primitives.SmartProcessingQueue[source]
Bases:
_VariablePriorityQueueMixin[T], ProcessingQueue[Concatenate[T, P], V]
A PriorityProcessingQueue subclass that will execute jobs with the most waiters first.
- __call__(*args, **kwargs)
Call self as a function.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- __init__(func, num_workers, *, name='', loop=None)[source]
- Parameters:
func (Callable[[Concatenate[T, ~P]], Awaitable[V]])
num_workers (int)
name (str)
loop (AbstractEventLoop | None)
- Return type:
None
- _ensure_workers()
- Return type:
None
- _format()
- _get_loop()
- _init(maxsize)
- _put(item, heappush=<built-in function heappush>)
- _wakeup_next(waiters)
- close()
- Return type:
None
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(*args, **kwargs)[source]
Put an item into the queue.
If the queue is full, wait until a free slot is available before adding the item.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
SmartFuture[V]
- put_nowait(*args, **kwargs)[source]
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- Parameters:
args (~P)
kwargs (~P)
- Return type:
SmartFuture[V]
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _futs: WeakValueDictionary[Tuple[Tuple[Any], Tuple[Tuple[str, Any]]], SmartFuture[T]]
- _getters
- _loop = None
- _maxsize
- _no_futs = False
- _putters
- _unfinished_tasks
- _worker_coro
- func
- property maxsize
Number of items allowed in the queue.
- num_workers
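The "most waiters first" ordering can be pictured as a heap keyed on the negated waiter count. The sketch below is a simplified, hypothetical illustration using `heapq`: it takes a fixed waiter count per job, whereas the real queue presumably re-evaluates priority as waiters come and go.

```python
import heapq
import itertools


class MostWaitersFirst:
    """Hypothetical illustration of the ordering only."""

    def __init__(self):
        self._heap = []
        self._sequence = itertools.count()

    def push(self, job, num_waiters):
        # heapq is a min-heap, so negate the count; the sequence breaks ties FIFO.
        heapq.heappush(self._heap, (-num_waiters, next(self._sequence), job))

    def pop(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


def demo():
    jobs = MostWaitersFirst()
    for job, waiters in [("a", 1), ("b", 3), ("c", 2), ("d", 3)]:
        jobs.push(job, waiters)
    return [jobs.pop() for _ in range(len(jobs))]


if __name__ == "__main__":
    print(demo())
```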
- class a_sync.primitives.ThreadsafeSemaphore[source]
Bases:
Semaphore
While it’s a bit weird to run multiple event loops, sometimes either you or a library you’re using must do so. When used in threaded applications, this semaphore will not work as intended, but at least your program will function. You may need to reduce the semaphore value for multi-threaded applications.
TL;DR: it’s a janky fix for an edge-case problem and will otherwise function as a normal a_sync.Semaphore (which is just an asyncio.Semaphore with extra bells and whistles).
- __call__(fn)
Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:
```python
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```python
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- __init__(value, name=None)[source]
Initialize the semaphore with a given value and optional name for debugging.
- Parameters:
value (int | None) – The initial value for the semaphore.
name (optional) – An optional name used only to provide useful context in debug logs.
- Return type:
None
- async _debug_daemon()
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- _wake_up_next()
Wake up the first waiter that isn’t done.
- async acquire()
Acquire a semaphore.
If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.
- Return type:
Literal[True]
- decorate(fn)
Wrap a coroutine function to ensure it runs with the semaphore.
Example
Now you can rewrite this pattern:
```python
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```python
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- locked()
Returns True if semaphore cannot be acquired immediately.
- release()
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- _daemon
- _loop = None
- _value
- _waiters
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- dummy
- property logger: Logger
Returns a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
- Returns:
A logger instance for the class.
- Return type:
Logger
- name
- property semaphore: Semaphore
Returns the appropriate semaphore for the current thread.
NOTE: We can’t cache this property because we need to check the current thread every time we access it.
- semaphores: DefaultDict[Thread, Semaphore]
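The `semaphore` property and the `semaphores` mapping point to a per-thread lookup: each thread (and therefore each event loop) gets its own underlying semaphore. The sketch below shows that idea with the standard library. `PerThreadSemaphore` is a hypothetical name and not the a_sync implementation.

```python
import asyncio
import threading
from collections import defaultdict


class PerThreadSemaphore:
    """Hypothetical sketch: one asyncio.Semaphore per thread."""

    def __init__(self, value):
        self._limit = value
        self.semaphores = defaultdict(lambda: asyncio.Semaphore(self._limit))

    @property
    def semaphore(self):
        # Not cached: the answer depends on whichever thread is asking.
        return self.semaphores[threading.current_thread()]

    async def __aenter__(self):
        await self.semaphore.acquire()

    async def __aexit__(self, *exc_info):
        self.semaphore.release()


def demo():
    shared = PerThreadSemaphore(2)

    async def probe():
        async with shared:
            await asyncio.sleep(0)

    threads = [
        threading.Thread(target=lambda: asyncio.run(probe())) for _ in range(2)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return len(shared.semaphores)


if __name__ == "__main__":
    print(demo())
```

In this sketch every thread gets a semaphore with the full value, so the effective limit across the process is the value multiplied by the number of threads. That is presumably why the description above suggests reducing the value in multi-threaded applications.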