Skip to content

Chapter 10 — Recipes and Patterns

Practical, runnable examples for common tasks. Each recipe is self- contained: copy, adapt, ship.


Recipe: Note-taking app (5 minutes)

A complete note-taking app with CRUD operations and search.

1. Create the project

trails new notes-app
cd notes-app

2. Define the model

# models.py
from trails import node_type

@node_type("Note", fields={
    "title": str,
    "content": str,
    "tags": list[str],
})
class Note: ...

3. Write capabilities

# app.py
from trails import capability
from models import Note

@capability(id="notes.create", description="Create a new note")
def create_note(ctx, title: str, content: str, tags: str = "") -> dict:
    tag_list = [t.strip() for t in tags.split(",") if t.strip()]
    note = Note(title=title, content=content, tags=tag_list)
    ctx.kg.add(note)
    return {"id": note.id, "title": note.title}

@capability(id="notes.search", description="Search notes by title")
def search_notes(ctx, q: str) -> list:
    hits = Note.where(title__icontains=q).fetch(ctx)
    return [{"id": n.id, "title": n.title, "tags": n.tags} for n in hits]

@capability(id="notes.get", description="Get a note by ID")
def get_note(ctx, note_id: str) -> dict:
    note = Note.find(note_id, ctx)
    if note is None:
        return {"error": "not found"}
    return {"id": note.id, "title": note.title,
            "content": note.content, "tags": note.tags}

@capability(id="notes.by_tag", description="Find notes with a specific tag")
def notes_by_tag(ctx, tag: str) -> list:
    hits = Note.where(tags__contains=tag).fetch(ctx)
    return [{"id": n.id, "title": n.title} for n in hits]

4. Run it

# As an MCP server (Claude Desktop, Cursor)
trails server --transport stdio

# Or via the admin UI
pip install 'trails[admin]'
trails-admin --app app --port 4455

5. Test it

# test_notes.py
from trails import capability
from trails.runtime import invoke
from trails.testing import isolated_kernel

def test_create_and_search():
    with isolated_kernel():
        from models import Note  # noqa: register
        import app  # noqa: register capabilities

        invoke("notes.create", {"title": "Hello", "content": "World"})
        results = invoke("notes.search", {"q": "Hello"})
        assert len(results) >= 1
        assert results[0]["title"] == "Hello"

Recipe: REST API with FastAPI adapter

Expose Trails capabilities as a REST API.

# api.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from trails import capability, node_type
from trails.runtime import invoke

app = FastAPI(title="Patient API")

# --- Models ---

@node_type("Patient", fields={
    "name": str,
    "age": int,
    "diagnosis": str,
})
class Patient: ...

# --- Capabilities ---

@capability(id="patient.create")
def create_patient(ctx, name: str, age: int, diagnosis: str) -> dict:
    p = Patient(name=name, age=age, diagnosis=diagnosis)
    ctx.kg.add(p)
    return {"id": p.id, "name": p.name}

@capability(id="patient.list")
def list_patients(ctx) -> list:
    return [
        {"id": p.id, "name": p.name, "age": p.age}
        for p in Patient.where().fetch(ctx)
    ]

@capability(id="patient.get")
def get_patient(ctx, patient_id: str) -> dict:
    p = Patient.find(patient_id, ctx)
    if p is None:
        raise ValueError("not found")
    return {"id": p.id, "name": p.name, "age": p.age,
            "diagnosis": p.diagnosis}

# --- FastAPI routes ---

class PatientIn(BaseModel):
    name: str
    age: int
    diagnosis: str

@app.post("/patients")
def api_create(body: PatientIn):
    return invoke("patient.create", body.model_dump())

@app.get("/patients")
def api_list():
    return invoke("patient.list", {})

@app.get("/patients/{patient_id}")
def api_get(patient_id: str):
    try:
        return invoke("patient.get", {"patient_id": patient_id})
    except ValueError:
        raise HTTPException(status_code=404, detail="Patient not found")

Run with:

uvicorn api:app --reload --port 8000

Every invocation still goes through trails.invoke, so provenance, cost tracking, and observability all work.


