Source code for y.contracts

import asyncio
import logging
import os
import threading
import warnings
from collections import defaultdict
from functools import lru_cache
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Literal,
    NewType,
    Optional,
    Set,
    Tuple,
    Union,
)
from urllib.parse import urlparse

import a_sync
import aiohttp
import dank_mids
import eth_retry
from async_lru import alru_cache
from brownie import ZERO_ADDRESS, chain, web3
from brownie._config import CONFIG, REQUEST_HEADERS
from brownie.exceptions import (
    BrownieEnvironmentWarning,
    CompilerError,
    ContractNotFound,
)
from brownie.network.contract import (
    ContractEvents,
    _add_deployment,
    _ContractBase,
    _DeployedContractBase,
    _explorer_tokens,
    _fetch_from_explorer,
    _resolve_address,
    _unverified_addresses,
)
from brownie.network.state import _get_deployment
from brownie.typing import AccountsType
from brownie.utils import color
from cachetools.func import ttl_cache
from checksum_dict import ChecksumAddressSingletonMeta
from hexbytes import HexBytes
from msgspec.json import decode
from multicall import Call
from typing_extensions import Self
from web3.exceptions import ContractLogicError

from y import ENVIRONMENT_VARIABLES as ENVS
from y import convert, exceptions
from y._decorators import stuck_coro_debugger
from y.datatypes import Address, AnyAddressType, Block
from y.interfaces.ERC20 import ERC20ABI
from y.networks import Network
from y.time import check_node, check_node_async
from y.utils.cache import memory
from y.utils.events import Events
from y.utils.gather import gather_methods

logger = logging.getLogger(__name__)

_CHAINID = chain.id

_brownie_deployments_db_lock = threading.Lock()
_contract_locks = defaultdict(asyncio.Lock)

# These tokens have trouble when resolving the implementation via the chain.
FORCE_IMPLEMENTATION = {
    Network.Mainnet: {
        "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48": "0xa2327a938Febf5FEC13baCFb16Ae10EcBc4cbDCF",  # USDC as of 2022-08-10
        "0x3d1E5Cf16077F349e999d6b21A4f646e83Cd90c5": "0xf51fC5ae556F5B8c6dCf50f70167B81ceb02a2b2",  # dETH as of 2024-02-15
    },
}.get(_CHAINID, {})


