import asyncio
import logging
from contextlib import suppress
from decimal import Decimal
from typing import Dict, List, Optional, Tuple
import a_sync
from a_sync.a_sync import HiddenMethodDescriptor
from brownie import chain
from brownie.convert.datatypes import EthAddress
from brownie.exceptions import VirtualMachineError
from typing_extensions import Self
from web3.exceptions import ContractLogicError
from y import ENVIRONMENT_VARIABLES as ENVS
from y._decorators import stuck_coro_debugger
from y.classes.common import ERC20
from y.constants import dai, usdc, wbtc, weth
from y.contracts import Contract, contract_creation_block_async
from y.datatypes import (
Address,
AddressOrContract,
AnyAddressType,
Block,
Pool,
UsdPrice,
UsdValue,
)
from y.exceptions import continue_if_call_reverted
from y.networks import Network
from y.prices import magic
from y.prices.dex.balancer._abc import BalancerABC, BalancerPool
from y.utils.cache import optional_async_diskcache
logger = logging.getLogger(__name__)

# Address of the Balancer V1 exchange proxy for the connected chain.
# ``dict.get`` yields ``None`` when ``chain.id`` has no entry in the mapping.
EXCHANGE_PROXY = {
    Network.Mainnet: "0x3E66B66Fd1d0b02fDa6C811Da9E0547970DB2f21",
}.get(chain.id)

# Input-size multipliers that ``BalancerV1.get_token_price`` tries in order,
# moving on to the next (smaller) one when no output could be obtained.
SCALES_TO_TRY = [1.0, 0.5, 0.1]

# Candidate output tokens that ``BalancerV1.get_some_output`` probes in order.
TOKENOUTS_TO_TRY = [weth, dai, usdc, wbtc]
async def _calc_out_value(
    token_out: AddressOrContract,
    total_outout: int,
    scale: float,
    block: int,
    skip_cache: bool = ENVS.SKIP_CACHE,
) -> float:
    """Convert a raw output amount of ``token_out`` into a per-unit value.

    The raw amount is divided by the output token's scale, multiplied by the
    output token's price at ``block``, and finally divided by ``scale`` so the
    result is normalised back to one full unit of input.

    Args:
        token_out: The output token address or contract.
        total_outout: The raw (unscaled) output amount of ``token_out``.
        scale: The input-size multiplier that was used to obtain the output.
        block: The block number at which to price ``token_out``.
        skip_cache: Forwarded to :func:`y.prices.magic.get_price`.

    Returns:
        The calculated output value as a float.

    Examples:
        >>> await _calc_out_value(weth, 1000, 1.0, 12345678)
        0.5

    See Also:
        - :func:`y.prices.magic.get_price`
    """
    # Build both awaitables first so they can be resolved concurrently.
    scale_lookup = ERC20(token_out, asynchronous=True).scale
    price_lookup = magic.get_price(
        token_out, block, skip_cache=skip_cache, sync=False
    )
    token_scale, token_price = await asyncio.gather(scale_lookup, price_lookup)

    amount_out = total_outout / token_scale
    return amount_out * float(token_price) / scale
