import asyncio
from contextlib import suppress
from decimal import Decimal
from functools import cached_property
from logging import DEBUG, getLogger
from typing import AsyncIterator, Dict, List, Optional, Set, Tuple
import a_sync
import a_sync.exceptions
import brownie
import dank_mids
from a_sync.a_sync import HiddenMethodDescriptor
from brownie.network.event import _EventItem
from dank_mids.exceptions import Revert
from eth_typing import HexAddress
from multicall import Call
from typing_extensions import Self
from web3.exceptions import ContractLogicError
from y import ENVIRONMENT_VARIABLES as ENVS
from y import convert
from y._decorators import continue_on_revert, stuck_coro_debugger
from y.classes.common import ERC20, ContractBase, WeiBalance
from y.constants import CHAINID, STABLECOINS, WRAPPED_GAS_COIN, sushi, usdc, weth
from y.contracts import Contract, contract_creation_block_async
from y.datatypes import (
Address,
AddressOrContract,
AnyAddressType,
Block,
Pool,
UsdPrice,
)
from y.exceptions import (
CantFindSwapPath,
ContractNotVerified,
NonStandardERC20,
NotAUniswapV2Pool,
TokenNotFound,
call_reverted,
continue_if_call_reverted,
reraise_excs_with_extra_context,
)
from y.interfaces.uniswap.factoryv2 import UNIV2_FACTORY_ABI
from y.networks import Network
from y.prices import magic
from y.prices.dex.uniswap.v2_forks import (
ROUTER_TO_FACTORY,
ROUTER_TO_PROTOCOL,
special_paths,
)
from y.utils.events import ProcessedEvents
from y.utils.raw_calls import raw_call
# Module-level logger, named after this module.
logger = getLogger(__name__)

# A swap path: an ordered list of token addresses (or contracts).
Path = List[AddressOrContract]
# Three-integer tuple; presumably the decoded `getReserves()` result -- TODO confirm.
Reserves = Tuple[int, int, int]

# Address of the factory helper contract for the current chain. Chains listed
# in the dict override the default address given to `.get`.
factory_helper_address = {
    # put special case addresses here
}.get(CHAINID, "0xE57Bfd650A7771E401d56d4b2CA22d9f8f51D3D9")

try:
    FACTORY_HELPER = Contract(factory_helper_address)
except ContractNotVerified:
    # No usable helper contract: code that checks FACTORY_HELPER for truthiness
    # will skip the helper-based paths.
    FACTORY_HELPER = None
[docs]
class UniswapV2Pool(ERC20):
    """
    Represents a liquidity pool from Uniswap V2 or its forks.

    This class provides methods to interact with and retrieve information
    about a Uniswap V2 or fork liquidity pool, such as token reserves,
    total value locked (TVL), and price calculations.

    Examples:
        >>> pool = UniswapV2Pool("0xAddress")
        >>> reserves = await pool.reserves()
        >>> price = await pool.get_price()
        >>> tvl = await pool.tvl()

    See Also:
        - :class:`~y.classes.common.ERC20`
        - :class:`~y.prices.dex.uniswap.v2_forks`
    """

    # default stored as class var to keep instance dicts smaller
    __types_assumed = True
    "True if we're assuming types based on normal univ2 abi, False if we checked via block explorer."

    # `get_reserves` holds the per-instance coroutine used to call `getReserves()`.
    __slots__ = ("get_reserves",)
[docs]
def __init__(
    self,
    address: AnyAddressType,
    token0: Optional[Address] = None,
    token1: Optional[Address] = None,
    deploy_block: Optional[int] = None,
    asynchronous: bool = False,
):
    """
    Create a wrapper for the pair contract at ``address``.

    Args:
        address: The address of the pool contract.
        token0: Known token0 of the pool; stored on the instance when truthy.
        token1: Known token1 of the pool; stored on the instance when truthy.
        deploy_block: Known deploy block; stored as ``_deploy_block`` when truthy.
        asynchronous: Forwarded to the parent initializer.
    """
    super().__init__(address, asynchronous=asynchronous)

    # Default call signature; `_check_return_types` may replace it later.
    reserves_call = Call(self.address, "getReserves()((uint112,uint112,uint32))")
    self.get_reserves = reserves_call.coroutine

    # Only pre-seed the values the caller actually supplied.
    supplied = (
        ("_deploy_block", deploy_block),
        ("token0", token0),
        ("token1", token1),
    )
    for attr_name, value in supplied:
        if value:
            setattr(self, attr_name, value)
[docs]
@a_sync.aka.cached_property
async def factory(self) -> Address:
    """
    Return the factory address reported by this pool's ``factory()`` method.

    The raw call is tried first. If it fails with one of a few tolerated
    error messages, the contract object is loaded and its ``factory``
    attribute is awaited instead.

    Raises:
        NotAUniswapV2Pool: If the raw call reverted, or the loaded contract
            has no ``factory`` attribute.
        ValueError: Re-raised when the raw call fails for any other reason.
    """
    try:
        return await raw_call(
            self.address, "factory()", output="address", sync=False
        )
    except ValueError as err:
        if call_reverted(err):
            raise NotAUniswapV2Pool(self) from err
        # `is not a valid ETH address` means we got some kind of response from the chain.
        # but couldn't convert to address. If it happens to be a goofy but
        # verified uni fork, maybe we can get factory this way
        message = str(err)
        tolerated = (
            "is not a valid ETH address",
            "invalid opcode",
            "invalid jump destination",
        )
        if not any(fragment in message for fragment in tolerated):
            raise
        pool_contract = await Contract.coroutine(self.address)
        try:
            return await pool_contract.factory
        except AttributeError as exc:
            raise NotAUniswapV2Pool(self) from exc


