Skip to main content

ducto.manager module

High-level credit manager.

Orchestrates the full credit lifecycle: : calculate -> reserve -> deduct

Example:

from ducto import CreditManager, UsageMetrics
from ducto.interface.supabase import HttpxSupabaseStore

store = HttpxSupabaseStore(url=supabase_url, key=service_role_key)
manager = CreditManager(store=store)

# One-time setup (creates tables + RPCs)
manager.setup()

# Load pricing from store (credit_pricing_config table)
manager.load_pricing_from_store()

# Deduct credits for a usage event
result = manager.deduct(
user_id="user_abc",
metrics=UsageMetrics(model="claude-opus-4", input_tokens=500, output_tokens=200),
idempotency_key="chat_42_turn_7",
)
print(f"Deducted {result.amount} credits, balance: {result.balance_after}")

class ducto.manager.CreditManager(store: CreditStore, engine: PricingEngine | None = None, emitter: CreditEventEmitter | None = None)

Bases: object

Orchestrates credit operations: pricing -> reserve -> deduct.

Args: : store: A CreditStore adapter (HttpxSupabaseStore, PostgresStore, etc.). engine: An optional pre-configured PricingEngine. If omitted,

call load_pricing_from_store() or publish_pricing_from_dict() before deduct().


emitter: An optional CreditEventEmitter for lifecycle events.

add_credits(user_id: str, amount: int, tx_type: str = 'adjustment', metadata: CreditMetadata | None = None, expires_at: datetime | None = None) → AddCreditsResult

Add credits to a user’s account.

aggregate_stats(start: datetime, end: datetime) → AggregateStatsRow

Aggregate statistics across all users in a time window.

check_feature(user_id: str, feature: str) → CheckFeatureResult

Check whether a user’s plan has a specific feature entitlement.

Convenience wrapper around the store’s check_feature() – inspect the features dict on a user’s plan to gate functionality.

Feature values follow a truthy convention:

  • False / None / absent => has_feature=False
  • True / numeric / string => has_feature=True

daily_spend(start: datetime, end: datetime) → list[DailySpendRow]

Daily spend aggregation in a time window.

deduct(user_id: str, metrics: UsageMetrics, idempotency_key: str | None = None, metadata: CreditMetadata | None = None) → DeductionResult

Full deduction flow: calculate -> reserve -> deduct.

Args: : user_id: The user to charge. metrics: Usage metrics (model, tokens, tool calls, etc.). idempotency_key: Optional unique key for idempotent dedup. metadata: Extra metadata to attach to the transaction.

Returns: : DeductionResult with transaction details.

Raises: : PricingNotLoadedError: If pricing hasn’t been loaded. InsufficientCreditsError: If the user lacks sufficient balance

(including the min_balance floor).

deduct_fixed(user_id: str, job_name: str, idempotency_key: str | None = None, metadata: CreditMetadata | None = None) → DeductionResult

Shortcut for fixed-cost batch jobs (roadmap gen, topic gen, etc.).

deduct_team(team_id: str, user_id: str, metrics: UsageMetrics, idempotency_key: str | None = None, metadata: CreditMetadata | None = None) → TeamDeductionResult

Deduct from a team’s shared balance pool.

Calculates cost via the pricing engine, then debits the team pool.

Args: : team_id: The team’s UUID. user_id: The user to attribute the deduction to. metrics: Usage metrics (model, tokens, etc.). idempotency_key: Optional idempotency key. metadata: Extra metadata.

Returns: : TeamDeductionResult with transaction details.

property engine : PricingEngine | None

The current PricingEngine, or None if not loaded.

get_balance(user_id: str) → BalanceResult

Get a user’s current credit balance.

get_user_plan(user_id: str) → GetUserPlanResult

Fetch user’s current plan (including feature entitlements).

load_pricing_from_store() → None

Load the active pricing config from the store.

publish_pricing(config: PricingConfigData, label: str | None = None) → None

Publish new pricing and update the engine in one call.

publish_pricing_from_dict(data: PricingConfigData | dict[str, Any]) → None

Load pricing from a PricingConfigData or raw dict and sync it.

refund_credits(transaction_id: str, amount: int | None = None, reason: str | None = None, metadata: CreditMetadata | None = None) → RefundResult

Refund a previous credit deduction.

Args: : transaction_id: The transaction to refund. amount: Optional partial refund amount. Full refund if omitted. reason: Optional reason for the refund. metadata: Extra metadata to attach to the refund transaction.

Returns: : RefundResult with the refund transaction details.

reserve_credits(user_id: str, amount: int, operation_type: str = 'usage', metadata: CreditMetadata | None = None, min_balance: int | None = None) → ReserveResult

Reserve credits for an upcoming operation.

If min_balance is not specified, the engine’s configured minimum is used (defaults to 5 if no engine is loaded).

setup() → SetupResult

Run bundled SQL migrations through the store.

spend_by_model(start: datetime, end: datetime) → list[SpendByModelRow]

Aggregate spend by model in a time window.

spend_by_user(start: datetime, end: datetime) → list[SpendByUserRow]

Aggregate spend by user in a time window.

sweep_expired_credits(dry_run: bool = False) → SweepResult

Sweep expired credits from all users’ balances.

Args: : dry_run: If True, report without modifying.

Returns: : SweepResult with expired count and amount.

top_users(limit: int, start: datetime, end: datetime) → list[TopUserRow]

Top users by spend in a time window.

exception ducto.manager.InsufficientCreditsError

Bases: Exception

Raised when a user does not have enough credits for an operation.

exception ducto.manager.PricingNotLoadedError

Bases: Exception

Raised when deduct() is called before pricing is loaded.