# Source code for a_sync.task

"""
This module provides asynchronous task management utilities, specifically focused on creating and handling mappings of tasks.

The main components include:
- TaskMapping: A class for managing and asynchronously generating tasks based on input iterables.
- TaskMappingKeys: A view to asynchronously iterate over the keys of a TaskMapping.
- TaskMappingValues: A view to asynchronously iterate over the values of a TaskMapping.
- TaskMappingItems: A view to asynchronously iterate over the items (key-value pairs) of a TaskMapping.
"""

import asyncio
import contextlib
import functools
import inspect
import logging
import weakref

import a_sync.asyncio
from a_sync import exceptions
from a_sync._typing import *
from a_sync.a_sync import _kwargs
from a_sync.a_sync.base import ASyncGenericBase
from a_sync.a_sync.function import ASyncFunction
from a_sync.a_sync.method import (
    ASyncBoundMethod,
    ASyncMethodDescriptor,
    ASyncMethodDescriptorSyncDefault,
)
from a_sync.a_sync.property import _ASyncPropertyDescriptorBase
from a_sync.asyncio import as_completed, create_task, gather
from a_sync.asyncio.gather import Excluder
from a_sync.iter import ASyncIterator, ASyncGeneratorFunction, ASyncSorter
from a_sync.primitives.locks import Event
from a_sync.primitives.queue import Queue, ProcessingQueue
from a_sync.utils.iterators import as_yielded, exhaust_iterator


# Module-level logger; emits debug messages about init-loader startup and task cleanup.
logger = logging.getLogger(__name__)


# Signature of the callables a TaskMapping can wrap: a key (K) plus arbitrary
# extra parameters (P), returning an awaitable that produces a value (V).
MappingFn = Callable[Concatenate[K, P], Awaitable[V]]


