Source code for y.time

import asyncio
import datetime
import logging
import time
from typing import NewType, Union

import dank_mids
import eth_retry
from async_lru import alru_cache
from brownie import chain, web3
from cachetools.func import ttl_cache

try:
    from dank_mids.ENVIRONMENT_VARIABLES import GANACHE_FORK
except ImportError:
    from dank_mids._config import GANACHE_FORK

from y.exceptions import NodeNotSynced
from y.networks import Network
from y.utils.cache import a_sync_ttl_cache, memory
from y.utils.client import get_ethereum_client, get_ethereum_client_async
from y.utils.logging import yLazyLogger


UnixTimestamp = NewType("UnixTimestamp", int)
Timestamp = Union[UnixTimestamp, datetime.datetime]

logger = logging.getLogger(__name__)


[docs] class NoBlockFound(Exception): """ Raised when no block is found for a specified timestamp because the timestamp is in the future. """
[docs] def __init__(self, timestamp: Timestamp): super().__init__(f"No block found after timestamp {timestamp}")
@memory.cache() @yLazyLogger(logger) @eth_retry.auto_retry def get_block_timestamp(height: int) -> int: import y._db.utils.utils as db if ts := db.get_block_timestamp(height, sync=True): return ts client = get_ethereum_client() if client in ["tg", "erigon"] and chain.id not in [Network.Polygon]: # NOTE: polygon erigon does not support this method header = web3.manager.request_blocking(f"{client}_getHeaderByNumber", [height]) ts = int(header.timestamp, 16) db.set_block_timestamp(height, ts, sync=True) return ts return chain[height].timestamp
[docs] @a_sync_ttl_cache @eth_retry.auto_retry async def get_block_timestamp_async(height: int) -> int: import y._db.utils.utils as db if ts := await db.get_block_timestamp(height, sync=False): return ts client = await get_ethereum_client_async() if client in ["tg", "erigon"] and chain.id not in [Network.Polygon]: # NOTE: polygon erigon does not support this method header = await dank_mids.web3.manager.coro_request( f"{client}_getHeaderByNumber", [height] ) ts = int(header.timestamp, 16) else: ts = await dank_mids.eth.get_block_timestamp(height) db.set_block_timestamp(height, ts) return ts
# TODO: deprecate @memory.cache() def last_block_on_date(date: Union[str, datetime.date]) -> int: """Returns the last block on a given `date`. You can pass either a `datetime.date` object or a date string formatted as 'YYYY-MM-DD'.""" if isinstance(date, datetime.datetime): raise TypeError( "You can not pass in a `datetime.datetime` object. Please call date() on your input before passing it to this funciton." ) if not isinstance(date, datetime.date): date = datetime.datetime.strptime(date, "%Y-%m-%d").date() height = chain.height lo, hi = 0, height while hi - lo > 1: mid = lo + (hi - lo) // 2 logger.debug("block: %s", str(mid)) mid_ts = get_block_timestamp(mid) mid_date = datetime.date.fromtimestamp(mid_ts) logger.debug("mid: %s", mid_date) logger.debug(date) if mid_date > date: hi = mid else: lo = mid hi = hi - 1 block = hi if hi != height else None logger.debug("last %s block on date %s -> %s", Network.name(), date, block) return block
[docs] @a_sync_ttl_cache async def get_block_at_timestamp(timestamp: datetime.datetime) -> int: import y._db.utils.utils as db if block_at_timestamp := await db.get_block_at_timestamp(timestamp): return block_at_timestamp # TODO: invert this and use this fn inside of closest_block_after_timestamp for backwards compatability before deprecating closest_block_after_timestamp block_after_timestamp = await closest_block_after_timestamp_async(timestamp) block_at_timestamp = block_after_timestamp - 1 db.set_block_at_timestamp(timestamp, block_at_timestamp) return block_at_timestamp
def _parse_timestamp(timestamp: Timestamp) -> UnixTimestamp: if isinstance(timestamp, datetime.datetime): timestamp = int(timestamp.timestamp()) elif not isinstance(timestamp, int): raise TypeError("You may only pass in a unix timestamp or a datetime object.") return UnixTimestamp(timestamp) # yLazyLogger(logger)
[docs] def closest_block_after_timestamp( timestamp: Timestamp, wait_for_block_if_needed: bool = False ) -> int: timestamp = _parse_timestamp(timestamp) while wait_for_block_if_needed: try: return closest_block_after_timestamp(timestamp) except NoBlockFound: time.sleep(0.2) check_node() block = _closest_block_after_timestamp_cached(timestamp) logger.debug( "closest %s block after timestamp %s -> %s", Network.name(), timestamp, block ) return block
[docs] @a_sync_ttl_cache async def closest_block_after_timestamp_async( timestamp: Timestamp, wait_for_block_if_needed: bool = False ) -> int: timestamp = _parse_timestamp(timestamp) while wait_for_block_if_needed: try: block_at_ts = await get_block_at_timestamp( datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc), sync=False, ) return block_at_ts + 1 except NoBlockFound: await asyncio.sleep(0.2) await check_node_async() height = await dank_mids.eth.block_number lo, hi = 0, height while hi - lo > 1: mid = lo + (hi - lo) // 2 if await get_block_timestamp_async(mid) > timestamp: hi = mid else: lo = mid if hi == height: raise NoBlockFound(timestamp) return hi
@memory.cache() def _closest_block_after_timestamp_cached(timestamp: int) -> int: height = chain.height lo, hi = 0, height while hi - lo > 1: mid = lo + (hi - lo) // 2 if get_block_timestamp(mid) > timestamp: hi = mid else: lo = mid if hi == height: raise NoBlockFound(timestamp) return hi
[docs] @ttl_cache(ttl=300) @eth_retry.auto_retry def check_node() -> None: if GANACHE_FORK: return current_time = time.time() node_timestamp = web3.eth.get_block("latest").timestamp if current_time - node_timestamp > 5 * 60: raise NodeNotSynced( f"current time: {current_time} latest block time: {node_timestamp} discrepancy: {round((current_time - node_timestamp) / 60, 2)} minutes" )
[docs] @alru_cache(ttl=300) @eth_retry.auto_retry async def check_node_async() -> None: if GANACHE_FORK: return current_time = time.time() node_timestamp = await dank_mids.eth.get_block_timestamp("latest") if current_time - node_timestamp > 5 * 60: raise NodeNotSynced( f"current time: {current_time} latest block time: {node_timestamp} discrepancy: {round((current_time - node_timestamp) / 60, 2)} minutes" )