import asyncio
import json
import logging
import threading
from collections import defaultdict
from typing import (
Any,
Callable,
Dict,
List,
Literal,
NewType,
Optional,
Set,
Tuple,
Union,
)
import a_sync
import dank_mids
import eth_retry
from brownie import ZERO_ADDRESS, chain, web3
from brownie.exceptions import CompilerError, ContractNotFound
from brownie.network.contract import (
ContractEvents,
_add_deployment,
_ContractBase,
_DeployedContractBase,
_fetch_from_explorer,
_resolve_address,
)
from brownie.typing import AccountsType
from checksum_dict import ChecksumAddressDict, ChecksumAddressSingletonMeta
from hexbytes import HexBytes
from multicall import Call
from typing_extensions import Self
from web3.exceptions import ContractLogicError
from y import ENVIRONMENT_VARIABLES as ENVS
from y import convert, exceptions
from y._decorators import stuck_coro_debugger
from y.datatypes import Address, AnyAddressType, Block
from y.interfaces.ERC20 import ERC20ABI
from y.networks import Network
from y.time import check_node, check_node_async
from y.utils.cache import memory
from y.utils.events import Events
from y.utils.gather import gather_methods
# Module-level logger.
logger = logging.getLogger(__name__)

# Thread pool in which `Contract._coroutine` runs the blocking `Contract(...)` constructor.
contract_threads = a_sync.PruningThreadPoolExecutor(16)

# Lock taken by `Contract.__init__` while it loads a contract from brownie's
# local deployment db or resolves its ABI from the block explorer.
_contract_lock = threading.Lock()

# These tokens have trouble when resolving the implementation via the chain.
# Maps proxy address -> implementation address; only the entries for the
# currently connected chain (`chain.id`) are kept, or an empty dict if none.
FORCE_IMPLEMENTATION = {
    Network.Mainnet: {
        "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48": "0xa2327a938Febf5FEC13baCFb16Ae10EcBc4cbDCF",  # USDC as of 2022-08-10
        "0x3d1E5Cf16077F349e999d6b21A4f646e83Cd90c5": "0xf51fC5ae556F5B8c6dCf50f70167B81ceb02a2b2",  # dETH as of 2024-02-15
    },
}.get(chain.id, {})
[docs]
def Contract_erc20(address: AnyAddressType) -> "Contract":
    """
    Build a :class:`Contract` for an ERC20 token using the generic ERC20 ABI.

    The ABI comes from :data:`ERC20ABI`, so nothing is fetched from the block explorer.

    Args:
        address: The address of the ERC20 token.

    Returns:
        A Contract instance named "ERC20" for the given address.
    """
    token_address = convert.to_address(address)
    return Contract.from_abi("ERC20", token_address, ERC20ABI)
[docs]
def Contract_with_erc20_fallback(address: AnyAddressType) -> "Contract":
    """
    Return a :class:`Contract` for `address`, using the generic ERC20 ABI if the contract is not verified.

    Args:
        address: The address of the contract or ERC20 token. If a
            :class:`Contract` is passed it is returned unchanged.

    Returns:
        A Contract instance for the contract address.
    """
    if isinstance(address, Contract):
        # Already a Contract object, nothing to build.
        return address

    normalized = convert.to_address(address)
    try:
        contract = Contract(normalized)
    except exceptions.ContractNotVerified:
        # No verified ABI available: fall back to the standard ERC20 ABI.
        contract = Contract_erc20(normalized)
    return contract
