dank_mids package

Subpackages

Submodules

dank_mids.ENVIRONMENT_VARIABLES module

dank_mids.ENVIRONMENT_VARIABLES.AIOHTTP_TIMEOUT: Final = EnvironmentVariable[int](name=`DANKMIDS_AIOHTTP_TIMEOUT`, default_value=1200, using_default=True)

Timeout value in seconds for aiohttp requests.

dank_mids.ENVIRONMENT_VARIABLES.BROWNIE_CALL_SEMAPHORE: Final = EnvironmentVariable[BlockSemaphore](name=`DANKMIDS_BROWNIE_CALL_SEMAPHORE`, default_value=100000, using_default=True)

Semaphore for limiting concurrent Brownie calls.

See also

dank_mids.semaphores.BlockSemaphore: The semaphore class used for concurrency control.

dank_mids.ENVIRONMENT_VARIABLES.BROWNIE_DECODER_PROCESSES = EnvironmentVariable[AsyncProcessPoolExecutor](name=`DANKMIDS_BROWNIE_DECODER_PROCESSES`, default_value=0, using_default=True)

Process pool for Brownie decoding operations.

See also

a_sync.AsyncProcessPoolExecutor: The executor class used for managing asynchronous processes.

dank_mids.ENVIRONMENT_VARIABLES.BROWNIE_ENCODER_PROCESSES = EnvironmentVariable[AsyncProcessPoolExecutor](name=`DANKMIDS_BROWNIE_ENCODER_PROCESSES`, default_value=0, using_default=True)

Process pool for Brownie encoding operations.

See also

a_sync.AsyncProcessPoolExecutor: The executor class used for managing asynchronous processes.

dank_mids.ENVIRONMENT_VARIABLES.BROWNIE_ENCODER_SEMAPHORE: Final = EnvironmentVariable[BlockSemaphore](name=`DANKMIDS_BROWNIE_ENCODER_SEMAPHORE`, default_value=200000, using_default=True)

Semaphore for limiting concurrent Brownie encoding operations. This limits memory consumption.

See also

dank_mids.semaphores.BlockSemaphore: The semaphore class used for concurrency control.

dank_mids.ENVIRONMENT_VARIABLES.COLLECTION_FACTOR: Final = EnvironmentVariable[int](name=`DANKMIDS_COLLECTION_FACTOR`, default_value=10, using_default=True)

Factor determining the size of data collection operations.

dank_mids.ENVIRONMENT_VARIABLES.GANACHE_FORK: Final = EnvironmentVariable[bool](name=`DANKMIDS_GANACHE_FORK`, default_value=False, using_default=True)

Flag indicating whether the current environment is a Ganache fork.

dank_mids.ENVIRONMENT_VARIABLES.USE_FULL_REQUEST: Final = EnvironmentVariable[bool](name=`DANKMIDS_USE_FULL_REQUEST`, default_value=False, using_default=True)

Flag indicating whether to use the full request specification.

dank_mids.constants module

dank_mids.constants.GAS_LIMIT: Final = 50000000

The gas limit constant imported from the multicall library.

This value is used as the default gas limit for multicall operations.

dank_mids.constants.MULTICALL2_DEPLOY_BLOCKS: Final[dict[Network, BlockNumber]] = {Network.Mainnet: 12336033, Network.Optimism: 722566, Network.Fantom: 16572242, Network.Arbitrum: 821923}

A dictionary mapping networks to the block numbers where Multicall2 was deployed.

dank_mids.constants.MULTICALL2_OVERRIDE_CODE: Final = '0x608060405234801561001057600080fd5b50600436106100b45760003560e01c806372425d9d1161007157806372425d9d1461013d57806386d516e814610145578063a8b0574e1461014d578063bce38bd714610162578063c3077fa914610182578063ee82ac5e14610195576100b4565b80630f28c97d146100b9578063252dba42146100d757806327e86d6e146100f8578063399542e91461010057806342cbb15c146101225780634d2301cc1461012a575b600080fd5b6100c16101a8565b6040516100ce919061083b565b60405180910390f35b6100ea6100e53660046106bb565b6101ac565b6040516100ce9291906108ba565b6100c1610340565b61011361010e3660046106f6565b610353565b6040516100ce93929190610922565b6100c161036b565b6100c161013836600461069a565b61036f565b6100c161037c565b6100c1610380565b610155610384565b6040516100ce9190610814565b6101756101703660046106f6565b610388565b6040516100ce9190610828565b6101136101903660046106bb565b610533565b6100c16101a3366004610748565b610550565b4290565b8051439060609067ffffffffffffffff8111156101d957634e487b7160e01b600052604160045260246000fd5b60405190808252806020026020018201604052801561020c57816020015b60608152602001906001900390816101f75790505b50905060005b835181101561033a5760008085838151811061023e57634e487b7160e01b600052603260045260246000fd5b6020026020010151600001516001600160a01b031686848151811061027357634e487b7160e01b600052603260045260246000fd5b60200260200101516020015160405161028c91906107f8565b6000604051808303816000865af19150503d80600081146102c9576040519150601f19603f3d011682016040523d82523d6000602084013e6102ce565b606091505b5091509150816102f95760405162461bcd60e51b81526004016102f090610885565b60405180910390fd5b8084848151811061031a57634e487b7160e01b600052603260045260246000fd5b602002602001018190525050508080610332906109c2565b915050610212565b50915091565b600061034d60014361097b565b40905090565b43804060606103628585610388565b90509250925092565b4390565b6001600160a01b03163190565b4490565b4590565b4190565b6060815167ffffffffffffffff8111156103b257634e487b7160e01b600052604160045260246000fd5b6040519080825280602002602001820160405280156103eb57816020015b6103d8610554565b8152602001906001900390816103d05790505b50905060005b825181101561052c5760008084838151811061041d57634e487b7160e01b600052603260045260246000fd5b6020026020010151600001516001600160a01b031685848151811061045257634e487b7160e01b600052603260045260246000fd5b60200260200101516020015160405161046b91906107f8565b6000604051808303816000865af19150503d80600081146104a8576040519150601f19603f3d011682016040523d82523d6000602084013e6104ad565b606091505b509150915085156104d557816104d55760405162461bcd60e51b81526004016102f090610844565b604051806040016040528083151581526020018281525084848151811061050c57634e487b7160e01b600052603260045260246000fd5b602002602001018190525050508080610524906109c2565b9150506103f1565b5092915050565b6000806060610543600185610353565b9196909550909350915050565b4090565b60408051808201909152600081526060602082015290565b80356001600160a01b038116811461058357600080fd5b919050565b600082601f830112610598578081fd5b8135602067ffffffffffffffff808311156105b5576105b56109f3565b6105c2828385020161094a565b83815282810190868401865b8681101561068c57813589016040601f198181848f030112156105ef578a8bfd5b6105f88261094a565b6106038a850161056c565b81528284013589811115610615578c8dfd5b8085019450508d603f850112610629578b8cfd5b898401358981111561063d5761063d6109f3565b61064d8b84601f8401160161094a565b92508083528e84828701011115610662578c8dfd5b808486018c85013782018a018c9052808a01919091528652505092850192908501906001016105ce565b509098975050505050505050565b6000602082840312156106ab578081fd5b6106b48261056c565b9392505050565b6000602082840312156106cc578081fd5b813567ffffffffffffffff8111156106e2578182fd5b6106ee84828501610588565b949350505050565b60008060408385031215610708578081fd5b82358015158114610717578182fd5b9150602083013567ffffffffffffffff811115610732578182fd5b61073e85828601610588565b9150509250929050565b600060208284031215610759578081fd5b5035919050565b60008282518085526020808601955080818302840101818601855b848110156107bf57858303601f19018952815180511515845284015160408585018190526107ab818601836107cc565b9a86019a945050509083019060010161077b565b5090979650505050505050565b600081518084526107e4816020860160208601610992565b601f01601f19169290920160200192915050565b6000825161080a818460208701610992565b9190910192915050565b6001600160a01b0391909116815260200190565b6000602082526106b46020830184610760565b90815260200190565b60208082526021908201527f4d756c746963616c6c32206167677265676174653a2063616c6c206661696c656040820152601960fa1b606082015260800190565b6020808252818101527f4d756c746963616c6c206167677265676174653a2063616c6c206661696c6564604082015260600190565b600060408201848352602060408185015281855180845260608601915060608382028701019350828701855b8281101561091457605f198887030184526109028683516107cc565b955092840192908401906001016108e6565b509398975050505050505050565b6000848252836020830152606060408301526109416060830184610760565b95945050505050565b604051601f8201601f1916810167ffffffffffffffff81118282101715610973576109736109f3565b604052919050565b60008282101561098d5761098d6109dd565b500390565b60005b838110156109ad578181015183820152602001610995565b838111156109bc576000848401525b50505050565b60006000198214156109d6576109d66109dd565b5060010190565b634e487b7160e01b600052601160045260246000fd5b634e487b7160e01b600052604160045260246000fdfea2646970667358221220c1152f751f29ece4d7bce5287ceafc8a153de9c2c633e3f21943a87d845bd83064736f6c63430008010033'

