CRDT Editing¶
trails.crdt provides conflict-free concurrent graph mutations using a state-based
LWW-Element-Set CRDT (Last-Writer-Wins). Each replica maintains add-set and
remove-set entries with Lamport timestamps. Merge uses add-wins semantics: when a
triple appears in both sets, the entry with the highest timestamp wins.
See ADR-0041 for the full design rationale.
Quick start¶
from trails.crdt import CRDTStore, merge_replicas, crdt_context
# Standalone CRDT stores for two concurrent agents
a = CRDTStore("agent-a")
b = CRDTStore("agent-b")
a.add("urn:s", "urn:p", "value-1")
b.add("urn:s", "urn:p", "value-2")
# Merge without conflicts -- both triples survive (add-wins)
merged = merge_replicas(a, b)
print(merged.effective_triples())
# Integration with ctx.kg via context manager
with crdt_context(ctx, "agent-1") as crdt:
ctx.kg.update('INSERT DATA { <urn:s> <urn:p> "v" . }')
# Writes are automatically tracked as CRDT operations
state = crdt.state() # export for sync with other replicas
How it works¶
- Lamport clock: Thread-safe monotonic timestamp for causal ordering
- Add-wins: When the same triple has both an add and remove entry, the higher timestamp wins; ties go to add (bias toward keeping data)
- State-based merge:
apply_state()merges a remote snapshot by keeping the highest-timestamp entry per triple, then rebuilds the backing store
Key types¶
| Type | Description |
|---|---|
CRDTStore |
Wraps a Trails store with CRDT semantics: add(), remove(), merge(), state(), effective_triples() |
CRDTState |
Serializable snapshot: replica_id, add_set, remove_set, clock |
CRDTOperation |
Single operation: op_type ("add"|"remove"), triple, timestamp, replica_id |
LamportClock |
Thread-safe monotonic clock: tick(), update(remote_time), current |
API¶
| Function | Signature | Description |
|---|---|---|
CRDTStore() |
(replica_id, *, store=None) |
Create a CRDT-wrapped store for a replica |
.add() |
(subject, predicate, object, *, graph=None) -> CRDTOperation |
Add a triple with a new Lamport timestamp |
.remove() |
(subject, predicate, object, *, graph=None) -> CRDTOperation |
Mark a triple as removed (tombstone) |
.merge() |
(other: CRDTStore) -> None |
Merge another replica's state into this one |
.state() |
() -> CRDTState |
Export state for transmission |
.effective_triples() |
() -> set[Triple] |
Triples currently alive after conflict resolution |
merge_replicas |
(store_a, store_b) -> CRDTStore |
Merge two replicas into a fresh third replica |
resolve_conflicts |
(ops_a, ops_b) -> list[CRDTOperation] |
Merge two operation lists and return effective operations |
crdt_context |
(ctx, replica_id) -> ContextManager[CRDTStore] |
Wrap ctx.kg with CRDT tracking for the block's duration |