Source code for brownie.network.contract

#!/usr/bin/python3

import asyncio
import io
import json
import os
import re
import time
import warnings
from pathlib import Path
from textwrap import TextWrapper
from threading import get_ident  # noqa
from typing import (
    Any,
    Callable,
    Coroutine,
    Dict,
    Iterator,
    List,
    Match,
    Optional,
    Set,
    Tuple,
    Union,
)
from urllib.parse import urlparse

import eth_abi
import requests
import solcx
from eth_utils import combomethod, remove_0x_prefix
from hexbytes import HexBytes
from semantic_version import Version
from vvm import get_installable_vyper_versions
from vvm.utils.convert import to_vyper_version
from web3._utils import filters
from web3.datastructures import AttributeDict
from web3.types import LogReceipt

from brownie._config import BROWNIE_FOLDER, CONFIG, REQUEST_HEADERS, _load_project_compiler_config
from brownie.convert.datatypes import Wei
from brownie.convert.normalize import format_input, format_output
from brownie.convert.utils import (
    build_function_selector,
    build_function_signature,
    get_type_strings,
)
from brownie.exceptions import (
    BrownieCompilerWarning,
    BrownieEnvironmentWarning,
    ContractExists,
    ContractNotFound,
    UndeployedLibrary,
    VirtualMachineError,
    decode_typed_error,
    parse_errors_from_abi,
)
from brownie.project import compiler
from brownie.project.flattener import Flattener
from brownie.typing import AccountsType, TransactionReceiptType
from brownie.utils import color

from . import accounts, chain
from .event import _add_deployment_topics, _get_topics, event_watcher
from .state import (
    _add_contract,
    _add_deployment,
    _find_contract,
    _get_deployment,
    _remove_contract,
    _remove_deployment,
    _revert_register,
)
from .web3 import ContractEvent, _ContractEvents, _resolve_address, web3

_unverified_addresses: Set = set()

_explorer_tokens = {
    "optimistic": "OPTIMISMSCAN_TOKEN",
    "etherscan": "ETHERSCAN_TOKEN",
    "bscscan": "BSCSCAN_TOKEN",
    "zkevm": "ZKEVMSCAN_TOKEN",
    "polygonscan": "POLYGONSCAN_TOKEN",
    "ftmscan": "FTMSCAN_TOKEN",
    "arbiscan": "ARBISCAN_TOKEN",
    "snowtrace": "SNOWTRACE_TOKEN",
    "aurorascan": "AURORASCAN_TOKEN",
    "moonscan": "MOONSCAN_TOKEN",
    "gnosisscan": "GNOSISSCAN_TOKEN",
    "base": "BASESCAN_TOKEN",
    "blast": "BLASTSCAN_TOKEN",
}


class _ContractBase:
    _dir_color = "bright magenta"

    def __init__(self, project: Any, build: Dict, sources: Dict) -> None:
        self._project = project
        self._build = build.copy()
        self._sources = sources
        self.topics = _get_topics(self.abi)
        self.selectors = {
            build_function_selector(i): i["name"] for i in self.abi if i["type"] == "function"
        }
        # this isn't fully accurate because of overloaded methods - will be removed in `v2.0.0`
        self.signatures = {
            i["name"]: build_function_selector(i) for i in self.abi if i["type"] == "function"
        }
        parse_errors_from_abi(self.abi)

    @property
    def abi(self) -> List:
        return self._build["abi"]

    @property
    def _name(self) -> str:
        return self._build["contractName"]

    def info(self) -> None:
        """
        Display NatSpec documentation for this contract.
        """
        if self._build.get("natspec"):
            _print_natspec(self._build["natspec"])

    def get_method(self, calldata: str) -> Optional[str]:
        sig = calldata[:10].lower()
        return self.selectors.get(sig)

    def decode_input(self, calldata: Union[str, bytes]) -> Tuple[str, Any]:
        """
        Decode input calldata for this contract.

        Arguments
        ---------
        calldata : str | bytes
            Calldata for a call to this contract

        Returns
        -------
        str
            Signature of the function that was called
        Any
            Decoded input arguments
        """
        if not isinstance(calldata, HexBytes):
            calldata = HexBytes(calldata)

        fn_selector = calldata[:4].hex()  # type: ignore
        abi = next(
            (
                i
                for i in self.abi
                if i["type"] == "function" and build_function_selector(i) == fn_selector
            ),
            None,
        )
        if abi is None:
            raise ValueError("Four byte selector does not match the ABI for this contract")

        function_sig = build_function_signature(abi)

        types_list = get_type_strings(abi["inputs"])
        result = eth_abi.decode(types_list, calldata[4:])
        input_args = format_input(abi, result)

        return function_sig, input_args


