"""Treasury orchestration and analytics interface.
This module defines the Treasury class, which aggregates DAO wallets, sets up
sorting rules, and manages transaction ingestion and streaming analytics.
It coordinates the end-to-end flow from wallet configuration to database
population and dashboard analytics.
Key Responsibilities:
- Aggregate and manage DAO-controlled wallets.
- Ingest and process on-chain transactions.
- Apply sorting/categorization rules.
- Integrate with streaming protocols (e.g., LlamaPay).
- Populate the database for analytics and dashboards.
This is the main entry point for orchestrating DAO treasury analytics.
"""
from asyncio import create_task, gather
from logging import getLogger
from pathlib import Path
from typing import Final, Iterable, List, Optional, Union
import a_sync
from a_sync.a_sync.abstract import ASyncABC
from eth_typing import BlockNumber
from eth_portfolio.structs import LedgerEntry
from eth_portfolio.typing import PortfolioBalances
from eth_portfolio_scripts._portfolio import ExportablePortfolio
from pony.orm import db_session
from tqdm.asyncio import tqdm_asyncio
from dao_treasury._wallet import TreasuryWallet
from dao_treasury.constants import CHAINID
from dao_treasury.db import TreasuryTx
from dao_treasury.sorting._rules import Rules
from dao_treasury.streams import llamapay
Wallet = Union[TreasuryWallet, str]
wallet_types = (TreasuryWallet, str)
logger = getLogger("dao_treasury")
TREASURY = None
[docs]
class Treasury(a_sync.ASyncGenericBase): # type: ignore [misc]
[docs]
def __init__(
self,
wallets: Iterable[Union[TreasuryWallet, str]],
sort_rules: Optional[Path] = None,
start_block: int = 0,
label: str = "your org's treasury",
asynchronous: bool = False,
) -> None:
"""Initialize the Treasury singleton for managing DAO funds.
This class aggregates multiple treasury wallets, sets up sorting rules,
and constructs an :class:`.ExportablePortfolio` for fetching balance and
transaction history.
Args:
wallets: Iterable of wallet
addresses or :class:`.TreasuryWallet` instances representing
DAO-controlled wallets.
sort_rules: Directory path containing YAML rule files
for sorting transactions. See :class:`dao_treasury.sorting._rules.Rules`.
start_block: Block number from which to start loading portfolio
history.
label: Descriptive label for the portfolio, used in exported data.
asynchronous: Whether methods default to asynchronous mode.
Raises:
RuntimeError: If a second Treasury instance is initialized.
TypeError: If any item in `wallets` is not a str or TreasuryWallet.
Examples:
.. code-block:: python
# Create a synchronous Treasury
treasury = Treasury(
wallets=["0xAbc123...", TreasuryWallet("0xDef456...", start_block=1000)],
sort_rules=Path("/path/to/rules"),
start_block=500,
label="DAO Treasury",
asynchronous=False
)
# Create an asynchronous Treasury
treasury_async = Treasury(
wallets=["0xAbc123..."],
asynchronous=True
)
"""
global TREASURY
if TREASURY is not None:
raise RuntimeError(
f"You can only initialize one {type(self).__name__} object"
)
ASyncABC.__init__(self)
self.wallets: Final[List[TreasuryWallet]] = []
"""The collection of wallets owned or controlled by the on-chain org"""
for wallet in wallets:
if isinstance(wallet, str):
self.wallets.append(TreasuryWallet(wallet)) # type: ignore [type-arg]
elif isinstance(wallet, TreasuryWallet):
self.wallets.append(wallet)
else:
raise TypeError(
f"`wallets` can only contain: {wallet_types} You passed {wallet}"
)
self.sort_rules: Final = Rules(sort_rules) if sort_rules else None
self.portfolio: Final = ExportablePortfolio(
addresses=(
wallet.address
for wallet in self.wallets
if wallet.networks is None or CHAINID in wallet.networks
),
start_block=start_block,
label=label,
load_prices=True,
asynchronous=asynchronous,
)
"""An eth_portfolio.Portfolio object used for exporting tx and balance history"""
self._llamapay: Final = (
llamapay.LlamaPayProcessor() if CHAINID in llamapay.networks else None
)
self.asynchronous: Final = asynchronous
"""A boolean flag indicating whether the API for this `Treasury` object is sync or async by default"""
TREASURY = self
[docs]
async def describe(self, block: int) -> PortfolioBalances:
return await self.portfolio.describe(block)
@property
def txs(self) -> a_sync.ASyncIterator[LedgerEntry]:
return self.portfolio.ledger.all_entries
async def _insert_txs(
self, start_block: BlockNumber, end_block: BlockNumber
) -> None:
"""Populate the database with treasury transactions in a block range.
Streams ledger entries from `start_block` up to (but not including)
`end_block`, skips zero-value transfers, and inserts each remaining entry
into the DB via :meth:`dao_treasury.db.TreasuryTx.insert`. Uses
:class:`tqdm.asyncio.tqdm_asyncio` to display progress.
Args:
start_block: First block number to include (inclusive).
end_block: Last block number to include (exclusive).
Examples:
>>> # Insert transactions from block 0 to 10000
>>> await treasury._insert_txs(0, 10000)
"""
with db_session:
futs = []
async for entry in self.portfolio.ledger[start_block:end_block]:
if not entry.value:
# TODO: add an arg in eth-port to skip 0 value
logger.debug("zero value transfer, skipping %s", entry)
continue
futs.append(create_task(TreasuryTx.insert(entry)))
if futs:
await tqdm_asyncio.gather(*futs, desc="Insert Txs to Postgres")
logger.info(f"{len(futs)} transfers exported")
async def _process_streams(self) -> None:
if self._llamapay is not None:
await self._llamapay.process_streams(run_forever=True)
[docs]
async def populate_db(
self, start_block: BlockNumber, end_block: BlockNumber
) -> None:
"""
Populate the database with treasury transactions and streams in parallel.
"""
tasks = [self._insert_txs(start_block, end_block)]
if self._llamapay:
tasks.append(self._process_streams())
await gather(*tasks)
logger.info("db connection closed")