The bytecode for the Multicall2 contract.

This is used for state override on blocks before the Multicall2 contract was deployed.

dank_mids.constants.MULTICALL3_DEPLOY_BLOCKS: Final[dict[Network, BlockNumber]] = {Network.Mainnet: 14353601, Network.Optimism: 4286263, Network.Fantom: 33001987, Network.Base: 5022, Network.Arbitrum: 7654707}

A dictionary mapping networks to the block numbers where Multicall3 was deployed.

dank_mids.constants.RETRY_ERRS: Final = {'batch limit exceeded', 'connection reset by peer', 'evm error', 'evm timeout', 'execution aborted (timeout =', 'request timed out', 'request timeout', 'server disconnected'}

A list of error messages that are expected during normal use and are not indicative of any problem(s).

These errors will be automatically retried until success is achieved.

dank_mids.constants.REVERT_SELECTORS: Final = (b'\x08\xc3y\xa0', b'4e487b71')

A list of byte strings representing revert selectors.

These selectors are used to identify specific types of revert errors in Ethereum transactions.

dank_mids.constants.TOO_MUCH_DATA_ERRS: Final = {'batch limit exceeded', 'content length too large', 'payload too large', 'request entity too large'}

A list of error messages indicating that the request sent to the RPC was too large and must be split up.

These error messages are used to identify when a request needs to be broken into smaller chunks.

dank_mids.controller module

final class dank_mids.controller.DankMiddlewareController

Bases: object

Controller for managing Dank Middleware operations.

This class handles the core functionality of Dank Mids, including request batching, call execution, and error handling.

See also

dank_mids.semaphores.BlockSemaphore: Used for managing concurrency of eth_calls made with the controller.

async __call__(method, params)

Asynchronous method to handle RPC calls.

This method routes different types of RPC calls to appropriate handlers, including specialized handling for eth_call and other methods that may use queues.

Parameters:
  • method (RPCEndpoint) – The RPC method to be called.

  • params (Any) – The parameters for the RPC call.

Returns:

The response from the RPC call.

Return type:

RPCResponse

__init__(w3)

Initialize the DankMiddlewareController.

Parameters:

w3 (Web3) – The Web3 instance used to make RPC requests.

Return type:

None

async dispatch_pending_rpc_batch_and_wait(calls, timeout=120.0)

Append calls to the current pending JSON-RPC batch, dispatch it, and wait with a timeout.

Returns:

True if the dispatched batch completed before timeout and without errors, otherwise False.

Parameters:
Return type:

bool

early_start()

Initiate processing of queued calls once the capacity gate is hit.

This method combines pending eth_calls and other RPC calls into a single batch, clears the pending Ethereum calls queue, and starts processing the combined batch. This is the immediate-dispatch gate used when the queue is full and we should not wait for another event-loop tick.

Return type:

None

async execute_batch()

Execute a batch of pending calls.

This method collects all pending eth calls and RPC calls, clears the pending queues, and executes them as a single batch.

Return type:

None

async make_request(method, params, request_id=None)

Makes an RPC request to the Ethereum node.

This method creates an RPC request with the given method and parameters, sends it to the node, and returns the raw response.

Parameters:
  • method (str) – The RPC method to call.

  • params (Sequence[Any]) – The parameters for the RPC method.

  • request_id (int | str | None) – An optional request ID. If not provided, a new ID will be generated.

Returns:

The raw response from the Ethereum node.

Raises:

Exception – If DEBUG environment variable is set, any exception that occurs during the request is logged and re-raised.

Return type:

RawResponse

reduce_batch_size(num_calls)

Decrease the size of JSON-RPC batches in response to failures.

Similar to reduce_multicall_size(), this method is called when a JSON-RPC batch operation fails, allowing for dynamic adjustment of batch sizes.

Parameters:

num_calls (int) – The number of calls in the failed batch operation.

Return type:

None

reduce_multicall_size(num_calls)

Decrease the size of multicall batches in response to failures.

This method is called when a multicall operation fails, allowing the system to dynamically adjust the batch size to prevent future failures.

Parameters:

num_calls (int) – The number of calls in the failed multicall operation.

Return type:

None

set_batch_size_limit(new_limit)

Set a new limit for the JSON-RPC batch size.

Parameters:

new_limit (int) – The new maximum number of calls in a JSON-RPC batch.

Return type:

None

set_multicall_size_limit(new_limit)

Set a new limit for the multicall size.

Parameters:

new_limit (int) – The new maximum number of calls in a multicall.

Return type:

None

batcher: Final[NotSoBrightBatcher]

Batcher for RPC calls.

call_uid: Final[UIDGenerator]

Unique identifier generator for individual calls.

chain_id: Final

The chainid for the currently connected rpc.

client_version: Final[str]

The client version for the currently connected rpc.

endpoint: Final[str]

The uri for the connected rpc.

eth_call_semaphores: Final

Used for managing concurrency of eth_calls.

property has_pending_calls: bool
multicall_uid: Final

Unique identifier generator for multicall operations.

no_multicall: Final[set[ChecksumAddress]]

A set of addresses that have issues when called from the multicall contract. Calls to these contracts will not be batched in multicalls.

pending_eth_calls: Final[DefaultDict[BlockId, Multicall]]

A dictionary of pending Multicall objects by block. The Multicalls hold all pending eth_calls.

pending_rpc_calls: JSONRPCBatch

A JSONRPCBatch containing all pending rpc requests.

property queue_is_full: bool

Check if the queue of pending calls is full.

Returns:

True if the queue is full, False otherwise.

request_type

The Struct class the controller will use to encode requests.

request_uid: Final

Unique identifier generator for RPC requests.

state_override_not_supported: Final

A boolean that indicates whether the connected rpc supports state override functionality.

sync_w3: Final

A sync Web3 instance connected to the same rpc, used to make calls during init.

w3: Final[Web3]

The Web3 instance used to make rpc requests.

dank_mids.eth module

class dank_mids.eth.DankEth

Bases: AsyncEth

__init__(w3)
Return type:

None

attach_methods(methods)
Parameters:

methods (Dict[str, Method[Callable[[...], Any]]])

Return type:

None

block_id_munger(account, block_identifier=None)
Parameters:
  • account (Address | ChecksumAddress | ENS)

  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)

Return type:

Tuple[Address | ChecksumAddress | ENS, Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int]

async call(transaction, block_identifier=None, state_override=None, ccip_read_enabled=None)
Parameters:
  • transaction (TxParams)

  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)

  • state_override (Dict[str | Address | ChecksumAddress, StateOverrideParams] | None)

  • ccip_read_enabled (bool | None)

Return type:

HexBytes

call_munger(transaction, block_identifier=None, state_override=None)
Parameters:
  • transaction (TxParams)

  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)

  • state_override (Dict[str | Address | ChecksumAddress, StateOverrideParams] | None)

Return type:

Tuple[TxParams, Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int] | ~typing.Tuple[~web3.types.TxParams, ~typing.Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int, ~typing.Dict[str | ~eth_typing.evm.Address | ~eth_typing.evm.ChecksumAddress, ~web3.types.StateOverrideParams]]

contract(address=None, **kwargs)
Parameters:
  • address (Address | ChecksumAddress | ENS | None)

  • kwargs (Any)

Return type:

Type[AsyncContract] | AsyncContract

async create_access_list(transaction, block_identifier=None)
Parameters:
  • transaction (TxParams)

  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)

Return type:

CreateAccessListResponse

