import asyncio
import logging
import math
from collections import defaultdict
from functools import cached_property, lru_cache
from itertools import cycle
from typing import AsyncIterator, DefaultDict, Dict, List, Optional, Tuple
import a_sync
import eth_retry
from a_sync.a_sync import HiddenMethodDescriptor
from brownie.network.event import _EventItem
from eth_typing import ChecksumAddress
from typing_extensions import Self
from y import ENVIRONMENT_VARIABLES as ENVS
from y._decorators import stuck_coro_debugger
from y.classes.common import ERC20, ContractBase
from y.constants import CHAINID, usdc, weth
from y.contracts import Contract, contract_creation_block_async
from y.datatypes import Address, AnyAddressType, Block, Pool, UsdPrice
from y.exceptions import (
ContractNotVerified,
TokenNotFound,
UnsupportedNetwork,
call_reverted,
)
from y.interfaces.uniswap.quoterv3 import UNIV3_QUOTER_ABI
from y.networks import Network
from y.utils.events import ProcessedEvents
try:
from eth_abi.packed import encode_packed
except ImportError:
from eth_abi.packed import encode_abi_packed as encode_packed
# Canonical Uniswap V3 factory and quoter addresses, taken from:
# https://github.com/Uniswap/uniswap-v3-periphery/blob/main/deploys.md
UNISWAP_V3_FACTORY = "0x1F98431c8aD98523631AE4a59f267346ea31F984"
UNISWAP_V3_QUOTER = "0xb27308f9F90D607463bb33eA1BeBb41C27CE5AB6"
logger = logging.getLogger(__name__)
# Per-network deployment info. A network that is missing from this mapping is
# treated as unsupported by `UniswapV3`.
# Mainnet, Arbitrum and Optimism share the canonical factory/quoter addresses
# above; Base uses its own factory and a v2 quoter.
# "fee_tiers" lists the pool fees tried when building quote paths. The first
# entry is also the fee used for the weth -> usdc hop of two-hop paths.
addresses = {
Network.Mainnet: {
"factory": UNISWAP_V3_FACTORY,
"quoter": UNISWAP_V3_QUOTER,
"fee_tiers": [3000, 500, 10_000, 100],
},
Network.Arbitrum: {
"factory": UNISWAP_V3_FACTORY,
"quoter": UNISWAP_V3_QUOTER,
"fee_tiers": [3000, 500, 10_000],
},
Network.Optimism: {
"factory": UNISWAP_V3_FACTORY,
"quoter": UNISWAP_V3_QUOTER,
"fee_tiers": [3000, 500, 10_000, 100],
},
Network.Base: {
"factory": "0x33128a8fC17869897dcE68Ed026d694621f6FDfD",
"quoter": "0x3d4e44Eb1374240CE5F1B871ab261CD16335B76a", # quoter v2
"fee_tiers": [3000, 500, 10_000, 100],
},
}
# Fee tier values are expressed in units of 1 / FEE_DENOMINATOR (3000 -> 0.3%).
FEE_DENOMINATOR = 1_000_000
[docs]
class UniswapV3Pool(ContractBase):
    """A single Uniswap V3 pool: its two tokens, fee, tick spacing and deploy block."""

    # Shared by every pool instance: str(token) -> pool address -> membership flag.
    __contains_cache__: Dict[Address, Dict[ChecksumAddress, bool]] = {}

    __slots__ = "fee", "token0", "token1", "tick_spacing"

    def __init__(
        self,
        address: Address,
        token0: Address,
        token1: Address,
        tick_spacing: int,
        fee: int,
        deploy_block: int,
        asynchronous: bool = False,
    ) -> None:
        """
        Build a pool wrapper from already-known pool parameters.

        Args:
            address: The address of the pool contract.
            token0: The address of the pool's first token.
            token1: The address of the pool's second token.
            tick_spacing: The pool's tick spacing.
            fee: The pool's fee.
            deploy_block: The block number at which the pool was deployed.
            asynchronous: Whether to use asynchronous operations.

        Examples:
            >>> pool = UniswapV3Pool(
            ...     address="0x1234567890abcdef1234567890abcdef12345678",
            ...     token0="0xToken0Address",
            ...     token1="0xToken1Address",
            ...     tick_spacing=60,
            ...     fee=3000,
            ...     deploy_block=1234567,
            ...     asynchronous=True
            ... )
        """
        super().__init__(address, asynchronous=asynchronous)
        # Both tokens are wrapped so they share the pool's sync/async mode.
        self.token0 = ERC20(token0, asynchronous=asynchronous)
        self.token1 = ERC20(token1, asynchronous=asynchronous)
        self.tick_spacing = tick_spacing
        self.fee = fee
        self._deploy_block = deploy_block

    def __contains__(self, token: Address) -> bool:
        """
        Tell whether `token` is one of this pool's two tokens.

        The answer is memoized in the class-level `__contains_cache__`, keyed by
        the stringified token and then by this pool's address.

        Args:
            token: The address of the token to look for.

        Returns:
            True if the token is `token0` or `token1`, False otherwise.

        Examples:
            >>> pool = UniswapV3Pool(...)
            >>> "0xToken0Address" in pool
            True
        """
        # Normalise to str so Contract / EthAddress style inputs share one cache key.
        token = str(token)
        per_token = self.__contains_cache__.setdefault(token, {})
        is_member = per_token.get(self.address)
        if is_member is None:
            is_member = token in (self.token0, self.token1)
            per_token[self.address] = is_member
        return is_member

    def __getitem__(self, token: Address) -> ERC20:
        """
        Return an ERC20 wrapper for `token`, provided it belongs to this pool.

        Args:
            token: The address of the token.

        Returns:
            A new ERC20 object for `token`, using this pool's sync/async mode.

        Raises:
            TokenNotFound: If the token is not part of the pool.

        Examples:
            >>> pool = UniswapV3Pool(...)
            >>> token = pool["0xToken0Address"]
        """
        if token in self:
            return ERC20(token, asynchronous=self.asynchronous)
        raise TokenNotFound(token, self)

    @a_sync.a_sync(
        ram_cache_maxsize=100_000, ram_cache_ttl=60 * 60, semaphore=10000
    )  # lets try a semaphore here
    async def check_liquidity(
        self, token: AnyAddressType, block: Block
    ) -> Optional[int]:
        """
        Return how much of `token` the pool holds at `block`.

        Args:
            token: The address of the token.
            block: The block number to check liquidity at.

        Returns:
            The pool's `balance_of` for the token at `block`, or 0 when `block`
            precedes the pool's deploy block or the token contract is not verified.

        Raises:
            TokenNotFound: If the token is not part of the pool.

        Examples:
            >>> pool = UniswapV3Pool(...)
            >>> liquidity = await pool.check_liquidity("0xToken0Address", 1234567)
        """
        logger.debug("checking %s liquidity for %s at %s", self, token, block)
        if block < await self.deploy_block(sync=False):
            # The pool did not exist yet, so it cannot have held anything.
            logger.debug("block %s prior to %s deploy block", block, self)
            return 0

        try:
            held = await self[token].balance_of(self.address, block, sync=False)
        except ContractNotVerified:
            logger.debug(
                "%s is not verified and we cannot fetch balance the usual way. returning 0.",
                token,
            )
            return 0

        logger.debug("%s liquidity for %s at %s: %s", self, token, block, held)
        return held

    @a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60 * 60)
    async def _check_liquidity_token_out(
        self, token_in: AnyAddressType, block: Block
    ) -> Optional[int]:
        """
        Return the pool's liquidity in the token paired with `token_in`.

        Args:
            token_in: The address of the token being sold into the pool.
            block: The block number to check liquidity at.

        Returns:
            The result of `check_liquidity` for the pool's other token.

        Raises:
            TokenNotFound: If `token_in` is not part of the pool.

        Examples:
            >>> pool = UniswapV3Pool(...)
            >>> liquidity = await pool._check_liquidity_token_out("0xToken0Address", 1234567)
        """
        counterpart = self._get_token_out(token_in)
        return await self.check_liquidity(counterpart, block=block, sync=False)

    # NOTE(review): lru_cache on a method keys on `self` and keeps cached pools
    # alive for the cache's lifetime; left as-is to preserve behavior.
    @lru_cache
    def _get_token_out(self, token_in: ERC20) -> ERC20:
        """
        Return the pool token that is not `token_in`.

        Args:
            token_in: The token being sold into the pool.

        Returns:
            `token1` when `token_in` equals `token0`, and vice versa.

        Raises:
            TokenNotFound: If `token_in` is not part of the pool.

        Examples:
            >>> pool = UniswapV3Pool(...)
            >>> token_out = pool._get_token_out(token_in)
        """
        if token_in == self.token0:
            return self.token1
        if token_in == self.token1:
            return self.token0
        raise TokenNotFound(token_in, self)