__factory__: HiddenMethodDescriptor[Self, Address]
[docs]
@a_sync.aka.property
async def tokens(self) -> Tuple[ERC20, ERC20]:
    """Return the pool's two tokens, token0 first, fetched concurrently."""
    token0_awaitable = self.__token0__
    token1_awaitable = self.__token1__
    return await asyncio.gather(token0_awaitable, token1_awaitable)


__tokens__: HiddenMethodDescriptor[Self, Tuple[ERC20, ERC20]]
[docs]
@a_sync.aka.cached_property
async def token0(self) -> ERC20:
    """
    Return the pool's token0 as an :class:`ERC20`.

    Raises:
        NotAUniswapV2Pool: If ``token0()`` yields a falsy value, or the call
            failed with a ValueError that ``continue_if_call_reverted`` let pass.
    """
    try:
        token_address = await Call(self.address, "token0()(address)")
        if token_address:
            return ERC20(token_address, asynchronous=self.asynchronous)
    except ValueError as err:
        # Expected to re-raise anything that is not a revert -- see helper.
        continue_if_call_reverted(err)
    raise NotAUniswapV2Pool(self)


__token0__: HiddenMethodDescriptor[Self, ERC20]
[docs]
@a_sync.aka.cached_property
async def token1(self) -> ERC20:
    """
    Return the pool's token1 as an :class:`ERC20`.

    Raises:
        NotAUniswapV2Pool: If ``token1()`` yields a falsy value, or the call
            failed with a ValueError that ``continue_if_call_reverted`` let pass.
    """
    try:
        token_address = await Call(self.address, "token1()(address)")
        if token_address:
            return ERC20(token_address, asynchronous=self.asynchronous)
    except ValueError as err:
        # Expected to re-raise anything that is not a revert -- see helper.
        continue_if_call_reverted(err)
    raise NotAUniswapV2Pool(self)


__token1__: HiddenMethodDescriptor[Self, ERC20]
[docs]
@a_sync.a_sync(ram_cache_ttl=ENVS.CACHE_TTL)
@stuck_coro_debugger
async def get_price(
    self, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE
) -> Optional[UsdPrice]:
    """
    Calculate the USD price of one unit of the pool's liquidity token.

    The price is the pool's total value locked (TVL) divided by the readable
    total supply of the liquidity token.

    Args:
        block: The block number to query. Defaults to the latest block.
        skip_cache: If True, skip using the cache while fetching price data.

    Returns:
        The price, or None when the TVL could not be determined.

    Examples:
        >>> pool = UniswapV2Pool("0xAddress")
        >>> price = await pool.get_price()
        >>> print(price)

    See Also:
        - :meth:`tvl`
        - :meth:`total_supply_readable`
    """
    pool_value = await self.tvl(block=block, skip_cache=skip_cache, sync=False)
    if pool_value is None:
        return None
    supply = await self.total_supply_readable(block=block, sync=False)
    return UsdPrice(pool_value / Decimal(supply))
[docs]
@a_sync.a_sync(ram_cache_maxsize=None, ram_cache_ttl=ENVS.CACHE_TTL)
async def get_token_out(self, token_in: Address) -> ERC20:
    """
    Return the token on the other side of the pool from ``token_in``.

    Raises:
        TokenNotFound: If ``token_in`` is neither token0 nor token1.
    """
    first = await self.__token0__
    if token_in == first:
        # these return instantly since theyre already cached
        return await self.__token1__
    second = await self.__token1__
    if token_in == second:
        # these return instantly since theyre already cached
        return await self.__token0__
    raise TokenNotFound(token_in, [first, second]) from None
[docs]
@stuck_coro_debugger
async def reserves(
    self, *, block: Optional[Block] = None
) -> Optional[Tuple[WeiBalance, WeiBalance]]:
    """
    Fetch the pool's reserves at ``block``.

    If the default ``getReserves`` call reverts while the return types are
    still assumed, the types are checked once and the call is retried.

    Returns:
        A ``(token0 balance, token1 balance)`` pair, or None when no reserves
        could be read.

    Raises:
        NotAUniswapV2Pool: If checking the return types raises AttributeError.
    """
    try:
        raw = await self.get_reserves(block_id=block)
    except Exception as err:
        if not call_reverted(err):
            raise
        raw = None

    if raw is None:
        if self.__types_assumed:
            try:
                await self._check_return_types()
            except AttributeError as err:
                raise NotAUniswapV2Pool(self) from err
            # Retry now that `get_reserves` may use the verified return types.
            return await self.reserves(block=block, sync=False)

        if self._verified:
            # This shouldn't really run anymore, maybe delete
            pool_contract = await Contract.coroutine(self.address)
            try:
                raw = await pool_contract.getReserves.coroutine(
                    block_identifier=block
                )
                types = ",".join(
                    output["type"]
                    for output in pool_contract.getReserves.abi["outputs"]
                )
                logger.warning(f"abi for getReserves for {pool_contract} is {types}")
            except Exception as err:
                if not call_reverted(err):
                    raise

        if raw is None:
            return None

    # NOTE: using `__token0__` and `__token1__` is faster than `__tokens__` since they're already cached and return instantly
    # it also creates 2 fewer tasks and 1 fewer future than `__tokens__` since there is no use of `asyncio.gather`.
    balance0 = WeiBalance(raw[0], await self.__token0__, block=block)
    balance1 = WeiBalance(raw[1], await self.__token1__, block=block)
    return (balance0, balance1)
