Capability middleware¶
Middleware handles cross-cutting concerns without touching capability bodies: audit logging, metrics, argument and result transforms, exception reshaping, and retry logic. Trails exposes four decorators that register middleware by capability id or glob pattern, plus a runtime hook for discovering which capability is currently executing.
Quickstart¶
import time
from trails import before, after, capability

@capability
def create_note(ctx, title: str, body: str, created_at: float | None = None) -> dict:
    # created_at is declared because the @before hook below injects it into args
    note = Node(title=title, body=body)
    ctx.kg.add(note)
    return {"id": note.id}

@before("create_note")
def _inject_timestamp(ctx, args):
    return {"created_at": time.time()}  # merged into args

@after("create_note")
def _log_result(ctx, args, result):
    print(f"Created note {result['id']}")
    # returning None leaves result unchanged
The four decorators¶
All four are importable from trails: before, after, on_error, and around.
@before(pattern)¶
Runs before the handler. Signature: (ctx, args) -> None | dict.
- Return None: args pass through unchanged.
- Return a dict: it is merged into args via args.update(result), letting middleware inject or transform inputs.
- Multiple @before hooks for the same capability run in registration order.
- If a @before hook raises, the invocation aborts with TrailsError (original chained as __cause__). The handler never runs.
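The merge rule can be pictured with a small stand-in for the runtime's dispatch loop. This is a sketch rather than Trails source: run_before_hooks and the three sample hooks are hypothetical, and it assumes the hooks are plain callables held in registration order.

```python
def run_before_hooks(hooks, ctx, args):
    """Apply each hook in registration order, merging dict returns into args."""
    for hook in hooks:
        update = hook(ctx, args)
        if update is not None:
            args.update(update)  # later hooks see earlier hooks' changes
    return args


def add_source(ctx, args):
    return {"source": "api"}  # injects a new key


def upper_title(ctx, args):
    return {"title": args["title"].upper()}  # transforms an existing key


def observe_only(ctx, args):
    return None  # leaves args untouched


merged = run_before_hooks(
    [add_source, upper_title, observe_only], ctx=None, args={"title": "hello"}
)
print(merged)  # {'title': 'HELLO', 'source': 'api'}
```

Because the merged keys are passed on to the handler, any key a hook injects needs a matching handler parameter.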
@after(pattern)¶
Runs after a successful handler invocation. Signature: (ctx, args, result) -> None | any.
- Return None: result passes through unchanged.
- Return any other value: it replaces result.
- Does not run when the handler raises (use @on_error for that).
- Multiple @after hooks run in registration order, with each one's return value feeding into the next.
- If an @after hook raises, the invocation aborts with TrailsError and the handler's successful result is lost.
@after("notes.*")
def enrich_result(ctx, args, result):
    if isinstance(result, dict):
        result["_enriched"] = True
    return result
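The chaining rule for several @after hooks can be sketched as a simple fold. The helper run_after_hooks and the sample hooks are hypothetical stand-ins, written only from the rules listed above.

```python
def run_after_hooks(hooks, ctx, args, result):
    """Thread the result through each hook; None means 'keep what we have'."""
    for hook in hooks:
        replacement = hook(ctx, args, result)
        if replacement is not None:
            result = replacement
    return result


def add_flag(ctx, args, result):
    return {**result, "_enriched": True}


def log_only(ctx, args, result):
    return None  # observes, does not replace


def count_keys(ctx, args, result):
    # Sees the output of add_flag, not the handler's raw result.
    return {**result, "key_count": len(result)}


final = run_after_hooks([add_flag, log_only, count_keys], None, {}, {"id": 1})
print(final)  # {'id': 1, '_enriched': True, 'key_count': 2}
```

One consequence of treating None as pass-through is that an @after hook cannot replace a result with None.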
@on_error(pattern)¶
Runs when the handler raises. Signature: (ctx, args, exc) -> None | Exception.
- Return None: the current exception is re-raised unchanged.
- Return an Exception: it replaces the exception that propagates.
- Multiple @on_error hooks run in registration order, with each one's returned exception (if any) feeding into the next.
- If an @on_error hook itself raises, the error is logged and the original handler exception propagates. Middleware never masks the root cause.
from trails import on_error
from trails._core import TrailsError

@on_error("*")
def wrap_errors(ctx, args, exc):
    if isinstance(exc, ValueError):
        return TrailsError(f"validation failed: {exc}")
    # return None → re-raise original
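A rough model of how the exception is threaded through several @on_error hooks is shown below. The names run_on_error_hooks and DomainError are hypothetical, and the sketch assumes that a failing hook ends the chain; the text above only states that the failure is logged and the original exception propagates.

```python
import logging

logger = logging.getLogger("middleware_sketch")


class DomainError(Exception):
    """Illustrative replacement exception."""


def run_on_error_hooks(hooks, ctx, args, exc):
    """Return the exception that should propagate once the hooks have run."""
    original = exc
    current = exc
    for hook in hooks:
        try:
            replacement = hook(ctx, args, current)
        except Exception:
            # Assumption: a broken hook stops the chain and the root cause wins.
            logger.exception("on_error hook %s failed", hook.__name__)
            return original
        if replacement is not None:
            current = replacement
    return current


def to_domain(ctx, args, exc):
    if isinstance(exc, ValueError):
        return DomainError(f"validation failed: {exc}")
    return None


def broken(ctx, args, exc):
    raise RuntimeError("hook bug")


root = ValueError("bad title")
reshaped = run_on_error_hooks([to_domain], None, {}, root)
fallback = run_on_error_hooks([to_domain, broken], None, {}, root)
```

Here reshaped is a DomainError, while fallback is the original ValueError because the second hook failed.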
@around(pattern)¶
Wraps the entire invocation (before + handler + after). Signature: (ctx, args, next) -> any.
- next is a zero-arg callable. Call next() exactly once to run the inner middleware stack plus the handler. Its return value is the post-@after handler result.
- Whatever the @around function returns becomes the new result.
- Multiple @around hooks form a LIFO stack: the last registered becomes the outermost wrapper, mirroring Python decorator composition.
- If @around raises, the exception propagates normally because it owns the whole frame.
- Use sparingly. Most use cases are better served by @before / @after.
import time
from trails import around

@around("*")
def timing_wrapper(ctx, args, next):
    t0 = time.monotonic()
    result = next()  # run the inner middleware stack plus the handler
    dt = time.monotonic() - t0
    print(f"Took {dt*1000:.1f}ms")
    return result  # must return the result
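The LIFO stacking of @around wrappers can be illustrated by composing closures. compose_around and _wrap are hypothetical helpers, not Trails internals; core stands in for the before + handler + after pipeline.

```python
def _wrap(wrapper, ctx, args, inner):
    """Bind one wrapper around the callable built so far."""
    return lambda: wrapper(ctx, args, inner)


def compose_around(wrappers, ctx, args, core):
    """Wrap core with each wrapper in registration order.

    The first registered wrapper sits closest to core, so the last
    registered one ends up outermost.
    """
    call = core
    for wrapper in wrappers:
        call = _wrap(wrapper, ctx, args, call)
    return call


trace = []


def first(ctx, args, next):
    trace.append("first:enter")
    result = next()
    trace.append("first:exit")
    return result


def second(ctx, args, next):
    trace.append("second:enter")
    result = next()
    trace.append("second:exit")
    return result


def core():
    trace.append("handler")
    return 42


result = compose_around([first, second], None, {}, core)()
print(trace)
# ['second:enter', 'first:enter', 'handler', 'first:exit', 'second:exit']
```

The wrapper registered second enters first and exits last, which matches the "last registered = outermost" rule.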
Pattern matching (glob)¶
The pattern argument on all four decorators accepts:
| Pattern | Matches |
|---|---|
| "notes.create" | Exact match only |
| "notes.*" | notes.create, notes.delete, notes.list |
| "create_*" | create_note, create_user, etc. |
| "*" | Every capability (catch-all) |
| "notes.?" | notes.a, notes.x (single-char wildcard) |
Only * and ? wildcards are supported — no regex, no character classes.
Matching uses fnmatch.fnmatchcase semantics (case-sensitive).
Patterns activate lazily: registering @before("notes.*") before any
notes.foo capability exists is valid. The runtime matches at invoke
time, not at decoration time.
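The matching rules can be tried directly against the standard library. The sketch below uses fnmatch.fnmatchcase as the text describes; the registered list and the collect helper are hypothetical and only illustrate invoke-time resolution. Plain fnmatchcase also interprets [seq] character classes, and how Trails rules those out is not stated here, so the sketch does not cover it.

```python
from fnmatch import fnmatchcase


def matches(pattern: str, capability_id: str) -> bool:
    """Case-sensitive glob match; fnmatchcase takes (name, pattern)."""
    return fnmatchcase(capability_id, pattern)


# Hypothetical registry: (pattern, hook name) pairs in registration order.
registered = [("notes.*", "audit_notes"), ("*", "audit_all"), ("Notes.*", "wrong_case")]


def collect(capability_id: str) -> list[str]:
    """Resolve patterns at invoke time, so ids defined later still match."""
    return [name for pattern, name in registered if matches(pattern, capability_id)]


print(collect("notes.create"))               # ['audit_notes', 'audit_all']
print(collect("users.create"))               # ['audit_all']
print(matches("notes.?", "notes.ab"))        # False: ? is exactly one character
print(matches("notes.*", "notes.tags.add"))  # True: * also spans dots
```

Under plain fnmatch semantics the * wildcard is not segment-aware, so "notes.*" would also match deeper ids such as notes.tags.add.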
Execution order per invoke¶
┌─ @around (outermost = last registered) ──────────────────┐
│ ┌─ @around (next) ─────────────────────────────────────┐ │
│ │ @before hooks (registration order)                   │ │
│ │ handler(ctx, **args)                                 │ │
│ │ ── on success ──                                     │ │
│ │ @after hooks (registration order)                    │ │
│ │ ── on failure ──                                     │ │
│ │ @on_error hooks (registration order)                 │ │
│ └──────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────┘
Key points:
- @before hooks run in registration order (FIFO).
- @after hooks run in registration order (FIFO).
- @on_error hooks run in registration order (FIFO).
- @around hooks form a LIFO stack (last registered = outermost).
- @after and @on_error are mutually exclusive per invocation: only one set runs, depending on whether the handler succeeded or raised.
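To make the ordering concrete, here is a toy invoke that records which hooks fire on the success and failure paths. It is a simplified model under stated assumptions, not the Trails runtime: the invoke signature is hypothetical, and the TrailsError wrapping of hook failures is left out.

```python
def invoke(handler, ctx, args, before=(), after=(), on_error=(), around=()):
    """Run one invocation; hook sequences are given in registration order."""

    def core():
        for hook in before:
            update = hook(ctx, args)
            if update is not None:
                args.update(update)
        try:
            result = handler(ctx, **args)
        except Exception as exc:
            for hook in on_error:
                replacement = hook(ctx, args, exc)
                if replacement is not None:
                    exc = replacement
            raise exc
        for hook in after:
            replacement = hook(ctx, args, result)
            if replacement is not None:
                result = replacement
        return result

    call = core
    for wrapper in around:  # each later wrapper encloses the earlier ones
        call = (lambda w, inner: lambda: w(ctx, args, inner))(wrapper, call)
    return call()


trace = []


def note(label):
    def hook(ctx, args, *rest):
        trace.append(label)  # returns None, so nothing is replaced
    return hook


def outer(ctx, args, next):
    trace.append("around:enter")
    try:
        return next()
    finally:
        trace.append("around:exit")


def ok(ctx):
    trace.append("handler")
    return "done"


def boom(ctx):
    trace.append("handler")
    raise ValueError("boom")


hooks = dict(before=[note("before")], after=[note("after")],
             on_error=[note("on_error")], around=[outer])

invoke(ok, None, {}, **hooks)
success_trace = list(trace)
trace.clear()

try:
    invoke(boom, None, {}, **hooks)
except ValueError:
    pass
failure_trace = list(trace)
```

In this model success_trace ends with "after" and failure_trace with "on_error" just before "around:exit", and neither trace contains both.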
Discovering the current capability id¶
Context uses __slots__, so middleware cannot attach arbitrary state
to ctx directly. The runtime publishes the current capability id via
a thread-local-like hook:
from trails.decorators import _current_capability_id
cap_id = _current_capability_id() # e.g. "notes.create" or None
Returns the capability id when called inside any middleware or
handler during trails.runtime.invoke. Returns None outside an
invocation. This is useful for generic catch-all middleware that needs
to know which capability triggered it.
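One plausible way to build such a hook is with the standard contextvars module, sketched below. This is an assumption chosen to fit the "thread-local-like" description, not a statement of how Trails stores the id; executing and current_capability_id are hypothetical names.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Holds the id of the capability being invoked; None outside an invocation.
_capability_id = ContextVar("capability_id", default=None)


@contextmanager
def executing(capability_id):
    """Publish the id for the duration of one invocation, then restore."""
    token = _capability_id.set(capability_id)
    try:
        yield
    finally:
        _capability_id.reset(token)


def current_capability_id():
    return _capability_id.get()


def catch_all_hook():
    # A generic hook can label its output without receiving the id as an argument.
    return f"running inside {current_capability_id()}"


before_invoke = current_capability_id()
with executing("notes.create"):
    inside = catch_all_hook()
after_invoke = current_capability_id()

print(before_invoke, inside, after_invoke)
# None running inside notes.create None
```

Resetting with the token restores whatever value was active before, which also keeps nested invocations consistent.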
Middleware failure semantics¶
| Middleware raises | What happens |
|---|---|
| @before | Invocation aborts with TrailsError (original chained as __cause__) |
| @after | Invocation aborts with TrailsError; the handler's successful result is lost |
| @on_error | Error is logged; the original handler exception propagates (never masks root cause) |
| @around | Propagates normally; it owns the whole frame |
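The "original chained as __cause__" behaviour corresponds to Python's raise ... from construct. The sketch below uses a local stand-in for TrailsError and a hypothetical run_before_guarded helper; the error message text is invented for illustration.

```python
class TrailsError(Exception):
    """Local stand-in for the library's error type."""


def run_before_guarded(hook, ctx, args):
    """Run one @before-style hook, wrapping any failure in TrailsError."""
    try:
        return hook(ctx, args)
    except Exception as exc:
        # 'from exc' stores the original exception on __cause__.
        raise TrailsError(f"@before hook {hook.__name__} failed") from exc


def bad_hook(ctx, args):
    raise KeyError("title")


caught = None
try:
    run_before_guarded(bad_hook, None, {})
except TrailsError as err:
    caught = err

print(type(caught.__cause__).__name__)  # KeyError
```

Callers that need the root cause of an aborted invocation can therefore inspect __cause__ on the TrailsError they catch.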
Async middleware¶
Not supported in M0. Decorating an async def function with any
middleware decorator raises TrailsError at decoration time. Async
middleware lands in M1 alongside async capabilities.
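A decoration-time check of this kind can be written with inspect.iscoroutinefunction. The decorator below is a hypothetical guard with a local stand-in for TrailsError; it shows the general technique rather than the library's actual check.

```python
import inspect


class TrailsError(Exception):
    """Local stand-in for the library's error type."""


def reject_async(func):
    """Refuse coroutine functions when the decorator is applied."""
    if inspect.iscoroutinefunction(func):
        raise TrailsError(f"{func.__name__}: async middleware is not supported")
    return func


@reject_async
def sync_hook(ctx, args):
    return None


try:
    @reject_async
    async def async_hook(ctx, args):
        return None
except TrailsError as err:
    rejected = str(err)
else:
    rejected = None

print(rejected)  # async_hook: async middleware is not supported
```

Failing at decoration time surfaces the mistake at import, before any capability is invoked.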
Common patterns¶
Audit logger (catch-all)¶
Log every invocation outcome to the knowledge graph without touching any capability body:
import time
from trails import before, after, on_error
from trails.decorators import _current_capability_id

_T0: dict[str, float] = {}  # keyed by ctx.trace_id

@before("*")
def _audit_start(ctx, args):
    _T0[ctx.trace_id] = time.monotonic()

@after("*")
def _audit_success(ctx, args, result):
    dt_ms = (time.monotonic() - _T0.pop(ctx.trace_id, 0)) * 1000
    ctx.kg.node(labels=["AuditEntry"], properties={
        "capability": _current_capability_id() or "unknown",
        "principal": ctx.principal,
        "outcome": "success",
        "duration_ms": f"{dt_ms:.3f}",
    })

@on_error("*")
def _audit_failure(ctx, args, exc):
    dt_ms = (time.monotonic() - _T0.pop(ctx.trace_id, 0)) * 1000
    ctx.kg.node(labels=["AuditEntry"], properties={
        "capability": _current_capability_id() or "unknown",
        "outcome": "failed",
        "duration_ms": f"{dt_ms:.3f}",  # record the measured duration on failures too
        "error": type(exc).__name__,
    })
Note the side-dict pattern (_T0): because Context uses
__slots__, stash per-invocation state in a module-level dict keyed
by ctx.trace_id.
Timing middleware (@around)¶
Wrap every capability to measure wall-clock duration:
import time
from trails import around
from trails.decorators import _current_capability_id

@around("*")
def timing(ctx, args, next):
    t0 = time.monotonic()
    try:
        result = next()
        return result
    finally:
        dt = (time.monotonic() - t0) * 1000
        cap = _current_capability_id() or "unknown"
        print(f"[timing] {cap}: {dt:.1f}ms")
Argument validator¶
Validate or normalise inputs before the handler sees them:
from trails import before
from trails._core import TrailsError

@before("notes.*")
def _validate_title(ctx, args):
    if "title" not in args:
        return None  # skip capabilities that take no title argument
    title = args["title"]
    if not title.strip():
        raise TrailsError("validation: title must not be blank")
    return {"title": title.strip()}  # normalised value merged back
Retry with @around¶
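Retry transient failures up to three times with exponential backoff. This pattern deliberately calls next() more than once, so each attempt re-runs the inner middleware stack plus the handler; use it only where repeating that work is safe.
import time
from trails import around

@around("external.*")
def retry_transient(ctx, args, next):
    attempts = 3
    for attempt in range(attempts):
        try:
            return next()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of retries: propagate the last failure
            time.sleep(0.1 * (2 ** attempt))  # back off 0.1s, then 0.2s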
What middleware is NOT¶
- Not observability. Use trails.observability.register_observer for fire-and-forget telemetry that never blocks the invocation. Middleware can abort an invocation; observers never can.
- Not policy. Use @policy for allow/deny decisions tied to a specific capability. It integrates with Cedar and register_principal_attrs.
- Not the observer pattern. If you find yourself writing @after just to log something, consider observability.emit instead.
Reference¶
from trails import before, after, on_error, around
from trails.decorators import _current_capability_id
@before(pattern: str) # (ctx, args) -> None | dict
@after(pattern: str) # (ctx, args, result) -> None | any
@on_error(pattern: str) # (ctx, args, exc) -> None | Exception
@around(pattern: str) # (ctx, args, next) -> any
_current_capability_id() -> str | None
Internal helpers (not part of public API):
- _middleware: the global registry dict
- _collect_middleware(capability_id, kind): returns all matching middleware of the given kind in registration order
- _matches(pattern, capability_id): glob match using fnmatch
See also: docs/guides/capabilities.md,
docs/guides/observability.md (coming soon),
docs/guides/policy.md.