ActiveGraph ORM¶
The ORM is how a capability body reads and writes typed data without
hand-rolling SPARQL. It binds a Python class to an RDF type, mints
stable IRIs, lowers Django-style filters to SPARQL FILTER
expressions, and passes every write through the same kernel store that
emits provenance. The full design lives in
ADR-0017; the progressive-
enhancement framing that keeps the ORM on one surface instead of three
tiers is ADR-0021. For a
guided, feature-by-feature walk, read
Growing Your KG App alongside
this reference.
Quickstart¶
One node type, one capability, one fetch:
from trails import capability, node_type

@node_type("Note", fields={"title": str, "content": str})
class Note: ...

@capability
def create_note(ctx, title: str, content: str) -> dict:
    note = Note(title=title, content=content)
    ctx.kg.add(note)
    return {"id": note.id}

@capability
def search_notes(ctx, q: str) -> list:
    hits = Note.where(title__icontains=q).fetch(ctx)
    return [{"id": n.id, "title": n.title} for n in hits]
Note(...) validates, mints a UUIDv7 IRI, and defers persistence until
ctx.kg.add. Note.where(...).fetch(ctx) returns hydrated Note
instances, not dicts.
@node_type — declaring a type¶
from trails import node_type
@node_type("Patient", fields={"name": str, "age": int}, extends=None)
class Patient: ...
The signature is node_type(label, *, fields, extends=None).
Field types. Scalars are str, int, float, bool, and
datetime.datetime. Multi-valued fields use parameterized list[T]
where T is one of those five scalars. bool is checked strictly: an
int passed to a bool field is rejected. Nested types, dict, and
set are not supported; model them as their scalar serialisation or
drop to a raw SPARQL INSERT.
datetime — xsd:dateTime. Declare fields={"created_at": datetime}
and pass real datetime.datetime instances. Writes lower to
"<isoformat>"^^xsd:dateTime — microseconds are preserved when
non-zero, and the tz offset (+00:00, -05:00, Z) rides along when
present. Reads parse the literal back via datetime.fromisoformat(...);
an unparseable literal from the store hydrates as a raw str with a
UserWarning so one bad triple cannot crash a whole read.
Range filters (created_at__lt=, __lte, __gt, __gte) lower to
FILTER(?ts < "..."^^xsd:dateTime). Lexicographic comparison on ISO-8601
is the right answer only when every value is UTC (or every value
shares the same offset). Mixed-tz ranges (some +02:00, some -08:00,
some naive) will compare by lexical string order, which is not
chronological. Normalise to UTC client-side before writing. Naive
datetimes (no tzinfo) pass through unchanged — that's a choice, not a
guarantee; the store has no way to guess your intended zone.
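The mixed-offset pitfall can be reproduced with the standard library alone. The sketch below assumes, as described above, that range filters compare the ISO-8601 strings lexically; `to_utc` is a hypothetical helper (not part of trails), and its choice to reject naive datetimes is one possible policy, not the ORM's behavior.

```python
from datetime import datetime, timedelta, timezone

# Two instants: `late` is chronologically AFTER `early`.
early = datetime(2024, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=2)))   # 10:00 UTC
late = datetime(2024, 1, 1, 11, 0, tzinfo=timezone(timedelta(hours=-8)))   # 19:00 UTC

early_s = early.isoformat()   # '2024-01-01T12:00:00+02:00'
late_s = late.isoformat()     # '2024-01-01T11:00:00-08:00'

# Lexical order of the strings disagrees with chronological order.
lexical_early_first = early_s < late_s   # False, although early < late is True


def to_utc(dt: datetime) -> datetime:
    """Hypothetical normaliser: convert an aware datetime to UTC before writing."""
    if dt.tzinfo is None:
        raise ValueError("naive datetime: attach a tzinfo before normalising")
    return dt.astimezone(timezone.utc)


# After normalisation, string order and chronological order agree.
utc_early = to_utc(early).isoformat()   # '2024-01-01T10:00:00+00:00'
utc_late = to_utc(late).isoformat()     # '2024-01-01T19:00:00+00:00'
```

Once every stored value shares the +00:00 offset, the lexical comparison gives the chronological answer.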
Plain datetime.date (without a time component) is rejected with a
pointed error — wrap it in a datetime.combine(...) or declare the
field as str if a date-only column is what you actually want.
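A minimal way to promote a date-only value is sketched below. The helper name `date_to_datetime` is hypothetical, and pinning the result to midnight UTC is an assumption of this sketch; pick whatever time and zone your data actually means.

```python
from datetime import date, datetime, time, timezone


def date_to_datetime(d: date) -> datetime:
    """Promote a date to midnight UTC (the zone is an assumption of this sketch)."""
    return datetime.combine(d, time.min, tzinfo=timezone.utc)


stamp = date_to_datetime(date(2024, 3, 5))
```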
Existing string-typed timestamp fields keep working; switching a field
from str to datetime is an opt-in on the producer side. Old rows
written as xsd:string still read back, because fromisoformat covers
the most common ISO-8601 lexical forms on re-read; anything it cannot
parse falls back to the raw str with a UserWarning, as described above.
IRI minting. The class-level type IRI defaults to
trails://<project>/<Label>; per-instance IRIs get a UUIDv7 suffix
(trails://<project>/<Label>/<uuid7>). <project> is read from
trails.toml's [project].name and falls back to "local". Override
the scheme by setting [project].base_iri = "https://myapp.example/"
in trails.toml; the ORM concatenates <base_iri><Label> verbatim.
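The IRI shapes described above can be sketched as follows. This is not the ORM's code: `uuid7`, `type_iri`, and `instance_iri` are hypothetical helpers, and the UUIDv7 generator is hand-rolled from the published layout (48-bit millisecond timestamp, version nibble, variant bits, random fill) because older Python versions ship no `uuid.uuid7`.

```python
import os
import time
import uuid


def uuid7() -> uuid.UUID:
    """Hand-rolled UUIDv7: 48-bit Unix-ms timestamp followed by random bits."""
    ts_ms = time.time_ns() // 1_000_000
    value = ((ts_ms & 0xFFFFFFFFFFFF) << 80) | int.from_bytes(os.urandom(10), "big")
    value = (value & ~(0xF << 76)) | (0x7 << 76)   # version nibble = 7
    value = (value & ~(0x3 << 62)) | (0x2 << 62)   # RFC 4122 variant bits = 10
    return uuid.UUID(int=value)


def type_iri(label, project="local", base_iri=None):
    """Class-level IRI: <base_iri><Label>, defaulting to trails://<project>/<Label>."""
    base = base_iri if base_iri is not None else f"trails://{project}/"
    return f"{base}{label}"   # verbatim concatenation, so base_iri needs its own "/"


def instance_iri(label, project="local", base_iri=None):
    """Per-instance IRI: the type IRI plus a UUIDv7 suffix."""
    return f"{type_iri(label, project, base_iri)}/{uuid7()}"


iri = instance_iri("Note")
```

Because the concatenation is verbatim, a `base_iri` without a trailing slash would fuse with the label.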
Construction. Note(title="...", content="...") runs JSON-Schema-
style validation: unknown kwargs raise, missing scalars raise, list
fields default to [], each list element is validated against the
inner type. An explicit iri= wins over auto-minting.
Introspection. Note._trails_node_type is the NodeTypeMeta the
decorator captured. meta.iri is the rdf:type IRI; meta.field_iri("title")
yields the predicate IRI. extends=["https://…/Document"] records
parent type IRIs as metadata only — Phase 3 will materialise them as
rdfs:subClassOf triples; today they round-trip through meta.extends
without hitting the store.
Writes — ctx.kg.add(instance)¶
ctx.kg.add lowers the instance to triples and runs one INSERT DATA
through the kernel store: one triple for rdf:type, one per scalar
field, and one per element in each list-valued field (no rdf:List
collection node — each element shares the same predicate).
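The triple count follows directly from that rule. The sketch below is a stand-in, not the ORM's `to_triples`: the function signature and the `<type-iri>/<field>` predicate scheme are assumptions made for illustration (the real predicate IRI comes from `meta.field_iri`).

```python
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"


def to_triples(iri, type_iri, fields):
    """Yield (s, p, o): rdf:type first, then one triple per scalar or list element."""
    yield (iri, RDF_TYPE, type_iri)
    for name, value in fields.items():
        predicate = f"{type_iri}/{name}"   # hypothetical predicate scheme
        values = value if isinstance(value, list) else [value]
        for item in values:
            # List elements repeat the same predicate; no rdf:List node is built.
            yield (iri, predicate, item)


triples = list(to_triples(
    "trails://local/Note/1",
    "trails://local/Note",
    {"title": "Login", "tags": ["urgent", "auth"]},
))
```

One scalar field plus a two-element list yields four triples: one for the type, one for the title, and two for the tags.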
Provenance. When ctx is the Context the runtime built for a
dispatched capability, the prov:Activity is emitted at the capability
boundary per ADR-0009; the ORM
piggy-backs on that boundary. Constructing a Context manually outside
invoke() (seed scripts, REPL) is allowed, but no provenance is
recorded — same stance as raw ctx.kg.update.
add is insert-only. Re-calling ctx.kg.add on an instance whose
IRI already exists in the store writes new triples alongside the old
ones (the store de-duplicates at the triple level, so unchanged values
net out, but changed values end up with both old and new triples
present). For read-modify-write cycles, use instance.save(ctx).
Mutating instances — instance.save(ctx)¶
note = Note.find(ctx, iri)
note.title = "revised"
note.tags = [*note.tags, "urgent"]  # assign rather than append so the change is tracked
existed = note.save(ctx)  # True: the IRI already had triples
save is an upsert. For every declared field it writes (only dirty
fields by default, every field with force=True; see Dirty-tracking
below), the old triples (<instance.id> <field-iri> ?o) are deleted and
the current Python state is re-inserted. The whole operation lowers to
one SPARQL 1.1 UPDATE with semicolon-separated DELETE ... WHERE
clauses followed by one INSERT DATA block: a single round-trip
through the kernel store, not N.
- Idempotent. Calling save twice with the same state is a no-op (same predicates cleared, same triples re-inserted).
- List fields: delete-all, then insert-all.
- Optional fields set to None: the predicate is dropped, no re-insert. The absence of the triple is the None.
- Validation: both the declared-type check and SHACL validate_instance fire before any write, so a shape violation leaves the store on the pre-save values.
- Return value: True when the IRI already existed (an update), False when this was the first insert (equivalent to ctx.kg.add).
- Symmetric helper: ctx.kg.save(instance) is a thin wrapper for single-dispatch call sites.
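The shape of the lowered update can be sketched as a string builder. This is an illustration under stated assumptions, not the ORM's lowering: `lower_save` is a hypothetical name, every value is rendered as a plain string literal, and escaping and datatypes are ignored.

```python
def lower_save(iri, field_iris, state):
    """Build one SPARQL UPDATE: a DELETE WHERE per field, then one INSERT DATA."""
    deletes = [f"DELETE WHERE {{ <{iri}> <{p}> ?o }}" for p in field_iris.values()]
    inserts = []
    for name, predicate in field_iris.items():
        value = state.get(name)
        if value is None:
            continue   # optional field unset: predicate cleared, nothing re-inserted
        for item in (value if isinstance(value, list) else [value]):
            # Simplification: plain string literals, no escaping, no datatypes.
            inserts.append(f'<{iri}> <{predicate}> "{item}" .')
    ops = deletes
    if inserts:
        ops = deletes + ["INSERT DATA { " + " ".join(inserts) + " }"]
    return " ;\n".join(ops)   # one request, operations separated by semicolons


update = lower_save(
    "urn:n1",
    {"title": "urn:title", "tags": "urn:tags", "due": "urn:due"},
    {"title": "revised", "tags": ["a", "b"], "due": None},
)
```

The None-valued field contributes a delete clause and no insert, which matches the "absence of the triple is the None" rule in the list above.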
Transactional safety. The M0/M1 kernel is single-writer and
synchronous; one save call is atomic w.r.t. other SPARQL statements
but is not durable across crashes and not isolated from
concurrent writers on the same subject. Two racing saves on the same
IRI may interleave and leave partial state. Real transactional
boundaries land with the M5 concurrent-writer work.
Deprecation hint. Existing field-update code that drops to
ctx.kg.update("DELETE { ... } INSERT { ... } WHERE { ... }") can now
collapse to save. Keep the raw hatch for multi-subject updates,
named-graph writes, and anything that touches predicates outside the
declared node-type field set — everything else is better spelled as a
mutation + save.
Dirty-tracking — partial saves¶
save only writes fields that changed. Every direct attribute
assignment to a declared field marks it as dirty; save(ctx) emits
DELETE/INSERT clauses for just the dirty predicates and clears the
set on success.
note = Note.find(ctx, iri) # loaded — clean, nothing to save
note.title = "revised" # marks "title" dirty
assert note.is_dirty()
assert note.dirty_fields() == {"title"}
note.save(ctx) # one-predicate SPARQL; unchanged fields untouched
assert not note.is_dirty() # cleared after save
- New instances (Note(...)) start with every declared field dirty, so the first save writes everything (equivalent to ctx.kg.add).
- Loaded instances (Model.find, Model.where().fetch()) start clean. A save with no mutations is a zero-SPARQL no-op; it still returns True/False by probing subject existence.
- force=True bypasses dirty-tracking and rewrites every declared field (the pre-tracking behavior, kept for callers that want an unconditional re-assertion).
In-place list mutation is NOT tracked. Python routes
note.tags.append("x") through the list object, not through
Model.__setattr__, so the dirty tracker cannot see it. Two options:
# (A) Prefer assignment — goes through __setattr__, tracked automatically.
note.tags = [*note.tags, "urgent"]
note.save(ctx)
# (B) Or mutate in place, then mark the field dirty explicitly.
note.tags.append("urgent")
note.mark_dirty("tags")
note.save(ctx)
mark_dirty("field") is the escape hatch for any mutation pattern the
__setattr__ hook cannot observe. is_dirty() and dirty_fields()
return introspection state.
The _dirty bookkeeping attribute is not persisted — it is ignored by
to_triples and every other write path.
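Why assignment is tracked and append is not can be seen in a toy model. The `Tracked` class below is a hypothetical stand-in written for this illustration; it only mirrors the `__setattr__` hook idea, and `mark_clean` is an invented helper that plays the role of "just loaded or just saved".

```python
class Tracked:
    """Toy model: records which declared fields were assigned since the last save."""

    _fields = ("title", "tags")

    def __init__(self, **values):
        object.__setattr__(self, "_dirty", set())
        for name, value in values.items():
            setattr(self, name, value)   # new instances start fully dirty

    def __setattr__(self, name, value):
        if name in self._fields:
            self._dirty.add(name)
        object.__setattr__(self, name, value)

    def mark_dirty(self, name):
        self._dirty.add(name)

    def dirty_fields(self):
        return set(self._dirty)

    def mark_clean(self):
        self._dirty.clear()


note = Tracked(title="draft", tags=["a"])
note.mark_clean()                 # pretend the instance was just loaded

note.tags.append("urgent")        # mutates the list object; __setattr__ never runs
after_append = note.dirty_fields()

note.tags = [*note.tags, "x"]     # assignment goes through __setattr__
after_assign = note.dirty_fields()
```

The append changes the data without touching the dirty set, which is exactly the case `mark_dirty` exists for.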
Reads — Model.find and Model.where¶
Model.find(ctx, id_or_iri) resolves a single instance or returns
None:
Model.where(**filters) returns a QueryBuilder that is chainable and
lazy — nothing executes until you call .fetch(ctx):
fetch returns list[Note] with scalar fields already coerced back to
their declared Python types (the kernel round-trips literals as strings).
List fields are populated by a follow-up per-subject SELECT, so the
main query stays free of Cartesian blowup.
Filter suffixes. field=value is equality. The suffix table:
| Suffix | Meaning | Applies to |
|---|---|---|
| __gt, __gte, __lt, __lte | Numeric / string comparisons | any scalar |
| __in | SPARQL IN (…) over a Python list/tuple | any scalar |
| __contains | Substring (case-sensitive) | any scalar, but almost always str |
| __icontains | Substring (case-insensitive) | str only |
| __startswith, __endswith | Prefix/suffix (case-sensitive) | str only |
| __istartswith, __iendswith | Prefix/suffix (case-insensitive) | str only |
Unknown fields raise TrailsError at lowering time, not at fetch.
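One plausible way such suffixes map onto SPARQL 1.1 FILTER expressions is sketched below. `lower_filter` and `literal` are hypothetical helpers, `ValueError` stands in for TrailsError, and the exact expressions the ORM emits may differ. CONTAINS, STRSTARTS, STRENDS, LCASE, and IN are standard SPARQL 1.1 operators.

```python
import json

COMPARE = {"gt": ">", "gte": ">=", "lt": "<", "lte": "<="}
STRING_FUNCS = {"contains": "CONTAINS", "startswith": "STRSTARTS", "endswith": "STRENDS"}


def literal(value):
    """Render a Python scalar as a SPARQL literal (JSON escaping as an approximation)."""
    return json.dumps(value)


def lower_filter(key, value, declared):
    """Lower one `field__suffix=value` kwarg to a SPARQL FILTER expression."""
    field, _, suffix = key.partition("__")
    if field not in declared:
        raise ValueError(f"unknown field {field!r}")   # stand-in for TrailsError
    var = f"?{field}"
    if suffix == "":
        return f"FILTER({var} = {literal(value)})"
    if suffix in COMPARE:
        return f"FILTER({var} {COMPARE[suffix]} {literal(value)})"
    if suffix == "in":
        return f"FILTER({var} IN ({', '.join(literal(v) for v in value)}))"
    insensitive = suffix.startswith("i") and suffix[1:] in STRING_FUNCS
    name = suffix[1:] if insensitive else suffix
    if name not in STRING_FUNCS:
        raise ValueError(f"unknown suffix {suffix!r}")
    if insensitive:
        # Case-insensitive variants lower-case both sides before comparing.
        return f"FILTER({STRING_FUNCS[name]}(LCASE({var}), {literal(value.lower())}))"
    return f"FILTER({STRING_FUNCS[name]}({var}, {literal(value)}))"


expr = lower_filter("title__icontains", "Log", {"title"})
```

Because the field check happens while the expression is built, an unknown field fails before any query is sent, consistent with the "at lowering time, not at fetch" rule.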
List-field filters. Note.where(tags="urgent") on a list[str]
field raises TrailsError with a pointer to ctx.kg.query(...) — the
surface deliberately refuses a semantics that would otherwise have to
guess between ANY and ALL.
Property paths (M8 Phase 3)¶
A @node_type field may reference another @node_type-decorated class
instead of a scalar. That declares a typed edge (one BGP triple with an
IRI object instead of a literal) and unlocks Django-style dotted-chain
traversal in where(). The full semantics live in
ADR-0017a.
Declaring reference fields. The target class must itself be a
@node_type-decorated class and must already be declared when the
referencing class is processed — decorators run at import time, so
declaration order matters:
from trails import node_type
@node_type("Person", fields={"name": str})
class Person: ...
@node_type("CareTeam", fields={"lead": Person, "name": str})
class CareTeam: ...
@node_type("Patient", fields={"care_team": CareTeam, "name": str})
class Patient: ...
Multi-valued edges use list[CareTeam] just like list[str].
Traversal in filters. A chain of __-joined segments walks edges
until it hits a scalar leaf:
Existential by default — one matching care-team member with one lead
named Alice satisfies. The chain lowers to a single BGP with one triple
per hop, anchored by ?iri a <Patient>.
Universal quantifier (__all__). A segment-level marker that
back-patches the preceding hop into a universal quantifier, lowered as
nested FILTER NOT EXISTS (double negation — vanilla SPARQL has no
forall). Per ADR-0017a,
__all__ is reserved: it is not a filter suffix, not a field name, and
must follow a reference hop:
Universal is vacuously true on empty edges; AND in an existential filter if "non-empty AND universal" is what you mean. It is also slower on large graphs — prefer the existential default when it fits.
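The double-negation reading, and the vacuous-truth caveat, can be checked in plain Python. The data and function names below are made up for illustration; nothing here touches the ORM.

```python
def exists(items, pred):
    """Existential: at least one item satisfies pred."""
    return any(pred(x) for x in items)


def for_all(items, pred):
    """Universal as double negation: NOT EXISTS an item that does NOT satisfy pred."""
    return not exists(items, lambda x: not pred(x))


def is_alice(lead):
    return lead == "Alice"


# Hypothetical data: patient -> names of the leads reachable through its care teams.
patients = {"p1": ["Alice", "Alice"], "p2": ["Alice", "Bob"], "p3": []}

universal = sorted(p for p, leads in patients.items() if for_all(leads, is_alice))
existential = sorted(p for p, leads in patients.items() if exists(leads, is_alice))
both = sorted(set(universal) & set(existential))   # "non-empty AND universal"
```

The patient with no edges (`p3`) passes the universal test and fails the existential one, so combining the two filters is what excludes it.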
Suffixes on the leaf. Every suffix from the table above still works on the scalar leaf of a chain:
Composition with Q. Property-path keys compose exactly like flat
keys — |, &, ~ all work, including mixing paths with flat scalars
and chains under different parents:
from trails.orm import Q
hits = Patient.where(
Q(care_team__lead__name="Alice") | Q(care_team__lead__name="Bob")
).fetch(ctx)
Fetch behavior. A where() containing any property-path chain
lowers to a two-phase fetch: SELECT DISTINCT ?iri WHERE { … } that
walks the chain(s), then per-IRI hydration through the single-instance
read path. DISTINCT defends against BGP blow-up on multi-valued hops;
hydration cost scales with matched subjects, so tight LIMIT still
helps on large graphs.
Ref values on write. ctx.kg.add accepts either a Model instance
of the declared target type (the ORM reads .id) or a bare IRI string
(the ORM trusts the caller for the type):
team = CareTeam(lead=alice, name="Team-A")
ctx.kg.add(alice); ctx.kg.add(team)
ctx.kg.add(Patient(care_team=team, name="p1")) # Model instance
ctx.kg.add(Patient(care_team=team.id, name="p2")) # bare IRI
Read-back. Reference fields hydrate as IRI strings, not as nested Model instances — no auto-traversal, no lazy loading (ADR-0017 §Scope fence). Follow a ref explicitly when you need the target:
patient = Patient.find(ctx, patient_iri)
team = CareTeam.find(ctx, patient.care_team) # patient.care_team is an IRI str
Optional single-valued fields (T | None). Declare a scalar or
reference field as optional by wrapping its type in Optional[T] or
T | None. Optional fields default to None, skip the triple on write
when unset, and round-trip back to None when no triple exists on
read. Model.where(field=None) selects subjects with no triple for
that predicate (lowers to a SPARQL !BOUND(?field) over an OPTIONAL
binding).
@node_type("Defect", fields={"title": str, "found_in": TestRun | None})
class Defect: ...
d = Defect(title="bug-1") # found_in is None
ctx.kg.add(d)
orphans = Defect.where(found_in=None).fetch(ctx) # subjects with no edge
Only binary Optional is supported — int | str | None raises at
@node_type time. Optional[list[T]] is also rejected; use an empty
list to mean "no elements". Property-path traversal through an optional
ref skips None subjects existentially and matches them vacuously
under __all__.
Out of scope in Phase 3 — flagged here so you know when to drop to
ctx.kg.query(...):
- across_graphs([...]) cross-graph traversal (ADR-0017b; own sprint).
- Aggregates (COUNT, MAX) on property paths.
- Back-references / inverse properties (^ in SPARQL paths).
- __in over a property-path chain (care_team__lead__name__in=[...]): raises with a hint to compose Q nodes with | or drop to raw SPARQL.
- Writing through associations (patient.care_team = team; save() mutating persisted triples post-save).
Boolean logic — Q objects¶
Q is the composable filter node for OR, AND, NOT, and nested
shapes. Import it from trails.orm:
from trails.orm import Q
hits = Note.where(Q(title__icontains=q) | Q(content__icontains=q)).fetch(ctx)
strict = Note.where(~Q(status="archived")).fetch(ctx)
mixed = Note.where((Q(priority=1) | Q(priority=2)) & Q(active=True)).fetch(ctx)
Operators: | is OR, & is AND, ~ is NOT. They follow Python
precedence — wrap with parentheses when mixing. Positional Q objects
to where() are ANDed together and also ANDed with any kwargs:
Q() (no arguments) is the identity — it matches everything and is
safe to use as a seed for programmatic composition:
filt = Q()
for word in query.split():
    filt &= (Q(title__icontains=word) | Q(content__icontains=word))
Note.where(filt).fetch(ctx)
Every Q lowers to a single boolean SPARQL FILTER expression; no
UNION, no FILTER NOT EXISTS (today).
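How operator overloading can produce a single boolean expression is sketched below with a toy class. It is deliberately named `Q` to mirror the text but is not the class exported by trails.orm: it supports equality leaves only, renders values with `repr`, and its `lower` method and `"true"` rendering of the identity are assumptions of this sketch.

```python
class Q:
    """Toy filter node: leaves hold kwargs, inner nodes hold an operator and children."""

    def __init__(self, **filters):
        self.op, self.children, self.filters = "LEAF", [], filters

    @classmethod
    def _node(cls, op, children):
        node = cls()
        node.op, node.children = op, children
        return node

    def _is_identity(self):
        return self.op == "LEAF" and not self.filters

    def __and__(self, other):
        if self._is_identity():      # Q() & x == x, so Q() is a safe seed
            return other
        if other._is_identity():
            return self
        return Q._node("&&", [self, other])

    def __or__(self, other):
        return Q._node("||", [self, other])

    def __invert__(self):
        return Q._node("!", [self])

    def lower(self):
        """Render the tree as one boolean expression (equality leaves only)."""
        if self.op == "LEAF":
            parts = [f"?{k} = {v!r}" for k, v in self.filters.items()]
            return "(" + " && ".join(parts) + ")" if parts else "true"
        if self.op == "!":
            return f"!{self.children[0].lower()}"
        return "(" + f" {self.op} ".join(c.lower() for c in self.children) + ")"


filt = Q()
for word in ["login", "bug"]:
    filt &= Q(title=word) | Q(content=word)
expr = filt.lower()
```

Every node parenthesises its own output, so Python's operator precedence only decides the tree shape; the rendered expression is unambiguous.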
Deletes¶
Four forms, matching the two access patterns of the rest of the ORM:
# Instance — you already have a hydrated Note.
note.delete(ctx) # -> True / False
# Class + IRI — "delete whatever is at this id."
Note.delete(ctx, note_iri) # -> True / False
# Query — delete everything matching the filters.
removed = Note.where(priority__gte=5).delete(ctx) # -> int
# Explicit blanket delete — required, by design.
n = Note.delete_all(ctx) # -> int
Model.delete is a dual-mode descriptor: the instance call and the
classmethod call are the same name. where(...).delete(ctx) with no
filters refuses — you must spell the bulk form as delete_all so the
"nuke everything of this type" action is deliberate, never accidental.
Each delete lowers to DELETE WHERE { <iri> ?p ?o } per subject, so
every triple where the subject appears is removed; triples where the
subject is the object are untouched.
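The consequence for inbound references can be shown on an in-memory set of triples. `delete_subject` is a hypothetical stand-in for the lowered DELETE WHERE, and returning whether anything was removed is an assumption about what the True / False result means.

```python
def delete_subject(triples, iri):
    """Mimic DELETE WHERE { <iri> ?p ?o }: drop triples whose subject is iri."""
    doomed = {t for t in triples if t[0] == iri}
    triples -= doomed          # in-place removal from the caller's set
    return bool(doomed)


store = {
    ("urn:team", "urn:name", "Team-A"),
    ("urn:p1", "urn:care_team", "urn:team"),
    ("urn:p1", "urn:name", "p1"),
}

removed = delete_subject(store, "urn:team")
removed_again = delete_subject(store, "urn:team")
dangling = [t for t in store if t[2] == "urn:team"]   # inbound edge survives
```

After the delete, the patient still points at an IRI that no longer has any triples of its own; clean up such edges explicitly if they matter.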
Aggregates¶
Terminal aggregate methods on QueryBuilder cover the common
cardinality / total / extrema queries without dropping to raw SPARQL.
Each honours any .where(...) / Q(...) filters already applied,
including property-path chains.
# How many open tasks?
Task.where(status="open").count(ctx) # -> int
# Total story points across open tasks.
Task.where(status="open").sum("points", ctx) # -> int | float
# Average priority across everything.
Note.where().avg("priority", ctx) # -> float | None
# Earliest / latest timestamp.
Event.where().min("created_at", ctx) # -> datetime | None
Event.where().max("created_at", ctx) # -> datetime | None
# Lexically smallest / largest title.
Note.where().min("title", ctx) # -> str | None
# Works through property paths.
Book.where(author__name="Alice").count(ctx)
Book.where(author__name="Alice").sum("sales", ctx)
Signatures.
count(ctx) -> int
sum(field, ctx) -> int | float
avg(field, ctx) -> float | None
min(field, ctx) -> Any | None
max(field, ctx) -> Any | None
Return types. .sum() returns int when the field is declared
int, float when it is declared float — never narrows a float
field to int. .avg() always returns float. .min() / .max()
return the same Python type the field is declared as (via the same
coercion path hydration uses), so a datetime field yields a real
datetime.datetime, not an ISO string.
Empty-match convention. .count() and .sum() return 0 (or
0.0) for empty matches. .avg(), .min(), and .max() return
None — averaging an empty set is undefined, and there is no
sentinel "minimum of nothing." Check for None before arithmetic.
Numeric-only. .sum() and .avg() require the target field to
be int or float (optional or not). Passing a string or datetime
raises with a pointer at .count() / .min() / .max(). .min()
and .max() accept any scalar (numeric, string, datetime, bool);
reference fields and list fields raise on any aggregate.
Ordering and limit. .order_by() and .limit() are ignored by
aggregate terminals — SPARQL rejects ORDER BY on a scalar aggregate
without GROUP BY, and LIMIT on a single-row result is a no-op.
Nullable fields. Subjects with no triple for the target field are
excluded from the aggregate's input set (consistent with SPARQL's
MIN/MAX on unbound values). .sum("priority", ctx) on a mix of bound
and unbound values sums only the bound ones.
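The empty-match and nullable conventions can be summarised with three small functions over plain lists, where None stands for "no triple for this field". The function names are invented for this sketch and do not exist in trails.

```python
def agg_sum(values, declared=int):
    """Sum bound values; empty input gives 0 (int field) or 0.0 (float field)."""
    bound = [v for v in values if v is not None]   # unbound values are skipped
    return declared(sum(bound))


def agg_avg(values):
    """Average bound values; None when nothing is bound."""
    bound = [v for v in values if v is not None]
    return sum(bound) / len(bound) if bound else None


def agg_min(values):
    """Minimum of bound values; None when nothing is bound."""
    bound = [v for v in values if v is not None]
    return min(bound) if bound else None


priorities = [1, None, 2]      # one subject has no priority triple
total = agg_sum(priorities)
mean = agg_avg(priorities)
lowest_of_nothing = agg_min([])
```

Callers doing arithmetic on the average or extrema need the None check; the sum and count never do.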
Annotations (grouped aggregates)¶
.count() / .sum() / .avg() / .min() / .max() return a single
scalar for the whole matched set. When a caller needs per-matched-
instance aggregates — e.g. a traceability report that shows each
requirement's test count, run count, and defect count in one table —
.annotate(**spec) groups the aggregates by the matched subject and
hangs the results onto each returned instance as plain attributes.
from trails.orm import Count, Sum, Avg, Min, Max
rows = Requirement.where(status="approved").annotate(
test_count=Count("covered_by"),
run_count=Count("covered_by__test_runs"),
defect_count=Count("covered_by__test_runs__defects"),
).fetch(ctx)
for r in rows:
    print(r.title, r.test_count, r.run_count, r.defect_count)
Each keyword becomes a read-only attribute on every returned instance.
Count(path) counts distinct objects reachable via a __-separated
property path; the path may end at a reference hop (count related
entities) or at a scalar leaf (count literal bindings). Sum, Avg,
Min, Max require a scalar leaf — same rules as the whole-set
terminals (Sum / Avg want int/float; Min / Max work on any
ordered scalar).
totals = Order.annotate(
line_count=Count("line_items"),
total_sales=Sum("line_items__price"),
avg_price=Avg("line_items__price"),
min_price=Min("line_items__price"),
max_price=Max("line_items__price"),
).fetch(ctx)
Model.annotate(**spec) is a shortcut for Model.where().annotate(**spec)
when there is no filter.
Empty-set convention. A matched subject with zero related objects
gets the same default the whole-set terminals return: 0 for
Count, 0 (int field) or 0.0 (float field) for Sum, and None
for Avg / Min / Max. A zero-match base query (.where(...)
returns no subjects) is an empty list — .annotate(...) is a
no-op decoration.
SPARQL lowering — one query per annotation. A single SPARQL
GROUP BY cannot co-project N independent COUNT(DISTINCT …) over
different multi-valued property paths without a Cartesian blow-up:
covered_by__test_runs and covered_by__defects next to each other
would multiply row counts by cross-product and corrupt the counts.
The ORM therefore runs one aggregate sub-query per annotation —
SELECT ?iri (AGG AS ?name) WHERE { … OPTIONAL { <path> } } GROUP BY
?iri — reusing the base .where(...) WHERE body verbatim, and merges
results by ?iri in Python. For N annotations, expect N + 1
round-trips: one base fetch + N aggregate queries. Each annotation
query is small and keyed on the same subject set; the alternative
(one monster OPTIONAL block) would pay for the blow-up instead.
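The blow-up is easy to reproduce without a store. The sketch below uses made-up values for one subject and `itertools.product` to imitate what a single join over two independent multi-valued paths binds; it illustrates the row multiplication and its effect on non-distinct aggregates, not the ORM's actual queries.

```python
from itertools import product

covered_by = ["t1", "t2", "t3"]   # objects reached by one multi-valued path
prices = [10, 20]                 # literals reached by a second, independent path

# A single query joining both paths binds every combination for the subject.
joined = list(product(covered_by, prices))
joined_count = len(joined)                        # 6 rows for 3 tests
joined_sum = sum(price for _, price in joined)    # each price counted 3 times

# One aggregate per path, merged by subject afterwards.
merged = {"test_count": len(set(covered_by)), "total": sum(prices)}
```

Running each aggregate over its own path keeps the inputs at their true size, at the cost of one extra round-trip per annotation.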
Collision safety. An annotation name that collides with a
declared field on the model raises at .annotate() call time — it
would silently corrupt the instance's own value on hydration
otherwise. Pick a different name.
Not persisted. Annotation attrs are derived values from a SPARQL
round-trip; they are set via object.__setattr__ to bypass dirty
tracking, and instance.save(ctx) only rewrites declared fields.
Assigning to an annotation name and calling save is a no-op.
Projections & existence¶
Four terminals close the common Django-parity gap: .values(),
.values_list(), .exists(), .distinct(). None hydrate Model
instances — they skip the per-field coercion loop and return raw
(typed) Python values.
# .values(*fields) — list[dict]; ``id`` is always surfaced.
rows = Note.where(priority__gt=3).values("title", "priority").fetch(ctx)
# [{"id": "trails://local/Note/...", "title": "Login", "priority": 4}, ...]
# .values_list(*fields, flat=False) — list[tuple] or (with flat=True +
# exactly one field) list[scalar].
ids = Note.where(published=True).values_list("id", flat=True).fetch(ctx)
# ["trails://local/Note/...", ...]
# .exists(ctx) — boolean; emits SPARQL ``ASK { ... }`` so the kernel
# short-circuits on the first match instead of counting everything.
if Requirement.where(status="approved").exists(ctx):
    ...
# .distinct() — toggles ``SELECT DISTINCT`` on the emitted query. Most
# useful with ``values_list(..., flat=True)`` so the de-dup set matches
# the user-visible projection (``?iri`` is dropped from SELECT for that
# shape precisely so the scalar de-dupes).
statuses = Note.where().values_list("status", flat=True).distinct().fetch(ctx)
# ["draft", "approved", "archived"]
Composability. .values(), .values_list(), .distinct(),
.order_by(), and .limit() compose in any chain order — they only
set flags that the terminal reads at build time:
# All three chains yield the same result.
Note.where(p__gte=2).order_by("p").limit(3).values("title").fetch(ctx)
Note.where(p__gte=2).values("title").order_by("p").limit(3).fetch(ctx)
Note.where(p__gte=2).limit(3).values("title").order_by("p").fetch(ctx)
Annotations. .values() / .values_list() rows never auto-project
annotations. List the annotation name explicitly to include it:
rows = (
Note.where()
.annotate(tag_count=Count("tags"))
.values("title", "tag_count")
.fetch(ctx)
)
# [{"id": "...", "title": "A", "tag_count": 3}, ...]
Distinct caveat. SPARQL 1.1's SELECT DISTINCT de-duplicates on the
full projected tuple, not per-column. For the values_list(..., flat=True)
single-scalar shape Trails drops ?iri from SELECT so per-column de-dup
actually happens. For .values() / .values_list() with multiple
columns, DISTINCT still runs across the whole row including ?iri, so
it mostly only eliminates structural duplicates introduced by property-
path fan-out. distinct("status") is accepted as a validation-only
form (typos fail loudly) but does not rewrite the SPARQL to per-column
DISTINCT — there is no portable DISTINCT ON (col) in SPARQL.
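The difference between whole-row and single-column de-duplication can be seen with Python sets standing in for SELECT DISTINCT. The rows are invented sample data.

```python
# Each row is a (?iri, ?status) binding.
rows = [("n1", "draft"), ("n2", "draft"), ("n3", "approved")]

# DISTINCT over the full projected tuple: every row is already unique.
distinct_rows = set(rows)

# DISTINCT once ?iri is dropped from the projection: statuses collapse.
distinct_status = {status for _, status in rows}
```

With ?iri in the projection nothing is removed; only the single-column shape collapses the repeated status.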
Unit of Work — atomic mutations¶
UnitOfWork (in trails.uow) collects model saves, deletes, and raw
SPARQL UPDATE statements, then commits them atomically. On exception the
buffer is discarded and no writes happen.
from trails.uow import UnitOfWork
@capability("transfer_ownership")
def transfer(ctx, post_id: str, new_owner: str) -> dict:
    with UnitOfWork(ctx) as uow:
        post = Post.find(ctx, post_id)
        post.owner = new_owner
        uow.save(post)
        log = AuditLog(action="transfer", target=post_id)
        uow.save(log)
    # Both committed here; on exception, nothing is committed.
    return {"ok": True}
UnitOfWork operates at the application level (model instances) and
delegates to ctx.kg.transaction() for the actual atomic write.
Caveat: UnitOfWork provides client-side atomicity (all-or-nothing on commit) but not transaction isolation. Concurrent writers may see partial state from other UnitOfWork instances. For full ACID semantics, use a transactional store backend.
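The buffer-then-commit pattern behind that guarantee can be sketched with a small context manager. `BufferedUnitOfWork` is a hypothetical class written for this illustration; it takes a commit callable in place of ctx.kg.transaction() and says nothing about how trails.uow is implemented.

```python
class BufferedUnitOfWork:
    """Toy unit of work: buffer operations, flush on clean exit, discard on error."""

    def __init__(self, commit):
        self._commit = commit   # callable that applies a list of operations at once
        self._ops = []

    def save(self, item):
        self._ops.append(("save", item))

    def delete(self, item):
        self._ops.append(("delete", item))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        if exc_type is None:
            self._commit(list(self._ops))   # all buffered operations in one call
        self._ops.clear()                   # on error the buffer is simply dropped
        return False                        # never swallow the exception


store = []
with BufferedUnitOfWork(store.extend) as uow:
    uow.save("post")
    uow.save("audit-log")

try:
    with BufferedUnitOfWork(store.extend) as uow:
        uow.save("half-done")
        raise RuntimeError("boom")
except RuntimeError:
    pass
```

Nothing reaches the store until the block exits cleanly, which gives all-or-nothing behavior on the client side; it does nothing to isolate concurrent writers, as the caveat above notes.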
Escape hatches¶
Two raw handles on ctx.kg sit one layer below the ORM:
rows = ctx.kg.query("SELECT ?s WHERE { ?s a <…/Note> }")
ctx.kg.update("INSERT DATA { <…/Note/x> <…/tags> \"urgent\" . }")
query(sparql) takes a SELECT or ASK and returns list[dict[str, str]];
update(sparql) takes any UPDATE form and returns a (inserted,
deleted) tuple from the kernel. Both pass through the same store as the
ORM and stay inside the capability's provenance envelope.
Reach for these when:
- You need an aggregate beyond .count/.sum/.avg/.min/.max and .annotate(**Count/Sum/Avg/Min/Max) (HAVING, SAMPLE, multi-field aggregates in a single SELECT, COUNT DISTINCT on a non-subject expression). The five terminals cover whole-set cardinality / total / extrema; .annotate() covers the per-subject grouped case.
- You are filtering a list field of scalars (Note.where(tags="x") raises by design; see Anti-patterns below). Property-path chains land natively in M8 Phase 3; containment on multi-valued scalar leaves stays an escape-hatch case.
- Your query's shape is a natural UNION of disjoint WHERE-blocks rather than one parameterised block.
- You need FROM NAMED to scope across named graphs explicitly.
The escape hatch is the load-bearing signal that the ORM is finite —
use it without apology, and leave a # TODO(promote to ORM) comment
when the pattern recurs so the next sprint can widen the surface.
Anti-patterns¶
N+1 reads. Fetching parents and then looping to fetch each parent's
children is the oldest ORM failure mode. With property paths, the join
rides on a single where(); reach for the raw hatch only when you need
variables the ORM does not project (the intermediate member, an
aggregate, a named graph):
# Wrong — one query per patient.
for p in Patient.where().fetch(ctx):
    leads = ctx.kg.query(f"SELECT ?l WHERE {{ <{p.id}> <…/care_team> ?m . ?m <…/lead> ?l }}")
# Right — one `where()`, one BGP, hydrated patients.
patients = Patient.where(care_team__lead__name="Alice").fetch(ctx)
# Raw hatch — when you want the intermediate hop in the result set too.
rows = ctx.kg.query("""
SELECT ?patient ?lead WHERE {
?patient a <…/Patient> ; <…/care_team> ?m .
?m <…/lead> ?lead .
}
""")
Mutate-and-re-add. Changing an attribute on a fetched Note and
calling ctx.kg.add(note) a second time does not replace the old
value — add is insert-only, so the old and new triples coexist.
Use note.save(ctx) (upsert) instead; see "Mutating instances —
instance.save(ctx)" above. The raw
ctx.kg.update("DELETE { … } INSERT { … } WHERE { … }") hatch remains
for multi-subject updates and anything touching predicates outside the
node-type's declared field set.
Filtering list-of-scalar fields through where. Note.where(tags="x")
on a list[str] raises by design, not by bug — the v1 surface refuses
to guess between existential and universal semantics on multi-valued
scalar leaves. (Multi-valued reference edges are covered by property
paths and the __all__ quantifier above.) Use the raw-SPARQL hatch
with the field's predicate IRI
(Note._trails_node_type.field_iri("tags")) for list-of-scalar
membership checks.
Reference¶
Everything exported from trails.orm:
| Symbol | One-liner |
|---|---|
| node_type(label, *, fields, extends=None) | Decorator: register a class as a Trails node type. |
| Model | Base class produced by @node_type; carries id, where, find, delete, delete_all. |
| Model.where(*q_nodes, **filters) -> QueryBuilder | Start a chainable query; positional Qs + kwargs are ANDed. |
| Model.find(ctx, id_or_iri) -> Model \| None | Single-instance lookup by IRI; None when absent. |
| instance.save(ctx) -> bool | Upsert: per-field predicate replacement, one SPARQL 1.1 UPDATE. Returns True on update, False on first insert. |
| Model.delete(ctx[, id_or_iri]) -> bool | Dual-mode: instance delete or class-level delete by id. |
| Model.delete_all(ctx) -> int | Explicit blanket delete of every instance of this type. |
| Model.id | Property returning the instance's canonical IRI. |
| Model.to_triples() -> Iterator[(s, p, o)] | Raw triple emitter, used by kg.add; exposed for introspection. |
| QueryBuilder.limit(n) -> QueryBuilder | Set LIMIT n on the lowered SPARQL. |
| QueryBuilder.order_by(field, *, descending=False) -> QueryBuilder | Set ORDER BY. |
| QueryBuilder.fetch(ctx) -> list[Model] | Terminal: execute and hydrate. |
| QueryBuilder.delete(ctx) -> int | Terminal: delete every matching subject (refuses with no filters). |
| QueryBuilder.annotate(**spec) -> QueryBuilder | Attach per-instance aggregates (Count/Sum/Avg/Min/Max); one SPARQL per annotation, merged on ?iri. |
| Model.annotate(**spec) -> QueryBuilder | Shortcut for Model.where().annotate(**spec). |
| Count(path) / Sum(path) / Avg(path) / Min(path) / Max(path) | Aggregate value objects for .annotate(...); path is a __-separated property path. |
| Q(**filters) | Composable filter leaf; supports \|, &, ~. Q() is identity. |
| NodeTypeMeta | Dataclass attached to each @node_type class as _trails_node_type. |
The context-side surface lives on trails.context.KG:
| Symbol | One-liner |
|---|---|
| ctx.kg.add(instance) | Persist a @node_type instance (one INSERT DATA, insert-only). |
| ctx.kg.save(instance) -> bool | Upsert mirror of instance.save(ctx). |
| ctx.kg.find(model_cls, id_or_iri) | Delegates to Model.find. |
| ctx.kg.where(model_cls, **filters) | Delegates to Model.where. |
| ctx.kg.query(sparql) -> list[dict] | Raw SPARQL SELECT/ASK escape hatch. |
| ctx.kg.update(sparql) -> tuple[int, int] | Raw SPARQL UPDATE escape hatch. |
| ctx.kg.node(labels=, properties=) | Create a label-first node (M11 Phase 1). |
| ctx.kg.edge(subject=, label=, object=) | Create a label-first edge triple. |
| ctx.kg.match(labels=, types=, where=) | List nodes by label set and/or @node_type set + equality filters. |
| ctx.kg.traverse(subject=, label=) | Walk one edge hop; returns neighbor IRI list. |
Label-first nodes & edges¶
The ctx.kg.node / edge / match / traverse helpers are the
progressive-enhancement entry point behind
ADR-0021: write nodes,
edges, and properties without declaring a @node_type, a JSON schema,
or a SHACL shape. They live on the same ctx.kg handle as the ORM
surface documented above, mint IRIs under disjoint segments
(<prefix>node/..., <prefix>label/..., <prefix>prop/...,
<prefix>edge/...) so existing data is never silently retyped when a
class is later decorated, and share the project prefix resolver with
@node_type.
kg.match also accepts types=[NodeCls | "iri", ...] for discovering
@node_type-typed subjects through the same surface, ANDed with any
labels= constraint. For signatures, IRI namespace map, where=
resolution rules, the trails kg CLI, anti-patterns, and the full
reference, see the dedicated guide: Label-first Knowledge
Graph.
Reasoning (opt-in)¶
Reasoning is the last step on the progressive-enhancement ladder from
ADR-0021: once your
ontology declares RDFS (rdfs:subClassOf, rdfs:subPropertyOf,
rdfs:domain, rdfs:range) or OWL axioms (owl:Class,
owl:inverseOf, owl:TransitiveProperty, owl:SymmetricProperty,
owl:equivalentClass), any SPARQL query can honour those entailments
without setting a tier field or re-declaring anything on the
capability.
# Default: queries never pay for reasoning they didn't ask for.
rows = ctx.kg.query("ASK { <urn:A> rdfs:subClassOf <urn:C> }")
# => [{"_boolean": False}] when only A→B and B→C are asserted.
# Opt in per call. trails auto-detects OWL signals and picks RDFS or
# OWL-RL accordingly.
rows = ctx.kg.query(
"ASK { <urn:A> rdfs:subClassOf <urn:C> }",
reason=True,
)
# => [{"_boolean": True}]; subClassOf transitivity materialized.
# Or flip the default on a KG handle so every query auto-reasons.
from trails.context import KG
ctx.kg = KG(ctx.kg._store, ctx, reason=True)
Rules:
- Opt-in, per ADR-0004. The default stays reason=False: writes do not
  auto-materialize, and unsuspecting queries never pay the cost.
- Auto-detection. trails.reasoning.detect_owl(store) probes the store
  with one SPARQL SELECT (COUNT(*)) per axiom kind and returns
  {"has_rdfs": bool, "has_owl_rl": bool, "axiom_counts": {...}}. The
  query(reason=True) hook uses this to pick RDFS vs. OWL-RL.
- Idempotent. materialize_rdfs / materialize_owl_rl run their rule sets
  to a fixpoint. A second call on a converged store is a no-op (returns
  0), so reasoning a hot store only pays the probe cost.
- Phase 1 path. Materialization is implemented in Python via SPARQL
  INSERT WHERE loops that mirror rust/crates/trails-reason rule-for-rule.
  The Rust reasoner isn't yet exposed through the PyO3 FFI; once it is,
  this hook will call into Rust and keep the Python loop as a fallback.
- Per-call override. kg.query(..., reason=False) wins over a
  KG(reason=True) default, which is handy for hot paths that want to
  skip reasoning on a specific query.
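The auto-detection and per-call override rules can be pictured with a small in-memory stand-in. This is a sketch under assumptions, not the trails implementation: the tuple-based store, the detect_signals and resolve_reason helpers, and the split of axioms into RDFS and OWL signal sets are hypothetical; only the shape of the returned dict follows the description above.

```python
from collections import Counter

RDFS = "http://www.w3.org/2000/01/rdf-schema#"
OWL = "http://www.w3.org/2002/07/owl#"
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"

# Assumed signal sets, built from the axioms this section names.
RDFS_PREDICATES = {RDFS + n for n in ("subClassOf", "subPropertyOf", "domain", "range")}
OWL_PREDICATES = {OWL + n for n in ("inverseOf", "equivalentClass")}
OWL_TYPES = {OWL + n for n in ("Class", "TransitiveProperty", "SymmetricProperty")}


def detect_signals(triples):
    """Count axioms per kind and report which rule sets are worth running."""
    counts = Counter()
    for s, p, o in triples:
        if p in RDFS_PREDICATES or p in OWL_PREDICATES:
            counts[p] += 1
        elif p == RDF_TYPE and o in OWL_TYPES:
            counts[o] += 1
    return {
        "has_rdfs": any(k.startswith(RDFS) for k in counts),
        "has_owl_rl": any(k.startswith(OWL) for k in counts),
        "axiom_counts": dict(counts),
    }


def resolve_reason(handle_default, per_call=None):
    """A per-call value, when given, wins over the handle default."""
    return handle_default if per_call is None else per_call


store = {
    ("urn:A", RDFS + "subClassOf", "urn:B"),
    ("urn:partOf", RDF_TYPE, OWL + "TransitiveProperty"),
}
signals = detect_signals(store)
# Prefer the richer rule set when any OWL signal is present.
profile = "owl-rl" if signals["has_owl_rl"] else "rdfs" if signals["has_rdfs"] else None
```

Here resolve_reason(True, False) evaluates to False, mirroring how an explicit reason=False on one call skips reasoning even on a handle that reasons by default.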
If you need raw access, call the helpers directly:
from trails import reasoning
signals = reasoning.detect_owl(ctx.kg._store, ctx.trace_id)
if signals["has_owl_rl"]:
    reasoning.materialize_owl_rl(ctx.kg._store, ctx.trace_id)
elif signals["has_rdfs"]:
    reasoning.materialize_rdfs(ctx.kg._store, ctx.trace_id)
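To make the fixpoint and idempotence rules concrete, here is a minimal in-memory sketch of a single rule, rdfs:subClassOf transitivity. It assumes a plain Python set of triples in place of the kernel store, and materialize_subclass_closure is a hypothetical name; the real helpers run SPARQL INSERT WHERE loops over more rules.

```python
SUBCLASS = "rdfs:subClassOf"


def materialize_subclass_closure(triples):
    """Add entailed subClassOf triples in place; return how many were added."""
    added = 0
    while True:
        edges = [(s, o) for s, p, o in triples if p == SUBCLASS]
        new = {
            (a, SUBCLASS, c)
            for a, b in edges
            for b2, c in edges
            if b == b2 and (a, SUBCLASS, c) not in triples
        }
        if not new:  # fixpoint: a full pass derived nothing new
            return added
        triples |= new
        added += len(new)


store = {("urn:A", SUBCLASS, "urn:B"), ("urn:B", SUBCLASS, "urn:C")}
first = materialize_subclass_closure(store)   # derives A subClassOf C
second = materialize_subclass_closure(store)  # converged store, nothing to add
```

The first call adds one triple and the second returns 0, which is the behaviour the Idempotent rule describes for a converged store.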
Async variants — afind / afetch / asave / adelete¶
Every terminal ORM operation has an async twin. The sync surface is unchanged; async is purely additive.
# Read
note = await Note.afind(ctx, note_id)
# Query
hot = await Note.where(priority__gte=5).afetch(ctx)
# Write
note.title = "updated"
await note.asave(ctx)
# Delete
await note.adelete(ctx)
Concurrent operations parallelize under asyncio.gather, which schedules the awaited calls together and returns their results in call order.
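A sketch of that pattern, written against a stand-in class so it runs without a store: FakeNote and load_many are hypothetical, and the sleep only simulates a store round-trip. With a real model the same asyncio.gather call would presumably wrap Note.afind(ctx, note_id).

```python
import asyncio


class FakeNote:
    """Hypothetical stand-in for a @node_type class; not the trails API."""

    def __init__(self, note_id):
        self.id = note_id

    @classmethod
    async def afind(cls, ctx, note_id):
        await asyncio.sleep(0.01)  # simulate the store round-trip
        return cls(note_id)


async def load_many(ctx, ids):
    # gather starts every lookup before awaiting any of them and
    # returns the results in the order the awaitables were passed.
    return await asyncio.gather(*(FakeNote.afind(ctx, i) for i in ids))


notes = asyncio.run(load_many(ctx=None, ids=["n1", "n2", "n3"]))
loaded_ids = [n.id for n in notes]
```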
Backing path (Wave 2)¶
The kernel FFI exposes async_query natively via the Tokio runtime, but its
return shape is a W3C SPARQL-Results-JSON string rather than the flat
list[dict] the ORM's hydration path consumes, and async_update is not yet
exposed at all. Rather than maintain two hydration code paths, the async ORM
methods wrap their sync equivalents through asyncio.to_thread. Semantics —
dirty-tracking, SHACL validation gate, nullable-field handling, reference
fields, Q-object composition — are identical to the sync path because they
are the sync path, just run on a worker thread.
Trade-off: each await pays a thread-hop rather than riding the native Tokio
bridge. Once the FFI grows async_update and a shape-matching async_query,
this module can switch its internal lowering without changing the public
afind / afetch / asave / adelete signatures.
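The delegation described above can be sketched in a few lines. SyncStore and the module-level asave are hypothetical stand-ins, not the trails internals; the point is only that the async twin awaits asyncio.to_thread around the unchanged sync call, so the blocking work runs off the event-loop thread.

```python
import asyncio
import threading


class SyncStore:
    """Hypothetical blocking store standing in for the kernel."""

    def __init__(self):
        self.rows = {}
        self.writer_threads = set()

    def save(self, key, value):
        # Record which thread ran the blocking call.
        self.writer_threads.add(threading.current_thread().name)
        self.rows[key] = value
        return True


async def asave(store, key, value):
    # Same code path as the sync call, executed on a worker thread.
    return await asyncio.to_thread(store.save, key, value)


async def main():
    store = SyncStore()
    results = await asyncio.gather(asave(store, "a", 1), asave(store, "b", 2))
    return store, results


store, results = asyncio.run(main())
main_thread = threading.current_thread().name
```

Because the public coroutine only awaits a helper, its body could later be swapped for a native async call without touching callers, which is the trade-off the paragraph above describes.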
The M0 kernel is still a single-writer store (ADR-0004), so concurrent
asaves on the same subject remain a user-side race — the async surface
gives you concurrency across subjects and across reads, not a transaction
manager. Real concurrent-writer semantics land with M5.
Extensions (M11 Phase 2)¶
Three additive kwargs broaden @node_type without changing the existing
surface. None of them is required; existing call sites keep working unchanged.
extends=<ParentType> — type inheritance¶
A child type pulls every parent field into its own field set and emits a
single rdfs:subClassOf triple linking the child's RDF type IRI to the
parent's. Inherited predicates keep the parent's IRI so a reasoned query
against the parent type sees child instances on the same predicates (no
per-field bridging axiom required).
from datetime import datetime
@node_type("Document", fields={"title": str, "content": str})
class Document: ...
@node_type("Article", extends=Document, fields={"published_at": datetime})
class Article: ... # has title, content, published_at
a = Article(title="Hello", content="…", published_at=datetime.now())
ctx.kg.add(a)
# Reasoning is opt-in (ADR-0004). Without it, the parent query is empty:
assert Document.where().fetch(ctx) == [] # cold
from trails import reasoning
reasoning.materialize_rdfs(ctx.kg._store, ctx.trace_id)
docs = Document.where().fetch(ctx) # warm — finds Article instances
Narrowing rule. A child may re-declare an inherited field with a
compatibly narrower type — typically dropping Optional[T] to require
the value. Widening (e.g. parent required, child Optional) and incompatible
re-typing (str → int) raise at decoration time.
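One way to picture the narrowing rule is a check that unwraps Optional[T] on both sides and compares what is left. This is a sketch under assumptions: split_optional, check_redeclaration, and is_allowed are hypothetical names, TypeError stands in for whatever the decorator actually raises, and only the Optional case named above is modelled.

```python
from typing import Optional, Union, get_args, get_origin


def split_optional(tp):
    """Return (inner, is_optional); only Optional[T] is unwrapped."""
    if get_origin(tp) is Union:
        args = get_args(tp)
        inner = [a for a in args if a is not type(None)]
        if len(args) == 2 and len(inner) == 1:
            return inner[0], True
    return tp, False


def check_redeclaration(field, parent_tp, child_tp):
    """Raise TypeError unless child_tp equals or narrows parent_tp."""
    parent_inner, parent_optional = split_optional(parent_tp)
    child_inner, child_optional = split_optional(child_tp)
    if parent_inner is not child_inner:
        raise TypeError(f"{field}: incompatible re-typing {parent_tp} -> {child_tp}")
    if child_optional and not parent_optional:
        raise TypeError(f"{field}: widening a required field to Optional")


def is_allowed(parent_tp, child_tp):
    """Boolean wrapper around check_redeclaration for quick experiments."""
    try:
        check_redeclaration("f", parent_tp, child_tp)
    except TypeError:
        return False
    return True
```

Under these assumptions is_allowed(Optional[str], str) is True, while is_allowed(str, Optional[str]) and is_allowed(str, int) are both False.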
The legacy extends=["<iri>", ...] list form (external-vocabulary metadata
only, no triples emitted) is preserved.
Embedded objects — plain dataclass fields¶
A field whose type is a plain @dataclass (not another @node_type)
persists as a named sub-IRI under the owning subject, with one predicate per
declared dataclass attribute. Round-trip is automatic.
from dataclasses import dataclass
@dataclass
class Address:
    street: str
    city: str
    postcode: int
@node_type("Customer", fields={"name": str, "address": Address})
class Customer: ...
c = Customer(name="Alice", address=Address(street="1 Main", city="Berlin",
postcode=10115))
ctx.kg.add(c)
loaded = Customer.find(ctx, c.id)
assert loaded.address.city == "Berlin"
Design choice — named sub-IRIs. Embeds mint a deterministic-per-instance
named IRI (<owner>/<field>/<uuid7>), not a blank node. Named IRIs give
deterministic re-targeting on save() and dodge Oxigraph's blank-node
labelling quirks across separate INSERT DATA calls. The IRI is opaque —
do not depend on its shape.
Scope (v1).
- One layer of embedding only. An embed dataclass whose own attribute is
  itself a dataclass is rejected at @node_type time.
- Embed attributes must be scalars (str / int / float / bool / datetime).
- List-valued embed attributes and reference fields inside an embed are
  out of scope for v1.
- Compound-unique constraints over embed fields are rejected at
  decoration time.
Distinguishing embed from ref: a field type with @node_type applied is a
reference (lowers to an IRI object triple); a plain @dataclass without
@node_type is an embed.
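The embed-versus-reference distinction and the one-layer scope rule can be sketched with the standard dataclasses module. Everything here is illustrative: fake_node_type is a hypothetical stand-in that only sets the _trails_node_type marker, classify_field is not a trails function, and TypeError stands in for the real decoration-time error.

```python
import dataclasses
from datetime import datetime

SCALARS = (str, int, float, bool, datetime)


def fake_node_type(cls):
    """Hypothetical stand-in for @node_type: only sets the marker attribute."""
    cls._trails_node_type = object()
    return cls


def classify_field(tp):
    """Return 'scalar', 'ref', or 'embed' for a declared field type."""
    if tp in SCALARS:
        return "scalar"
    if hasattr(tp, "_trails_node_type"):
        return "ref"
    if dataclasses.is_dataclass(tp):
        # v1 scope: every embed attribute must itself be a scalar.
        for f in dataclasses.fields(tp):
            if f.type not in SCALARS:
                raise TypeError(f"embed attribute {f.name!r} must be a scalar")
        return "embed"
    raise TypeError(f"unsupported field type: {tp!r}")


@dataclasses.dataclass
class Address:
    street: str
    city: str
    postcode: int


@fake_node_type
class Customer: ...


@dataclasses.dataclass
class Shipment:
    destination: Address  # nested dataclass, outside the v1 scope
```

With these definitions Address classifies as an embed, Customer as a reference, and Shipment is rejected because its attribute is itself a dataclass.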
unique=[...] — compound uniqueness constraints¶
Declare cross-field uniqueness as a list of field-name tuples. Each
save() / kg.add() runs a SPARQL ASK against the store and rejects
inserts that would collide on every field in the tuple with a different
existing subject.
@node_type(
"Note",
fields={"title": str, "author": str, "body": str},
unique=[("title", "author")],
)
class Note: ...
ctx.kg.add(Note(title="T", author="A", body="first"))
ctx.kg.add(Note(title="T", author="A", body="second")) # raises TrailsError
A None-valued slot short-circuits the check for that tuple — "no value"
is not a collision target. Combine with a SHACL not-null rule if you want
nullable uniqueness to be rejected.
Multiple tuples are checked independently. Re-saving an existing subject without changing its unique-tuple values never collides with itself, because the subject being saved is excluded from the ASK.
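The three rules above (collide on every field of a tuple, skip tuples with a None slot, exclude the subject being saved) fit in one small function. This is an in-memory sketch: the rows dict stands in for the store, and find_collision is a hypothetical helper rather than the SPARQL ASK the ORM issues.

```python
def find_collision(rows, subject, values, unique):
    """Return the first violated tuple, or None if `values` may be stored.

    rows maps subject IRI -> field dict and stands in for the store.
    """
    for fields in unique:
        key = tuple(values.get(f) for f in fields)
        if any(v is None for v in key):
            continue  # a None slot is never a collision target
        for other, stored in rows.items():
            if other == subject:
                continue  # the subject being saved is excluded
            if tuple(stored.get(f) for f in fields) == key:
                return fields
    return None


rows = {"urn:note/1": {"title": "T", "author": "A", "body": "first"}}
unique = [("title", "author")]

duplicate = find_collision(rows, "urn:note/2", {"title": "T", "author": "A"}, unique)
resave = find_collision(rows, "urn:note/1", {"title": "T", "author": "A"}, unique)
with_none = find_collision(rows, "urn:note/3", {"title": "T", "author": None}, unique)
```

Only the first call reports a violation; the re-save and the None-valued insert both pass.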
Reasoner coordination¶
Inheritance leans on RDFS rdfs:subClassOf rather than rewriting child
queries on the Python side. With reasoning off (the default), a child query
still works because the child instance asserts rdf:type <ChildIRI>
directly; the parent query simply does not see child instances. With
reasoning.materialize_rdfs (or any RDFS-capable reasoner) applied, the
parent query surfaces both — for free, the same way RDFS handles
hierarchies for any other RDF dataset.
Embeds and unique= do not interact with the reasoner.
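The cold and warm parent queries can be reproduced on a toy triple set. The sketch below applies only the RDFS type-inheritance rule (x a C, C subClassOf D entails x a D); instances_of and propagate_types are hypothetical helpers, and the urn: IRIs are made up for illustration.

```python
RDF_TYPE, SUBCLASS = "rdf:type", "rdfs:subClassOf"


def instances_of(triples, cls):
    """Subjects that directly assert rdf:type cls, sorted for stable output."""
    return sorted(s for s, p, o in triples if p == RDF_TYPE and o == cls)


def propagate_types(triples):
    """Apply 'x a C, C subClassOf D => x a D' until nothing new appears."""
    added = 0
    while True:
        new = {
            (x, RDF_TYPE, d)
            for x, p1, c in triples if p1 == RDF_TYPE
            for c2, p2, d in triples if p2 == SUBCLASS and c2 == c
        } - triples
        if not new:
            return added
        triples |= new
        added += len(new)


store = {
    ("urn:Article", SUBCLASS, "urn:Document"),
    ("urn:article/1", RDF_TYPE, "urn:Article"),
}
cold = instances_of(store, "urn:Document")     # parent query before reasoning
propagate_types(store)
warm = instances_of(store, "urn:Document")     # parent query after reasoning
children = instances_of(store, "urn:Article")  # child query works either way
```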
When the ORM isn't enough — sparql() and sparql_update()¶
The ctx.kg.query() / ctx.kg.update() methods on the context are
low-level: they accept a raw SPARQL string, offer no parameter binding,
and return untyped results. For a safer, more ergonomic drop to raw
SPARQL, use the top-level sparql() and sparql_update() functions:
from trails import capability, sparql, sparql_update

@capability
def complex_query(ctx, min_age: int) -> list:
    # $min_age is bound from the keyword argument of the same name.
    rows = sparql(ctx, """
        SELECT ?name ?age WHERE {
            ?person a <urn:Person> ;
                    <urn:name> ?name ;
                    <urn:age> ?age .
            FILTER(?age > $min_age)
        }
    """, min_age=min_age)
    return [dict(r) for r in rows]

@capability
def bulk_tag(ctx, tag: str) -> dict:
    # sparql_update returns an (inserted, deleted) pair.
    inserted, deleted = sparql_update(ctx, """
        INSERT DATA {
            <urn:note/1> <urn:tag> $tag .
        }
    """, tag=tag)
    return {"inserted": inserted, "deleted": deleted}
Parameter binding. Use $name placeholders in the query string and
pass the values as keyword arguments. Each value is encoded as a typed
XSD literal:
| Python type | XSD datatype |
|---|---|
| str | xsd:string |
| int | xsd:integer |
| float | xsd:double |
| bool | xsd:boolean |
| datetime | xsd:dateTime |
A placeholder with no matching keyword argument and a keyword argument
with no matching placeholder both raise TrailsError at bind time, so
mismatches never pass silently.
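As an illustration of what binding with typed literals involves, here is a self-contained sketch. It is not the trails binder: bind and encode_literal are hypothetical, ValueError stands in for TrailsError, and every $name in the query text is treated as a placeholder.

```python
import re
from datetime import datetime

XSD = "http://www.w3.org/2001/XMLSchema#"
PLACEHOLDER = re.compile(r"\$([A-Za-z_][A-Za-z0-9_]*)")


def encode_literal(value):
    """Encode a Python value as a typed SPARQL literal."""
    if isinstance(value, bool):  # must precede int: bool is an int subclass
        return f'"{str(value).lower()}"^^<{XSD}boolean>'
    if isinstance(value, int):
        return f'"{value}"^^<{XSD}integer>'
    if isinstance(value, float):
        return f'"{value!r}"^^<{XSD}double>'
    if isinstance(value, datetime):
        return f'"{value.isoformat()}"^^<{XSD}dateTime>'
    if isinstance(value, str):
        # Escape backslashes first, then quotes and line breaks.
        escaped = (value.replace("\\", "\\\\").replace('"', '\\"')
                        .replace("\n", "\\n").replace("\r", "\\r"))
        return f'"{escaped}"^^<{XSD}string>'
    raise ValueError(f"unsupported parameter type: {type(value).__name__}")


def bind(query, **params):
    """Substitute every $name; reject unbound placeholders and unused params."""
    names = set(PLACEHOLDER.findall(query))
    missing, unused = names - set(params), set(params) - names
    if missing or unused:
        raise ValueError(f"missing={sorted(missing)} unused={sorted(unused)}")
    return PLACEHOLDER.sub(lambda m: encode_literal(params[m.group(1)]), query)


bound = bind("FILTER(?age > $min_age)", min_age=18)
```

Encoding values as escaped literals, instead of splicing raw strings into the query, is what keeps a value such as a title containing a double quote from changing the query structure.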
Validation. sparql() runs the bound query through
sparql_proxy.validate_query() before execution: UPDATE keywords,
SERVICE federation, CONSTRUCT, and DESCRIBE are rejected with
UnsafeSparqlError. This is the same safety net that protects the
public SPARQL proxy endpoint.
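A deliberately naive sketch of this kind of read-only gate follows. It is not how sparql_proxy.validate_query() works internally, which this page does not describe: validate_read_only and UnsafeQuery are hypothetical, the keyword list is the SPARQL 1.1 Update operations plus the three forms named above, and a keyword scan like this can reject harmless queries, for example one with a variable named ?add.

```python
import re

FORBIDDEN = ("INSERT", "DELETE", "LOAD", "CLEAR", "CREATE", "DROP",
             "COPY", "MOVE", "ADD", "SERVICE", "CONSTRUCT", "DESCRIBE")
# Strip IRIs and string literals first so keywords inside them are ignored.
_NOISE = re.compile(r"<[^<>\s]*>" r'|"(?:\\.|[^"\\])*"' r"|'(?:\\.|[^'\\])*'")
_KEYWORD = re.compile(r"\b(" + "|".join(FORBIDDEN) + r")\b", re.IGNORECASE)


class UnsafeQuery(ValueError):
    """Hypothetical stand-in for UnsafeSparqlError."""


def validate_read_only(query):
    """Return the query unchanged, or raise if a forbidden keyword appears."""
    match = _KEYWORD.search(_NOISE.sub(" ", query))
    if match:
        raise UnsafeQuery(f"forbidden keyword: {match.group(1).upper()}")
    return query


safe = validate_read_only('SELECT ?s WHERE { ?s <urn:insert> "DROP me" }')
```

A production validator would more plausibly parse the query and inspect its form than scan for keywords; the sketch only shows where such a gate sits, after binding and before execution.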
Return shapes.
- sparql() returns list[dict[str, Any]] for SELECT queries and bool for
  ASK queries (the [{"_boolean": True}] wrapper is unwrapped
  automatically).
- sparql_update() returns tuple[int, int], that is (inserted, deleted),
  matching ctx.kg.update().
Observability. Both functions emit a sparql_escape_hatch event
through the same observability pipeline as ORM operations, carrying
trace_id, principal, sparql_kind, and duration_ms.
When to use which.
| Need | Surface |
|---|---|
| Typed CRUD on a declared model | ctx.kg.add / Model.where / instance.save |
| Simple raw query, no params | ctx.kg.query(sparql) |
| Raw query with safe param binding | sparql(ctx, query, **params) |
| Raw update with safe param binding | sparql_update(ctx, query, **params) |
| Label-first nodes without a type | ctx.kg.node / ctx.kg.edge / ctx.kg.match |
The escape hatch is a feature, not a failure. Use it without apology
when the ORM does not cover your shape, and leave a
# TODO(promote to ORM) comment when the pattern recurs so the next
sprint can widen the surface.