07 – Events
Credit operations are useful on their own, but often you need to react to them — send a Slack alert when a user's balance runs low, update an analytics dashboard on each deduction, or trigger an auto top-up. ducto's event system follows the observer pattern: you emit events when operations happen, and registered handlers react asynchronously.
The observer pattern is a software design pattern where an object (the subject) maintains a list of dependents (observers) and notifies them automatically of state changes. In ducto, every credit operation — adding credits, deducting, refunding, hitting a cap — generates a typed event that can be observed by any number of handlers. This decouples the core credit logic from the integrations that react to it.
Without events, you would need to add notification logic directly inside every credit operation call site: after add_credits, check the balance and send a Slack message; after deduct_credits, update the dashboard. This approach is fragile, tightly coupled, and hard to maintain as your integration surface grows. Events solve this by letting you register handlers once, after which all relevant operations automatically trigger them.
ducto defines a standard set of event types: credits.added, credits.deducted, credits.refunded, credits.low_balance, credits.cap_reached, and credits.cap_warning. Each event carries structured data — the user ID, the amount, the new balance, a timestamp, and any operation-specific fields — so your handlers have full context without needing to query the store again.
What we will do in this section: create a CreditEventEmitter, register handler functions for multiple event types, wire the emitter to a CreditManager, trigger real operations that produce events, inspect the captured event data, and demonstrate type-specific subscriptions that only react to refund events.
from datetime import datetime, timedelta
from ducto.interface.postgres import PostgresStore
from ducto.manager import CreditManager
from ducto.engine import PricingEngine
from ducto.metrics import UsageMetrics, ToolCall
from ducto.interface.models import (
PricingConfigData, PlanDefinition,
CreditMetadata,
)
from shared import start_postgres_store, cleanup
store, pgdata = start_postgres_store()
import uuid
from ducto.events import CreditEvent, CreditEventEmitter
print("✔ PostgresStore ready.")
Create emitter and register handlers
The CreditEventEmitter is the central hub of ducto's event system. You create one instance, register handler functions for the event types you care about, and then pass the emitter to a CreditManager. From that point on, every relevant credit operation on that manager automatically dispatches events to the registered handlers.
Handler functions have a simple signature: they receive a CreditEvent object and return nothing. In our example, the logger function appends each event to a captured list for later inspection and prints a summary. This pattern — collect events in a list for assertions or post-hoc analysis — is especially useful in testing and monitoring scenarios.
Note that you can register the same handler for multiple event types (as we do with logger for credits.added, credits.deducted, and credits.low_balance) or different handlers for different types. The emitter dispatches each event to all handlers registered for that type, in registration order.
What we will do in this section: instantiate a CreditEventEmitter, define a logging handler that captures events into a list, and register it for three different event types.
# Create a list to collect all emitted events for later inspection.
# In a real application, this would be replaced by a handler that
# sends data to an external system (Slack, analytics, etc.).
captured: list[CreditEvent] = []
# Define a handler function that receives a CreditEvent object.
# This handler appends each event to our capture list and prints
# a one-line summary with the event type, truncated user ID, and
# any additional payload data.
def logger(ev: CreditEvent) -> None:
captured.append(ev)
print(f" EVENT [{ev.type}] user={ev.user_id[:8]}… data={ev.data}")
# Create the event emitter — the central hub that dispatches
# events to all registered handlers.
emitter = CreditEventEmitter()
# Register our logger handler for three different event types.
# When a credit operation triggers any of these types, the
# emitter calls logger() with the corresponding CreditEvent.
emitter.on("credits.added", logger)
emitter.on("credits.deducted", logger)
emitter.on("credits.low_balance", logger)
print("Handlers registered for credits.added, credits.deducted, credits.low_balance")
Wire to CreditManager and trigger events
A CreditEventEmitter on its own does nothing — it needs to be connected to credit operations. This is done by passing the emitter to the CreditManager constructor. When the manager calls add_credits, deduct, and other operations internally, it emits events through the wired emitter before returning the result.
In the example below, we create two managers: one with just the emitter (for the first add_credits call) and another with both an emitter and a PricingEngine (for the full deduct flow). Both emit events that get collected by our logger handler. The pricing engine is included so that manager.deduct() can calculate the credit cost from usage metrics before applying the deduction and emitting the credits.deducted event.
Each call to manager.add_credits or manager.deduct produces one or more events. The handler prints them in real time, and the events are also saved in the captured list for later inspection. At the end, we confirm how many events were captured total.
What we will do in this section: create a CreditManager wired to the emitter, add credits to a user (triggering credits.added), set up a pricing engine and deduct credits (triggering credits.deducted and potentially credits.low_balance), and count the total events captured.
# Create a CreditManager with just the emitter (no pricing engine).
# Operations through this manager will emit events to our logger.
manager = CreditManager(store, emitter=emitter)
user = str(uuid.uuid4())
# Add 500 credits to the user. This triggers a credits.added event
# that our logger handler will capture and print.
print("--- add_credits (triggers credits.added event) ---")
manager.add_credits(user, 500)
# Create a second manager that includes both a pricing engine and
# the emitter. The pricing engine calculates how many credits a
# usage metric costs, then the manager deducts that amount and
# emits a credits.deducted event.
print("\n--- deduct end-to-end (triggers credits.deducted) ---")
engine = PricingEngine.from_dict({
"models": {"_default": "input_tokens * 1"},
})
manager = CreditManager(store, engine=engine, emitter=emitter)
manager.add_credits(user, 2_000)
ded = manager.deduct(user, UsageMetrics(model="_default", input_tokens=100))
print(f" Deduct result: amount={ded.amount} credits deducted, balance_after={ded.balance_after}")
# Check how many events were captured across all operations.
# Each add_credits and deduct call should have produced at least
# one event that our handler recorded.
print(f"\nTotal events captured across all operations: {len(captured)}")
Inspect captured events
The captured list now contains every event that was emitted during the operations above. Each event is a CreditEvent object with structured fields: type (the event type string), user_id, amount, balance_after, timestamp, and an optional data dictionary with operation-specific details.
Inspecting captured events is valuable for debugging, audit logging, and testing. You can verify that the correct sequence of events was emitted, check that critical thresholds (like low balance warnings) were triggered at the right moment, and ensure that all event data contains the expected values. This pattern is also the foundation for building integration tests that assert on event-driven behavior.
In a production system, you would replace the in-memory captured list with a real handler — for example, one that sends a webhook to your analytics platform, posts a message to a Slack channel, or writes to an audit log table.
What we will do in this section: iterate over the captured events, print their timestamps and types, and display any additional data they carry.
# Iterate through every event that was captured during the
# operations above. Each CreditEvent object contains structured
# data fields that we can inspect programmatically.
for ev in captured:
# Print the timestamp and event type for each event. The
# timestamp is recorded at the moment the event is emitted,
# giving us a precise timeline of credit operations.
print(f" [{ev.timestamp.strftime('%H:%M:%S')}] {ev.type}")
# If the event carries additional payload data (such as
# transaction_id, new balance, or operation metadata), print
# each key-value pair on its own line.
if ev.data:
for k, v in ev.data.items():
print(f" {k}={v}")
Subscribe by specific type
Registering a handler for "credits.refunded" means it only fires when a refund operation occurs. This is useful for handlers that should react to specific credit lifecycle events without being invoked on every operation. For example, a refund handler might update an accounting ledger or notify a support agent, while a deduction handler might track usage for billing.
In this section, we register a separate handler that only listens for refund events and appends them to its own list. Then we perform a refund through the store directly (the CreditManager does not expose a refund method in this example) and verify that the refund-specific handler was triggered.
Note that the refund event carries information about the original deduction transaction, the refund amount, and the reason for the refund. This data is accessible through the event's data dictionary and is critical for audit trails and accounting reconciliation.
What we will do in this section: register a dedicated refund handler on the emitter, perform a reserve-deduct-refund cycle through the store, and verify that the refund handler captured the expected events.
# Create a dedicated list to capture only refund events, and
# register a handler that subscribes exclusively to the
# "credits.refunded" event type. This demonstrates type-specific
# subscriptions: this handler will only fire when a refund occurs.
refunds: list[CreditEvent] = []
emitter.on("credits.refunded", lambda e: refunds.append(e))
# Perform a reserve-deduct-refund cycle through the store directly
# (CreditManager does not expose a refund method in this example).
# Step 1: Reserve 100 credits to hold them for a pending operation.
ded_tx = store.reserve_credits(user, 100, operation_type="test")
# Step 2: Deduct the reserved credits, completing the operation.
ded = store.deduct_credits(user, ded_tx.reservation_id, 100)
# Step 3: Refund the full 100 credits. This triggers a
# credits.refunded event, which our dedicated refund handler
# captures. The reason "demo" is attached to the event data for
# audit trail purposes.
store.refund_credits(ded.transaction_id, amount=100, reason="demo")
print(f"Refund events captured by dedicated handler: {len(refunds)}")
cleanup(pgdata)