Source code for a_sync.debugging

from asyncio import create_task, sleep
from inspect import isasyncgenfunction
from logging import DEBUG, Logger, getLogger
from functools import wraps
from time import time
from typing import AsyncIterator, Awaitable, Callable, NoReturn, TypeVar, overload

from typing_extensions import Concatenate, ParamSpec

from a_sync.a_sync.base import ASyncGenericBase
from a_sync.a_sync.method import ASyncBoundMethod
from a_sync.iter import ASyncGeneratorFunction


__P = ParamSpec("__P")
__T = TypeVar("__T")
__B = TypeVar("__B", bound=ASyncGenericBase)


_FIVE_MINUTES = 300

logger = getLogger("a_sync.debugging")


@overload
def stuck_coro_debugger(
    fn: Callable[Concatenate[__B, __P], AsyncIterator[__T]],
    logger: Logger = logger,
    interval: int = _FIVE_MINUTES,
) -> ASyncGeneratorFunction[__P, __T]: ...


@overload
def stuck_coro_debugger(
    fn: Callable[Concatenate[__B, __P], Awaitable[__T]],
    logger: Logger = logger,
    interval: int = _FIVE_MINUTES,
) -> ASyncBoundMethod[__B, __P, __T]: ...


@overload
def stuck_coro_debugger(
    fn: Callable[Concatenate[__B, __P], __T], logger: Logger = logger, interval: int = _FIVE_MINUTES
) -> ASyncBoundMethod[__B, __P, __T]: ...


@overload
def stuck_coro_debugger(
    fn: Callable[__P, AsyncIterator[__T]], logger: Logger = logger, interval: int = _FIVE_MINUTES
) -> Callable[__P, AsyncIterator[__T]]: ...


@overload
def stuck_coro_debugger(
    fn: Callable[__P, Awaitable[__T]], logger: Logger = logger, interval: int = _FIVE_MINUTES
) -> Callable[__P, Awaitable[__T]]: ...


[docs] def stuck_coro_debugger(fn, logger=logger, interval=_FIVE_MINUTES): __logger_is_enabled_for = logger.isEnabledFor if isasyncgenfunction(fn): @wraps(fn) async def stuck_async_gen_wrap(*args: __P.args, **kwargs: __P.kwargs) -> AsyncIterator[__T]: aiterator = fn(*args, **kwargs) if not __logger_is_enabled_for(DEBUG): async for thing in aiterator: yield thing return task = create_task( coro=_stuck_debug_task(logger, interval, fn, args, kwargs), name="_stuck_debug_task", ) try: async for thing in aiterator: yield thing finally: task.cancel() return stuck_async_gen_wrap else: @wraps(fn) async def stuck_coro_wrap(*args: __P.args, **kwargs: __P.kwargs) -> __T: if not __logger_is_enabled_for(DEBUG): return await fn(*args, **kwargs) task = create_task( coro=_stuck_debug_task(logger, interval, fn, args, kwargs), name="_stuck_debug_task", ) try: retval = await fn(*args, **kwargs) finally: task.cancel() return retval return stuck_coro_wrap
async def _stuck_debug_task( logger: Logger, interval: int, fn: Callable[__P, __T], *args: __P.args, **kwargs: __P.kwargs ) -> NoReturn: # sleep early so fast-running coros can exit early await sleep(interval) start = time() - interval module = fn.__module__ name = fn.__name__ formatted_args = tuple(map(str, args)) formatted_kwargs = dict(zip(kwargs.keys(), map(str, kwargs.values()))) log = logger._log while True: log( DEBUG, "%s.%s still executing after %sm with args %s kwargs %s", ( module, name, round((time() - start) / 60, 2), formatted_args, formatted_kwargs, ), ) await sleep(interval) __all__ = ["stuck_coro_debugger"]