[docs]
@stuck_coro_debugger
async def tvl(
    self, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE
) -> Optional[Decimal]:
    """
    Calculate the total value locked (TVL) in the pool in USD.

    The TVL is the sum of the USD values of the two token reserves. When only
    one of the two tokens can be priced, its value is counted twice.

    Args:
        block: The block number to query. Defaults to the latest block.
        skip_cache: If True, skip using the cache while fetching price data.

    Returns:
        The TVL, or None when the reserves are unavailable or neither token
        could be priced.

    Examples:
        >>> pool = UniswapV2Pool("0xAddress")
        >>> tvl = await pool.tvl()
        >>> print(tvl)

    See Also:
        - :meth:`reserves`
        - :meth:`get_price`
    """
    # start these tasks now
    # NOTE: using `__token0__` and `__token1__` is faster than `__tokens__` since they're already cached and return instantly
    # it also creates 2 fewer tasks and 1 fewer future than `__tokens__` since there is no use of `asyncio.gather`.
    pool_tokens = [await self.__token0__, await self.__token1__]
    price_tasks: a_sync.TaskMapping[ERC20, UsdPrice] = ERC20.price.map(
        pool_tokens,
        block=block,
        return_None_on_failure=True,
        skip_cache=skip_cache,
    )

    reserves = await self.reserves(block=block, sync=False)
    if reserves is None:
        await price_tasks.close()
        return None

    prices = await price_tasks.values(pop=True)
    vals = [
        Decimal(await reserve.__readable__) * Decimal(price)
        for reserve, price in zip(reserves, prices)
        if price is not None
    ]
    if not vals:
        return None

    if len(vals) == 1:
        # Only one side could be priced: count that side's value twice.
        vals *= 2
    if len(vals) != 2:
        raise Exception("how did we get here?") from None

    if logger.isEnabledFor(DEBUG):
        logger._log(DEBUG, "reserves: %s", (reserves,))
        logger._log(DEBUG, "prices: %s", (prices,))
        logger._log(DEBUG, "vals: %s", (vals,))
    return sum(vals)
[docs]
@stuck_coro_debugger
@a_sync.a_sync(
    ram_cache_maxsize=100_000, ram_cache_ttl=60 * 60, semaphore=10_000
)  # lets try a semaphore here
async def check_liquidity(self, token: Address, block: Block) -> int:
    """
    Check the liquidity of a specific token in the pool at a given block.

    Args:
        token: The address of the token to check.
        block: The block number to query.

    Returns:
        The reserve balance of ``token`` in the pool, or 0 when ``block`` is
        before the pool's deploy block or no reserves are available.

    Raises:
        TokenNotFound: If the token is not one of the two tokens in the liquidity pool.

    Examples:
        >>> pool = UniswapV2Pool("0xAddress")
        >>> liquidity = await pool.check_liquidity("0xTokenAddress", 12345678)
        >>> print(liquidity)
    """
    debug_logs = logger.isEnabledFor(DEBUG)
    if debug_logs:
        logger._log(
            DEBUG, "checking %s liquidity for %s at %s", (self, token, block)
        )

    if block and block < await self.deploy_block(sync=False):
        if debug_logs:
            logger._log(DEBUG, "block %s is before %s deploy block", (block, self))
        return 0

    pool_reserves = await self.reserves(block=block, sync=False)
    if not pool_reserves:
        return 0

    reserve: WeiBalance
    for reserve in pool_reserves:
        if token == reserve.token:
            liquidity = reserve.balance
            if debug_logs:
                logger._log(
                    DEBUG,
                    "%s liquidity for %s at %s is %s",
                    (self, token, block, liquidity),
                )
            return liquidity
    raise TokenNotFound(token, pool_reserves)
[docs]
@stuck_coro_debugger
async def is_uniswap_pool(self, block: Optional[Block] = None) -> bool:
    """
    Check if this contract is a valid liquidity pool for Uniswap V2 or one of its forks.

    The contract qualifies when both its reserves and its total supply are
    truthy at ``block``.

    Args:
        block (optional): The block number to query. Defaults to latest block.

    Returns:
        True if the contract is a valid Uniswap V2 pool, False otherwise.

    Examples:
        >>> pool = UniswapV2Pool("0xAddress")
        >>> is_valid = await pool.is_uniswap_pool()
        >>> print(is_valid)
    """
    try:
        pool_reserves, supply = await asyncio.gather(
            self.reserves(block=block, sync=False),
            self.total_supply(block, sync=False),
        )
    except (NotAUniswapV2Pool, ContractNotVerified):
        return False
    return all((pool_reserves, supply))
async def _check_return_types(self) -> None:
    """
    Replace the assumed ``getReserves`` return types with the ABI's types.

    Does nothing once the types have been checked. Sets ``_verified`` to
    record whether the contract could be loaded, and rebuilds
    ``get_reserves`` from the output types found in the contract's ABI.
    """
    if self.__types_assumed:
        try:
            pool_contract = await Contract.coroutine(self.address)
            abi_outputs = pool_contract.getReserves.abi["outputs"]
            reserves_types = ",".join(output["type"] for output in abi_outputs)
            self._verified = True
            # Exactly three outputs are expected.
            assert reserves_types.count(",") == 2, reserves_types
            typed_call = Call(self.address, f"getReserves()(({reserves_types}))")
            self.get_reserves = typed_call.coroutine
        except ContractNotVerified:
            self._verified = False
        self.__types_assumed = False
