Source code for y.prices.dex.uniswap.v1


import asyncio
import logging
from typing import Optional, Tuple

import a_sync
from brownie import ZERO_ADDRESS, chain

from y import ENVIRONMENT_VARIABLES as ENVS
from y._decorators import stuck_coro_debugger
from y.classes.common import ERC20
from y.constants import usdc
from y.contracts import Contract, contract_creation_block_async
from y.datatypes import Address, Block, Pool, UsdPrice
from y.exceptions import UnsupportedNetwork, continue_if_call_reverted
from y.networks import Network
from y.utils.raw_calls import _decimals

logger = logging.getLogger(__name__)

[docs] class UniswapV1(a_sync.ASyncGenericBase): factory = "0xc0a47dFe034B400B47bDaD5FecDa2621de6c4d95"
[docs] def __init__(self, asynchronous: bool = False) -> None: if chain.id != Network.Mainnet: raise UnsupportedNetwork(f"UniswapV1 does not suppport chainid {chain.id}") self.asynchronous = asynchronous
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=None) async def get_exchange(self, token_address: Address) -> Optional[Contract]: factory = await Contract.coroutine(self.factory) exchange = await factory.getExchange.coroutine(token_address) if exchange != ZERO_ADDRESS: return await Contract.coroutine(exchange)
[docs] @stuck_coro_debugger async def get_price( self, token_address: Address, block: Optional[Block], ignore_pools: Tuple[Pool, ...] = (), # unused skip_cache: bool = ENVS.SKIP_CACHE, # unused ) -> Optional[UsdPrice]: exchange, usdc_exchange, decimals = await asyncio.gather( self.get_exchange(token_address, sync=False), self.get_exchange(usdc, sync=False), _decimals(token_address, block, sync=False), ) if exchange is None: return None try: eth_bought = await exchange.getTokenToEthInputPrice.coroutine(10 ** decimals, block_identifier=block) usdc_bought = await usdc_exchange.getEthToTokenInputPrice.coroutine(eth_bought, block_identifier=block) / 1e6 fees = 0.997 ** 2 return UsdPrice(usdc_bought / fees) except ValueError as e: if 'invalid jump destination' in str(e): return None continue_if_call_reverted(e)
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60*60) async def check_liquidity(self, token_address: Address, block: Block, ignore_pools: Tuple[Pool, ...] = ()) -> int: logger.debug("checking %s liquidity for %s at %s ignoring %s", self, token_address, block, ignore_pools) exchange = await self.get_exchange(token_address, sync=False) if exchange is None or exchange in ignore_pools: logger.debug("no %s exchange found for %s", self, token_address) return 0 if block < await contract_creation_block_async(exchange): logger.debug("block %s prior to %s deploy block", block, exchange) return 0 liquidity = await ERC20(token_address, asynchronous=True).balance_of(exchange, block) logger.debug("%s liquidity for %s at %s is %s", self, token_address, block, liquidity) return liquidity