class ContractContainer(_ContractBase):
    """List-like container class that holds all Contract instances of the same
    type, and is used to deploy new instances of that contract.

    Attributes:
        abi: Complete contract ABI.
        bytecode: Bytecode used to deploy the contract.
        signatures: Dictionary of {'function name': "bytes4 signature"}
        topics: Dictionary of {'event name': "bytes32 topic"}"""

    def __init__(self, project: Any, build: Dict) -> None:
        self.tx = None
        self.bytecode = build["bytecode"]
        self._contracts: List["ProjectContract"] = []
        super().__init__(project, build, project._sources)
        self.deploy = ContractConstructor(self, self._name)
        _revert_register(self)

        # messes with tests if it is created on init
        # instead we create when it's requested, but still define it here
        self._flattener: Flattener = None  # type: ignore

    def __iter__(self) -> Iterator:
        return iter(self._contracts)

    def __getitem__(self, i: Any) -> "ProjectContract":
        return self._contracts[i]

    def __delitem__(self, key: Any) -> None:
        item = self._contracts[key]
        self.remove(item)

    def __len__(self) -> int:
        return len(self._contracts)

    def __repr__(self) -> str:
        if CONFIG.argv["cli"] == "console":
            return str(self._contracts)
        return super().__repr__()

    def _reset(self) -> None:
        for contract in self._contracts:
            _remove_contract(contract)
            contract._reverted = True
        self._contracts.clear()

    def _revert(self, height: int) -> None:
        reverted = [
            i
            for i in self._contracts
            if (i.tx and i.tx.block_number is not None and i.tx.block_number > height)
            or len(web3.eth.get_code(i.address).hex()) <= 4
        ]
        for contract in reverted:
            self.remove(contract)
            contract._reverted = True

    def remove(self, contract: "ProjectContract") -> None:
        """Removes a contract from the container.

        Args:
            contract: Contract instance of address string of the contract."""
        if contract not in self._contracts:
            raise TypeError("Object is not in container.")
        self._contracts.remove(contract)
        contract._delete_deployment()
        _remove_contract(contract)

    def at(
        self,
        address: str,
        owner: Optional[AccountsType] = None,
        tx: Optional[TransactionReceiptType] = None,
        persist: bool = True,
    ) -> "ProjectContract":
        """Returns a contract address.

        Raises ValueError if no bytecode exists at the address.

        Args:
            address: Address string of the contract.
            owner: Default Account instance to send contract transactions from.
            tx: Transaction ID of the contract creation."""
        address = _resolve_address(address)
        contract = _find_contract(address)
        if isinstance(contract, ProjectContract):
            if contract._name == self._name and contract._project == self._project:
                return contract
            raise ContractExists(
                f"'{contract._name}' declared at {address} in project '{contract._project._name}'"
            )

        build = self._build
        contract = ProjectContract(self._project, build, address, owner, tx)
        if not _verify_deployed_code(address, build["deployedBytecode"], build["language"]):
            # prevent trace attempts when the bytecode doesn't match
            del contract._build["pcMap"]

        contract._save_deployment()
        _add_contract(contract)
        self._contracts.append(contract)
        if CONFIG.network_type == "live":
            if persist:
                _add_deployment(contract)

        return contract

    def _add_from_tx(self, tx: TransactionReceiptType) -> None:
        tx._confirmed.wait()
        if tx.status and tx.contract_address is not None:
            try:
                self.at(tx.contract_address, tx.sender, tx)
            except ContractNotFound:
                # if the contract self-destructed during deployment
                pass

    def get_verification_info(self) -> Dict:
        """
        Return a dict with flattened source code for this contract
        and further information needed for verification
        """
        language = self._build["language"]
        if language == "Vyper":
            raise TypeError(
                "Etherscan does not support API verification of source code "
                "for vyper contracts. You need to verify the source manually"
            )
        elif language == "Solidity":
            if self._flattener is None:
                source_fp = (
                    Path(self._project._path)
                    .joinpath(self._build["sourcePath"])
                    .resolve()
                    .as_posix()
                )
                config = self._project._compiler_config
                remaps = dict(
                    map(
                        lambda s: s.split("=", 1),
                        compiler._get_solc_remappings(config["solc"]["remappings"]),
                    )
                )
                libs = {lib.strip("_") for lib in re.findall("_{1,}[^_]*_{1,}", self.bytecode)}
                compiler_settings = {
                    "evmVersion": self._build["compiler"]["evm_version"],
                    "optimizer": config["solc"]["optimizer"],
                    "libraries": {
                        Path(source_fp).name: {lib: self._project[lib][-1].address for lib in libs}
                    },
                }
                self._flattener = Flattener(source_fp, self._name, remaps, compiler_settings)

            build_json = self._build

            return {
                "standard_json_input": self._flattener.standard_input_json,
                "contract_name": build_json["contractName"],
                "compiler_version": build_json["compiler"]["version"],
                "optimizer_enabled": build_json["compiler"]["optimizer"]["enabled"],
                "optimizer_runs": build_json["compiler"]["optimizer"]["runs"],
                "license_identifier": self._flattener.license,
                "bytecode_len": len(build_json["bytecode"]),
            }
        else:
            raise TypeError(f"Unsupported language for source verification: {language}")

    def publish_source(self, contract: Any, silent: bool = False) -> bool:
        """Flatten contract and publish source on the selected explorer"""

        # Check required conditions for verifying
        url = CONFIG.active_network.get("explorer")
        if url is None:
            raise ValueError("Explorer API not set for this network")
        env_token = next((v for k, v in _explorer_tokens.items() if k in url), None)
        if env_token is None:
            raise ValueError(
                f"Publishing source is only supported on {', '.join(_explorer_tokens)},"
                "change the Explorer API"
            )

        if os.getenv(env_token):
            api_key = os.getenv(env_token)
        else:
            host = urlparse(url).netloc
            host = host[host.index(".") + 1 :]
            raise ValueError(
                f"An API token is required to verify contract source code. Visit https://{host}/ "
                f"to obtain a token, and then store it as the environment variable ${env_token}"
            )

        address = _resolve_address(contract.address)

        # Get source code and contract/compiler information
        contract_info = self.get_verification_info()

        # Select matching license code (https://etherscan.io/contract-license-types)
        license_code = 1
        identifier = contract_info["license_identifier"].lower()
        if "unlicensed" in identifier:
            license_code = 2
        elif "mit" in identifier:
            license_code = 3
        elif "agpl" in identifier and "3.0" in identifier:
            license_code = 13
        elif "lgpl" in identifier:
            if "2.1" in identifier:
                license_code = 6
            elif "3.0" in identifier:
                license_code = 7
        elif "gpl" in identifier:
            if "2.0" in identifier:
                license_code = 4
            elif "3.0" in identifier:
                license_code = 5
        elif "bsd-2-clause" in identifier:
            license_code = 8
        elif "bsd-3-clause" in identifier:
            license_code = 9
        elif "mpl" in identifier and "2.0" in identifier:
            license_code = 10
        elif identifier.startswith("osl") and "3.0" in identifier:
            license_code = 11
        elif "apache" in identifier and "2.0" in identifier:
            license_code = 12

        # get constructor arguments
        params_tx: Dict = {
            "apikey": api_key,
            "module": "account",
            "action": "txlist",
            "address": address,
            "page": 1,
            "sort": "asc",
            "offset": 1,
        }
        i = 0
        while True:
            response = requests.get(url, params=params_tx, headers=REQUEST_HEADERS)
            if response.status_code != 200:
                raise ConnectionError(
                    f"Status {response.status_code} when querying {url}: {response.text}"
                )
            data = response.json()
            if int(data["status"]) == 1:
                # Constructor arguments received
                break
            else:
                # Wait for contract to be recognized by etherscan
                # This takes a few seconds after the contract is deployed
                # After 10 loops we throw with the API result message (includes address)
                if i >= 10:
                    raise ValueError(f"API request failed with: {data['result']}")
                elif i == 0 and not silent:
                    print(f"Waiting for {url} to process contract...")
                i += 1
                time.sleep(10)

        if data["message"] == "OK":
            constructor_arguments = data["result"][0]["input"][contract_info["bytecode_len"] + 2 :]
        else:
            constructor_arguments = ""

        # Submit verification
        payload_verification: Dict = {
            "apikey": api_key,
            "module": "contract",
            "action": "verifysourcecode",
            "contractaddress": address,
            "sourceCode": io.StringIO(json.dumps(self._flattener.standard_input_json)),
            "codeformat": "solidity-standard-json-input",
            "contractname": f"{self._flattener.contract_file}:{self._flattener.contract_name}",
            "compilerversion": f"v{contract_info['compiler_version']}",
            "optimizationUsed": 1 if contract_info["optimizer_enabled"] else 0,
            "runs": contract_info["optimizer_runs"],
            "constructorArguements": constructor_arguments,
            "licenseType": license_code,
        }
        response = requests.post(url, data=payload_verification, headers=REQUEST_HEADERS)
        if response.status_code != 200:
            raise ConnectionError(
                f"Status {response.status_code} when querying {url}: {response.text}"
            )
        data = response.json()
        if int(data["status"]) != 1:
            raise ValueError(f"Failed to submit verification request: {data['result']}")

        # Status of request
        guid = data["result"]
        if not silent:
            print("Verification submitted successfully. Waiting for result...")
        time.sleep(10)
        params_status: Dict = {
            "apikey": api_key,
            "module": "contract",
            "action": "checkverifystatus",
            "guid": guid,
        }
        while True:
            response = requests.get(url, params=params_status, headers=REQUEST_HEADERS)
            if response.status_code != 200:
                raise ConnectionError(
                    f"Status {response.status_code} when querying {url}: {response.text}"
                )
            data = response.json()
            if data["result"] == "Pending in queue":
                if not silent:
                    print("Verification pending...")
            else:
                if not silent:
                    col = "bright green" if data["message"] == "OK" else "bright red"
                    print(f"Verification complete. Result: {color(col)}{data['result']}{color}")
                return data["message"] == "OK"
            time.sleep(10)

    def _slice_source(self, source: str, offset: list) -> str:
        """Slice the source of the contract, preserving any comments above the first line."""
        offset_start = offset[0]
        top_source = source[:offset_start]
        top_lines = top_source.split("\n")[::-1]
        comment_open = False
        for line in top_lines:
            stripped = line.strip()
            if (
                not stripped
                or stripped.startswith(("//", "/*", "*"))
                or stripped.endswith("*/")
                or comment_open
            ):
                offset_start = offset_start - len(line) - 1
                if stripped.endswith("*/"):
                    comment_open = True
                elif stripped.startswith("/*"):
                    comment_open = False
            else:
                # Stop on the first non-empty, non-comment line
                break
        offset_start = max(0, offset_start)
        return source[offset_start : offset[1]].strip()


