Source code for y.prices.lending.compound

import asyncio
import logging
from typing import Optional, Tuple

import a_sync
from a_sync.a_sync import HiddenMethodDescriptor
from brownie import chain, convert
from brownie.exceptions import VirtualMachineError
from multicall import Call
from typing_extensions import Self

from y import ENVIRONMENT_VARIABLES as ENVS
from y.classes.common import ERC20, ContractBase
from y.constants import EEE_ADDRESS
from y.contracts import Contract, has_methods
from y.datatypes import AddressOrContract, AnyAddressType, Block, UsdPrice
from y.exceptions import call_reverted
from y.networks import Network
from y.utils.logging import _gh_issue_request
from y.utils.raw_calls import raw_call

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)

TROLLERS = {
    Network.Mainnet: {
        "comp":             "0x3d9819210A31b4961b30EF54bE2aeD79B9c9Cd3B",
        "cream":            "0x3d5BC3c8d13dcB8bF317092d84783c2697AE9258",
        "ironbank":         "0xAB1c342C7bf5Ec5F02ADEA1c2270670bCa144CbB",
        "inverse":          "0x4dCf7407AE5C07f8681e1659f626E114A7667339",
        "unfederalreserve": "0x3105D328c66d8d55092358cF595d54608178E9B5",
        "flux":             "0x95Af143a021DF745bc78e845b54591C53a8B3A51",
    },
    Network.BinanceSmartChain: {
        "venus":            "0xfD36E2c2a6789Db23113685031d7F16329158384",
    },
    Network.Polygon: {
        "easyfi":           "0xcb3fA413B23b12E402Cfcd8FA120f983FB70d8E8",
        "apple":            "0x46220a07F071D1a821D68fA7C769BCcdA3C65430",
        "chumhum":          "0x1D43f6DA91e9EF6614dCe95bCef43E4d7b2bcFB5",
        "cream":            "0x20CA53E2395FA571798623F1cFBD11Fe2C114c24",
    },
    Network.Fantom: {
        "cream":            "0x4250A6D3BD57455d7C6821eECb6206F507576cD2",
        "scream":           "0x260E596DAbE3AFc463e75B6CC05d8c46aCAcFB09",
        "ola":              "0xD65eB596cFb5DE402678a12df651E0e588Dc3A81",
    },
    Network.Avalanche: {
        "vee":              "0xA67DFeD73025b0d61F2515c531dd8D25D4Cfd0Db",
        "vee2":             "0x43AAd7d8Bc661dfA70120865239529ED92Faa054",
        "vee3":             "0xeEf69Cab52480D2BD2D4A3f3E8F5CcfF2923f6eF",
        "cream":            "0x2eE80614Ccbc5e28654324a66A396458Fa5cD7Cc",
    },
    Network.Arbitrum: {
        "cream":            "0xbadaC56c9aca307079e8B8FC699987AAc89813ee",
        "neku":             "0xD5B649c7d27C13a2b80425daEe8Cb6023015Dc6B",
        "channels":         "0x3C13b172bf8BE5b873EB38553feC50F78c826284",
        "hund":             "0x0F390559F258eB8591C8e31Cf0905E97cf36ACE2",
    },
    Network.Optimism: {
        "ironbank":         "0xE0B57FEEd45e7D908f2d0DaCd26F113Cf26715BF",
    }
}.get(chain.id, {})


