y.utils package

Submodules

y.utils.cache module

y.utils.cache.optional_async_diskcache(fn)[source]
Parameters:

fn (Callable[[~P], Awaitable[T]] | Callable[[~P], T])

Return type:

Callable[[~P], Awaitable[T]] | Callable[[~P], T]

y.utils.checks module

Utility functions for performing various checks.

y.utils.checks.hasall(obj, attrs)[source]

Check if an object has all the specified attributes.

Parameters:
  • obj (Any) – The object to check.

  • attrs (Iterable[str]) – An iterable of attribute names to check for.

Returns:

True if the object has all the specified attributes, False otherwise.

Return type:

bool

Example

>>> class TestClass:
...     attr1 = 1
...     attr2 = 2
>>> test_obj = TestClass()
>>> hasall(test_obj, ['attr1', 'attr2'])
True
>>> hasall(test_obj, ['attr1', 'attr3'])
False

y.utils.client module

Utility functions for retrieving Ethereum client information.

y.utils.client.get_ethereum_client()[source]

Get the Ethereum client type for the current connection.

Returns:

A string representing the Ethereum client type.

Return type:

str

Example

>>> from y.utils.client import get_ethereum_client
>>> client = get_ethereum_client()
>>> print(client)
'geth'
y.utils.client.get_ethereum_client_async()[source]

Asynchronously get the Ethereum client type for the current connection.

Returns:

A string representing the Ethereum client type.

Return type:

str

Example

>>> from y.utils.client import get_ethereum_client_async
>>> client = await get_ethereum_client_async()
>>> print(client)
'erigon'

y.utils.dank_mids module

y.utils.events module

class y.utils.events.Events[source]

Bases: LogFilter

A class for fetching and processing events.

When awaited, a list of all elements will be returned.

obj_type

alias of _EventItem

__aiter__()

Return an async iterator that yields T objects from the ASyncIterable.

Return type:

AsyncIterator[T]

__await__()

Asynchronously iterate through the ASyncIterable and return all objects.

Returns:

A list of the objects yielded by the ASyncIterable.

Return type:

Generator[Any, Any, List[T]]

__init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)

Initialize a LogFilter instance.

Parameters:
  • addresses – List of contract addresses to fetch logs from.

  • topics – List of event topics to filter logs by.

  • from_block (int | None) – The starting block to fetch logs from.

  • chunk_size (int) – The number of blocks to fetch in each chunk.

  • chunks_per_batch (int | None) – The number of chunks to fetch in each batch.

  • semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.

  • executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.

  • is_reusable (bool) – Whether the filter is reusable.

  • verbose (bool) – Verbosity level for logging.

Return type:

None

__iter__()

Return an iterator that yields T objects from the ASyncIterable.

Return type:

Iterator[T]

_objects_thru(block)
Parameters:

block (int | None)

Return type:

AsyncIterator[T]

events(to_block)[source]

Get events up to a given block.

Parameters:

to_block (int) – The ending block to fetch events to.

Yields:

A decoded event.

Return type:

ASyncIterator[_EventItem]

filter(function)

Filters the contents of the ASyncIterable based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered objects from the ASyncIterable.

Return type:

ASyncFilter[T]

logs(to_block)

Get logs up to a given block.

Parameters:

to_block (int | None) – The ending block to fetch logs to.

Yields:

A decoded event log.

Return type:

ASyncIterator[Log]

sort(*, key=None, reverse=False)

Sort the contents of the ASyncIterable.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the objects yielded from this ASyncIterable, but sorted.

Return type:

ASyncSorter[T]

classmethod wrap(wrapped)

Class method to wrap an AsyncIterable for backward compatibility.

Parameters:

wrapped (AsyncIterable[T])

Return type:

ASyncIterable[T]

__wrapped__: AsyncIterable[T]

The wrapped async iterable object.

addresses
property bulk_insert: Callable[[List[Log]], Awaitable[None]]

Get the function for bulk inserting logs into the database.

Returns:

A function for bulk inserting logs.

property cache: LogCache
property executor: _AsyncExecutorMixin
from_block
property insert_to_db: Callable[[Log], None]

Get the function for inserting logs into the database.

Raises:

NotImplementedError – If this method is not implemented in the subclass.

property is_asleep: bool
is_reusable
property materialized: List[T]

Synchronously iterate through the ASyncIterable and return all objects.

Returns:

A list of the objects yielded by the ASyncIterable.

property semaphore: BlockSemaphore
to_block
topics
class y.utils.events.LogFilter[source]

Bases: Filter[Log, LogCache]

A filter for fetching and processing event logs.