class ContractConstructor:
    _dir_color = "bright magenta"

    def __init__(self, parent: "ContractContainer", name: str) -> None:
        self._parent = parent
        try:
            self.abi = next(i for i in parent.abi if i["type"] == "constructor")
            self.abi["name"] = "constructor"
        except Exception:
            self.abi = {"inputs": [], "name": "constructor", "type": "constructor"}
        self._name = name

    @property
    def payable(self) -> bool:
        if "payable" in self.abi:
            return self.abi["payable"]
        else:
            return self.abi["stateMutability"] == "payable"

    def __repr__(self) -> str:
        return f"<{type(self).__name__} '{self._name}.constructor({_inputs(self.abi)})'>"

    def __call__(
        self, *args: Tuple, publish_source: bool = False, silent: bool = False
    ) -> Union["Contract", TransactionReceiptType]:
        """Deploys a contract.

        Args:
            *args: Constructor arguments. The last argument MUST be a dictionary
                   of transaction values containing at minimum a 'from' key to
                   specify which account to deploy this contract from.

        Returns:
            * Contract instance if the transaction confirms
            * TransactionReceipt if the transaction is pending or reverts"""
        args, tx = _get_tx(None, args)
        if not tx["from"]:
            raise AttributeError(
                "Final argument must be a dict of transaction parameters that "
                "includes a `from` field specifying the address to deploy from"
            )

        return tx["from"].deploy(
            self._parent,
            *args,
            amount=tx["value"],
            gas_limit=tx["gas"],
            gas_price=tx.get("gas_price"),
            max_fee=tx.get("max_fee"),
            priority_fee=tx.get("priority_fee"),
            nonce=tx["nonce"],
            required_confs=tx["required_confs"],
            allow_revert=tx.get("allow_revert"),
            publish_source=publish_source,
            silent=silent,
        )

    @staticmethod
    def _autosuggest(obj: "ContractConstructor") -> List:
        return _contract_method_autosuggest(obj.abi["inputs"], True, obj.payable)

    def encode_input(self, *args: tuple) -> str:
        bytecode = self._parent.bytecode
        # find and replace unlinked library pointers in bytecode
        for marker in re.findall("_{1,}[^_]*_{1,}", bytecode):
            library = marker.strip("_")
            if not self._parent._project[library]:
                raise UndeployedLibrary(
                    f"Contract requires '{library}' library, but it has not been deployed yet"
                )
            address = self._parent._project[library][-1].address[-40:]
            bytecode = bytecode.replace(marker, address)

        data = format_input(self.abi, args)
        types_list = get_type_strings(self.abi["inputs"])
        return bytecode + eth_abi.encode(types_list, data).hex()

    def estimate_gas(self, *args: Tuple) -> int:
        """
        Estimate the gas cost for the deployment.

        Raises VirtualMachineError if the transaction would revert.

        Arguments
        ---------
        *args
            Constructor arguments. The last argument MUST be a dictionary
            of transaction values containing at minimum a 'from' key to
            specify which account to deploy this contract from.

        Returns
        -------
        int
            Estimated gas value in wei.
        """
        args, tx = _get_tx(None, args)
        if not tx["from"]:
            raise AttributeError(
                "Final argument must be a dict of transaction parameters that "
                "includes a `from` field specifying the sender of the transaction"
            )

        return tx["from"].estimate_gas(amount=tx["value"], data=self.encode_input(*args))


class InterfaceContainer:
    """
    Container class that provides access to interfaces within a project.
    """

    def __init__(self, project: Any) -> None:
        self._project = project

        # automatically populate with interfaces in `data/interfaces`
        # overwritten if a project contains an interface with the same name
        for path in BROWNIE_FOLDER.glob("data/interfaces/*.json"):
            with path.open() as fp:
                abi = json.load(fp)
            self._add(path.stem, abi)

    def _add(self, name: str, abi: List) -> None:
        constructor = InterfaceConstructor(name, abi)
        setattr(self, name, constructor)


class InterfaceConstructor:
    """
    Constructor used to create Contract objects from a project interface.
    """

    def __init__(self, name: str, abi: List) -> None:
        self._name = name
        self.abi = abi
        self.selectors = {
            build_function_selector(i): i["name"] for i in self.abi if i["type"] == "function"
        }

    def __call__(self, address: str, owner: Optional[AccountsType] = None) -> "Contract":
        return Contract.from_abi(self._name, address, self.abi, owner, persist=False)

    def __repr__(self) -> str:
        return f"<{type(self).__name__} '{self._name}'>"

    def decode_input(self, calldata: Union[str, bytes]) -> Tuple[str, Any]:
        """
        Decode input calldata for this contract.

        Arguments
        ---------
        calldata : str | bytes
            Calldata for a call to this contract

        Returns
        -------
        str
            Signature of the function that was called
        Any
            Decoded input arguments
        """
        if not isinstance(calldata, HexBytes):
            calldata = HexBytes(calldata)

        fn_selector = calldata[:4].hex()  # type: ignore
        abi = next(
            (
                i
                for i in self.abi
                if i["type"] == "function" and build_function_selector(i) == fn_selector
            ),
            None,
        )
        if abi is None:
            raise ValueError("Four byte selector does not match the ABI for this contract")

        function_sig = build_function_signature(abi)

        types_list = get_type_strings(abi["inputs"])
        result = eth_abi.decode(types_list, calldata[4:])
        input_args = format_input(abi, result)

        return function_sig, input_args


