08 -- Custom Store
ducto ships with two store implementations: PostgresStore (production-ready, persistent) and MemoryStore (development, ephemeral). But your application might use a different backend -- Redis for speed, DynamoDB for scalability, SQLite for embedded deployments. The CreditStore abstract base class (ABC) defines the contract that every store must fulfill. If you implement all of its abstract methods, your custom store unlocks the full ducto feature set: reservations, deductions, refunds, analytics, team pools, spend caps, and credit expiry.
Think of the CreditStore ABC as a standardized electrical outlet. The shape of the outlet (the abstract methods) is the same everywhere, but what happens behind the wall (the implementation) can be anything -- Postgres, Redis, DynamoDB, or a plain Python dictionary. As long as the outlet fits, any appliance (CreditManager, PricingEngine, event emitter) works with any store. This is the dependency inversion principle in action: high-level modules depend on abstractions, not concrete implementations.
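The outlet analogy can be made concrete with a small sketch. The names below (BalanceStore, DictStore, LedgerStore, grant_welcome_bonus) are hypothetical and are not part of ducto. The contract is cut down to two methods so the idea fits on one screen: one high-level function runs unchanged against two different backends.

```python
from abc import ABC, abstractmethod


class BalanceStore(ABC):
    """Hypothetical two-method contract, far smaller than CreditStore."""

    @abstractmethod
    def get_balance(self, user_id: str) -> int: ...

    @abstractmethod
    def add_credits(self, user_id: str, amount: int) -> int: ...


class DictStore(BalanceStore):
    """Keeps balances in a plain dictionary."""

    def __init__(self) -> None:
        self._balances: dict[str, int] = {}

    def get_balance(self, user_id: str) -> int:
        return self._balances.get(user_id, 0)

    def add_credits(self, user_id: str, amount: int) -> int:
        self._balances[user_id] = self.get_balance(user_id) + amount
        return self._balances[user_id]


class LedgerStore(BalanceStore):
    """Keeps an append-only list of entries and sums them on read."""

    def __init__(self) -> None:
        self._entries: list[tuple[str, int]] = []

    def get_balance(self, user_id: str) -> int:
        return sum(amount for uid, amount in self._entries if uid == user_id)

    def add_credits(self, user_id: str, amount: int) -> int:
        self._entries.append((user_id, amount))
        return self.get_balance(user_id)


def grant_welcome_bonus(store: BalanceStore, user_id: str) -> int:
    """High-level code: depends only on the abstract contract."""
    return store.add_credits(user_id, 100)


# The same function works with either backend.
results = [grant_welcome_bonus(store, "alice") for store in (DictStore(), LedgerStore())]
print(results)
```

Swapping DictStore for LedgerStore changes nothing in grant_welcome_bonus, which is the property the CreditStore contract is meant to give the rest of the library.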
The CreditStore ABC defines seven method groups. The balance and lifecycle group handles the core credit operations (get_balance, add_credits, reserve_credits, deduct_credits, refund_credits). Pricing methods connect the store to pricing formulas. Plan methods support subscription-style free allowances. Cap methods enforce spending limits. Analytics methods power dashboards and reports. The sweep method handles credit expiry. Team methods support shared credit pools for organizations.
Not every method group is required for every use case. Every abstract method must still be defined, or Python will refuse to instantiate the class, but the body can decline the work: if your application does not need teams, the team methods can raise NotImplementedError, and if you do not use spend caps, the cap methods can return stubs. The contract specifies the interface; the implementation decides what is supported. This flexibility lets you start with a minimal store and add functionality over time.
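The difference between omitting a method and declining it can be seen in a short sketch. MiniStore, ForgetfulStore, and NoTeamsStore are hypothetical names used only for illustration. The behavior shown comes from Python's standard abc module, not from anything specific to ducto.

```python
from abc import ABC, abstractmethod


class MiniStore(ABC):
    """Hypothetical contract with one core and one optional method."""

    @abstractmethod
    def get_balance(self, user_id: str) -> int: ...

    @abstractmethod
    def get_team_balance(self, team_id: str) -> int: ...


class ForgetfulStore(MiniStore):
    """Omits get_team_balance entirely, so the class stays abstract."""

    def get_balance(self, user_id: str) -> int:
        return 0


class NoTeamsStore(MiniStore):
    """Defines every method, but declines the optional one at call time."""

    def get_balance(self, user_id: str) -> int:
        return 0

    def get_team_balance(self, team_id: str) -> int:
        raise NotImplementedError("Teams not supported")


# Omitting an abstract method fails as soon as the class is instantiated.
try:
    ForgetfulStore()
    instantiation_error = None
except TypeError as exc:
    instantiation_error = exc

# Defining the method and raising inside it defers the failure to the call.
store = NoTeamsStore()
core_balance = store.get_balance("alice")

try:
    store.get_team_balance("team-1")
    team_error = None
except NotImplementedError as exc:
    team_error = exc

print(type(instantiation_error).__name__, core_balance, type(team_error).__name__)
```

The second form lets the core operations keep working while unsupported features fail loudly only when someone actually calls them.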
Our example store below implements every method group using in-memory Python dictionaries. The balance and lifecycle group holds the real logic (refund_credits is still a stub), most other groups return stubs (pricing, plans, spend caps, analytics, expiry sweep), and the team methods raise NotImplementedError. In production, you would implement all methods against your chosen backend -- but even a partial implementation demonstrates the contract clearly.
What we will do in this section: implement a complete MyCustomStore class that satisfies the CreditStore ABC, with explanatory comments for each method group.
Implement the ABC
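Below is the complete MyCustomStore implementation. Each method group is separated by a section comment with a brief explanation of the group's purpose. Pay close attention to the balance and lifecycle group -- those are the only methods with real logic in this example. The other groups return stubs or raise errors, but every abstract method is defined with a signature that matches the ABC, so the class can be instantiated and passed anywhere a CreditStore is expected. Note that Python's abc machinery only checks that each abstract name is overridden; whether the signatures line up is something a static type checker verifies.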
The import section brings in every result type from ducto.interface.models. These are Pydantic models that define the return shape for each method. You do not need to import types you do not use, but importing them all shows the full contract at a glance.
What we will do in this section: walk through the imports, the class definition, and each method group with explanatory comments.
# =============================================================================
# IMPORTS: All result types from the ducto model layer
# =============================================================================
# The CreditStore base class from ducto.interface.base defines the abstract
# interface that every store must implement. The types imported from
# ducto.interface.models are the return types for each method. Every method
# in the ABC has a specific return type; these models ensure type safety.
from ducto.interface.base import CreditStore
from ducto.interface.models import (
BalanceResult, AddCreditsResult, ReserveResult, DeductionResult,
RefundResult, TeamDeductionResult, CreateTeamResult, TeamBalanceResult,
TeamMember, AddTeamMemberResult, AllowanceResult, CapCheckResult,
PricingConfigResult, SetupResult,
)
# =============================================================================
# CLASS: MyCustomStore
# =============================================================================
# This class inherits from CreditStore and implements every abstract method.
# The ABC uses Python's abc.ABC and @abstractmethod decorators to enforce
# that all abstract methods are implemented. If you forget a method, Python
# will raise a TypeError when you try to instantiate the class.
class MyCustomStore(CreditStore):
    """Minimal custom store -- dict-backed, no persistence."""

    def __init__(self):
        # _balances maps user_id to current balance (total credits available).
        # This is the source of truth for all balance operations.
        self._balances: dict[str, int] = {}
        # _reservations maps reservation_id to reserved_amount.
        # Reservations earmark credits for specific operations to prevent
        # race conditions in concurrent systems.
        self._reservations: dict[str, int] = {}
        # Counter used to give every reservation a distinct id.
        self._next_reservation = 0

    # =========================================================================
    # METHOD GROUP 1: Balance and lifecycle operations
    # =========================================================================
    # These five methods form the core credit contract. Every store must
    # implement them. They are the minimum viable interface for a working
    # credit system: check balance, add credits, reserve, deduct, refund.
    def get_balance(self, user_id: str) -> BalanceResult:
        # Return the user's current balance, or 0 if the user does not exist.
        return BalanceResult(user_id=user_id, balance=self._balances.get(user_id, 0))

    def add_credits(self, user_id: str, amount: int, type: str = "adjustment",
                    metadata=None, expires_at=None) -> AddCreditsResult:
        # Add credits to the user's balance. The type field records the
        # source of the credits (purchase, adjustment, refund, promotion).
        # The optional expires_at parameter enables TTL-based expiry; this
        # store accepts it but does not track it. "tx" is a placeholder id.
        self._balances[user_id] = self._balances.get(user_id, 0) + amount
        return AddCreditsResult(transaction_id="tx", user_id=user_id,
                                amount=amount, new_balance=self._balances[user_id])

    def reserve_credits(self, user_id: str, amount: int, operation_type: str,
                        metadata=None, min_balance: int = 5) -> ReserveResult:
        # A reservation earmarks credits for a specific operation so that two
        # concurrent requests cannot both see sufficient balance and then
        # both spend it. The min_balance parameter describes a safety buffer
        # (default 5 credits) that should never be consumed.
        # NOTE: this minimal version only records the reservation. It does
        # not reject requests that exceed the balance or eat into
        # min_balance; a production store must enforce both.
        bal = self._balances.get(user_id, 0)
        # Number the reservations so repeated calls for one user do not
        # overwrite each other.
        self._next_reservation += 1
        rid = f"res_{user_id[:8]}_{self._next_reservation}"
        self._reservations[rid] = amount
        return ReserveResult(reservation_id=rid, user_id=user_id,
                             amount=amount, balance=bal - amount)

    def deduct_credits(self, user_id: str, reservation_id: str, amount: int,
                       idempotency_key=None, metadata=None) -> DeductionResult:
        # Deduct reserved credits. The reservation was created by a prior
        # call to reserve_credits; if the reservation id is unknown, the
        # amount argument is charged instead. The idempotency_key is meant
        # to prevent double-charging on retries; this minimal version
        # ignores it.
        amt = self._reservations.pop(reservation_id, amount)
        # Use .get so a user with no recorded balance does not raise KeyError.
        self._balances[user_id] = self._balances.get(user_id, 0) - amt
        return DeductionResult(transaction_id="ded", user_id=user_id,
                               amount=-amt,
                               balance_after=self._balances[user_id])

    def refund_credits(self, transaction_id: str, amount: int | None = None,
                       reason: str | None = None, metadata=None) -> RefundResult:
        # Reverse a previous deduction. Refunds are essential for error
        # recovery -- if a downstream service fails, return the credits.
        # The reason field provides an audit trail for the refund.
        # NOTE: this store keeps no transaction log, so it cannot look up the
        # original deduction. It echoes the request and leaves balances
        # untouched; a production store must restore the credits.
        return RefundResult(refund_transaction_id="ref", user_id="",
                            original_transaction_id=transaction_id,
                            amount=amount or 0, new_balance=0,
                            reason=reason or "")

    # =========================================================================
    # METHOD GROUP 2: Pricing configuration
    # =========================================================================
    # These methods connect the store to pricing formulas stored as strings.
    # In production, get_active_pricing would load from a database table.
    # Here we return None (no active pricing) because this store delegates
    # pricing to the CreditManager or PricingEngine.
    def get_active_pricing(self) -> PricingConfigResult | None:
        return None

    def set_active_pricing(self, config, label=None) -> str:
        return "cfg_1"

    def setup_pricing_config(self, config, name="default") -> PricingConfigResult:
        raise NotImplementedError

    # =========================================================================
    # METHOD GROUP 3: Plan management
    # =========================================================================
    # Plans provide subscription-style free allowances. A plan defines a
    # monthly credit allowance; users on the plan draw from that allowance
    # before per-use billing kicks in. These methods manage plan assignment
    # and allowance tracking. This store returns stubs because plans are
    # typically managed by the pricing config, not the store itself.
    def get_user_plan(self, user_id: str):
        return None

    def set_user_plan(self, user_id: str, plan_id: str):
        pass

    def check_allowance(self, user_id: str) -> AllowanceResult:
        return AllowanceResult(plan_id="", allowance_remaining=0,
                               period_start="", period_end="")

    def increment_usage_window(self, user_id: str, plan_id: str, amount: int):
        pass

    # =========================================================================
    # METHOD GROUP 4: Spend caps
    # =========================================================================
    # Caps enforce upper limits on credit consumption over a time window
    # (daily, weekly, monthly). The action field controls the behavior when
    # the cap is exceeded: deny (reject the operation), warn (allow but log),
    # or notify (allow and trigger an event). This store returns stubs.
    def set_spend_cap(self, cap):
        pass

    def check_spend_cap(self, user_id: str, model=None, amount=None) -> CapCheckResult:
        return CapCheckResult()

    # =========================================================================
    # METHOD GROUP 5: Analytics
    # =========================================================================
    # Analytics methods power dashboards and reporting. They answer questions
    # like "who spent the most last month" or "what is the average daily
    # spend." This store returns empty lists because real analytics require
    # a persistent backend with query capabilities.
    def spend_by_user(self, start, end) -> list:
        return []

    def spend_by_model(self, start, end) -> list:
        return []

    def daily_spend(self, start, end) -> list:
        return []

    def top_users(self, limit, start, end) -> list:
        return []

    def aggregate_stats(self, start, end):
        from ducto.interface.models import AggregateStatsRow
        return AggregateStatsRow()

    # =========================================================================
    # METHOD GROUP 6: Credit expiry sweep
    # =========================================================================
    # The sweep operation finds credits whose expires_at timestamp is in the
    # past and deducts them from the user's balance. The dry_run parameter
    # lets you preview what would expire without modifying balances.
    # This store returns an empty SweepResult because it does not track
    # expiry timestamps in its dict-based implementation.
    def sweep_expired_credits(self, dry_run=False):
        from ducto.interface.models import SweepResult
        return SweepResult()

    # =========================================================================
    # METHOD GROUP 7: Team operations
    # =========================================================================
    # Teams share a pooled credit balance across multiple users. Individual
    # team members can have per-member spend caps. Team operations are
    # optional -- if your application does not need team credit pools,
    # raise NotImplementedError as shown here. The important thing is that
    # the method signatures match the ABC, so the class can still be
    # instantiated and used for non-team operations.
    def create_team(self, name: str, initial_balance=0) -> CreateTeamResult:
        raise NotImplementedError("Teams not supported")

    def get_team_balance(self, team_id: str) -> TeamBalanceResult:
        raise NotImplementedError

    def add_team_member(self, team_id, user_id, role="member", spend_cap=None):
        raise NotImplementedError

    def get_team_members(self, team_id: str):
        raise NotImplementedError

    def deduct_team(self, team_id, user_id, amount, metadata=None):
        raise NotImplementedError

    # setup is not part of the team group. This dict-backed store has nothing
    # to prepare, so it returns an empty SetupResult.
    def setup(self):
        return SetupResult()

# Instantiate our store. Python checks that all abstract methods are
# implemented at instantiation time. If the class were missing any
# abstract method, this line would raise TypeError.
custom_store = MyCustomStore()
print("MyCustomStore implements CreditStore ABC.")
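The example store above leaves some lifecycle behavior out: it never rejects a reservation, it ignores idempotency_key, and its refund does not restore credits. The sketch below shows one way a dict-backed store could cover those three points. It is a standalone illustration under stated assumptions, not the ducto API. SketchStore and its method names are hypothetical, it returns plain values instead of ducto result models, and ValueError stands in for whatever error type a real store would raise.

```python
import itertools


class SketchStore:
    """Dict-backed sketch; not the ducto API. Amounts are whole credits."""

    def __init__(self) -> None:
        self._balances: dict[str, int] = {}
        self._reserved: dict[str, tuple[str, int]] = {}    # reservation id -> (user, amount)
        self._deductions: dict[str, tuple[str, int]] = {}  # transaction id -> (user, amount)
        self._by_key: dict[str, str] = {}                  # idempotency key -> transaction id
        self._ids = itertools.count(1)

    def add_credits(self, user_id: str, amount: int) -> int:
        self._balances[user_id] = self._balances.get(user_id, 0) + amount
        return self._balances[user_id]

    def available(self, user_id: str) -> int:
        """Balance minus credits already earmarked by open reservations."""
        held = sum(a for u, a in self._reserved.values() if u == user_id)
        return self._balances.get(user_id, 0) - held

    def reserve(self, user_id: str, amount: int, min_balance: int = 5) -> str:
        # Reject the request if it would eat into the safety buffer.
        if self.available(user_id) - amount < min_balance:
            raise ValueError("insufficient credits")  # placeholder error type
        rid = f"res_{next(self._ids)}"
        self._reserved[rid] = (user_id, amount)
        return rid

    def deduct(self, reservation_id: str, idempotency_key: str) -> str:
        # A retry with the same key returns the first transaction unchanged.
        if idempotency_key in self._by_key:
            return self._by_key[idempotency_key]
        user_id, amount = self._reserved.pop(reservation_id)
        self._balances[user_id] -= amount
        tx = f"ded_{next(self._ids)}"
        self._deductions[tx] = (user_id, amount)
        self._by_key[idempotency_key] = tx
        return tx

    def refund(self, transaction_id: str) -> int:
        # Look up the original deduction and return its credits in full.
        user_id, amount = self._deductions.pop(transaction_id)
        self._balances[user_id] += amount
        return self._balances[user_id]


store = SketchStore()
store.add_credits("alice", 100)
rid = store.reserve("alice", 60)
try:
    store.reserve("alice", 40)   # 100 - 60 - 40 = 0, below the buffer of 5
    second = "accepted"
except ValueError:
    second = "rejected"
tx = store.deduct(rid, idempotency_key="req-1")
retry = store.deduct(rid, idempotency_key="req-1")   # no second charge
after_deduct = store.available("alice")
after_refund = store.refund(tx)
print(second, tx == retry, after_deduct, after_refund)
```

The sketch is single-threaded. A store shared between threads or processes would also need a lock or a database transaction around each check-and-update step.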
Use with CreditManager
Implementing the CreditStore ABC is only half the work. The real power comes when you connect your custom store to ducto's CreditManager. The CreditManager wraps any CreditStore and adds higher-level features: automatic pricing engine integration, event emission, idempotency handling, and resource lifecycle management.
The beauty of the ABC pattern is that CreditManager accepts any CreditStore implementation. The manager does not care whether the store keeps its data in Postgres, Redis, DynamoDB, or a Python dictionary. It only knows that the object satisfies the CreditStore contract. When you upgrade from a prototype store to a production PostgresStore, your CreditManager code does not change.
In a production application, you would typically create a single CreditManager instance at startup and inject it into your request handlers. The manager handles thread safety, connection pooling (for database-backed stores), and resource cleanup. It also provides convenience methods that combine multiple store calls into atomic operations.
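To illustrate what combining several store calls into one operation can look like, here is a minimal caller-side sketch of a reserve, work, settle cycle. It does not reproduce CreditManager. TinyStore, charge_for, and the release method are hypothetical names, and the store is a stand-in with just enough behavior to run.

```python
class TinyStore:
    """Hypothetical stand-in with just enough behavior for the cycle."""

    def __init__(self, balance: int) -> None:
        self.balance = balance
        self.held = 0

    def reserve(self, amount: int) -> None:
        if self.balance - self.held < amount:
            raise ValueError("insufficient credits")  # placeholder error type
        self.held += amount

    def deduct(self, amount: int) -> None:
        self.held -= amount
        self.balance -= amount

    def release(self, amount: int) -> None:
        self.held -= amount


def charge_for(store: TinyStore, amount: int, work) -> bool:
    """Reserve, run the work, then deduct on success or release on failure."""
    store.reserve(amount)
    try:
        work()
    except Exception:
        store.release(amount)   # the operation failed: give the hold back
        return False
    store.deduct(amount)        # the operation succeeded: settle the charge
    return True


def failing_call():
    raise RuntimeError("model backend unavailable")


store = TinyStore(balance=10_000)
ok = charge_for(store, 1_000, lambda: None)
failed = charge_for(store, 1_000, failing_call)
print(ok, failed, store.balance, store.held)
```

Keeping the whole cycle in one function means request handlers cannot forget the release step when the work raises.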
What we will do in this section: create a CreditManager backed by our custom store, add credits to a user, check the balance, and perform a reserve-and-deduct cycle.
# Import uuid to generate unique user IDs for testing.
import uuid
# The CreditManager wraps any CreditStore with higher-level logic.
# It accepts our custom store because MyCustomStore extends CreditStore.
from ducto.manager import CreditManager
# Create a CreditManager using our custom store as the backend.
# The manager will delegate all storage operations to our store
# while adding pricing engine integration and event emission.
manager = CreditManager(custom_store)
# Generate a unique user ID for our test.
user = str(uuid.uuid4())
# Add 10 000 credits to the user's balance, simulating a grant.
manager.add_credits(user, 10_000)
# Check the balance to confirm the credits were added successfully.
# The get_balance method delegates to our store's get_balance.
print(f" After adding 10 000 credits, balance = {manager.get_balance(user).balance}")
# Reserve 1 000 credits for a specific operation (e.g., model inference).
# Reservation checks that sufficient balance exists and locks the credits
# so a concurrent request cannot consume them before the deduction.
res = manager.reserve_credits(user, 1_000, operation_type="test")
print(f" Reserved {res.amount} credit(s), remaining available balance = {res.balance}")