Source code for y.prices.dex.uniswap.v1

import asyncio
import logging
from contextlib import suppress
from typing import Optional, Tuple

import a_sync
from brownie import ZERO_ADDRESS, chain

from y import ENVIRONMENT_VARIABLES as ENVS
from y._decorators import stuck_coro_debugger
from y.classes.common import ERC20
from y.constants import usdc
from y.contracts import Contract, contract_creation_block_async
from y.datatypes import Address, Block, Pool, UsdPrice
from y.exceptions import (
    ContractNotVerified,
    UnsupportedNetwork,
    continue_if_call_reverted,
)
from y.networks import Network
from y.utils.raw_calls import _decimals

logger = logging.getLogger(__name__)


[docs] class UniswapV1(a_sync.ASyncGenericBase): factory = "0xc0a47dFe034B400B47bDaD5FecDa2621de6c4d95"
[docs] def __init__(self, *, asynchronous: bool = False) -> None: """ Initialize the UniswapV1 class. Args: asynchronous: If True, the class will operate in asynchronous mode. Raises: UnsupportedNetwork: If the current network is not Ethereum Mainnet. Examples: >>> uniswap_v1 = UniswapV1() >>> uniswap_v1.factory '0xc0a47dFe034B400B47bDaD5FecDa2621de6c4d95' """ if chain.id != Network.Mainnet: raise UnsupportedNetwork(f"UniswapV1 does not suppport chainid {chain.id}") super().__init__() self.asynchronous = asynchronous
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=None) async def get_exchange(self, token_address: Address) -> Optional[Contract]: """ Get the exchange contract for a given token address. Args: token_address: The address of the token for which to find the exchange. Returns: The exchange contract if it exists, otherwise None. Examples: >>> uniswap_v1 = UniswapV1() >>> exchange = await uniswap_v1.get_exchange("0xTokenAddress") >>> print(exchange) <Contract '0xExchangeAddress'> See Also: - :class:`~y.contracts.Contract` """ factory = await Contract.coroutine(self.factory) exchange = await factory.getExchange.coroutine(token_address) if exchange != ZERO_ADDRESS: with suppress(ContractNotVerified): return await Contract.coroutine(exchange)
[docs] @stuck_coro_debugger async def get_price( self, token_address: Address, block: Optional[Block], ignore_pools: Tuple[Pool, ...] = (), # unused skip_cache: bool = ENVS.SKIP_CACHE, # unused ) -> Optional[UsdPrice]: """ Get the price of a token in USD. Args: token_address: The address of the token to get the price for. block: The block number at which to get the price. ignore_pools: Unused parameter. skip_cache: Unused parameter. Returns: The price of the token in USD, or None if the price cannot be determined. Examples: >>> uniswap_v1 = UniswapV1() >>> price = await uniswap_v1.get_price("0xTokenAddress", 12345678) >>> print(price) 1.23 See Also: - :class:`~y.datatypes.UsdPrice` """ exchange, usdc_exchange, decimals = await asyncio.gather( self.get_exchange(token_address, sync=False), self.get_exchange(usdc, sync=False), _decimals(token_address, block, sync=False), ) if exchange is None: return None try: eth_bought = await exchange.getTokenToEthInputPrice.coroutine( 10**decimals, block_identifier=block ) usdc_bought = ( await usdc_exchange.getEthToTokenInputPrice.coroutine( eth_bought, block_identifier=block ) / 1e6 ) fees = 0.997**2 return UsdPrice(usdc_bought / fees) except ValueError as e: if "invalid jump destination" in str(e): return None continue_if_call_reverted(e)
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60 * 60) async def check_liquidity( self, token_address: Address, block: Block, ignore_pools: Tuple[Pool, ...] = () ) -> int: """ Check the liquidity of a token in its exchange. Args: token_address: The address of the token to check liquidity for. block: The block number at which to check liquidity. ignore_pools: A tuple of pools to ignore when checking liquidity. Returns: The liquidity of the token in its exchange. Examples: >>> uniswap_v1 = UniswapV1() >>> liquidity = await uniswap_v1.check_liquidity("0xTokenAddress", 12345678) >>> print(liquidity) 1000000 See Also: - :class:`~y.classes.common.ERC20` """ logger.debug( "checking %s liquidity for %s at %s ignoring %s", self, token_address, block, ignore_pools, ) exchange = await self.get_exchange(token_address, sync=False) if exchange is None or exchange in ignore_pools: logger.debug("no %s exchange found for %s", self, token_address) return 0 if block < await contract_creation_block_async(exchange): logger.debug("block %s prior to %s deploy block", block, exchange) return 0 liquidity = await ERC20(token_address, asynchronous=True).balance_of( exchange, block ) logger.debug( "%s liquidity for %s at %s is %s", self, token_address, block, liquidity ) return liquidity