When awaited, a list of all elements will be returned.

__aiter__()

Return an async iterator that yields T objects from the ASyncIterable.

Return type:

AsyncIterator[T]

__await__()

Asynchronously iterate through the ASyncIterable and return all objects.

Returns:

A list of the objects yielded by the ASyncIterable.

Return type:

Generator[Any, Any, List[T]]

__init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)[source]

Initialize a LogFilter instance.

Parameters:
  • addresses – List of contract addresses to fetch logs from.

  • topics – List of event topics to filter logs by.

  • from_block (int | None) – The starting block to fetch logs from.

  • chunk_size (int) – The number of blocks to fetch in each chunk.

  • chunks_per_batch (int | None) – The number of chunks to fetch in each batch.

  • semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.

  • executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.

  • is_reusable (bool) – Whether the filter is reusable.

  • verbose (bool) – Verbosity level for logging.

Return type:

None

__iter__()

Return an iterator that yields T objects from the ASyncIterable.

Return type:

Iterator[T]

_objects_thru(block)
Parameters:

block (int | None)

Return type:

AsyncIterator[T]

filter(function)

Filters the contents of the ASyncIterable based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered objects from the ASyncIterable.

Return type:

ASyncFilter[T]

logs(to_block)[source]

Get logs up to a given block.

Parameters:

to_block (int | None) – The ending block to fetch logs to.

Yields:

A decoded event log.

Return type:

ASyncIterator[Log]

sort(*, key=None, reverse=False)

Sort the contents of the ASyncIterable.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the objects yielded from this ASyncIterable, but sorted.

Return type:

ASyncSorter[T]

classmethod wrap(wrapped)

Class method to wrap an AsyncIterable for backward compatibility.

Parameters:

wrapped (AsyncIterable[T])

Return type:

ASyncIterable[T]

__wrapped__: AsyncIterable[T]

The wrapped async iterable object.

addresses
property bulk_insert: Callable[[List[Log]], Awaitable[None]]

Get the function for bulk inserting logs into the database.

Returns:

A function for bulk inserting logs.

property cache: LogCache
property executor: _AsyncExecutorMixin
from_block
property insert_to_db: Callable[[Log], None]

Get the function for inserting logs into the database.

Raises:

NotImplementedError – If this method is not implemented in the subclass.

property is_asleep: bool
is_reusable
property materialized: List[T]

Synchronously iterate through the ASyncIterable and return all objects.

Returns:

A list of the objects yielded by the ASyncIterable.

property semaphore: BlockSemaphore
to_block
topics
class y.utils.events.ProcessedEvents[source]

Bases: Events, ASyncIterable[T]

A class for fetching, processing, and iterating over events.

When awaited, a list of all elements will be returned.

obj_type

alias of _EventItem

__aiter__()

Return an async iterator that yields T objects from the ASyncIterable.

Return type:

AsyncIterator[T]

__await__()

Asynchronously iterate through the ASyncIterable and return all objects.

Returns:

A list of the objects yielded by the ASyncIterable.

Return type:

Generator[Any, Any, List[T]]

__init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)

Initialize a LogFilter instance.

Parameters:
  • addresses – List of contract addresses to fetch logs from.

  • topics – List of event topics to filter logs by.

  • from_block (int | None) – The starting block to fetch logs from.

  • chunk_size (int) – The number of blocks to fetch in each chunk.

  • chunks_per_batch (int | None) – The number of chunks to fetch in each batch.

  • semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.

  • executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.

  • is_reusable (bool) – Whether the filter is reusable.

  • verbose (bool) – Verbosity level for logging.

Return type:

None

__iter__()

Return an iterator that yields T objects from the ASyncIterable.

Return type:

Iterator[T]

_objects_thru(block)
Parameters:

block (int | None)

Return type:

AsyncIterator[T]

events(to_block)

Get events up to a given block.

Parameters:

to_block (int) – The ending block to fetch events to.

Yields:

A decoded event.

Return type:

ASyncIterator[_EventItem]

filter(function)

Filters the contents of the ASyncIterable based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered objects from the ASyncIterable.

Return type:

ASyncFilter[T]

logs(to_block)

Get logs up to a given block.

Parameters:

to_block (int | None) – The ending block to fetch logs to.

Yields:

A decoded event log.

Return type:

ASyncIterator[Log]

objects(to_block)[source]

Get an ASyncIterator that yields all events up to a given block.

Parameters:

to_block (int) – The ending block to fetch events to.

Returns:

An ASyncIterator that yields all included events.

Return type:

ASyncIterator[_EventItem]

sort(*, key=None, reverse=False)