@memory.cache()
# yLazyLogger(logger)
@eth_retry.auto_retry
def contract_creation_block(
    address: AnyAddressType, when_no_history_return_0: bool = False
) -> int:
    """
    Determine the block when a contract was created using binary search.

    NOTE Requires access to historical state. Doesn't account for CREATE2 or SELFDESTRUCT.

    The search bisects `[0, chain.height]` on "is there code at `address` at
    this block?". When the node cannot serve state for a probed block (pruned
    node), that block becomes the "barrier": the search continues above it and
    a warning is logged once.

    Args:
        address: The address of the contract.
        when_no_history_return_0: If True, return 0 when the contract already
            had code at the first block above the barrier (i.e. it was deployed
            before the node's available history). Default False.

    Returns:
        The block number at which the contract was created.

    Raises:
        exceptions.NodeNotSynced: If the node is not fully synced.
        ValueError: If the contract creation block cannot be determined, or if
            the node returns a `ValueError` that is not a known "no state" error.
    """
    address = convert.to_address(address)
    logger.debug("contract creation block %s", address)
    height = chain.height

    if height == 0:
        raise exceptions.NodeNotSynced(_NOT_SYNCED)

    check_node()

    lo, hi = 0, height
    barrier = 0
    warned = False
    while hi - lo > 1:
        mid = lo + (hi - lo) // 2
        # TODO rewrite this so we can get deploy blocks for some contracts deployed on correct side of barrier
        try:
            if _get_code(address, mid):
                hi = mid
            else:
                lo = mid
        except ValueError as e:
            message = str(e)
            # The "already warned" check is nested so that a second
            # "missing trie node" error is still treated as a barrier
            # instead of falling through to the final `raise`.
            if "missing trie node" in message:
                if not warned:
                    logger.warning(
                        "missing trie node, `contract_creation_block` may output a higher block than actual. Please try again using an archive node."
                    )
            elif (
                "Server error: account aurora does not exist while viewing" in message
                or "No state available for block" in message
            ):
                if not warned:
                    logger.warning(message)
            else:
                raise
            # No state at `mid`: remember it as the barrier and search above it.
            warned = True
            barrier = mid
            lo = mid

    if hi == lo + 1 == barrier + 1 and when_no_history_return_0:
        # Code already existed at the first block we could read.
        logger.warning(
            "could not determine creation block for %s on %s (deployed prior to barrier)",
            address,
            Network.name(),
        )
        logger.debug("contract creation block %s -> 0", address)
        return 0
    if hi != height:
        logger.debug("contract creation block %s -> %s", address, hi)
        return hi
    raise ValueError(f"Unable to find deploy block for {address} on {Network.name()}")
# `dank_mids.eth.get_code` wrapped with `eth_retry.auto_retry`; awaited by the
# async code paths in this module.
get_code = eth_retry.auto_retry(dank_mids.eth.get_code)


@memory.cache
@eth_retry.auto_retry
def _get_code(address: str, block: int) -> HexBytes:
    """
    Return the code stored at `address` as of `block` using the synchronous web3 client.

    Wrapped in `memory.cache` and `eth_retry.auto_retry`; used by the
    synchronous :func:`contract_creation_block` binary search.
    """
    return web3.eth.get_code(address, block)


# NOTE(review): not referenced in the visible part of this module; presumably
# used elsewhere to bound concurrent creation-block lookups — confirm.
creation_block_semaphore = a_sync.ThreadsafeSemaphore(10)
[docs]
@a_sync.a_sync(cache_type="memory")
@stuck_coro_debugger
@eth_retry.auto_retry
async def contract_creation_block_async(
    address: AnyAddressType, when_no_history_return_0: bool = False
) -> int:
    """
    Find the block at which a contract was deployed, asynchronously, via binary search.

    A deploy block already stored through `y._db.utils.contract` is returned
    directly when it is truthy; otherwise the chain is bisected on "does
    `address` have code at this block?" and the result is stored.

    NOTE Requires access to historical state. Doesn't account for CREATE2 or SELFDESTRUCT.

    Args:
        address: The address of the contract.
        when_no_history_return_0: If True, return 0 when the contract already
            had code at the first block above the oldest block the node could
            not serve. Default False.

    Returns:
        The block number at which the contract was created.

    Raises:
        exceptions.NodeNotSynced: If the node is not fully synced.
        ValueError: If the contract creation block cannot be determined.
    """
    from y._db.utils import contract as db

    address = convert.to_address(address)

    known_block = await db.get_deploy_block(address)
    if known_block:
        return known_block

    logger.debug(f"contract creation block {address}")
    height = await dank_mids.eth.block_number

    if height == 0:
        raise exceptions.NodeNotSynced(_NOT_SYNCED)

    await check_node_async()

    low, high = 0, height
    barrier = 0
    warned = False
    while high - low > 1:
        pivot = low + (high - low) // 2
        # TODO rewrite this so we can get deploy blocks for some contracts deployed on correct side of barrier
        try:
            code = await get_code(address, pivot)
        except ValueError as e:
            err = str(e)
            if "missing trie node" in err:
                notice = "missing trie node, `contract_creation_block` may output a higher block than actual. Please try again using an archive node."
            elif (
                "Server error: account aurora does not exist while viewing" in err
                or "No state available for block" in err
            ):
                notice = err
            else:
                raise
            # Only the first state-availability failure is logged.
            if not warned:
                logger.warning(notice)
            warned = True
            # No state at `pivot`: treat it as the barrier and search above it.
            barrier = pivot
            low = pivot
        else:
            if code:
                high = pivot
            else:
                low = pivot

    deployed_before_barrier = high == low + 1 and low == barrier
    if deployed_before_barrier and when_no_history_return_0:
        logger.warning(
            f"could not determine creation block for {address} on {Network.name()} (deployed prior to barrier)"
        )
        logger.debug(f"contract creation block {address} -> 0")
        return 0
    if high != height:
        logger.debug(f"contract creation block {address} -> {high}")
        db.set_deploy_block(address, high)
        return high
    raise ValueError(f"Unable to find deploy block for {address} on {Network.name()}")