[docs]
class PoolsFromEvents(ProcessedEvents[UniswapV2Pool]):
    """Builds :class:`UniswapV2Pool` objects from a factory's ``PairCreated`` events."""

    # Topic used to filter the factory's logs.
    PairCreated = "0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9"

    __slots__ = ("asynchronous", "label")

    def __init__(self, factory: AnyAddressType, label: str, asynchronous: bool = False):
        """
        Args:
            factory: Address of the factory whose events are processed.
            label: Label used in this object's repr.
            asynchronous: Passed on to each pool that gets created.
        """
        self.asynchronous = asynchronous
        self.label = label
        event_topics = [[self.PairCreated]]
        super().__init__(addresses=[factory], topics=event_topics, is_reusable=False)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} label={self.label}>"

    def pools(self, to_block: Optional[int] = None) -> AsyncIterator[UniswapV2Pool]:
        """Iterate over the pools created through ``to_block``."""
        return self._objects_thru(block=to_block)

    def _get_block_for_obj(self, obj: UniswapV2Pool) -> int:
        """Return the block a pool was deployed at."""
        return obj._deploy_block

    def _process_event(self, event: _EventItem) -> UniswapV2Pool:
        """Turn one ``PairCreated`` event into a pool object."""
        created = UniswapV2Pool(
            address=event["pair"],
            token0=event["token0"],
            token1=event["token1"],
            asynchronous=self.asynchronous,
        )
        # Set the deploy block here rather than through the constructor, in case
        # the user previously created their own UniswapV2Pool object, which is
        # now the singleton.
        created._deploy_block = event.block_number
        return created
def _log_factory_helper_failure(
    e: Exception, token_address, block, _ignore_pools
) -> None:
    """
    Log a recognized factory helper failure at debug level.

    Failures are recognized by a substring of the lowercased message
    ("timeout", "out of gas", "invalid request") or by being a revert.

    Raises:
        Exception: ``e`` itself, when it is not a recognized failure.
    """
    text = str(e).lower()
    # TODO: debug where the "invalid request" errors come from
    known_fragments = ("timeout", "out of gas", "invalid request")
    msg = next((fragment for fragment in known_fragments if fragment in text), None)
    if msg is None:
        if not isinstance(e, (Revert, ContractLogicError)):
            raise e
        # TODO: debug me!
        msg = "reverted"
    logger.debug(
        "helper %s for %s at block %s ignore_pools %s: %s",
        msg,
        token_address,
        block,
        _ignore_pools,
        e,
    )
# Shared semaphore (value 10) passed to the `a_sync` decorator of
# `UniswapRouterV2.all_pools_for`.
_all_pools_semaphore = a_sync.Semaphore(
    10, name=f"{__name__}.UniswapRouterV2.all_pools_for"
)
[docs]
class UniswapRouterV2(ContractBase):
    """
    Wrapper around a Uniswap V2 (or fork) router contract.

    Provides pool discovery for the router's factory, liquidity checks,
    swap-path selection, and prices derived from router quotes.
    """
[docs]
def __init__(
    self, router_address: AnyAddressType, *, asynchronous: bool = False
) -> None:
    """
    Args:
        router_address: Address of the router; must be a key of
            ``ROUTER_TO_PROTOCOL`` and ``ROUTER_TO_FACTORY``.
        asynchronous: Forwarded to the parent initializer.
    """
    super().__init__(router_address, asynchronous=asynchronous)

    router = self.address
    self.label = ROUTER_TO_PROTOCOL[router]
    self.factory = ROUTER_TO_FACTORY[router]
    self.special_paths = special_paths(router)
    amounts_out_call = Call(router, "getAmountsOut(uint,address[])(uint[])")
    self.get_amounts_out = amounts_out_call.coroutine
    self._skip_factory_helper: Set[Address] = set()

    # we need the factory contract object cached in brownie so we can decode logs properly
    factory_base = ContractBase(self.factory, asynchronous=self.asynchronous)
    if not factory_base._is_cached:
        brownie.Contract.from_abi(
            "UniClone Factory [forced]", self.factory, UNIV2_FACTORY_ABI
        )
[docs]
def __str__(self) -> str:
    """Return the same text as :meth:`__repr__`."""
    text = self.__repr__()
    return text
def __repr__(self) -> str:
    """Return a short description containing the router's label and address."""
    description = f"<UniswapV2Router {self.label} '{self.address}'>"
    return description
