y.utils package

Submodules

y.utils.cache module

y.utils.cache.optional_async_diskcache(fn)[source]

Decorator to optionally apply disk caching to asynchronous functions.

If toolcache is not installed, this decorator will return the function as is, without any caching applied.

Parameters:

fn (Callable[[~P], Awaitable[T]] | Callable[[~P], T]) – The function to be decorated.

Return type:

Callable[[~P], Awaitable[T]] | Callable[[~P], T]

Examples

Using the decorator with an asynchronous function:

>>> async def fetch_data():
...     return "data"
>>> fetch_data = optional_async_diskcache(fetch_data)

See also

  • toolcache
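The fallback described above can be sketched with a guarded import. This is a minimal illustration rather than the library's implementation: `some_optional_cache_backend`, `optional_cache`, and the in-memory dict that stands in for a disk cache are all hypothetical names.

```python
import asyncio
import functools

try:
    # Hypothetical optional dependency; the module name is a placeholder.
    import some_optional_cache_backend as backend
except ImportError:
    backend = None


def optional_cache(fn):
    """Return fn unchanged when the backend is missing, else wrap it with a cache."""
    if backend is None:
        return fn

    cache = {}  # stand-in for a real disk cache

    @functools.wraps(fn)
    async def wrapper(*args):
        if args not in cache:
            cache[args] = await fn(*args)
        return cache[args]

    return wrapper


async def fetch_data():
    return "data"


decorated = optional_cache(fetch_data)
result = asyncio.run(decorated())
```

Because the placeholder backend cannot be imported, `decorated` is the original `fetch_data` object and no caching is applied.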

y.utils.checks module

Utility functions for performing various checks.

y.utils.checks.hasall(obj, attrs)[source]

Check if an object has all the specified attributes.

Parameters:
  • obj (Any) – The object to check.

  • attrs (Iterable[str]) – An iterable of attribute names to check for.

Returns:

True if the object has all the specified attributes, False otherwise.

Return type:

bool

Example

>>> class TestClass:
...     attr1 = 1
...     attr2 = 2
>>> test_obj = TestClass()
>>> hasall(test_obj, ['attr1', 'attr2'])
True
>>> hasall(test_obj, ['attr1', 'attr3'])
False
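A check like this can be expressed with `hasattr` and `all`. The sketch below is an assumption about how such a helper might look, not the library's source; `hasall_sketch` is a hypothetical name.

```python
from typing import Any, Iterable


def hasall_sketch(obj: Any, attrs: Iterable[str]) -> bool:
    """Return True only if obj exposes every attribute named in attrs."""
    return all(hasattr(obj, attr) for attr in attrs)


class TestClass:
    attr1 = 1
    attr2 = 2


test_obj = TestClass()
both_present = hasall_sketch(test_obj, ["attr1", "attr2"])
one_missing = hasall_sketch(test_obj, ["attr1", "attr3"])
```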

y.utils.client module

Utility functions for retrieving Ethereum client information.

y.utils.client.get_ethereum_client()[source]

Get the Ethereum client type for the current connection.

Returns:

A string representing the Ethereum client type, such as ‘geth’, ‘erigon’, or ‘tg’.

Return type:

str

Examples

>>> from y.utils.client import get_ethereum_client
>>> client = get_ethereum_client()
>>> print(client)
geth
>>> # The same call when connected to an Erigon node
>>> client = get_ethereum_client()
>>> print(client)
erigon

y.utils.client.get_ethereum_client_async()[source]

Asynchronously get the Ethereum client type for the current connection.

Returns:

A string representing the Ethereum client type, such as ‘geth’, ‘erigon’, or ‘tg’.

Return type:

str

Examples

>>> from y.utils.client import get_ethereum_client_async
>>> client = await get_ethereum_client_async()
>>> print(client)
erigon
>>> # The same call when connected to a Turbo-Geth ("tg") node
>>> client = await get_ethereum_client_async()
>>> print(client)
tg

y.utils.dank_mids module

y.utils.events module

class y.utils.events.Events[source]

Bases: LogFilter

A class for fetching and processing events.

This class extends LogFilter to provide additional functionality for handling events.

When awaited, a list of all Log will be returned.

Example

>>> my_object = Events(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], Log)
True

obj_type

alias of _EventItem

__aiter__(self) → AsyncIterator[T]

Return an async iterator that yields T objects from the ASyncIterable.

Return type:

AsyncIterator[T]

__await__(self) → Generator[Any, Any, List[T]]

Asynchronously iterate through the ASyncIterable and return all T objects.

Returns:

A list of the T objects yielded by the ASyncIterable.

Return type:

Generator[Any, Any, List[T]]
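An object can support both `async for` and `await` by defining `__aiter__` and `__await__` together. The sketch below uses only the standard library; `Numbers` is a hypothetical class and is not how ASyncIterable is implemented.

```python
import asyncio


class Numbers:
    """Yields 0..n-1 when iterated, or returns them all as a list when awaited."""

    def __init__(self, n):
        self.n = n

    async def __aiter__(self):
        # An async generator method: calling it returns an async iterator.
        for i in range(self.n):
            yield i

    async def _collect(self):
        return [item async for item in self]

    def __await__(self):
        # Delegate to a coroutine that drains the async iterator.
        return self._collect().__await__()


async def main():
    streamed = [i async for i in Numbers(3)]
    everything = await Numbers(3)
    return streamed, everything


streamed, everything = asyncio.run(main())
```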

__init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)

Initialize a LogFilter instance.

Parameters:
  • addresses – List of contract addresses to fetch logs from.

  • topics – List of event topics to filter logs by.

  • from_block (int | None) – The starting block to fetch logs from.

  • chunk_size (int) – The number of blocks to fetch in each chunk.

  • chunks_per_batch (int | None) – The number of chunks to fetch in each batch.

  • semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.

  • executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.

  • is_reusable (bool) – Whether the filter is reusable.

  • verbose (bool) – Verbosity level for logging.

Return type:

None

Examples

>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)

__iter__(self) → Iterator[T]

Return an iterator that yields T objects from the ASyncIterable.

Note

Synchronous iteration leverages ASyncIterator, which uses asyncio.BaseEventLoop.run_until_complete() to fetch items. ASyncIterator.__next__() raises a SyncModeInAsyncContextError if the event loop is already running.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Return type:

Iterator[T]
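The restriction in the note comes from asyncio itself: `run_until_complete()` refuses to run on a loop that is already running. The sketch below reproduces that with plain asyncio. The helper names are hypothetical, and the built-in `RuntimeError` stands in for the library's SyncModeInAsyncContextError.

```python
import asyncio


async def numbers():
    for i in range(3):
        yield i


def iterate_sync(agen_factory):
    """Drain an async generator from synchronous code using a private loop."""
    loop = asyncio.new_event_loop()
    agen = agen_factory()
    items = []
    try:
        while True:
            try:
                items.append(loop.run_until_complete(agen.__anext__()))
            except StopAsyncIteration:
                break
    finally:
        loop.close()
    return items


async def inside_running_loop():
    """Blocking on the running loop fails, so fall back to async iteration."""
    loop = asyncio.get_running_loop()
    coro = asyncio.sleep(0)
    try:
        loop.run_until_complete(coro)
    except RuntimeError:
        coro.close()  # the coroutine was never scheduled; close it cleanly
        return [i async for i in numbers()]
    return None


sync_items = iterate_sync(numbers)
async_items = asyncio.run(inside_running_loop())
```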

_objects_thru(block)

Parameters:

block (int | None)

Return type:

AsyncIterator[T]

events(to_block)[source]

Get events up to a given block.

Parameters:

to_block (int) – The ending block to fetch events to.

Yields:

A decoded event.

Return type:

ASyncIterator[_EventItem]

Examples

>>> events = Events(addresses=["0x1234..."], topics=["0x5678..."])
>>> async for event in events.events(1000100):
...     print(event)

filter(self, function: ViewFn[T]) → 'ASyncFilter[T]'

Filters the T objects yielded by the ASyncIterable based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered T objects from the ASyncIterable.

Return type:

ASyncFilter[T]
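Accepting either a sync or an async predicate can be done by awaiting the result only when it is awaitable. This is a standard-library sketch of that idea; `filter_async` and the sample predicates are hypothetical and are not the ASyncFilter implementation.

```python
import asyncio
import inspect


async def filter_async(source, predicate):
    """Yield items from source for which predicate (sync or async) is truthy."""
    async for item in source:
        keep = predicate(item)
        if inspect.isawaitable(keep):
            keep = await keep
        if keep:
            yield item


async def numbers():
    for i in range(6):
        yield i


def is_even(n):
    return n % 2 == 0


async def is_big(n):
    await asyncio.sleep(0)
    return n >= 3


async def main():
    evens = [n async for n in filter_async(numbers(), is_even)]
    big = [n async for n in filter_async(numbers(), is_big)]
    return evens, big


evens, big = asyncio.run(main())
```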

logs(to_block)

Get logs up to a given block.

Parameters:

to_block (int | None) – The ending block to fetch logs to.

Yields:

A raw log.

Return type:

ASyncIterator[Log]

Examples

>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)

sort(self, *, key: SortKey[T] = None, reverse: bool = False) → 'ASyncSorter[T]'

Sort the T objects yielded by the ASyncIterable.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the T objects yielded from this ASyncIterable, but sorted.

Return type:

ASyncSorter[T]
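Sorting an async iterable requires collecting every item before the first one can be yielded. The sketch below shows that with the built-in `sorted`; `sorted_async` is a hypothetical helper, not the ASyncSorter implementation.

```python
import asyncio


async def sorted_async(source, *, key=None, reverse=False):
    """Collect every item from source, then yield them in sorted order."""
    items = [item async for item in source]
    for item in sorted(items, key=key, reverse=reverse):
        yield item


async def words():
    for word in ["pear", "fig", "banana"]:
        yield word


async def main():
    by_length = [w async for w in sorted_async(words(), key=len)]
    descending = [w async for w in sorted_async(words(), reverse=True)]
    return by_length, descending


by_length, descending = asyncio.run(main())
```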

classmethod wrap(cls, wrapped: AsyncIterable[T]) → 'ASyncIterable[T]'

Class method to wrap an AsyncIterable for backward compatibility.

Parameters:

wrapped (AsyncIterable[T])

Return type:

ASyncIterable[T]

__wrapped__: AsyncIterable[T]

addresses

property bulk_insert: Callable[[List[Log]], Awaitable[None]]

Get the function for bulk inserting logs into the database.

Returns:

A function for bulk inserting logs.

Examples

>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> await log_filter.bulk_insert(logs)

property cache: LogCache

property executor: AsyncThreadPoolExecutor

from_block

property insert_to_db: Callable[[Log], None]

Get the function for inserting logs into the database.

Raises:

NotImplementedError – If this method is not implemented in the subclass.

property is_asleep: bool

is_reusable

property materialized: List[T]

Synchronously iterate through the ASyncIterable and return all T objects.

Returns:

A list of the T objects yielded by the ASyncIterable.

property semaphore: BlockSemaphore

to_block

topics

class y.utils.events.LogFilter[source]

Bases: Filter[Log, LogCache]

A filter for fetching and processing event logs.

This class provides methods to fetch logs from the blockchain and process them.

When awaited, a list of all Log will be returned.

Example

>>> my_object = LogFilter(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], Log)
True

__aiter__(self) → AsyncIterator[T]

Return an async iterator that yields T objects from the ASyncIterable.

Return type:

AsyncIterator[T]

__await__(self) → Generator[Any, Any, List[T]]

Asynchronously iterate through the ASyncIterable and return all T objects.

Returns:

A list of the T objects yielded by the ASyncIterable.

Return type:

Generator[Any, Any, List[T]]

__init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)[source]

Initialize a LogFilter instance.

Parameters:
  • addresses – List of contract addresses to fetch logs from.

  • topics – List of event topics to filter logs by.

  • from_block (int | None) – The starting block to fetch logs from.

  • chunk_size (int) – The number of blocks to fetch in each chunk.

  • chunks_per_batch (int | None) – The number of chunks to fetch in each batch.

  • semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.

  • executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.

  • is_reusable (bool) – Whether the filter is reusable.

  • verbose (bool) – Verbosity level for logging.

Return type:

None

Examples

>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)

__iter__(self) → Iterator[T]

Return an iterator that yields T objects from the ASyncIterable.

Note

Synchronous iteration leverages ASyncIterator, which uses asyncio.BaseEventLoop.run_until_complete() to fetch items. ASyncIterator.__next__() raises a SyncModeInAsyncContextError if the event loop is already running.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Return type:

Iterator[T]

_objects_thru(block)

Parameters:

block (int | None)

Return type:

AsyncIterator[T]

filter(self, function: ViewFn[T]) → 'ASyncFilter[T]'

Filters the T objects yielded by the ASyncIterable based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered T objects from the ASyncIterable.

Return type:

ASyncFilter[T]

logs(to_block)[source]

Get logs up to a given block.

Parameters:

to_block (int | None) – The ending block to fetch logs to.

Yields:

A raw log.

Return type:

ASyncIterator[Log]

Examples

>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)

sort(self, *, key: SortKey[T] = None, reverse: bool = False) → 'ASyncSorter[T]'

Sort the T objects yielded by the ASyncIterable.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the T objects yielded from this ASyncIterable, but sorted.

Return type:

ASyncSorter[T]

classmethod wrap(cls, wrapped: AsyncIterable[T]) → 'ASyncIterable[T]'

Class method to wrap an AsyncIterable for backward compatibility.

Parameters:

wrapped (AsyncIterable[T])

Return type:

ASyncIterable[T]

__wrapped__: AsyncIterable[T]

addresses

property bulk_insert: Callable[[List[Log]], Awaitable[None]]

Get the function for bulk inserting logs into the database.

Returns:

A function for bulk inserting logs.

Examples

>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> await log_filter.bulk_insert(logs)

property cache: LogCache

property executor: AsyncThreadPoolExecutor

from_block

property insert_to_db: Callable[[Log], None]

Get the function for inserting logs into the database.

Raises:

NotImplementedError – If this method is not implemented in the subclass.

property is_asleep: bool

is_reusable

property materialized: List[T]

Synchronously iterate through the ASyncIterable and return all T objects.

Returns:

A list of the T objects yielded by the ASyncIterable.

property semaphore: BlockSemaphore

to_block

topics

class y.utils.events.ProcessedEvents[source]

Bases: Events, ASyncIterable[T]

A class for fetching, processing, and iterating over events.

This class extends Events to provide additional functionality for processing events.

When awaited, a list of all T objects will be returned.

Example

>>> my_object = ProcessedEvents(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], T)
True

obj_type

alias of _EventItem

__aiter__(self) → AsyncIterator[T]

Return an async iterator that yields T objects from the ASyncIterable.

Return type:

AsyncIterator[T]

__await__(self) → Generator[Any, Any, List[T]]

Asynchronously iterate through the ASyncIterable and return all T objects.

Returns:

A list of the T objects yielded by the ASyncIterable.

Return type:

Generator[Any, Any, List[T]]

__init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)

Initialize a LogFilter instance.

Parameters:
  • addresses – List of contract addresses to fetch logs from.

  • topics – List of event topics to filter logs by.

  • from_block (int | None) – The starting block to fetch logs from.

  • chunk_size (int) – The number of blocks to fetch in each chunk.

  • chunks_per_batch (int | None) – The number of chunks to fetch in each batch.

  • semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.

  • executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.

  • is_reusable (bool) – Whether the filter is reusable.

  • verbose (bool) – Verbosity level for logging.

Return type:

None

Examples

>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)

__iter__(self) → Iterator[T]

Return an iterator that yields T objects from the ASyncIterable.

Note

Synchronous iteration leverages ASyncIterator, which uses asyncio.BaseEventLoop.run_until_complete() to fetch items. ASyncIterator.__next__() raises a SyncModeInAsyncContextError if the event loop is already running.

If you encounter a SyncModeInAsyncContextError, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__() and __anext__() instead.

Return type:

Iterator[T]

_objects_thru(block)

Parameters:

block (int | None)

Return type:

AsyncIterator[T]

events(to_block)

Get events up to a given block.

Parameters:

to_block (int) – The ending block to fetch events to.

Yields:

A decoded event.

Return type:

ASyncIterator[_EventItem]

Examples

>>> events = Events(addresses=["0x1234..."], topics=["0x5678..."])
>>> async for event in events.events(1000100):
...     print(event)

filter(self, function: ViewFn[T]) → 'ASyncFilter[T]'

Filters the T objects yielded by the ASyncIterable based on a function.

Parameters:

function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.

Returns:

An instance of ASyncFilter that yields the filtered T objects from the ASyncIterable.

Return type:

ASyncFilter[T]

logs(to_block)

Get logs up to a given block.

Parameters:

to_block (int | None) – The ending block to fetch logs to.

Yields:

A raw log.

Return type:

ASyncIterator[Log]

Examples

>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)

objects(to_block)[source]

Get an ASyncIterator that yields all events up to a given block.

Parameters:

to_block (int) – The ending block to fetch events to.

Returns:

An ASyncIterator that yields all included events.

Return type:

ASyncIterator[_EventItem]

Examples

>>> processed_events = ProcessedEvents(addresses=["0x1234..."], topics=["0x5678..."])
>>> async for event in processed_events.objects(1000100):
...     print(event)

sort(self, *, key: SortKey[T] = None, reverse: bool = False) → 'ASyncSorter[T]'

Sort the T objects yielded by the ASyncIterable.

Parameters:
  • key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.

  • reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.

Returns:

An instance of ASyncSorter that will yield the T objects yielded from this ASyncIterable, but sorted.

Return type:

ASyncSorter[T]

classmethod wrap(cls, wrapped: AsyncIterable[T]) → 'ASyncIterable[T]'

Class method to wrap an AsyncIterable for backward compatibility.

Parameters:

wrapped (AsyncIterable[T])

Return type:

ASyncIterable[T]

__wrapped__: AsyncIterable[T]

addresses

property bulk_insert: Callable[[List[Log]], Awaitable[None]]

Get the function for bulk inserting logs into the database.

Returns:

A function for bulk inserting logs.

Examples

>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> await log_filter.bulk_insert(logs)

property cache: LogCache

property executor: AsyncThreadPoolExecutor

from_block

property insert_to_db: Callable[[Log], None]

Get the function for inserting logs into the database.

Raises:

NotImplementedError – If this method is not implemented in the subclass.

property is_asleep: bool

is_reusable

property materialized: List[T]

Synchronously iterate through the ASyncIterable and return all T objects.

Returns:

A list of the T objects yielded by the ASyncIterable.

property semaphore: BlockSemaphore

to_block

topics

ASyncFunction y.utils.events._get_logs(address: eth_typing.evm.ChecksumAddress | None, topics: List[str] | None, start: int | eth_typing.evm.BlockNumber, end: int | eth_typing.evm.BlockNumber) → List[evmspec.structs.log.Log][source]

Get logs for a given address, topics, and block range.

Args:

  • address – The address of the contract to fetch logs from.

  • topics – The event topics to filter logs by.

  • start – The starting block to fetch logs from.

  • end – The ending block to fetch logs to.

Returns:

A list of raw logs.

Examples:

>>> logs = _get_logs("0x1234...", ["0x5678..."], 1000000, 1000100)
>>> print(logs)

Since _get_logs is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
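The sync-by-default behaviour can be pictured as a wrapper that either runs the coroutine to completion or hands it back to the caller. This is a hypothetical illustration of the calling convention built on plain asyncio. It is not the ASyncFunctionSyncDefault implementation, and `sync_by_default` and `get_block_count` are invented names.

```python
import asyncio
import functools


def sync_by_default(coro_fn):
    """Run coro_fn synchronously unless sync=False or asynchronous=True is passed."""

    @functools.wraps(coro_fn)
    def wrapper(*args, sync=True, asynchronous=False, **kwargs):
        coro = coro_fn(*args, **kwargs)
        if asynchronous or not sync:
            return coro  # the caller is expected to await this
        return asyncio.run(coro)

    return wrapper


@sync_by_default
async def get_block_count(start, end):
    await asyncio.sleep(0)
    return end - start + 1


blocking_result = get_block_count(1000000, 1000100)
awaited_result = asyncio.run(get_block_count(1000000, 1000100, sync=False))
```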

Return type:

List[Log]

y.utils.events.checkpoints_to_weight(checkpoints, start_block, end_block)[source]

Calculate the weight of checkpoints between two blocks.

Parameters:
  • checkpoints – A dictionary of checkpoints.

  • start_block (int | BlockNumber) – The starting block number.

  • end_block (int | BlockNumber) – The ending block number.

Returns:

The calculated weight.

Return type:

float

Examples

>>> checkpoints = {0: 100, 10: 200}
>>> weight = checkpoints_to_weight(checkpoints, 0, 10)
>>> print(weight)

y.utils.events.decode_logs(logs)[source]

Decode logs to events and enrich them with additional info.

Parameters:

logs (Iterable[LogReceipt] | Iterable[Log]) – An iterable of LogReceipt or Log objects.

Returns:

An EventDict containing decoded events.

Return type:

EventDict

Examples

>>> logs = [LogReceipt(...), LogReceipt(...)]
>>> events = decode_logs(logs)
>>> print(events)

ASyncFunction y.utils.events.get_logs_asap(address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | NoneType, topics: List[str] | None, from_block: int | eth_typing.evm.BlockNumber | NoneType = None, to_block: int | eth_typing.evm.BlockNumber | NoneType = None, verbose: int = 0) → List[Any][source]

Get logs as soon as possible.

This function fetches raw logs from the blockchain within the specified block range. The logs are not decoded; use decode_logs() to decode them if needed.

Args:

  • address – The address of the contract to fetch logs from.

  • topics – The event topics to filter logs by.

  • from_block – The starting block to fetch logs from.

  • to_block – The ending block to fetch logs to.

  • verbose – Verbosity level for logging.

Returns:

A list of raw logs.

Examples:

Synchronous usage:

>>> logs = get_logs_asap("0x1234...", ["0x5678..."], 1000000, 1000100)
>>> decoded_logs = decode_logs(logs)

Asynchronous usage:

>>> logs = await get_logs_asap("0x1234...", ["0x5678..."], 1000000, 1000100, sync=False)
>>> decoded_logs = decode_logs(logs)

Since get_logs_asap is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

List[Any]

async y.utils.events.get_logs_asap_generator(address, topics=None, from_block=None, to_block=None, chronological=True, run_forever=False, run_forever_interval=60, verbose=0)[source]

Get logs as soon as possible in a generator.

This function fetches raw logs from the blockchain within the specified block range and yields them as they are retrieved. The logs are not decoded; use decode_logs() to decode them if needed.

Parameters:
  • address (str | HexBytes | AnyAddress | EthAddress | None) – The address of the contract to fetch logs from.

  • topics (List[str] | None) – The event topics to filter logs by.

  • from_block (int | BlockNumber | None) – The starting block to fetch logs from.

  • to_block (int | BlockNumber | None) – The ending block to fetch logs to.

  • chronological (bool) – If True, yield logs in chronological order.

  • run_forever (bool) – If True, run indefinitely, fetching new logs periodically.

  • run_forever_interval (int) – The interval in seconds to wait between fetches when running forever.

  • verbose (int) – Verbosity level for logging.

Yields:

Lists of raw logs.

Return type:

AsyncGenerator[List[LogReceipt], None]

Examples

>>> async for logs in get_logs_asap_generator("0x1234...", ["0x5678..."], 1000000, 1000100):
...     decoded_logs = decode_logs(logs)
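A generator of this kind can be sketched by splitting the block range into chunks, fetching the chunks concurrently, and yielding one list per chunk. Everything below is hypothetical: `fake_fetch` stands in for an RPC call, the chunk size is arbitrary, and the assumption that `chronological=False` yields chunks as they complete is mine, not confirmed by the source.

```python
import asyncio


async def fake_fetch(start, end):
    """Hypothetical stand-in for an RPC call: one fake log every 10th block."""
    await asyncio.sleep(0)
    return [{"blockNumber": b} for b in range(start, end + 1) if b % 10 == 0]


async def logs_in_chunks(from_block, to_block, chunk_size=25, chronological=True):
    """Yield one list of logs per block-range chunk."""
    ranges = [
        (start, min(start + chunk_size - 1, to_block))
        for start in range(from_block, to_block + 1, chunk_size)
    ]
    # Start every chunk immediately so the fetches overlap.
    tasks = [asyncio.create_task(fake_fetch(start, end)) for start, end in ranges]
    if chronological:
        for task in tasks:  # await in submission order
            yield await task
    else:
        for finished in asyncio.as_completed(tasks):  # assumed: completion order
            yield await finished


async def main():
    return [logs async for logs in logs_in_chunks(0, 60)]


blocks = [[log["blockNumber"] for log in chunk] for chunk in asyncio.run(main())]
```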

y.utils.events.logs_to_balance_checkpoints(logs)[source]

Convert Transfer logs to {address: {from_block: balance}} checkpoints.

Parameters:

logs – An iterable of logs to convert.

Returns:

A dictionary mapping addresses to balance checkpoints.

Return type:

Dict[ChecksumAddress, int]
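The {address: {from_block: balance}} shape can be built by replaying transfers in block order and recording each party's running balance. The sketch below assumes transfers are already decoded into a hypothetical `Transfer` tuple. It illustrates the data shape only and is not the library's implementation.

```python
from collections import defaultdict
from typing import NamedTuple


class Transfer(NamedTuple):
    """Hypothetical, already-decoded Transfer event."""
    block: int
    sender: str
    receiver: str
    amount: int


def transfers_to_checkpoints(transfers):
    """Map each address to {block: balance after that block's transfers}."""
    balances = defaultdict(int)
    checkpoints = defaultdict(dict)
    for t in sorted(transfers, key=lambda t: t.block):
        balances[t.sender] -= t.amount
        checkpoints[t.sender][t.block] = balances[t.sender]
        balances[t.receiver] += t.amount
        checkpoints[t.receiver][t.block] = balances[t.receiver]
    return dict(checkpoints)


transfers = [
    Transfer(100, "0xMint", "0xAlice", 50),
    Transfer(105, "0xAlice", "0xBob", 20),
    Transfer(105, "0xAlice", "0xBob", 5),
]
result = transfers_to_checkpoints(transfers)
```

Later transfers in the same block overwrite that block's checkpoint, so each entry holds the balance at the end of the block.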

Examples

>>> logs = [Log(...), Log(...)]
>>> checkpoints = logs_to_balance_checkpoints(logs)
>>> print(checkpoints)

y.utils.fakes module

y.utils.fakes.FAKE_TOKENS = ['0xB1D75F4Fb67c7e93f05890f5eEAC2F3884991FF9', '0xcD545222eBf01143c1188bF712ee5e89c4278Afa', '0x6B2751Cd339217B2CAeD3485fc7a92256681053F', '0x224e13dF4b4DbF41820ec848B19bB6f015F8bf7b', '0x719A75aa3Dc05DEF57Be2F3eC0f4098475631D1c']

FAKE_TOKENS is a dictionary that maps network identifiers to lists of fake token addresses.

This dictionary is used to provide a set of fake token addresses for different blockchain networks. The keys are network identifiers from the Network enum, and the values are lists of fake token addresses represented as strings.

Examples

>>> from y.utils.fakes import FAKE_TOKENS
>>> FAKE_TOKENS[Network.Mainnet]
['0xB1D75F4Fb67c7e93f05890f5eEAC2F3884991FF9', '0xcD545222eBf01143c1188bF712ee5e89c4278Afa', ...]
>>> FAKE_TOKENS[Network.Fantom]
['0x1B27A9dE6a775F98aaA5B90B62a4e2A0B84DbDd9', '0x6E0aA9718C56Ef5d19ccf57955284C7CD95737be', ...]

See also

  • Network for network ID definitions.

  • brownie for blockchain interaction.

y.utils.gather module

Utility functions for gathering method results asynchronously.

async y.utils.gather.gather_methods(address, methods, *, block=None, return_exceptions=False)[source]

Asynchronously gather results from multiple contract methods.

Parameters:
  • address (str) – The contract address.

  • methods (Iterable[str]) – An iterable of method names or encoded method calls.

  • block (int | None) – The block number to query. Defaults to None.

  • return_exceptions (bool) – Whether to return exceptions or raise them. Defaults to False.

Returns:

A tuple containing the results of the method calls.

Return type:

Tuple[Any]

Example

>>> from y.utils import gather_methods
>>> address = "0x6B175474E89094C44Da98b954EedeAC495271d0F"  # DAI
>>> methods = ["name()", "symbol()", "decimals()"]
>>> results = await gather_methods(address, methods)
>>> print(results)
('Dai Stablecoin', 'DAI', 18)
>>> # Using raw method calls
>>> methods = ["name()(string)", "symbol()(string)", "decimals()(uint8)"]
>>> results = await gather_methods(address, methods)
>>> print(results)
('Dai Stablecoin', 'DAI', 18)
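The effect of return_exceptions can be seen with `asyncio.gather` directly. In this sketch `call_method` is a hypothetical stand-in for a contract call and `gather_calls` is an invented helper. Whether gather_methods is built on `asyncio.gather` is an assumption.

```python
import asyncio


async def call_method(address, method):
    """Hypothetical stand-in for a contract call with canned results."""
    fake_results = {"name()": "Dai Stablecoin", "symbol()": "DAI", "decimals()": 18}
    await asyncio.sleep(0)
    if method not in fake_results:
        raise ValueError(f"unknown method {method!r}")
    return fake_results[method]


async def gather_calls(address, methods, *, return_exceptions=False):
    """Run every call concurrently and return the results as a tuple."""
    results = await asyncio.gather(
        *(call_method(address, m) for m in methods),
        return_exceptions=return_exceptions,
    )
    return tuple(results)


address = "0x6B175474E89094C44Da98b954EedeAC495271d0F"
ok = asyncio.run(gather_calls(address, ["name()", "symbol()", "decimals()"]))
mixed = asyncio.run(
    gather_calls(address, ["name()", "missing()"], return_exceptions=True)
)
```

With `return_exceptions=True` the failing call appears in the tuple as an exception object instead of aborting the whole batch.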

y.utils.logging module

class y.utils.logging.PriceLogger[source]

Bases: Logger

__init__(name, level=0)

Initialize the logger with a name and an optional level.

addFilter(filter)

Add the specified filter to this handler.

addHandler(hdlr)

Add the specified handler to this logger.

callHandlers(record)

Pass a record to all relevant handlers.

Loop through all handlers for this logger and its parents in the logger hierarchy. If no handler was found, output a one-off error message to sys.stderr. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger whose handlers are called.

close()[source]

Return type:

None

critical(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘CRITICAL’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.critical("Houston, we have a %s", "major disaster", exc_info=1)

debug(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘DEBUG’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.debug("Houston, we have a %s", "thorny problem", exc_info=1)

error(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘ERROR’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.error("Houston, we have a %s", "major problem", exc_info=1)

exception(msg, *args, exc_info=True, **kwargs)

Convenience method for logging an ERROR with exception information.

fatal(msg, *args, **kwargs)

Don’t use this method, use critical() instead.

filter(record)

Determine if a record is loggable by consulting all the filters.

The default is to allow the record to be logged; any filter can veto this and the record is then dropped. Returns a zero value if a record is to be dropped, else non-zero.

Changed in version 3.2: Allow filters to be just callables.

findCaller(stack_info=False, stacklevel=1)

Find the stack frame of the caller so that we can note the source file name, line number and function name.

getChild(suffix)

Get a logger which is a descendant to this one.

This is a convenience method, such that

logging.getLogger('abc').getChild('def.ghi')

is the same as

logging.getLogger('abc.def.ghi')

It’s useful, for example, when the parent logger is named using __name__ rather than a literal string.
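The equivalence can be checked directly with the standard library, since `logging.getLogger` returns the same object for the same name:

```python
import logging

parent = logging.getLogger("abc")
child = parent.getChild("def.ghi")

# getChild joins the parent's name and the suffix with a dot.
same_object = child is logging.getLogger("abc.def.ghi")
child_name = child.name
```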

getEffectiveLevel()

Get the effective level for this logger.

Loop through this logger and its parents in the logger hierarchy, looking for a non-zero logging level. Return the first one found.

handle(record)

Call the handlers for the specified record.

This method is used for unpickled records received from a socket, as well as those created locally. Logger-level filtering is applied.

hasHandlers()

See if this logger has any handlers configured.

Loop through all handlers for this logger and its parents in the logger hierarchy. Return True if a handler was found, else False. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger which is checked for the existence of handlers.

info(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘INFO’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.info("Houston, we have a %s", "interesting problem", exc_info=1)

isEnabledFor(level)

Is this logger enabled for level ‘level’?

log(level, msg, *args, **kwargs)

Log ‘msg % args’ with the integer severity ‘level’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.log(level, "We have a %s", "mysterious problem", exc_info=1)

makeRecord(name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None)

A factory method which can be overridden in subclasses to create specialized LogRecords.

removeFilter(filter)

Remove the specified filter from this handler.

removeHandler(hdlr)

Remove the specified handler from this logger.

setLevel(level)

Set the logging level of this logger. level must be an int or a str.

warn(msg, *args, **kwargs)

warning(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘WARNING’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.warning("Houston, we have a %s", "bit of a problem", exc_info=1)

address: str

block: int

debug_task: Task[None] | None

enabled: bool

key: Tuple[str | HexBytes | AnyAddress | EthAddress | Contract | int, int | BlockNumber, str | None, str]

manager = <logging.Manager object>

root = <RootLogger root (WARNING)>

y.utils.logging.enable_debug_logging(logger='y')[source]

Enables ypricemagic’s debugging mode. Very verbose.

Parameters:

logger (str) – The name of the logger to enable debugging for. Defaults to “y”.

Return type:

None

Example

>>> enable_debug_logging("y")

y.utils.logging.get_price_logger(token_address, block, *, symbol=None, extra='', start_task=False)[source]

Create or retrieve a PriceLogger instance for a given token address and block.

This function manages a cache of loggers to ensure they have the proper members for ypricemagic. If a logger is enabled for DEBUG, it will start a debug task if specified.

Parameters:
  • token_address (str | HexBytes | AnyAddress | EthAddress | Contract | int) – The address of the token.

  • block (int | BlockNumber) – The block number.

  • symbol (str | None) – An optional symbol for the token.

  • extra (str) – An optional extra string to append to the logger name.

  • start_task (bool) – Whether to start a debug task. Defaults to False.

Return type:

PriceLogger
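A logger cache keyed on the call arguments might look like the sketch below. It uses the standard `logging` module only. The key layout mirrors the documented `key` attribute, but the logger naming scheme and `get_cached_logger` itself are hypothetical.

```python
import logging

_loggers = {}  # (token_address, block, symbol, extra) -> logger


def get_cached_logger(token_address, block, *, symbol=None, extra=""):
    """Return one logger per (address, block, symbol, extra) combination."""
    key = (token_address, block, symbol, extra)
    logger = _loggers.get(key)
    if logger is None:
        # Hypothetical naming scheme, chosen only for this illustration.
        name = f"example.prices.{token_address}.{block}"
        if extra:
            name += f".{extra}"
        logger = logging.getLogger(name)
        _loggers[key] = logger
    return logger


first = get_cached_logger("0xTokenAddress", 123456)
again = get_cached_logger("0xTokenAddress", 123456)
other = get_cached_logger("0xTokenAddress", 123457)
```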

Example

>>> logger = get_price_logger("0xTokenAddress", 123456)
>>> logger.debug("This is a debug message.")

y.utils.middleware module

y.utils.middleware.getcode_cache_middleware(make_request, web3)[source]

Middleware for caching eth_getCode calls.

Parameters:
  • make_request (Callable) – The original request function.

  • web3 (Web3) – The Web3 instance.

Returns:

A middleware function that caches eth_getCode calls.

Return type:

Callable
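The general shape of a caching middleware is a closure that wraps the original request function and consults a dictionary first. The sketch below is library-agnostic and runs without web3. `caching_middleware`, `fake_make_request`, and `only_get_code` are hypothetical names, and the real middleware's cache key and storage may differ.

```python
def caching_middleware(make_request, should_cache):
    """Wrap make_request so that cacheable calls are only sent once."""
    cache = {}

    def middleware(method, params):
        if not should_cache(method, params):
            return make_request(method, params)
        key = (method, tuple(params))
        if key not in cache:
            cache[key] = make_request(method, params)
        return cache[key]

    return middleware


calls = []  # records every request that reaches the "node"


def fake_make_request(method, params):
    calls.append(method)
    return {"result": f"{method}:{params[0]}"}


def only_get_code(method, params):
    return method == "eth_getCode"


request = caching_middleware(fake_make_request, only_get_code)
request("eth_getCode", ["0x1234", "latest"])
request("eth_getCode", ["0x1234", "latest"])  # served from the cache
request("eth_getBalance", ["0x1234", "latest"])
request("eth_getBalance", ["0x1234", "latest"])  # not cached, sent again
```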

Examples

>>> from web3 import Web3
>>> w3 = Web3(Web3.HTTPProvider('http://localhost:8545'))
>>> middleware = getcode_cache_middleware(w3.manager.request_blocking, w3)
>>> w3.middleware_onion.add(middleware)

y.utils.middleware.setup_getcode_cache_middleware()[source]

Set up the eth_getCode cache middleware for the current Web3 provider.

This function modifies the Web3 provider to use a custom session with increased connection pool size and timeout, if the provider’s endpoint URI starts with “http” or “https”. If the provider is an IPCProvider, it does not modify the session.

Examples

>>> setup_getcode_cache_middleware()

Return type:

None

y.utils.middleware.setup_geth_poa_middleware()[source]

Set up the geth proof-of-authority middleware for the current Web3 provider.

Examples

>>> setup_geth_poa_middleware()

See also

  • web3.middleware.geth_poa.geth_poa_middleware()

Return type:

None

y.utils.middleware.should_cache(method, params)[source]

Determine if a method call should be cached.

Parameters:
  • method (str) – The name of the method being called.

  • params (Any) – The parameters of the method call.

Returns:

True if the method call should be cached, False otherwise.

Return type:

bool

Examples

>>> should_cache("eth_getCode", ["0x1234", "latest"])
True
>>> should_cache("eth_getBalance", ["0x1234", "latest"])
False
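The caching pattern described for this module can be sketched without web3 at all. The block below is a minimal stand-in, not the library's implementation: `sketch_should_cache`, `sketch_cache_middleware`, and `fake_make_request` are hypothetical names, and the rule "cache only eth_getCode" is an assumption taken from the examples above.

```python
from typing import Any, Callable, Dict, List, Tuple

Request = Callable[[str, List[Any]], Any]


def sketch_should_cache(method: str, params: List[Any]) -> bool:
    # Assumption for illustration: only eth_getCode responses are cacheable.
    return method == "eth_getCode"


def sketch_cache_middleware(make_request: Request) -> Request:
    # Responses are keyed by the method name plus its (hashable) parameters.
    cache: Dict[Tuple[str, Tuple[Any, ...]], Any] = {}

    def middleware(method: str, params: List[Any]) -> Any:
        if not sketch_should_cache(method, params):
            return make_request(method, params)
        key = (method, tuple(params))
        if key not in cache:
            cache[key] = make_request(method, params)
        return cache[key]

    return middleware


# A fake transport that records every call that actually reaches it.
calls: List[str] = []


def fake_make_request(method: str, params: List[Any]) -> Any:
    calls.append(method)
    return {"result": "0x6080"}


request = sketch_cache_middleware(fake_make_request)
request("eth_getCode", ["0x1234", "latest"])
request("eth_getCode", ["0x1234", "latest"])     # served from the cache
request("eth_getBalance", ["0x1234", "latest"])  # never cached
request("eth_getBalance", ["0x1234", "latest"])
print(calls)  # ['eth_getCode', 'eth_getBalance', 'eth_getBalance']
```

Only the first eth_getCode request reaches the fake transport; the repeated call is answered from the dictionary.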

y.utils.multicall module

ASyncFunction y.utils.multicall.fetch_multicall(*calls: Any, block: int | eth_typing.evm.BlockNumber | NoneType = None) → List[Any | None][source]

Since fetch_multicall is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

List[Any | None]
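The sync-by-default behaviour described for these functions can be illustrated with a small stand-in decorator. This is a simplified sketch and not how the a_sync library implements it: `sync_default` and `double` are hypothetical names, and only the `sync` and `asynchronous` keyword arguments mentioned above are modelled.

```python
import asyncio
import functools


def sync_default(coro_fn):
    """Run a coroutine function synchronously unless the caller opts out."""

    @functools.wraps(coro_fn)
    def wrapper(*args, sync=True, asynchronous=None, **kwargs):
        # `asynchronous=True` is treated as the mirror image of `sync=False`.
        if asynchronous is not None:
            sync = not asynchronous
        coro = coro_fn(*args, **kwargs)
        if sync:
            # Blocks until the coroutine finishes and returns its value.
            return asyncio.run(coro)
        # Hand the coroutine back for the caller to await.
        return coro

    return wrapper


@sync_default
async def double(x):
    await asyncio.sleep(0)
    return x * 2


sync_result = double(21)                            # runs synchronously
async_result = asyncio.run(double(21, sync=False))  # coroutine, awaited by caller
print(sync_result, async_result)  # 42 42
```

In this sketch the synchronous path uses `asyncio.run`, so it cannot be called from inside an already running event loop; in that situation the caller would pass `sync=False` and await the result.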

ASyncFunction y.utils.multicall.multicall_decimals(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract], block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = True) → List[int][source]

Since multicall_decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

List[int]

ASyncFunction y.utils.multicall.multicall_same_func_no_input(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract | int], method: str, block: int | eth_typing.evm.BlockNumber | NoneType = None, apply_func: Callable | None = None, return_None_on_failure: bool = False) → List[Any][source]

Since multicall_same_func_no_input is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

List[Any]

ASyncFunction y.utils.multicall.multicall_same_func_same_contract_different_inputs(address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract | int, method: str, inputs: List | Tuple, block: int | eth_typing.evm.BlockNumber | NoneType = None, apply_func: Callable | None = None, return_None_on_failure: bool = False) → List[Any][source]

Since multicall_same_func_same_contract_different_inputs is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

List[Any]

ASyncFunction y.utils.multicall.multicall_totalSupply(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract], block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = True) → List[int][source]

Since multicall_totalSupply is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

List[int]

y.utils.raw_calls module

ASyncFunction y.utils.raw_calls._cached_call_fn(func: Callable, contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType, required_arg=None) → Any[source]

Since _cached_call_fn is an ASyncFunctionAsyncDefault, you can optionally pass sync=True or asynchronous=False to force it to run synchronously and return a value. Without either kwarg, it will return a coroutine for you to await.

Return type:

Any

ASyncFunction y.utils.raw_calls._decimals(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) → int | None[source]

Since _decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

int | None

ASyncFunction y.utils.raw_calls._totalSupply(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) → int | None[source]

Since _totalSupply is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

int | None

ASyncFunction y.utils.raw_calls.balanceOf(call_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, input_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) → int | None[source]

Since balanceOf is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

int | None

ASyncFunction y.utils.raw_calls.decimals(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = False) → int[source]

Since decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

int

y.utils.raw_calls.prepare_data(method, inputs=typing.Union[NoneType, bytes, int, str, hexbytes.main.HexBytes, ~AnyAddress, brownie.convert.datatypes.EthAddress, brownie.network.contract.Contract, y.contracts.Contract])[source]

Prepare data for a raw contract call by encoding the method signature and input data.

This function takes a method signature and input data, and returns a hexadecimal string that can be used as the data field in a raw contract call.

Parameters:
  • method – The method signature as a string (e.g., “transfer(address,uint256)”).

  • inputs – The input data for the method. Can be None, bytes, int, str, Address, EthAddress, brownie.Contract, or Contract.

Returns:

A hexadecimal string representing the encoded method call data.

Raises:

CalldataPreparationError – If the input type is not supported.

Return type:

str

Note

  • The method signature is encoded to its 4-byte function selector.

  • If inputs is None, only the method selector is returned.

  • For other input types, the input is prepared and appended to the method selector.

Examples

>>> prepare_data("transfer(address,uint256)", inputs=("0xRecipientAddress", 1000))
'0xa9059cbb000000000000000000000000RecipientAddress00000000000000000000000000000000000000000000000000000000000003e8'
>>> prepare_data("decimals()")
'0x313ce567'

y.utils.raw_calls.prepare_input(input)[source]

Prepare input data for a raw contract call by encoding it to a hexadecimal string.

This function takes various input types and converts them to a hexadecimal string that can be used as part of the data field in a raw contract call.

Parameters:

input (bytes | int | str | HexBytes | AnyAddress | EthAddress | Contract | Contract) – The input data to be prepared. Can be bytes, int, str, Address, EthAddress, brownie.Contract, or Contract.

Returns:

A hexadecimal string representing the encoded input data.

Raises:

CalldataPreparationError – If the input type is not supported.

Return type:

str

Note

  • Bytes input is converted directly to its hexadecimal representation.

  • Integer input is converted to bytes and then to hexadecimal.

  • String, Address, EthAddress, brownie.Contract, and Contract inputs are assumed to be addresses and are padded to 32 bytes.

Examples

>>> prepare_input("0xRecipientAddress")
'000000000000000000000000RecipientAddress'
>>> prepare_input(1000)
'00000000000000000000000000000000000000000000000000000000000003e8'
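The padding rules in the note above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's code: `encode_uint256`, `encode_address`, and `build_calldata` are hypothetical helpers, the recipient address is made up, and the selector `0xa9059cbb` for `transfer(address,uint256)` is copied from the `prepare_data` example rather than computed.

```python
def encode_uint256(value: int) -> str:
    """Encode a non-negative integer as a 32-byte big-endian hex word."""
    return value.to_bytes(32, "big").hex()


def encode_address(address: str) -> str:
    """Left-pad a 20-byte hex address with zeros to a 32-byte hex word."""
    hex_part = address[2:] if address.startswith("0x") else address
    if len(hex_part) != 40:
        raise ValueError("expected a 20-byte hex address")
    return hex_part.lower().rjust(64, "0")


def build_calldata(selector: str, *words: str) -> str:
    """Append already-encoded 32-byte words to a 4-byte selector."""
    return selector + "".join(words)


recipient = "0x" + "11" * 20  # made-up address, for illustration only
calldata = build_calldata(
    "0xa9059cbb",  # selector taken from the prepare_data example
    encode_address(recipient),
    encode_uint256(1000),
)
print(encode_uint256(1000))
print(calldata)
```

Each argument occupies exactly 64 hex characters, so the resulting calldata is the 10-character selector followed by 128 characters of arguments.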

ASyncFunction y.utils.raw_calls.raw_call(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, method: str, block: int | eth_typing.evm.BlockNumber | NoneType = None, inputs=None, output: str = None, return_None_on_failure: bool = False) → Any | None[source]

Call a contract with only address and method. Bypasses brownie Contract object formation to save time.

This function allows for direct interaction with a contract by preparing and sending the call data without creating a full contract object. It supports various input types and can handle multiple inputs.

Parameters:
  • contract_address – The address of the contract.

  • method – The method signature as a string (e.g., “transfer(address,uint256)”).

  • block – The block number at which to make the call. Defaults to the latest block.

  • inputs – The input data for the method. Can be None, bytes, int, str, Address, EthAddress, brownie.Contract, or Contract.

  • output – The expected output type. Options are “address”, “int”, “str”.

  • return_None_on_failure – If True, return None if the call fails. Defaults to False.

Examples

>>> await raw_call("0xTokenAddress", "balanceOf(address)", inputs="0xHolderAddress", sync=False)
1000000000000000000
>>> await raw_call("0xTokenAddress", "decimals()", sync=False)
18

Raises:
  • ValueError – If the call fails and return_None_on_failure is False.

  • TypeError – If an invalid output type is specified.

Since raw_call is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.

Return type:

Any | None
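How an `output` option might be applied to the raw return data can be sketched as follows. This is an assumption about the general approach, not the library's decoding code: `decode_word` is a hypothetical helper that handles only a single 32-byte return word, and the “str” option is left out because ABI strings are dynamically sized.

```python
def decode_word(raw_hex: str, output: str):
    """Interpret one 32-byte return word as an int or an address."""
    data = bytes.fromhex(raw_hex[2:] if raw_hex.startswith("0x") else raw_hex)
    if len(data) != 32:
        raise ValueError("expected exactly one 32-byte word")
    if output == "int":
        # Unsigned big-endian integer, e.g. a balance or a decimals value.
        return int.from_bytes(data, "big")
    if output == "address":
        # An address occupies the last 20 bytes of the word.
        return "0x" + data[-20:].hex()
    raise TypeError(f"unsupported output type: {output!r}")


decimals_word = "0x" + "0" * 62 + "12"
owner_word = "0x" + "0" * 24 + "ab" * 20

print(decode_word(decimals_word, "int"))    # 18
print(decode_word(owner_word, "address"))   # 0x followed by "ab" repeated 20 times
```

Raising TypeError for an unknown output type mirrors the exception listed for raw_call, although the exact conditions in the library may differ.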

y.utils.raw_calls.logger = <Logger y.utils.raw_calls (WARNING)>

We use raw calls for commonly used functions because it's much faster than using brownie Contracts.

Module contents

This module provides utility functions for the ypricemagic library.

The utilities include caching, attribute checking, and method gathering functionalities that are used throughout the library to enhance performance and ensure code reliability.

Examples

Using the caching utility:

>>> from y.utils.cache import a_sync_ttl_cache
>>> @a_sync_ttl_cache
... def expensive_function(x):
...     return x * x
>>> result = expensive_function(4)

Checking for attributes:

>>> from y.utils.checks import hasall
>>> class MyClass:
...     attr1 = 1
...     attr2 = 2
>>> obj = MyClass()
>>> hasall(obj, ['attr1', 'attr2'])
True

Gathering methods:

>>> from y.utils.gather import gather_methods
>>> methods = gather_methods(MyClass)
>>> print(methods)
