Federation¶
Federation lets Trails instances share data over standard SPARQL
endpoints. Instead of bulk-exporting and re-importing, a remote instance
exposes a read-only /sparql route and any SPARQL client (including
another Trails instance) queries it directly. The full design lives in
ADR-0023; the progressive-enhancement framing that keeps federation
additive is described in ADR-0021.
When to use federation¶
- Two Trails apps maintained by different teams need to cross-reference each other's graphs (e.g. a hospital evidence graph querying a research publication graph).
- An auditor instance needs live read access to a regulator's knowledge graph without importing a full snapshot.
- A literature-review tool federates across three domain-specific KG instances, each owned by a separate org.
If you only need to import data once, trails import is simpler. If you
need to invoke a remote capability (not just query data), see the
MCP capability relay (Phase 3) section
below.
Quickstart¶
Enable federation, expose the endpoint, and query it from a second instance:
from trails.federation import FederationConfig, FederationEndpoint
# On the server side — wrap an existing store.
config = FederationConfig(enabled=True, read_only=True, max_query_time_ms=30000)
endpoint = FederationEndpoint(store=ctx.kg._store, config=config)
# Execute a federated query directly (e.g. in tests).
result = endpoint.execute(
"SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10",
principal="remote-peer",
)
print(result)
# {"head": {"vars": ["s", "p", "o"]}, "results": {"bindings": [...]}}
In production the endpoint is mounted as HTTP routes (see below).
Enabling federation in trails.toml¶
Add a [federation] section:
[federation]
enabled = true
read_only = true # always true in Phase 1
max_query_time_ms = 30000 # 30-second timeout per query
When enabled = false (the default), no routes are registered and no
SPARQL endpoint is exposed. Standalone behaviour is unchanged —
federation is purely additive per ADR-0021.
Exposing your SPARQL endpoint (Phase 1)¶
The HTTP layer provides W3C SPARQL Protocol-compliant routes. Mount them on an existing FastAPI app:
from fastapi import FastAPI
from trails.federation import FederationConfig, FederationEndpoint
from trails.federation_http import mount_federation_routes
app = FastAPI()
config = FederationConfig(enabled=True)
endpoint = FederationEndpoint(store=ctx.kg._store, config=config)
mount_federation_routes(app, endpoint)
This registers three routes:
| Method | Path | Content-Type | Description |
|---|---|---|---|
| GET | /sparql?query=... | application/sparql-results+json | URL-encoded query parameter |
| POST | /sparql | application/sparql-query body | Query in request body |
| GET | /sparql/status | application/json | Endpoint status and config |
Content negotiation follows the SPARQL Protocol spec:
- SELECT / ASK respond with application/sparql-results+json.
- CONSTRUCT / DESCRIBE respond with text/turtle.
Every response carries an X-Trails-Trace-Id header for provenance
correlation.
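As a rough client-side illustration of the two query routes, the sketch below builds (but does not send) a GET and a POST request with the standard library. The helper names build_get and build_post are hypothetical and not part of Trails; the peer URL is the one used in this guide's examples.

```python
from urllib.parse import urlencode
from urllib.request import Request

ENDPOINT = "https://pharma.example/sparql"  # example peer URL from this guide


def build_get(query: str, principal: str, endpoint: str = ENDPOINT) -> Request:
    """GET form: the query travels URL-encoded in the `query` parameter."""
    url = f"{endpoint}?{urlencode({'query': query})}"
    headers = {
        "Accept": "application/sparql-results+json",
        "X-Trails-Principal": principal,
    }
    return Request(url, headers=headers, method="GET")


def build_post(query: str, principal: str, endpoint: str = ENDPOINT) -> Request:
    """POST form: the raw query text is the request body."""
    headers = {
        "Content-Type": "application/sparql-query",
        "X-Trails-Principal": principal,
    }
    return Request(endpoint, data=query.encode("utf-8"), headers=headers, method="POST")


# Hypothetical helpers; nothing is sent over the network here.
get_req = build_get("ASK { ?s ?p ?o }", principal="remote-peer")
post_req = build_post("ASK { ?s ?p ?o }", principal="remote-peer")
print(get_req.get_method(), get_req.full_url)
print(post_req.get_method(), post_req.full_url)
```

Passing either object to urllib.request.urlopen would perform the call; the POST form avoids URL length limits for long queries.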
Accepted query forms¶
The federation endpoint is broader than the internal sparql_proxy
(which only allows SELECT/ASK). Federation accepts:
| Form | Allowed | Notes |
|---|---|---|
| SELECT | Yes | Standard result bindings |
| ASK | Yes | Boolean result |
| CONSTRUCT | Yes | Graph-producing query |
| DESCRIBE | Yes | Graph-producing query |
| INSERT / DELETE / UPDATE | No | Read-only; always rejected |
Forbidden keywords (INSERT, DELETE, LOAD, CLEAR, DROP, COPY,
MOVE, ADD, CREATE) are detected after stripping string literals,
IRIs, and comments so they cannot be smuggled inside a quoted value.
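The strip-then-scan order can be sketched with a single regular expression that consumes strings, IRIs, and comments left to right before looking for keywords. This is an illustrative approximation, not the validator Trails ships; find_forbidden is a hypothetical name, and the tokenizer is deliberately simpler than the SPARQL grammar.

```python
import re

FORBIDDEN = ("INSERT", "DELETE", "LOAD", "CLEAR", "DROP", "COPY", "MOVE", "ADD", "CREATE")

# One alternation scanned left to right: whichever token starts first wins,
# so a '#' inside a string or IRI is never mistaken for a comment.
_SKIP = re.compile("|".join([
    r'"""[\s\S]*?"""',        # long double-quoted literal
    r"'''[\s\S]*?'''",        # long single-quoted literal
    r'"(?:[^"\\\n]|\\.)*"',   # short double-quoted literal
    r"'(?:[^'\\\n]|\\.)*'",   # short single-quoted literal
    r"<[^<>\s]*>",            # IRI reference
    r"#[^\n]*",               # comment to end of line
]))

# Bare words only: skip variables (?x, $x) and prefixed names (ex:add).
_WORD = re.compile(r"(?<![?$:\w])[A-Za-z]+(?![\w:])")


def find_forbidden(query: str) -> list[str]:
    """Return the forbidden keywords that appear outside literals, IRIs, and comments."""
    stripped = _SKIP.sub(" ", query)
    words = set(_WORD.findall(stripped.upper()))
    return sorted(words.intersection(FORBIDDEN))


print(find_forbidden('SELECT ?s WHERE { ?s ?p "DROP TABLE" }'))
print(find_forbidden('INSERT DATA { <urn:a> <urn:b> "x" }'))
```

Scanning the stripped text rather than the raw query is what lets a literal such as "DROP TABLE" pass while a real INSERT is still caught.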
Configuring peers in trails.toml¶
Declare every remote instance your app needs to talk to under
[federation.peers]. Each peer has a url, an optional mcp_url, and the
label, trust, and timeout_ms settings shown below:
[federation.peers.pharma]
url = "https://pharma.example/sparql"
mcp_url = "https://pharma.example/mcp" # optional — needed for Phase 3 relay
label = "Pharma KG"
trust = "verified"
timeout_ms = 30000 # per-query timeout (default 30s)
[federation.peers.regulatory]
url = "https://reg.example/sparql"
label = "Regulatory KG"
trust = "verified"
timeout_ms = 15000
Peer names (pharma, regulatory) are used as identifiers in SERVICE
queries, MCP relay calls, and CLI commands.
Querying remote peers with SERVICE (Phase 2)¶
The FederatedQueryEngine rewrites SPARQL queries that contain
SERVICE blocks. For each SERVICE URL it finds a matching peer, sends
the sub-query over HTTP, and merges the remote bindings with the local
results:
PREFIX : <https://myapp.example/>
SELECT ?drug ?interaction WHERE {
?drug :name "Aspirin" .
SERVICE <https://pharma.example/sparql> {
?drug :interactsWith ?interaction .
}
}
Python API¶
from trails.federation import FederatedQueryEngine
peers = {
"pharma": {
"url": "https://pharma.example/sparql",
"timeout_ms": 30000,
},
}
engine = FederatedQueryEngine(peers=peers, store=ctx.kg._store)
result = engine.execute(ctx, """
PREFIX : <https://myapp.example/>
SELECT ?drug ?interaction WHERE {
?drug :name "Aspirin" .
SERVICE <https://pharma.example/sparql> {
?drug :interactsWith ?interaction .
}
}
""")
for binding in result["results"]["bindings"]:
    print(binding["drug"]["value"], binding["interaction"]["value"])
How SERVICE rewriting works¶
- The engine extracts all SERVICE <url> { ... } blocks from the query.
- For each block it resolves the URL against configured peers (raises FederationQueryError if the URL is unknown).
- The SERVICE body is wrapped in SELECT * WHERE { ... } and sent to the remote peer via HTTP POST with Content-Type: application/sparql-query.
- The local query (with SERVICE blocks removed) executes against the local store.
- Local and remote bindings are merged via compatible-variable join: shared variables must match, then rows are combined.
Remote queries carry the X-Trails-Principal header so the remote
instance can evaluate its own Cedar policies.
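The compatible-variable join in the final rewriting step can be sketched over SPARQL JSON bindings as follows. The function names are hypothetical and the nested loop is the simplest possible formulation, not necessarily how the engine implements it.

```python
def compatible(left: dict, right: dict) -> bool:
    """Two rows are compatible when every shared variable has the same term."""
    return all(left[var] == right[var] for var in left.keys() & right.keys())


def join(local: list[dict], remote: list[dict]) -> list[dict]:
    """Combine each compatible pair of rows into one merged row."""
    return [{**l, **r} for l in local for r in remote if compatible(l, r)]


aspirin = {"type": "uri", "value": "https://myapp.example/aspirin"}
ibuprofen = {"type": "uri", "value": "https://myapp.example/ibuprofen"}

local_rows = [{"drug": aspirin}]
remote_rows = [
    {"drug": aspirin, "interaction": {"type": "uri", "value": "https://myapp.example/warfarin"}},
    {"drug": ibuprofen, "interaction": {"type": "uri", "value": "https://myapp.example/lithium"}},
]

merged = join(local_rows, remote_rows)
for row in merged:
    print(row["drug"]["value"], row["interaction"]["value"])
```

A local query with no patterns of its own yields one empty row, which is compatible with everything, so the remote rows pass through unchanged.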
Cost tracking for remote queries¶
When a CostTracker is passed to the engine (the cost_tracker argument of
FederatedQueryEngine), every remote SERVICE dispatch records a cost
entry with kind="federation:remote_query", tokens=0, and the measured
latency_ms.
MCP capability relay (Phase 3)¶
While SERVICE queries read data from remote graphs, MCP relay invokes capabilities on remote instances. A local capability can call a remote tool via the MCP protocol (JSON-RPC 2.0 over HTTP).
Configuration¶
Add mcp_url to any peer that exposes an MCP endpoint:
[federation.peers.warehouse]
url = "https://warehouse.internal:8000/sparql"
mcp_url = "https://warehouse.internal:8000/mcp"
label = "Data Warehouse"
timeout_ms = 30000
The relay() helper¶
Inside a @capability body, use relay() to invoke a remote tool:
from trails import capability
from trails.federation_mcp import relay
@capability("aggregate_report")
async def aggregate(ctx):
    result = await relay(
        ctx,
        peer="warehouse",
        tool="summarize_sales",
        arguments={"year": 2026},
    )
    return result
relay() resolves the peer from trails.toml, sends a tools/call
JSON-RPC request, tracks cost as "federation:mcp_relay", and attaches
PROV-O provenance (prov:wasInformedBy linking local and remote trace
IDs).
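For orientation, a tools/call request in JSON-RPC 2.0 has roughly the shape sketched below. The helper names are hypothetical, and the sketch covers only the message envelope, not the HTTP transport, cost tracking, or provenance that relay() adds.

```python
import json
from itertools import count

_request_ids = count(1)  # JSON-RPC requests carry an id so responses can be matched


def tools_call_request(tool: str, arguments: dict) -> dict:
    """Build the JSON-RPC 2.0 envelope for an MCP tools/call invocation."""
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
    }


def parse_response(raw: str) -> dict:
    """Return the result member, or raise if the peer answered with an error."""
    message = json.loads(raw)
    if "error" in message:
        raise RuntimeError(f"remote tool failed: {message['error']}")
    return message["result"]


request = tools_call_request("summarize_sales", {"year": 2026})
print(json.dumps(request, indent=2))
```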
Discovering remote capabilities¶
Before invoking, you can list what a peer offers:
from trails.federation_mcp import MCPRelayClient
client = MCPRelayClient(
federation_config=my_config,
principal="my-instance",
)
# List all tools on the warehouse peer
tools = client.list_tools("warehouse")
for t in tools:
    print(t["name"], t.get("description", ""))
# Invoke a specific tool
result = client.invoke_tool(
"warehouse",
"summarize_sales",
arguments={"year": 2026},
)
print(result)
Cedar policy gating for relay¶
MCP relay uses action == Action::"mcp_relay" with a resource of
Trails::Federation::MCPRelay::"<peer>/<tool>". Example Cedar policy:
permit(
principal == User::"clinical-app",
action == Action::"mcp_relay",
resource == Trails::Federation::MCPRelay::"warehouse/summarize_sales"
);
When no policies are configured, relay proceeds without authorization checks.
Instance mesh: health checks and discovery (Phase 4)¶
The MeshManager and MeshMonitor provide continuous health monitoring
for all configured peers. The mesh layer sits below the query engine and
relay client, enabling health-aware routing and graceful degradation.
Health checks¶
from trails.federation_mesh import MeshManager
mesh = MeshManager(federation_config=my_config, timeout_s=5.0)
# Check a single peer
health = mesh.health_check("pharma")
print(f"{health.name}: {health.status} ({health.latency_ms}ms)")
print(f" SPARQL: {health.sparql_endpoint}, MCP: {health.mcp_endpoint}")
print(f" Remote capabilities: {health.capabilities_count}")
# Check all peers
statuses = mesh.health_check_all()
for name, h in statuses.items():
    print(f"{name}: {h.status}")
Health status is one of "healthy" (all endpoints reachable),
"degraded" (some endpoints up), or "unreachable" (nothing
responds). A peer that fails 5 consecutive health checks is
soft-removed and excluded from query routing until it recovers.
Peer discovery¶
# Validate static peers + attempt DNS-SD discovery
discovered = mesh.discover_peers()
for peer in discovered:
    print(f"{peer['name']} ({peer['source']}): "
          f"{peer['url']} — reachable={peer['reachable']}")
DNS-SD queries _trails._tcp.local using stdlib socket. On networks
with mDNS responders, new Trails instances are discovered automatically.
This is best-effort; static config is always the primary source.
Peer selection strategies¶
When multiple peers can serve a query, use select_peer():
# Latency-based (default): prefer lowest-latency healthy peer
best = mesh.select_peer(strategy="latency")
# Round-robin: distribute load evenly
best = mesh.select_peer(strategy="round-robin")
Background monitoring with MeshMonitor¶
Run periodic health checks in a daemon thread:
from trails.federation_mesh import MeshMonitor
monitor = MeshMonitor(mesh_manager=mesh, interval_seconds=60)
monitor.start()
# ... your app runs ...
# Check status
print(f"Running: {monitor.is_running}, checks: {monitor.check_count}")
print(f"Soft-removed peers: {mesh.soft_removed_peers}")
print(f"Current status: {mesh.peer_status}")
# Shutdown
monitor.stop()
The monitor runs as a daemon thread — it dies when the process exits.
Between rounds it sleeps for interval_seconds and can be stopped
cleanly via stop().
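The daemon-thread pattern described here is commonly built on threading.Event, which doubles as an interruptible sleep. The following is a generic sketch of that pattern with a hypothetical PeriodicChecker class, not the MeshMonitor source.

```python
import threading


class PeriodicChecker:
    """Run `check` every `interval_seconds` in a daemon thread until stopped."""

    def __init__(self, check, interval_seconds: float):
        self._check = check
        self._interval = interval_seconds
        self._stop = threading.Event()
        self._thread = None
        self.check_count = 0

    def start(self) -> None:
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        while not self._stop.is_set():
            self._check()
            self.check_count += 1
            # wait() returns early when stop() sets the event.
            self._stop.wait(self._interval)

    def stop(self) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join()

    @property
    def is_running(self) -> bool:
        return self._thread is not None and self._thread.is_alive()


first_check_done = threading.Event()
checker = PeriodicChecker(first_check_done.set, interval_seconds=60)
checker.start()
first_check_done.wait(timeout=5)   # the first round runs immediately
checker.stop()                     # interrupts the 60-second sleep
print(checker.is_running, checker.check_count)
```

Waiting on the event instead of calling time.sleep is what allows stop() to return promptly rather than after a full interval.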
Health-aware query routing¶
Pass a MeshManager to FederatedQueryEngine (the mesh_manager argument)
to skip unreachable peers automatically.
When mesh_manager is set:
- SERVICE blocks targeting an unreachable or soft-removed peer are skipped with a warning (partial results instead of failure).
- If a remote query fails at runtime, the engine logs a warning and continues instead of raising, so queries degrade gracefully.
CLI commands¶
# Check health of all configured peers
trails federation status
# Discover peers (static + DNS-SD)
trails federation discover
# Ping a specific peer
trails federation ping pharma
Policy gating¶
Every federated query passes through Cedar policy evaluation before execution. The policy context uses:
| Field | Value |
|---|---|
| principal | From X-Trails-Principal header (default: "anonymous") |
| action | "sparql_query" |
| resource.type | "Trails::Federation::Endpoint" |
| resource.id | "sparql" |
A minimal Cedar policy that allows federation queries from authenticated principals:
permit(
principal,
action == Action::"sparql_query",
resource == Trails::Federation::Endpoint::"sparql"
) when {
principal != User::"anonymous"
};
A restrictive policy that limits federation to a specific peer identity:
permit(
principal == User::"pharma-instance",
action == Action::"sparql_query",
resource == Trails::Federation::Endpoint::"sparql"
);
When no policies are configured, the endpoint executes queries without
authorization checks. When policies are configured and the decision is
DENY, the endpoint returns HTTP 403.
See the Policy guide for Cedar syntax and the
@policy decorator.
Cost tracking¶
Federated queries participate in the cost envelope system (ADR-0012).
Pass a CostTracker to the endpoint and every query is recorded:
from trails.federation import FederationConfig, FederationEndpoint
endpoint = FederationEndpoint(
store=ctx.kg._store,
config=FederationConfig(enabled=True),
cost_tracker=my_cost_tracker,
)
Each tracked entry includes:
| Field | Value |
|---|---|
| kind | "federation:sparql_query" |
| tokens | 0 (no LLM tokens; pure SPARQL) |
| usd | 0.0 |
| latency_ms | Wall-clock execution time |
| principal | The requesting principal |
Remote SERVICE queries (Phase 2) and MCP relay calls (Phase 3) each
record their own cost entries (federation:remote_query and
federation:mcp_relay respectively) with measured latency.
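A cost entry with the fields listed above could be produced by timing the call, as in this sketch. CostEntry and timed are hypothetical stand-ins; the real CostTracker interface is not shown in this guide.

```python
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class CostEntry:
    kind: str
    tokens: int
    usd: float
    latency_ms: float
    principal: str


def timed(kind: str, principal: str, fn, clock=time.perf_counter):
    """Run fn() and return (result, CostEntry) with the measured wall-clock latency."""
    start = clock()
    result = fn()
    latency_ms = (clock() - start) * 1000
    # Federation hops use no LLM tokens, so tokens and usd stay at zero.
    return result, CostEntry(kind, 0, 0.0, latency_ms, principal)


rows, entry = timed("federation:remote_query", "remote-peer", lambda: ["row-1", "row-2"])
print(entry.kind, entry.tokens, entry.usd, entry.principal)
```

The injectable clock is only there so the measurement can be made deterministic in tests.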
Security considerations¶
- Read-only by design. The read_only flag is always True in Phase 1. UPDATE keywords are rejected at the lexical level before the query reaches the store. There is no code path that allows writes through the federation endpoint.
- Principal forwarding. The X-Trails-Principal header carries the requester's identity. The remote instance evaluates its own Cedar policies against this principal. DID + scoped biscuit tokens (ADR-0010, ADR-0011) are planned for a future hardening pass.
- Timeout enforcement. max_query_time_ms bounds how long any single query can execute. A slow or adversarial query is killed rather than allowed to monopolize the store.
- No data leakage. When Cedar policies are configured, unauthenticated or unauthorized queries are rejected before execution. The store is never queried for a denied principal.
- SPARQL injection defence. String literals, IRIs, and comments are stripped before keyword scanning so forbidden keywords embedded in quoted values do not produce false positives or bypasses.
Example: two Trails instances sharing data¶
Instance A — research publications¶
# trails.toml on Instance A
[project]
name = "research-pubs"
[federation]
enabled = true
max_query_time_ms = 10000
# Load some publication data
from trails import capability, node_type
@node_type("Paper", fields={"title": str, "doi": str, "year": int})
class Paper: ...
@capability
def load_papers(ctx) -> dict:
    ctx.kg.add(Paper(title="Drug X interactions", doi="10.1234/abc", year=2025))
    ctx.kg.add(Paper(title="Drug Y meta-analysis", doi="10.1234/def", year=2026))
    return {"loaded": 2}
Instance B — clinical evidence¶
# On Instance B, query Instance A's federation endpoint
import httpx
resp = httpx.post(
"https://research-pubs.example/sparql",
content='SELECT ?title ?doi WHERE { ?p a <trails://research-pubs/Paper> ; <trails://research-pubs/Paper/title> ?title ; <trails://research-pubs/Paper/doi> ?doi . }',
headers={
"Content-Type": "application/sparql-query",
"X-Trails-Principal": "clinical-evidence-instance",
},
)
results = resp.json()
for binding in results["results"]["bindings"]:
    print(binding["title"]["value"], binding["doi"]["value"])
With Phase 2's FederatedQueryEngine, this becomes a single SERVICE
clause inside Instance B's own SPARQL queries — no manual HTTP calls:
from trails.federation import FederatedQueryEngine
peers = {
"research": {"url": "https://research-pubs.example/sparql", "timeout_ms": 10000},
}
engine = FederatedQueryEngine(peers=peers, store=ctx.kg._store)
result = engine.execute(ctx, """
SELECT ?title ?doi WHERE {
SERVICE <https://research-pubs.example/sparql> {
?p a <trails://research-pubs/Paper> ;
<trails://research-pubs/Paper/title> ?title ;
<trails://research-pubs/Paper/doi> ?doi .
}
}
""")
for binding in result["results"]["bindings"]:
    print(binding["title"]["value"], binding["doi"]["value"])
Reference¶
| Symbol | Description |
|---|---|
| FederationConfig(enabled, read_only, max_query_time_ms) | Configuration dataclass; parsed from [federation] in trails.toml |
| FederationEndpoint(store, config, cost_tracker, policies) | Wraps a store and exposes SPARQL query evaluation |
| FederationEndpoint.execute(sparql, *, principal, trace_id) | Execute a query; returns SPARQL JSON Results |
| FederatedQueryEngine(peers, store, cost_tracker, mesh_manager) | Phase 2: rewrites SERVICE blocks and dispatches to remote peers |
| FederatedQueryEngine.execute(ctx, query, *, principal) | Execute a query with SERVICE rewriting; returns merged results |
| validate_federation_query(sparql) | Validate a query is read-only; returns the detected form or raises FederationQueryError |
| FederationQueryError | Raised when a query fails validation |
| mount_federation_routes(app, endpoint) | Mount /sparql routes on a FastAPI app |
| create_federation_router(endpoint) | Create a standalone FastAPI router |
| MCPRelayClient(federation_config, cost_tracker, policies, principal) | Phase 3: client for invoking remote capabilities via MCP |
| MCPRelayClient.list_tools(peer_name) | Discover remote capabilities via MCP tools/list |
| MCPRelayClient.invoke_tool(peer_name, tool_name, arguments) | Invoke a remote tool via MCP tools/call |
| MCPRelayError | Raised when an MCP relay operation fails |
| relay(ctx, *, peer, tool, arguments, policies) | Phase 3: async convenience helper for use inside @capability bodies |
| MeshManager(federation_config, timeout_s) | Phase 4: manages health checks, discovery, and peer selection |
| MeshManager.health_check(peer_name) | Run a health check against a single peer; returns PeerHealth |
| MeshManager.health_check_all() | Health-check all configured peers |
| MeshManager.discover_peers() | Validate static peers + DNS-SD discovery |
| MeshManager.select_peer(strategy) | Select best peer by "latency" or "round-robin" |
| MeshManager.is_peer_healthy(peer_name) | Check cached health status of a peer |
| MeshMonitor(mesh_manager, interval_seconds) | Phase 4: background daemon thread for periodic health checks |
| PeerHealth | .name, .url, .status, .latency_ms, .last_checked, .sparql_endpoint, .mcp_endpoint |
| MeshError | Raised when a mesh operation fails |
| SchemaAdvertisement | Phase 5: a peer's published schema (node types, predicates, capabilities) |
| NodeTypeInfo | One node type in the schema advertisement |
| PredicateInfo | One predicate discovered in the store |
| build_local_schema(store, project_name, base_iri) | Build schema advertisement from local state |
| schema_to_json(schema) / schema_from_json(data) | Serialization / deserialization |
| fetch_peer_schema(peer_url, timeout) | Fetch schema from a remote peer's /schema endpoint |
| SchemaCache(ttl_seconds) | In-memory cache of peer schemas with TTL |
| MeshManager.schema_cache | Schema cache populated during discover_peers() |
| register_schema_resource() | Register the trails://schema MCP resource |
Ontology exchange: schema advertisement and discovery (Phase 5)¶
When two Trails instances federate, they can query each other's data — but without knowing each other's schemas, writing correct SERVICE queries requires out-of-band knowledge. Ontology exchange solves this by letting each instance publish its schema as a JSON document.
How it works¶
Every Trails instance with federation enabled exposes a GET /schema
endpoint that returns a schema advertisement — a JSON document
listing all registered @node_type definitions, SHACL constraints from
@shape, registered capabilities, and data-discovered predicates:
from trails.federation_schema import build_local_schema, schema_to_json
# Build the advertisement from the local state
schema = build_local_schema(store=ctx.kg._store, project_name="myapp", base_iri="trails://myapp")
print(schema_to_json(schema))
# {
# "instance_name": "myapp",
# "base_iri": "trails://myapp",
# "version": "0.1.0a0",
# "node_types": [
# {"name": "Patient", "iri": "trails://myapp/Patient", "fields": {"name": "str", "age": "int"}, "constraints": {}},
# ],
# "predicates": [...],
# "capabilities": ["load_data", "analyze"],
# "generated_at": "2026-04-18T12:00:00+00:00"
# }
Fetching a peer's schema¶
Use fetch_peer_schema() to retrieve a remote peer's schema:
from trails.federation_schema import fetch_peer_schema
schema = fetch_peer_schema("https://pharma.example")
if schema:
    print(f"Peer has {len(schema.node_types)} node types:")
    for nt in schema.node_types:
        print(f"  {nt.name}: {list(nt.fields.keys())}")
The function returns None on any failure (logged, never raises),
making it safe for best-effort discovery.
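The "log and return None" contract can be sketched with the standard library as follows. fetch_json_or_none is a hypothetical function, and the injectable opener exists only so the example runs without a network; it is not a parameter of fetch_peer_schema().

```python
import io
import json
import logging
from urllib.request import urlopen

log = logging.getLogger("federation.schema")


def fetch_json_or_none(url: str, timeout: float = 5.0, opener=urlopen):
    """Fetch and decode a JSON document; on any failure, log and return None."""
    try:
        with opener(url, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except Exception as exc:  # deliberately broad: discovery is best-effort
        log.warning("schema fetch from %s failed: %s", url, exc)
        return None


def fake_ok(url, timeout):
    # Stand-in for a reachable peer.
    return io.BytesIO(b'{"instance_name": "pharma", "node_types": []}')


def fake_down(url, timeout):
    # Stand-in for an offline peer.
    raise OSError("connection refused")


ok = fetch_json_or_none("https://pharma.example/schema", opener=fake_ok)
down = fetch_json_or_none("https://pharma.example/schema", opener=fake_down)
print(ok, down)
```

Callers then need a single None check, which is what the `if schema:` guard in the example above relies on.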
Schema cache¶
The SchemaCache provides TTL-based in-memory caching to avoid
redundant fetches:
from trails.federation_schema import SchemaCache
cache = SchemaCache(ttl_seconds=300) # 5-minute TTL
cache.refresh("pharma", "https://pharma.example")
# Later — returns cached schema without HTTP call
schema = cache.get("pharma")
The mesh manager (MeshManager) includes a schema cache that is
populated automatically during discover_peers().
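TTL-based caching of this kind can be sketched in a few lines. TTLCache below is a hypothetical illustration of the idea rather than the SchemaCache implementation; it stores values directly instead of fetching them.

```python
import time


class TTLCache:
    """In-memory cache whose entries expire ttl_seconds after they were stored."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # key -> (stored_at, value)

    def put(self, key, value) -> None:
        self._entries[key] = (self._clock(), value)

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at >= self._ttl:
            del self._entries[key]  # expired: drop it so the caller refetches
            return None
        return value


now = [0.0]  # fake clock so expiry can be shown without sleeping
cache = TTLCache(ttl_seconds=300, clock=lambda: now[0])
cache.put("pharma", {"node_types": ["Drug"]})
fresh = cache.get("pharma")
now[0] = 301.0
stale = cache.get("pharma")
print(fresh, stale)
```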
MCP resource¶
The schema is also available as an MCP resource at trails://schema:
from trails.mcp_resources import register_schema_resource, read_resource
register_schema_resource()
result = read_resource("trails://schema")
# Returns JSON text with the schema advertisement
CLI commands¶
# Show local schema
trails federation schema
# Show as raw JSON
trails federation schema --json
# Fetch a remote peer's schema
trails federation schema --peer pharma
Integration with discover_peers¶
When MeshManager.discover_peers() finds a reachable peer, it
automatically fetches the peer's /schema endpoint and caches the
result. The returned peer dict includes a "schema" key:
from trails.federation_mesh import MeshManager
mesh = MeshManager(federation_config=my_config)
discovered = mesh.discover_peers()
for peer in discovered:
    if peer["schema"]:
        print(f"{peer['name']} has {len(peer['schema'].node_types)} node types")
    else:
        print(f"{peer['name']} — schema not available")
FederationPeer proxy (lazy connections)¶
FederationPeer (in trails.federation_proxy) delays connection,
health probing, and schema fetching until the peer is actually used.
This avoids startup failures when remote peers are offline and skips
unnecessary network calls for configured-but-unused peers.
from trails.federation_proxy import FederationPeer
peer = FederationPeer("analytics", "http://analytics:8000")
# No network call yet.
peer.schema # Fetches schema on first access, caches it.
peer.healthy # Probes health on first access, caches it.
peer.refresh() # Clears cache so next access re-probes.
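The lazy behaviour can be approximated with functools.cached_property, as in this sketch. LazyPeer and its injected fetch and probe callables are hypothetical; FederationPeer's internals may differ.

```python
from functools import cached_property


class LazyPeer:
    """Defer schema fetching and health probing until first attribute access."""

    def __init__(self, name, url, fetch_schema, probe_health):
        self.name = name
        self.url = url
        self._fetch_schema = fetch_schema
        self._probe_health = probe_health

    @cached_property
    def schema(self):
        return self._fetch_schema(self.url)

    @cached_property
    def healthy(self):
        return self._probe_health(self.url)

    def refresh(self) -> None:
        # cached_property stores results in the instance dict; removing them
        # forces the next access to call the underlying function again.
        self.__dict__.pop("schema", None)
        self.__dict__.pop("healthy", None)


calls = []


def fake_fetch(url):
    calls.append(url)
    return {"node_types": []}


peer = LazyPeer("analytics", "http://analytics:8000", fake_fetch, lambda url: True)
calls_before = len(calls)          # construction makes no calls
peer.schema
peer.schema
calls_after_two_reads = len(calls)  # second read is served from cache
peer.refresh()
peer.schema
calls_after_refresh = len(calls)
print(calls_before, calls_after_two_reads, calls_after_refresh)
```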
Schema fingerprints¶
Bloom-filter-based schema fingerprints (~128 bytes per peer) enable
fast overlap estimation without transferring full schemas. Built in
M20 Phase 4 (b5ee0ed).
from trails.federation_schema import build_local_schema
schema = build_local_schema(store=ctx.kg._store, project_name="myapp")
fingerprint = schema.fingerprint() # Bloom filter, hex-serializable
overlap = fingerprint.estimate_overlap(remote_fingerprint)
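To show how a 128-byte Bloom filter can support overlap estimation, here is a self-contained sketch. The class, the choice of three SHA-256-derived hash positions, and the bit-level Jaccard estimate are all assumptions for illustration; the shipped fingerprint may use different parameters and a different estimator.

```python
import hashlib


class BloomFingerprint:
    SIZE_BITS = 1024   # 128 bytes
    NUM_HASHES = 3     # assumed; not taken from the Trails source

    def __init__(self, bits: int = 0):
        self.bits = bits

    @classmethod
    def from_terms(cls, terms):
        fp = cls()
        for term in terms:
            fp.add(term)
        return fp

    def _positions(self, term: str):
        # Slice one SHA-256 digest into NUM_HASHES independent 32-bit values.
        digest = hashlib.sha256(term.encode("utf-8")).digest()
        return [
            int.from_bytes(digest[4 * i:4 * i + 4], "big") % self.SIZE_BITS
            for i in range(self.NUM_HASHES)
        ]

    def add(self, term: str) -> None:
        for pos in self._positions(term):
            self.bits |= 1 << pos

    def might_contain(self, term: str) -> bool:
        return all((self.bits >> pos) & 1 for pos in self._positions(term))

    def to_hex(self) -> str:
        return self.bits.to_bytes(self.SIZE_BITS // 8, "big").hex()

    def estimate_overlap(self, other: "BloomFingerprint") -> float:
        # Jaccard similarity of set bits: a rough proxy for shared terms.
        union = bin(self.bits | other.bits).count("1")
        shared = bin(self.bits & other.bits).count("1")
        return shared / union if union else 0.0


local = BloomFingerprint.from_terms(["Paper", "Paper/title", "Paper/doi"])
same = BloomFingerprint.from_terms(["Paper", "Paper/title", "Paper/doi"])
other = BloomFingerprint.from_terms(["Drug", "Drug/name"])
print(local.estimate_overlap(same), local.estimate_overlap(other))
```

Because Bloom filters admit false positives, the estimate is an upper-leaning approximation and is best used to decide whether fetching the full schema is worthwhile.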
Schema registry¶
A content-addressed schema registry (SchemaRegistry) stores schemas
by CID (SHA-256 hash). Peers publish their schemas; consumers query
and resolve by CID. Includes a stdlib HTTP server for self-hosting.
from trails.federation_registry import SchemaRegistry
registry = SchemaRegistry()
cid = registry.publish(schema)
resolved = registry.resolve(cid)
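Content addressing means the identifier is derived from the schema itself. A minimal in-memory sketch follows, assuming the CID is the hex SHA-256 of a canonical JSON encoding; the real SchemaRegistry may canonicalize or encode its CIDs differently, and the names below are hypothetical.

```python
import hashlib
import json


def cid_for(schema: dict) -> str:
    """Hash a canonical JSON form so equal schemas always get equal CIDs."""
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class InMemoryRegistry:
    def __init__(self):
        self._by_cid = {}

    def publish(self, schema: dict) -> str:
        cid = cid_for(schema)
        self._by_cid[cid] = schema
        return cid

    def resolve(self, cid: str):
        return self._by_cid.get(cid)


schema_a = {"instance_name": "myapp", "node_types": ["Patient"]}
schema_b = {"node_types": ["Patient"], "instance_name": "myapp"}  # same content, different key order

demo_registry = InMemoryRegistry()
demo_cid = demo_registry.publish(schema_a)
print(demo_cid == cid_for(schema_b))
```

Sorting keys before hashing is what makes the identifier independent of how a peer happens to serialize its schema.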
Gossip discovery¶
Epidemic gossip with TTL and fan-out propagates peer information across the mesh without a central coordinator. Kademlia XOR-distance routing with k-buckets enables efficient iterative lookup in large networks.
from trails.federation_gossip import GossipManager
gossip = GossipManager(local_peer=my_peer, fan_out=3, ttl=5)
gossip.announce() # Propagate to neighbors
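The two mechanisms named above can be illustrated independently of GossipManager. The functions below are hypothetical: one forwards a message to fan_out random neighbours while decrementing its TTL, and the others compute Kademlia-style XOR distance and the k-bucket index it implies.

```python
import random


def gossip_round(message: dict, neighbors: list, fan_out: int, rng: random.Random):
    """Return (neighbor, forwarded_message) pairs, or nothing once the TTL is spent."""
    if message["ttl"] <= 0:
        return []
    forwarded = {**message, "ttl": message["ttl"] - 1}
    targets = rng.sample(neighbors, min(fan_out, len(neighbors)))
    return [(target, forwarded) for target in targets]


def xor_distance(a: int, b: int) -> int:
    """Kademlia distance: the bitwise XOR of two node IDs."""
    return a ^ b


def bucket_index(local_id: int, other_id: int) -> int:
    """Index of the k-bucket for other_id: position of the highest differing bit (-1 if equal)."""
    return xor_distance(local_id, other_id).bit_length() - 1


def closest(target: int, node_ids: list, k: int) -> list:
    """The k known nodes nearest to target, as used by an iterative lookup."""
    return sorted(node_ids, key=lambda node: xor_distance(node, target))[:k]


rng = random.Random(0)
sends = gossip_round({"peer": "pharma", "ttl": 5}, ["a", "b", "c", "d", "e"], fan_out=3, rng=rng)
print(len(sends), sends[0][1]["ttl"])
print(closest(0b1000, [0b0001, 0b1001, 0b1100], k=2))
```

The TTL bounds how far an announcement spreads, while XOR distance gives every node a consistent notion of "nearby" without a central directory.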
Shared vocabularies¶
Six built-in vocabularies (schema.org, Dublin Core, FHIR, DCAT, PROV-O, SKOS) enable cross-instance interoperability checks.
from trails.federation_vocab import check_compatibility, vocabulary_coverage
compat = check_compatibility(local_schema, remote_schema)
coverage = vocabulary_coverage(schema, vocabulary="schema.org")
See also¶
- ADR-0023 — full federation and instance mesh design
- ADR-0031 — ontology exchange design decision
- Policy & Authorization — Cedar policy syntax
- Observability — cost tracking and event hooks
- MCP Integration — the transport layer that capability relay (Phase 3) builds on