[docs]
class BalancerV1Pool(BalancerPool):
    """A Balancer V1 Pool."""

    @a_sync.aka.cached_property
    @stuck_coro_debugger
    # @optional_async_diskcache
    async def tokens(self) -> List[ERC20]:
        """Get the list of tokens in the pool.

        Reads ``getFinalTokens`` from the pool contract and wraps every returned
        address in an :class:`~y.classes.common.ERC20` that shares this pool's
        ``asynchronous`` setting.

        Returns:
            A list of :class:`~y.classes.common.ERC20` tokens in the pool.

        Examples:
            >>> pool = BalancerV1Pool("0x1234567890abcdef1234567890abcdef12345678")
            >>> await pool.tokens
            [<ERC20 TKN '0x1234567890abcdef1234567890abcdef12345678'>, ...]

        See Also:
            - :class:`~y.classes.common.ERC20`
        """
        contract = await Contract.coroutine(self.address)
        return [
            ERC20(token, asynchronous=self.asynchronous)
            for token in await contract.getFinalTokens
        ]

    # Type hint for the hidden accessor that ``get_balances`` passes to
    # ``a_sync.map`` (presumably generated by the cached property above).
    __tokens__: HiddenMethodDescriptor[Self, List[ERC20]]

    @stuck_coro_debugger
    async def get_tvl(
        self, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE
    ) -> Optional[UsdValue]:
        """Get the total value locked (TVL) in the pool.

        Only tokens whose price lookup succeeds contribute directly; the value
        of the remaining tokens is extrapolated from the average value of the
        priced ones.

        Args:
            block: The block number to query.
            skip_cache: Whether to skip the cache.

        Returns:
            The total value locked in the pool, or None if no token in the
            pool could be priced.

        Examples:
            >>> pool = BalancerV1Pool("0x1234567890abcdef1234567890abcdef12345678")
            >>> await pool.get_tvl()
            123456.78

        See Also:
            - :class:`~y.datatypes.UsdValue`
        """
        token_balances = await self.get_balances(block=block, sync=False)

        # Keep only the tokens for which a price is available.
        good_balances = {
            token: balance
            for token, balance in token_balances.items()
            if await token.price(
                block=block,
                return_None_on_failure=True,
                skip_cache=skip_cache,
                sync=False,
            )
            is not None
        }
        if not good_balances:
            return None

        prices = await ERC20.price.map(
            good_balances,
            block=block,
            return_None_on_failure=True,
            skip_cache=skip_cache,
        ).values()

        # in case we couldn't get prices for all tokens, we can extrapolate from the prices we did get
        good_value = sum(
            balance * Decimal(price)
            for balance, price in zip(good_balances.values(), prices)
        )
        # average value per priced token, scaled up to the full token count
        return good_value / len(good_balances) * len(token_balances)

    @stuck_coro_debugger
    async def get_balances(self, block: Optional[Block] = None) -> Dict[ERC20, Decimal]:
        """Get the balances of tokens in the pool.

        Args:
            block: The block number to query. A falsy value is forwarded to
                :meth:`get_balance` as ``"latest"``.

        Returns:
            A dictionary mapping :class:`~y.classes.common.ERC20` tokens to their balances.

        Examples:
            >>> pool = BalancerV1Pool("0x1234567890abcdef1234567890abcdef12345678")
            >>> await pool.get_balances()
            {<ERC20 TKN '0x1234567890abcdef1234567890abcdef12345678'>: Decimal('1000'), ...}

        See Also:
            - :class:`~y.classes.common.ERC20`
        """
        return await a_sync.map(
            self.get_balance, self.__tokens__, block=block or "latest"
        )

    @stuck_coro_debugger
    async def get_balance(self, token: AnyAddressType, block: Block) -> Decimal:
        """Get the balance of a specific token in the pool.

        The raw balance reported by :meth:`check_liquidity` is divided by the
        token's scale.

        Args:
            token: The token address.
            block: The block number to query.

        Returns:
            The balance of the token in the pool.

        Examples:
            >>> pool = BalancerV1Pool("0x1234567890abcdef1234567890abcdef12345678")
            >>> await pool.get_balance("0xabcdefabcdefabcdefabcdefabcdefabcdef", 12345678)
            Decimal('1000')
        """
        balance, scale = await asyncio.gather(
            self.check_liquidity(str(token), block, sync=False),
            ERC20(token, asynchronous=True).scale,
        )
        return Decimal(balance) / scale

    @stuck_coro_debugger
    @a_sync.a_sync(ram_cache_maxsize=10_000, ram_cache_ttl=10 * 60)
    async def check_liquidity(self, token: Address, block: Block) -> int:
        """Check the liquidity of a specific token in the pool.

        Args:
            token: The token address.
            block: The block to query. Integer blocks earlier than the pool's
                deploy block short-circuit to 0; non-integer identifiers such
                as ``"latest"`` are passed straight to the contract call.

        Returns:
            The raw ``getBalance`` result for the token, or 0 when the block
            predates the pool or the call fails with a ``NOT_BOUND`` error or
            a bare ``"execution reverted"``.

        Raises:
            Exception: Any other error raised by the contract call is re-raised.

        Examples:
            >>> pool = BalancerV1Pool("0x1234567890abcdef1234567890abcdef12345678")
            >>> await pool.check_liquidity("0xabcdefabcdefabcdefabcdefabcdefabcdef", 12345678)
            1000
        """
        # ``get_balances`` forwards "latest" when no block is given; ordering a
        # str (or None) against the integer deploy block would raise TypeError,
        # so the comparison is only made for integer block numbers.
        if isinstance(block, int) and block < await self.deploy_block(sync=False):
            return 0
        contract = await Contract.coroutine(self.address)
        try:
            return await contract.getBalance.coroutine(token, block_identifier=block)
        except Exception as e:
            # the pool was not yet finalized at this block
            # NOTE: does this happen for any pool except YLA? tbd...
            if "NOT_BOUND" in str(e):
                return 0
            elif e.args and str(e.args[0]) == "execution reverted":
                # we only want to continue on exact match of the original (no extra context added) exception
                return 0
            raise
