Source code for

from abc import abstractmethod
from asyncio import as_completed, gather, get_event_loop, sleep
from collections import Counter, defaultdict
from functools import cached_property, wraps
from importlib.metadata import version
from inspect import isawaitable
from itertools import zip_longest
from logging import getLogger
from threading import current_thread, main_thread
from typing import (

import a_sync
import dank_mids
import eth_retry
from a_sync.executor import _AsyncExecutorMixin
from async_property import async_property
from brownie import web3
from import (
from eth_typing import ChecksumAddress
from evmspec import Log
from msgspec.structs import force_setattr
from toolz import groupby
from web3.middleware.filter import block_ranges
from web3.types import LogReceipt

from y._db.common import Filter, _clean_addresses
from y.datatypes import Address, AnyAddressType, Block
from y.exceptions import reraise_excs_with_extra_context
from y.utils.cache import memory
from y.utils.middleware import BATCH_SIZE

    from y._db.utils.logs import LogCache

T = TypeVar("T")

logger = getLogger(__name__)

# not really sure why this breaks things
ETH_EVENT_GTE_1_2_4 = tuple(int(i) for i in version("eth-event").split(".")) >= (

[docs] def decode_logs(logs: Union[Iterable[LogReceipt], Iterable[Log]]) -> EventDict: # NOTE: we want to ensure backward-compatability with LogReceipt """ Decode logs to events and enrich them with additional info. Args: logs: An iterable of :class:`~web3.types.LogReceipt` or :class:`~evmspec.Log` objects. Returns: An :class:`` containing decoded events. Examples: >>> logs = [LogReceipt(...), LogReceipt(...)] >>> events = decode_logs(logs) >>> print(events) See Also: - :class:`` """ if not logs: return EventDict() logs = list(logs) from y.contracts import Contract for log in logs: if log.address not in _deployment_topics: _add_deployment_topics(log.address, Contract(log.address).abi) # for some reason < 1.2.4 can decode them just fine but >= cannot special_treatment = ETH_EVENT_GTE_1_2_4 and logs and isinstance(logs[0], Log) if is_struct := isinstance(logs[0], Log): # save these for later orig_topics = [log.topics for log in logs] for log in logs: # these must support pop() for some reason force_setattr(log, "topics", list(log.topics)) try: decoded = _decode_logs( [log.to_dict() for log in logs] if special_treatment else logs ) except Exception: decoded = [] for log in logs: with reraise_excs_with_extra_context(log): # get some help for debugging decoded.extend(_decode_logs([log])) with reraise_excs_with_extra_context(len(logs), decoded): if is_struct: for i, (log, orig_topics) in enumerate(zip(logs, orig_topics)): setattr(decoded[i], "block_number", log.blockNumber) setattr(decoded[i], "transaction_hash", log.transactionHash) setattr(decoded[i], "log_index", log.logIndex) # put the log back to normal force_setattr(log, "topics", orig_topics) else: for i, log in enumerate(logs): setattr(decoded[i], "block_number", log["blockNumber"]) setattr(decoded[i], "transaction_hash", log["transactionHash"]) setattr(decoded[i], "log_index", log["logIndex"]) return decoded
[docs] @a_sync.a_sync(default="sync") async def get_logs_asap( address: Optional[Address], topics: Optional[List[str]], from_block: Optional[Block] = None, to_block: Optional[Block] = None, verbose: int = 0, ) -> List[Any]: """ Get logs as soon as possible. This function fetches raw logs from the blockchain within the specified block range. The logs are not decoded; use :func:`decode_logs` to decode them if needed. Args: address: The address of the contract to fetch logs from. topics: The event topics to filter logs by. from_block: The starting block to fetch logs from. to_block: The ending block to fetch logs to. verbose: Verbosity level for logging. Returns: A list of raw logs. Examples: Synchronous usage: >>> logs = get_logs_asap("0x1234...", ["0x5678..."], 1000000, 1000100) >>> decoded_logs = decode_logs(logs) Asynchronous usage: >>> logs = await get_logs_asap("0x1234...", ["0x5678..."], 1000000, 1000100, sync=False) >>> decoded_logs = decode_logs(logs) See Also: - :func:`decode_logs` """ if from_block is None: from y.contracts import contract_creation_block_async from_block = ( 0 if address is None else await contract_creation_block_async(address, True) ) if to_block is None: to_block = await dank_mids.eth.block_number ranges = list(block_ranges(from_block, to_block, BATCH_SIZE)) if verbose > 0:"fetching %d batches", len(ranges)) batches = await gather( *(_get_logs_async(address, topics, start, end) for start, end in ranges) ) return [log for batch in batches for log in batch]
[docs] async def get_logs_asap_generator( address: Optional[Address], topics: Optional[List[str]] = None, from_block: Optional[Block] = None, to_block: Optional[Block] = None, chronological: bool = True, run_forever: bool = False, run_forever_interval: int = 60, verbose: int = 0, ) -> AsyncGenerator[List[LogReceipt], None]: # sourcery skip: low-code-quality """ Get logs as soon as possible in a generator. This function fetches raw logs from the blockchain within the specified block range and yields them as they are retrieved. The logs are not decoded; use :func:`decode_logs` to decode them if needed. Args: address: The address of the contract to fetch logs from. topics: The event topics to filter logs by. from_block: The starting block to fetch logs from. to_block: The ending block to fetch logs to. chronological: If True, yield logs in chronological order. run_forever: If True, run indefinitely, fetching new logs periodically. run_forever_interval: The interval in seconds to wait between fetches when running forever. verbose: Verbosity level for logging. Yields: Lists of raw logs. Examples: >>> async for logs in get_logs_asap_generator("0x1234...", ["0x5678..."], 1000000, 1000100): ... decoded_logs = decode_logs(logs) See Also: - :func:`decode_logs` """ # NOTE: If you don't need the logs in order, you will get your logs faster if you set `chronological` to False. if from_block is None: from y.contracts import contract_creation_block_async if address is None: from_block = 0 elif isinstance(address, Iterable) and not isinstance(address, str): from_block = await _lowest_deploy_block( address, when_no_history_return_0=True ) else: from_block = await contract_creation_block_async(address, True) if to_block is None: to_block = await dank_mids.eth.block_number elif run_forever: raise TypeError("`to_block` must be None if `run_forever` is True.") if from_block > to_block: raise ValueError( f"from_block must be <= to_block. You passed from_block: {from_block} to_block: {to_block}." ) while True: ranges = list(block_ranges(from_block, to_block, BATCH_SIZE)) if verbose > 0:"fetching %d batches", len(ranges)) coros = [_get_logs_async(address, topics, start, end) for start, end in ranges] if chronological: yielded = 0 done = {} async for i, result in a_sync.as_completed( dict(enumerate(coros)), aiter=True ): done[i] = result for i in range(len(coros)): if yielded > i: continue if i not in done: break yield done.pop(i) yielded += 1 else: for logs in as_completed(coros, timeout=None): yield await logs if not run_forever: return await sleep(run_forever_interval) # Find start and end block for next loop current_block = await dank_mids.eth.block_number while current_block <= to_block: await sleep(run_forever_interval) current_block = await dank_mids.eth.block_number from_block = to_block + 1 if to_block + 1 <= current_block else current_block to_block = current_block
[docs] def logs_to_balance_checkpoints(logs) -> Dict[ChecksumAddress, int]: """ Convert Transfer logs to `{address: {from_block: balance}}` checkpoints. Args: logs: An iterable of logs to convert. Returns: A dictionary mapping addresses to balance checkpoints. Examples: >>> logs = [Log(...), Log(...)] >>> checkpoints = logs_to_balance_checkpoints(logs) >>> print(checkpoints) """ balances = Counter() checkpoints = defaultdict(dict) for block, block_logs in groupby("blockNumber", logs).items(): events = decode_logs(block_logs) for log in events: # ZERO_ADDRESS tracks -totalSupply ( sender, receiver, amount, ) = log.values() # there can be several different aliases balances[sender] -= amount checkpoints[sender][block] = balances[sender] balances[receiver] += amount checkpoints[receiver][block] = balances[receiver] return checkpoints
[docs] def checkpoints_to_weight(checkpoints, start_block: Block, end_block: Block) -> float: """ Calculate the weight of checkpoints between two blocks. Args: checkpoints: A dictionary of checkpoints. start_block: The starting block number. end_block: The ending block number. Returns: The calculated weight. Examples: >>> checkpoints = {0: 100, 10: 200} >>> weight = checkpoints_to_weight(checkpoints, 0, 10) >>> print(weight) """ total = 0 for a, b in zip_longest(list(checkpoints), list(checkpoints)[1:]): if a < start_block or a > end_block: continue b = min(b, end_block) if b else end_block total += checkpoints[a] * (b - a) / (end_block - start_block) return total
[docs] @a_sync.a_sync def _get_logs( address: Optional[ChecksumAddress], topics: Optional[List[str]], start: Block, end: Block, ) -> List[Log]: """ Get logs for a given address, topics, and block range. Args: address: The address of the contract to fetch logs from. topics: The event topics to filter logs by. start: The starting block to fetch logs from. end: The ending block to fetch logs to. Returns: A list of raw logs. Examples: >>> logs = _get_logs("0x1234...", ["0x5678..."], 1000000, 1000100) >>> print(logs) """ if end - start == BATCH_SIZE - 1: response = _get_logs_batch_cached(address, topics, start, end) else: response = _get_logs_no_cache(address, topics, start, end) for log in response: if address and log.address != address: """I have this due to a corrupt cache on my local box that I would prefer not to lose.""" """ It will not impact your scripts. """ response.remove(log) return response
get_logs_semaphore = defaultdict( lambda: dank_mids.BlockSemaphore( ENVS.GETLOGS_DOP, # We need to do this in case users use the sync api in a multithread context name="y.get_logs" + ("" if current_thread() == main_thread() else f".{current_thread()}"), ) ) async def _get_logs_async(address, topics, start, end) -> List[Log]: """ Get logs for a given address, topics, and block range. The result of this function is cached. Args: address: The address of the contract to fetch logs from. topics: The event topics to filter logs by. start: The starting block to fetch logs from. end: The ending block to fetch logs to. Returns: A list of raw logs. Examples: >>> logs = await _get_logs_async("0x1234...", ["0x5678..."], 1000000, 1000100) >>> print(logs) """ async with get_logs_semaphore[get_event_loop()][end]: return await _get_logs(address, topics, start, end, asynchronous=True) @eth_retry.auto_retry async def _get_logs_async_no_cache(address, topics, start, end) -> List[Log]: """ Get logs for a given address, topics, and block range. The result of this function is not cached. Args: address: The address of the contract to fetch logs from. topics: The event topics to filter logs by. start: The starting block to fetch logs from. end: The ending block to fetch logs to. Returns: A list of raw logs. Examples: >>> logs = await _get_logs_async_no_cache("0x1234...", ["0x5678..."], 1000000, 1000100) >>> print(logs) """ args = {"fromBlock": hex(start), "toBlock": hex(end)} if address is None: args["topics"] = topics elif topics is None: args["address"] = address else: args["address"] = address args["topics"] = topics try: return await dank_mids.eth.get_logs(args) except Exception as e: errs = [ "Service Unavailable for url:", "exceed maximum block range", "block range is too wide", "request timed out", "parse error", "method handler crashed", # TypeError # This is some intermittent error I need to debug in dank_mids, I think it occurs when we get rate limited "a bytes-like object is required, not 'NoneType'", ] if all(err not in str(e) for err in errs): raise logger.debug("your node is having trouble, breaking batch in half") batch_size = end - start + 1 if batch_size <= 2: # breaks the logic below and usually just succeeds on retry anyway return await _get_logs_async_no_cache(address, topics, start, end) half_of_batch = batch_size // 2 batch1_end = start + half_of_batch batch2_start = batch1_end + 1 batch1 = await _get_logs_async_no_cache(address, topics, start, batch1_end) batch2 = await _get_logs_async_no_cache(address, topics, batch2_start, end) response = batch1 + batch2 return response @eth_retry.auto_retry def _get_logs_no_cache( address: Optional[ChecksumAddress], topics: Optional[List[str]], start: Block, end: Block, ) -> List[Log]: """ Get logs without using the disk cache. Args: address: The address of the contract to fetch logs from. topics: The event topics to filter logs by. start: The starting block to fetch logs from. end: The ending block to fetch logs to. Returns: A list of raw logs. Examples: >>> logs = _get_logs_no_cache("0x1234...", ["0x5678..."], 1000000, 1000100) >>> print(logs) """ logger.debug("fetching logs %s to %s", start, end) try: if address is None: response = web3.eth.get_logs( {"topics": topics, "fromBlock": start, "toBlock": end} ) elif topics is None: response = web3.eth.get_logs( {"address": address, "fromBlock": start, "toBlock": end} ) else: response = web3.eth.get_logs( { "address": address, "topics": topics, "fromBlock": start, "toBlock": end, } ) except Exception as e: errs = [ "Service Unavailable for url:", "exceed maximum block range", "block range is too wide", "request timed out", "parse error", "method handler crashed", "Log response size exceeded.", ] if any(err in str(e) for err in errs): logger.debug("your node is having trouble, breaking batch in half") batch_size = end - start + 1 half_of_batch = batch_size // 2 batch1_end = start + half_of_batch batch2_start = batch1_end + 1 batch1 = _get_logs_no_cache(address, topics, start, batch1_end) batch2 = _get_logs_no_cache(address, topics, batch2_start, end) response = batch1 + batch2 else: raise return response ############################## # Leaving this here so as to not upset people's caches ############################## @memory.cache() def _get_logs_batch_cached( address: Optional[str], topics: Optional[List[str]], start: Block, end: Block ) -> List[Log]: """ Get logs from the disk cache, or fetch and cache them if not available. Args: address: The address of the contract to fetch logs from. topics: The event topics to filter logs by. start: The starting block to fetch logs from. end: The ending block to fetch logs to. Returns: A list of raw logs. Examples: >>> logs = _get_logs_batch_cached("0x1234...", ["0x5678..."], 1000000, 1000100) >>> print(logs) """ return _get_logs_no_cache(address, topics, start, end)
[docs] class LogFilter(Filter[Log, "LogCache"]): """ A filter for fetching and processing event logs. This class provides methods to fetch logs from the blockchain and process them. """ __slots__ = "addresses", "topics", "from_block"
[docs] def __init__( self, *, addresses=[], topics=[], from_block: Optional[int] = None, chunk_size: int = BATCH_SIZE, chunks_per_batch: Optional[int] = None, semaphore: Optional[dank_mids.BlockSemaphore] = None, executor: Optional[_AsyncExecutorMixin] = None, is_reusable: bool = True, verbose: bool = False, ) -> None: # sourcery skip: default-mutable-arg """ Initialize a LogFilter instance. Args: addresses: List of contract addresses to fetch logs from. topics: List of event topics to filter logs by. from_block: The starting block to fetch logs from. chunk_size: The number of blocks to fetch in each chunk. chunks_per_batch: The number of chunks to fetch in each batch. semaphore: A semaphore for limiting concurrent requests. executor: An executor for running tasks asynchronously. is_reusable: Whether the filter is reusable. verbose: Verbosity level for logging. Examples: >>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."]) >>> logs = log_filter.logs(1000100) >>> print(logs) """ self.addresses = _clean_addresses(addresses) self.topics = topics super().__init__( from_block, chunk_size=chunk_size, chunks_per_batch=chunks_per_batch, semaphore=semaphore, executor=executor, is_reusable=is_reusable, verbose=verbose, )
def __repr__(self) -> str: return f"<{self.__class__.__name__} addresses={self.addresses} topics={self.topics}>" @property def cache(self) -> "LogCache": cache = self._cache if cache is None: from y._db.utils.logs import LogCache cache = LogCache(self.addresses, self.topics) self._cache = cache return cache @property def semaphore(self) -> dank_mids.BlockSemaphore: semaphore = self._semaphore if semaphore is None: semaphore = get_logs_semaphore[get_event_loop()] self._semaphore = semaphore return semaphore
[docs] def logs(self, to_block: Optional[int]) -> a_sync.ASyncIterator[Log]: """ Get logs up to a given block. Args: to_block: The ending block to fetch logs to. Yields: A raw log. Examples: >>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."]) >>> logs = log_filter.logs(1000100) >>> print(logs) """ return self._objects_thru(block=to_block)
@property def insert_to_db(self) -> Callable[[Log], None]: """ Get the function for inserting logs into the database. Raises: NotImplementedError: If this method is not implemented in the subclass. """ # TODO: refactor this out of the base class abc raise NotImplementedError @cached_property def bulk_insert(self) -> Callable[[List[Log]], Awaitable[None]]: """ Get the function for bulk inserting logs into the database. Returns: A function for bulk inserting logs. Examples: >>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."]) >>> await log_filter.bulk_insert(logs) """ from y._db.utils.logs import bulk_insert executor = self.executor @wraps(bulk_insert) async def bulk_insert_wrapped(*args, **kwargs) -> None: return await bulk_insert(*args, **kwargs, executor=executor) return bulk_insert_wrapped @async_property async def _from_block(self) -> int: """ Get the starting block for fetching logs. Returns: The starting block. Examples: >>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."]) >>> start_block = await log_filter._from_block >>> print(start_block) """ if self.from_block is None: from y.contracts import contract_creation_block_async if self.addresses is None: self.from_block = 0 elif isinstance(self.addresses, Iterable) and not isinstance( self.addresses, str ): self.from_block = await _lowest_deploy_block( self.addresses, when_no_history_return_0=True ) else: self.from_block = await contract_creation_block_async( self.addresses, when_no_history_return_0=True ) return self.from_block async def _fetch_range(self, range_start: int, range_end: int) -> List[Log]: """ Fetch logs for a given block range. Args: range_start: The starting block of the range. range_end: The ending block of the range. Returns: A list of raw logs. Examples: >>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."]) >>> logs = await log_filter._fetch_range(1000000, 1000100) >>> print(logs) """ tries = 0 while True: try: return await _get_logs_async_no_cache( self.addresses, self.topics, range_start, range_end ) except ValueError as e: if "parse error" not in str(e) or tries >= 50: raise tries += 1 async def _fetch(self) -> NoReturn: """ Fetch logs indefinitely, starting from the specified block. Examples: >>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."]) >>> await log_filter._fetch() """ from_block = await self._from_block await self._loop(from_block) __slots__ = "addresses", "topics", "from_block"
[docs] class Events(LogFilter): """ A class for fetching and processing events. This class extends :class:`LogFilter` to provide additional functionality for handling events. """ obj_type = _EventItem
[docs] def events(self, to_block: int) -> a_sync.ASyncIterator[_EventItem]: """ Get events up to a given block. Args: to_block: The ending block to fetch events to. Yields: A decoded event. Examples: >>> events = Events(addresses=["0x1234..."], topics=["0x5678..."]) >>> async for event in ... print(event) """ return self._objects_thru(block=to_block)
async def _extend(self, logs: List[Log]) -> None: """ Extend the list of events with decoded logs. Args: objs: A list of raw event logs. Examples: >>> events = Events(addresses=["0x1234..."], topics=["0x5678..."]) >>> await events._extend(logs) """ if logs: decoded = await, logs) # let the event loop run once since the previous and next lines are potentially blocking await sleep(0) self._objects.extend(decoded) def _get_block_for_obj(self, obj: _EventItem) -> int: """ Get the block number for a given event. Args: obj: The event. Returns: The block number. Examples: >>> events = Events(addresses=["0x1234..."], topics=["0x5678..."]) >>> block_number = events._get_block_for_obj(event) >>> print(block_number) """ return obj.block_number __slots__ = []
[docs] class ProcessedEvents(Events, a_sync.ASyncIterable[T]): """ A class for fetching, processing, and iterating over events. This class extends :class:`Events` to provide additional functionality for processing events. """ def _include_event(self, event: _EventItem) -> Union[bool, Awaitable[bool]]: """Override this to exclude specific events from processing and collection.""" return True @abstractmethod def _process_event(self, event: _EventItem) -> T: """ Process a given event and return the result. Args: event: The event. Returns: The processed event. Examples: >>> processed_events = ProcessedEvents(addresses=["0x1234..."], topics=["0x5678..."]) >>> result = processed_events._process_event(event) >>> print(result) """
[docs] def objects(self, to_block: int) -> a_sync.ASyncIterator[_EventItem]: """ Get an :class:`~a_sync.ASyncIterator` that yields all events up to a given block. Args: to_block: The ending block to fetch events to. Returns: An :class:`~a_sync.ASyncIterator` that yields all included events. Examples: >>> processed_events = ProcessedEvents(addresses=["0x1234..."], topics=["0x5678..."]) >>> async for event in processed_events.objects(1000100): ... print(event) """ return self._objects_thru(block=to_block)
async def _extend(self, logs: List[Log]) -> None: """ Process a new set of logs and extend the list of processed events with the results. Args: logs: A list of raw event logs. Examples: >>> processed_events = ProcessedEvents(addresses=["0x1234..."], topics=["0x5678..."]) >>> await processed_events._extend(logs) """ if logs: decoded = await, logs) # let the event loop run once since the previous and next blocks are potentially blocking await sleep(0) should_include = await gather( *(self.__include_event(event) for event in decoded) ) # let the event loop run once since the previous and next blocks are potentially blocking await sleep(0) append_processed_event = self._objects.append done = 0 for event, include in zip(decoded, should_include): if include: append_processed_event(self._process_event(event)) done += 1 if done % 100 == 0: # Let the event loop run once in case we've been blocking for too long await sleep(0) async def __include_event(self, event: _EventItem) -> bool: """ Determine whether to include a given event in this container. Args: event: The event. Returns: True if the event should be included, False otherwise. Examples: >>> processed_events = ProcessedEvents(addresses=["0x1234..."], topics=["0x5678..."]) >>> include = await processed_events.__include_event(event) >>> print(include) """ # sourcery skip: assign-if-exp include = self._include_event(event) if isinstance(include, bool): return include if isawaitable(include): return bool(await include) return bool(include) __slots__ = []
async def _lowest_deploy_block( addresses: Iterable[AnyAddressType], when_no_history_return_0: bool ) -> Block: """ Get the lowest deployment block for a list of addresses. Args: addresses: A list of contract addresses. when_no_history_return_0: Whether to return 0 if insufficient historical data is available to calculate. Returns: The lowest deployment block. Examples: >>> addresses = ["0x1234...", "0x5678..."] >>> block = await _lowest_deploy_block(addresses, True) >>> print(block) """ from y.contracts import contract_creation_block_async return await contract_creation_block_async, addresses, when_no_history_return_0=when_no_history_return_0, ).min(sync=False)