Recipe: MCP-discoverable service

Make your app discoverable by Claude Desktop, Cursor, or any MCP client.

1. Register capabilities with descriptions

# app.py
from trails import capability

@capability(id="research.search",
            description="Search the knowledge base for papers matching a query")
def search(ctx, query: str, limit: int = 10) -> list:
    hits = ctx.kg.match(labels=["Paper"], where={"title": query})
    return hits[:limit]

@capability(id="research.summarize",
            description="Summarize a paper by its ID")
def summarize(ctx, paper_id: str) -> dict:
    triples = ctx.kg.traverse(subject=paper_id, label="has_abstract")
    abstract = triples[0]["object"] if triples else "No abstract found"
    reply = ctx.llm.complete([
        {"role": "user", "content": f"Summarize: {abstract}"}
    ])
    return {"summary": reply.text}

2. Start the MCP server

# stdio transport (for local MCP hosts)
trails server --transport stdio

# SSE transport (for remote/multi-client)
trails server --transport sse --port 8080

3. Configure Claude Desktop

Add to claude_desktop_config.json:

{
  "mcpServers": {
    "research": {
      "command": "trails",
      "args": ["server", "--transport", "stdio"],
      "cwd": "/path/to/your/project"
    }
  }
}

Your capabilities appear as tools. Claude sees the descriptions and parameter types from your function signatures.

4. Add MCP resources (read-only data)

from trails.mcp import resource

@resource(uri="research://papers/recent",
          description="List of recently added papers")
def recent_papers(ctx) -> str:
    hits = ctx.kg.match(labels=["Paper"])
    return "\n".join(h.get("title", "untitled") for h in hits[:20])

5. Add MCP prompts (reusable templates)

from trails.mcp import prompt

@prompt(name="literature-review",
        description="Generate a literature review prompt for a topic")
def literature_review(topic: str) -> list:
    return [
        {"role": "user",
         "content": f"Write a literature review on: {topic}. "
                    f"Focus on recent findings and identify gaps."}
    ]

Recipe: Multi-tenant app with Cedar policies

Restrict capabilities by tenant and role.

1. Define policies

// policies/tenant.cedar

// Admins can do everything in their tenant
permit (
    principal,
    action,
    resource
) when {
    principal.role == "admin" &&
    resource.tenant == principal.tenant
};

// Viewers can only read
permit (
    principal,
    action == Action::"patient.list",
    resource
) when {
    principal.role == "viewer" &&
    resource.tenant == principal.tenant
};

// Deny cross-tenant access by default
forbid (
    principal,
    action,
    resource
) when {
    resource.tenant != principal.tenant
};

2. Wire policies to capabilities

from trails import capability
from trails.policy import policy, register_principal_attrs

# Register tenant-aware principals
register_principal_attrs("did:tenant-a:alice", {
    "role": "admin",
    "tenant": "tenant-a",
})
register_principal_attrs("did:tenant-b:bob", {
    "role": "viewer",
    "tenant": "tenant-b",
})

@capability(id="patient.create")
@policy("policies/tenant.cedar")
def create_patient(ctx, name: str, tenant: str) -> dict:
    iri = ctx.kg.node(labels=["Patient"],
                      properties={"name": name, "tenant": tenant})
    return {"id": iri}

@capability(id="patient.list")
@policy("policies/tenant.cedar")
def list_patients(ctx, tenant: str) -> list:
    return ctx.kg.match(labels=["Patient"], where={"tenant": tenant})

3. Test policy enforcement

from trails.testing import isolated_kernel, register_principal_role

def test_cross_tenant_denied():
    with isolated_kernel():
        register_principal_role("did:tenant-b:bob", "viewer")
        # Bob (tenant-b) trying to list tenant-a patients
        # should be denied by the Cedar policy
        ...

Combine knowledge graph queries with vector similarity search.

# research_assistant.py
from trails import capability, node_type
from trails.vector import SentenceTransformerEmbedder, SqliteVecStore, retrieve

embedder = SentenceTransformerEmbedder()
vec_store = SqliteVecStore(path="vectors.db", dim=384)

