Source code for a_sync.future

# type: ignore [var-annotated]

"""
The `future.py` module provides functionality for handling asynchronous futures, 
including a decorator for converting callables into `ASyncFuture` objects and 
utilities for managing asynchronous computations.

Functions:
    future(callable: Union[Callable[P, Awaitable[T]], Callable[P, T]] = None, **kwargs: Unpack[ModifierKwargs]) -> Callable[P, Union[T, "ASyncFuture[T]"]]: 
        A decorator to convert a callable into an `ASyncFuture`, with optional modifiers.
    _gather_check_and_materialize(*things: Unpack[MaybeAwaitable[T]]) -> List[T]: 
        Gathers and materializes a list of awaitable or non-awaitable items.
    _check_and_materialize(thing: T) -> T: 
        Checks if an item is awaitable and materializes it.
    _materialize(meta: "ASyncFuture[T]") -> T: 
        Materializes the result of an `ASyncFuture`.

Classes:
    ASyncFuture: Represents an asynchronous future result.

TODO include a simple mathematics example a one complex example with numerous variables and operations
TODO include attribute access examples
TODO describe a bit more about both of the above 2 TODOs somewhere in this module-level docstring
TODO describe why a user would want to use these (to write cleaner code that doesn't require as many ugly gathers)
TODO include comparisons between the 'new way' with this future class and the 'old way' with gathers
"""

import asyncio
import concurrent.futures
from functools import partial, wraps
from inspect import isawaitable

from a_sync._typing import *