class TaskMapping(DefaultDict[K, "asyncio.Task[V]"], AsyncIterable[Tuple[K, V]]):
    """
    A mapping of keys to asynchronous tasks with additional functionality.

    `TaskMapping` is a specialized dictionary that maps keys to `asyncio` Tasks. It provides
    convenient methods for creating, managing, and iterating over these tasks asynchronously.

    Tasks are created automatically for each key using a provided function. You cannot
    manually set items in a `TaskMapping` using dictionary-like syntax.

    Example:
        >>> async def fetch_data(url: str) -> str:
        ...     async with aiohttp.ClientSession() as session:
        ...         async with session.get(url) as response:
        ...             return await response.text()
        ...
        >>> tasks = TaskMapping(fetch_data, ['http://example.com', 'https://www.python.org'], name='url_fetcher', concurrency=5)
        >>> async for key, result in tasks:
        ...     print(f"Data for {key}: {result}")
        ...
        Data for python.org: http://python.org
        Data for example.com: http://example.com

    Note:
        You cannot manually set items in a `TaskMapping` using dictionary-like syntax.
        Tasks are created and managed internally.

    See Also:
        - :class:`asyncio.Task`
        - :func:`asyncio.create_task`
        - :func:`a_sync.asyncio.create_task`
    """

    concurrency: Optional[int] = None
    "The max number of tasks that will run at one time."

    _destroyed: bool = False
    "Boolean indicating whether this mapping has been consumed and is no longer usable for aggregations."

    # NOTE(review): this class-level default is shadowed by the
    # `functools.cached_property` of the same name defined later in the class.
    _init_loader: Optional["asyncio.Task[None]"] = None
    "An asyncio Task used to preload values from the iterables."

    _init_loader_next: Optional[Callable[[], Awaitable[Tuple[Tuple[K, "asyncio.Task[V]"]]]]] = None
    "A coro function that blocks until the _init_loader starts a new task(s), and then returns a `Tuple[Tuple[K, asyncio.Task[V]]]` with all of the new tasks and the keys that started them."

    _name: Optional[str] = None
    "Optional name for tasks created by this mapping."

    # None until __init__ creates the Event (only when iterables were provided).
    _next: Optional[Event] = None
    "An asyncio Event that indicates the next result is ready"

    # NOTE(review): shared mutable class-level default; only ever reassigned in
    # __init__ (never mutated in place), so sharing the empty dict is safe here.
    _wrapped_func_kwargs: Dict[str, Any] = {}
    "Additional keyword arguments passed to `_wrapped_func`."

    __iterables__: Tuple[AnyIterableOrAwaitableIterable[K], ...] = ()
    "The original iterables, if any, used to initialize this mapping."

    __init_loader_coro: Optional[Awaitable[None]] = None
    """An optional asyncio Coroutine to be run by the `_init_loader`"""

    __slots__ = "_wrapped_func", "__wrapped__", "__dict__", "__weakref__"
    # NOTE: maybe since we use so many classvars here we are better off getting rid of slots
    def __init__(
        self,
        wrapped_func: MappingFn[K, P, V] = None,
        *iterables: AnyIterableOrAwaitableIterable[K],
        name: str = "",
        concurrency: Optional[int] = None,
        **wrapped_func_kwargs: P.kwargs,
    ) -> None:
        """
        Initialize a TaskMapping instance.

        Args:
            wrapped_func: A callable that takes a key and additional parameters and returns an Awaitable.
            *iterables: Any number of iterables whose elements will be used as keys for task generation.
            name: An optional name for the tasks created by this mapping.
            concurrency: Maximum number of tasks to run concurrently.
            **wrapped_func_kwargs: Additional keyword arguments to be passed to wrapped_func.

        Example:
            async def process_item(item: int) -> int:
                await asyncio.sleep(1)
                return item * 2

            task_map = TaskMapping(process_item, [1, 2, 3], concurrency=2)
        """
        if concurrency:
            self.concurrency = concurrency

        self.__wrapped__ = wrapped_func
        "The original callable used to initialize this mapping without any modifications."

        if iterables:
            self.__iterables__ = iterables

        # Reduce a_sync wrapper objects down to a plain coroutine function where possible.
        wrapped_func = _unwrap(wrapped_func)
        self._wrapped_func = wrapped_func
        "The function used to create tasks for each key."

        # Method descriptors default to sync mode unless told otherwise; force async
        # unless the caller already passed an a_sync flag kwarg.
        if isinstance(wrapped_func, ASyncMethodDescriptor) and not _kwargs.get_flag_name_legacy(
            wrapped_func_kwargs
        ):
            wrapped_func_kwargs["sync"] = False
        if wrapped_func_kwargs:
            self._wrapped_func_kwargs = wrapped_func_kwargs

        if name:
            self._name = name

        if iterables:
            self._next = Event(name=f"{self} `_next`")

            # Wrap the func so every completion (success or failure) pulses `_next`,
            # waking any iterator blocked in `__aiter__` awaiting the next result.
            @functools.wraps(wrapped_func)
            async def _wrapped_set_next(
                *args: P.args, __a_sync_recursion: int = 0, **kwargs: P.kwargs
            ) -> V:
                try:
                    return await wrapped_func(*args, **kwargs)
                except exceptions.SyncModeInAsyncContextError as e:
                    # attach context about which callable was wrapped to aid debugging
                    e.args = *e.args, f"wrapped:{self.__wrapped__}"
                    raise
                except TypeError as e:
                    if __a_sync_recursion > 2 or not (
                        str(e).startswith(wrapped_func.__name__)
                        and "got multiple values for argument" in str(e)
                    ):
                        raise
                    # NOTE: args ordering is clashing with provided kwargs. We can handle this in a hacky way.
                    # TODO: perform this check earlier and pre-prepare the args/kwargs ordering
                    new_args = list(args)
                    new_kwargs = dict(kwargs)
                    try:
                        # slot each clashing kwarg back into its positional position
                        for i, arg in enumerate(inspect.getfullargspec(self.__wrapped__).args):
                            if arg in kwargs:
                                new_args.insert(i, new_kwargs.pop(arg))
                            else:
                                break
                        return await _wrapped_set_next(
                            *new_args,
                            **new_kwargs,
                            __a_sync_recursion=__a_sync_recursion + 1,
                        )
                    except TypeError as e2:
                        # prefer the original error when the retry failed for an unrelated reason
                        raise (
                            e.with_traceback(e.__traceback__)
                            if str(e2) == "unsupported callable"
                            else e2.with_traceback(e2.__traceback__)
                        )
                finally:
                    # pulse the event: wake waiters, then immediately re-arm it
                    self._next.set()
                    self._next.clear()

            self._wrapped_func = _wrapped_set_next
            init_loader_queue: Queue[Tuple[K, "asyncio.Future[V]"]] = Queue()
            self.__init_loader_coro = exhaust_iterator(
                self._start_tasks_for_iterables(*iterables), queue=init_loader_queue
            )
            self._init_loader_next = init_loader_queue.get_all
            try:
                # touch the cached_property to start the loader now if a loop is running
                self._init_loader
            except _NoRunningLoop:
                # its okay if we get this exception, we can start the task as soon as the loop starts
                pass