create_access_list_munger(transaction, block_identifier=None)
Parameters:
  • transaction (TxParams)

  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)

Return type:

Tuple[TxParams, Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int]

async estimate_gas(transaction, block_identifier=None, state_override=None)
Parameters:
  • transaction (TxParams)

  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)

  • state_override (Dict[str | Address | ChecksumAddress, StateOverrideParams] | None)

Return type:

int

estimate_gas_munger(transaction, block_identifier=None, state_override=None)
Parameters:
  • transaction (TxParams)

  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)

  • state_override (Dict[str | Address | ChecksumAddress, StateOverrideParams] | None)

Return type:

Tuple[TxParams, Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int] | ~typing.Tuple[~web3.types.TxParams, ~typing.Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int, ~typing.Dict[str | ~eth_typing.evm.Address | ~eth_typing.evm.ChecksumAddress, ~web3.types.StateOverrideParams]]

async fee_history(block_count, newest_block, reward_percentiles=None)
Parameters:
  • block_count (int)

  • newest_block (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber)

  • reward_percentiles (List[float] | None)

Return type:

FeeHistory

filter_munger(filter_params=None, filter_id=None)
Parameters:
  • filter_params (str | FilterParams | None)

  • filter_id (HexStr | None)

Return type:

List[FilterParams] | List[HexStr] | List[str]

generate_gas_price(transaction_params=None)
Parameters:

transaction_params (TxParams | None)

Return type:

Wei | None

async get_balance(account, block_identifier=None)
Parameters:
  • account (ChecksumAddress)

  • block_identifier (BlockNumber | None)

async get_block(block_identifier, full_transactions=False)
Parameters:
  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int)

  • full_transactions (bool)

Return type:

BlockData

get_block_munger(block_identifier, full_transactions=False)
Parameters:
  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int)

  • full_transactions (bool)

Return type:

Tuple[Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int, bool]

async get_block_number()

Method object for web3 module methods

Calls to the Method go through these steps:

1. input munging - includes normalization, parameter checking, early parameter formatting. Any processing on the input parameters that need to happen before json_rpc method string selection occurs.

A note about mungers: The first (root) munger should reflect the desired

api function arguments. In other words, if the api function wants to behave as: get_balance(account, block_identifier=None), the root munger should accept these same arguments, with the addition of the module as the first argument e.g.:

``` def get_balance_root_munger(module, account, block_identifier=None):

if block_identifier is None:

block_identifier = DEFAULT_BLOCK

return module, [account, block_identifier]

```

all mungers should return an argument list.

if no munger is provided, a default munger expecting no method arguments will be used.

2. method selection - The json_rpc_method argument can be method string or a function that returns a method string. If a callable is provided the processed method inputs are passed to the method selection function, and the returned method string is used.

3. request and response formatters are set - formatters are retrieved using the json rpc method string.

4. After the parameter processing from steps 1-3 the request is made using the calling function returned by the module attribute retrieve_caller_fn and the response formatters are applied to the output.

Return type:

BlockNumber

async get_block_receipts(block_identifier)
Parameters:

block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int)

Return type:

List[TxReceipt]

async get_block_timestamp(block_identifier)

Retrieves only the timestamp from a specific block.

This method skips decoding the rest of the Block response data.

Parameters:

block_identifier (int) – The block number from which to retrieve the timestamp.

Return type:

UnixTimestamp

Example

>>> print(await dank_mids.eth.get_block_timestamp(12345678))
async get_code(account, block_identifier=None)
Parameters:
  • account (ChecksumAddress)

  • block_identifier (BlockNumber | None)

async get_logs(*args, decode_to=tuple[evmspec.structs.log.Log, ...], decode_hook=<function _decode_hook>, **kwargs)

Fetches logs and decodes them into the specified format.

Parameters:
  • *args – Additional positional arguments.

  • decode_to (type[T]) – The class to which logs should be decoded.

  • decode_hook (Callable[[type[T], Any], T] | None) – Hook function to assist in decoding.

  • **kwargs – Additional keyword arguments.

Return type:

T

async get_raw_transaction(transaction_hash)
Parameters:

transaction_hash (Hash32 | HexBytes | HexStr)

Return type:

HexBytes

async get_raw_transaction_by_block(block_identifier, index)
Parameters:
  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int)

  • index (int)

Return type:

HexBytes

async get_storage_at(account, position, block_identifier=None)
Parameters:
  • account (Address | ChecksumAddress | ENS)

  • position (int)

  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)

Return type:

HexBytes

get_storage_at_munger(account, position, block_identifier=None)
Parameters:
  • account (Address | ChecksumAddress | ENS)

  • position (int)

  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)

Return type:

Tuple[Address | ChecksumAddress | ENS, int, Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int]

async get_transaction(transaction_hash)

Retrieves a transaction by its hash and attempts to decode it.

Parameters:

transaction_hash (HexStr) – The hash of the transaction to retrieve.

Raises:

ValidationError – If the transaction cannot be decoded into either Transaction or TransactionRLP format.

Return type:

TransactionLegacy | Transaction2930 | Transaction1559 | Transaction4844 | Transaction7702 | TransactionRLP

async get_transaction_by_block(block_identifier, index)
Parameters:
  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int)

  • index (int)

Return type:

TxData

async get_transaction_count(account, block_identifier=None)
Parameters:
  • account (ChecksumAddress)

  • block_identifier (BlockNumber | None)

async get_transaction_receipt(*args, decode_to=<class 'evmspec.structs.receipt.TransactionReceipt'>, decode_hook=<function _decode_hook>, **kwargs)

Fetches the transaction receipt and decodes it into the specified format.

Parameters:
  • *args – Additional positional arguments.

  • decode_to (type[T]) – The class to which the receipt should be decoded.

  • decode_hook (Callable[[type[T], Any], T] | None) – Hook function to assist in decoding.

  • **kwargs – Additional keyword arguments.

Return type:

T

async get_transaction_status(transaction_hash)

Retrieves the status of a transaction.

Parameters:

transaction_hash (str) – The hash of the transaction to query.

Returns:

The status of the transaction.

Return type:

Status

async get_transactions(block_identifier: int | HexStr) list[TransactionLegacy | Transaction2930 | Transaction1559 | Transaction4844 | Transaction7702]
async get_transactions(block_identifier: int | HexStr, hashes_only: Literal[True]) list[TransactionHash]
async get_transactions(block_identifier: int | HexStr, hashes_only: Literal[False]) list[TransactionLegacy | Transaction2930 | Transaction1559 | Transaction4844 | Transaction7702]

Retrieves only the transactions from a specific block.

This method skips decoding the rest of the Block response data.

Parameters:
  • block_identifier – The block number or hash from which to retrieve the transactions.

  • hashes_only – If True, only transaction hashes will be returned.

Example

>>> [print(tx.hash) for tx in await dank_mids.eth.get_transactions(12345678))
get_uncle_count(block_identifier)
Parameters:

block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int)

Return type:

int

icap_namereg()
Return type:

NoReturn

async modify_transaction(transaction_hash, **transaction_params)
Parameters:
  • transaction_hash (Hash32 | HexBytes | HexStr)

  • transaction_params (Unpack[TxParams])

Return type:

HexBytes

namereg()
Return type:

NoReturn

async replace_transaction(transaction_hash, new_transaction)
Parameters:
  • transaction_hash (Hash32 | HexBytes | HexStr)

  • new_transaction (TxParams)

Return type:

HexBytes

async send_raw_transaction(transaction)
Parameters:

transaction (HexStr | bytes)

Return type:

HexBytes

async send_transaction(transaction)
Parameters:

transaction (TxParams)

Return type:

HexBytes

send_transaction_munger(transaction)
Parameters:

transaction (TxParams)

Return type:

Tuple[TxParams]

set_contract_factory(contract_factory)
Parameters:

contract_factory (Type[AsyncContract | AsyncContractCaller])

Return type:

None

set_gas_price_strategy(gas_price_strategy)
Parameters:

gas_price_strategy (Callable[[Web3, TxParams], Wei] | Callable[[AsyncWeb3[Any], TxParams], Wei] | None)

Return type:

None

async sign(account, data=None, hexstr=None, text=None)
Parameters:
  • account (Address | ChecksumAddress | ENS)

  • data (int | bytes | None)

  • hexstr (HexStr | None)

  • text (str | None)