# this defaultdict prevents congestion in the contracts thread pool:
# every key lazily gets its own `ThreadsafeSemaphore(1)` on first access.
# NOTE(review): keys are presumably contract addresses — confirm against callers.
address_semaphores = defaultdict(lambda: a_sync.ThreadsafeSemaphore(1))
[docs]
class ContractEvents(ContractEvents):
    """
    Subclass of brownie's ``ContractEvents`` (imported above under the same name).

    It adds no behavior: `__getattr__` simply delegates to the parent class.
    The override exists so attribute access on the container is annotated as
    returning :class:`~y.utils.events.Events`, which is what
    :func:`_setup_events` stores on it.
    """

    def __getattr__(self, name: str) -> Events:
        """Look up `name` on the parent container."""
        return super().__getattr__(name)
[docs]
class Contract(dank_mids.Contract, metaclass=ChecksumAddressSingletonMeta):
    """
    Though it may look complicated, a ypricemagic Contract object is simply a brownie Contract object with a few modifications:

    1. Contracts will not be compiled. This allows you to more quickly fetch contracts from the block explorer and prevents you from having to download and install compilers.

        NOTE: You must set `autofetch_sources=false` in your project's brownie-config.yaml for this to work correctly.

    2. To each contract method, a `coroutine` property has been defined which allows you to make asynchronous calls which are intelligently batched in the background by :mod:`dank_mids` to reduce overhead.

        Example:
            >>> contract = Contract("0xAddress")
            >>> contract.methodName(*args, block_identifier=12345678)
            1000000000000000000
            >>> coro = contract.methodName.coroutine(*args, block_identifier=12345678)
            >>> coro
            <coroutine coroutine object at 0x12345678>
            >>> contract.methodName(*args, block_identifier=12345678) == await coro
            True

    3. New methods:

        - :meth:`~has_method`
        - :meth:`~has_methods`
        - :meth:`~build_name`
        - :meth:`~get_code`

    4. A few attributes were removed in order to minimize the size of a Contract object in memory:

        - :attr:`~ast`
        - :attr:`~bytecode`
        - :attr:`~coverageMap`
        - :attr:`~deployedBytecode`
        - :attr:`~deployedSourceMap`
        - :attr:`~natspec`
        - :attr:`~opcodes`
        - :attr:`~pcMap`
    """

    # the default state for Contract objects
    verified = True
    """True if the contract is verified on this network's block explorer. False otherwise."""

    events: ContractEvents
    """
    A container for various event types associated with this contract.
    Provides a convenient way to query contract events with minimal code.
    """

    # Singleton cache maintained by the metaclass, keyed by address.
    _ChecksumAddressSingletonMeta__instances: ChecksumAddressDict["Contract"]

    # "disabled": never evict from the singleton cache.
    # int: a ttl (seconds) recorded while no event loop was running.
    # TimerHandle: the scheduled eviction callback.
    _ttl_cache_popper: Union[Literal["disabled"], int, asyncio.TimerHandle]

    @eth_retry.auto_retry
    def __init__(
        self,
        address: AnyAddressType,
        owner: Optional[AccountsType] = None,
        require_success: bool = True,
        cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL,  # units: seconds
    ) -> None:
        """
        Load the contract at `address`.

        The contract is first looked up in brownie's local deployment db. If it
        is unknown there, the ABI is resolved with :func:`_resolve_proxy` and
        the contract is initialized from that ABI and persisted.

        Args:
            address: The address of the contract.
            owner (optional): The owner of the contract. Default None.
            require_success: If True, raise when the contract is not found or
                not verified. If False, a placeholder object is kept instead
                with `verified` set to False (not verified) or None (not found).
            cache_ttl (optional): Seconds to keep the object in the singleton
                cache; None disables eviction.

        Raises:
            ContractNotFound: For the zero address and the `0xeeee...` placeholder,
                or when the contract cannot be found and `require_success` is True.
            exceptions.ContractNotVerified: When the contract is not verified
                and `require_success` is True.
            CompilerError: When brownie attempted to compile sources
                (`autofetch_sources` must be False).
            exceptions.InvalidAPIKeyError: When the explorer rejects the API key.
        """
        address = str(address)
        if address.lower() in [
            "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
            ZERO_ADDRESS,
        ]:
            raise ContractNotFound(f"{address} is not a contract.")

        # Skip the explorer round trip for addresses already known to be unverified.
        if require_success and address in _unverified:
            raise exceptions.ContractNotVerified(address)

        compiler_error_msg = "y.Contract objects work best when we bypass compilers. In this case, it will *only* work when we bypass. Please ensure autofetch_sources=False in your brownie-config.yaml and rerun your script."

        with _contract_lock:
            # autofetch-sources: false
            # Try to fetch the contract from the local sqlite db.
            try:
                super().__init__(address, owner=owner)
                if not isinstance(self.verified, bool) and self.verified is not None:
                    logger.warning(
                        f'`Contract("{address}").verified` property will not be usable due to the contract having a `verified` method in its ABI.'
                    )
            except AssertionError:
                raise CompilerError(compiler_error_msg) from None
            except IndexError as e:
                if str(e) == "pop from an empty deque":
                    raise CompilerError(compiler_error_msg) from None
                raise
            except ValueError as e:
                if not str(e).startswith("Unknown contract address: "):
                    raise
                logger.debug("%s", e)
                # Not in the local db: resolve the ABI from the explorer instead.
                try:
                    name, abi = _resolve_proxy(address)
                    build = {
                        "abi": abi,
                        "address": address,
                        "contractName": name,
                        "type": "contract",
                    }
                    self.__init_from_abi__(build, owner=owner, persist=True)
                except exceptions.InvalidAPIKeyError:
                    # re-raise with a cleaner traceback
                    raise exceptions.InvalidAPIKeyError from None
                except (ContractNotFound, exceptions.ContractNotVerified) as exc:
                    not_verified = isinstance(exc, exceptions.ContractNotVerified)
                    if not_verified:
                        _unverified.add(address)
                    if require_success:
                        raise
                    # Keep a placeholder object so callers can inspect `.verified`.
                    try:
                        if not_verified:
                            self.verified = False
                            self._build = {"contractName": "Non-Verified Contract"}
                        else:
                            self.verified = None
                            self._build = {"contractName": "Broken Contract"}
                    except AttributeError:
                        logger.warning(
                            f'`Contract("{address}").verified` property will not be usable due to the contract having a `verified` method in its ABI.'
                        )

        # NOTE(review): the steps below are assumed to run after `_contract_lock`
        # is released — confirm against the upstream file's indentation.

        # Patch the Contract with coroutines for each method.
        dank_mids.patch_contract(self)
        if self.verified:
            # Init an event container for each topic
            _setup_events(self)
        # Get rid of unnecessary memory-hog properties
        _squeeze(self)
        self._schedule_cache_pop(cache_ttl)

    @classmethod
    @a_sync.a_sync
    def from_abi(
        cls,
        name: str,
        address: str,
        abi: List,
        owner: Optional[AccountsType] = None,
        persist: bool = True,
        cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL,  # units: seconds
    ) -> Self:
        """
        Create a Contract instance from an ABI.

        Args:
            name: The name of the contract.
            address: The address of the contract.
            abi: The ABI of the contract.
            owner (optional): The owner of the contract. Default None.
            persist (optional): If True, persist the contract in brownie's local contract database. Default True.
            cache_ttl (optional): The time-to-live for the contract cache in seconds. Default set in :mod:`~y.ENVIRONMENT_VARIABLES`.

        Returns:
            A Contract instance for the given ABI.
        """
        # Bypass `__init__` so no db/explorer lookup is attempted.
        self = cls.__new__(cls)
        build = {
            "abi": abi,
            "address": _resolve_address(address),
            "contractName": name,
            "type": "contract",
        }
        self.__init_from_abi__(build, owner, persist)
        # Patch the Contract with coroutines for each method.
        dank_mids.patch_contract(self)
        # Init an event container for each topic
        _setup_events(self)
        # Get rid of unnecessary memory-hog properties
        _squeeze(self)
        # schedule call to pop from cache
        self._schedule_cache_pop(cache_ttl)
        return self

    @classmethod
    async def coroutine(
        cls,
        address: AnyAddressType,
        require_success: bool = True,
        cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL,  # units: seconds
    ) -> Self:
        """
        Asynchronously get the Contract for `address`.

        An instance already present in the singleton cache is returned
        immediately; otherwise the request is submitted to `_contract_queue`.

        Args:
            address: The address of the contract.
            require_success: If True, raise an exception if the contract cannot be initialized.
            cache_ttl: The time-to-live for the contract cache in seconds.

        Returns:
            A Contract instance for the given address.

        Raises:
            ContractNotFound: For the zero address and the `0xeeee...` placeholder,
                or when raised while building the contract.
        """
        address = str(address)
        if contract := cls._ChecksumAddressSingletonMeta__instances.get(address, None):
            return contract

        # dict lookups faster than string comparisons, keep this behind the singleton check
        if address.lower() in [
            "0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
            ZERO_ADDRESS,
        ]:
            raise ContractNotFound(f"{address} is not a contract.") from None

        try:
            # We do this so we don't clog the threadpool with multiple jobs for the same contract.
            return await _contract_queue(
                address, require_success=require_success, cache_ttl=cache_ttl
            )
        except (ContractNotFound, exceptions._ExplorerError, CompilerError) as e:
            # re-raise with nicer traceback
            raise type(e)(*e.args) from None

    @classmethod
    @stuck_coro_debugger
    async def _coroutine(
        cls,
        address: AnyAddressType,
        require_success: bool = True,
        cache_ttl: Optional[int] = ENVS.CONTRACT_CACHE_TTL,  # units: seconds
    ) -> Self:
        """
        Internal method to create a Contract instance asynchronously.

        The blocking constructor runs in `contract_threads`. Afterwards the
        eviction timer is reconciled with `cache_ttl`: a ttl of None disables
        eviction, and an existing timer is only ever pushed later, never earlier.

        Args:
            address: The address of the contract.
            require_success: If True, raise an exception if the contract cannot be initialized.
            cache_ttl: The time-to-live for the contract cache in seconds.

        Returns:
            A Contract instance for the given address.
        """
        contract = await contract_threads.run(
            cls, address, require_success=require_success
        )
        loop = asyncio.get_running_loop()
        instances = cls._ChecksumAddressSingletonMeta__instances

        if not contract.verified or contract._ttl_cache_popper == "disabled":
            pass

        elif cache_ttl is None:
            # The caller wants the contract cached forever.
            if isinstance(contract._ttl_cache_popper, asyncio.TimerHandle):
                contract._ttl_cache_popper.cancel()
            contract._ttl_cache_popper = "disabled"

        elif isinstance(contract._ttl_cache_popper, int):
            # The constructor ran without an event loop and only recorded a ttl.
            cache_ttl = max(contract._ttl_cache_popper, cache_ttl)
            contract._ttl_cache_popper = loop.call_later(
                cache_ttl,
                instances.pop,
                contract.address,
                None,
            )

        elif loop.time() + cache_ttl > contract._ttl_cache_popper.when():
            # Extend the existing timer.
            contract._ttl_cache_popper.cancel()
            contract._ttl_cache_popper = loop.call_later(
                cache_ttl,
                instances.pop,
                contract.address,
                None,
            )
        return contract

    def __init_from_abi__(
        self, build: Dict, owner: Optional[AccountsType] = None, persist: bool = True
    ) -> Self:
        """
        Initialize a Contract instance from an ABI.

        Args:
            build: The build information for the contract.
            owner (optional): The owner of the contract. Default None.
            persist (optional): If True, persist the contract in the local database. Default True.

        Returns:
            The initialized Contract instance.
        """
        _ContractBase.__init__(self, None, build, {})  # type: ignore
        _DeployedContractBase.__init__(self, build["address"], owner, None)
        if persist:
            _add_deployment(self)
        try:
            self.verified = True
        except AttributeError:
            logger.warning(
                f'`Contract("{self.address}").verified` property will not be usable due to the contract having a `verified` method in its ABI.'
            )
        return self

    def has_method(
        self, method: str, return_response: bool = False
    ) -> Union[bool, Any]:
        """
        Check if the contract has a specific method.

        This is a plain `def` that forwards to the module-level
        :func:`has_method` with `sync=False`.
        NOTE(review): the value returned is presumably an awaitable that must
        be awaited by the caller — confirm against `a_sync` semantics.

        Args:
            method: The name of the method to check for.
            return_response (optional): If True, return the response of the method call instead of a boolean. Default False.

        Returns:
            A boolean indicating whether the contract has the method, or the response of the method call if return_response is True.
        """
        return has_method(
            self.address, method, return_response=return_response, sync=False
        )

    async def has_methods(self, methods: List[str], _func: Callable = all) -> bool:
        """
        Check if the contract has the specified methods.

        Args:
            methods: A list of method names to check for.
            _func (optional): The function to use for combining the results (either :func:`all` or :func:`any`). Default :func:`all`.

        Returns:
            With `_func=all`, whether the contract has every method; with
            `_func=any`, whether it has at least one of them.
        """
        return await has_methods(self.address, methods, _func, sync=False)

    async def build_name(self, return_None_on_failure: bool = False) -> Optional[str]:
        """
        Get the build name of the contract.

        Args:
            return_None_on_failure (optional): If True, return None if the build name cannot be determined instead of raising an Exception. Default False.

        Returns:
            The build name of the contract, or None if the build name cannot be determined and return_None_on_failure is True.
        """
        return await build_name(
            self.address, return_None_on_failure=return_None_on_failure, sync=False
        )

    async def get_code(self, block: Optional[Block] = None) -> HexBytes:
        """
        Get the bytecode of the contract at a specific block.

        Args:
            block (optional): The block number at which to get the bytecode. Defaults to latest block.

        Returns:
            The bytecode of the contract at the specified block.
        """
        return await get_code(self.address, block=block)

    def _schedule_cache_pop(self, cache_ttl: Optional[int]) -> None:
        """
        Arrange for this instance to be evicted from the singleton cache after `cache_ttl` seconds.

        Args:
            cache_ttl: Seconds until eviction. None disables eviction.
        """
        if cache_ttl is None:
            self._ttl_cache_popper = "disabled"
            return

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # If the event loop isn't running yet we can just specify the cache_ttl for later use
            self._ttl_cache_popper = cache_ttl
            return

        cache = self._ChecksumAddressSingletonMeta__instances
        self._ttl_cache_popper = loop.call_later(
            cache_ttl,
            cache.pop,
            self.address,
            None,
        )
