a_sync.a_sync.modifiers package

Submodules

a_sync.a_sync.modifiers.limiter module

a_sync.a_sync.modifiers.limiter.apply_rate_limit(coro_fn=None, runs_per_minute=None)[source]

Applies a rate limit to an asynchronous function.

This function can be used as a decorator to limit the number of times an asynchronous function can be called per minute. It can be configured with either an integer specifying the number of runs per minute or an aiolimiter.AsyncLimiter instance.

Parameters:
  • coro_fn (Callable[[~P], Awaitable[T]] | int | None) – The coroutine function to be rate-limited. If an integer is provided, it is treated as runs per minute, and runs_per_minute should be None.

  • runs_per_minute (int | AsyncLimiter | None) – The number of allowed executions per minute or an aiolimiter.AsyncLimiter instance. If coro_fn is an integer, this should be None.

Raises:
  • TypeError – If runs_per_minute is neither an integer nor an aiolimiter.AsyncLimiter when coro_fn is None.

  • exceptions.FunctionNotAsync – If coro_fn is not an asynchronous function.

Return type:

Callable[[Callable[[~P], Awaitable[T]]], Callable[[~P], Awaitable[T]]] | Callable[[~P], Awaitable[T]]

Examples

Applying a rate limit of 60 executions per minute:

>>> @apply_rate_limit(60)
... async def my_function():
...     pass

Using an aiolimiter.AsyncLimiter instance:

>>> async_limiter = AsyncLimiter(60)
>>> @apply_rate_limit(async_limiter)
... async def my_function():
...     pass

Specifying the rate limit with the runs_per_minute keyword:

>>> @apply_rate_limit(runs_per_minute=60)
... async def my_function():
...     pass

See also

aiolimiter.AsyncLimiter
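The two calling conventions described above (a bare rate that returns a decorator, or a function passed together with a rate) can be illustrated with a standard-library-only sketch. This is not the library's implementation: `rate_limit_sketch` is a hypothetical name, the sliding-window bookkeeping is an assumption standing in for aiolimiter.AsyncLimiter, and a plain TypeError stands in for exceptions.FunctionNotAsync.

```python
import asyncio
import functools
import inspect
import time
from collections import deque


def rate_limit_sketch(coro_fn=None, runs_per_minute=None):
    """Illustrative stand-in for apply_rate_limit; not the library's code."""
    # An integer in the first position is read as the rate itself.
    if isinstance(coro_fn, int):
        if runs_per_minute is not None:
            raise ValueError("pass the rate only once")
        coro_fn, runs_per_minute = None, coro_fn
    if not isinstance(runs_per_minute, int):
        raise TypeError("'runs_per_minute' must be an integer")
    if runs_per_minute <= 0:
        raise ValueError("'runs_per_minute' must be positive")

    def decorator(fn):
        # TypeError is a stand-in for the library's FunctionNotAsync.
        if not inspect.iscoroutinefunction(fn):
            raise TypeError(f"{fn!r} is not an async function")
        starts = deque()  # start times of calls made in the last 60 seconds

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            while True:
                now = time.monotonic()
                while starts and now - starts[0] >= 60:
                    starts.popleft()  # forget calls older than one minute
                if len(starts) < runs_per_minute:
                    break
                # Window is full: wait until the oldest call ages out.
                await asyncio.sleep(60 - (now - starts[0]))
            starts.append(time.monotonic())
            return await fn(*args, **kwargs)

        return wrapper

    # Bare rate: return a decorator. Function given: wrap it right away.
    return decorator if coro_fn is None else decorator(coro_fn)


@rate_limit_sketch(60)
async def double(x):
    return x * 2


async def one():
    return 1


limited_one = rate_limit_sketch(one, runs_per_minute=5)
```

Both forms end up in the same inner `decorator`, which is why a single function can serve as a decorator factory and as a direct wrapper.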

a_sync.a_sync.modifiers.manager module

class a_sync.a_sync.modifiers.manager.ModifierManager[source]

Bases: Dict[str, Any]

Manages modifiers for asynchronous and synchronous functions.

This class is responsible for applying modifiers to functions, such as caching, rate limiting, and semaphores for asynchronous functions. It also handles synchronous functions, although no sync modifiers are currently implemented.

Examples

Creating a ModifierManager with specific modifiers:

>>> modifiers = ModifierKwargs(cache_type='memory', runs_per_minute=60)
>>> manager = ModifierManager(modifiers)

Applying modifiers to an asynchronous function:

>>> async def my_coro():
...     pass
>>> modified_coro = manager.apply_async_modifiers(my_coro)

Applying modifiers to a synchronous function (no sync modifiers applied):

>>> def my_function():
...     pass
>>> modified_function = manager.apply_sync_modifiers(my_function)

__getitem__(modifier_key)[source]

Gets the value of a modifier by key.

Parameters:

modifier_key (str) – The key of the modifier to retrieve.

Returns:

The value of the modifier.

Examples

>>> manager = ModifierManager(ModifierKwargs(cache_type='memory'))
>>> manager['cache_type']
'memory'

__init__(modifiers)[source]

Initializes the ModifierManager with the given modifiers.

Parameters:

modifiers (ModifierKwargs) – A dictionary of modifiers to be applied.

Raises:

ValueError – If an unsupported modifier is provided.

Return type:

None
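The constructor check described above (reject any key that is not a supported modifier) could look roughly like the following. This is a minimal sketch under stated assumptions: `ManagerSketch` and `VALID_MODIFIERS` are hypothetical names, the key set is copied from the attribute list in this reference, and the real class may store and validate its modifiers differently.

```python
from typing import Any, Dict

# Key names taken from the attribute list in this reference.
VALID_MODIFIERS = frozenset({
    "default", "cache_type", "cache_typed", "ram_cache_maxsize",
    "ram_cache_ttl", "runs_per_minute", "semaphore", "executor",
})


class ManagerSketch(Dict[str, Any]):
    """Hypothetical stand-in for ModifierManager's constructor check."""

    def __init__(self, modifiers: Dict[str, Any]) -> None:
        unsupported = sorted(set(modifiers) - VALID_MODIFIERS)
        if unsupported:
            raise ValueError(f"unsupported modifier(s): {unsupported}")
        super().__init__(modifiers)

    @property
    def use_limiter(self) -> bool:
        # Assumption: a limiter is wanted whenever a rate has been set.
        return self.get("runs_per_minute") is not None


manager = ManagerSketch({"runs_per_minute": 60})
```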

__iter__()[source]

Returns an iterator over the modifier keys.

Examples

>>> manager = ModifierManager(ModifierKwargs(cache_type='memory'))
>>> list(iter(manager))
['cache_type']

Return type:

Iterator[str]

apply_async_modifiers(coro_fn)[source]

Applies asynchronous modifiers to a coroutine function.

Parameters:

coro_fn (Callable[[~P], Awaitable[T]]) – The coroutine function to modify.

Returns:

The modified coroutine function.

Return type:

Callable[[~P], Awaitable[T]]
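To make the idea of applying several asynchronous modifiers concrete, here is a small standard-library sketch that wraps a coroutine function with a concurrency limit and an in-memory cache. The helper names (`with_semaphore`, `with_memory_cache`, `apply_async_modifiers_sketch`) are hypothetical, and the wrapping order is an assumption; the library may compose its modifiers differently.

```python
import asyncio
import functools


def with_semaphore(limit):
    """Wrap a coroutine function so at most `limit` calls run at once."""
    def decorator(fn):
        sem = None  # created lazily, inside a running event loop

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            nonlocal sem
            if sem is None:
                sem = asyncio.Semaphore(limit)
            async with sem:
                return await fn(*args, **kwargs)

        return wrapper
    return decorator


def with_memory_cache(fn):
    """Remember results by positional arguments (hashable arguments only)."""
    results = {}

    @functools.wraps(fn)
    async def wrapper(*args):
        if args not in results:
            results[args] = await fn(*args)
        return results[args]

    return wrapper


def apply_async_modifiers_sketch(coro_fn, modifiers):
    # Assumed order: limit concurrency first, then cache on the outside,
    # so cache hits never wait on the semaphore.
    if modifiers.get("semaphore") is not None:
        coro_fn = with_semaphore(modifiers["semaphore"])(coro_fn)
    if modifiers.get("cache_type") == "memory":
        coro_fn = with_memory_cache(coro_fn)
    return coro_fn


calls = []


async def fetch(key):
    calls.append(key)  # record every real execution
    await asyncio.sleep(0)
    return key.upper()


fetch_modified = apply_async_modifiers_sketch(
    fetch, {"semaphore": 2, "cache_type": "memory"}
)


async def main():
    first = await fetch_modified("a")
    second = await fetch_modified("a")  # served from the cache
    return first, second


result = asyncio.run(main())
```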

Examples

>>> async def my_coro():
...     pass
>>> manager = ModifierManager(ModifierKwargs(runs_per_minute=60))
>>> modified_coro = manager.apply_async_modifiers(my_coro)

apply_sync_modifiers(function)[source]

Wraps a synchronous function.

Note

There are no sync modifiers at this time, but they will be added here for convenience.

Parameters:

function (Callable[[~P], T]) – The synchronous function to wrap.

Returns:

The wrapped synchronous function.

Return type:

Callable[[~P], T]

Examples

>>> def my_function():
...     pass
>>> manager = ModifierManager(ModifierKwargs())
>>> modified_function = manager.apply_sync_modifiers(my_function)

clear() -> None. Remove all items from D.

copy() -> a shallow copy of D

fromkeys(iterable, value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items()[source]

Returns the items of the modifiers.

Examples

>>> manager = ModifierManager(ModifierKwargs(cache_type='memory'))
>>> list(manager.items())
[('cache_type', 'memory')]

Return type:

ItemsView[str, Any]

keys()[source]

Returns the keys of the modifiers.

Examples

>>> manager = ModifierManager(ModifierKwargs(cache_type='memory'))
>>> list(manager.keys())
['cache_type']

Return type:

KeysView[str]

pop(k[, d]) -> v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) -> None. Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k]

If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v

In either case, this is followed by: for k in F: D[k] = F[k]

values()[source]

Returns the values of the modifiers.

Examples

>>> manager = ModifierManager(ModifierKwargs(cache_type='memory'))
>>> list(manager.values())
['memory']

Return type:

ValuesView[Any]

_modifiers
cache_type: Literal['memory', None]
cache_typed: bool
default: Literal['sync', 'async', None]
executor: Executor

This is not applied like a typical modifier but is still passed through the library with them for convenience. The executor is used to run the sync function in an asynchronous context.

ram_cache_maxsize: int | None
ram_cache_ttl: int | None
runs_per_minute: int | None
semaphore: Semaphore | int | None
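The executor attribute above is described as the means of running a sync function in an asynchronous context. With the standard library alone, that pattern is usually written with `loop.run_in_executor`; the sketch below shows it under the assumption that a thread pool is used. `run_sync_in_executor` and `blocking_sum` are hypothetical names, and this is not necessarily how the library dispatches the call.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_sum(a, b):
    # An ordinary synchronous function.
    return a + b


async def run_sync_in_executor(executor, fn, *args):
    # run_in_executor schedules fn(*args) on the executor and returns
    # an awaitable for its result, so the event loop stays responsive.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, fn, *args)


async def main():
    with ThreadPoolExecutor(max_workers=1) as executor:
        return await run_sync_in_executor(executor, blocking_sum, 2, 3)


total = asyncio.run(main())
```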
property use_cache: bool

Determines if caching should be used.

Examples

>>> manager = ModifierManager(ModifierKwargs(cache_type='memory'))
>>> manager.use_cache
True

property use_limiter: bool

Determines if a rate limiter should be used.

Examples

>>> manager = ModifierManager(ModifierKwargs(runs_per_minute=60))
>>> manager.use_limiter
True

property use_semaphore: bool

Determines if a semaphore should be used.

Examples

>>> manager = ModifierManager(ModifierKwargs(semaphore=2))
>>> manager.use_semaphore
True

a_sync.a_sync.modifiers.semaphores module

a_sync.a_sync.modifiers.semaphores.apply_semaphore(coro_fn=None, semaphore=None)[source]

Apply a semaphore to a coroutine function or return a decorator.

This function can be used to apply a semaphore to a coroutine function either by passing the coroutine function and semaphore as arguments or by using the semaphore as a decorator. It raises exceptions if the inputs are not valid.

Parameters:
  • coro_fn (Optional[Callable]) – The coroutine function to which the semaphore will be applied, or None if the semaphore is to be used as a decorator.

  • semaphore (Union[int, asyncio.Semaphore, primitives.Semaphore]) – The semaphore to apply, which can be an integer, an asyncio.Semaphore, or a primitives.Semaphore.

Raises:
  • ValueError – If coro_fn is an integer or asyncio.Semaphore and semaphore is not None.

  • exceptions.FunctionNotAsync – If the provided function is not a coroutine.

  • TypeError – If the semaphore is not an integer, an asyncio.Semaphore, or a primitives.Semaphore.

Return type:

Callable[[Callable[[~P], Awaitable[T]]], Callable[[~P], Awaitable[T]]] | Callable[[~P], Awaitable[T]]

Examples

Using as a decorator:

>>> @apply_semaphore(2)
... async def limited_concurrent_function():
...     pass

Applying directly to a function:

>>> async def my_coroutine():
...     pass
>>> my_coroutine = apply_semaphore(my_coroutine, 3)

Handling invalid inputs:

>>> try:
...     apply_semaphore(3, 2)
... except ValueError as e:
...     print(e)

Note

primitives.Semaphore is a subclass of asyncio.Semaphore. Therefore, when the documentation refers to asyncio.Semaphore, it also includes primitives.Semaphore and any other subclasses.
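The argument handling documented for apply_semaphore (a limit in the first position yields a decorator, passing the limit twice is an error) can be sketched with asyncio.Semaphore alone. `semaphore_sketch` is a hypothetical name, TypeError stands in for exceptions.FunctionNotAsync, and the lazy conversion of an integer into a semaphore is an assumption made so the example is safe to define outside a running event loop.

```python
import asyncio
import functools
import inspect


def semaphore_sketch(coro_fn=None, semaphore=None):
    """Illustrative stand-in for apply_semaphore; not the library's code."""
    # A limit in the first position means "return a decorator".
    if isinstance(coro_fn, (int, asyncio.Semaphore)):
        if semaphore is not None:
            raise ValueError("the semaphore was passed twice")
        coro_fn, semaphore = None, coro_fn
    if not isinstance(semaphore, (int, asyncio.Semaphore)):
        raise TypeError("'semaphore' must be an int or asyncio.Semaphore")

    def decorator(fn):
        if not inspect.iscoroutinefunction(fn):
            raise TypeError(f"{fn!r} is not a coroutine function")
        sem = semaphore  # an int is turned into a Semaphore on first call

        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            nonlocal sem
            if isinstance(sem, int):
                sem = asyncio.Semaphore(sem)
            async with sem:
                return await fn(*args, **kwargs)

        return wrapper

    return decorator if coro_fn is None else decorator(coro_fn)


running = 0
peak = 0


@semaphore_sketch(2)
async def job():
    global running, peak
    running += 1
    peak = max(peak, running)  # highest number of jobs seen running at once
    await asyncio.sleep(0.01)
    running -= 1


async def main():
    await asyncio.gather(*(job() for _ in range(6)))


asyncio.run(main())
```

Six jobs are started together, but `peak` never exceeds the limit of two because each call must hold the semaphore while it runs.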

a_sync.a_sync.modifiers.semaphores.dummy_semaphore = <DummySemaphore name=None>

A dummy semaphore that does not enforce any concurrency limits.
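A semaphore that enforces no limit can be pictured as an async context manager whose acquire and release do nothing. The class below is a hypothetical stand-in written for illustration; it is not the library's DummySemaphore and makes no claim about that class's actual methods.

```python
import asyncio


class NoOpSemaphore:
    """Hypothetical stand-in for a semaphore that never blocks."""

    async def acquire(self):
        return True  # always succeeds immediately

    def release(self):
        pass  # nothing to give back

    async def __aenter__(self):
        return None

    async def __aexit__(self, exc_type, exc, tb):
        return None  # falsy: exceptions are not swallowed


async def main():
    sem = NoOpSemaphore()
    entered = 0
    # Nested entry would deadlock a real Semaphore(1); here it passes through.
    async with sem:
        async with sem:
            entered += 1
    return entered


entered = asyncio.run(main())
```

An object like this lets calling code always write `async with semaphore:` without first checking whether a limit was configured.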

Module contents

This file contains all logic for ez-a-sync’s “modifiers”.

Modifiers modify the behavior of ez-a-sync’s ASync objects in various ways.

Submodules:

  • cache: Handles caching mechanisms for async functions.

  • limiter: Manages rate limiting for async functions.

  • manager: Provides management of valid modifiers and their application.

  • semaphores: Implements semaphore logic for controlling concurrency.

The modifiers available are:

  • cache_type: Specifies the type of cache to use, such as ‘memory’.

  • cache_typed: Determines if types are considered for cache keys.

  • ram_cache_maxsize: Sets the maximum size for the LRU cache.

  • ram_cache_ttl: Defines the time-to-live for items in the cache.

  • runs_per_minute: Sets a rate limit for function execution.

  • semaphore: Specifies a semaphore for controlling concurrency.

  • executor: Defines the executor for synchronous functions. This is not applied like the other modifiers but is used to manage the execution context for synchronous functions when they are called in an asynchronous manner.

a_sync.a_sync.modifiers.apply_class_defined_modifiers(attrs_from_metaclass)[source]

Applies class-defined modifiers to a dictionary of attributes.

This function modifies the input dictionary in place. If the ‘semaphore’ key is present and its value is an integer, it is converted to a ThreadsafeSemaphore. If the ‘runs_per_minute’ key is present and its value is an integer, it is converted to an AsyncLimiter. If these keys are not present or their values are not integers, the function will silently do nothing.

Parameters:

attrs_from_metaclass (dict) – A dictionary of attributes from a metaclass.

Examples

Applying modifiers to a dictionary:

>>> attrs = {'semaphore': 3, 'runs_per_minute': 60}
>>> apply_class_defined_modifiers(attrs)
>>> attrs['semaphore']
ThreadsafeSemaphore(3)
>>> attrs['runs_per_minute']
AsyncLimiter(60)

See also

  • a_sync.primitives.locks.ThreadsafeSemaphore

  • aiolimiter.AsyncLimiter
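The in-place conversion described for apply_class_defined_modifiers can be pictured with the sketch below. It deliberately avoids the real classes: asyncio.Semaphore stands in for ThreadsafeSemaphore, `LimiterStandIn` is a hypothetical placeholder for aiolimiter.AsyncLimiter, and `convert_in_place` is a hypothetical name.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class LimiterStandIn:
    """Hypothetical placeholder for aiolimiter.AsyncLimiter."""
    max_rate: int


def convert_in_place(attrs):
    # Mutates attrs and returns None; other keys are left untouched.
    semaphore = attrs.get("semaphore")
    if isinstance(semaphore, int):
        # asyncio.Semaphore stands in for ThreadsafeSemaphore here.
        attrs["semaphore"] = asyncio.Semaphore(semaphore)
    rate = attrs.get("runs_per_minute")
    if isinstance(rate, int):
        attrs["runs_per_minute"] = LimiterStandIn(rate)


attrs = {"semaphore": 3, "runs_per_minute": 60, "cache_type": "memory"}
convert_in_place(attrs)

# Missing keys or non-integer values are silently skipped.
untouched = {"semaphore": None}
convert_in_place(untouched)
```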

a_sync.a_sync.modifiers.get_modifiers_from(thing)[source]

Extracts valid modifiers from a given object, type, or dictionary.

Parameters:

thing (dict | type | object) – The source from which to extract modifiers. It can be a dictionary, a type, or an object.

Returns:

A ModifierKwargs object containing the valid modifiers extracted from the input.

Return type:

ModifierKwargs

Examples

Extracting modifiers from a class:

>>> class Example:
...     cache_type = 'memory'
...     runs_per_minute = 60
>>> get_modifiers_from(Example)
ModifierKwargs({'cache_type': 'memory', 'runs_per_minute': 60})

Extracting modifiers from a dictionary:

>>> modifiers_dict = {'cache_type': 'memory', 'semaphore': 5}
>>> get_modifiers_from(modifiers_dict)
ModifierKwargs({'cache_type': 'memory', 'semaphore': ThreadsafeSemaphore(5)})
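One way to picture the extraction step is a filter over a fixed set of valid key names, reading dictionary entries for a dict and attributes for a class or instance. The sketch below is an assumption about the general shape only: `extract_modifiers` and `VALID_MODIFIERS` are hypothetical names, a plain dict is returned instead of ModifierKwargs, and the integer-to-semaphore conversion shown in the example above is omitted.

```python
# Key names taken from the attribute list in this reference.
VALID_MODIFIERS = (
    "default", "cache_type", "cache_typed", "ram_cache_maxsize",
    "ram_cache_ttl", "runs_per_minute", "semaphore", "executor",
)


def extract_modifiers(thing):
    """Hypothetical stand-in for get_modifiers_from."""
    if isinstance(thing, dict):
        return {k: v for k, v in thing.items() if k in VALID_MODIFIERS}
    # For a class or an instance, look each valid name up as an attribute.
    return {k: getattr(thing, k) for k in VALID_MODIFIERS if hasattr(thing, k)}


class Example:
    cache_type = "memory"
    runs_per_minute = 60
    unrelated = "ignored"  # not a modifier, so it is dropped


from_class = extract_modifiers(Example)
from_instance = extract_modifiers(Example())
from_dict = extract_modifiers({"cache_type": "memory", "retries": 3})
```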