a_sync.primitives package
Subpackages
- a_sync.primitives.locks package
- Submodules
- a_sync.primitives.locks.counter module
CounterLock
CounterLock.__init__()
CounterLock._debug_daemon()
CounterLock._ensure_debug_daemon()
CounterLock._start_debug_daemon()
CounterLock._stop_debug_daemon()
CounterLock.set()
CounterLock.wait_for()
CounterLock._daemon
CounterLock._events
CounterLock._name
CounterLock._value
CounterLock.debug_logs_enabled
CounterLock.is_ready
CounterLock.logger
CounterLock.value
CounterLockCluster
- a_sync.primitives.locks.event module
Event
Event.__init__()
Event._debug_daemon()
Event._ensure_debug_daemon()
Event._get_loop()
Event._start_debug_daemon()
Event._stop_debug_daemon()
Event.clear()
Event.is_set()
Event.set()
Event.wait()
Event._daemon
Event._debug_daemon_interval
Event._loop
Event._value
Event._waiters
Event.debug_logs_enabled
Event.logger
- a_sync.primitives.locks.prio_semaphore module
Priority
PrioritySemaphore
PrioritySemaphore._context_manager_class
PrioritySemaphore.__call__()
PrioritySemaphore.__getitem__()
PrioritySemaphore.__init__()
PrioritySemaphore._count_waiters()
PrioritySemaphore._debug_daemon()
PrioritySemaphore._ensure_debug_daemon()
PrioritySemaphore._get_loop()
PrioritySemaphore._start_debug_daemon()
PrioritySemaphore._stop_debug_daemon()
PrioritySemaphore._wake_up_next()
PrioritySemaphore.acquire()
PrioritySemaphore.decorate()
PrioritySemaphore.locked()
PrioritySemaphore.release()
PrioritySemaphore._capacity
PrioritySemaphore._context_managers
PrioritySemaphore._daemon
PrioritySemaphore._decorated
PrioritySemaphore._loop
PrioritySemaphore._potential_lost_waiters
PrioritySemaphore._top_priority
PrioritySemaphore._value
PrioritySemaphore._waiters
PrioritySemaphore.debug_logs_enabled
PrioritySemaphore.logger
PrioritySemaphore.name
_AbstractPrioritySemaphore
_AbstractPrioritySemaphore.__call__()
_AbstractPrioritySemaphore.__getitem__()
_AbstractPrioritySemaphore.__init__()
_AbstractPrioritySemaphore._count_waiters()
_AbstractPrioritySemaphore._debug_daemon()
_AbstractPrioritySemaphore._ensure_debug_daemon()
_AbstractPrioritySemaphore._get_loop()
_AbstractPrioritySemaphore._start_debug_daemon()
_AbstractPrioritySemaphore._stop_debug_daemon()
_AbstractPrioritySemaphore._wake_up_next()
_AbstractPrioritySemaphore.acquire()
_AbstractPrioritySemaphore.decorate()
_AbstractPrioritySemaphore.locked()
_AbstractPrioritySemaphore.release()
_AbstractPrioritySemaphore._capacity
_AbstractPrioritySemaphore._context_manager_class
_AbstractPrioritySemaphore._context_managers
_AbstractPrioritySemaphore._daemon
_AbstractPrioritySemaphore._decorated
_AbstractPrioritySemaphore._loop
_AbstractPrioritySemaphore._potential_lost_waiters
_AbstractPrioritySemaphore._top_priority
_AbstractPrioritySemaphore._value
_AbstractPrioritySemaphore._waiters
_AbstractPrioritySemaphore.debug_logs_enabled
_AbstractPrioritySemaphore.logger
_AbstractPrioritySemaphore.name
_AbstractPrioritySemaphoreContextManager
_AbstractPrioritySemaphoreContextManager.__call__()
_AbstractPrioritySemaphoreContextManager.__init__()
_AbstractPrioritySemaphoreContextManager._debug_daemon()
_AbstractPrioritySemaphoreContextManager._ensure_debug_daemon()
_AbstractPrioritySemaphoreContextManager._get_loop()
_AbstractPrioritySemaphoreContextManager._repr_no_parent_()
_AbstractPrioritySemaphoreContextManager._start_debug_daemon()
_AbstractPrioritySemaphoreContextManager._stop_debug_daemon()
_AbstractPrioritySemaphoreContextManager._wake_up_next()
_AbstractPrioritySemaphoreContextManager.acquire()
_AbstractPrioritySemaphoreContextManager.decorate()
_AbstractPrioritySemaphoreContextManager.locked()
_AbstractPrioritySemaphoreContextManager.release()
_AbstractPrioritySemaphoreContextManager._daemon
_AbstractPrioritySemaphoreContextManager._decorated
_AbstractPrioritySemaphoreContextManager._loop
_AbstractPrioritySemaphoreContextManager._parent
_AbstractPrioritySemaphoreContextManager._priority
_AbstractPrioritySemaphoreContextManager._priority_name
_AbstractPrioritySemaphoreContextManager._value
_AbstractPrioritySemaphoreContextManager._waiters
_AbstractPrioritySemaphoreContextManager.debug_logs_enabled
_AbstractPrioritySemaphoreContextManager.logger
_AbstractPrioritySemaphoreContextManager.loop
_AbstractPrioritySemaphoreContextManager.name
_AbstractPrioritySemaphoreContextManager.waiters
_PrioritySemaphoreContextManager
_PrioritySemaphoreContextManager.__call__()
_PrioritySemaphoreContextManager.__init__()
_PrioritySemaphoreContextManager._debug_daemon()
_PrioritySemaphoreContextManager._ensure_debug_daemon()
_PrioritySemaphoreContextManager._get_loop()
_PrioritySemaphoreContextManager._repr_no_parent_()
_PrioritySemaphoreContextManager._start_debug_daemon()
_PrioritySemaphoreContextManager._stop_debug_daemon()
_PrioritySemaphoreContextManager._wake_up_next()
_PrioritySemaphoreContextManager.acquire()
_PrioritySemaphoreContextManager.decorate()
_PrioritySemaphoreContextManager.locked()
_PrioritySemaphoreContextManager.release()
_PrioritySemaphoreContextManager._daemon
_PrioritySemaphoreContextManager._decorated
_PrioritySemaphoreContextManager._loop
_PrioritySemaphoreContextManager._parent
_PrioritySemaphoreContextManager._priority
_PrioritySemaphoreContextManager._priority_name
_PrioritySemaphoreContextManager._value
_PrioritySemaphoreContextManager._waiters
_PrioritySemaphoreContextManager.debug_logs_enabled
_PrioritySemaphoreContextManager.logger
_PrioritySemaphoreContextManager.loop
_PrioritySemaphoreContextManager.name
_PrioritySemaphoreContextManager.waiters
- a_sync.primitives.locks.semaphore module
DummySemaphore
Semaphore
Semaphore.__call__()
Semaphore.__init__()
Semaphore._debug_daemon()
Semaphore._ensure_debug_daemon()
Semaphore._get_loop()
Semaphore._start_debug_daemon()
Semaphore._stop_debug_daemon()
Semaphore._wake_up_next()
Semaphore.acquire()
Semaphore.decorate()
Semaphore.locked()
Semaphore.release()
Semaphore._daemon
Semaphore._decorated
Semaphore._loop
Semaphore._value
Semaphore._waiters
Semaphore.debug_logs_enabled
Semaphore.logger
Semaphore.name
ThreadsafeSemaphore
ThreadsafeSemaphore.__call__()
ThreadsafeSemaphore.__init__()
ThreadsafeSemaphore._debug_daemon()
ThreadsafeSemaphore._ensure_debug_daemon()
ThreadsafeSemaphore._get_loop()
ThreadsafeSemaphore._start_debug_daemon()
ThreadsafeSemaphore._stop_debug_daemon()
ThreadsafeSemaphore._wake_up_next()
ThreadsafeSemaphore.acquire()
ThreadsafeSemaphore.decorate()
ThreadsafeSemaphore.locked()
ThreadsafeSemaphore.release()
ThreadsafeSemaphore._daemon
ThreadsafeSemaphore._decorated
ThreadsafeSemaphore._loop
ThreadsafeSemaphore._value
ThreadsafeSemaphore._waiters
ThreadsafeSemaphore.debug_logs_enabled
ThreadsafeSemaphore.dummy
ThreadsafeSemaphore.logger
ThreadsafeSemaphore.name
ThreadsafeSemaphore.semaphore
ThreadsafeSemaphore.semaphores
ThreadsafeSemaphore.use_dummy
- Module contents
Submodules
a_sync.primitives._debug module
This module provides a mixin class used to facilitate the creation of debugging daemons in subclasses.
The mixin provides a framework for managing a debug daemon task, which can be used to emit rich debug logs from subclass instances whenever debug logging is enabled. Subclasses must implement the specific logging behavior.
- class a_sync.primitives._debug._DebugDaemonMixin[source]
Bases:
_LoggerMixin
A mixin class that provides a framework for debugging capabilities using a daemon task.
This mixin sets up the structure for managing a debug daemon task. Subclasses are responsible for implementing the specific behavior of the daemon, including any logging functionality.
See also
_LoggerMixin
for logging capabilities.
- abstract async _debug_daemon(fut, fn, *args, **kwargs)[source]
Abstract method to define the debug daemon’s behavior.
Subclasses must implement this method to specify what the debug daemon should do, including any logging or monitoring tasks.
- Parameters:
fut (Future) – The future associated with the daemon.
fn – The function to be debugged.
*args – Positional arguments for the function.
**kwargs – Keyword arguments for the function.
- Return type:
None
Examples
Implementing a simple debug daemon in a subclass:
class MyDebugClass(_DebugDaemonMixin):
    async def _debug_daemon(self, fut, fn, *args, **kwargs):
        while not fut.done():
            self.logger.debug("Debugging...")
            await asyncio.sleep(1)
- _ensure_debug_daemon(*args, **kwargs)[source]
Ensures that the debug daemon task is running.
This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
Examples
Ensuring the debug daemon is running:
my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()
See also
_start_debug_daemon()
for starting the daemon.
- _start_debug_daemon(*args, **kwargs)[source]
Starts the debug daemon task if debug logging is enabled and the event loop is running.
This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
Examples
Starting the debug daemon:
my_instance = MyDebugClass()
my_instance._start_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- _stop_debug_daemon(t=None)[source]
Stops the debug daemon task.
This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
Examples
Stopping the debug daemon:
my_instance = MyDebugClass()
my_instance._stop_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- _daemon
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
Examples
>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
See also
- property logger: Logger
Provides a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
Examples
>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'
Note
Replace module_name with the actual module name where the class is defined.
See also
a_sync.primitives._loggable module
This module provides a mixin class to add debug logging capabilities to other classes.
- class a_sync.primitives._loggable._LoggerMixin[source]
Bases:
object
A mixin class that adds logging capabilities to other classes.
This mixin provides a cached property for accessing a logger instance and a property to check if debug logging is enabled.
See also
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
Examples
>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
See also
- property logger: Logger
Provides a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
Examples
>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'
Note
Replace module_name with the actual module name where the class is defined.
See also
a_sync.primitives.queue module
This module provides various queue implementations for managing asynchronous tasks. It includes standard FIFO queues, priority queues, and processing queues with enhanced functionality.
- Classes:
Queue: A generic asynchronous queue that extends the functionality of asyncio.Queue.
ProcessingQueue: A queue designed for processing tasks asynchronously with multiple workers.
PriorityProcessingQueue: A priority-based processing queue where tasks are processed based on priority.
SmartProcessingQueue: A processing queue that executes jobs with the most waiters first, supporting dynamic priorities.
See also
asyncio.Queue: The base class for asynchronous FIFO queues.
asyncio.PriorityQueue: The base class for priority queues.
- class a_sync.primitives.queue.PriorityProcessingQueue[source]
Bases: _PriorityQueueMixin[T], ProcessingQueue[T, V]
A priority-based processing queue where tasks are processed based on priority.
This queue allows tasks to be added with a specified priority, ensuring that higher priority tasks are processed before lower priority ones. It is ideal for scenarios where task prioritization is crucial.
Example
>>> async def process_task(data):
...     return data.upper()
>>> queue = PriorityProcessingQueue(func=process_task, num_workers=5)
>>> fut = await queue.put(priority=1, item='task')
>>> print(await fut)
TASK
See also
- __call__(*args, **kwargs)
Submits a task to the queue.
Example
>>> fut = queue(*args, **kwargs) >>> print(fut)
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- __init__(func, num_workers, *, return_data=True, name='', loop=None)
Initializes a processing queue with the given worker function and worker count.
- Parameters:
func (Callable[[~P], Awaitable[V]]) – The task function to process.
num_workers (int) – Number of workers to process tasks.
return_data (bool) – Whether tasks should return data via futures. Defaults to True.
name (str) – Name of the queue. Defaults to an empty string.
loop (AbstractEventLoop | None) – Optional event loop for the queue.
- Return type:
None
Example
>>> queue = ProcessingQueue(func=my_task_func, num_workers=3, name='myqueue')
- _create_future()
Creates a future for the task.
- Return type:
Future[V]
- _ensure_workers()
Ensures that the worker tasks are running.
- Return type:
None
- _format()
- _get(heappop=<built-in function heappop>)[source]
Retrieves the highest priority task from the queue.
- Returns:
The priority, task arguments, keyword arguments, and future of the task.
Example
>>> task = queue._get() >>> print(task)
- _get_loop()
- _init(maxsize)
Initializes the priority queue.
Example
>>> queue._init(maxsize=10)
- _put(item, heappush=<built-in function heappush>)
Adds an item to the priority queue based on its priority.
Example
>>> queue._put(item='task')
- _wakeup_next(waiters)
- close()
Closes the queue, preventing further task submissions.
Example
>>> queue.close()
- Return type:
None
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(priority, *args, **kwargs)[source]
Asynchronously adds a task with priority to the queue.
- Parameters:
priority (Any) – The priority of the task.
args (~P) – Positional arguments for the task.
kwargs (~P) – Keyword arguments for the task.
- Returns:
The future representing the result of the task.
- Return type:
Future[V]
Example
>>> fut = await queue.put(priority=1, item='task') >>> print(await fut)
- put_nowait(priority, *args, **kwargs)[source]
Immediately adds a task with priority to the queue without waiting.
- Parameters:
priority (Any) – The priority of the task.
args (~P) – Positional arguments for the task.
kwargs (~P) – Keyword arguments for the task.
- Returns:
The future representing the result of the task.
- Return type:
Future[V]
Example
>>> fut = queue.put_nowait(priority=1, item='task') >>> print(await fut)
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _getters
- _loop = None
- _maxsize
- _name
Optional name for the queue.
- _no_futs
Indicates whether tasks will return data via futures.
- _putters
- _unfinished_tasks
- _worker_coro
- func
The function that each worker will process.
- property maxsize
Number of items allowed in the queue.
- property name: str
Returns the name of the queue, or its representation.
Example
>>> print(queue.name)
- num_workers
The number of worker tasks for processing.
- class a_sync.primitives.queue.ProcessingQueue[source]
Bases: _Queue[Tuple[P, asyncio.Future[V]]], Generic[P, V]
A queue designed for processing tasks asynchronously with multiple workers.
Each item in the queue is processed by a worker, and tasks can return results via asynchronous futures. This queue is ideal for scenarios where tasks need to be processed concurrently with a fixed number of workers.
Example
>>> async def process_task(data):
...     return data.upper()
>>> queue = ProcessingQueue(func=process_task, num_workers=5)
>>> fut = await queue.put(item='task')
>>> print(await fut)
TASK
- __call__(*args, **kwargs)[source]
Submits a task to the queue.
Example
>>> fut = queue(*args, **kwargs) >>> print(fut)
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- __init__(func, num_workers, *, return_data=True, name='', loop=None)[source]
Initializes a processing queue with the given worker function and worker count.
- Parameters:
func (Callable[[~P], Awaitable[V]]) – The task function to process.
num_workers (int) – Number of workers to process tasks.
return_data (bool) – Whether tasks should return data via futures. Defaults to True.
name (str) – Name of the queue. Defaults to an empty string.
loop (AbstractEventLoop | None) – Optional event loop for the queue.
- Return type:
None
Example
>>> queue = ProcessingQueue(func=my_task_func, num_workers=3, name='myqueue')
- async __worker_coro()
The coroutine executed by worker tasks to process the queue.
- Return type:
- _format()
- _get()
- _get_loop()
- _init(maxsize)
- _put(item)
- _wakeup_next(waiters)
- close()[source]
Closes the queue, preventing further task submissions.
Example
>>> queue.close()
- Return type:
None
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(*args, **kwargs)[source]
Asynchronously submits a task to the queue.
- Parameters:
args (~P) – Positional arguments for the task.
kwargs (~P) – Keyword arguments for the task.
- Returns:
The future result of the task.
- Return type:
Future[V]
Example
>>> fut = await queue.put(item='task') >>> print(await fut)
- put_nowait(*args, **kwargs)[source]
Immediately submits a task to the queue without waiting.
- Parameters:
args (~P) – Positional arguments for the task.
kwargs (~P) – Keyword arguments for the task.
- Returns:
The future result of the task.
- Return type:
Future[V]
Example
>>> fut = queue.put_nowait(item='task') >>> print(await fut)
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _getters
- _loop = None
- _maxsize
- _name
Optional name for the queue.
- _no_futs
Indicates whether tasks will return data via futures.
- _putters
- _unfinished_tasks
- _worker_coro
- func
The function that each worker will process.
- property maxsize
Number of items allowed in the queue.
- property name: str
Returns the name of the queue, or its representation.
Example
>>> print(queue.name)
- num_workers
The number of worker tasks for processing.
- class a_sync.primitives.queue.Queue[source]
Bases: _Queue[T]
A generic asynchronous queue that extends the functionality of asyncio.Queue.
This implementation supports retrieving multiple items at once and handling task processing in both FIFO and LIFO order. It provides enhanced type hinting support and additional methods for bulk operations.
- Inherits from:
Example
>>> queue = Queue()
>>> await queue.put(item='task1')
>>> await queue.put(item='task2')
>>> result = await queue.get()
>>> print(result)
task1
>>> all_tasks = await queue.get_all()
>>> print(all_tasks)
['task2']
- _get_loop()
- full()[source]
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()[source]
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- Return type:
T
- async get_all()[source]
Asynchronously retrieves and removes all available items from the queue.
If the queue is empty, this method will wait until at least one item is available before returning.
Example
>>> tasks = await queue.get_all() >>> print(tasks)
- Return type:
List[T]
- get_all_nowait()[source]
Retrieves and removes all available items from the queue without waiting.
This method does not wait for items to be available and will raise an exception if the queue is empty.
- Raises:
QueueEmpty – If the queue is empty.
- Return type:
List[T]
Example
>>> tasks = queue.get_all_nowait() >>> print(tasks)
- async get_multi(i, can_return_less=False)[source]
Asynchronously retrieves up to i items from the queue.
- Parameters:
- Raises:
QueueEmpty – If no items are available and fewer items cannot be returned.
- Return type:
List[T]
Example
>>> tasks = await queue.get_multi(i=2, can_return_less=True) >>> print(tasks)
- get_multi_nowait(i, can_return_less=False)[source]
Retrieves up to i items from the queue without waiting.
- Parameters:
- Raises:
QueueEmpty – If no items are available and fewer items cannot be returned.
- Return type:
List[T]
Example
>>> tasks = queue.get_multi_nowait(i=3, can_return_less=True) >>> print(tasks)
- get_nowait()[source]
Retrieves and removes the next item from the queue without blocking.
This method does not wait for an item to be available and will raise an exception if the queue is empty.
- Raises:
QueueEmpty – If the queue is empty.
- Return type:
T
Example
>>> result = queue.get_nowait() >>> print(result)
- async join()[source]
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(item)[source]
Asynchronously adds an item to the queue.
If the queue is full, this method will block until space is available.
- Parameters:
item (T) – The item to add to the queue.
- Return type:
None
Example
>>> await queue.put(item='task')
- put_nowait(item)[source]
Adds an item to the queue without blocking.
This method does not wait for space to be available and will raise an exception if the queue is full.
- Parameters:
item (T) – The item to add to the queue.
- Raises:
QueueFull – If the queue is full.
- Return type:
None
Example
>>> queue.put_nowait(item='task')
- task_done()[source]
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
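A brief consumer sketch of the get()/task_done()/join() pattern described above (handle() is a hypothetical per-item processing step, not part of this API):

async def worker(queue):
    while True:
        item = await queue.get()
        try:
            handle(item)  # hypothetical per-item processing step
        finally:
            queue.task_done()

# A producer can then `await queue.join()` to block until every item that was
# put into the queue has received a matching task_done() call.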
- _finished
- _getters
- _loop = None
- _maxsize
- _putters
- _unfinished_tasks
- property maxsize
Number of items allowed in the queue.
- class a_sync.primitives.queue.SmartProcessingQueue[source]
Bases: _VariablePriorityQueueMixin[T], ProcessingQueue[Concatenate[T, P], V]
A processing queue that will execute jobs with the most waiters first, supporting dynamic priorities.
This queue is designed to handle tasks with dynamic priorities, ensuring that tasks with the most waiters are prioritized. It is ideal for scenarios where task execution order is influenced by the number of waiters.
Example
>>> async def process_task(data):
...     return data.upper()
>>> queue = SmartProcessingQueue(func=process_task, num_workers=5)
>>> fut = await queue.put(item='task')
>>> print(await fut)
TASK
See also
- __call__(*args, **kwargs)
Submits a task to the queue.
Example
>>> fut = queue(*args, **kwargs) >>> print(fut)
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- __init__(func, num_workers, *, name='', loop=None)[source]
Initializes a smart processing queue with the given worker function.
- Parameters:
func (Callable[[Concatenate[T, ~P]], Awaitable[V]]) – The worker function.
num_workers (int) – Number of worker tasks.
name (str) – Optional name for the queue.
loop (AbstractEventLoop | None) – Optional event loop.
- Return type:
None
Example
>>> queue = SmartProcessingQueue(func=my_task_func, num_workers=3, name='smart_queue')
- async __worker_coro()
Worker coroutine responsible for processing tasks in the queue.
Retrieves tasks, executes them, and sets the results or exceptions for the futures.
- Raises:
Any – Exceptions raised during task processing are logged.
- Return type:
Example
>>> await queue.__worker_coro()
- _ensure_workers()
Ensures that the worker tasks are running.
- Return type:
None
- _format()
- _get()[source]
Retrieves the task with the highest priority from the queue.
- Returns:
The priority, task arguments, keyword arguments, and future of the task.
Example
>>> task = queue._get() >>> print(task)
- _get_key(*args, **kwargs)
Generates a unique key for task identification based on arguments.
- Parameters:
args – Positional arguments for the task.
kwargs – Keyword arguments for the task.
- Returns:
The generated key for the task.
- Return type:
Example
>>> key = queue._get_key(*args, **kwargs) >>> print(key)
- _get_loop()
- _init(maxsize)
Initializes the priority queue.
Example
>>> queue._init(maxsize=10)
- _put(item, heappush=<built-in function heappush>)
Adds an item to the priority queue based on its priority.
Example
>>> queue._put(item='task')
- _wakeup_next(waiters)
- close()
Closes the queue, preventing further task submissions.
Example
>>> queue.close()
- Return type:
None
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(*args, **kwargs)[source]
Asynchronously adds a task with smart future handling to the queue.
- Parameters:
args (~P) – Positional arguments for the task.
kwargs (~P) – Keyword arguments for the task.
- Returns:
The future representing the task’s result.
- Return type:
SmartFuture[V]
Example
>>> fut = await queue.put(item='task') >>> print(await fut)
- put_nowait(*args, **kwargs)[source]
Immediately adds a task with smart future handling to the queue without waiting.
- Parameters:
args (~P) – Positional arguments for the task.
kwargs (~P) – Keyword arguments for the task.
- Returns:
The future representing the task’s result.
- Return type:
SmartFuture[V]
Example
>>> fut = queue.put_nowait(item='task') >>> print(await fut)
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _futs: weakref.WeakValueDictionary[_smart._Key[T], _smart.SmartFuture[T]]
Weak reference dictionary for managing smart futures.
- _getters
- _loop = None
- _maxsize
- _name
Optional name for the queue.
- _no_futs = False
Whether smart futures are used.
- _putters
- _unfinished_tasks
- _worker_coro
- func
The function that each worker will process.
- property maxsize
Number of items allowed in the queue.
- property name: str
Returns the name of the queue, or its representation.
Example
>>> print(queue.name)
- num_workers
The number of worker tasks for processing.
- class a_sync.primitives.queue.VariablePriorityQueue[source]
Bases: _VariablePriorityQueueMixin[T], PriorityQueue
A PriorityQueue subclass that allows priorities to be updated (or computed) on the fly.
This queue supports dynamic priority updates, making it suitable for tasks where priorities may change over time. It ensures that tasks are processed based on the most current priority.
Example
>>> queue = VariablePriorityQueue()
>>> queue.put_nowait((1, 'task1'))
>>> queue.put_nowait((2, 'task2'))
>>> task = queue.get_nowait()
>>> print(task)
See also
- __init__(maxsize=0, *, loop=<object object>)
- _format()
- _get(heapify=<built-in function heapify>, heappop=<built-in function heappop>)
Re-sorts the priority queue to account for any changes in priorities and retrieves the task with the highest updated priority.
- Parameters:
heapify – Function to resort the heap.
heappop – Function to pop the highest priority task.
- Returns:
The highest priority task in the queue.
Example
>>> task = queue._get() >>> print(task)
- _get_key(*args, **kwargs)
Generates a unique key for task identification based on arguments.
- Parameters:
args – Positional arguments for the task.
kwargs – Keyword arguments for the task.
- Returns:
The generated key for the task.
- Return type:
Example
>>> key = queue._get_key(*args, **kwargs) >>> print(key)
- _get_loop()
- _init(maxsize)
Initializes the priority queue.
Example
>>> queue._init(maxsize=10)
- _put(item, heappush=<built-in function heappush>)
Adds an item to the priority queue based on its priority.
Example
>>> queue._put(item='task')
- _wakeup_next(waiters)
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(item)
Put an item into the queue.
Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.
- put_nowait(item)
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _loop = None
- property maxsize
Number of items allowed in the queue.
- class a_sync.primitives.queue._PriorityQueueMixin[source]
Bases: Generic[T]
Mixin for creating priority queue functionality with support for custom comparison.
See also
- class a_sync.primitives.queue._Queue[source]
Bases: Queue[T]
- __init__(maxsize=0, *, loop=<object object>)
- _format()
- _get()
- _get_loop()
- _init(maxsize)
- _put(item)
- _wakeup_next(waiters)
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(item)
Put an item into the queue.
Put an item into the queue. If the queue is full, wait until a free slot is available before adding item.
- put_nowait(item)
Put an item into the queue without blocking.
If no free slot is immediately available, raise QueueFull.
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _getters
- _loop = None
- _maxsize
- _putters
- _unfinished_tasks
- property maxsize
Number of items allowed in the queue.
- class a_sync.primitives.queue._SmartFutureRef[source]
Bases: ReferenceType, Generic[T]
Weak reference for SmartFuture objects used in priority queues.
See also
SmartFuture
- __call__(*args, **kwargs)
Call self as a function.
- __init__(*args, **kwargs)
- class a_sync.primitives.queue._VariablePriorityQueueMixin[source]
Bases: _PriorityQueueMixin[T]
Mixin for priority queues where task priorities can be updated dynamically.
See also
- _get(heapify=<built-in function heapify>, heappop=<built-in function heappop>)[source]
Re-sorts the priority queue to account for any changes in priorities and retrieves the task with the highest updated priority.
- Parameters:
heapify – Function to resort the heap.
heappop – Function to pop the highest priority task.
- Returns:
The highest priority task in the queue.
Example
>>> task = queue._get() >>> print(task)
- _get_key(*args, **kwargs)[source]
Generates a unique key for task identification based on arguments.
- Parameters:
args – Positional arguments for the task.
kwargs – Keyword arguments for the task.
- Returns:
The generated key for the task.
- Return type:
Example
>>> key = queue._get_key(*args, **kwargs) >>> print(key)
- _init(maxsize)
Initializes the priority queue.
Example
>>> queue._init(maxsize=10)
- _put(item, heappush=<built-in function heappush>)
Adds an item to the priority queue based on its priority.
Example
>>> queue._put(item='task')
- a_sync.primitives.queue._validate_args(i, can_return_less)[source]
Validates the arguments for methods that retrieve multiple items from the queue.
- Parameters:
- Raises:
~TypeError – If i is not an integer or can_return_less is not a boolean.
~ValueError – If i is not greater than 1.
- Return type:
None
Example
>>> _validate_args(i=2, can_return_less=False)
Module contents
This module includes both new primitives and modified versions of standard asyncio primitives.
The primitives provided in this module are:
- Semaphore
- PrioritySemaphore
- ThreadsafeSemaphore
- CounterLock
- Event
- Queue
- ProcessingQueue
- SmartProcessingQueue
These primitives extend or modify the functionality of standard asyncio primitives to provide additional features or improved performance for specific use cases.
Examples
Using a Semaphore to limit concurrent access:
>>> from a_sync.primitives.locks import Semaphore
>>> semaphore = Semaphore(2)
>>> async with semaphore:
... # perform some operation
... pass
Using a Queue to manage tasks:
>>> from a_sync.primitives.queue import Queue
>>> queue = Queue()
>>> await queue.put('task1')
>>> task = await queue.get()
>>> print(task)
task1
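Using a CounterLock to gate work until a counter value is reached (a brief sketch; the counter is set from the same context purely for illustration):
>>> from a_sync.primitives import CounterLock
>>> counter = CounterLock(start_value=0)
>>> counter.set(3)
>>> await counter.wait_for(3)  # returns immediately, since 3 >= 3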
See also
asyncio
for standard asyncio primitives.
a_sync.primitives.locks
for lock-related primitives.
a_sync.primitives.queue
for queue-related primitives.
- class a_sync.primitives.CounterLock[source]
Bases:
_DebugDaemonMixin
An async primitive that uses an internal counter to manage task synchronization.
A coroutine can await counter.wait_for(3) and it will wait until the internal counter >= 3. If some other task executes counter.value = 5 or counter.set(5), the first coroutine will proceed as 5 >= 3.
The internal counter can only be set to a value greater than the current value.
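A minimal sketch of the wait_for/set interplay described above (the producer/consumer coroutine names and timings are illustrative, not part of the API):

import asyncio
from a_sync.primitives import CounterLock

async def main():
    counter = CounterLock(start_value=0, name="example")

    async def consumer():
        # Blocks until the internal counter reaches 3 or more.
        await counter.wait_for(3)
        return counter.value

    async def producer():
        await asyncio.sleep(0.1)
        # Setting the counter to 5 releases the waiter, since 5 >= 3.
        counter.set(5)

    waiter_task = asyncio.create_task(consumer())
    await producer()
    print(await waiter_task)  # 5

asyncio.run(main())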
See also
CounterLockCluster
for managing multiple CounterLock instances.
- __init__(start_value=0, name=None)[source]
Initializes the CounterLock with a starting value and an optional name.
- Parameters:
Examples
>>> counter = CounterLock(start_value=0, name="example_counter")
>>> counter.value
0
- async _debug_daemon()[source]
Periodically logs debug information about the counter state and waiters.
This method is used internally to provide debugging information when debug logging is enabled.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
Examples
Ensuring the debug daemon is running:
my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()
See also
_start_debug_daemon()
for starting the daemon.
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
Examples
Starting the debug daemon:
my_instance = MyDebugClass()
my_instance._start_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
Examples
Stopping the debug daemon:
my_instance = MyDebugClass()
my_instance._stop_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- set(value)[source]
Sets the counter to the specified value.
This method internally uses the value property to enforce that the new value must be strictly greater than the current value.
- Parameters:
value (int) – The value to set the counter to. Must be strictly greater than the current value.
- Raises:
ValueError – If the new value is less than or equal to the current value.
- Return type:
None
Examples
>>> counter = CounterLock(start_value=0)
>>> counter.set(5)
>>> counter.value
5
See also
CounterLock.value()
for direct value assignment.
- async wait_for(value)[source]
Waits until the counter reaches or exceeds the specified value.
This method will ensure the debug daemon is running if the counter is not ready.
Examples
>>> counter = CounterLock(start_value=0)
>>> await counter.wait_for(5)  # This will block until counter.value >= 5
See also
CounterLock.set()
to set the counter value.
- _daemon
- _events: DefaultDict[int, Event]
A defaultdict that maps each awaited value to an Event that manages the waiters for that value.
- _name
An optional name for the counter, used in debug logs.
- _value
The current value of the counter.
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
Examples
>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
See also
- is_ready
A lambda function that indicates whether the current counter value is greater than or equal to a given value.
- property logger: Logger
Provides a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
Examples
>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'
Note
Replace module_name with the actual module name where the class is defined.
See also
- class a_sync.primitives.Event[source]
Bases: Event, _DebugDaemonMixin
An asyncio.Event with additional debug logging to help detect deadlocks.
This event class extends asyncio.Event by adding debug logging capabilities. It logs detailed information about the event state and waiters, which can be useful for diagnosing and debugging potential deadlocks.
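A minimal usage sketch (the waiter coroutine name is illustrative; the extra debug logging only activates when the logger's DEBUG level is enabled):

import asyncio
from a_sync.primitives import Event

async def main():
    event = Event(name="my_event")

    async def waiter():
        # Blocks until another task calls event.set().
        await event.wait()
        return "released"

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0.1)
    event.set()
    print(await task)  # released

asyncio.run(main())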
- __init__(name='', debug_daemon_interval=300, *, loop=None)[source]
Initializes the Event.
- Parameters:
name (str) – An optional name for the event, used in debug logs.
debug_daemon_interval (int) – The interval in seconds for the debug daemon to log information.
loop (Optional[asyncio.AbstractEventLoop]) – The event loop to use.
- async _debug_daemon()[source]
Periodically logs debug information about the event state and waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
Examples
Ensuring the debug daemon is running:
my_instance = MyDebugClass() my_instance._ensure_debug_daemon()
See also
_start_debug_daemon()
for starting the daemon.
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
Examples
Starting the debug daemon:
my_instance = MyDebugClass()
my_instance._start_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
Examples
Stopping the debug daemon:
my_instance = MyDebugClass()
my_instance._stop_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- clear()[source]
Reset the internal flag to false. Subsequently, coroutines calling wait() will block until set() is called to set the internal flag to true again.
- set()[source]
Set the internal flag to true. All coroutines waiting for it to become true are awakened. Coroutine that call wait() once the flag is true will not block at all.
- async wait()[source]
Wait until the event is set.
- Returns:
True when the event is set.
- Return type:
Literal[True]
- _daemon
- _debug_daemon_interval
- _loop: AbstractEventLoop = None
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
Examples
>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
See also
- property logger: Logger
Provides a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
Examples
>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'
Note
Replace module_name with the actual module name where the class is defined.
See also
- class a_sync.primitives.PrioritySemaphore[source]
Bases: _AbstractPrioritySemaphore[int | float | Decimal, _PrioritySemaphoreContextManager]
Semaphore that uses numeric priorities for waiters.
This class extends _AbstractPrioritySemaphore and provides a concrete implementation using numeric priorities. The _context_manager_class is set to _PrioritySemaphoreContextManager, and the _top_priority is set to -1, which is the highest priority.
Examples
The primary way to use this semaphore is by specifying a priority.
>>> priority_semaphore = PrioritySemaphore(10)
>>> async with priority_semaphore[priority]:
...     await do_stuff()
You can also enter and exit this semaphore without specifying a priority, and it will use the top priority by default:
>>> priority_semaphore = PrioritySemaphore(10)
>>> async with priority_semaphore:
...     await do_stuff()
See also
_AbstractPrioritySemaphore for the base class implementation.
- _context_manager_class
alias of _PrioritySemaphoreContextManager
- __call__(fn)
Decorator method to wrap coroutine functions with the semaphore.
This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.
Example
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
- __getitem__(priority)
Gets the context manager for a given priority.
- Parameters:
priority (PT | None) – The priority for which to get the context manager. If None, uses the top priority.
- Returns:
The context manager associated with the given priority.
- Return type:
Examples
>>> semaphore = _AbstractPrioritySemaphore(5)
>>> context_manager = semaphore[priority]
- __init__(value=1, *, name=None)
Initializes the priority semaphore.
- Parameters:
- Return type:
None
Examples
>>> semaphore = _AbstractPrioritySemaphore(5, name="test_semaphore")
- _count_waiters()
Counts the number of waiters for each priority.
Examples
>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore._count_waiters()
- async _debug_daemon()
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.
Example
semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
Examples
Ensuring the debug daemon is running:
my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()
See also
_start_debug_daemon()
for starting the daemon.
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
Examples
Starting the debug daemon:
my_instance = MyDebugClass()
my_instance._start_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
Examples
Stopping the debug daemon:
my_instance = MyDebugClass()
my_instance._stop_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- _wake_up_next()
Wakes up the next waiter in line.
This method handles the waking of waiters based on priority. It includes an emergency procedure to handle potential lost waiters, ensuring that no waiter is left indefinitely waiting.
The emergency procedure is a temporary measure to address potential issues with lost waiters.
Examples
>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore._wake_up_next()
- Return type:
None
- async acquire()
Acquires the semaphore with the top priority.
This method overrides Semaphore.acquire() to handle priority-based logic.
Examples
>>> semaphore = _AbstractPrioritySemaphore(5)
>>> await semaphore.acquire()
- Return type:
Literal[True]
- decorate(fn)
Wrap a coroutine function to ensure it runs with the semaphore.
Example
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
- locked()
Checks if the semaphore is locked.
- Returns:
True if the semaphore cannot be acquired immediately, False otherwise.
- Return type:
Examples
>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore.locked()
- release()
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- _capacity
The initial capacity of the semaphore.
- _context_managers: Dict[PT, _AbstractPrioritySemaphoreContextManager[PT]]
A dictionary mapping priorities to their context managers.
- _daemon
- _loop = None
- _potential_lost_waiters: List[Future[None]]
A list of futures representing waiters that might have been lost.
- _top_priority = -1
- _waiters: List[_AbstractPrioritySemaphoreContextManager[PT]]
A heap queue of context managers, sorted by priority.
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
Examples
>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
See also
- property logger: Logger
Provides a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
Examples
>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'
Note
Replace module_name with the actual module name where the class is defined.
See also
- class a_sync.primitives.ProcessingQueue[source]
Bases: _Queue[Tuple[P, asyncio.Future[V]]], Generic[P, V]
A queue designed for processing tasks asynchronously with multiple workers.
Each item in the queue is processed by a worker, and tasks can return results via asynchronous futures. This queue is ideal for scenarios where tasks need to be processed concurrently with a fixed number of workers.
Example
>>> async def process_task(data):
...     return data.upper()
>>> queue = ProcessingQueue(func=process_task, num_workers=5)
>>> fut = await queue.put(item='task')
>>> print(await fut)
TASK
- __call__(*args, **kwargs)[source]
Submits a task to the queue.
Example
>>> fut = queue(*args, **kwargs) >>> print(fut)
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- __init__(func, num_workers, *, return_data=True, name='', loop=None)[source]
Initializes a processing queue with the given worker function and worker count.
- Parameters:
func (Callable[[~P], Awaitable[V]]) – The task function to process.
num_workers (int) – Number of workers to process tasks.
return_data (bool) – Whether tasks should return data via futures. Defaults to True.
name (str) – Name of the queue. Defaults to an empty string.
loop (AbstractEventLoop | None) – Optional event loop for the queue.
- Return type:
None
Example
>>> queue = ProcessingQueue(func=my_task_func, num_workers=3, name='myqueue')
- async __worker_coro()
The coroutine executed by worker tasks to process the queue.
- Return type:
- _format()
- _get()
- _get_loop()
- _init(maxsize)
- _put(item)
- _wakeup_next(waiters)
- close()[source]
Closes the queue, preventing further task submissions.
Example
>>> queue.close()
- Return type:
None
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(*args, **kwargs)[source]
Asynchronously submits a task to the queue.
- Parameters:
args (~P) – Positional arguments for the task.
kwargs (~P) – Keyword arguments for the task.
- Returns:
The future result of the task.
- Return type:
Future[V]
Example
>>> fut = await queue.put(item='task')
>>> print(await fut)
- put_nowait(*args, **kwargs)[source]
Immediately submits a task to the queue without waiting.
- Parameters:
args (~P) – Positional arguments for the task.
kwargs (~P) – Keyword arguments for the task.
- Returns:
The future result of the task.
- Return type:
Future[V]
Example
>>> fut = queue.put_nowait(item='task')
>>> print(await fut)
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _getters
- _loop = None
- _maxsize
- _name
Optional name for the queue.
- _no_futs
Indicates whether tasks will return data via futures.
- _putters
- _unfinished_tasks
- _worker_coro
- func
The function that each worker will process.
- property maxsize
Number of items allowed in the queue.
- property name: str
Returns the name of the queue, or its representation.
Example
>>> print(queue.name)
- num_workers
The number of worker tasks for processing.
- class a_sync.primitives.Queue[source]
Bases: _Queue[T]
A generic asynchronous queue that extends the functionality of asyncio.Queue.
This implementation supports retrieving multiple items at once and handling task processing in both FIFO and LIFO order. It provides enhanced type hinting support and additional methods for bulk operations.
Example
>>> queue = Queue()
>>> await queue.put(item='task1')
>>> await queue.put(item='task2')
>>> result = await queue.get()
>>> print(result)
task1
>>> all_tasks = await queue.get_all()
>>> print(all_tasks)
['task2']
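A rough, runnable sketch of the bulk helpers documented below (get_multi and get_all); FIFO ordering is assumed for the expected output:

import asyncio

from a_sync.primitives import Queue


async def main():
    queue = Queue()
    for i in range(5):
        await queue.put(i)

    # Take up to three items in one call (FIFO order assumed).
    print(await queue.get_multi(3))   # expected: [0, 1, 2]

    # Drain whatever is left in a single call.
    print(await queue.get_all())      # expected: [3, 4]


asyncio.run(main())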
- _get_loop()
- full()[source]
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()[source]
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- Return type:
T
- async get_all()[source]
Asynchronously retrieves and removes all available items from the queue.
If the queue is empty, this method will wait until at least one item is available before returning.
Example
>>> tasks = await queue.get_all()
>>> print(tasks)
- Return type:
List[T]
- get_all_nowait()[source]
Retrieves and removes all available items from the queue without waiting.
This method does not wait for items to be available and will raise an exception if the queue is empty.
- Raises:
QueueEmpty – If the queue is empty.
- Return type:
List[T]
Example
>>> tasks = queue.get_all_nowait()
>>> print(tasks)
- async get_multi(i, can_return_less=False)[source]
Asynchronously retrieves up to i items from the queue.
- Parameters:
i (int) – The maximum number of items to retrieve.
can_return_less (bool) – Whether fewer than i items may be returned if the queue runs out. Defaults to False.
- Raises:
QueueEmpty – If no items are available and can_return_less is False.
- Return type:
List[T]
Example
>>> tasks = await queue.get_multi(i=2, can_return_less=True)
>>> print(tasks)
- get_multi_nowait(i, can_return_less=False)[source]
Retrieves up to i items from the queue without waiting.
- Parameters:
i (int) – The maximum number of items to retrieve.
can_return_less (bool) – Whether fewer than i items may be returned if the queue runs out. Defaults to False.
- Raises:
QueueEmpty – If no items are available and can_return_less is False.
- Return type:
List[T]
Example
>>> tasks = queue.get_multi_nowait(i=3, can_return_less=True)
>>> print(tasks)
- get_nowait()[source]
Retrieves and removes the next item from the queue without blocking.
This method does not wait for an item to be available and will raise an exception if the queue is empty.
- Raises:
QueueEmpty – If the queue is empty.
- Return type:
T
Example
>>> result = queue.get_nowait()
>>> print(result)
- async join()[source]
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(item)[source]
Asynchronously adds an item to the queue.
If the queue is full, this method will block until space is available.
- Parameters:
item (T) – The item to add to the queue.
- Return type:
None
Example
>>> await queue.put(item='task')
- put_nowait(item)[source]
Adds an item to the queue without blocking.
This method does not wait for space to be available and will raise an exception if the queue is full.
- Parameters:
item (T) – The item to add to the queue.
- Raises:
QueueFull – If the queue is full.
- Return type:
None
Example
>>> queue.put_nowait(item='task')
- task_done()[source]
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
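For hand-written consumers, each get() should be answered with a task_done() so that join() can unblock. A minimal sketch of that contract using the Queue class from this module:

import asyncio

from a_sync.primitives import Queue


async def consumer(queue):
    while True:
        item = await queue.get()
        try:
            print("processed", item)
        finally:
            # Every get() must be matched by a task_done() for join() to return.
            queue.task_done()


async def main():
    queue = Queue()
    for i in range(3):
        await queue.put(i)
    worker = asyncio.create_task(consumer(queue))
    await queue.join()   # unblocks once task_done() was called for every item
    worker.cancel()


asyncio.run(main())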
- _finished
- _getters
- _loop = None
- _maxsize
- _putters
- _unfinished_tasks
- property maxsize
Number of items allowed in the queue.
- class a_sync.primitives.Semaphore[source]
Bases: Semaphore, _DebugDaemonMixin
A semaphore with additional debugging capabilities inherited from _DebugDaemonMixin.
This semaphore includes debug logging capabilities that are activated when the semaphore has waiters. It allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.
Example
You can write this pattern:
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1

like this:

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
See also
_DebugDaemonMixin for more details on debugging capabilities.
- __call__(fn)[source]
Decorator method to wrap coroutine functions with the semaphore.
This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.
Example
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
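A runnable sketch of the decorator form, assuming the wrapper forwards call arguments unchanged (limited() below is our own toy coroutine):

import asyncio

from a_sync.primitives import Semaphore

semaphore = Semaphore(5, name="demo")


@semaphore
async def limited(i):
    # At most five decorated calls hold the semaphore at any moment.
    await asyncio.sleep(0.1)
    return i


async def main():
    results = await asyncio.gather(*(limited(i) for i in range(20)))
    print(len(results))  # 20 results, produced at most five at a time


asyncio.run(main())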
- __init__(value, name=None, **kwargs)[source]
Initialize the semaphore with a given value and optional name for debugging.
- Parameters:
value (int) – The initial value for the semaphore.
name (optional) – An optional name used only to provide useful context in debug logs.
- Return type:
None
- async _debug_daemon()[source]
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.
Example
semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
Examples
Ensuring the debug daemon is running:
my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()
See also
_start_debug_daemon()
for starting the daemon.
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
Examples
Starting the debug daemon:
my_instance = MyDebugClass()
my_instance._start_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
Examples
Stopping the debug daemon:
my_instance = MyDebugClass()
my_instance._stop_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- async acquire()[source]
Acquire the semaphore, ensuring that debug logging is enabled if there are waiters.
If the semaphore value is zero or less, the debug daemon is started to log the state of the semaphore.
- Returns:
True when the semaphore is successfully acquired.
- Return type:
Literal[True]
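Where the decorator does not fit, the semaphore can also be held explicitly, either via async with (inherited from asyncio.Semaphore) or with the acquire()/release() pair documented here; a short sketch:

import asyncio

from a_sync.primitives import Semaphore


async def main():
    sem = Semaphore(1, name="explicit")

    async with sem:       # acquire() on entry, release() on exit
        print("inside the context manager")

    await sem.acquire()   # returns True once a slot is free
    try:
        print("held manually")
    finally:
        sem.release()


asyncio.run(main())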
- decorate(fn)[source]
Wrap a coroutine function to ensure it runs with the semaphore.
Example
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
- release()[source]
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- _daemon
- _loop = None
- _value
- _waiters
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
Examples
>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
- property logger: Logger
Provides a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
Examples
>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'
Note
Replace module_name with the actual module name where the class is defined.
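Since the debug daemon only runs while debug logging is enabled for this logger, switching it on is plain standard-library logging configuration. A sketch, with the caveat that the exact logger name depends on the module path as noted above:

import logging

from a_sync.primitives import Semaphore

logging.basicConfig()

sem = Semaphore(5, name="db")
# Enable DEBUG on the semaphore's own logger so its debug daemon can emit logs.
sem.logger.setLevel(logging.DEBUG)
print(sem.logger.name)           # something like '<module path>.Semaphore.db'
print(sem.debug_logs_enabled)    # should now be True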
- name
- class a_sync.primitives.SmartProcessingQueue[source]
Bases: _VariablePriorityQueueMixin[T], ProcessingQueue[Concatenate[T, P], V]
A processing queue that will execute jobs with the most waiters first, supporting dynamic priorities.
This queue is designed to handle tasks with dynamic priorities, ensuring that tasks with the most waiters are prioritized. It is ideal for scenarios where task execution order is influenced by the number of waiters.
Example
>>> async def process_task(data):
...     return data.upper()
...
>>> queue = SmartProcessingQueue(func=process_task, num_workers=5)
>>> fut = await queue.put('task')
>>> print(await fut)
TASK
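Usage mirrors ProcessingQueue; the waiter-count prioritisation happens internally. A minimal sketch (slow_fetch is our own stand-in for real work):

import asyncio

from a_sync.primitives import SmartProcessingQueue


async def slow_fetch(key):
    # Our own stand-in for real work, e.g. a network call.
    await asyncio.sleep(0.05)
    return key * 2


async def main():
    queue = SmartProcessingQueue(func=slow_fetch, num_workers=2)
    futs = [await queue.put(i) for i in range(6)]
    # gather() returns results in submission order regardless of the
    # internal scheduling order chosen by the queue.
    print(await asyncio.gather(*futs))  # expected: [0, 2, 4, 6, 8, 10]


asyncio.run(main())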
- __call__(*args, **kwargs)
Submits a task to the queue.
Example
>>> fut = queue(*args, **kwargs)
>>> print(fut)
- Parameters:
args (~P)
kwargs (~P)
- Return type:
Future[V]
- __init__(func, num_workers, *, name='', loop=None)[source]
Initializes a smart processing queue with the given worker function.
- Parameters:
func (Callable[[Concatenate[T, ~P]], Awaitable[V]]) – The worker function.
num_workers (int) – Number of worker tasks.
name (str) – Optional name for the queue.
loop (AbstractEventLoop | None) – Optional event loop.
- Return type:
None
Example
>>> queue = SmartProcessingQueue(func=my_task_func, num_workers=3, name='smart_queue')
- async __worker_coro()
Worker coroutine responsible for processing tasks in the queue.
Retrieves tasks, executes them, and sets the results or exceptions for the futures.
- Raises:
Any – Exceptions raised during task processing are logged.
- Return type:
Example
>>> await queue.__worker_coro()
- _ensure_workers()
Ensures that the worker tasks are running.
- Return type:
None
- _format()
- _get()[source]
Retrieves the task with the highest priority from the queue.
- Returns:
The priority, task arguments, keyword arguments, and future of the task.
Example
>>> task = queue._get()
>>> print(task)
- _get_key(*args, **kwargs)
Generates a unique key for task identification based on arguments.
- Parameters:
args – Positional arguments for the task.
kwargs – Keyword arguments for the task.
- Returns:
The generated key for the task.
- Return type:
Example
>>> key = queue._get_key(*args, **kwargs)
>>> print(key)
- _get_loop()
- _init(maxsize)
Initializes the priority queue.
Example
>>> queue._init(maxsize=10)
- _put(item, heappush=heapq.heappush)
Adds an item to the priority queue based on its priority.
Example
>>> queue._put(item='task')
- _wakeup_next(waiters)
- close()
Closes the queue, preventing further task submissions.
Example
>>> queue.close()
- Return type:
None
- empty()
Return True if the queue is empty, False otherwise.
- full()
Return True if there are maxsize items in the queue.
Note: if the Queue was initialized with maxsize=0 (the default), then full() is never True.
- async get()
Remove and return an item from the queue.
If queue is empty, wait until an item is available.
- get_nowait()
Remove and return an item from the queue.
Return an item if one is immediately available, else raise QueueEmpty.
- async join()
Block until all items in the queue have been gotten and processed.
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
- async put(*args, **kwargs)[source]
Asynchronously adds a task with smart future handling to the queue.
- Parameters:
args (~P) – Positional arguments for the task.
kwargs (~P) – Keyword arguments for the task.
- Returns:
The future representing the task’s result.
- Return type:
SmartFuture[V]
Example
>>> fut = await queue.put(item='task')
>>> print(await fut)
- put_nowait(*args, **kwargs)[source]
Immediately adds a task with smart future handling to the queue without waiting.
- Parameters:
args (~P) – Positional arguments for the task.
kwargs (~P) – Keyword arguments for the task.
- Returns:
The future representing the task’s result.
- Return type:
SmartFuture[V]
Example
>>> fut = queue.put_nowait(item='task')
>>> print(await fut)
- qsize()
Number of items in the queue.
- task_done()
Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.
If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in the queue.
- _finished
- _futs: weakref.WeakValueDictionary[_smart._Key[T], _smart.SmartFuture[T]]
Weak reference dictionary for managing smart futures.
- _getters
- _loop = None
- _maxsize
- _name
Optional name for the queue.
- _no_futs = False
Whether smart futures are used.
- _putters
- _unfinished_tasks
- _worker_coro
- func
The function that each worker will process.
- property maxsize
Number of items allowed in the queue.
- property name: str
Returns the name of the queue, or its representation.
Example
>>> print(queue.name)
- num_workers
The number of worker tasks for processing.
- class a_sync.primitives.ThreadsafeSemaphore[source]
Bases: Semaphore
A semaphore that works in a multi-threaded environment.
This semaphore ensures that the program functions correctly even when used with multiple event loops. It provides a workaround for edge cases involving multiple threads and event loops by using a separate semaphore for each thread.
Example
semaphore = ThreadsafeSemaphore(5)

async def limited():
    async with semaphore:
        return 1
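A rough sketch of the multi-thread scenario this class targets: two threads, each running its own event loop, sharing a single ThreadsafeSemaphore (each thread is handed its own underlying semaphore, per the semaphore property below):

import asyncio
import threading

from a_sync.primitives import ThreadsafeSemaphore

semaphore = ThreadsafeSemaphore(2)


async def limited(tag):
    async with semaphore:          # uses this thread's underlying semaphore
        await asyncio.sleep(0.05)
        return tag


def run_in_thread(tag):
    async def batch():
        return await asyncio.gather(*(limited(f"{tag}-{i}") for i in range(4)))

    # Each thread drives its own event loop; the shared ThreadsafeSemaphore
    # hands each thread a separate Semaphore under the hood.
    print(asyncio.run(batch()))


threads = [threading.Thread(target=run_in_thread, args=(t,)) for t in ("a", "b")]
for t in threads:
    t.start()
for t in threads:
    t.join()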
See also
Semaphore for the base class implementation.
- __call__(fn)
Decorator method to wrap coroutine functions with the semaphore.
This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.
Example
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
- __init__(value, name=None)[source]
Initialize the threadsafe semaphore with a given value and optional name.
- Parameters:
value (int | None) – The initial value for the semaphore; should be an integer when provided.
name (optional) – An optional name for the semaphore.
- Return type:
None
- async _debug_daemon()
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.
Example
semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
Examples
Ensuring the debug daemon is running:
my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()
See also
_start_debug_daemon()
for starting the daemon.
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
Examples
Starting the debug daemon:
my_instance = MyDebugClass()
my_instance._start_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
Examples
Stopping the debug daemon:
my_instance = MyDebugClass()
my_instance._stop_debug_daemon()
See also
_ensure_debug_daemon()
for ensuring the daemon is running.
- _wake_up_next()
Wake up the first waiter that isn’t done.
- async acquire()
Acquire the semaphore, ensuring that debug logging is enabled if there are waiters.
If the semaphore value is zero or less, the debug daemon is started to log the state of the semaphore.
- Returns:
True when the semaphore is successfully acquired.
- Return type:
Literal[True]
- decorate(fn)
Wrap a coroutine function to ensure it runs with the semaphore.
Example
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
- locked()
Returns True if semaphore cannot be acquired immediately.
- release()
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- _daemon
- _loop = None
- _value
- _waiters
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
Examples
>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
- dummy
- property logger: Logger
Provides a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
Examples
>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'
Note
Replace module_name with the actual module name where the class is defined.
- name
- property semaphore: Semaphore
Returns the appropriate semaphore for the current thread.
NOTE: We can’t cache this property because we need to check the current thread every time we access it.
Example
semaphore = ThreadsafeSemaphore(5)

async def limited():
    async with semaphore.semaphore:
        return 1
- semaphores: DefaultDict[Thread, Semaphore]