Source code for y.prices.utils.buckets

import asyncio
import logging
from typing import Awaitable, Callable, Tuple

import a_sync

from y import convert
from y.constants import STABLECOINS
from y.datatypes import Address, AnyAddressType
from y.prices import convex, one_to_one, pendle, popsicle, rkp3r, solidex, yearn
from y.prices.band import band
from y.prices.chainlink import chainlink
from y.prices.dex import mooniswap
from y.prices.dex.balancer import balancer_multiplexer
from y.prices.dex.genericamm import is_generic_amm
from y.prices.dex.uniswap import uniswap_multiplexer
from y.prices.eth_derivs import creth, wsteth
from y.prices.gearbox import gearbox
from y.prices.lending import ib
from y.prices.lending.aave import aave
from y.prices.lending.compound import compound
from y.prices.stable_swap import belt, ellipsis, froyo, mstablefeederpool, saddle
from y.prices.stable_swap.curve import curve
from y.prices.synthetix import synthetix
from y.prices.tokenized_fund import basketdao, gelato, piedao, reserve, tokensets
from y.utils.logging import get_price_logger

logger = logging.getLogger(__name__)


[docs] @a_sync.a_sync(default="sync", cache_type="memory") async def check_bucket(token: AnyAddressType) -> str: """ Determine the category or 'bucket' of a given token. This function attempts to classify a token into a specific category based on its characteristics. It first checks the database for a cached bucket using :func:`y._db.utils.token.get_bucket`. If not found, it performs a series of checks that may involve string comparisons, calls to the blockchain, and contract initializations. The determined bucket is then stored using :func:`db.set_bucket`. Args: token: The token address to classify. Examples: >>> bucket = check_bucket("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> print(bucket) 'stable usd' See Also: - :func:`y.convert.to_address_async` - :func:`y.utils.logging.get_price_logger` - :func:`_check_bucket_helper` """ token_address = await convert.to_address_async(token) logger = get_price_logger(token_address, block=None, extra="buckets") import y._db.utils.token as db bucket = await db.get_bucket(token_address) if bucket: logger.debug("returning bucket %s from ydb", bucket) return bucket # these require neither calls to the chain nor contract initialization, just string comparisons (pretty sure) for bucket, check in string_matchers.items(): if check(token): logger.debug("%s is %s", token_address, bucket) db.set_bucket(token, bucket) return bucket else: logger.debug("%s is not %s", token_address, bucket) # check these first, these just require calls futs = [ asyncio.ensure_future(_check_bucket_helper(bucket, check, token_address)) for bucket, check in calls_only.items() ] for fut in asyncio.as_completed(futs): try: bucket, is_member = await fut except TypeError: raise except Exception as e: logger.warning( "%s when checking %s. This will probably not impact your run.", e, fut ) logger.warning(e, exc_info=True) continue if is_member: logger.debug("%s is %s", token_address, bucket) for fut in futs: fut.cancel() db.set_bucket(token, bucket) return bucket else: logger.debug("%s is not %s", token_address, bucket) bucket = None # TODO: Refactor the below like the above # these require both calls and contract initializations if await solidex.is_solidex_deposit(token_address, sync=False): bucket = "solidex" if await uniswap_multiplexer.is_uniswap_pool(token_address, sync=False): bucket = "uni or uni-like lp" elif gearbox and await gearbox.is_diesel_token(token_address, sync=False): bucket = "gearbox" # this just requires contract initialization but should go behind uniswap elif await aave.is_wrapped_atoken_v2(token_address, sync=False): bucket = "wrapped atoken v2" elif await aave.is_wrapped_atoken_v3(token_address, sync=False): bucket = "wrapped atoken v3" elif await is_generic_amm(token_address): bucket = "generic amm" elif await mooniswap.is_mooniswap_pool(token_address, sync=False): bucket = "mooniswap lp" elif await compound.is_compound_market(token_address, sync=False): bucket = "compound" elif await _chainlink_and_band(token_address): bucket = "chainlink and band" elif chainlink and await chainlink.has_feed(token_address, sync=False): bucket = "chainlink feed" elif synthetix and await synthetix.is_synth(token_address, sync=False): bucket = "synthetix" elif await yearn.is_yearn_vault(token_address, sync=False): bucket = "yearn or yearn-like" elif curve and await curve.get_pool(token_address, sync=False): bucket = "curve lp" logger.debug("%s bucket is %s", token_address, bucket) if bucket: db.set_bucket(token, bucket) return bucket
# these require neither calls to the chain nor contract initialization, just string comparisons (pretty sure) string_matchers = { "wrapped gas coin": lambda address: address == "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE", "stable usd": lambda address: address in STABLECOINS, "one to one": one_to_one.is_one_to_one_token, "wsteth": wsteth.is_wsteth, "creth": creth.is_creth, "belt lp": belt.is_belt_lp, "froyo": froyo.is_froyo, "convex": convex.is_convex_lp, "rkp3r": rkp3r.is_rkp3r, } # these just require calls calls_only = { "atoken": aave.is_atoken, "balancer pool": balancer_multiplexer.is_balancer_pool, "ib token": ib.is_ib_token, "gelato": gelato.is_gelato_pool, "pendle lp": pendle.is_pendle_lp, "piedao lp": piedao.is_pie, "token set": tokensets.is_token_set, "ellipsis lp": ellipsis.is_eps_rewards_pool, "mstable feeder pool": mstablefeederpool.is_mstable_feeder_pool, "saddle": saddle.is_saddle_lp, "basketdao": basketdao.is_basketdao_index, "popsicle": popsicle.is_popsicle_lp, "reserve": reserve.is_rtoken, } async def _chainlink_and_band(token_address) -> bool: """ Check if a token is supported by both Chainlink and Band oracles. This function is primarily used for historical data on the Fantom network, where Band was used before Chainlink became available. Args: token_address: The address of the token to check. Examples: >>> is_supported = await _chainlink_and_band("0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> print(is_supported) True See Also: - :func:`y.prices.chainlink.has_feed` """ return ( chainlink and await chainlink.has_feed(token_address, sync=False) and token_address in band ) async def _check_bucket_helper( bucket: str, check: Callable[[Address], Awaitable[bool]], address: Address ) -> Tuple[str, bool]: """ Helper function to check if a token belongs to a specific bucket. Args: bucket: The name of the bucket to check. check: A callable that checks if the token belongs to the bucket. address: The address of the token to check. Examples: >>> result = await _check_bucket_helper("stable usd", lambda x: x in STABLECOINS, "0x6B175474E89094C44Da98b954EedeAC495271d0F") >>> print(result) ('stable usd', True) See Also: - :func:`check_bucket` """ result = await check(address, sync=False) # TODO: debug why we have to re-await sometimes when @optional_async_diskcache is used if not isinstance(result, bool): if not asyncio.iscoroutine(result): raise TypeError(f"{bucket} result must be boolean. You passed {result}") result = await result if not isinstance(result, bool): # if not asyncio.iscoroutine(result): raise TypeError(f"{bucket} result must be boolean. You passed {result}") # result = await result return bucket, result