import asyncio
import json
import logging
import threading
from collections import defaultdict
from typing import Any, Callable, Dict, List, Literal, NewType, Optional, Set, Tuple, Union

import a_sync
import dank_mids
import eth_retry
from brownie import ZERO_ADDRESS, chain, web3
from brownie.exceptions import CompilerError, ContractNotFound
from import (ContractEvents, _add_deployment,
                                      _ContractBase, _DeployedContractBase,
                                      _fetch_from_explorer, _resolve_address)
from brownie.typing import AccountsType
from checksum_dict import ChecksumAddressDict, ChecksumAddressSingletonMeta
from hexbytes import HexBytes
from multicall import Call
from typing_extensions import Self
from web3.exceptions import ContractLogicError

from y import convert, exceptions
from y._decorators import stuck_coro_debugger
from y.datatypes import Address, AnyAddressType, Block
from y.interfaces.ERC20 import ERC20ABI
from y.networks import Network
from y.time import check_node, check_node_async
from y.utils.cache import memory
from import Events
from y.utils.gather import gather_methods

logger = logging.getLogger(__name__)

contract_threads = a_sync.PruningThreadPoolExecutor(16)

# cached Contract instance, saves about 20ms of init time
_contract_lock = threading.Lock()

# These tokens have trouble when resolving the implementation via the chain.
    Network.Mainnet: {
        "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48": "0xa2327a938Febf5FEC13baCFb16Ae10EcBc4cbDCF", # USDC as of 2022-08-10
        "0x3d1E5Cf16077F349e999d6b21A4f646e83Cd90c5": "0xf51fC5ae556F5B8c6dCf50f70167B81ceb02a2b2", # dETH as of 2024-02-15
}.get(, {})

