Source code for y.prices.utils.buckets

import asyncio
import logging
from typing import Awaitable, Callable, Tuple

import a_sync

from y import convert
from y.constants import STABLECOINS
from y.datatypes import Address, AnyAddressType
from y.prices import convex, one_to_one, pendle, popsicle, rkp3r, solidex, yearn
from y.prices.band import band
from y.prices.chainlink import chainlink
from y.prices.dex import mooniswap
from y.prices.dex.balancer import balancer_multiplexer
from y.prices.dex.genericamm import is_generic_amm
from y.prices.dex.uniswap import uniswap_multiplexer
from y.prices.eth_derivs import creth, wsteth
from y.prices.gearbox import gearbox
from y.prices.lending import ib
from y.prices.lending.aave import aave
from y.prices.lending.compound import compound
from y.prices.stable_swap import (belt, ellipsis, froyo, mstablefeederpool,
                                  saddle)
from y.prices.stable_swap.curve import curve
from y.prices.synthetix import synthetix
from y.prices.tokenized_fund import basketdao, gelato, piedao, reserve, tokensets
from y.utils.logging import get_price_logger

logger = logging.getLogger(__name__)

[docs] @a_sync.a_sync(default='sync', cache_type='memory') async def check_bucket(token: AnyAddressType) -> str: token_address = convert.to_address(token) logger = get_price_logger(token_address, block=None, extra='buckets') import y._db.utils.token as db bucket = await db.get_bucket(token_address) if bucket: logger.debug('returning bucket %s from ydb', bucket) return bucket # these require neither calls to the chain nor contract initialization, just string comparisons (pretty sure) for bucket, check in string_matchers.items(): if check(token): logger.debug("%s is %s", token_address, bucket) db.set_bucket(token, bucket) return bucket else: logger.debug("%s is not %s", token_address, bucket) # check these first, these just require calls futs = [asyncio.ensure_future(_check_bucket_helper(bucket, check, token_address)) for bucket, check in calls_only.items()] for fut in asyncio.as_completed(futs): try: bucket, is_member = await fut except TypeError: raise except Exception as e: logger.warning("%s when checking %s. This will probably not impact your run.", e, fut) logger.warning(e, exc_info=True) continue if is_member: logger.debug("%s is %s", token_address, bucket) for fut in futs: fut.cancel() db.set_bucket(token, bucket) return bucket else: logger.debug("%s is not %s", token_address, bucket) bucket = None # TODO: Refactor the below like the above # these require both calls and contract initializations if await solidex.is_solidex_deposit(token_address, sync=False): bucket = 'solidex' if await uniswap_multiplexer.is_uniswap_pool(token_address, sync=False): bucket = 'uni or uni-like lp' elif gearbox and await gearbox.is_diesel_token(token_address, sync=False): bucket = 'gearbox' # this just requires contract initialization but should go behind uniswap elif await aave.is_wrapped_atoken_v2(token_address, sync=False): bucket = 'wrapped atoken v2' elif await aave.is_wrapped_atoken_v3(token_address, sync=False): bucket = 'wrapped atoken v3' elif await is_generic_amm(token_address): bucket = 'generic amm' elif await mooniswap.is_mooniswap_pool(token_address, sync=False): bucket = 'mooniswap lp' elif await compound.is_compound_market(token_address, sync=False): bucket = 'compound' elif await _chainlink_and_band(token_address): bucket = 'chainlink and band' elif chainlink and await chainlink.has_feed(token_address, sync=False): bucket = 'chainlink feed' elif synthetix and await synthetix.is_synth(token_address, sync=False): bucket = 'synthetix' elif await yearn.is_yearn_vault(token_address, sync=False): bucket = 'yearn or yearn-like' elif curve and await curve.get_pool(token_address, sync=False): bucket = 'curve lp' logger.debug("%s bucket is %s", token_address, bucket) if bucket: db.set_bucket(token, bucket) return bucket
# these require neither calls to the chain nor contract initialization, just string comparisons (pretty sure) string_matchers = { 'wrapped gas coin': lambda address: address == "0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE", 'stable usd': lambda address: address in STABLECOINS, 'one to one': one_to_one.is_one_to_one_token, 'wsteth': wsteth.is_wsteth, 'creth': creth.is_creth, 'belt lp': belt.is_belt_lp, 'froyo': froyo.is_froyo, 'convex': convex.is_convex_lp, 'rkp3r': rkp3r.is_rkp3r, } # these just require calls calls_only = { 'atoken': aave.is_atoken, 'balancer pool': balancer_multiplexer.is_balancer_pool, 'ib token': ib.is_ib_token, 'gelato': gelato.is_gelato_pool, 'pendle lp': pendle.is_pendle_lp, 'piedao lp': piedao.is_pie, 'token set': tokensets.is_token_set, 'ellipsis lp': ellipsis.is_eps_rewards_pool, 'mstable feeder pool': mstablefeederpool.is_mstable_feeder_pool, 'saddle': saddle.is_saddle_lp, 'basketdao': basketdao.is_basketdao_index, 'popsicle': popsicle.is_popsicle_lp, 'reserve': reserve.is_rtoken, } async def _chainlink_and_band(token_address) -> bool: """ We only really need band for a short period in the beginning of fantom's history, and then we will default to chainlink once available. """ return chainlink and await chainlink.has_feed(token_address, sync=False) and token_address in band async def _check_bucket_helper(bucket: str, check: Callable[[Address], Awaitable[bool]], address: Address) -> Tuple[str, bool]: result = await check(address, sync=False) # TODO: debug why we have to re-await sometimes when @optional_async_diskcache is used if not isinstance(result, bool): if not asyncio.iscoroutine(result): raise TypeError(f'{bucket} result must be boolean. You passed {result}') result = await result if not isinstance(result, bool): #if not asyncio.iscoroutine(result): raise TypeError(f'{bucket} result must be boolean. You passed {result}') #result = await result return bucket, result