class _DeployedContractBase(_ContractBase):
    """Methods for interacting with a deployed contract.

    Each public contract method is available as a ContractCall or ContractTx
    instance, created when this class is instantiated.

    Attributes:
        bytecode: Bytecode of the deployed contract, including constructor args.
        tx: TransactionReceipt of the of the tx that deployed the contract."""

    _reverted = False
    _initialized = False

    def __init__(
        self, address: str, owner: Optional[AccountsType] = None, tx: TransactionReceiptType = None
    ) -> None:
        address = _resolve_address(address)
        self.bytecode = web3.eth.get_code(address).hex()[2:]
        if not self.bytecode:
            raise ContractNotFound(f"No contract deployed at {address}")
        self._owner = owner
        self.tx = tx
        self.address = address
        self.events = ContractEvents(self)
        _add_deployment_topics(address, self.abi)

        fn_names = [i["name"] for i in self.abi if i["type"] == "function"]
        for abi in [i for i in self.abi if i["type"] == "function"]:
            name = f"{self._name}.{abi['name']}"
            sig = build_function_signature(abi)
            natspec: Dict = {}
            if self._build.get("natspec"):
                natspec = self._build["natspec"]["methods"].get(sig, {})

            if fn_names.count(abi["name"]) == 1:
                fn = _get_method_object(address, abi, name, owner, natspec)
                self._check_and_set(abi["name"], fn)
                continue

            # special logic to handle function overloading
            if not hasattr(self, abi["name"]):
                overloaded = OverloadedMethod(address, name, owner)
                self._check_and_set(abi["name"], overloaded)
            getattr(self, abi["name"])._add_fn(abi, natspec)

        self._initialized = True

    def _check_and_set(self, name: str, obj: Any) -> None:
        if name == "balance":
            warnings.warn(
                f"'{self._name}' defines a 'balance' function, "
                f"'{self._name}.balance' is available as {self._name}.wei_balance",
                BrownieEnvironmentWarning,
            )
            setattr(self, "wei_balance", self.balance)
        elif hasattr(self, name):
            warnings.warn(
                "Namespace collision between contract function and "
                f"brownie `Contract` class member: '{self._name}.{name}'\n"
                f"The {name} function will not be available when interacting with {self._name}",
                BrownieEnvironmentWarning,
            )
            return
        setattr(self, name, obj)

    def __hash__(self) -> int:
        return hash(f"{self._name}{self.address}{self._project}")

    def __str__(self) -> str:
        return self.address

    def __repr__(self) -> str:
        alias = self._build.get("alias")
        if alias:
            return f"<'{alias}' Contract '{self.address}'>"
        return f"<{self._name} Contract '{self.address}'>"

    def __eq__(self, other: object) -> bool:
        if isinstance(other, _DeployedContractBase):
            return self.address == other.address and self.bytecode == other.bytecode
        if isinstance(other, str):
            try:
                address = _resolve_address(other)
                return address == self.address
            except ValueError:
                return False
        return super().__eq__(other)

    def __getattribute__(self, name: str) -> Any:
        if super().__getattribute__("_reverted"):
            raise ContractNotFound("This contract no longer exists.")
        try:
            return super().__getattribute__(name)
        except AttributeError:
            raise AttributeError(f"Contract '{self._name}' object has no attribute '{name}'")

    def __setattr__(self, name: str, value: Any) -> None:
        if self._initialized and hasattr(self, name):
            if isinstance(getattr(self, name), _ContractMethod):
                raise AttributeError(
                    f"{self._name}.{name} is a contract function, it cannot be assigned to"
                )
        super().__setattr__(name, value)

    def get_method_object(self, calldata: str) -> Optional["_ContractMethod"]:
        """
        Given a calldata hex string, returns a `ContractMethod` object.
        """
        sig = calldata[:10].lower()
        if sig not in self.selectors:
            return None
        fn = getattr(self, self.selectors[sig], None)
        if isinstance(fn, OverloadedMethod):
            return next((v for v in fn.methods.values() if v.signature == sig), None)
        return fn

    def balance(self) -> Wei:
        """Returns the current ether balance of the contract, in wei."""
        balance = web3.eth.get_balance(self.address)
        return Wei(balance)

    def _deployment_path(self) -> Optional[Path]:
        if not self._project._path or (
            CONFIG.network_type != "live" and not CONFIG.settings["dev_deployment_artifacts"]
        ):
            return None

        chainid = CONFIG.active_network["chainid"] if CONFIG.network_type == "live" else "dev"
        path = self._project._build_path.joinpath(f"deployments/{chainid}")
        path.mkdir(exist_ok=True)
        return path.joinpath(f"{self.address}.json")

    def _save_deployment(self) -> None:
        path = self._deployment_path()
        chainid = CONFIG.active_network["chainid"] if CONFIG.network_type == "live" else "dev"
        deployment_build = self._build.copy()

        deployment_build["deployment"] = {
            "address": self.address,
            "chainid": chainid,
            "blockHeight": web3.eth.block_number,
        }
        if path:
            self._project._add_to_deployment_map(self)
            if not path.exists():
                with path.open("w") as fp:
                    json.dump(deployment_build, fp)

    def _delete_deployment(self) -> None:
        path = self._deployment_path()
        if path:
            self._project._remove_from_deployment_map(self)
            if path.exists():
                path.unlink()


