Source code for y.prices.dex.uniswap.v2

 
import asyncio
import logging
from contextlib import suppress
from decimal import Decimal
from functools import cached_property
from typing import Any, AsyncIterator, Dict, List, Optional, Tuple

import a_sync
import a_sync.exceptions
import brownie
import dank_mids
from a_sync.a_sync import HiddenMethodDescriptor
from brownie import chain
from brownie.network.event import _EventItem
from dank_mids.exceptions import Revert
from multicall import Call
from typing_extensions import Self

from y import ENVIRONMENT_VARIABLES as ENVS
from y import convert
from y._decorators import continue_on_revert, stuck_coro_debugger
from y.classes.common import ERC20, ContractBase, WeiBalance
from y.constants import STABLECOINS, WRAPPED_GAS_COIN, sushi, usdc, weth
from y.contracts import Contract, contract_creation_block_async
from y.datatypes import (Address, AddressOrContract, AnyAddressType, Block,
                         Pool, UsdPrice)
from y.exceptions import (CantFindSwapPath, ContractNotVerified,
                          NonStandardERC20, NotAUniswapV2Pool, TokenNotFound,
                          call_reverted, continue_if_call_reverted)
from y.interfaces.uniswap.factoryv2 import UNIV2_FACTORY_ABI
from y.networks import Network
from y.prices import magic
from y.prices.dex.uniswap.v2_forks import (ROUTER_TO_FACTORY,
                                           ROUTER_TO_PROTOCOL, special_paths)
from y.utils.events import ProcessedEvents
from y.utils.raw_calls import raw_call

logger = logging.getLogger(__name__)

Path = List[AddressOrContract]
Reserves = Tuple[int,int,int]

factory_helper_address = {
    # put special case addresses here
}.get(chain.id, "0xE57Bfd650A7771E401d56d4b2CA22d9f8f51D3D9")

try:
    FACTORY_HELPER = Contract(factory_helper_address)
except ContractNotVerified:
    FACTORY_HELPER = None