[docs] def Contract_erc20(address: AnyAddressType) -> "Contract": """ Create a :class:`~Contract` instance for an ERC20 token. This function uses the standard ERC20 ABI instead of fetching the contract ABI from the block explorer. Args: address: The address of the ERC20 token. Returns: A :class:`~Contract` instance for the ERC20 token. Example: >>> contract = Contract_erc20("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> contract.name() 'Dai Stablecoin' """ address = convert.to_address(address) return Contract.from_abi("ERC20", address, ERC20ABI)
[docs] def Contract_with_erc20_fallback(address: AnyAddressType) -> "Contract": """ Create a :class:`~Contract` instance for an address, falling back to an ERC20 token if the contract is not verified. Args: address: The address of the contract or ERC20 token. Returns: A :class:`~Contract` instance for the contract address. Example: >>> contract = Contract_with_erc20_fallback("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> contract.symbol() 'DAI' """ if isinstance(address, Contract): return address address = convert.to_address(address) try: return Contract(address) except exceptions.ContractNotVerified: return Contract_erc20(address)
@memory.cache() # yLazyLogger(logger) @eth_retry.auto_retry def contract_creation_block( address: AnyAddressType, when_no_history_return_0: bool = False ) -> int: """ Determine the block when a contract was created using binary search. NOTE Requires access to historical state. Doesn't account for CREATE2 or SELFDESTRUCT. Args: address: The address of the contract. when_no_history_return_0: If True, return 0 when no history is found instead of raising a :class:`~exceptions.NodeNotSynced` exception. Default False. Raises: exceptions.NodeNotSynced: If the node is not fully synced. ValueError: If the contract creation block cannot be determined. Example: >>> block = contract_creation_block("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> print(block) 1234567 """ address = convert.to_address(address) logger.debug("contract creation block %s", address) height = chain.height if height == 0: raise exceptions.NodeNotSynced(_NOT_SYNCED) check_node() lo, hi = 0, height barrier = 0 warned = False while hi - lo > 1: mid = lo + (hi - lo) // 2 # TODO rewrite this so we can get deploy blocks for some contracts deployed on correct side of barrier try: if _get_code(address, mid): hi = mid else: lo = mid except ValueError as e: if "missing trie node" in str(e) and not warned: logger.warning( "missing trie node, `contract_creation_block` may output a higher block than actual. Please try again using an archive node." ) elif "Server error: account aurora does not exist while viewing" in str(e): if not warned: logger.warning(str(e)) elif "No state available for block" in str(e): if not warned: logger.warning(str(e)) else: raise warned = True barrier = mid lo = mid if hi == lo + 1 == barrier + 1 and when_no_history_return_0: logger.warning( "could not determine creation block for %s on %s (deployed prior to barrier)", address, Network.name(), ) logger.debug("contract creation block %s -> 0", address) return 0 if hi != height: logger.debug("contract creation block %s -> %s", address, hi) return hi raise ValueError(f"Unable to find deploy block for {address} on {Network.name()}") get_code = eth_retry.auto_retry(dank_mids.eth.get_code) @memory.cache @eth_retry.auto_retry def _get_code(address: str, block: int) -> HexBytes: return web3.eth.get_code(address, block) creation_block_semaphore = a_sync.ThreadsafeSemaphore(10)
[docs] @a_sync.a_sync(cache_type="memory") @stuck_coro_debugger @eth_retry.auto_retry async def contract_creation_block_async( address: AnyAddressType, when_no_history_return_0: bool = False ) -> int: # sourcery skip: merge-duplicate-blocks, remove-redundant-if """ Determine the block when a contract was created using binary search. NOTE Requires access to historical state. Doesn't account for CREATE2 or SELFDESTRUCT. Args: address: The address of the contract. when_no_history_return_0: If True, return 0 when no history is found instead of raising a :class:`~exceptions.NodeNotSynced` exception. Default False. Raises: exceptions.NodeNotSynced: If the node is not fully synced. ValueError: If the contract creation block cannot be determined. Example: >>> block = await contract_creation_block_async("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> print(block) 1234567 """ from y._db.utils import contract as db address = await convert.to_address_async(address) if deploy_block := await db.get_deploy_block(address): return deploy_block logger.debug(f"contract creation block {address}") height = await dank_mids.eth.block_number if height == 0: raise exceptions.NodeNotSynced(_NOT_SYNCED) await check_node_async() lo, hi = 0, height barrier = 0 warned = False while hi - lo > 1: mid = lo + (hi - lo) // 2 # TODO rewrite this so we can get deploy blocks for some contracts deployed on correct side of barrier try: if await get_code(address, mid): hi = mid else: lo = mid except ValueError as e: if "missing trie node" in str(e): if not warned: logger.warning( "missing trie node, `contract_creation_block` may output a higher block than actual. Please try again using an archive node." ) elif "Server error: account aurora does not exist while viewing" in str(e): if not warned: logger.warning(str(e)) elif "No state available for block" in str(e): if not warned: logger.warning(str(e)) else: raise warned = True barrier = mid lo = mid if hi == lo + 1 == barrier + 1 and when_no_history_return_0: logger.warning( f"could not determine creation block for {address} on {Network.name()} (deployed prior to barrier)" ) logger.debug(f"contract creation block {address} -> 0") return 0 if hi != height: logger.debug(f"contract creation block {address} -> {hi}") db.set_deploy_block(address, hi) return hi raise ValueError(f"Unable to find deploy block for {address} on {Network.name()}")
# this defaultdict prevents congestion in the contracts thread pool address_semaphores = defaultdict(lambda: a_sync.ThreadsafeSemaphore(1))
[docs] class ContractEvents(ContractEvents):
[docs] def __getattr__(self, name: str) -> Events: return super().__getattr__(name)
[docs] class CompilerError(Exception):
[docs] def __init__(self): super().__init__( "y.Contract objects work best when we bypass compilers.\n" "In this case, it will *only* work when we bypass.\n" "Please ensure autofetch_sources=False in your brownie-config.yaml and rerun your script." )
[docs] class Contract(dank_mids.Contract, metaclass=ChecksumAddressSingletonMeta): """ A :class:`~dank_mids.Contract` object with several modifications for enhanced functionality. This class provides a modified contract object with additional features and optimizations: 1. Contracts will not be compiled. This allows you to more quickly fetch contracts from the block explorer and prevents you from having to download and install compilers. NOTE: You must set `autofetch_sources=false` in your project's brownie-config.yaml for this to work correctly. 2. To each contract method, a `coroutine` property has been defined which allows you to make asynchronous calls. This is enabled by inheriting from :class:`~dank_mids.Contract`, which provides the `coroutine` method for each :class:`~dank_mids.ContractCall` object. These asynchronous calls are intelligently batched in the background by :mod:`dank_mids` to reduce overhead. 3. New methods: - :meth:`~has_method` - :meth:`~has_methods` - :meth:`~build_name` - :meth:`~get_code` 4. A few attributes were removed in order to minimize the size of a Contract object in memory: - :attr:`~ast` - :attr:`~bytecode` - :attr:`~coverageMap` - :attr:`~deployedBytecode` - :attr:`~deployedSourceMap` - :attr:`~natspec` - :attr:`~opcodes` - :attr:`~pcMap` Examples: >>> contract = Contract("0xAddress") >>> contract.methodName(*args, block_identifier=12345678) 1000000000000000000 >>> coro = contract.methodName.coroutine(*args, block_identifier=12345678) >>> coro <coroutine coroutine object at 0x12345678> >>> contract.methodName(*args, block_identifier=12345678) == await coro True See Also: - :class:`~dank_mids.Contract` - :mod:`dank_mids` """ # the default state for Contract objects verified = True """True if the contract is verified on this network's block explorer. False otherwise.""" events: ContractEvents """ A container for various event types associated with this contract. Provides a convenient way to query contract events with minimal code. """ _ttl_cache_popper: Union[Literal["disabled"], int, asyncio.TimerHandle]
[docs] @eth_retry.auto_retry def __init__( self, address: AnyAddressType, owner: Optional[AccountsType] = None, require_success: bool = True, cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL, # units: seconds ) -> None: """ Initialize a :class:`~Contract` instance. Note: autofetch-sources: false Args: address: The address of the contract. owner (optional): The owner of the contract. Default None. require_success (optional): If True, require successful contract verification. Default True. cache_ttl (optional): The time-to-live for the contract cache in seconds. Default set in :mod:`~y.ENVIRONMENT_VARIABLES`. Raises: ContractNotFound: If the address is not a contract. exceptions.ContractNotVerified: If the contract is not verified and require_success is True. """ address = str(address) if address.lower() in [ "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", ZERO_ADDRESS, ]: raise ContractNotFound(f"{address} is not a contract.") if require_success and address in _unverified_addresses: raise exceptions.ContractNotVerified(address) try: # Try to fetch the contract from the local sqlite db. with _brownie_deployments_db_lock: super().__init__(address, owner=owner) except (AssertionError, IndexError) as e: if str(e) == "pop from an empty deque" or isinstance(e, AssertionError): raise CompilerError from None raise except ValueError as e: logger.debug(f"{e}") if not str(e).startswith("Unknown contract address: "): raise else: # Nice, we got it from the db. if not isinstance(self.verified, bool) and self.verified is not None: logger.warning( f'`Contract("{address}").verified` property will not be usable due to the contract having a `verified` method in its ABI.' ) # schedule call to pop from cache self._schedule_cache_pop(cache_ttl) return # The contract does not exist in your local brownie deployments.db try: name, abi = _resolve_proxy(address) build = { "abi": abi, "address": address, "contractName": name, "type": "contract", } self.__init_from_abi__(build, owner=owner, persist=True) self.__post_init__(cache_ttl) except (ContractNotFound, exceptions.ContractNotVerified) as e: if isinstance(e, exceptions.ContractNotVerified): _unverified_addresses.add(address) if require_success: raise try: if isinstance(e, exceptions.ContractNotVerified): self.verified = False self._build = {"contractName": "Non-Verified Contract"} else: self.verified = None self._build = {"contractName": "Broken Contract"} except AttributeError: logger.warning( f'`Contract("{address}").verified` property will not be usable due to the contract having a `verified` method in its ABI.' )
[docs] @classmethod @a_sync.a_sync def from_abi( cls, name: str, address: str, abi: List, owner: Optional[AccountsType] = None, persist: bool = True, cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL, # units: seconds ) -> Self: """ Create a :class:`~Contract` instance from an ABI. Args: name: The name of the contract. address: The address of the contract. abi: The ABI of the contract. owner (optional): The owner of the contract. Default None. persist (optional): If True, persist the contract in brownie's local contract database. Default True. cache_ttl (optional): The time-to-live for the contract cache in seconds. Default set in :mod:`~y.ENVIRONMENT_VARIABLES`. Example: >>> abi = [{"constant":True,"inputs":[],"name":"name","outputs":[{"name":"","type":"string"}],"payable":False,"stateMutability":"view","type":"function"}] >>> contract = Contract.from_abi("MyContract", "0x6B175474E89094C44Da98b954EedeAC495271d0F", abi) >>> contract.name() 'Dai Stablecoin' """ self = cls.__new__(cls) build = { "abi": abi, "address": _resolve_address(address), "contractName": name, "type": "contract", } self.__init_from_abi__(build, owner, persist) self.__post_init__(cache_ttl) return self
[docs] @classmethod @stuck_coro_debugger async def coroutine( cls, address: AnyAddressType, owner: Optional[AccountsType] = None, persist: bool = True, require_success: bool = True, cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL, # units: seconds ) -> Self: """ Create a :class:`~Contract` instance asynchronously. Args: address: The address of the contract. owner (optional): The owner of the contract. Default None. persist (optional): If True, persist the contract in brownie's local contract database. Default True. require_success (optional): If True, require successful contract verification. Default True. cache_ttl (optional): The time-to-live for the contract cache in seconds. Default set in :mod:`~y.ENVIRONMENT_VARIABLES`. Example: >>> contract = await Contract.coroutine("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> contract.symbol() 'DAI' """ address = str(address) if contract := cls.get_instance(address): return contract # dict lookups faster than string comparisons, keep this behind the singleton check if address.lower() in [ "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee", ZERO_ADDRESS, ]: raise ContractNotFound(f"{address} is not a contract.") from None contract = cls.__new__(cls) build, _ = await _get_deployment_from_db(address) if build: async with _contract_locks[address]: # now that we're inside the lock, check and see if another coro populated the cache if cache_value := cls.get_instance(address): return cache_value # nope, continue contract.__init_from_abi__(build, owner=owner, persist=False) contract.__post_init__(cache_ttl) elif not CONFIG.active_network.get("explorer"): raise ValueError(f"Unknown contract address: '{address}'") else: try: # The contract does not exist in your local brownie deployments.db name, abi = await _resolve_proxy_async(address) except (ContractNotFound, exceptions.ContractNotVerified) as e: if not_verified := isinstance(e, exceptions.ContractNotVerified): _unverified_addresses.add(address) if require_success: raise try: contract.verified = False if not_verified else None except AttributeError: logger.warning( '`Contract("%s").verified` property will not be usable due to the contract having a `verified` method in its ABI.', address, ) contract._build = { "contractName": ( "Non-Verified Contract" if not_verified else "Broken Contract" ) } else: async with _contract_locks[address]: # now that we're inside the lock, check and see if another coro populated the cache if cache_value := cls.get_instance(address): return cache_value # nope, continue build = { "abi": abi, "address": address, "contractName": name, "type": "contract", } contract.__init_from_abi__(build, owner=owner, persist=persist) contract.__post_init__(cache_ttl) # Cache manually since we aren't calling init cls[address] = contract # keep the dict small, we cache Contract instances so we won't need these in the future _contract_locks.pop(address, None) if not contract.verified or contract._ttl_cache_popper == "disabled": pass elif cache_ttl is None: if isinstance(contract._ttl_cache_popper, asyncio.TimerHandle): contract._ttl_cache_popper.cancel() contract._ttl_cache_popper = "disabled" elif isinstance(contract._ttl_cache_popper, int): cache_ttl = max(contract._ttl_cache_popper, cache_ttl) contract._ttl_cache_popper = asyncio.get_running_loop().call_later( cache_ttl, cls.delete_instance, contract.address, ) elif ( loop := asyncio.get_running_loop() ).time() + cache_ttl > contract._ttl_cache_popper.when(): contract._ttl_cache_popper.cancel() contract._ttl_cache_popper = loop.call_later( cache_ttl, cls.delete_instance, contract.address, ) return contract
def __init_from_abi__( self, build: Dict, owner: Optional[AccountsType] = None, persist: bool = True ) -> None: """ Initialize a :class:`~Contract` instance from an ABI. Args: build: The build information for the contract. owner (optional): The owner of the contract. Default None. persist (optional): If True, persist the contract in the local database. Default True. Example: >>> abi = [{"constant":True,"inputs":[],"name":"name","outputs":[{"name":"","type":"string"}],"payable":False,"stateMutability":"view","type":"function"}] >>> build = {"abi": abi, "address": "0x6B175474E89094C44Da98b954EedeAC495271d0F", "contractName": "MyContract", "type": "contract"} >>> contract = Contract.__new__(Contract) >>> contract.__init_from_abi__(build) >>> contract.name() 'Dai Stablecoin' """ _ContractBase.__init__(self, None, build, {}) # type: ignore _DeployedContractBase.__init__(self, build["address"], owner, None) if persist: _add_deployment(self) try: self.verified = True except AttributeError: logger.warning( f'`Contract("{self.address}").verified` property will not be usable due to the contract having a `verified` method in its ABI.' ) return self
[docs] def has_method( self, method: str, return_response: bool = False ) -> Union[bool, Any]: """ Check if the contract has a specific method. Args: method: The name of the method to check for. return_response (optional): If True, return the response of the method call instead of a boolean. Default False. Example: >>> contract = Contract("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> contract.has_method("name()") True """ return has_method( self.address, method, return_response=return_response, sync=False )
[docs] async def has_methods( self, methods: List[str], _func: Union[any, all] = all ) -> bool: """ Check if the contract has all the specified methods. Args: methods: A list of method names to check for. _func (optional): The function to use for combining the results (either :func:`all` or :func:`any`). Default :func:`all`. Example: >>> contract = Contract("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> await contract.has_methods(["name()", "symbol()"]) True """ return await has_methods(self.address, methods, _func, sync=False)
[docs] async def build_name(self, return_None_on_failure: bool = False) -> Optional[str]: """ Get the build name of the contract. Args: return_None_on_failure (optional): If True, return None if the build name cannot be determined instead of raising an Exception. Default False. Example: >>> contract = Contract("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> await contract.build_name() 'Dai Stablecoin' """ return await build_name( self.address, return_None_on_failure=return_None_on_failure, sync=False )
[docs] async def get_code(self, block: Optional[Block] = None) -> HexBytes: """ Get the bytecode of the contract at a specific block. Args: block (optional): The block number at which to get the bytecode. Defaults to latest block. Example: >>> contract = Contract("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> await contract.get_code() HexBytes('0x...') """ return await get_code(self.address, block=block)
def _schedule_cache_pop(self, cache_ttl: Optional[int]) -> None: if cache_ttl is None: self._ttl_cache_popper = "disabled" return try: loop = asyncio.get_running_loop() except RuntimeError: # If the event loop isn't running yet we can just specify the cache_ttl for later use self._ttl_cache_popper = cache_ttl return self._ttl_cache_popper = loop.call_later( cache_ttl, type(self).delete_instance, self.address, )
[docs] def __post_init__(self, cache_ttl: Optional[int] = None) -> None: super().__post_init__() # Init an event container for each topic _setup_events(self) # Get rid of unnecessary memory-hog properties _squeeze(self) # schedule call to pop from cache self._schedule_cache_pop(cache_ttl)
@memory.cache() # TODO: async this and put it into ydb for quicker startups # yLazyLogger(logger) def is_contract(address: AnyAddressType) -> bool: """ Checks to see if the input address is a contract. Returns `False` if: - The address is not and has never been a contract - The address used to be a contract but has self-destructed Args: address: The address to check. Example: >>> is_contract("0x6B175474E89094C44Da98b954EedeAC495271d0F") True """ address = convert.to_address(address) return web3.eth.get_code(address) not in ["0x", b""]
[docs] @a_sync.a_sync(default="sync", cache_type="memory") async def has_method( address: Address, method: str, return_response: bool = False ) -> Union[bool, Any]: """ Checks to see if a contract has a `method` view method with no inputs. `return_response=True` will return `response` in bytes if `response` else `False` Args: address: The address of the contract. method: The name of the method to check for. return_response: If True, return the response of the method call instead of a boolean. Default False. Example: >>> await has_method("0x6B175474E89094C44Da98b954EedeAC495271d0F", "name()") True """ address = await convert.to_address_async(address) try: response = await Call(address, [method]) return False if response is None else response if return_response else True except Exception as e: if ( isinstance(e, ContractLogicError) or exceptions.call_reverted(e) or "invalid jump destination" in str(e) ): return False raise
[docs] @stuck_coro_debugger @a_sync.a_sync(default="sync", cache_type="memory", ram_cache_ttl=15 * 60) async def has_methods( address: AnyAddressType, methods: Tuple[str], _func: Callable = all, # Union[any, all] ) -> bool: """ Checks to see if a contract has each view method (with no inputs) in `methods`. Pass `at_least_one=True` to only verify a contract has at least one of the methods. Args: address: The address of the contract. methods: A tuple of method names to check for. _func: The function to use for combining the results (either :func:`all` or :func:`any`). Example: >>> await has_methods("0x6B175474E89094C44Da98b954EedeAC495271d0F", ("name()", "symbol()")) True """ assert _func in [all, any], "`_func` must be either `any` or `all`" address = await convert.to_address_async(address) try: return _func( [result is not None for result in await gather_methods(address, methods)] ) except Exception as e: if not isinstance(e, ContractLogicError) and not exceptions.call_reverted(e): raise # and not out_of_gas(e): raise # Out of gas error implies one or more method is state-changing. # If `_func == all` we return False because `has_methods` is only supposed to work for public view methods with no inputs # If `_func == any` maybe one of the methods will work without "out of gas" error return ( False if _func == all else any( await asyncio.gather( *[has_method(address, method, sync=False) for method in methods] ) ) )
# yLazyLogger(logger)
[docs] @stuck_coro_debugger async def probe( address: AnyAddressType, methods: List[str], block: Optional[Block] = None, return_method: bool = False, ) -> Any: address = await convert.to_address_async(address) results = await gather_methods( address, methods, block=block, return_exceptions=True ) logger.debug("probe results: %s", results) results = [ (method, result) for method, result in zip(methods, results) if not isinstance(result, Exception) and result is not None ] if len(results) not in [1, 0]: logger.debug("multiple results: %s", results) if len(results) != 2 or results[0][1] != results[1][1]: raise AssertionError( f"`probe` returned multiple results for {address}: {results}. Must debug" ) method = results[0][0], results[1][0] result = results[0][1] results = [(method, result)] logger.debug("final results: %s", results) method, result = results[0] if len(results) == 1 else (None, None) if method: assert result is not None return (method, result) if return_method else result
[docs] @a_sync.a_sync(default="sync") @stuck_coro_debugger async def build_name( address: AnyAddressType, return_None_on_failure: bool = False ) -> str: """ Get the build name of a contract. Args: address: The address of the contract. return_None_on_failure (optional): If True, return None if the build name cannot be determined instead of raising an Exception. Default False. Example: >>> await build_name("0x6B175474E89094C44Da98b954EedeAC495271d0F") 'Dai Stablecoin' """ try: contract = await Contract.coroutine(address) return contract.__dict__["_build"]["contractName"] except exceptions.ContractNotVerified: if not return_None_on_failure: raise return None
[docs] async def proxy_implementation( address: AnyAddressType, block: Optional[Block] ) -> Address: """ Get the implementation address for a proxy contract. Args: address: The address of the proxy contract. block: The block number at which to get the implementation address. Defaults to latest block. Example: >>> await proxy_implementation("0x6B175474E89094C44Da98b954EedeAC495271d0F") '0x1234567890abcdef1234567890abcdef12345678' """ return await probe( address, ["implementation()(address)", "target()(address)"], block )
_get_deployment_from_db = a_sync.SmartProcessingQueue( lambda address: ENVS.CONTRACT_THREADS.run(_get_deployment, address), num_workers=64, ) def _squeeze(contract: Contract) -> Contract: """ Reduce the contract size in RAM by removing large data structures from the build. Args: contract: The contract object to squeeze. Example: >>> contract = Contract("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> _squeeze(contract) <Contract object> """ for k in [ "ast", "bytecode", "coverageMap", "deployedBytecode", "deployedSourceMap", "natspec", "opcodes", "pcMap", ]: if contract._build and k in contract._build.keys(): contract._build[k] = {} return contract # we loosely cache this so we don't have to repeatedly fetch abis for commonly used proxy implementations @ttl_cache(maxsize=1000, ttl=60 * 60) @eth_retry.auto_retry def _extract_abi_data(address: Address): """ Extract ABI data for a contract from the blockchain explorer. Args: address: The address of the contract. Returns: A tuple containing the contract name, ABI, and implementation address (if applicable). Raises: Various exceptions based on the API response and contract status. Example: >>> name, abi, implementation = _extract_abi_data("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> print(name) 'Dai Stablecoin' """ try: data = _fetch_from_explorer(address, "getsourcecode", False)["result"][0] except ConnectionError as e: if '{"message":"Something went wrong.","result":null,"status":"0"}' in str(e): if _CHAINID == Network.xDai: raise ValueError("Rate limited by Blockscout. Please try again.") from e if web3.eth.get_code(address): raise exceptions.ContractNotVerified(address) from e else: raise ContractNotFound(address) from e raise except ValueError as e: if ( str(e).startswith("Failed to retrieve data from API") and "invalid api key" in str(e).lower() ): raise exceptions.InvalidAPIKeyError from e if exceptions.contract_not_verified(e): raise exceptions.ContractNotVerified( f"{address} on {Network.printable()}" ) from e elif "Unknown contract address:" in str(e): exc_type = ( exceptions.ContractNotVerified if is_contract(address) else ContractNotFound ) raise exc_type(str(e)) from e else: raise is_verified = bool(data.get("SourceCode")) if not is_verified: raise exceptions.ContractNotVerified( f"Contract source code not verified: {address}" ) from None name = data["ContractName"] abi = decode(data["ABI"]) implementation = data.get("Implementation") return name, abi, implementation def _resolve_proxy(address) -> Tuple[str, List]: """ Resolve the implementation address for a proxy contract. Args: address: The address of the proxy contract. Returns: A tuple containing the contract name and ABI. Example: >>> name, abi = _resolve_proxy("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> print(name) 'Dai Stablecoin' """ address = convert.to_address(address) name, abi, implementation = _extract_abi_data(address) as_proxy_for = None if address in FORCE_IMPLEMENTATION: implementation = FORCE_IMPLEMENTATION[address] name, implementation_abi, _ = _extract_abi_data(implementation) # Here we merge the proxy ABI with the implementation ABI # without doing this, we'd only get the implementation # and would lack any valid methods/events from the proxy itself. # Credit: Wavey@Yearn abi += implementation_abi return name, abi # always check for an EIP1967 proxy - https://eips.ethereum.org/EIPS/eip-1967 implementation_eip1967 = web3.eth.get_storage_at( address, int(web3.keccak(text="eip1967.proxy.implementation").hex(), 16) - 1 ) # always check for an EIP1822 proxy - https://eips.ethereum.org/EIPS/eip-1822 implementation_eip1822 = web3.eth.get_storage_at( address, web3.keccak(text="PROXIABLE") ) # Just leave this code where it is for a helpful debugger as needed. if address == "": raise Exception( f"""implementation: {implementation} implementation_eip1967: {len(implementation_eip1967)} {implementation_eip1967} implementation_eip1822: {len(implementation_eip1822)} {implementation_eip1822}""" ) if len(implementation_eip1967) > 0 and int(implementation_eip1967.hex(), 16): as_proxy_for = _resolve_address(implementation_eip1967[-20:]) elif len(implementation_eip1822) > 0 and int(implementation_eip1822.hex(), 16): as_proxy_for = _resolve_address(implementation_eip1822[-20:]) elif implementation: # for other proxy patterns, we only check if etherscan indicates # the contract is a proxy. otherwise we could have a false positive # if there is an `implementation` method on a regular contract. try: # first try to call `implementation` per EIP897 # https://eips.ethereum.org/EIPS/eip-897 c = Contract.from_abi(name, address, abi) as_proxy_for = c.implementation.call() except Exception: # if that fails, fall back to the address provided by etherscan as_proxy_for = _resolve_address(implementation) if as_proxy_for: name, abi, _ = _extract_abi_data(as_proxy_for) return name, abi # we loosely cache this so we don't have to repeatedly fetch abis for commonly used proxy implementations @alru_cache(maxsize=1000, ttl=300) async def _extract_abi_data_async(address: Address): """ Extract ABI data for a contract from the blockchain explorer. Args: address: The address of the contract. Returns: A tuple containing the contract name, ABI, and implementation address (if applicable). Raises: Various exceptions based on the API response and contract status. Example: >>> name, abi, implementation = await _extract_abi_data_async("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> print(name) 'Dai Stablecoin' """ try: response = await _fetch_from_explorer_async(address, "getsourcecode", False) except ConnectionError as e: if '{"message":"Something went wrong.","result":null,"status":"0"}' in str(e): if _CHAINID == Network.xDai: raise ValueError("Rate limited by Blockscout. Please try again.") from e if await dank_mids.eth.get_code(address): raise exceptions.ContractNotVerified(address) from e else: raise ContractNotFound(address) from e raise except ValueError as e: if ( str(e).startswith("Failed to retrieve data from API") and "invalid api key" in str(e).lower() ): raise exceptions.InvalidAPIKeyError from e if exceptions.contract_not_verified(e): raise exceptions.ContractNotVerified( f"{address} on {Network.printable()}" ) from e elif "Unknown contract address:" in str(e): exc_type = ( exceptions.ContractNotVerified if await dank_mids.eth.get_code(address) not in ["0x", b""] else ContractNotFound ) raise exc_type(str(e)) from e else: raise data = response["result"][0] is_verified = bool(data.get("SourceCode")) if not is_verified: raise exceptions.ContractNotVerified( f"Contract source code not verified: {address}" ) from None name = data["ContractName"] abi = decode(data["ABI"]) implementation = data.get("Implementation") return name, abi, implementation @eth_retry.auto_retry async def _fetch_from_explorer_async(address: str, action: str, silent: bool) -> Dict: url = CONFIG.active_network.get("explorer") if url is None: raise ValueError("Explorer API not set for this network") if address in _unverified_addresses: raise ValueError(f"Source for {address} has not been verified") code = (await dank_mids.eth.get_code(address)).hex()[2:] # EIP-1167: Minimal Proxy Contract if ( code[:20] == "363d3d373d3d3d363d73" and code[60:] == "5af43d82803e903d91602b57fd5bf3" ): address = _resolve_address(code[20:60]) # Vyper <0.2.9 `create_forwarder_to` elif ( code[:30] == "366000600037611000600036600073" and code[70:] == "5af4602c57600080fd5b6110006000f3" ): address = _resolve_address(code[30:70]) # 0xSplits Clones elif ( code[:120] == "36603057343d52307f830d2d700a97af574b186c80d40429385d24241565b08a7c559ba283a964d9b160203da23d3df35b3d3d3d3d363d3d37363d73" # noqa e501 and code[160:] == "5af43d3d93803e605b57fd5bf3" ): address = _resolve_address(code[120:160]) params = {"module": "contract", "action": action, "address": address} return await _fetch_explorer_data(url, silent=silent, params=params) @lru_cache(maxsize=None) def _get_explorer_api_key(url, silent) -> Tuple[str, str]: explorer, env_key = next( ((k, v) for k, v in _explorer_tokens.items() if k in url), (None, None) ) if env_key is None: return None if api_key := os.getenv(env_key): return api_key if not silent: warnings.warn( f"No {explorer} API token set. You may experience issues with rate limiting. " f"Visit https://{explorer}.io/register to obtain a token, and then store it " f"as the environment variable ${env_key}", BrownieEnvironmentWarning, ) return None @eth_retry.auto_retry async def _fetch_explorer_data(url, silent, params): api_key = _get_explorer_api_key(url, silent) if api_key is not None: params["apiKey"] = api_key async with aiohttp.ClientSession() as session: if not silent: print( f"Fetching source of {color('bright blue')}{params['address']}{color} " f"from {color('bright blue')}{urlparse(url).netloc}{color}..." ) async with session.get(url, params=params, headers=REQUEST_HEADERS) as response: # Check the status code of the response if response.status != 200: raise ConnectionError( f"Status {response.status} when querying {url}: {await response.text()}" ) data = await response.json() if int(data["status"]) != 1: raise ValueError(f"Failed to retrieve data from API: {data}") return data async def _resolve_proxy_async(address) -> Tuple[str, List]: """ Resolve the implementation address for a proxy contract. Args: address: The address of the proxy contract. Returns: A tuple containing the contract name and ABI. Example: >>> name, abi = await _resolve_proxy_async("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> print(name) 'Dai Stablecoin' """ address = convert.to_address(address) name, abi, implementation = await _extract_abi_data_async(address) as_proxy_for = None if address in FORCE_IMPLEMENTATION: implementation = FORCE_IMPLEMENTATION[address] name, implementation_abi, _ = await _extract_abi_data_async(implementation) # Here we merge the proxy ABI with the implementation ABI # without doing this, we'd only get the implementation # and would lack any valid methods/events from the proxy itself. # Credit: Wavey@Yearn abi += implementation_abi return name, abi # always check for an EIP1967 proxy - https://eips.ethereum.org/EIPS/eip-1967 # always check for an EIP1822 proxy - https://eips.ethereum.org/EIPS/eip-1822 implementation_eip1967, implementation_eip1822 = await asyncio.gather( dank_mids.eth.get_storage_at( address, int(web3.keccak(text="eip1967.proxy.implementation").hex(), 16) - 1 ), dank_mids.eth.get_storage_at(address, web3.keccak(text="PROXIABLE")), ) # Just leave this code where it is for a helpful debugger as needed. if address == "": raise Exception( f"""implementation: {implementation} implementation_eip1967: {len(implementation_eip1967)} {implementation_eip1967} implementation_eip1822: {len(implementation_eip1822)} {implementation_eip1822}""" ) if len(implementation_eip1967) > 0 and int(implementation_eip1967.hex(), 16): as_proxy_for = _resolve_address(implementation_eip1967[-20:]) elif len(implementation_eip1822) > 0 and int(implementation_eip1822.hex(), 16): as_proxy_for = _resolve_address(implementation_eip1822[-20:]) elif implementation: # for other proxy patterns, we only check if etherscan indicates # the contract is a proxy. otherwise we could have a false positive # if there is an `implementation` method on a regular contract. try: # first try to call `implementation` per EIP897 # https://eips.ethereum.org/EIPS/eip-897 c = Contract.from_abi(name, address, abi) as_proxy_for = await c.implementation except Exception: # if that fails, fall back to the address provided by etherscan as_proxy_for = _resolve_address(implementation) if as_proxy_for: name, abi, _ = await _extract_abi_data_async(as_proxy_for) return name, abi _resolve_proxy_async = a_sync.SmartProcessingQueue(_resolve_proxy_async, num_workers=8) def _setup_events(contract: Contract) -> None: """ Helper function used to init contract event containers on a newly created `y.Contract` object. Args: contract: The contract object to set up events for. Example: >>> contract = Contract("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> _setup_events(contract) """ if not hasattr(contract, "events"): contract.events = ContractEvents(contract) for k, v in contract.topics.items(): setattr(contract.events, k, Events(addresses=[contract.address], topics=[[v]])) _NOT_SYNCED = "`chain.height` returns 0 on your node, which means it is not fully synced.\nYou can only use this function on a fully synced node."