Return type:

HexStr

sign_munger(account, data=None, hexstr=None, text=None)
Parameters:
  • account (Address | ChecksumAddress | ENS)

  • data (int | bytes | None)

  • hexstr (HexStr | None)

  • text (str | None)

Return type:

Tuple[Address | ChecksumAddress | ENS, HexStr]

async sign_transaction(transaction)
Parameters:

transaction (TxParams)

Return type:

SignedTx

async sign_typed_data(account, data)
Parameters:
  • account (Address | ChecksumAddress | ENS)

  • data (Dict[str, Any])

Return type:

HexStr

async simulate_v1(payload, block_identifier)
Parameters:
  • payload (SimulateV1Payload)

  • block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int)

Return type:

Sequence[SimulateV1Result]

async subscribe(subscription_type, subscription_arg=None, handler=None, handler_context=None, label=None, parallelize=None)
Parameters:
  • subscription_type (Literal['newHeads', 'logs', 'newPendingTransactions', 'syncing'])

  • subscription_arg (LogsSubscriptionArg | bool | None)

  • handler (Callable[[EthSubscriptionContext[Any, Any]], Coroutine[Any, Any, None]] | None)

  • handler_context (Dict[str, Any] | None)

  • label (str | None)

  • parallelize (bool | None)

Return type:

HexStr

async trace_filter(filter_params, decode_to=list[typing.Union[evmspec.structs.trace.call.Trace, evmspec.structs.trace.create.Trace, evmspec.structs.trace.reward.Trace, evmspec.structs.trace.suicide.Trace]], decode_hook=<function _decode_hook>)

Returns all traces matching a filter. If the decoding to the specified type fails, the method logs problematic traces and re-raises the exception as a diagnostic aid.

Parameters:
  • filter_params (TraceFilterParams) – The parameters defining the traces to filter.

  • decode_to (type[T]) – The class to which traces should be decoded.

  • decode_hook (Callable[[type[T], Any], T] | None) – Hook function to assist in decoding.

Raises:

ValidationError – If a trace cannot be decoded.

Return type:

T

async trace_transaction(transaction_hash)

Returns all traces produced by a transaction.

Parameters:

transaction_hash (str) – The hash of the transaction to trace.

Return type:

list[Trace | Trace | Trace | Trace]

Example

>>> traces = await dank_mids.eth.trace_transaction('0x...')
async uninstall_filter(filter_id)
Parameters:

filter_id (HexStr)

Return type:

bool

async unsubscribe(subscription_id)
Parameters:

subscription_id (HexStr)

Return type:

bool

async wait_for_transaction_receipt(transaction_hash, timeout=120, poll_latency=0.1)
Parameters:
  • transaction_hash (Hash32 | HexBytes | HexStr)

  • timeout (float | None)

  • poll_latency (float)

Return type:

TxReceipt

account = <eth_account.account.Account object>
property accounts: Tuple[ChecksumAddress]
property blob_base_fee: Wei
property block_number: BlockNumber
chain_id
property codec: ABICodec
property default_account: ChecksumAddress | Empty
property default_block: Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | BlockNumber | Hash32 | HexStr | HexBytes | int
filter: Method[Callable[[str | FilterParams | HexStr | None], Awaitable[AsyncFilter]]]
property gas_price: Wei
get_block_transaction_count: Method[Callable[[BlockIdentifier], Awaitable[int]]]
get_filter_changes

Custom method class to bypass web3py’s default result formatters.

This class processes parameters, makes conditional adjustments to the parameters, and selects specific methods to call based on those parameters. It bypasses the standard result formatters by applying a no-operation formatter, effectively returning responses as-is or using predefined formatters based on the RPC call.

get_filter_logs

Custom method class to bypass web3py’s default result formatters.

This class processes parameters, makes conditional adjustments to the parameters, and selects specific methods to call based on those parameters. It bypasses the standard result formatters by applying a no-operation formatter, effectively returning responses as-is or using predefined formatters based on the RPC call.

is_async = True
property max_priority_fee: Wei

Try to use eth_maxPriorityFeePerGas but, since this is not part of the spec and is only supported by some clients, fall back to an eth_feeHistory calculation with min and max caps.

meth

Custom method class to bypass web3py’s default result formatters.

This class processes parameters, makes conditional adjustments to the parameters, and selects specific methods to call based on those parameters. It bypasses the standard result formatters by applying a no-operation formatter, effectively returning responses as-is or using predefined formatters based on the RPC call.

property syncing: SyncStatus | bool
w3: AsyncWeb3[Any]
class dank_mids.eth.TraceFilterParams

Bases: TypedDict

__getitem__()

x.__getitem__(y) <==> x[y]

__init__(*args, **kwargs)
__iter__()

Implement iter(self).

clear() None.  Remove all items from D.
copy() a shallow copy of D
fromkeys(value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items() a set-like object providing a view on D's items
keys() a set-like object providing a view on D's keys
pop(k[, d]) v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) None.  Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values() an object providing a view on D's values
after: int
count: int
fromAddress: Sequence[Address | ChecksumAddress | ENS]
fromBlock: Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | BlockNumber | Hash32 | HexStr | HexBytes | int
toAddress: Sequence[Address | ChecksumAddress | ENS]
toBlock: Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | BlockNumber | Hash32 | HexStr | HexBytes | int
dank_mids.eth.decode_timestamped(self, buf)

Deserialize an object from JSON.

Parameters:

buf (bytes-like or str) – The message to decode.

Returns:

obj – The deserialized object.

Return type:

Any

dank_mids.eth.decode_tiny_block(self, buf)

Deserialize an object from JSON.

Parameters:

buf (bytes-like or str) – The message to decode.

Returns:

obj – The deserialized object.

Return type:

Any

dank_mids.eth.decode_transaction(self, buf)

Deserialize an object from JSON.

Parameters:

buf (bytes-like or str) – The message to decode.

Returns:

obj – The deserialized object.

Return type:

Any

dank_mids.eth.decode_transaction_rlp(self, buf)

Deserialize an object from JSON.

Parameters:

buf (bytes-like or str) – The message to decode.

Returns:

obj – The deserialized object.

Return type:

Any

dank_mids.exceptions module

exception dank_mids.exceptions.BrownieNotConnectedError

Bases: RuntimeError

RuntimeError raised when brownie is not connected but needs to be in order to access functionality within dank_mids.brownie_patch.

__init__(obj_name)

Initializes a new BrownieNotConnectedError.

Params:

obj_name: The name of the object the user attempted to access, which requires brownie to be connected.

Parameters:

obj_name (str)

exception dank_mids.exceptions.BrowniePatchImportError

Bases: ImportError

ImportError raised when brownie integration cannot be imported or initialized.

__init__(obj_name, exc)
Parameters:
Return type:

None

msg

exception message

name

module name

path

module path

exception dank_mids.exceptions.BrowniePatchNotInitializedError

Bases: RuntimeError

RuntimeError raised when brownie is connected but brownie patch was not initialized.

__init__(obj_name)
Parameters:

obj_name (str)

Return type:

None

exception dank_mids.exceptions.GarbageCollectionError

Bases: RuntimeError

Exception raised when an object is garbage collected prematurely.

exception dank_mids.exceptions.Revert

Bases: ValueError

dank_mids.lock module

final class dank_mids.lock.AlertingRLock

Bases: object

__init__(name)

Initializes the reentrant lock with a given name.

The name can be used for debugging and logging purposes to track lock usage.

Examples

>>> lock = AlertingRLock(name="example")
>>> lock._name
'example'

See also

UIDGenerator

Parameters:

name (str)

Return type:

None

acquire(blocking=True, timeout=-1.0)

Acquire a lock, blocking or non-blocking, with alerting if not acquired within 5 seconds.

When invoked, this method first attempts to acquire the lock with a 5-second timeout. If the lock is not acquired within 5 seconds, a warning is logged, and the method proceeds to acquire the lock using the provided blocking and timeout arguments, matching the semantics of the builtin RLock.

Parameters:
  • blocking (bool) – Whether to block during the acquisition attempt.

  • timeout (float) – The maximum time to wait during the acquisition attempt.

Returns:

True if the lock is acquired, False otherwise.

Return type:

bool

See also

_RLock.acquire()

locked()