[docs]
@stuck_coro_debugger
@a_sync.a_sync(ram_cache_maxsize=500)
async def get_price(
    self,
    token_in: Address,
    block: Optional[Block] = None,
    token_out: Address = usdc.address,
    paired_against: Address = WRAPPED_GAS_COIN,
    skip_cache: bool = ENVS.SKIP_CACHE,
    ignore_pools: Tuple[Pool, ...] = (),
) -> Optional[UsdPrice]:
    """
    Calculate a price based on Uniswap Router quote for selling one `token_in`.
    Always uses intermediate WETH pair if `[token_in,weth,token_out]` swap path available.

    Args:
        token_in: The token to price.
        block: The block to quote at. Defaults to the latest block.
        token_out: The token the quote is denominated in.
        paired_against: Intermediate token handed to the fallback path selector.
        skip_cache: Forwarded to the price lookup of the paired token.
        ignore_pools: Pools to exclude when searching for paths and deepest pools.

    Returns:
        The price, or None when `token_in` is a non-standard ERC20 or no
        quote could be obtained.
    """
    token_in, token_out, path = str(token_in), str(token_out), None

    # On BSC the default USDC output is swapped for the token at this address.
    if CHAINID == Network.BinanceSmartChain and token_out == usdc.address:
        busd = await Contract.coroutine(
            "0xe9e7CEA3DedcA5984780Bafc599bD69ADd087D56"
        )
        token_out = busd.address

    # Stablecoins are priced at exactly 1.
    if token_in in STABLECOINS:
        return 1

    # The quote is for one whole token, i.e. `scale` base units.
    try:
        amount_in = await ERC20(token_in, asynchronous=True).scale
    except NonStandardERC20:
        return None

    debug_logs = logger.isEnabledFor(DEBUG)

    if token_in in [weth.address, WRAPPED_GAS_COIN] and token_out in STABLECOINS:
        path = [token_in, token_out]
    elif str(token_out) in STABLECOINS:
        # `path` stays None if no path to stables can be found.
        with suppress(CantFindSwapPath):
            path = await self.get_path_to_stables(
                token_in, block, _ignore_pools=ignore_pools, sync=False
            )
            if debug_logs:
                logger._log(DEBUG, "smrt")

    # If we can't find a good path to stables, we might still be able to determine price from price of paired token
    if path is None and (
        deepest_pool := await self.deepest_pool(
            token_in, block, _ignore_pools=ignore_pools, sync=False
        )
    ):
        if debug_logs:
            logger._log(DEBUG, "deepest pool: %s", (deepest_pool,))
        paired_with = await deepest_pool.get_token_out(token_in, sync=False)
        path = [token_in, paired_with]
        quote, out_scale = await asyncio.gather(
            self.get_quote(amount_in, path, block=block, sync=False),
            ERC20(path[-1], asynchronous=True).scale,
        )
        if debug_logs:
            logger._log(DEBUG, "quote: %s", (quote,))
        if quote is not None:
            amount_out = Decimal(quote[-1]) / out_scale
            # Gross the quote back up by 0.997 per hop.
            fees = Decimal(0.997) ** (len(path) - 1)
            amount_out /= fees
            # The deepest pool is excluded so the paired token is priced elsewhere.
            paired_with_price = await magic.get_price(
                paired_with,
                block,
                fail_to_None=True,
                skip_cache=skip_cache,
                ignore_pools=(*ignore_pools, deepest_pool),
                sync=False,
            )
            if paired_with_price:
                return amount_out * Decimal(paired_with_price)
        # NOTE(review): if no price is found for `paired_with`, execution falls
        # through with `path == [token_in, paired_with]`, so the final quote
        # below is denominated in `paired_with` rather than `token_out` --
        # confirm this is intended.

    # If we still don't have a workable path, try this smol brain method
    if path is None:
        path = self._smol_brain_path_selector(token_in, token_out, paired_against)
        # NOTE: does this ever run anymore? can we take it out?
        logger.warning("using smol brain path selector")

    fees = 0.997 ** (len(path) - 1)
    if debug_logs:
        logger._log(DEBUG, "router: %s path: %s", (self.label, path))
    quote, out_scale = await asyncio.gather(
        self.get_quote(amount_in, path, block=block, sync=False),
        ERC20(path[-1], asynchronous=True).scale,
    )
    if quote is not None:
        amount_out = quote[-1] / out_scale
        return UsdPrice(amount_out / fees)
[docs]
@continue_on_revert
@stuck_coro_debugger
async def get_quote(
    self, amount_in: int, path: Path, block: Optional[Block] = None
) -> Tuple[int, int]:
    """
    Ask the router for the amounts out when swapping ``amount_in`` along ``path``.

    Uses the contract object's ``getAmountsOut`` when ``_is_cached`` is true,
    otherwise the raw ``get_amounts_out`` call.

    Returns:
        The router's amounts, or None when the contract call reverted or
        failed with one of the tolerated messages.
    """
    if self._is_cached:
        try:
            return await self.contract.getAmountsOut.coroutine(
                amount_in, path, block_identifier=block
            )
        # TODO figure out how to best handle uni forks with slight modifications.
        # Sometimes the raw-call fallback below will not work with modified methods. Brownie works for now.
        except Exception as err:
            tolerated = (
                "INSUFFICIENT_INPUT_AMOUNT",
                "INSUFFICIENT_LIQUIDITY",
                "INSUFFICIENT_OUT_LIQUIDITY",
                "Sequence has incorrect length",
            )
            if call_reverted(err) or any(
                fragment in str(err) for fragment in tolerated
            ):
                return None
            raise
    return await self.get_amounts_out((amount_in, path), block_id=block)
[docs]
@a_sync.aka.cached_property
@stuck_coro_debugger
async def pools(self) -> List[UniswapV2Pool]:
    """
    Load every pool created by this router's factory.

    Pools are collected from the factory's ``PairCreated`` events up to the
    current block. If the factory's ``allPairsLength()`` reports more pairs
    than the events produced, the missing ones are fetched by index through
    ``allPairs`` and placed at the front of the list.

    Raises:
        NotImplementedError: If the events produced more pools than the
            factory reports.
    """
    logger.info(
        "Fetching pools for %s on %s. If this is your first time using ypricemagic, this can take a while. Please wait patiently...",
        self.label,
        Network.printable(),
    )
    factory = self.factory
    events = PoolsFromEvents(factory, self.label, asynchronous=self.asynchronous)
    to_block = await dank_mids.eth.block_number
    pools = [pool async for pool in events.pools(to_block=to_block)]
    events._task.cancel()
    del events

    all_pairs_len = await raw_call(
        factory, "allPairsLength()", output="int", sync=False
    )
    if len(pools) > all_pairs_len:
        raise NotImplementedError("this shouldnt happen again")
    elif to_get := all_pairs_len - len(pools):
        logger.debug(
            "Oh no! Looks like your node can't look back that far. Checking for the missing %s pools...",
            to_get,
        )
        factory_contract = await Contract.coroutine(self.factory)
        missing_addresses = await factory_contract.allPairs.map(range(to_get))
        # One slice assignment puts the missing pools at the front in index
        # order. This matches the old per-item `insert(i, pool)` loop without
        # shifting the whole list once per missing pool.
        pools[:0] = [
            UniswapV2Pool(address=pool_address, asynchronous=self.asynchronous)
            for pool_address in missing_addresses
        ]
        logger.debug("Done fetching %s missing pools on %s", to_get, self.label)

    # Resolve both tokens of every pool; later lookups rely on them being loaded.
    token0s = UniswapV2Pool.token0.map(pools).values(pop=True)
    token1s = UniswapV2Pool.token1.map(pools).values(pop=True)
    tokens = set(await token0s + await token1s)
    logger.info(
        "Loaded %s pools supporting %s tokens on %s",
        len(pools),
        len(tokens),
        self.label,
    )
    return pools


