a_sync.primitives.locks package
Submodules
a_sync.primitives.locks.counter module
This module provides two specialized async flow management classes, CounterLock and CounterLockCluster.
These primitives manage asyncio.Task objects that must wait for an internal counter to reach a specific value.
- class a_sync.primitives.locks.counter.CounterLock[source]
Bases:
_DebugDaemonMixin
An async primitive that blocks until the internal counter has reached a specific value.
A coroutine can await counter.wait_for(3) and it will block until the internal counter >= 3. If some other task executes counter.value = 5 or counter.set(5), the first coroutine will unblock as 5 >= 3.
The internal counter can only increase.
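The blocking behavior described above can be sketched with the standard library alone. SketchCounterLock below is a hypothetical stand-in, not the a_sync class: it assumes one asyncio.Event per awaited value, in the spirit of the documented _events attribute, and the real implementation may differ.

```python
import asyncio
from collections import defaultdict


class SketchCounterLock:
    """Hypothetical illustration; not the a_sync CounterLock."""

    def __init__(self, start_value=0):
        self._value = start_value
        # One event per awaited value, created lazily on first wait.
        self._events = defaultdict(asyncio.Event)

    async def wait_for(self, value):
        # Only block if the counter has not reached the value yet.
        if self._value < value:
            await self._events[value].wait()
        return True

    def set(self, value):
        if value < self._value:
            raise ValueError("the counter can only increase")
        self._value = value
        # Release every waiter whose target is now satisfied.
        for target in [t for t in self._events if t <= value]:
            self._events.pop(target).set()


async def demo():
    counter = SketchCounterLock()
    order = []

    async def waiter():
        await counter.wait_for(3)
        order.append("unblocked")

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)  # let the waiter start and block
    order.append("setting")
    counter.set(5)          # 5 >= 3, so the waiter is released
    await task
    return order


result = asyncio.run(demo())
```

The waiter asks for 3 and is released when the counter jumps straight to 5, since 5 >= 3.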
- __init__(start_value=0, name=None)[source]
Initializes the CounterLock with a starting value and an optional name.
- Parameters:
start_value (int) – The initial value of the counter.
name (optional) – An optional name for the counter, used in debug logs.
- async _debug_daemon()[source]
Periodically logs debug information about the counter state and waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
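The daemon lifecycle documented by the three helpers above can be sketched as follows. DaemonOwner and its logger name are hypothetical, and the already-resolved dummy future is an assumption about what "dummy future" means here; only the documented contract (return the task or a dummy, raise ValueError for a foreign task) is taken from the text.

```python
import asyncio
import logging

logger = logging.getLogger("sketch.debug_daemon")  # hypothetical logger name


class DaemonOwner:
    """Hypothetical illustration of the documented daemon lifecycle."""

    def __init__(self):
        self._daemon = None

    async def _debug_daemon(self):
        while True:
            await asyncio.sleep(60)
            logger.debug("still waiting")

    def _ensure_debug_daemon(self):
        if not logger.isEnabledFor(logging.DEBUG):
            # Debug logging is off: hand back a finished placeholder future.
            dummy = asyncio.get_running_loop().create_future()
            dummy.set_result(None)
            return dummy
        if self._daemon is None:
            self._daemon = asyncio.create_task(self._debug_daemon())
        return self._daemon

    def _stop_debug_daemon(self, t=None):
        if t is not None and t is not self._daemon:
            raise ValueError(f"{t} is not the current daemon")
        if self._daemon is not None:
            self._daemon.cancel()
        self._daemon = None


async def demo():
    owner = DaemonOwner()
    logger.setLevel(logging.INFO)
    dummy_done = owner._ensure_debug_daemon().done()

    logger.setLevel(logging.DEBUG)
    daemon = owner._ensure_debug_daemon()
    reused = owner._ensure_debug_daemon() is daemon

    stranger = asyncio.create_task(asyncio.sleep(0))
    try:
        owner._stop_debug_daemon(stranger)
        raised = False
    except ValueError:
        raised = True
    await stranger

    owner._stop_debug_daemon(daemon)
    await asyncio.gather(daemon, return_exceptions=True)
    return dummy_done, reused, raised, daemon.cancelled()


result = asyncio.run(demo())
```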
- set(value)[source]
Sets the counter to the specified value.
- Parameters:
value (int) – The value to set the counter to. Must be >= the current value.
- Raises:
ValueError – If the new value is less than the current value.
- Return type:
None
- _daemon
- _events: DefaultDict[int, Event]
A defaultdict that maps each awaited value to an asyncio.Event that manages the waiters for that value.
- _name
An optional name for the counter, used in debug logs.
- _value
The current value of the counter.
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- is_ready
A lambda function that indicates whether a given value has already been surpassed.
- class a_sync.primitives.locks.counter.CounterLockCluster[source]
Bases:
object
An asyncio primitive that represents 2 or more CounterLock objects.
wait_for(i) will block until the value of all CounterLock objects is >= i.
- __init__(counter_locks)[source]
Initializes the CounterLockCluster with a collection of CounterLock objects.
- Parameters:
counter_locks (Iterable[CounterLock]) – The CounterLock objects to manage.
- Return type:
None
- async wait_for(value)[source]
Waits until the value of all CounterLock objects in the cluster reaches or exceeds the specified value.
- locks
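A cluster-style wait can be sketched by awaiting every member counter at once. TinyCounter and TinyCluster are hypothetical names built on asyncio.Condition for brevity; this is a sketch of the documented behavior, not the a_sync implementation.

```python
import asyncio


class TinyCounter:
    """Hypothetical monotonic counter used only for this sketch."""

    def __init__(self):
        self.value = 0
        self._changed = asyncio.Condition()

    async def set(self, value):
        if value < self.value:
            raise ValueError("the counter can only increase")
        async with self._changed:
            self.value = value
            self._changed.notify_all()

    async def wait_for(self, value):
        async with self._changed:
            await self._changed.wait_for(lambda: self.value >= value)


class TinyCluster:
    """Hypothetical cluster: ready only when every counter is ready."""

    def __init__(self, counters):
        self.locks = list(counters)

    async def wait_for(self, value):
        await asyncio.gather(*(c.wait_for(value) for c in self.locks))
        return True


async def demo():
    a, b = TinyCounter(), TinyCounter()
    cluster = TinyCluster([a, b])
    task = asyncio.create_task(cluster.wait_for(2))

    await a.set(2)
    await asyncio.sleep(0.01)
    done_after_first = task.done()  # b is still at 0, so still waiting

    await b.set(3)
    await task
    return done_after_first, task.done()


result = asyncio.run(demo())
```

The cluster stays blocked while only one counter has reached the target and completes once the last one catches up.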
a_sync.primitives.locks.event module
This module provides an enhanced version of asyncio.Event with additional debug logging to help detect deadlocks.
- class a_sync.primitives.locks.event.Event[source]
Bases:
Event, _DebugDaemonMixin
An asyncio.Event with additional debug logging to help detect deadlocks.
This event class extends asyncio.Event by adding debug logging capabilities. It logs detailed information about the event state and waiters, which can be useful for diagnosing and debugging potential deadlocks.
- __init__(name='', debug_daemon_interval=300, *, loop=None)[source]
Initializes the Event.
- Parameters:
name (str) – An optional name for the event, used in debug logs.
debug_daemon_interval (int) – The interval in seconds for the debug daemon to log information.
loop (Optional[asyncio.AbstractEventLoop]) – The event loop to use.
- async _debug_daemon()[source]
Periodically logs debug information about the event state and waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- clear()[source]
Reset the internal flag to false. Subsequently, coroutines calling wait() will block until set() is called to set the internal flag to true again.
- set()[source]
Set the internal flag to true. All coroutines waiting for it to become true are awakened. Coroutines that call wait() once the flag is true will not block at all.
- async wait()[source]
Wait until the event is set.
- Returns:
True when the event is set.
- Return type:
Literal[True]
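The set(), clear() and wait() semantics above are those of asyncio.Event, which this class is documented as extending. The sketch below uses only the standard library class, on the assumption that the same calls behave identically on the subclass.

```python
import asyncio


async def demo():
    event = asyncio.Event()

    async def waiter():
        return await event.wait()

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0)            # let the waiter start
    blocked = not task.done()         # flag is false, so it is waiting

    event.set()                       # wake every waiter
    woke_with = await task

    immediate = await event.wait()    # flag already true: no blocking
    event.clear()                     # later wait() calls block again
    return blocked, woke_with, immediate, event.is_set()


result = asyncio.run(demo())
```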
- _daemon
- _debug_daemon_interval
- _loop: AbstractEventLoop = None
a_sync.primitives.locks.prio_semaphore module
- class a_sync.primitives.locks.prio_semaphore.Priority[source]
Bases:
Protocol
- __init__(*args, **kwargs)
- class a_sync.primitives.locks.prio_semaphore.PrioritySemaphore[source]
Bases:
_AbstractPrioritySemaphore[int | float | Decimal, _PrioritySemaphoreContextManager]
- _context_manager_class
alias of _PrioritySemaphoreContextManager
- __call__(fn)
Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- __getitem__(priority)
- Parameters:
priority (PT | None)
- Return type:
- __init__(value=1, *, name=None)
Initialize the semaphore with a given value and optional name for debugging.
- Parameters:
value (int) – The initial value for the semaphore.
name (optional) – An optional name used only to provide useful context in debug logs.
- Return type:
None
- async _debug_daemon()
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- _wake_up_next()
Wake up the first waiter that isn’t done.
- Return type:
None
- async acquire()
Acquire a semaphore.
If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.
- Return type:
Literal[True]
- decorate(fn)
Wrap a coroutine function to ensure it runs with the semaphore.
Example
Now you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- release()
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- _capacity
- _context_managers: Dict[PT, _AbstractPrioritySemaphoreContextManager[PT]]
- _daemon
- _loop = None
- _top_priority = -1
It’s kind of like a regular Semaphore, but you must give each waiter a priority:
```
priority_semaphore = PrioritySemaphore(10)

async with priority_semaphore[priority]:
    await do_stuff()
```
You can also enter and exit this semaphore without a priority, and it will process those waiters first, like so:
```
priority_semaphore = PrioritySemaphore(10)

async with priority_semaphore:
    await do_stuff()
```
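The idea of waking waiters in priority order can be sketched with a heap. SketchPrioritySemaphore is hypothetical and is not the a_sync implementation; it also assumes that a lower number means a higher priority, which the text above does not state explicitly.

```python
import asyncio
import heapq
import itertools


class SketchPrioritySemaphore:
    """Hypothetical illustration; not the a_sync PrioritySemaphore."""

    def __init__(self, value=1):
        self._value = value
        self._waiters = []                 # heap of (priority, arrival, future)
        self._arrival = itertools.count()  # tie-breaker for equal priorities

    async def acquire(self, priority):
        if self._value > 0 and not self._waiters:
            self._value -= 1
            return True
        fut = asyncio.get_running_loop().create_future()
        heapq.heappush(self._waiters, (priority, next(self._arrival), fut))
        await fut                          # release() hands the slot over directly
        return True

    def release(self):
        while self._waiters:
            _, _, fut = heapq.heappop(self._waiters)
            if not fut.done():             # skip waiters that were cancelled
                fut.set_result(None)
                return
        self._value += 1


async def demo():
    sem = SketchPrioritySemaphore(1)
    order = []

    async def worker(priority):
        await sem.acquire(priority)
        order.append(priority)
        sem.release()

    await sem.acquire(0)                   # hold the only slot
    tasks = [asyncio.create_task(worker(p)) for p in (3, 1, 2)]
    await asyncio.sleep(0.01)              # let all three workers queue up
    sem.release()
    await asyncio.gather(*tasks)
    return order


result = asyncio.run(demo())
```

The workers arrive in the order 3, 1, 2 but are served by priority. Robust cancellation handling is left out of this sketch.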
- _waiters: List[_AbstractPrioritySemaphoreContextManager[PT]]
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- class a_sync.primitives.locks.prio_semaphore._AbstractPrioritySemaphore[source]
Bases:
Semaphore, Generic[PT, CM]
- __call__(fn)
Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- __init__(value=1, *, name=None)[source]
Initialize the semaphore with a given value and optional name for debugging.
- Parameters:
value (int) – The initial value for the semaphore.
name (optional) – An optional name used only to provide useful context in debug logs.
- Return type:
None
- async _debug_daemon()
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- async acquire()[source]
Acquire a semaphore.
If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.
- Return type:
Literal[True]
- decorate(fn)
Wrap a coroutine function to ensure it runs with the semaphore.
Example
Now you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- release()
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- _capacity
- property _context_manager_class: Type[_AbstractPrioritySemaphoreContextManager[PT]]
- _context_managers: Dict[PT, _AbstractPrioritySemaphoreContextManager[PT]]
- _daemon
- _loop = None
- property _top_priority: PT
- _waiters: List[_AbstractPrioritySemaphoreContextManager[PT]]
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- class a_sync.primitives.locks.prio_semaphore._AbstractPrioritySemaphoreContextManager[source]
- __call__(fn)
Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- __init__(parent, priority, name=None)[source]
Initialize the semaphore with a given value and optional name for debugging.
- Parameters:
value – The initial value for the semaphore.
name (optional) – An optional name used only to provide useful context in debug logs.
parent (_AbstractPrioritySemaphore)
priority (PT)
- Return type:
None
- async _debug_daemon()
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- _wake_up_next()
Wake up the first waiter that isn’t done.
- async acquire()[source]
Acquire a semaphore.
If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.
- Return type:
Literal[True]
- decorate(fn)
Wrap a coroutine function to ensure it runs with the semaphore.
Example
Now you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- locked()
Returns True if semaphore cannot be acquired immediately.
- release()[source]
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- Return type:
None
- _daemon
- _loop: AbstractEventLoop = None
- _parent
- _priority
- _value
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- property logger: Logger
Returns a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
- Returns:
A logger instance for the class.
- Return type:
Logger
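One plausible way to build such a logger ID is shown below. NamedPrimitive is a hypothetical class, and the dotted "module.Class.name" layout is an assumption, since the exact ID format is not given here. The debug_logs_enabled check is sketched with the standard Logger.isEnabledFor call.

```python
import logging


class NamedPrimitive:
    """Hypothetical class used only to illustrate the naming scheme."""

    def __init__(self, name=None):
        self._name = name

    @property
    def logger(self):
        cls = type(self)
        logger_id = f"{cls.__module__}.{cls.__qualname__}"
        if self._name:
            # Append the instance name only when one was given.
            logger_id += f".{self._name}"
        return logging.getLogger(logger_id)

    @property
    def debug_logs_enabled(self):
        return self.logger.isEnabledFor(logging.DEBUG)


unnamed = NamedPrimitive()
named = NamedPrimitive("jobs")

# Set levels explicitly so the result does not depend on global logging config.
unnamed.logger.setLevel(logging.INFO)
named.logger.setLevel(logging.DEBUG)
```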
- property loop: AbstractEventLoop
- name
- class a_sync.primitives.locks.prio_semaphore._PrioritySemaphoreContextManager[source]
Bases:
_AbstractPrioritySemaphoreContextManager[int | float | Decimal]
- __call__(fn)
Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- __init__(parent, priority, name=None)
Initialize the semaphore with a given value and optional name for debugging.
- Parameters:
value – The initial value for the semaphore.
name (optional) – An optional name used only to provide useful context in debug logs.
parent (_AbstractPrioritySemaphore)
priority (PT)
- Return type:
None
- async _debug_daemon()
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- _wake_up_next()
Wake up the first waiter that isn’t done.
- async acquire()
Acquire a semaphore.
If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.
- Return type:
Literal[True]
- decorate(fn)
Wrap a coroutine function to ensure it runs with the semaphore.
Example
Now you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- locked()
Returns True if semaphore cannot be acquired immediately.
- release()
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- Return type:
None
- _daemon
- _loop: AbstractEventLoop = None
- _parent
- _priority
- _priority_name = 'priority'
- _value
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- property logger: Logger
Returns a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
- Returns:
A logger instance for the class.
- Return type:
Logger
- property loop: AbstractEventLoop
- name
a_sync.primitives.locks.semaphore module
- class a_sync.primitives.locks.semaphore.DummySemaphore[source]
Bases:
Semaphore
A dummy semaphore that implements the standard asyncio.Semaphore API but does nothing.
- _get_loop()
- _wake_up_next()
Wake up the first waiter that isn’t done.
- async acquire()[source]
Acquire a semaphore.
If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.
- Return type:
Literal[True]
- locked()
Returns True if semaphore cannot be acquired immediately.
- release()[source]
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- Return type:
None
- _loop = None
- _value
- name
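What "implements the API but does nothing" could look like is sketched below. NoOpSemaphore is a hypothetical stand-in written against the standard asyncio.Semaphore surface (acquire, release, locked, async with); it is not the DummySemaphore source.

```python
import asyncio


class NoOpSemaphore:
    """Hypothetical no-op stand-in; not the a_sync DummySemaphore itself."""

    async def acquire(self):
        return True          # never blocks

    def release(self):
        return None          # nothing to give back

    def locked(self):
        return False         # always acquirable

    async def __aenter__(self):
        return None

    async def __aexit__(self, *exc_info):
        return None


async def demo():
    sem = NoOpSemaphore()
    running = 0
    peak = 0

    async def job():
        nonlocal running, peak
        async with sem:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)
            running -= 1

    await asyncio.gather(*(job() for _ in range(5)))
    return peak, sem.locked()


result = asyncio.run(demo())
```

Because nothing is ever limited, all five jobs run at the same time. A no-op object like this lets calling code keep its `async with` structure when no limit is wanted.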
- class a_sync.primitives.locks.semaphore.Semaphore[source]
Bases:
Semaphore, _DebugDaemonMixin
A semaphore with additional debugging capabilities.
This semaphore includes debug logging.
Also, it can be used to decorate coroutine functions so you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
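The decorator form is equivalent to wrapping the function body in `async with`. The sketch below reproduces that idea with the standard asyncio.Semaphore and a hypothetical helper, limit_with, since asyncio.Semaphore itself is not callable as a decorator; it is an illustration of the pattern, not the a_sync code.

```python
import asyncio
import functools


def limit_with(semaphore):
    """Hypothetical helper: run the wrapped coroutine function under `semaphore`."""

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            async with semaphore:
                return await fn(*args, **kwargs)

        return wrapper

    return decorator


async def demo():
    semaphore = asyncio.Semaphore(2)
    running = 0
    peak = 0

    @limit_with(semaphore)
    async def limited(i):
        nonlocal running, peak
        running += 1
        peak = max(peak, running)   # track the highest concurrency seen
        await asyncio.sleep(0.01)
        running -= 1
        return i

    results = await asyncio.gather(*(limited(i) for i in range(6)))
    return results, peak


result = asyncio.run(demo())
```

Six calls are started together, but no more than two bodies run at once.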
- __call__(fn)[source]
Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- __init__(value, name=None, **kwargs)[source]
Initialize the semaphore with a given value and optional name for debugging.
- Parameters:
value (int) – The initial value for the semaphore.
name (optional) – An optional name used only to provide useful context in debug logs.
- Return type:
None
- async _debug_daemon()[source]
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- async acquire()[source]
Acquire a semaphore.
If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.
- Return type:
Literal[True]
- decorate(fn)[source]
Wrap a coroutine function to ensure it runs with the semaphore.
Example
Now you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- release()[source]
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
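The acquire() and release() counter semantics can be observed directly on the standard asyncio.Semaphore, which this class is documented as extending. The sketch assumes the subclass keeps the same behavior.

```python
import asyncio


async def demo():
    sem = asyncio.Semaphore(1)

    first = await sem.acquire()        # counter 1 -> 0, returns immediately
    locked_after = sem.locked()        # nothing left to acquire

    waiter = asyncio.create_task(sem.acquire())
    await asyncio.sleep(0)             # let the waiter start
    blocked = not waiter.done()        # counter is zero, so it waits

    sem.release()                      # wakes the waiting coroutine
    second = await waiter

    sem.release()                      # give the slot back
    return first, locked_after, blocked, second, sem.locked()


result = asyncio.run(demo())
```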
- _daemon
- _loop = None
- _value
- _waiters
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- property logger: Logger
Returns a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
- Returns:
A logger instance for the class.
- Return type:
Logger
- name
- class a_sync.primitives.locks.semaphore.ThreadsafeSemaphore[source]
Bases:
Semaphore
While it’s a bit weird to run multiple event loops, sometimes either you or a library you’re using must do so. When used in threaded applications, this semaphore will not work as intended, but at least your program will function. You may need to reduce the semaphore value for multi-threaded applications.
TL;DR: it’s a janky fix for an edge-case problem and will otherwise function as a normal a_sync.Semaphore (which is just an asyncio.Semaphore with extra bells and whistles).
- __call__(fn)
Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- __init__(value, name=None)[source]
Initialize the semaphore with a given value and optional name for debugging.
- Parameters:
value (int | None) – The initial value for the semaphore.
name (optional) – An optional name used only to provide useful context in debug logs.
- Return type:
None
- async _debug_daemon()
Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.
- Return type:
None
- _ensure_debug_daemon(*args, **kwargs)
Ensures that the debug daemon task is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
Either the debug daemon task or a dummy future if debug logging is not enabled.
- Return type:
Future[None]
- _get_loop()
- _start_debug_daemon(*args, **kwargs)
Starts the debug daemon task if debug logging is enabled and the event loop is running.
- Parameters:
*args – Positional arguments for the debug daemon.
**kwargs – Keyword arguments for the debug daemon.
- Returns:
The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.
- Return type:
Future[None]
- _stop_debug_daemon(t=None)
Stops the debug daemon task.
- Parameters:
t (optional) – The task to be stopped, if any.
- Raises:
ValueError – If t is not the current daemon.
- Return type:
None
- _wake_up_next()
Wake up the first waiter that isn’t done.
- async acquire()
Acquire a semaphore.
If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.
- Return type:
Literal[True]
- decorate(fn)
Wrap a coroutine function to ensure it runs with the semaphore.
Example
Now you can rewrite this pattern:
```
semaphore = Semaphore(5)

async def limited():
    async with semaphore:
        return 1
```
like this:
```
semaphore = Semaphore(5)

@semaphore
async def limited():
    return 1
```
- locked()
Returns True if semaphore cannot be acquired immediately.
- release()
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- _daemon
- _loop = None
- _value
- _waiters
- property debug_logs_enabled: bool
Checks if debug logging is enabled for the logger.
- Returns:
True if debug logging is enabled, False otherwise.
- Return type:
bool
- dummy
- property logger: Logger
Returns a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
- Returns:
A logger instance for the class.
- Return type:
Logger
- name
- property semaphore: Semaphore
Returns the appropriate semaphore for the current thread.
NOTE: We can’t cache this property because we need to check the current thread every time we access it.
- semaphores: DefaultDict[Thread, Semaphore]
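The per-thread lookup suggested by the semaphore property and the semaphores mapping can be sketched as follows. PerThreadSemaphore is a hypothetical illustration built on asyncio.Semaphore; it is not the ThreadsafeSemaphore source, and details such as how the real class handles a value of None are not covered.

```python
import asyncio
import threading
from collections import defaultdict


class PerThreadSemaphore:
    """Hypothetical illustration: one asyncio.Semaphore per thread."""

    def __init__(self, value):
        self._value = value
        # Created lazily, the first time a given thread asks for its semaphore.
        self.semaphores = defaultdict(lambda: asyncio.Semaphore(self._value))

    @property
    def semaphore(self):
        # Looked up on every access because the calling thread may differ.
        return self.semaphores[threading.current_thread()]

    async def __aenter__(self):
        await self.semaphore.acquire()

    async def __aexit__(self, *exc_info):
        self.semaphore.release()


shared = PerThreadSemaphore(2)
seen = []


async def use_it():
    async with shared:
        seen.append(shared.semaphore)


def run_loop():
    # Each thread runs its own event loop.
    asyncio.run(use_it())


threads = [threading.Thread(target=run_loop) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each thread ends up with its own semaphore, so the limit applies per thread rather than globally. That matches the advice above to reduce the value in multi-threaded applications.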