Return True if the lock is held by any thread, False otherwise.

Return type:

bool

release()

Release a lock, decrementing the recursion level.

If after the decrement it is zero, reset the lock to unlocked (not owned by any thread), and if any other threads are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed. If after the decrement the recursion level is still nonzero, the lock remains locked and owned by the calling thread.

Only call this method when the calling thread owns the lock. A RuntimeError is raised if this method is called when the lock is unlocked.

There is no return value.

Return type:

None

final class dank_mids.lock.Lock

Bases: object

Primitive lock objects.

A primitive lock is a synchronization primitive that is not owned by a particular coroutine when locked. A primitive lock is in one of two states, ‘locked’ or ‘unlocked’.

It is created in the unlocked state. It has two basic methods, acquire() and release(). When the state is unlocked, acquire() changes the state to locked and returns immediately. When the state is locked, acquire() blocks until a call to release() in another coroutine changes it to unlocked, then the acquire() call resets it to locked and returns. The release() method should only be called in the locked state; it changes the state to unlocked and returns immediately. If an attempt is made to release an unlocked lock, a RuntimeError will be raised.

When more than one coroutine is blocked in acquire() waiting for the state to turn to unlocked, only one coroutine proceeds when a release() call resets the state to unlocked; first coroutine which is blocked in acquire() is being processed.

acquire() is a coroutine and should be called with ‘await’.

Locks also support the asynchronous context management protocol. ‘async with lock’ statement should be used.

Usage:

lock = Lock() … await lock.acquire() try:

finally:

lock.release()

Context manager usage:

lock = Lock() … async with lock:

Lock objects can be tested for locking state:

if not lock.locked():

await lock.acquire()

else:

# lock is acquired …

__init__()
Return type:

None

async acquire()

Acquire a lock.

This method blocks until the lock is unlocked, then sets it to locked and returns True.

Return type:

Literal[True]

locked()

Return True if lock is acquired.

Return type:

bool

release()

Release a lock.

When the lock is locked, reset it to unlocked, and return. If any other coroutines are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed.

When invoked on an unlocked lock, a RuntimeError is raised.

There is no return value.

Return type:

None

dank_mids.logging module

class dank_mids.logging.CLogger

Bases: Logger

__init__(name, level=0)

Initialize the logger with a name and an optional level.

Parameters:
Return type:

None

addFilter(filter)

Add the specified filter to this handler.

addHandler(hdlr)

Add the specified handler to this logger.

callHandlers(record)

Pass a record to all relevant handlers.

Loop through all handlers for this logger and its parents in the logger hierarchy. If no handler was found, output a one-off error message to sys.stderr. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger whose handlers are called.

critical(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘CRITICAL’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.critical(“Houston, we have a %s”, “major disaster”, exc_info=1)

Parameters:
Return type:

None

debug(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘DEBUG’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.debug(“Houston, we have a %s”, “thorny problem”, exc_info=1)

Parameters:
Return type:

None

error(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘ERROR’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.error(“Houston, we have a %s”, “major problem”, exc_info=1)

Parameters:
Return type:

None

exception(msg, *args, exc_info=True, **kwargs)

Convenience method for logging an ERROR with exception information.

Parameters:
  • msg (object)

  • args (Any)

  • exc_info (logging._ExcInfoType)

  • kwargs (Any)

Return type:

None

fatal(msg, *args, **kwargs)

Don’t use this method, use critical() instead.

Parameters:
Return type:

None

filter(record)

Determine if a record is loggable by consulting all the filters.

The default is to allow the record to be logged; any filter can veto this and the record is then dropped. Returns a zero value if a record is to be dropped, else non-zero.

Changed in version 3.2: Allow filters to be just callables.

findCaller(stack_info=False, stacklevel=1)

Find the stack frame of the caller so that we can note the source file name, line number and function name.

Parameters:
  • stack_info (bool)

  • stacklevel (int)

Return type:

tuple[str, int, str, str | None]

getChild(suffix)

Get a logger which is a descendant to this one.

This is a convenience method, such that

logging.getLogger(‘abc’).getChild(‘def.ghi’)

is the same as

logging.getLogger(‘abc.def.ghi’)

It’s useful, for example, when the parent logger is named using __name__ rather than a literal string.

getEffectiveLevel()

Get the effective level for this logger.

Loop through this logger and its parents in the logger hierarchy, looking for a non-zero logging level. Return the first one found.

Return type:

int

handle(record)

Call the handlers for the specified record.

This method is used for unpickled records received from a socket, as well as those created locally. Logger-level filtering is applied.

hasHandlers()

See if this logger has any handlers configured.

Loop through all handlers for this logger and its parents in the logger hierarchy. Return True if a handler was found, else False. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger which is checked for the existence of handlers.

info(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘INFO’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.info(“Houston, we have a %s”, “interesting problem”, exc_info=1)

Parameters:
Return type:

None

isEnabledFor(level)

Is this logger enabled for level ‘level’?

Parameters:

level (int)

Return type:

bool

log(level, msg, *args, **kwargs)

Log ‘msg % args’ with the integer severity ‘level’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.log(level, “We have a %s”, “mysterious problem”, exc_info=1)

Parameters:
Return type:

None

makeRecord(name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None)

A factory method which can be overridden in subclasses to create specialized LogRecords.

Parameters:
  • name (str)

  • level (Level)

  • fn (str)

  • lno (int)

  • msg (object)

  • args (logging._ArgsType)

  • exc_info (logging._SysExcInfoType | None)

  • func (str | None)

  • extra (Mapping[str, object] | None)

  • sinfo (str | None)

Return type:

logging.LogRecord

removeFilter(filter)

Remove the specified filter from this handler.

removeHandler(hdlr)

Remove the specified handler from this logger.

setLevel(level)

Set the logging level of this logger. level must be an int or a str.

warn(msg, *args, **kwargs)
warning(msg, *args, **kwargs)

Log ‘msg % args’ with severity ‘WARNING’.

To pass exception information, use the keyword argument exc_info with a true value, e.g.

logger.warning(“Houston, we have a %s”, “bit of a problem”, exc_info=1)

Parameters:
Return type:

None

manager = <logging.Manager object>
root = <RootLogger root (WARNING)>
dank_mids.logging.get_c_logger(name)
Parameters:

name (str)

Return type:

CLogger

dank_mids.logging.use_c_logger_class()

Any logger created inside the with block will use our faster CLogger implementation.

Return type:

Iterator[None]

dank_mids.middleware module

class dank_mids.middleware.DankMiddleware

Bases: Web3Middleware

Web3 v7 middleware that routes async requests through a cached Dank controller.

__init__(w3)
Parameters:

w3 (AsyncWeb3 | None)

Return type:

None

async async_request_processor(method, params)
Parameters:
  • method (RPCEndpoint)

  • params (Any)

Return type:

Any

async async_response_processor(method, response)
Parameters:
  • method (RPCEndpoint)

  • response (RPCResponse)

Return type:

RPCResponse

async async_wrap_make_batch_request(make_batch_request)
Parameters:

make_batch_request (AsyncMakeBatchRequestFn)

Return type:

AsyncMakeBatchRequestFn

async async_wrap_make_request(make_request)
Parameters:

make_request (Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]])

Return type:

Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]]

request_processor(method, params)
Parameters:
  • method (RPCEndpoint)

  • params (Any)

Return type:

Any

response_processor(method, response)
Parameters:
  • method (RPCEndpoint)

  • response (RPCResponse)

Return type:

RPCResponse

wrap_make_batch_request(make_batch_request)
Parameters:

make_batch_request (MakeBatchRequestFn)

Return type:

MakeBatchRequestFn

wrap_make_request(make_request)
Parameters:

make_request (MakeRequestFn)

Return type:

MakeRequestFn

dank_mids.retry_observer module

class dank_mids.retry_observer.RetryEvent

Bases: object

Represents a single retry decision emitted by Dank Mids retry logic.

operation

Human-readable identifier for the operation being retried.

Type:

str

attempt

The 1-based retry attempt number (the first retry attempt is 1).

Type:

int

error

The exception that triggered the retry decision.

Type:

BaseException

max_attempts

Optional maximum attempts configured by the caller.

Type:

int | None

delay

Optional delay (seconds) before the next attempt.

Type:

float | None

component