# Queue awaited by `Contract.coroutine`; requests are processed by
# `Contract._coroutine` using 32 workers.
_contract_queue = a_sync.SmartProcessingQueue(Contract._coroutine, num_workers=32)
@memory.cache()
# TODO: async this and put it into ydb for quicker startups
# yLazyLogger(logger)
def is_contract(address: AnyAddressType) -> bool:
    """
    Tell whether `address` currently holds contract code.

    Returns `False` if:
    - The address is not and has never been a contract
    - The address used to be a contract but has self-destructed

    Args:
        address: The address to check.

    Returns:
        True if the address is a contract, False otherwise.
    """
    target = convert.to_address(address)
    code = web3.eth.get_code(target)
    # Both spellings of "no code" are treated as "not a contract".
    return code not in ("0x", b"")
[docs]
@a_sync.a_sync(default="sync", cache_type="memory")
async def has_method(
    address: Address, method: str, return_response: bool = False
) -> Union[bool, Any]:
    """
    Checks to see if a contract has a `method` view method with no inputs.

    The method is probed with a single :class:`multicall.Call`. A `None`
    response, a contract logic error, a reverted call or an
    "invalid jump destination" error all mean the method is not there.

    Args:
        address: The address of the contract.
        method: The name of the method to check for.
        return_response: If True, return the response of the method call instead of a boolean. Default False.

    Returns:
        A boolean indicating whether the contract has the method, or the response of the method call if return_response is True.
    """
    address = convert.to_address(address)
    try:
        response = await Call(address, [method])
    except Exception as e:
        method_is_missing = (
            isinstance(e, ContractLogicError)
            or exceptions.call_reverted(e)
            or "invalid jump destination" in str(e)
        )
        if not method_is_missing:
            raise
        return False

    if response is None:
        return False
    if return_response:
        return response
    return True
