y.utils package
Submodules
y.utils.cache module
y.utils.checks module
Utility functions for performing various checks.
- y.utils.checks.hasall(obj, attrs)[source]
Check if an object has all the specified attributes.
- Parameters:
- Returns:
True if the object has all the specified attributes, False otherwise.
- Return type:
Example
>>> class TestClass:
...     attr1 = 1
...     attr2 = 2
>>> test_obj = TestClass()
>>> hasall(test_obj, ['attr1', 'attr2'])
True
>>> hasall(test_obj, ['attr1', 'attr3'])
False
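The behaviour described for hasall can be reproduced with the built-in hasattr. The following is a minimal sketch, not necessarily the library's own implementation; the name hasall_sketch is hypothetical and is used only to avoid confusion with the real function.

```python
from typing import Any, Iterable


def hasall_sketch(obj: Any, attrs: Iterable[str]) -> bool:
    """Return True only if `obj` has every attribute named in `attrs`."""
    # all() short-circuits on the first missing attribute.
    return all(hasattr(obj, attr) for attr in attrs)


class TestClass:
    attr1 = 1
    attr2 = 2


test_obj = TestClass()
print(hasall_sketch(test_obj, ["attr1", "attr2"]))  # True
print(hasall_sketch(test_obj, ["attr1", "attr3"]))  # False
```

Note that an empty attrs list yields True in this sketch, because all() of an empty iterable is True.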
y.utils.client module
Utility functions for retrieving Ethereum client information.
- y.utils.client.get_ethereum_client()[source]
Get the Ethereum client type for the current connection.
- Returns:
A string representing the Ethereum client type.
- Return type:
Example
>>> from y.utils.client import get_ethereum_client
>>> client = get_ethereum_client()
>>> print(client)
geth
- y.utils.client.get_ethereum_client_async()[source]
Asynchronously get the Ethereum client type for the current connection.
- Returns:
A string representing the Ethereum client type.
- Return type:
Example
>>> from y.utils.client import get_ethereum_client_async
>>> client = await get_ethereum_client_async()
>>> print(client)
erigon
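How the client type is derived is not documented here. One plausible approach, shown purely as an assumption, is to classify the node's client version string, which commonly has a "Name/version/platform" shape. The function classify_client_version, the KNOWN_CLIENTS tuple, and the sample version strings below are all hypothetical.

```python
# Hypothetical list of recognised client names; not taken from the library.
KNOWN_CLIENTS = ("geth", "erigon", "nethermind", "besu")


def classify_client_version(client_version: str) -> str:
    """Map a 'Name/version/platform' string to a lowercase client name."""
    # The first slash-separated field is assumed to carry the client name.
    name = client_version.split("/", 1)[0].strip().lower()
    return name if name in KNOWN_CLIENTS else "unknown"


print(classify_client_version("Geth/v1.13.14-stable/linux-amd64/go1.21.7"))
print(classify_client_version("erigon/2.48.1/linux-amd64/go1.20.5"))
```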
y.utils.dank_mids module
y.utils.events module
- class y.utils.events.Events[source]
Bases: LogFilter
A class for fetching and processing events.
When awaited, a list of all elements will be returned.
- obj_type
alias of _EventItem
- __aiter__()
Return an async iterator that yields T objects from the ASyncIterable.
- Return type:
- __await__()
Asynchronously iterate through the ASyncIterable and return all objects.
- __init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)
Initialize a LogFilter instance.
- Parameters:
addresses – List of contract addresses to fetch logs from.
topics – List of event topics to filter logs by.
from_block (int | None) – The starting block to fetch logs from.
chunk_size (int) – The number of blocks to fetch in each chunk.
chunks_per_batch (int | None) – The number of chunks to fetch in each batch.
semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.
executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.
is_reusable (bool) – Whether the filter is reusable.
verbose (bool) – Verbosity level for logging.
- Return type:
None
- __iter__()
Return an iterator that yields T objects from the ASyncIterable.
- Return type:
Iterator[T]
- events(to_block)[source]
Get events up to a given block.
- Parameters:
to_block (int) – The ending block to fetch events to.
- Yields:
A decoded event.
- Return type:
- filter(function)
Filters the contents of the ASyncIterable based on a function.
- Parameters:
function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.
- Returns:
An instance of ASyncFilter that yields the filtered objects from the ASyncIterable.
- Return type:
ASyncFilter[T]
- logs(to_block)
Get logs up to a given block.
- Parameters:
to_block (int | None) – The ending block to fetch logs to.
- Yields:
A decoded event log.
- Return type:
- sort(*, key=None, reverse=False)
Sort the contents of the ASyncIterable.
- Parameters:
key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.
reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.
- Returns:
An instance of ASyncSorter that will yield the objects yielded from this ASyncIterable, but sorted.
- Return type:
ASyncSorter[T]
- classmethod wrap(wrapped)
Class method to wrap an AsyncIterable for backward compatibility.
- Parameters:
wrapped (AsyncIterable[T])
- Return type:
- __wrapped__: AsyncIterable[T]
The wrapped async iterable object.
- addresses
- property bulk_insert: Callable[[List[Log]], Awaitable[None]]
Get the function for bulk inserting logs into the database.
- Returns:
A function for bulk inserting logs.
- property cache: LogCache
- property executor: _AsyncExecutorMixin
- from_block
- property insert_to_db: Callable[[Log], None]
Get the function for inserting logs into the database.
- Raises:
NotImplementedError – If this method is not implemented in the subclass.
- is_reusable
- property materialized: List[T]
Synchronously iterate through the ASyncIterable and return all objects.
- Returns:
A list of the objects yielded by the ASyncIterable.
- property semaphore: BlockSemaphore
- to_block
- topics
- class y.utils.events.LogFilter[source]
Bases: Filter[Log, LogCache]
A filter for fetching and processing event logs.
When awaited, a list of all elements will be returned.
- __aiter__()
Return an async iterator that yields T objects from the ASyncIterable.
- Return type:
- __await__()
Asynchronously iterate through the ASyncIterable and return all objects.
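The combination of __aiter__ and __await__ means the same object can be streamed with "async for" or collected in one step with "await". The sketch below shows that pattern with a hypothetical stand-in class, AwaitableRange; it does not use the library's ASyncIterable and makes no claim about how that class is implemented.

```python
import asyncio
from typing import AsyncIterator, Generator, List


class AwaitableRange:
    """Hypothetical stand-in: an async iterable that can also be awaited."""

    def __init__(self, stop: int) -> None:
        self.stop = stop

    async def __aiter__(self) -> AsyncIterator[int]:
        # Async generator: yields items one at a time.
        for i in range(self.stop):
            await asyncio.sleep(0)  # pretend to do I/O
            yield i

    async def _materialize(self) -> List[int]:
        # Drain the async iterator into a list.
        return [item async for item in self]

    def __await__(self) -> Generator[None, None, List[int]]:
        # Awaiting the object delegates to the list-building coroutine.
        return self._materialize().__await__()


async def main() -> List[int]:
    items = AwaitableRange(3)
    streamed = [i async for i in items]  # iterate lazily
    collected = await items              # or get everything at once
    return streamed + collected


result = asyncio.run(main())
print(result)  # [0, 1, 2, 0, 1, 2]
```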
- __init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)[source]
Initialize a LogFilter instance.
- Parameters:
addresses – List of contract addresses to fetch logs from.
topics – List of event topics to filter logs by.
from_block (int | None) – The starting block to fetch logs from.
chunk_size (int) – The number of blocks to fetch in each chunk.
chunks_per_batch (int | None) – The number of chunks to fetch in each batch.
semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.
executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.
is_reusable (bool) – Whether the filter is reusable.
verbose (bool) – Verbosity level for logging.
- Return type:
None
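The chunk_size and chunks_per_batch parameters suggest that a block range is split into fixed-size chunks, which are then grouped into batches. The sketch below illustrates that idea only. The helper names block_chunks and batches are hypothetical, and treating both ends of each range as inclusive is an assumption rather than documented behaviour.

```python
from typing import Iterator, List, Sequence, Tuple

BlockRange = Tuple[int, int]


def block_chunks(from_block: int, to_block: int, chunk_size: int = 10_000) -> Iterator[BlockRange]:
    """Yield (start, end) ranges covering from_block..to_block, ends inclusive."""
    start = from_block
    while start <= to_block:
        end = min(start + chunk_size - 1, to_block)
        yield start, end
        start = end + 1


def batches(chunks: Sequence[BlockRange], chunks_per_batch: int) -> List[List[BlockRange]]:
    """Group chunks into consecutive batches of at most chunks_per_batch."""
    return [
        list(chunks[i:i + chunks_per_batch])
        for i in range(0, len(chunks), chunks_per_batch)
    ]


chunks = list(block_chunks(0, 25_000, chunk_size=10_000))
print(chunks)              # [(0, 9999), (10000, 19999), (20000, 25000)]
print(batches(chunks, 2))  # two batches: the first with 2 chunks, the second with 1
```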
- __iter__()
Return an iterator that yields T objects from the ASyncIterable.
- Return type:
Iterator[T]
- filter(function)
Filters the contents of the ASyncIterable based on a function.
- Parameters:
function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.
- Returns:
An instance of ASyncFilter that yields the filtered objects from the ASyncIterable.
- Return type:
ASyncFilter[T]
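Because the filter function may be sync or async, a filter of this kind has to await the predicate's result only when it is awaitable. The sketch below shows one way to do that with the standard library; afilter and numbers are hypothetical helpers, not the library's ASyncFilter.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable, List, Tuple, TypeVar

T = TypeVar("T")


async def numbers(stop: int) -> AsyncIterator[int]:
    """Hypothetical async source of items."""
    for i in range(stop):
        yield i


async def afilter(source: AsyncIterator[T], function: Callable[[T], Any]) -> AsyncIterator[T]:
    """Yield items for which `function` (sync or async) returns a truthy value."""
    async for item in source:
        keep = function(item)
        if inspect.isawaitable(keep):
            keep = await keep  # async predicate: resolve its result
        if keep:
            yield item


def is_even(n: int) -> bool:
    return n % 2 == 0


async def is_odd(n: int) -> bool:
    return n % 2 == 1


async def main() -> Tuple[List[int], List[int]]:
    evens = [n async for n in afilter(numbers(6), is_even)]
    odds = [n async for n in afilter(numbers(6), is_odd)]
    return evens, odds


evens, odds = asyncio.run(main())
print(evens, odds)  # [0, 2, 4] [1, 3, 5]
```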
- logs(to_block)[source]
Get logs up to a given block.
- Parameters:
to_block (int | None) – The ending block to fetch logs to.
- Yields:
A decoded event log.
- Return type:
- sort(*, key=None, reverse=False)
Sort the contents of the ASyncIterable.
- Parameters:
key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.
reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.
- Returns:
An instance of ASyncSorter that will yield the objects yielded from this ASyncIterable, but sorted.
- Return type:
ASyncSorter[T]
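The key and reverse arguments mirror those of the built-in sorted. A minimal sketch of sorting an async iterable follows; it assumes the whole source is consumed before anything is yielded, which may differ from what ASyncSorter does. The names asorted and fake_logs, and the sample records, are hypothetical.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List, Optional, TypeVar

T = TypeVar("T")


async def fake_logs() -> AsyncIterator[Dict[str, int]]:
    """Hypothetical source yielding log-like records out of order."""
    for block in (30, 10, 20):
        yield {"blockNumber": block}


async def asorted(
    source: AsyncIterator[T],
    *,
    key: Optional[Callable[[T], Any]] = None,
    reverse: bool = False,
) -> AsyncIterator[T]:
    """Collect every item from `source`, then yield them in sorted order."""
    items = [item async for item in source]
    for item in sorted(items, key=key, reverse=reverse):
        yield item


async def main() -> List[int]:
    by_block = lambda log: log["blockNumber"]
    return [log["blockNumber"] async for log in asorted(fake_logs(), key=by_block, reverse=True)]


descending = asyncio.run(main())
print(descending)  # [30, 20, 10]
```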
- classmethod wrap(wrapped)
Class method to wrap an AsyncIterable for backward compatibility.
- Parameters:
wrapped (AsyncIterable[T])
- Return type:
- __wrapped__: AsyncIterable[T]
The wrapped async iterable object.
- addresses
- property bulk_insert: Callable[[List[Log]], Awaitable[None]]
Get the function for bulk inserting logs into the database.
- Returns:
A function for bulk inserting logs.
- property cache: LogCache
- property executor: _AsyncExecutorMixin
- from_block
- property insert_to_db: Callable[[Log], None]
Get the function for inserting logs into the database.
- Raises:
NotImplementedError – If this method is not implemented in the subclass.
- is_reusable
- property materialized: List[T]
Synchronously iterate through the ASyncIterable and return all objects.
- Returns:
A list of the objects yielded by the ASyncIterable.
- property semaphore: BlockSemaphore
- to_block
- topics
- class y.utils.events.ProcessedEvents[source]
Bases: Events, ASyncIterable[T]
A class for fetching, processing, and iterating over events.
When awaited, a list of all elements will be returned.
- obj_type
alias of _EventItem
- __aiter__()
Return an async iterator that yields T objects from the ASyncIterable.
- Return type:
- __await__()
Asynchronously iterate through the ASyncIterable and return all objects.
- __init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)
Initialize a LogFilter instance.
- Parameters:
addresses – List of contract addresses to fetch logs from.
topics – List of event topics to filter logs by.
from_block (int | None) – The starting block to fetch logs from.
chunk_size (int) – The number of blocks to fetch in each chunk.
chunks_per_batch (int | None) – The number of chunks to fetch in each batch.
semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.
executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.
is_reusable (bool) – Whether the filter is reusable.
verbose (bool) – Verbosity level for logging.
- Return type:
None
- __iter__()
Return an iterator that yields T objects from the ASyncIterable.
- Return type:
Iterator[T]
- events(to_block)
Get events up to a given block.
- Parameters:
to_block (int) – The ending block to fetch events to.
- Yields:
A decoded event.
- Return type:
- filter(function)
Filters the contents of the ASyncIterable based on a function.
- Parameters:
function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.
- Returns:
An instance of ASyncFilter that yields the filtered objects from the ASyncIterable.
- Return type:
ASyncFilter[T]
- logs(to_block)
Get logs up to a given block.
- Parameters:
to_block (int | None) – The ending block to fetch logs to.
- Yields:
A decoded event log.
- Return type:
- objects(to_block)[source]
Get an ASyncIterator that yields all events up to a given block.
- Parameters:
to_block (int) – The ending block to fetch events to.
- Returns:
An ASyncIterator that yields all included events.
- Return type:
- sort(*, key=None, reverse=False)
Sort the contents of the ASyncIterable.
- Parameters:
key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.
reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.
- Returns:
An instance of ASyncSorter that will yield the objects yielded from this ASyncIterable, but sorted.
- Return type:
ASyncSorter[T]
- classmethod wrap(wrapped)
Class method to wrap an AsyncIterable for backward compatibility.
- Parameters:
wrapped (AsyncIterable[T])
- Return type:
- __wrapped__: AsyncIterable[T]
The wrapped async iterable object.
- addresses
- property bulk_insert: Callable[[List[Log]], Awaitable[None]]
Get the function for bulk inserting logs into the database.
- Returns:
A function for bulk inserting logs.
- property cache: LogCache
- property executor: _AsyncExecutorMixin
- from_block
- property insert_to_db: Callable[[Log], None]
Get the function for inserting logs into the database.
- Raises:
NotImplementedError – If this method is not implemented in the subclass.
- is_reusable
- property materialized: List[T]
Synchronously iterate through the ASyncIterable and return all objects.
- Returns:
A list of the objects yielded by the ASyncIterable.
- property semaphore: BlockSemaphore
- to_block
- topics
- ASyncFunction y.utils.events._get_logs(address: eth_typing.evm.ChecksumAddress | None, topics: List[str] | None, start: int | eth_typing.evm.BlockNumber, end: int | eth_typing.evm.BlockNumber) -> List[evmspec.log.Log] [source]
Get logs for a given address, topics, and block range.
- Args:
address: The address of the contract to fetch logs from.
topics: The event topics to filter logs by.
start: The starting block to fetch logs from.
end: The ending block to fetch logs to.
- Returns:
A list of decoded event logs.
Since _get_logs is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
address (ChecksumAddress | None)
start (int | BlockNumber)
end (int | BlockNumber)
- Return type:
- y.utils.events.checkpoints_to_weight(checkpoints, start_block, end_block)[source]
- Parameters:
start_block (int | BlockNumber)
end_block (int | BlockNumber)
- Return type:
- y.utils.events.decode_logs(logs)[source]
Decode logs to events and enrich them with additional info.
- ASyncFunction y.utils.events.get_logs_asap(address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | NoneType, topics: List[str] | None, from_block: int | eth_typing.evm.BlockNumber | NoneType = None, to_block: int | eth_typing.evm.BlockNumber | NoneType = None, verbose: int = 0) -> List[Any] [source]
Get logs as soon as possible.
- Args:
address: The address of the contract to fetch logs from.
topics: The event topics to filter logs by.
from_block: The starting block to fetch logs from.
to_block: The ending block to fetch logs to.
verbose: Verbosity level for logging.
- Returns:
A list of decoded event logs.
Since get_logs_asap is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
address (str | HexBytes | AnyAddress | EthAddress | None)
from_block (int | BlockNumber | None)
to_block (int | BlockNumber | None)
verbose (int)
- Return type:
- async y.utils.events.get_logs_asap_generator(address, topics=None, from_block=None, to_block=None, chronological=True, run_forever=False, run_forever_interval=60, verbose=0)[source]
Get logs as soon as possible in a generator.
- Parameters:
address (str | HexBytes | AnyAddress | EthAddress | None) – The address of the contract to fetch logs from.
topics (List[str] | None) – The event topics to filter logs by.
from_block (int | BlockNumber | None) – The starting block to fetch logs from.
to_block (int | BlockNumber | None) – The ending block to fetch logs to.
chronological (bool) – If True, yield logs in chronological order.
run_forever (bool) – If True, run indefinitely, fetching new logs periodically.
run_forever_interval (int) – The interval in seconds to wait between fetches when running forever.
verbose (int) – Verbosity level for logging.
- Yields:
Lists of decoded event logs.
- Return type:
AsyncGenerator[List[LogReceipt], None]
y.utils.fakes module
y.utils.gather module
Utility functions for gathering method results asynchronously.
- async y.utils.gather.gather_methods(address, methods, *, block=None, return_exceptions=False)[source]
Asynchronously gather results from multiple contract methods.
- Parameters:
- Returns:
A tuple containing the results of the method calls.
- Return type:
Example
>>> from y.utils import gather_methods
>>> address = "0x6B175474E89094C44Da98b954EedeAC495271d0F"  # DAI
>>> methods = ["name()", "symbol()", "decimals()"]
>>> results = await gather_methods(address, methods)
>>> print(results)
('Dai Stablecoin', 'DAI', 18)
>>> # Using raw method calls
>>> methods = ["name()(string)", "symbol()(string)", "decimals()(uint8)"]
>>> results = await gather_methods(address, methods)
>>> print(results)
('Dai Stablecoin', 'DAI', 18)
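The effect of return_exceptions can be illustrated without a node connection. The sketch below assumes the calls are fanned out with asyncio.gather, which the documentation does not confirm. FAKE_CHAIN, fake_call, and gather_methods_sketch are hypothetical stand-ins for real contract calls.

```python
import asyncio
from typing import Any, Iterable, Optional, Tuple

# Hypothetical canned responses standing in for on-chain state.
FAKE_CHAIN = {"name()": "Dai Stablecoin", "symbol()": "DAI", "decimals()": 18}


async def fake_call(address: str, method: str, block: Optional[int] = None) -> Any:
    """Pretend to call `method` on `address`; unknown methods raise."""
    await asyncio.sleep(0)
    if method not in FAKE_CHAIN:
        raise ValueError(f"unknown method {method!r}")
    return FAKE_CHAIN[method]


async def gather_methods_sketch(
    address: str,
    methods: Iterable[str],
    *,
    block: Optional[int] = None,
    return_exceptions: bool = False,
) -> Tuple[Any, ...]:
    """Run all calls concurrently and return their results as a tuple."""
    results = await asyncio.gather(
        *(fake_call(address, method, block) for method in methods),
        return_exceptions=return_exceptions,
    )
    return tuple(results)


DAI = "0x6B175474E89094C44Da98b954EedeAC495271d0F"
ok = asyncio.run(gather_methods_sketch(DAI, ["name()", "symbol()", "decimals()"]))
mixed = asyncio.run(
    gather_methods_sketch(DAI, ["name()", "missing()"], return_exceptions=True)
)
print(ok)     # ('Dai Stablecoin', 'DAI', 18)
print(mixed)  # the second element is a ValueError instance rather than a raised error
```

With return_exceptions left at False, the first failing call would propagate its exception to the caller instead.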
y.utils.logging module
- class y.utils.logging.PriceLogger[source]
Bases:
Logger
- __init__(name, level=0)
Initialize the logger with a name and an optional level.
- addFilter(filter)
Add the specified filter to this handler.
- addHandler(hdlr)
Add the specified handler to this logger.
- callHandlers(record)
Pass a record to all relevant handlers.
Loop through all handlers for this logger and its parents in the logger hierarchy. If no handler was found, output a one-off error message to sys.stderr. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger whose handlers are called.
- critical(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘CRITICAL’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.critical("Houston, we have a %s", "major disaster", exc_info=1)
- debug(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘DEBUG’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.debug("Houston, we have a %s", "thorny problem", exc_info=1)
- error(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘ERROR’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.error("Houston, we have a %s", "major problem", exc_info=1)
- exception(msg, *args, exc_info=True, **kwargs)
Convenience method for logging an ERROR with exception information.
- fatal(msg, *args, **kwargs)
Don’t use this method, use critical() instead.
- filter(record)
Determine if a record is loggable by consulting all the filters.
The default is to allow the record to be logged; any filter can veto this and the record is then dropped. Returns a zero value if a record is to be dropped, else non-zero.
Changed in version 3.2: Allow filters to be just callables.
- findCaller(stack_info=False, stacklevel=1)
Find the stack frame of the caller so that we can note the source file name, line number and function name.
- getChild(suffix)
Get a logger which is a descendant to this one.
This is a convenience method, such that
logging.getLogger('abc').getChild('def.ghi')
is the same as
logging.getLogger('abc.def.ghi')
It’s useful, for example, when the parent logger is named using __name__ rather than a literal string.
- getEffectiveLevel()
Get the effective level for this logger.
Loop through this logger and its parents in the logger hierarchy, looking for a non-zero logging level. Return the first one found.
- handle(record)
Call the handlers for the specified record.
This method is used for unpickled records received from a socket, as well as those created locally. Logger-level filtering is applied.
- hasHandlers()
See if this logger has any handlers configured.
Loop through all handlers for this logger and its parents in the logger hierarchy. Return True if a handler was found, else False. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger which is checked for the existence of handlers.
- info(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘INFO’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.info("Houston, we have a %s", "interesting problem", exc_info=1)
- isEnabledFor(level)
Is this logger enabled for level ‘level’?
- log(level, msg, *args, **kwargs)
Log ‘msg % args’ with the integer severity ‘level’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.log(level, "We have a %s", "mysterious problem", exc_info=1)
- makeRecord(name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None)
A factory method which can be overridden in subclasses to create specialized LogRecords.
- removeFilter(filter)
Remove the specified filter from this handler.
- removeHandler(hdlr)
Remove the specified handler from this logger.
- setLevel(level)
Set the logging level of this logger. level must be an int or a str.
- warn(msg, *args, **kwargs)
- warning(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘WARNING’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.warning("Houston, we have a %s", "bit of a problem", exc_info=1)
- key: Tuple[str | HexBytes | AnyAddress | EthAddress | Contract | int, int | BlockNumber, str | None, str]
- manager = <logging.Manager object>
- root = <RootLogger root (WARNING)>
- y.utils.logging.enable_debug_logging(logger='y')[source]
- Parameters:
logger (str)
- Return type:
None
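enable_debug_logging has no description here. Judging by its name and signature, it presumably lowers the named logger's level to DEBUG. The sketch below shows how that could be done with the standard logging module. The function enable_debug_logging_sketch and the logger name "demo.sketch" are hypothetical, and attaching a StreamHandler only when none exists is an assumption.

```python
import logging


def enable_debug_logging_sketch(logger: str = "y") -> None:
    """Set the named logger to DEBUG and make sure it has a handler."""
    log = logging.getLogger(logger)
    log.setLevel(logging.DEBUG)
    # Only attach a handler once, so repeated calls do not duplicate output.
    if not log.handlers:
        log.addHandler(logging.StreamHandler())


enable_debug_logging_sketch("demo.sketch")
enable_debug_logging_sketch("demo.sketch")  # second call is a no-op for handlers
demo_logger = logging.getLogger("demo.sketch")
demo_logger.debug("debug output is now visible")
```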
- y.utils.logging.get_price_logger(token_address, block, *, symbol=None, extra='', start_task=False)[source]
- Parameters:
token_address (str | HexBytes | AnyAddress | EthAddress | Contract | int)
block (int | BlockNumber)
symbol (str | None)
extra (str)
start_task (bool)
- Return type:
y.utils.middleware module
- y.utils.middleware.getcode_cache_middleware(make_request, web3)[source]
Middleware for caching eth_getCode calls.
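The (make_request, web3) signature matches the function-style middleware of older web3.py releases, where the middleware returns a callable taking (method, params). A minimal caching sketch in that shape is shown below. It keys the cache on the full parameter list, which is an assumption, and it uses a hypothetical fake_make_request instead of a real provider.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple

Request = Callable[[str, Sequence[Any]], Any]


def getcode_cache_middleware_sketch(make_request: Request, web3: Any = None) -> Request:
    """Wrap make_request so repeated eth_getCode calls hit a local cache."""
    cache: Dict[Tuple[Any, ...], Any] = {}

    def middleware(method: str, params: Sequence[Any]) -> Any:
        if method != "eth_getCode":
            return make_request(method, params)  # pass everything else through
        key = tuple(params)  # assumed key: (address, block identifier)
        if key not in cache:
            # A production version would likely skip caching error responses.
            cache[key] = make_request(method, params)
        return cache[key]

    return middleware


calls: List[Tuple[str, Tuple[Any, ...]]] = []


def fake_make_request(method: str, params: Sequence[Any]) -> Dict[str, str]:
    """Hypothetical provider: records each request and returns dummy bytecode."""
    calls.append((method, tuple(params)))
    return {"result": "0x6080"}


request = getcode_cache_middleware_sketch(fake_make_request)
address = "0x6B175474E89094C44Da98b954EedeAC495271d0F"
first = request("eth_getCode", [address, "0x10"])
second = request("eth_getCode", [address, "0x10"])  # served from the cache
request("eth_blockNumber", [])
request("eth_blockNumber", [])  # not cached
print(len(calls))  # 3
```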
- y.utils.middleware.setup_getcode_cache_middleware()[source]
Set up the eth_getCode cache middleware for the current Web3 provider.
This function modifies the Web3 provider to use a custom session with increased connection pool size and timeout, and adds the getcode cache middleware.
- Return type:
None
- y.utils.middleware.setup_geth_poa_middleware()[source]
Set up the geth proof-of-authority middleware for the current Web3 provider.
- Return type:
None
y.utils.multicall module
- ASyncFunction y.utils.multicall.fetch_multicall(*calls: Any, block: int | eth_typing.evm.BlockNumber | NoneType = None) -> List[Any | None] [source]
Since fetch_multicall is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
calls (Any)
block (int | BlockNumber | None)
- Return type:
- ASyncFunction y.utils.multicall.multicall_decimals(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract], block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = True) -> List[int] [source]
Since multicall_decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
addresses (Iterable[str | HexBytes | AnyAddress | EthAddress | Contract])
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
- ASyncFunction y.utils.multicall.multicall_same_func_no_input(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract | int], method: str, block: int | eth_typing.evm.BlockNumber | NoneType = None, apply_func: Callable | None = None, return_None_on_failure: bool = False) -> List[Any] [source]
Since multicall_same_func_no_input is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
addresses (Iterable[str | HexBytes | AnyAddress | EthAddress | Contract | int])
method (str)
block (int | BlockNumber | None)
apply_func (Callable | None)
return_None_on_failure (bool)
- Return type:
- ASyncFunction y.utils.multicall.multicall_same_func_same_contract_different_inputs(address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract | int, method: str, inputs: List | Tuple, block: int | eth_typing.evm.BlockNumber | NoneType = None, apply_func: Callable | None = None, return_None_on_failure: bool = False) -> List[Any] [source]
Since multicall_same_func_same_contract_different_inputs is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
address (str | HexBytes | AnyAddress | EthAddress | Contract | int)
method (str)
block (int | BlockNumber | None)
apply_func (Callable | None)
return_None_on_failure (bool)
- Return type:
- ASyncFunction y.utils.multicall.multicall_totalSupply(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract], block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = True) -> List[int] [source]
Since multicall_totalSupply is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
addresses (Iterable[str | HexBytes | AnyAddress | EthAddress | Contract])
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
y.utils.raw_calls module
- ASyncFunction y.utils.raw_calls._cached_call_fn(func: Callable, contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType, required_arg=None) -> Any [source]
Since _cached_call_fn is an ASyncFunctionAsyncDefault, you can optionally pass sync=True or asynchronous=False to force it to run synchronously and return a value. Without either kwarg, it will return a coroutine for you to await.
- Parameters:
func (Callable)
contract_address (str | HexBytes | AnyAddress | EthAddress | Contract)
block (int | BlockNumber | None)
- Return type:
- ASyncFunction y.utils.raw_calls._decimals(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) -> int | None [source]
Since _decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
contract_address (str | HexBytes | AnyAddress | EthAddress | Contract)
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
int | None
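The sync-by-default behaviour with an opt-in coroutine, as described for ASyncFunctionSyncDefault functions, can be illustrated with a small decorator. This is a sketch of the calling convention only, not the mechanism the library uses. The names sync_default and decimals_sketch are hypothetical, and the hard-coded return value stands in for a real contract call.

```python
import asyncio
import functools
from typing import Any, Callable, Coroutine, Optional


def sync_default(coro_fn: Callable[..., Coroutine[Any, Any, Any]]) -> Callable[..., Any]:
    """Run `coro_fn` synchronously unless sync=False or asynchronous=True is passed."""

    @functools.wraps(coro_fn)
    def wrapper(*args: Any, sync: Optional[bool] = None,
                asynchronous: Optional[bool] = None, **kwargs: Any) -> Any:
        run_sync = True  # default: behave like a plain function
        if sync is not None:
            run_sync = sync
        elif asynchronous is not None:
            run_sync = not asynchronous
        coro = coro_fn(*args, **kwargs)
        if run_sync:
            # asyncio.run raises RuntimeError if an event loop is already running.
            return asyncio.run(coro)
        return coro  # caller is responsible for awaiting it

    return wrapper


@sync_default
async def decimals_sketch(contract_address: str) -> int:
    await asyncio.sleep(0)  # stand-in for a network round trip
    return 18


DAI = "0x6B175474E89094C44Da98b954EedeAC495271d0F"
value = decimals_sketch(DAI)  # runs synchronously and returns an int


async def main() -> int:
    return await decimals_sketch(DAI, sync=False)  # returns a coroutine to await


awaited = asyncio.run(main())
print(value, awaited)  # 18 18
```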
- ASyncFunction y.utils.raw_calls._totalSupply(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) -> int | None [source]
Since _totalSupply is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
contract_address (str | HexBytes | AnyAddress | EthAddress | Contract)
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
int | None
- ASyncFunction y.utils.raw_calls.balanceOf(call_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, input_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) -> int | None [source]
Since balanceOf is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
call_address (str | HexBytes | AnyAddress | EthAddress | Contract)
input_address (str | HexBytes | AnyAddress | EthAddress | Contract)
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
int | None
- ASyncFunction y.utils.raw_calls.decimals(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) -> int [source]
Since decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
contract_address (str | HexBytes | AnyAddress | EthAddress | Contract)
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
- y.utils.raw_calls.prepare_data(method, inputs=typing.Union[NoneType, bytes, int, str, hexbytes.main.HexBytes, ~AnyAddress, brownie.convert.datatypes.EthAddress, brownie.network.contract.Contract, y.contracts.Contract])[source]
Prepare data for a raw contract call by encoding the method signature and input data.
This function takes a method signature and input data, and returns a hexadecimal string that can be used as the data field in a raw contract call.
- Parameters:
method – The method signature as a string (e.g., “transfer(address,uint256)”).
inputs – The input data for the method. Can be None, bytes, int, str, Address, EthAddress, brownie.Contract, or Contract.
- Returns:
A hexadecimal string representing the encoded method call data.
- Raises:
CalldataPreparationError – If the input type is not supported.
- Return type:
Note
The method signature is encoded to its 4-byte function selector.
If inputs is None, only the method selector is returned.
For other input types, the input is prepared and appended to the method selector.
- y.utils.raw_calls.prepare_input(input)[source]
Prepare input data for a raw contract call by encoding it to a hexadecimal string.
This function takes various input types and converts them to a hexadecimal string that can be used as part of the data field in a raw contract call.
- Parameters:
input (bytes | int | str | HexBytes | AnyAddress | EthAddress | Contract | Contract) – The input data to be prepared. Can be bytes, int, str, Address, EthAddress, brownie.Contract, or Contract.
- Returns:
A hexadecimal string representing the encoded input data.
- Raises:
CalldataPreparationError – If the input type is not supported.
- Return type:
Note
Bytes input is converted directly to its hexadecimal representation.
Integer input is converted to bytes and then to hexadecimal.
String, Address, EthAddress, brownie.Contract, and Contract inputs are assumed to be addresses and are padded to 32 bytes.
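The notes for prepare_data and prepare_input describe calldata as a 4-byte selector followed by the encoded input. The sketch below builds calldata for balanceOf(address) along those lines. It hard-codes the well-known selector 0x70a08231 instead of hashing the signature, because Keccak-256 is not available in the standard library. The helper names are hypothetical, and encoding integers as 32-byte big-endian words is an assumption based on the ABI's uint256 layout.

```python
from typing import Optional

# Selector for balanceOf(address): first 4 bytes of the signature's Keccak-256 hash.
BALANCE_OF_SELECTOR = "0x70a08231"


def encode_address(address: str) -> str:
    """Left-pad a 20-byte hex address to a 32-byte word (64 hex characters)."""
    hex_part = address[2:] if address.startswith("0x") else address
    if len(hex_part) != 40:
        raise ValueError(f"expected a 20-byte address, got {address!r}")
    return hex_part.lower().rjust(64, "0")


def encode_uint(value: int) -> str:
    """Encode a non-negative integer as a 32-byte big-endian word (assumed width)."""
    return value.to_bytes(32, "big").hex()


def build_calldata(selector: str, encoded_input: Optional[str] = None) -> str:
    """Return the selector alone, or the selector followed by one encoded input."""
    return selector if encoded_input is None else selector + encoded_input


holder = "0x6B175474E89094C44Da98b954EedeAC495271d0F"
calldata = build_calldata(BALANCE_OF_SELECTOR, encode_address(holder))
print(calldata)
print(build_calldata(BALANCE_OF_SELECTOR))  # no inputs: selector only
```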
- ASyncFunction y.utils.raw_calls.raw_call(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, method: str, block: int | eth_typing.evm.BlockNumber | NoneType = None, inputs=None, output: str = None, return_None_on_failure: bool = False) -> Any | None [source]
Call a contract with only an address and a method. Bypasses brownie Contract object formation to save time. Only works with one input, e.g. balanceOf(address) or getPoolInfo(uint256).
Since raw_call is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
contract_address (str | HexBytes | AnyAddress | EthAddress | Contract)
method (str)
block (int | BlockNumber | None)
output (str)
return_None_on_failure (bool)
- Return type:
Any | None
- y.utils.raw_calls.logger = <Logger y.utils.raw_calls (WARNING)>
We use raw calls for commonly used functions because it's much faster than using brownie Contracts.