[docs] def Contract_erc20(address: AnyAddressType) -> "Contract": """ Create a Contract instance for an ERC20 token. This function uses the standard ERC20 ABI instead of fetching the contract ABI from the block explorer. Args: address: The address of the ERC20 token. Returns: A Contract instance for the ERC20 token. """ address = convert.to_address(address) return Contract.from_abi('ERC20',address,ERC20ABI)
[docs] def Contract_with_erc20_fallback(address: AnyAddressType) -> "Contract": """ Create a Contract instance for an address, falling back to an ERC20 token if the contract is not verified. Args: address: The address of the contract or ERC20 token. Returns: A Contract instance for the contract address. """ if isinstance(address, Contract): return address address = convert.to_address(address) try: return Contract(address) except exceptions.ContractNotVerified: return Contract_erc20(address)
@memory.cache() #yLazyLogger(logger) def contract_creation_block(address: AnyAddressType, when_no_history_return_0: bool = False) -> int: """ Determine the block when a contract was created using binary search. NOTE Requires access to historical state. Doesn't account for CREATE2 or SELFDESTRUCT. Args: address: The address of the contract. when_no_history_return_0: If True, return 0 when no history is found instead of raising a :class:`~exceptions.NodeNotSynced` exception. Default False. Returns: The block number at which the contract was created. Raises: exceptions.NodeNotSynced: If the node is not fully synced. ValueError: If the contract creation block cannot be determined. """ address = convert.to_address(address) logger.debug("contract creation block %s", address) height = chain.height if height == 0: raise exceptions.NodeNotSynced(_NOT_SYNCED) check_node() lo, hi = 0, height barrier = 0 warned = False while hi - lo > 1: mid = lo + (hi - lo) // 2 # TODO rewrite this so we can get deploy blocks for some contracts deployed on correct side of barrier try: if eth_retry.auto_retry(web3.eth.get_code)(address, mid): hi = mid else: lo = mid except ValueError as e: if 'missing trie node' in str(e) and not warned: logger.warning('missing trie node, `contract_creation_block` may output a higher block than actual. Please try again using an archive node.') elif 'Server error: account aurora does not exist while viewing' in str(e): if not warned: logger.warning(str(e)) elif 'No state available for block' in str(e): if not warned: logger.warning(str(e)) else: raise warned = True barrier = mid lo = mid if hi == lo + 1 == barrier + 1 and when_no_history_return_0: logger.warning('could not determine creation block for %s on %s (deployed prior to barrier)', address, logger.debug("contract creation block %s -> 0", address) return 0 if hi != height: logger.debug("contract creation block %s -> %s", address, hi) return hi raise ValueError(f"Unable to find deploy block for {address} on {}") get_code = eth_retry.auto_retry(dank_mids.eth.get_code) creation_block_semaphore = a_sync.ThreadsafeSemaphore(10)
[docs] @a_sync.a_sync(cache_type='memory') @stuck_coro_debugger @eth_retry.auto_retry async def contract_creation_block_async(address: AnyAddressType, when_no_history_return_0: bool = False) -> int: """ Determine the block when a contract was created using binary search. NOTE Requires access to historical state. Doesn't account for CREATE2 or SELFDESTRUCT. Args: address: The address of the contract. when_no_history_return_0: If True, return 0 when no history is found instead of raising a :class:`~exceptions.NodeNotSynced` exception. Default False. Returns: The block number at which the contract was created. Raises: exceptions.NodeNotSynced: If the node is not fully synced. ValueError: If the contract creation block cannot be determined. """ from y._db.utils import contract as db address = convert.to_address(address) if deploy_block := await db.get_deploy_block(address): return deploy_block logger.debug(f"contract creation block {address}") height = await dank_mids.eth.block_number if height == 0: raise exceptions.NodeNotSynced(_NOT_SYNCED) await check_node_async() lo, hi = 0, height barrier = 0 warned = False while hi - lo > 1: mid = lo + (hi - lo) // 2 # TODO rewrite this so we can get deploy blocks for some contracts deployed on correct side of barrier try: if await get_code(address, mid): hi = mid else: lo = mid except ValueError as e: if 'missing trie node' in str(e): if not warned: logger.warning('missing trie node, `contract_creation_block` may output a higher block than actual. Please try again using an archive node.') elif 'Server error: account aurora does not exist while viewing' in str(e): if not warned: logger.warning(str(e)) elif 'No state available for block' in str(e): if not warned: logger.warning(str(e)) else: raise warned = True barrier = mid lo = mid if hi == lo + 1 == barrier + 1 and when_no_history_return_0: logger.warning(f'could not determine creation block for {address} on {} (deployed prior to barrier)') logger.debug(f"contract creation block {address} -> 0") return 0 if hi != height: logger.debug(f"contract creation block {address} -> {hi}") db.set_deploy_block(address, hi) return hi raise ValueError(f"Unable to find deploy block for {address} on {}")
# this defaultdict prevents congestion in the contracts thread pool address_semaphores = defaultdict(lambda: a_sync.ThreadsafeSemaphore(1))
[docs] class ContractEvents(ContractEvents):
[docs] def __getattr__(self, name: str) -> Events: return super().__getattr__(name)
[docs] class Contract(dank_mids.Contract, metaclass=ChecksumAddressSingletonMeta): """ Though it may look complicated, a ypricemagic Contract object is simply a brownie Contract object with a few modifications: 1. Contracts will not be compiled. This allows you to more quickly fetch contracts from the block explorer and prevents you from having to download and install compilers. 2. To each contract method, a `coroutine` property has been defined which allows you to make asynchronous calls using the following syntax: ```python Contract(0xAddress).methodName.coroutine(*args, block_identifier=12345678) ``` 3. A few attributes were removed in order to minimize the size of a Contract object in memory: - ast, bytecode, coverageMap, deployedBytecode, deployedSourceMap, natspec, opcodes, pcMap 4. There are a few new util methods but they're not officially supported yet and may change without warning: - has_method - has_methods - has_methods_async - build_name - build_name_async - get_code """ # the default state for Contract objects verified = True events: ContractEvents _ChecksumAddressSingletonMeta__instances: ChecksumAddressDict["Contract"]
[docs] @eth_retry.auto_retry def __init__( self, address: AnyAddressType, owner: Optional[AccountsType] = None, require_success: bool = True, cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL, # units: seconds ) -> None: address = str(address) if address.lower() in ["0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", ZERO_ADDRESS]: raise ContractNotFound(f"{address} is not a contract.") if require_success and address in _unverified: raise exceptions.ContractNotVerified(address) with _contract_lock: # autofetch-sources: false # Try to fetch the contract from the local sqlite db. try: super().__init__(address, owner=owner) if not isinstance(self.verified, bool) and self.verified is not None: logger.warning(f'`Contract("{address}").verified` property will not be usable due to the contract having a `verified` method in its ABI.') except AssertionError as e: raise CompilerError("y.Contract objects work best when we bypass compilers. In this case, it will *only* work when we bypass. Please ensure autofetch_sources=False in your brownie-config.yaml and rerun your script.") from None except IndexError as e: if str(e) == "pop from an empty deque": raise CompilerError("y.Contract objects work best when we bypass compilers. In this case, it will *only* work when we bypass. Please ensure autofetch_sources=False in your brownie-config.yaml and rerun your script.") from None raise except ValueError as e: if not str(e).startswith("Unknown contract address: "): raise logger.debug(f"{e}") try: name, abi = _resolve_proxy(address) build = {"abi": abi, "address": address, "contractName": name, "type": "contract"} self.__init_from_abi__(build, owner=owner, persist=True) except exceptions.InvalidAPIKeyError: # re-raise with a cleaner traceback raise exceptions.InvalidAPIKeyError from None except (ContractNotFound, exceptions.ContractNotVerified) as e: if isinstance(e, exceptions.ContractNotVerified): _unverified.add(address) if require_success: raise try: if isinstance(e, exceptions.ContractNotVerified): self.verified = False self._build = {"contractName": "Non-Verified Contract"} else: self.verified = None self._build = {"contractName": "Broken Contract"} except AttributeError: logger.warning(f'`Contract("{address}").verified` property will not be usable due to the contract having a `verified` method in its ABI.') # Patch the Contract with coroutines for each method. dank_mids.patch_contract(self) if self.verified: _setup_events(self) # Init an event container for each topic _squeeze(self) # Get rid of unnecessary memory-hog properties self._ttl_cache_popper: Union[Literal["disabled"], int, asyncio.TimerHandle] try: self._ttl_cache_popper = "disabled" if cache_ttl is None else asyncio.get_running_loop().call_later(cache_ttl, self._ChecksumAddressSingletonMeta__instances.pop, self.address, None) except RuntimeError: self._ttl_cache_popper = cache_ttl
[docs] @classmethod @a_sync.a_sync def from_abi( cls, name: str, address: str, abi: List, owner: Optional[AccountsType] = None, persist: bool = True, cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL, # units: seconds ) -> Self: """ Create a Contract instance from an ABI. Args: name: The name of the contract. address: The address of the contract. abi: The ABI of the contract. owner (optional): The owner of the contract. Default None. persist (optional): If True, persist the contract in brownie's local contract database. Default True. cache_ttl (optional): The time-to-live for the contract cache in seconds. Default set in :mod:`~y.ENVIRONMENT_VARIABLES`. Returns: A Contract instance for the given ABI. """ self = cls.__new__(cls) build = {"abi": abi, "address": _resolve_address(address), "contractName": name, "type": "contract"} self.__init_from_abi__(build, owner, persist) dank_mids.patch_contract(self) # Patch the Contract with coroutines for each method. _setup_events(self) # Init an event container for each topic _squeeze(self) # Get rid of unnecessary memory-hog properties try: self._ttl_cache_popper = "disabled" if cache_ttl is None else asyncio.get_running_loop().call_later(cache_ttl, cls._ChecksumAddressSingletonMeta__instances.pop, self.address, None) except RuntimeError: self._ttl_cache_popper = cache_ttl return self
[docs] @classmethod async def coroutine( cls, address: AnyAddressType, require_success: bool = True, cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL, # units: seconds ) -> Self: address = str(address) if contract := cls._ChecksumAddressSingletonMeta__instances.get(address, None): return contract # dict lookups faster than string comparisons, keep this behind the singleton check if address.lower() in ["0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", ZERO_ADDRESS]: raise ContractNotFound(f"{address} is not a contract.") from None try: # We do this so we don't clog the threadpool with multiple jobs for the same contract. return await _contract_queue(address, require_success=require_success, cache_ttl=cache_ttl) except (ContractNotFound, exceptions._ExplorerError, CompilerError) as e: # re-raise with nicer traceback raise type(e)(*e.args) from None
@classmethod @stuck_coro_debugger async def _coroutine( cls, address: AnyAddressType, require_success: bool = True, cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL, # units: seconds ) -> Self: """ Internal method to create a Contract instance asynchronously. Args: address: The address of the contract. require_success: If True, raise an exception if the contract cannot be initialized. cache_ttl: The time-to-live for the contract cache in seconds. Returns: A Contract instance for the given address. """ contract = await, address, require_success=require_success) if not contract.verified or contract._ttl_cache_popper == "disabled": pass elif cache_ttl is None: if isinstance(contract._ttl_cache_popper, asyncio.TimerHandle): contract._ttl_cache_popper.cancel() contract._ttl_cache_popper = "disabled" elif isinstance(contract._ttl_cache_popper, int): cache_ttl = max(contract._ttl_cache_popper, cache_ttl) contract._ttl_cache_popper = asyncio.get_running_loop().call_later(cache_ttl, cls._ChecksumAddressSingletonMeta__instances.pop, contract.address, None) elif asyncio.get_running_loop().time() + cache_ttl > contract._ttl_cache_popper.when(): contract._ttl_cache_popper.cancel() contract._ttl_cache_popper = asyncio.get_running_loop().call_later(cache_ttl, cls._ChecksumAddressSingletonMeta__instances.pop, contract.address, None) return contract def __init_from_abi__(self, build: Dict, owner: Optional[AccountsType] = None, persist: bool = True) -> None: """ Initialize a Contract instance from an ABI. Args: build: The build information for the contract. owner (optional): The owner of the contract. Default None. persist (optional): If True, persist the contract in the local database. Default True. Returns: The initialized Contract instance. """ _ContractBase.__init__(self, None, build, {}) # type: ignore _DeployedContractBase.__init__(self, build['address'], owner, None) if persist: _add_deployment(self) try: self.verified = True except AttributeError: logger.warning(f'`Contract("{self.address}").verified` property will not be usable due to the contract having a `verified` method in its ABI.') return self
[docs] def has_method(self, method: str, return_response: bool = False) -> Union[bool,Any]: """ Check if the contract has a specific method. Args: method: The name of the method to check for. return_response (optional): If True, return the response of the method call instead of a boolean. Default False. Returns: A boolean indicating whether the contract has the method, or the response of the method call if return_response is True. """ return has_method(self.address, method, return_response=return_response, sync=False)
[docs] async def has_methods( self, methods: List[str], _func: Union[any, all] = all ) -> bool: """ Check if the contract has all the specified methods. Args: methods: A list of method names to check for. _func (optional): The function to use for combining the results (either :func:`all` or :func:`any`). Default :func:`all`. Returns: A boolean indicating whether the contract has all the specified methods. """ return await has_methods(self.address, methods, _func, sync=False)
[docs] async def build_name(self, return_None_on_failure: bool = False) -> Optional[str]: """ Get the build name of the contract. Args: return_None_on_failure (optional): If True, return None if the build name cannot be determined instead of raising an Exception. Default False. Returns: The build name of the contract, or None if the build name cannot be determined and return_None_on_failure is True. """ return await build_name(self.address, return_None_on_failure=return_None_on_failure, sync=False)
[docs] async def get_code(self, block: Optional[Block] = None) -> HexBytes: """ Get the bytecode of the contract at a specific block. Args: block (optional): The block number at which to get the bytecode. Defaults to latest block. Returns: The bytecode of the contract at the specified block. """ return await get_code(self.address, block=block)
_contract_queue = a_sync.SmartProcessingQueue(Contract._coroutine, num_workers=32) @memory.cache() # TODO: async this and put it into ydb for quicker startups #yLazyLogger(logger) def is_contract(address: AnyAddressType) -> bool: ''' Checks to see if the input address is a contract. Returns `False` if: - The address is not and has never been a contract - The address used to be a contract but has self-destructed ''' address = convert.to_address(address) return web3.eth.get_code(address) not in ['0x',b'']
[docs] @a_sync.a_sync(default='sync', cache_type='memory') async def has_method(address: Address, method: str, return_response: bool = False) -> Union[bool,Any]: ''' Checks to see if a contract has a `method` view method with no inputs. `return_response=True` will return `response` in bytes if `response` else `False` ''' address = convert.to_address(address) try: response = await Call(address, [method]) return False if response is None else response if return_response else True except Exception as e: if isinstance(e, ContractLogicError) or exceptions.call_reverted(e): return False raise
[docs] @stuck_coro_debugger @a_sync.a_sync(default='sync', cache_type='memory', ram_cache_ttl=15*60) async def has_methods( address: AnyAddressType, methods: Tuple[str], _func: Callable = all # Union[any, all] ) -> bool: ''' Checks to see if a contract has each view method (with no inputs) in `methods`. Pass `at_least_one=True` to only verify a contract has at least one of the methods. ''' assert _func in [all, any], '`_func` must be either `any` or `all`' address = convert.to_address(address) try: return _func([result is not None for result in await gather_methods(address, methods)]) except Exception as e: if not isinstance(e, ContractLogicError) and not exceptions.call_reverted(e): raise # and not out_of_gas(e): raise # Out of gas error implies one or more method is state-changing. # If `_func == all` we return False because `has_methods` is only supposed to work for public view methods with no inputs # If `_func == any` maybe one of the methods will work without "out of gas" error return False if _func == all else any(await asyncio.gather(*[has_method(address, method, sync=False) for method in methods]))
[docs] @stuck_coro_debugger async def probe( address: AnyAddressType, methods: List[str], block: Optional[Block] = None, return_method: bool = False ) -> Any: address = convert.to_address(address) results = await gather_methods(address, methods, block=block, return_exceptions=True) logger.debug('probe results: %s', results) results = [(method, result) for method, result in zip(methods, results) if not isinstance(result, Exception) and result is not None] if len(results) not in [1,0]: logger.debug('multiple results: %s', results) if len(results) != 2 or results[0][1] != results[1][1]: raise AssertionError(f'`probe` returned multiple results for {address}: {results}. Must debug') method = results[0][0], results[1][0] result = results[0][1] results = [(method, result)] logger.debug('final results: %s', results) method, result = results[0] if len(results) == 1 else (None, None) if method: assert result is not None return (method, result) if return_method else result
[docs] @a_sync.a_sync(default='sync') @stuck_coro_debugger async def build_name(address: AnyAddressType, return_None_on_failure: bool = False) -> str: """ Get the build name of a contract. Args: address: The address of the contract. return_None_on_failure (optional): If True, return None if the build name cannot be determined instead of raising an Exception. Default False. Returns: The build name of the contract, or None if the build name cannot be determined and return_None_on_failure is True. """ try: contract = await Contract.coroutine(address) return contract.__dict__['_build']['contractName'] except exceptions.ContractNotVerified: if not return_None_on_failure: raise return None
[docs] async def proxy_implementation(address: AnyAddressType, block: Optional[Block]) -> Address: """ Get the implementation address for a proxy contract. Args: address: The address of the proxy contract. block: The block number at which to get the implementation address. Defaults to latest block. Returns: The address of the implementation contract. """ return await probe(address, ['implementation()(address)','target()(address)'], block)
def _squeeze(contract: Contract) -> Contract: """ Reduce the contract size in RAM by removing large data structures from the build. Args: contract: The contract object to squeeze. Returns: The squeezed contract object. """ for k in ["ast", "bytecode", "coverageMap", "deployedBytecode", "deployedSourceMap", "natspec", "opcodes", "pcMap"]: if contract._build and k in contract._build.keys(): contract._build[k] = {} return contract @eth_retry.auto_retry def _extract_abi_data(address: Address): """ Extract ABI data for a contract from the blockchain explorer. Args: address: The address of the contract. Returns: A tuple containing the contract name, ABI, and implementation address (if applicable). Raises: Various exceptions based on the API response and contract status. """ try: data = _fetch_from_explorer(address, "getsourcecode", False)["result"][0] except ConnectionError as e: if '{"message":"Something went wrong.","result":null,"status":"0"}' in str(e): if == Network.xDai: raise ValueError('Rate limited by Blockscout. Please try again.') from e if web3.eth.get_code(address): raise exceptions.ContractNotVerified(address) from e else: raise ContractNotFound(address) from e raise except ValueError as e: if str(e).startswith("Failed to retrieve data from API") and "invalid api key" in str(e).lower(): raise exceptions.InvalidAPIKeyError from e if exceptions.contract_not_verified(e): raise exceptions.ContractNotVerified(f'{address} on {Network.printable()}') from e elif "Unknown contract address:" in str(e): exc_type = exceptions.ContractNotVerified if is_contract(address) else ContractNotFound raise exc_type(str(e)) from e else: raise is_verified = bool(data.get("SourceCode")) if not is_verified: raise exceptions.ContractNotVerified(f"Contract source code not verified: {address}") from None name = data["ContractName"] abi = json.loads(data["ABI"]) implementation = data.get("Implementation") return name, abi, implementation def _resolve_proxy(address) -> Tuple[str, List]: """ Resolve the implementation address for a proxy contract. Args: address: The address of the proxy contract. Returns: A tuple containing the contract name and ABI. """ address = convert.to_address(address) name, abi, implementation = _extract_abi_data(address) as_proxy_for = None if address in FORCE_IMPLEMENTATION: implementation = FORCE_IMPLEMENTATION[address] name, implementation_abi, _ = _extract_abi_data(implementation) # Here we merge the proxy ABI with the implementation ABI # without doing this, we'd only get the implementation # and would lack any valid methods/events from the proxy itself. # Credit: Wavey@Yearn abi += implementation_abi return name, abi # always check for an EIP1967 proxy - implementation_eip1967 = web3.eth.get_storage_at( address, int(web3.keccak(text="eip1967.proxy.implementation").hex(), 16) - 1 ) # always check for an EIP1822 proxy - implementation_eip1822 = web3.eth.get_storage_at(address, web3.keccak(text="PROXIABLE")) # Just leave this code where it is for a helpful debugger as needed. if address == "": raise Exception( f"""implementation: {implementation} implementation_eip1967: {len(implementation_eip1967)} {implementation_eip1967} implementation_eip1822: {len(implementation_eip1822)} {implementation_eip1822}""") if len(implementation_eip1967) > 0 and int(implementation_eip1967.hex(), 16): as_proxy_for = _resolve_address(implementation_eip1967[-20:]) elif len(implementation_eip1822) > 0 and int(implementation_eip1822.hex(), 16): as_proxy_for = _resolve_address(implementation_eip1822[-20:]) elif implementation: # for other proxy patterns, we only check if etherscan indicates # the contract is a proxy. otherwise we could have a false positive # if there is an `implementation` method on a regular contract. try: # first try to call `implementation` per EIP897 # c = Contract.from_abi(name, address, abi) as_proxy_for = except Exception: # if that fails, fall back to the address provided by etherscan as_proxy_for = _resolve_address(implementation) if as_proxy_for: name, abi, _ = _extract_abi_data(as_proxy_for) return name, abi def _setup_events(contract: Contract) -> None: """ Helper function used to init contract event containers on a newly created `y.Contract` object. Args: contract: The contract object to set up events for. """ if not hasattr(contract, 'events'): = ContractEvents(contract) for k, v in contract.topics.items(): setattr(, k, Events(addresses=[contract.address], topics=[[v]])) _Address = NewType("_Address", str) _unverified: Set[_Address] = set() """A collection of unverified addresses that is used to prevent repetitive etherscan api calls""" _NOT_SYNCED = "`chain.height` returns 0 on your node, which means it is not fully synced.\nYou can only use this function on a fully synced node."