Source code for y.prices.dex.uniswap.v3

import asyncio
import logging
import math
from collections import defaultdict
from functools import cached_property, lru_cache
from itertools import cycle
from typing import AsyncIterator, DefaultDict, List, Optional, Tuple

import a_sync
import eth_retry
from a_sync.a_sync import HiddenMethodDescriptor
from brownie import chain
from brownie.network.event import _EventItem
from typing_extensions import Self

from y import ENVIRONMENT_VARIABLES as ENVS
from y._decorators import stuck_coro_debugger
from y.classes.common import ERC20, ContractBase
from y.constants import usdc, weth
from y.contracts import Contract, contract_creation_block_async
from y.datatypes import Address, AnyAddressType, Block, Pool, UsdPrice
from y.exceptions import ContractNotVerified, TokenNotFound, UnsupportedNetwork, call_reverted
from y.interfaces.uniswap.quoterv3 import UNIV3_QUOTER_ABI
from y.networks import Network
from y.utils.events import ProcessedEvents

try:
    from eth_abi.packed import encode_packed
except ImportError:
    from eth_abi.packed import encode_abi_packed as encode_packed

# https://github.com/Uniswap/uniswap-v3-periphery/blob/main/deploys.md
UNISWAP_V3_FACTORY = '0x1F98431c8aD98523631AE4a59f267346ea31F984'
UNISWAP_V3_QUOTER = '0xb27308f9F90D607463bb33eA1BeBb41C27CE5AB6'

logger = logging.getLogger(__name__)

# same addresses on all networks
addresses = {
    Network.Mainnet: {
        'factory': UNISWAP_V3_FACTORY,
        'quoter': UNISWAP_V3_QUOTER,
        'fee_tiers': [3000, 500, 10_000, 100],
    },
    Network.Arbitrum: {
        'factory': UNISWAP_V3_FACTORY,
        'quoter': UNISWAP_V3_QUOTER,
        'fee_tiers': [3000, 500, 10_000],
    },
    Network.Optimism: {
        'factory': UNISWAP_V3_FACTORY,
        'quoter': UNISWAP_V3_QUOTER,
        'fee_tiers': [3000, 500, 10_000, 100],
    },
    Network.Base: {
        'factory': '0x33128a8fC17869897dcE68Ed026d694621f6FDfD',
        'quoter': '0x3d4e44Eb1374240CE5F1B871ab261CD16335B76a', # quoter v2
        'fee_tiers': [3000, 500, 10_000, 100],
    }
}

FEE_DENOMINATOR = 1_000_000