__pools__: HiddenMethodDescriptor[Self, List[UniswapV2Pool]]
[docs]
@stuck_coro_debugger
@a_sync.a_sync(ram_cache_maxsize=None, semaphore=_all_pools_semaphore)
async def all_pools_for(self, token_in: Address) -> Dict[UniswapV2Pool, Address]:
    """
    Scan every loaded pool and map each pool containing ``token_in`` to the
    token on its other side.
    """
    matches = {}
    for index, pool in enumerate(await self.__pools__):
        # these will return immediately since the pools are already loaded by this point
        if token_in == await pool.__token0__:
            matches[pool] = await pool.__token1__
        elif token_in == await pool.__token1__:
            matches[pool] = await pool.__token0__
        # Yield to the event loop every 10,000 pools.
        if index % 10_000 == 0:
            await asyncio.sleep(0)
    return matches
[docs]
@stuck_coro_debugger
async def get_pools_for(
    self, token_in: Address, block: Optional[Block] = None
) -> Dict[UniswapV2Pool, Address]:
    """
    Map each pool containing ``token_in`` to the token on its other side.

    The factory helper contract is used when supported; otherwise, or when
    the helper call fails in a tolerated way, every loaded pool is scanned
    via :meth:`all_pools_for`.

    Args:
        token_in: The token to look up pools for.
        block: The block at which the helper is queried.

    Raises:
        Exception: Any helper failure that is not a revert, "out of gas",
            "timeout" or "invalid request".
    """
    # `_supports_factory_helper` may evaluate to a falsy non-bool (for example
    # None when FACTORY_HELPER is unavailable), so test truthiness rather than
    # identity with False.
    if not self._supports_factory_helper or token_in in self._skip_factory_helper:
        return await self.all_pools_for(token_in, sync=False)

    try:
        pools: List[HexAddress] = await FACTORY_HELPER.getPairsFor.coroutine(
            self.factory, token_in, block_identifier=block
        )
    except Exception as e:
        okay_errs = "out of gas", "timeout"
        stre = str(e).lower()
        if "invalid request" in stre:
            # TODO: debug where this invalid request is coming from
            # Remember the token so the helper is not tried for it again.
            self._skip_factory_helper.add(token_in)
        elif call_reverted(e) or any(err in stre for err in okay_errs):
            pass
        else:
            raise
        return await self.all_pools_for(token_in, sync=False)

    pool_to_token_out = {}
    for p in pools:
        pool = UniswapV2Pool(p, asynchronous=self.asynchronous)
        # these will return immediately since the pools are already loaded by this point
        if token_in == await pool.__token0__:
            pool_to_token_out[pool] = await pool.__token1__
        elif token_in == await pool.__token1__:
            pool_to_token_out[pool] = await pool.__token0__
    return pool_to_token_out
[docs]
@stuck_coro_debugger
async def pools_for_token(
    self,
    token_address: Address,
    block: Optional[Block] = None,
    _ignore_pools: Tuple[UniswapV2Pool, ...] = (),
) -> AsyncIterator[UniswapV2Pool]:
    """
    Yield the pools that contain ``token_address``.

    Args:
        token_address: The token to find pools for.
        block: When given, only pools whose deploy block is at or before
            ``block`` are yielded.
        _ignore_pools: Pools to leave out of the results.
    """
    pools: Dict[UniswapV2Pool, Address]
    if (
        CHAINID == Network.Mainnet
        and token_address == WRAPPED_GAS_COIN
        and self.label == "uniswap v2"
    ):
        # This will run out of gas if we use the helper so we bypass it with a known liquid pool
        usdc_pool = UniswapV2Pool(
            "0xB4e16d0168e52d35CaCD2c6185b44281Ec28C9Dc", asynchronous=True
        )
        pools = {usdc_pool: "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"}
    else:
        # Copy before filtering: `get_pools_for` can hand back the dict held by
        # the `all_pools_for` cache, and popping from that shared object would
        # drop the ignored pools for every later caller as well.
        pools = dict(await self.get_pools_for(token_address, block=block, sync=False))

    for pool in _ignore_pools:
        pools.pop(pool, None)

    if not pools:
        return
    elif block is None:
        for pool in pools:
            yield pool
        return

    async for pool, deploy_block in ERC20.deploy_block.map(
        when_no_history_return_0=True
    ).map(pools):
        if deploy_block <= block:
            yield pool
