a_sync.primitives.locks package

Submodules

a_sync.primitives.locks.counter module

This module provides two specialized async flow management classes, CounterLock and CounterLockCluster.

These primitives manages asyncio.Task objects that must wait for an internal counter to reach a specific value.

class a_sync.primitives.locks.counter.CounterLock[source]

Bases: _DebugDaemonMixin

An async primitive that blocks until the internal counter has reached a specific value.

A coroutine can await counter.wait_for(3) and it will block until the internal counter >= 3. If some other task executes counter.value = 5 or counter.set(5), the first coroutine will unblock as 5 >= 3.

The internal counter can only increase.

__init__(start_value=0, name=None)[source]

Initializes the CounterLock with a starting value and an optional name.

Parameters:
  • start_value (int) – The initial value of the counter.

  • name (optional) – An optional name for the counter, used in debug logs.

async _debug_daemon()[source]

Periodically logs debug information about the counter state and waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

set(value)[source]

Sets the counter to the specified value.

Parameters:

value (int) – The value to set the counter to. Must be >= the current value.

Raises:

ValueError – If the new value is less than the current value.

Return type:

None

async wait_for(value)[source]

Waits until the counter reaches or exceeds the specified value.

Parameters:

value (int) – The value to wait for.

Returns:

True when the counter reaches or exceeds the specified value.

Return type:

bool

_daemon
_events: DefaultDict[int, Event]

A defaultdict that maps each awaited value to an asyncio.Event that manages the waiters for that value.

_name

An optional name for the counter, used in debug logs.

_value

The current value of the counter.

property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

is_ready

A lambda function that indicates whether a given value has already been surpassed.

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

property value: int

Gets the current value of the counter.

Returns:

The current value of the counter.

class a_sync.primitives.locks.counter.CounterLockCluster[source]

Bases: object

An asyncio primitive that represents 2 or more CounterLock objects.

wait_for(i) will block until the value of all CounterLock objects is >= i.

__init__(counter_locks)[source]

Initializes the CounterLockCluster with a collection of CounterLock objects.

Parameters:

counter_locks (Iterable[CounterLock]) – The CounterLock objects to manage.

Return type:

None

async wait_for(value)[source]

Waits until the value of all CounterLock objects in the cluster reaches or exceeds the specified value.

Parameters:

value (int) – The value to wait for.

Returns:

True when the value of all CounterLock objects reach or exceed the specified value.

Return type:

bool

locks

a_sync.primitives.locks.event module

This module provides an enhanced version of asyncio.Event with additional debug logging to help detect deadlocks.

class a_sync.primitives.locks.event.Event[source]

Bases: Event, _DebugDaemonMixin

An asyncio.Event with additional debug logging to help detect deadlocks.

This event class extends asyncio.Event by adding debug logging capabilities. It logs detailed information about the event state and waiters, which can be useful for diagnosing and debugging potential deadlocks.

__init__(name='', debug_daemon_interval=300, *, loop=None)[source]

Initializes the Event.

Parameters:
  • name (str) – An optional name for the event, used in debug logs.

  • debug_daemon_interval (int) – The interval in seconds for the debug daemon to log information.

  • loop (Optional[asyncio.AbstractEventLoop]) – The event loop to use.

async _debug_daemon()[source]

Periodically logs debug information about the event state and waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

clear()[source]

Reset the internal flag to false. Subsequently, coroutines calling wait() will block until set() is called to set the internal flag to true again.

is_set()[source]

Return True if and only if the internal flag is true.

set()[source]

Set the internal flag to true. All coroutines waiting for it to become true are awakened. Coroutine that call wait() once the flag is true will not block at all.

async wait()[source]

Wait until the event is set.

Returns:

True when the event is set.

Return type:

Literal[True]

_daemon
_debug_daemon_interval
_loop: AbstractEventLoop = None
_value: bool
_waiters: Deque[Future[None]]
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

a_sync.primitives.locks.prio_semaphore module

class a_sync.primitives.locks.prio_semaphore.Priority[source]

Bases: Protocol

__init__(*args, **kwargs)
class a_sync.primitives.locks.prio_semaphore.PrioritySemaphore[source]

Bases: _AbstractPrioritySemaphore[int | float | Decimal, _PrioritySemaphoreContextManager]

_context_manager_class

alias of _PrioritySemaphoreContextManager

