Skip to content

CRDT Editing

trails.crdt provides conflict-free concurrent graph mutations using a state-based LWW-Element-Set CRDT (Last-Writer-Wins). Each replica maintains add-set and remove-set entries with Lamport timestamps. Merge uses add-wins semantics: when a triple appears in both sets, the entry with the highest timestamp wins.

See ADR-0041 for the full design rationale.

Quick start

from trails.crdt import CRDTStore, merge_replicas, crdt_context

# Standalone CRDT stores for two concurrent agents
a = CRDTStore("agent-a")
b = CRDTStore("agent-b")

a.add("urn:s", "urn:p", "value-1")
b.add("urn:s", "urn:p", "value-2")

# Merge without conflicts -- both triples survive (add-wins)
merged = merge_replicas(a, b)
print(merged.effective_triples())

# Integration with ctx.kg via context manager
with crdt_context(ctx, "agent-1") as crdt:
    ctx.kg.update('INSERT DATA { <urn:s> <urn:p> "v" . }')
    # Writes are automatically tracked as CRDT operations

state = crdt.state()   # export for sync with other replicas

How it works

  • Lamport clock: Thread-safe monotonic timestamp for causal ordering
  • Add-wins: When the same triple has both an add and remove entry, the higher timestamp wins; ties go to add (bias toward keeping data)
  • State-based merge: apply_state() merges a remote snapshot by keeping the highest-timestamp entry per triple, then rebuilds the backing store

Key types

Type Description
CRDTStore Wraps a Trails store with CRDT semantics: add(), remove(), merge(), state(), effective_triples()
CRDTState Serializable snapshot: replica_id, add_set, remove_set, clock
CRDTOperation Single operation: op_type ("add"|"remove"), triple, timestamp, replica_id
LamportClock Thread-safe monotonic clock: tick(), update(remote_time), current

API

Function Signature Description
CRDTStore() (replica_id, *, store=None) Create a CRDT-wrapped store for a replica
.add() (subject, predicate, object, *, graph=None) -> CRDTOperation Add a triple with a new Lamport timestamp
.remove() (subject, predicate, object, *, graph=None) -> CRDTOperation Mark a triple as removed (tombstone)
.merge() (other: CRDTStore) -> None Merge another replica's state into this one
.state() () -> CRDTState Export state for transmission
.effective_triples() () -> set[Triple] Triples currently alive after conflict resolution
merge_replicas (store_a, store_b) -> CRDTStore Merge two replicas into a fresh third replica
resolve_conflicts (ops_a, ops_b) -> list[CRDTOperation] Merge two operation lists and return effective operations
crdt_context (ctx, replica_id) -> ContextManager[CRDTStore] Wrap ctx.kg with CRDT tracking for the block's duration