class Contract(_DeployedContractBase):
    """
    Object to interact with a deployed contract outside of a project.
    """

    def __init__(
        self, address_or_alias: str, *args: Any, owner: Optional[AccountsType] = None, **kwargs: Any
    ) -> None:
        """
        Recreate a `Contract` object from the local database.

        The init method is used to access deployments that have already previously
        been stored locally. For new deployments use `from_abi` or `from_etherscan`.

        Arguments
        ---------
        address_or_alias : str
            Address or user-defined alias of the deployment.
        owner : Account, optional
            Contract owner. If set, transactions without a `from` field
            will be performed using this account.
        """
        address_or_alias = address_or_alias.strip()

        if args or kwargs:
            warnings.warn(
                "Initializing `Contract` in this manner is deprecated." " Use `from_abi` instead.",
                DeprecationWarning,
            )
            kwargs["owner"] = owner
            return self._deprecated_init(address_or_alias, *args, **kwargs)

        address = ""
        try:
            address = _resolve_address(address_or_alias)
            build, sources = _get_deployment(address)
        except Exception:
            build, sources = _get_deployment(alias=address_or_alias)
            if build is not None:
                address = build["address"]

        if build is None or sources is None:
            if (
                not address
                or not CONFIG.settings.get("autofetch_sources")
                or not CONFIG.active_network.get("explorer")
            ):
                if not address:
                    raise ValueError(f"Unknown alias: '{address_or_alias}'")
                else:
                    raise ValueError(f"Unknown contract address: '{address}'")
            contract = self.from_explorer(address, owner=owner, silent=True)
            build, sources = contract._build, contract._sources
            address = contract.address

        _ContractBase.__init__(self, None, build, sources)
        _DeployedContractBase.__init__(self, address, owner)

    def _deprecated_init(
        self,
        name: str,
        address: Optional[str] = None,
        abi: Optional[List] = None,
        manifest_uri: Optional[str] = None,
        owner: Optional[AccountsType] = None,
    ) -> None:
        if manifest_uri:
            raise ValueError("ethPM functionality removed")

        if not address:
            raise TypeError("Address cannot be None unless creating object from manifest")

        build = {"abi": abi, "contractName": name, "type": "contract"}
        _ContractBase.__init__(self, None, build, {})  # type: ignore
        _DeployedContractBase.__init__(self, address, owner, None)

    @classmethod
    def from_abi(
        cls,
        name: str,
        address: str,
        abi: List,
        owner: Optional[AccountsType] = None,
        persist: bool = True,
    ) -> "Contract":
        """
        Create a new `Contract` object from an ABI.

        Arguments
        ---------
        name : str
            Name of the contract.
        address : str
            Address where the contract is deployed.
        abi : dict
            Contract ABI, given as a dictionary.
        owner : Account, optional
            Contract owner. If set, transactions without a `from` field
            will be performed using this account.
        """
        address = _resolve_address(address)
        build = {"abi": abi, "address": address, "contractName": name, "type": "contract"}

        self = cls.__new__(cls)
        _ContractBase.__init__(self, None, build, {})  # type: ignore
        _DeployedContractBase.__init__(self, address, owner, None)
        if persist:
            _add_deployment(self)
        return self

    @classmethod
    def from_explorer(
        cls,
        address: str,
        as_proxy_for: Optional[str] = None,
        owner: Optional[AccountsType] = None,
        silent: bool = False,
        persist: bool = True,
    ) -> "Contract":
        """
        Create a new `Contract` object with source code queried from a block explorer.

        Arguments
        ---------
        address : str
            Address where the contract is deployed.
        as_proxy_for : str, optional
            Address of the implementation contract, if `address` is a proxy contract.
            The generated object will send transactions to `address`, but use the ABI
            and NatSpec of `as_proxy_for`. This field is only required when the
            block explorer API does not provide an implementation address.
        owner : Account, optional
            Contract owner. If set, transactions without a `from` field will be
            performed using this account.
        """
        address = _resolve_address(address)
        data = _fetch_from_explorer(address, "getsourcecode", silent)
        is_verified = bool(data["result"][0].get("SourceCode"))

        if is_verified:
            abi = json.loads(data["result"][0]["ABI"])
            name = data["result"][0]["ContractName"]
        else:
            # if the source is not available, try to fetch only the ABI
            try:
                data_abi = _fetch_from_explorer(address, "getabi", True)
            except ValueError as exc:
                _unverified_addresses.add(address)
                raise exc
            abi = json.loads(data_abi["result"].strip())
            name = "UnknownContractName"
            if not silent:
                warnings.warn(
                    f"{address}: Was able to fetch the ABI but not the source code. "
                    "Some functionality will not be available.",
                    BrownieCompilerWarning,
                )

        if as_proxy_for is None:
            # always check for an EIP1967 proxy - https://eips.ethereum.org/EIPS/eip-1967
            implementation_eip1967 = web3.eth.get_storage_at(
                address, int(web3.keccak(text="eip1967.proxy.implementation").hex(), 16) - 1
            )
            # always check for an EIP1822 proxy - https://eips.ethereum.org/EIPS/eip-1822
            implementation_eip1822 = web3.eth.get_storage_at(address, web3.keccak(text="PROXIABLE"))
            if len(implementation_eip1967) > 0 and int(implementation_eip1967.hex(), 16):
                as_proxy_for = _resolve_address(implementation_eip1967[-20:])
            elif len(implementation_eip1822) > 0 and int(implementation_eip1822.hex(), 16):
                as_proxy_for = _resolve_address(implementation_eip1822[-20:])
            elif data["result"][0].get("Implementation"):
                # for other proxy patterns, we only check if etherscan indicates
                # the contract is a proxy. otherwise we could have a false positive
                # if there is an `implementation` method on a regular contract.
                try:
                    # first try to call `implementation` per EIP897
                    # https://eips.ethereum.org/EIPS/eip-897
                    contract = cls.from_abi(name, address, abi)
                    as_proxy_for = contract.implementation.call()
                except Exception:
                    # if that fails, fall back to the address provided by etherscan
                    as_proxy_for = _resolve_address(data["result"][0]["Implementation"])

        if as_proxy_for == address:
            as_proxy_for = None

        # if this is a proxy, fetch information for the implementation contract
        if as_proxy_for is not None:
            implementation_contract = Contract.from_explorer(as_proxy_for)
            abi = implementation_contract._build["abi"]

        if not is_verified:
            return cls.from_abi(name, address, abi, owner)

        compiler_str = data["result"][0]["CompilerVersion"]
        if compiler_str.startswith("vyper:"):
            try:
                version = to_vyper_version(compiler_str[6:])
                is_compilable = version in get_installable_vyper_versions()
            except Exception:
                is_compilable = False
        else:
            try:
                version = cls.get_solc_version(compiler_str, address)

                is_compilable = (
                    version >= Version("0.4.22")
                    and version
                    in solcx.get_installable_solc_versions() + solcx.get_installed_solc_versions()
                )
            except Exception:
                is_compilable = False

        if not is_compilable:
            if not silent:
                warnings.warn(
                    f"{address}: target compiler '{compiler_str}' cannot be installed or is not "
                    "supported by Brownie. Some debugging functionality will not be available.",
                    BrownieCompilerWarning,
                )
            return cls.from_abi(name, address, abi, owner)
        elif data["result"][0]["OptimizationUsed"] in ("true", "false"):
            if not silent:
                warnings.warn(
                    f"Blockscout explorer API has limited support by Brownie. "  # noqa
                    "Some debugging functionality will not be available.",
                    BrownieCompilerWarning,
                )
            return cls.from_abi(name, address, abi, owner)

        optimizer = {
            "enabled": bool(int(data["result"][0]["OptimizationUsed"])),
            "runs": int(data["result"][0]["Runs"]),
        }
        evm_version = data["result"][0].get("EVMVersion", "Default")
        if evm_version == "Default":
            evm_version = None

        source_str = "\n".join(data["result"][0]["SourceCode"].splitlines())
        try:
            if source_str.startswith("{{"):
                # source was verified using compiler standard JSON
                input_json = json.loads(source_str[1:-1])
                sources = {k: v["content"] for k, v in input_json["sources"].items()}
                evm_version = input_json["settings"].get("evmVersion", evm_version)
                remappings = input_json["settings"].get("remappings", [])

                compiler.set_solc_version(str(version))
                input_json.update(
                    compiler.generate_input_json(
                        sources, optimizer=optimizer, evm_version=evm_version, remappings=remappings
                    )
                )
                output_json = compiler.compile_from_input_json(input_json)
                build_json = compiler.generate_build_json(input_json, output_json)
            else:
                if source_str.startswith("{"):
                    # source was submitted as multiple files
                    sources = {k: v["content"] for k, v in json.loads(source_str).items()}
                else:
                    # source was submitted as a single file
                    if compiler_str.startswith("vyper"):
                        path_str = f"{name}.vy"
                    else:
                        path_str = f"{name}-flattened.sol"
                    sources = {path_str: source_str}

                build_json = compiler.compile_and_format(
                    sources,
                    solc_version=str(version),
                    vyper_version=str(version),
                    optimizer=optimizer,
                    evm_version=evm_version,
                )
        except Exception as e:
            if not silent:
                warnings.warn(
                    f"{address}: Compilation failed due to {type(e).__name__}. Falling back to ABI,"
                    " some functionality will not be available.",
                    BrownieCompilerWarning,
                )
            return cls.from_abi(name, address, abi, owner)

        build_json = build_json[name]
        if as_proxy_for is not None:
            build_json.update(abi=abi, natspec=implementation_contract._build.get("natspec"))

        if not _verify_deployed_code(
            address, build_json["deployedBytecode"], build_json["language"]
        ):
            if not silent:
                warnings.warn(
                    f"{address}: Locally compiled and on-chain bytecode do not match!",
                    BrownieCompilerWarning,
                )
            del build_json["pcMap"]

        self = cls.__new__(cls)
        _ContractBase.__init__(self, None, build_json, sources)  # type: ignore
        _DeployedContractBase.__init__(self, address, owner)
        if persist:
            _add_deployment(self)
        return self

