dank_mids package
Subpackages
- dank_mids.brownie_patch package
- Submodules
- dank_mids.brownie_patch.call module
- dank_mids.brownie_patch.contract module
ContractContract.__init__()Contract.balance()Contract.decode_input()Contract.from_abi()Contract.from_ethpm()Contract.from_explorer()Contract.get_method()Contract.get_method_object()Contract.get_solc_version()Contract.info()Contract.remove_deployment()Contract.set_alias()Contract.abiContract.aliasContract.signaturesContract.topics
EventNameLogTopicMethodSignaturepatch_contract()retry_etherscan()
- dank_mids.brownie_patch.overloaded module
- dank_mids.brownie_patch.types module
DankContractCallDankContractCall.__call__()DankContractCall.__init__()DankContractCall.call()DankContractCall.coroutine()DankContractCall.decode_input()DankContractCall.decode_output()DankContractCall.encode_input()DankContractCall.estimate_gas()DankContractCall.info()DankContractCall.map()DankContractCall.transact()DankContractCall.abiDankContractCall.natspecDankContractCall.payableDankContractCall.signature
DankContractTxDankContractTx.__call__()DankContractTx.__init__()DankContractTx.call()DankContractTx.coroutine()DankContractTx.decode_input()DankContractTx.decode_output()DankContractTx.encode_input()DankContractTx.estimate_gas()DankContractTx.info()DankContractTx.map()DankContractTx.transact()DankContractTx.abiDankContractTx.natspecDankContractTx.payableDankContractTx.signature
DankOverloadedMethodDankOverloadedMethod.__call__()DankOverloadedMethod.__getitem__()DankOverloadedMethod.__init__()DankOverloadedMethod.call()DankOverloadedMethod.coroutine()DankOverloadedMethod.decode_input()DankOverloadedMethod.decode_output()DankOverloadedMethod.encode_input()DankOverloadedMethod.info()DankOverloadedMethod.map()DankOverloadedMethod.transact()DankOverloadedMethod.abiDankOverloadedMethod.methodsDankOverloadedMethod.signature
ContractMethodDankContractMethod
- Module contents
- dank_mids.helpers package
- Submodules
- dank_mids.helpers.batch_size module
- dank_mids.helpers.future module
DebuggableFutureDebuggableFuture.__init__()DebuggableFuture.__iter__()DebuggableFuture.add_done_callback()DebuggableFuture.cancel()DebuggableFuture.cancelled()DebuggableFuture.done()DebuggableFuture.exception()DebuggableFuture.get_loop()DebuggableFuture.remove_done_callback()DebuggableFuture.result()DebuggableFuture.set_exception()DebuggableFuture.set_result()DebuggableFuture.has_waitersDebuggableFuture.waiter_count
- dank_mids.helpers.hashing module
- dank_mids.helpers.lru_cache module
- dank_mids.helpers.method module
- Module contents
DebuggableFutureDebuggableFuture.__init__()DebuggableFuture.__iter__()DebuggableFuture.add_done_callback()DebuggableFuture.cancel()DebuggableFuture.cancelled()DebuggableFuture.done()DebuggableFuture.exception()DebuggableFuture.get_loop()DebuggableFuture.remove_done_callback()DebuggableFuture.result()DebuggableFuture.set_exception()DebuggableFuture.set_result()DebuggableFuture.has_waitersDebuggableFuture.waiter_count
gatherish()lru_cache_lite()lru_cache_lite_nonull()make_hashable()setup_dank_w3()setup_dank_w3_from_sync()
- dank_mids.stats package
Submodules
dank_mids.ENVIRONMENT_VARIABLES module
- dank_mids.ENVIRONMENT_VARIABLES.AIOHTTP_TIMEOUT: Final = EnvironmentVariable[int](name=`DANKMIDS_AIOHTTP_TIMEOUT`, default_value=1200, using_default=True)
Timeout value in seconds for aiohttp requests.
- dank_mids.ENVIRONMENT_VARIABLES.BROWNIE_CALL_SEMAPHORE: Final = EnvironmentVariable[BlockSemaphore](name=`DANKMIDS_BROWNIE_CALL_SEMAPHORE`, default_value=100000, using_default=True)
Semaphore for limiting concurrent Brownie calls.
See also
dank_mids.semaphores.BlockSemaphore: The semaphore class used for concurrency control.
- dank_mids.ENVIRONMENT_VARIABLES.BROWNIE_DECODER_PROCESSES = EnvironmentVariable[AsyncProcessPoolExecutor](name=`DANKMIDS_BROWNIE_DECODER_PROCESSES`, default_value=0, using_default=True)
Process pool for Brownie decoding operations.
See also
a_sync.AsyncProcessPoolExecutor: The executor class used for managing asynchronous processes.
- dank_mids.ENVIRONMENT_VARIABLES.BROWNIE_ENCODER_PROCESSES = EnvironmentVariable[AsyncProcessPoolExecutor](name=`DANKMIDS_BROWNIE_ENCODER_PROCESSES`, default_value=0, using_default=True)
Process pool for Brownie encoding operations.
See also
a_sync.AsyncProcessPoolExecutor: The executor class used for managing asynchronous processes.
- dank_mids.ENVIRONMENT_VARIABLES.BROWNIE_ENCODER_SEMAPHORE: Final = EnvironmentVariable[BlockSemaphore](name=`DANKMIDS_BROWNIE_ENCODER_SEMAPHORE`, default_value=200000, using_default=True)
Semaphore for limiting concurrent Brownie encoding operations. This limits memory consumption.
See also
dank_mids.semaphores.BlockSemaphore: The semaphore class used for concurrency control.
- dank_mids.ENVIRONMENT_VARIABLES.COLLECTION_FACTOR: Final = EnvironmentVariable[int](name=`DANKMIDS_COLLECTION_FACTOR`, default_value=10, using_default=True)
Factor determining the size of data collection operations.
dank_mids.constants module
- dank_mids.constants.GAS_LIMIT: Final = 50000000
The gas limit constant imported from the
multicalllibrary.This value is used as the default gas limit for multicall operations.
- dank_mids.constants.MULTICALL2_DEPLOY_BLOCKS: Final[dict[Network, BlockNumber]] = {Network.Mainnet: 12336033, Network.Optimism: 722566, Network.Fantom: 16572242, Network.Arbitrum: 821923}
A dictionary mapping networks to the block numbers where Multicall2 was deployed.
- dank_mids.constants.MULTICALL2_OVERRIDE_CODE: Final = '0x608060405234801561001057600080fd5b50600436106100b45760003560e01c806372425d9d1161007157806372425d9d1461013d57806386d516e814610145578063a8b0574e1461014d578063bce38bd714610162578063c3077fa914610182578063ee82ac5e14610195576100b4565b80630f28c97d146100b9578063252dba42146100d757806327e86d6e146100f8578063399542e91461010057806342cbb15c146101225780634d2301cc1461012a575b600080fd5b6100c16101a8565b6040516100ce919061083b565b60405180910390f35b6100ea6100e53660046106bb565b6101ac565b6040516100ce9291906108ba565b6100c1610340565b61011361010e3660046106f6565b610353565b6040516100ce93929190610922565b6100c161036b565b6100c161013836600461069a565b61036f565b6100c161037c565b6100c1610380565b610155610384565b6040516100ce9190610814565b6101756101703660046106f6565b610388565b6040516100ce9190610828565b6101136101903660046106bb565b610533565b6100c16101a3366004610748565b610550565b4290565b8051439060609067ffffffffffffffff8111156101d957634e487b7160e01b600052604160045260246000fd5b60405190808252806020026020018201604052801561020c57816020015b60608152602001906001900390816101f75790505b50905060005b835181101561033a5760008085838151811061023e57634e487b7160e01b600052603260045260246000fd5b6020026020010151600001516001600160a01b031686848151811061027357634e487b7160e01b600052603260045260246000fd5b60200260200101516020015160405161028c91906107f8565b6000604051808303816000865af19150503d80600081146102c9576040519150601f19603f3d011682016040523d82523d6000602084013e6102ce565b606091505b5091509150816102f95760405162461bcd60e51b81526004016102f090610885565b60405180910390fd5b8084848151811061031a57634e487b7160e01b600052603260045260246000fd5b602002602001018190525050508080610332906109c2565b915050610212565b50915091565b600061034d60014361097b565b40905090565b43804060606103628585610388565b90509250925092565b4390565b6001600160a01b03163190565b4490565b4590565b4190565b6060815167ffffffffffffffff8111156103b257634e487b7160e01b600052604160045260246000fd5b6040519080825280602002602001820160405280156103eb57816020015b6103d8610554565b8152602001906001900390816103d05790505b50905060005b825181101561052c5760008084838151811061041d57634e487b7160e01b600052603260045260246000fd5b6020026020010151600001516001600160a01b031685848151811061045257634e487b7160e01b600052603260045260246000fd5b60200260200101516020015160405161046b91906107f8565b6000604051808303816000865af19150503d80600081146104a8576040519150601f19603f3d011682016040523d82523d6000602084013e6104ad565b606091505b509150915085156104d557816104d55760405162461bcd60e51b81526004016102f090610844565b604051806040016040528083151581526020018281525084848151811061050c57634e487b7160e01b600052603260045260246000fd5b602002602001018190525050508080610524906109c2565b9150506103f1565b5092915050565b6000806060610543600185610353565b9196909550909350915050565b4090565b60408051808201909152600081526060602082015290565b80356001600160a01b038116811461058357600080fd5b919050565b600082601f830112610598578081fd5b8135602067ffffffffffffffff808311156105b5576105b56109f3565b6105c2828385020161094a565b83815282810190868401865b8681101561068c57813589016040601f198181848f030112156105ef578a8bfd5b6105f88261094a565b6106038a850161056c565b81528284013589811115610615578c8dfd5b8085019450508d603f850112610629578b8cfd5b898401358981111561063d5761063d6109f3565b61064d8b84601f8401160161094a565b92508083528e84828701011115610662578c8dfd5b808486018c85013782018a018c9052808a01919091528652505092850192908501906001016105ce565b509098975050505050505050565b6000602082840312156106ab578081fd5b6106b48261056c565b9392505050565b6000602082840312156106cc578081fd5b813567ffffffffffffffff8111156106e2578182fd5b6106ee84828501610588565b949350505050565b60008060408385031215610708578081fd5b82358015158114610717578182fd5b9150602083013567ffffffffffffffff811115610732578182fd5b61073e85828601610588565b9150509250929050565b600060208284031215610759578081fd5b5035919050565b60008282518085526020808601955080818302840101818601855b848110156107bf57858303601f19018952815180511515845284015160408585018190526107ab818601836107cc565b9a86019a945050509083019060010161077b565b5090979650505050505050565b600081518084526107e4816020860160208601610992565b601f01601f19169290920160200192915050565b6000825161080a818460208701610992565b9190910192915050565b6001600160a01b0391909116815260200190565b6000602082526106b46020830184610760565b90815260200190565b60208082526021908201527f4d756c746963616c6c32206167677265676174653a2063616c6c206661696c656040820152601960fa1b606082015260800190565b6020808252818101527f4d756c746963616c6c206167677265676174653a2063616c6c206661696c6564604082015260600190565b600060408201848352602060408185015281855180845260608601915060608382028701019350828701855b8281101561091457605f198887030184526109028683516107cc565b955092840192908401906001016108e6565b509398975050505050505050565b6000848252836020830152606060408301526109416060830184610760565b95945050505050565b604051601f8201601f1916810167ffffffffffffffff81118282101715610973576109736109f3565b604052919050565b60008282101561098d5761098d6109dd565b500390565b60005b838110156109ad578181015183820152602001610995565b838111156109bc576000848401525b50505050565b60006000198214156109d6576109d66109dd565b5060010190565b634e487b7160e01b600052601160045260246000fd5b634e487b7160e01b600052604160045260246000fdfea2646970667358221220c1152f751f29ece4d7bce5287ceafc8a153de9c2c633e3f21943a87d845bd83064736f6c63430008010033'
The bytecode for the Multicall2 contract.
This is used for state override on blocks before the Multicall2 contract was deployed.
- dank_mids.constants.MULTICALL3_DEPLOY_BLOCKS: Final[dict[Network, BlockNumber]] = {Network.Mainnet: 14353601, Network.Optimism: 4286263, Network.Fantom: 33001987, Network.Base: 5022, Network.Arbitrum: 7654707}
A dictionary mapping networks to the block numbers where Multicall3 was deployed.
- dank_mids.constants.RETRY_ERRS: Final = {'batch limit exceeded', 'connection reset by peer', 'evm error', 'evm timeout', 'execution aborted (timeout =', 'request timed out', 'request timeout', 'server disconnected'}
A list of error messages that are expected during normal use and are not indicative of any problem(s).
These errors will be automatically retried until success is achieved.
- dank_mids.constants.REVERT_SELECTORS: Final = (b'\x08\xc3y\xa0', b'4e487b71')
A list of byte strings representing revert selectors.
These selectors are used to identify specific types of revert errors in Ethereum transactions.
- dank_mids.constants.TOO_MUCH_DATA_ERRS: Final = {'batch limit exceeded', 'content length too large', 'payload too large', 'request entity too large'}
A list of error messages indicating that the request sent to the RPC was too large and must be split up.
These error messages are used to identify when a request needs to be broken into smaller chunks.
dank_mids.controller module
- final class dank_mids.controller.DankMiddlewareController
Bases:
objectController for managing Dank Middleware operations.
This class handles the core functionality of Dank Mids, including request batching, call execution, and error handling.
See also
dank_mids.semaphores.BlockSemaphore: Used for managing concurrency of eth_calls made with the controller.- async __call__(method, params)
Asynchronous method to handle RPC calls.
This method routes different types of RPC calls to appropriate handlers, including specialized handling for eth_call and other methods that may use queues.
- Parameters:
method (RPCEndpoint) – The RPC method to be called.
params (Any) – The parameters for the RPC call.
- Returns:
The response from the RPC call.
- Return type:
RPCResponse
- __init__(w3)
Initialize the DankMiddlewareController.
- Parameters:
w3 (Web3) – The Web3 instance used to make RPC requests.
- Return type:
None
- async dispatch_pending_rpc_batch_and_wait(calls, timeout=120.0)
Append calls to the current pending JSON-RPC batch, dispatch it, and wait with a timeout.
- early_start()
Initiate processing of queued calls once the capacity gate is hit.
This method combines pending eth_calls and other RPC calls into a single batch, clears the pending Ethereum calls queue, and starts processing the combined batch. This is the immediate-dispatch gate used when the queue is full and we should not wait for another event-loop tick.
- Return type:
None
- async execute_batch()
Execute a batch of pending calls.
This method collects all pending eth calls and RPC calls, clears the pending queues, and executes them as a single batch.
- Return type:
None
- async make_request(method, params, request_id=None)
Makes an RPC request to the Ethereum node.
This method creates an RPC request with the given method and parameters, sends it to the node, and returns the raw response.
- Parameters:
- Returns:
The raw response from the Ethereum node.
- Raises:
Exception – If DEBUG environment variable is set, any exception that occurs during the request is logged and re-raised.
- Return type:
RawResponse
- reduce_batch_size(num_calls)
Decrease the size of JSON-RPC batches in response to failures.
Similar to
reduce_multicall_size(), this method is called when a JSON-RPC batch operation fails, allowing for dynamic adjustment of batch sizes.- Parameters:
num_calls (int) – The number of calls in the failed batch operation.
- Return type:
None
- reduce_multicall_size(num_calls)
Decrease the size of multicall batches in response to failures.
This method is called when a multicall operation fails, allowing the system to dynamically adjust the batch size to prevent future failures.
- Parameters:
num_calls (int) – The number of calls in the failed multicall operation.
- Return type:
None
- set_batch_size_limit(new_limit)
Set a new limit for the JSON-RPC batch size.
- Parameters:
new_limit (int) – The new maximum number of calls in a JSON-RPC batch.
- Return type:
None
- set_multicall_size_limit(new_limit)
Set a new limit for the multicall size.
- Parameters:
new_limit (int) – The new maximum number of calls in a multicall.
- Return type:
None
- no_multicall: Final[set[ChecksumAddress]]
A set of addresses that have issues when called from the multicall contract. Calls to these contracts will not be batched in multicalls.
- pending_eth_calls: Final[DefaultDict[BlockId, Multicall]]
A dictionary of pending
Multicallobjects by block. The Multicalls hold all pending eth_calls.
- pending_rpc_calls: JSONRPCBatch
A
JSONRPCBatchcontaining all pending rpc requests.
- property queue_is_full: bool
Check if the queue of pending calls is full.
- Returns:
True if the queue is full, False otherwise.
- request_type
The Struct class the controller will use to encode requests.
dank_mids.eth module
- class dank_mids.eth.DankEth
Bases:
AsyncEth- __init__(w3)
- Return type:
None
- attach_methods(methods)
- block_id_munger(account, block_identifier=None)
- Parameters:
account (Address | ChecksumAddress | ENS)
block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)
- Return type:
Tuple[Address | ChecksumAddress | ENS, Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int]
- async call(transaction, block_identifier=None, state_override=None, ccip_read_enabled=None)
- Parameters:
transaction (TxParams)
block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)
state_override (Dict[str | Address | ChecksumAddress, StateOverrideParams] | None)
ccip_read_enabled (bool | None)
- Return type:
- call_munger(transaction, block_identifier=None, state_override=None)
- Parameters:
transaction (TxParams)
block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)
state_override (Dict[str | Address | ChecksumAddress, StateOverrideParams] | None)
- Return type:
Tuple[TxParams, Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int] | ~typing.Tuple[~web3.types.TxParams, ~typing.Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int, ~typing.Dict[str | ~eth_typing.evm.Address | ~eth_typing.evm.ChecksumAddress, ~web3.types.StateOverrideParams]]
- contract(address=None, **kwargs)
- async create_access_list(transaction, block_identifier=None)
- Parameters:
transaction (TxParams)
block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)
- Return type:
CreateAccessListResponse
- create_access_list_munger(transaction, block_identifier=None)
- Parameters:
transaction (TxParams)
block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)
- Return type:
Tuple[TxParams, Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int]
- async estimate_gas(transaction, block_identifier=None, state_override=None)
- Parameters:
transaction (TxParams)
block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)
state_override (Dict[str | Address | ChecksumAddress, StateOverrideParams] | None)
- Return type:
- estimate_gas_munger(transaction, block_identifier=None, state_override=None)
- Parameters:
transaction (TxParams)
block_identifier (Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int | None)
state_override (Dict[str | Address | ChecksumAddress, StateOverrideParams] | None)
- Return type:
Tuple[TxParams, Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int] | ~typing.Tuple[~web3.types.TxParams, ~typing.Literal[‘latest’, ‘earliest’, ‘pending’, ‘safe’, ‘finalized’] | ~eth_typing.evm.BlockNumber | ~eth_typing.evm.Hash32 | ~eth_typing.encoding.HexStr | ~hexbytes.main.HexBytes | int, ~typing.Dict[str | ~eth_typing.evm.Address | ~eth_typing.evm.ChecksumAddress, ~web3.types.StateOverrideParams]]
- async fee_history(block_count, newest_block, reward_percentiles=None)
- filter_munger(filter_params=None, filter_id=None)
- generate_gas_price(transaction_params=None)
- Parameters:
transaction_params (TxParams | None)
- Return type:
Wei | None
- async get_balance(account, block_identifier=None)
- Parameters:
account (ChecksumAddress)
block_identifier (BlockNumber | None)
- async get_block(block_identifier, full_transactions=False)
- get_block_munger(block_identifier, full_transactions=False)
- async get_block_number()
Method object for web3 module methods
Calls to the Method go through these steps:
1. input munging - includes normalization, parameter checking, early parameter formatting. Any processing on the input parameters that need to happen before json_rpc method string selection occurs.
A note about mungers: The first (root) munger should reflect the desired
api function arguments. In other words, if the api function wants to behave as: get_balance(account, block_identifier=None), the root munger should accept these same arguments, with the addition of the module as the first argument e.g.:
``` def get_balance_root_munger(module, account, block_identifier=None):
- if block_identifier is None:
block_identifier = DEFAULT_BLOCK
return module, [account, block_identifier]
all mungers should return an argument list.
if no munger is provided, a default munger expecting no method arguments will be used.
2. method selection - The json_rpc_method argument can be method string or a function that returns a method string. If a callable is provided the processed method inputs are passed to the method selection function, and the returned method string is used.
3. request and response formatters are set - formatters are retrieved using the json rpc method string.
4. After the parameter processing from steps 1-3 the request is made using the calling function returned by the module attribute
retrieve_caller_fnand the response formatters are applied to the output.- Return type:
BlockNumber
- async get_block_receipts(block_identifier)
- async get_block_timestamp(block_identifier)
Retrieves only the timestamp from a specific block.
This method skips decoding the rest of the Block response data.
- Parameters:
block_identifier (int) – The block number from which to retrieve the timestamp.
- Return type:
Example
>>> print(await dank_mids.eth.get_block_timestamp(12345678))
- async get_code(account, block_identifier=None)
- Parameters:
account (ChecksumAddress)
block_identifier (BlockNumber | None)
- async get_logs(*args, decode_to=tuple[evmspec.structs.log.Log, ...], decode_hook=<function _decode_hook>, **kwargs)
Fetches logs and decodes them into the specified format.
- async get_raw_transaction(transaction_hash)
- async get_raw_transaction_by_block(block_identifier, index)
- async get_storage_at(account, position, block_identifier=None)
- get_storage_at_munger(account, position, block_identifier=None)
- async get_transaction(transaction_hash)
Retrieves a transaction by its hash and attempts to decode it.
- Parameters:
transaction_hash (HexStr) – The hash of the transaction to retrieve.
- Raises:
ValidationError – If the transaction cannot be decoded into either Transaction or TransactionRLP format.
- Return type:
TransactionLegacy | Transaction2930 | Transaction1559 | Transaction4844 | Transaction7702 | TransactionRLP
- async get_transaction_by_block(block_identifier, index)
- async get_transaction_count(account, block_identifier=None)
- Parameters:
account (ChecksumAddress)
block_identifier (BlockNumber | None)
- async get_transaction_receipt(*args, decode_to=<class 'evmspec.structs.receipt.TransactionReceipt'>, decode_hook=<function _decode_hook>, **kwargs)
Fetches the transaction receipt and decodes it into the specified format.
- async get_transaction_status(transaction_hash)
Retrieves the status of a transaction.
- async get_transactions(block_identifier: int | HexStr) list[TransactionLegacy | Transaction2930 | Transaction1559 | Transaction4844 | Transaction7702]
- async get_transactions(block_identifier: int | HexStr, hashes_only: Literal[True]) list[TransactionHash]
- async get_transactions(block_identifier: int | HexStr, hashes_only: Literal[False]) list[TransactionLegacy | Transaction2930 | Transaction1559 | Transaction4844 | Transaction7702]
Retrieves only the transactions from a specific block.
This method skips decoding the rest of the Block response data.
- Parameters:
block_identifier – The block number or hash from which to retrieve the transactions.
hashes_only – If True, only transaction hashes will be returned.
Example
>>> [print(tx.hash) for tx in await dank_mids.eth.get_transactions(12345678))
- get_uncle_count(block_identifier)
- async modify_transaction(transaction_hash, **transaction_params)
- async replace_transaction(transaction_hash, new_transaction)
- async send_raw_transaction(transaction)
- send_transaction_munger(transaction)
- Parameters:
transaction (TxParams)
- Return type:
Tuple[TxParams]
- set_contract_factory(contract_factory)
- Parameters:
contract_factory (Type[AsyncContract | AsyncContractCaller])
- Return type:
None
- set_gas_price_strategy(gas_price_strategy)
- async sign(account, data=None, hexstr=None, text=None)
- sign_munger(account, data=None, hexstr=None, text=None)
- async sign_transaction(transaction)
- Parameters:
transaction (TxParams)
- Return type:
SignedTx
- async sign_typed_data(account, data)
- async simulate_v1(payload, block_identifier)
- async subscribe(subscription_type, subscription_arg=None, handler=None, handler_context=None, label=None, parallelize=None)
- Parameters:
- Return type:
HexStr
- async trace_filter(filter_params, decode_to=list[typing.Union[evmspec.structs.trace.call.Trace, evmspec.structs.trace.create.Trace, evmspec.structs.trace.reward.Trace, evmspec.structs.trace.suicide.Trace]], decode_hook=<function _decode_hook>)
Returns all traces matching a filter. If the decoding to the specified type fails, the method logs problematic traces and re-raises the exception as a diagnostic aid.
- Parameters:
filter_params (TraceFilterParams) – The parameters defining the traces to filter.
decode_to (type[T]) – The class to which traces should be decoded.
decode_hook (Callable[[type[T], Any], T] | None) – Hook function to assist in decoding.
- Raises:
ValidationError – If a trace cannot be decoded.
- Return type:
T
- async trace_transaction(transaction_hash)
Returns all traces produced by a transaction.
- Parameters:
transaction_hash (str) – The hash of the transaction to trace.
- Return type:
Example
>>> traces = await dank_mids.eth.trace_transaction('0x...')
- async wait_for_transaction_receipt(transaction_hash, timeout=120, poll_latency=0.1)
- account = <eth_account.account.Account object>
- property blob_base_fee: Wei
- property block_number: BlockNumber
- chain_id
- property codec: ABICodec
- property default_account: ChecksumAddress | Empty
- property default_block: Literal['latest', 'earliest', 'pending', 'safe', 'finalized'] | BlockNumber | Hash32 | HexStr | HexBytes | int
- property gas_price: Wei
- get_filter_changes
Custom method class to bypass web3py’s default result formatters.
This class processes parameters, makes conditional adjustments to the parameters, and selects specific methods to call based on those parameters. It bypasses the standard result formatters by applying a no-operation formatter, effectively returning responses as-is or using predefined formatters based on the RPC call.
- get_filter_logs
Custom method class to bypass web3py’s default result formatters.
This class processes parameters, makes conditional adjustments to the parameters, and selects specific methods to call based on those parameters. It bypasses the standard result formatters by applying a no-operation formatter, effectively returning responses as-is or using predefined formatters based on the RPC call.
- is_async = True
- property max_priority_fee: Wei
Try to use eth_maxPriorityFeePerGas but, since this is not part of the spec and is only supported by some clients, fall back to an eth_feeHistory calculation with min and max caps.
- meth
Custom method class to bypass web3py’s default result formatters.
This class processes parameters, makes conditional adjustments to the parameters, and selects specific methods to call based on those parameters. It bypasses the standard result formatters by applying a no-operation formatter, effectively returning responses as-is or using predefined formatters based on the RPC call.
- w3: AsyncWeb3[Any]
- class dank_mids.eth.TraceFilterParams
Bases:
TypedDict- __getitem__()
x.__getitem__(y) <==> x[y]
- __init__(*args, **kwargs)
- __iter__()
Implement iter(self).
- clear() None. Remove all items from D.
- copy() a shallow copy of D
- fromkeys(value=None, /)
Create a new dictionary with keys from iterable and values set to value.
- get(key, default=None, /)
Return the value for key if key is in the dictionary, else default.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
- popitem()
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- setdefault(key, default=None, /)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- update([E, ]**F) None. Update D from dict/iterable E and F.
If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
- values() an object providing a view on D's values
- dank_mids.eth.decode_timestamped(self, buf)
Deserialize an object from JSON.
- Parameters:
buf (bytes-like or str) – The message to decode.
- Returns:
obj – The deserialized object.
- Return type:
Any
- dank_mids.eth.decode_tiny_block(self, buf)
Deserialize an object from JSON.
- Parameters:
buf (bytes-like or str) – The message to decode.
- Returns:
obj – The deserialized object.
- Return type:
Any
dank_mids.exceptions module
- exception dank_mids.exceptions.BrownieNotConnectedError
Bases:
RuntimeErrorRuntimeErrorraised when brownie is not connected but needs to be in order to access functionality withindank_mids.brownie_patch.
- exception dank_mids.exceptions.BrowniePatchImportError
Bases:
ImportErrorImportErrorraised when brownie integration cannot be imported or initialized.- __init__(obj_name, exc)
- Parameters:
obj_name (str)
exc (ImportError)
- Return type:
None
- msg
exception message
- name
module name
- path
module path
- exception dank_mids.exceptions.BrowniePatchNotInitializedError
Bases:
RuntimeErrorRuntimeErrorraised when brownie is connected but brownie patch was not initialized.
- exception dank_mids.exceptions.GarbageCollectionError
Bases:
RuntimeErrorException raised when an object is garbage collected prematurely.
- exception dank_mids.exceptions.Revert
Bases:
ValueError
dank_mids.lock module
- final class dank_mids.lock.AlertingRLock
Bases:
object- __init__(name)
Initializes the reentrant lock with a given name.
The name can be used for debugging and logging purposes to track lock usage.
Examples
>>> lock = AlertingRLock(name="example") >>> lock._name 'example'
See also
UIDGenerator- Parameters:
name (str)
- Return type:
None
- acquire(blocking=True, timeout=-1.0)
Acquire a lock, blocking or non-blocking, with alerting if not acquired within 5 seconds.
When invoked, this method first attempts to acquire the lock with a 5-second timeout. If the lock is not acquired within 5 seconds, a warning is logged, and the method proceeds to acquire the lock using the provided blocking and timeout arguments, matching the semantics of the builtin RLock.
- Parameters:
- Returns:
True if the lock is acquired, False otherwise.
- Return type:
See also
_RLock.acquire()
- release()
Release a lock, decrementing the recursion level.
If after the decrement it is zero, reset the lock to unlocked (not owned by any thread), and if any other threads are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed. If after the decrement the recursion level is still nonzero, the lock remains locked and owned by the calling thread.
Only call this method when the calling thread owns the lock. A RuntimeError is raised if this method is called when the lock is unlocked.
There is no return value.
- Return type:
None
- final class dank_mids.lock.Lock
Bases:
objectPrimitive lock objects.
A primitive lock is a synchronization primitive that is not owned by a particular coroutine when locked. A primitive lock is in one of two states, ‘locked’ or ‘unlocked’.
It is created in the unlocked state. It has two basic methods, acquire() and release(). When the state is unlocked, acquire() changes the state to locked and returns immediately. When the state is locked, acquire() blocks until a call to release() in another coroutine changes it to unlocked, then the acquire() call resets it to locked and returns. The release() method should only be called in the locked state; it changes the state to unlocked and returns immediately. If an attempt is made to release an unlocked lock, a RuntimeError will be raised.
When more than one coroutine is blocked in acquire() waiting for the state to turn to unlocked, only one coroutine proceeds when a release() call resets the state to unlocked; first coroutine which is blocked in acquire() is being processed.
acquire() is a coroutine and should be called with ‘await’.
Locks also support the asynchronous context management protocol. ‘async with lock’ statement should be used.
Usage:
lock = Lock() … await lock.acquire() try:
…
- finally:
lock.release()
Context manager usage:
lock = Lock() … async with lock:
…
Lock objects can be tested for locking state:
- if not lock.locked():
await lock.acquire()
- else:
# lock is acquired …
- __init__()
- Return type:
None
- async acquire()
Acquire a lock.
This method blocks until the lock is unlocked, then sets it to locked and returns True.
- Return type:
Literal[True]
- release()
Release a lock.
When the lock is locked, reset it to unlocked, and return. If any other coroutines are blocked waiting for the lock to become unlocked, allow exactly one of them to proceed.
When invoked on an unlocked lock, a RuntimeError is raised.
There is no return value.
- Return type:
None
dank_mids.logging module
- class dank_mids.logging.CLogger
Bases:
Logger- __init__(name, level=0)
Initialize the logger with a name and an optional level.
- addFilter(filter)
Add the specified filter to this handler.
- addHandler(hdlr)
Add the specified handler to this logger.
- callHandlers(record)
Pass a record to all relevant handlers.
Loop through all handlers for this logger and its parents in the logger hierarchy. If no handler was found, output a one-off error message to sys.stderr. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger whose handlers are called.
- critical(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘CRITICAL’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.critical(“Houston, we have a %s”, “major disaster”, exc_info=1)
- debug(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘DEBUG’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.debug(“Houston, we have a %s”, “thorny problem”, exc_info=1)
- error(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘ERROR’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.error(“Houston, we have a %s”, “major problem”, exc_info=1)
- exception(msg, *args, exc_info=True, **kwargs)
Convenience method for logging an ERROR with exception information.
- Parameters:
msg (object)
args (Any)
exc_info (logging._ExcInfoType)
kwargs (Any)
- Return type:
None
- fatal(msg, *args, **kwargs)
Don’t use this method, use critical() instead.
- filter(record)
Determine if a record is loggable by consulting all the filters.
The default is to allow the record to be logged; any filter can veto this and the record is then dropped. Returns a zero value if a record is to be dropped, else non-zero.
Changed in version 3.2: Allow filters to be just callables.
- findCaller(stack_info=False, stacklevel=1)
Find the stack frame of the caller so that we can note the source file name, line number and function name.
- getChild(suffix)
Get a logger which is a descendant to this one.
This is a convenience method, such that
logging.getLogger(‘abc’).getChild(‘def.ghi’)
is the same as
logging.getLogger(‘abc.def.ghi’)
It’s useful, for example, when the parent logger is named using __name__ rather than a literal string.
- getEffectiveLevel()
Get the effective level for this logger.
Loop through this logger and its parents in the logger hierarchy, looking for a non-zero logging level. Return the first one found.
- Return type:
- handle(record)
Call the handlers for the specified record.
This method is used for unpickled records received from a socket, as well as those created locally. Logger-level filtering is applied.
- hasHandlers()
See if this logger has any handlers configured.
Loop through all handlers for this logger and its parents in the logger hierarchy. Return True if a handler was found, else False. Stop searching up the hierarchy whenever a logger with the “propagate” attribute set to zero is found - that will be the last logger which is checked for the existence of handlers.
- info(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘INFO’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.info(“Houston, we have a %s”, “interesting problem”, exc_info=1)
- isEnabledFor(level)
Is this logger enabled for level ‘level’?
- log(level, msg, *args, **kwargs)
Log ‘msg % args’ with the integer severity ‘level’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.log(level, “We have a %s”, “mysterious problem”, exc_info=1)
- makeRecord(name, level, fn, lno, msg, args, exc_info, func=None, extra=None, sinfo=None)
A factory method which can be overridden in subclasses to create specialized LogRecords.
- removeFilter(filter)
Remove the specified filter from this handler.
- removeHandler(hdlr)
Remove the specified handler from this logger.
- setLevel(level)
Set the logging level of this logger. level must be an int or a str.
- warn(msg, *args, **kwargs)
- warning(msg, *args, **kwargs)
Log ‘msg % args’ with severity ‘WARNING’.
To pass exception information, use the keyword argument exc_info with a true value, e.g.
logger.warning(“Houston, we have a %s”, “bit of a problem”, exc_info=1)
- manager = <logging.Manager object>
- root = <RootLogger root (WARNING)>
dank_mids.middleware module
- class dank_mids.middleware.DankMiddleware
Bases:
Web3MiddlewareWeb3 v7 middleware that routes async requests through a cached Dank controller.
- __init__(w3)
- Parameters:
w3 (AsyncWeb3 | None)
- Return type:
None
- async async_request_processor(method, params)
- async async_response_processor(method, response)
- Parameters:
method (RPCEndpoint)
response (RPCResponse)
- Return type:
RPCResponse
- async async_wrap_make_batch_request(make_batch_request)
- Parameters:
make_batch_request (AsyncMakeBatchRequestFn)
- Return type:
AsyncMakeBatchRequestFn
- async async_wrap_make_request(make_request)
- response_processor(method, response)
- Parameters:
method (RPCEndpoint)
response (RPCResponse)
- Return type:
RPCResponse
- wrap_make_batch_request(make_batch_request)
- Parameters:
make_batch_request (MakeBatchRequestFn)
- Return type:
MakeBatchRequestFn
- wrap_make_request(make_request)
- Parameters:
make_request (MakeRequestFn)
- Return type:
MakeRequestFn
dank_mids.retry_observer module
- class dank_mids.retry_observer.RetryEvent
Bases:
objectRepresents a single retry decision emitted by Dank Mids retry logic.
- error
The exception that triggered the retry decision.
- Type:
- __init__(operation, attempt, error, max_attempts=None, delay=None, component=None, will_retry=True, metadata=None, timestamp=<factory>)
- error: BaseException
- class dank_mids.retry_observer.RetryObserver
Bases:
ProtocolCallable observer interface for retry events.
Implementations should be fast and side-effect safe. If you need to do IO, buffer the event and ship it elsewhere.
- __call__(event)
Handle a retry event.
- Parameters:
event (RetryEvent)
- Return type:
None
- __init__(*args, **kwargs)
- dank_mids.retry_observer.emit_retry_event(event)
Emit a retry event to all currently registered observers.
Observers are called in the order they were registered. A snapshot is taken to avoid iteration issues if observers register/unregister themselves.
- Parameters:
event (RetryEvent)
- Return type:
None
- dank_mids.retry_observer.get_retry_observers()
Return a snapshot of currently registered retry observers.
- Return type:
tuple[RetryObserver, …]
- dank_mids.retry_observer.register_retry_observer(observer)
Register a retry observer.
Observers are stored as weak references. Keep a strong reference in your application if you do not want it to be garbage collected.
- Parameters:
observer (RetryObserver)
- Return type:
None
- dank_mids.retry_observer.unregister_retry_observer(observer)
Unregister a previously registered retry observer.
- Raises:
ValueError – If the observer is not currently registered.
- Parameters:
observer (RetryObserver)
- Return type:
None
dank_mids.semaphores module
- class dank_mids.semaphores.BlockSemaphore
Bases:
_AbstractPrioritySemaphoreA semaphore for managing concurrency based on block numbers.
This class extends
_AbstractPrioritySemaphoreto provide block-specific concurrency control.- Parameters:
value – The initial value of the semaphore.
name – An optional name for the semaphore.
See also
_BlockSemaphoreContextManager: The context manager used by this semaphore.- __call__(self, fn: CoroFn[P, T]) CoroFn[P, T]
Decorator method to wrap coroutine functions with the semaphore.
This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.
Example
semaphore = Semaphore(5)
@semaphore async def limited():
return 1
- __getitem__(self, priority: PT | None) '_AbstractPrioritySemaphoreContextManager[PT]'
Gets the context manager for a given priority.
- Parameters:
- Returns:
The context manager associated with the given priority.
- Return type:
_BlockSemaphoreContextManager
Examples
>>> semaphore = _AbstractPrioritySemaphore(5) >>> context_manager = semaphore[priority]
- __init__(value=1, *, name=None)
Initializes the priority semaphore.
- Parameters:
value – The initial capacity of the semaphore.
name – An optional name for the semaphore, used for debugging.
- Return type:
None
Examples
>>> semaphore = _AbstractPrioritySemaphore(5, name="test_semaphore")
- acquire(self)
Acquires the semaphore with the top priority.
This method overrides
Semaphore.acquire()to handle priority-based logic.Examples
>>> semaphore = _AbstractPrioritySemaphore(5) >>> await semaphore.acquire()
- decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]
Wrap a coroutine function to ensure it runs with the semaphore.
Example
semaphore = Semaphore(5)
@semaphore async def limited():
return 1
- locked(self) bool
Checks if the semaphore is locked.
- Returns:
True if the semaphore cannot be acquired immediately, False otherwise.
Examples
>>> semaphore = _AbstractPrioritySemaphore(5) >>> semaphore.locked()
- release(self) void
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- debug_logs_enabled
bool
Checks if debug logging is enabled for the logger.
Examples
>>> class MyClass(_LoggerMixin): ... pass ... >>> instance = MyClass() >>> instance.debug_logs_enabled False
See also
- Type:
_LoggerMixin.debug_logs_enabled
- logger
Logger
Provides a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
Examples
>>> class MyClass(_LoggerMixin): ... _name = "example" ... >>> instance = MyClass() >>> logger = instance.logger >>> logger.name 'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin): ... pass ... >>> another_instance = AnotherClass() >>> another_logger = another_instance.logger >>> another_logger.name 'module_name.AnotherClass'
Note
Replace module_name with the actual module name where the class is defined.
See also
- Type:
_LoggerMixin.logger
- name
str
- Type:
Semaphore.name
dank_mids.types module
- class dank_mids.types.BlockId
A type representing the identifier for a specific block in the blockchain.
alias of
str
- class dank_mids.types.ChainstackRateLimitContext
Bases:
DictStructChainstack doesn’t use status code 429 for rate limiting, they attach one of these objects to a 200 response.
- __getitem__(attr)
Lookup an attribute value via dictionary-style access.
- Parameters:
attr (str) – The name of the attribute to access.
- Raises:
KeyError – If the provided key is not a member of the struct.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s['field1'] 'value' >>> s['field2'] Traceback (most recent call last): ... KeyError: ('field2', MyStruct(field1='value'))
- __iter__()
Iterate through the keys of the Struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(iter(s)) ['field1', 'field2']
- get(key, default=None)
Get the value associated with a key, or a default value if the key is not present.
- Parameters:
key (str) – The key to look up.
default (optional) – The value to return if the key is not present.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s.get('field1') 'value' >>> s.get('field2', 'default') 'default'
- items()
Returns an iterator over the struct’s field name and value pairs.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.items()) [('field1', 'value'), ('field2', 42)]
- keys()
Returns an iterator over the field names of the struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.keys()) ['field1', 'field2']
- values()
Returns an iterator over the struct’s field values.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.values()) ['value', 42]
- property try_again_in: float
Calculates the time to wait before retrying the request.
- Returns:
The number of seconds to wait before retrying.
- Raises:
NotImplementedError – If the time string is in an unrecognized format.
- class dank_mids.types.Error
Bases:
DictStructRepresents an error in a JSON-RPC response.
- __getitem__(attr)
Lookup an attribute value via dictionary-style access.
- Parameters:
attr (str) – The name of the attribute to access.
- Raises:
KeyError – If the provided key is not a member of the struct.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s['field1'] 'value' >>> s['field2'] Traceback (most recent call last): ... KeyError: ('field2', MyStruct(field1='value'))
- __iter__()
Iterate through the keys of the Struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(iter(s)) ['field1', 'field2']
- get(key, default=None)
Get the value associated with a key, or a default value if the key is not present.
- Parameters:
key (str) – The key to look up.
default (optional) – The value to return if the key is not present.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s.get('field1') 'value' >>> s.get('field2', 'default') 'default'
- items()
Returns an iterator over the struct’s field name and value pairs.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.items()) [('field1', 'value'), ('field2', 42)]
- keys()
Returns an iterator over the field names of the struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.keys()) ['field1', 'field2']
- to_dict(*, context: None = None) RPCError
- to_dict(*, context: PartialRequest) RPCErrorWithContext
- values()
Returns an iterator over the struct’s field values.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.values()) ['value', 42]
- data: str | ChainstackRateLimitContext | None
Additional error data, if any.
EVM specs say it should be a string, but some providers will return a dictionary here with even more context.
- dank_mids.types.JsonrpcParams
A list of parameters for JSON-RPC calls, which should be eth_callParams, BlockId, and OverrideParams, in that order.
- dank_mids.types.Multicalls
A dictionary mapping BlockId to Multicall objects.
See also
dank_mids._requests.Multicall: The Multicall class used in this mapping.
- class dank_mids.types.OverrideParams
Bases:
TypedDict- __getitem__()
x.__getitem__(y) <==> x[y]
- __init__(*args, **kwargs)
- __iter__()
Implement iter(self).
- clear() None. Remove all items from D.
- copy() a shallow copy of D
- fromkeys(value=None, /)
Create a new dictionary with keys from iterable and values set to value.
- get(key, default=None, /)
Return the value for key if key is in the dictionary, else default.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
- popitem()
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- setdefault(key, default=None, /)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- update([E, ]**F) None. Update D from dict/iterable E and F.
If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
- values() an object providing a view on D's values
- class dank_mids.types.PartialRequest
Bases:
DictStructRepresents a partial JSON-RPC request.
While technially part of a request, we can successfully make requests to many nodes without including the jsonrpc field.
This class leaves off the jsonrpc field reduce encoding burden and web traffic.
This works with many but not all nodes.
- __getitem__(attr)
Lookup an attribute value via dictionary-style access.
- Parameters:
attr (str) – The name of the attribute to access.
- Raises:
KeyError – If the provided key is not a member of the struct.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s['field1'] 'value' >>> s['field2'] Traceback (most recent call last): ... KeyError: ('field2', MyStruct(field1='value'))
- __iter__()
Iterate through the keys of the Struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(iter(s)) ['field1', 'field2']
- get(key, default=None)
Get the value associated with a key, or a default value if the key is not present.
- Parameters:
key (str) – The key to look up.
default (optional) – The value to return if the key is not present.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s.get('field1') 'value' >>> s.get('field2', 'default') 'default'
- items()
Returns an iterator over the struct’s field name and value pairs.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.items()) [('field1', 'value'), ('field2', 42)]
- keys()
Returns an iterator over the field names of the struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.keys()) ['field1', 'field2']
- values()
Returns an iterator over the struct’s field values.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.values()) ['value', 42]
- class dank_mids.types.PartialResponse
Bases:
DictStructRepresents a partial JSON-RPC response.
We use these to more efficiently decode responses from the node.
- __getitem__(attr)
Lookup an attribute value via dictionary-style access.
- Parameters:
attr (str) – The name of the attribute to access.
- Raises:
KeyError – If the provided key is not a member of the struct.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s['field1'] 'value' >>> s['field2'] Traceback (most recent call last): ... KeyError: ('field2', MyStruct(field1='value'))
- __iter__()
Iterate through the keys of the Struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(iter(s)) ['field1', 'field2']
- decode_result(method=None, *, caller=None)
- Parameters:
method (RPCEndpoint | None)
- Return type:
HexBytes | Wei | uint | ChainId | BlockNumber | AttributeDict | str
- get(key, default=None)
Get the value associated with a key, or a default value if the key is not present.
- Parameters:
key (str) – The key to look up.
default (optional) – The value to return if the key is not present.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s.get('field1') 'value' >>> s.get('field2', 'default') 'default'
- items()
Returns an iterator over the struct’s field name and value pairs.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.items()) [('field1', 'value'), ('field2', 42)]
- keys()
Returns an iterator over the field names of the struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.keys()) ['field1', 'field2']
- to_dict(method=None)
Returns a complete dictionary representation of this response
Struct.- Parameters:
method (RPCEndpoint | None)
- Return type:
RPCResponse
- values()
Returns an iterator over the struct’s field values.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.values()) ['value', 42]
- property exception: Exception
If the rpc response contains an ‘error’ field, returns a specialized exception for the specified rpc error.
- result: Raw
The result of the RPC call, if successful.
- class dank_mids.types.RPCErrorWithContext
Bases:
dict- __getitem__()
x.__getitem__(y) <==> x[y]
- __init__(*args, **kwargs)
- __iter__()
Implement iter(self).
- clear() None. Remove all items from D.
- copy() a shallow copy of D
- fromkeys(value=None, /)
Create a new dictionary with keys from iterable and values set to value.
- get(key, default=None, /)
Return the value for key if key is in the dictionary, else default.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
- popitem()
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- setdefault(key, default=None, /)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- update([E, ]**F) None. Update D from dict/iterable E and F.
If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
- values() an object providing a view on D's values
- dank_mids_added_context: PartialRequest
- class dank_mids.types.Request
Bases:
PartialRequestRepresents a complete JSON-RPC request.
Inherits from PartialRequest and adds the JSON-RPC version.
- __getitem__(attr)
Lookup an attribute value via dictionary-style access.
- Parameters:
attr (str) – The name of the attribute to access.
- Raises:
KeyError – If the provided key is not a member of the struct.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s['field1'] 'value' >>> s['field2'] Traceback (most recent call last): ... KeyError: ('field2', MyStruct(field1='value'))
- __iter__()
Iterate through the keys of the Struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(iter(s)) ['field1', 'field2']
- get(key, default=None)
Get the value associated with a key, or a default value if the key is not present.
- Parameters:
key (str) – The key to look up.
default (optional) – The value to return if the key is not present.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s.get('field1') 'value' >>> s.get('field2', 'default') 'default'
- items()
Returns an iterator over the struct’s field name and value pairs.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.items()) [('field1', 'value'), ('field2', 42)]
- keys()
Returns an iterator over the field names of the struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.keys()) ['field1', 'field2']
- values()
Returns an iterator over the struct’s field values.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.values()) ['value', 42]
- class dank_mids.types.Response
Bases:
PartialResponseRepresents a complete JSON-RPC response.
Inherits from PartialResponse and adds the response identifier and JSON-RPC version.
- __getitem__(attr)
Lookup an attribute value via dictionary-style access.
- Parameters:
attr (str) – The name of the attribute to access.
- Raises:
KeyError – If the provided key is not a member of the struct.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s['field1'] 'value' >>> s['field2'] Traceback (most recent call last): ... KeyError: ('field2', MyStruct(field1='value'))
- __iter__()
Iterate through the keys of the Struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(iter(s)) ['field1', 'field2']
- decode_result(method=None, *, caller=None)
- Parameters:
method (RPCEndpoint | None)
- Return type:
HexBytes | Wei | uint | ChainId | BlockNumber | AttributeDict | str
- get(key, default=None)
Get the value associated with a key, or a default value if the key is not present.
- Parameters:
key (str) – The key to look up.
default (optional) – The value to return if the key is not present.
- Return type:
Example
>>> class MyStruct(DictStruct): ... field1: str >>> s = MyStruct(field1="value") >>> s.get('field1') 'value' >>> s.get('field2', 'default') 'default'
- items()
Returns an iterator over the struct’s field name and value pairs.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.items()) [('field1', 'value'), ('field2', 42)]
- keys()
Returns an iterator over the field names of the struct.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.keys()) ['field1', 'field2']
- to_dict(method=None)
Returns a complete dictionary representation of this response
Struct.- Parameters:
method (RPCEndpoint | None)
- Return type:
RPCResponse
- values()
Returns an iterator over the struct’s field values.
Example
>>> class MyStruct(DictStruct): ... field1: str ... field2: int >>> s = MyStruct(field1="value", field2=42) >>> list(s.values()) ['value', 42]
- property exception: Exception
If the rpc response contains an ‘error’ field, returns a specialized exception for the specified rpc error.
- result: Raw
The result of the RPC call, if successful.
- class dank_mids.types.eth_callParams
Bases:
TypedDict- __getitem__()
x.__getitem__(y) <==> x[y]
- __init__(*args, **kwargs)
- __iter__()
Implement iter(self).
- clear() None. Remove all items from D.
- copy() a shallow copy of D
- fromkeys(value=None, /)
Create a new dictionary with keys from iterable and values set to value.
- get(key, default=None, /)
Return the value for key if key is in the dictionary, else default.
- items() a set-like object providing a view on D's items
- keys() a set-like object providing a view on D's keys
- pop(k[, d]) v, remove specified key and return the corresponding value.
If the key is not found, return the default if given; otherwise, raise a KeyError.
- popitem()
Remove and return a (key, value) pair as a 2-tuple.
Pairs are returned in LIFO (last-in, first-out) order. Raises KeyError if the dict is empty.
- setdefault(key, default=None, /)
Insert key with a value of default if key is not in the dictionary.
Return the value for key if key is in the dictionary, else default.
- update([E, ]**F) None. Update D from dict/iterable E and F.
If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
- values() an object providing a view on D's values
- to: ChecksumAddress
- dank_mids.types.better_decode(data, *, type=None, dec_hook=None, method=None)
- dank_mids.types.decode_nested_dict(self, buf)
Deserialize an object from JSON.
- Parameters:
buf (bytes-like or str) – The message to decode.
- Returns:
obj – The deserialized object.
- Return type:
Any
- dank_mids.types.decode_string(self, buf)
Deserialize an object from JSON.
- Parameters:
buf (bytes-like or str) – The message to decode.
- Returns:
obj – The deserialized object.
- Return type:
Any
- dank_mids.types.encode(self, obj)
Serialize an object to bytes.
- Parameters:
obj (Any) – The object to serialize.
- Returns:
data – The serialized object.
- Return type:
Module contents
- class dank_mids.BlockSemaphore
Bases:
_AbstractPrioritySemaphoreA semaphore for managing concurrency based on block numbers.
This class extends
_AbstractPrioritySemaphoreto provide block-specific concurrency control.- Parameters:
value – The initial value of the semaphore.
name – An optional name for the semaphore.
See also
_BlockSemaphoreContextManager: The context manager used by this semaphore.- __call__(self, fn: CoroFn[P, T]) CoroFn[P, T]
Decorator method to wrap coroutine functions with the semaphore.
This allows rewriting the pattern of acquiring a semaphore within a coroutine using a decorator.
Example
semaphore = Semaphore(5)
@semaphore async def limited():
return 1
- __getitem__(self, priority: PT | None) '_AbstractPrioritySemaphoreContextManager[PT]'
Gets the context manager for a given priority.
- Parameters:
- Returns:
The context manager associated with the given priority.
- Return type:
_BlockSemaphoreContextManager
Examples
>>> semaphore = _AbstractPrioritySemaphore(5) >>> context_manager = semaphore[priority]
- __init__(value=1, *, name=None)
Initializes the priority semaphore.
- Parameters:
value – The initial capacity of the semaphore.
name – An optional name for the semaphore, used for debugging.
- Return type:
None
Examples
>>> semaphore = _AbstractPrioritySemaphore(5, name="test_semaphore")
- acquire(self)
Acquires the semaphore with the top priority.
This method overrides
Semaphore.acquire()to handle priority-based logic.Examples
>>> semaphore = _AbstractPrioritySemaphore(5) >>> await semaphore.acquire()
- decorate(self, fn: CoroFn[P, T]) CoroFn[P, T]
Wrap a coroutine function to ensure it runs with the semaphore.
Example
semaphore = Semaphore(5)
@semaphore async def limited():
return 1
- locked(self) bool
Checks if the semaphore is locked.
- Returns:
True if the semaphore cannot be acquired immediately, False otherwise.
Examples
>>> semaphore = _AbstractPrioritySemaphore(5) >>> semaphore.locked()
- release(self) void
Release a semaphore, incrementing the internal counter by one.
When it was zero on entry and another coroutine is waiting for it to become larger than zero again, wake up that coroutine.
- debug_logs_enabled
bool
Checks if debug logging is enabled for the logger.
Examples
>>> class MyClass(_LoggerMixin): ... pass ... >>> instance = MyClass() >>> instance.debug_logs_enabled False
See also
- Type:
_LoggerMixin.debug_logs_enabled
- logger
Logger
Provides a logger instance specific to the class using this mixin.
The logger ID is constructed from the module and class name, and optionally includes an instance name if available.
Examples
>>> class MyClass(_LoggerMixin): ... _name = "example" ... >>> instance = MyClass() >>> logger = instance.logger >>> logger.name 'module_name.MyClass.example'
>>> class AnotherClass(_LoggerMixin): ... pass ... >>> another_instance = AnotherClass() >>> another_logger = another_instance.logger >>> another_logger.name 'module_name.AnotherClass'
Note
Replace module_name with the actual module name where the class is defined.
See also
- Type:
_LoggerMixin.logger
- name
str
- Type:
Semaphore.name
- class dank_mids.DankMiddleware
Bases:
Web3MiddlewareWeb3 v7 middleware that routes async requests through a cached Dank controller.
- __init__(w3)
- Parameters:
w3 (AsyncWeb3 | None)
- Return type:
None
- async async_request_processor(method, params)
- async async_response_processor(method, response)
- Parameters:
method (RPCEndpoint)
response (RPCResponse)
- Return type:
RPCResponse
- async async_wrap_make_batch_request(make_batch_request)
- Parameters:
make_batch_request (AsyncMakeBatchRequestFn)
- Return type:
AsyncMakeBatchRequestFn
- async async_wrap_make_request(make_request)
- response_processor(method, response)
- Parameters:
method (RPCEndpoint)
response (RPCResponse)
- Return type:
RPCResponse
- wrap_make_batch_request(make_batch_request)
- Parameters:
make_batch_request (MakeBatchRequestFn)
- Return type:
MakeBatchRequestFn
- wrap_make_request(make_request)
- Parameters:
make_request (MakeRequestFn)
- Return type:
MakeRequestFn
- class dank_mids.RetryEvent
Bases:
objectRepresents a single retry decision emitted by Dank Mids retry logic.
- error
The exception that triggered the retry decision.
- Type:
- __init__(operation, attempt, error, max_attempts=None, delay=None, component=None, will_retry=True, metadata=None, timestamp=<factory>)
- error: BaseException
- class dank_mids.RetryObserver
Bases:
ProtocolCallable observer interface for retry events.
Implementations should be fast and side-effect safe. If you need to do IO, buffer the event and ship it elsewhere.
- __call__(event)
Handle a retry event.
- Parameters:
event (RetryEvent)
- Return type:
None
- __init__(*args, **kwargs)
- class dank_mids.StatsRetryObserver
Bases:
objectRetry observer that records events into a stats collector.
When the provided collector lacks retry fields (for example, when the compiled stats module is imported), the observer falls back to an internal in-memory collector so retry handling does not raise AttributeError.
This observer intentionally does not auto-register itself. Wire it up explicitly via
dank_mids.retry_observer.register_retry_observer().- __call__(event)
Call self as a function.
- Parameters:
event (RetryEvent)
- Return type:
None
- __init__(collector=None)
- Parameters:
collector (_RetryCollector | None)
- Return type:
None
- dank_mids.emit_retry_event(event)
Emit a retry event to all currently registered observers.
Observers are called in the order they were registered. A snapshot is taken to avoid iteration issues if observers register/unregister themselves.
- Parameters:
event (RetryEvent)
- Return type:
None
- dank_mids.get_retry_observers()
Return a snapshot of currently registered retry observers.
- Return type:
tuple[RetryObserver, …]
- dank_mids.register_retry_observer(observer)
Register a retry observer.
Observers are stored as weak references. Keep a strong reference in your application if you do not want it to be garbage collected.
- Parameters:
observer (RetryObserver)
- Return type:
None
- dank_mids.setup_dank_w3(async_w3)
Sets up a DankWeb3 instance from a given Web3 instance.
- Parameters:
async_w3 (Web3) – The Web3 instance to be wrapped.
- Returns:
A new DankWeb3 instance with Dank Middleware injected.
- Return type:
DankWeb3
- dank_mids.setup_dank_w3_from_sync(sync_w3)
Sets up a DankWeb3 instance from a given synchronous Web3 instance.
- Parameters:
sync_w3 (Web3) – The synchronous Web3 instance to be wrapped.
- Returns:
A new DankWeb3 instance with Dank Middleware injected, supporting batched asynchronous operations.
- Return type:
DankWeb3
- dank_mids.unregister_retry_observer(observer)
Unregister a previously registered retry observer.
- Raises:
ValueError – If the observer is not currently registered.
- Parameters:
observer (RetryObserver)
- Return type:
None