Source code for y.prices.dex.balancer.balancer

import logging
from typing import List, Optional, Union

import a_sync
from a_sync.a_sync.property import HiddenMethodDescriptor
from typing_extensions import Self

from y import ENVIRONMENT_VARIABLES as ENVS
from y import exceptions
from y._decorators import stuck_coro_debugger
from y.constants import CHAINID
from y.datatypes import AnyAddressType, Block, UsdPrice
from y.networks import Network
from y.prices.dex.balancer._abc import BalancerABC
from y.prices.dex.balancer.v1 import BalancerV1
from y.prices.dex.balancer.v2 import BalancerV2
from y.utils.cache import optional_async_diskcache


logger = logging.getLogger(__name__)


[docs] class BalancerMultiplexer(a_sync.ASyncGenericBase): """A multiplexer for interacting with different versions of Balancer pools. This class provides methods to determine if a token is a Balancer pool, retrieve pool prices, and get token prices across different Balancer versions. Examples: Initialize the multiplexer: >>> multiplexer = BalancerMultiplexer(asynchronous=True) Check if a token is a Balancer pool: >>> is_pool = await multiplexer.is_balancer_pool(token_address) Get the price of a token: >>> price = await multiplexer.get_price(token_address, block=12345678) See Also: - :class:`BalancerV1` - :class:`BalancerV2` """
[docs] def __init__(self, *, asynchronous: bool = False) -> None: """ Initialize the BalancerMultiplexer. Args: asynchronous: Whether to operate in asynchronous mode. """ super().__init__() self.asynchronous = asynchronous
[docs] @a_sync.aka.property async def versions(self) -> List[Union[BalancerV1, BalancerV2]]: """ Get the available Balancer versions. Returns: A list of available Balancer versions. Examples: >>> versions = await multiplexer.versions """ return [ v async for v in a_sync.as_completed([self.__v1__, self.__v2__], aiter=True) if v ]
__versions__: HiddenMethodDescriptor[Self, List[Union[BalancerV1, BalancerV2]]]
[docs] @a_sync.aka.cached_property async def v1(self) -> Optional[BalancerV1]: """ Get the Balancer V1 instance. Returns: An instance of BalancerV1 if available, otherwise None. Examples: >>> v1 = await multiplexer.v1 """ try: return BalancerV1(asynchronous=self.asynchronous) except ImportError: return None
__v1__: HiddenMethodDescriptor[Self, Optional[BalancerV1]]
[docs] @a_sync.aka.cached_property async def v2(self) -> Optional[BalancerV2]: """ Get the Balancer V2 instance. Returns: An instance of BalancerV2 if available, otherwise None. Examples: >>> v2 = await multiplexer.v2 """ try: return BalancerV2(asynchronous=self.asynchronous) except ImportError: return None
__v2__: HiddenMethodDescriptor[Self, Optional[BalancerV2]]
[docs] @stuck_coro_debugger @optional_async_diskcache async def is_balancer_pool(self, token_address: AnyAddressType) -> bool: """ Check if a given token address is a Balancer pool. Args: token_address: The address of the token to check. Returns: True if the token is a Balancer pool, otherwise False. Examples: >>> is_pool = await multiplexer.is_balancer_pool(token_address) """ try: await self.get_version(token_address) return True except exceptions.TokenError: return False
[docs] @stuck_coro_debugger async def get_pool_price( self, token_address: AnyAddressType, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE, ) -> Optional[UsdPrice]: """ Get the price of a Balancer pool. Args: token_address: The address of the pool token. block: The block number to query the price at. skip_cache: Whether to skip the cache. Returns: The price of the pool in USD, or None if not available. Examples: >>> price = await multiplexer.get_pool_price(token_address, block=12345678) """ balancer: BalancerABC = await self.get_version(token_address) logger.debug("pool %s is from %s", token_address, balancer) price = await balancer.get_pool_price( token_address, block, skip_cache=skip_cache, sync=False ) return None if price is None else UsdPrice(price)
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_ttl=ENVS.CACHE_TTL) async def get_price( self, token_address: AnyAddressType, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE, ) -> Optional[UsdPrice]: """ Get the price of a token using Balancer pools. Args: token_address: The address of the token. block: The block number to query the price at. skip_cache: Whether to skip the cache. Returns: The price of the token in USD, or None if not available. Examples: >>> price = await multiplexer.get_price(token_address, block=12345678) """ if await self.is_balancer_pool(token_address, sync=False): return await self.get_pool_price( token_address, block=block, skip_cache=skip_cache, sync=False ) price = None if ( # NOTE: Only query v2 if block queried > v2 deploy block plus some extra blocks to build up liquidity CHAINID == Network.Mainnet and (not block or block > 12272146 + 100000) ) or ( CHAINID == Network.Fantom and (not block or block > 16896080) ): # TODO: refactor this out v2 = await self.__v2__ if price := await v2.get_token_price( token_address, block, skip_cache=skip_cache, sync=False ): logger.debug("balancer v2 -> $%s", price) return price if not price and CHAINID == Network.Mainnet: v1 = await self.__v1__ if price := await v1.get_token_price( token_address, block, skip_cache=skip_cache, sync=False ): logger.debug("balancer v1 -> $%s", price) return price
# cached forever because not many items
[docs] @a_sync.a_sync(cache_type="memory", ram_cache_ttl=None) async def get_version(self, token_address: AnyAddressType) -> BalancerABC: """ Determine the Balancer version for a given token address. Args: token_address: The address of the token. Returns: The Balancer version instance. Raises: exceptions.TokenError: If the token is not a Balancer pool. Examples: >>> version = await multiplexer.get_version(token_address) """ for v in await self.__versions__: if await v.is_pool(token_address, sync=False): return v raise exceptions.TokenError(token_address, "Balancer pool")
balancer_multiplexer = BalancerMultiplexer(asynchronous=True)