[docs]
class BalancerV1(BalancerABC[BalancerV1Pool]):
    """A Balancer V1 instance."""

    _pool_type = BalancerV1Pool

    # Method signatures associated with Balancer V1 pools (presumably consumed
    # by the base class to recognise pools -- confirm in ``BalancerABC``).
    _check_methods = (
        "getCurrentTokens()(address[])",
        "getTotalDenormalizedWeight()(uint)",
        "totalSupply()(uint)",
    )

    def __init__(self, *, asynchronous: bool = False) -> None:
        """Initialize a BalancerV1 instance.

        ``exchange_proxy`` is set to the exchange proxy contract when
        ``EXCHANGE_PROXY`` is configured for the connected chain, otherwise
        to ``None``.

        Args:
            asynchronous: Whether to use asynchronous operations.

        Examples:
            >>> balancer = BalancerV1(asynchronous=True)
        """
        super().__init__()
        self.asynchronous = asynchronous
        self.exchange_proxy = Contract(EXCHANGE_PROXY) if EXCHANGE_PROXY else None

    @stuck_coro_debugger
    async def get_token_price(
        self,
        token_address: AddressOrContract,
        block: Optional[Block] = None,
        skip_cache: bool = ENVS.SKIP_CACHE,
    ) -> Optional[UsdPrice]:
        """Get the price of a token via the Balancer V1 exchange proxy.

        Each entry of ``SCALES_TO_TRY`` is tried in order until a quote against
        one of the candidate output tokens is obtained; that quote is then
        converted to a value with ``_calc_out_value``.

        Args:
            token_address: The token address or contract.
            block: The block number to query.
            skip_cache: Whether to skip the cache.

        Returns:
            The price of the token in USD, or None if it cannot be determined
            (no exchange proxy on this chain, ``block`` predates the proxy, or
            no quote could be obtained at any scale).

        Examples:
            >>> balancer = BalancerV1(asynchronous=True)
            >>> await balancer.get_token_price("0xabcdefabcdefabcdefabcdefabcdefabcdef")
            1.23

        See Also:
            - :class:`~y.datatypes.UsdPrice`
        """
        if self.exchange_proxy is None:
            # ``__init__`` leaves the proxy unset on chains without one; there
            # is nothing to query, so the price cannot be determined.
            return None

        if block is not None and block < await contract_creation_block_async(
            self.exchange_proxy, True
        ):
            return None

        for scale in SCALES_TO_TRY:
            # Can we get an output if we try smaller size? try consecutively smaller
            if output := await self.get_some_output(
                token_address, block=block, scale=scale, sync=False
            ):
                return await _calc_out_value(
                    *output, scale, block=block, skip_cache=skip_cache
                )
        return None

    @stuck_coro_debugger
    async def check_liquidity_against(
        self,
        token_in: AddressOrContract,
        token_out: AddressOrContract,
        scale: float = 1,
        block: Optional[Block] = None,
    ) -> Optional[int]:
        """Quote how much ``token_out`` the exchange proxy returns for ``token_in``.

        The input amount is the scale of ``token_in`` multiplied by ``scale``.

        Args:
            token_in: The input token address or contract.
            token_out: The output token address or contract.
            scale: Multiplier applied to the input token's scale.
            block: The block number to query.

        Returns:
            The ``totalOutput`` of the proxy's ``viewSplitExactIn`` call, or
            None if no exchange proxy is configured or the call raised one of
            the suppressed errors.

        Examples:
            >>> balancer = BalancerV1(asynchronous=True)
            >>> await balancer.check_liquidity_against("0xabcdefabcdefabcdefabcdefabcdefabcdef", "0x1234567890abcdef1234567890abcdef12345678")
            1000
        """
        if self.exchange_proxy is None:
            # No proxy on this chain: avoid an AttributeError on ``None`` below.
            return None

        amount_in = await ERC20(token_in, asynchronous=True).scale * scale
        with suppress(ValueError, VirtualMachineError, ContractLogicError):
            # across various dep versions we get these various excs
            view_split_exact_in = await self.exchange_proxy.viewSplitExactIn.coroutine(
                token_in,
                token_out,
                amount_in,
                32,  # NOTE: 32 is max
                block_identifier=block,
            )
            return view_split_exact_in["totalOutput"]
        return None

    @stuck_coro_debugger
    async def get_some_output(
        self, token_in: AddressOrContract, scale: float = 1, block: Optional[Block] = None
    ) -> Optional[Tuple[EthAddress, int]]:
        """Get some output for a given input token.

        Candidate output tokens from ``TOKENOUTS_TO_TRY`` are probed in order
        and the first one that yields a truthy output amount wins.

        Args:
            token_in: The input token address or contract.
            scale: The scale factor.
            block: The block number to query.

        Returns:
            A tuple containing the output token address and the total output
            amount, or None if no candidate produced an output.

        Examples:
            >>> balancer = BalancerV1(asynchronous=True)
            >>> await balancer.get_some_output("0xabcdefabcdefabcdefabcdefabcdefabcdef")
            ('0x1234567890abcdef1234567890abcdef12345678', 1000)
        """
        for token_out in TOKENOUTS_TO_TRY:
            if output := await self.check_liquidity_against(
                token_in, token_out, block=block, scale=scale, sync=False
            ):
                return token_out, output
        return None

    @stuck_coro_debugger
    async def check_liquidity(
        self, token: Address, block: Block, ignore_pools: Tuple[Pool, ...] = ()
    ) -> int:
        """Check the liquidity of a token across Balancer V1 pools.

        Args:
            token: The token address.
            block: The block number to query.
            ignore_pools: A tuple of pools to ignore.

        Returns:
            The maximum liquidity reported by the candidate pools, or 0 when
            there are none. The candidate list currently starts out empty, so
            this method always returns 0.

        Examples:
            >>> balancer = BalancerV1(asynchronous=True)
            >>> await balancer.check_liquidity("0xabcdefabcdefabcdefabcdefabcdefabcdef", 12345678)
            0
        """
        # NOTE: no pools are discovered here; the candidate list is empty, so
        # the filter and the max lookup below never see a pool.
        pools = []
        pools = [pool for pool in pools if pool not in ignore_pools]
        return (
            await BalancerV1Pool.check_liquidity.max(
                pools, token=token, block=block, sync=False
            )
            if pools
            else 0
        )