a_sync.a_sync.modifiers package
Subpackages
- a_sync.a_sync.modifiers.cache package
- Submodules
- a_sync.a_sync.modifiers.cache.memory module
CacheKwargs
CacheKwargs.__getitem__()
CacheKwargs.__init__()
CacheKwargs.__iter__()
CacheKwargs.clear()
CacheKwargs.copy()
CacheKwargs.fromkeys()
CacheKwargs.get()
CacheKwargs.items()
CacheKwargs.keys()
CacheKwargs.pop()
CacheKwargs.popitem()
CacheKwargs.setdefault()
CacheKwargs.update()
CacheKwargs.values()
CacheKwargs.maxsize
CacheKwargs.ttl
CacheKwargs.typed
apply_async_memory_cache()
- Module contents
CacheArgs
CacheArgs.__getitem__()
CacheArgs.__init__()
CacheArgs.__iter__()
CacheArgs.clear()
CacheArgs.copy()
CacheArgs.fromkeys()
CacheArgs.get()
CacheArgs.items()
CacheArgs.keys()
CacheArgs.pop()
CacheArgs.popitem()
CacheArgs.setdefault()
CacheArgs.update()
CacheArgs.values()
CacheArgs.cache_type
CacheArgs.cache_typed
CacheArgs.ram_cache_maxsize
CacheArgs.ram_cache_ttl
apply_async_cache()
Submodules
a_sync.a_sync.modifiers.limiter module
- a_sync.a_sync.modifiers.limiter.apply_rate_limit(coro_fn=None, runs_per_minute=None)[source]
Applies a rate limit to an asynchronous function.
This function can be used as a decorator to limit the number of times an asynchronous function can be called per minute. It can be configured with either an integer specifying the number of runs per minute or an
aiolimiter.AsyncLimiter
instance.- Parameters:
coro_fn (Callable[[~P], Awaitable[T]] | int | None) – The coroutine function to be rate-limited. If an integer is provided, it is treated as runs per minute, and runs_per_minute should be None.
runs_per_minute (int | AsyncLimiter | None) – The number of allowed executions per minute or an
aiolimiter.AsyncLimiter
instance. If coro_fn is an integer, this should be None.
- Raises:
TypeError – If runs_per_minute is neither an integer nor an
aiolimiter.AsyncLimiter
when coro_fn is None.exceptions.FunctionNotAsync – If coro_fn is not an asynchronous function.
- Return type:
Callable[[Callable[[~P], Awaitable[T]]], Callable[[~P], Awaitable[T]]] | Callable[[~P], Awaitable[T]]
Examples
Applying a rate limit of 60 executions per minute:
>>> @apply_rate_limit(60) ... async def my_function(): ... pass
Using an
aiolimiter.AsyncLimiter
instance:>>> async_limiter = AsyncLimiter(60) >>> @apply_rate_limit(async_limiter) ... async def my_function(): ... pass
Specifying the rate limit directly in the decorator:
>>> @apply_rate_limit ... async def my_function(): ... pass
See also
aiolimiter.AsyncLimiter
a_sync.a_sync.modifiers.manager module
- class a_sync.a_sync.modifiers.manager.ModifierManager
-
Manages modifiers for asynchronous and synchronous functions.
This class is responsible for applying modifiers to functions, such as caching, rate limiting, and semaphores for asynchronous functions. It also handles synchronous functions, although no sync modifiers are currently implemented.
Examples
Creating a ModifierManager with specific modifiers:
>>> modifiers = ModifierKwargs(cache_type='memory', runs_per_minute=60) >>> manager = ModifierManager(modifiers)
Applying modifiers to an asynchronous function:
>>> async def my_coro(): ... pass >>> modified_coro = manager.apply_async_modifiers(my_coro)
Applying modifiers to a synchronous function (no sync modifiers applied):
>>> def my_function(): ... pass >>> modified_function = manager.apply_sync_modifiers(my_function)
See also
- __getitem__(self, unicode modifier_key) Any
Gets the value of a modifier by key.
- Parameters:
modifier_key – The key of the modifier to retrieve.
- Returns:
The value of the modifier.
- Return type:
Examples
>>> manager = ModifierManager(ModifierKwargs(cache_type='memory')) >>> manager['cache_type'] 'memory'
- __init__(self, modifiers: ModifierKwargs) None
Initializes the ModifierManager with the given modifiers.
- Parameters:
modifiers (ModifierKwargs) – A dictionary of modifiers to be applied.
- Raises:
ValueError – If an unsupported modifier is provided.
- Return type:
None
- __iter__(self) Iterator[str]
Returns an iterator over the modifier keys.
Examples
>>> manager = ModifierManager(ModifierKwargs(cache_type='memory')) >>> list(iter(manager)) ['cache_type']
- apply_async_modifiers(self, coro_fn: CoroFn[P, T]) CoroFn[P, T]
Applies asynchronous modifiers to a coroutine function.
- Parameters:
coro_fn (Callable[[~P], Awaitable[T]]) – The coroutine function to modify.
- Returns:
The modified coroutine function.
- Return type:
Examples
>>> async def my_coro(): ... pass >>> manager = ModifierManager(ModifierKwargs(runs_per_minute=60)) >>> modified_coro = manager.apply_async_modifiers(my_coro)
- apply_sync_modifiers(self, function: SyncFn[P, T]) SyncFn[P, T]
Wraps a synchronous function.
Note
There are no sync modifiers at this time, but they will be added here for convenience.
- Parameters:
function (Callable[[~P], T]) – The synchronous function to wrap.
- Returns:
The wrapped synchronous function.
- Return type:
Callable[[~P], T]
Examples
>>> def my_function(): ... pass >>> manager = ModifierManager(ModifierKwargs()) >>> modified_function = manager.apply_sync_modifiers(my_function)
- clear() None. Remove all items from D.
- copy() a shallow copy of D
- fromkeys(value=None, /)
Create a new dictionary with keys from iterable and values set to value.
- get(key, default=None, /)
Return the value for key if key is in the dictionary, else default.
- items(self) ItemsView[str, Any]
Returns the items of the modifiers.
Examples
>>> manager = ModifierManager(ModifierKwargs(cache_type='memory')) >>> list(manager.items()) [('cache_type', 'memory')]
- keys(self) KeysView[str]
Returns the keys of the modifiers.
Examples
>>> manager = ModifierManager(ModifierKwargs(cache_type='memory')) >>> list(manager.keys()) ['cache_type']
- pop(k[, d]) v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
- popitem()
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- setdefault(key, default=None, /)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- update([E, ]**F) None. Update D from dict/iterable E and F.
If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
- values(self) ValuesView[Any]
Returns the values of the modifiers.
Examples
>>> manager = ModifierManager(ModifierKwargs(cache_type='memory')) >>> list(manager.values()) ['memory']
- Return type:
- _modifiers
- executor: Executor
- property use_cache: bint
Determines if caching should be used.
Examples
>>> manager = ModifierManager(ModifierKwargs(cache_type='memory')) >>> manager.use_cache True
- property use_limiter: bint
Determines if a rate limiter should be used.
Examples
>>> manager = ModifierManager(ModifierKwargs(runs_per_minute=60)) >>> manager.use_limiter True
- property use_semaphore: bint
Determines if a semaphore should be used.
Examples
>>> manager = ModifierManager(ModifierKwargs(semaphore=SemaphoreSpec())) >>> manager.use_semaphore True
a_sync.a_sync.modifiers.semaphores module
- a_sync.a_sync.modifiers.semaphores.apply_semaphore(coro_fn=None, semaphore=None)[source]
Apply a semaphore to a coroutine function or return a decorator.
This function can be used to apply a semaphore to a coroutine function either by passing the coroutine function and semaphore as arguments or by using the semaphore as a decorator. It raises exceptions if the inputs are not valid.
- Parameters:
coro_fn (Optional[Callable]) – The coroutine function to which the semaphore will be applied, or None if the semaphore is to be used as a decorator.
semaphore (Union[int, asyncio.Semaphore, primitives.Semaphore]) – The semaphore to apply, which can be an integer, an asyncio.Semaphore, or a primitives.Semaphore.
- Raises:
ValueError – If coro_fn is an integer or asyncio.Semaphore and semaphore is not None.
exceptions.FunctionNotAsync – If the provided function is not a coroutine.
TypeError – If the semaphore is not an integer, an asyncio.Semaphore, or a primitives.Semaphore.
- Return type:
Callable[[Callable[[~P], Awaitable[T]]], Callable[[~P], Awaitable[T]]] | Callable[[~P], Awaitable[T]]
Examples
Using as a decorator: >>> @apply_semaphore(2) … async def limited_concurrent_function(): … pass
Applying directly to a function: >>> async def my_coroutine(): … pass >>> my_coroutine = apply_semaphore(my_coroutine, 3)
Handling invalid inputs: >>> try: … apply_semaphore(3, 2) … except ValueError as e: … print(e)
See also
primitives.Semaphore
Note
primitives.Semaphore implements the same API as asyncio.Semaphore. Therefore, when the documentation refers to asyncio.Semaphore, it also includes primitives.Semaphore and any other implementations that conform to the same interface.
- a_sync.a_sync.modifiers.semaphores.dummy_semaphore = <DummySemaphore name=>
A dummy semaphore that does not enforce any concurrency limits.
Module contents
This file contains all logic for ez-a-sync’s “modifiers”.
Modifiers modify the behavior of ez-a-sync’s ASync objects in various ways.
- Submodules:
cache: Handles caching mechanisms for async functions. limiter: Manages rate limiting for async functions. manager: Provides management of valid modifiers and their application. semaphores: Implements semaphore logic for controlling concurrency.
The modifiers available are: - cache_type: Specifies the type of cache to use, such as ‘memory’. - cache_typed: Determines if types are considered for cache keys. - ram_cache_maxsize: Sets the maximum size for the LRU cache. - ram_cache_ttl: Defines the time-to-live for items in the cache. - runs_per_minute: Sets a rate limit for function execution. - semaphore: Specifies a semaphore for controlling concurrency. - executor: Defines the executor for synchronous functions. This is not applied like the other modifiers but is used to manage the execution context for synchronous functions when they are called in an asynchronous manner.
See also
- a_sync.a_sync.modifiers.apply_class_defined_modifiers(attrs_from_metaclass)[source]
Applies class-defined modifiers to a dictionary of attributes.
This function modifies the input dictionary in place. If the ‘semaphore’ key is present and its value is an integer, it is converted to a ThreadsafeSemaphore. If the ‘runs_per_minute’ key is present and its value is an integer, it is converted to an AsyncLimiter. If these keys are not present or their values are not integers, the function will silently do nothing.
- Parameters:
attrs_from_metaclass (dict) – A dictionary of attributes from a metaclass.
Examples
Applying modifiers to a dictionary:
>>> attrs = {'semaphore': 3, 'runs_per_minute': 60} >>> apply_class_defined_modifiers(attrs) >>> attrs['semaphore'] ThreadsafeSemaphore(3)
>>> attrs['runs_per_minute'] AsyncLimiter(60)
See also
aiolimiter.AsyncLimiter
- a_sync.a_sync.modifiers.get_modifiers_from(thing)[source]
Extracts valid modifiers from a given object, type, or dictionary.
- Parameters:
thing (dict | type | object) – The source from which to extract modifiers. It can be a dictionary, a type, or an object.
- Returns:
A ModifierKwargs object containing the valid modifiers extracted from the input.
- Return type:
Examples
Extracting modifiers from a class:
>>> class Example: ... cache_type = 'memory' ... runs_per_minute = 60 >>> get_modifiers_from(Example) ModifierKwargs({'cache_type': 'memory', 'runs_per_minute': 60})
Extracting modifiers from a dictionary:
>>> modifiers_dict = {'cache_type': 'memory', 'semaphore': 5} >>> get_modifiers_from(modifiers_dict) ModifierKwargs({'cache_type': 'memory', 'semaphore': ThreadsafeSemaphore(5)})