[docs]
@stuck_coro_debugger
@a_sync.a_sync(ram_cache_maxsize=500)
async def deepest_pool(
    self,
    token_address: AnyAddressType,
    block: Optional[Block] = None,
    _ignore_pools: Tuple[UniswapV2Pool, ...] = (),
) -> Optional[UniswapV2Pool]:
    """
    Return the deepest pool for `token_address` at `block`, excluding pools in `_ignore_pools`.

    For the wrapped gas coin and for stablecoins the search is limited to
    pools paired with a stablecoin. Otherwise the factory helper is asked
    first when it is supported and deployed at `block`; if that fails in a
    recognized way, each candidate pool's liquidity is checked instead.

    Returns:
        The deepest pool, or None when there is none.
    """
    token_address = await convert.to_address_async(token_address)
    if token_address == WRAPPED_GAS_COIN or token_address in STABLECOINS:
        # Forward `_ignore_pools` so the documented exclusion also applies here.
        return await self.deepest_stable_pool(
            token_address, block, _ignore_pools=_ignore_pools, sync=False
        )

    if self._supports_factory_helper and (
        block is None
        or block >= await contract_creation_block_async(FACTORY_HELPER)
    ):
        try:
            # `sync=False` matches every other a_sync method call in this class.
            deepest_pool, deepest_pool_depth = await self.deepest_pool_for(
                token_address, block, ignore_pools=_ignore_pools, sync=False
            )
            return (
                None
                if deepest_pool == brownie.ZERO_ADDRESS
                else UniswapV2Pool(deepest_pool, asynchronous=self.asynchronous)
            )
        except (Revert, ValueError, ContractLogicError) as e:
            if "invalid request" in str(e):
                self._skip_factory_helper.add(token_address)
            # Logs recognized failures and re-raises everything else.
            _log_factory_helper_failure(e, token_address, block, _ignore_pools)

    pools = self.pools_for_token(token_address, block, _ignore_pools=_ignore_pools)
    logger.debug(
        "checking %s liquidity at block %s for pools %s",
        token_address,
        block,
        pools,
    )
    deepest_pool: UniswapV2Pool
    # Pools are iterated by liquidity, largest first, so the first one wins.
    async for deepest_pool in (
        UniswapV2Pool.check_liquidity.map(pools, token=token_address, block=block)
        .keys(pop=True)
        .aiterbyvalues(reverse=True)
    ):
        return deepest_pool
[docs]
@stuck_coro_debugger
@a_sync.a_sync(ram_cache_maxsize=500)
async def deepest_stable_pool(
    self,
    token_address: AnyAddressType,
    block: Optional[Block] = None,
    _ignore_pools: Tuple[UniswapV2Pool, ...] = (),
) -> Optional[UniswapV2Pool]:
    """
    Return the deepest pool for `token_address` at `block` which has
    `token_address` paired with a stablecoin, excluding pools in `_ignore_pools`.

    Returns:
        The deepest such pool, or None when no stablecoin pool exists.
    """
    token_out_tasks: a_sync.TaskMapping[UniswapV2Pool, ERC20]

    token_address = await convert.to_address_async(token_address)
    candidates = self.pools_for_token(
        token_address, block=block, _ignore_pools=_ignore_pools
    )
    token_out_tasks = UniswapV2Pool.get_token_out.map(token_in=token_address)
    stable_pools = [
        pool
        async for pool, paired_with in token_out_tasks.map(candidates)
        if paired_with in STABLECOINS
    ]
    if not stable_pools:
        return None
    del token_out_tasks

    helper_usable = self._supports_factory_helper and (
        block is None
        or block
        >= await contract_creation_block_async(
            FACTORY_HELPER, when_no_history_return_0=True
        )
    )
    if helper_usable:
        winner, _winner_balance = await FACTORY_HELPER.deepestPoolForFrom.coroutine(
            token_address, stable_pools, block_identifier=block
        )
        if winner == brownie.ZERO_ADDRESS:
            return None
        return UniswapV2Pool(winner, asynchronous=self.asynchronous)

    # Without the helper, rank the stable pools by liquidity, largest first.
    ranked = (
        UniswapV2Pool.check_liquidity.map(
            stable_pools, token=token_address, block=block
        )
        .items(pop=True)
        .aiterbyvalues(reverse=True)
    )
    async for stable_pool, _depth in ranked:
        return stable_pool
    return None
[docs]
@stuck_coro_debugger
@a_sync.a_sync(ram_cache_maxsize=500)
async def get_path_to_stables(
    self,
    token: AnyAddressType,
    block: Optional[Block] = None,
    _loop_count: int = 0,
    _ignore_pools: Tuple[UniswapV2Pool, ...] = (),
) -> Path:
    """
    Build a swap path from `token` to a stablecoin by following deepest pools.

    Args:
        token: The token the path starts from.
        block: The block at which pools are evaluated.
        _loop_count: Recursion depth; the search gives up beyond 10.
        _ignore_pools: Pools already used earlier in the path.

    Returns:
        The path as a list of addresses starting with `token`, or None on the
        outermost call when `check_bucket` is truthy for the paired token.

    Raises:
        CantFindSwapPath: If the depth limit is exceeded or no path beyond
            the starting token could be built.
    """
    if _loop_count > 10:
        raise CantFindSwapPath
    token_address = await convert.to_address_async(token)
    path = [token_address]
    deepest_pool = await self.deepest_pool(
        token_address, block, _ignore_pools, sync=False
    )
    if deepest_pool:
        paired_with = await deepest_pool.get_token_out(token_address, sync=False)

        # Imported here rather than at module level -- presumably to avoid a
        # circular import; TODO confirm.
        from y.prices.utils.buckets import check_bucket

        if await check_bucket(paired_with, sync=False) and _loop_count == 0:
            # let's just use the other token to get the price
            return None

        deepest_stable_pool = await self.deepest_stable_pool(
            token_address, block, _ignore_pools=_ignore_pools, sync=False
        )
        # The deepest pool is itself a stablecoin pool: the path ends there.
        if deepest_stable_pool and deepest_pool == deepest_stable_pool:
            path.append(
                await deepest_stable_pool.get_token_out(token_address, sync=False)
            )
            return path

        if path == [token_address]:
            # Continue from the paired token, ignoring the pool just used so
            # the recursion does not walk back through it.
            with suppress(CantFindSwapPath):
                path.extend(
                    await self.get_path_to_stables(
                        paired_with,
                        block=block,
                        _loop_count=_loop_count + 1,
                        _ignore_pools=tuple(list(_ignore_pools) + [deepest_pool]),
                        sync=False,
                    )
                )

    # Nothing was appended: no pool, or the recursive search failed.
    if path == [token_address]:
        raise CantFindSwapPath(
            f"Unable to find swap path for {token_address} on {Network.printable()}"
        )
    return path