[docs]
class UniswapV3(a_sync.ASyncGenericSingleton):
    """Price and liquidity lookups against the Uniswap V3 deployment on the current chain.

    Prices come from quoter-contract swap quotes into usdc; liquidity comes from
    token balances held by pools discovered through the factory.
    """

    def __init__(self, asynchronous: bool = True) -> None:
        """
        Initialize a UniswapV3 instance.

        Args:
            asynchronous: Whether to use asynchronous operations.

        Raises:
            UnsupportedNetwork: If the current chain has no entry in `addresses`.

        Examples:
            >>> uniswap_v3 = UniswapV3(asynchronous=True)
        """
        super().__init__()
        self.asynchronous = asynchronous
        if CHAINID not in addresses:
            raise UnsupportedNetwork("Uniswap V3 is not supported on this network")
        self.fee_tiers = addresses[CHAINID]["fee_tiers"]
        self.loading = False
        self._pools = {}

    def __contains__(self, asset) -> bool:
        """
        Report whether Uniswap V3 is supported on the current chain.

        The `asset` argument is not inspected: the result depends only on
        whether the current chain has an entry in `addresses`.

        Args:
            asset: Ignored.

        Returns:
            True if the current chain is supported, False otherwise.

        Examples:
            >>> uniswap_v3 = UniswapV3(...)
            >>> "0xAssetAddress" in uniswap_v3
            True
        """
        return CHAINID in addresses

    @cached_property
    def loaded(self) -> a_sync.Event:
        """
        An event object named after this instance, created once on first access.

        Examples:
            >>> uniswap_v3 = UniswapV3(...)
            >>> loaded_event = uniswap_v3.loaded
        """
        return a_sync.Event(name=str(self))

    @a_sync.aka.property
    async def factory(self) -> Contract:
        """
        The factory contract configured for the current chain.

        Examples:
            >>> uniswap_v3 = UniswapV3(...)
            >>> factory_contract = await uniswap_v3.factory
        """
        return await Contract.coroutine(addresses[CHAINID]["factory"])

    __factory__: HiddenMethodDescriptor[Self, Contract]

    @a_sync.aka.cached_property
    async def quoter(self) -> Contract:
        """
        The quoter contract configured for the current chain.

        If the quoter is not verified, a contract object is built from the
        bundled `UNIV3_QUOTER_ABI` instead.

        Examples:
            >>> uniswap_v3 = UniswapV3(...)
            >>> quoter_contract = await uniswap_v3.quoter
        """
        quoter = addresses[CHAINID]["quoter"]
        try:
            return await Contract.coroutine(quoter)
        except ContractNotVerified:
            return Contract.from_abi("Quoter", quoter, UNIV3_QUOTER_ABI)

    __quoter__: HiddenMethodDescriptor[Self, Contract]

    @a_sync.aka.cached_property
    @stuck_coro_debugger
    async def pools(self) -> "UniV3Pools":
        """
        The `UniV3Pools` collection built from this protocol's factory.

        Returns:
            A `UniV3Pools` object (not a plain list) sharing this instance's
            sync/async mode.

        Examples:
            >>> uniswap_v3 = UniswapV3(...)
            >>> pools = await uniswap_v3.pools
        """
        factory = await self.__factory__
        return UniV3Pools(factory, asynchronous=self.asynchronous)

    __pools__: HiddenMethodDescriptor[Self, "UniV3Pools"]

    async def pools_for_token(
        self, token: Address, block: Block
    ) -> AsyncIterator[UniswapV3Pool]:
        """
        Yield every known pool, up to `block`, that contains `token`.

        Args:
            token: The address of the token.
            block: The last block to consider when listing pools.

        Yields:
            Each pool for which `token in pool` is true.

        Examples:
            >>> uniswap_v3 = UniswapV3(...)
            >>> async for pool in uniswap_v3.pools_for_token("0xTokenAddress", 1234567):
            ...     print(pool)
        """
        pools = await self.__pools__
        async for pool in pools.objects(to_block=block):
            if token in pool:
                yield pool

    @stuck_coro_debugger
    @a_sync.a_sync(cache_type="memory", ram_cache_ttl=ENVS.CACHE_TTL)
    async def get_price(
        self,
        token: Address,
        block: Optional[Block] = None,
        ignore_pools: Tuple[Pool, ...] = (),  # unused
        skip_cache: bool = ENVS.SKIP_CACHE,  # unused
    ) -> Optional[UsdPrice]:
        """
        Price `token` in USD from quoter swap quotes into usdc.

        A quote is requested for every direct token -> usdc path and, unless the
        token is weth, every token -> weth -> usdc path. The best fee-adjusted
        quote wins.

        Args:
            token: The address of the token.
            block: The block number to get the price at.
            ignore_pools: Pools to ignore (unused).
            skip_cache: Whether to skip cache (unused).

        Returns:
            The highest quoted price, or None when `block` precedes the quoter's
            creation block or no path produced a usable quote.

        Raises:
            Exception: Any quoting error that `call_reverted` does not recognise
                as a revert is re-raised.

        Examples:
            >>> uniswap_v3 = UniswapV3(...)
            >>> price = await uniswap_v3.get_price("0xTokenAddress", 1234567)
        """
        quoter = await self.__quoter__
        if block and block < await contract_creation_block_async(quoter, True):
            return None

        # Direct routes: one per configured fee tier.
        paths = [[token, fee, usdc.address] for fee in self.fee_tiers]
        if token != weth:
            # Two-hop routes through weth; the weth -> usdc hop always uses the
            # first configured fee tier.
            paths += [
                [token, fee, weth.address, self.fee_tiers[0], usdc.address]
                for fee in self.fee_tiers
            ]
        logger.debug("paths: %s", paths)

        amount_in = await ERC20(token, asynchronous=True).scale
        results = await asyncio.gather(
            *(self._quote_exact_input(path, amount_in, block) for path in paths),
            return_exceptions=True,
        )

        # Reverts are expected for paths with no pool; anything else is a real error.
        for result in results:
            if isinstance(result, Exception) and not call_reverted(result):
                raise result.with_traceback(result.__traceback__)
        logger.debug("results: %s", results)

        outputs = [
            # Quoter v2 uses this weird return struct, we must unpack it to get amount out.
            # Dividing by 1e6 converts from raw usdc units (assumes usdc has 6 decimals).
            (amount if isinstance(amount, int) else amount[0]) / _undo_fees(path) / 1e6
            for amount, path in zip(results, paths)
            if amount and not call_reverted(amount)
        ]
        logger.debug("outputs: %s", outputs)
        return UsdPrice(max(outputs)) if outputs else None

    @stuck_coro_debugger
    @a_sync.a_sync(ram_cache_maxsize=100_000, ram_cache_ttl=60 * 60)
    async def check_liquidity(
        self, token: Address, block: Block, ignore_pools: Tuple[Pool, ...] = ()
    ) -> int:
        """
        Estimate the deepest liquidity for `token` across the protocol's pools.

        Pools are first screened by how much of the *paired* token they hold,
        then the largest `token` balance among the surviving pools is returned.

        Args:
            token: The address of the token.
            block: The block number to check liquidity at.
            ignore_pools: Pools to exclude from consideration.

        Returns:
            The maximum `UniswapV3Pool.check_liquidity` result among the pools
            that pass screening, or 0 when there are none, when `block`
            precedes the quoter's creation block, or for the hard-coded LQTY
            exception on mainnet.

        Examples:
            >>> uniswap_v3 = UniswapV3(...)
            >>> liquidity = await uniswap_v3.check_liquidity("0xTokenAddress", 1234567)
        """
        logger.debug("checking %s liquidity for %s at %s", self, token, block)
        if (
            CHAINID == Network.Mainnet
            and token == "0x6DEA81C8171D0bA574754EF6F8b412F2Ed88c54D"
        ):
            # LQTY, TODO refactor this somehow
            return 0

        quoter = await self.__quoter__
        if block and block < await contract_creation_block_async(quoter):
            logger.debug("block %s is before %s deploy block", block, quoter)
            return 0

        if token == weth.address:
            # NOTE: we need to filter these or else we will be fetching every pool
            # for now, we only focus on weth/usdc pools
            def filter_fn(pool: UniswapV3Pool) -> bool:
                return (
                    pool._get_token_out(token) == usdc.address
                    and pool not in ignore_pools
                )

        else:

            def filter_fn(pool: UniswapV3Pool) -> bool:
                return pool not in ignore_pools

        token_out_tasks = UniswapV3Pool._check_liquidity_token_out.map(
            token_in=token, block=block
        )
        pools_for_token: a_sync.ASyncIterator[UniswapV3Pool] = self.pools_for_token(
            token, block
        )
        async for pool in pools_for_token.filter(filter_fn):
            # the mapping will start the tasks internally
            logger.debug("starting token_out_task for %s", pool)
            token_out_tasks[pool]

        if not token_out_tasks:
            return 0

        logger.debug(
            "%s token_out_tasks for %s at %s: %s", self, token, block, token_out_tasks
        )

        # Since uni v3 liquidity can be provided asymmetrically, the most liquid pool in terms of `token` might not actually be the most liquid pool in terms of `token_out`
        # We need some spaghetticode here to account for these erroneous liquidity values
        # TODO: Refactor this
        token_out_liquidity: DefaultDict[ERC20, List[int]] = defaultdict(list)
        # First pass (results kept in the mapping): group paired-token liquidity by paired token.
        async for pool, liquidity in token_out_tasks.map(pop=False):
            logger.debug("%s liquidity for %s at %s: %s", pool, token, block, liquidity)
            token_out_liquidity[pool._get_token_out(token)].append(liquidity)
        logger.debug("%s token_out_liquidity: %s", token, token_out_liquidity)

        token_out_min_liquidity = {
            token_out: min(liquidities)
            for token_out, liquidities in token_out_liquidity.items()
        }

        token_in_tasks = UniswapV3Pool.check_liquidity.map(token=token, block=block)
        # Second pass (results popped): decide which pools get a `token` balance lookup.
        async for pool, liquidity in token_out_tasks.map(pop=True):
            token_out = pool._get_token_out(token)
            if (
                len(token_out_liquidity[token_out]) > 1
                and liquidity == token_out_min_liquidity[token_out]
            ):
                # Several pools share this paired token; drop the thinnest one.
                logger.debug("ignoring liquidity for %s", pool)
            elif token_out == weth and liquidity < 10**19:  # 10 ETH
                # NOTE: this is totally arbitrary, works for all known cases but eventually will probably cause issues
                logger.debug("insufficient liquidity for %s", pool)
            else:
                # Indexing the mapping starts the balance lookup for this pool.
                token_in_tasks[pool]

        liquidity = (
            await token_in_tasks.max(pop=True, sync=False) if token_in_tasks else 0
        )
        logger.debug("%s liquidity for %s at %s is %s", self, token, block, liquidity)
        return liquidity

    @stuck_coro_debugger
    @eth_retry.auto_retry
    async def _quote_exact_input(
        self, path: list, amount_in: int, block: Optional[Block]
    ) -> int:
        """
        Ask the quoter how much comes out of swapping `amount_in` along `path`.

        Args:
            path: Flat swap route alternating token addresses and pool fees,
                e.g. ``[token, fee, token]``.
            amount_in: The input amount.
            block: The block number to quote at, passed as `block_identifier`.

        Returns:
            Whatever the quoter's `quoteExactInput` returns. `get_price`
            handles both a plain int and a struct whose first item is the
            amount out (quoter v2).

        Examples:
            >>> uniswap_v3 = UniswapV3(...)
            >>> output_amount = await uniswap_v3._quote_exact_input(path, 1000, 1234567)
        """
        quoter = await self.__quoter__
        return await quoter.quoteExactInput.coroutine(
            _encode_path(path), amount_in, block_identifier=block
        )
