Source code for y.prices.dex.solidly


import asyncio
from typing import List, Optional, Tuple

from y._decorators import continue_on_revert, stuck_coro_debugger
from y.datatypes import Address, Block
from y.exceptions import call_reverted
from y.prices.dex.uniswap.v2 import Path, UniswapRouterV2, UniswapV2Pool
from y.utils.cache import a_sync_ttl_cache


[docs] class SolidlyRouterBase(UniswapRouterV2): """ Solidly is a modified fork of Uni V2. The `uniswap_multiplexer` is the entrypoint for pricing using this object. """
[docs] @continue_on_revert @stuck_coro_debugger async def get_quote(self, amount_in: int, path: Path, block: Optional[Block] = None) -> Tuple[int,int]: routes = await self.get_routes_from_path(path, block) try: return await self.contract.getAmountsOut.coroutine(amount_in, routes, block_identifier=block) except Exception as e: strings = [ "INSUFFICIENT_INPUT_AMOUNT", "INSUFFICIENT_LIQUIDITY", "INSUFFICIENT_OUT_LIQUIDITY", "Sequence has incorrect length", ] if not call_reverted(e) and all(s not in str(e) for s in strings): raise
[docs] class SolidlyPool(UniswapV2Pool): pass
[docs] class SolidlyRouter(SolidlyRouterBase):
[docs] @stuck_coro_debugger @a_sync_ttl_cache async def pair_for(self, input_token: Address, output_token: Address, stable: bool) -> Address: return await self.contract.pairFor.coroutine(input_token, output_token, stable)
[docs] @stuck_coro_debugger @a_sync_ttl_cache async def get_pool(self, input_token: Address, output_token: Address, stable: bool, block: Block) -> Optional[SolidlyPool]: pool_address = await self.pair_for(input_token, output_token, stable, sync=False) if await self.contract.isPair.coroutine(pool_address, block_identifier=block): return SolidlyPool(pool_address, asynchronous=self.asynchronous)
[docs] @stuck_coro_debugger async def get_routes_from_path(self, path: Path, block: Block) -> List[Tuple[Address, Address, bool]]: routes = [] for i in range(len(path) - 1): input_token, output_token = path[i], path[i+1] # Try for a stable pool first and use that if available stable_pool, unstable_pool = await asyncio.gather( self.get_pool(input_token, output_token, True, block, sync=False), self.get_pool(input_token, output_token, False, block, sync=False), ) if stable_pool and unstable_pool: # We have to find out which of these pools is deepest stable_reserves, unstable_reserves = await asyncio.gather( stable_pool.reserves(block=block, sync=False), unstable_pool.reserves(block=block, sync=False), ) stable_reserves = tuple(stable_reserves) unstable_reserves = tuple(unstable_reserves) # NOTE: using `__token0__` and `__token1__` is faster than `__tokens__` since they're already cached and return instantly # it also creates 2 fewer tasks and 1 fewer future than `__tokens__` since there is no use of `asyncio.gather`. if await stable_pool.__token0__ == await unstable_pool.__token0__ and await stable_pool.__token1__ == await unstable_pool.__token1__: stable_reserve = stable_reserves[0] unstable_reserve = unstable_reserves[0] else: # Order of tokens is flip flopped in the pools stable_reserve = stable_reserves[0] unstable_reserve = unstable_reserves[1] if stable_reserve >= unstable_reserve: is_stable = True elif stable_reserve < unstable_reserve: is_stable = False routes.append([input_token, output_token, is_stable]) elif stable_pool: routes.append([input_token, output_token, True]) elif unstable_pool: routes.append([input_token, output_token, False]) else: raise ValueError("Not sure why this function is even running if no pool is found") return routes