@node_type("Paper", fields={
    "title": str,
    "abstract": str,
    "year": int,
    "authors": list[str],
})
class Paper: ...

@capability(id="research.ingest",
            description="Add a paper to the knowledge base")
def ingest_paper(ctx, title: str, abstract: str, year: int,
                 authors: str) -> dict:
    author_list = [a.strip() for a in authors.split(",")]
    paper = Paper(title=title, abstract=abstract,
                  year=year, authors=author_list)
    ctx.kg.add(paper)

    # Also index in vector store for semantic search
    vector = embedder.embed(f"{title}. {abstract}")
    vec_store.add(id=paper.id, vector=vector,
                  metadata={"iri": paper.id, "snippet": title})

    return {"id": paper.id}

@capability(id="research.semantic_search",
            description="Find papers semantically similar to a query")
def semantic_search(ctx, query: str, k: int = 5) -> list:
    hits = retrieve(query, mode="vector", k=k,
                    vector_store=vec_store, embedder=embedder)
    results = []
    for h in hits:
        paper = Paper.find(h.iri, ctx)
        if paper:
            results.append({
                "title": paper.title,
                "year": paper.year,
                "score": h.score,
                "authors": paper.authors,
            })
    return results

@capability(id="research.ask",
            description="Answer a research question using papers in the KB")
def ask_question(ctx, question: str) -> dict:
    # 1. Find relevant papers via vector search
    hits = retrieve(question, mode="vector", k=3,
                    vector_store=vec_store, embedder=embedder)

    # 2. Build context from paper abstracts
    context_parts = []
    for h in hits:
        paper = Paper.find(h.iri, ctx)
        if paper:
            context_parts.append(
                f"[{paper.title} ({paper.year})]\n{paper.abstract}"
            )

    context = "\n\n---\n\n".join(context_parts)

    # 3. Ask the LLM with retrieved context
    reply = ctx.llm.complete([
        {"role": "system",
         "content": "Answer based on the provided papers. "
                    "Cite paper titles in your answer."},
        {"role": "user",
         "content": f"Papers:\n{context}\n\nQuestion: {question}"}
    ])

    return {"answer": reply.text, "sources": [h.iri for h in hits]}

Recipe: Data pipeline (CSV -> RML -> KG -> query)

A complete ETL pipeline from structured data to queryable knowledge graph.

1. Source data: sales.csv

order_id,customer,product,quantity,price,date
1001,Acme Corp,Widget A,50,12.99,2025-01-15
1002,Beta Inc,Widget B,100,8.50,2025-01-16
1003,Acme Corp,Widget A,25,12.99,2025-02-01
1004,Gamma LLC,Widget C,10,45.00,2025-02-10

2. RML mapping: mappings/sales.ttl

@prefix rml: <http://semweb.mmlab.be/ns/rml#>.
@prefix ql:  <http://semweb.mmlab.be/ns/ql#>.
@prefix rr:  <http://www.w3.org/ns/r2rml#>.
@prefix ex:  <https://myapp.example/>.
@prefix xsd: <http://www.w3.org/2001/XMLSchema#>.

<#OrderMapping>
    rml:logicalSource [
        rml:source "sales.csv";
        rml:referenceFormulation ql:CSV
    ];
    rr:subjectMap [
        rr:template "https://myapp.example/Order/{order_id}";
        rr:class ex:Order
    ];
    rr:predicateObjectMap [
        rr:predicate ex:customer;
        rr:objectMap [ rml:reference "customer" ]
    ];
    rr:predicateObjectMap [
        rr:predicate ex:product;
        rr:objectMap [ rml:reference "product" ]
    ];
    rr:predicateObjectMap [
        rr:predicate ex:quantity;
        rr:objectMap [ rml:reference "quantity"; rr:datatype xsd:integer ]
    ];
    rr:predicateObjectMap [
        rr:predicate ex:price;
        rr:objectMap [ rml:reference "price"; rr:datatype xsd:decimal ]
    ];
    rr:predicateObjectMap [
        rr:predicate ex:date;
        rr:objectMap [ rml:reference "date"; rr:datatype xsd:date ]
    ].

