Source code for a_sync.primitives._debug


import abc
import asyncio
from typing import Optional

from a_sync.primitives._loggable import _LoggerMixin


class _DebugDaemonMixin(_LoggerMixin, metaclass=abc.ABCMeta):
    """Mixin that manages a single background "debug daemon" per instance.

    The daemon is the coroutine returned by :meth:`_debug_daemon`, wrapped in
    an ``asyncio`` task and stored in the ``_daemon`` slot. It is only started
    when ``self.debug_logs_enabled`` is truthy and an event loop is running;
    otherwise a blank, never-completing future is stored in its place.

    ``debug_logs_enabled`` is not defined in this class; it is presumably
    supplied by ``_LoggerMixin`` (confirm against that class).
    """

    __slots__ = ("_daemon",)

    @abc.abstractmethod
    async def _debug_daemon(self, fut: asyncio.Future, fn, *args, **kwargs) -> None:
        """Run the debug daemon; must be implemented by subclasses.

        Args:
            fut: Future passed through by the caller of
                :meth:`_start_debug_daemon` / :meth:`_ensure_debug_daemon`.
            fn: Object passed through by the same caller.
            *args: Extra positional arguments passed through unchanged.
            **kwargs: Extra keyword arguments passed through unchanged.
        """

    def _start_debug_daemon(self, *args, **kwargs) -> "asyncio.Future[None]":
        """Create the daemon task if possible, else return a blank future.

        Args:
            *args: Positional arguments forwarded to :meth:`_debug_daemon`.
            **kwargs: Keyword arguments forwarded to :meth:`_debug_daemon`.

        Returns:
            A task running :meth:`_debug_daemon` when debug logs are enabled
            and the event loop is running; otherwise a new pending future
            created on the current event loop.
        """
        loop = asyncio.get_event_loop()
        if self.debug_logs_enabled and loop.is_running():
            return asyncio.create_task(self._debug_daemon(*args, **kwargs))
        # We shouldn't (logging disabled) or can't (no running loop) create
        # the daemon, so hand back a blank future instead.
        return loop.create_future()

    def _ensure_debug_daemon(self, *args, **kwargs) -> "asyncio.Future[None]":
        """Return the current daemon, creating one if none is stored yet.

        Args:
            *args: Positional arguments forwarded to
                :meth:`_start_debug_daemon` when a daemon must be started.
            **kwargs: Keyword arguments forwarded likewise.

        Returns:
            The future stored in ``self._daemon``.
        """
        # ``_daemon`` is a slot, so it does not exist until first assigned.
        daemon = getattr(self, "_daemon", None)
        if daemon is not None:
            # Reuse whatever is already stored. Replacing it here would
            # orphan a running task and allocate a new future on every call.
            return daemon

        if not self.debug_logs_enabled:
            # Placeholder only: nothing will ever complete it, so no
            # done-callback is attached.
            daemon = asyncio.get_event_loop().create_future()
        else:
            daemon = self._start_debug_daemon(*args, **kwargs)
            # Clear ``_daemon`` automatically once the daemon finishes.
            daemon.add_done_callback(self._stop_debug_daemon)

        self._daemon = daemon
        return daemon

    def _stop_debug_daemon(self, t: Optional[asyncio.Task] = None) -> None:
        """Cancel the stored daemon and clear ``self._daemon``.

        Also used as the daemon's done-callback, in which case ``t`` is the
        finished task. Calling this when no daemon is stored is a no-op.

        Args:
            t: The task that triggered the callback, or ``None`` when called
                directly.

        Raises:
            ValueError: If ``t`` is given and is not the stored daemon.
        """
        daemon = getattr(self, "_daemon", None)
        if daemon is None:
            # Never started or already stopped. This also covers a
            # done-callback that was scheduled before a manual stop and
            # fires afterwards.
            return
        if t is not None and t != daemon:
            raise ValueError(f"{t} is not {daemon}")
        # Detach our own callback so that cancelling here does not re-enter
        # this method later with a task that is no longer the stored daemon.
        daemon.remove_done_callback(self._stop_debug_daemon)
        daemon.cancel()
        self._daemon = None