def __repr__(self) -> str: return f"<{type(self).__name__} for {self._wrapped_func} kwargs={self._wrapped_func_kwargs} tasks={len(self)} at {hex(id(self))}>" def __hash__(self) -> int: return id(self) def __setitem__(self, item: Any, value: Any) -> None: raise NotImplementedError("You cannot manually set items in a TaskMapping")
[docs] def __getitem__(self, item: K) -> "asyncio.Task[V]": try: return dict.__getitem__(self, item) except KeyError: return self.__start_task(item)
def __await__(self) -> Generator[Any, None, Dict[K, V]]: """Wait for all tasks to complete and return a dictionary of the results.""" return self.gather(sync=False).__await__()
    async def __aiter__(self, pop: bool = False) -> AsyncIterator[Tuple[K, V]]:
        """Asynchronously iterate through all key-task pairs, yielding the key-result pair as each task completes.

        Args:
            pop: If True, each task is removed from the mapping as its result is yielded,
                and the mapping is marked consumed.
        """
        self._if_pop_check_destroyed(pop)
        # if you inited the TaskMapping with some iterators, we will load those
        yielded = set()
        try:
            if self._init_loader is None:
                # if you didn't init the TaskMapping with iterators and you didn't start any tasks manually, we should fail
                self._raise_if_empty()
            else:
                # drain results while the init loader is still discovering keys
                while not self._init_loader.done():
                    await self._wait_for_next_key()
                    while unyielded := [key for key in self if key not in yielded]:
                        if ready := {key: task for key in unyielded if (task := self[key]).done()}:
                            if pop:
                                for key, task in ready.items():
                                    yield key, await self.pop(key)
                                    yielded.add(key)
                            else:
                                for key, task in ready.items():
                                    yield key, await task
                                    yielded.add(key)
                        else:
                            # nothing done yet; wait for the next task completion pulse
                            await self._next.wait()
                # loader is already done by this point, but we need to check for exceptions
                await self._init_loader
            # if there are any tasks that still need to complete, we need to await them and yield them
            if unyielded := {key: self[key] for key in self if key not in yielded}:
                if pop:
                    async for key, value in as_completed(unyielded, aiter=True):
                        self.pop(key)
                        yield key, value
                else:
                    async for key, value in as_completed(unyielded, aiter=True):
                        yield key, value
        finally:
            await self._if_pop_clear(pop)
def __delitem__(self, item: K) -> None: task_or_fut = dict.__getitem__(self, item) if not task_or_fut.done(): task_or_fut.cancel() dict.__delitem__(self, item)
    def keys(self, pop: bool = False) -> "TaskMappingKeys[K, V]":
        """Return a :class:`TaskMappingKeys` view over this mapping's keys.

        Args:
            pop: If True, keys are popped from the mapping as the view iterates them.
        """
        return TaskMappingKeys(dict.keys(self), self, pop=pop)
    def values(self, pop: bool = False) -> "TaskMappingValues[K, V]":
        """Return a :class:`TaskMappingValues` view over this mapping's task results.

        Args:
            pop: If True, tasks are popped from the mapping as the view iterates them.
        """
        return TaskMappingValues(dict.values(self), self, pop=pop)
