# sourcery skip: merge-assign-and-aug-assign
import asyncio
import logging
import os
from collections import defaultdict
from http import HTTPStatus
from random import randint
from time import time
from typing import Any, Callable, Dict, Optional
import dank_mids
from aiohttp import (BasicAuth, ClientResponse, ClientSession, ClientTimeout,
TCPConnector)
from aiohttp.client_exceptions import (ClientConnectorSSLError, ClientError,
ContentTypeError)
from async_lru import alru_cache
from brownie import chain
from y import ENVIRONMENT_VARIABLES as ENVS
from y.classes.common import UsdPrice
from y.datatypes import Address, Block
from y.networks import Network
logger = logging.getLogger(__name__)
# current
header_env_names = {"X-Signer": "YPRICEAPI_SIGNER", "X-Signature": "YPRICEAPI_SIGNATURE"}
AUTH_HEADERS = {header: os.environ.get(env) for header, env in header_env_names.items()}
AUTH_HEADERS_PRESENT = all(AUTH_HEADERS.values())
# old
YPRICEAPI_USER = os.environ.get("YPRICEAPI_USER")
YPRICEAPI_PASS = os.environ.get("YPRICEAPI_PASS")
OLD_AUTH = BasicAuth(YPRICEAPI_USER, YPRICEAPI_PASS) if YPRICEAPI_USER and YPRICEAPI_PASS else None
ONE_MINUTE = 60 # some arbitrary amount of time in case the header is missing on unexpected 5xx responses
ONE_HOUR = ONE_MINUTE * 60
FALLBACK_STR = "Falling back to your node for pricing."
YPRICEAPI_TIMEOUT = ClientTimeout(int(os.environ.get("YPRICEAPI_TIMEOUT", 5 * ONE_MINUTE))) # Five minutes is the default timeout from aiohttp.
YPRICEAPI_SEMAPHORE = dank_mids.BlockSemaphore(int(os.environ.get("YPRICEAPI_SEMAPHORE", 100)))
if any(AUTH_HEADERS.values()) and not AUTH_HEADERS_PRESENT:
for header in AUTH_HEADERS:
if not AUTH_HEADERS[header]:
raise EnvironmentError(f'You must also pass in a value for {header_env_names[header]} in order to use ypriceAPI.')
should_use = not ENVS.SKIP_YPRICEAPI
notified = set()
auth_notifs = defaultdict(int)
resume_at = 0
get_retry_header: Callable[[ClientResponse], int] = lambda x: int(x.headers.get("Retry-After", ONE_MINUTE))
# NOTE: if you want to bypass ypriceapi for specific tokens, have your program add the addresses to this set.
skip_tokens = set()
skip_ypriceapi = skip_tokens # alias for backward compatability
#########################
# YPRICEAPI PUBLIC BETA #
#########################
_you_get = [
"access to your desired price data more quickly...",
"...from nodes run by yearn-affiliated big brains...",
"...on all the networks Yearn supports."
]
_testimonials = [
"I can now get prices for all of my useless shitcoins without waiting all day for ypricemagic to load logs.",
"I don't need to maintain an archive node anymore and that's saving me money.",
"Wow, so fast!",
]
beta_announcement = "ypriceAPI is now in beta!\n\n"
beta_announcement += "Head to ypriceapi-beta.yearn.finance and sign up for access. You get:\n"
for you_get in _you_get:
beta_announcement += f" - {you_get}\n"
beta_announcement += "\nCheck out some testimonials from our close frens:\n"
for testimonial in _testimonials:
beta_announcement += f' - from anon{randint(0, 9999)}, "{testimonial}"\n'
[docs]
def announce_beta() -> None:
spam_your_logs_fn = logger.info if logger.isEnabledFor(logging.INFO) else print
spam_your_logs_fn(beta_announcement)
global should_use
should_use = False
# TODO: Remove this when enough time has passed.
# Notify user if using old auth scheme
if OLD_AUTH is not None:
announce_beta()
raise NotImplementedError(
"YPRICEAPI_USER and YPRICEAPI_PASS are no longer used.\n"
+ "Please sign up for a plan (we have a free tier) at ypriceapi-beta.yearn.finance.\n"
+ "Then, pass in the following env vars to continue using ypriceAPI:\n"
+ " - YPRICEAPI_SIGNATURE, the signature you generated on the website"
+ " - YPRICEAPI_SIGNER, the wallet you used to sign up\n"
+ "You can unset the old envs to continue using ypricemagic."
)
[docs]
class BadResponse(Exception):
pass
[docs]
@alru_cache(maxsize=1)
async def get_session() -> ClientSession:
return ClientSession(
os.environ.get("YPRICEAPI_URL", "https://ypriceapi-beta.yearn.finance"),
connector=TCPConnector(verify_ssl=False),
headers=AUTH_HEADERS,
timeout=YPRICEAPI_TIMEOUT,
)
[docs]
@alru_cache(ttl=ONE_HOUR)
async def get_chains() -> Dict[int, str]:
session = await get_session()
async with session.get("/chains") as response:
chains = await read_response(response) or {}
return {int(k): v for k, v in chains.items()}
[docs]
@alru_cache(ttl=ONE_HOUR)
async def chain_supported(chainid: int) -> bool:
if chainid in await get_chains():
return True
logger.info('ypriceAPI does not support %s at this time.', Network.name())
return False
[docs]
async def get_price(
token: Address,
block: Optional[Block]
) -> Optional[UsdPrice]:
if not AUTH_HEADERS_PRESENT:
announce_beta()
return None
if time() < resume_at:
# NOTE: The reason we are here has already been logged.
return None
if block is None:
block = await dank_mids.eth.block_number
async with YPRICEAPI_SEMAPHORE[block]:
try:
tries = 0
while True:
try:
if not await chain_supported(chain.id):
return None
session = await get_session()
async with session.get(f'/get_price/{chain.id}/{token}?block={block}') as response:
return UsdPrice(price) if (price := await read_response(response, token, block)) else None
except ClientConnectorSSLError as e:
if "[[SSL: TLSV1_ALERT_INTERNAL_ERROR] tlsv1 alert internal error (_ssl.c:1129)]" not in str(e) or tries >= 50:
raise
tries += 1
except asyncio.TimeoutError:
logger.warning(f'ypriceAPI timed out for {token} at {block}.{FALLBACK_STR}')
except ContentTypeError:
raise
except ClientError as e:
logger.warning(f'ypriceAPI {e.__class__.__name__} for {token} at {block}.{FALLBACK_STR}')
[docs]
async def read_response(response: ClientResponse, token: Optional[Address] = None, block: Optional[Block] = None) -> Optional[Any]:
# 200
if response.status == HTTPStatus.OK:
try:
return await response.json()
except ContentTypeError as e:
msg = await response.json(content_type=None)
if msg:
raise BadResponse(msg) from e
logger.warning(f'ypriceAPI returned status code {_get_err_reason(response)} with response None. Must investigate.')
# 401
elif response.status == HTTPStatus.UNAUTHORIZED:
if HTTPStatus.UNAUTHORIZED not in notified:
logger.error(f'Your provided ypriceAPI credentials are not authorized for use.{FALLBACK_STR}')
notified.add(HTTPStatus.UNAUTHORIZED)
# 404
elif response.status == HTTPStatus.NOT_FOUND and token and block:
logger.debug("Failed to get price from API: %s at %s", token, block)
# Server Errors
# 502 & 503
elif response.status in {HTTPStatus.BAD_GATEWAY, HTTPStatus.SERVICE_UNAVAILABLE}:
logger.warning("ypriceAPI returned status code %s", _get_err_reason(response))
try:
msg = await response.json(content_type=None) or await response.text()
except Exception:
logger.warning('exception decoding ypriceapi %s response.%s', response.status, FALLBACK_STR, exc_info=True)
msg = ''
if msg:
logger.warning(msg)
_set_resume_at(get_retry_header(response))
else:
msg = f'ypriceAPI returned status code {_get_err_reason(response)}'
if token and block:
msg += f' for {token} at {block}.{FALLBACK_STR}'
logger.warning(msg)
def _get_err_reason(response: ClientResponse) -> str:
if response.reason is None:
return f"[{response.status}]"
ascii_encodable_reason = response.reason.encode(
"ascii", "backslashreplace"
).decode("ascii")
return f"[{response.status} {ascii_encodable_reason}]"
def _set_resume_at(retry_after: float) -> None:
global resume_at
logger.info("Falling back to your node for %s minutes.", int(retry_after/60))
resume_from_this_err_at = time() + retry_after
if resume_from_this_err_at > resume_at:
resume_at = resume_from_this_err_at