Chapter 9: Advanced Topics
This chapter covers the features you reach for after the basics are working: middleware for cross-cutting concerns, observability hooks, health checks, ontology evolution, the CLI generator system, registry extension, custom store adapters, the Rust kernel, and performance tuning.
Middleware (@before, @after, @on_error, @around)
Middleware wraps capability invocations without touching handler bodies. Four decorators are available:
@before -- run before the handler
import time
from trails import before
@before("notes.*")
def inject_timestamp(ctx, args):
    """Merge a creation timestamp into args before the handler sees them."""
    return {"created_at": time.time()}  # merged into args
Return None to pass args through unchanged. Return a dict to merge
into args. If you raise, the invocation aborts -- the handler never
runs.
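To make the merge rule concrete, here is a toy model in plain Python. It is a sketch, not the Trails implementation: run_before_hooks is a hypothetical name, and it assumes hooks run in registration order, each seeing the args as merged so far, with later keys overriding earlier ones.

```python
from typing import Any, Callable, Optional

# Hypothetical hook type: (ctx, args) -> dict to merge, or None to pass through.
BeforeHook = Callable[[Any, dict], Optional[dict]]

def run_before_hooks(hooks: list[BeforeHook], ctx: Any, args: dict) -> dict:
    """Toy model of @before merging (assumption-based, not Trails code)."""
    merged = dict(args)  # copy so the caller's dict is not mutated
    for hook in hooks:
        update = hook(ctx, merged)  # an exception propagates, so the handler would never run
        if update is not None:
            merged.update(update)  # later keys override earlier ones
    return merged

def add_source(ctx, args):
    return {"source": "api"}

def passthrough(ctx, args):
    return None  # leave args unchanged

result = run_before_hooks([add_source, passthrough], None, {"title": "Hi"})
print(result)  # {'title': 'Hi', 'source': 'api'}
```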
@after -- run after a successful handler
from trails import after
@after("notes.*")
def enrich_result(ctx, args, result):
    """Add metadata to every notes response."""
    if isinstance(result, dict):
        result["_enriched"] = True
    return result  # returning None instead would leave result unchanged
Does not run when the handler raises. Multiple @after hooks chain:
each one's return feeds into the next.
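The chaining rule can be modeled the same way. run_after_hooks is a hypothetical stand-in, not Trails code; it assumes hooks run in registration order and applies the rule stated above, where None means "keep the current result".

```python
from typing import Any, Callable, Optional

# Hypothetical hook type: (ctx, args, result) -> new result, or None to keep it.
AfterHook = Callable[[Any, dict, Any], Optional[Any]]

def run_after_hooks(hooks: list[AfterHook], ctx: Any, args: dict, result: Any) -> Any:
    """Toy model of @after chaining (assumption-based, not Trails code)."""
    for hook in hooks:
        new_result = hook(ctx, args, result)
        if new_result is not None:  # None means "leave result unchanged"
            result = new_result     # otherwise it feeds into the next hook
    return result

def tag(ctx, args, result):
    return {**result, "tagged": True}

def observe_only(ctx, args, result):
    return None  # e.g. logging only; result passes through

final = run_after_hooks([tag, observe_only], None, {}, {"id": 1})
print(final)  # {'id': 1, 'tagged': True}
```

A consequence of that convention: an @after hook cannot replace a result with None, because None is read as "unchanged".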
@on_error -- run when the handler raises
from trails import on_error
from trails._core import TrailsError
@on_error("*")
def wrap_errors(ctx, args, exc):
    """Convert ValueErrors to TrailsError for cleaner API responses."""
    if isinstance(exc, ValueError):
        return TrailsError(f"validation failed: {exc}")
    return None  # None re-raises the original exception
If an @on_error itself raises, the original exception propagates.
Middleware never masks the root cause.
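A toy model of the @on_error contract, runnable without Trails. handle_error is hypothetical, RuntimeError stands in for TrailsError, and two details are assumptions rather than documented behavior: the first hook that returns an exception wins, and the original exception is attached as the replacement's __cause__.

```python
from typing import Any, Callable, Optional

# Hypothetical hook type: (ctx, args, exc) -> replacement exception, or None.
ErrorHook = Callable[[Any, dict, Exception], Optional[Exception]]

def handle_error(hooks: list[ErrorHook], ctx: Any, args: dict, exc: Exception) -> None:
    """Toy model of @on_error semantics (not Trails code). Always raises."""
    for hook in hooks:
        try:
            replacement = hook(ctx, args, exc)
        except Exception:
            break  # a failing hook must not mask the root cause
        if replacement is not None:
            raise replacement from exc  # assumption: original kept as __cause__
    raise exc  # no replacement (or a hook failed): the original propagates

def to_runtime_error(ctx, args, exc):
    # RuntimeError stands in for TrailsError so the sketch runs without Trails.
    if isinstance(exc, ValueError):
        return RuntimeError(f"validation failed: {exc}")
    return None

def broken_hook(ctx, args, exc):
    raise KeyError("bug in the hook itself")

try:
    handle_error([to_runtime_error], None, {}, ValueError("title is blank"))
except RuntimeError as err:
    print(err, "| cause:", repr(err.__cause__))
```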
@around -- wrap the entire invocation
import time
from trails import around
@around("*")
def timing_wrapper(ctx, args, next):
    """Measure wall-clock duration of every capability."""
    t0 = time.monotonic()
    try:
        result = next()  # run the handler + inner middleware
        return result    # must return the result
    finally:
        dt = (time.monotonic() - t0) * 1000
        print(f"Took {dt:.1f}ms")
next() runs the inner middleware stack plus the handler. Call it
exactly once. Multiple @around form a LIFO stack (last registered =
outermost), matching Python decorator composition.
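The LIFO ordering is easiest to see by tracing enter/exit calls. compose_around below is a hypothetical toy, not the Trails dispatcher; it wraps the handler with each hook in registration order, so the last one registered ends up outermost.

```python
import functools
from typing import Any, Callable

# Hypothetical hook type: (ctx, args, next) -> result.
AroundHook = Callable[[Any, dict, Callable[[], Any]], Any]

def compose_around(hooks: list[AroundHook], ctx: Any, args: dict,
                   handler: Callable[[], Any]) -> Any:
    """Toy model: the last-registered @around is outermost (not Trails code)."""
    call = handler
    for hook in hooks:  # registration order: each new hook wraps everything so far
        call = functools.partial(hook, ctx, args, call)
    return call()

trace: list[str] = []

def make_hook(name: str) -> AroundHook:
    def hook(ctx, args, next):
        trace.append(f"enter {name}")
        result = next()  # called exactly once
        trace.append(f"exit {name}")
        return result
    return hook

def handler() -> str:
    trace.append("handler")
    return "ok"

result = compose_around([make_hook("A"), make_hook("B")], None, {}, handler)
print(trace)  # ['enter B', 'enter A', 'handler', 'exit A', 'exit B']
```

This is the same nesting Python produces when you stack decorators: the decorator applied last wraps everything beneath it.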
Pattern matching
All four decorators accept glob patterns:
| Pattern | Matches |
|---|---|
| "notes.create" | Exact match only |
| "notes.*" | Any ID starting with notes., e.g. notes.create, notes.delete, notes.list |
| "*" | Every capability (catch-all) |
Patterns use fnmatch.fnmatchcase semantics. They activate lazily --
registering @before("notes.*") before any notes.foo capability
exists is fine.
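Because matching uses fnmatch.fnmatchcase, you can check a pattern against capability IDs with the standard library before registering middleware. Two properties worth knowing: * also matches dots, so "notes.*" covers nested IDs, and matching is case-sensitive. The capability IDs below are illustrative, not from a real app.

```python
from fnmatch import fnmatchcase

# Illustrative capability IDs (hypothetical).
capabilities = ["notes.create", "notes.delete", "notes.drafts.publish",
                "Notes.archive", "users.create"]

for pattern in ["notes.create", "notes.*", "*"]:
    # "*" also matches ".", and "Notes.archive" fails "notes.*" (case-sensitive)
    matched = [cap for cap in capabilities if fnmatchcase(cap, pattern)]
    print(f"{pattern!r}: {matched}")
```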
Execution order
@around (outermost = last registered)
  @around (next)
    @before hooks (FIFO registration order)
      handler(ctx, **args)
    @after hooks (FIFO, on success)
    @on_error hooks (FIFO, on failure)
Common pattern: audit logger
import time
from trails import before, after, on_error
from trails.decorators import _current_capability_id
_T0: dict[str, float] = {}  # invocation start times, keyed by trace_id
@before("*")
def _audit_start(ctx, args):
    _T0[ctx.trace_id] = time.monotonic()
@after("*")
def _audit_success(ctx, args, result):
    now = time.monotonic()
    # pop() frees the entry; defaulting to "now" yields 0 ms if it is missing
    dt_ms = (now - _T0.pop(ctx.trace_id, now)) * 1000
    ctx.kg.node(labels=["AuditEntry"], properties={
        "capability": _current_capability_id() or "unknown",
        "principal": ctx.principal,
        "outcome": "success",
        "duration_ms": f"{dt_ms:.3f}",
    })
@on_error("*")
def _audit_failure(ctx, args, exc):
    now = time.monotonic()
    dt_ms = (now - _T0.pop(ctx.trace_id, now)) * 1000
    ctx.kg.node(labels=["AuditEntry"], properties={
        "capability": _current_capability_id() or "unknown",
        "principal": ctx.principal,
        "outcome": "failed",
        "error": type(exc).__name__,
        "duration_ms": f"{dt_ms:.3f}",
    })
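Note the side-dict pattern: Context uses __slots__, so you cannot attach new attributes to ctx. Per-invocation state goes in a module-level dict keyed by ctx.trace_id, and the closing hooks pop() their entry so the dict does not grow without bound.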
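Common pattern: retry with @around
import time
from trails import around
@around("external.*")
def retry_transient(ctx, args, next):
    """Make up to 3 attempts, backing off on transient network failures."""
    for attempt in range(3):
        try:
            return next()
        except ConnectionError:
            if attempt == 2:
                raise  # out of attempts: re-raise the last error
            time.sleep(0.1 * (2 ** attempt))  # 0.1 s, then 0.2 s
Unlike the timing wrapper, this hook calls next() more than once. Each call re-runs the inner middleware and the handler, so reserve the pattern for idempotent capabilities.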
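The backoff schedule is easier to test when the retry loop is pulled out of the middleware and sleep is injected. call_with_retry is a hypothetical helper that mirrors retry_transient; it is not part of Trails.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def call_with_retry(fn: Callable[[], T], attempts: int = 3, base_delay: float = 0.1,
                    sleep: Callable[[float], None] = time.sleep) -> T:
    """Hypothetical helper: retry ConnectionError with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))  # 0.1, 0.2, 0.4, ...
    raise ValueError("attempts must be >= 1")  # only reached when attempts < 1

# Usage: a flaky callable that fails twice, then succeeds.
calls = {"n": 0}
delays: list[float] = []

def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return "ok"

print(call_with_retry(flaky, sleep=delays.append))  # ok
print(delays)  # [0.1, 0.2]
```

Re-raising with a bare raise inside the except block keeps the final error's original traceback.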
Common pattern: argument validation
from trails import before
from trails._core import TrailsError
@before("notes.*")
def validate_title(ctx, args):
    if "title" not in args:
        return None  # capabilities without a title (e.g. notes.list) pass through
    title = args["title"] or ""
    if not title.strip():
        raise TrailsError("validation: title must not be blank")
    return {"title": title.strip()}  # the normalized title is merged into args
For the full middleware reference, see the Middleware Guide.
Custom observability hooks
The observability system fires events on every capability invocation, LLM call, and KG read/write. Register an observer to connect any telemetry sink.
Register a basic observer
from trails.observability import register_observer
def my_logger(kind, event):
    """Log every event to stdout."""
    print(f"[{kind}] {event}")
register_observer(my_logger)
Observers are fire-and-forget: exceptions are caught and logged, never re-raised. They run synchronously on the invoke thread, so keep them cheap (microseconds, not milliseconds). If your sink is remote, queue events and flush them on a background thread.
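One way to follow the queue-and-flush advice with only the standard library. QueuedObserver and its send callback are hypothetical; the only thing taken from Trails is the (kind, event) call signature. The observer drops events when the queue is full rather than block the invoke thread.

```python
import queue
import threading
from typing import Any, Callable

class QueuedObserver:
    """Hypothetical observer: enqueue on the invoke thread, ship from a worker."""

    def __init__(self, send: Callable[[list], None],
                 batch_size: int = 100, maxsize: int = 10_000) -> None:
        self._send = send  # your remote sink, e.g. an HTTP exporter
        self._batch_size = batch_size
        self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
        threading.Thread(target=self._run, daemon=True).start()

    def __call__(self, kind: str, event: Any) -> None:
        try:
            self._queue.put_nowait((kind, event))  # never block the invoke thread
        except queue.Full:
            pass  # drop the event under back-pressure

    def _run(self) -> None:
        while True:
            batch = [self._queue.get()]  # wait for at least one event
            while len(batch) < self._batch_size:
                try:
                    batch.append(self._queue.get_nowait())
                except queue.Empty:
                    break
            try:
                self._send(batch)
            except Exception:
                pass  # a failing sink must not kill the worker
            finally:
                for _ in batch:
                    self._queue.task_done()

    def flush(self) -> None:
        """Block until every queued event has been handed to send."""
        self._queue.join()

received: list = []
observer = QueuedObserver(send=received.extend)
observer("capability_started", {"capability_id": "notes.create"})
observer.flush()
# With Trails installed: register_observer(observer)
```

Because the worker is a daemon thread, events still queued at interpreter exit are lost; call flush() during shutdown if that matters.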
Event kinds
| Kind | Fires when | Key fields |
|---|---|---|
| capability_started | Before handler runs | capability_id, trace_id, principal |
| capability_completed | After success | capability_id, duration_ms, outcome |
| capability_failed | Handler raises or policy denies | error_kind, message |
| llm_call | After LLMClient.complete() | model, tokens, cost_usd |
| kg_write | After ctx.kg.add/save/update | op, model, dirty_fields |
| kg_query | After ctx.kg.query/match | row_count, sparql_kind |
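OpenTelemetry bridge
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from trails.observability import register_observer
otel = trace.get_tracer("my-app")
spans: dict[str, trace.Span] = {}  # open spans, keyed by trace_id
def to_otel(kind, event):
    if kind == "capability_started":
        spans[event["trace_id"]] = otel.start_span(event["capability_id"])
    elif kind in ("capability_completed", "capability_failed"):
        s = spans.pop(event.get("trace_id"), None)
        if s is not None:
            if kind == "capability_failed":
                # Mark the span as failed so tracing backends surface the error
                s.set_status(Status(StatusCode.ERROR, event.get("message")))
            s.end()
register_observer(to_otel)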
In-memory tracer (no OTel dependency)
from trails.observability import tracer
with tracer.span("my-operation", attributes={"version": 2}) as span:
    # nested tracer.span() calls inherit span.trace_id
    pass
# Query spans later
for s in tracer.list_spans():
    print(s.name, s.end_time - s.start_time)
Capture events in tests
from trails.testing import capture_events
from trails.runtime import invoke
with capture_events() as events:
    invoke("note.summarize", {"text": "..."})
kinds = [k for k, _ in events]
assert "capability_started" in kinds
assert "capability_completed" in kinds
For the full observability reference, see the Observability Guide.
trails doctor and health checks
trails doctor runs 13+ health checks against your project and reports
pass/warn/fail for each:
Output:
[pass] trails.toml found
[pass] Python >= 3.11
[pass] Virtual environment active
[pass] Port 8080 available
[pass] Project layout valid (app.py or handlers/ found)
[pass] Data directory writable
[pass] FFI version matches Python package
[pass] 3 capabilities registered, 0 collisions
[pass] 2 shapes registered, 0 export errors
[pass] Graph store operational
[warn] No SHACL export configured
[pass] MCP server responds
[warn] Field 'nickname' has 0 queries across 200 writes — consider removal
Machine-readable output
Returns a JSON array of check results for CI integration:
[
{"name": "toml", "status": "pass", "message": "trails.toml found"},
{"name": "python_version", "status": "pass", "message": "Python 3.12.1"}
]
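A small CI gate over the machine-readable output. The command-line flag that produces the JSON is not shown in this chapter, so the sketch takes the JSON as a string (for example, read from stdin); failing_checks and main are hypothetical names, and only the name, status, and message fields come from the sample above.

```python
import json
import sys

def failing_checks(report_json: str, fail_on_warn: bool = False) -> list[dict]:
    """Return the doctor check results that should fail a CI job."""
    bad = {"fail", "warn"} if fail_on_warn else {"fail"}
    return [check for check in json.loads(report_json) if check.get("status") in bad]

def main(report_json: str, fail_on_warn: bool = False) -> int:
    """Print offending checks to stderr and return a process exit code."""
    failures = failing_checks(report_json, fail_on_warn)
    for check in failures:
        print(f"[{check['status']}] {check['name']}: {check['message']}", file=sys.stderr)
    return 1 if failures else 0

# In a CI step, pipe the doctor's JSON output into this script and end with:
#     sys.exit(main(sys.stdin.read()))
```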
Check categories
| Category | Checks |
|---|---|
| Environment | Python version, virtualenv, port availability |
| Project | trails.toml, project layout, data directory |
| Framework | FFI version, capabilities, shapes, store |
| Subsystems | Federation config, RML availability, auto-ontology suggestions |
| Code quality | Raw SPARQL in capability bodies (linter) |
Python API
from pathlib import Path
from trails.doctor import run_checks
results = run_checks(Path("."))
for check in results:
    print(f"[{check.status}] {check.name}: {check.message}")
Ontology evolution (trails onto evolve)
The auto-ontology workflow (Chapter 8) produces initial types. Over time, your schema evolves. Trails supports this through:
Progressive layering
Each step is additive. You never break the previous level:
Schema migration from suggestions
When trails onto refine produces suggestions, generate migration code:
from trails.onto_refine import UsageCollector, SchemaAnalyzer, generate_migration_code
collector = UsageCollector()
collector.start()
# ... app runs ...
analyzer = SchemaAnalyzer(collector)
report = analyzer.analyze(min_samples=50)
# Generate reviewable migration snippets
code = generate_migration_code(report.suggestions)
print(code)
Output:
"""Migration code generated by `trails onto refine --apply`.
Review each change before applying.
"""
# --- Suggestion 1: make_optional ---
# Node type: Employee
# Field: nickname
# Reason: Field 'nickname' is null in 95% of 200 writes.
# Confidence: 0.95
# In your @node_type/Employee definition:
# Change: "nickname": str
# To: "nickname": str | None
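A suggestion like the one above could come from a simple threshold on the observed null ratio. This is a hypothetical heuristic for illustration, not SchemaAnalyzer's actual algorithm; the 0.9 threshold is an assumption, and the 50-sample minimum echoes min_samples=50 above.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Suggestion:
    kind: str
    field: str
    reason: str
    confidence: float

def suggest_optional(field: str, null_count: int, writes: int,
                     min_samples: int = 50, threshold: float = 0.9) -> Optional[Suggestion]:
    """Hypothetical rule: suggest make_optional when a field is usually null."""
    if writes < min_samples:
        return None  # not enough evidence yet
    ratio = null_count / writes
    if ratio < threshold:
        return None
    return Suggestion(
        kind="make_optional",
        field=field,
        reason=f"Field {field!r} is null in {ratio:.0%} of {writes} writes.",
        confidence=round(ratio, 2),
    )

print(suggest_optional("nickname", null_count=190, writes=200))
```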
Re-inference on evolved data
When the data changes shape (new sources, schema migrations, new fields), re-run inference and diff:
# Infer a schema from the current data
trails onto infer -o models/current_inferred.py
# Compare with your hand-tuned models
diff models/employee.py models/current_inferred.py
New fields or types that appear in the inference but not in your models are candidates for schema expansion.
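A plain diff compares text line by line; for schema expansion you mostly care about field names. A sketch using the standard ast module, assuming both files declare fields as annotated class attributes (e.g. nickname: str). If your @node_type definitions use a dict of field types instead, as the migration snippet above suggests they might, the extraction step needs adapting. annotated_fields and new_fields are hypothetical helpers.

```python
import ast

def annotated_fields(source: str) -> dict[str, dict[str, str]]:
    """Map class name -> {field: annotation} for annotated class attributes."""
    classes: dict[str, dict[str, str]] = {}
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ClassDef):
            classes[node.name] = {
                stmt.target.id: ast.unparse(stmt.annotation)
                for stmt in node.body
                if isinstance(stmt, ast.AnnAssign) and isinstance(stmt.target, ast.Name)
            }
    return classes

def new_fields(hand_tuned: str, inferred: str) -> dict[str, list[str]]:
    """Fields present in the inferred models but missing from the hand-tuned ones."""
    mine, theirs = annotated_fields(hand_tuned), annotated_fields(inferred)
    result: dict[str, list[str]] = {}
    for cls, fields in theirs.items():
        missing = set(fields) - set(mine.get(cls, {}))
        if missing:
            result[cls] = sorted(missing)
    return result

hand = "class Employee:\n    name: str\n    nickname: str | None\n"
inferred = "class Employee:\n    name: str\n    nickname: str\n    team: str\n"
print(new_fields(hand, inferred))  # {'Employee': ['team']}
```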
The CLI generator system
Trails provides Rails-style generators for common scaffolds:
Generate a capability
Creates a capability stub with the correct imports and decorator.
Generate a shape
Creates a SHACL shape stub matching your @node_type declaration.
Generate a resource
Creates an MCP resource stub with the correct URI template.
Create a new project
Scaffolds a project directory with trails.toml, app.py, a
conftest.py for testing, and a .gitignore.
Extending the registry
The capability registry is a module-level dict. You can inspect it programmatically:
from trails.decorators import _handlers
# List all registered capabilities.
# _handlers is private (leading underscore): treat it as read-only and expect changes.
for cap_id, descriptor in _handlers.items():
    print(f"{cap_id}: {descriptor.description}")
Dynamic registration
Register capabilities from data (e.g., a plugin system):
import importlib
from trails import capability
def register_from_config(config: dict):
    """Dynamically register capabilities from a config dict."""
    for name, handler_path in config["capabilities"].items():
        module_name, fn_name = handler_path.rsplit(".", 1)
        mod = importlib.import_module(module_name)
        fn = getattr(mod, fn_name)
        capability(id=name)(fn)
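The path-resolution step of register_from_config can be isolated and tested against standard-library targets. resolve_dotted is a hypothetical helper; in a Trails app you would pass its result to capability(id=name) as the loop above does. It also rejects paths without a dot and non-callable targets up front, which gives clearer errors for typos in a config file.

```python
import importlib
from typing import Any, Callable

def resolve_dotted(path: str) -> Callable[..., Any]:
    """Hypothetical helper: resolve 'package.module.function' to a callable."""
    module_name, sep, attr = path.rpartition(".")
    if not sep:
        raise ValueError(f"expected 'module.attr', got {path!r}")
    obj = getattr(importlib.import_module(module_name), attr)
    if not callable(obj):
        raise TypeError(f"{path!r} is not callable")
    return obj

sqrt = resolve_dotted("math.sqrt")
print(sqrt(16.0))  # 4.0
```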
Registry isolation in tests
Always use isolated_kernel() so registrations made in one test do not leak into others:
from trails import capability
from trails.testing import isolated_kernel
def test_my_capability():
    with isolated_kernel():
        @capability
        def my_cap(ctx) -> dict:
            return {"ok": True}
        # This registration is scoped to this block
Writing custom graph store adapters
The kernel store is an Oxigraph instance, but the Python surface communicates through a protocol. To add a new store backend:
The store protocol
A store adapter must implement these operations:
class MyStore:
    def query(self, sparql: str) -> list[dict]:
        """Execute a SPARQL SELECT and return bindings."""
        ...
    def update(self, sparql: str) -> None:
        """Execute a SPARQL INSERT/DELETE."""
        ...
    def add_triple(self, subject: str, predicate: str, object: str) -> None:
        """Add a single triple."""
        ...
    def size(self) -> int:
        """Return the number of triples."""
        ...
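The protocol can be written down as a typing.Protocol so a type checker, or an isinstance check via runtime_checkable, can confirm an adapter has all four methods. GraphStore is a hypothetical name, not an exported Trails type, and runtime_checkable only checks that the methods exist, not their signatures. The in-memory toy adapter stores triples in a set, which matches RDF's set semantics: adding the same triple twice does not change the size.

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class GraphStore(Protocol):
    """Hypothetical structural type for the four store operations."""

    def query(self, sparql: str) -> list[dict]: ...
    def update(self, sparql: str) -> None: ...
    def add_triple(self, subject: str, predicate: str, object: str) -> None: ...
    def size(self) -> int: ...

class InMemoryTripleStore:
    """Toy adapter: keeps triples in a set; no SPARQL engine."""

    def __init__(self) -> None:
        self._triples: set[tuple[str, str, str]] = set()

    def query(self, sparql: str) -> list[dict]:
        raise NotImplementedError("toy store has no SPARQL engine")

    def update(self, sparql: str) -> None:
        raise NotImplementedError("toy store has no SPARQL engine")

    def add_triple(self, subject: str, predicate: str, object: str) -> None:
        self._triples.add((subject, predicate, object))

    def size(self) -> int:
        return len(self._triples)

store = InMemoryTripleStore()
store.add_triple("ex:alice", "ex:knows", "ex:bob")
store.add_triple("ex:alice", "ex:knows", "ex:bob")  # duplicate: set ignores it
print(isinstance(store, GraphStore), store.size())  # True 1
```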
Example: wrap a remote SPARQL endpoint
import requests
class RemoteSparqlStore:
    """Query a remote SPARQL endpoint (read-only)."""
    def __init__(self, endpoint: str, timeout: float = 30.0):
        self._endpoint = endpoint
        self._timeout = timeout  # seconds; avoids hanging on a dead endpoint
    def query(self, sparql: str) -> list[dict]:
        resp = requests.post(
            self._endpoint,
            data={"query": sparql},
            headers={"Accept": "application/sparql-results+json"},
            timeout=self._timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        # Flatten SPARQL JSON results: keep the "value" of each bound variable
        return [
            {var: binding[var]["value"]
             for var in data["head"]["vars"]
             if var in binding}
            for binding in data["results"]["bindings"]
        ]
    def update(self, sparql: str) -> None:
        raise NotImplementedError("Remote store is read-only")
    def add_triple(self, subject: str, predicate: str, object: str) -> None:
        raise NotImplementedError("Remote store is read-only")
    def size(self) -> int:
        result = self.query("SELECT (COUNT(*) AS ?c) WHERE { ?s ?p ?o }")
        return int(result[0]["c"])
The Rust kernel (when and how to touch it)
Trails has a Rust kernel (7 active crates + 5 archived) that handles the core store, provenance, and FFI boundary. Most app developers never need to touch it.
When to touch the kernel
- Never for normal application development. The Python surface covers everything.
- Performance-critical store operations that hit Oxigraph limits.
- Adding a new primitive to the framework (new event types, new store operations).
- Fixing FFI bugs at the boundary between Rust and Python.
Architecture overview
python/src/trails/      ← Python surface (what you use)
    _core.py            ← FFI bindings via PyO3
    orm.py              ← ActiveGraph ORM
    ...
kernel/                 ← Rust crates
    trails-core/        ← Store, provenance, core types
    trails-ffi/         ← PyO3 FFI boundary
    ...
Building locally
The FFI version check
trails doctor verifies that the installed Python package matches the
compiled FFI extension. A mismatch produces:
Fix by rebuilding the extension:
Performance tuning and benchmarking
Measure capability latency
A variant of the @around timing middleware from earlier in this chapter that also reports which capability ran:
import time
from trails import around
from trails.decorators import _current_capability_id
@around("*")
def timing(ctx, args, next):
    t0 = time.monotonic()
    result = next()
    dt = (time.monotonic() - t0) * 1000
    print(f"[timing] {_current_capability_id()}: {dt:.1f}ms")
    return result
Monitor with the metrics singleton
from trails.observability import metrics
# After running your app:
summary = metrics.get_summary()
print(f"Total invocations: {summary['total_invocations']}")
print(f"Total errors: {summary['total_errors']}")
for cap_id, stats in summary.get("capabilities", {}).items():
    print(f"  {cap_id}: {stats['count']} calls, "
          f"p50={stats['p50_ms']:.1f}ms, p99={stats['p99_ms']:.1f}ms")
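If you need the same numbers outside the metrics singleton (say, from your own timing middleware), percentiles can be computed with the standard library. The interpolation method behind p50_ms and p99_ms is not stated here, so values may differ slightly from the summary; latency_summary is a hypothetical helper. With fewer than about 100 samples, p99 is essentially the maximum.

```python
import statistics

def latency_summary(samples_ms: list[float]) -> dict[str, float]:
    """Hypothetical helper: p50/p99 from raw latency samples (needs >= 2 samples)."""
    # quantiles(n=100) returns 99 cut points; cuts[k - 1] is the k-th percentile
    cuts = statistics.quantiles(samples_ms, n=100)
    return {"count": len(samples_ms), "p50_ms": cuts[49], "p99_ms": cuts[98]}

samples = [float(ms) for ms in range(1, 101)]  # 1.0 .. 100.0 ms
print(latency_summary(samples))
```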
Store query performance
For SPARQL query tuning, flag slow queries with an observer and report their row counts. The event kinds table lists only row_count and sparql_kind for kg_query, so the detector reads duration_ms with .get() and a default:
from trails.observability import register_observer
def slow_query_detector(kind, event):
    if kind == "kg_query" and event.get("duration_ms", 0) > 100:
        print(f"SLOW QUERY ({event['duration_ms']:.0f}ms): "
              f"{event.get('sparql_kind', 'unknown')}, "
              f"{event.get('row_count', '?')} rows")
register_observer(slow_query_detector)
Cost tracking
Monitor LLM spend with a custom observer:
from trails.observability import register_observer
total_usd = 0.0
def cost_observer(kind, event):
    global total_usd
    if kind == "llm_call":
        cost = event.get("cost_usd", 0.0)  # tolerate events without a cost
        total_usd += cost
        print(f"LLM call: ${cost:.6f} (total: ${total_usd:.4f})")
register_observer(cost_observer)
Or use the built-in CostTracker with budget enforcement -- see the
Observability Guide for details.
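Whichever route you take, observers cannot enforce a budget by raising, because observer exceptions are caught and logged. A minimal sketch of the alternative: accumulate spend in an observer and check a flag where enforcement is possible, such as an @before hook that raises TrailsError. BudgetGuard is hypothetical and is not the CostTracker API.

```python
import threading

class BudgetGuard:
    """Hypothetical observer: tracks LLM spend and records when a budget is exceeded.

    Raising here would not stop anything, since observer exceptions are swallowed;
    callers check `exceeded` instead (e.g. in an @before hook).
    """

    def __init__(self, budget_usd: float) -> None:
        self.budget_usd = budget_usd
        self.spent_usd = 0.0
        self._lock = threading.Lock()  # observers may fire from several invoke threads

    def __call__(self, kind: str, event: dict) -> None:
        if kind != "llm_call":
            return
        with self._lock:
            self.spent_usd += event.get("cost_usd", 0.0)

    @property
    def exceeded(self) -> bool:
        with self._lock:
            return self.spent_usd > self.budget_usd

guard = BudgetGuard(budget_usd=0.05)
guard("llm_call", {"model": "m", "cost_usd": 0.03})
guard("kg_query", {"row_count": 10})  # ignored: not an LLM call
guard("llm_call", {"model": "m", "cost_usd": 0.03})
print(guard.exceeded)  # True
# With Trails: register_observer(guard), then in an @before hook:
#     if guard.exceeded: raise TrailsError("LLM budget exceeded")
```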
See also
- Middleware Guide -- full reference
- Observability Guide -- event kinds, tracer, metrics
- Testing Guide -- isolated_kernel, mock_llm, capture_events
- Admin UI Guide -- the operator dashboard
- Capabilities Guide -- @capability reference