[docs] def items(self, pop: bool = False) -> "TaskMappingValues[K, V]": return TaskMappingItems(dict.items(self), self, pop=pop)
    async def close(self) -> None:
        """Tear down the mapping: cancel and drop all tasks and mark it destroyed."""
        await self._if_pop_clear(True)
    @ASyncGeneratorFunction
    async def map(
        self,
        *iterables: AnyIterableOrAwaitableIterable[K],
        pop: bool = True,
        yields: Literal["keys", "both"] = "both",
    ) -> AsyncIterator[Tuple[K, V]]:
        """
        Asynchronously map iterables to tasks and yield their results.

        Args:
            *iterables: Iterables to map over.
            pop: Whether to remove tasks from the internal storage once they are completed.
            yields: Whether to yield 'keys' or 'both' (key-value pairs).

        Yields:
            Depending on `yields`, either keys or tuples of key-value pairs representing
            the results of completed tasks.

        Raises:
            ValueError: If iterables are passed here but the mapping was already
                initialized with an (a)iterable.

        Example:
            async def process_item(item: int) -> int:
                await asyncio.sleep(1)
                return item * 2

            task_map = TaskMapping(process_item)
            async for key, result in task_map.map([1, 2, 3]):
                print(f"Processed {key}: {result}")
        """
        self._if_pop_check_destroyed(pop)

        # make sure the init loader is started if needed
        init_loader = self._init_loader
        if iterables and init_loader:
            raise ValueError(
                "You cannot pass `iterables` to map if the TaskMapping was initialized with an (a)iterable."
            )

        try:
            if iterables:
                self._raise_if_not_empty()
                try:
                    # start tasks for the new keys, yielding completions as they land
                    async for _ in self._tasks_for_iterables(*iterables):
                        async for key, value in self.yield_completed(pop=pop):
                            yield _yield(key, value, yields)
                except _EmptySequenceError:
                    if len(iterables) > 1:
                        # TODO gotta handle this situation
                        raise exceptions.EmptySequenceError(
                            "bob needs to code something so you can do this, go tell him"
                        ) from None
                    # just pass thru
            elif init_loader:
                # check for exceptions if you passed an iterable(s) into the class init
                await init_loader
            else:
                self._raise_if_empty(
                    "You must either initialize your TaskMapping with an iterable(s) or provide them during your call to map"
                )

            # drain whatever tasks remain, in completion order
            if self:
                if pop:
                    async for key, value in as_completed(self, aiter=True):
                        self.pop(key)
                        yield _yield(key, value, yields)
                else:
                    async for key, value in as_completed(self, aiter=True):
                        yield _yield(key, value, yields)
        finally:
            await self._if_pop_clear(pop)
[docs] @ASyncMethodDescriptorSyncDefault async def all(self, pop: bool = True) -> bool: try: async for key, result in self.__aiter__(pop=pop): if not bool(result): return False return True except _EmptySequenceError: return True finally: await self._if_pop_clear(pop)
[docs] @ASyncMethodDescriptorSyncDefault async def any(self, pop: bool = True) -> bool: try: async for key, result in self.__aiter__(pop=pop): if bool(result): return True return False except _EmptySequenceError: return False finally: await self._if_pop_clear(pop)
[docs] @ASyncMethodDescriptorSyncDefault async def max(self, pop: bool = True) -> V: max = None try: async for key, result in self.__aiter__(pop=pop): if max is None or result > max: max = result except _EmptySequenceError: raise exceptions.EmptySequenceError("max() arg is an empty sequence") from None if max is None: raise exceptions.EmptySequenceError("max() arg is an empty sequence") from None return max
[docs] @ASyncMethodDescriptorSyncDefault async def min(self, pop: bool = True) -> V: """Return the minimum result from the tasks in the mapping.""" min = None try: async for key, result in self.__aiter__(pop=pop): if min is None or result < min: min = result except _EmptySequenceError: raise exceptions.EmptySequenceError("min() arg is an empty sequence") from None if min is None: raise exceptions.EmptySequenceError("min() arg is an empty sequence") from None return min
[docs] @ASyncMethodDescriptorSyncDefault async def sum(self, pop: bool = False) -> V: """Return the sum of the results from the tasks in the mapping.""" retval = 0 try: async for key, result in self.__aiter__(pop=pop): retval += result except _EmptySequenceError: return 0 return retval
[docs] @ASyncIterator.wrap async def yield_completed(self, pop: bool = True) -> AsyncIterator[Tuple[K, V]]: """ Asynchronously yield tuples of key-value pairs representing the results of any completed tasks. Args: pop: Whether to remove tasks from the internal storage once they are completed. Yields: Tuples of key-value pairs representing the results of completed tasks. Example: async def process_item(item: int) -> int: await asyncio.sleep(1) return item * 2 task_map = TaskMapping(process_item, [1, 2, 3]) async for key, result in task_map.yield_completed(): print(f"Completed {key}: {result}") """ if pop: for k, task in dict(self).items(): if task.done(): yield k, await self.pop(k) else: for k, task in dict(self).items(): if task.done(): yield k, await task
    @ASyncMethodDescriptorSyncDefault
    async def gather(
        self,
        return_exceptions: bool = False,
        exclude_if: Excluder[V] = None,
        tqdm: bool = False,
        **tqdm_kwargs: Any,
    ) -> Dict[K, V]:
        """Wait for all tasks to complete and return a dictionary of the results.

        Args:
            return_exceptions: If True, exceptions are returned as values instead of raising.
            exclude_if: Optional predicate; results for which it returns True are dropped.
            tqdm: If True, display a progress bar while gathering.
            **tqdm_kwargs: Extra keyword arguments forwarded to the progress bar.

        Raises:
            exceptions.MappingIsEmptyError: If there are no tasks after init loading.
        """
        # ensure any keys from init-time iterables have been loaded first
        if self._init_loader:
            await self._init_loader
        self._raise_if_empty()
        return await gather(
            self,
            return_exceptions=return_exceptions,
            exclude_if=exclude_if,
            tqdm=tqdm,
            **tqdm_kwargs,
        )
    @overload
    def pop(self, item: K, *, cancel: bool = False) -> "Union[asyncio.Task[V], asyncio.Future[V]]":
        """Pop a task from the TaskMapping.

        Args:
            item: The key to pop.
            cancel: Whether to cancel the task when popping it.
        """

    @overload
    def pop(
        self, item: K, default: K, *, cancel: bool = False
    ) -> "Union[asyncio.Task[V], asyncio.Future[V]]":
        """Pop a task from the TaskMapping.

        Args:
            item: The key to pop.
            default: The default value to return if no matching key is found.
            cancel: Whether to cancel the task when popping it.
        """
        # NOTE(review): `default` is annotated as K, but dict.pop returns it when the
        # key is missing — it likely should match the return type. Confirm before changing.
