import asyncio
import logging
from abc import abstractmethod
from decimal import Decimal
from typing import Awaitable, List, Optional, Union
import a_sync
from a_sync.a_sync import HiddenMethodDescriptor
from brownie import chain
from multicall import Call
from typing_extensions import Self
from web3.exceptions import ContractLogicError
from y import convert
from y.classes.common import ERC20, ContractBase
from y.contracts import Contract
from y.datatypes import Address, AddressOrContract, AnyAddressType, Block, UsdPrice
from y.networks import Network
from y.utils import hasall
from y.utils.logging import get_price_logger
from y.utils.raw_calls import raw_call
logger = logging.getLogger(__name__)
v1_pools = {
Network.Mainnet: ("0x398eC7346DcD622eDc5ae82352F02bE94C62d119",),
}.get(, ())
v2_pools = {
Network.Mainnet: (
"0x7d2768dE32b0b80b7a3454c06BdAc94A69DDc7A9", # aave
"0x7937D4799803FbBe595ed57278Bc4cA21f3bFfCB", # aave amm
"0xcE744a9BAf573167B2CF138114BA32ed7De274Fa", # umee
Network.Polygon: ("0x8dFf5E27EA6b7AC08EbFdf9eB090F32ee9a30fcf",), # aave
Network.Avalanche: ("0x70BbE4A294878a14CB3CDD9315f5EB490e346163",), # blizz
}.get(, [])
v3_pools = {
Network.Mainnet: ("0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2",), # aave v3
Network.Optimism: ("0x794a61358D6845594F94dc1DB02A252b5b4814aD",), # aave v3
Network.Arbitrum: ("0x794a61358D6845594F94dc1DB02A252b5b4814aD",), # aave v3
Network.Harmony: ("0x794a61358D6845594F94dc1DB02A252b5b4814aD",), # aave v3
Network.Arbitrum: ("0x794a61358D6845594F94dc1DB02A252b5b4814aD",), # aave v3
Network.Fantom: ("0x794a61358D6845594F94dc1DB02A252b5b4814aD",), # aave v3
Network.Avalanche: ("0x794a61358D6845594F94dc1DB02A252b5b4814aD",), # aave v3
Network.Polygon: ("0x794a61358D6845594F94dc1DB02A252b5b4814aD",), # aave v3
}.get(, ())
class AaveMarketBase(ContractBase):
Base class for Aave markets.
This class provides common functionality for Aave markets, including methods
to check if a token is an aToken from the market and to retrieve reserve data.
See Also:
- :class:`AaveMarketV1`
- :class:`AaveMarketV2`
- :class:`AaveMarketV3`
def __contains__(self, token: object) -> bool:
Check if `token` is an aToken from this market.
This method is intended for synchronous use. If `self.asynchronous` is not `False`,
a `RuntimeError` will be raised.
token: The item to check.
True if the token is an aToken from the market, False otherwise.
RuntimeError: If `self.asynchronous` is not `False`.
>>> market = AaveMarketV1("0xAddress")
>>> token = "0xTokenAddress"
>>> token in market
if not self.asynchronous:
cls = self.__class__.__name__
raise RuntimeError(
f"'self.asynchronous' must be False to use {cls}.__contains__.\nYou may wish to use {cls}.is_atoken instead."
return convert.to_address(token) in self.atokens
async def contains(self, token: object) -> bool:
Check if `token` is an aToken from this market.
This method is intended for asynchronous use.
token: The item to check.
True if the token is an aToken from the market, False otherwise.
>>> market = AaveMarketV1("0xAddress", asynchronous=True)
>>> token = "0xTokenAddress"
>>> await market.contains(token)
contains = await convert.to_address_async(token) in await self.__atokens__
logger.debug("%s contains %s: %s", self, token, contains)
return contains
async def get_reserves(self) -> List[Address]:
return await Call(self.address, [self._get_reserves_method])
async def get_reserve_data(self, reserve: AnyAddressType) -> tuple:
return await self.contract.getReserveData.coroutine(reserve)
async def atokens(self) -> Awaitable[List[ERC20]]:
Get the aTokens of the market.
This is an abstract property and must be implemented by subclasses.
A list of aTokens as :class:`~ERC20` objects.
>>> market = AaveMarketV1("0xAddress", asynchronous=True)
>>> atokens = await market.atokens
>>> print(atokens)
[<ERC20 '0xTokenAddress1'>, <ERC20 '0xTokenAddress2'>]
__atokens__: HiddenMethodDescriptor[Self, List[ERC20]]
async def underlying(self, atoken_address: AddressOrContract) -> ERC20:
Get the underlying asset of the given aToken address.
This is an abstract method and must be implemented by subclasses.
atoken_address: The address of the aToken.
The underlying asset.
>>> market = AaveMarketV1("0xAddress", asynchronous=True)
>>> underlying_asset = await market.underlying("0xATokenAddress")
>>> print(underlying_asset)
<ERC20 '0xUnderlyingAssetAddress'>
def _get_reserves_method(self) -> str:
The method that must be called to get the reserves list.
This is an abstract property and must be implemented by subclasses.
>>> market = AaveMarketV1("0xAddress")
>>> print(market._get_reserves_method)
class AaveMarketV1(AaveMarketBase):
async def atokens(self) -> List[ERC20]:
reserves_data =, self.get_reserves(sync=False))
atokens = [
ERC20(reserve_data["aTokenAddress"], asynchronous=self.asynchronous)
async for _, reserve_data in reserves_data
]"loaded %s v1 atokens for %s", len(atokens), self)
return atokens
async def underlying(self, atoken_address: AddressOrContract) -> ERC20:
underlying = await raw_call(
atoken_address, "underlyingAssetAddress()", output="address", sync=False
return ERC20(underlying)
_get_reserves_method = "getReserves()(address[])"
_V2_RESERVE_DATA_METHOD = "getReserveData(address)((uint256,uint128,uint128,uint128,uint128,uint128,uint40,address,address,address,address,uint8))"
class AaveMarketV2(AaveMarketBase):
async def atokens(self) -> List[ERC20]:
reserves_data =, self.get_reserves(sync=False))
atokens = [
ERC20(reserve_data[7], asynchronous=self.asynchronous)
async for _, reserve_data in reserves_data
]"loaded %s v2 atokens for %s", len(atokens), self)
return atokens
except (
) as e: # TODO figure out what to do about non verified aave markets
logger.warning("failed to load tokens for %s", self)
return []
async def get_reserve_data(self, reserve: AnyAddressType) -> tuple:
return await Call(self.address, [_V2_RESERVE_DATA_METHOD, str(reserve)])
async def underlying(self, atoken_address: AddressOrContract) -> ERC20:
underlying = await raw_call(
atoken_address, "UNDERLYING_ASSET_ADDRESS()", output="address", sync=False
logger.debug("underlying: %s", underlying)
return ERC20(underlying, asynchronous=self.asynchronous)
_get_reserves_method = "getReservesList()(address[])"
class AaveMarketV3(AaveMarketBase):
async def atokens(self) -> List[ERC20]:
reserves_data =, self.get_reserves(sync=False))
atokens = [
ERC20(reserve_data[8], asynchronous=self.asynchronous)
async for _, reserve_data in reserves_data
]"loaded %s v3 atokens for %s", len(atokens), self)
return atokens
except (
) as e: # TODO figure out what to do about non verified aave markets
logger.warning("failed to load tokens for %s", self)
return []
async def underlying(self, atoken_address: AddressOrContract) -> ERC20:
underlying = await raw_call(
atoken_address, "UNDERLYING_ASSET_ADDRESS()", output="address", sync=False
logger.debug("underlying: %s", underlying)
return ERC20(underlying, asynchronous=self.asynchronous)
_get_reserves_method = "getReservesList()(address[])"
AaveMarket = Union[AaveMarketV1, AaveMarketV2, AaveMarketV3]
class AaveRegistry(a_sync.ASyncGenericSingleton):
def __init__(self, *, asynchronous: bool = False) -> None:
self.asynchronous = asynchronous
async def pools(self) -> List[AaveMarket]:
v1, v2, v3 = await asyncio.gather(
return v1 + v2 + v3
__pools__: HiddenMethodDescriptor[Self, List[AaveMarket]]
async def pools_v1(self) -> List[AaveMarketV1]:
pools = [
AaveMarketV1(pool, asynchronous=self.asynchronous) for pool in v1_pools
logger.debug("%s v1 pools %s", self, pools)
return pools
__pools_v1__: HiddenMethodDescriptor[Self, List[AaveMarketV1]]
async def pools_v2(self) -> List[AaveMarketV2]:
pools = [
AaveMarketV2(pool, asynchronous=self.asynchronous) for pool in v2_pools
logger.debug("%s v2 pools %s", self, pools)
return pools
__pools_v2__: HiddenMethodDescriptor[Self, List[AaveMarketV2]]
async def pools_v3(self) -> List[AaveMarketV3]:
pools = [
AaveMarketV3(pool, asynchronous=self.asynchronous) for pool in v3_pools
logger.debug("%s v3 pools %s", self, pools)
return pools
__pools_v3__: HiddenMethodDescriptor[Self, List[AaveMarketV3]]
async def pool_for_atoken(
self, atoken_address: AnyAddressType
) -> Optional[Union[AaveMarketV1, AaveMarketV2, AaveMarketV3]]:
pools = await self.__pools__
for pool in pools:
if await pool.contains(atoken_address, sync=False):
return pool
def __contains__(self, __o: object) -> bool:
if self.asynchronous:
raise RuntimeError(
f"'self.asynchronous' must be False to use AaveRegistry.__contains__.\nYou may wish to use AaveRegistry.is_atoken instead."
return any(__o in pool for pool in self.pools)
async def is_atoken(self, atoken_address: AnyAddressType) -> bool:
logger = get_price_logger(atoken_address, block=None, extra="aave")
is_atoken = any(
await asyncio.gather(
pool.contains(atoken_address, sync=False)
for pool in await self.__pools__
logger.debug("is_atoken: %s", is_atoken)
return is_atoken
async def is_wrapped_atoken_v2(self, atoken_address: AnyAddressType) -> bool:
# NOTE: Not sure if this wrapped version is actually related to aave but this works for pricing purposes.
contract = await Contract.coroutine(atoken_address, require_success=False)
return contract.verified and hasall(contract, _WRAPPED_V2_METHODS)
async def is_wrapped_atoken_v3(self, atoken_address: AnyAddressType) -> bool:
# NOTE: Not sure if this wrapped version is actually related to aave but this works for pricing purposes.
contract = await Contract.coroutine(atoken_address, require_success=False)
return contract.verified and hasall(contract, _WRAPPED_V3_METHODS)
async def underlying(self, atoken_address: AddressOrContract) -> ERC20:
pool: Union[AaveMarketV1, AaveMarketV2, AaveMarketV3] = (
await self.pool_for_atoken(atoken_address, sync=False)
return await pool.underlying(atoken_address, sync=False)
async def get_price(
atoken_address: AddressOrContract,
block: Optional[Block] = None,
skip_cache: bool = ENVS.SKIP_CACHE,
) -> UsdPrice:
underlying: ERC20 = await self.underlying(atoken_address, sync=False)
return await underlying.price(block, skip_cache=skip_cache, sync=False)
async def get_price_wrapped_v2(
atoken_address: AddressOrContract,
block: Optional[Block] = None,
skip_cache: bool = ENVS.SKIP_CACHE,
) -> Optional[UsdPrice]:
return await self._get_price_wrapped(
atoken_address, "staticToDynamicAmount", block=block, skip_cache=skip_cache
async def get_price_wrapped_v3(
atoken_address: AddressOrContract,
block: Optional[Block] = None,
skip_cache: bool = ENVS.SKIP_CACHE,
) -> Optional[UsdPrice]:
return await self._get_price_wrapped(
atoken_address, "convertToAssets", block=block, skip_cache=skip_cache
async def _get_price_wrapped(
atoken_address: AddressOrContract,
method: str,
block: Optional[Block] = None,
skip_cache: bool = ENVS.SKIP_CACHE,
) -> Optional[UsdPrice]:
contract, scale = await asyncio.gather(
ERC20(atoken_address, asynchronous=True).scale,
underlying, price_per_share = await asyncio.gather(
), # NOTE: We can probably cache this without breaking anything
getattr(contract, method).coroutine(scale, block_identifier=block),
except ContractLogicError:
return None
price_per_share /= Decimal(scale)
return price_per_share * Decimal(
await ERC20(underlying, asynchronous=True).price(
block, skip_cache=skip_cache
aave: AaveRegistry = AaveRegistry(asynchronous=True)