Memory Security & Trust¶
Agent memory is a high-value target. A compromised or misbehaving agent
can spoof identities, inflate confidence scores, poison shared
knowledge, or leak confidential facts to federation peers. The memory
security layer (trails.memory_security) and trust boundary system
(trails.memory_trust) enforce invariants that prevent these attacks
at the framework level, transparently to agents using trails.invoke().
The full designs live in ADR-0052 (security gateway) and ADR-0053 (trust boundaries).
Threat model¶
| Threat | Mitigation |
|---|---|
| Identity spoofing -- agent claims another agent's DID | Gateway injects identity from authenticated session; agents cannot set agent_did |
| Confidence inflation -- agent sets confidence: 1.0 to dominate recall | Trust-level caps; correction-rate adjustment reduces effective trust |
| Knowledge poisoning -- agent writes false facts to shared memory | Cross-agent correction requires Cedar policy; provenance hash chain detects tampering |
| Data leakage -- confidential facts exported to untrusted peers | Classification-aware federation gate; export filtering by trust level |
| Taint propagation -- inference based on retracted fact remains trusted | Contamination tracker propagates taint through supports links |
Memory Gateway¶
MemoryGateway is the trusted intermediary between agents and the
MemoryStore. All memory operations pass through it:
from trails.memory_security import MemoryGateway, MemorySession
session = MemorySession(principal_did="did:key:alice", trust_level="authenticated")
gw = MemoryGateway(memory_store, session)
# Learn -- gateway injects identity, caps confidence, chains provenance
iri = gw.learn(
    "Deploy key rotates weekly",
    confidence_hint=0.9,
    topic="ops",
    source="observed in CI config",
    ttl_seconds=86400,  # expires in 24h
)
# Recall -- delegates to store, adds audit entry
facts = gw.recall("deploy key rotation", topic="ops")
# Correct -- enforces authorization for cross-agent corrections
gw.correct(iri, "Deploy key rotates daily", reason="updated schedule")
# Forget -- soft delete with audit trail
gw.forget(iri, reason="no longer relevant")
What the gateway enforces¶
- Identity injection. agent_did, timestamp, and provenance are set from the authenticated MemorySession. Agents cannot override these fields.
- Confidence calibration. confidence_hint is capped by the agent's trust level (see below).
- Correction authorization. Cross-agent corrections require explicit Cedar policy.
- Provenance hash chain. Every operation appends a SHA-256-chained record for tamper evidence.
- Audit logging. Every operation is logged with agent DID, action, timestamp, and detail.
- TTL tracking. Facts with ttl_seconds are tracked for expiry-based garbage collection.
Identity enforcement¶
MemorySession carries the agent's DID (extracted from Biscuit token
or MCP handshake) and trust level. The gateway reads these -- agents
cannot override them:
from trails.memory_security import MemorySession
session = MemorySession(
    principal_did="did:key:z6Mk...",
    trust_level="established",
    roles=["analyst"],
)
# Properties
session.trust_level_float # 0.9
session.is_anonymous # False
Anonymous sessions (did:key:anonymous or empty DID) receive the
lowest trust multiplier (0.3).
Confidence calibration¶
calibrate_confidence() applies a trust-level cap and correction-rate
adjustment to an agent's confidence_hint:
from trails.memory_security import calibrate_confidence, TrustLevel
# Trust level caps the maximum effective confidence
calibrate_confidence(hint=0.95, agent_did="did:key:alice",
                     trust_level=TrustLevel.AUTHENTICATED)
# -> 0.7 (capped at AUTHENTICATED ceiling)
# High correction rate reduces effective trust
calibrate_confidence(hint=0.8, agent_did="did:key:bob",
                     trust_level=TrustLevel.ESTABLISHED,
                     correction_rate=0.4)
# -> 0.54 (min(0.8, 0.9 * max(0.5, 1.0 - 0.4)) = 0.54)
Trust levels¶
| Level | Multiplier | When assigned |
|---|---|---|
| ANONYMOUS | 0.3 | No DID or did:key:anonymous |
| AUTHENTICATED | 0.7 | DID-bearing agent (default) |
| ESTABLISHED | 0.9 | Agent with track record |
| HUMAN | 1.0 | Human operator |
| SYSTEM | 1.0 | Framework-internal operations |
The formula: effective_confidence = min(hint, trust_level * max(0.5, 1.0 - correction_rate)),
where trust_level is the multiplier from the table above. An agent whose
facts are frequently corrected by others sees its effective trust decay
toward, but never below, 50% of its base level.
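The formula can be sketched in a few lines of plain Python. This is an illustration only, not the trails implementation: the function name effective_confidence and its parameters are hypothetical, and the multipliers are taken from the table above.

```python
def effective_confidence(hint: float, trust_multiplier: float,
                         correction_rate: float = 0.0) -> float:
    """Cap a confidence hint by trust level, discounted by correction rate.

    Hypothetical helper mirroring the documented formula; not part of trails.
    """
    # The discount is floored at 0.5, so trust never falls below half its base.
    discount = max(0.5, 1.0 - correction_rate)
    return min(hint, trust_multiplier * discount)


# AUTHENTICATED (0.7) agent, no corrections: the hint is capped by the multiplier.
print(effective_confidence(0.95, 0.7))
# ESTABLISHED (0.9) agent with a 40% correction rate: roughly 0.54.
print(round(effective_confidence(0.8, 0.9, 0.4), 2))
# A 90% correction rate hits the floor: roughly 0.9 * 0.5.
print(round(effective_confidence(0.8, 0.9, 0.9), 2))
```

A hint lower than the cap passes through unchanged, so calibration only ever reduces confidence.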
Correction authorization¶
Corrections follow three rules from ADR-0052 section 6:
- Self-correction -- agents can always correct their own facts.
- Human override -- agents with trust_level="human" can correct any fact.
- Cross-agent correction -- requires an explicit Cedar policy granting memory.correct on the target resource.
from trails.memory_security import authorize_correction
# Self-correction: always allowed
authorize_correction(
    corrector_did="did:key:alice",
    fact_owner_did="did:key:alice",
    fact_iri="urn:trails:memory:fact:123",
)  # -> True
# Cross-agent without policy: denied
authorize_correction(
    corrector_did="did:key:bob",
    fact_owner_did="did:key:alice",
    fact_iri="urn:trails:memory:fact:123",
)  # -> False
# Cross-agent with Cedar policy
policies = [{
    "effect": "permit",
    "principal": "did:key:bob",
    "action": "memory.correct",
    "resource": {"type": "Trails::Resource::Fact"},
}]
authorize_correction(
    corrector_did="did:key:bob",
    fact_owner_did="did:key:alice",
    fact_iri="urn:trails:memory:fact:123",
    policies=policies,
)  # -> True (if policy evaluates to ALLOW)
Cedar policy context for corrections:
| Field | Value |
|---|---|
| principal | Corrector's DID |
| action | "memory.correct" |
| resource.type | "Trails::Resource::Fact" |
| resource.id | Fact IRI |
| resource.agent_did | Original author's DID |
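To make the order of the three rules concrete, here is a minimal sketch of the decision logic, assuming the dictionary-shaped policies shown above. The names may_correct and _permits are hypothetical, and _permits is a deliberately simplified field match standing in for real Cedar evaluation.

```python
def _permits(policy: dict, context: dict) -> bool:
    """Simplified stand-in for Cedar evaluation: exact match on a few fields."""
    return (
        policy.get("effect") == "permit"
        and policy.get("principal") == context["principal"]
        and policy.get("action") == context["action"]
        and policy.get("resource", {}).get("type") == context["resource"]["type"]
    )


def may_correct(corrector_did, fact_owner_did, fact_iri,
                corrector_trust_level="authenticated", policies=None) -> bool:
    # Rule 1: self-correction is always allowed.
    if corrector_did == fact_owner_did:
        return True
    # Rule 2: human operators may correct any fact.
    if corrector_trust_level == "human":
        return True
    # Rule 3: cross-agent correction needs an explicit permit.
    # The context carries the fields listed in the table above.
    context = {
        "principal": corrector_did,
        "action": "memory.correct",
        "resource": {
            "type": "Trails::Resource::Fact",
            "id": fact_iri,
            "agent_did": fact_owner_did,
        },
    }
    return any(_permits(p, context) for p in (policies or []))


bob_policy = [{
    "effect": "permit",
    "principal": "did:key:bob",
    "action": "memory.correct",
    "resource": {"type": "Trails::Resource::Fact"},
}]
iri = "urn:trails:memory:fact:123"
print(may_correct("did:key:alice", "did:key:alice", iri))                    # self
print(may_correct("did:key:bob", "did:key:alice", iri))                      # no policy
print(may_correct("did:key:bob", "did:key:alice", iri, policies=bob_policy))  # permitted
```

The sketch denies by default: without a matching permit, a cross-agent correction is refused.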
Provenance integrity¶
Every memory operation appends to a SHA-256 hash chain. Each record's
self_hash covers the record's identity fields plus the prev_hash
of the preceding record, forming a tamper-evident audit trail:
from trails.memory_security import ProvenanceChain, compute_fact_hash, verify_chain
chain = ProvenanceChain()
# Records are appended automatically by the gateway.
# Manual use for testing:
rec = chain.append(
    fact_iri="urn:trails:memory:fact:1",
    agent_did="did:key:alice",
    action="memory.learn",
    timestamp="2026-04-19T10:00:00+00:00",
)
print(rec.self_hash) # sha256:a1b2c3...
print(rec.prev_hash) # sha256:000...000 (genesis)
# Verify integrity
valid, broken = chain.verify()
assert valid
assert broken == []
# Convenience wrapper
assert verify_chain(chain)
The genesis hash is sha256: followed by 64 zeros. Hash computation
uses canonical JSON with sorted keys:
# Named fact_hash to avoid shadowing Python's built-in hash()
fact_hash = compute_fact_hash(
    content="Deploy key rotates weekly",
    agent_did="did:key:alice",
    timestamp="2026-04-19T10:00:00+00:00",
    prev_hash="sha256:000...000",
    action="memory.learn",
    fact_iri="urn:trails:memory:fact:1",
)
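The chaining idea can be reproduced with the standard library alone. The sketch below is an assumption-laden illustration rather than the trails implementation: the helpers record_hash, append, and verify are hypothetical, and the exact field set and JSON serialization that trails hashes may differ.

```python
import hashlib
import json

GENESIS = "sha256:" + "0" * 64  # genesis hash: prefix plus 64 zeros


def record_hash(fields: dict, prev_hash: str) -> str:
    """Hash the record fields together with prev_hash as canonical JSON."""
    payload = json.dumps({**fields, "prev_hash": prev_hash},
                         sort_keys=True, separators=(",", ":"))
    return "sha256:" + hashlib.sha256(payload.encode("utf-8")).hexdigest()


def append(chain: list, **fields) -> dict:
    """Append a record linked to the previous record's self_hash."""
    prev_hash = chain[-1]["self_hash"] if chain else GENESIS
    record = {**fields, "prev_hash": prev_hash}
    record["self_hash"] = record_hash(fields, prev_hash)
    chain.append(record)
    return record


def verify(chain: list):
    """Return (valid, broken_indices) after re-deriving every hash."""
    broken = []
    expected_prev = GENESIS
    for i, rec in enumerate(chain):
        fields = {k: v for k, v in rec.items()
                  if k not in ("prev_hash", "self_hash")}
        if (rec["prev_hash"] != expected_prev
                or rec["self_hash"] != record_hash(fields, rec["prev_hash"])):
            broken.append(i)
        expected_prev = rec["self_hash"]
    return (not broken, broken)


demo = []
append(demo, fact_iri="urn:trails:memory:fact:1", agent_did="did:key:alice",
       action="memory.learn", timestamp="2026-04-19T10:00:00+00:00")
append(demo, fact_iri="urn:trails:memory:fact:1", agent_did="did:key:alice",
       action="memory.correct", timestamp="2026-04-19T11:00:00+00:00")
print(verify(demo))                      # intact chain
demo[0]["agent_did"] = "did:key:mallory"  # tamper with the first record
print(verify(demo))                      # index 0 no longer matches its hash
```

Sorting keys before hashing is what makes the JSON canonical: two records with the same fields always serialize to the same bytes, regardless of insertion order.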
The gateway exposes verify_provenance() for on-demand chain
verification:
valid, broken_indices = gw.verify_provenance()
if not valid:
    print(f"Chain broken at indices: {broken_indices}")
    # A CHAIN_BREAK audit event is logged automatically
Trust levels (origin)¶
trails.memory_trust.TrustLevel classifies where a fact originated:
| Level | Description | Ordering |
|---|---|---|
| LOCAL | Produced by agents on this instance | Highest trust |
| PEER | Received from a federated trusted peer | Middle |
| PUBLIC | From untrusted or external sources | Lowest trust |
from trails.memory_trust import TrustLevel
TrustLevel.LOCAL > TrustLevel.PEER # True
TrustLevel.PEER > TrustLevel.PUBLIC # True
Data classification¶
DataClassification controls which trust levels may read a fact:
| Classification | Who can read | Sensitivity rank |
|---|---|---|
| OPEN | Any agent (LOCAL, PEER, PUBLIC) | 0 |
| INTERNAL | LOCAL and PEER agents | 1 |
| CONFIDENTIAL | LOCAL agents only | 2 |
| RESTRICTED | Named agents only (explicit grant) | 3 |
from trails.memory_trust import DataClassification, FactTrustMetadata, TrustLevel
meta = FactTrustMetadata(
    trust_level=TrustLevel.LOCAL,
    classification=DataClassification.CONFIDENTIAL,
)
Default read matrix¶
The default mapping of which trust levels can read which classifications:
| Classification | LOCAL | PEER | PUBLIC |
|---|---|---|---|
| OPEN | yes | yes | yes |
| INTERNAL | yes | yes | no |
| CONFIDENTIAL | yes | no | no |
| RESTRICTED | explicit grant only | explicit grant only | explicit grant only |
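The default matrix reduces to a single threshold comparison once both scales are ordered. The following sketch assumes that reading; Origin, Sensitivity, and can_read are hypothetical stand-ins and not the trails.memory_trust classes.

```python
from enum import IntEnum


class Origin(IntEnum):
    """Hypothetical stand-in for the origin trust levels, lowest first."""
    PUBLIC = 0
    PEER = 1
    LOCAL = 2


class Sensitivity(IntEnum):
    """Hypothetical stand-in for DataClassification, using the documented ranks."""
    OPEN = 0
    INTERNAL = 1
    CONFIDENTIAL = 2
    RESTRICTED = 3


# Minimum origin trust needed to read each classification by default.
MIN_TRUST = {
    Sensitivity.OPEN: Origin.PUBLIC,
    Sensitivity.INTERNAL: Origin.PEER,
    Sensitivity.CONFIDENTIAL: Origin.LOCAL,
}


def can_read(reader: Origin, classification: Sensitivity,
             has_explicit_grant: bool = False) -> bool:
    # RESTRICTED ignores trust level entirely: only a named grant opens it.
    if classification is Sensitivity.RESTRICTED:
        return has_explicit_grant
    return reader >= MIN_TRUST[classification]


for c in Sensitivity:
    print(c.name, [can_read(o, c) for o in (Origin.LOCAL, Origin.PEER, Origin.PUBLIC)])
```

Each printed row lists the LOCAL, PEER, and PUBLIC columns in the same order as the table.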
Topic-scoped read policies¶
TopicScopedReadPolicy extends recall with classification-aware
access control:
from trails.memory_trust import TopicScopedReadPolicy, DataClassification
policy = TopicScopedReadPolicy(
    topic_rules={
        "financials": DataClassification.CONFIDENTIAL,
        "public-docs": DataClassification.OPEN,
    },
    default_classification=DataClassification.INTERNAL,
    agent_clearances={
        "did:key:auditor": {DataClassification.CONFIDENTIAL, DataClassification.INTERNAL},
    },
)
# Check access
policy.can_read("did:key:auditor", "financials") # True (explicit clearance)
policy.can_read("did:key:analyst", "financials") # False (no clearance for CONFIDENTIAL)
policy.can_read("did:key:analyst", "public-docs") # True (OPEN is readable by LOCAL)
# Filter a fact list
visible = policy.filter_facts(all_facts, agent_did="did:key:analyst")
Cedar policies can be attached for fine-grained control. When provided, Cedar evaluation takes precedence over the default matrix:
policy = TopicScopedReadPolicy(
    cedar_policies=[{
        "effect": "permit",
        "principal": "did:key:analyst",
        "action": "memory.recall",
        "resource": {"type": "Trails::Memory::Fact", "topic": "financials"},
    }],
)
Contamination tracking¶
ContaminationTracker marks facts as tainted and propagates taint
through inference chains. When a source fact is tainted, all downstream
facts linked via supports relationships are transitively tainted:
from trails.memory_trust import ContaminationTracker
tracker = ContaminationTracker(store=ctx.kg._store, graph="urn:trails:memory")
# Mark a fact as tainted
tracker.mark_tainted(
    "urn:trails:memory:fact:42",
    reason="Source paper retracted",
)
# Check taint status
tracker.is_tainted("urn:trails:memory:fact:42") # True
# All downstream facts linked via "supports" are also tainted
chain = tracker.taint_chain("urn:trails:memory:fact:42")
# ["urn:trails:memory:fact:43", "urn:trails:memory:fact:44"]
Taint propagation follows supports links in the knowledge graph
(brain ontology namespace https://trails.dev/ns/brain/v1#). The
tracker uses SPARQL INSERT to mark taint and SPARQL SELECT to traverse
the inference chain.
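Transitive propagation is a graph traversal. The sketch below models it in memory with a breadth-first walk instead of SPARQL; the taint_chain function and the dictionary-based edge map are illustrative assumptions, with each key mapping a fact to the facts derived from it.

```python
from collections import deque


def taint_chain(supports: dict, source: str) -> list:
    """Return every fact reachable from source via supports links.

    supports maps a fact IRI to the IRIs of facts that depend on it
    (an assumed in-memory stand-in for the knowledge graph).
    """
    seen = {source}          # guards against cycles in the inference graph
    order = []
    queue = deque([source])
    while queue:
        current = queue.popleft()
        for downstream in supports.get(current, []):
            if downstream not in seen:
                seen.add(downstream)
                order.append(downstream)
                queue.append(downstream)
    return order


edges = {
    "urn:trails:memory:fact:42": ["urn:trails:memory:fact:43"],
    "urn:trails:memory:fact:43": ["urn:trails:memory:fact:44"],
    "urn:trails:memory:fact:44": ["urn:trails:memory:fact:43"],  # cycle
}
print(taint_chain(edges, "urn:trails:memory:fact:42"))
```

The seen set matters: inference graphs can contain cycles, and without it the traversal would never terminate.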
Federation trust¶
FederationTrustGate controls which facts cross federation boundaries:
Export filtering¶
Facts are excluded from export if:
- Their classification exceeds the peer's allowed level.
- They are tainted.
- Their confidence is below the configured floor.
- They originated from the requesting peer (no echo).
from trails.memory_trust import FederationTrustGate, TrustLevel, DataClassification
gate = FederationTrustGate(
    local_did="did:key:my-instance",
    peer_trust_levels={
        "did:key:partner": TrustLevel.PEER,
        "did:key:public-api": TrustLevel.PUBLIC,
    },
    export_max_classification=DataClassification.INTERNAL,
    confidence_floor=0.3,
)
# Filter facts for a trusted peer
exportable = gate.filter_for_export(
    facts=all_facts,
    peer_trust_level=TrustLevel.PEER,
    fact_metadata=metadata_map,
)
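The four exclusion rules combine into a single predicate. The following sketch shows one way to express them, assuming a simplified Fact record and a single maximum classification per export; Fact, RANK, and the standalone filter_for_export function here are hypothetical and do not reflect the gate's real signature.

```python
from dataclasses import dataclass
from typing import Optional

# Sensitivity ranks as documented in the classification table.
RANK = {"OPEN": 0, "INTERNAL": 1, "CONFIDENTIAL": 2, "RESTRICTED": 3}


@dataclass
class Fact:
    """Hypothetical fact record carrying only the fields the filter needs."""
    iri: str
    classification: str
    confidence: float
    tainted: bool = False
    source_peer: Optional[str] = None


def filter_for_export(facts, peer_did, max_classification="INTERNAL",
                      confidence_floor=0.3):
    limit = RANK[max_classification]
    return [
        f for f in facts
        if RANK[f.classification] <= limit      # classification within limit
        and not f.tainted                       # never export tainted facts
        and f.confidence >= confidence_floor    # drop low-confidence facts
        and f.source_peer != peer_did           # no echo back to the sender
    ]


facts = [
    Fact("urn:trails:memory:fact:1", "OPEN", 0.8),
    Fact("urn:trails:memory:fact:2", "CONFIDENTIAL", 0.9),
    Fact("urn:trails:memory:fact:3", "INTERNAL", 0.9, tainted=True),
    Fact("urn:trails:memory:fact:4", "OPEN", 0.1),
    Fact("urn:trails:memory:fact:5", "OPEN", 0.9, source_peer="did:key:partner"),
]
print([f.iri for f in filter_for_export(facts, "did:key:partner")])
```

Only the first fact survives: each of the others trips exactly one of the four rules.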
Import validation¶
All imported facts receive trust metadata based on the source peer:
annotated = gate.validate_import(incoming_facts, source_peer="did:key:partner")
for fact, meta in annotated:
    print(f"Trust: {meta.trust_level.value}, tainted: {meta.tainted}")
# PEER facts: confidence *= 0.7
# PUBLIC facts: auto-tainted with reason
Trust multipliers on import:
| Source trust | Confidence multiplier | Auto-tainted |
|---|---|---|
| LOCAL | 1.0 | No |
| PEER | 0.7 | No |
| PUBLIC | 0.3 | Yes |
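As a worked example of the table, the sketch below applies the multiplier and auto-taint flag to an incoming confidence value. IMPORT_RULES and annotate_import are hypothetical names, and the returned dictionary is a simplification of the real trust metadata.

```python
# (confidence multiplier, auto-tainted) per source trust level, from the table.
IMPORT_RULES = {
    "LOCAL": (1.0, False),
    "PEER": (0.7, False),
    "PUBLIC": (0.3, True),
}


def annotate_import(confidence: float, source_trust: str) -> dict:
    """Scale confidence and set the taint flag for an imported fact."""
    multiplier, auto_taint = IMPORT_RULES[source_trust]
    return {
        "confidence": confidence * multiplier,
        "trust_level": source_trust,
        "tainted": auto_taint,
    }


print(annotate_import(0.8, "PEER"))    # confidence scaled by 0.7, not tainted
print(annotate_import(0.8, "PUBLIC"))  # confidence scaled by 0.3, auto-tainted
```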
Namespace isolation¶
MemoryIsolation provides multi-tenant isolation within a single
knowledge graph. Each agent operates in its own named graph, preventing
cross-tenant data leakage:
from trails.memory_trust import MemoryIsolation
isolation = MemoryIsolation(
    namespace_map={
        "did:key:alice": "urn:trails:memory:ns:team-alpha",
        "did:key:bob": "urn:trails:memory:ns:team-alpha",
    },
    shared_namespace="urn:trails:memory:ns:shared",
)
# Resolve namespace
isolation.get_namespace("did:key:alice")
# "urn:trails:memory:ns:team-alpha"
isolation.get_namespace("did:key:charlie")
# "urn:trails:memory:ns:did_key_charlie" (auto-generated)
# Scope queries to agent namespace
scoped_query = isolation.enforce_namespace(query, agent_did="did:key:alice")
# Replaces urn:trails:memory with the agent's namespace graph
# Adds FROM NAMED for the shared namespace on read queries
# Access checks
isolation.agent_can_access_namespace("did:key:alice", "urn:trails:memory:ns:team-alpha") # True
isolation.agent_can_access_namespace("did:key:alice", "urn:trails:memory:ns:shared") # True (read)
isolation.agent_can_access_namespace("did:key:alice", "urn:trails:memory:ns:team-beta") # False
Agents can always access their own namespace and the shared namespace (read-only). Cross-namespace access is denied by default.
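The resolution and access rules can be sketched as two small functions. This is an illustration under assumptions: get_namespace and can_access are standalone hypothetical helpers, and the sanitization rule (replace every non-alphanumeric character with an underscore) is inferred from the single did_key_charlie example above, so the real rule may differ.

```python
import re

NS_PREFIX = "urn:trails:memory:ns:"


def get_namespace(agent_did: str, namespace_map: dict) -> str:
    """Return the mapped namespace, or derive one from the DID."""
    if agent_did in namespace_map:
        return namespace_map[agent_did]
    # Assumed rule: non-alphanumeric characters become underscores.
    return NS_PREFIX + re.sub(r"[^A-Za-z0-9]", "_", agent_did)


def can_access(agent_did: str, namespace: str, namespace_map: dict,
               shared_namespace: str) -> bool:
    """Allow only the agent's own namespace and the shared one."""
    own = get_namespace(agent_did, namespace_map)
    return namespace in (own, shared_namespace)


mapping = {
    "did:key:alice": "urn:trails:memory:ns:team-alpha",
    "did:key:bob": "urn:trails:memory:ns:team-alpha",
}
shared = "urn:trails:memory:ns:shared"
print(get_namespace("did:key:alice", mapping))
print(get_namespace("did:key:charlie", mapping))
print(can_access("did:key:alice", "urn:trails:memory:ns:team-beta", mapping, shared))
```

The sketch does not distinguish read from write access; enforcing read-only use of the shared namespace would need a separate check on the operation type.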
Audit logging¶
MemoryAuditLog is a thread-safe, append-only log for all memory
operations:
from trails.memory_security import MemoryAuditLog, MemoryAuditEvent
log = MemoryAuditLog()
# Querying the audit log
entries = log.query(
    agent_did="did:key:alice",
    action=MemoryAuditEvent.LEARN,
    since="2026-04-19T00:00:00+00:00",
    limit=50,
)
for entry in entries:
    print(f"{entry.timestamp}: {entry.action} on {entry.fact_iri}")
# Export for persistence
all_entries = log.export()
# [{"agent_did": "...", "action": "memory.learn", "timestamp": "...", ...}]
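For readers curious how a thread-safe, append-only log with filtered queries might be structured, here is a minimal sketch. The AuditLog class and its record method are hypothetical; it assumes timestamps are ISO-8601 strings with the same UTC offset, so plain string comparison orders them correctly, and that limit keeps the oldest matches first.

```python
import threading


class AuditLog:
    """Hypothetical append-only log guarded by a lock; not the trails class."""

    def __init__(self):
        self._lock = threading.Lock()
        self._entries = []

    def record(self, agent_did, action, timestamp, fact_iri=None, detail=None):
        entry = {"agent_did": agent_did, "action": action,
                 "timestamp": timestamp, "fact_iri": fact_iri, "detail": detail}
        with self._lock:  # appends are serialized; entries are never mutated
            self._entries.append(entry)

    def query(self, *, agent_did=None, action=None, since=None, limit=None):
        with self._lock:  # copy under the lock, filter outside it
            snapshot = list(self._entries)
        matches = [
            e for e in snapshot
            if (agent_did is None or e["agent_did"] == agent_did)
            and (action is None or e["action"] == action)
            and (since is None or e["timestamp"] >= since)
        ]
        return matches if limit is None else matches[:limit]

    def export(self):
        with self._lock:
            return [dict(e) for e in self._entries]  # copies, not references


demo_log = AuditLog()
demo_log.record("did:key:alice", "memory.learn", "2026-04-19T10:00:00+00:00",
                fact_iri="urn:trails:memory:fact:1")
demo_log.record("did:key:bob", "memory.correct", "2026-04-19T11:00:00+00:00",
                fact_iri="urn:trails:memory:fact:1")
print(len(demo_log.query(agent_did="did:key:alice", action="memory.learn")))
```

Returning copies from export keeps callers from altering stored entries, which preserves the append-only property.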
Audit event types¶
| Event | When logged |
|---|---|
| memory.learn | New fact stored |
| memory.correct | Fact corrected |
| memory.forget | Fact retracted |
| memory.budget_warning | Memory budget approaching limit |
| memory.budget_exceeded | Memory budget exceeded |
| memory.chain_break | Provenance chain integrity violation detected |
| memory.rapid_writes | Abnormal write rate detected |
| memory.cross_correction | Cross-agent correction attempted (allowed or denied) |
Source attestation¶
Facts carry a source attestation type indicating how the source was verified:
| Attestation | Meaning |
|---|---|
| self-reported | Agent states the source without verification |
| tool-observed | Observed via a tool invocation |
| content-hashed | Source content is hash-verified |
| human-confirmed | A human verified the source |
| scitt-anchored | Anchored in a SCITT transparency log |
Example: secured multi-agent memory¶
from trails.memory_security import (
    MemoryGateway,
    MemorySession,
    MemoryAuditLog,
    ProvenanceChain,
)
from trails.memory_trust import (
    TopicScopedReadPolicy,
    DataClassification,
    FederationTrustGate,
    TrustLevel,
    MemoryIsolation,
)
# Shared infrastructure
audit_log = MemoryAuditLog()
chain = ProvenanceChain()
# Agent A: analyst with established trust
session_a = MemorySession(
    principal_did="did:key:analyst-a",
    trust_level="established",
)
gw_a = MemoryGateway(
    memory_store,
    session_a,
    audit_log=audit_log,
    provenance_chain=chain,
    correction_policies=[{
        "effect": "permit",
        "principal": "did:key:analyst-a",
        "action": "memory.correct",
        "resource": {"type": "Trails::Resource::Fact"},
    }],
)
# Agent B: junior agent with authenticated trust
session_b = MemorySession(
    principal_did="did:key:junior-b",
    trust_level="authenticated",
)
gw_b = MemoryGateway(
    memory_store,
    session_b,
    audit_log=audit_log,
    provenance_chain=chain,
)
# Agent A stores a high-confidence fact
iri = gw_a.learn(
    "Patient cohort shows 15% improvement",
    confidence_hint=0.95,
    topic="clinical",
    source="RCT results 2026-Q1",
    ttl_seconds=7776000,  # 90 days
)
# Effective confidence: min(0.95, 0.9) = 0.9 (ESTABLISHED cap)
# Agent B tries to store with inflated confidence
iri_b = gw_b.learn(
    "Secondary analysis confirms trend",
    confidence_hint=0.99,
    topic="clinical",
)
# Effective confidence: min(0.99, 0.7) = 0.7 (AUTHENTICATED cap)
# Read policy: clinical topic is CONFIDENTIAL
read_policy = TopicScopedReadPolicy(
    topic_rules={"clinical": DataClassification.CONFIDENTIAL},
    agent_clearances={
        "did:key:analyst-a": {DataClassification.CONFIDENTIAL},
    },
)
read_policy.can_read("did:key:analyst-a", "clinical")  # True
read_policy.can_read("did:key:junior-b", "clinical")  # False
# Federation gate: only share INTERNAL and below
gate = FederationTrustGate(
    local_did="did:key:my-instance",
    export_max_classification=DataClassification.INTERNAL,
    confidence_floor=0.5,
)
# Verify provenance integrity
valid, broken = chain.verify()
assert valid
# Check audit trail
recent = audit_log.query(action="memory.learn", limit=10)
print(f"{len(recent)} recent learn operations")
Reference¶
| Symbol | Description |
|---|---|
| MemoryGateway(memory_store, session, *, audit_log, provenance_chain, expiry_tracker, correction_policies) | Trusted intermediary for all memory operations |
| MemoryGateway.learn(content, *, confidence_hint, topic, source, scope, staleness, tags, metadata, ttl_seconds) | Store a fact with enforced provenance |
| MemoryGateway.correct(fact_iri, new_content, *, reason, new_confidence_hint, fact_owner_did) | Supersede a fact with authorization checks |
| MemoryGateway.forget(fact_iri, *, reason) | Soft-delete with audit trail |
| MemoryGateway.recall(context, **kwargs) | Delegate recall with audit entry |
| MemoryGateway.verify_provenance() | Verify hash chain; returns (valid, broken_indices) |
| MemoryGateway.gc_expired() | Garbage-collect expired facts |
| MemorySession(principal_did, trust_level, roles) | Authenticated session context |
| TrustLevel (security) | Agent trust levels: ANONYMOUS, AUTHENTICATED, ESTABLISHED, HUMAN, SYSTEM |
| calibrate_confidence(hint, agent_did, topic, *, trust_level, correction_rate) | Apply trust cap and correction adjustment |
| authorize_correction(corrector_did, fact_owner_did, fact_iri, *, corrector_trust_level, policies) | Check cross-agent correction authorization |
| ProvenanceChain | In-memory SHA-256 hash chain |
| ProvenanceChain.append(fact_iri, agent_did, action, timestamp, *, detail) | Append a record |
| ProvenanceChain.verify() | Walk and verify; returns (valid, broken_indices) |
| compute_fact_hash(content, agent_did, timestamp, prev_hash, *, action, fact_iri) | Compute a single provenance hash |
| verify_chain(chain) | Convenience bool wrapper |
| MemoryAuditLog | Thread-safe append-only audit log |
| MemoryAuditLog.query(*, agent_did, action, since, fact_iri, limit) | Query with filters |
| MemoryAuditLog.export() | Export all entries as dicts |
| MemoryAuditEvent | Event type constants |
| SourceAttestation | Source verification type constants |
| FactExpiryTracker | TTL-based expiry tracking |
| TrustLevel (trust) | Origin trust: LOCAL, PEER, PUBLIC |
| DataClassification | Sensitivity: OPEN, INTERNAL, CONFIDENTIAL, RESTRICTED |
| FactTrustMetadata(trust_level, classification, tainted, taint_reason, source_peer) | Trust metadata for a fact |
| TopicScopedReadPolicy(*, topic_rules, default_classification, agent_clearances, cedar_policies) | Classification-aware read control |
| ContaminationTracker(store, *, graph) | Taint propagation through inference chains |
| FederationTrustGate(local_did, *, peer_trust_levels, export_max_classification, confidence_floor) | Federation import/export filtering |
| MemoryIsolation(*, namespace_map, shared_namespace) | Multi-tenant namespace isolation |
See also¶
- ADR-0052 -- Memory security gateway design
- ADR-0053 -- Memory trust boundaries
- Agent Memory -- the trails.memory system these modules secure
- Policy & Authorization -- Cedar policy syntax
- Federation -- peer-to-peer data exchange
- Agent Runtime -- agent sessions and planning loops