[docs]
@stuck_coro_debugger
@a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60 * 60)
async def check_liquidity(
    self, token: Address, block: Block, ignore_pools=[]
) -> int:
    """
    Return the largest liquidity of `token` in any single pool of this router.

    The factory helper is asked first when it is supported and deployed at
    `block`; if that fails in a recognized way, each pool containing the
    token is checked individually.

    Args:
        token: The token to check liquidity for.
        block: The block to query.
        ignore_pools: Pools to exclude. The default list is never mutated here.

    Returns:
        The liquidity, or 0 when `block` predates the factory or the token
        has no pools.
    """
    if debug_logs := logger.isEnabledFor(DEBUG):
        logger._log(
            DEBUG, "checking %s liquidity for %s at %s", (self, token, block)
        )

    if block and block < await contract_creation_block_async(self.factory):
        if debug_logs:
            # `Logger._log` requires the args tuple; omitting it raised a
            # TypeError whenever debug logging was enabled.
            logger._log(DEBUG, "block %s is before %s deploy block", (block, self))
        return 0

    if self._supports_factory_helper and (
        block is None
        or block >= await contract_creation_block_async(FACTORY_HELPER)
    ):
        try:
            # `sync=False` matches every other a_sync method call in this class.
            deepest_pool, liquidity = await self.deepest_pool_for(
                token, block, ignore_pools=ignore_pools, sync=False
            )
            if debug_logs:
                logger._log(
                    DEBUG,
                    "%s liquidity for %s at %s is %s",
                    (self, token, block, liquidity),
                )
            return liquidity
        except (Revert, ValueError, ContractLogicError) as e:
            # Logs recognized failures and re-raises everything else.
            _log_factory_helper_failure(e, token, block, ignore_pools)

    pools = self.pools_for_token(token, block=block, _ignore_pools=ignore_pools)
    try:
        liquidity = await UniswapV2Pool.check_liquidity.max(
            pools, token=token, block=block, sync=False
        )
    except a_sync.exceptions.EmptySequenceError:
        # No pools at all for this token.
        liquidity = 0

    if debug_logs:
        logger._log(
            DEBUG,
            "%s liquidity for %s at %s is %s",
            (self, token, block, liquidity),
        )
    return liquidity
[docs]
@a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60 * 60)
@stuck_coro_debugger
async def deepest_pool_for(
    self, token: Address, block: Block = None, *, ignore_pools=[]
) -> Tuple[Address, int]:
    # sourcery skip: default-mutable-arg
    """
    Ask the factory helper contract for the deepest pool for `token`.

    Returns:
        Whatever the helper's `deepestPoolFor` call returns; the annotation
        describes it as an (address, int) pair.
    """
    with reraise_excs_with_extra_context(self, token, block, ignore_pools):
        helper_call = FACTORY_HELPER.deepestPoolFor.coroutine
        result = await helper_call(
            self.factory, token, ignore_pools, block_identifier=block
        )
        logger.debug(
            "got deepest pool for %s at %s: %s from helper", token, block, result
        )
        return result
@cached_property
def _supports_factory_helper(self) -> bool:
    """
    Return True if our uniswap helper contract is supported, False if not.

    The helper is supported when the chain is not Mainnet, the helper
    contract is available, and this router has a label other than "zipswap".
    """
    # Coerce to bool: the bare `and` chain would otherwise yield None, "" or
    # the contract object, which breaks identity checks such as `is False`.
    return bool(
        CHAINID != Network.Mainnet
        and FACTORY_HELPER
        and self.label
        and self.label not in {"zipswap"}
    )
def _smol_brain_path_selector(
    self,
    token_in: AddressOrContract,
    token_out: AddressOrContract,
    paired_against: AddressOrContract,
) -> Path:
    # sourcery skip: assign-if-exp, lift-return-into-if, merge-duplicate-blocks, merge-else-if-into-elif, remove-redundant-if, remove-unnecessary-cast
    """
    Choose the swap path to use for a quote from fixed rules.

    The rules are tried in order; the first match decides the path.
    """
    # NOTE: can we just delete this now? probably, must test
    token_in = str(token_in)
    token_out = str(token_out)
    paired_against = str(paired_against)

    if paired_against in STABLECOINS and token_out in STABLECOINS:
        return [token_in, paired_against]
    if weth in (token_in, token_out):
        return [token_in, token_out]
    if sushi and paired_against == sushi and token_out != sushi:
        return [token_in, sushi, weth, token_out]
    if token_in in self.special_paths and token_out in STABLECOINS:
        return self.special_paths[token_in]

    if CHAINID == Network.BinanceSmartChain:
        from y.constants import cake, wbnb

        direct = WRAPPED_GAS_COIN in (token_in, token_out) or cake.address in (
            token_in,
            token_out,
        )
        return [token_in, token_out] if direct else [token_in, wbnb.address, token_out]

    if WRAPPED_GAS_COIN in (token_in, token_out):
        return [token_in, token_out]
    return [token_in, WRAPPED_GAS_COIN, token_out]