[docs] class UniswapV2Pool(ERC20): """ Represents a Uniswap V2 liquidity pool. """ # default stored as class var to keep instance dicts smaller __types_assumed = True "True if we're assuming types based on normal univ2 abi, False if we checked via block explorer." __slots__ = 'get_reserves',
[docs] def __init__( self, address: AnyAddressType, token0: Optional[Address] = None, token1: Optional[Address] = None, deploy_block: Optional[int] = None, asynchronous: bool = False, ): super().__init__(address, asynchronous=asynchronous) self.get_reserves = Call(self.address, 'getReserves()((uint112,uint112,uint32))') if deploy_block: self._deploy_block = deploy_block if token0: self.token0 = token0 if token1: self.token1 = token1
[docs] @a_sync.aka.cached_property async def factory(self) -> Address: try: return await raw_call(self.address, 'factory()', output='address', sync=False) except ValueError as e: if call_reverted(e): raise NotAUniswapV2Pool(self) from e # `is not a valid ETH address` means we got some kind of response from the chain. # but couldn't convert to address. If it happens to be a goofy but # verified uni fork, maybe we can get factory this way okay_errors = ['is not a valid ETH address','invalid opcode','invalid jump destination'] if all(msg not in str(e) for msg in okay_errors): raise contract = await Contract.coroutine(self.address) try: return await contract.factory except AttributeError as exc: raise NotAUniswapV2Pool(self) from exc
__factory__: HiddenMethodDescriptor[Self, Address]
[docs] @a_sync.aka.property async def tokens(self) -> Tuple[ERC20, ERC20]: return await asyncio.gather(self.__token0__, self.__token1__)
__tokens__: HiddenMethodDescriptor[Self, Tuple[ERC20, ERC20]]
[docs] @a_sync.aka.cached_property async def token0(self) -> ERC20: try: if token0 := await Call(self.address, ['token0()(address)']): return ERC20(token0, asynchronous=self.asynchronous) except ValueError as e: continue_if_call_reverted(e) raise NotAUniswapV2Pool(self)
__token0__: HiddenMethodDescriptor[Self, ERC20]
[docs] @a_sync.aka.cached_property async def token1(self) -> ERC20: try: if token1 := await Call(self.address, ['token1()(address)']): return ERC20(token1, asynchronous=self.asynchronous) except ValueError as e: continue_if_call_reverted(e) raise NotAUniswapV2Pool(self)
__token1__: HiddenMethodDescriptor[Self, ERC20]
[docs] @a_sync.a_sync(ram_cache_ttl=ENVS.CACHE_TTL) @stuck_coro_debugger async def get_price(self, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE) -> Optional[UsdPrice]: tvl = await self.tvl(block=block, skip_cache=skip_cache, sync=False) if tvl is not None: return UsdPrice(tvl / Decimal(await self.total_supply_readable(block=block, sync=False)))
[docs] @a_sync.a_sync(ram_cache_maxsize=None, ram_cache_ttl=ENVS.CACHE_TTL) async def get_token_out(self, token_in: Address) -> ERC20: if token_in == (token0 := await self.__token0__): # these return instantly since theyre already cached return await self.__token1__ elif token_in == (token1 := await self.__token1__): # these return instantly since theyre already cached return await self.__token0__ raise TokenNotFound(token_in, [token0, token1]) from None
[docs] @stuck_coro_debugger async def reserves(self, *, block: Optional[Block] = None) -> Optional[Tuple[WeiBalance, WeiBalance]]: try: reserves = await self.get_reserves.coroutine(block_id=block) except Exception as e: if not call_reverted(e): raise reserves = None if reserves is None and self.__types_assumed: try: await self._check_return_types() except AttributeError as e: raise NotAUniswapV2Pool(self) from e return await self.reserves(block=block, sync=False) if reserves is None and self._verified: # This shouldn't really run anymore, maybe delete contract = await Contract.coroutine(self.address) try: reserves = await contract.getReserves.coroutine(block_identifier=block) types = ",".join(output["type"] for output in contract.getReserves.abi["outputs"]) logger.warning(f'abi for getReserves for {contract} is {types}') except Exception as e: if not call_reverted(e): raise if reserves is None: return None # NOTE: using `__token0__` and `__token1__` is faster than `__tokens__` since they're already cached and return instantly # it also creates 2 fewer tasks and 1 fewer future than `__tokens__` since there is no use of `asyncio.gather`. return ( WeiBalance(reserves[0], await self.__token0__, block=block), WeiBalance(reserves[1], await self.__token1__, block=block), )
[docs] @stuck_coro_debugger async def tvl(self, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE) -> Optional[Decimal]: # start these tasks now price_tasks: a_sync.TaskMapping[ERC20, UsdPrice] price_tasks = ERC20.price.map( # NOTE: using `__token0__` and `__token1__` is faster than `__tokens__` since they're already cached and return instantly # it also creates 2 fewer tasks and 1 fewer future than `__tokens__` since there is no use of `asyncio.gather`. [await self.__token0__, await self.__token1__], block=block, return_None_on_failure=True, skip_cache=skip_cache, ) reserves: Tuple[WeiBalance, WeiBalance] if (reserves := await self.reserves(block=block, sync=False)) is None: await price_tasks.close() return None prices = await price_tasks.values(pop=True) if vals := [ Decimal(await reserve.__readable__) * Decimal(price) for reserve, price in zip(reserves, prices) if price is not None ]: if len(vals) == 1: vals *= 2 if len(vals) == 2: logger.debug('reserves: %s', reserves) logger.debug('prices: %s', prices) logger.debug('vals: %s', vals) return sum(vals) else: raise Exception("how did we get here?") from None
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60*60, semaphore=10_000) # lets try a semaphore here async def check_liquidity(self, token: Address, block: Block) -> int: """ Check the liquidity of a specific token in the pool at a given block. Args: token: The address of the token to check. block: The block number to query. Returns: The liquidity of the token in the pool. Raises: TokenNotFound: If the token is not one of the two tokens in the liquidity pool. """ logger.debug("checking %s liquidity for %s at %s", self, token, block) if block and block < await self.deploy_block(sync=False): logger.debug("block %s is before %s deploy block", block, self) return 0 if reserves := await self.reserves(block=block, sync=False): balance: WeiBalance for balance in reserves: if token == balance.token: liquidity = balance.balance logger.debug("%s liquidity for %s at %s is %s", self, token, block, liquidity) return liquidity raise TokenNotFound(token, reserves) return 0
[docs] @stuck_coro_debugger async def is_uniswap_pool(self, block: Optional[Block] = None) -> bool: """ Check if this contract is a valid liquidity pool for Uniswap V2 or one of its forks. Args: block (optional): The block number to query. Defaults to latest block. Returns: True if the contract is a valid Uniswap V2 pool, False otherwise. """ try: return all(await asyncio.gather(self.reserves(block=block, sync=False), self.total_supply(block, sync=False))) except NotAUniswapV2Pool: return False
async def _check_return_types(self) -> None: if not self.__types_assumed: return try: contract = await Contract.coroutine(self.address) reserves_types = ",".join(output["type"] for output in contract.getReserves.abi["outputs"]) self._verified = True assert reserves_types.count(',') == 2, reserves_types self.get_reserves = Call(self.address, f'getReserves()(({reserves_types}))') except ContractNotVerified: self._verified = False self.__types_assumed = False
[docs] class PoolsFromEvents(ProcessedEvents[UniswapV2Pool]): PairCreated = "0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9" __slots__ = "asynchronous", "label"
[docs] def __init__(self, factory: AnyAddressType, label: str, asynchronous: bool = False): self.asynchronous = asynchronous self.label = label super().__init__(addresses=[factory], topics=[[self.PairCreated]])
def __repr__(self) -> str: return f"<{self.__class__.__name__} label={self.label}>"
[docs] def pools(self, to_block: Optional[int] = None) -> AsyncIterator[UniswapV2Pool]: return self._objects_thru(block=to_block)
def _get_block_for_obj(self, obj: UniswapV2Pool) -> int: return obj._deploy_block def _process_event(self, event: _EventItem) -> UniswapV2Pool: pool = UniswapV2Pool( address=event["pair"], token0=event["token0"], token1=event["token1"], asynchronous=self.asynchronous, ) # Do this here instead of in the init in case the user inited their own UniswapV2Pool object previoulsy, which is now the singleton pool._deploy_block = event.block_number return pool
[docs] class UniswapRouterV2(ContractBase):
[docs] def __init__(self, router_address: AnyAddressType, *args: Any, **kwargs: Any) -> None: super().__init__(router_address, *args, **kwargs) self.label = ROUTER_TO_PROTOCOL[self.address] self.factory = ROUTER_TO_FACTORY[self.address] self.special_paths = special_paths(self.address) self.get_amounts_out = Call(self.address, "getAmountsOut(uint,address[])(uint[])") # we need the factory contract object cached in brownie so we can decode logs properly if not ContractBase(self.factory, asynchronous=self.asynchronous)._is_cached: brownie.Contract.from_abi('UniClone Factory [forced]', self.factory, UNIV2_FACTORY_ABI) self._events = PoolsFromEvents(self.factory, self.label, asynchronous=self.asynchronous)
[docs] def __str__(self) -> str: return self.__repr__()
def __repr__(self) -> str: return f"<UniswapV2Router {self.label} '{self.address}'>"
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=500) async def get_price( self, token_in: Address, block: Optional[Block] = None, token_out: Address = usdc.address, paired_against: Address = WRAPPED_GAS_COIN, skip_cache: bool = ENVS.SKIP_CACHE, ignore_pools: Tuple[Pool, ...] = (), ) -> Optional[UsdPrice]: """ Calculate a price based on Uniswap Router quote for selling one `token_in`. Always uses intermediate WETH pair if `[token_in,weth,token_out]` swap path available. """ token_in, token_out, path = str(token_in), str(token_out), None if chain.id == Network.BinanceSmartChain and token_out == usdc.address: busd = await Contract.coroutine("0xe9e7CEA3DedcA5984780Bafc599bD69ADd087D56") token_out = busd.address if token_in in STABLECOINS: return 1 try: amount_in = await ERC20(token_in, asynchronous=True).scale except NonStandardERC20: return None if token_in in [weth.address, WRAPPED_GAS_COIN] and token_out in STABLECOINS: path = [token_in, token_out] elif str(token_out) in STABLECOINS: with suppress(CantFindSwapPath): path = await self.get_path_to_stables(token_in, block, _ignore_pools=ignore_pools, sync=False) logger.debug('smrt') # If we can't find a good path to stables, we might still be able to determine price from price of paired token if path is None and (deepest_pool:= await self.deepest_pool(token_in, block, _ignore_pools=ignore_pools, sync=False)): logger.debug('deepest pool: %s', deepest_pool) paired_with = await deepest_pool.get_token_out(token_in, sync=False) path = [token_in, paired_with] quote, out_scale = await asyncio.gather(self.get_quote(amount_in, path, block=block, sync=False), ERC20(path[-1], asynchronous=True).scale) logger.debug('quote: %s', quote) if quote is not None: amount_out = Decimal(quote[-1]) / out_scale fees = Decimal(0.997) ** (len(path) - 1) amount_out /= fees paired_with_price = await magic.get_price(paired_with, block, fail_to_None=True, skip_cache=skip_cache, ignore_pools=(*ignore_pools, deepest_pool), sync=False) if paired_with_price: return amount_out * Decimal(paired_with_price) # If we still don't have a workable path, try this smol brain method if path is None: path = self._smol_brain_path_selector(token_in, token_out, paired_against) # NOTE: does this ever run anymore? can we take it out? logger.warning('using smol brain path selector') fees = 0.997 ** (len(path) - 1) logger.debug('router: %s path: %s', self.label, path) quote, out_scale = await asyncio.gather(self.get_quote(amount_in, path, block=block, sync=False), ERC20(path[-1],asynchronous=True).scale) if quote is not None: amount_out = quote[-1] / out_scale return UsdPrice(amount_out / fees)
[docs] @continue_on_revert @stuck_coro_debugger async def get_quote(self, amount_in: int, path: Path, block: Optional[Block] = None) -> Tuple[int,int]: if not self._is_cached: return await self.get_amounts_out.coroutine((amount_in, path), block_id=block) try: return await self.contract.getAmountsOut.coroutine(amount_in, path, block_identifier=block) # TODO figure out how to best handle uni forks with slight modifications. # Sometimes the below "else" code will not work with modified methods. Brownie works for now. except Exception as e: strings = [ "INSUFFICIENT_INPUT_AMOUNT", "INSUFFICIENT_LIQUIDITY", "INSUFFICIENT_OUT_LIQUIDITY", "Sequence has incorrect length", ] if not call_reverted(e) and all(s not in str(e) for s in strings): raise
[docs] @a_sync.aka.cached_property @stuck_coro_debugger async def pools(self) -> List[UniswapV2Pool]: logger.info('Fetching pools for %s on %s. If this is your first time using ypricemagic, this can take a while. Please wait patiently...', self.label, Network.printable()) pools = [pool async for pool in self._events.pools(to_block=await dank_mids.eth.block_number)] all_pairs_len = await raw_call(self.factory, 'allPairsLength()', output='int', sync=False) if len(pools) > all_pairs_len: raise NotImplementedError('this shouldnt happen again') elif (to_get := all_pairs_len - len(pools)): # < logger.debug("Oh no! Looks like your node can't look back that far. Checking for the missing %s pools...", to_get) factory = await Contract.coroutine(self.factory) for i, pool_address in enumerate(await factory.allPairs.map(range(to_get))): pool = UniswapV2Pool(address=pool_address, asynchronous=self.asynchronous) pools.insert(i, pool) logger.debug('Done fetching %s missing pools on %s', to_get, self.label) token0s = UniswapV2Pool.token0.map(pools).values(pop=True) token1s = UniswapV2Pool.token1.map(pools).values(pop=True) tokens = set(await token0s + await token1s) logger.info('Loaded %s pools supporting %s tokens on %s', len(pools), len(tokens), self.label) return pools
__pools__: HiddenMethodDescriptor[Self, List[UniswapV2Pool]]
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=None, ram_cache_ttl=60*60) async def get_pools_for(self, token_in: Address, block: Optional[Block] = None) -> Dict[UniswapV2Pool, Address]: if self._supports_uniswap_helper: try: pools = [ UniswapV2Pool(pool, asynchronous=self.asynchronous) for pool in await FACTORY_HELPER.getPairsFor.coroutine(self.factory, token_in, block_identifier=block) ] except Exception as e: if not (call_reverted(e) or "out of gas" in str(e) or "timeout" in str(e)): raise pools = await self.__pools__ else: pools = await self.__pools__ pool_to_token_out = {} for pool in pools: # these will return immediately since the pools are already loaded by this point if token_in == await pool.__token0__: pool_to_token_out[pool] = await pool.__token1__ elif token_in == await pool.__token1__: pool_to_token_out[pool] = await pool.__token0__ return pool_to_token_out
[docs] @stuck_coro_debugger async def pools_for_token(self, token_address: Address, block: Optional[Block] = None, _ignore_pools: Tuple[UniswapV2Pool,...] = ()) -> AsyncIterator[UniswapV2Pool]: pools: Dict[UniswapV2Pool, Address] if chain.id == Network.Mainnet and token_address == WRAPPED_GAS_COIN and self.label == "uniswap v2": # This will run out of gas if we use the helper so we bypass it with a known liquid pool pools = {UniswapV2Pool("0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc", asynchronous=True): "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"} else: try: pools = await self.get_pools_for(token_address, sync=False) except Exception as e: raise if 'out of gas' not in str(e) and not call_reverted(e): e.args = (*e.args, self, token_address, block) raise try: # if it fails with no block we will try once with a block before we fetch the long way pools = await self.get_pools_for(token_address, block=block, sync=False) except Exception as e: e.args = (*e.args, self, token_address, block) raise for pool in _ignore_pools: pools.pop(pool, None) if not pools: return elif block is None: for pool in pools: yield pool else: async for pool, deploy_block in ERC20.deploy_block.map(when_no_history_return_0=True).map(pools): if deploy_block <= block: yield pool
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=500) async def deepest_pool(self, token_address: AnyAddressType, block: Optional[Block] = None, _ignore_pools: Tuple[UniswapV2Pool,...] = ()) -> Optional[UniswapV2Pool]: """returns the deepest pool for `token_address` at `block`, excluding pools in `_ignore_pools`""" token_address = convert.to_address(token_address) if token_address == WRAPPED_GAS_COIN or token_address in STABLECOINS: return await self.deepest_stable_pool(token_address, block, sync=False) if self._supports_uniswap_helper and (block is None or block >= await contract_creation_block_async(FACTORY_HELPER)): try: deepest_pool, deepest_pool_depth = await self.deepest_pool_for(token_address, block, ignore_pools=_ignore_pools) return None if deepest_pool == brownie.ZERO_ADDRESS else UniswapV2Pool(deepest_pool, asynchronous=self.asynchronous) except Revert as e: # TODO: debug me! logger.debug('helper reverted for %s at block %s ignore_pools %s: %s', token_address, block, _ignore_pools, e) except ValueError as e: if "out of gas" not in str(e): raise logger.debug('helper out of gas for %s at block %s ignore_pools %s: %s', token_address, block, _ignore_pools, e) pools = self.pools_for_token(token_address, block, _ignore_pools=_ignore_pools) logger.debug("checking %s liquidity at block %s for pools %s", token_address, block, pools) deepest_pool: UniswapV2Pool async for deepest_pool in UniswapV2Pool.check_liquidity.map(pools, token=token_address, block=block).keys(pop=True).aiterbyvalues(reverse=True): return deepest_pool
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=500) async def deepest_stable_pool(self, token_address: AnyAddressType, block: Optional[Block] = None, _ignore_pools: Tuple[UniswapV2Pool,...] = ()) -> Optional[UniswapV2Pool]: """returns the deepest pool for `token_address` at `block` which has `token_address` paired with a stablecoin, excluding pools in `_ignore_pools`""" token_out_tasks: a_sync.TaskMapping[UniswapV2Pool, ERC20] deepest_stable_pool: UniswapV2Pool pools = self.pools_for_token(convert.to_address(token_address), block=block, _ignore_pools=_ignore_pools) token_out_tasks = UniswapV2Pool.get_token_out.map(token_in=token_address) if stable_pools := [pool async for pool, paired_with in token_out_tasks.map(pools) if paired_with in STABLECOINS]: del token_out_tasks if self._supports_uniswap_helper and (block is None or block >= await contract_creation_block_async(FACTORY_HELPER, when_no_history_return_0=True)): deepest_stable_pool, deepest_stable_pool_balance = await FACTORY_HELPER.deepestPoolForFrom.coroutine(token_address, stable_pools, block_identifier=block) return None if deepest_stable_pool == brownie.ZERO_ADDRESS else UniswapV2Pool(deepest_stable_pool, asynchronous=self.asynchronous) async for deepest_stable_pool, depth in UniswapV2Pool.check_liquidity.map(stable_pools, token=token_address, block=block).items(pop=True).aiterbyvalues(reverse=True): return deepest_stable_pool
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=500) async def get_path_to_stables(self, token: AnyAddressType, block: Optional[Block] = None, _loop_count: int = 0, _ignore_pools: Tuple[UniswapV2Pool,...] = ()) -> Path: if _loop_count > 10: raise CantFindSwapPath token_address = convert.to_address(token) path = [token_address] deepest_pool = await self.deepest_pool(token_address, block, _ignore_pools, sync=False) if deepest_pool: paired_with = await deepest_pool.get_token_out(token_address, sync=False) from y.prices.utils.buckets import check_bucket if await check_bucket(paired_with, sync=False) and _loop_count == 0: # let's just use the other token to get the price return None deepest_stable_pool = await self.deepest_stable_pool(token_address, block, _ignore_pools=_ignore_pools, sync=False) if deepest_stable_pool and deepest_pool == deepest_stable_pool: path.append(await deepest_stable_pool.get_token_out(token_address, sync=False)) return path if path == [token_address]: with suppress(CantFindSwapPath): path.extend( await self.get_path_to_stables( paired_with, block=block, _loop_count=_loop_count+1, _ignore_pools=tuple(list(_ignore_pools) + [deepest_pool]), sync=False, ) ) if path == [token_address]: raise CantFindSwapPath(f'Unable to find swap path for {token_address} on {Network.printable()}') return path
[docs] @stuck_coro_debugger @a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60*60) async def check_liquidity(self, token: Address, block: Block, ignore_pools = []) -> int: logger.debug("checking %s liquidity for %s at %s", self, token, block) if block and block < await contract_creation_block_async(self.factory): logger.debug("block %s is before %s deploy block") return 0 if self._supports_uniswap_helper and (block is None or block >= await contract_creation_block_async(FACTORY_HELPER)): try: deepest_pool, liquidity = await self.deepest_pool_for(token, block, ignore_pools=ignore_pools) logger.debug("%s liquidity for %s at %s is %s", self, token, block, liquidity) return liquidity except Revert as e: # TODO: debug me! logger.debug('helper reverted on check_liquidity for %s at block %s: %s',token, block, e) except ValueError as e: if "out of gas" not in str(e): raise logger.debug('helper out of gas on check_liquidity for %s at block %s: %s',token, block, e) pools = self.pools_for_token(token, block=block, _ignore_pools=ignore_pools) try: liquidity = await UniswapV2Pool.check_liquidity.max(pools, token=token, block=block, sync=False) except a_sync.exceptions.EmptySequenceError: liquidity = 0 logger.debug("%s liquidity for %s at %s is %s", self, token, block, liquidity) return liquidity
[docs] @a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60*60) @stuck_coro_debugger async def deepest_pool_for(self, token: Address, block: Block = None, *, ignore_pools = []) -> Tuple[Address, int]: # sourcery skip: default-mutable-arg try: deepest = await FACTORY_HELPER.deepestPoolFor.coroutine(self.factory, token, ignore_pools, block_identifier=block) logger.debug("got deepest pool for %s at %s: %s from helper", token, block, deepest) return deepest except Exception as e: e.args = (*e.args, self.__repr__(), token, block, ignore_pools) raise
@cached_property def _supports_uniswap_helper(self) -> bool: """returns True if our uniswap helper contract is supported, False if not""" return chain.id != Network.Mainnet and FACTORY_HELPER and self.label and self.label not in {"zipswap"} def _smol_brain_path_selector(self, token_in: AddressOrContract, token_out: AddressOrContract, paired_against: AddressOrContract) -> Path: # sourcery skip: assign-if-exp, lift-return-into-if, merge-duplicate-blocks, merge-else-if-into-elif, remove-redundant-if, remove-unnecessary-cast '''Chooses swap path to use for quote''' # NOTE: can we just delete this now? probably, must test token_in, token_out, paired_against = str(token_in), str(token_out), str(paired_against) if str(paired_against) in STABLECOINS and str(token_out) in STABLECOINS: path = [token_in, paired_against] elif weth in (token_in, token_out): path = [token_in, token_out] elif sushi and paired_against == sushi and token_out != sushi: path = [token_in,sushi,weth,token_out] elif str(token_in) in self.special_paths and str(token_out) in STABLECOINS: path = self.special_paths[str(token_in)] elif chain.id == Network.BinanceSmartChain: from y.constants import cake, wbnb if WRAPPED_GAS_COIN in (token_in, token_out): path = [token_in, token_out] elif cake.address in (token_in, token_out): path = [token_in, token_out] else: path = [token_in,wbnb.address,token_out] else: if WRAPPED_GAS_COIN in (token_in, token_out): path = [token_in, token_out] else: path = [token_in, WRAPPED_GAS_COIN, token_out] return path