
Chapter 9 — Advanced Topics

This chapter covers the features you reach for after the basics are working: middleware for cross-cutting concerns, observability hooks, health checks, ontology evolution, the CLI generator system, registry extension, custom graph store adapters, the Rust kernel, and performance tuning.


Middleware (@before, @after, @on_error, @around)

Middleware wraps capability invocations without touching handler bodies. Four decorators are available:

from trails import before, after, on_error, around

@before -- run before the handler

import time
from trails import before

@before("notes.*")
def inject_timestamp(ctx, args):
    """Merge a creation timestamp into args before the handler sees them."""
    return {"created_at": time.time()}  # merged into args

Return None to pass args through unchanged. Return a dict to merge into args. If you raise, the invocation aborts -- the handler never runs.
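The merge contract above can be modeled in a few lines of plain Python. This is a toy model, not trails' implementation: run_before_hooks is a hypothetical name, and it assumes that hooks run in registration order, that each hook sees the args as updated by earlier hooks, and that returned keys override existing ones.

```python
from typing import Any, Callable, Dict, List, Optional

BeforeHook = Callable[[Any, Dict[str, Any]], Optional[Dict[str, Any]]]


def run_before_hooks(hooks: List[BeforeHook], ctx: Any, args: Dict[str, Any]) -> Dict[str, Any]:
    """Toy model of the @before contract (hypothetical, not trails' code)."""
    merged = dict(args)  # never mutate the caller's dict
    for hook in hooks:
        update = hook(ctx, merged)  # an exception here aborts: the handler never runs
        if update is not None:
            merged.update(update)  # assumption: returned keys override existing ones
    return merged


# Demo: one hook passes args through, the next merges in a timestamp.
args = run_before_hooks(
    [lambda ctx, a: None, lambda ctx, a: {"created_at": 1700000000.0}],
    ctx=None,
    args={"title": "Groceries"},
)
print(args)
```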

@after -- run after a successful handler

from trails import after

@after("notes.*")
def enrich_result(ctx, args, result):
    """Add metadata to every notes response."""
    if isinstance(result, dict):
        result["_enriched"] = True
        return result
    # returning None leaves result unchanged

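@after hooks do not run when the handler raises (that path belongs to @on_error). Multiple @after hooks chain: each hook receives the previous hook's return value as result, and a hook that returns None passes the result through unchanged.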
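The chaining rule can be sketched as a fold over the registered hooks. run_after_hooks is a hypothetical helper that models only the documented behavior (None keeps the current result, anything else replaces it); it is not the framework's code.

```python
from typing import Any, Callable, List

AfterHook = Callable[[Any, dict, Any], Any]


def run_after_hooks(hooks: List[AfterHook], ctx: Any, args: dict, result: Any) -> Any:
    """Toy model of @after chaining: each hook sees the previous hook's output."""
    for hook in hooks:
        new = hook(ctx, args, result)
        if new is not None:  # None means "leave the result unchanged"
            result = new
    return result


hooks = [
    lambda ctx, args, r: r + 1,  # replaces the result: 3 -> 4
    lambda ctx, args, r: None,   # leaves it unchanged
    lambda ctx, args, r: r * 2,  # sees the first hook's output: 4 -> 8
]
final = run_after_hooks(hooks, None, {}, 3)
print(final)  # 8
```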

@on_error -- run when the handler raises

from trails import on_error
from trails._core import TrailsError

@on_error("*")
def wrap_errors(ctx, args, exc):
    """Convert ValueErrors to TrailsError for cleaner API responses."""
    if isinstance(exc, ValueError):
        return TrailsError(f"validation failed: {exc}")
    # return None → re-raise original exception

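Return an exception from an @on_error hook to raise it in place of the original, or return None to re-raise the original unchanged. If an @on_error hook itself raises, the original exception propagates, so a bug in error-handling middleware never masks the root cause.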

@around -- wrap the entire invocation

import time
from trails import around

@around("*")
def timing_wrapper(ctx, args, next):
    """Measure wall-clock duration of every capability."""
    t0 = time.monotonic()
    try:
        result = next()  # run the handler + inner middleware
        return result     # must return the result
    finally:
        dt = (time.monotonic() - t0) * 1000
        print(f"Took {dt:.1f}ms")

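next() runs the inner middleware stack plus the handler; your wrapper must return its result. Call it exactly once, unless the wrapper deliberately re-runs the stack, as the retry pattern later in this chapter does. Multiple @around hooks form a LIFO stack (last registered = outermost), matching Python decorator composition.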

Pattern matching

All four decorators accept glob patterns:

| Pattern        | Matches                                   |
|----------------|-------------------------------------------|
| "notes.create" | Exact match only                          |
| "notes.*"      | notes.create, notes.delete, notes.list    |
| "*"            | Every capability (catch-all)              |

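Patterns use fnmatch.fnmatchcase semantics: matching is case-sensitive, and * also matches dots, so "notes.*" matches nested IDs such as notes.tags.add as well. Patterns activate lazily: registering @before("notes.*") before any notes.foo capability exists is fine.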
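Because the matching is plain fnmatch.fnmatchcase, you can check what a pattern selects with the standard library alone, assuming trails applies it to the full capability ID as described above. The capability IDs below are made up for illustration.

```python
from fnmatch import fnmatchcase

# Hypothetical capability IDs, including a nested one and a wrong-case one.
ids = ["notes.create", "notes.delete", "notes.tags.add", "Notes.create", "users.list"]

matches = {
    pattern: [cap_id for cap_id in ids if fnmatchcase(cap_id, pattern)]
    for pattern in ["notes.create", "notes.*", "*"]
}
for pattern, matched in matches.items():
    print(f"{pattern!r:16} -> {matched}")
```

Note that "notes.*" picks up notes.tags.add (the * crosses the dot) but not Notes.create (case matters).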

Execution order

@around (outermost = last registered)
  @around (next)
    @before hooks (FIFO registration order)
    handler(ctx, **args)
    @after hooks  (FIFO, on success)
    @on_error hooks (FIFO, on failure)
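The LIFO rule for @around is the same nesting you get from stacking closures. The sketch below is a simplified model, not trails' code: compose, make_around, and core are hypothetical names, ctx and args are dropped for brevity, and core stands in for the @before hooks, handler, and @after/@on_error hooks in the diagram.

```python
from typing import Callable, List

Next = Callable[[], object]
Around = Callable[[Next], object]


def _bind(wrapper: Around, inner: Next) -> Next:
    # Freeze this wrapper/inner pair into a zero-argument "next" callable.
    return lambda: wrapper(inner)


def compose(arounds: List[Around], core: Next) -> Next:
    """Wrap core so the last-registered wrapper ends up outermost (LIFO)."""
    call = core
    for wrapper in arounds:  # earlier registrations sit closer to the core
        call = _bind(wrapper, call)
    return call


log: List[str] = []


def make_around(name: str) -> Around:
    def wrapper(next_: Next) -> object:
        log.append(f"{name}:enter")
        result = next_()
        log.append(f"{name}:exit")
        return result
    return wrapper


def core() -> str:
    # Stands in for: @before hooks, handler(ctx, **args), @after/@on_error hooks.
    log.append("core")
    return "result"


run_stack = compose([make_around("first"), make_around("second")], core)
print(run_stack())
print(log)  # ['second:enter', 'first:enter', 'core', 'first:exit', 'second:exit']
```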

Common pattern: audit logger

import time
from trails import before, after, on_error
from trails.decorators import _current_capability_id

# Start times keyed by trace_id (Context uses __slots__, so state can't live on ctx)
_T0: dict[str, float] = {}

def _elapsed_ms(ctx) -> str:
    """Pop this invocation's start time and format the elapsed time in ms."""
    t0 = _T0.pop(ctx.trace_id, None)
    if t0 is None:  # defensive: no start time was recorded for this trace
        return "unknown"
    return f"{(time.monotonic() - t0) * 1000:.3f}"

@before("*")
def _audit_start(ctx, args):
    _T0[ctx.trace_id] = time.monotonic()

@after("*")
def _audit_success(ctx, args, result):
    ctx.kg.node(labels=["AuditEntry"], properties={
        "capability": _current_capability_id() or "unknown",
        "principal": ctx.principal,
        "outcome": "success",
        "duration_ms": _elapsed_ms(ctx),
    })

@on_error("*")
def _audit_failure(ctx, args, exc):
    ctx.kg.node(labels=["AuditEntry"], properties={
        "capability": _current_capability_id() or "unknown",
        "principal": ctx.principal,
        "outcome": "failed",
        "error": type(exc).__name__,
        "duration_ms": _elapsed_ms(ctx),
    })

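Note the side-dict pattern: Context uses __slots__, so you cannot attach ad-hoc attributes to ctx. Per-invocation state goes in a module-level dict keyed by ctx.trace_id, and both the success and failure hooks pop their entry so the dict does not grow without bound.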

Common pattern: retry with @around

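import time
from trails import around

MAX_ATTEMPTS = 3

@around("external.*")
def retry_transient(ctx, args, next):
    """Retry transient network failures, up to 3 attempts in total."""
    last_exc = None
    for attempt in range(MAX_ATTEMPTS):
        try:
            return next()
        except ConnectionError as exc:
            last_exc = exc
            if attempt < MAX_ATTEMPTS - 1:  # don't sleep after the final attempt
                time.sleep(0.1 * (2 ** attempt))  # 0.1s, then 0.2s
    raise last_exc

Only wrap capabilities that are safe to repeat: every call to next() re-runs the inner middleware and the handler.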
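The retry loop can be unit-tested without the framework by pulling it into a plain function with an injectable sleep. call_with_retry is a hypothetical helper, not a trails API; it mirrors the loop in retry_transient, with fn playing the role of next().

```python
import time
from typing import Callable, List, Optional, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.1,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying ConnectionError with exponential backoff (hypothetical helper)."""
    last_exc: Optional[ConnectionError] = None
    for attempt in range(attempts):
        try:
            return fn()
        except ConnectionError as exc:
            last_exc = exc
            if attempt < attempts - 1:
                sleep(base_delay * (2 ** attempt))
    if last_exc is None:  # only reachable when attempts <= 0
        raise ValueError("attempts must be positive")
    raise last_exc


# Demo: fails twice, then succeeds; delays are recorded instead of slept.
delays: List[float] = []
calls: List[int] = []


def flaky() -> str:
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("transient")
    return "ok"


result = call_with_retry(flaky, sleep=delays.append)
print(result, len(calls), delays)  # ok 3 [0.1, 0.2]
```

Injecting sleep keeps the test instant and lets you assert on the backoff schedule directly.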

Common pattern: argument validation

from trails import before
from trails._core import TrailsError

@before("notes.*")
def validate_title(ctx, args):
    title = args.get("title", "")
    if not title.strip():
        raise TrailsError("validation: title must not be blank")
    return {"title": title.strip()}

For the full middleware reference, see the Middleware Guide.


Custom observability hooks

The observability system fires events on every capability invocation, LLM call, and KG read/write. Register an observer to wire any telemetry sink.

Register a basic observer

from trails.observability import register_observer

def my_logger(kind, event):
    """Log every event to stdout."""
    print(f"[{kind}] {event}")

register_observer(my_logger)

Observers are fire-and-forget: exceptions they raise are caught and logged, never re-raised. They run synchronously on the invoke thread, so keep each one cheap (microseconds, not milliseconds). If your sink is remote, queue events and flush them on a background thread.
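A minimal sketch of that queue-and-flush approach, using only the standard library. QueuedObserver is a hypothetical class, not part of trails; it is a callable with the (kind, event) observer signature, so it could be passed to register_observer. The sink is your own function that ships one event to the remote backend.

```python
import logging
import queue
import threading
from typing import Any, Callable

log = logging.getLogger(__name__)


class QueuedObserver:
    """Enqueue events on the invoke thread; ship them from a worker thread."""

    _STOP = object()  # sentinel that tells the worker to exit

    def __init__(self, sink: Callable[[str, Any], None], maxsize: int = 10_000) -> None:
        self._sink = sink
        self._queue: queue.Queue = queue.Queue(maxsize=maxsize)
        self._worker = threading.Thread(target=self._drain, daemon=True)
        self._worker.start()

    def __call__(self, kind: str, event: Any) -> None:
        # Runs on the invoke thread: never block; drop the event if the queue is full.
        try:
            self._queue.put_nowait((kind, event))
        except queue.Full:
            log.warning("observer queue full; dropping %s event", kind)

    def _drain(self) -> None:
        while True:
            item = self._queue.get()
            if item is self._STOP:
                return
            try:
                self._sink(*item)
            except Exception:
                log.exception("observer sink failed")

    def close(self, timeout: float = 5.0) -> None:
        """Flush queued events, then stop the worker thread."""
        self._queue.put(self._STOP)
        self._worker.join(timeout)


# Demo with an in-memory sink standing in for a remote backend.
received: list = []
observer = QueuedObserver(lambda kind, event: received.append((kind, event)))
observer("capability_started", {"trace_id": "t-1"})
observer.close()
print(received)
```

Registration would then look like register_observer(QueuedObserver(ship_event)), where ship_event is your own (hypothetical) sender. Call close() at shutdown so queued events are flushed before the process exits.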

Event kinds

| Kind                 | Fires when                      | Key fields                         |
|----------------------|---------------------------------|------------------------------------|
| capability_started   | Before handler runs             | capability_id, trace_id, principal |
| capability_completed | After success                   | capability_id, duration_ms, outcome|
| capability_failed    | Handler raises or policy denies | error_kind, message                |
| llm_call             | After LLMClient.complete()      | model, tokens, cost_usd            |
| kg_write             | After ctx.kg.add/save/update    | op, model, dirty_fields            |
| kg_query             | After ctx.kg.query/match        | row_count, sparql_kind             |

OpenTelemetry bridge

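from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from trails.observability import register_observer

otel = trace.get_tracer("my-app")
spans: dict[str, trace.Span] = {}  # open spans keyed by trace_id

def to_otel(kind, event):
    if kind == "capability_started":
        spans[event["trace_id"]] = otel.start_span(event["capability_id"])
    elif kind in ("capability_completed", "capability_failed"):
        s = spans.pop(event["trace_id"], None)
        if s is not None:
            if kind == "capability_failed":
                # Mark the span as failed so error rates show up in the backend
                s.set_status(Status(StatusCode.ERROR, event.get("message")))
            s.end()

register_observer(to_otel)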

In-memory tracer (no OTel dependency)

from trails.observability import tracer

with tracer.span("my-operation", attributes={"version": 2}) as span:
    # nested tracer.span() calls inherit span.trace_id
    pass

# Query spans later
for s in tracer.list_spans():
    print(s.name, s.end_time - s.start_time)

Capture events in tests

from trails.testing import capture_events
from trails.runtime import invoke

with capture_events() as events:
    invoke("note.summarize", {"text": "..."})

kinds = [k for k, _ in events]
assert "capability_started" in kinds
assert "capability_completed" in kinds

For the full observability reference, see the Observability Guide.


trails doctor and health checks

trails doctor runs 13+ health checks against your project and reports pass/warn/fail for each:

trails doctor

Output:

[pass] trails.toml found
[pass] Python >= 3.11
[pass] Virtual environment active
[pass] Port 8080 available
[pass] Project layout valid (app.py or handlers/ found)
[pass] Data directory writable
[pass] FFI version matches Python package
[pass] 3 capabilities registered, 0 collisions
[pass] 2 shapes registered, 0 export errors
[pass] Graph store operational
[warn] No SHACL export configured
[pass] MCP server responds
[warn] Field 'nickname' has 0 queries across 200 writes — consider removal

Machine-readable output

trails doctor --json

Emits a JSON array of check results, suitable for CI integration:

[
  {"name": "toml", "status": "pass", "message": "trails.toml found"},
  {"name": "python_version", "status": "pass", "message": "Python 3.12.1"}
]
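A CI step can gate on this output. The sketch below assumes each entry has the name, status, and message keys shown above and that failing checks use the status string "fail" (matching the [fail] label in the text output); the ffi_version entry in the sample is hypothetical.

```python
import json
import sys
from typing import Dict, List


def failed_checks(doctor_json: str) -> List[Dict]:
    """Return the checks whose status is "fail" in `trails doctor --json` output."""
    return [check for check in json.loads(doctor_json) if check.get("status") == "fail"]


def exit_code(failures: List[Dict]) -> int:
    """Report failures on stderr; non-zero when any check failed."""
    for check in failures:
        print(f"[fail] {check['name']}: {check['message']}", file=sys.stderr)
    return 1 if failures else 0


sample = json.dumps([
    {"name": "toml", "status": "pass", "message": "trails.toml found"},
    {"name": "python_version", "status": "pass", "message": "Python 3.12.1"},
    # Hypothetical failing entry; the real check name may differ.
    {"name": "ffi_version", "status": "fail",
     "message": "Python package 0.0.3 vs FFI extension 0.0.2"},
])
failures = failed_checks(sample)
print(exit_code(failures))  # 1
```

In CI you would pipe trails doctor --json into a small script that reads stdin, calls failed_checks, and exits with exit_code's result. Warnings are deliberately not treated as failures here.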

Check categories

| Category     | Checks                                                        |
|--------------|---------------------------------------------------------------|
| Environment  | Python version, virtualenv, port availability                 |
| Project      | trails.toml, project layout, data directory                   |
| Framework    | FFI version, capabilities, shapes, store                      |
| Subsystems   | Federation config, RML availability, auto-ontology suggestions|
| Code quality | Raw SPARQL in capability bodies (linter)                      |

Python API

from pathlib import Path
from trails.doctor import run_checks

results = run_checks(Path("."))
for check in results:
    print(f"[{check.status}] {check.name}: {check.message}")

Ontology evolution (trails onto evolve)

The auto-ontology workflow (Chapter 8) produces initial types. Over time, your schema evolves. Trails supports this through:

Progressive layering

Each step is additive. You never break the previous level:

Labels → @node_type (fields) → @shape (SHACL) → OWL export

Schema migration from suggestions

When trails onto refine produces suggestions, generate migration code:

from trails.onto_refine import UsageCollector, SchemaAnalyzer, generate_migration_code

collector = UsageCollector()
collector.start()
# ... app runs ...

analyzer = SchemaAnalyzer(collector)
report = analyzer.analyze(min_samples=50)

# Generate reviewable migration snippets
code = generate_migration_code(report.suggestions)
print(code)

Output:

"""Migration code generated by `trails onto refine --apply`.

Review each change before applying.
"""
# --- Suggestion 1: make_optional ---
# Node type: Employee
# Field: nickname
# Reason: Field 'nickname' is null in 95% of 200 writes.
# Confidence: 0.95

# In your @node_type/Employee definition:
# Change: "nickname": str
# To:     "nickname": str | None

Re-inference on evolved data

When the data changes shape (new sources, schema migrations, new fields), re-run inference and diff:

# Current schema
trails onto infer -o models/current_inferred.py

# Compare with your hand-tuned models
diff models/employee.py models/current_inferred.py

New fields or types that appear in the inference but not in your models are candidates for schema expansion.


The CLI generator system

Trails provides Rails-style generators for common scaffolds:

Generate a capability

trails generate capability patient.intake
# or shorthand:
trails g cap patient.intake

Creates a capability stub with correct imports and decorator.

Generate a shape

trails g shape Patient

Creates a SHACL shape stub matching your @node_type declaration.

Generate a resource

trails g resource patient.list

Creates an MCP resource stub with correct URI template.

Create a new project

trails new myapp
cd myapp

Scaffolds a project directory with trails.toml, app.py, a conftest.py for testing, and a .gitignore.


Extending the registry

The capability registry is a module-level dict. You can inspect it programmatically:

from trails.decorators import _handlers

# List all registered capabilities
for cap_id, descriptor in _handlers.items():
    print(f"{cap_id}: {descriptor.description}")

Dynamic registration

Register capabilities from data (e.g., a plugin system):

import importlib
from trails import capability

def register_from_config(config: dict) -> None:
    """Register capabilities listed as {capability_id: "module.function"}."""
    for name, handler_path in config["capabilities"].items():
        module_name, fn_name = handler_path.rsplit(".", 1)
        mod = importlib.import_module(module_name)
        fn = getattr(mod, fn_name)
        capability(id=name)(fn)  # equivalent to decorating fn with @capability(id=name)

Registry isolation in tests

Always use isolated_kernel() to prevent test pollution:

from trails import capability
from trails.testing import isolated_kernel

def test_my_capability():
    with isolated_kernel():
        @capability
        def my_cap(ctx) -> dict:
            return {"ok": True}
        # This registration is scoped to this block

Writing custom graph store adapters

The kernel store is an Oxigraph instance, but the Python surface talks to it through a small protocol, so you can add a new store backend by implementing that protocol.

The store protocol

A store adapter must implement these operations:

class MyStore:
    def query(self, sparql: str) -> list[dict]:
        """Execute a SPARQL SELECT and return bindings."""
        ...

    def update(self, sparql: str) -> None:
        """Execute a SPARQL INSERT/DELETE."""
        ...

    def add_triple(self, subject: str, predicate: str, object: str) -> None:
        """Add a single triple."""
        ...

    def size(self) -> int:
        """Return the number of triples."""
        ...
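If you want a type checker to verify an adapter, one option is to express the operations above as a typing.Protocol. GraphStore and SetStore are hypothetical names (trails may not ship such a class), and SetStore is a toy that keeps triples in a set with no SPARQL engine behind it.

```python
from typing import Protocol, Set, Tuple, runtime_checkable


@runtime_checkable
class GraphStore(Protocol):
    """Hypothetical Protocol mirroring the four store operations listed above."""
    def query(self, sparql: str) -> list: ...
    def update(self, sparql: str) -> None: ...
    def add_triple(self, subject: str, predicate: str, object: str) -> None: ...
    def size(self) -> int: ...


class SetStore:
    """Toy in-memory adapter: triples live in a set; SPARQL is not supported."""

    def __init__(self) -> None:
        self._triples: Set[Tuple[str, str, str]] = set()

    def query(self, sparql: str) -> list:
        raise NotImplementedError("SetStore has no SPARQL engine")

    def update(self, sparql: str) -> None:
        raise NotImplementedError("SetStore has no SPARQL engine")

    def add_triple(self, subject: str, predicate: str, object: str) -> None:
        self._triples.add((subject, predicate, object))

    def size(self) -> int:
        return len(self._triples)


store = SetStore()
store.add_triple("ex:alice", "ex:knows", "ex:bob")
store.add_triple("ex:alice", "ex:knows", "ex:bob")  # an RDF graph is a set: one copy kept
print(isinstance(store, GraphStore), store.size())  # True 1
```

runtime_checkable only checks that the methods exist, not their signatures; a static checker such as mypy does the full comparison.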

Example: wrap a remote SPARQL endpoint

import requests

class RemoteSparqlStore:
    """Query a remote SPARQL endpoint (read-only)."""

    def __init__(self, endpoint: str):
        self._endpoint = endpoint

    def query(self, sparql: str) -> list[dict]:
        resp = requests.post(
            self._endpoint,
            data={"query": sparql},
            headers={"Accept": "application/sparql-results+json"},
            timeout=30,  # never hang the invoke thread on a slow endpoint
        )
        resp.raise_for_status()
        data = resp.json()
        return [
            {var: binding[var]["value"]
             for var in data["head"]["vars"]
             if var in binding}
            for binding in data["results"]["bindings"]
        ]

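    def update(self, sparql: str) -> None:
        raise NotImplementedError("Remote store is read-only")

    def add_triple(self, subject: str, predicate: str, object: str) -> None:
        raise NotImplementedError("Remote store is read-only")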

    def size(self) -> int:
        result = self.query("SELECT (COUNT(*) AS ?c) WHERE { ?s ?p ?o }")
        return int(result[0]["c"])

The Rust kernel (when and how to touch it)

Trails has a Rust kernel (7 active crates + 5 archived) that handles the core store, provenance, and FFI boundary. Most app developers never need to touch it.

When to touch the kernel

  • Never for normal application development. The Python surface covers everything.
  • Performance-critical store operations that hit Oxigraph limits.
  • Adding a new primitive to the framework (new event types, new store operations).
  • Fixing FFI bugs between the Rust and Python boundaries.

Architecture overview

python/src/trails/  ← Python surface (what you use)
    _core.py        ← FFI bindings via PyO3
    orm.py          ← ActiveGraph ORM
    ...

kernel/             ← Rust crates
    trails-core/    ← Store, provenance, core types
    trails-ffi/     ← PyO3 FFI boundary
    ...

Building locally

cd kernel
cargo build --release
# Install the Python extension
maturin develop --release

The FFI version check

trails doctor verifies that the installed Python package matches the compiled FFI extension. A mismatch produces:

[fail] FFI version: Python package 0.0.3 vs FFI extension 0.0.2

Fix by rebuilding the extension:

cd kernel && maturin develop --release

Performance tuning and benchmarking

Measure capability latency

Use the @around timing middleware from earlier in this chapter:

import time
from trails import around
from trails.decorators import _current_capability_id

@around("*")
def timing(ctx, args, next):
    t0 = time.monotonic()
    result = next()
    dt = (time.monotonic() - t0) * 1000
    print(f"[timing] {_current_capability_id()}: {dt:.1f}ms")
    return result

Monitor with the metrics singleton

from trails.observability import metrics

# After running your app:
summary = metrics.get_summary()
print(f"Total invocations: {summary['total_invocations']}")
print(f"Total errors: {summary['total_errors']}")
for cap_id, stats in summary.get("capabilities", {}).items():
    print(f"  {cap_id}: {stats['count']} calls, "
          f"p50={stats['p50_ms']:.1f}ms, p99={stats['p99_ms']:.1f}ms")

Store query performance

For SPARQL query tuning, use an observer to flag slow queries together with their row counts:

from trails.observability import register_observer

def slow_query_detector(kind, event):
    if kind == "kg_query" and event.get("duration_ms", 0) > 100:
        print(f"SLOW QUERY ({event['duration_ms']:.0f}ms): "
              f"{event.get('sparql_kind', 'unknown')}, "
              f"{event.get('row_count', '?')} rows")

register_observer(slow_query_detector)

Cost tracking

Monitor LLM spend with an observer on llm_call events:

from trails.observability import register_observer

total_usd = 0.0

def cost_observer(kind, event):
    global total_usd
    if kind == "llm_call":
        cost = event.get("cost_usd", 0.0)  # tolerate events without a cost field
        total_usd += cost
        print(f"LLM call: ${cost:.6f} "
              f"(total: ${total_usd:.4f})")

register_observer(cost_observer)

Or use the built-in CostTracker with budget enforcement -- see the Observability Guide for details.


See also