Optional subsystem name (for example: “jsonrpc” or “multicall”).

Type:

str | None

will_retry

Whether the caller intends to retry after this event.

Type:

bool

metadata

Optional key/value tags for downstream logging or metrics.

Type:

dict[str, str] | None

timestamp

Unix timestamp when the event was created.

Type:

float

__init__(operation, attempt, error, max_attempts=None, delay=None, component=None, will_retry=True, metadata=None, timestamp=<factory>)
Parameters:
Return type:

None

attempt: int
component: str | None
delay: float | None
error: BaseException
max_attempts: int | None
metadata: dict[str, str] | None
operation: str
timestamp: float
will_retry: bool
class dank_mids.retry_observer.RetryObserver

Bases: Protocol

Callable observer interface for retry events.

Implementations should be fast and side-effect safe. If you need to do IO, buffer the event and ship it elsewhere.

__call__(event)

Handle a retry event.

Parameters:

event (RetryEvent)

Return type:

None

__init__(*args, **kwargs)
dank_mids.retry_observer.emit_retry_event(event)

Emit a retry event to all currently registered observers.

Observers are called in the order they were registered. A snapshot is taken to avoid iteration issues if observers register/unregister themselves.

Parameters:

event (RetryEvent)

Return type:

None

dank_mids.retry_observer.get_retry_observers()

Return a snapshot of currently registered retry observers.

Return type:

tuple[RetryObserver, …]

dank_mids.retry_observer.register_retry_observer(observer)

Register a retry observer.

Observers are stored as weak references. Keep a strong reference in your application if you do not want it to be garbage collected.

Parameters:

observer (RetryObserver)

Return type:

None

dank_mids.retry_observer.unregister_retry_observer(observer)

Unregister a previously registered retry observer.

Raises:

ValueError – If the observer is not currently registered.

Parameters:

observer (RetryObserver)

Return type:

None

dank_mids.semaphores module

class dank_mids.semaphores.BlockSemaphore

Bases: _AbstractPrioritySemaphore

A semaphore for managing concurrency based on block numbers.

This class extends _AbstractPrioritySemaphore to provide block-specific concurrency control.

Parameters:
  • value – The initial value of the semaphore.

  • name – An optional name for the semaphore.

See also

_BlockSemaphoreContextManager: The context manager used by this semaphore.

__call__(self, fn: CoroFn[P, T]) CoroFn[P, T]

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore async def limited():

return 1

__getitem__(self, priority: PT | None) '_AbstractPrioritySemaphoreContextManager[PT]'

Gets the context manager for a given priority.

Parameters:
  • priority – The priority for which to get the context manager. If None, uses the top priority.

  • block (int | HexStr | Literal['latest', None])

Returns:

The context manager associated with the given priority.

Return type:

_BlockSemaphoreContextManager

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> context_manager = semaphore[priority]
__init__(value=1, *, name=None)

Initializes the priority semaphore.

Parameters:
  • value – The initial capacity of the semaphore.

  • name – An optional name for the semaphore, used for debugging.

Return type:

None

Examples

>>> semaphore = _AbstractPrioritySemaphore(5, name="test_semaphore")
acquire(self)

Acquires the semaphore with the top priority.

This method overrides Semaphore.acquire() to handle priority-based logic.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> await semaphore.acquire()
decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore async def limited():

return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Checks if the semaphore is locked.

Returns:

True if the semaphore cannot be acquired immediately, False otherwise.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore.locked()
release(self) void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

dank_mids.types module

class dank_mids.types.BlockId

A type representing the identifier for a specific block in the blockchain.

alias of str

class dank_mids.types.ChainstackRateLimitContext

Bases: DictStruct

Chainstack doesn’t use status code 429 for rate limiting, they attach one of these objects to a 200 response.

__getitem__(attr)

Lookup an attribute value via dictionary-style access.

Parameters:

attr (str) – The name of the attribute to access.

Raises:

KeyError – If the provided key is not a member of the struct.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s['field1']
'value'
>>> s['field2']
Traceback (most recent call last):
    ...
KeyError: ('field2', MyStruct(field1='value'))
__iter__()

Iterate through the keys of the Struct.

Yields:

Struct key.

Return type:

Iterator[str]

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(iter(s))
['field1', 'field2']
get(key, default=None)

Get the value associated with a key, or a default value if the key is not present.

Parameters:
  • key (str) – The key to look up.

  • default (optional) – The value to return if the key is not present.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s.get('field1')
'value'
>>> s.get('field2', 'default')
'default'
items()

Returns an iterator over the struct’s field name and value pairs.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.items())
[('field1', 'value'), ('field2', 42)]
Return type:

Iterator[tuple[str, Any]]

keys()

Returns an iterator over the field names of the struct.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.keys())
['field1', 'field2']
Return type:

Iterator[str]

values()

Returns an iterator over the struct’s field values.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.values())
['value', 42]
Return type:

Iterator[Any]

logger: ClassVar = <CLogger dank_mids.chainstack (WARNING)>
property try_again_in: float

Calculates the time to wait before retrying the request.

Returns:

The number of seconds to wait before retrying.

Raises:

NotImplementedError – If the time string is in an unrecognized format.

class dank_mids.types.Error

Bases: DictStruct

Represents an error in a JSON-RPC response.

__getitem__(attr)

Lookup an attribute value via dictionary-style access.

Parameters:

attr (str) – The name of the attribute to access.

Raises:

KeyError – If the provided key is not a member of the struct.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s['field1']
'value'
>>> s['field2']
Traceback (most recent call last):
    ...
KeyError: ('field2', MyStruct(field1='value'))
__iter__()

Iterate through the keys of the Struct.

Yields:

Struct key.

Return type:

Iterator[str]

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(iter(s))
['field1', 'field2']
get(key, default=None)

Get the value associated with a key, or a default value if the key is not present.

Parameters:
  • key (str) – The key to look up.

  • default (optional) – The value to return if the key is not present.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s.get('field1')
'value'
>>> s.get('field2', 'default')
'default'
items()

Returns an iterator over the struct’s field name and value pairs.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.items())
[('field1', 'value'), ('field2', 42)]
Return type:

Iterator[tuple[str, Any]]

keys()

Returns an iterator over the field names of the struct.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.keys())
['field1', 'field2']
Return type:

Iterator[str]

to_dict(*, context: None = None) RPCError
to_dict(*, context: PartialRequest) RPCErrorWithContext
values()

Returns an iterator over the struct’s field values.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.values())
['value', 42]
Return type:

Iterator[Any]

code: int

The error code.

data: str | ChainstackRateLimitContext | None

Additional error data, if any.

EVM specs say it should be a string, but some providers will return a dictionary here with even more context.

message: str

The error message.

dank_mids.types.JsonrpcParams

A list of parameters for JSON-RPC calls, which should be eth_callParams, BlockId, and OverrideParams, in that order.

dank_mids.types.Multicalls

A dictionary mapping BlockId to Multicall objects.

See also

dank_mids._requests.Multicall: The Multicall class used in this mapping.

class dank_mids.types.OverrideParams

Bases: TypedDict

__getitem__()

x.__getitem__(y) <==> x[y]

__init__(*args, **kwargs)
__iter__()

Implement iter(self).

clear() None.  Remove all items from D.
copy() a shallow copy of D
fromkeys(value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items() a set-like object providing a view on D's items
keys() a set-like object providing a view on D's keys
pop(k[, d]) v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) None.  Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values() an object providing a view on D's values
code: str
class dank_mids.types.PartialRequest

Bases: DictStruct

Represents a partial JSON-RPC request.

While technially part of a request, we can successfully make requests to many nodes without including the jsonrpc field.

This class leaves off the jsonrpc field reduce encoding burden and web traffic.

This works with many but not all nodes.

__getitem__(attr)

Lookup an attribute value via dictionary-style access.

Parameters:

attr (str) – The name of the attribute to access.

Raises:

KeyError – If the provided key is not a member of the struct.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s['field1']
'value'
>>> s['field2']
Traceback (most recent call last):
    ...
KeyError: ('field2', MyStruct(field1='value'))
__iter__()

Iterate through the keys of the Struct.

Yields:

Struct key.

Return type:

Iterator[str]

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(iter(s))
['field1', 'field2']
get(key, default=None)

Get the value associated with a key, or a default value if the key is not present.