3. Load, infer, query

from trails import capability, node_type
from trails.rml import run_mapping
from trails.onto_infer import infer_schema, generate_code
from trails.testing import fresh_context

# Step 1: Load via RML
ctx = fresh_context()
result = run_mapping(ctx, "mappings/sales.ttl", sales="sales.csv")
print(f"Loaded {result.triples_added} triples")

# Step 2: Infer schema
schema = infer_schema(ctx.kg._store, trace_id="sales-infer")
code = generate_code(schema)
print(code)

# Step 3: Use the inferred type (after review and commit)
@node_type("Order", fields={
    "customer": str,
    "product": str,
    "quantity": int,
    "price": float,
    "date": str,
})
class Order: ...

# Step 4: Query capabilities
@capability(id="sales.by_customer")
def sales_by_customer(ctx, customer: str) -> list:
    orders = Order.where(customer=customer).fetch(ctx)
    return [{"product": o.product, "quantity": o.quantity,
             "price": o.price} for o in orders]

@capability(id="sales.top_products")
def top_products(ctx, min_quantity: int = 10) -> list:
    orders = Order.where(quantity__gte=min_quantity).fetch(ctx)
    # Aggregate by product
    totals: dict[str, int] = {}
    for o in orders:
        totals[o.product] = totals.get(o.product, 0) + o.quantity
    return sorted(
        [{"product": p, "total": t} for p, t in totals.items()],
        key=lambda x: x["total"],
        reverse=True,
    )

Recipe: Federated knowledge mesh (2 instances)

Connect two Trails instances so one can query the other's graph.

Instance A: Research papers (the data source)

# instance_a.py — Research graph
from fastapi import FastAPI
from trails import node_type, capability
from trails.federation import FederationConfig, FederationEndpoint
from trails.federation_http import mount_federation_routes
from trails.testing import fresh_context

@node_type("Paper", fields={"title": str, "year": int})
class Paper: ...

ctx = fresh_context()

# Seed some data
for title, year in [("KG Survey", 2024), ("LLM Agents", 2025)]:
    p = Paper(title=title, year=year)
    ctx.kg.add(p)

# Expose SPARQL endpoint
app = FastAPI()
config = FederationConfig(enabled=True, read_only=True,
                          max_query_time_ms=30000)
endpoint = FederationEndpoint(store=ctx.kg._store, config=config)
mount_federation_routes(app, endpoint)

# Run: uvicorn instance_a:app --port 9001

Instance B: Consumer (queries Instance A)

# instance_b.py — Consumer that federates queries
import requests

def query_remote(sparql: str, endpoint: str = "http://localhost:9001/sparql"):
    """Query a remote Trails instance's SPARQL endpoint."""
    resp = requests.post(
        endpoint,
        data={"query": sparql},
        headers={"Accept": "application/sparql-results+json"},
    )
    resp.raise_for_status()
    return resp.json()

# Find all papers from 2025
result = query_remote("""
    PREFIX ex: <https://myapp.example/>
    SELECT ?title ?year WHERE {
        ?s a ex:Paper ;
           ex:title ?title ;
           ex:year ?year .
        FILTER(?year >= 2025)
    }
""")

for binding in result["results"]["bindings"]:
    print(f"{binding['title']['value']} ({binding['year']['value']})")

For the full federation setup (including authentication, mesh topology, and capability relay), see the Federation Guide.


Anti-patterns: what NOT to do

Don't use raw SPARQL when the ORM works

# BAD — fragile, no validation, no provenance on the read
@capability
def bad_search(ctx, name: str) -> list:
    results = ctx.kg.query(
        f"SELECT ?s WHERE {{ ?s <https://myapp.example/name> '{name}' }}"
    )
    return results

# GOOD — type-safe, validated, readable
@capability
def good_search(ctx, name: str) -> list:
    hits = Employee.where(name=name).fetch(ctx)
    return [{"id": e.id, "name": e.name} for e in hits]