[docs] def pop(self, *args: K, cancel: bool = False) -> "Union[asyncio.Task[V], asyncio.Future[V]]": """Pop a task from the TaskMapping. Args: *args: One key to pop. cancel: Whether to cancel the task when popping it. """ fut_or_task = dict.pop(self, *args) if cancel: fut_or_task.cancel() return fut_or_task
    def clear(self, cancel: bool = False) -> None:
        """Remove all tasks from the mapping.

        Args:
            cancel: If True, cancel the init loader (when still running) and cancel
                each task as it is popped.
        """
        if cancel and self._init_loader and not self._init_loader.done():
            logger.debug("cancelling %s", self._init_loader)
            self._init_loader.cancel()
        if keys := tuple(self.keys()):
            logger.debug("popping remaining %s tasks", self)
            for k in keys:
                self.pop(k, cancel=cancel)
    @functools.cached_property
    def _init_loader(self) -> Optional["asyncio.Task[None]"]:
        # Lazily start (and cache) the task that drains `__init_loader_coro`, which
        # pre-starts tasks for the iterables passed to __init__. Returns None when
        # the mapping was constructed without iterables.
        if self.__init_loader_coro:
            logger.debug("starting %s init loader", self)
            try:
                task = create_task(
                    coro=self.__init_loader_coro,
                    name=f"{type(self).__name__} init loader loading {self.__iterables__} for {self}",
                )
            except RuntimeError as e:
                # translate "no running event loop" so __init__ can defer startup
                raise _NoRunningLoop if str(e) == "no running event loop" else e
            task.add_done_callback(self.__cleanup)
            return task

    @functools.cached_property
    def _queue(self) -> ProcessingQueue:
        # Only created when `concurrency` is set; bounds how many tasks run at once.
        fn = functools.partial(self._wrapped_func, **self._wrapped_func_kwargs)
        return ProcessingQueue(fn, self.concurrency, name=self._name)
    def _raise_if_empty(self, msg: str = "") -> None:
        """Raise :class:`exceptions.MappingIsEmptyError` if this mapping holds no tasks."""
        if not self:
            raise exceptions.MappingIsEmptyError(self, msg)
    def _raise_if_not_empty(self) -> None:
        """Raise :class:`exceptions.MappingNotEmptyError` if this mapping already holds tasks."""
        if self:
            raise exceptions.MappingNotEmptyError(self)
    @ASyncGeneratorFunction
    async def _tasks_for_iterables(
        self, *iterables: AnyIterableOrAwaitableIterable[K]
    ) -> AsyncIterator[Tuple[K, "asyncio.Task[V]"]]:
        """Ensure tasks are running for each key in the provided iterables."""
        # if we have any regular containers we can yield their contents right away
        containers = [
            iterable
            for iterable in iterables
            if not isinstance(iterable, AsyncIterable) and isinstance(iterable, Iterable)
        ]
        for iterable in containers:
            async for key in _yield_keys(iterable):
                yield key, self[key]

        # NOTE(review): membership test uses `in` (equality) on the containers list —
        # assumes iterables compare by identity/equality sensibly; confirm for exotic inputs.
        if remaining := [iterable for iterable in iterables if iterable not in containers]:
            try:
                # interleave keys from all async/awaitable iterables as they arrive
                async for key in as_yielded(*[_yield_keys(iterable) for iterable in remaining]):  # type: ignore [attr-defined]
                    yield key, self[key]  # ensure task is running
            except _EmptySequenceError:
                if len(iterables) == 1:
                    raise
                raise RuntimeError("DEV: figure out how to handle this situation") from None
    @ASyncGeneratorFunction
    async def _start_tasks_for_iterables(
        self, *iterables: AnyIterableOrAwaitableIterable[K]
    ) -> AsyncIterator[Tuple[K, "asyncio.Task[V]"]]:
        """Start new tasks for each key in the provided iterables."""
        # if we have any regular containers we can yield their contents right away
        containers = [
            iterable
            for iterable in iterables
            if not isinstance(iterable, AsyncIterable) and isinstance(iterable, Iterable)
        ]
        for iterable in containers:
            async for key in _yield_keys(iterable):
                yield key, self.__start_task(key)

        if remaining := [iterable for iterable in iterables if iterable not in containers]:
            try:
                # interleave keys from all async/awaitable iterables as they arrive
                async for key in as_yielded(*[_yield_keys(iterable) for iterable in remaining]):  # type: ignore [attr-defined]
                    yield key, self.__start_task(key)
            except _EmptySequenceError:
                if len(iterables) == 1:
                    raise
                raise RuntimeError("DEV: figure out how to handle this situation") from None