__call__(fn)

Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__getitem__(priority)
Parameters:

priority (PT | None)

Return type:

_AbstractPrioritySemaphoreContextManager[PT]

__init__(value=1, *, name=None)

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value (int) – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

Return type:

None

_count_waiters()
Return type:

Dict[PT, int]

async _debug_daemon()

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

_wake_up_next()

Wake up the first waiter that isn’t done.

Return type:

None

async acquire()

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)

Wrap a coroutine function to ensure it runs with the semaphore.

Example

Now you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()

Returns True if semaphore cannot be acquired immediately.

Return type:

bool

release()

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_capacity
_context_managers: Dict[PT, _AbstractPrioritySemaphoreContextManager[PT]]
_daemon
_decorated: Set[str]
_loop = None
_potential_lost_waiters: List[Future[None]]
_top_priority = -1

It’s kinda like a regular Semaphore but you must give each waiter a priority:

``` priority_semaphore = PrioritySemaphore(10)

async with priority_semaphore[priority]:

await do_stuff()

```

You can aenter and aexit this semaphore without a priority and it will process those first. Like so:

``` priority_semaphore = PrioritySemaphore(10)

async with priority_semaphore:

await do_stuff()

```

_value: int
_waiters: List[_AbstractPrioritySemaphoreContextManager[PT]]
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

name: str | None
class a_sync.primitives.locks.prio_semaphore._AbstractPrioritySemaphore[source]

Bases: Semaphore, Generic[PT, CM]

__call__(fn)

Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__getitem__(priority)[source]
Parameters:

priority (PT | None)

Return type:

_AbstractPrioritySemaphoreContextManager[PT]

__init__(value=1, *, name=None)[source]

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value (int) – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

Return type:

None

_count_waiters()[source]
Return type:

Dict[PT, int]

async _debug_daemon()

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

_wake_up_next()[source]

Wake up the first waiter that isn’t done.

Return type:

None

async acquire()[source]

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)

Wrap a coroutine function to ensure it runs with the semaphore.

Example

Now you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()[source]

Returns True if semaphore cannot be acquired immediately.

Return type:

bool

release()

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_capacity
property _context_manager_class: Type[_AbstractPrioritySemaphoreContextManager[PT]]
_context_managers: Dict[PT, _AbstractPrioritySemaphoreContextManager[PT]]
_daemon
_decorated: Set[str]
_loop = None
_potential_lost_waiters: List[Future[None]]
property _top_priority: PT
_value: int
_waiters: List[_AbstractPrioritySemaphoreContextManager[PT]]
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

name: str | None
class a_sync.primitives.locks.prio_semaphore._AbstractPrioritySemaphoreContextManager[source]

Bases: Semaphore, Generic[PT]

__call__(fn)

Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__init__(parent, priority, name=None)[source]

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

  • parent (_AbstractPrioritySemaphore)

  • priority (PT)

Return type:

None

async _debug_daemon()

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_repr_no_parent_()[source]
Return type:

str

_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

_wake_up_next()

Wake up the first waiter that isn’t done.

async acquire()[source]

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)

Wrap a coroutine function to ensure it runs with the semaphore.

Example

Now you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()

Returns True if semaphore cannot be acquired immediately.

release()[source]

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

Return type:

None

_daemon
_decorated: Set[str]
_loop: AbstractEventLoop = None
_parent
_priority
property _priority_name: str
_value
_waiters: Deque[Future]
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

property loop: AbstractEventLoop
name
property waiters: Deque[Future]
class a_sync.primitives.locks.prio_semaphore._PrioritySemaphoreContextManager[source]

Bases: _AbstractPrioritySemaphoreContextManager[int | float | Decimal]

__call__(fn)

Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__init__(parent, priority, name=None)

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

  • parent (_AbstractPrioritySemaphore)

  • priority (PT)

Return type:

None

async _debug_daemon()

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_repr_no_parent_()
Return type:

str

_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

_wake_up_next()

Wake up the first waiter that isn’t done.

async acquire()

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)

Wrap a coroutine function to ensure it runs with the semaphore.

Example

Now you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()

Returns True if semaphore cannot be acquired immediately.

release()

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

Return type:

None

_daemon
_decorated: Set[str]
_loop: AbstractEventLoop = None
_parent
_priority
_priority_name = 'priority'
_value
_waiters: Deque[Future]
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