[docs]
@stuck_coro_debugger
@a_sync.a_sync(default="sync", cache_type="memory", ram_cache_ttl=15 * 60)
async def has_methods(
    address: AnyAddressType,
    methods: Tuple[str, ...],
    _func: Callable = all,  # Union[any, all]
) -> bool:
    """
    Checks to see if a contract has each view method (with no inputs) in `methods`.

    Pass `_func=any` to only verify a contract has at least one of the methods.

    Args:
        address: The address of the contract.
        methods: A tuple of method names to check for.
        _func: The function to use for combining the results (either :func:`all` or :func:`any`).

    Returns:
        With `_func=all`, whether the contract has every method; with
        `_func=any`, whether it has at least one of them.
    """
    assert _func in [all, any], "`_func` must be either `any` or `all`"

    address = convert.to_address(address)
    try:
        return _func(
            [result is not None for result in await gather_methods(address, methods)]
        )
    except Exception as e:
        if not isinstance(e, ContractLogicError) and not exceptions.call_reverted(e):
            raise  # and not out_of_gas(e): raise
        # Out of gas error implies one or more method is state-changing.
        # If `_func == all` we return False because `has_methods` is only supposed to work for public view methods with no inputs
        # If `_func == any` maybe one of the methods will work without "out of gas" error
        if _func is all:
            return False
        # Probe each method on its own so one failing call cannot mask the rest.
        return any(
            await asyncio.gather(
                *[has_method(address, method, sync=False) for method in methods]
            )
        )
