# y._db.common

from abc import ABCMeta, abstractmethod
from asyncio import Task, create_task, get_event_loop, sleep
from itertools import dropwhile, groupby
from logging import DEBUG, getLogger
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterator,
    Awaitable,
    Callable,
    Container,
    Dict,
    Generic,
    List,
    NoReturn,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
)

import a_sync
import dank_mids
import eth_retry
from a_sync import (
    ASyncIterable,
    ASyncIterator,
    AsyncThreadPoolExecutor,
    CounterLock,
    PruningThreadPoolExecutor,
)
from async_property import async_property
from brownie import ZERO_ADDRESS
from dank_mids import BlockSemaphore
from evmspec.data import Address, HexBytes32
from hexbytes import HexBytes
from pony.orm import OptimisticCheckError, TransactionIntegrityError, db_session
from web3.datastructures import AttributeDict
from web3.middleware.filter import block_ranges

from y import convert
from y import ENVIRONMENT_VARIABLES as ENVS
from y._db.decorators import retry_locked
from y._db.exceptions import CacheNotPopulatedError
from y._decorators import stuck_coro_debugger
from y.exceptions import reraise_excs_with_extra_context
from y.utils.middleware import BATCH_SIZE

if TYPE_CHECKING:
    from y import Block

T = TypeVar("T")
S = TypeVar("S")
M = TypeVar("M")

Checkpoints = Dict["Block", int]

logger = getLogger(__name__)
default_filter_threads = PruningThreadPoolExecutor(4)
"""
The default thread pool executor, capped at 4 threads, used by :class:`Filter` objects that were not given their own executor.
"""


def enc_hook(obj: Any) -> Any:
    """
    Encode hook for JSON serialization of special types.

    Args:
        obj: The object to encode.

    Raises:
        NotImplementedError: If the object type is not supported for encoding.

    Note:
        Currently supports encoding of :class:`int`, :class:`Address`, :class:`HexBytes32`, :class:`HexBytes`, and :class:`AttributeDict` objects.

    Examples:
        >>> from web3.datastructures import AttributeDict
        >>> enc_hook(AttributeDict({'key': 'value'}))
        {'key': 'value'}

        >>> from hexbytes import HexBytes
        >>> enc_hook(HexBytes('0x1234'))
        '1234'

    See Also:
        - :func:`dec_hook`
    """
    typ = type(obj)
    # sometimes we get a recursion error from the instance checks; this helps us debug that case.
    with reraise_excs_with_extra_context(obj, typ):
        # we use issubclass instead of isinstance here to prevent a recursion error
        if issubclass(typ, int):
            return int(obj)
        elif issubclass(typ, Address):
            return obj[2:]
        elif isinstance(obj, HexBytes32):
            # we trim all leading zeroes since we know how many we need to put back later
            return hex(int(obj.hex(), 16))[2:]
        elif isinstance(obj, HexBytes):
            return bytes(obj).hex()
        elif isinstance(obj, AttributeDict):
            return dict(obj)
        raise TypeError


def dec_hook(typ: Type[T], obj: bytes) -> T:
    """
    Decode hook for JSON deserialization of special types.

    Args:
        typ: The type to decode into.
        obj: The object to decode.

    Raises:
        ValueError: If the type is not supported for decoding.

    Note:
        Currently only supports decoding of :class:`HexBytes` objects.

    Example:
        >>> from hexbytes import HexBytes
        >>> dec_hook(HexBytes, b'1234')
        HexBytes('0x1234')

    See Also:
        - :func:`enc_hook`
    """
    if typ is HexBytes:
        return typ(obj)
    raise ValueError(f"{typ} is not a valid type for decoding")


class DiskCache(Generic[S, M], metaclass=ABCMeta):
    @abstractmethod
    def _set_metadata(self, from_block: "Block", done_thru: "Block") -> None:
        """
        Update cache metadata to indicate that the cache is populated from block `from_block` to block `done_thru`.

        Args:
            from_block: The starting block number.
            done_thru: The ending block number indicating that the cache is populated up to this block.

        Example:
            >>> disk_cache._set_metadata(100, 200)

        See Also:
            - :meth:`set_metadata`
        """

    @abstractmethod
    def _is_cached_thru(self, from_block: "Block") -> "Block":
        """
        Returns max cached block for this cache or 0 if not cached.

        Args:
            from_block: The starting block number.

        Returns:
            The maximum cached block number.
        """

    @abstractmethod
    def _select(self, from_block: "Block", to_block: "Block") -> List[S]:
        """
        Selects all cached objects from block `from_block` to block `to_block`.

        Args:
            from_block: The starting block number.
            to_block: The ending block number.

        Returns:
            A list of cached objects.
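
        Example:
            A hypothetical Pony ORM override sketch; ``LogEntry`` and the query shown are illustrative, not this module's implementation:

            >>> def _select(self, from_block, to_block):
            ...     return select(
            ...         e for e in LogEntry
            ...         if e.block >= from_block and e.block <= to_block
            ...     )[:]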
        """

    @retry_locked
    def set_metadata(self, from_block: "Block", done_thru: "Block") -> None:
        """
        Update cache metadata to indicate that the cache is populated from block `from_block` to block `done_thru`.

        Args:
            from_block: The starting block number.
            done_thru: The ending block number up to which the cache is populated.

        Example:
            >>> cache.set_metadata(100, 200)

        See Also:
            - :meth:`_set_metadata`
            - :class:`CacheNotPopulatedError`
        """
        try:
            with db_session:
                self._set_metadata(from_block, done_thru)
        except TransactionIntegrityError as e:
            logger.debug("%s got exc %s when setting cache metadata", self, e)
            self.set_metadata(from_block, done_thru)
        except OptimisticCheckError as e:
            # Don't need to update in this case
            logger.debug("%s got exc %s when setting cache metadata", self, e)

    @db_session
    @retry_locked
    def select(self, from_block: "Block", to_block: "Block") -> List[S]:
        """
        Selects all cached objects from block `from_block` to block `to_block`.

        Args:
            from_block: The starting block number.
            to_block: The ending block number.

        Returns:
            A list of cached objects.
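
        Example:
            A minimal usage sketch; ``cache`` stands in for an instance of a concrete :class:`DiskCache` subclass:

            >>> cached_objs = cache.select(100, 200)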
        """
        return self._select(from_block, to_block)

    @db_session
    @retry_locked
    def is_cached_thru(self, from_block: "Block") -> "Block":
        """
        Returns max cached block for this cache or 0 if not cached.

        Args:
            from_block: The starting block number.

        Returns:
            The maximum cached block number.
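
        Example:
            A minimal sketch; ``cache`` stands in for a concrete :class:`DiskCache` subclass instance:

            >>> if cache.is_cached_thru(100) >= 200:
            ...     objs = cache.select(100, 200)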
        """
        return self._is_cached_thru(from_block)

    @db_session
    @retry_locked
    def check_and_select(self, from_block: "Block", to_block: "Block") -> List[S]:
        """
        Selects all cached objects within a specified block range.

        Args:
            from_block: The starting block number.
            to_block: The ending block number.

        Returns:
            A list of cached objects.

        Raises:
            CacheNotPopulatedError: If the cache is not fully populated.

        Example:
            >>> try:
            ...     data = cache.check_and_select(100, 200)
            ... except CacheNotPopulatedError:
            ...     print("Cache incomplete")

        See Also:
            - :meth:`set_metadata`
            - :meth:`select`
        """
        if self.is_cached_thru(from_block) >= to_block:
            return self.select(from_block, to_block)
        raise CacheNotPopulatedError(self, from_block, to_block)

    __slots__ = []


C = TypeVar("C", bound=DiskCache)


class _DiskCachedMixin(ASyncIterable[T], Generic[T, C], metaclass=ABCMeta):
    """
    Mixin that provides asynchronous features for data caches stored on disk.
    """

    _checkpoints: Checkpoints
    __slots__ = "is_reusable", "_cache", "_executor", "_objects", "_pruned"

    def __init__(
        self,
        executor: Optional[AsyncThreadPoolExecutor] = None,
        is_reusable: bool = True,
    ):
        """
        Initializes the mixin.

        Args:
            executor: Optional executor to use for I/O operations.
            is_reusable: Whether data should be kept in memory for reuse.
        """
        self.is_reusable = is_reusable
        self._cache = None
        self._executor = executor
        self._objects: List[T] = []
        self._pruned = 0

    @property
    @abstractmethod
    def cache(self) -> C:
        """
        Returns the associated cache object, which must be defined in subclasses.
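
        Example:
            A hypothetical subclass sketch; ``MyCache`` is illustrative and not part of this module:

            >>> class MyFilter(Filter):
            ...     @property
            ...     def cache(self) -> MyCache:
            ...         if self._cache is None:
            ...             self._cache = MyCache()
            ...         return self._cache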
        """

    @property
    def executor(self) -> AsyncThreadPoolExecutor:
        """
        Returns the executor used for disk operations, creating one if necessary.
        """
        executor = self._executor
        if executor is None:
            executor = self._executor = AsyncThreadPoolExecutor(1)
        return executor

    def __del__(self) -> None:
        """
        Shutdown the executor on deletion.
        """
        executor = self._executor
        if executor is not None:
            executor.shutdown()

    @property
    @abstractmethod
    def insert_to_db(self) -> Callable[[T], None]: ...

    @property
    @abstractmethod
    def bulk_insert(self) -> Callable[[List[T]], Awaitable[None]]:
        """
        Function to bulk insert a list of objects into the database.

        This property must be overridden in subclasses to provide the desired bulk insertion logic.
        The implementation should return a callable that accepts a list of objects (List[T])
        and returns an awaitable.

        Example:
            >>> async def my_bulk_insert(objs):
            ...     # perform custom bulk insert operations here
            ...     pass
            >>> class MyFilter(_DiskCachedMixin):
            ...     @property
            ...     def bulk_insert(self) -> Callable[[List[T]], Awaitable[None]]:
            ...         return my_bulk_insert

        See Also:
            - :meth:`_load_cache`
        """

    async def _extend(self, objs: Container[T]) -> None:
        """
        Override this to pre-process objects before storing.

        Args:
            objs ("Container[T]"): The objects to extend the list with.

        Example:
            >>> await instance._extend([obj1, obj2])

        See Also:
            - :meth:`_load_cache`
        """
        if objs:
            self._objects.extend(objs)
            if self.is_reusable:
                block = self._get_block_for_obj(self._objects[-1])
                self._checkpoints[block] = len(self._objects)

    async def _load_cache(self, from_block: "Block") -> Optional["Block"]:
        """
        Loads cached logs from disk.

        Args:
            from_block: The starting block number.

        Returns:
            The maximum block number loaded from cache, or None if no cached data is available.

        Example:
            >>> cached_thru = await instance._load_cache(100)
            >>> if cached_thru is None:
            ...     print("No cached data available")
            ... else:
            ...     print(f"Cache loaded through block {cached_thru}")

        See Also:
            - :meth:`_extend`
        """
        logger.debug("checking to see if %s is cached in local db", self)
        if cached_thru := await _metadata_read_executor.run(
            self.cache.is_cached_thru, from_block
        ):
            logger.info(
                "%s is cached thru block %s, loading from db", self, cached_thru
            )
            await self._extend(
                await self.executor.run(self.cache.select, from_block, cached_thru)
            )
            if self.is_reusable:
                objs_per_chunk = 50
                num_checkpoints = len(self._objects) // objs_per_chunk
                checkpoint_indexes = (
                    i * objs_per_chunk for i in range(1, num_checkpoints)
                )
                get_block_for_obj = self._get_block_for_obj
                for index in checkpoint_indexes:
                    obj = self._objects[index]
                    if index < len(self._objects):
                        next_obj = self._objects[index + 1]
                        while get_block_for_obj(obj) == get_block_for_obj(next_obj):
                            obj = next_obj
                            index += 1
                            try:
                                next_obj = self._objects[index + 1]
                            except IndexError:
                                break

                    self._checkpoints[get_block_for_obj(obj)] = index

            logger.info(
                "%s loaded %s objects thru block %s from disk",
                self,
                len(self._objects),
                cached_thru,
            )
            return cached_thru
        return None


def make_executor(
    small: int, big: int, name: Optional[str] = None
) -> PruningThreadPoolExecutor:
    """
    Creates a :class:`PruningThreadPoolExecutor` sized according to the configured database backend.

    Args:
        small: Size of the pool if using a non-postgres DB.
        big: Size of the pool if using a postgres DB.
        name: Optional name for the executor.

    Returns:
        A PruningThreadPoolExecutor instance based on the environment configuration.
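
    Example:
        A minimal sketch; the sizes and name are illustrative:

        >>> executor = make_executor(2, 8, "my read executor")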
    """
    return PruningThreadPoolExecutor(
        big if ENVS.DB_PROVIDER == "postgres" else small, name
    )


_E = TypeVar("_E", bound=AsyncThreadPoolExecutor)
_MAX_LONG_LONG = 9223372036854775807

_metadata_read_executor = make_executor(2, 3, "ypricemagic Filter read metadata")
_metadata_write_executor = make_executor(1, 3, "ypricemagic Filter write metadata")


class Filter(_DiskCachedMixin[T, C]):
    # defaults are stored as class vars to keep instance dicts smaller
    _chunk_size = BATCH_SIZE
    _chunks_per_batch = None
    _exc = None
    _db_task = None
    _sleep_fut = None
    _sleep_time = 60
    _task = None
    _depth = 0
    _semaphore = None
    _verbose = False
    __slots__ = (
        "from_block",
        "to_block",
        "_checkpoints",
        "_interval",
        "_lock",
    )

    def __init__(
        self,
        from_block: "Block",
        *,
        chunk_size: int = BATCH_SIZE,
        chunks_per_batch: Optional[int] = None,
        sleep_time: int = 60,
        semaphore: Optional[BlockSemaphore] = None,
        executor: Optional[AsyncThreadPoolExecutor] = None,
        is_reusable: bool = True,
        verbose: bool = False,
    ):
        """
        Initializes the Filter, specifying blocks, concurrency, and caching parameters.

        Args:
            from_block: Earliest block from which data should be retrieved.
            chunk_size: Size of each chunk to retrieve in a single RPC call.
            chunks_per_batch: How many chunks to load per batch (optional).
            sleep_time: Time (in seconds) to sleep between reloads when following chain head.
            semaphore: Block-based semaphore to limit concurrency.
            executor: Executor for disk-related tasks.
            is_reusable: Keeps data in memory if True; data is pruned when False.
            verbose: Enable debug-like progress logging if True.
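
        Example:
            A hypothetical instantiation; ``MyLogFilter`` stands in for a concrete :class:`Filter` subclass:

            >>> my_filter = MyLogFilter(15_000_000, chunk_size=5_000, is_reusable=False)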
        """
        self.from_block = from_block
        if chunk_size != self._chunk_size:
            self._chunk_size = chunk_size
        if chunks_per_batch != self._chunks_per_batch:
            self._chunks_per_batch = chunks_per_batch
        self._lock = CounterLock(name=str(self))
        if semaphore != self._semaphore:
            self._semaphore = semaphore
        if sleep_time != self._sleep_time:
            self._sleep_time = sleep_time
        if verbose != self._verbose:
            self._verbose = verbose
        self._checkpoints = {}
        super().__init__(executor=executor, is_reusable=is_reusable)

    def __aiter__(self) -> AsyncIterator[T]:
        """
        Returns an async iterator over the stored objects, yielding new ones as they are fetched.
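
        Example:
            A usage sketch; ``my_filter`` stands in for an instance of a concrete :class:`Filter` subclass:

            >>> async for obj in my_filter:
            ...     print(my_filter._get_block_for_obj(obj))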
        """
        return self._objects_thru(block=None).__aiter__()

    def __del__(self) -> None:
        """
        Cancels any pending fetch task upon deletion.
        """
        if self._task and not self._task.done():
            self._task.cancel()

    @abstractmethod
    async def _fetch_range(self, from_block: "Block", to_block: "Block") -> List[T]:
        """
        Fetches data for a given range of blocks from an on-chain or remote provider.

        Args:
            from_block: Lower bound of the block range.
            to_block: Upper bound of the block range.

        Returns:
            List of objects for that block range.
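
        Example:
            A hypothetical override sketch; the log-fetching call is illustrative, not this module's implementation:

            >>> class MyLogFilter(Filter):
            ...     async def _fetch_range(self, from_block, to_block):
            ...         return await dank_mids.eth.get_logs(
            ...             {"fromBlock": from_block, "toBlock": to_block}
            ...         )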
        """

    @property
    def semaphore(self) -> BlockSemaphore:
        """The :class:`BlockSemaphore` used to limit request concurrency, created lazily if one was not provided."""
        if self._semaphore is None:
            self._semaphore = BlockSemaphore(self._chunks_per_batch)
        return self._semaphore

    def _get_block_for_obj(self, obj: T) -> "Block":
        """
        Override this as needed for different object types.

        Args:
            obj: The object to get the block number for.

        Returns:
            The block number of the object.

        Example:
            >>> block = instance._get_block_for_obj(some_obj)
        """
        return obj.blockNumber

    @ASyncIterator.wrap
    async def _objects_thru(
        self, block: Optional["Block"], from_block: Optional["Block"] = None
    ) -> AsyncIterator[T]:
        """
        Generates objects up to a specified block, or indefinitely if none is given.

        Args:
            block: Maximum block number to generate objects. If None, yields continuously.
            from_block: Minimum block from which to start yielding objects. Only valid if reusable.

        Yields:
            Objects that fall within the requested block range, if any.
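
        Example:
            A usage sketch; ``my_filter`` stands in for a reusable :class:`Filter` subclass instance and ``handle`` is a placeholder callback:

            >>> async for obj in my_filter._objects_thru(block=18_000_000, from_block=17_000_000):
            ...     handle(obj)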
        """
        self._ensure_task()
        debug_logs = logger.isEnabledFor(DEBUG)
        yielded = self._pruned
        done_thru = 0
        get_block_for_obj = self._get_block_for_obj
        if self.is_reusable:
            if from_block:
                reached_from_block = False

                def obj_out_of_range(obj) -> bool:
                    if get_block_for_obj(obj) < from_block:
                        return True
                    nonlocal reached_from_block
                    reached_from_block = True
                    return False

                def skip_too_early(objects):
                    nonlocal yielded
                    if checkpoints := self._checkpoints:
                        start_checkpoint_index = _get_checkpoint_index(
                            from_block, checkpoints
                        )
                        if start_checkpoint_index is not None:
                            objects = objects[start_checkpoint_index:]
                            yielded += start_checkpoint_index
                    start_len = len(objects)
                    objects = tuple(dropwhile(obj_out_of_range, objects))
                    yielded += start_len - len(objects)
                    return objects

            if objs := self._objects:
                if block is None:
                    if from_block:
                        objs = skip_too_early(objs)
                    for obj in objs:
                        yield obj
                    yielded += len(objs)
                    done_thru = get_block_for_obj(obj)
                elif self._checkpoints:
                    checkpoint_index = _get_checkpoint_index(block, self._checkpoints)
                    if checkpoint_index is not None:
                        objs = objs[:checkpoint_index]
                        done_thru = get_block_for_obj(objs[-1])
                        if from_block:
                            objs = skip_too_early(objs)
                        for obj in objs:
                            yield obj
                        yielded += len(objs)

                elif from_block:
                    skip_too_early(objs)

        elif from_block:
            raise RuntimeError(
                f"You cannot pass a value for `from_block` unless the {type(self).__name__} is reusable"
            )

        while True:
            if block is None or done_thru < block:
                self._wakeup()
                await self._lock.wait_for(done_thru + 1)
            if self._exc is not None:
                # raise it
                await self._exc
            if to_yield := self._objects[yielded - self._pruned :]:
                if from_block and not reached_from_block:
                    objs = skip_too_early(to_yield)
                    if block is None:
                        for obj in objs:
                            yield obj
                    else:
                        for obj in objs:
                            if get_block_for_obj(obj) > block:
                                return
                            yield obj
                    yielded += len(objs)

                elif block:
                    if self.is_reusable:
                        for obj in to_yield:
                            if get_block_for_obj(obj) > block:
                                return
                            yield obj
                        yielded += len(to_yield)
                    else:
                        for obj in to_yield:
                            if get_block_for_obj(obj) > block:
                                self._prune(yielded - self._pruned)
                                return
                            yield obj
                            yielded += 1

                else:
                    for obj in to_yield:
                        yield obj
                    yielded += len(to_yield)

                if not self.is_reusable:
                    self._prune(len(to_yield))

            elif block and done_thru >= block:
                return

            done_thru = self._lock.value
            if debug_logs:
                logger._log(
                    DEBUG,
                    "%s lock value %s to_block %s",
                    (self, done_thru, block),
                )
            if block is None:
                await sleep(self._sleep_time)

    @async_property
    async def _sleep(self) -> None:
        """
        Puts the Filter into a sleep state until `_wakeup` is called. No new requests will be made.
        """
        if self._sleep_fut is None or self._sleep_fut.done():
            self._sleep_fut = get_event_loop().create_future()
        await self._sleep_fut

    def _wakeup(self) -> None:
        """Wake up the Filter to query logs from blocks not yet loaded into memory."""
        if self._sleep_fut is not None:
            self._sleep_fut.set_result(None)
            del self._sleep_fut

    async def __fetch(self) -> NoReturn:
        """
        Main coroutine that continuously runs the internal fetch loop.
        """
        try:
            await self._fetch()
        except Exception as e:
            import traceback

            logger.exception(e)
            self._exc = get_event_loop().create_future()
            self._exc.set_exception(e)
            # no need to hold vars in memory
            traceback.clear_frames(e.__traceback__)
            self._lock.set(_MAX_LONG_LONG)
            raise

    async def _fetch(self) -> NoReturn:
        """
        Defines the main logic for populating the Filter with data. Subclasses can override if needed.

        Example:
            >>> await instance._fetch()

        See Also:
            - :meth:`_loop`
        """
        await self._loop(self.from_block)

    @stuck_coro_debugger
    async def _fetch_range_wrapped(
        self, i: int, range_start: "Block", range_end: "Block", debug_logs: bool
    ) -> Tuple[int, "Block", List[T]]:
        """
        Wraps the _fetch_range call with concurrency control.

        Args:
            i: Index of the chunk or range segment.
            range_start: Lower bound of this block range.
            range_end: Upper bound of this block range.
            debug_logs: Whether debug logging is enabled.

        Returns:
            A tuple containing the index, the ending block, and the fetched objects.
        """
        async with self.semaphore[range_end]:
            if debug_logs:
                logger._log(
                    DEBUG,
                    "fetching %s block %s to %s",
                    (self, range_start, range_end),
                )
            return i, range_end, await self._fetch_range(range_start, range_end)

    async def _loop(self, from_block: "Block") -> NoReturn:
        """
        Work loop that continually fetches new data, loads from cache if available, then sleeps.

        Args:
            from_block: Earliest block from which to begin loading data.
        """
        logger.debug("starting work loop for %s", self)
        if cached_thru := await self._load_cache(from_block):
            self._lock.set(cached_thru)
        while True:
            await self._load_new_objects(start_from_block=cached_thru or from_block)
            await self._sleep

    @eth_retry.auto_retry
    @stuck_coro_debugger
    async def _load_new_objects(
        self,
        to_block: Optional["Block"] = None,
        start_from_block: Optional["Block"] = None,
    ) -> None:
        """
        Asynchronously loads new objects from your RPC, up to an optionally-specified end block.

        Args:
            to_block: Specific block to stop at. If None, load up to the current chain head.
            start_from_block: Block to start from if no prior data is cached.
        """
        SLEEP_TIME = 1

        if debug_logs := logger.isEnabledFor(DEBUG):
            logger._log(DEBUG, "loading new objects for %s", (self,))

        start = (
            v + 1 if (v := self._lock.value) else start_from_block or self.from_block
        )
        if to_block:
            end = to_block
            if start > end:
                raise ValueError(
                    f"start {start} is bigger than end {end}, can't do that"
                )

            while end > (current_block := await dank_mids.eth.block_number):
                logger.warning(
                    "You're trying to query a block range that has not fully completed:\n"
                    "range end: %s  current block: %s  Waiting 1s and trying again...",
                    end,
                    current_block,
                )
                await sleep(5.0)

        elif debug_logs:
            while start > (end := await dank_mids.eth.block_number):
                logger._log(
                    DEBUG,
                    "%s start %s is greater than end %s, sleeping...",
                    (self, start, end),
                )
                await sleep(SLEEP_TIME)

        else:
            while start > (end := await dank_mids.eth.block_number):
                await sleep(SLEEP_TIME)

        try:
            await self._load_range(start, end)
        except ValueError as e:
            if (
                "One of the blocks specified in filter (fromBlock, toBlock or blockHash) cannot be found."
                in str(e)
            ):
                logger.warning("Your rpc might be out of sync, trying again...")
            else:
                raise

    @stuck_coro_debugger
    async def _load_range(self, from_block: "Block", to_block: "Block") -> None:
        """
        Loads a particular block range in chunks, respecting concurrency limits.

        Args:
            from_block: Lower bound of the block range.
            to_block: Upper bound of the block range.
        """
        if debug_logs := logger.isEnabledFor(DEBUG):
            logger._log(DEBUG, "loading block range %s to %s", (from_block, to_block))
        chunks_yielded = 0
        done = {}
        coros = [
            self._fetch_range_wrapped(i, start, end, debug_logs)
            for i, (start, end) in enumerate(
                block_ranges(from_block, to_block, self._chunk_size)
            )
            if self._chunks_per_batch is None or i < self._chunks_per_batch
        ]
        async for i, end, objs in a_sync.as_completed(
            coros, aiter=True, tqdm=self._verbose
        ):
            next_chunk_loaded = False
            done[i] = end, objs
            for i in range(chunks_yielded, len(coros)):
                if i not in done:
                    break
                end, objs = done.pop(i)
                self._insert_chunk(objs, from_block, end, debug_logs)
                await self._extend(objs)
                next_chunk_loaded = True
                chunks_yielded += 1
            if next_chunk_loaded:
                await self._set_lock(end)
                if debug_logs:
                    logger._log(DEBUG, "%s loaded thru block %s", (self, end))

    @stuck_coro_debugger
    async def _set_lock(self, block: "Block") -> None:
        """
        Override this if you want to, for things like awaiting tasks to complete, as I do in the curve module.

        Args:
            block: The block number to set the lock to.

        Example:
            >>> await instance._set_lock(150)

        See Also:
            - :meth:`_load_new_objects`
        """
        self._lock.set(block)

    def _insert_chunk(
        self, objs: List[T], from_block: "Block", done_thru: "Block", debug_logs: bool
    ) -> None:
        """
        Queues the insertion of a chunk of objects into the database, and sets metadata.

        Args:
            objs: List of objects to be inserted.
            from_block: Earliest block in the current overall range.
            done_thru: Block number up to which this chunk completes.
            debug_logs: Whether debug logging is active.
        """
        if prev_task := self._db_task:
            if prev_task.done():
                if e := prev_task.exception():
                    raise e
                prev_task = None

        depth = self._depth
        self._depth += 1

        insert_coro = self.__insert_chunk(
            objs, from_block, done_thru, prev_task, depth, debug_logs
        )

        if debug_logs:
            logger._log(
                DEBUG,
                "%s queuing next db insert chunk %s thru block %s",
                (self, depth, done_thru),
            )
            task = create_task(
                coro=insert_coro,
                name=f"_insert_chunk from {from_block} to {done_thru}",
            )
        else:
            task = create_task(insert_coro)

        task._depth = depth
        self._db_task = task

    def _ensure_task(self) -> None:
        """
        Ensures there is a main fetch task running in the background. If not, creates it.
        """
        if self._task is None:
            logger.debug("creating task for %s", self)
            self._task = create_task(coro=self.__fetch(), name=f"{self}.__fetch")
            # NOTE: The task does not return and will be cancelled when this object is
            # garbage collected so there is no need to log the "destroy pending task" message.
            self._task._log_destroy_pending = False
        if self._task.done() and (e := self._task.exception()):
            raise e.with_traceback(e.__traceback__)

    async def __insert_chunk(
        self,
        objs: List[T],
        from_block: "Block",
        done_thru: "Block",
        prev_chunk_task: Optional[Task],
        depth: int,
        debug_logs: bool,
    ) -> None:
        """
        Inserts the previously fetched chunk into the database, waits on prior tasks if needed.

        Args:
            objs: List of objects to insert.
            from_block: Earliest block for the entire fetch range.
            done_thru: Ending block for this chunk.
            prev_chunk_task: Task that inserted previous chunk, if any.
            depth: Relative ordering of this chunk within the entire fetch.
            debug_logs: Whether debug logs are active.
        """
        if prev_chunk_task:
            await prev_chunk_task
        del prev_chunk_task

        if objs:
            await self.bulk_insert(objs)
        del objs

        await _metadata_write_executor.run(
            self.cache.set_metadata, from_block, done_thru
        )
        if debug_logs:
            logger._log(
                DEBUG,
                "%s chunk %s thru block %s is now in db",
                (self, depth, done_thru),
            )

    def _prune(self, count: int) -> None:
        """
        Removes a specified number of objects from the beginning of the in-memory list.

        Args:
            count: Number of objects to remove.
        """
        self._objects = self._objects[count:]
        self._pruned += count


def _clean_addresses(addresses: Union[str, list, tuple]) -> Union[str, List[str]]:
    """
    Converts addresses into a standardized format, raising an error if the zero address is encountered.

    Args:
        addresses: Single or multiple addresses to clean.

    Returns:
        Cleaned addresses in a consistent string format or a list of such strings.

    Raises:
        ValueError: If the zero address is encountered or if input is invalid.
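
    Example:
        A minimal sketch; ``weth`` and ``dai`` are placeholder address strings:

        >>> cleaned = _clean_addresses([weth, dai])  # a list of checksummed addresses
        >>> _clean_addresses(ZERO_ADDRESS)
        Traceback (most recent call last):
        ...
        ValueError: Cannot make a LogFilter for the zero address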
    """
    if addresses == ZERO_ADDRESS:
        raise ValueError("Cannot make a LogFilter for the zero address")
    if not addresses:
        return addresses
    if isinstance(addresses, str):
        return convert.to_address(addresses)
    elif hasattr(addresses, "__iter__"):
        if ZERO_ADDRESS in addresses:
            raise ValueError("Cannot make a LogFilter for the zero address")
        return list(map(convert.to_address, addresses))
    return convert.to_address(addresses)


def _get_suitable_checkpoint(
    target_block: "Block", checkpoints: Checkpoints
) -> Optional["Block"]:
    """
    Finds a suitable checkpoint block that is less than or equal to a target block.

    Args:
        target_block: The block number used as a reference.
        checkpoints: Dictionary of block -> index.

    Returns:
        Most recent checkpoint block before or equal to the target block, or None if none exist.
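
    Example:
        A worked sketch; checkpoint keys must be in ascending block order:

        >>> _get_suitable_checkpoint(250, {100: 50, 200: 120, 300: 180})
        200
        >>> _get_suitable_checkpoint(50, {100: 50, 200: 120, 300: 180})  # returns None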
    """
    block_lt_checkpoint, group = next(groupby(checkpoints, target_block.__lt__))
    return None if block_lt_checkpoint is True else tuple(group)[-1]


def _get_checkpoint_index(
    target_block: "Block", checkpoints: Checkpoints
) -> Optional[int]:
    """
    Retrieves the index for a checkpoint that is less than or equal to a given block.

    Args:
        target_block: The block number reference.
        checkpoints: Dictionary of block -> index.

    Returns:
        The index of the checkpoint, or None if no suitable checkpoint exists.
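
    Example:
        A worked sketch using the same mapping style as :attr:`Filter._checkpoints`:

        >>> checkpoints = {100: 50, 200: 120, 300: 180}
        >>> _get_checkpoint_index(250, checkpoints)
        120
        >>> _get_checkpoint_index(50, checkpoints)  # returns None, no suitable checkpoint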
    """
    checkpoint_block = _get_suitable_checkpoint(target_block, checkpoints)
    return None if checkpoint_block is None else checkpoints[checkpoint_block]