Federation

Federation lets Trails instances share data over standard SPARQL endpoints. Instead of bulk-exporting and re-importing, a remote instance exposes a read-only /sparql route and any SPARQL client (including another Trails instance) queries it directly. The full design lives in ADR-0023; the progressive-enhancement framing that keeps federation additive is ADR-0021.

When to use federation

  • Two Trails apps maintained by different teams need to cross-reference each other's graphs (e.g. a hospital evidence graph querying a research publication graph).
  • An auditor instance needs live read access to a regulator's knowledge graph without importing a full snapshot.
  • A literature-review tool federates across three domain-specific KG instances, each owned by a separate org.

If you only need to import data once, trails import is simpler. If you need to invoke a remote capability (not just query data), see the MCP capability relay (Phase 3) section below.

Quickstart

Enable federation, expose the endpoint, and query it from a second instance:

from trails.federation import FederationConfig, FederationEndpoint

# On the server side — wrap an existing store.
config = FederationConfig(enabled=True, read_only=True, max_query_time_ms=30000)
endpoint = FederationEndpoint(store=ctx.kg._store, config=config)

# Execute a federated query directly (e.g. in tests).
result = endpoint.execute(
    "SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10",
    principal="remote-peer",
)
print(result)
# {"head": {"vars": ["s", "p", "o"]}, "results": {"bindings": [...]}}

In production the endpoint is mounted as HTTP routes (see below).

Enabling federation in trails.toml

Add a [federation] section:

[federation]
enabled = true
read_only = true           # always true in Phase 1
max_query_time_ms = 30000  # 30-second timeout per query

When enabled = false (the default), no routes are registered and no SPARQL endpoint is exposed. Standalone behaviour is unchanged — federation is purely additive per ADR-0021.

Exposing your SPARQL endpoint (Phase 1)

The HTTP layer provides W3C SPARQL Protocol-compliant routes. Mount them on an existing FastAPI app:

from fastapi import FastAPI
from trails.federation import FederationConfig, FederationEndpoint
from trails.federation_http import mount_federation_routes

app = FastAPI()
config = FederationConfig(enabled=True)
endpoint = FederationEndpoint(store=ctx.kg._store, config=config)
mount_federation_routes(app, endpoint)

This registers three routes:

| Method | Path | Content type | Description |
| --- | --- | --- | --- |
| GET | /sparql?query=... | application/sparql-results+json (response) | URL-encoded query parameter |
| POST | /sparql | application/sparql-query (request body) | Query in request body |
| GET | /sparql/status | application/json (response) | Endpoint status and config |

Content negotiation follows the SPARQL Protocol spec:

  • SELECT / ASK respond with application/sparql-results+json.
  • CONSTRUCT / DESCRIBE respond with text/turtle.
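
The content-negotiation rule above can be sketched as a small lookup from query form to media type. This is an illustrative sketch only, not the Trails implementation: detect_query_form and response_content_type are hypothetical names, and the prologue handling assumes only PREFIX and BASE declarations (no leading comments).

```python
import re

# Media types named in this section.
RESULTS_JSON = "application/sparql-results+json"
TURTLE = "text/turtle"

# Leading PREFIX / BASE declarations that may precede the query form.
_PROLOGUE_RE = re.compile(
    r"\s*(?:(?:PREFIX\s+\S*:\s*<[^>]*>|BASE\s*<[^>]*>)\s*)*",
    re.IGNORECASE,
)
_FORM_RE = re.compile(r"(SELECT|ASK|CONSTRUCT|DESCRIBE)\b", re.IGNORECASE)


def detect_query_form(sparql: str) -> str:
    """Return the query form keyword that follows the prologue."""
    prologue = _PROLOGUE_RE.match(sparql)  # always matches, possibly empty
    form = _FORM_RE.match(sparql, prologue.end())
    if form is None:
        raise ValueError("unsupported or missing query form")
    return form.group(1).upper()


def response_content_type(sparql: str) -> str:
    """SELECT / ASK yield result bindings; CONSTRUCT / DESCRIBE yield a graph."""
    if detect_query_form(sparql) in ("SELECT", "ASK"):
        return RESULTS_JSON
    return TURTLE
```

A client can use the same mapping to decide how to parse a response before it inspects the body.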

Every response carries an X-Trails-Trace-Id header for provenance correlation.

Accepted query forms

The federation endpoint is broader than the internal sparql_proxy (which only allows SELECT/ASK). Federation accepts:

| Form | Allowed | Notes |
| --- | --- | --- |
| SELECT | Yes | Standard result bindings |
| ASK | Yes | Boolean result |
| CONSTRUCT | Yes | Graph-producing query |
| DESCRIBE | Yes | Graph-producing query |
| INSERT / DELETE / UPDATE | No | Read-only; always rejected |

Forbidden keywords (INSERT, DELETE, LOAD, CLEAR, DROP, COPY, MOVE, ADD, CREATE) are detected after stripping string literals, IRIs, and comments so they cannot be smuggled inside a quoted value.
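
The strip-then-scan idea can be sketched with two regular expressions. This is a simplified illustration under stated assumptions, not the actual validate_federation_query: find_forbidden_keyword is a hypothetical helper, only single-line string literals are recognised, and a variable such as ?add would be flagged as a false positive.

```python
import re

# Keyword list taken from the paragraph above.
FORBIDDEN = ("INSERT", "DELETE", "LOAD", "CLEAR", "DROP",
             "COPY", "MOVE", "ADD", "CREATE")

# Spans whose content must not be scanned for keywords.
_STRIP_RE = re.compile(
    r'"(?:[^"\\\n]|\\.)*"'    # double-quoted literal
    r"|'(?:[^'\\\n]|\\.)*'"   # single-quoted literal
    r"|<[^<>\s]*>"            # IRI reference
    r"|#[^\n]*"               # comment to end of line
)
_KEYWORD_RE = re.compile(r"\b(" + "|".join(FORBIDDEN) + r")\b", re.IGNORECASE)


def find_forbidden_keyword(sparql: str):
    """Return the first forbidden keyword outside literals/IRIs/comments, or None."""
    stripped = _STRIP_RE.sub(" ", sparql)
    match = _KEYWORD_RE.search(stripped)
    return match.group(1).upper() if match else None
```

Replacing each stripped span with a space (rather than nothing) keeps neighbouring tokens from fusing into a new keyword.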

Configuring peers in trails.toml

Declare every remote instance your app needs to talk to under [federation.peers]. Each peer has a SPARQL URL, an optional MCP endpoint, a label, a trust setting, and a timeout:

[federation.peers.pharma]
url = "https://pharma.example/sparql"
mcp_url = "https://pharma.example/mcp"   # optional — needed for Phase 3 relay
label = "Pharma KG"
trust = "verified"
timeout_ms = 30000                         # per-query timeout (default 30s)

[federation.peers.regulatory]
url = "https://reg.example/sparql"
label = "Regulatory KG"
trust = "verified"
timeout_ms = 15000

Peer names (pharma, regulatory) identify peers in MCP relay calls, health checks, and CLI commands. SERVICE queries reference a peer by its url, which the engine resolves back to the configured peer.

Querying remote peers with SERVICE (Phase 2)

The FederatedQueryEngine rewrites SPARQL queries that contain SERVICE blocks. For each SERVICE URL it finds a matching peer, sends the sub-query over HTTP, and merges the remote bindings with the local results:

PREFIX : <https://myapp.example/>

SELECT ?drug ?interaction WHERE {
  ?drug :name "Aspirin" .
  SERVICE <https://pharma.example/sparql> {
    ?drug :interactsWith ?interaction .
  }
}

Python API

from trails.federation import FederatedQueryEngine

peers = {
    "pharma": {
        "url": "https://pharma.example/sparql",
        "timeout_ms": 30000,
    },
}

engine = FederatedQueryEngine(peers=peers, store=ctx.kg._store)
result = engine.execute(ctx, """
    PREFIX : <https://myapp.example/>
    SELECT ?drug ?interaction WHERE {
      ?drug :name "Aspirin" .
      SERVICE <https://pharma.example/sparql> {
        ?drug :interactsWith ?interaction .
      }
    }
""")
for binding in result["results"]["bindings"]:
    print(binding["drug"]["value"], binding["interaction"]["value"])

How SERVICE rewriting works

  1. The engine extracts all SERVICE <url> { ... } blocks from the query.
  2. For each block it resolves the URL against configured peers (raises FederationQueryError if the URL is unknown).
  3. The SERVICE body is wrapped in SELECT * WHERE { ... } and sent to the remote peer via HTTP POST with Content-Type: application/sparql-query.
  4. The local query (with SERVICE blocks removed) executes against the local store.
  5. Local and remote bindings are merged via compatible-variable join: shared variables must match, then rows are combined.
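
The steps above can be sketched in plain Python. This is a minimal illustration, not the FederatedQueryEngine code: the three function names are hypothetical, brace matching ignores braces inside string literals, and rows are assumed to be SPARQL JSON binding dicts.

```python
import re

_SERVICE_RE = re.compile(r"SERVICE\s*<([^>]+)>\s*\{", re.IGNORECASE)


def extract_service_blocks(query: str):
    """Steps 1 and 4: return (local_query, [(url, body), ...])."""
    blocks, local_parts, pos = [], [], 0
    while True:
        match = _SERVICE_RE.search(query, pos)
        if match is None:
            local_parts.append(query[pos:])
            break
        local_parts.append(query[pos:match.start()])
        depth, i = 1, match.end()
        while i < len(query) and depth:  # walk to the matching "}"
            if query[i] == "{":
                depth += 1
            elif query[i] == "}":
                depth -= 1
            i += 1
        if depth:
            raise ValueError("unbalanced braces in SERVICE block")
        blocks.append((match.group(1), query[match.end():i - 1].strip()))
        pos = i
    return "".join(local_parts), blocks


def wrap_for_remote(body: str) -> str:
    """Step 3: wrap the SERVICE body so it is a complete query."""
    return "SELECT * WHERE { " + body + " }"


def join_bindings(local_rows, remote_rows):
    """Step 5: combine rows whose shared variables carry equal values."""
    merged = []
    for left in local_rows:
        for right in remote_rows:
            shared = left.keys() & right.keys()
            if all(left[var] == right[var] for var in shared):
                merged.append({**left, **right})
    return merged
```

When the two sides share no variables, this join degenerates to a cross product, which is the standard behaviour of a compatible-mapping join.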

Remote queries carry the X-Trails-Principal header so the remote instance can evaluate its own Cedar policies.

Cost tracking for remote queries

Pass a CostTracker to the engine to record a "federation:remote_query" cost entry, with latency, for every remote SERVICE dispatch:

engine = FederatedQueryEngine(
    peers=peers,
    store=ctx.kg._store,
    cost_tracker=my_tracker,
)

Each remote hop is tracked with kind="federation:remote_query", tokens=0, and the measured latency_ms.

MCP capability relay (Phase 3)

While SERVICE queries read data from remote graphs, MCP relay invokes capabilities on remote instances. A local capability can call a remote tool via the MCP protocol (JSON-RPC 2.0 over HTTP).

Configuration

Add mcp_url to any peer that exposes an MCP endpoint:

[federation.peers.warehouse]
url = "https://warehouse.internal:8000/sparql"
mcp_url = "https://warehouse.internal:8000/mcp"
label = "Data Warehouse"
timeout_ms = 30000

The relay() helper

Inside a @capability body, use relay() to invoke a remote tool:

from trails import capability
from trails.federation_mcp import relay

@capability("aggregate_report")
async def aggregate(ctx):
    result = await relay(
        ctx,
        peer="warehouse",
        tool="summarize_sales",
        arguments={"year": 2026},
    )
    return result

relay() resolves the peer from trails.toml, sends a tools/call JSON-RPC request, tracks cost as "federation:mcp_relay", and attaches PROV-O provenance (prov:wasInformedBy linking local and remote trace IDs).
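
For orientation, a tools/call exchange in JSON-RPC 2.0 has roughly the following shape. The sketch below only builds and parses the messages; build_tools_call and parse_response are hypothetical helpers, not part of trails.federation_mcp, and the transport, cost tracking, and provenance steps are omitted.

```python
import itertools
import json

# JSON-RPC requests carry an id so responses can be matched to them.
_request_ids = itertools.count(1)


def build_tools_call(tool: str, arguments: dict) -> dict:
    """Build an MCP tools/call request envelope."""
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": tool, "arguments": arguments},
    }


def parse_response(raw: str) -> dict:
    """Return the result member, or raise if the peer reported an error."""
    message = json.loads(raw)
    if "error" in message:
        err = message["error"]
        raise RuntimeError(f"remote error {err.get('code')}: {err.get('message')}")
    return message["result"]
```

A response carries either a result or an error member, never both, so checking for error first is sufficient.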

Discovering remote capabilities

Before invoking, you can list what a peer offers:

from trails.federation_mcp import MCPRelayClient

client = MCPRelayClient(
    federation_config=my_config,
    principal="my-instance",
)

# List all tools on the warehouse peer
tools = client.list_tools("warehouse")
for t in tools:
    print(t["name"], t.get("description", ""))

# Invoke a specific tool
result = client.invoke_tool(
    "warehouse",
    "summarize_sales",
    arguments={"year": 2026},
)
print(result)

Cedar policy gating for relay

MCP relay uses action == Action::"mcp_relay" with a resource of Trails::Federation::MCPRelay::"<peer>/<tool>". Example Cedar policy:

permit(
  principal == User::"clinical-app",
  action == Action::"mcp_relay",
  resource == Trails::Federation::MCPRelay::"warehouse/summarize_sales"
);

When no policies are configured, relay proceeds without authorization checks.

Instance mesh: health checks and discovery (Phase 4)

The MeshManager and MeshMonitor provide continuous health monitoring for all configured peers. The mesh layer sits below the query engine and relay client, enabling health-aware routing and graceful degradation.

Health checks

from trails.federation_mesh import MeshManager

mesh = MeshManager(federation_config=my_config, timeout_s=5.0)

# Check a single peer
health = mesh.health_check("pharma")
print(f"{health.name}: {health.status} ({health.latency_ms}ms)")
print(f"  SPARQL: {health.sparql_endpoint}, MCP: {health.mcp_endpoint}")
print(f"  Remote capabilities: {health.capabilities_count}")

# Check all peers
statuses = mesh.health_check_all()
for name, h in statuses.items():
    print(f"{name}: {h.status}")

Health status is one of "healthy" (all endpoints reachable), "degraded" (some endpoints up), or "unreachable" (nothing responds). A peer that fails 5 consecutive health checks is soft-removed and excluded from query routing until it recovers.
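
The status classification and soft-removal rule can be sketched as follows. This is an assumption-laden illustration rather than MeshManager internals: classify and FailureTracker are hypothetical names, and the sketch assumes that only an "unreachable" result counts as a failed check while any other status resets the counter.

```python
from dataclasses import dataclass, field

SOFT_REMOVE_AFTER = 5  # consecutive failed checks, per the paragraph above


def classify(endpoints_up):
    """Map per-endpoint reachability (list of bools) to a status string."""
    if endpoints_up and all(endpoints_up):
        return "healthy"
    if any(endpoints_up):
        return "degraded"
    return "unreachable"


@dataclass
class FailureTracker:
    failures: dict = field(default_factory=dict)
    soft_removed: set = field(default_factory=set)

    def record(self, peer: str, status: str) -> None:
        """Count consecutive failures; restore the peer once it responds again."""
        if status == "unreachable":
            self.failures[peer] = self.failures.get(peer, 0) + 1
            if self.failures[peer] >= SOFT_REMOVE_AFTER:
                self.soft_removed.add(peer)
        else:
            self.failures[peer] = 0
            self.soft_removed.discard(peer)
```

Because the counter resets on any successful check, a peer that flaps between reachable and unreachable is never soft-removed under this rule.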

Peer discovery

# Validate static peers + attempt DNS-SD discovery
discovered = mesh.discover_peers()
for peer in discovered:
    print(f"{peer['name']} ({peer['source']}): "
          f"{peer['url']} — reachable={peer['reachable']}")

DNS-SD discovery queries _trails._tcp.local using the standard-library socket module. On networks with mDNS responders, new Trails instances are discovered automatically. Discovery is best-effort; static configuration is always the primary source.

Peer selection strategies

When multiple peers can serve a query, use select_peer():

# Latency-based (default): prefer lowest-latency healthy peer
best = mesh.select_peer(strategy="latency")

# Round-robin: distribute load evenly
best = mesh.select_peer(strategy="round-robin")
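
The two strategies differ only in how they pick from the set of healthy peers. A minimal sketch, assuming a hypothetical PeerSelector that holds the last measured latency per peer (None for an unhealthy peer); the real select_peer() may track state differently:

```python
class PeerSelector:
    def __init__(self, latencies):
        # peer name -> last latency in ms, or None when the peer is unhealthy
        self._latencies = dict(latencies)
        self._next_index = 0  # rotation position for round-robin

    def healthy(self):
        """Healthy peer names in a stable (sorted) order."""
        return sorted(n for n, ms in self._latencies.items() if ms is not None)

    def select(self, strategy="latency"):
        candidates = self.healthy()
        if not candidates:
            return None
        if strategy == "latency":
            # Prefer the peer with the lowest measured latency.
            return min(candidates, key=lambda name: self._latencies[name])
        if strategy == "round-robin":
            # Rotate through healthy peers to spread load.
            choice = candidates[self._next_index % len(candidates)]
            self._next_index += 1
            return choice
        raise ValueError(f"unknown strategy: {strategy!r}")
```

Latency-based selection concentrates traffic on the fastest peer, while round-robin trades some latency for an even spread.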

Background monitoring with MeshMonitor

Run periodic health checks in a daemon thread:

from trails.federation_mesh import MeshMonitor

monitor = MeshMonitor(mesh_manager=mesh, interval_seconds=60)
monitor.start()

# ... your app runs ...

# Check status
print(f"Running: {monitor.is_running}, checks: {monitor.check_count}")
print(f"Soft-removed peers: {mesh.soft_removed_peers}")
print(f"Current status: {mesh.peer_status}")

# Shutdown
monitor.stop()

The monitor runs as a daemon thread — it dies when the process exits. Between rounds it sleeps for interval_seconds and can be stopped cleanly via stop().
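
The daemon-thread pattern described above is commonly built on threading.Event, which doubles as the sleep timer and the stop signal. The sketch below is a generic illustration with a hypothetical PeriodicChecker class; it is not the MeshMonitor source.

```python
import threading


class PeriodicChecker:
    def __init__(self, check, interval_seconds):
        self._check = check                # callable run once per round
        self._interval = interval_seconds
        self._stop = threading.Event()
        self._thread = None
        self.check_count = 0

    @property
    def is_running(self):
        return self._thread is not None and self._thread.is_alive()

    def start(self):
        if self.is_running:
            return
        self._stop.clear()
        # daemon=True: the thread does not keep the process alive at exit.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while not self._stop.is_set():
            self._check()
            self.check_count += 1
            # Sleeps up to the interval, but wakes immediately on stop().
            self._stop.wait(self._interval)

    def stop(self):
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
```

Using Event.wait() instead of time.sleep() is what lets stop() return promptly even with a long interval.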

Health-aware query routing

Pass a MeshManager to FederatedQueryEngine to enable automatic skipping of unreachable peers:

engine = FederatedQueryEngine(
    peers=peers,
    store=ctx.kg._store,
    mesh_manager=mesh,
)

When mesh_manager is set:

  • SERVICE blocks targeting an unreachable or soft-removed peer are skipped with a warning (partial results instead of failure).
  • If a remote query fails at runtime, the engine logs a warning and continues instead of raising, so queries degrade gracefully.

CLI commands

# Check health of all configured peers
trails federation status

# Discover peers (static + DNS-SD)
trails federation discover

# Ping a specific peer
trails federation ping pharma

Policy gating

Every federated query passes through Cedar policy evaluation before execution. The policy context uses:

| Field | Value |
| --- | --- |
| principal | From X-Trails-Principal header (default: "anonymous") |
| action | "sparql_query" |
| resource.type | "Trails::Federation::Endpoint" |
| resource.id | "sparql" |

A minimal Cedar policy that allows federation queries from authenticated principals:

permit(
  principal,
  action == Action::"sparql_query",
  resource == Trails::Federation::Endpoint::"sparql"
) when {
  principal != User::"anonymous"
};

A restrictive policy that limits federation to a specific peer identity:

permit(
  principal == User::"pharma-instance",
  action == Action::"sparql_query",
  resource == Trails::Federation::Endpoint::"sparql"
);

When no policies are configured, the endpoint executes queries without authorization checks. When policies are configured and the decision is DENY, the endpoint returns HTTP 403.

See the Policy guide for Cedar syntax and the @policy decorator.

Cost tracking

Federated queries participate in the cost envelope system (ADR-0012). Pass a CostTracker to the endpoint to record a cost entry for every query:

from trails.federation import FederationConfig, FederationEndpoint

endpoint = FederationEndpoint(
    store=ctx.kg._store,
    config=FederationConfig(enabled=True),
    cost_tracker=my_cost_tracker,
)

Each tracked entry includes:

| Field | Value |
| --- | --- |
| kind | "federation:sparql_query" |
| tokens | 0 (no LLM tokens; pure SPARQL) |
| usd | 0.0 |
| latency_ms | Wall-clock execution time |
| principal | The requesting principal |

Remote SERVICE queries (Phase 2) and MCP relay calls (Phase 3) each record their own cost entries (federation:remote_query and federation:mcp_relay respectively) with measured latency.
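
A cost entry of this shape can be produced by timing the query call. The sketch below is illustrative only: CostEntry and timed_query are hypothetical stand-ins for whatever CostTracker records internally, and a plain list plays the role of the tracker.

```python
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class CostEntry:
    kind: str
    tokens: int
    usd: float
    latency_ms: float
    principal: str


def timed_query(run_query, *, kind, principal, sink):
    """Run a query callable and append one CostEntry to sink, even on failure."""
    started = time.perf_counter()
    try:
        return run_query()
    finally:
        latency_ms = (time.perf_counter() - started) * 1000.0
        # Pure SPARQL: no LLM tokens and no monetary cost, only latency.
        sink.append(CostEntry(kind, 0, 0.0, latency_ms, principal))
```

Recording in a finally block means failed or timed-out queries still show up in the cost log.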

Security considerations

  • Read-only by design. The read_only flag is always True in Phase 1. SPARQL Update keywords (INSERT, DELETE, and the other forbidden keywords listed under Accepted query forms) are rejected at the lexical level before the query reaches the store. There is no code path that allows writes through the federation endpoint.

  • Principal forwarding. The X-Trails-Principal header carries the requester's identity. The remote instance evaluates its own Cedar policies against this principal. DID + scoped biscuit tokens (ADR-0010, ADR-0011) are planned for a future hardening pass.

  • Timeout enforcement. max_query_time_ms bounds how long any single query can execute. A slow or adversarial query is killed rather than allowed to monopolize the store.

  • No data leakage. When Cedar policies are configured, unauthenticated or unauthorized queries are rejected before execution. The store is never queried for a denied principal.

  • SPARQL injection defence. String literals, IRIs, and comments are stripped before keyword scanning so forbidden keywords embedded in quoted values do not produce false positives or bypasses.

Example: two Trails instances sharing data

Instance A — research publications

# trails.toml on Instance A
[project]
name = "research-pubs"

[federation]
enabled = true
max_query_time_ms = 10000

# Load some publication data
from trails import capability, node_type

@node_type("Paper", fields={"title": str, "doi": str, "year": int})
class Paper: ...

@capability
def load_papers(ctx) -> dict:
    ctx.kg.add(Paper(title="Drug X interactions", doi="10.1234/abc", year=2025))
    ctx.kg.add(Paper(title="Drug Y meta-analysis", doi="10.1234/def", year=2026))
    return {"loaded": 2}

Instance B — clinical evidence

# On Instance B, query Instance A's federation endpoint
import httpx

query = """
SELECT ?title ?doi WHERE {
  ?p a <trails://research-pubs/Paper> ;
     <trails://research-pubs/Paper/title> ?title ;
     <trails://research-pubs/Paper/doi> ?doi .
}
"""

resp = httpx.post(
    "https://research-pubs.example/sparql",
    content=query,
    headers={
        "Content-Type": "application/sparql-query",
        "X-Trails-Principal": "clinical-evidence-instance",
    },
)
resp.raise_for_status()  # surface a 403 policy denial or other HTTP error early
results = resp.json()
for binding in results["results"]["bindings"]:
    print(binding["title"]["value"], binding["doi"]["value"])

With Phase 2's FederatedQueryEngine, this becomes a single SERVICE clause inside Instance B's own SPARQL queries — no manual HTTP calls:

from trails.federation import FederatedQueryEngine

peers = {
    "research": {"url": "https://research-pubs.example/sparql", "timeout_ms": 10000},
}
engine = FederatedQueryEngine(peers=peers, store=ctx.kg._store)

result = engine.execute(ctx, """
    SELECT ?title ?doi WHERE {
      SERVICE <https://research-pubs.example/sparql> {
        ?p a <trails://research-pubs/Paper> ;
           <trails://research-pubs/Paper/title> ?title ;
           <trails://research-pubs/Paper/doi> ?doi .
      }
    }
""")
for binding in result["results"]["bindings"]:
    print(binding["title"]["value"], binding["doi"]["value"])

Reference

| Symbol | Description |
| --- | --- |
| FederationConfig(enabled, read_only, max_query_time_ms) | Configuration dataclass; parsed from [federation] in trails.toml |
| FederationEndpoint(store, config, cost_tracker, policies) | Wraps a store and exposes SPARQL query evaluation |
| FederationEndpoint.execute(sparql, *, principal, trace_id) | Execute a query; returns SPARQL JSON Results |
| FederatedQueryEngine(peers, store, cost_tracker, mesh_manager) | Phase 2: rewrites SERVICE blocks and dispatches to remote peers |
| FederatedQueryEngine.execute(ctx, query, *, principal) | Execute a query with SERVICE rewriting; returns merged results |
| validate_federation_query(sparql) | Validate a query is read-only; returns the detected form or raises |
| FederationQueryError | Raised when a query fails validation |
| mount_federation_routes(app, endpoint) | Mount /sparql routes on a FastAPI app |
| create_federation_router(endpoint) | Create a standalone FastAPI router |
| MCPRelayClient(federation_config, cost_tracker, policies, principal) | Phase 3: client for invoking remote capabilities via MCP |
| MCPRelayClient.list_tools(peer_name) | Discover remote capabilities via MCP tools/list |
| MCPRelayClient.invoke_tool(peer_name, tool_name, arguments) | Invoke a remote tool via MCP tools/call |
| MCPRelayError | Raised when an MCP relay operation fails |
| relay(ctx, *, peer, tool, arguments, policies) | Phase 3: async convenience helper for use inside @capability bodies |
| MeshManager(federation_config, timeout_s) | Phase 4: manages health checks, discovery, and peer selection |
| MeshManager.health_check(peer_name) | Run a health check against a single peer; returns PeerHealth |
| MeshManager.health_check_all() | Health-check all configured peers |
| MeshManager.discover_peers() | Validate static peers + DNS-SD discovery |
| MeshManager.select_peer(strategy) | Select best peer by "latency" or "round-robin" |
| MeshManager.is_peer_healthy(peer_name) | Check cached health status of a peer |
| MeshMonitor(mesh_manager, interval_seconds) | Phase 4: background daemon thread for periodic health checks |
| PeerHealth | .name, .url, .status, .latency_ms, .last_checked, .sparql_endpoint, .mcp_endpoint |
| MeshError | Raised when a mesh operation fails |
| SchemaAdvertisement | Phase 5: a peer's published schema (node types, predicates, capabilities) |
| NodeTypeInfo | One node type in the schema advertisement |
| PredicateInfo | One predicate discovered in the store |
| build_local_schema(store, project_name, base_iri) | Build schema advertisement from local state |
| schema_to_json(schema) / schema_from_json(data) | Serialization / deserialization |
| fetch_peer_schema(peer_url, timeout) | Fetch schema from a remote peer's /schema endpoint |
| SchemaCache(ttl_seconds) | In-memory cache of peer schemas with TTL |
| MeshManager.schema_cache | Schema cache populated during discover_peers() |
| register_schema_resource() | Register the trails://schema MCP resource |

Ontology exchange: schema advertisement and discovery (Phase 5)

When two Trails instances federate, they can query each other's data — but without knowing each other's schemas, writing correct SERVICE queries requires out-of-band knowledge. Ontology exchange solves this by letting each instance publish its schema as a JSON document.

How it works

Every Trails instance with federation enabled exposes a GET /schema endpoint that returns a schema advertisement — a JSON document listing all registered @node_type definitions, SHACL constraints from @shape, registered capabilities, and data-discovered predicates:

from trails.federation_schema import build_local_schema, schema_to_json

# Build the advertisement from the local state
schema = build_local_schema(store=ctx.kg._store, project_name="myapp", base_iri="trails://myapp")
print(schema_to_json(schema))
# {
#   "instance_name": "myapp",
#   "base_iri": "trails://myapp",
#   "version": "0.1.0a0",
#   "node_types": [
#     {"name": "Patient", "iri": "trails://myapp/Patient", "fields": {"name": "str", "age": "int"}, "constraints": {}},
#   ],
#   "predicates": [...],
#   "capabilities": ["load_data", "analyze"],
#   "generated_at": "2026-04-18T12:00:00+00:00"
# }

Fetching a peer's schema

Use fetch_peer_schema() to retrieve a remote peer's schema:

from trails.federation_schema import fetch_peer_schema

schema = fetch_peer_schema("https://pharma.example")
if schema:
    print(f"Peer has {len(schema.node_types)} node types:")
    for nt in schema.node_types:
        print(f"  {nt.name}: {list(nt.fields.keys())}")

The function returns None on any failure (logged, never raises), making it safe for best-effort discovery.

Schema cache

The SchemaCache provides TTL-based in-memory caching to avoid redundant fetches:

from trails.federation_schema import SchemaCache

cache = SchemaCache(ttl_seconds=300)  # 5-minute TTL
cache.refresh("pharma", "https://pharma.example")

# Later — returns cached schema without HTTP call
schema = cache.get("pharma")

The mesh manager (MeshManager) includes a schema cache that is populated automatically during discover_peers().
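
TTL-based caching of this kind reduces to storing a timestamp next to each value. The following is a generic sketch with a hypothetical TTLCache class and an injectable clock for testing; it does not reproduce SchemaCache, whose refresh() also performs the HTTP fetch.

```python
import time


class TTLCache:
    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock      # injectable so expiry can be tested
        self._entries = {}       # key -> (stored_at, value)

    def put(self, key, value):
        self._entries[key] = (self._clock(), value)

    def get(self, key):
        """Return the cached value, or None if missing or older than the TTL."""
        entry = self._entries.get(key)
        if entry is None:
            return None
        stored_at, value = entry
        if self._clock() - stored_at >= self._ttl:
            del self._entries[key]  # drop the stale entry
            return None
        return value
```

A monotonic clock is used so that wall-clock adjustments cannot extend or shorten an entry's lifetime.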

MCP resource

The schema is also available as an MCP resource at trails://schema:

from trails.mcp_resources import register_schema_resource, read_resource

register_schema_resource()
result = read_resource("trails://schema")
# Returns JSON text with the schema advertisement

CLI commands

# Show local schema
trails federation schema

# Show as raw JSON
trails federation schema --json

# Fetch a remote peer's schema
trails federation schema --peer pharma

Integration with discover_peers

When MeshManager.discover_peers() finds a reachable peer, it automatically fetches the peer's /schema endpoint and caches the result. The returned peer dict includes a "schema" key:

from trails.federation_mesh import MeshManager

mesh = MeshManager(federation_config=my_config)
discovered = mesh.discover_peers()
for peer in discovered:
    if peer["schema"]:
        print(f"{peer['name']} has {len(peer['schema'].node_types)} node types")
    else:
        print(f"{peer['name']} — schema not available")

FederationPeer proxy (lazy connections)

FederationPeer (in trails.federation_proxy) delays connection, health probing, and schema fetching until the peer is actually used. This avoids startup failures when remote peers are offline and skips unnecessary network calls for configured-but-unused peers.

from trails.federation_proxy import FederationPeer

peer = FederationPeer("analytics", "http://analytics:8000")
# No network call yet.

peer.schema    # Fetches schema on first access, caches it.
peer.healthy   # Probes health on first access, caches it.
peer.refresh() # Clears cache so next access re-probes.
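
One way to get this fetch-on-first-access behaviour in Python is functools.cached_property. The sketch below is an assumption about the general pattern, not the FederationPeer source: LazyPeer is a hypothetical class, and the fetch and probe functions are injected so the example runs without a network.

```python
from functools import cached_property


class LazyPeer:
    def __init__(self, name, url, fetch_schema, probe_health):
        # Only stores configuration; no network call happens here.
        self.name = name
        self.url = url
        self._fetch_schema = fetch_schema
        self._probe_health = probe_health

    @cached_property
    def schema(self):
        """Fetched on first access, then served from the instance cache."""
        return self._fetch_schema(self.url)

    @cached_property
    def healthy(self):
        """Probed on first access, then served from the instance cache."""
        return self._probe_health(self.url)

    def refresh(self):
        """Drop cached values so the next access fetches and probes again."""
        self.__dict__.pop("schema", None)
        self.__dict__.pop("healthy", None)
```

cached_property stores its result in the instance __dict__, which is why removing the key is enough to invalidate it.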

Schema fingerprints

Bloom-filter-based schema fingerprints (~128 bytes per peer) enable fast overlap estimation without transferring full schemas. Built in M20 Phase 4 (b5ee0ed).

from trails.federation_schema import build_local_schema

schema = build_local_schema(store=ctx.kg._store, project_name="myapp")
fingerprint = schema.fingerprint()  # Bloom filter, hex-serializable
overlap = fingerprint.estimate_overlap(remote_fingerprint)
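
To make the idea concrete, here is a small Bloom-filter sketch sized at 1024 bits (128 bytes). Everything beyond that size is an assumption for illustration: BloomFingerprint is a hypothetical class, the three hash positions are derived from SHA-256, and overlap is estimated as the ratio of shared set bits to all set bits, which may differ from what estimate_overlap() actually computes.

```python
import hashlib


class BloomFingerprint:
    SIZE_BITS = 1024   # 128 bytes
    NUM_HASHES = 3     # assumed number of hash positions per term

    def __init__(self, bits=0):
        self.bits = bits  # the filter, held as one big integer

    @classmethod
    def from_terms(cls, terms):
        fingerprint = cls()
        for term in terms:
            fingerprint.add(term)
        return fingerprint

    def _positions(self, term):
        # Slice one SHA-256 digest into NUM_HASHES 4-byte chunks.
        digest = hashlib.sha256(term.encode("utf-8")).digest()
        for i in range(self.NUM_HASHES):
            chunk = digest[4 * i:4 * i + 4]
            yield int.from_bytes(chunk, "big") % self.SIZE_BITS

    def add(self, term):
        for pos in self._positions(term):
            self.bits |= 1 << pos

    def might_contain(self, term):
        """False means definitely absent; True may be a false positive."""
        return all((self.bits >> pos) & 1 for pos in self._positions(term))

    def to_hex(self):
        return self.bits.to_bytes(self.SIZE_BITS // 8, "big").hex()

    def estimate_overlap(self, other):
        """Bit-level Jaccard ratio between two fingerprints (0.0 to 1.0)."""
        union = bin(self.bits | other.bits).count("1")
        if union == 0:
            return 0.0
        return bin(self.bits & other.bits).count("1") / union
```

Two peers can exchange the hex form and compare fingerprints locally, fetching the full schema only when the estimated overlap looks promising.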

Schema registry

A content-addressed schema registry (SchemaRegistry) stores schemas by CID (SHA-256 hash). Peers publish their schemas; consumers query and resolve by CID. Includes a stdlib HTTP server for self-hosting.

from trails.federation_registry import SchemaRegistry

registry = SchemaRegistry()
cid = registry.publish(schema)
resolved = registry.resolve(cid)
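
Content addressing means the identifier is derived from the content itself. A minimal sketch, assuming the CID is the hex SHA-256 digest of a canonical JSON encoding (the real SchemaRegistry may canonicalise or encode the CID differently); compute_cid and InMemoryRegistry are hypothetical names:

```python
import hashlib
import json


def compute_cid(schema: dict) -> str:
    """Hash a canonical JSON form so equal content always yields the same CID."""
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class InMemoryRegistry:
    def __init__(self):
        self._by_cid = {}

    def publish(self, schema: dict) -> str:
        """Store the schema under its CID and return the CID."""
        cid = compute_cid(schema)
        self._by_cid[cid] = schema
        return cid

    def resolve(self, cid: str):
        """Return the schema for a CID, or None if it was never published."""
        return self._by_cid.get(cid)
```

Sorting keys before hashing matters: without it, two dicts with the same content but different insertion order would get different CIDs.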

Gossip discovery

Epidemic gossip with TTL and fan-out propagates peer information across the mesh without a central coordinator. Kademlia XOR-distance routing with k-buckets enables efficient iterative lookup in large networks.

from trails.federation_gossip import GossipManager

gossip = GossipManager(local_peer=my_peer, fan_out=3, ttl=5)
gossip.announce()  # Propagate to neighbors
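
The Kademlia part of this design rests on one primitive: the XOR of two node IDs, interpreted as a distance. The sketch below illustrates only that primitive, with hypothetical helper names and an assumed 160-bit ID derived from SHA-1 of the peer name; it does not model the gossip fan-out or the GossipManager API.

```python
import hashlib

ID_BITS = 160  # assumed ID width (the classic Kademlia size)


def node_id(name: str) -> int:
    """Derive a fixed-width integer ID from a peer name."""
    return int.from_bytes(hashlib.sha1(name.encode("utf-8")).digest(), "big")


def xor_distance(a: int, b: int) -> int:
    """Kademlia distance: bitwise XOR of the two IDs."""
    return a ^ b


def bucket_index(local_id: int, other_id: int) -> int:
    """k-bucket index: position of the highest bit where the IDs differ."""
    distance = local_id ^ other_id
    if distance == 0:
        raise ValueError("a node does not place itself in a bucket")
    return distance.bit_length() - 1


def closest(target_id: int, candidates, k: int = 3):
    """The k candidates nearest to target_id, as used in iterative lookup."""
    return sorted(candidates, key=lambda c: c ^ target_id)[:k]
```

Each lookup round asks the closest known nodes for nodes that are closer still, so the distance to the target shrinks until no closer node is returned.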

Shared vocabularies

Six built-in vocabularies (schema.org, Dublin Core, FHIR, DCAT, PROV-O, SKOS) enable cross-instance interoperability checks.

from trails.federation_vocab import check_compatibility, vocabulary_coverage

compat = check_compatibility(local_schema, remote_schema)
coverage = vocabulary_coverage(schema, vocabulary="schema.org")
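
As a rough illustration of what a coverage figure could mean, the sketch below counts the share of predicate IRIs that fall under a vocabulary's namespace. The namespace IRIs are the published ones for each vocabulary, but the function, its input shape, and the metric itself are assumptions; vocabulary_coverage() in Trails may be defined differently.

```python
# Well-known namespace prefixes for the six vocabularies named above.
VOCAB_NAMESPACES = {
    "schema.org": ("https://schema.org/", "http://schema.org/"),
    "Dublin Core": ("http://purl.org/dc/terms/", "http://purl.org/dc/elements/1.1/"),
    "FHIR": ("http://hl7.org/fhir/",),
    "DCAT": ("http://www.w3.org/ns/dcat#",),
    "PROV-O": ("http://www.w3.org/ns/prov#",),
    "SKOS": ("http://www.w3.org/2004/02/skos/core#",),
}


def namespace_coverage(predicate_iris, vocabulary: str) -> float:
    """Fraction of predicate IRIs that belong to the given vocabulary."""
    prefixes = VOCAB_NAMESPACES[vocabulary]
    iris = list(predicate_iris)
    if not iris:
        return 0.0
    # str.startswith accepts a tuple of prefixes.
    hits = sum(1 for iri in iris if iri.startswith(prefixes))
    return hits / len(iris)
```

Two instances with high coverage of the same vocabulary are more likely to be able to join on shared predicates without a mapping step.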

See also