def _encode_path(path) -> bytes:
    """
    Pack a swap route into the byte string expected by the quoter.

    Elements of `path` are encoded alternately as ``address`` and ``uint24``,
    starting with ``address``.

    Args:
        path: Flat route alternating token addresses and pool fees.

    Returns:
        The packed-encoded path.

    Examples:
        >>> path = ["0xToken0Address", 3000, "0xToken1Address"]
        >>> encoded_path = _encode_path(path)
    """
    abi_kinds = cycle(("address", "uint24"))
    # One ABI type per path element, in address/uint24 alternation.
    types = [next(abi_kinds) for _ in path]
    return encode_packed(types, path)
def _undo_fees(path) -> float:
    """
    Compute the combined fee multiplier for a swap route.

    Every int element of `path` is treated as a pool fee expressed in units of
    ``1 / FEE_DENOMINATOR``; non-int elements (token addresses) are skipped.

    Args:
        path: Flat route alternating token addresses and pool fees.

    Returns:
        The product of ``1 - fee / FEE_DENOMINATOR`` over all fees in the path
        (1 when the path contains no fees).

    Examples:
        >>> path = ["0xToken0Address", 3000, "0xToken1Address"]
        >>> fee_multiplier = _undo_fees(path)
    """
    return math.prod(
        1 - hop / FEE_DENOMINATOR for hop in path if isinstance(hop, int)
    )