[docs] class CToken(ERC20):
[docs] def __init__(self, address: AnyAddressType, comptroller: Optional["Comptroller"] = None, asynchronous: bool = False) -> None: self.troller = comptroller super().__init__(address, asynchronous=asynchronous) self.exchange_rate_current = Call(self.address, 'exchangeRateCurrent()(uint)')
[docs] async def get_price(self, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE) -> UsdPrice: if self.troller: # We can use the protocol's oracle which will be quick (if it works) underlying_per_ctoken, underlying_price = await asyncio.gather( self.underlying_per_ctoken(block=block, asynchronous=True), self.get_underlying_price(block=block, asynchronous=True), ) if underlying_price: return UsdPrice(underlying_per_ctoken * underlying_price) # Or we can just price the underlying token ourselves underlying = await self.__underlying__ underlying_per_ctoken, underlying_price = await asyncio.gather( self.underlying_per_ctoken(block=block, asynchronous=True), underlying.price(block=block, skip_cache=skip_cache, asynchronous=True) ) return UsdPrice(underlying_per_ctoken * underlying_price)
[docs] @a_sync.aka.cached_property async def underlying(self) -> ERC20: # sourcery skip: use-or-for-fallback underlying = await self.has_method('underlying()(address)', return_response=True, sync=False) # this will run for gas coin markets like cETH, crETH if not underlying: underlying = EEE_ADDRESS return ERC20(underlying, asynchronous=self.asynchronous)
__underlying__: HiddenMethodDescriptor[Self, ERC20]
[docs] async def underlying_per_ctoken(self, block: Optional[Block] = None) -> float: exchange_rate, decimals, underlying = await asyncio.gather( self.exchange_rate(block=block, sync=False), self.__decimals__, self.__underlying__, ) return exchange_rate * 10 ** (decimals - await underlying.__decimals__(asynchronous=True))
#yLazyLogger(logger)
[docs] async def exchange_rate(self, block: Optional[Block] = None) -> float: try: exchange_rate = await self.exchange_rate_current.coroutine(block_id=block) except Exception as e: if not call_reverted(e): raise exchange_rate = None if exchange_rate is None: # NOTE: Sometimes this works, not sure why contract = await Contract.coroutine(self.address) try: exchange_rate = contract.exchangeRateCurrent.call(block_identifier=block) except Exception as e: if 'borrow rate is absurdly high' not in str(e): raise exchange_rate = 0 return exchange_rate / 1e18
[docs] async def get_underlying_price(self, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE) -> Optional[float]: # always query the oracle in case it was changed oracle, underlying = await asyncio.gather( self.troller.oracle(block, asynchronous=True), self.__underlying__(asynchronous=True), ) price, underlying_decimals = await asyncio.gather( oracle.getUnderlyingPrice.coroutine(self.address, block_identifier=block), underlying.__decimals__(asynchronous=True), return_exceptions=True, ) if isinstance(price, Exception): # TODO debug why this occurs and refactor. only found on arbitrum cream try: price = oracle.getUnderlyingPrice(self.address, block_identifier=block) except VirtualMachineError as e: if str(e) in { "revert: grace period not over", "revert: Chainlink feeds are not being updated", }: return None raise price /= 10 ** (36 - underlying_decimals) return price
[docs] class Comptroller(ContractBase):
[docs] def __init__(self, address: Optional[AnyAddressType] = None, key: Optional[str] = None, asynchronous: bool = False) -> None: assert address or key, 'Must provide either an address or a key' assert not (address and key), 'Must provide either an address or a key, not both' if key: address = TROLLERS[key] else: key = [key for key in TROLLERS if address == TROLLERS[key]][0] self.address = convert.to_address(address) self.key = key self.asynchronous = asynchronous
def __repr__(self) -> str: return f"<Comptroller {self.key} '{self.address}'>" #yLazyLogger(logger)
[docs] def __contains__(self, token_address: AnyAddressType) -> bool: if self.asynchronous: raise RuntimeError("'self.asynchronous' must be False to use Comptroller.__contains__") return token_address in self.markets
[docs] @a_sync.aka.cached_property async def markets(self) -> Tuple[CToken]: response = await self.has_method("getAllMarkets()(address[])", return_response=True, sync=False) if not response: logger.warning('had trouble loading markets for %s', self) response = set() markets = tuple(CToken(market, comptroller=self, asynchronous=self.asynchronous) for market in response) logger.info("loaded %s markets for %s", len(markets), self) return markets
__markets__ = HiddenMethodDescriptor[Self, Tuple[CToken]]
[docs] async def oracle(self, block: Optional[Block] = None) -> Contract: contract = await Contract.coroutine(self.address) try: oracle = await contract.oracle.coroutine(block_identifier=block) except Exception as e: # TODO debug why this occurs and refactor. only found on arbitrum cream if not call_reverted(e): raise oracle = contract.oracle(block_identifier=block) return await Contract.coroutine(oracle)
[docs] class Compound(a_sync.ASyncGenericSingleton):
[docs] def __init__(self, asynchronous: bool = False) -> None: self.asynchronous = asynchronous self.trollers = { protocol: Comptroller(troller, asynchronous=self.asynchronous) for protocol, troller in TROLLERS.items() }
[docs] def __contains__(self, token_address: AddressOrContract) -> bool: if self.asynchronous: raise RuntimeError("'self.asynchronous' must be False and the event loop must not be running") return self.is_compound_market(token_address)
[docs] async def get_troller(self, token_address: AddressOrContract) -> Optional[Comptroller]: if self.trollers: async for troller, markets in Comptroller.markets.map(self.trollers.values()): if token_address in markets: return troller
[docs] @a_sync.a_sync(ram_cache_ttl=5*60) async def is_compound_market(self, token_address: AddressOrContract) -> bool: if await self.get_troller(token_address, sync=False): return True # NOTE: Workaround for pools that have since been revoked result = await has_methods(token_address, ('isCToken()(bool)','comptroller()(address)','underlying()(address)'), sync=False) if result is True: await self.__notify_if_unknown_comptroller(token_address) return result
[docs] async def get_price(self, token_address: AnyAddressType, block: Optional[Block] = None, skip_cache: bool = ENVS.SKIP_CACHE) -> Optional[UsdPrice]: troller = await self.get_troller(token_address) return await CToken(token_address, comptroller=troller, asynchronous=True).get_price(block=block, skip_cache=skip_cache)
async def __notify_if_unknown_comptroller(self, token_address: AddressOrContract) -> None: comptroller = await raw_call(token_address,'comptroller()',output='address', sync=False) if comptroller not in self.trollers.values(): _gh_issue_request(f'Comptroller {comptroller} is unknown to ypricemagic.', logger)
compound: Compound = Compound(asynchronous=True)