Source code for eth_portfolio.protocols.lending.maker


import asyncio
from typing import Optional

from async_lru import alru_cache
from brownie.convert.datatypes import HexString
from y import Network, get_price
from y._decorators import stuck_coro_debugger
from y.constants import dai
from y.contracts import Contract
from y.datatypes import Address, Block

from eth_portfolio.protocols.lending._base import \
    LendingProtocolWithLockedCollateral
from eth_portfolio._utils import Decimal
from eth_portfolio.typing import Balance, TokenBalances

try:
    # this is only available in 4.0.0+
    from eth_abi import encode
    encode_bytes = lambda bytestring: encode(['bytes32'], [bytestring])
except ImportError:
    from eth_abi import encode_single
    encode_bytes = lambda bytestring: encode_single('bytes32', bytestring)
    
yfi = "0x0bc529c00C6401aEF6D220BE8C6Ea1667F6Ad93e"

[docs] class Maker(LendingProtocolWithLockedCollateral): networks = [Network.Mainnet]
[docs] def __init__(self) -> None: self.proxy_registry = Contract('0x4678f0a6958e4D2Bc4F1BAF7Bc52E8F3564f3fE4') self.cdp_manager = Contract('0x5ef30b9986345249bc32d8928B7ee64DE9435E39') self.vat = Contract('0x35D1b3F3D7966A1DFe207aa4514C12a259A0492B')
[docs] @stuck_coro_debugger async def _balances(self, address: Address, block: Optional[Block] = None) -> TokenBalances: balances: TokenBalances = TokenBalances() ilk = encode_bytes(b'YFI-A') urn = await self._urn(address) ink = (await self.vat.urns.coroutine(ilk, urn, block_identifier=block)).dict()["ink"] if ink: balance = ink / Decimal(10**18) value = round(balance * Decimal(await get_price(yfi, block, sync=False)), 18) balances[yfi] = Balance(balance, value) return balances
[docs] @stuck_coro_debugger async def _debt(self, address: Address, block: Optional[int] = None) -> TokenBalances: ilk = encode_bytes(b'YFI-A') urn = await self._urn(address) urns, ilks = await asyncio.gather( self.vat.urns.coroutine(ilk, urn, block_identifier=block), self.vat.ilks.coroutine(ilk, block_identifier=block), ) art = urns.dict()["art"] rate = ilks.dict()["rate"] debt = art * rate / Decimal(1e45) balances: TokenBalances = TokenBalances() balances[dai] += Balance(debt, debt) return balances
[docs] @alru_cache async def _proxy(self, address: Address) -> Address: return await self.proxy_registry.proxies.coroutine(address)
[docs] @alru_cache async def _cdp(self, address: Address) -> Address: proxy = await self._proxy(address) return await self.cdp_manager.first.coroutine(proxy)
[docs] @alru_cache async def _urn(self, address: Address) -> Address: cdp = await self._cdp(address) return await self.cdp_manager.urns.coroutine(cdp)