Source code for y.prices.dex.balancer.balancer


import logging
from typing import List, Optional, Union

import a_sync
from a_sync.a_sync.property import HiddenMethodDescriptor
from brownie import chain
from typing_extensions import Self

from y import ENVIRONMENT_VARIABLES as ENVS
from y import exceptions
from y._decorators import stuck_coro_debugger
from y.datatypes import AnyAddressType, Block, UsdPrice
from y.networks import Network
from y.prices.dex.balancer._abc import BalancerABC
from y.prices.dex.balancer.v1 import BalancerV1
from y.prices.dex.balancer.v2 import BalancerV2
from y.utils.cache import optional_async_diskcache


logger = logging.getLogger(__name__)

[docs] class BalancerMultiplexer(a_sync.ASyncGenericBase):
[docs] def __init__(self, asynchronous: bool = False) -> None: self.asynchronous = asynchronous
[docs] @a_sync.aka.property async def versions(self) -> List[Union[BalancerV1, BalancerV2]]: return [v async for v in a_sync.as_completed([self.__v1__, self.__v2__], aiter=True) if v]
__versions__: HiddenMethodDescriptor[Self, List[Union[BalancerV1, BalancerV2]]]
[docs] @a_sync.aka.cached_property async def v1(self) -> Optional[BalancerV1]: try: return BalancerV1(asynchronous=self.asynchronous) except ImportError: return None
__v1__: HiddenMethodDescriptor[Self, Optional[BalancerV1]]
[docs] @a_sync.aka.cached_property async def v2(self) -> Optional[BalancerV2]: try: return BalancerV2(asynchronous=self.asynchronous) except ImportError: return None
__v2__: HiddenMethodDescriptor[Self, Optional[BalancerV2]]
[docs] @stuck_coro_debugger @optional_async_diskcache async def is_balancer_pool(self, token_address: AnyAddressType) -> bool: try: await self.get_version(token_address) return True except exceptions.TokenError: return False
[docs] @stuck_coro_debugger async def get_pool_price(self, token_address: AnyAddressType, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE) -> Optional[UsdPrice]: balancer: BalancerABC = await self.get_version(token_address) logger.debug("pool %s is from %s", token_address, balancer) return UsdPrice(await balancer.get_pool_price(token_address, block, skip_cache=skip_cache, sync=False))
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_ttl=ENVS.CACHE_TTL) async def get_price(self, token_address: AnyAddressType, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE) -> Optional[UsdPrice]: if await self.is_balancer_pool(token_address, sync=False): return await self.get_pool_price(token_address, block=block, skip_cache=skip_cache, sync=False) price = None if ( # NOTE: Only query v2 if block queried > v2 deploy block plus some extra blocks to build up liquidity (chain.id == Network.Mainnet and (not block or block > 12272146 + 100000)) or (chain.id == Network.Fantom and (not block or block > 16896080)) ): # TODO: refactor this out v2 = await self.__v2__ if price := await v2.get_token_price(token_address, block, skip_cache=skip_cache, sync=False): logger.debug("balancer v2 -> $%s", price) return price if not price and chain.id == Network.Mainnet: v1 = await self.__v1__ if price := await v1.get_token_price(token_address, block, skip_cache=skip_cache, sync=False): logger.debug("balancer v1 -> $%s", price) return price
# cached forever because not many items
[docs] @a_sync.a_sync(cache_type="memory", ram_cache_ttl=None) async def get_version(self, token_address: AnyAddressType) -> BalancerABC: for v in await self.__versions__: if await v.is_pool(token_address, sync=False): return v raise exceptions.TokenError(token_address, "Balancer pool")
balancer_multiplexer = BalancerMultiplexer(asynchronous=True)