Parameters:
  • key (str) – The key to look up.

  • default (optional) – The value to return if the key is not present.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s.get('field1')
'value'
>>> s.get('field2', 'default')
'default'
items()

Returns an iterator over the struct’s field name and value pairs.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.items())
[('field1', 'value'), ('field2', 42)]
Return type:

Iterator[tuple[str, Any]]

keys()

Returns an iterator over the field names of the struct.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.keys())
['field1', 'field2']
Return type:

Iterator[str]

values()

Returns an iterator over the struct’s field values.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.values())
['value', 42]
Return type:

Iterator[Any]

property data: bytes
id: str | int

The request identifier.

method: str

The RPC method to be called.

params: list | None

The parameters for the RPC method call.

class dank_mids.types.PartialResponse

Bases: DictStruct

Represents a partial JSON-RPC response.

We use these to more efficiently decode responses from the node.

__getitem__(attr)

Lookup an attribute value via dictionary-style access.

Parameters:

attr (str) – The name of the attribute to access.

Raises:

KeyError – If the provided key is not a member of the struct.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s['field1']
'value'
>>> s['field2']
Traceback (most recent call last):
    ...
KeyError: ('field2', MyStruct(field1='value'))
__iter__()

Iterate through the keys of the Struct.

Yields:

Struct key.

Return type:

Iterator[str]

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(iter(s))
['field1', 'field2']
decode_result(method=None, *, caller=None)
Parameters:

method (RPCEndpoint | None)

Return type:

HexBytes | Wei | uint | ChainId | BlockNumber | AttributeDict | str

get(key, default=None)

Get the value associated with a key, or a default value if the key is not present.

Parameters:
  • key (str) – The key to look up.

  • default (optional) – The value to return if the key is not present.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s.get('field1')
'value'
>>> s.get('field2', 'default')
'default'
items()

Returns an iterator over the struct’s field name and value pairs.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.items())
[('field1', 'value'), ('field2', 42)]
Return type:

Iterator[tuple[str, Any]]

keys()

Returns an iterator over the field names of the struct.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.keys())
['field1', 'field2']
Return type:

Iterator[str]

to_dict(method=None)

Returns a complete dictionary representation of this response Struct.

Parameters:

method (RPCEndpoint | None)

Return type:

RPCResponse

values()

Returns an iterator over the struct’s field values.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.values())
['value', 42]
Return type:

Iterator[Any]

error: Error | None

The error object, if the call failed.

property exception: Exception

If the rpc response contains an ‘error’ field, returns a specialized exception for the specified rpc error.

property payload_too_large: bool
result: Raw

The result of the RPC call, if successful.

class dank_mids.types.RPCErrorWithContext

Bases: dict

__getitem__()

x.__getitem__(y) <==> x[y]

__init__(*args, **kwargs)
__iter__()

Implement iter(self).

clear() None.  Remove all items from D.
copy() a shallow copy of D
fromkeys(value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items() a set-like object providing a view on D's items
keys() a set-like object providing a view on D's keys
pop(k[, d]) v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) None.  Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values() an object providing a view on D's values
code: int
dank_mids_added_context: PartialRequest
data: NotRequired[str]
message: str
class dank_mids.types.Request

Bases: PartialRequest

Represents a complete JSON-RPC request.

Inherits from PartialRequest and adds the JSON-RPC version.

__getitem__(attr)

Lookup an attribute value via dictionary-style access.

Parameters:

attr (str) – The name of the attribute to access.

Raises:

KeyError – If the provided key is not a member of the struct.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s['field1']
'value'
>>> s['field2']
Traceback (most recent call last):
    ...
KeyError: ('field2', MyStruct(field1='value'))
__iter__()

Iterate through the keys of the Struct.

Yields:

Struct key.

Return type:

Iterator[str]

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(iter(s))
['field1', 'field2']
get(key, default=None)

Get the value associated with a key, or a default value if the key is not present.

Parameters:
  • key (str) – The key to look up.

  • default (optional) – The value to return if the key is not present.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s.get('field1')
'value'
>>> s.get('field2', 'default')
'default'
items()

Returns an iterator over the struct’s field name and value pairs.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.items())
[('field1', 'value'), ('field2', 42)]
Return type:

Iterator[tuple[str, Any]]

keys()

Returns an iterator over the field names of the struct.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.keys())
['field1', 'field2']
Return type:

Iterator[str]

values()

Returns an iterator over the struct’s field values.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.values())
['value', 42]
Return type:

Iterator[Any]

property data: bytes
id: str | int

The request identifier.

jsonrpc: Literal['2.0']

The JSON-RPC version, always set to “2.0”.

method: str

The RPC method to be called.

params: list | None

The parameters for the RPC method call.

class dank_mids.types.Response

Bases: PartialResponse

Represents a complete JSON-RPC response.

Inherits from PartialResponse and adds the response identifier and JSON-RPC version.

__getitem__(attr)

Lookup an attribute value via dictionary-style access.

Parameters:

attr (str) – The name of the attribute to access.

Raises:

KeyError – If the provided key is not a member of the struct.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s['field1']
'value'
>>> s['field2']
Traceback (most recent call last):
    ...
KeyError: ('field2', MyStruct(field1='value'))
__iter__()

Iterate through the keys of the Struct.

Yields:

Struct key.

Return type:

Iterator[str]

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(iter(s))
['field1', 'field2']
decode_result(method=None, *, caller=None)
Parameters:

method (RPCEndpoint | None)

Return type:

HexBytes | Wei | uint | ChainId | BlockNumber | AttributeDict | str

get(key, default=None)

Get the value associated with a key, or a default value if the key is not present.

Parameters:
  • key (str) – The key to look up.

  • default (optional) – The value to return if the key is not present.

Return type:

Any

Example

>>> class MyStruct(DictStruct):
...     field1: str
>>> s = MyStruct(field1="value")
>>> s.get('field1')
'value'
>>> s.get('field2', 'default')
'default'
items()

Returns an iterator over the struct’s field name and value pairs.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.items())
[('field1', 'value'), ('field2', 42)]
Return type:

Iterator[tuple[str, Any]]

keys()

Returns an iterator over the field names of the struct.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.keys())
['field1', 'field2']
Return type:

Iterator[str]

to_dict(method=None)

Returns a complete dictionary representation of this response Struct.

Parameters:

method (RPCEndpoint | None)

Return type:

RPCResponse

values()

Returns an iterator over the struct’s field values.

Example

>>> class MyStruct(DictStruct):
...     field1: str
...     field2: int
>>> s = MyStruct(field1="value", field2=42)
>>> list(s.values())
['value', 42]
Return type:

Iterator[Any]

error: Error | None

The error object, if the call failed.

property exception: Exception

If the rpc response contains an ‘error’ field, returns a specialized exception for the specified rpc error.

id: str | int | None

The response identifier, matching the request.

jsonrpc: Literal['2.0']

The JSON-RPC version, always set to “2.0”.

property payload_too_large: bool
result: Raw

The result of the RPC call, if successful.

class dank_mids.types.eth_callParams

Bases: TypedDict

__getitem__()

x.__getitem__(y) <==> x[y]

__init__(*args, **kwargs)
__iter__()

Implement iter(self).

clear() None.  Remove all items from D.
copy() a shallow copy of D
fromkeys(value=None, /)

Create a new dictionary with keys from iterable and values set to value.

get(key, default=None, /)

Return the value for key if key is in the dictionary, else default.

items() a set-like object providing a view on D's items
keys() a set-like object providing a view on D's keys
pop(k[, d]) v, remove specified key and return the corresponding value.

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem()

Remove and return a (key, value) pair as a 2-tuple.

Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.

setdefault(key, default=None, /)

Insert key with a value of default if key is not in the dictionary.

Return the value for key if key is in the dictionary, else default.

update([E, ]**F) None.  Update D from dict/iterable E and F.

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values() an object providing a view on D's values
data: str
to: ChecksumAddress
dank_mids.types.better_decode(data, *, type=None, dec_hook=None, method=None)
Parameters:
Return type:

T

dank_mids.types.decode_nested_dict(self, buf)

Deserialize an object from JSON.

Parameters:

buf (bytes-like or str) – The message to decode.

Returns:

obj – The deserialized object.

Return type:

Any