# yLazyLogger(logger)
[docs]
@stuck_coro_debugger
async def probe(
    address: AnyAddressType,
    methods: List[str],
    block: Optional[Block] = None,
    return_method: bool = False,
) -> Any:
    """
    Call every method in `methods` on `address` and return the single usable response.

    Responses that are exceptions or `None` are discarded. Two surviving
    responses are tolerated only when they are equal, in which case the
    method is reported as a tuple of both method names.

    Args:
        address: The address of the contract.
        methods: The method signatures to try.
        block (optional): The block at which to make the calls.
        return_method: If True, return `(method, result)` instead of just `result`.

    Returns:
        The response (or `(method, result)`); `None` values when no method responded.

    Raises:
        AssertionError: If more than one method responded and the responses
            cannot be reconciled.
    """
    address = convert.to_address(address)
    responses = await gather_methods(
        address, methods, block=block, return_exceptions=True
    )
    logger.debug("probe results: %s", responses)

    results = [
        (method, response)
        for method, response in zip(methods, responses)
        if response is not None and not isinstance(response, Exception)
    ]

    if len(results) > 1:
        logger.debug("multiple results: %s", results)
        if len(results) != 2 or results[0][1] != results[1][1]:
            raise AssertionError(
                f"`probe` returned multiple results for {address}: {results}. Must debug"
            )
        # Two methods agreed: collapse them into a single entry.
        (first_method, shared_result), (second_method, _) = results
        results = [((first_method, second_method), shared_result)]

    logger.debug("final results: %s", results)

    if len(results) == 1:
        method, result = results[0]
    else:
        method, result = None, None

    if method:
        assert result is not None
    return (method, result) if return_method else result