[docs] def future( callable: AnyFn[P, T] = None, **kwargs: Unpack[ModifierKwargs], ) -> Callable[P, Union[T, "ASyncFuture[T]"]]: """ A decorator function to convert a callable into an `ASyncFuture`, with optional modifiers. This function wraps the provided callable in an `_ASyncFutureWrappedFn` instance, which returns an `ASyncFuture` when called. The `ASyncFuture` allows the result of the callable to be awaited or accessed synchronously. Args: callable: The callable to convert. Defaults to None. **kwargs: Additional keyword arguments for the modifier. Returns: A callable that returns either the result or an `ASyncFuture`. Example: >>> @future ... async def async_function(): ... return 42 >>> result = async_function() >>> isinstance(result, ASyncFuture) True See Also: :class:`ASyncFuture` """ return _ASyncFutureWrappedFn(callable, **kwargs)
async def _gather_check_and_materialize(*things: Unpack[MaybeAwaitable[T]]) -> List[T]: """ Gathers and materializes a list of awaitable or non-awaitable items. This function takes a list of items that may be awaitable or not, and returns a list of their results. Awaitable items are awaited, while non-awaitable items are returned as-is. Args: *things: Items to gather and materialize. Example: >>> async def async_fn(x): ... return x >>> await _gather_check_and_materialize(async_fn(1), 2, async_fn(3)) [1, 2, 3] """ return await asyncio.gather(*[_check_and_materialize(thing) for thing in things]) async def _check_and_materialize(thing: T) -> T: """ Checks if an item is awaitable and materializes it. If the item is awaitable, it is awaited and the result is returned. Otherwise, the item is returned as-is. Args: thing: The item to check and materialize. Example: >>> async def async_fn(): ... return 42 >>> await _check_and_materialize(async_fn()) 42 """ return await thing if isawaitable(thing) else thing def _materialize(meta: "ASyncFuture[T]") -> T: """ Materializes the result of an `ASyncFuture`. This function attempts to run the event loop until the `ASyncFuture` is complete. If the event loop is already running, it raises a RuntimeError. Args: meta: The `ASyncFuture` to materialize. Raises: RuntimeError: If the event loop is running and the result cannot be awaited. Example: >>> future = ASyncFuture(asyncio.sleep(1, result=42)) >>> _materialize(future) 42 See Also: :class:`ASyncFuture` """ try: return asyncio.get_event_loop().run_until_complete(meta) except RuntimeError as e: raise RuntimeError( f"{meta} result is not set and the event loop is running, you will need to await it first" ) from e MetaNumeric = Union[ Numeric, "ASyncFuture[int]", "ASyncFuture[float]", "ASyncFuture[Decimal]" ]
[docs] @final class ASyncFuture(concurrent.futures.Future, Awaitable[T]): """ A class representing an asynchronous future result. Inherits from both `concurrent.futures.Future` and `Awaitable[T]`, allowing it to be used in both synchronous and asynchronous contexts. The `ASyncFuture` class provides additional functionality for arithmetic operations, comparisons, and conversions, making it versatile for various use cases. Example: >>> async def async_fn(): ... return 42 >>> future = ASyncFuture(async_fn()) >>> await future 42 Note: Arithmetic operations are implemented, allowing for mathematical operations on future results. You no longer have to choose between optimized async code and clean, readable code. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=5)) >>> future3 = ASyncFuture(asyncio.sleep(1, result=10)) >>> future4 = ASyncFuture(asyncio.sleep(1, result=2)) >>> result = (future1 + future2) / future3 ** future4 >>> await result 0.15 Attribute Access: The `ASyncFuture` allows attribute access on the materialized result. Example: >>> class Example: ... def __init__(self, value): ... self.value = value >>> future = ASyncFuture(asyncio.sleep(1, result=Example(42))) >>> future.value 42 See Also: :func:`future` for creating `ASyncFuture` instances. """ __slots__ = "__awaitable__", "__dependencies", "__dependants", "__task"
[docs] def __init__( self, awaitable: Awaitable[T], dependencies: List["ASyncFuture"] = [] ) -> None: # sourcery skip: default-mutable-arg """ Initializes an `ASyncFuture` with an awaitable and optional dependencies. Args: awaitable: The awaitable object. dependencies: A list of dependencies. Defaults to []. Example: >>> async def async_fn(): ... return 42 >>> future = ASyncFuture(async_fn()) >>> await future 42 """ self.__awaitable__ = awaitable """The awaitable object.""" self.__dependencies = dependencies """A list of dependencies.""" for dependency in dependencies: assert isinstance(dependency, ASyncFuture) dependency.__dependants.append(self) self.__dependants: List[ASyncFuture] = [] """A list of dependants.""" self.__task = None """The task associated with the awaitable.""" super().__init__()
def __hash__(self) -> int: return hash(self.__awaitable__) def __repr__(self) -> str: string = f"<{self.__class__.__name__} {self._state} for {self.__awaitable__}" if self.cancelled(): pass elif self.done(): string += ( f" exception={self.exception()}" if self.exception() else f" result={super().result()}" ) return f"{string}>" def __list_dependencies(self, other) -> List["ASyncFuture"]: """ Lists dependencies for the `ASyncFuture`. Args: other: The other dependency to list. Returns: A list of dependencies including the current and other `ASyncFuture`. """ return [self, other] if isinstance(other, ASyncFuture) else [self] @property def result(self) -> Union[Callable[[], T], Any]: # sourcery skip: assign-if-exp, reintroduce-else """ If this future is not done, it will work like `cf.Future.result`. It will block, await the awaitable, and return the result when ready. If this future is done and the result has attribute `result`, will return `getattr(future_result, 'result')` If this future is done and the result does NOT have attribute `result`, will again work like `cf.Future.result` Example: >>> future = ASyncFuture(asyncio.sleep(1, result=42)) >>> future.result() 42 """ if self.done(): if hasattr(r := super().result(), "result"): # can be property, method, whatever. should work. return r.result # the result should be callable like an asyncio.Future return super().result return lambda: _materialize(self) def __getattr__(self, attr: str) -> Any: """ Allows access to attributes of the materialized result. Args: attr: The attribute name to access. Returns: The attribute value from the materialized result. Example: >>> class Example: ... def __init__(self, value): ... self.value = value >>> future = ASyncFuture(asyncio.sleep(1, result=Example(42))) >>> future.value 42 """ return getattr(_materialize(self), attr)
[docs] def __getitem__(self, key) -> Any: """ Allows item access on the materialized result. Args: key: The key to access. Returns: The item from the materialized result. Example: >>> future = ASyncFuture(asyncio.sleep(1, result={'key': 'value'})) >>> future['key'] 'value' """ return _materialize(self)[key]
# NOTE: broken, do not use. I think def __setitem__(self, key, value) -> None: """ Allows setting an item on the materialized result. Args: key: The key to set. value: The value to set. Example: >>> future = ASyncFuture(asyncio.sleep(1, result={'key': 'value'})) >>> future['key'] = 'new_value' """ _materialize(self)[key] = value # not sure what to call these def __contains__(self, key: Any) -> bool: """ Checks if a key is in the materialized result. Args: key: The key to check. Returns: True if the key is in the materialized result, False otherwise. Example: >>> future = ASyncFuture(asyncio.sleep(1, result={'key': 'value'})) >>> 'key' in future True """ return _materialize( ASyncFuture( self.__contains(key), dependencies=self.__list_dependencies(key) ) ) def __await__(self) -> Generator[Any, None, T]: """ Makes the `ASyncFuture` awaitable. Example: >>> future = ASyncFuture(asyncio.sleep(1, result=42)) >>> await future 42 """ return self.__await().__await__() async def __await(self) -> T: if not self.done(): self.set_result(await self.__task__) return self._result @property def __task__(self) -> "asyncio.Task[T]": """ Returns the asyncio task associated with the awaitable, creating it if necessary. Example: >>> future = ASyncFuture(asyncio.sleep(1, result=42)) >>> task = future.__task__ >>> isinstance(task, asyncio.Task) True """ if self.__task is None: self.__task = asyncio.create_task(self.__awaitable__) return self.__task
[docs] def __iter__(self): """ Returns an iterator for the materialized result. Example: >>> future = ASyncFuture(asyncio.sleep(1, result=[1, 2, 3])) >>> for item in future: ... print(item) 1 2 3 """ return _materialize(self).__iter__()
[docs] def __next__(self): """ Returns the next item from the materialized result. Example: >>> future = ASyncFuture(asyncio.sleep(1, result=iter([1, 2, 3]))) >>> next(future) 1 """ return _materialize(self).__next__()
def __enter__(self): """ Enters the context of the materialized result. Example: >>> class SomeContext: ... def __enter__(self): ... return self ... def __exit__(self, exc_type, exc_val, exc_tb): ... pass >>> future = ASyncFuture(asyncio.sleep(1, result=SomeContext())) >>> with future as context: ... context.do_something() """ return _materialize(self).__enter__() def __exit__(self, *args): """ Exits the context of the materialized result. Example: >>> class SomeContext: ... def __enter__(self): ... return self ... def __exit__(self, exc_type, exc_val, exc_tb): ... pass >>> future = ASyncFuture(asyncio.sleep(1, result=SomeContext())) >>> with future as context: ... pass >>> # Context is exited here """ return _materialize(self).__exit__(*args) @overload def __add__(self: "ASyncFuture[int]", other: int) -> "ASyncFuture[int]": ... @overload def __add__(self: "ASyncFuture[float]", other: float) -> "ASyncFuture[float]": ... @overload def __add__(self: "ASyncFuture[float]", other: int) -> "ASyncFuture[float]": ... @overload def __add__(self: "ASyncFuture[int]", other: float) -> "ASyncFuture[float]": ... @overload def __add__( self: "ASyncFuture[Decimal]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload def __add__(self: "ASyncFuture[Decimal]", other: int) -> "ASyncFuture[Decimal]": ... @overload def __add__(self: "ASyncFuture[int]", other: Decimal) -> "ASyncFuture[Decimal]": ... @overload def __add__( self: "ASyncFuture[int]", other: Awaitable[int] ) -> "ASyncFuture[int]": ... @overload def __add__( self: "ASyncFuture[float]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload def __add__( self: "ASyncFuture[float]", other: Awaitable[int] ) -> "ASyncFuture[float]": ... @overload def __add__( self: "ASyncFuture[int]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload def __add__( self: "ASyncFuture[Decimal]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... @overload def __add__( self: "ASyncFuture[Decimal]", other: Awaitable[int] ) -> "ASyncFuture[Decimal]": ... @overload def __add__( self: "ASyncFuture[int]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... def __add__(self, other: MetaNumeric) -> "ASyncFuture": """ Adds the result of this `ASyncFuture` to another value or `ASyncFuture`. Args: other: The value or `ASyncFuture` to add. Returns: A new `ASyncFuture` representing the sum. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=5)) >>> result = future1 + future2 >>> await result 15 """ return ASyncFuture( self.__add(other), dependencies=self.__list_dependencies(other) ) @overload def __sub__(self: "ASyncFuture[int]", other: int) -> "ASyncFuture[int]": ... @overload def __sub__(self: "ASyncFuture[float]", other: float) -> "ASyncFuture[float]": ... @overload def __sub__(self: "ASyncFuture[float]", other: int) -> "ASyncFuture[float]": ... @overload def __sub__(self: "ASyncFuture[int]", other: float) -> "ASyncFuture[float]": ... @overload def __sub__( self: "ASyncFuture[Decimal]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload def __sub__(self: "ASyncFuture[Decimal]", other: int) -> "ASyncFuture[Decimal]": ... @overload def __sub__(self: "ASyncFuture[int]", other: Decimal) -> "ASyncFuture[Decimal]": ... @overload def __sub__( self: "ASyncFuture[int]", other: Awaitable[int] ) -> "ASyncFuture[int]": ... @overload def __sub__( self: "ASyncFuture[float]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload def __sub__( self: "ASyncFuture[float]", other: Awaitable[int] ) -> "ASyncFuture[float]": ... @overload def __sub__( self: "ASyncFuture[int]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload def __sub__( self: "ASyncFuture[Decimal]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... @overload def __sub__( self: "ASyncFuture[Decimal]", other: Awaitable[int] ) -> "ASyncFuture[Decimal]": ... @overload def __sub__( self: "ASyncFuture[int]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... def __sub__(self, other: MetaNumeric) -> "ASyncFuture": """ Subtracts another value or `ASyncFuture` from the result of this `ASyncFuture`. Args: other: The value or `ASyncFuture` to subtract. Returns: A new `ASyncFuture` representing the difference. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=5)) >>> result = future1 - future2 >>> await result 5 """ return ASyncFuture( self.__sub(other), dependencies=self.__list_dependencies(other) ) def __mul__(self, other) -> "ASyncFuture": """ Multiplies the result of this `ASyncFuture` by another value or `ASyncFuture`. Args: other: The value or `ASyncFuture` to multiply. Returns: A new `ASyncFuture` representing the product. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=5)) >>> result = future1 * future2 >>> await result 50 """ return ASyncFuture( self.__mul(other), dependencies=self.__list_dependencies(other) ) def __pow__(self, other) -> "ASyncFuture": """ Raises the result of this `ASyncFuture` to the power of another value or `ASyncFuture`. Args: other: The exponent value or `ASyncFuture`. Returns: A new `ASyncFuture` representing the power. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=2)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=3)) >>> result = future1 ** future2 >>> await result 8 """ return ASyncFuture( self.__pow(other), dependencies=self.__list_dependencies(other) ) def __truediv__(self, other) -> "ASyncFuture": """ Divides the result of this `ASyncFuture` by another value or `ASyncFuture`. Args: other: The divisor value or `ASyncFuture`. Returns: A new `ASyncFuture` representing the quotient. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=2)) >>> result = future1 / future2 >>> await result 5.0 """ return ASyncFuture( self.__truediv(other), dependencies=self.__list_dependencies(other) ) def __floordiv__(self, other) -> "ASyncFuture": """ Performs floor division of the result of this `ASyncFuture` by another value or `ASyncFuture`. Args: other: The divisor value or `ASyncFuture`. Returns: A new `ASyncFuture` representing the floor division result. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=3)) >>> result = future1 // future2 >>> await result 3 """ return ASyncFuture( self.__floordiv(other), dependencies=self.__list_dependencies(other) ) def __pow__(self, other) -> "ASyncFuture": """ Raises the result of this `ASyncFuture` to the power of another value or `ASyncFuture`. Args: other: The exponent value or `ASyncFuture`. Returns: A new `ASyncFuture` representing the power. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=2)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=3)) >>> result = future1 ** future2 >>> await result 8 """ return ASyncFuture( self.__pow(other), dependencies=self.__list_dependencies(other) ) @overload def __radd__(self: "ASyncFuture[int]", other: int) -> "ASyncFuture[int]": ... @overload def __radd__(self: "ASyncFuture[float]", other: float) -> "ASyncFuture[float]": ... @overload def __radd__(self: "ASyncFuture[float]", other: int) -> "ASyncFuture[float]": ... @overload def __radd__(self: "ASyncFuture[int]", other: float) -> "ASyncFuture[float]": ... @overload def __radd__( self: "ASyncFuture[Decimal]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload def __radd__( self: "ASyncFuture[Decimal]", other: int ) -> "ASyncFuture[Decimal]": ... @overload def __radd__( self: "ASyncFuture[int]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload def __radd__( self: "ASyncFuture[int]", other: Awaitable[int] ) -> "ASyncFuture[int]": ... @overload def __radd__( self: "ASyncFuture[float]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload def __radd__( self: "ASyncFuture[float]", other: Awaitable[int] ) -> "ASyncFuture[float]": ... @overload def __radd__( self: "ASyncFuture[int]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload def __radd__( self: "ASyncFuture[Decimal]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... @overload def __radd__( self: "ASyncFuture[Decimal]", other: Awaitable[int] ) -> "ASyncFuture[Decimal]": ... @overload def __radd__( self: "ASyncFuture[int]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... def __radd__(self, other) -> "ASyncFuture": """ Adds another value or `ASyncFuture` to the result of this `ASyncFuture`. Args: other: The value or `ASyncFuture` to add. Returns: A new `ASyncFuture` representing the sum. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> result = 5 + future1 >>> await result 15 """ return ASyncFuture( self.__radd(other), dependencies=self.__list_dependencies(other) ) @overload def __rsub__(self: "ASyncFuture[int]", other: int) -> "ASyncFuture[int]": ... @overload def __rsub__(self: "ASyncFuture[float]", other: float) -> "ASyncFuture[float]": ... @overload def __rsub__(self: "ASyncFuture[float]", other: int) -> "ASyncFuture[float]": ... @overload def __rsub__(self: "ASyncFuture[int]", other: float) -> "ASyncFuture[float]": ... @overload def __rsub__( self: "ASyncFuture[Decimal]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload def __rsub__( self: "ASyncFuture[Decimal]", other: int ) -> "ASyncFuture[Decimal]": ... @overload def __rsub__( self: "ASyncFuture[int]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload def __rsub__( self: "ASyncFuture[int]", other: Awaitable[int] ) -> "ASyncFuture[int]": ... @overload def __rsub__( self: "ASyncFuture[float]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload def __rsub__( self: "ASyncFuture[float]", other: Awaitable[int] ) -> "ASyncFuture[float]": ... @overload def __rsub__( self: "ASyncFuture[int]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload def __rsub__( self: "ASyncFuture[Decimal]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... @overload def __rsub__( self: "ASyncFuture[Decimal]", other: Awaitable[int] ) -> "ASyncFuture[Decimal]": ... @overload def __rsub__( self: "ASyncFuture[int]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... def __rsub__(self, other) -> "ASyncFuture": """ Subtracts the result of this `ASyncFuture` from another value or `ASyncFuture`. Args: other: The value or `ASyncFuture` to subtract from. Returns: A new `ASyncFuture` representing the difference. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> result = 20 - future1 >>> await result 10 """ return ASyncFuture( self.__rsub(other), dependencies=self.__list_dependencies(other) ) def __rmul__(self, other) -> "ASyncFuture": """ Multiplies another value or `ASyncFuture` by the result of this `ASyncFuture`. Args: other: The value or `ASyncFuture` to multiply. Returns: A new `ASyncFuture` representing the product. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> result = 2 * future1 >>> await result 20 """ return ASyncFuture( self.__rmul(other), dependencies=self.__list_dependencies(other) ) def __rtruediv__(self, other) -> "ASyncFuture": """ Divides another value or `ASyncFuture` by the result of this `ASyncFuture`. Args: other: The value or `ASyncFuture` to divide. Returns: A new `ASyncFuture` representing the quotient. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=2)) >>> result = 10 / future1 >>> await result 5.0 """ return ASyncFuture( self.__rtruediv(other), dependencies=self.__list_dependencies(other) ) def __rfloordiv__(self, other) -> "ASyncFuture": """ Performs floor division of another value or `ASyncFuture` by the result of this `ASyncFuture`. Args: other: The value or `ASyncFuture` to divide. Returns: A new `ASyncFuture` representing the floor division result. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=3)) >>> result = 10 // future1 >>> await result 3 """ return ASyncFuture( self.__rfloordiv(other), dependencies=self.__list_dependencies(other) ) def __rpow__(self, other) -> "ASyncFuture": """ Raises another value or `ASyncFuture` to the power of the result of this `ASyncFuture`. Args: other: The base value or `ASyncFuture`. Returns: A new `ASyncFuture` representing the power. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=3)) >>> result = 2 ** future1 >>> await result 8 """ return ASyncFuture( self.__rpow(other), dependencies=self.__list_dependencies(other) ) def __eq__(self, other) -> "ASyncFuture": """ Compares the result of this `ASyncFuture` with another value or `ASyncFuture` for equality. Args: other: The value or `ASyncFuture` to compare. Returns: A new `ASyncFuture` representing the equality comparison result. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=10)) >>> result = future1 == future2 >>> await result True """ return bool( ASyncFuture(self.__eq(other), dependencies=self.__list_dependencies(other)) ) def __gt__(self, other) -> "ASyncFuture": """ Compares the result of this `ASyncFuture` with another value or `ASyncFuture` for greater than. Args: other: The value or `ASyncFuture` to compare. Returns: A new `ASyncFuture` representing the greater than comparison result. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=5)) >>> result = future1 > future2 >>> await result True """ return ASyncFuture( self.__gt(other), dependencies=self.__list_dependencies(other) ) def __ge__(self, other) -> "ASyncFuture": """ Compares the result of this `ASyncFuture` with another value or `ASyncFuture` for greater than or equal. Args: other: The value or `ASyncFuture` to compare. Returns: A new `ASyncFuture` representing the greater than or equal comparison result. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=10)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=10)) >>> result = future1 >= future2 >>> await result True """ return ASyncFuture( self.__ge(other), dependencies=self.__list_dependencies(other) ) def __lt__(self, other) -> "ASyncFuture": """ Compares the result of this `ASyncFuture` with another value or `ASyncFuture` for less than. Args: other: The value or `ASyncFuture` to compare. Returns: A new `ASyncFuture` representing the less than comparison result. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=5)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=10)) >>> result = future1 < future2 >>> await result True """ return ASyncFuture( self.__lt(other), dependencies=self.__list_dependencies(other) ) def __le__(self, other) -> "ASyncFuture": """ Compares the result of this `ASyncFuture` with another value or `ASyncFuture` for less than or equal. Args: other: The value or `ASyncFuture` to compare. Returns: A new `ASyncFuture` representing the less than or equal comparison result. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=5)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=5)) >>> result = future1 <= future2 >>> await result True """ return ASyncFuture( self.__le(other), dependencies=self.__list_dependencies(other) ) # Maths @overload async def __add(self: "ASyncFuture[int]", other: int) -> "ASyncFuture[int]": ... @overload async def __add( self: "ASyncFuture[float]", other: float ) -> "ASyncFuture[float]": ... @overload async def __add(self: "ASyncFuture[float]", other: int) -> "ASyncFuture[float]": ... @overload async def __add(self: "ASyncFuture[int]", other: float) -> "ASyncFuture[float]": ... @overload async def __add( self: "ASyncFuture[Decimal]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload async def __add( self: "ASyncFuture[Decimal]", other: int ) -> "ASyncFuture[Decimal]": ... @overload async def __add( self: "ASyncFuture[int]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload async def __add( self: "ASyncFuture[int]", other: Awaitable[int] ) -> "ASyncFuture[int]": ... @overload async def __add( self: "ASyncFuture[float]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload async def __add( self: "ASyncFuture[float]", other: Awaitable[int] ) -> "ASyncFuture[float]": ... @overload async def __add( self: "ASyncFuture[int]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload async def __add( self: "ASyncFuture[Decimal]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... @overload async def __add( self: "ASyncFuture[Decimal]", other: Awaitable[int] ) -> "ASyncFuture[Decimal]": ... @overload async def __add( self: "ASyncFuture[int]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... async def __add(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) return a + b @overload async def __sub(self: "ASyncFuture[int]", other: int) -> "ASyncFuture[int]": ... @overload async def __sub( self: "ASyncFuture[float]", other: float ) -> "ASyncFuture[float]": ... @overload async def __sub(self: "ASyncFuture[float]", other: int) -> "ASyncFuture[float]": ... @overload async def __sub(self: "ASyncFuture[int]", other: float) -> "ASyncFuture[float]": ... @overload async def __sub( self: "ASyncFuture[Decimal]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload async def __sub( self: "ASyncFuture[Decimal]", other: int ) -> "ASyncFuture[Decimal]": ... @overload async def __sub( self: "ASyncFuture[int]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload async def __sub( self: "ASyncFuture[int]", other: Awaitable[int] ) -> "ASyncFuture[int]": ... @overload async def __sub( self: "ASyncFuture[float]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload async def __sub( self: "ASyncFuture[float]", other: Awaitable[int] ) -> "ASyncFuture[float]": ... @overload async def __sub( self: "ASyncFuture[int]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload async def __sub( self: "ASyncFuture[Decimal]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... @overload async def __sub( self: "ASyncFuture[Decimal]", other: Awaitable[int] ) -> "ASyncFuture[Decimal]": ... @overload async def __sub( self: "ASyncFuture[int]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... async def __sub(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) return a - b async def __mul(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) return a * b async def __truediv(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) return a / b async def __floordiv(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) return a // b async def __pow(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) return a**b # rMaths @overload async def __radd(self: "ASyncFuture[int]", other: int) -> "ASyncFuture[int]": ... @overload async def __radd( self: "ASyncFuture[float]", other: float ) -> "ASyncFuture[float]": ... @overload async def __radd( self: "ASyncFuture[float]", other: int ) -> "ASyncFuture[float]": ... @overload async def __radd( self: "ASyncFuture[int]", other: float ) -> "ASyncFuture[float]": ... @overload async def __radd( self: "ASyncFuture[Decimal]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload async def __radd( self: "ASyncFuture[Decimal]", other: int ) -> "ASyncFuture[Decimal]": ... @overload async def __radd( self: "ASyncFuture[int]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload async def __radd( self: "ASyncFuture[int]", other: Awaitable[int] ) -> "ASyncFuture[int]": ... @overload async def __radd( self: "ASyncFuture[float]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload async def __radd( self: "ASyncFuture[float]", other: Awaitable[int] ) -> "ASyncFuture[float]": ... @overload async def __radd( self: "ASyncFuture[int]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload async def __radd( self: "ASyncFuture[Decimal]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... @overload async def __radd( self: "ASyncFuture[Decimal]", other: Awaitable[int] ) -> "ASyncFuture[Decimal]": ... @overload async def __radd( self: "ASyncFuture[int]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... async def __radd(self, other) -> "Any": a, b = await _gather_check_and_materialize(other, self) return a + b @overload async def __rsub(self: "ASyncFuture[int]", other: int) -> "ASyncFuture[int]": ... @overload async def __rsub( self: "ASyncFuture[float]", other: float ) -> "ASyncFuture[float]": ... @overload async def __rsub( self: "ASyncFuture[float]", other: int ) -> "ASyncFuture[float]": ... @overload async def __rsub( self: "ASyncFuture[int]", other: float ) -> "ASyncFuture[float]": ... @overload async def __rsub( self: "ASyncFuture[Decimal]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload async def __rsub( self: "ASyncFuture[Decimal]", other: int ) -> "ASyncFuture[Decimal]": ... @overload async def __rsub( self: "ASyncFuture[int]", other: Decimal ) -> "ASyncFuture[Decimal]": ... @overload async def __rsub( self: "ASyncFuture[int]", other: Awaitable[int] ) -> "ASyncFuture[int]": ... @overload async def __rsub( self: "ASyncFuture[float]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload async def __rsub( self: "ASyncFuture[float]", other: Awaitable[int] ) -> "ASyncFuture[float]": ... @overload async def __rsub( self: "ASyncFuture[int]", other: Awaitable[float] ) -> "ASyncFuture[float]": ... @overload async def __rsub( self: "ASyncFuture[Decimal]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... @overload async def __rsub( self: "ASyncFuture[Decimal]", other: Awaitable[int] ) -> "ASyncFuture[Decimal]": ... @overload async def __rsub( self: "ASyncFuture[int]", other: Awaitable[Decimal] ) -> "ASyncFuture[Decimal]": ... async def __rsub(self, other) -> "Any": a, b = await _gather_check_and_materialize(other, self) return a - b async def __rmul(self, other) -> "Any": a, b = await _gather_check_and_materialize(other, self) return a * b async def __rtruediv(self, other) -> "Any": a, b = await _gather_check_and_materialize(other, self) return a / b async def __rfloordiv(self, other) -> "Any": a, b = await _gather_check_and_materialize(other, self) return a // b async def __rpow(self, other) -> "Any": a, b = await _gather_check_and_materialize(other, self) return a**b async def __iadd(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) self._result = a + b return self._result async def __isub(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) self._result = a - b return self._result async def __imul(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) self._result = a * b return self._result async def __itruediv(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) self._result = a / b return self._result async def __ifloordiv(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) self._result = a // b return self._result async def __ipow(self, other) -> "Any": a, b = await _gather_check_and_materialize(self, other) self._result = a**b return self._result # Comparisons async def __eq(self, other) -> bool: a, b = await _gather_check_and_materialize(self, other) return a == b async def __gt(self, other) -> bool: a, b = await _gather_check_and_materialize(self, other) return a > b async def __ge(self, other) -> bool: a, b = await _gather_check_and_materialize(self, other) return a >= b async def __lt(self, other) -> bool: a, b = await _gather_check_and_materialize(self, other) return a < b async def __le(self, other) -> bool: a, b = await _gather_check_and_materialize(self, other) return a <= b # not sure what to call these async def __contains(self, item: Any) -> bool: _self, _item = await _gather_check_and_materialize(self, item) return _item in _self # conversion # NOTE: We aren't allowed to return ASyncFutures here :( def __bool__(self) -> bool: return bool(_materialize(self)) def __bytes__(self) -> bytes: return bytes(_materialize(self)) def __str__(self) -> str: return str(_materialize(self)) def __int__(self) -> int: return int(_materialize(self)) def __float__(self) -> float: return float(_materialize(self)) # WIP internals @property def __dependants__(self) -> Set["ASyncFuture"]: """ Returns the set of dependants for this `ASyncFuture`, including nested dependants. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=42)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=10), dependencies=[future1]) >>> dependants = future1.__dependants__ >>> future2 in dependants True """ dependants = set() for dep in self.__dependants: dependants.add(dep) dependants.union(dep.__dependants__) return dependants @property def __dependencies__(self) -> Set["ASyncFuture"]: """ Returns the set of dependencies for this `ASyncFuture`, including nested dependencies. Example: >>> future1 = ASyncFuture(asyncio.sleep(1, result=42)) >>> future2 = ASyncFuture(asyncio.sleep(1, result=10), dependencies=[future1]) >>> dependencies = future2.__dependencies__ >>> future1 in dependencies True """ dependencies = set() for dep in self.__dependencies: dependencies.add(dep) dependencies.union(dep.__dependencies__) return dependencies def __sizeof__(self) -> int: if isinstance(self.__awaitable__, Coroutine): return sum( sys.getsizeof(v) for v in self.__awaitable__.cr_frame.f_locals.values() ) elif isinstance(self.__awaitable__, asyncio.Future): raise NotImplementedError raise NotImplementedError
@final class _ASyncFutureWrappedFn(Callable[P, ASyncFuture[T]]): """ A callable class to wrap functions and return `ASyncFuture` objects. This class is used internally by the `future` decorator to wrap a function and return an `ASyncFuture` when the function is called. It allows the function to be executed asynchronously and its result to be awaited. Example: >>> def sync_fn(): ... return 42 >>> wrapped_fn = _ASyncFutureWrappedFn(sync_fn) >>> future = wrapped_fn() >>> isinstance(future, ASyncFuture) True Note: This is not part of the public API. Use the `future` decorator instead. """ __slots__ = "callable", "wrapped", "_callable_name" def __init__( self, callable: AnyFn[P, T] = None, **kwargs: Unpack[ModifierKwargs], ): from a_sync import a_sync if callable: self.callable = callable """The callable function.""" self._callable_name = callable.__name__ """The name of the callable function.""" a_sync_callable = a_sync(callable, default="async", **kwargs) @wraps(callable) def future_wrap(*args: P.args, **kwargs: P.kwargs) -> "ASyncFuture[T]": return ASyncFuture(a_sync_callable(*args, **kwargs, sync=False)) self.wrapped = future_wrap """The wrapped function returning ASyncFuture.""" else: self.wrapped = partial(_ASyncFutureWrappedFn, **kwargs) def __call__(self, *args: P.args, **kwargs: P.kwargs) -> ASyncFuture[T]: return self.wrapped(*args, **kwargs) def __repr__(self) -> str: return f"<{self.__class__.__name__} {self.callable}>" def __get__( self, instance: I, owner: Type[I] ) -> Union[Self, "_ASyncFutureInstanceMethod[I, P, T]"]: return self if owner is None else _ASyncFutureInstanceMethod(self, instance) @final class _ASyncFutureInstanceMethod(Generic[I, P, T]): # NOTE: probably could just replace this with functools.partial """ A class to handle instance methods wrapped as `ASyncFuture`. This class is used internally to manage instance methods that are wrapped by `_ASyncFutureWrappedFn`. It ensures that the method is bound to the instance and returns an `ASyncFuture` when called. Example: >>> class MyClass: ... @_ASyncFutureWrappedFn ... def method(self): ... return 42 >>> instance = MyClass() >>> future = instance.method() >>> isinstance(future, ASyncFuture) True Note: This is not part of the public API. Use the `future` decorator instead. """ __module__: str """The module name of the wrapper.""" __name__: str """The name of the wrapper.""" __qualname__: str """The qualified name of the wrapper.""" __doc__: Optional[str] """The docstring of the wrapper.""" __annotations__: Dict[str, Any] """The annotations of the wrapper.""" __instance: I """The instance to which the method is bound.""" __wrapper: _ASyncFutureWrappedFn[P, T] """The wrapper function.""" def __init__( self, wrapper: _ASyncFutureWrappedFn[P, T], instance: I, ) -> None: # sourcery skip: use-contextlib-suppress try: self.__module__ = wrapper.__module__ except AttributeError: pass try: self.__name__ = wrapper.__name__ except AttributeError: pass try: self.__qualname__ = wrapper.__qualname__ except AttributeError: pass try: self.__doc__ = wrapper.__doc__ except AttributeError: pass try: self.__annotations__ = wrapper.__annotations__ except AttributeError: pass try: self.__dict__.update(wrapper.__dict__) except AttributeError: pass self.__instance = instance self.__wrapper = wrapper def __repr__(self) -> str: return f"<{self.__class__.__name__} for {self.__wrapper.callable} bound to {self.__instance}>" def __call__(self, /, *fn_args: P.args, **fn_kwargs: P.kwargs) -> T: return self.__wrapper(self.__instance, *fn_args, **fn_kwargs) __all__ = ["future", "ASyncFuture"]