a_sync.primitives.locks package

Submodules

a_sync.primitives.locks.counter module

This module provides two specialized async flow management classes, CounterLock and CounterLockCluster.

These primitives manage synchronization of tasks that must wait for an internal counter to reach a specific value.

class a_sync.primitives.locks.counter.CounterLock

Bases: _DebugDaemonMixin

CounterLock(int start_value: int = 0, unicode name=u'')

An async primitive that uses an internal counter to manage task synchronization.

A coroutine can await counter.wait_for(3) and it will wait until the internal counter >= 3. If some other task executes counter.value = 5 or counter.set(5), the first coroutine will proceed as 5 >= 3.

The internal counter can only be set to a value greater than the current value.
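The waiting rule described above can be sketched with plain asyncio. The class below, SketchCounterLock, is a hypothetical stand-in written for illustration and is not how a_sync implements CounterLock. It only mirrors the documented contract: wait_for blocks until the counter is >= the target, and set rejects values that do not increase the counter.

```python
import asyncio


class SketchCounterLock:
    """Illustrative stand-in for CounterLock; not the library's implementation."""

    def __init__(self, start_value: int = 0) -> None:
        self._value = start_value
        # Maps a target value to the event that its waiters block on.
        self._events = {}

    @property
    def value(self) -> int:
        return self._value

    def set(self, value: int) -> None:
        # Mirror the documented rule: the counter may only move forward.
        if value <= self._value:
            raise ValueError(f"{value} is not greater than {self._value}")
        self._value = value
        # Wake every waiter whose target has now been reached.
        for target in [t for t in self._events if t <= value]:
            self._events.pop(target).set()

    async def wait_for(self, value: int) -> bool:
        if self._value < value:
            event = self._events.setdefault(value, asyncio.Event())
            await event.wait()
        return True


async def main() -> list:
    counter = SketchCounterLock()
    log = []

    async def waiter() -> None:
        await counter.wait_for(3)
        log.append(f"released at {counter.value}")

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter start and block
    log.append("setting 5")
    counter.set(5)          # 5 >= 3, so the waiter is released
    await task
    return log


print(asyncio.run(main()))
```

The waiter is released by set(5) even though it asked for 3, because the comparison is >= rather than ==.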

See also

CounterLockCluster for managing multiple CounterLock instances.

__init__()

Initializes the CounterLock with a starting value and an optional name.

Parameters:
  • start_value – The initial value of the counter.

  • name – An optional name for the counter, used in debug logs.

Examples

>>> counter = CounterLock(start_value=0, name="example_counter")
>>> counter.value
0

async _debug_daemon(self) None

Periodically logs debug information about the counter state and waiters.

This method is used internally to provide debugging information when debug logging is enabled.

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)

_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.
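The relationship between the three daemon helpers documented above can be pictured with a small model. DaemonSketch and its module-level logger are hypothetical names introduced here; the real mixin may differ in detail. The sketch assumes only what the descriptions state: ensure reuses an existing daemon, start returns a task when debug logging is on and a placeholder future otherwise, and stop raises ValueError when handed a task that is not the current daemon.

```python
import asyncio
import logging
from typing import Optional

logger = logging.getLogger("sketch.daemon")  # hypothetical logger name


class DaemonSketch:
    """Hypothetical model of the ensure/start/stop daemon lifecycle."""

    def __init__(self) -> None:
        self._daemon: Optional[asyncio.Future] = None

    async def _debug_daemon(self) -> None:
        while True:
            await asyncio.sleep(60)
            logger.debug("still waiting")

    def _start_debug_daemon(self) -> asyncio.Future:
        loop = asyncio.get_running_loop()
        if logger.isEnabledFor(logging.DEBUG):
            return loop.create_task(self._debug_daemon())
        # Debug logging is off: hand back an inert placeholder future.
        return loop.create_future()

    def _ensure_debug_daemon(self) -> asyncio.Future:
        if self._daemon is None:
            self._daemon = self._start_debug_daemon()
        return self._daemon

    def _stop_debug_daemon(self, t: Optional[asyncio.Future] = None) -> None:
        if t is not None and t is not self._daemon:
            raise ValueError(f"{t} is not the current daemon")
        if self._daemon is not None:
            self._daemon.cancel()
        self._daemon = None


async def main() -> list:
    logger.setLevel(logging.DEBUG)
    obj = DaemonSketch()
    first = obj._ensure_debug_daemon()
    second = obj._ensure_debug_daemon()   # reuses the running task
    is_task = isinstance(first, asyncio.Task)
    obj._stop_debug_daemon(first)
    await asyncio.sleep(0)                # let the cancellation land
    return [first is second, is_task, first.cancelled()]


print(asyncio.run(main()))
```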

is_ready(self, long long v) bool

Returns True if the current counter value is greater than or equal to v, and False otherwise.

set(self, long long value) void

Sets the counter to the specified value.

This method internally uses the value property to enforce that the new value must be strictly greater than the current value.

Parameters:

value – The value to set the counter to. Must be strictly greater than the current value.

Raises:

ValueError – If the new value is less than or equal to the current value.

Examples

>>> counter = CounterLock(start_value=0)
>>> counter.set(5)
>>> counter.value
5

See also

CounterLock.value for direct value assignment.

async wait_for(self, long long value) bint

Waits until the counter reaches or exceeds the specified value.

This method will ensure the debug daemon is running if the counter is not ready.

Parameters:

value – The value to wait for.

Return type:

bint

Examples

>>> counter = CounterLock(start_value=0)
>>> await counter.wait_for(5)  # This will block until counter.value >= 5

See also

CounterLock.set() to set the counter value.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_name

str

Type:

CounterLock._name

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger
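The naming rule for the logger can be illustrated with a small stand-alone mixin. LoggerNameSketch is a hypothetical class written for this example and is not the library's _LoggerMixin; it assumes only the documented behaviour, namely that the logger ID is built from the module and class name, with the instance's _name appended when one is set.

```python
import logging


class LoggerNameSketch:
    """Hypothetical mixin: builds 'module.Class' or 'module.Class.name'."""

    @property
    def logger(self) -> logging.Logger:
        cls = type(self)
        logger_id = f"{cls.__module__}.{cls.__qualname__}"
        name = getattr(self, "_name", None)
        if name:
            logger_id += f".{name}"
        return logging.getLogger(logger_id)

    @property
    def debug_logs_enabled(self) -> bool:
        # True only when the effective level of the logger is DEBUG or lower.
        return self.logger.isEnabledFor(logging.DEBUG)


class Named(LoggerNameSketch):
    _name = "example"


class Unnamed(LoggerNameSketch):
    pass


print(Named().logger.name)    # ends with "Named.example"
print(Unnamed().logger.name)  # ends with "Unnamed"
```

The module prefix depends on where the classes are defined, which is why the printed names are described by their suffix only.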

value

int

Gets the current value of the counter.

Examples

>>> counter = CounterLock(start_value=0)
>>> counter.value
0

Type:

CounterLock.value

class a_sync.primitives.locks.counter.CounterLockCluster

Bases: object

An asyncio primitive that represents a collection of CounterLock objects.

wait_for(i) will wait until the value of all CounterLock objects is >= i.
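The "all counters must reach i" rule can be sketched by gathering one wait per member. TinyCounter and TinyCluster are hypothetical classes built on asyncio.Condition for this illustration; they are not the library's CounterLock or CounterLockCluster, and advance_to is an invented helper name.

```python
import asyncio


class TinyCounter:
    """Hypothetical minimal counter with the same wait_for contract."""

    def __init__(self) -> None:
        self.value = 0
        self._changed = asyncio.Condition()

    async def advance_to(self, value: int) -> None:
        async with self._changed:
            self.value = value
            self._changed.notify_all()

    async def wait_for(self, value: int) -> bool:
        async with self._changed:
            await self._changed.wait_for(lambda: self.value >= value)
        return True


class TinyCluster:
    """Waits on every member counter; a stand-in for CounterLockCluster."""

    def __init__(self, counters) -> None:
        self.counters = list(counters)

    async def wait_for(self, value: int) -> bool:
        # Resolves only once all counters have reached the target.
        await asyncio.gather(*(c.wait_for(value) for c in self.counters))
        return True


async def main() -> list:
    a, b = TinyCounter(), TinyCounter()
    cluster = TinyCluster([a, b])
    task = asyncio.create_task(cluster.wait_for(5))

    await a.advance_to(5)
    await asyncio.sleep(0.01)
    after_first = task.done()   # False: b has not reached 5 yet

    await b.advance_to(7)
    result = await task
    return [after_first, result]


print(asyncio.run(main()))
```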

See also

CounterLock for managing individual counters.

__init__(self, counter_locks: Iterable[CounterLock]) None

Initializes the CounterLockCluster with a collection of CounterLock objects.

Parameters:

counter_locks (Iterable[CounterLock]) – The CounterLock objects to manage.

Return type:

None

Examples

>>> lock1 = CounterLock(start_value=0)
>>> lock2 = CounterLock(start_value=0)
>>> cluster = CounterLockCluster([lock1, lock2])

async wait_for(self, int value: int) bint

Waits until the value of all CounterLock objects in the cluster reaches or exceeds the specified value.

Parameters:

value (int) – The value to wait for.

Return type:

bint

Examples

>>> lock1 = CounterLock(start_value=0)
>>> lock2 = CounterLock(start_value=0)
>>> cluster = CounterLockCluster([lock1, lock2])
>>> await cluster.wait_for(5)  # This will block until all locks have value >= 5

locks

a_sync.primitives.locks.event module

This module provides an enhanced version of asyncio.Event with additional debug logging to help detect deadlocks.

class a_sync.primitives.locks.event.CythonEvent

Bases: _DebugDaemonMixin

CythonEvent(unicode name: str = u'', int debug_daemon_interval: int = 300, loop: Optional[asyncio.AbstractEventLoop] = None, *)

An asyncio.Event with additional debug logging to help detect deadlocks.

This event class extends asyncio.Event by adding debug logging capabilities. It logs detailed information about the event state and waiters, which can be useful for diagnosing and debugging potential deadlocks.
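The idea of an event that reports on itself while tasks are stuck waiting can be sketched as a small asyncio.Event subclass. DebugEvent, ListHandler and the logger name below are hypothetical and exist only for this example; the real CythonEvent is a Cython class and its logging format is not reproduced here. The interval is shortened to 0.01 seconds so the example finishes quickly.

```python
import asyncio
import logging

logger = logging.getLogger("sketch.DebugEvent")  # hypothetical logger name


class ListHandler(logging.Handler):
    """Collects formatted messages so the example can inspect them."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


class DebugEvent(asyncio.Event):
    """asyncio.Event plus a periodic debug log while anyone is waiting."""

    def __init__(self, name: str = "", debug_daemon_interval: float = 300) -> None:
        super().__init__()
        self._name = name
        self._interval = debug_daemon_interval
        self._waiting = 0
        self._daemon = None

    async def wait(self) -> bool:
        if self.is_set():
            return True
        # Only start the reporter when debug logging is switched on.
        if self._daemon is None and logger.isEnabledFor(logging.DEBUG):
            self._daemon = asyncio.create_task(self._debug_daemon())
        self._waiting += 1
        try:
            return await super().wait()
        finally:
            self._waiting -= 1

    async def _debug_daemon(self) -> None:
        while not self.is_set():
            await asyncio.sleep(self._interval)
            if not self.is_set():
                logger.debug("%s still unset, %d waiter(s)", self._name, self._waiting)


async def main() -> list:
    handler = ListHandler()
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)

    event = DebugEvent(name="startup", debug_daemon_interval=0.01)
    waiter = asyncio.create_task(event.wait())
    await asyncio.sleep(0.05)   # long enough for at least one report
    event.set()
    await waiter
    return handler.messages


print(asyncio.run(main()))
```

A long-lived "still unset" message with a non-zero waiter count is the kind of signal that points at a possible deadlock.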

__init__()

Initializes the Event.

Parameters:
  • name (str) – An optional name for the event, used in debug logs.

  • debug_daemon_interval (int) – The interval in seconds for the debug daemon to log information.

  • loop (Optional[asyncio.AbstractEventLoop]) – The event loop to use.

async _debug_daemon(self) None

Periodically logs debug information about the event state and waiters.

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)

_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

clear(self) void

Reset the internal flag to false. Subsequently, coroutines calling wait() will block until set() is called to set the internal flag to true again.

is_set(self) bool

Return True if and only if the internal flag is true.

set(self) void

Set the internal flag to true. All coroutines waiting for it to become true are awakened. Coroutines that call wait() once the flag is true will not block at all.

wait(self)

Block until the internal flag is true.

If the internal flag is true on entry, return True immediately. Otherwise, block until another coroutine calls set() to set the flag to true, then return True.

Returns:

True when the event is set.
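The set, wait, clear and is_set descriptions above match the contract of the standard library's asyncio.Event. The sketch below uses asyncio.Event directly rather than CythonEvent, on the assumption (taken from the class description) that the flag semantics are the same.

```python
import asyncio


async def main() -> list:
    event = asyncio.Event()
    results = []

    async def waiter(tag: str) -> None:
        results.append((tag, await event.wait()))

    blocked = asyncio.create_task(waiter("blocked"))
    await asyncio.sleep(0)  # the waiter starts and blocks on the unset flag
    results.append(("is_set before", event.is_set()))

    event.set()             # wakes the blocked waiter
    await blocked
    await waiter("late")    # flag already true: returns without blocking

    event.clear()           # later wait() calls would block again
    results.append(("is_set after clear", event.is_set()))
    return results


print(asyncio.run(main()))
```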

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_name

str

Type:

CythonEvent._name

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

async a_sync.primitives.locks.event._return_true()

a_sync.primitives.locks.prio_semaphore module

This module provides priority-based semaphore implementations. These semaphores allow waiters to be assigned priorities, ensuring that higher priority waiters are processed before lower priority ones.

class a_sync.primitives.locks.prio_semaphore.Priority

Bases: Protocol

__init__(*args, **kwargs)

class a_sync.primitives.locks.prio_semaphore.PrioritySemaphore

Bases: _AbstractPrioritySemaphore

PrioritySemaphore(int value: int = 1, name: Optional[str] = None, *) -> None

Semaphore that uses numeric priorities for waiters.

This class extends _AbstractPrioritySemaphore and provides a concrete implementation using numeric priorities. The _context_manager_class is set to _PrioritySemaphoreContextManager, and the _top_priority is set to -1, which is the highest priority.

Examples:

The primary way to use this semaphore is by specifying a priority.

>>> priority_semaphore = PrioritySemaphore(10)
>>> async with priority_semaphore[priority]:
...     await do_stuff()

You can also enter and exit this semaphore without specifying a priority, and it will use the top priority by default:

>>> priority_semaphore = PrioritySemaphore(10)
>>> async with priority_semaphore:
...     await do_stuff()

See Also:

_AbstractPrioritySemaphore for the base class implementation.
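How priorities change the order in which waiters are served can be sketched with a heap of pending futures. SketchPrioritySemaphore is a hypothetical class for illustration and is not the library's implementation. It assumes that a numerically lower priority is served first, which is consistent with -1 being described as the top priority, and it leaves out cancellation handling and the debug daemon.

```python
import asyncio
import heapq
import itertools


class SketchPrioritySemaphore:
    """Hypothetical priority semaphore: lowest number is served first."""

    def __init__(self, value: int = 1) -> None:
        self._value = value
        self._heap = []                   # entries: (priority, order, future)
        self._order = itertools.count()   # FIFO tie-break within a priority

    async def acquire(self, priority: int = -1) -> bool:
        if self._value > 0 and not self._heap:
            self._value -= 1
            return True
        fut = asyncio.get_running_loop().create_future()
        heapq.heappush(self._heap, (priority, next(self._order), fut))
        await fut
        return True

    def release(self) -> None:
        # Hand the slot straight to the best-priority live waiter, if any.
        while self._heap:
            _, _, fut = heapq.heappop(self._heap)
            if not fut.done():
                fut.set_result(None)
                return
        self._value += 1


async def main() -> list:
    sem = SketchPrioritySemaphore(1)
    served = []

    async def job(priority: int) -> None:
        await sem.acquire(priority)
        served.append(priority)
        sem.release()

    await sem.acquire()                    # hold the only slot
    tasks = [asyncio.create_task(job(p)) for p in (5, 1, 3)]
    await asyncio.sleep(0)                 # all three jobs queue up
    sem.release()
    await asyncio.gather(*tasks)
    return served


print(asyncio.run(main()))
```

The jobs are submitted in the order 5, 1, 3 but are served in priority order, which is the property the class description promises.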

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
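The decorator pattern described here can be reproduced with the standard library to show what the wrapping achieves. The limit helper below is a hypothetical function written for this example; it uses asyncio.Semaphore, which is not itself callable as a decorator, so the a_sync call syntax is not shown.

```python
import asyncio
import functools


def limit(semaphore: asyncio.Semaphore):
    """Hypothetical helper: run the wrapped coroutine function under `semaphore`."""

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            async with semaphore:
                return await fn(*args, **kwargs)
        return wrapper

    return decorator


async def main() -> list:
    semaphore = asyncio.Semaphore(2)
    running = 0
    peak = 0

    @limit(semaphore)
    async def limited(i: int) -> int:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)   # track how many calls overlap
        await asyncio.sleep(0.01)
        running -= 1
        return i

    results = await asyncio.gather(*(limited(i) for i in range(5)))
    return [results, peak]


print(asyncio.run(main()))
```

Five calls are started together, yet no more than two bodies run at the same time, which is the effect of acquiring the semaphore inside the wrapper.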

__getitem__()

Gets the context manager for a given priority.

Parameters:

priority – The priority for which to get the context manager. If None, uses the top priority.

Returns:

The context manager associated with the given priority.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> context_manager = semaphore[priority]
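The indexing syntax can be pictured as a lookup that returns one async context manager per priority. IndexableSemaphore and _SlotContext are hypothetical names for this sketch. Caching one context manager per priority is an assumption made here, not something the description states, and the sketch does not order waiters by priority; it only shows the `semaphore[priority]` access pattern and the fallback to the top priority when None is passed.

```python
import asyncio


class _SlotContext:
    """Hypothetical per-priority handle returned by `sem[priority]`."""

    def __init__(self, parent, priority) -> None:
        self.parent = parent
        self.priority = priority

    async def __aenter__(self):
        await self.parent._sem.acquire()
        self.parent.entered.append(self.priority)
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.parent._sem.release()


class IndexableSemaphore:
    top_priority = -1

    def __init__(self, value: int = 1) -> None:
        self._sem = asyncio.Semaphore(value)
        self._contexts = {}
        self.entered = []   # records the priorities that entered, in order

    def __getitem__(self, priority):
        if priority is None:
            priority = self.top_priority
        # One context manager per distinct priority, created on first use.
        if priority not in self._contexts:
            self._contexts[priority] = _SlotContext(self, priority)
        return self._contexts[priority]


async def main() -> list:
    sem = IndexableSemaphore(2)
    async with sem[3]:
        async with sem[None]:   # None falls back to the top priority
            pass
    return [sem.entered, sem[3] is sem[3], sem[None] is sem[-1]]


print(asyncio.run(main()))
```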

__init__(*args, **kwargs)

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)

_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) None

Wakes up the next waiter in line.

This method handles the waking of waiters based on priority. It includes an emergency procedure to handle potential lost waiters, ensuring that no waiter is left indefinitely waiting.

The emergency procedure is a temporary measure to address potential issues with lost waiters.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore._wake_up_next()

Return type:

None

async acquire(self) Literal[True]

Acquires the semaphore with the top priority.

This method overrides Semaphore.acquire() to handle priority-based logic.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> await semaphore.acquire()

Return type:

Literal[True]

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Checks if the semaphore is locked.

Returns:

True if the semaphore cannot be acquired immediately, False otherwise.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore.locked()

release(self) void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
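The release rule above is the same one the standard library's asyncio.Semaphore follows, so it can be demonstrated without the priority machinery. The sketch uses asyncio.Semaphore as a stand-in; priority ordering among several waiters is not shown.

```python
import asyncio


async def main() -> list:
    sem = asyncio.Semaphore(1)
    events = []

    await sem.acquire()                  # counter: 1 -> 0
    events.append(("locked", sem.locked()))

    async def waiter() -> None:
        await sem.acquire()              # blocks until release() is called
        events.append(("waiter acquired", True))
        sem.release()

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)               # the waiter starts and blocks
    sem.release()                        # counter was zero: wakes the waiter
    await task
    events.append(("locked", sem.locked()))
    return events


print(asyncio.run(main()))
```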

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

class a_sync.primitives.locks.prio_semaphore._AbstractPrioritySemaphore

Bases: Semaphore

_AbstractPrioritySemaphore(context_manager_class: Type[_AbstractPrioritySemaphoreContextManager], top_priority: object, int value: int = 1, name: Optional[str] = None, *) -> None

A semaphore that allows prioritization of waiters.

This semaphore manages waiters with associated priorities, ensuring that waiters with higher priorities are processed before those with lower priorities. Subclasses must define the _top_priority attribute to specify the default top priority behavior.

The _context_manager_class attribute should specify the class used for managing semaphore contexts.

See also

PrioritySemaphore for an implementation using numeric priorities.

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__getitem__()

Gets the context manager for a given priority.

Parameters:

priority – The priority for which to get the context manager. If None, uses the top priority.

Returns:

The context manager associated with the given priority.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> context_manager = semaphore[priority]

__init__()

Initializes the priority semaphore.

Parameters:
  • value – The initial capacity of the semaphore.

  • name – An optional name for the semaphore, used for debugging.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5, name="test_semaphore")

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)

_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) None

Wakes up the next waiter in line.

This method handles the waking of waiters based on priority. It includes an emergency procedure to handle potential lost waiters, ensuring that no waiter is left indefinitely waiting.

The emergency procedure is a temporary measure to address potential issues with lost waiters.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore._wake_up_next()

Return type:

None

async acquire(self) Literal[True]

Acquires the semaphore with the top priority.

This method overrides Semaphore.acquire() to handle priority-based logic.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> await semaphore.acquire()

Return type:

Literal[True]

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Checks if the semaphore is locked.

Returns:

True if the semaphore cannot be acquired immediately, False otherwise.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore.locked()

release(self) void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

class a_sync.primitives.locks.prio_semaphore._AbstractPrioritySemaphoreContextManager

Bases: Semaphore

_AbstractPrioritySemaphoreContextManager(_AbstractPrioritySemaphore parent: _AbstractPrioritySemaphore, priority: PT, name: Optional[str] = None) -> None

A context manager for priority semaphore waiters.

This context manager is associated with a specific priority and handles the acquisition and release of the semaphore for waiters with that priority.

async _AbstractPrioritySemaphoreContextManager__acquire()

_AbstractPrioritySemaphoreContextManager.__acquire(self) -> Literal[True]

Return type:

Literal[True]

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__init__()

Initializes the context manager for a specific priority.

Parameters:
  • parent – The parent semaphore.

  • priority – The priority associated with this context manager.

  • name – An optional name for the context manager, used for debugging.

Examples

>>> parent_semaphore = _AbstractPrioritySemaphore(5)
>>> context_manager = _AbstractPrioritySemaphoreContextManager(parent_semaphore, priority=1)

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)

_repr_no_parent_(self) unicode

Returns a string representation of the context manager without the parent.

_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) void

Wake up the first waiter that isn’t done.

acquire(self)

Acquires the semaphore for this context manager.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

This method overrides Semaphore.acquire() to handle priority-based logic.

Examples

>>> context_manager = _AbstractPrioritySemaphoreContextManager(parent, priority=1)
>>> await context_manager.acquire()

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Returns True if semaphore cannot be acquired immediately.

release(self) void

Releases the semaphore for this context manager.

This method overrides Semaphore.release() to handle priority-based logic.

Examples

>>> context_manager = _AbstractPrioritySemaphoreContextManager(parent, priority=1)
>>> context_manager.release()

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

class a_sync.primitives.locks.prio_semaphore._PrioritySemaphoreContextManager

Bases: _AbstractPrioritySemaphoreContextManager

Context manager for numeric priority semaphores.

async _AbstractPrioritySemaphoreContextManager__acquire()

_AbstractPrioritySemaphoreContextManager.__acquire(self) -> Literal[True]

Return type:

Literal[True]

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__init__()

Initializes the context manager for a specific priority.

Parameters:
  • parent – The parent semaphore.

  • priority – The priority associated with this context manager.

  • name – An optional name for the context manager, used for debugging.

Examples

>>> parent_semaphore = _AbstractPrioritySemaphore(5)
>>> context_manager = _AbstractPrioritySemaphoreContextManager(parent_semaphore, priority=1)

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)

_repr_no_parent_(self) unicode

Returns a string representation of the context manager without the parent.

_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.
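The ensure/start/stop lifecycle described above can be sketched with plain asyncio. DebugDaemonSketch and its attribute _daemon are hypothetical names used only for illustration; this is not the code of _DebugDaemonMixin, and it assumes the daemon is tracked as a single future on the instance.

```python
import asyncio
import logging
from typing import Optional

logger = logging.getLogger("debug_daemon_sketch")


class DebugDaemonSketch:
    """Hypothetical stand-in; not a_sync's _DebugDaemonMixin."""

    def __init__(self) -> None:
        self._daemon: Optional[asyncio.Future] = None

    async def _debug_daemon(self) -> None:
        # Emit a debug line periodically until cancelled.
        while True:
            logger.debug("%r is still active", self)
            await asyncio.sleep(60)

    def _start_debug_daemon(self) -> asyncio.Future:
        loop = asyncio.get_running_loop()
        if logger.isEnabledFor(logging.DEBUG):
            return loop.create_task(self._debug_daemon())
        # Debug logging is off: hand back an inert placeholder future.
        return loop.create_future()

    def _ensure_debug_daemon(self) -> None:
        # Start the daemon only if none is tracked yet.
        if self._daemon is None:
            self._daemon = self._start_debug_daemon()

    def _stop_debug_daemon(self, t: Optional[asyncio.Future] = None) -> None:
        if t is not None and t is not self._daemon:
            raise ValueError(f"{t} is not the current daemon")
        if self._daemon is not None:
            self._daemon.cancel()
            self._daemon = None


async def main() -> bool:
    logger.setLevel(logging.DEBUG)
    obj = DebugDaemonSketch()
    obj._ensure_debug_daemon()
    first = obj._daemon
    obj._ensure_debug_daemon()  # second call is a no-op
    same = obj._daemon is first
    obj._stop_debug_daemon(first)
    await asyncio.sleep(0)  # let the cancellation propagate
    return same and first.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Passing any future other than the tracked one to _stop_debug_daemon raises ValueError in this sketch, mirroring the documented contract.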

_wake_up_next(self) void

Wake up the first waiter that isn’t done.

acquire(self)

Acquires the semaphore for this context manager.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

This method overrides Semaphore.acquire() to handle priority-based logic.

Examples

>>> context_manager = _AbstractPrioritySemaphoreContextManager(parent, priority=1)
>>> await context_manager.acquire()

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Returns True if semaphore cannot be acquired immediately.

release(self) void

Releases the semaphore for this context manager.

This method overrides Semaphore.release() to handle priority-based logic.

Examples

>>> context_manager = _AbstractPrioritySemaphoreContextManager(parent, priority=1)
>>> context_manager.release()

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

a_sync.primitives.locks.semaphore module

This module provides various semaphore implementations, including a debug-enabled semaphore, a dummy semaphore that does nothing, and a threadsafe semaphore for use in multi-threaded applications.

class a_sync.primitives.locks.semaphore.DummySemaphore

Bases: Semaphore

DummySemaphore(name: Optional[str] = None)

A dummy semaphore that implements the standard asyncio.Semaphore API but does nothing.

This class is useful for scenarios where a semaphore interface is required but no actual synchronization is needed.

Example

dummy_semaphore = DummySemaphore()

async def no_op():
    async with dummy_semaphore:
        return 1
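To make the "does nothing" behaviour concrete, here is a minimal sketch of a semaphore-shaped object that never blocks. NoOpSemaphore is a hypothetical stand-in written against the standard library only; it is not a_sync's DummySemaphore, and returning False from locked() is an assumption.

```python
import asyncio


class NoOpSemaphore:
    """Hypothetical do-nothing semaphore; not a_sync's DummySemaphore."""

    async def acquire(self) -> bool:
        return True  # never blocks

    def release(self) -> None:
        pass  # nothing to hand back

    def locked(self) -> bool:
        return False  # assumption: can always be acquired immediately

    async def __aenter__(self) -> None:
        return None

    async def __aexit__(self, *exc_info) -> None:
        return None


async def run_all(n: int) -> int:
    """Run n workers under the no-op semaphore and return peak concurrency."""
    sem = NoOpSemaphore()
    active = 0
    peak = 0

    async def worker() -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0)
            active -= 1

    await asyncio.gather(*(worker() for _ in range(n)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_all(10)))
```

Because nothing is ever held back, all workers are inside the async with block at once, which is the point of a dummy: callers keep the semaphore interface without any throttling.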

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__init__()

Initialize the dummy semaphore with an optional name.

Parameters:

name (optional) – An optional name for the dummy semaphore.

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) void

Wake up the first waiter that isn’t done.

async acquire(self) Literal[True]

Acquire the dummy semaphore, which is a no-op.

Return type:

Literal[True]

async c_acquire(self) Literal[True]
Return type:

Literal[True]

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Returns True if semaphore cannot be acquired immediately.

release(self) void

No-op release method.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

class a_sync.primitives.locks.semaphore.Semaphore

Bases: _DebugDaemonMixin

Semaphore(int value: int = 1, unicode name=u’’, loop=None) -> None

A semaphore with additional debugging capabilities inherited from _DebugDaemonMixin.

This semaphore includes debug logging capabilities that are activated when the semaphore has waiters. It allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

You can write this pattern:

```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```

like this:

```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```

See also

_DebugDaemonMixin for more details on debugging capabilities.
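The decorator pattern described above can be approximated with the standard library alone. DecoratorSemaphore below is a hypothetical asyncio.Semaphore subclass written for illustration; it is not this class's implementation, and the helper peak_concurrency exists only to show that the limit is respected.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")


class DecoratorSemaphore(asyncio.Semaphore):
    """Hypothetical asyncio.Semaphore subclass usable as a decorator."""

    def __call__(self, fn: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> T:
            # Hold one slot for the duration of the wrapped call.
            async with self:
                return await fn(*args, **kwargs)

        return wrapper


async def peak_concurrency(limit: int, jobs: int) -> int:
    """Run `jobs` decorated calls and report how many overlapped at most."""
    semaphore = DecoratorSemaphore(limit)
    active = 0
    peak = 0

    @semaphore
    async def limited() -> int:
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)
        active -= 1
        return 1

    results = await asyncio.gather(*(limited() for _ in range(jobs)))
    assert sum(results) == jobs
    return peak


if __name__ == "__main__":
    print(asyncio.run(peak_concurrency(2, 6)))
```

The wrapper keeps the async with block out of the function body, so the concurrency limit is declared once at the definition site instead of being repeated in every coroutine.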

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__init__()

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) void

Wake up the first waiter that isn’t done.

acquire(self)

Acquire the semaphore, ensuring that the debug daemon is running if the caller has to wait.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

If the semaphore value is zero or less, the debug daemon is started to log the state of the semaphore.

Returns:

True when the semaphore is successfully acquired.

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Returns True if semaphore cannot be acquired immediately.

release(self) void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
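The counter behaviour of acquire(), release() and locked() can be observed with the standard asyncio.Semaphore, assuming this class keeps the same counter semantics as the stdlib primitive. The sketch deliberately uses asyncio.Semaphore rather than this class, so the debug daemon is not involved.

```python
import asyncio


async def demo() -> list:
    sem = asyncio.Semaphore(1)
    events = []

    await sem.acquire()          # counter 1 -> 0, returns immediately
    events.append(sem.locked())  # True: a further acquire would block

    async def waiter() -> None:
        await sem.acquire()      # blocks until release() is called
        events.append("waiter acquired")
        sem.release()

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)       # let the waiter start and block
    events.append(task.done())   # False: still waiting

    sem.release()                # wakes the waiter
    await task
    events.append(sem.locked())  # False: counter is back to 1
    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```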

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger
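The naming rule described above (module, class, then an optional instance name) can be sketched as follows. NamedLoggerSketch is a hypothetical mixin written from that description; it is not the source of _LoggerMixin, and the use of a class-level _name default is an assumption.

```python
import logging
from typing import Optional


class NamedLoggerSketch:
    """Hypothetical mixin; the naming rule is inferred from the text above."""

    _name: Optional[str] = None

    @property
    def logger(self) -> logging.Logger:
        cls = type(self)
        logger_id = f"{cls.__module__}.{cls.__qualname__}"
        # Append the instance name only when one is set.
        if self._name:
            logger_id += f".{self._name}"
        return logging.getLogger(logger_id)


class MyClass(NamedLoggerSketch):
    _name = "example"


class AnotherClass(NamedLoggerSketch):
    pass


print(MyClass().logger.name.endswith("MyClass.example"))    # True
print(AnotherClass().logger.name.endswith("AnotherClass"))  # True
```

Since logging.getLogger returns the same object for the same name, instances that share a class and name also share one logger in this sketch.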

name

str

Type:

Semaphore.name

class a_sync.primitives.locks.semaphore.ThreadsafeSemaphore

Bases: Semaphore

ThreadsafeSemaphore(value: Optional[int], name: Optional[str] = None) -> None

A semaphore that works in a multi-threaded environment.

This semaphore ensures that the program functions correctly even when used with multiple event loops. It provides a workaround for edge cases involving multiple threads and event loops by using a separate semaphore for each thread.

Example

semaphore = ThreadsafeSemaphore(5)

async def limited():
    async with semaphore:
        return 1

See also

Semaphore for the base class implementation.
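The "separate semaphore for each thread" idea can be sketched with the standard library. PerThreadSemaphore and the helpers below are hypothetical and only illustrate the lookup-by-thread approach; they are not this class's implementation. The barrier keeps both threads alive at the same time so their thread identifiers are guaranteed to differ.

```python
import asyncio
import threading


class PerThreadSemaphore:
    """Hypothetical sketch: one asyncio.Semaphore per thread."""

    def __init__(self, value: int) -> None:
        self._value = value
        self._lock = threading.Lock()
        self._semaphores: dict = {}

    @property
    def semaphore(self) -> asyncio.Semaphore:
        # Looked up on every access because the calling thread may differ.
        ident = threading.get_ident()
        with self._lock:
            if ident not in self._semaphores:
                self._semaphores[ident] = asyncio.Semaphore(self._value)
            return self._semaphores[ident]

    async def __aenter__(self) -> None:
        await self.semaphore.acquire()

    async def __aexit__(self, *exc_info) -> None:
        self.semaphore.release()


def run_in_thread(shared: PerThreadSemaphore, barrier: threading.Barrier, out: list) -> None:
    async def limited() -> int:
        async with shared:
            return id(shared.semaphore)

    barrier.wait()  # both threads are alive here, so their idents differ
    out.append(asyncio.run(limited()))


def demo() -> int:
    """Return how many distinct underlying semaphores two threads ended up using."""
    shared = PerThreadSemaphore(5)
    barrier = threading.Barrier(2)
    seen: list = []
    threads = [
        threading.Thread(target=run_in_thread, args=(shared, barrier, seen))
        for _ in range(2)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return len(set(seen))


if __name__ == "__main__":
    print(demo())
```

Each thread runs its own event loop through asyncio.run, and each gets its own asyncio.Semaphore, so no asyncio primitive is ever shared across loops. Note that in this sketch the limit applies per thread, not globally.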

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__init__()

Initialize the threadsafe semaphore with a given value and optional name.

Parameters:
  • value – The initial value for the semaphore; should be an integer.

  • name (optional) – An optional name for the semaphore.

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) void

Wake up the first waiter that isn’t done.

acquire(self)

Acquire the semaphore, ensuring that the debug daemon is running if the caller has to wait.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

If the semaphore value is zero or less, the debug daemon is started to log the state of the semaphore.

Returns:

True when the semaphore is successfully acquired.

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Returns True if semaphore cannot be acquired immediately.

release(self) void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

semaphore

Semaphore

Returns the appropriate semaphore for the current thread.

NOTE: We can’t cache this property because we need to check the current thread every time we access it.

Example

semaphore = ThreadsafeSemaphore(5)

async def limited():
    async with semaphore.semaphore:
        return 1

Type:

ThreadsafeSemaphore.semaphore

Module contents

class a_sync.primitives.locks.CounterLock

Bases: _DebugDaemonMixin

CounterLock(int start_value: int = 0, unicode name=u’’)

An async primitive that uses an internal counter to manage task synchronization.

A coroutine can await counter.wait_for(3) and it will wait until the internal counter >= 3. If some other task executes counter.value = 5 or counter.set(5), the first coroutine will proceed as 5 >= 3.

The internal counter can only be set to a value greater than the current value.

See also

CounterLockCluster for managing multiple CounterLock instances.

__init__()

Initializes the CounterLock with a starting value and an optional name.

Parameters:
  • start_value – The initial value of the counter.

  • name – An optional name for the counter, used in debug logs.

Examples

>>> counter = CounterLock(start_value=0, name="example_counter")
>>> counter.value
0

async _debug_daemon(self) None

Periodically logs debug information about the counter state and waiters.

This method is used internally to provide debugging information when debug logging is enabled.

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

is_ready(self, long long v) bool

Returns True if the current counter value is greater than or equal to v.

set(self, long long value) void

Sets the counter to the specified value.

This method internally uses the value property to enforce that the new value must be strictly greater than the current value.

Parameters:

value – The value to set the counter to. Must be strictly greater than the current value.

Raises:

ValueError – If the new value is less than or equal to the current value.

Examples

>>> counter = CounterLock(start_value=0)
>>> counter.set(5)
>>> counter.value
5

See also

CounterLock.value for direct value assignment.

async wait_for(self, long long value) bint

Waits until the counter reaches or exceeds the specified value.

This method will ensure the debug daemon is running if the counter is not ready.

Parameters:

value – The value to wait for.

Return type:

bint

Examples

>>> counter = CounterLock(start_value=0)
>>> await counter.wait_for(5)  # This will block until counter.value >= 5

See also

CounterLock.set() to set the counter value.
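The interplay of wait_for() and set() can be sketched with plain asyncio. CounterLockSketch is a hypothetical re-implementation for illustration only, not the Cython class documented here; it follows the rule stated on this page that set() rejects any value that is not strictly greater than the current one.

```python
import asyncio
from typing import Dict


class CounterLockSketch:
    """Hypothetical re-implementation for illustration; not a_sync's CounterLock."""

    def __init__(self, start_value: int = 0) -> None:
        self._value = start_value
        self._events: Dict[int, asyncio.Event] = {}

    @property
    def value(self) -> int:
        return self._value

    def is_ready(self, v: int) -> bool:
        return self._value >= v

    async def wait_for(self, value: int) -> bool:
        if not self.is_ready(value):
            # Waiters on the same target share one event.
            event = self._events.setdefault(value, asyncio.Event())
            await event.wait()
        return True

    def set(self, value: int) -> None:
        # Follows the rule documented above: the counter may only increase.
        if value <= self._value:
            raise ValueError(f"{value} is not greater than {self._value}")
        self._value = value
        # Wake every waiter whose target has now been reached.
        for target in [t for t in self._events if t <= value]:
            self._events.pop(target).set()


async def demo() -> list:
    counter = CounterLockSketch(start_value=0)
    log = []

    async def waiter(target: int) -> None:
        await counter.wait_for(target)
        log.append(target)

    tasks = [asyncio.create_task(waiter(t)) for t in (3, 5, 8)]
    await asyncio.sleep(0)   # let all three waiters block
    counter.set(5)           # releases the waiters for 3 and 5 only
    await asyncio.sleep(0)
    await asyncio.sleep(0)
    snapshot = sorted(log)
    counter.set(8)           # releases the last waiter
    await asyncio.gather(*tasks)
    return [snapshot, sorted(log)]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A typical use for this shape of primitive is ordering work by a monotonically increasing sequence number, where each consumer waits until the producer has advanced far enough.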

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_name

str

Type:

CounterLock._name

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

value

int

Gets the current value of the counter.

Examples

>>> counter = CounterLock(start_value=0)
>>> counter.value
0

Type:

CounterLock.value

class a_sync.primitives.locks.DummySemaphore

Bases: Semaphore

DummySemaphore(name: Optional[str] = None)

A dummy semaphore that implements the standard asyncio.Semaphore API but does nothing.

This class is useful for scenarios where a semaphore interface is required but no actual synchronization is needed.

Example

dummy_semaphore = DummySemaphore()

async def no_op():
    async with dummy_semaphore:
        return 1

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__init__()

Initialize the dummy semaphore with an optional name.

Parameters:

name (optional) – An optional name for the dummy semaphore.

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)
_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) void

Wake up the first waiter that isn’t done.

async acquire(self) Literal[True]

Acquire the dummy semaphore, which is a no-op.

Return type:

Literal[True]

async c_acquire(self) Literal[True]
Return type:

Literal[True]

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Returns True if semaphore cannot be acquired immediately.

release(self) void

No-op release method.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger
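The two logging properties above can be approximated with the standard logging module. The following is a sketch under assumptions, not the mixin's source: LoggerMixinSketch, Named, and Unnamed are hypothetical, and the use of an optional _name attribute follows the doctest shown for logger.

```python
import logging


class LoggerMixinSketch:
    """Hypothetical mixin mirroring the `logger` and `debug_logs_enabled` properties."""

    @property
    def logger(self):
        cls = type(self)
        # Logger ID: module and class name, plus the instance name if one is set.
        logger_id = f"{cls.__module__}.{cls.__qualname__}"
        name = getattr(self, "_name", None)
        if name:
            logger_id += f".{name}"
        return logging.getLogger(logger_id)

    @property
    def debug_logs_enabled(self):
        # True only when DEBUG records would actually be handled.
        return self.logger.isEnabledFor(logging.DEBUG)


class Named(LoggerMixinSketch):
    _name = "example"


class Unnamed(LoggerMixinSketch):
    pass
```

Because logging.getLogger returns the same object for the same name, every instance with a given class and name shares one logger, so enabling DEBUG on that logger affects all of them.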

name

str

Type:

Semaphore.name

a_sync.primitives.locks.Event

alias of CythonEvent

class a_sync.primitives.locks.PrioritySemaphore

Bases: _AbstractPrioritySemaphore

PrioritySemaphore(int value: int = 1, name: Optional[str] = None, *) -> None

Semaphore that uses numeric priorities for waiters.

This class extends _AbstractPrioritySemaphore and provides a concrete implementation using numeric priorities. The _context_manager_class is set to _PrioritySemaphoreContextManager, and the _top_priority is set to -1, which is the highest priority.

Examples:

The primary way to use this semaphore is by specifying a priority.

>>> priority_semaphore = PrioritySemaphore(10)
>>> async with priority_semaphore[priority]:
...     await do_stuff()

You can also enter and exit this semaphore without specifying a priority, and it will use the top priority by default:

>>> priority_semaphore = PrioritySemaphore(10)
>>> async with priority_semaphore:
...     await do_stuff()

See Also:

_AbstractPrioritySemaphore for the base class implementation.
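As a rough illustration of priority-ordered waiting, the sketch below keeps waiters in a heap and wakes the smallest priority value first. It is not the library's implementation. PrioritySemaphoreSketch and demo_order are hypothetical, and the rule that lower numbers are served first is an assumption suggested by the top priority being -1.

```python
import asyncio
import heapq
import itertools


class PrioritySemaphoreSketch:
    """Hypothetical semaphore that wakes waiters in ascending priority order."""

    def __init__(self, value=1):
        self._value = value
        self._waiters = []                 # heap of (priority, arrival, future)
        self._arrival = itertools.count()  # tie-breaker: first come, first served

    async def acquire(self, priority=-1):
        if self._value > 0:
            self._value -= 1
            return True
        fut = asyncio.get_running_loop().create_future()
        heapq.heappush(self._waiters, (priority, next(self._arrival), fut))
        try:
            await fut
        except asyncio.CancelledError:
            # If the slot was already handed to us, pass it on before leaving.
            if fut.done() and not fut.cancelled():
                self.release()
            raise
        return True

    def release(self):
        # Hand the slot straight to the best live waiter, skipping cancelled ones.
        while self._waiters:
            _, _, fut = heapq.heappop(self._waiters)
            if not fut.done():
                fut.set_result(None)
                return
        self._value += 1


async def demo_order():
    """Queue three waiters out of order and record the order they are served."""
    sem = PrioritySemaphoreSketch(1)
    order = []
    await sem.acquire()  # hold the only slot so the workers must queue

    async def worker(priority):
        await sem.acquire(priority)
        order.append(priority)
        sem.release()

    tasks = [asyncio.create_task(worker(p)) for p in (5, 1, 3)]
    await asyncio.sleep(0)  # let every worker enqueue itself
    sem.release()
    await asyncio.gather(*tasks)
    return order
```

Handing the slot directly to the woken waiter, rather than incrementing the counter, prevents a newcomer from overtaking a queued waiter between release and wake-up.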

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__getitem__()

Gets the context manager for a given priority.

Parameters:

priority – The priority for which to get the context manager. If None, uses the top priority.

Returns:

The context manager associated with the given priority.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> context_manager = semaphore[priority]
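The lookup described above can be sketched as a dictionary of per-priority context managers. This is an assumption-laden illustration only: PriorityContextSketch and PriorityLookupSketch are hypothetical, caching one manager per priority is a guess at the design, and an asyncio.Semaphore stands in for the real acquisition logic, so priority ordering is not modelled here.

```python
import asyncio

TOP_PRIORITY = -1  # the top priority value given in the class description


class PriorityContextSketch:
    """Hypothetical async context manager bound to a single priority."""

    def __init__(self, semaphore, priority):
        self._semaphore = semaphore
        self.priority = priority

    async def __aenter__(self):
        await self._semaphore.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        self._semaphore.release()


class PriorityLookupSketch:
    """Hypothetical container that maps a priority to its context manager."""

    def __init__(self, value):
        self._semaphore = asyncio.Semaphore(value)  # stand-in, no priorities
        self._managers = {}

    def __getitem__(self, priority):
        if priority is None:
            priority = TOP_PRIORITY  # None falls back to the top priority
        if priority not in self._managers:
            self._managers[priority] = PriorityContextSketch(self._semaphore, priority)
        return self._managers[priority]
```

With this shape, `async with lookup[3]:` and `async with lookup[None]:` both work, and repeated lookups for the same priority return the same manager object.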
__init__(*args, **kwargs)

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None
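A loop of the kind described, one that logs periodically for as long as there are waiters, might look like the sketch below. It is a hypothetical free function, not the mixin's method: debug_daemon_sketch, its waiters and logger parameters, and the message text are all assumptions, and the 60-second default only mirrors the "every minute" wording above.

```python
import asyncio
import logging


async def debug_daemon_sketch(waiters, logger, interval=60.0):
    """Emit one debug record per `interval` seconds while `waiters` is non-empty."""
    while waiters:
        await asyncio.sleep(interval)
        if waiters:  # re-check: the queue may have drained while sleeping
            logger.debug("still waiting: %d waiter(s) queued", len(waiters))
```

The coroutine returns on its own once the waiter collection is empty, so the task running it finishes without needing to be cancelled.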

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)

_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.
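The start-or-dummy decision and the idempotent "ensure" step can be sketched as follows. This is a simplified illustration, not the mixin's code: DaemonOwnerSketch, start(), ensure(), and start_count are hypothetical, the debug flag is passed in rather than read from a logger, and the case where no event loop is running is deliberately left out.

```python
import asyncio


class DaemonOwnerSketch:
    """Hypothetical owner that starts its daemon at most once."""

    def __init__(self, debug_logs_enabled):
        self.debug_logs_enabled = debug_logs_enabled
        self._daemon = None
        self.start_count = 0

    async def _debug_daemon(self):
        await asyncio.sleep(3600)  # placeholder for a periodic logging loop

    def start(self):
        loop = asyncio.get_running_loop()  # must be called from a running loop
        if self.debug_logs_enabled:
            self.start_count += 1
            return loop.create_task(self._debug_daemon())
        # Debug logging is off: return a dummy future that is never scheduled.
        return loop.create_future()

    def ensure(self):
        # Start only when nothing has been stored yet.
        if self._daemon is None:
            self._daemon = self.start()
```

Storing the dummy future in the same slot means later ensure() calls stay cheap no-ops whether or not debug logging is enabled.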

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) None

Wakes up the next waiter in line.

This method handles the waking of waiters based on priority. It includes an emergency procedure to handle potential lost waiters, ensuring that no waiter is left indefinitely waiting.

The emergency procedure is a temporary measure to address potential issues with lost waiters.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore._wake_up_next()

Return type:

None

async acquire(self) Literal[True]

Acquires the semaphore with the top priority.

This method overrides Semaphore.acquire() to handle priority-based logic.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> await semaphore.acquire()

Return type:

Literal[True]

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Checks if the semaphore is locked.

Returns:

True if the semaphore cannot be acquired immediately, False otherwise.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore.locked()

release(self) void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

class a_sync.primitives.locks.Semaphore

Bases: _DebugDaemonMixin

Semaphore(int value: int = 1, unicode name=u’’, loop=None) -> None

A semaphore with additional debugging capabilities inherited from _DebugDaemonMixin.

This semaphore includes debug logging capabilities that are activated when the semaphore has waiters. It allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

You can write this pattern:

```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```

like this:

```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```

See also

_DebugDaemonMixin for more details on debugging capabilities.
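For readers curious how a semaphore can double as a decorator, here is a minimal sketch built on the standard library's asyncio.Semaphore. It is not this class's implementation: limit_with and peak_concurrency are hypothetical helpers, and a decorator factory is used because asyncio.Semaphore itself is not callable.

```python
import asyncio
import functools


def limit_with(semaphore):
    """Return a decorator that runs the wrapped coroutine function under `semaphore`."""

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            async with semaphore:  # acquire on entry, release on exit
                return await fn(*args, **kwargs)

        return wrapper

    return decorator


async def peak_concurrency(limit, jobs):
    """Run `jobs` decorated calls and report (peak concurrency, sum of results)."""
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    @limit_with(semaphore)
    async def limited():
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0)  # yield while holding the semaphore
        running -= 1
        return 1

    results = await asyncio.gather(*(limited() for _ in range(jobs)))
    return peak, sum(results)
```

The wrapper holds the semaphore for the whole call, including on exceptions, which is the same guarantee the explicit `async with` form gives.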

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__init__()

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)

_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) void

Wake up the first waiter that isn’t done.

acquire(self)

Acquire the semaphore, ensuring that debug logging is enabled if there are waiters.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

If the semaphore value is zero or less, the debug daemon is started to log the state of the semaphore.

Returns:

True when the semaphore is successfully acquired.
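The counter semantics described above match those of the standard library's asyncio.Semaphore, which is used here as a stand-in. The sketch assumes this class behaves the same way for acquire, release, and locked; it does not exercise the debug daemon, and acquire_demo is a hypothetical helper.

```python
import asyncio


async def acquire_demo():
    """Show an immediate acquire, a blocked acquire, and a wake-up on release."""
    sem = asyncio.Semaphore(1)
    first = await sem.acquire()            # counter 1 -> 0, returns True at once
    blocked = asyncio.create_task(sem.acquire())
    await asyncio.sleep(0)                 # let the second acquire start waiting
    was_waiting = not blocked.done()       # counter is 0, so it cannot proceed
    sem.release()                          # wakes the waiting coroutine
    second = await blocked
    return first, was_waiting, second, sem.locked()
```

After the second acquire succeeds the counter is back at zero, so locked() reports True until someone releases again.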

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Returns True if semaphore cannot be acquired immediately.

release(self) void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

class a_sync.primitives.locks.ThreadsafeSemaphore

Bases: Semaphore

ThreadsafeSemaphore(value: Optional[int], name: Optional[str] = None) -> None

A semaphore that works in a multi-threaded environment.

This semaphore ensures that the program functions correctly even when used with multiple event loops. It provides a workaround for edge cases involving multiple threads and event loops by using a separate semaphore for each thread.

Example

semaphore = ThreadsafeSemaphore(5)

async def limited():
    async with semaphore:
        return 1

See also

Semaphore for the base class implementation.
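The per-thread idea can be sketched as a mapping from the current thread to its own asyncio.Semaphore. This is an illustration under assumptions rather than the class's source: PerThreadSemaphoreSketch is hypothetical, the thread object is assumed to be a suitable key, and the guarding threading.Lock is an extra precaution added here.

```python
import asyncio
import threading


class PerThreadSemaphoreSketch:
    """Hypothetical wrapper that gives each thread its own asyncio.Semaphore."""

    def __init__(self, value):
        self._value = value
        self._semaphores = {}          # thread object -> asyncio.Semaphore
        self._lock = threading.Lock()  # protects the dict across threads

    @property
    def semaphore(self):
        # Looked up on every access: the answer depends on the calling thread.
        thread = threading.current_thread()
        with self._lock:
            if thread not in self._semaphores:
                self._semaphores[thread] = asyncio.Semaphore(self._value)
            return self._semaphores[thread]

    async def __aenter__(self):
        await self.semaphore.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        self.semaphore.release()
```

Each thread's event loop only ever touches its own semaphore, which avoids sharing one asyncio primitive across loops. The limit therefore applies per thread, not globally.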

__call__()

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

__init__()

Initialize the threadsafe semaphore with a given value and optional name.

Parameters:
  • value – The initial value for the semaphore, should be an integer.

  • name (optional) – An optional name for the semaphore.

async _debug_daemon(self) None

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

This method is part of the _DebugDaemonMixin and is used to provide detailed logging information about the semaphore’s state when it is being waited on.

Example

semaphore = Semaphore(5)

async def monitor():
    await semaphore._debug_daemon()

Return type:

None

_ensure_debug_daemon(self, *args, **kwargs) None

Ensures that the debug daemon task is running.

This method checks if the debug daemon is already running and starts it if necessary. If debug logging is not enabled, it sets the daemon to a dummy future.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

None

Examples

Ensuring the debug daemon is running:

my_instance = MyDebugClass()
my_instance._ensure_debug_daemon()

See also

_start_debug_daemon() for starting the daemon.

_get_loop(self)

_start_debug_daemon(self, *args, **kwargs) 'asyncio.Future[None]'

Starts the debug daemon task if debug logging is enabled and the event loop is running.

This method checks if debug logging is enabled and if the event loop is running. If both conditions are met, it starts the debug daemon task.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task as an asyncio.Task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

Examples

Starting the debug daemon:

my_instance = MyDebugClass()
my_instance._start_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_stop_debug_daemon(self, t: asyncio.Task | None = None) None

Stops the debug daemon task.

This method cancels the debug daemon task if it is running. Raises a ValueError if the task to be stopped is not the current daemon.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

Examples

Stopping the debug daemon:

my_instance = MyDebugClass()
my_instance._stop_debug_daemon()

See also

_ensure_debug_daemon() for ensuring the daemon is running.

_wake_up_next(self) void

Wake up the first waiter that isn’t done.

acquire(self)

Acquire the semaphore, ensuring that debug logging is enabled if there are waiters.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

If the semaphore value is zero or less, the debug daemon is started to log the state of the semaphore.

Returns:

True when the semaphore is successfully acquired.

decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Returns True if semaphore cannot be acquired immediately.

release(self) void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_loop

asyncio.AbstractEventLoop

Type:

_LoopBoundMixin._loop

_value

int

Type:

Semaphore._value

_waiters

List[Future]

Type:

Semaphore._waiters

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False

Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

semaphore

Semaphore

Returns the appropriate semaphore for the current thread.

NOTE: We can’t cache this property because we need to check the current thread every time we access it.

Example

semaphore = ThreadsafeSemaphore(5)

async def limited():
    async with semaphore.semaphore:
        return 1

Type:

ThreadsafeSemaphore.semaphore