Retry Observer
Overview
Dank Mids exposes a retry observer API so applications can record retry decisions
into logs or metrics backends. Observers are callables that accept a
dank_mids.retry_observer.RetryEvent.
Note
Internal retry logic emits events for built-in retry paths. You can still
emit from your own wrappers using dank_mids.retry_observer.emit_retry_event().
Internal emit points
Retry events are emitted in the following internal locations:
RPC request timeouts and 408 responses (
RPCRequest.make_request)Multicall bisect retries (
Multicall.bisect_and_retry)JSON-RPC batch bisect retries (
JSONRPCBatch.bisect_and_retry)HTTP 429 and retryable status codes (
DankClientSession.post)
Attempt semantics
attempt is 1-based. For batch retries, the attempt number increments each
time a batch is bisected and retried. For will_retry=False events, the
attempt number reflects the retry decision that would have occurred next.
RetryEvent
Fields:
operation(str): Human-readable identifier for the retried operation.attempt(int): 1-based retry attempt number.error(BaseException): Exception that triggered the retry.max_attempts(int | None): Optional configured limit.delay(float | None): Optional delay before next attempt.component(str | None): Optional subsystem name.will_retry(bool): Whether the caller intends to retry.metadata(dict[str, str] | None): Optional key/value tags.timestamp(float): Unix timestamp when the event was created.
Registration
Register observers with dank_mids.retry_observer.register_retry_observer() and
remove them with dank_mids.retry_observer.unregister_retry_observer().
from dank_mids import RetryEvent, emit_retry_event, register_retry_observer
def log_retry(event: RetryEvent) -> None:
print(
f"retrying {event.operation} attempt={event.attempt} "
f"delay={event.delay} error={event.error}"
)
register_retry_observer(log_retry)
emit_retry_event(
RetryEvent(
operation="eth_call",
attempt=1,
delay=0.5,
error=RuntimeError("rate limited"),
component="jsonrpc",
)
)
Usage Examples
Structured logging
import logging
from dank_mids import RetryEvent, register_retry_observer
logger = logging.getLogger("dank_mids.retry")
def log_retry(event: RetryEvent) -> None:
logger.warning(
"retrying %s attempt=%s delay=%s error=%s",
event.operation,
event.attempt,
event.delay,
event.error,
)
register_retry_observer(log_retry)
Prometheus counters
from prometheus_client import Counter
from dank_mids import RetryEvent, register_retry_observer
retry_total = Counter(
"dank_mids_retry_total",
"Retry events emitted by Dank Mids",
["operation", "error_type"],
)
def record_retry(event: RetryEvent) -> None:
retry_total.labels(event.operation, type(event.error).__name__).inc()
register_retry_observer(record_retry)
Sentry + stats collector
from dank_mids import StatsRetryObserver, register_retry_observer
from dank_mids import stats
observer = StatsRetryObserver()
register_retry_observer(observer)
# When you want to push metrics to Sentry.
stats.sentry.push_measurements()
# New metrics include: retry_total, retry_error_types