[docs]
class UniV3Pools(ProcessedEvents[UniswapV3Pool]):
    """The collection of pools created by a factory, built from its PoolCreated events."""

    __slots__ = ("asynchronous",)

    def __init__(self, factory: Contract, asynchronous: bool = False):
        """
        Initialize a UniV3Pools instance.

        Args:
            factory: The factory contract whose "PoolCreated" events are followed.
            asynchronous: Whether created pools use asynchronous operations.

        Examples:
            >>> factory_contract = Contract(...)
            >>> pools = UniV3Pools(factory_contract, asynchronous=True)
        """
        self.asynchronous = asynchronous
        super().__init__(
            addresses=[factory.address], topics=[factory.topics["PoolCreated"]]
        )

    def _process_event(self, event: _EventItem) -> UniswapV3Pool:
        """
        Turn a PoolCreated event into a UniswapV3Pool.

        Args:
            event: The PoolCreated event. Its values are unpacked in the order
                token0, token1, fee, tick spacing, pool address.

        Returns:
            A UniswapV3Pool whose deploy block is the event's block number.

        Examples:
            >>> pools = UniV3Pools(...)
            >>> pool = pools._process_event(event)
        """
        token0, token1, fee, tick_spacing, pool = event.values()
        # UniswapV3Pool.__init__ takes `tick_spacing` BEFORE `fee`, which is the
        # reverse of the event's field order, so they must be passed in the
        # constructor's order or the two attributes end up swapped.
        return UniswapV3Pool(
            pool,
            token0,
            token1,
            tick_spacing,
            fee,
            event.block_number,
            asynchronous=self.asynchronous,
        )

    def _get_block_for_obj(self, obj: UniswapV3Pool) -> int:
        """
        Return the block at which a pool was deployed.

        Args:
            obj: The UniswapV3Pool object.

        Returns:
            The pool's stored deploy block.

        Examples:
            >>> pools = UniV3Pools(...)
            >>> block_number = pools._get_block_for_obj(pool)
        """
        return obj._deploy_block
# Module-level instance; None when the current chain has no Uniswap V3 deployment configured.
uniswap_v3: Optional[UniswapV3]
try:
    uniswap_v3 = UniswapV3(asynchronous=True)
except UnsupportedNetwork:
    uniswap_v3 = None