y.utils package
Submodules
y.utils.cache module
- y.utils.cache.optional_async_diskcache(fn)[source]
Decorator to optionally apply disk caching to asynchronous functions.
If toolcache is not installed, this decorator will return the function as is, without any caching applied.
- Parameters:
fn (Callable[[~P], Awaitable[T]] | Callable[[~P], T]) – The function to be decorated.
- Return type:
Callable[[~P], Awaitable[T]] | Callable[[~P], T]
Examples
Using the decorator with an asynchronous function:
>>> async def fetch_data():
...     return "data"
>>> fetch_data = optional_async_diskcache(fetch_data)
See also
toolcache
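The fallback behavior can be sketched as follows. This is an illustrative reimplementation, not the library's code, and it deliberately elides the actual toolcache decorator call since that API is not shown above:

```python
import asyncio
from typing import Any, Awaitable, Callable

def optional_async_cache(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
    """Apply caching only if the optional dependency is importable (sketch)."""
    try:
        import toolcache  # noqa: F401 -- optional dependency
    except ImportError:
        # toolcache is missing: return the coroutine function unchanged.
        return fn
    # If toolcache were available, its disk-cache decorator would be applied
    # here; the exact call is elided because its API is not documented above.
    return fn

@optional_async_cache
async def fetch_data() -> str:
    return "data"

print(asyncio.run(fetch_data()))  # data
```
Either way, the decorated function keeps the same call signature, so callers never need to know whether caching is active.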
y.utils.checks module
Utility functions for performing various checks.
- y.utils.checks.hasall(obj, attrs)[source]
Check if an object has all the specified attributes.
- Parameters:
obj – The object to check.
attrs – An iterable of attribute names to check for.
- Returns:
True if the object has all the specified attributes, False otherwise.
- Return type:
bool
Example
>>> class TestClass:
...     attr1 = 1
...     attr2 = 2
>>> test_obj = TestClass()
>>> hasall(test_obj, ['attr1', 'attr2'])
True
>>> hasall(test_obj, ['attr1', 'attr3'])
False
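A minimal sketch of the check itself, assuming it reduces to `hasattr` over the attribute names (the library's implementation may differ in detail):

```python
def hasall(obj, attrs) -> bool:
    """Return True only if obj has every attribute named in attrs."""
    return all(hasattr(obj, attr) for attr in attrs)

class TestClass:
    attr1 = 1
    attr2 = 2

print(hasall(TestClass(), ["attr1", "attr2"]))  # True
print(hasall(TestClass(), ["attr1", "attr3"]))  # False
```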
y.utils.client module
Utility functions for retrieving Ethereum client information.
- y.utils.client.get_ethereum_client()[source]
Get the Ethereum client type for the current connection.
- Returns:
A string representing the Ethereum client type, such as ‘geth’, ‘erigon’, or ‘tg’.
- Return type:
str
Examples
>>> from y.utils.client import get_ethereum_client
>>> client = get_ethereum_client()
>>> client
'geth'

>>> # Example with a different client
>>> client = get_ethereum_client()
>>> client
'erigon'
See also
get_ethereum_client_async()
for the asynchronous version of this function.
- y.utils.client.get_ethereum_client_async()[source]
Asynchronously get the Ethereum client type for the current connection.
- Returns:
A string representing the Ethereum client type, such as ‘geth’, ‘erigon’, or ‘tg’.
- Return type:
str
Examples
>>> from y.utils.client import get_ethereum_client_async
>>> client = await get_ethereum_client_async()
>>> client
'erigon'

>>> # Example with a different client
>>> client = await get_ethereum_client_async()
>>> client
'tg'
See also
get_ethereum_client()
for the synchronous version of this function.
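Client detection presumably keys off the node's `web3_clientVersion` string. The parser below is a hypothetical sketch of that mapping, using the labels named above; the real function's parsing rules are not shown in this document:

```python
def classify_client(client_version: str) -> str:
    """Map a web3_clientVersion string to a short client label (sketch)."""
    version = client_version.lower()
    # Labels from the docs above; 'erigon' is checked before 'geth' so that
    # Erigon nodes are never misclassified by a substring match.
    for label in ("erigon", "geth", "tg"):
        if version.startswith(label):
            return label
    # Fall back to the first path component of the version string.
    return version.split("/", 1)[0]

print(classify_client("Geth/v1.13.14-stable/linux-amd64/go1.21.6"))  # geth
print(classify_client("erigon/2.59.3/linux-amd64/go1.21.6"))         # erigon
```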
y.utils.dank_mids module
y.utils.events module
- class y.utils.events.Events[source]
Bases:
LogFilter
A class for fetching and processing events.
This class extends
LogFilter
to provide additional functionality for handling events.
When awaited, a list of all
Log
objects will be returned.
Example
>>> my_object = Events(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], Log)
True
- obj_type
alias of
_EventItem
- __aiter__(self) AsyncIterator[T]
Return an async iterator that yields
T
objects from the ASyncIterable.
- Return type:
- __await__(self) Generator[Any, Any, List[T]]
Asynchronously iterate through the ASyncIterable and return all
T
objects.
- __init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)
Initialize a LogFilter instance.
- Parameters:
addresses – List of contract addresses to fetch logs from.
topics – List of event topics to filter logs by.
from_block (int | None) – The starting block to fetch logs from.
chunk_size (int) – The number of blocks to fetch in each chunk.
chunks_per_batch (int | None) – The number of chunks to fetch in each batch.
semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.
executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.
is_reusable (bool) – Whether the filter is reusable.
verbose (bool) – Verbosity level for logging.
- Return type:
None
Examples
>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)
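The `chunk_size` parameter implies the requested block range is split into bounded sub-ranges before logs are fetched. A sketch of that partitioning, assuming inclusive block bounds (the library's exact boundary convention may differ):

```python
from typing import Iterator, Tuple

def block_chunks(from_block: int, to_block: int, chunk_size: int = 10_000) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (start, end) ranges covering [from_block, to_block],
    each spanning at most chunk_size blocks."""
    start = from_block
    while start <= to_block:
        end = min(start + chunk_size - 1, to_block)
        yield start, end
        start = end + 1

print(list(block_chunks(1000000, 1000025, chunk_size=10)))
# [(1000000, 1000009), (1000010, 1000019), (1000020, 1000025)]
```
Each sub-range then becomes one `eth_getLogs` request, and `chunks_per_batch` would bound how many of those requests are in flight at once.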
- __iter__(self) Iterator[T]
Return an iterator that yields
T
objects from the ASyncIterable.
Note
Synchronous iteration leverages
ASyncIterator
, which uses asyncio.BaseEventLoop.run_until_complete()
to fetch items. ASyncIterator.__next__()
raises a SyncModeInAsyncContextError
if the event loop is already running.
If you encounter a
SyncModeInAsyncContextError
, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__()
and __anext__()
instead.
- Return type:
Iterator[T]
- events(to_block)[source]
Get events up to a given block.
- Parameters:
to_block (int) – The ending block to fetch events to.
- Yields:
A decoded event.
- Return type:
Examples
>>> events = Events(addresses=["0x1234..."], topics=["0x5678..."])
>>> async for event in events.events(1000100):
...     print(event)
- filter(self, function: ViewFn[T]) 'ASyncFilter[T]'
Filters the
T
objects yielded by the ASyncIterable based on a function.
- Parameters:
function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.
- Returns:
An instance of
ASyncFilter
that yields the filtered T
objects from the ASyncIterable.
- Return type:
ASyncFilter[T]
- logs(to_block)
Get logs up to a given block.
- Parameters:
to_block (int | None) – The ending block to fetch logs to.
- Yields:
A raw log.
- Return type:
Examples
>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."]) >>> logs = log_filter.logs(1000100) >>> print(logs)
- sort(self, *, key: SortKey[T] = None, reverse: bool = False) 'ASyncSorter[T]'
Sort the
T
objects yielded by the ASyncIterable.
- Parameters:
key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.
reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.
- Returns:
An instance of
ASyncSorter
that will yield the T
objects yielded from this ASyncIterable, but sorted.
- Return type:
ASyncSorter[T]
- classmethod wrap(cls, wrapped: AsyncIterable[T]) 'ASyncIterable[T]'
Class method to wrap an AsyncIterable for backward compatibility.
- Parameters:
wrapped (AsyncIterable[T])
- Return type:
- __wrapped__: AsyncIterable[T]
- addresses
- property bulk_insert: Callable[[List[Log]], Awaitable[None]]
Get the function for bulk inserting logs into the database.
- Returns:
A function for bulk inserting logs.
Examples
>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> await log_filter.bulk_insert(logs)
- property cache: LogCache
- property executor: AsyncThreadPoolExecutor
- from_block
- property insert_to_db: Callable[[Log], None]
Get the function for inserting logs into the database.
- Raises:
NotImplementedError – If this method is not implemented in the subclass.
- is_reusable
- property materialized: List[T]
Synchronously iterate through the ASyncIterable and return all
T
objects.
- Returns:
A list of the
T
objects yielded by the ASyncIterable.
- property semaphore: BlockSemaphore
- to_block
- topics
- class y.utils.events.LogFilter[source]
Bases:
Filter
[Log
, LogCache]
A filter for fetching and processing event logs.
This class provides methods to fetch logs from the blockchain and process them.
When awaited, a list of all
Log
objects will be returned.
Example
>>> my_object = LogFilter(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], Log)
True
- __aiter__(self) AsyncIterator[T]
Return an async iterator that yields
T
objects from the ASyncIterable.
- Return type:
- __await__(self) Generator[Any, Any, List[T]]
Asynchronously iterate through the ASyncIterable and return all
T
objects.
- __init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)[source]
Initialize a LogFilter instance.
- Parameters:
addresses – List of contract addresses to fetch logs from.
topics – List of event topics to filter logs by.
from_block (int | None) – The starting block to fetch logs from.
chunk_size (int) – The number of blocks to fetch in each chunk.
chunks_per_batch (int | None) – The number of chunks to fetch in each batch.
semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.
executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.
is_reusable (bool) – Whether the filter is reusable.
verbose (bool) – Verbosity level for logging.
- Return type:
None
Examples
>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)
- __iter__(self) Iterator[T]
Return an iterator that yields
T
objects from the ASyncIterable.
Note
Synchronous iteration leverages
ASyncIterator
, which uses asyncio.BaseEventLoop.run_until_complete()
to fetch items. ASyncIterator.__next__()
raises a SyncModeInAsyncContextError
if the event loop is already running.
If you encounter a
SyncModeInAsyncContextError
, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__()
and __anext__()
instead.
- Return type:
Iterator[T]
- filter(self, function: ViewFn[T]) 'ASyncFilter[T]'
Filters the
T
objects yielded by the ASyncIterable based on a function.
- Parameters:
function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.
- Returns:
An instance of
ASyncFilter
that yields the filtered T
objects from the ASyncIterable.
- Return type:
ASyncFilter[T]
- logs(to_block)[source]
Get logs up to a given block.
- Parameters:
to_block (int | None) – The ending block to fetch logs to.
- Yields:
A raw log.
- Return type:
Examples
>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)
- sort(self, *, key: SortKey[T] = None, reverse: bool = False) 'ASyncSorter[T]'
Sort the
T
objects yielded by the ASyncIterable.
- Parameters:
key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.
reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.
- Returns:
An instance of
ASyncSorter
that will yield the T
objects yielded from this ASyncIterable, but sorted.
- Return type:
ASyncSorter[T]
- classmethod wrap(cls, wrapped: AsyncIterable[T]) 'ASyncIterable[T]'
Class method to wrap an AsyncIterable for backward compatibility.
- Parameters:
wrapped (AsyncIterable[T])
- Return type:
- __wrapped__: AsyncIterable[T]
- addresses
- property bulk_insert: Callable[[List[Log]], Awaitable[None]]
Get the function for bulk inserting logs into the database.
- Returns:
A function for bulk inserting logs.
Examples
>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> await log_filter.bulk_insert(logs)
- property cache: LogCache
- property executor: AsyncThreadPoolExecutor
- from_block
- property insert_to_db: Callable[[Log], None]
Get the function for inserting logs into the database.
- Raises:
NotImplementedError – If this method is not implemented in the subclass.
- is_reusable
- property materialized: List[T]
Synchronously iterate through the ASyncIterable and return all
T
objects.
- Returns:
A list of the
T
objects yielded by the ASyncIterable.
- property semaphore: BlockSemaphore
- to_block
- topics
- class y.utils.events.ProcessedEvents[source]
Bases:
Events
, ASyncIterable
[T
]
A class for fetching, processing, and iterating over events.
This class extends
Events
to provide additional functionality for processing events.
When awaited, a list of all
T
objects will be returned.
Example
>>> my_object = ProcessedEvents(...)
>>> all_contents = await my_object
>>> isinstance(all_contents, list)
True
>>> isinstance(all_contents[0], T)
True
- obj_type
alias of
_EventItem
- __aiter__(self) AsyncIterator[T]
Return an async iterator that yields
T
objects from the ASyncIterable.
- Return type:
- __await__(self) Generator[Any, Any, List[T]]
Asynchronously iterate through the ASyncIterable and return all
T
objects.
- __init__(*, addresses=[], topics=[], from_block=None, chunk_size=10000, chunks_per_batch=None, semaphore=None, executor=None, is_reusable=True, verbose=False)
Initialize a LogFilter instance.
- Parameters:
addresses – List of contract addresses to fetch logs from.
topics – List of event topics to filter logs by.
from_block (int | None) – The starting block to fetch logs from.
chunk_size (int) – The number of blocks to fetch in each chunk.
chunks_per_batch (int | None) – The number of chunks to fetch in each batch.
semaphore (BlockSemaphore | None) – A semaphore for limiting concurrent requests.
executor (_AsyncExecutorMixin | None) – An executor for running tasks asynchronously.
is_reusable (bool) – Whether the filter is reusable.
verbose (bool) – Verbosity level for logging.
- Return type:
None
Examples
>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)
- __iter__(self) Iterator[T]
Return an iterator that yields
T
objects from the ASyncIterable.
Note
Synchronous iteration leverages
ASyncIterator
, which uses asyncio.BaseEventLoop.run_until_complete()
to fetch items. ASyncIterator.__next__()
raises a SyncModeInAsyncContextError
if the event loop is already running.
If you encounter a
SyncModeInAsyncContextError
, you are likely working in an async codebase and should consider asynchronous iteration using __aiter__()
and __anext__()
instead.
- Return type:
Iterator[T]
- events(to_block)
Get events up to a given block.
- Parameters:
to_block (int) – The ending block to fetch events to.
- Yields:
A decoded event.
- Return type:
Examples
>>> events = Events(addresses=["0x1234..."], topics=["0x5678..."])
>>> async for event in events.events(1000100):
...     print(event)
- filter(self, function: ViewFn[T]) 'ASyncFilter[T]'
Filters the
T
objects yielded by the ASyncIterable based on a function.
- Parameters:
function (Callable[[T], Awaitable[bool]] | Callable[[T], bool]) – A function that returns a boolean that indicates if an item should be included in the filtered result. Can be sync or async.
- Returns:
An instance of
ASyncFilter
that yields the filtered T
objects from the ASyncIterable.
- Return type:
ASyncFilter[T]
- logs(to_block)
Get logs up to a given block.
- Parameters:
to_block (int | None) – The ending block to fetch logs to.
- Yields:
A raw log.
- Return type:
Examples
>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> logs = log_filter.logs(1000100)
>>> print(logs)
- objects(to_block)[source]
Get an
ASyncIterator
that yields all events up to a given block.
- Parameters:
to_block (int) – The ending block to fetch events to.
- Returns:
An
ASyncIterator
that yields all included events.
- Return type:
Examples
>>> processed_events = ProcessedEvents(addresses=["0x1234..."], topics=["0x5678..."])
>>> async for event in processed_events.objects(1000100):
...     print(event)
- sort(self, *, key: SortKey[T] = None, reverse: bool = False) 'ASyncSorter[T]'
Sort the
T
objects yielded by the ASyncIterable.
- Parameters:
key (optional) – A function of one argument that is used to extract a comparison key from each list element. If None, the elements themselves will be sorted. Defaults to None.
reverse (optional) – If True, the yielded elements will be sorted in reverse order. Defaults to False.
- Returns:
An instance of
ASyncSorter
that will yield the T
objects yielded from this ASyncIterable, but sorted.
- Return type:
ASyncSorter[T]
- classmethod wrap(cls, wrapped: AsyncIterable[T]) 'ASyncIterable[T]'
Class method to wrap an AsyncIterable for backward compatibility.
- Parameters:
wrapped (AsyncIterable[T])
- Return type:
- __wrapped__: AsyncIterable[T]
- addresses
- property bulk_insert: Callable[[List[Log]], Awaitable[None]]
Get the function for bulk inserting logs into the database.
- Returns:
A function for bulk inserting logs.
Examples
>>> log_filter = LogFilter(addresses=["0x1234..."], topics=["0x5678..."])
>>> await log_filter.bulk_insert(logs)
- property cache: LogCache
- property executor: AsyncThreadPoolExecutor
- from_block
- property insert_to_db: Callable[[Log], None]
Get the function for inserting logs into the database.
- Raises:
NotImplementedError – If this method is not implemented in the subclass.
- is_reusable
- property materialized: List[T]
Synchronously iterate through the ASyncIterable and return all
T
objects.
- Returns:
A list of the
T
objects yielded by the ASyncIterable.
- property semaphore: BlockSemaphore
- to_block
- topics
- ASyncFunction y.utils.events._get_logs(address: eth_typing.evm.ChecksumAddress | None, topics: List[str] | None, start: int | eth_typing.evm.BlockNumber, end: int | eth_typing.evm.BlockNumber) List[evmspec.structs.log.Log] [source]
Get logs for a given address, topics, and block range.
- Args:
address: The address of the contract to fetch logs from.
topics: The event topics to filter logs by.
start: The starting block to fetch logs from.
end: The ending block to fetch logs to.
- Returns:
A list of raw logs.
- Examples:
>>> logs = _get_logs("0x1234...", ["0x5678..."], 1000000, 1000100)
>>> print(logs)
Since _get_logs is an
ASyncFunctionSyncDefault
, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
address (ChecksumAddress | None)
topics (List[str] | None)
start (int | BlockNumber)
end (int | BlockNumber)
- Return type:
- y.utils.events.checkpoints_to_weight(checkpoints, start_block, end_block)[source]
Calculate the weight of checkpoints between two blocks.
- Parameters:
checkpoints – A dictionary of checkpoints.
start_block (int | BlockNumber) – The starting block number.
end_block (int | BlockNumber) – The ending block number.
- Returns:
The calculated weight.
- Return type:
Examples
>>> checkpoints = {0: 100, 10: 200}
>>> weight = checkpoints_to_weight(checkpoints, 0, 10)
>>> print(weight)
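One plausible reading of this computation is a block-weighted average: each checkpointed value is weighted by the number of blocks it was in effect within the range. The sketch below implements that interpretation only; the library's actual formula is not shown in this document and may differ:

```python
def block_weighted_average(checkpoints, start_block, end_block):
    """Average checkpointed values over [start_block, end_block], weighting
    each value by the number of blocks it was in effect (a sketch)."""
    blocks = sorted(checkpoints)
    total = 0.0
    for i, block in enumerate(blocks):
        # A value applies from its own block until the next checkpoint
        # (or the end of the requested range).
        next_block = blocks[i + 1] if i + 1 < len(blocks) else end_block
        span = min(next_block, end_block) - max(block, start_block)
        if span > 0:
            total += checkpoints[block] * span
    return total / (end_block - start_block)

print(block_weighted_average({0: 100, 10: 200}, 0, 10))  # 100.0
```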
- y.utils.events.decode_logs(logs)[source]
Decode logs to events and enrich them with additional info.
- Parameters:
logs (Iterable[LogReceipt] | Iterable[Log]) – An iterable of
LogReceipt
or Log
objects.
- Returns:
An
EventDict
containing decoded events.
- Return type:
EventDict
Examples
>>> logs = [LogReceipt(...), LogReceipt(...)]
>>> events = decode_logs(logs)
>>> print(events)
- ASyncFunction y.utils.events.get_logs_asap(address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | NoneType, topics: List[str] | None, from_block: int | eth_typing.evm.BlockNumber | NoneType = None, to_block: int | eth_typing.evm.BlockNumber | NoneType = None, verbose: int = 0) List[Any] [source]
Get logs as soon as possible.
This function fetches raw logs from the blockchain within the specified block range. The logs are not decoded; use
decode_logs()
to decode them if needed.
- Args:
address: The address of the contract to fetch logs from.
topics: The event topics to filter logs by.
from_block: The starting block to fetch logs from.
to_block: The ending block to fetch logs to.
verbose: Verbosity level for logging.
- Returns:
A list of raw logs.
- Examples:
Synchronous usage:
>>> logs = get_logs_asap("0x1234...", ["0x5678..."], 1000000, 1000100)
>>> decoded_logs = decode_logs(logs)

Asynchronous usage:
>>> logs = await get_logs_asap("0x1234...", ["0x5678..."], 1000000, 1000100, sync=False)
>>> decoded_logs = decode_logs(logs)
Since get_logs_asap is an
ASyncFunctionSyncDefault
, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
address (str | HexBytes | AnyAddress | EthAddress | None)
topics (List[str] | None)
from_block (int | BlockNumber | None)
to_block (int | BlockNumber | None)
verbose (int)
- Return type:
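The sync-by-default behavior described above can be sketched with a small wrapper. This is an illustration of the calling convention only; the real `ASyncFunctionSyncDefault` class is considerably more sophisticated:

```python
import asyncio
from functools import wraps

def sync_default(coro_fn):
    """Run an async function synchronously by default, or return its
    coroutine when sync=False is passed (sketch of the convention)."""
    @wraps(coro_fn)
    def wrapper(*args, sync=True, **kwargs):
        coro = coro_fn(*args, **kwargs)
        if sync:
            # Fails if an event loop is already running, mirroring the
            # sync-in-async restriction described for ASyncIterator above.
            return asyncio.run(coro)
        return coro  # the caller awaits this in its own event loop
    return wrapper

@sync_default
async def double(x):
    return 2 * x

print(double(21))                           # 42 (ran synchronously)
print(asyncio.run(double(21, sync=False)))  # 42 (awaited by the caller)
```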
- async y.utils.events.get_logs_asap_generator(address, topics=None, from_block=None, to_block=None, chronological=True, run_forever=False, run_forever_interval=60, verbose=0)[source]
Get logs as soon as possible in a generator.
This function fetches raw logs from the blockchain within the specified block range and yields them as they are retrieved. The logs are not decoded; use
decode_logs()
to decode them if needed.
- Parameters:
address (str | HexBytes | AnyAddress | EthAddress | None) – The address of the contract to fetch logs from.
topics (List[str] | None) – The event topics to filter logs by.
from_block (int | BlockNumber | None) – The starting block to fetch logs from.
to_block (int | BlockNumber | None) – The ending block to fetch logs to.
chronological (bool) – If True, yield logs in chronological order.
run_forever (bool) – If True, run indefinitely, fetching new logs periodically.
run_forever_interval (int) – The interval in seconds to wait between fetches when running forever.
verbose (int) – Verbosity level for logging.
- Yields:
Lists of raw logs.
- Return type:
AsyncGenerator[List[LogReceipt], None]
Examples
>>> async for logs in get_logs_asap_generator("0x1234...", ["0x5678..."], 1000000, 1000100):
...     decoded_logs = decode_logs(logs)
- y.utils.events.logs_to_balance_checkpoints(logs)[source]
Convert Transfer logs to {address: {from_block: balance}} checkpoints.
- Parameters:
logs – An iterable of logs to convert.
- Returns:
A dictionary mapping addresses to balance checkpoints.
- Return type:
Examples
>>> logs = [Log(...), Log(...)]
>>> checkpoints = logs_to_balance_checkpoints(logs)
>>> print(checkpoints)
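The transformation can be sketched with plain tuples standing in for decoded Transfer logs. The tuple shape and the addresses below are illustrative; the real function consumes `Log` objects:

```python
from collections import defaultdict

def transfers_to_checkpoints(transfers):
    """Build {address: {block: balance}} checkpoints from
    (block, sender, receiver, value) tuples (a simplified sketch)."""
    balances = defaultdict(int)
    checkpoints = defaultdict(dict)
    for block, sender, receiver, value in sorted(transfers):
        balances[sender] -= value
        balances[receiver] += value
        # Record each party's running balance at the block of the transfer.
        checkpoints[sender][block] = balances[sender]
        checkpoints[receiver][block] = balances[receiver]
    return dict(checkpoints)

transfers = [(100, "0xMinter", "0xAlice", 50), (110, "0xAlice", "0xBob", 20)]
cps = transfers_to_checkpoints(transfers)
print(cps["0xAlice"])  # {100: 50, 110: 30}
print(cps["0xBob"])    # {110: 20}
```
Checkpoints in this shape feed naturally into a weighting function like `checkpoints_to_weight` above.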
y.utils.fakes module
- y.utils.fakes.FAKE_TOKENS = ['0xB1D75F4Fb67c7e93f05890f5eEAC2F3884991FF9', '0xcD545222eBf01143c1188bF712ee5e89c4278Afa', '0x6B2751Cd339217B2CAeD3485fc7a92256681053F', '0x224e13dF4b4DbF41820ec848B19bB6f015F8bf7b', '0x719A75aa3Dc05DEF57Be2F3eC0f4098475631D1c']
FAKE_TOKENS is a dictionary that maps network identifiers to lists of fake token addresses.
This dictionary is used to provide a set of fake token addresses for different blockchain networks. The keys are network identifiers from the
Network
enum, and the values are lists of fake token addresses represented as strings.
Examples
>>> from y.utils.fakes import FAKE_TOKENS
>>> FAKE_TOKENS[Network.Mainnet]
['0xB1D75F4Fb67c7e93f05890f5eEAC2F3884991FF9', '0xcD545222eBf01143c1188bF712ee5e89c4278Afa', ...]

>>> FAKE_TOKENS[Network.Fantom]
['0x1B27A9dE6a775F98aaA5B90B62a4e2A0B84DbDd9', '0x6E0aA9718C56Ef5d19ccf57955284C7CD95737be', ...]
See also
Network
for network ID definitions.
brownie
for blockchain interaction.
y.utils.gather module
Utility functions for gathering method results asynchronously.
- async y.utils.gather.gather_methods(address, methods, *, block=None, return_exceptions=False)[source]
Asynchronously gather results from multiple contract methods.
- Parameters:
address – The contract address whose methods will be called.
methods – An iterable of method signatures to call.
block (optional) – The block number at which to make the calls.
return_exceptions (bool) – If True, return exceptions in place of failed results.
- Returns:
A tuple containing the results of the method calls.
- Return type:
Example
>>> from y.utils import gather_methods
>>> address = "0x6B175474E89094C44Da98b954EedeAC495271d0F"  # DAI
>>> methods = ["name()", "symbol()", "decimals()"]
>>> results = await gather_methods(address, methods)
>>> print(results)
('Dai Stablecoin', 'DAI', 18)

>>> # Using raw method calls
>>> methods = ["name()(string)", "symbol()(string)", "decimals()(uint8)"]
>>> results = await gather_methods(address, methods)
>>> print(results)
('Dai Stablecoin', 'DAI', 18)
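The concurrency pattern behind this helper can be sketched with `asyncio.gather`. The `call_method` helper below is hypothetical and returns canned values; a real implementation would issue an `eth_call` per method:

```python
import asyncio

async def call_method(address: str, method: str):
    """Hypothetical stand-in for an on-chain call."""
    canned = {"name()": "Dai Stablecoin", "symbol()": "DAI", "decimals()": 18}
    return canned[method]

async def gather_methods(address, methods, *, return_exceptions=False):
    """Run all method calls concurrently; return results as a tuple (sketch)."""
    results = await asyncio.gather(
        *(call_method(address, m) for m in methods),
        return_exceptions=return_exceptions,
    )
    return tuple(results)

out = asyncio.run(
    gather_methods(
        "0x6B175474E89094C44Da98b954EedeAC495271d0F",
        ["name()", "symbol()", "decimals()"],
    )
)
print(out)  # ('Dai Stablecoin', 'DAI', 18)
```
With `return_exceptions=True`, a failed call would surface as an exception object in the tuple instead of aborting the whole gather, matching `asyncio.gather` semantics.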
y.utils.logging module
- class y.utils.logging.PriceLogger[source]
Bases:
Logger
- __init__(name, level=0)
Initialize the logger with a name and an optional level.
- addFilter(filter)
Add the specified filter to this handler.
- addHandler(hdlr)
Add the specified handler to this logger.
- callHandlers(record)
Pass a record to all relevant handlers.
Loop through all handlers for this logger and its parents in the logger hierarchy. If no handler was found, output a one-off error message to sys.stderr. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger whose handlers are called.
- critical(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘CRITICAL’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.critical("Houston, we have a %s", "major disaster", exc_info=1)
- debug(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘DEBUG’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.debug("Houston, we have a %s", "thorny problem", exc_info=1)
- error(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘ERROR’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.error("Houston, we have a %s", "major problem", exc_info=1)
- exception(msg, *args, exc_info=True, **kwargs)
Convenience method for logging an ERROR with exception information.
- fatal(msg, *args, **kwargs)
Don’t use this method, use critical() instead.
- filter(record)
Determine if a record is loggable by consulting all the filters.
The default is to allow the record to be logged; any filter can veto this and the record is then dropped. Returns a zero value if a record is to be dropped, else non-zero.
Changed in version 3.2: Allow filters to be just callables.
- findCaller(stack_info=False, stacklevel=1)
Find the stack frame of the caller so that we can note the source file name, line number and function name.
- getChild(suffix)
Get a logger which is a descendant to this one.
This is a convenience method, such that
logging.getLogger('abc').getChild('def.ghi')
is the same as
logging.getLogger('abc.def.ghi')
It’s useful, for example, when the parent logger is named using __name__ rather than a literal string.
- getEffectiveLevel()
Get the effective level for this logger.
Loop through this logger and its parents in the logger hierarchy, looking for a non-zero logging level. Return the first one found.
- handle(record)
Call the handlers for the specified record.
This method is used for unpickled records received from a socket, as well as those created locally. Logger-level filtering is applied.
- hasHandlers()
See if this logger has any handlers configured.
Loop through all handlers for this logger and its parents in the logger hierarchy. Return True if a handler was found, else False. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger which is checked for the existence of handlers.
- info(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘INFO’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.info("Houston, we have a %s", "interesting problem", exc_info=1)
- isEnabledFor(level)
Is this logger enabled for level ‘level’?
- log(level, msg, *args, **kwargs)
Log ‘msg % args’ with the integer severity ‘level’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.log(level, "We have a %s", "mysterious problem", exc_info=1)
- makeRecord(name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None)
A factory method which can be overridden in subclasses to create specialized LogRecords.
- removeFilter(filter)
Remove the specified filter from this handler.
- removeHandler(hdlr)
Remove the specified handler from this logger.
- setLevel(level)
Set the logging level of this logger. level must be an int or a str.
- warn(msg, *args, **kwargs)
- warning(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘WARNING’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.warning("Houston, we have a %s", "bit of a problem", exc_info=1)
- key: Tuple[str | HexBytes | AnyAddress | EthAddress | Contract | int, int | BlockNumber, str | None, str]
- manager = <logging.Manager object>
- root = <RootLogger root (WARNING)>
- y.utils.logging.enable_debug_logging(logger='y')[source]
Enables ypricemagic’s debugging mode. Very verbose.
- Parameters:
logger (str) – The name of the logger to enable debugging for. Defaults to “y”.
- Return type:
None
Example
>>> enable_debug_logging("y")
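A minimal sketch of what enabling debug mode for a logger hierarchy involves, assuming it reduces to setting the level and ensuring a handler exists (the real function may configure output differently):

```python
import logging

def enable_debug_logging(logger_name: str = "y") -> None:
    """Set a logger hierarchy to DEBUG, attaching a stream handler if none
    is configured (sketch)."""
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        )
        logger.addHandler(handler)

enable_debug_logging("y")
print(logging.getLogger("y").isEnabledFor(logging.DEBUG))  # True
```
Because child loggers such as `y.prices` propagate to `y`, setting the parent to DEBUG makes the whole hierarchy verbose.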
- y.utils.logging.get_price_logger(token_address, block, *, symbol=None, extra='', start_task=False)[source]
Create or retrieve a PriceLogger instance for a given token address and block.
This function manages a cache of loggers to ensure they have the proper members for ypricemagic. If a logger is enabled for DEBUG, it will start a debug task if specified.
- Parameters:
token_address (str | HexBytes | AnyAddress | EthAddress | Contract | int) – The address of the token.
block (int | BlockNumber) – The block number.
symbol (str | None) – An optional symbol for the token.
extra (str) – An optional extra string to append to the logger name.
start_task (bool) – Whether to start a debug task. Defaults to False.
- Return type:
Example
>>> logger = get_price_logger("0xTokenAddress", 123456)
>>> logger.debug("This is a debug message.")
y.utils.middleware module
- y.utils.middleware.getcode_cache_middleware(make_request, web3)[source]
Middleware for caching eth_getCode calls.
- Parameters:
make_request (Callable) – The original request function.
web3 (Web3) – The Web3 instance.
- Returns:
A middleware function that caches eth_getCode calls.
- Return type:
Callable
Examples
>>> from web3 import Web3
>>> w3 = Web3(Web3.HTTPProvider('http://localhost:8545'))
>>> middleware = getcode_cache_middleware(w3.manager.request_blocking, w3)
>>> w3.middleware_onion.add(middleware)
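The middleware shape can be sketched without web3 at all: a closure over the next request function that memoizes selected calls. This is a generic illustration of the pattern, not the library's code, and it omits the `web3` argument the real factory takes:

```python
def make_caching_middleware(make_request, cacheable=("eth_getCode",)):
    """Return a middleware caching responses for selected methods,
    keyed by (method, params) (sketch)."""
    cache = {}
    def middleware(method, params):
        if method not in cacheable:
            return make_request(method, params)
        key = (method, tuple(params))
        if key not in cache:
            cache[key] = make_request(method, params)
        return cache[key]
    return middleware

calls = []
def fake_request(method, params):
    """Hypothetical transport that records how often it is hit."""
    calls.append(method)
    return {"result": "0x6080"}

mw = make_caching_middleware(fake_request)
mw("eth_getCode", ["0x1234", "latest"])
mw("eth_getCode", ["0x1234", "latest"])  # second call served from the cache
print(len(calls))  # 1
```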
- y.utils.middleware.setup_getcode_cache_middleware()[source]
Set up the eth_getCode cache middleware for the current Web3 provider.
This function modifies the Web3 provider to use a custom session with increased connection pool size and timeout, if the provider’s endpoint URI starts with “http” or “https”. If the provider is an IPCProvider, it does not modify the session.
Examples
>>> setup_getcode_cache_middleware()
- Return type:
None
- y.utils.middleware.setup_geth_poa_middleware()[source]
Set up the geth proof-of-authority middleware for the current Web3 provider.
Examples
>>> setup_geth_poa_middleware()
See also
web3.middleware.geth_poa.geth_poa_middleware()
- Return type:
None
- y.utils.middleware.should_cache(method, params)[source]
Determine if a method call should be cached.
- Parameters:
method – The RPC method name.
params – The parameters of the RPC call.
- Returns:
True if the method call should be cached, False otherwise.
- Return type:
bool
Examples
>>> should_cache("eth_getCode", ["0x1234", "latest"])
True

>>> should_cache("eth_getBalance", ["0x1234", "latest"])
False
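A predicate consistent with the two examples above can be sketched as follows; the library's actual rule may be broader or stricter than this assumption:

```python
def should_cache(method: str, params) -> bool:
    """Sketch: cache only eth_getCode calls pinned to the 'latest' block."""
    return method == "eth_getCode" and params[-1] == "latest"

print(should_cache("eth_getCode", ["0x1234", "latest"]))     # True
print(should_cache("eth_getBalance", ["0x1234", "latest"]))  # False
```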
y.utils.multicall module
- ASyncFunction y.utils.multicall.fetch_multicall(*calls: Any, block: int | eth_typing.evm.BlockNumber | NoneType = None) List[Any | None] [source]
Since fetch_multicall is an
ASyncFunctionSyncDefault
, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
calls (Any)
block (int | BlockNumber | None)
- Return type:
- ASyncFunction y.utils.multicall.multicall_decimals(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract], block: int | eth_typing.evm.BlockNumber | NoneType = None, return_None_on_failure: bool = True) List[int] [source]
Since multicall_decimals is an
ASyncFunctionSyncDefault
, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
addresses (Iterable[str | HexBytes | AnyAddress | EthAddress | Contract])
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
- ASyncFunction y.utils.multicall.multicall_same_func_no_input(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract | int], method: str, block: int | eth_typing.evm.BlockNumber | NoneType = None, apply_func: Callable | None = None, return_None_on_failure: bool = False) List[Any] [source]
Since multicall_same_func_no_input is an
ASyncFunctionSyncDefault
, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.- Parameters:
addresses (Iterable[str | HexBytes | AnyAddress | EthAddress | Contract | int])
method (str)
block (int | BlockNumber | None)
apply_func (Callable | None)
return_None_on_failure (bool)
- Return type:
List[Any]
- ASyncFunction y.utils.multicall.multicall_same_func_same_contract_different_inputs(address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract | int, method: str, inputs: List | Tuple, block: int | eth_typing.evm.BlockNumber | None = None, apply_func: Callable | None = None, return_None_on_failure: bool = False) List[Any] [source]
Since multicall_same_func_same_contract_different_inputs is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
address (str | HexBytes | AnyAddress | EthAddress | Contract | int)
method (str)
block (int | BlockNumber | None)
apply_func (Callable | None)
return_None_on_failure (bool)
- Return type:
List[Any]
- ASyncFunction y.utils.multicall.multicall_totalSupply(addresses: Iterable[str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract], block: int | eth_typing.evm.BlockNumber | None = None, return_None_on_failure: bool = True) List[int] [source]
Since multicall_totalSupply is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
addresses (Iterable[str | HexBytes | AnyAddress | EthAddress | Contract])
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
List[int]
y.utils.raw_calls module
- ASyncFunction y.utils.raw_calls._cached_call_fn(func: Callable, contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | None, required_arg=None) Any [source]
Since _cached_call_fn is an ASyncFunctionAsyncDefault, you can optionally pass sync=True or asynchronous=False to force it to run synchronously and return a value. Without either kwarg, it will return a coroutine for you to await.
- Parameters:
func (Callable)
contract_address (str | HexBytes | AnyAddress | EthAddress | Contract)
block (int | BlockNumber | None)
- Return type:
Any
- ASyncFunction y.utils.raw_calls._decimals(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | None = None, return_None_on_failure: bool = False) int | None [source]
Since _decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
contract_address (str | HexBytes | AnyAddress | EthAddress | Contract)
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
int | None
- ASyncFunction y.utils.raw_calls._totalSupply(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | None = None, return_None_on_failure: bool = False) int | None [source]
Since _totalSupply is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
contract_address (str | HexBytes | AnyAddress | EthAddress | Contract)
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
int | None
- ASyncFunction y.utils.raw_calls.balanceOf(call_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, input_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | None = None, return_None_on_failure: bool = False) int | None [source]
Since balanceOf is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
call_address (str | HexBytes | AnyAddress | EthAddress | Contract)
input_address (str | HexBytes | AnyAddress | EthAddress | Contract)
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
int | None
- ASyncFunction y.utils.raw_calls.decimals(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, block: int | eth_typing.evm.BlockNumber | None = None, return_None_on_failure: bool = False) int [source]
Since decimals is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
contract_address (str | HexBytes | AnyAddress | EthAddress | Contract)
block (int | BlockNumber | None)
return_None_on_failure (bool)
- Return type:
int
- y.utils.raw_calls.prepare_data(method, inputs: bytes | int | str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract | y.contracts.Contract | None = None)[source]
Prepare data for a raw contract call by encoding the method signature and input data.
This function takes a method signature and input data, and returns a hexadecimal string that can be used as the data field in a raw contract call.
- Parameters:
method – The method signature as a string (e.g., “transfer(address,uint256)”).
inputs – The input data for the method. Can be None, bytes, int, str, Address, EthAddress, brownie.Contract, or Contract.
- Returns:
A hexadecimal string representing the encoded method call data.
- Raises:
CalldataPreparationError – If the input type is not supported.
- Return type:
str
Note
The method signature is encoded to its 4-byte function selector.
If inputs is None, only the method selector is returned.
For other input types, the input is prepared and appended to the method selector.
Examples
>>> prepare_data("transfer(address,uint256)", inputs=("0xRecipientAddress", 1000)) '0xa9059cbb000000000000000000000000RecipientAddress00000000000000000000000000000000000003e8'
>>> prepare_data("decimals()") '0x313ce567'
See also
raw_call() for making the contract call.
prepare_input() for preparing individual inputs.
- y.utils.raw_calls.prepare_input(input)[source]
Prepare input data for a raw contract call by encoding it to a hexadecimal string.
This function takes various input types and converts them to a hexadecimal string that can be used as part of the data field in a raw contract call.
- Parameters:
input (bytes | int | str | HexBytes | AnyAddress | EthAddress | Contract | Contract) – The input data to be prepared. Can be bytes, int, str, Address, EthAddress, brownie.Contract, or Contract.
- Returns:
A hexadecimal string representing the encoded input data.
- Raises:
CalldataPreparationError – If the input type is not supported.
- Return type:
str
Note
Bytes input is converted directly to its hexadecimal representation.
Integer input is converted to bytes and then to hexadecimal.
String, Address, EthAddress, brownie.Contract, and Contract inputs are assumed to be addresses and are padded to 32 bytes.
Examples
>>> prepare_input("0xRecipientAddress") '000000000000000000000000RecipientAddress'
>>> prepare_input(1000) '00000000000000000000000000000000000000000000000000000000000003e8'
See also
prepare_data() for preparing the call data.
raw_call() for making the contract call.
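The padding rules in the Note above can be sketched in plain Python (a simplified illustration of the described behavior, not the library's actual code):

```python
def pad_int_sketch(value: int) -> str:
    # Integers become one 32-byte (64 hex char) big-endian word.
    return format(value, "064x")

def pad_address_sketch(address: str) -> str:
    # Addresses are left-padded with zeros to fill a 32-byte word.
    hexpart = address[2:] if address.startswith("0x") else address
    return hexpart.rjust(64, "0")
```

For example, `pad_int_sketch(1000)` reproduces the 64-character `...03e8` word shown in the prepare_input doctest above.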
- ASyncFunction y.utils.raw_calls.raw_call(contract_address: str | hexbytes.main.HexBytes | AnyAddress | brownie.convert.datatypes.EthAddress | brownie.network.contract.Contract, method: str, block: int | eth_typing.evm.BlockNumber | None = None, inputs=None, output: str = None, return_None_on_failure: bool = False) Any | None [source]
Call a contract with only address and method. Bypasses brownie Contract object formation to save time.
This function allows for direct interaction with a contract by preparing and sending the call data without creating a full contract object. It supports various input types and can handle multiple inputs.
- Args:
contract_address: The address of the contract.
method: The method signature as a string (e.g., "transfer(address,uint256)").
block: The block number at which to make the call. Defaults to the latest block.
inputs: The input data for the method. Can be None, bytes, int, str, Address, EthAddress, brownie.Contract, or Contract.
output: The expected output type. Options are "address", "int", "str".
return_None_on_failure: If True, return None if the call fails. Default False.
- Examples:
>>> await raw_call("0xTokenAddress", "balanceOf(address)", inputs="0xHolderAddress") 1000000000000000000
>>> await raw_call("0xTokenAddress", "decimals()") 18
- Raises:
ValueError: If the call fails and return_None_on_failure is False.
TypeError: If an invalid output type is specified.
- See Also:
prepare_data() for preparing the call data.
prepare_input() for preparing individual inputs.
Since raw_call is an ASyncFunctionSyncDefault, you can optionally pass sync=False or asynchronous=True to force it to return a coroutine. Without either kwarg, it will run synchronously.
- Parameters:
contract_address (str | HexBytes | AnyAddress | EthAddress | Contract)
method (str)
block (int | BlockNumber | None)
output (str)
return_None_on_failure (bool)
- Return type:
Any | None
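Putting the pieces together: the calldata raw_call sends is just the 4-byte function selector followed by each padded argument word. A hedged sketch using the well-known ERC-20 balanceOf(address) selector (0x70a08231); the assembled data would then be submitted via an eth_call to the node:

```python
def build_calldata_sketch(selector: str, *padded_words: str) -> str:
    # selector is the 0x-prefixed 4-byte function selector;
    # each word is a 64-hex-char padded argument (see prepare_input).
    return selector + "".join(padded_words)

holder_word = "1234".rjust(64, "0")  # toy padded address for illustration
data = build_calldata_sketch("0x70a08231", holder_word)
# A node request would then look roughly like:
# web3.eth.call({"to": token_address, "data": data}, block_identifier=block)
```

This mirrors the doctest above, where `raw_call("0xTokenAddress", "balanceOf(address)", inputs="0xHolderAddress")` returns the holder's raw balance.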
- y.utils.raw_calls.logger = <Logger y.utils.raw_calls (WARNING)>
We use raw calls for commonly used functions because it's much faster than using brownie Contracts.
Module contents
This module provides utility functions for the ypricemagic library.
The utilities include caching, attribute checking, and method gathering functionalities that are used throughout the library to enhance performance and ensure code reliability.
- Imported Functions:
y.utils.cache.a_sync_ttl_cache(): Provides a caching mechanism with a time-to-live (TTL) feature.
y.utils.checks.hasall(): Checks if an object has all specified attributes.
y.utils.gather.gather_methods(): Gathers methods from a class or module.
Examples
Using the caching utility: >>> from y.utils.cache import a_sync_ttl_cache >>> @a_sync_ttl_cache ... def expensive_function(x): ... return x * x >>> result = expensive_function(4)
Checking for attributes: >>> from y.utils.checks import hasall >>> class MyClass: ... attr1 = 1 ... attr2 = 2 >>> obj = MyClass() >>> hasall(obj, ['attr1', 'attr2']) True
Gathering methods: >>> from y.utils.gather import gather_methods >>> methods = gather_methods(MyClass) >>> print(methods)
See also
y.utils.cache: For caching utilities.
y.utils.checks: For attribute checking utilities.
y.utils.gather: For method gathering utilities.