property loop: AbstractEventLoop
name
property waiters: Deque[Future]

a_sync.primitives.locks.semaphore module

class a_sync.primitives.locks.semaphore.DummySemaphore[source]

Bases: Semaphore

A dummy semaphore that implements the standard asyncio.Semaphore API but does nothing.

__init__(name=None)[source]
Parameters:

name (str | None)

_get_loop()
_wake_up_next()

Wake up the first waiter that isn’t done.

async acquire()[source]

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

locked()

Returns True if semaphore cannot be acquired immediately.

release()[source]

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

Return type:

None

_loop = None
_value
name
class a_sync.primitives.locks.semaphore.Semaphore[source]

Bases: Semaphore, _DebugDaemonMixin

A semaphore with additional debugging capabilities.

This semaphore includes debug logging.

Also, it can be used to decorate coroutine functions so you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

__call__(fn)[source]

Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__init__(value, name=None, **kwargs)[source]

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value (int) – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

Return type:

None

async _debug_daemon()[source]

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

_wake_up_next()[source]

Wake up the first waiter that isn’t done.

async acquire()[source]

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)[source]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

Now you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()[source]

Returns True if semaphore cannot be acquired immediately.

release()[source]

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_daemon
_decorated: Set[str]
_loop = None
_value
_waiters
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

name
class a_sync.primitives.locks.semaphore.ThreadsafeSemaphore[source]

Bases: Semaphore

While its a bit weird to run multiple event loops, sometimes either you or a lib you’re using must do so. When in use in threaded applications, this semaphore will not work as intended but at least your program will function. You may need to reduce the semaphore value for multi-threaded applications.

# TL;DR it’s a janky fix for an edge case problem and will otherwise function as a normal a_sync.Semaphore (which is just an asyncio.Semaphore with extra bells and whistles).

__call__(fn)

Convenient decorator method to wrap coroutine functions with the semaphore so you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

__init__(value, name=None)[source]

Initialize the semaphore with a given value and optional name for debugging.

Parameters:
  • value (int | None) – The initial value for the semaphore.

  • name (optional) – An optional name used only to provide useful context in debug logs.

Return type:

None

async _debug_daemon()

Daemon coroutine (runs in a background task) which will emit a debug log every minute while the semaphore has waiters.

Return type:

None

_ensure_debug_daemon(*args, **kwargs)

Ensures that the debug daemon task is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

Either the debug daemon task or a dummy future if debug logging is not enabled.

Return type:

Future[None]

_get_loop()
_start_debug_daemon(*args, **kwargs)

Starts the debug daemon task if debug logging is enabled and the event loop is running.

Parameters:
  • *args – Positional arguments for the debug daemon.

  • **kwargs – Keyword arguments for the debug daemon.

Returns:

The debug daemon task, or a dummy future if debug logs are not enabled or if the daemon cannot be created.

Return type:

Future[None]

_stop_debug_daemon(t=None)

Stops the debug daemon task.

Parameters:

t (optional) – The task to be stopped, if any.

Raises:

ValueError – If t is not the current daemon.

Return type:

None

_wake_up_next()

Wake up the first waiter that isn’t done.

async acquire()

Acquire a semaphore.

If the internal counter is larger than zero on entry, decrement it by one and return True immediately. If it is zero on entry, block, waiting until some other coroutine has called release() to make it larger than 0, and then return True.

Return type:

Literal[True]

decorate(fn)

Wrap a coroutine function to ensure it runs with the semaphore.

Example

Now you can rewrite this pattern:

``` semaphore = Semaphore(5)

async def limited():
async with semaphore:

return 1

```

like this:

``` semaphore = Semaphore(5)

@semaphore async def limited():

return 1

```

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked()

Returns True if semaphore cannot be acquired immediately.

release()

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

_daemon
_decorated: Set[str]
_loop = None
_value
_waiters
property debug_logs_enabled: bool

Checks if debug logging is enabled for the logger.

Returns:

True if debug logging is enabled, False otherwise.

Return type:

bool

dummy
property logger: Logger

Returns a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Returns:

A logger instance for the class.

Return type:

Logger

name
property semaphore: Semaphore

Returns the appropriate semaphore for the current thread.

NOTE: We can’t cache this property because we need to check the current thread every time we access it.

semaphores: DefaultDict[Thread, Semaphore]
property use_dummy: bool

Module contents