Sort the contents of the ASyncIterable.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the objects yielded from this ASyncIterable, but sorted.

Return type:

ASyncSorter[T]

classmethod wrap(wrapped)

Class method to wrap an AsyncIterable for backward compatibility.

Parameters:

wrapped (AsyncIterable[T])

Return type:

ASyncIterable[T]

__wrapped__: AsyncIterable[T]

The wrapped async iterable object.

addresses
property bulk_insert: Callable[[List[Log]], Awaitable[None]]

Get the function for bulk inserting logs into the database.

Returns:

A function for bulk inserting logs.

property cache: LogCache
property executor: _AsyncExecutorMixin
from_block
property insert_to_db: Callable[[Log], None]

Get the function for inserting logs into the database.

Raises:

NotImplementedError – If this method is not implemented in the subclass.

property is_asleep: bool
is_reusable
property materialized: List[T]

Synchronously iterate through the ASyncIterable and return all objects.

Returns:

A list of the objects yielded by the ASyncIterable.

property semaphore: BlockSemaphore
to_block
topics
ASyncFunctiony.utils.events._get_logs(address: eth_typing.evm.ChecksumAddress | None, topics: List[str] | None, start: int | eth_typing.evm.BlockNumber, end: int | eth_typing.evm.BlockNumber) List[evmspec.log.Log][source]

Get logs for a given address, topics, and block range.

Args:

address: The address of the contract to fetch logs from. topics: The event topics to filter logs by. start: The starting block to fetch logs from. end: The ending block to fetch logs to.

Returns:

A list of decoded event logs.

Since _get_logs is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

List[Log]

y.utils.events.checkpoints_to_weight(checkpoints, start_block, end_block)[source]
Parameters:
Return type:

float

y.utils.events.decode_logs(logs)[source]

Decode logs to events and enrich them with additional info.

Parameters:

logs (Iterable[LogReceipt] | Iterable[Log])

Return type:

EventDict

ASyncFunctiony.utils.events.get_logs_asap(address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | NoneType, topics: List[str] | None, from_block: int | eth_typing.evm.BlockNumber | NoneType = None, to_block: int | eth_typing.evm.BlockNumber | NoneType = None, verbose: int = 0) List[Any][source]

Get logs as soon as possible.

Args:

address: The address of the contract to fetch logs from. topics: The event topics to filter logs by. from_block: The starting block to fetch logs from. to_block: The ending block to fetch logs to. verbose: Verbosity level for logging.

Returns:

A list of decoded event logs.

Since get_logs_asap is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

List[Any]

async y.utils.events.get_logs_asap_generator(address, topics=None, from_block=None, to_block=None, chronological=True, run_forever=False, run_forever_interval=60, verbose=0)[source]

Get logs as soon as possible in a generator.

Parameters:
  • address (str | HexBytes | AnyAddress | EthAddress | None) – The address of the contract to fetch logs from.

  • topics (List[str] | None) – The event topics to filter logs by.

  • from_block (int | BlockNumber | None) – The starting block to fetch logs from.

  • to_block (int | BlockNumber | None) – The ending block to fetch logs to.

  • chronological (bool) – If True, yield logs in chronological order.

  • run_forever (bool) – If True, run indefinitely, fetching new logs periodically.

  • run_forever_interval (int) – The interval in seconds to wait between fetches when running forever.

  • verbose (int) – Verbosity level for logging.

Yields:

Lists of decoded event logs.

Return type:

AsyncGenerator[List[LogReceipt], None]

y.utils.events.logs_to_balance_checkpoints(logs)[source]

Convert Transfer logs to {address: {from_block: balance}} checkpoints.

Return type:

Dict[EthAddress, int]

y.utils.fakes module

y.utils.gather module

Utility functions for gathering method results asynchronously.

async y.utils.gather.gather_methods(address, methods, *, block=None, return_exceptions=False)[source]

Asynchronously gather results from multiple contract methods.

Parameters:
  • address (str) – The contract address.

  • methods (Iterable[str]) – An iterable of method names or encoded method calls.

  • block (int | None) – The block number to query. Defaults to None.

  • return_exceptions (bool) – Whether to return exceptions or raise them. Defaults to False.

Returns:

A tuple containing the results of the method calls.

Return type:

Tuple[Any]

Example

>>> from y.utils import gather_methods
>>> address = "0x6B175474E89094C44Da98b954EedeAC495271d0F"  # DAI
>>> methods = ["name()", "symbol()", "decimals()"]
>>> results = await gather_methods(address, methods)
>>> print(results)
('Dai Stablecoin', 'DAI', 18)
>>> # Using raw method calls
>>> methods = ["name()(string)", "symbol()(string)", "decimals()(uint8)"]
>>> results = await gather_methods(address, methods)
>>> print(results)
('Dai Stablecoin', 'DAI', 18)