trails doctor includes a linter check that flags raw SPARQL inside capability bodies. Use the ORM for typed queries; reserve raw SPARQL for truly ad-hoc or analytical queries that don't map to a model.

Don't skip SHACL validation

# BAD — accepts any garbage
@node_type("Patient", fields={"name": str, "age": int})
class Patient: ...
# No shape → no constraints → bad data enters the graph

# GOOD — enforce constraints
from trails import node_type, shape, predicate

@node_type("Patient", fields={"name": str, "age": int})
class Patient: ...

@shape(Patient)
class PatientShape:
    name = predicate(min_length=1, max_length=200)
    age = predicate(min_inclusive=0, max_inclusive=150)

Shapes catch bad data at write time, before it pollutes the graph.

Don't hardcode IRIs

# BAD — brittle, breaks if the base IRI changes
@capability
def bad_lookup(ctx, name: str) -> dict:
    iri = f"https://myapp.example/Employee/{name}"
    return ctx.kg.traverse(subject=iri, label="manages")

# GOOD — use the ORM's IRI minting
@capability
def good_lookup(ctx, name: str) -> dict:
    employees = Employee.where(name=name).fetch(ctx)
    if not employees:
        return {"error": "not found"}
    return {"manages": Employee.where(
        manager=employees[0].id
    ).fetch(ctx)}

The ORM mints UUIDv7-based IRIs. Hardcoding IRI templates couples your code to a naming scheme that may change.

Don't ignore cost envelopes in production

# BAD — unbounded LLM spend
@capability
def expensive_analysis(ctx, text: str) -> dict:
    reply = ctx.llm.complete([
        {"role": "user", "content": f"Analyze: {text}"}
    ])
    return {"analysis": reply.text}

# GOOD — set budget limits
from trails.agent import Session
from trails.agent.planners import react

@capability
def bounded_analysis(ctx, text: str) -> dict:
    result = react.run(
        f"Analyze: {text}",
        llm=ctx.llm,
        session=Session(principal=ctx.principal),
        max_cost_usd=0.05,    # cap at 5 cents
        max_tokens=10000,      # cap token usage
        max_wall_time_s=30,    # cap wall time
    )
    return {"analysis": result.answer}

Budget limits prevent runaway costs. Set max_cost_usd on every production agent loop.


Migration guide: from raw rdflib to Trails

If you have an existing rdflib-based application, migrating to Trails is incremental.

Step 1: Replace the graph with ctx.kg

# Before (rdflib)
from rdflib import Graph, Literal, URIRef
g = Graph()
g.add((URIRef("urn:alice"), URIRef("urn:name"), Literal("Alice")))
results = g.query("SELECT ?name WHERE { ?s <urn:name> ?name }")

# After (Trails label-first — minimal change)
from trails import capability

@capability
def create(ctx) -> dict:
    iri = ctx.kg.node(labels=["Person"],
                      properties={"name": "Alice"})
    return {"id": iri}

Step 2: Add types when ready

from trails import node_type

@node_type("Person", fields={"name": str, "age": int})
class Person: ...

# Or let the framework discover types:
# trails onto infer -o models/types.py

Step 3: Replace raw SPARQL with ORM queries

# Before
results = g.query("""
    SELECT ?name WHERE {
        ?s a <urn:Person> ;
           <urn:name> ?name .
    }
""")

# After
people = Person.where().fetch(ctx)
names = [p.name for p in people]

Step 4: Add provenance and observability for free

Trails adds provenance to every write and fires observability events on every invocation. No code changes needed -- it happens automatically when you use @capability and ctx.kg.

Step 5: Add shapes for validation

from trails import shape, predicate

@shape(Person)
class PersonShape:
    name = predicate(min_length=1)
    age = predicate(min_inclusive=0)

Migration checklist

  • Replace rdflib.Graph() with ctx.kg in a @capability
  • Move SPARQL SELECTs to Model.where().fetch(ctx)
  • Move SPARQL INSERTs to ctx.kg.add(model_instance)
  • Run trails onto infer to discover types from existing data
  • Add @shape constraints where validation matters
  • Run trails doctor to catch remaining raw SPARQL

See also