[docs]
@a_sync.a_sync(default="sync")
@stuck_coro_debugger
async def build_name(
    address: AnyAddressType, return_None_on_failure: bool = False
) -> Optional[str]:
    """
    Get the build name of a contract.

    Args:
        address: The address of the contract.
        return_None_on_failure (optional): If True, return None when the
            contract is not verified instead of raising. Default False.

    Returns:
        The `contractName` from the contract's build, or None if the contract
        is not verified and return_None_on_failure is True.

    Raises:
        exceptions.ContractNotVerified: If the contract is not verified and
            return_None_on_failure is False.
    """
    try:
        contract = await Contract.coroutine(address)
        # Read `_build` straight from the instance `__dict__`.
        return contract.__dict__["_build"]["contractName"]
    except exceptions.ContractNotVerified:
        if not return_None_on_failure:
            raise
        return None
[docs]
async def proxy_implementation(
    address: AnyAddressType, block: Optional[Block] = None
) -> Address:
    """
    Get the implementation address for a proxy contract.

    The address is read by probing `implementation()` and `target()` on the
    proxy with :func:`probe`.

    Args:
        address: The address of the proxy contract.
        block (optional): The block number at which to get the implementation
            address. Defaults to None, which is passed through to :func:`probe`.

    Returns:
        The address of the implementation contract, or None if neither method responded.
    """
    return await probe(
        address, ["implementation()(address)", "target()(address)"], block
    )
def _squeeze(contract: Contract) -> Contract:
    """
    Shrink a contract's memory footprint by blanking the bulky entries of its build.

    Each of the listed keys that is present in `contract._build` is replaced
    with an empty dict; keys that are absent are not added.

    Args:
        contract: The contract object to squeeze.

    Returns:
        The same contract object, squeezed in place.
    """
    bulky_keys = (
        "ast",
        "bytecode",
        "coverageMap",
        "deployedBytecode",
        "deployedSourceMap",
        "natspec",
        "opcodes",
        "pcMap",
    )
    for key in bulky_keys:
        build = contract._build
        if build and key in build:
            build[key] = {}
    return contract
@eth_retry.auto_retry
def _extract_abi_data(address: Address):
    """
    Extract ABI data for a contract from the blockchain explorer.

    Explorer failures are translated into this package's exceptions so
    callers can tell "not verified" apart from "not a contract".

    Args:
        address: The address of the contract.

    Returns:
        A tuple containing the contract name, ABI, and implementation address (if applicable).

    Raises:
        ValueError: When Blockscout (xDai) answers with its generic failure
            payload, or for any explorer error that is not recognized below.
        exceptions.ContractNotVerified: When the explorer has no verified source for `address`.
        ContractNotFound: When there is no code at `address`.
        exceptions.InvalidAPIKeyError: When the explorer rejects the API key.
        ConnectionError: For connection errors that are not the generic failure payload.
    """
    try:
        data = _fetch_from_explorer(address, "getsourcecode", False)["result"][0]
    except ConnectionError as e:
        # Generic explorer failure payload: work out what it means for this address.
        if '{"message":"Something went wrong.","result":null,"status":"0"}' in str(e):
            if chain.id == Network.xDai:
                raise ValueError("Rate limited by Blockscout. Please try again.") from e
            # Code on chain means the contract exists but is not verified.
            if web3.eth.get_code(address):
                raise exceptions.ContractNotVerified(address) from e
            else:
                raise ContractNotFound(address) from e
        raise
    except ValueError as e:
        if (
            str(e).startswith("Failed to retrieve data from API")
            and "invalid api key" in str(e).lower()
        ):
            raise exceptions.InvalidAPIKeyError from e
        if exceptions.contract_not_verified(e):
            raise exceptions.ContractNotVerified(
                f"{address} on {Network.printable()}"
            ) from e
        elif "Unknown contract address:" in str(e):
            # Distinguish "exists but unverified" from "no contract here".
            exc_type = (
                exceptions.ContractNotVerified
                if is_contract(address)
                else ContractNotFound
            )
            raise exc_type(str(e)) from e
        else:
            raise

    # An empty or missing "SourceCode" field means the contract is not verified.
    is_verified = bool(data.get("SourceCode"))
    if not is_verified:
        raise exceptions.ContractNotVerified(
            f"Contract source code not verified: {address}"
        ) from None
    name = data["ContractName"]
    abi = json.loads(data["ABI"])
    implementation = data.get("Implementation")
    return name, abi, implementation