[docs] class UniswapV3Pool(ContractBase): __slots__ = 'fee', 'token0', 'token1', 'tick_spacing'
[docs] def __init__( self, address: Address, token0: Address, token1: Address, tick_spacing: int, fee: int, deploy_block: int, asynchronous: bool = False ) -> None: super().__init__(address, asynchronous=asynchronous) self.token0 = ERC20(token0, asynchronous=asynchronous) self.token1 = ERC20(token1, asynchronous=asynchronous) self.tick_spacing = tick_spacing self.fee = fee self._deploy_block = deploy_block
[docs] def __contains__(self, token: Address) -> bool: return token in [self.token0, self.token1]
[docs] def __getitem__(self, token: Address) -> ERC20: if token not in self: raise TokenNotFound(token, self) return ERC20(token, asynchronous=self.asynchronous)
[docs] @a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60*60, semaphore=10000) # lets try a semaphore here async def check_liquidity(self, token: AnyAddressType, block: Block) -> Optional[int]: logger.debug("checking %s liquidity for %s at %s", self, token, block) if block < await self.deploy_block(sync=False): logger.debug("block %s prior to %s deploy block", block, self) return 0 try: liquidity = await self[token].balance_of(self.address, block, sync=False) except ContractNotVerified: logger.debug("%s is not verified and we cannot fetch balance the usual way. returning 0.", token) return 0 logger.debug("%s liquidity for %s at %s: %s", self, token, block, liquidity) return liquidity
[docs] @a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60*60) async def _check_liquidity_token_out(self, token_in: AnyAddressType, block: Block) -> Optional[int]: return await self.check_liquidity(self._get_token_out(token_in), block=block, sync=False)
@lru_cache def _get_token_out(self, token_in: ERC20) -> ERC20: if token_in == self.token0: return self.token1 elif token_in == self.token1: return self.token0 raise TokenNotFound(token_in, self)
[docs] class UniswapV3(a_sync.ASyncGenericSingleton):
[docs] def __init__(self, asynchronous: bool = True) -> None: self.asynchronous = asynchronous if chain.id not in addresses: raise UnsupportedNetwork('compound is not supported on this network') self.fee_tiers = addresses[chain.id]['fee_tiers'] self.loading = False self._pools = {}
[docs] def __contains__(self, asset) -> bool: return chain.id in addresses
@cached_property def loaded(self) -> a_sync.Event: return a_sync.Event(name=self)
[docs] @a_sync.aka.property async def factory(self) -> Contract: return await Contract.coroutine(addresses[chain.id]['factory'])
__factory__: HiddenMethodDescriptor[Self, Contract]
[docs] @a_sync.aka.cached_property async def quoter(self) -> Contract: quoter = addresses[chain.id]['quoter'] try: return await Contract.coroutine(quoter) except ContractNotVerified: return Contract.from_abi("Quoter", quoter, UNIV3_QUOTER_ABI)
__quoter__: HiddenMethodDescriptor[Self, Contract]
[docs] @a_sync.aka.cached_property @stuck_coro_debugger async def pools(self) -> List[UniswapV3Pool]: factory = await self.__factory__ return UniV3Pools(factory, asynchronous=self.asynchronous)
__pools__: HiddenMethodDescriptor[Self, "UniV3Pools"]
[docs] async def pools_for_token(self, token: Address, block: Block) -> AsyncIterator[UniswapV3Pool]: pools = await self.__pools__ async for pool in pools.objects(to_block=block): if token in pool: yield pool
[docs] @stuck_coro_debugger @a_sync.a_sync(cache_type='memory', ram_cache_ttl=ENVS.CACHE_TTL) async def get_price( self, token: Address, block: Optional[Block] = None, ignore_pools: Tuple[Pool, ...] = (), # unused skip_cache: bool = ENVS.SKIP_CACHE, # unused ) -> Optional[UsdPrice]: quoter = await self.__quoter__ if block and block < await contract_creation_block_async(quoter, True): return None paths = [[token, fee, usdc.address] for fee in self.fee_tiers] if token != weth: paths += [ [token, fee, weth.address, self.fee_tiers[0], usdc.address] for fee in self.fee_tiers ] logger.debug("paths: %s", paths) amount_in = await ERC20(token, asynchronous=True).scale results = await asyncio.gather(*(self._quote_exact_input(path, amount_in, block) for path in paths), return_exceptions=True) for result in results: if isinstance(result, Exception) and not call_reverted(result): raise result.with_traceback(result.__traceback__) logger.debug("results: %s", results) outputs = [ # Quoter v2 uses this weird return struct, we must unpack it to get amount out. (amount if isinstance(amount, int) else amount[0]) / _undo_fees(path) / 1e6 for amount, path in zip(results, paths) if amount and not call_reverted(amount) ] logger.debug("outputs: %s", outputs) return UsdPrice(max(outputs)) if outputs else None
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60*60) async def check_liquidity(self, token: Address, block: Block, ignore_pools: Tuple[Pool, ...] = ()) -> int: logger.debug("checking %s liquidity for %s at %s", self, token, block) if chain.id == Network.Mainnet and token == "0x6DEA81C8171D0bA574754EF6F8b412F2Ed88c54D": # LQTY, TODO refactor this somehow return 0 quoter = await self.__quoter__ if block and block < await contract_creation_block_async(quoter): logger.debug("block %s is before %s deploy block", block, quoter) return 0 if token == weth.address: # NOTE: we need to filter these or else we will be fetching every pool # for now, we only focus on weth/usdc pools filter_fn = lambda pool: pool._get_token_out(token) == usdc.address and pool not in ignore_pools else: filter_fn = lambda pool: pool not in ignore_pools token_out_tasks = UniswapV3Pool._check_liquidity_token_out.map(token_in=token, block=block) pools_for_token: a_sync.ASyncIterator[UniswapV3Pool] = self.pools_for_token(token, block) async for pool in pools_for_token.filter(filter_fn): # the mapping will start the tasks internally logger.debug("starting token_out_task for %s", pool) token_out_tasks[pool] if not token_out_tasks: return 0 logger.debug("%s token_out_tasks for %s at %s: %s", self, token, block, token_out_tasks) # Since uni v3 liquidity can be provided asymmetrically, the most liquid pool in terms of `token` might not actually be the most liquid pool in terms of `token_out` # We need some spaghetticode here to account for these erroneous liquidity values # TODO: Refactor this token_out_liquidity: DefaultDict[ERC20, List[int]] = defaultdict(list) async for pool, liquidity in token_out_tasks.map(pop=False): logger.debug("%s liquidity for %s at %s: %s", pool, token, block, liquidity) token_out_liquidity[pool._get_token_out(token)].append(liquidity) logger.debug("%s token_out_liquidity: %s", token, token_out_liquidity) token_out_min_liquidity = {token_out: min(liquidities) for token_out, liquidities in token_out_liquidity.items()} token_in_tasks = UniswapV3Pool.check_liquidity.map(token=token, block=block) async for pool, liquidity in token_out_tasks.map(pop=True): token_out = pool._get_token_out(token) if len(token_out_liquidity[token_out]) > 1 and liquidity == token_out_min_liquidity[token_out]: logger.debug("ignoring liquidity for %s", pool) elif token_out == weth and liquidity < 10 ** 19: # 10 ETH # NOTE: this is totally arbitrary, works for all known cases but eventually will probably cause issues logger.debug("insufficient liquidity for %s", pool) else: token_in_tasks[pool] liquidity = await token_in_tasks.max(pop=True, sync=False) if token_in_tasks else 0 logger.debug("%s liquidity for %s at %s is %s", self, token, block, liquidity) return liquidity
@stuck_coro_debugger @eth_retry.auto_retry async def _quote_exact_input(self, path: List[list], amount_in: int, block: int) -> int: quoter = await self.__quoter__ return await quoter.quoteExactInput.coroutine(_encode_path(path), amount_in, block_identifier=block)
def _encode_path(path) -> bytes: types = [type for _, type in zip(path, cycle(['address', 'uint24']))] return encode_packed(types, path) def _undo_fees(path) -> float: fees = [1 - fee / FEE_DENOMINATOR for fee in path if isinstance(fee, int)] return math.prod(fees)
[docs] class UniV3Pools(ProcessedEvents[UniswapV3Pool]): __slots__ = "asynchronous",
[docs] def __init__(self, factory: Contract, asynchronous: bool = False): self.asynchronous = asynchronous super().__init__(addresses=[factory.address], topics=[factory.topics["PoolCreated"]])
def _process_event(self, event: _EventItem) -> UniswapV3Pool: token0, token1, fee, tick_spacing, pool = event.values() return UniswapV3Pool(pool, token0, token1, fee, tick_spacing, event.block_number, asynchronous=self.asynchronous) def _get_block_for_obj(self, obj: UniswapV3Pool) -> int: return obj._deploy_block
try: uniswap_v3 = UniswapV3(asynchronous=True) except UnsupportedNetwork: uniswap_v3 = None