[docs] def _if_pop_check_destroyed(self, pop: bool) -> None: if pop: if self._destroyed: raise RuntimeError(f"{self} has already been consumed") self._destroyed = True
    async def _if_pop_clear(self, pop: bool) -> None:
        """If ``pop`` is True, tear down the mapping: close the queue, cancel and drop all tasks."""
        if pop:
            self._destroyed = True
            # _queue is a cached_property, we don't want to create it if it doesn't exist
            if self.concurrency and "_queue" in self.__dict__:
                self._queue.close()
                del self._queue
            self.clear(cancel=True)
            # we need to let the loop run once so the tasks can fully cancel
            await asyncio.sleep(0)
    async def _wait_for_next_key(self) -> None:
        """Block until the init loader produces at least one new key (or finishes/errors)."""
        # NOTE if `_init_loader` has an exception it will return first, otherwise `_init_loader_next` will return always
        done, pending = await asyncio.wait(
            [
                create_task(self._init_loader_next(), log_destroy_pending=False),
                self._init_loader,
            ],
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in done:
            # check for exceptions
            await task
    def __start_task(self, item: K) -> "asyncio.Future[V]":
        # Create (and register) a future/task computing `_wrapped_func(item)`.
        if self.concurrency:
            # NOTE: we use a queue instead of a Semaphore to reduce memory use for use cases involving many many tasks
            fut = self._queue.put_nowait(item)
        else:
            fut = create_task(
                coro=self._wrapped_func(item, **self._wrapped_func_kwargs),
                name=f"{self._name}[{item}]" if self._name else f"{item}",
            )
        # bypass our __setitem__, which always raises
        dict.__setitem__(self, item, fut)
        return fut

    def __cleanup(self, t: "asyncio.Task[None]") -> None:
        # clear the slot and let the bound Queue die
        del self.__init_loader_coro
# Raised internally when `create_task` fails with "no running event loop", so the
# init loader can be started later once a loop is available (see `_init_loader`).
class _NoRunningLoop(Exception): ...


@overload
def _yield(
    key: K, value: V, yields: Literal["keys"]
) -> K: ...  # TODO write specific docs for this overload
@overload
def _yield(
    key: K, value: V, yields: Literal["both"]
) -> Tuple[K, V]: ...  # TODO write specific docs for this overload
def _yield(key: K, value: V, yields: Literal["keys", "both"]) -> Union[K, Tuple[K, V]]:
    """
    Yield either the key, value, or both based on the 'yields' parameter.

    Args:
        key: The key of the task.
        value: The result of the task.
        yields: Determines what to yield; 'keys' for keys, 'both' for key-value pairs.

    Returns:
        The key, the value, or a tuple of both based on the 'yields' parameter.

    Raises:
        ValueError: If ``yields`` is anything other than 'keys' or 'both'.
    """
    if yields == "both":
        return key, value
    elif yields == "keys":
        return key
    else:
        raise ValueError(f"`yields` must be 'keys' or 'both'. You passed {yields}")


# Raised by `_yield_keys` when an input iterable is falsy/empty; callers translate
# it into `exceptions.EmptySequenceError` where appropriate.
class _EmptySequenceError(ValueError): ...


async def _yield_keys(iterable: AnyIterableOrAwaitableIterable[K]) -> AsyncIterator[K]:
    """
    Asynchronously yield keys from the provided iterable.

    Args:
        iterable: An iterable that can be either synchronous or asynchronous.

    Yields:
        Keys extracted from the iterable.

    Raises:
        _EmptySequenceError: If ``iterable`` is falsy (e.g. an empty container).
        TypeError: If ``iterable`` is neither iterable nor awaitable.
    """
    if not iterable:
        raise _EmptySequenceError(iterable)
    elif isinstance(iterable, AsyncIterable):
        async for key in iterable:
            yield key
    elif isinstance(iterable, Iterable):
        for key in iterable:
            yield key
    elif inspect.isawaitable(iterable):
        # awaiting may itself produce an (async) iterable; recurse on the result
        async for key in _yield_keys(await iterable):
            yield key
    else:
        raise TypeError(iterable)


# Cache of previously-unwrapped callables, keyed weakly so entries don't keep
# the wrapped callables alive.
__unwrapped = weakref.WeakKeyDictionary()


def _unwrap(
    wrapped_func: Union[
        AnyFn[P, T], "ASyncMethodDescriptor[P, T]", _ASyncPropertyDescriptorBase[I, T]
    ]
) -> Callable[P, Awaitable[T]]:
    # Reduce a_sync wrapper objects to a plain awaitable-returning callable, caching the result.
    if unwrapped := __unwrapped.get(wrapped_func):
        return unwrapped
    if isinstance(wrapped_func, (ASyncBoundMethod, ASyncMethodDescriptor)):
        unwrapped = wrapped_func
    elif isinstance(wrapped_func, _ASyncPropertyDescriptorBase):
        unwrapped = wrapped_func.get
    elif isinstance(wrapped_func, ASyncFunction):
        # this speeds things up a bit by bypassing some logic
        # TODO implement it like this elsewhere if profilers suggest
        unwrapped = (
            wrapped_func._modified_fn if wrapped_func._async_def else wrapped_func._asyncified
        )
    else:
        unwrapped = wrapped_func
    __unwrapped[wrapped_func] = unwrapped
    return unwrapped


# Small helpers used as sort/selection keys over (key, value) tuples.
_get_key: Callable[[Tuple[K, V]], K] = lambda k_and_v: k_and_v[0]
_get_value: Callable[[Tuple[K, V]], V] = lambda k_and_v: k_and_v[1]


class _TaskMappingView(ASyncGenericBase, Iterable[T], Generic[T, K, V]):
    """
    Base class for TaskMapping views that provides common functionality.
    """

    # Subclasses set this to extract their element (key, value, or item) from a (key, value) tuple.
    _get_from_item: Callable[[Tuple[K, V]], T]
    _pop: bool = False

    __slots__ = "__view__", "__mapping__"

    def __init__(
        self, view: Iterable[T], task_mapping: TaskMapping[K, V], pop: bool = False
    ) -> None:
        self.__view__ = view
        # weak proxy avoids a reference cycle between the view and its mapping
        self.__mapping__: TaskMapping = weakref.proxy(task_mapping)
        "actually a weakref.ProxyType[TaskMapping] but then type hints weren't working"
        if pop:
            self._pop = True

    def __iter__(self) -> Iterator[T]:
        return iter(self.__view__)

    def __await__(self) -> Generator[Any, None, List[T]]:
        return self.__await().__await__()

    def __len__(self) -> int:
        return len(self.__view__)

    async def aiterbykeys(self, reverse: bool = False) -> ASyncIterator[T]:
        # iterate items sorted by key, mapping each tuple through `_get_from_item`
        async for tup in ASyncSorter(
            self.__mapping__.items(pop=self._pop), key=_get_key, reverse=reverse
        ):
            yield self._get_from_item(tup)

    async def aiterbyvalues(self, reverse: bool = False) -> ASyncIterator[T]:
        # iterate items sorted by value, mapping each tuple through `_get_from_item`
        async for tup in ASyncSorter(
            self.__mapping__.items(pop=self._pop), key=_get_value, reverse=reverse
        ):
            yield self._get_from_item(tup)

    async def __await(self) -> List[T]:
        # materialize the async iteration into a list
        return [result async for result in self]
class TaskMappingKeys(_TaskMappingView[K, K, V], Generic[K, V]):
    """
    Asynchronous view to iterate over the keys of a TaskMapping.
    """

    _get_from_item = lambda self, item: _get_key(item)

    async def __aiter__(self) -> AsyncIterator[K]:
        """Yield each key exactly once, waiting on the init loader for late-arriving keys."""
        # strongref
        mapping = self.__mapping__
        mapping._if_pop_check_destroyed(self._pop)
        yielded = set()
        for key in self.__load_existing():
            yielded.add(key)
            # there is no chance of duplicate keys here
            yield key
        if mapping._init_loader is None:
            await mapping._if_pop_clear(self._pop)
            return
        async for key in self.__load_init_loader(yielded):
            yielded.add(key)
            yield key
        if self._pop:
            # don't need to check yielded since we've been popping them as we go
            for key in self.__load_existing():
                yield key
            await mapping._if_pop_clear(True)
        else:
            for key in self.__load_existing():
                if key not in yielded:
                    yield key

    def __load_existing(self) -> Iterator[K]:
        # Yield keys already present in the mapping, popping them first when requested.
        # strongref
        mapping = self.__mapping__
        if self._pop:
            for key in tuple(mapping):
                mapping.pop(key)
                yield key
        else:
            for key in tuple(mapping):
                yield key

    async def __load_init_loader(self, yielded: Set[K]) -> AsyncIterator[K]:
        # Yield keys as the init loader discovers them, skipping already-yielded ones.
        # strongref
        mapping = self.__mapping__
        if self._pop:
            while not mapping._init_loader.done():
                await mapping._wait_for_next_key()
                for key in [k for k in mapping if k not in yielded]:
                    mapping.pop(key)
                    yield key
        else:
            while not mapping._init_loader.done():
                await mapping._wait_for_next_key()
                for key in [k for k in mapping if k not in yielded]:
                    yield key
        # check for any exceptions
        await mapping._init_loader
class TaskMappingItems(_TaskMappingView[Tuple[K, V], K, V], Generic[K, V]):
    """
    Asynchronous view to iterate over the items (key-value pairs) of a TaskMapping.
    """

    _get_from_item = lambda self, item: item

    async def __aiter__(self) -> AsyncIterator[Tuple[K, V]]:
        """Yield ``(key, result)`` pairs, awaiting each task as its key is produced."""
        # Hold a strong reference; ``self.__mapping__`` is a weak proxy.
        mapping = self.__mapping__
        mapping._if_pop_check_destroyed(self._pop)
        # popping removes the task from the mapping before awaiting it
        fetch = mapping.pop if self._pop else mapping.__getitem__
        async for key in mapping.keys():
            yield key, await fetch(key)
class TaskMappingValues(_TaskMappingView[V, K, V], Generic[K, V]):
    """
    Asynchronous view to iterate over the values of a TaskMapping.
    """

    _get_from_item = lambda self, item: _get_value(item)

    async def __aiter__(self) -> AsyncIterator[V]:
        """Yield task results (without keys), awaiting each task as its key is produced."""
        # Hold a strong reference; ``self.__mapping__`` is a weak proxy.
        mapping = self.__mapping__
        mapping._if_pop_check_destroyed(self._pop)
        # popping removes the task from the mapping before awaiting it
        fetch = mapping.pop if self._pop else mapping.__getitem__
        async for key in mapping.keys():
            yield await fetch(key)
# Public API of this module.
__all__ = [
    "TaskMapping",
    "TaskMappingKeys",
    "TaskMappingValues",
    "TaskMappingItems",
]