def _resolve_proxy(address) -> Tuple[str, List]:
    """
    Resolve the implementation address for a proxy contract.

    Resolution order:
    1. A hard-coded entry in `FORCE_IMPLEMENTATION` (proxy and implementation ABIs are merged).
    2. The EIP-1967 implementation storage slot.
    3. The EIP-1822 ("PROXIABLE") storage slot.
    4. If the explorer reported an implementation: the contract's own
       `implementation()` call, falling back to the explorer-provided address.

    If no implementation is found the contract's own name and ABI are returned.

    Args:
        address: The address of the proxy contract.

    Returns:
        A tuple containing the contract name and ABI.
    """
    address = convert.to_address(address)
    name, abi, implementation = _extract_abi_data(address)
    as_proxy_for = None

    if address in FORCE_IMPLEMENTATION:
        implementation = FORCE_IMPLEMENTATION[address]
        name, implementation_abi, _ = _extract_abi_data(implementation)
        # Here we merge the proxy ABI with the implementation ABI
        # without doing this, we'd only get the implementation
        # and would lack any valid methods/events from the proxy itself.
        # Credit: Wavey@Yearn
        abi += implementation_abi
        return name, abi

    # always check for an EIP1967 proxy - https://eips.ethereum.org/EIPS/eip-1967
    implementation_eip1967 = web3.eth.get_storage_at(
        address, int(web3.keccak(text="eip1967.proxy.implementation").hex(), 16) - 1
    )
    # always check for an EIP1822 proxy - https://eips.ethereum.org/EIPS/eip-1822
    implementation_eip1822 = web3.eth.get_storage_at(
        address, web3.keccak(text="PROXIABLE")
    )

    # Just leave this code where it is for a helpful debugger as needed.
    # (Replace "" with an address of interest to dump the raw slot values.)
    if address == "":
        raise Exception(
            f"""implementation: {implementation}
implementation_eip1967: {len(implementation_eip1967)} {implementation_eip1967}
implementation_eip1822: {len(implementation_eip1822)} {implementation_eip1822}"""
        )

    # A non-zero slot value holds the implementation address in its last 20 bytes.
    if len(implementation_eip1967) > 0 and int(implementation_eip1967.hex(), 16):
        as_proxy_for = _resolve_address(implementation_eip1967[-20:])
    elif len(implementation_eip1822) > 0 and int(implementation_eip1822.hex(), 16):
        as_proxy_for = _resolve_address(implementation_eip1822[-20:])
    elif implementation:
        # for other proxy patterns, we only check if etherscan indicates
        # the contract is a proxy. otherwise we could have a false positive
        # if there is an `implementation` method on a regular contract.
        try:
            # first try to call `implementation` per EIP897
            # https://eips.ethereum.org/EIPS/eip-897
            c = Contract.from_abi(name, address, abi)
            as_proxy_for = c.implementation.call()
        except Exception:
            # if that fails, fall back to the address provided by etherscan
            as_proxy_for = _resolve_address(implementation)

    if as_proxy_for:
        # Use the implementation's name and ABI in place of the proxy's.
        name, abi, _ = _extract_abi_data(as_proxy_for)
    return name, abi
def _setup_events(contract: Contract) -> None:
    """
    Attach an `Events` container for every topic of a newly created `y.Contract` object.

    An events container is created on the contract first if it does not have
    one yet. Each entry of `contract.topics` then becomes an attribute on it.

    Args:
        contract: The contract object to set up events for.
    """
    has_container = hasattr(contract, "events")
    if not has_container:
        contract.events = ContractEvents(contract)

    for event_name, topic in contract.topics.items():
        event_container = Events(addresses=[contract.address], topics=[[topic]])
        setattr(contract.events, event_name, event_container)
# Distinct string type used to annotate the entries of `_unverified`.
_Address = NewType("_Address", str)

# Filled by `Contract.__init__` when the explorer reports a contract as not
# verified, and checked there before any new lookup is attempted.
_unverified: Set[_Address] = set()
"""A collection of unverified addresses that is used to prevent repetitive etherscan api calls"""

# Error message used by the creation-block helpers when the chain height is 0.
_NOT_SYNCED = "`chain.height` returns 0 on your node, which means it is not fully synced.\nYou can only use this function on a fully synced node."