[docs] @classmethod def get_solc_version(cls, compiler_str: str, address: str) -> Version: """ Return the solc compiler version either from the passed compiler string or try to find the latest available patch semver compiler version. Arguments --------- compiler_str: str The compiler string passed from the contract metadata. address: str The contract address to check for. """ version = Version(compiler_str.lstrip("v")).truncate() compiler_config = _load_project_compiler_config(Path(os.getcwd())) solc_config = compiler_config["solc"] if "use_latest_patch" in solc_config: use_latest_patch = solc_config["use_latest_patch"] needs_patch_version = False if isinstance(use_latest_patch, bool): needs_patch_version = use_latest_patch elif isinstance(use_latest_patch, list): needs_patch_version = address in use_latest_patch if needs_patch_version: versions = [Version(str(i)) for i in solcx.get_installable_solc_versions()] for v in filter(lambda x: x < version.next_minor(), versions): if v > version: version = v return version
[docs] @classmethod def remove_deployment( cls, address: str = None, alias: str = None ) -> Tuple[Optional[Dict], Optional[Dict]]: """ Removes this contract from the internal deployments db with the passed address or alias. Arguments --------- address: str | None An address to apply alias: str | None An alias to apply """ return _remove_deployment(address, alias)
[docs] def set_alias(self, alias: Optional[str], persist: bool = True) -> None: """ Apply a unique alias this object. The alias can be used to restore the object in future sessions. Arguments --------- alias: str | None An alias to apply. If `None`, any existing alias is removed. """ if "chainid" not in CONFIG.active_network: raise ValueError("Cannot set aliases in a development environment") if alias is not None: if "." in alias or alias.lower().startswith("0x"): raise ValueError("Invalid alias") build, _ = _get_deployment(alias=alias) if build is not None: if build["address"] != self.address: raise ValueError("Alias is already in use on another contract") return if persist: _add_deployment(self, alias) self._build["alias"] = alias
@property def alias(self) -> Optional[str]: return self._build.get("alias") class ProjectContract(_DeployedContractBase): """Methods for interacting with a deployed contract as part of a Brownie project.""" def __init__( self, project: Any, build: Dict, address: str, owner: Optional[AccountsType] = None, tx: TransactionReceiptType = None, ) -> None: _ContractBase.__init__(self, project, build, project._sources) _DeployedContractBase.__init__(self, address, owner, tx) class ContractEvents(_ContractEvents):
[docs] def __init__(self, contract: _DeployedContractBase): self.linked_contract = contract # Ignoring type since ChecksumAddress type is an alias for string _ContractEvents.__init__(self, contract.abi, web3, contract.address) # type: ignore
[docs] def subscribe( self, event_name: str, callback: Callable[[AttributeDict], None], delay: float = 2.0 ) -> None: """ Subscribe to event with a name matching 'event_name', calling the 'callback' function on new occurrence giving as parameter the event log receipt. Args: event_name (str): Name of the event to subscribe to. callback (Callable[[AttributeDict], None]): Function called whenever an event occurs. delay (float, optional): Delay between each check for new events. Defaults to 2.0. """ target_event: ContractEvent = self.__getitem__(event_name) # type: ignore event_watcher.add_event_callback(event=target_event, callback=callback, delay=delay)
[docs] def get_sequence( self, from_block: int, to_block: int = None, event_type: Union[ContractEvent, str] = None ) -> Union[List[AttributeDict], AttributeDict]: """Returns the logs of events of type 'event_type' that occurred between the blocks 'from_block' and 'to_block'. If 'event_type' is not specified, it retrieves the occurrences of all events in the contract. Args: from_block (int): The block from which to search for events that have occurred. to_block (int, optional): The block on which to stop searching for events. if not specified, it is set to the most recently mined block (web3.eth.block_number). Defaults to None. event_type (ContractEvent, str, optional): Type or name of the event to be searched between the specified blocks. Defaults to None. Returns: if 'event_type' is specified: [list]: List of events of type 'event_type' that occurred between 'from_block' and 'to_block'. else: event_logbook [dict]: Dictionary of events of the contract that occurred between 'from_block' and 'to_block'. """ if to_block is None or to_block > web3.eth.block_number: to_block = web3.eth.block_number # Returns event sequence for the specified event if event_type is not None: if isinstance(event_type, str): # If 'event_type' is a string, search for an event with a name matching it. event_type: ContractEvent = self.__getitem__(event_type) # type: ignore return self._retrieve_contract_events(event_type, from_block, to_block) # Returns event sequence for all contract events events_logbook = dict() for event in ContractEvents.__iter__(self): events_logbook[event.event_name] = self._retrieve_contract_events( event, from_block, to_block ) return AttributeDict(events_logbook)
[docs] def listen(self, event_name: str, timeout: float = 0) -> Coroutine: """ Creates a listening Coroutine object ending whenever an event matching 'event_name' occurs. If timeout is superior to zero and no event matching 'event_name' has occured, the Coroutine ends when the timeout is reached. The Coroutine return value is an AttributeDict filled with the following fields : - 'event_data' (AttributeDict): The event log receipt that was caught. - 'timed_out' (bool): False if the event did not timeout, else True If the 'timeout' parameter is not passed or is inferior or equal to 0, the Coroutine listens indefinitely. Args: event_name (str): Name of the event to be listened to. timeout (float, optional): Timeout value in seconds. Defaults to 0. Returns: Coroutine: Awaitable object listening for the event matching 'event_name'. """ _triggered: bool = False _received_data: Union[AttributeDict, None] = None def _event_callback(event_data: AttributeDict) -> None: """ Fills the nonlocal varialbe '_received_data' with the received argument 'event_data' and sets the nonlocal '_triggered' variable to True """ nonlocal _triggered, _received_data _received_data = event_data _triggered = True _listener_end_time = time.time() + timeout async def _listening_task(is_timeout: bool, end_time: float) -> AttributeDict: """Generates and returns a coroutine listening for an event""" nonlocal _triggered, _received_data timed_out: bool = False while not _triggered: if is_timeout and end_time <= time.time(): timed_out = True break await asyncio.sleep(0.05) return AttributeDict({"event_data": _received_data, "timed_out": timed_out}) target_event: ContractEvent = self.__getitem__(event_name) # type: ignore event_watcher.add_event_callback( event=target_event, callback=_event_callback, delay=0.2, repeat=False ) return _listening_task(bool(timeout > 0), _listener_end_time)
@combomethod def _retrieve_contract_events( self, event_type: ContractEvent, from_block: int = None, to_block: int = None ) -> List[LogReceipt]: """ Retrieves all log receipts from 'event_type' between 'from_block' and 'to_block' blocks """ if to_block is None: to_block = web3.eth.block_number if from_block is None and isinstance(to_block, int): from_block = to_block - 10 event_filter: filters.LogFilter = event_type.create_filter( fromBlock=from_block, toBlock=to_block ) return event_filter.get_all_entries() class OverloadedMethod: def __init__(self, address: str, name: str, owner: Optional[AccountsType]): self._address = address self._name = name self._owner = owner self.methods: Dict = {} self.natspec: Dict = {} def _add_fn(self, abi: Dict, natspec: Dict) -> None: fn = _get_method_object(self._address, abi, self._name, self._owner, natspec) key = tuple(i["type"].replace("256", "") for i in abi["inputs"]) self.methods[key] = fn self.natspec.update(natspec) def _get_fn_from_args(self, args: Tuple) -> "_ContractMethod": input_length = len(args) if args and isinstance(args[-1], dict): input_length -= 1 keys = [i for i in self.methods if len(i) == input_length] if not keys: raise ValueError("No function matching the given number of arguments") if len(keys) > 1: raise ValueError( f"Contract has more than one function '{self._name}' requiring " f"{input_length} arguments. You must explicitly declare which function " f"you are calling, e.g. {self._name}['{','.join(keys[0])}'](*args)" ) return self.methods[keys[0]] def __getitem__(self, key: Union[Tuple, str]) -> "_ContractMethod": if isinstance(key, str): key = tuple(i.strip() for i in key.split(",")) key = tuple(i.replace("256", "") for i in key) return self.methods[key] def __repr__(self) -> str: return f"<OverloadedMethod '{self._name}'>" def __len__(self) -> int: return len(self.methods) def __call__( self, *args: Tuple, block_identifier: Union[int, str, bytes] = None, override: Dict = None ) -> Any: fn = self._get_fn_from_args(args) kwargs = {"block_identifier": block_identifier, "override": override} kwargs = {k: v for k, v in kwargs.items() if v is not None} return fn(*args, **kwargs) # type: ignore def call( self, *args: Tuple, block_identifier: Union[int, str, bytes] = None, override: Dict = None ) -> Any: """ Call the contract method without broadcasting a transaction. The specific function called is chosen based on the number of arguments given. If more than one function exists with this number of arguments, a `ValueError` is raised. Arguments --------- *args Contract method inputs. You can optionally provide a dictionary of transaction properties as the last arg. block_identifier : int | str | bytes, optional A block number or hash that the call is executed at. If not given, the latest block used. Raises `ValueError` if this value is too far in the past and you are not using an archival node. override : dict, optional A mapping from addresses to balance, nonce, code, state, stateDiff overrides for the context of the call. Returns ------- Contract method return value(s). """ fn = self._get_fn_from_args(args) return fn.call(*args, block_identifier=block_identifier, override=override) def transact(self, *args: Tuple) -> TransactionReceiptType: """ Broadcast a transaction that calls this contract method. The specific function called is chosen based on the number of arguments given. If more than one function exists with this number of arguments, a `ValueError` is raised. Arguments --------- *args Contract method inputs. You can optionally provide a dictionary of transaction properties as the last arg. Returns ------- TransactionReceipt Object representing the broadcasted transaction. """ fn = self._get_fn_from_args(args) return fn.transact(*args) def encode_input(self, *args: Tuple) -> Any: """ Generate encoded ABI data to call the method with the given arguments. Arguments --------- *args Contract method inputs Returns ------- str Hexstring of encoded ABI data """ fn = self._get_fn_from_args(args) return fn.encode_input(*args) def decode_input(self, hexstr: str) -> List: """ Decode input call data for this method. Arguments --------- hexstr : str Hexstring of input call data Returns ------- Decoded values """ selector = HexBytes(hexstr)[:4].hex() fn = next((i for i in self.methods.values() if i == selector), None) if fn is None: raise ValueError( "Data cannot be decoded using any input signatures of functions of this name" ) return fn.decode_input(hexstr) def decode_output(self, hexstr: str) -> Tuple: """ Decode hexstring data returned by this method. Arguments --------- hexstr : str Hexstring of returned call data Returns ------- Decoded values """ for fn in self.methods.values(): try: return fn.decode_output(hexstr) except Exception: pass raise ValueError( "Data cannot be decoded using any output signatures of functions of this name" ) def info(self) -> None: """ Display NatSpec documentation for this method. """ fn_sigs = [] for fn in self.methods.values(): fn_sigs.append(f"{fn.abi['name']}({_inputs(fn.abi)})") for sig in sorted(fn_sigs, key=lambda k: len(k)): print(sig) _print_natspec(self.natspec) class _ContractMethod: _dir_color = "bright magenta" def __init__( self, address: str, abi: Dict, name: str, owner: Optional[AccountsType], natspec: Optional[Dict] = None, ) -> None: self._address = address self._name = name self.abi = abi self._owner = owner self.signature = build_function_selector(abi) self._input_sig = build_function_signature(abi) self.natspec = natspec or {} def __repr__(self) -> str: pay = "payable " if self.payable else "" return f"<{type(self).__name__} {pay}'{self.abi['name']}({_inputs(self.abi)})'>" @property def payable(self) -> bool: if "payable" in self.abi: return self.abi["payable"] else: return self.abi["stateMutability"] == "payable" @staticmethod def _autosuggest(obj: "_ContractMethod") -> List: # this is a staticmethod to be compatible with `_call_suggest` and `_transact_suggest` return _contract_method_autosuggest( obj.abi["inputs"], isinstance(obj, ContractTx), obj.payable ) def info(self) -> None: """ Display NatSpec documentation for this method. """ print(f"{self.abi['name']}({_inputs(self.abi)})") _print_natspec(self.natspec) def call( self, *args: Tuple, block_identifier: Union[int, str, bytes] = None, override: Dict = None ) -> Any: """ Call the contract method without broadcasting a transaction. Arguments --------- *args Contract method inputs. You can optionally provide a dictionary of transaction properties as the last arg. block_identifier : int | str | bytes, optional A block number or hash that the call is executed at. If not given, the latest block used. Raises `ValueError` if this value is too far in the past and you are not using an archival node. override : dict, optional A mapping from addresses to balance, nonce, code, state, stateDiff overrides for the context of the call. Returns ------- Contract method return value(s). """ args, tx = _get_tx(self._owner, args) if tx["from"]: tx["from"] = str(tx["from"]) del tx["required_confs"] tx.update({"to": self._address, "data": self.encode_input(*args)}) try: data = web3.eth.call({k: v for k, v in tx.items() if v}, block_identifier, override) except ValueError as e: raise VirtualMachineError(e) from None if self.abi["outputs"] and not data: raise ValueError("No data was returned - the call likely reverted") try: return self.decode_output(data) except Exception: raise ValueError(f"Call reverted: {decode_typed_error(data)}") from None def transact(self, *args: Tuple, silent: bool = False) -> TransactionReceiptType: """ Broadcast a transaction that calls this contract method. Arguments --------- *args Contract method inputs. You can optionally provide a dictionary of transaction properties as the last arg. Returns ------- TransactionReceipt Object representing the broadcasted transaction. """ args, tx = _get_tx(self._owner, args) if not tx["from"]: raise AttributeError( "Final argument must be a dict of transaction parameters that " "includes a `from` field specifying the sender of the transaction" ) return tx["from"].transfer( self._address, tx["value"], gas_limit=tx["gas"], gas_buffer=tx.get("gas_buffer"), gas_price=tx.get("gas_price"), max_fee=tx.get("max_fee"), priority_fee=tx.get("priority_fee"), nonce=tx["nonce"], required_confs=tx["required_confs"], data=self.encode_input(*args), allow_revert=tx["allow_revert"], silent=silent, ) def decode_input(self, hexstr: str) -> List: """ Decode input call data for this method. Arguments --------- hexstr : str Hexstring of input call data Returns ------- Decoded values """ types_list = get_type_strings(self.abi["inputs"]) result = eth_abi.decode(types_list, HexBytes(hexstr)[4:]) return format_input(self.abi, result) def encode_input(self, *args: Tuple) -> str: """ Generate encoded ABI data to call the method with the given arguments. Arguments --------- *args Contract method inputs Returns ------- str Hexstring of encoded ABI data """ data = format_input(self.abi, args) types_list = get_type_strings(self.abi["inputs"]) return self.signature + eth_abi.encode(types_list, data).hex() def decode_output(self, hexstr: str) -> Tuple: """ Decode hexstring data returned by this method. Arguments --------- hexstr : str Hexstring of returned call data Returns ------- Decoded values """ types_list = get_type_strings(self.abi["outputs"]) result = eth_abi.decode(types_list, HexBytes(hexstr)) result = format_output(self.abi, result) if len(result) == 1: result = result[0] return result def estimate_gas(self, *args: Tuple) -> int: """ Estimate the gas cost for a transaction. Raises VirtualMachineError if the transaction would revert. Arguments --------- *args Contract method inputs Returns ------- int Estimated gas value in wei. """ args, tx = _get_tx(self._owner, args) if not tx["from"]: raise AttributeError( "Final argument must be a dict of transaction parameters that " "includes a `from` field specifying the sender of the transaction" ) return tx["from"].estimate_gas( to=self._address, amount=tx["value"], data=self.encode_input(*args), ) class ContractTx(_ContractMethod): """ A public payable or non-payable contract method. Attributes ---------- abi : dict Contract ABI specific to this method. signature : str Bytes4 method signature. """ def __call__(self, *args: Tuple, silent: bool = False) -> TransactionReceiptType: """ Broadcast a transaction that calls this contract method. Arguments --------- *args Contract method inputs. You can optionally provide a dictionary of transaction properties as the last arg. Returns ------- TransactionReceipt Object representing the broadcasted transaction. """ return self.transact(*args, silent=silent) class ContractCall(_ContractMethod): """ A public view or pure contract method. Attributes ---------- abi : dict Contract ABI specific to this method. signature : str Bytes4 method signature. """ def __call__( self, *args: Tuple, block_identifier: Union[int, str, bytes] = None, override: Dict = None ) -> Any: """ Call the contract method without broadcasting a transaction. Arguments --------- args Contract method inputs. You can optionally provide a dictionary of transaction properties as the last arg. block_identifier : int | str | bytes, optional A block number or hash that the call is executed at. If not given, the latest block used. Raises `ValueError` if this value is too far in the past and you are not using an archival node. override : dict, optional A mapping from addresses to balance, nonce, code, state, stateDiff overrides for the context of the call. Returns ------- Contract method return value(s). """ if not CONFIG.argv["always_transact"] or block_identifier is not None: return self.call(*args, block_identifier=block_identifier, override=override) args, tx = _get_tx(self._owner, args) tx.update({"gas_price": 0, "from": self._owner or accounts[0]}) pc, revert_msg = None, None try: self.transact(*args, tx) chain.undo() except VirtualMachineError as exc: pc, revert_msg = exc.pc, exc.revert_msg chain.undo() except Exception: pass try: return self.call(*args) except VirtualMachineError as exc: if pc == exc.pc and revert_msg and exc.revert_msg is None: # in case we miss a dev revert string exc.revert_msg = revert_msg raise exc def _get_tx(owner: Optional[AccountsType], args: Tuple) -> Tuple: # set / remove default sender if owner is None: owner = accounts.default default_owner = CONFIG.active_network["settings"]["default_contract_owner"] if CONFIG.mode == "test" and default_owner is False: owner = None # seperate contract inputs from tx dict and set default tx values tx = { "from": owner, "value": 0, "gas": None, "gas_buffer": None, "nonce": None, "required_confs": 1, "allow_revert": None, } if args and isinstance(args[-1], dict): tx.update(args[-1]) args = args[:-1] # key substitution to provide compatibility with web3.py for key, target in [("amount", "value"), ("gas_limit", "gas"), ("gas_price", "gasPrice")]: if key in tx: tx[target] = tx[key] # enable the magic of ganache's `evm_unlockUnknownAccount` if isinstance(tx["from"], str): tx["from"] = accounts.at(tx["from"], force=True) elif isinstance(tx["from"], _DeployedContractBase): tx["from"] = accounts.at(tx["from"].address, force=True) return args, tx def _get_method_object( address: str, abi: Dict, name: str, owner: Optional[AccountsType], natspec: Dict ) -> Union["ContractCall", "ContractTx"]: if "constant" in abi: constant = abi["constant"] else: constant = abi["stateMutability"] in ("view", "pure") if constant: return ContractCall(address, abi, name, owner, natspec) return ContractTx(address, abi, name, owner, natspec) def _inputs(abi: Dict) -> str: types_list = get_type_strings(abi["inputs"], {"fixed168x10": "decimal"}) params = zip([i["name"] for i in abi["inputs"]], types_list) return ", ".join( f"{i[1]}{color('bright blue')}{' '+i[0] if i[0] else ''}{color}" for i in params ) def _verify_deployed_code(address: str, expected_bytecode: str, language: str) -> bool: actual_bytecode = web3.eth.get_code(address).hex()[2:] expected_bytecode = remove_0x_prefix(expected_bytecode) # type: ignore if expected_bytecode.startswith("730000000000000000000000000000000000000000"): # special case for Solidity libraries return ( actual_bytecode.startswith(f"73{address[2:].lower()}") and actual_bytecode[42:] == expected_bytecode[42:] ) if "_" in expected_bytecode: for marker in re.findall("_{1,}[^_]*_{1,}", expected_bytecode): idx = expected_bytecode.index(marker) actual_bytecode = actual_bytecode[:idx] + actual_bytecode[idx + 40 :] expected_bytecode = expected_bytecode[:idx] + expected_bytecode[idx + 40 :] if language == "Solidity": # do not include metadata in comparison idx = -(int(actual_bytecode[-4:], 16) + 2) * 2 actual_bytecode = actual_bytecode[:idx] idx = -(int(expected_bytecode[-4:], 16) + 2) * 2 expected_bytecode = expected_bytecode[:idx] if language == "Vyper": # don't check immutables section # TODO actually grab data section length from layout. return actual_bytecode.startswith(expected_bytecode) return actual_bytecode == expected_bytecode def _print_natspec(natspec: Dict) -> None: wrapper = TextWrapper(initial_indent=f" {color('bright magenta')}") for key in [i for i in ("title", "notice", "author", "details") if i in natspec]: wrapper.subsequent_indent = " " * (len(key) + 4) print(wrapper.fill(f"@{key} {color}{natspec[key]}")) for key, value in natspec.get("params", {}).items(): wrapper.subsequent_indent = " " * 9 print(wrapper.fill(f"@param {color('bright blue')}{key}{color} {value}")) if "return" in natspec: wrapper.subsequent_indent = " " * 10 print(wrapper.fill(f"@return {color}{natspec['return']}")) for key in sorted(natspec.get("returns", [])): wrapper.subsequent_indent = " " * 10 print(wrapper.fill(f"@return {color}{natspec['returns'][key]}")) print() def _fetch_from_explorer(address: str, action: str, silent: bool) -> Dict: url = CONFIG.active_network.get("explorer") if url is None: raise ValueError("Explorer API not set for this network") if address in _unverified_addresses: raise ValueError(f"Source for {address} has not been verified") code = web3.eth.get_code(address).hex()[2:] # EIP-1167: Minimal Proxy Contract if code[:20] == "363d3d373d3d3d363d73" and code[60:] == "5af43d82803e903d91602b57fd5bf3": address = _resolve_address(code[20:60]) # Vyper <0.2.9 `create_forwarder_to` elif ( code[:30] == "366000600037611000600036600073" and code[70:] == "5af4602c57600080fd5b6110006000f3" ): address = _resolve_address(code[30:70]) # 0xSplits Clones elif ( code[:120] == "36603057343d52307f830d2d700a97af574b186c80d40429385d24241565b08a7c559ba283a964d9b160203da23d3df35b3d3d3d3d363d3d37363d73" # noqa e501 and code[160:] == "5af43d3d93803e605b57fd5bf3" ): address = _resolve_address(code[120:160]) params: Dict = {"module": "contract", "action": action, "address": address} explorer, env_key = next( ((k, v) for k, v in _explorer_tokens.items() if k in url), (None, None) ) if env_key is not None: if os.getenv(env_key): params["apiKey"] = os.getenv(env_key) elif not silent: warnings.warn( f"No {explorer} API token set. You may experience issues with rate limiting. " f"Visit https://{explorer}.io/register to obtain a token, and then store it " f"as the environment variable ${env_key}", BrownieEnvironmentWarning, ) if not silent: print( f"Fetching source of {color('bright blue')}{address}{color} " f"from {color('bright blue')}{urlparse(url).netloc}{color}..." ) response = requests.get(url, params=params, headers=REQUEST_HEADERS) if response.status_code != 200: raise ConnectionError(f"Status {response.status_code} when querying {url}: {response.text}") data = response.json() if int(data["status"]) != 1: raise ValueError(f"Failed to retrieve data from API: {data}") return data # console auto-completion logic def _call_autosuggest(method: Any) -> List: # since methods are not unique for each object, we use `__reduce__` # to locate the specific object so we can access the correct ABI method = method.__reduce__()[1][0] return _contract_method_autosuggest(method.abi["inputs"], False, False) def _transact_autosuggest(method: Any) -> List: method = method.__reduce__()[1][0] return _contract_method_autosuggest(method.abi["inputs"], True, method.payable) # assign the autosuggest functionality to various methods ContractConstructor.encode_input.__dict__["_autosuggest"] = _call_autosuggest _ContractMethod.call.__dict__["_autosuggest"] = _call_autosuggest _ContractMethod.encode_input.__dict__["_autosuggest"] = _call_autosuggest ContractConstructor.estimate_gas.__dict__["_autosuggest"] = _transact_autosuggest _ContractMethod.estimate_gas.__dict__["_autosuggest"] = _transact_autosuggest _ContractMethod.transact.__dict__["_autosuggest"] = _transact_autosuggest def _contract_method_autosuggest(args: List, is_transaction: bool, is_payable: bool) -> List: types_list = get_type_strings(args, {"fixed168x10": "decimal"}) params = zip([i["name"] for i in args], types_list) if not is_transaction: tx_hint: List = [] elif is_payable: tx_hint = [" {'from': Account", " 'value': Wei}"] else: tx_hint = [" {'from': Account}"] return [f" {i[1]}{' '+i[0] if i[0] else ''}" for i in params] + tx_hint def _comment_slicer(match: Match) -> str: start, mid, end = match.group(1, 2, 3) if mid is None: # single line comment return "" elif start is not None or end is not None: # multi line comment at start or end of a line return "" elif "\n" in mid: # multi line comment with line break return "\n" else: # multi line comment without line break return " "