ducto.manager module
High-level credit manager.
Orchestrates the full credit lifecycle: calculate -> reserve -> deduct.
Example:
    from ducto import CreditManager, UsageMetrics
    from ducto.interface.supabase import HttpxSupabaseStore

    # supabase_url and service_role_key are placeholders for your project's values
    store = HttpxSupabaseStore(url=supabase_url, key=service_role_key)
    manager = CreditManager(store=store)

    # One-time setup (creates tables + RPCs)
    manager.setup()

    # Load pricing from store (credit_pricing_config table)
    manager.load_pricing_from_store()

    # Deduct credits for a usage event
    result = manager.deduct(
        user_id="user_abc",
        metrics=UsageMetrics(model="claude-opus-4", input_tokens=500, output_tokens=200),
        idempotency_key="chat_42_turn_7",
    )
    print(f"Deducted {result.amount} credits, balance: {result.balance_after}")
class ducto.manager.CreditManager(store: CreditStore, engine: PricingEngine | None = None, emitter: CreditEventEmitter | None = None)
Bases: object
Orchestrates credit operations: pricing -> reserve -> deduct.
Args:
    store: A CreditStore adapter (HttpxSupabaseStore, PostgresStore, etc.).
    engine: An optional pre-configured PricingEngine. If omitted, call load_pricing_from_store() or publish_pricing_from_dict() before deduct().
    emitter: An optional CreditEventEmitter for lifecycle events.
add_credits(user_id: str, amount: int, tx_type: str = 'adjustment', metadata: CreditMetadata | None = None, expires_at: datetime | None = None) → AddCreditsResult
Add credits to a user’s account.
aggregate_stats(start: datetime, end: datetime) → AggregateStatsRow
Aggregate statistics across all users in a time window.
check_feature(user_id: str, feature: str) → CheckFeatureResult
Check whether a user’s plan has a specific feature entitlement.
Convenience wrapper around the store’s check_feature(). It inspects the features dict on a user’s plan so callers can gate functionality.
Feature values follow a truthy convention:
- False / None / absent => has_feature=False
- True / numeric / string => has_feature=True
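The convention above can be sketched as a standalone predicate. This is an illustration only, not ducto's implementation: the `has_feature` helper and the sample `features` dict are hypothetical, and it reads the list literally, so any value other than False, None, or a missing key counts as enabled.

```python
from typing import Any, Mapping

_MISSING = object()  # sentinel so an absent key differs from a stored None


def has_feature(features: Mapping[str, Any], name: str) -> bool:
    """Hypothetical helper mirroring the documented truthy convention."""
    value = features.get(name, _MISSING)
    # False, None, or an absent key: the feature is off.
    # Identity checks are used so that the number 0 is not confused with False.
    if value is _MISSING or value is None or value is False:
        return False
    # True, numbers, and strings: the feature is on.
    return True


features = {
    "export_pdf": True,
    "max_projects": 10,
    "tier": "pro",
    "beta": False,
    "sso": None,
}
enabled = sorted(name for name in features if has_feature(features, name))
```

Whether edge values such as `0` or an empty string should count as enabled is not stated in this reference, so check the store's behaviour before relying on it.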
daily_spend(start: datetime, end: datetime) → list[DailySpendRow]
Daily spend aggregation in a time window.
deduct(user_id: str, metrics: UsageMetrics, idempotency_key: str | None = None, metadata: CreditMetadata | None = None) → DeductionResult
Full deduction flow: calculate -> reserve -> deduct.
Args:
    user_id: The user to charge.
    metrics: Usage metrics (model, tokens, tool calls, etc.).
    idempotency_key: Optional unique key for idempotent dedup.
    metadata: Extra metadata to attach to the transaction.
Returns:
    DeductionResult with transaction details.
Raises:
    PricingNotLoadedError: If pricing hasn’t been loaded.
    InsufficientCreditsError: If the user lacks sufficient balance (including the min_balance floor).
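The calculate -> reserve -> deduct flow can be pictured with a small in-memory model. Everything here is a stand-in rather than ducto code: the `Ledger` class, the `InsufficientCredits` exception, and the per-token rates are hypothetical. The sketch also assumes that a repeated idempotency key returns the earlier result without charging again, and that the floor is checked against the balance left after the charge.

```python
from dataclasses import dataclass, field
from math import ceil
from typing import Dict, Optional


class InsufficientCredits(Exception):
    """Local stand-in for ducto's InsufficientCreditsError."""


@dataclass
class Ledger:
    """Hypothetical in-memory account; not part of ducto."""

    balance: int
    min_balance: int = 5  # assumed floor that must remain after a charge
    charged: Dict[str, int] = field(default_factory=dict)  # key -> cost

    def deduct(self, input_tokens: int, output_tokens: int,
               idempotency_key: Optional[str] = None) -> int:
        # A repeated key returns the earlier cost without charging again.
        if idempotency_key is not None and idempotency_key in self.charged:
            return self.charged[idempotency_key]
        # 1. Calculate: made-up rates of 1 credit per 100 input tokens
        #    and 1 credit per 50 output tokens, each rounded up.
        cost = ceil(input_tokens / 100) + ceil(output_tokens / 50)
        # 2. Reserve: refuse if the charge would dip below the floor.
        if self.balance - cost < self.min_balance:
            raise InsufficientCredits(
                f"need {cost}, have {self.balance}, floor {self.min_balance}"
            )
        # 3. Deduct: commit the charge and remember the key.
        self.balance -= cost
        if idempotency_key is not None:
            self.charged[idempotency_key] = cost
        return cost


ledger = Ledger(balance=100)
first = ledger.deduct(500, 200, idempotency_key="chat_42_turn_7")
retry = ledger.deduct(500, 200, idempotency_key="chat_42_turn_7")
```

In this model a retried request with the same key leaves the balance unchanged, which is the property an idempotency key is meant to provide when a client resends after a timeout.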
deduct_fixed(user_id: str, job_name: str, idempotency_key: str | None = None, metadata: CreditMetadata | None = None) → DeductionResult
Shortcut for fixed-cost batch jobs (roadmap gen, topic gen, etc.).
deduct_team(team_id: str, user_id: str, metrics: UsageMetrics, idempotency_key: str | None = None, metadata: CreditMetadata | None = None) → TeamDeductionResult
Deduct from a team’s shared balance pool.
Calculates cost via the pricing engine, then debits the team pool.
Args:
    team_id: The team’s UUID.
    user_id: The user to attribute the deduction to.
    metrics: Usage metrics (model, tokens, etc.).
    idempotency_key: Optional idempotency key.
    metadata: Extra metadata.
Returns:
    TeamDeductionResult with transaction details.
property engine : PricingEngine | None
The current PricingEngine, or None if not loaded.
get_balance(user_id: str) → BalanceResult
Get a user’s current credit balance.
get_user_plan(user_id: str) → GetUserPlanResult
Fetch a user’s current plan (including feature entitlements).
load_pricing_from_store() → None
Load the active pricing config from the store.
publish_pricing(config: PricingConfigData, label: str | None = None) → None
Publish new pricing and update the engine in one call.
publish_pricing_from_dict(data: PricingConfigData | dict[str, Any]) → None
Load pricing from a PricingConfigData or raw dict and sync it.
refund_credits(transaction_id: str, amount: int | None = None, reason: str | None = None, metadata: CreditMetadata | None = None) → RefundResult
Refund a previous credit deduction.
Args:
    transaction_id: The transaction to refund.
    amount: Optional partial refund amount. Full refund if omitted.
    reason: Optional reason for the refund.
    metadata: Extra metadata to attach to the refund transaction.
Returns:
    RefundResult with the refund transaction details.
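The full-versus-partial rule for `amount` can be sketched as below. The `resolve_refund_amount` helper is hypothetical, and the bounds check (a partial refund must be positive and no larger than the original charge) is an assumption; this reference does not say how ducto validates the value.

```python
from typing import Optional


def resolve_refund_amount(original: int, amount: Optional[int] = None) -> int:
    """Hypothetical helper: pick the number of credits to return."""
    # Omitted amount: refund the whole original deduction.
    if amount is None:
        return original
    # Assumed validation: partial refunds stay within (0, original].
    if not 0 < amount <= original:
        raise ValueError(f"refund must be between 1 and {original}, got {amount}")
    return amount


full = resolve_refund_amount(9)
partial = resolve_refund_amount(9, amount=4)
```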
reserve_credits(user_id: str, amount: int, operation_type: str = 'usage', metadata: CreditMetadata | None = None, min_balance: int | None = None) → ReserveResult
Reserve credits for an upcoming operation.
If min_balance is not specified, the engine’s configured minimum
is used (defaults to 5 if no engine is loaded).
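The fallback order described above can be written as a short resolver. The function and constant names are hypothetical; only the order (explicit argument, then the engine's minimum, then 5) comes from this reference.

```python
from typing import Optional

DEFAULT_MIN_BALANCE = 5  # documented fallback when no engine is loaded


def resolve_min_balance(explicit: Optional[int], engine_min: Optional[int]) -> int:
    """Hypothetical helper: choose the floor used for a reservation."""
    # An explicit argument wins, including an explicit 0.
    if explicit is not None:
        return explicit
    # Otherwise use the loaded engine's configured minimum, if any.
    if engine_min is not None:
        return engine_min
    # No argument and no engine: fall back to the default.
    return DEFAULT_MIN_BALANCE


no_engine = resolve_min_balance(None, None)
from_engine = resolve_min_balance(None, 20)
explicit_zero = resolve_min_balance(0, 20)
```

The sketch compares against `None` instead of using `or`, so a caller who passes `min_balance=0` is not silently given the engine's value.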
setup() → SetupResult
Run bundled SQL migrations through the store.
spend_by_model(start: datetime, end: datetime) → list[SpendByModelRow]
Aggregate spend by model in a time window.
spend_by_user(start: datetime, end: datetime) → list[SpendByUserRow]
Aggregate spend by user in a time window.
sweep_expired_credits(dry_run: bool = False) → SweepResult
Sweep expired credits from all users’ balances.
Args:
    dry_run: If True, report without modifying.
Returns:
    SweepResult with expired count and amount.
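The effect of `dry_run` can be illustrated with an in-memory list of grants. The `Grant` class and `sweep` function are hypothetical stand-ins. The sketch assumes a grant is expired once `expires_at` is at or before the current time, and that a grant without `expires_at` never expires.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional, Tuple


@dataclass
class Grant:
    """Hypothetical credit grant; expires_at=None means it never expires."""
    amount: int
    expires_at: Optional[datetime] = None


def sweep(grants: List[Grant], now: datetime, dry_run: bool = False) -> Tuple[int, int]:
    """Return (expired_count, expired_amount); drop expired grants unless dry_run."""
    def expired(g: Grant) -> bool:
        return g.expires_at is not None and g.expires_at <= now

    hits = [g for g in grants if expired(g)]
    if not dry_run:
        # Mutate the caller's list in place so the removal is visible.
        grants[:] = [g for g in grants if not expired(g)]
    return len(hits), sum(g.amount for g in hits)


now = datetime(2025, 1, 15, tzinfo=timezone.utc)
grants = [
    Grant(100),
    Grant(50, datetime(2025, 1, 1, tzinfo=timezone.utc)),
    Grant(25, datetime(2025, 2, 1, tzinfo=timezone.utc)),
]
preview = sweep(grants, now, dry_run=True)   # reports only
remaining_after_preview = len(grants)
applied = sweep(grants, now)                 # removes the expired grant
```

Running the dry run first gives the same count and amount as the real sweep, which makes it a cheap way to review the impact before modifying balances.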
top_users(limit: int, start: datetime, end: datetime) → list[TopUserRow]
Top users by spend in a time window.
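The reporting methods all take a `start`/`end` window. As a rough picture of what a top-users query computes, the sketch below aggregates in-memory rows. The `top_users_sketch` function and the row layout are hypothetical, and treating the window as half-open (`start` inclusive, `end` exclusive) is an assumption; this reference does not specify the boundary behaviour.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import List, Tuple

Row = Tuple[str, int, datetime]  # (user_id, credits spent, timestamp)


def top_users_sketch(rows: List[Row], limit: int,
                     start: datetime, end: datetime) -> List[Tuple[str, int]]:
    """Hypothetical aggregation: total spend per user inside the window."""
    totals: Counter = Counter()
    for user_id, amount, ts in rows:
        if start <= ts < end:  # assumed half-open window
            totals[user_id] += amount
    # most_common sorts by total spend, highest first.
    return totals.most_common(limit)


def utc(y: int, m: int, d: int) -> datetime:
    return datetime(y, m, d, tzinfo=timezone.utc)


rows = [
    ("user_a", 5, utc(2025, 1, 1)),
    ("user_b", 9, utc(2025, 1, 2)),
    ("user_a", 7, utc(2025, 1, 3)),
    ("user_c", 1, utc(2025, 2, 1)),  # falls outside a January window
]
january_top = top_users_sketch(rows, limit=5, start=utc(2025, 1, 1), end=utc(2025, 2, 1))
```

Using timezone-aware datetimes for both bounds avoids ambiguity when the store records timestamps in UTC.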
exception ducto.manager.InsufficientCreditsError
Bases: Exception
Raised when a user does not have enough credits for an operation.
exception ducto.manager.PricingNotLoadedError
Bases: Exception
Raised when deduct() is called before pricing is loaded.