dank_mids.types.decode_string(self, buf)

Deserialize an object from JSON.

Parameters:

buf (bytes-like or str) – The message to decode.

Returns:

obj – The deserialized object.

Return type:

Any

dank_mids.types.encode(self, obj)

Serialize an object to bytes.

Parameters:

obj (Any) – The object to serialize.

Returns:

data – The serialized object.

Return type:

bytes

dank_mids.types.BatchId

A type representing the identifier for a batch of operations, which can be either an integer or a string.

alias of int | str

Module contents

class dank_mids.BlockSemaphore

Bases: _AbstractPrioritySemaphore

A semaphore for managing concurrency based on block numbers.

This class extends _AbstractPrioritySemaphore to provide block-specific concurrency control.

Parameters:
  • value – The initial value of the semaphore.

  • name – An optional name for the semaphore.

See also

_BlockSemaphoreContextManager: The context manager used by this semaphore.

__call__(self, fn: CoroFn[P, T]) CoroFn[P, T]

Decorator method to wrap coroutine functions with the semaphore.

This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.

Example

semaphore = Semaphore(5)

@semaphore async def limited():

return 1

__getitem__(self, priority: PT | None) '_AbstractPrioritySemaphoreContextManager[PT]'

Gets the context manager for a given priority.

Parameters:
  • priority – The priority for which to get the context manager. If None, uses the top priority.

  • block (int | HexStr | Literal['latest', None])

Returns:

The context manager associated with the given priority.

Return type:

_BlockSemaphoreContextManager

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> context_manager = semaphore[priority]
__init__(value=1, *, name=None)

Initializes the priority semaphore.

Parameters:
  • value – The initial capacity of the semaphore.

  • name – An optional name for the semaphore, used for debugging.

Return type:

None

Examples

>>> semaphore = _AbstractPrioritySemaphore(5, name="test_semaphore")
acquire(self)

Acquires the semaphore with the top priority.

This method overrides Semaphore.acquire() to handle priority-based logic.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> await semaphore.acquire()
decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]

Wrap a coroutine function to ensure it runs with the semaphore.

Example

semaphore = Semaphore(5)

@semaphore async def limited():

return 1

Parameters:

fn (Callable[[~P], Awaitable[T]])

Return type:

Callable[[~P], Awaitable[T]]

locked(self) bool

Checks if the semaphore is locked.

Returns:

True if the semaphore cannot be acquired immediately, False otherwise.

Examples

>>> semaphore = _AbstractPrioritySemaphore(5)
>>> semaphore.locked()
release(self) void

Release a semaphore, incrementing the internal counter by one.

When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.

debug_logs_enabled

bool

Checks if debug logging is enabled for the logger.

Examples

>>> class MyClass(_LoggerMixin):
...     pass
...
>>> instance = MyClass()
>>> instance.debug_logs_enabled
False
Type:

_LoggerMixin.debug_logs_enabled

logger

Logger

Provides a logger instance specific to the class using this mixin.

The logger ID is constructed from the module and class name, and optionally includes an instance name if available.

Examples

>>> class MyClass(_LoggerMixin):
...     _name = "example"
...
>>> instance = MyClass()
>>> logger = instance.logger
>>> logger.name
'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin):
...     pass
...
>>> another_instance = AnotherClass()
>>> another_logger = another_instance.logger
>>> another_logger.name
'module_name.AnotherClass'

Note

Replace module_name with the actual module name where the class is defined.

Type:

_LoggerMixin.logger

name

str

Type:

Semaphore.name

class dank_mids.DankMiddleware

Bases: Web3Middleware

Web3 v7 middleware that routes async requests through a cached Dank controller.

__init__(w3)
Parameters:

w3 (AsyncWeb3 | None)

Return type:

None

async async_request_processor(method, params)
Parameters:
  • method (RPCEndpoint)

  • params (Any)

Return type:

Any

async async_response_processor(method, response)
Parameters:
  • method (RPCEndpoint)

  • response (RPCResponse)

Return type:

RPCResponse

async async_wrap_make_batch_request(make_batch_request)
Parameters:

make_batch_request (AsyncMakeBatchRequestFn)

Return type:

AsyncMakeBatchRequestFn

async async_wrap_make_request(make_request)
Parameters:

make_request (Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]])

Return type:

Callable[[RPCEndpoint, Any], Coroutine[Any, Any, RPCResponse]]

request_processor(method, params)
Parameters:
  • method (RPCEndpoint)

  • params (Any)

Return type:

Any

response_processor(method, response)
Parameters:
  • method (RPCEndpoint)

  • response (RPCResponse)

Return type:

RPCResponse

wrap_make_batch_request(make_batch_request)
Parameters:

make_batch_request (MakeBatchRequestFn)

Return type:

MakeBatchRequestFn

wrap_make_request(make_request)
Parameters:

make_request (MakeRequestFn)

Return type:

MakeRequestFn

class dank_mids.RetryEvent

Bases: object

Represents a single retry decision emitted by Dank Mids retry logic.

operation

Human-readable identifier for the operation being retried.

Type:

str

attempt

The 1-based retry attempt number (the first retry attempt is 1).

Type:

int

error

The exception that triggered the retry decision.

Type:

BaseException

max_attempts

Optional maximum attempts configured by the caller.

Type:

int | None

delay

Optional delay (seconds) before the next attempt.

Type:

float | None

component

Optional subsystem name (for example: “jsonrpc” or “multicall”).

Type:

str | None

will_retry

Whether the caller intends to retry after this event.

Type:

bool

metadata

Optional key/value tags for downstream logging or metrics.

Type:

dict[str, str] | None

timestamp

Unix timestamp when the event was created.

Type:

float

__init__(operation, attempt, error, max_attempts=None, delay=None, component=None, will_retry=True, metadata=None, timestamp=<factory>)
Parameters:
Return type:

None

attempt: int
component: str | None
delay: float | None
error: BaseException
max_attempts: int | None
metadata: dict[str, str] | None
operation: str
timestamp: float
will_retry: bool
class dank_mids.RetryObserver

Bases: Protocol

Callable observer interface for retry events.

Implementations should be fast and side-effect safe. If you need to do IO, buffer the event and ship it elsewhere.

__call__(event)

Handle a retry event.

Parameters:

event (RetryEvent)

Return type:

None

__init__(*args, **kwargs)
class dank_mids.StatsRetryObserver

Bases: object

Retry observer that records events into a stats collector.

When the provided collector lacks retry fields (for example, when the compiled stats module is imported), the observer falls back to an internal in-memory collector so retry handling does not raise AttributeError.

This observer intentionally does not auto-register itself. Wire it up explicitly via dank_mids.retry_observer.register_retry_observer().

__call__(event)

Call self as a function.

Parameters:

event (RetryEvent)

Return type:

None

__init__(collector=None)
Parameters:

collector (_RetryCollector | None)

Return type:

None

dank_mids.emit_retry_event(event)

Emit a retry event to all currently registered observers.

Observers are called in the order they were registered. A snapshot is taken to avoid iteration issues if observers register/unregister themselves.

Parameters:

event (RetryEvent)

Return type:

None

dank_mids.get_retry_observers()

Return a snapshot of currently registered retry observers.

Return type:

tuple[RetryObserver, …]

dank_mids.register_retry_observer(observer)

Register a retry observer.

Observers are stored as weak references. Keep a strong reference in your application if you do not want it to be garbage collected.

Parameters:

observer (RetryObserver)

Return type:

None

dank_mids.setup_dank_w3(async_w3)

Sets up a DankWeb3 instance from a given Web3 instance.

Parameters:

async_w3 (Web3) – The Web3 instance to be wrapped.

Returns:

A new DankWeb3 instance with Dank Middleware injected.

Return type:

DankWeb3

dank_mids.setup_dank_w3_from_sync(sync_w3)

Sets up a DankWeb3 instance from a given synchronous Web3 instance.

Parameters:

sync_w3 (Web3) – The synchronous Web3 instance to be wrapped.

Returns:

A new DankWeb3 instance with Dank Middleware injected, supporting batched asynchronous operations.

Return type:

DankWeb3

dank_mids.unregister_retry_observer(observer)

Unregister a previously registered retry observer.

Raises:

ValueError – If the observer is not currently registered.

Parameters:

observer (RetryObserver)

Return type:

None