import asyncio
import functools
import logging
from typing import (
Callable,
Dict,
Iterable,
List,
Literal,
Optional,
Tuple,
TypeVar,
overload,
)
import a_sync
import dank_mids
from brownie import ZERO_ADDRESS
from brownie.exceptions import ContractNotFound
from typing_extensions import ParamSpec
from y import ENVIRONMENT_VARIABLES as ENVS
from y import constants, convert
from y._decorators import stuck_coro_debugger
from y.classes import ERC20
from y.datatypes import AnyAddressType, Block, Pool, UsdPrice
from y.exceptions import NonStandardERC20, PriceError, yPriceMagicError
from y.prices import (
band,
chainlink,
convex,
one_to_one,
pendle,
popsicle,
rkp3r,
solidex,
utils,
yearn,
)
from y.prices.dex import *
from y.prices.dex.uniswap import UniswapV2Pool, uniswap_multiplexer
from y.prices.eth_derivs import *
from y.prices.gearbox import gearbox
from y.prices.lending import *
from y.prices.stable_swap import *
from y.prices.synthetix import synthetix
from y.prices.tokenized_fund import *
from y.utils.logging import get_price_logger
_P = ParamSpec("_P")
_T = TypeVar("_T")
cache_logger = logging.getLogger(f"{__name__}.cache")
@overload
async def get_price(
token_address: AnyAddressType,
block: Optional[Block] = None,
*,
fail_to_None: Literal[True],
skip_cache: bool = ENVS.SKIP_CACHE,
ignore_pools: Tuple[Pool, ...] = (),
silent: bool = False,
) -> Optional[UsdPrice]: ...
@overload
async def get_price(
token_address: AnyAddressType,
block: Optional[Block] = None,
*,
fail_to_None: bool = False,
skip_cache: bool = ENVS.SKIP_CACHE,
ignore_pools: Tuple[Pool, ...] = (),
silent: bool = False,
) -> UsdPrice: ...
[docs]
@a_sync.a_sync(default="sync")
async def get_price(
token_address: AnyAddressType,
block: Optional[Block] = None,
*,
fail_to_None: bool = False,
skip_cache: bool = ENVS.SKIP_CACHE,
ignore_pools: Tuple[Pool, ...] = (),
silent: bool = False,
) -> Optional[UsdPrice]:
"""
Get the price of a token in USD.
Args:
token_address: The address of the token to price.
block (optional): The block number at which to get the price. If None, uses the latest block.
fail_to_None (optional): If True, return None instead of raising a :class:`~yPriceMagicError` on failure. Default False.
skip_cache (optional): If True, bypass the cache and fetch the price directly. Defaults to :obj:`ENVS.SKIP_CACHE`.
ignore_pools (optional): A tuple of pool addresses to ignore when fetching the price.
silent (optional): If True, suppress error logging. Default False.
Returns:
The price of the token in USD, or None if the price couldn't be determined and fail_to_None is True.
Raises:
yPriceMagicError: If the price couldn't be determined and fail_to_None is False.
Note:
Don't pass an int like `123` into `token_address` please, that's just silly.
- ypricemagic accepts ints to allow you to pass `y.get_price(0x0bc529c00C6401aEF6D220BE8C6Ea1667F6Ad93e)`
so you can save yourself some keystrokes while testing in a console
- (as opposed to `y.get_price("0x0bc529c00C6401aEF6D220BE8C6Ea1667F6Ad93e")`)
"""
block = int(block or await dank_mids.eth.block_number)
token_address = await convert.to_address_async(token_address)
try:
return await _get_price(
token_address,
block,
fail_to_None=fail_to_None,
ignore_pools=ignore_pools,
skip_cache=skip_cache,
silent=silent,
)
except (ContractNotFound, NonStandardERC20, PriceError) as e:
symbol = await ERC20(token_address, asynchronous=True).symbol
if not fail_to_None:
raise_from = None if isinstance(e, PriceError) else e
raise yPriceMagicError(e, token_address, block, symbol) from raise_from
@overload
async def get_prices(
token_addresses: Iterable[AnyAddressType],
block: Optional[Block] = None,
*,
fail_to_None: Literal[True],
skip_cache: bool = ENVS.SKIP_CACHE,
silent: bool = False,
) -> List[Optional[UsdPrice]]: ...
@overload
async def get_prices(
token_addresses: Iterable[AnyAddressType],
block: Optional[Block] = None,
*,
fail_to_None: bool = False,
skip_cache: bool = ENVS.SKIP_CACHE,
silent: bool = False,
) -> List[UsdPrice]: ...
[docs]
@a_sync.a_sync(default="sync")
async def get_prices(
token_addresses: Iterable[AnyAddressType],
block: Optional[Block] = None,
*,
fail_to_None: bool = False,
skip_cache: bool = ENVS.SKIP_CACHE,
silent: bool = False,
) -> List[Optional[UsdPrice]]:
"""
Get prices for multiple tokens in USD.
You should use this function over :func:`get_price` where possible, it is better optimized for parallel execution.
Args:
token_addresses: An iterable of token addresses to price.
block (optional): The block number at which to get the prices. Defaults to the latest block.
fail_to_None (optional): If True, return None for tokens whose price couldn't be determined. Default False.
skip_cache (optional): If True, bypass the cache and fetch prices directly. Defaults to :obj:`ENVS.SKIP_CACHE`.
silent (optional): If True, suppress progress bar and error logging. This kwarg is not currently implemented.
Returns:
A list of token prices in USD, in the same order as the input token_addresses.
"""
return await map_prices(
token_addresses,
block or await dank_mids.eth.block_number,
fail_to_None=fail_to_None,
skip_cache=skip_cache,
silent=silent,
).values(pop=True)
@overload
def map_prices(
token_addresses: Iterable[AnyAddressType],
block: Block,
*,
fail_to_None: Literal[True],
skip_cache: bool = ENVS.SKIP_CACHE,
silent: bool = False,
) -> a_sync.TaskMapping[AnyAddressType, Optional[UsdPrice]]: ...
@overload
def map_prices(
token_addresses: Iterable[AnyAddressType],
block: Block,
*,
fail_to_None: bool = False,
skip_cache: bool = ENVS.SKIP_CACHE,
silent: bool = False,
) -> a_sync.TaskMapping[AnyAddressType, UsdPrice]: ...
[docs]
def map_prices(
token_addresses: Iterable[AnyAddressType],
block: Block,
*,
fail_to_None: bool = False,
skip_cache: bool = ENVS.SKIP_CACHE,
silent: bool = False,
) -> a_sync.TaskMapping[AnyAddressType, Optional[UsdPrice]]:
"""
Map token addresses to their prices asynchronously.
Args:
token_addresses: An iterable of token addresses to price.
block (optional): The block number at which to get the prices. Defaults to latest block.
fail_to_None (optional): If True, map to None for tokens whose price couldn't be determined. Default False.
skip_cache (optional): If True, bypass the cache and fetch prices directly. Defaults to :obj:`ENVS.SKIP_CACHE`.
silent (optional): If True, suppress error logging. Default False.
Returns:
An :class:`a_sync.TaskMapping` object mapping token addresses to their prices.
"""
return a_sync.map(
get_price,
token_addresses,
block=block,
fail_to_None=fail_to_None,
skip_cache=skip_cache,
silent=silent,
)
def __cache(get_price: Callable[_P, _T]) -> Callable[_P, _T]:
"""
A decorator to cache the results of the get_price function.
Args:
get_price: The function to be cached.
Returns:
A wrapped version of the input function with caching functionality.
"""
@functools.wraps(get_price)
async def cache_wrap(
token: AnyAddressType,
block: Block,
*,
fail_to_None: bool = False,
skip_cache: bool = ENVS.SKIP_CACHE,
ignore_pools: Tuple[Pool, ...] = (),
silent: bool = False,
) -> Optional[UsdPrice]:
from y._db.utils import price as db
if not skip_cache and (price := await db.get_price(token, block)):
cache_logger.debug("disk cache -> %s", price)
return price
price = await get_price(
token,
block=block,
fail_to_None=fail_to_None,
ignore_pools=ignore_pools,
silent=silent,
)
if price and not skip_cache:
db.set_price(token, block, price)
return price
return cache_wrap
@stuck_coro_debugger
@a_sync.a_sync(default="async", cache_type="memory", ram_cache_ttl=ENVS.CACHE_TTL)
@__cache
async def _get_price(
token: AnyAddressType,
block: Block,
*,
fail_to_None: bool = False,
skip_cache: bool = ENVS.SKIP_CACHE,
ignore_pools: Tuple[Pool, ...] = (),
silent: bool = False,
) -> Optional[UsdPrice]: # sourcery skip: remove-redundant-if
"""
Internal function to get the price of a token.
This function implements the core logic for fetching token prices.
Args:
token: The address of the token to price.
block: The block number at which to get the price.
fail_to_None: If True, return None instead of raising an exception on failure.
skip_cache: If True, bypass the cache and fetch the price directly.
ignore_pools: A tuple of pool addresses to ignore when fetching the price.
silent: If True, suppress error logging.
Returns:
The price of the token in USD, or None if the price couldn't be determined and fail_to_None is True.
"""
if token == ZERO_ADDRESS:
_fail_appropriately(logger, symbol, fail_to_None, silent)
return None
try:
# We do this to cache the symbol for later, otherwise some repr woudl break
symbol = await ERC20(token, asynchronous=True).symbol
except NonStandardERC20:
symbol = None
logger = get_price_logger(
token, block, symbol=symbol, extra="magic", start_task=True
)
logger.debug("fetching price for %s", symbol)
try:
price = await _get_price_from_api(token, block, logger)
if price is None:
price = await _exit_early_for_known_tokens(
token,
block=block,
ignore_pools=ignore_pools,
skip_cache=skip_cache,
logger=logger,
)
if price is None:
price = await _get_price_from_dexes(
token, block, ignore_pools, skip_cache, logger
)
if price:
await utils.sense_check(token, block, price)
else:
_fail_appropriately(logger, symbol, fail_to_None, silent)
logger.debug("%s price: %s", symbol, price)
if price: # checks for the erroneous 0 value we see once in a while
return price
finally:
logger.close()
@stuck_coro_debugger
async def _exit_early_for_known_tokens(
token_address: str,
block: Block,
logger: logging.Logger,
skip_cache: bool = ENVS.SKIP_CACHE,
ignore_pools: Tuple[Pool, ...] = (),
) -> Optional[UsdPrice]: # sourcery skip: low-code-quality
"""
Attempt to get the price for known token types without having to fully load everything.
This function checks if the token is of a known type (e.g., atoken, balancer pool, etc.)
and attempts to get its price using type-specific methods.
Args:
token_address: The address of the token to price.
block: The block number at which to get the price.
logger: A logger instance for recording debug information.
skip_cache: If True, bypass the cache and fetch the price directly.
ignore_pools: A tuple of pool addresses to ignore when fetching the price.
Returns:
The price of the token if it can be determined early, or None otherwise.
"""
bucket = await utils.check_bucket(token_address, sync=False)
price = None
if bucket == "atoken":
price = await aave.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "balancer pool":
price = await balancer_multiplexer.get_price(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "basketdao":
price = await basketdao.get_price(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "belt lp":
price = await belt.get_price(token_address, block, sync=False)
elif bucket == "chainlink and band":
price = await chainlink.get_price(
token_address, block, sync=False
) or await band.get_price(token_address, block, sync=False)
elif bucket == "chainlink feed":
price = await chainlink.get_price(token_address, block, sync=False)
elif bucket == "compound":
price = await compound.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "convex":
price = await convex.get_price(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "creth":
price = await creth.get_price_creth(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "curve lp":
price = await curve.get_price(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "ellipsis lp":
price = await ellipsis.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "froyo":
price = await froyo.get_price(token_address, block=block, sync=False)
elif bucket == "gearbox":
price = await gearbox.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "gelato":
price = await gelato.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "generic amm":
price = await generic_amm.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "ib token":
price = await ib.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "mooniswap lp":
price = await mooniswap.get_pool_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "mstable feeder pool":
price = await mstablefeederpool.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "one to one":
price = await one_to_one.get_price(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "pendle lp":
price = await pendle.get_lp_price(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "piedao lp":
price = await piedao.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "popsicle":
price = await popsicle.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "reserve":
price = await reserve.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "rkp3r":
price = await rkp3r.get_price(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "saddle":
price = await saddle.get_price(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "solidex":
price = await solidex.get_price(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "stable usd":
price = 1
elif bucket == "synthetix":
price = await synthetix.get_price(token_address, block, sync=False)
elif bucket == "token set":
price = await tokensets.get_price(
token_address, block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "uni or uni-like lp":
price = await UniswapV2Pool(token_address).get_price(
block=block, skip_cache=skip_cache, sync=False
)
elif bucket == "wrapped gas coin":
price = await get_price(
constants.WRAPPED_GAS_COIN, block, skip_cache=skip_cache, sync=False
)
elif bucket == "wrapped atoken v2":
price = await aave.get_price_wrapped_v2(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "wrapped atoken v3":
price = await aave.get_price_wrapped_v3(
token_address, block, skip_cache=skip_cache, sync=False
)
elif bucket == "wsteth":
price = await wsteth.wsteth.get_price(block, skip_cache=skip_cache, sync=False)
elif bucket == "yearn or yearn-like":
price = await yearn.get_price(
token_address,
block,
skip_cache=skip_cache,
ignore_pools=ignore_pools,
sync=False,
)
logger.debug("%s -> %s", bucket, price)
return price
async def _get_price_from_api(
token: AnyAddressType,
block: Block,
logger: logging.Logger,
):
"""
Attempt to get the price from the ypricemagic API.
Args:
token: The address of the token to price.
block: The block number at which to get the price.
logger: A logger instance for recording debug information.
Returns:
The price of the token if it can be fetched from the ypricemagic API, or None otherwise.
"""
if utils.ypriceapi.should_use and token not in utils.ypriceapi.skip_tokens:
price = await utils.ypriceapi.get_price(token, block)
logger.debug("ypriceapi -> %s", price)
return price
async def _get_price_from_dexes(
token: AnyAddressType,
block: Block,
ignore_pools,
skip_cache: bool,
logger: logging.Logger,
):
"""
Attempt to get the price from decentralized exchanges.
This function tries to fetch the price from various DEXes like Uniswap, Curve, and Balancer.
Args:
token: The address of the token to price.
block : The block number at which to get the price.
ignore_pools: A tuple of pool addresses to ignore when fetching the price.
skip_cache: If True, bypass the cache and fetch the price directly.
logger: A logger instance for recording debug information.
Returns:
The price of the token if it can be determined from DEXes, or None otherwise.
"""
# TODO We need better logic to determine whether to use uniswap, curve, balancer. For now this works for all known cases.
dexes = [uniswap_multiplexer]
if curve:
dexes.append(curve)
# TODO: make a DexABC, include balancer and future dexes
# TODO: this would be so cool if a_sync.map could proxy abstractmethods correctly
# dexes_by_depth = dict(
# await DexABC.check_liquidity.map(dexes, token=token, block=block, ignore_pools=ignore_pools).items(pop=True).sort(lambda k, v: v)
# )
liquidity = await asyncio.gather(
*[
dex.check_liquidity(token, block, ignore_pools=ignore_pools, sync=False)
for dex in dexes
]
)
depth_to_dex: Dict[int, object] = dict(zip(liquidity, dexes))
dexes_by_depth: Dict[int, object] = {
depth: depth_to_dex[depth]
for depth in sorted(depth_to_dex, reverse=True)
if depth
}
logger.debug("dexes by depth: %s", dexes_by_depth)
for dex in dexes_by_depth.values():
method = (
"get_price_for_underlying"
if hasattr(dex, "get_price_for_underlying")
else "get_price"
)
logger.debug("trying %s", dex)
price = await getattr(dex, method)(
token, block, ignore_pools=ignore_pools, skip_cache=skip_cache, sync=False
)
logger.debug("%s -> %s", dex, price)
if price:
return price
logger.debug("no %s liquidity found on primary markets", token)
# If price is 0, we can at least try to see if balancer gives us a price. If not, its probably a shitcoin.
if price := await balancer_multiplexer.get_price(
token, block=block, skip_cache=skip_cache, sync=False
):
logger.debug("balancer -> %s", price)
return price
def _fail_appropriately(
logger: logging.Logger,
symbol: str,
fail_to_None: bool,
silent: bool,
) -> None:
"""
Handle failure to get a price appropriately.
This function decides how to handle a failure to get a price based on the input parameters.
Args:
logger: A logger instance for recording error information.
symbol: The symbol of the token whose price couldn't be determined.
fail_to_None: If True, the function will return silently. If False, it will raise a PriceError.
silent: If True, suppress error logging.
Raises:
PriceError: If fail_to_None is False.
"""
if not silent:
logger.warning(f"failed to get price for {symbol}")
if not fail_to_None:
raise PriceError(logger, symbol)