y.utils.logging module

class y.utils.logging.PriceLogger[source]

Bases: Logger

__init__(name, level=0)

Initialize the logger with a name and an optional level.

addFilter(filter)

Add the specified filter to this handler.

addHandler(hdlr)

Add the specified handler to this logger.

callHandlers(record)

Pass a record to all relevant handlers.

Loop through all handlers for this logger and its parents in the logger hierarchy. If no handler was found, output a one-off error message to sys.stderr. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger whose handlers are called.

close()[source]
Return type:

None

critical(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘CRITICAL’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.critical(“Houston, we have a %s”, “major disaster”, exc_info=1)

debug(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘DEBUG’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.debug(“Houston, we have a %s”, “thorny problem”, exc_info=1)

error(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘ERROR’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.error(“Houston, we have a %s”, “major problem”, exc_info=1)

exception(msg, *args, exc_info=True, **kwargs)

Convenience method for logging an ERROR with exception information.

fatal(msg, *args, **kwargs)

Don’t use this method, use critical() instead.

filter(record)

Determine if a record is loggable by consulting all the filters.

The default is to allow the record to be logged; any filter can veto this and the record is then dropped. Returns a zero value if a record is to be dropped, else non-zero.

Changed in version 3.2: Allow filters to be just callables.

findCaller(stack_info=False, stacklevel=1)

Find the stack frame of the caller so that we can note the source file name, line number and function name.

getChild(suffix)

Get a logger which is a descendant to this one.

This is a convenience method, such that

logging.getLogger(‘abc’).getChild(‘def.ghi’)

is the same as

logging.getLogger(‘abc.def.ghi’)

It’s useful, for example, when the parent logger is named using __name__ rather than a literal string.

getEffectiveLevel()

Get the effective level for this logger.

Loop through this logger and its parents in the logger hierarchy, looking for a non-zero logging level. Return the first one found.

handle(record)

Call the handlers for the specified record.

This method is used for unpickled records received from a socket, as well as those created locally. Logger-level filtering is applied.

hasHandlers()

See if this logger has any handlers configured.

Loop through all handlers for this logger and its parents in the logger hierarchy. Return True if a handler was found, else False. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger which is checked for the existence of handlers.

info(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘INFO’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.info(“Houston, we have a %s”, “interesting problem”, exc_info=1)

isEnabledFor(level)

Is this logger enabled for level ‘level’?

log(level, msg, *args, **kwargs)

Log ‘msg % args’ with the integer severity ‘level’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.log(level, “We have a %s”, “mysterious problem”, exc_info=1)

makeRecord(name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None)

A factory method which can be overridden in subclasses to create specialized LogRecords.

removeFilter(filter)

Remove the specified filter from this handler.

removeHandler(hdlr)

Remove the specified handler from this logger.

setLevel(level)

Set the logging level of this logger. level must be an int or a str.

warn(msg, *args, **kwargs)
warning(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘WARNING’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.warning(“Houston, we have a %s”, “bit of a problem”, exc_info=1)

address: str
block: int
debug_task: Task[None] | None
enabled: bool
key: Tuple[str | HexBytes | AnyAddress | EthAddress | Contract | int, int | BlockNumber, str | None, str]
manager = <logging.Manager object>
root = <RootLogger root (WARNING)>
y.utils.logging.enable_debug_logging(logger='y')[source]
Parameters:

logger (str)

Return type:

None

y.utils.logging.get_price_logger(token_address, block, *, symbol=None, extra='', start_task=False)[source]
Parameters:
Return type:

PriceLogger

y.utils.middleware module

y.utils.middleware.getcode_cache_middleware(make_request, web3)[source]

Middleware for caching eth_getCode calls.

Parameters:
  • make_request (Callable) – The original request function.

  • web3 (Web3) – The Web3 instance.

Returns:

A middleware function that caches eth_getCode calls.

Return type:

Callable

y.utils.middleware.setup_getcode_cache_middleware()[source]

Set up the eth_getCode cache middleware for the current Web3 provider.

This function modifies the Web3 provider to use a custom session with increased connection pool size and timeout, and adds the getcode cache middleware.

Return type:

None

y.utils.middleware.setup_geth_poa_middleware()[source]

Set up the geth proof-of-authority middleware for the current Web3 provider.

Return type:

None

y.utils.middleware.should_cache(method, params)[source]

Determine if a method call should be cached.

Parameters:
  • method (str) – The name of the method being called.

  • params (Any) – The parameters of the method call.

Returns:

True if the method call should be cached, False otherwise.

Return type:

bool

y.utils.multicall module

ASyncFunctiony.utils.multicall.fetch_multicall(*calls: Any, block: int | eth_typing.evm.BlockNumber | NoneType = None) List[Any | None][source]

Since fetch_multicall is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

List[Any | None]

ASyncFunctiony.utils.multicall.multicall_decimals(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract], block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = True) List[int][source]

Since multicall_decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

List[int]

ASyncFunctiony.utils.multicall.multicall_same_func_no_input(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract | int], method: str, block: int | eth_typing.evm.BlockNumber | NoneType = None, apply_func: Callable | None = None, return_None_on_failure: bool = False) List[Any][source]

Since multicall_same_func_no_input is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

List[Any]

ASyncFunctiony.utils.multicall.multicall_same_func_same_contract_different_inputs(address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract | int, method: str, inputs: List | Tuple, block: int | eth_typing.evm.BlockNumber | NoneType = None, apply_func: Callable | None = None, return_None_on_failure: bool = False) List[Any][source]

Since multicall_same_func_same_contract_different_inputs is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

List[Any]

ASyncFunctiony.utils.multicall.multicall_totalSupply(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract], block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = True) List[int][source]

Since multicall_totalSupply is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

List[int]

y.utils.raw_calls module

ASyncFunctiony.utils.raw_calls._cached_call_fn(func: Callable, contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType, required_arg=None) Any[source]

Since _cached_call_fn is an ASyncFunctionAsyncDefault, you can optionally pass sync=True or asynchronous=False to force it to run synchronously and return a value. Without either kwarg, it will return a coroutine for you to await.

Parameters:
Return type:

Any

ASyncFunctiony.utils.raw_calls._decimals(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) int | None[source]

Since _decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

int | None

ASyncFunctiony.utils.raw_calls._totalSupply(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) int | None[source]

Since _totalSupply is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

int | None

ASyncFunctiony.utils.raw_calls.balanceOf(call_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, input_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) int | None[source]

Since balanceOf is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

int | None

ASyncFunctiony.utils.raw_calls.decimals(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) int[source]

Since decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

int

y.utils.raw_calls.prepare_data(method, inputs=typing.Union[NoneType, bytes, int, str, hexbytes.main.HexBytes, ~AnyAddress, brownie.convert.datatypes.EthAddress, brownie.network.contract.Contract, y.contracts.Contract])[source]

Prepare data for a raw contract call by encoding the method signature and input data.

This function takes a method signature and input data, and returns a hexadecimal string that can be used as the data field in a raw contract call.

Parameters:
  • method – The method signature as a string (e.g., “transfer(address,uint256)”).

  • inputs – The input data for the method. Can be None, bytes, int, str, Address, EthAddress, brownie.Contract, or Contract.

Returns:

A hexadecimal string representing the encoded method call data.

Raises:

CalldataPreparationError – If the input type is not supported.

Return type:

str

Note

  • The method signature is encoded to its 4-byte function selector.

  • If inputs is None, only the method selector is returned.

  • For other input types, the input is prepared and appended to the method selector.

y.utils.raw_calls.prepare_input(input)[source]

Prepare input data for a raw contract call by encoding it to a hexadecimal string.

This function takes various input types and converts them to a hexadecimal string that can be used as part of the data field in a raw contract call.

Parameters:

input (bytes | int | str | HexBytes | AnyAddress | EthAddress | Contract | Contract) – The input data to be prepared. Can be bytes, int, str, Address, EthAddress, brownie.Contract, or Contract.

Returns:

A hexadecimal string representing the encoded input data.

Raises:

CalldataPreparationError – If the input type is not supported.

Return type:

str

Note

  • Bytes input is converted directly to its hexadecimal representation.

  • Integer input is converted to bytes and then to hexadecimal.

  • String, Address, EthAddress, brownie.Contract, and Contract inputs are assumed to be addresses and are padded to 32 bytes.

ASyncFunctiony.utils.raw_calls.raw_call(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, method: str, block: int | eth_typing.evm.BlockNumber | NoneType = None, inputs=None, output: str = None, return_None_on_failure: bool = False) Any | None[source]

call a contract with only address and method. Bypasses brownie Contract object formation to save time only works with 1 input, ie balanceOf(address) or getPoolInfo(uint256)

Since raw_call is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Parameters:
Return type:

Any | None

y.utils.raw_calls.logger = <Logger y.utils.raw_calls (WARNING)>

We use raw calls for commonly used functions because its much faster than using brownie Contracts

Module contents