Live Schema Inference

trails.schema_watcher observes kg_write events and incrementally builds per-type field statistics. It detects new types, new fields, type drift (a field's inferred Python type changes), and cardinality changes (a single-valued field becomes multi-valued). The watcher is opt-in and runs at near-zero cost: each event triggers only in-memory dict updates, with no SPARQL queries.

See ADR-0039 for the full design rationale.
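
To make the four detections concrete, here is a minimal sketch of how incremental per-type statistics could be kept with plain dicts. It is not the trails implementation: `ToySchemaTracker`, `FieldStats`, the `observe()` method, the tuple-shaped alerts, and the `NEW_FIELD_AFTER` threshold are all hypothetical names chosen for illustration.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass, field


@dataclass
class FieldStats:
    """Running statistics for one field of one type (illustrative only)."""
    type_counts: Counter = field(default_factory=Counter)
    multi_valued: bool = False


class ToySchemaTracker:
    """Hypothetical stand-in for the idea behind the watcher, not the real class."""

    NEW_FIELD_AFTER = 2  # assumed threshold for "after multiple instances"

    def __init__(self):
        self.instances = Counter()        # type name -> writes seen so far
        self.fields = defaultdict(dict)   # type name -> {field name: FieldStats}
        self.alerts = []                  # (alert_type, type_name, field_name)

    def observe(self, type_name, props):
        """Fold one write into the statistics and record any anomalies."""
        seen = self.instances[type_name]
        if seen == 0:
            self.alerts.append(("new_type", type_name, None))
        known = self.fields[type_name]
        for name, value in props.items():
            is_multi = isinstance(value, (list, tuple, set))
            values = list(value) if is_multi else [value]
            existed = name in known
            if not existed:
                known[name] = FieldStats()
                if seen >= self.NEW_FIELD_AFTER:
                    self.alerts.append(("new_field", type_name, name))
            stats = known[name]
            # Compare the dominant Python type before and after this write.
            before = stats.type_counts.most_common(1)
            stats.type_counts.update(type(v).__name__ for v in values)
            after = stats.type_counts.most_common(1)
            if before and before[0][0] != after[0][0]:
                self.alerts.append(("type_drift", type_name, name))
            # A field seen earlier as single-valued now carries a collection.
            if is_multi and not stats.multi_valued:
                if existed:
                    self.alerts.append(("cardinality_change", type_name, name))
                stats.multi_valued = True
        self.instances[type_name] += 1


tracker = ToySchemaTracker()
tracker.observe("Person", {"name": "Ada", "age": 36})
tracker.observe("Person", {"name": "Bob", "age": 41})
tracker.observe("Person", {"name": "Cy", "age": 29, "email": "cy@example.org"})
tracker.observe("Person", {"name": ["Di", "Diana"], "age": 50})
for alert in tracker.alerts:
    print(alert)
```

Every step is a counter increment or a dict lookup, which is the kind of work the "near-zero cost" claim refers to. How the real watcher defines the dominant type or its thresholds may differ; ADR-0039 is the authoritative source.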

Quick start

from trails.schema_watcher import SchemaWatcher

watcher = SchemaWatcher(ctx, min_samples=5)
watcher.start()

# ... normal ctx.kg usage — the watcher observes in the background ...

# Check for schema anomalies
for alert in watcher.get_alerts():
    print(f"[{alert.alert_type}] {alert.type_name}.{alert.field_name}: {alert.details}")

# Get @node_type suggestions for types with enough samples
for suggestion in watcher.get_suggestions():
    print(suggestion.suggested_node_type_code)
    # @node_type("Person", fields={
    #     "name": str,
    #     "age": int,
    # })
    # class Person:
    #     pass

# Summary statistics
stats = watcher.get_stats()
print(f"Types observed: {stats['types_observed']}")

watcher.stop()

Alert types

| Alert | Trigger | Suggested action |
| --- | --- | --- |
| new_type | First write of an unregistered type | Consider defining a @node_type |
| new_field | New field appears on a known type after multiple instances | Verify and update the @node_type definition |
| type_drift | A field's dominant Python type changes | Check whether the field type should be unified |
| cardinality_change | Single-valued field becomes multi-valued | Update definition to use list[T] |
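
Because each alert carries its alert_type, the list returned by get_alerts() can be triaged by kind. The sketch below groups alerts so that each category can be reviewed together. To stay runnable without trails installed, it uses a stand-in `SchemaAlert` dataclass with the documented attribute names. The `group_alerts` helper, the sample alert values, and the idea that field_name is None for new_type alerts are assumptions made for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass
class SchemaAlert:
    """Stand-in mirroring the documented attributes; the real class is in trails.schema_watcher."""
    alert_type: str
    type_name: str
    field_name: Optional[str]  # assumption: None when the alert concerns a whole type
    details: str
    suggested_action: str


def group_alerts(alerts):
    """Return {alert_type: [(target, suggested_action), ...]} in arrival order."""
    grouped = defaultdict(list)
    for alert in alerts:
        if alert.field_name is None:
            target = alert.type_name
        else:
            target = f"{alert.type_name}.{alert.field_name}"
        grouped[alert.alert_type].append((target, alert.suggested_action))
    return dict(grouped)


# Hypothetical sample data; with the real watcher this would be watcher.get_alerts().
sample = [
    SchemaAlert("new_type", "Person", None, "first write", "Consider defining a @node_type"),
    SchemaAlert("type_drift", "Person", "age", "int -> str", "Check whether the field type should be unified"),
    SchemaAlert("type_drift", "Order", "total", "int -> float", "Check whether the field type should be unified"),
]
for kind, entries in group_alerts(sample).items():
    print(kind, [target for target, _ in entries])
```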

Key types

| Type | Description |
| --- | --- |
| SchemaWatcher | Main class: start(), stop(), get_alerts(), get_suggestions(), get_stats() |
| SchemaAlert | Alert with alert_type, type_name, field_name, details, suggested_action |
| SchemaSuggestion | Suggested @node_type definition: type_name, fields, confidence, suggested_node_type_code |

API

| Method | Signature | Description |
| --- | --- | --- |
| SchemaWatcher() | (ctx=None, *, min_samples=5, alert_callback=None) | Create a watcher; min_samples controls suggestion threshold |
| .start() | () -> None | Register as an observability observer on kg_write events |
| .stop() | () -> None | Unregister the observer |
| .get_alerts() | () -> list[SchemaAlert] | Return all alerts emitted so far |
| .get_suggestions() | () -> list[SchemaSuggestion] | Return suggestions for types with enough samples |
| .get_stats() | () -> dict[str, Any] | Summary: types observed, fields per type, sample counts |
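
Since start() registers an observer and stop() unregisters it, one way to make sure the observer is always removed, even when the watched code raises, is a small context manager. This is a sketch built only on the documented start() and stop() methods. The `watching` helper is hypothetical and not part of trails, and `FakeWatcher` exists only so the example runs on its own.

```python
from contextlib import contextmanager


@contextmanager
def watching(watcher):
    """Start the watcher on entry and always stop it on exit (hypothetical helper)."""
    watcher.start()
    try:
        yield watcher
    finally:
        watcher.stop()


class FakeWatcher:
    """Stand-in exposing only start() and stop(), used to exercise the helper."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("start")

    def stop(self):
        self.calls.append("stop")


fake = FakeWatcher()
try:
    with watching(fake):
        raise RuntimeError("simulated failure during ctx.kg usage")
except RuntimeError:
    pass
print(fake.calls)
```

With the real class, the same helper would wrap a SchemaWatcher instance, and get_alerts() could be read inside or after the with block.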