Chapter 10 — Recipes and Patterns¶
Practical, runnable examples for common tasks. Each recipe is self- contained: copy, adapt, ship.
Recipe: Note-taking app (5 minutes)¶
A complete note-taking app with CRUD operations and search.
1. Create the project¶
2. Define the model¶
# models.py
from trails import node_type
@node_type("Note", fields={
"title": str,
"content": str,
"tags": list[str],
})
class Note: ...
3. Write capabilities¶
# app.py
from trails import capability
from models import Note
@capability(id="notes.create", description="Create a new note")
def create_note(ctx, title: str, content: str, tags: str = "") -> dict:
tag_list = [t.strip() for t in tags.split(",") if t.strip()]
note = Note(title=title, content=content, tags=tag_list)
ctx.kg.add(note)
return {"id": note.id, "title": note.title}
@capability(id="notes.search", description="Search notes by title")
def search_notes(ctx, q: str) -> list:
hits = Note.where(title__icontains=q).fetch(ctx)
return [{"id": n.id, "title": n.title, "tags": n.tags} for n in hits]
@capability(id="notes.get", description="Get a note by ID")
def get_note(ctx, note_id: str) -> dict:
note = Note.find(note_id, ctx)
if note is None:
return {"error": "not found"}
return {"id": note.id, "title": note.title,
"content": note.content, "tags": note.tags}
@capability(id="notes.by_tag", description="Find notes with a specific tag")
def notes_by_tag(ctx, tag: str) -> list:
hits = Note.where(tags__contains=tag).fetch(ctx)
return [{"id": n.id, "title": n.title} for n in hits]
4. Run it¶
# As an MCP server (Claude Desktop, Cursor)
trails server --transport stdio
# Or via the admin UI
pip install 'trails[admin]'
trails-admin --app app --port 4455
5. Test it¶
# test_notes.py
from trails import capability
from trails.runtime import invoke
from trails.testing import isolated_kernel
def test_create_and_search():
with isolated_kernel():
from models import Note # noqa: register
import app # noqa: register capabilities
invoke("notes.create", {"title": "Hello", "content": "World"})
results = invoke("notes.search", {"q": "Hello"})
assert len(results) >= 1
assert results[0]["title"] == "Hello"
Recipe: REST API with FastAPI adapter¶
Expose Trails capabilities as a REST API.
# api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from trails import capability, node_type
from trails.runtime import invoke
app = FastAPI(title="Patient API")
# --- Models ---
@node_type("Patient", fields={
"name": str,
"age": int,
"diagnosis": str,
})
class Patient: ...
# --- Capabilities ---
@capability(id="patient.create")
def create_patient(ctx, name: str, age: int, diagnosis: str) -> dict:
p = Patient(name=name, age=age, diagnosis=diagnosis)
ctx.kg.add(p)
return {"id": p.id, "name": p.name}
@capability(id="patient.list")
def list_patients(ctx) -> list:
return [
{"id": p.id, "name": p.name, "age": p.age}
for p in Patient.where().fetch(ctx)
]
@capability(id="patient.get")
def get_patient(ctx, patient_id: str) -> dict:
p = Patient.find(patient_id, ctx)
if p is None:
raise ValueError("not found")
return {"id": p.id, "name": p.name, "age": p.age,
"diagnosis": p.diagnosis}
# --- FastAPI routes ---
class PatientIn(BaseModel):
name: str
age: int
diagnosis: str
@app.post("/patients")
def api_create(body: PatientIn):
return invoke("patient.create", body.model_dump())
@app.get("/patients")
def api_list():
return invoke("patient.list", {})
@app.get("/patients/{patient_id}")
def api_get(patient_id: str):
try:
return invoke("patient.get", {"patient_id": patient_id})
except ValueError:
raise HTTPException(status_code=404, detail="Patient not found")
Run with:
Every invocation still goes through trails.invoke, so provenance,
cost tracking, and observability all work.
Recipe: MCP-discoverable service¶
Make your app discoverable by Claude Desktop, Cursor, or any MCP client.
1. Register capabilities with descriptions¶
# app.py
from trails import capability
@capability(id="research.search",
description="Search the knowledge base for papers matching a query")
def search(ctx, query: str, limit: int = 10) -> list:
hits = ctx.kg.match(labels=["Paper"], where={"title": query})
return hits[:limit]
@capability(id="research.summarize",
description="Summarize a paper by its ID")
def summarize(ctx, paper_id: str) -> dict:
triples = ctx.kg.traverse(subject=paper_id, label="has_abstract")
abstract = triples[0]["object"] if triples else "No abstract found"
reply = ctx.llm.complete([
{"role": "user", "content": f"Summarize: {abstract}"}
])
return {"summary": reply.text}
2. Start the MCP server¶
# stdio transport (for local MCP hosts)
trails server --transport stdio
# SSE transport (for remote/multi-client)
trails server --transport sse --port 8080
3. Configure Claude Desktop¶
Add to claude_desktop_config.json:
{
"mcpServers": {
"research": {
"command": "trails",
"args": ["server", "--transport", "stdio"],
"cwd": "/path/to/your/project"
}
}
}
Your capabilities appear as tools. Claude sees the descriptions and parameter types from your function signatures.
4. Add MCP resources (read-only data)¶
from trails.mcp import resource
@resource(uri="research://papers/recent",
description="List of recently added papers")
def recent_papers(ctx) -> str:
hits = ctx.kg.match(labels=["Paper"])
return "\n".join(h.get("title", "untitled") for h in hits[:20])
5. Add MCP prompts (reusable templates)¶
from trails.mcp import prompt
@prompt(name="literature-review",
description="Generate a literature review prompt for a topic")
def literature_review(topic: str) -> list:
return [
{"role": "user",
"content": f"Write a literature review on: {topic}. "
f"Focus on recent findings and identify gaps."}
]
Recipe: Multi-tenant app with Cedar policies¶
Restrict capabilities by tenant and role.
1. Define policies¶
// policies/tenant.cedar
// Admins can do everything in their tenant
permit (
principal,
action,
resource
) when {
principal.role == "admin" &&
resource.tenant == principal.tenant
};
// Viewers can only read
permit (
principal,
action == Action::"patient.list",
resource
) when {
principal.role == "viewer" &&
resource.tenant == principal.tenant
};
// Deny cross-tenant access by default
forbid (
principal,
action,
resource
) when {
resource.tenant != principal.tenant
};
2. Wire policies to capabilities¶
from trails import capability
from trails.policy import policy, register_principal_attrs
# Register tenant-aware principals
register_principal_attrs("did:tenant-a:alice", {
"role": "admin",
"tenant": "tenant-a",
})
register_principal_attrs("did:tenant-b:bob", {
"role": "viewer",
"tenant": "tenant-b",
})
@capability(id="patient.create")
@policy("policies/tenant.cedar")
def create_patient(ctx, name: str, tenant: str) -> dict:
iri = ctx.kg.node(labels=["Patient"],
properties={"name": name, "tenant": tenant})
return {"id": iri}
@capability(id="patient.list")
@policy("policies/tenant.cedar")
def list_patients(ctx, tenant: str) -> list:
return ctx.kg.match(labels=["Patient"], where={"tenant": tenant})
3. Test policy enforcement¶
from trails.testing import isolated_kernel, register_principal_role
def test_cross_tenant_denied():
with isolated_kernel():
register_principal_role("did:tenant-b:bob", "viewer")
# Bob (tenant-b) trying to list tenant-a patients
# should be denied by the Cedar policy
...
Recipe: Research assistant with LLM + vector search¶
Combine knowledge graph queries with vector similarity search.
# research_assistant.py
from trails import capability, node_type
from trails.vector import SentenceTransformerEmbedder, SqliteVecStore, retrieve
embedder = SentenceTransformerEmbedder()
vec_store = SqliteVecStore(path="vectors.db", dim=384)
@node_type("Paper", fields={
"title": str,
"abstract": str,
"year": int,
"authors": list[str],
})
class Paper: ...
@capability(id="research.ingest",
description="Add a paper to the knowledge base")
def ingest_paper(ctx, title: str, abstract: str, year: int,
authors: str) -> dict:
author_list = [a.strip() for a in authors.split(",")]
paper = Paper(title=title, abstract=abstract,
year=year, authors=author_list)
ctx.kg.add(paper)
# Also index in vector store for semantic search
vector = embedder.embed(f"{title}. {abstract}")
vec_store.add(id=paper.id, vector=vector,
metadata={"iri": paper.id, "snippet": title})
return {"id": paper.id}
@capability(id="research.semantic_search",
description="Find papers semantically similar to a query")
def semantic_search(ctx, query: str, k: int = 5) -> list:
hits = retrieve(query, mode="vector", k=k,
vector_store=vec_store, embedder=embedder)
results = []
for h in hits:
paper = Paper.find(h.iri, ctx)
if paper:
results.append({
"title": paper.title,
"year": paper.year,
"score": h.score,
"authors": paper.authors,
})
return results
@capability(id="research.ask",
description="Answer a research question using papers in the KB")
def ask_question(ctx, question: str) -> dict:
# 1. Find relevant papers via vector search
hits = retrieve(question, mode="vector", k=3,
vector_store=vec_store, embedder=embedder)
# 2. Build context from paper abstracts
context_parts = []
for h in hits:
paper = Paper.find(h.iri, ctx)
if paper:
context_parts.append(
f"[{paper.title} ({paper.year})]\n{paper.abstract}"
)
context = "\n\n---\n\n".join(context_parts)
# 3. Ask the LLM with retrieved context
reply = ctx.llm.complete([
{"role": "system",
"content": "Answer based on the provided papers. "
"Cite paper titles in your answer."},
{"role": "user",
"content": f"Papers:\n{context}\n\nQuestion: {question}"}
])
return {"answer": reply.text, "sources": [h.iri for h in hits]}
Recipe: Data pipeline (CSV -> RML -> KG -> query)¶
A complete ETL pipeline from structured data to queryable knowledge graph.
1. Source data: sales.csv¶
order_id,customer,product,quantity,price,date
1001,Acme Corp,Widget A,50,12.99,2025-01-15
1002,Beta Inc,Widget B,100,8.50,2025-01-16
1003,Acme Corp,Widget A,25,12.99,2025-02-01
1004,Gamma LLC,Widget C,10,45.00,2025-02-10
2. RML mapping: mappings/sales.ttl¶
@prefix rml: <http://semweb.mmlab.be/ns/rml#>.
@prefix ql: <http://semweb.mmlab.be/ns/ql#>.
@prefix rr: <http://www.w3.org/ns/r2rml#>.
@prefix ex: <https://myapp.example/>.
@prefix xsd: <http://www.w3.org/2001/XMLSchema#>.
<#OrderMapping>
rml:logicalSource [
rml:source "sales.csv";
rml:referenceFormulation ql:CSV
];
rr:subjectMap [
rr:template "https://myapp.example/Order/{order_id}";
rr:class ex:Order
];
rr:predicateObjectMap [
rr:predicate ex:customer;
rr:objectMap [ rml:reference "customer" ]
];
rr:predicateObjectMap [
rr:predicate ex:product;
rr:objectMap [ rml:reference "product" ]
];
rr:predicateObjectMap [
rr:predicate ex:quantity;
rr:objectMap [ rml:reference "quantity"; rr:datatype xsd:integer ]
];
rr:predicateObjectMap [
rr:predicate ex:price;
rr:objectMap [ rml:reference "price"; rr:datatype xsd:decimal ]
];
rr:predicateObjectMap [
rr:predicate ex:date;
rr:objectMap [ rml:reference "date"; rr:datatype xsd:date ]
].
3. Load, infer, query¶
from trails import capability, node_type
from trails.rml import run_mapping
from trails.onto_infer import infer_schema, generate_code
from trails.testing import fresh_context
# Step 1: Load via RML
ctx = fresh_context()
result = run_mapping(ctx, "mappings/sales.ttl", sales="sales.csv")
print(f"Loaded {result.triples_added} triples")
# Step 2: Infer schema
schema = infer_schema(ctx.kg._store, trace_id="sales-infer")
code = generate_code(schema)
print(code)
# Step 3: Use the inferred type (after review and commit)
@node_type("Order", fields={
"customer": str,
"product": str,
"quantity": int,
"price": float,
"date": str,
})
class Order: ...
# Step 4: Query capabilities
@capability(id="sales.by_customer")
def sales_by_customer(ctx, customer: str) -> list:
orders = Order.where(customer=customer).fetch(ctx)
return [{"product": o.product, "quantity": o.quantity,
"price": o.price} for o in orders]
@capability(id="sales.top_products")
def top_products(ctx, min_quantity: int = 10) -> list:
orders = Order.where(quantity__gte=min_quantity).fetch(ctx)
# Aggregate by product
totals: dict[str, int] = {}
for o in orders:
totals[o.product] = totals.get(o.product, 0) + o.quantity
return sorted(
[{"product": p, "total": t} for p, t in totals.items()],
key=lambda x: x["total"],
reverse=True,
)
Recipe: Federated knowledge mesh (2 instances)¶
Connect two Trails instances so one can query the other's graph.
Instance A: Research papers (the data source)¶
# instance_a.py — Research graph
from fastapi import FastAPI
from trails import node_type, capability
from trails.federation import FederationConfig, FederationEndpoint
from trails.federation_http import mount_federation_routes
from trails.testing import fresh_context
@node_type("Paper", fields={"title": str, "year": int})
class Paper: ...
ctx = fresh_context()
# Seed some data
for title, year in [("KG Survey", 2024), ("LLM Agents", 2025)]:
p = Paper(title=title, year=year)
ctx.kg.add(p)
# Expose SPARQL endpoint
app = FastAPI()
config = FederationConfig(enabled=True, read_only=True,
max_query_time_ms=30000)
endpoint = FederationEndpoint(store=ctx.kg._store, config=config)
mount_federation_routes(app, endpoint)
# Run: uvicorn instance_a:app --port 9001
Instance B: Consumer (queries Instance A)¶
# instance_b.py — Consumer that federates queries
import requests
def query_remote(sparql: str, endpoint: str = "http://localhost:9001/sparql"):
"""Query a remote Trails instance's SPARQL endpoint."""
resp = requests.post(
endpoint,
data={"query": sparql},
headers={"Accept": "application/sparql-results+json"},
)
resp.raise_for_status()
return resp.json()
# Find all papers from 2025
result = query_remote("""
PREFIX ex: <https://myapp.example/>
SELECT ?title ?year WHERE {
?s a ex:Paper ;
ex:title ?title ;
ex:year ?year .
FILTER(?year >= 2025)
}
""")
for binding in result["results"]["bindings"]:
print(f"{binding['title']['value']} ({binding['year']['value']})")
For the full federation setup (including authentication, mesh topology, and capability relay), see the Federation Guide.
Anti-patterns: what NOT to do¶
Don't use raw SPARQL when the ORM works¶
# BAD — fragile, no validation, no provenance on the read
@capability
def bad_search(ctx, name: str) -> list:
results = ctx.kg.query(
f"SELECT ?s WHERE {{ ?s <https://myapp.example/name> '{name}' }}"
)
return results
# GOOD — type-safe, validated, readable
@capability
def good_search(ctx, name: str) -> list:
hits = Employee.where(name=name).fetch(ctx)
return [{"id": e.id, "name": e.name} for e in hits]
trails doctor includes a linter check that flags raw SPARQL inside
capability bodies. Use the ORM for typed queries; reserve raw SPARQL for
truly ad-hoc or analytical queries that don't map to a model.
Don't skip SHACL validation¶
# BAD — accepts any garbage
@node_type("Patient", fields={"name": str, "age": int})
class Patient: ...
# No shape → no constraints → bad data enters the graph
# GOOD — enforce constraints
from trails import node_type, shape, predicate
@node_type("Patient", fields={"name": str, "age": int})
class Patient: ...
@shape(Patient)
class PatientShape:
name = predicate(min_length=1, max_length=200)
age = predicate(min_inclusive=0, max_inclusive=150)
Shapes catch bad data at write time, before it pollutes the graph.
Don't hardcode IRIs¶
# BAD — brittle, breaks if the base IRI changes
@capability
def bad_lookup(ctx, name: str) -> dict:
iri = f"https://myapp.example/Employee/{name}"
return ctx.kg.traverse(subject=iri, label="manages")
# GOOD — use the ORM's IRI minting
@capability
def good_lookup(ctx, name: str) -> dict:
employees = Employee.where(name=name).fetch(ctx)
if not employees:
return {"error": "not found"}
return {"manages": Employee.where(
manager=employees[0].id
).fetch(ctx)}
The ORM mints UUIDv7-based IRIs. Hardcoding IRI templates couples your code to a naming scheme that may change.
Don't ignore cost envelopes in production¶
# BAD — unbounded LLM spend
@capability
def expensive_analysis(ctx, text: str) -> dict:
reply = ctx.llm.complete([
{"role": "user", "content": f"Analyze: {text}"}
])
return {"analysis": reply.text}
# GOOD — set budget limits
from trails.agent import Session
from trails.agent.planners import react
@capability
def bounded_analysis(ctx, text: str) -> dict:
result = react.run(
f"Analyze: {text}",
llm=ctx.llm,
session=Session(principal=ctx.principal),
max_cost_usd=0.05, # cap at 5 cents
max_tokens=10000, # cap token usage
max_wall_time_s=30, # cap wall time
)
return {"analysis": result.answer}
Budget limits prevent runaway costs. Set max_cost_usd on every
production agent loop.
Migration guide: from raw rdflib to Trails¶
If you have an existing rdflib-based application, migrating to Trails is incremental.
Step 1: Replace the graph with ctx.kg¶
# Before (rdflib)
from rdflib import Graph, Literal, URIRef
g = Graph()
g.add((URIRef("urn:alice"), URIRef("urn:name"), Literal("Alice")))
results = g.query("SELECT ?name WHERE { ?s <urn:name> ?name }")
# After (Trails label-first — minimal change)
from trails import capability
@capability
def create(ctx) -> dict:
iri = ctx.kg.node(labels=["Person"],
properties={"name": "Alice"})
return {"id": iri}
Step 2: Add types when ready¶
from trails import node_type
@node_type("Person", fields={"name": str, "age": int})
class Person: ...
# Or let the framework discover types:
# trails onto infer -o models/types.py
Step 3: Replace raw SPARQL with ORM queries¶
# Before
results = g.query("""
SELECT ?name WHERE {
?s a <urn:Person> ;
<urn:name> ?name .
}
""")
# After
people = Person.where().fetch(ctx)
names = [p.name for p in people]
Step 4: Add provenance and observability for free¶
Trails adds provenance to every write and fires observability events
on every invocation. No code changes needed -- it happens automatically
when you use @capability and ctx.kg.
Step 5: Add shapes for validation¶
from trails import shape, predicate
@shape(Person)
class PersonShape:
name = predicate(min_length=1)
age = predicate(min_inclusive=0)
Migration checklist¶
- Replace
rdflib.Graph()withctx.kgin a@capability - Move SPARQL SELECTs to
Model.where().fetch(ctx) - Move SPARQL INSERTs to
ctx.kg.add(model_instance) - Run
trails onto inferto discover types from existing data - Add
@shapeconstraints where validation matters - Run
trails doctorto catch remaining raw SPARQL
See also¶
- Knowledge Graph Guide --
ctx.kgreference - ORM Guide --
@node_typeand query API - MCP Guide -- tools, resources, prompts
- Policy Guide -- Cedar authorization
- Vector Guide -- embeddings and retrieval
- Federation Guide -- multi-instance queries
- RML Guide -- declarative data mapping