Chapter 6 — Data Integration¶
Learning objectives¶
After this chapter you will be able to:
- Ingest documents (PDF, HTML, Markdown, DOCX, plain text) into the knowledge graph.
- Map structured data (CSV, JSON, XML) to graph triples using RML.
- Auto-generate RML mappings from source file schemas.
- Index chunks as vectors and run hybrid graph+vector retrieval.
- Choose the right embedder and vector store for your workload.
The ingestion pipeline¶
trails.ingest is the entry point for loading unstructured documents
into the KG. It extracts text, splits it into chunks, stores both as
typed graph nodes, and records full PROV-O provenance -- all in one
call.
from trails.testing import fresh_context
from trails.ingest import ingest_file, ingest_directory
ctx = fresh_context()
# Single file — returns the minted Document IRI
doc_iri = ingest_file("paper.pdf", ctx, source="arxiv:2404.12345")
# Directory walk (recursive by default)
report = ingest_directory("./corpus", ctx, glob="*.pdf")
print(f"Ingested {len(report)} documents, {len(report.failures)} failures")
# Re-ingesting the same file is idempotent — content-hash dedup
same_iri = ingest_file("paper.pdf", ctx)
assert same_iri == doc_iri
Each ingest_file call runs three stages: extract text from the
source format, chunk it into manageable pieces, and load both
the Document and Chunk nodes into the KG. A prov:Activity is
emitted for every run.
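The idempotent re-ingest described above can be pictured as deriving the Document IRI from a hash of the file content. The following is only a sketch of that idea using the standard library; the IRI prefix, the function names, and the choice of SHA-256 are assumptions for illustration, not the actual scheme used by trails.ingest.

```python
import hashlib

# Hypothetical IRI prefix, chosen only for this illustration.
DOC_PREFIX = "https://example.org/doc/"


def mint_document_iri(content: bytes) -> str:
    """Derive a stable IRI from the SHA-256 digest of the file content."""
    digest = hashlib.sha256(content).hexdigest()
    return DOC_PREFIX + digest[:16]


def ingest(content: bytes, known: dict[str, bytes]) -> str:
    """Store content under its hash-derived IRI; a repeat ingest is a no-op."""
    iri = mint_document_iri(content)
    known.setdefault(iri, content)
    return iri


store: dict[str, bytes] = {}
first = ingest(b"same bytes", store)
second = ingest(b"same bytes", store)
print(first == second, len(store))
```

Because the IRI depends only on the bytes, ingesting the same file twice yields the same identifier and leaves a single stored document.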
Extractors¶
Install the optional text backends:
Each backend is lazy-imported. When a dependency is missing, the
extractor raises MissingExtractorDep with the exact pip install
line -- no silent fallback.
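A lazy import with a loud failure might look roughly like the sketch below. The exception class and helper name are stand-ins invented for this example; the real MissingExtractorDep may carry different fields or wording.

```python
import importlib


class MissingDependency(ImportError):
    """Stand-in for an error like MissingExtractorDep (hypothetical shape)."""


def lazy_import(module_name: str, pip_name: str):
    """Import a backend on first use; fail loudly with an install hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise MissingDependency(
            f"{module_name!r} is required for this format. "
            f"Install it with: pip install {pip_name}"
        ) from exc


# A stdlib module imports fine; a made-up module name triggers the hint.
json_mod = lazy_import("json", "json")
try:
    lazy_import("no_such_backend_xyz", "no-such-backend-xyz")
except MissingDependency as err:
    message = str(err)
    print(message)
```

Deferring the import to the first call keeps the base install small, while the explicit error avoids silently producing empty text when a backend is absent.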
| Format | Extensions | Backend | Notes |
|---|---|---|---|
| PDF | .pdf | pypdf | Concatenates per-page text; image-only pages treated as empty |
| HTML | .html, .htm | trafilatura | Strips navigation, ads, chrome automatically |
| Markdown | .md | markdown-it-py | Strips YAML front-matter, validates syntax |
| DOCX | .docx | python-docx | Body paragraphs + tables; headers/footers dropped |
| RTF | .rtf | stdlib | Basic RTF stripping; for rich RTF use a dedicated parser |
| Plain text | .txt | stdlib | UTF-8 with BOM stripping and line-ending normalization |
The chunker¶
After extraction, text is split into chunks using a deterministic, NLP-free paragraph chunker:
from trails.ingest import paragraph_chunker
chunks = paragraph_chunker(text, min_chars=100, max_chars=1500)
The algorithm splits on paragraph breaks (two or more newlines), merges
short paragraphs into the next one, and splits oversized paragraphs at
sentence boundaries. Each chunk carries start_char / end_char
offsets for provenance and highlight-on-hover.
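The three steps above can be sketched with the standard library alone. This is an approximation written for illustration, not the source of paragraph_chunker: the Span record, the sentence-boundary regex, and the handling of a trailing short paragraph are all assumptions.

```python
import re
from dataclasses import dataclass


@dataclass
class Span:
    """Hypothetical chunk record; the real Chunk type may differ."""
    text: str
    start_char: int
    end_char: int


def sketch_chunker(text: str, min_chars: int = 100, max_chars: int = 1500) -> list[Span]:
    # Step 1: paragraph spans, separated by two or more newlines.
    paras, pos = [], 0
    for sep in re.finditer(r"\n{2,}", text):
        if sep.start() > pos:
            paras.append((pos, sep.start()))
        pos = sep.end()
    if pos < len(text):
        paras.append((pos, len(text)))

    # Step 2: carry a short paragraph forward into the next one.
    merged, pending = [], None
    for start, end in paras:
        if pending is not None:
            start, pending = pending, None
        if end - start < min_chars:
            pending = start
        else:
            merged.append((start, end))
    if pending is not None:  # a trailing short paragraph is kept as-is
        merged.append((pending, paras[-1][1]))

    # Step 3: cut oversized spans at the last sentence end inside the window.
    spans = []
    for start, end in merged:
        while end - start > max_chars:
            window = text[start:start + max_chars]
            cuts = [m.end() for m in re.finditer(r"[.!?]\s", window)]
            cut = start + (cuts[-1] if cuts else max_chars)
            spans.append((start, cut))
            start = cut
        spans.append((start, end))
    return [Span(text[s:e], s, e) for s, e in spans]


sample = (
    "Short one.\n\n"
    "Second paragraph that is long enough to stand alone.\n\n"
    + "A sentence here. " * 10
)
chunks = sketch_chunker(sample, min_chars=20, max_chars=80)
for c in chunks:
    print(c.start_char, c.end_char, repr(c.text[:30]))
```

Since every chunk is a slice of the original string, the offsets can be used directly to highlight the chunk in the source text.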
You can plug in a different chunker; the protocol is a bare
Callable[[str], list[Chunk]].
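Any function with that shape qualifies. As a sketch, here is a fixed-width chunker built by a small factory. The Chunk dataclass below is a hypothetical stand-in so the example runs on its own, and how the callable is passed to the pipeline is not shown because the source does not specify it.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class Chunk:
    """Hypothetical stand-in for the library's Chunk type."""
    text: str
    start_char: int
    end_char: int


def fixed_width_chunker(width: int) -> Callable[[str], list[Chunk]]:
    """Build a chunker that cuts text into consecutive windows of `width` characters."""
    def chunk(text: str) -> list[Chunk]:
        return [
            Chunk(text[i:i + width], i, min(i + width, len(text)))
            for i in range(0, len(text), width)
        ]
    return chunk


chunker: Callable[[str], list[Chunk]] = fixed_width_chunker(10)
pieces = chunker("abcdefghijklmnopqrstuvwxy")
print([p.text for p in pieces])
```

Using a factory keeps the configuration (here, the width) out of the call signature, so the resulting function still matches the one-argument protocol.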
Querying ingested documents¶
Document and Chunk are ordinary @node_type classes, so you query
them with the same ORM surface as any other node type:
from trails.ingest import Document, Chunk
doc = Document.find(ctx, doc_iri)
chunks = Chunk.where(document=doc).order_by("index").fetch(ctx)
for chunk in chunks:
    print(f"Chunk {chunk.index}: {chunk.text[:80]}...")
RML data mapping: structured data without code¶
For structured data (CSV, JSON, XML), writing a Python extractor for every source schema is tedious and brittle. RML (RDF Mapping Language) lets you declare how rows and fields map to triples in a Turtle file. The mapping is data, not code -- versionable, diffable, reviewable.
When to use RML vs code extractors¶
| Scenario | Recommended approach |
|---|---|
| Unstructured text (PDF, HTML, Markdown) | Code extractors via trails.ingest |
| Structured tabular data (CSV, TSV) | RML mapping |
| Semi-structured data (JSON, XML) | RML mapping |
| One-off import with custom logic | Code extractor |
| Many source schemas that change often | RML mappings |
Installation¶
This installs Morph-KGC, a
W3C-test-suite-compliant RML processor. If not installed, trails.rml
raises TrailsError with the install command.
Writing your first RML mapping¶
Suppose you have a CSV file employees.csv with the columns id, name, department, and salary.
Write a Turtle mapping file mappings/employees.ttl:
@prefix rr: <http://www.w3.org/ns/r2rml#> .
@prefix rml: <http://semweb.mmlab.be/ns/rml#> .
@prefix ql: <http://semweb.mmlab.be/ns/ql#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ex: <https://myapp.example/> .
<#EmployeeMapping> a rr:TriplesMap ;
    rml:logicalSource [
        rml:source "employees.csv" ;
        rml:referenceFormulation ql:CSV
    ] ;
    rr:subjectMap [
        rr:template "https://myapp.example/Employee/{id}" ;
        rr:class ex:Employee
    ] ;
    rr:predicateObjectMap [
        rr:predicate ex:name ;
        rr:objectMap [ rml:reference "name" ]
    ] ;
    rr:predicateObjectMap [
        rr:predicate ex:department ;
        rr:objectMap [ rml:reference "department" ]
    ] ;
    rr:predicateObjectMap [
        rr:predicate ex:salary ;
        rr:objectMap [
            rml:reference "salary" ;
            rr:datatype xsd:integer
        ]
    ] .
Execute it:
from trails.rml import run_mapping
result = run_mapping(ctx, "mappings/employees.ttl")
print(f"Loaded {result.triples_added} triples in {result.duration_ms:.0f}ms")
print(f"Activity: {result.activity_iri}") # PROV-O trace
This loads 3 employees x 4 triples each (type + 3 properties) = 12
triples. Each employee gets an IRI like
https://myapp.example/Employee/1 with typed properties.
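To make the triple count concrete, the sketch below hand-expands the same mapping over a small in-memory CSV. The three rows are invented for illustration and are not the chapter's actual file; the function mimics what the mapping declares and is not how Morph-KGC is implemented.

```python
import csv
import io

# Made-up rows for illustration only; they are not the chapter's actual file.
SAMPLE_CSV = """id,name,department,salary
1,Ada,Engineering,120000
2,Grace,Engineering,130000
3,Linus,Operations,110000
"""

EX = "https://myapp.example/"
RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"


def rows_to_triples(csv_text: str) -> list[tuple[str, str, object]]:
    triples = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        # rr:template: substitute the {id} column into the subject IRI.
        subject = f"{EX}Employee/{row['id']}"
        # rr:class: one rdf:type triple per row.
        triples.append((subject, RDF_TYPE, EX + "Employee"))
        # rml:reference: copy column values; salary is typed as an integer.
        triples.append((subject, EX + "name", row["name"]))
        triples.append((subject, EX + "department", row["department"]))
        triples.append((subject, EX + "salary", int(row["salary"])))
    return triples


triples = rows_to_triples(SAMPLE_CSV)
print(len(triples))
```

Each row contributes one type triple plus one triple per predicate-object map, which is where the figure of four triples per employee comes from.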
Validating mappings¶
Check a mapping for structural errors before executing it:
from trails.rml import validate_mapping
issues = validate_mapping("mappings/employees.ttl")
if issues:
    for issue in issues:
        print(f"[{issue.severity.upper()}] {issue.message}")
else:
    print("Mapping is valid")
The validator checks Turtle syntax, subject maps, predicate-object maps, and source file reachability.
Overriding source paths¶
Decouple mapping logic from file paths using --source-override on the
CLI or keyword arguments in Python:
# CLI
trails rml run mappings/sales.ttl --source-override sales=/tmp/staged/sales.csv
# Python
result = run_mapping(ctx, "mappings/sales.ttl", sales="/data/exports/sales.csv")
For reusable source management, register sources with @rml_source:
from trails.rml import rml_source
@rml_source("sales_csv", kind="csv")
def sales_data():
    return "/data/exports/sales_2026.csv"

@rml_source("product_feed", kind="json")
def product_api():
    return "https://api.example.com/products.json"
The same mapping runs in dev (local CSV) and prod (S3 export) by rebinding the source.
Auto-generating RML mappings¶
Instead of hand-writing Turtle for every source, use trails rml
generate to infer a mapping from the source file's schema:
# Auto-detect format from extension, print Turtle to stdout
trails rml generate data/employees.csv
# Custom base IRI and node type name, write to file
trails rml generate data/orders.csv \
--base-iri https://myapp.example/ \
--node-type-name Order \
-o mappings/orders.ttl
from trails.rml.generate import generate_mapping
result = generate_mapping(
"data/employees.csv",
source_type="csv",
base_iri="https://myapp.example/",
node_type_name="Employee",
)
print(f"Detected columns: {result.columns}")
print(f"Inferred types: {result.inferred_types}")
print(result.turtle)
The generator samples the first 100 rows and infers XSD types automatically: integers, floats, dates, datetimes, booleans, URLs, and strings. The output is a starting point -- review and adjust before running.
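Type inference over sampled values can be approximated as follows. The precedence order, the choice of xsd:double for floats, and the fallback to xsd:string for mixed columns are assumptions made for this sketch; the generator's actual rules may differ.

```python
from datetime import date, datetime


def _parses(fn, value: str) -> bool:
    """Return True when fn accepts the value without raising ValueError."""
    try:
        fn(value)
        return True
    except ValueError:
        return False


def infer_value_type(value: str) -> str:
    v = value.strip()
    if v.lower() in ("true", "false"):
        return "xsd:boolean"
    if _parses(int, v):
        return "xsd:integer"
    if _parses(float, v):
        return "xsd:double"  # assumption: floats map to xsd:double
    if _parses(date.fromisoformat, v):
        return "xsd:date"
    if _parses(datetime.fromisoformat, v):
        return "xsd:dateTime"
    if v.startswith(("http://", "https://")):
        return "xsd:anyURI"
    return "xsd:string"


def infer_column_type(values: list[str], sample_size: int = 100) -> str:
    """Infer one type per column from the first sample_size non-empty values."""
    kinds = {infer_value_type(v) for v in values[:sample_size] if v.strip()}
    if len(kinds) == 1:
        return kinds.pop()
    if kinds == {"xsd:integer", "xsd:double"}:
        return "xsd:double"  # widen mixed numeric columns
    return "xsd:string"  # mixed or empty columns fall back to string


print(infer_column_type(["1", "2", "3"]))
print(infer_column_type(["2026-01-05", "2026-02-01"]))
```

Sampling only the leading rows means a late outlier can contradict the inferred type, which is one reason to review the generated mapping before running it.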
RML through the ingest pipeline¶
RML integrates with ingest_file() via the rml_mapping= parameter:
from trails.ingest import ingest_file
# Standard code extractor (default)
doc_iri = ingest_file("paper.pdf", ctx)
# RML-driven ingestion for structured data
doc_iri = ingest_file(
"transactions.csv",
ctx,
rml_mapping="mappings/transactions.ttl",
)
When using RML through the pipeline, the chunker step is skipped (RML produces typed triples directly), and PROV-O provenance is emitted by the RML runner.
Vector search and hybrid retrieval¶
After loading documents into the KG, you often want to search them by
meaning, not just by exact graph patterns. trails.vector provides
embeddings, vector storage, and hybrid graph+vector retrieval.
Embedders¶
Trails ships three embedders:
SentenceTransformerEmbedder -- local, no API cost:
from trails.vector import SentenceTransformerEmbedder
embedder = SentenceTransformerEmbedder(model="all-MiniLM-L6-v2")
# 22 MB model, 384 dimensions, runs on CPU
OpenAIEmbedder -- API-based:
MockEmbedder -- deterministic, for tests:
from trails.vector import MockEmbedder
embedder = MockEmbedder(dim=16, seed=1)
# Same input always produces the same vector
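A deterministic embedder for tests can be built from a hash function. The sketch below shows one possible construction; it is not the implementation of MockEmbedder, and the function name, the use of SHA-256, and the unit-length normalization are assumptions.

```python
import hashlib
import math


def mock_embed(text: str, dim: int = 16, seed: int = 1) -> list[float]:
    """Hash-derived unit vector: the same (text, dim, seed) always gives the same output."""
    values: list[float] = []
    counter = 0
    while len(values) < dim:
        block = hashlib.sha256(f"{seed}:{counter}:{text}".encode("utf-8")).digest()
        # Map each byte (0..255) into the range [-1, 1).
        values.extend(b / 128.0 - 1.0 for b in block)
        counter += 1
    values = values[:dim]
    norm = math.sqrt(sum(v * v for v in values)) or 1.0
    return [v / norm for v in values]


vec_a = mock_embed("drug interactions")
vec_b = mock_embed("drug interactions")
print(vec_a == vec_b, len(vec_a))
```

Such vectors carry no semantic meaning, so they are suitable for checking plumbing and determinism in tests, not for judging retrieval quality.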
Vector stores¶
SqliteVecStore -- zero-ops, great for development:
from trails.vector import SqliteVecStore
store = SqliteVecStore(path="vectors.db", dim=384)
# or path=":memory:" for tests
QdrantStore -- for production scale:
from trails.vector import QdrantStore
store = QdrantStore(
collection="my-chunks",
dim=384,
url="http://localhost:6333",
)
Both stores implement the same interface: add(), search(),
delete(), count().
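The shared interface can be illustrated with a toy in-memory store. The method names come from the text above, but the parameter names, return shapes, and cosine scoring are assumptions for this sketch rather than the documented signatures of SqliteVecStore or QdrantStore.

```python
import math
from typing import Optional


def _cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


class InMemoryStore:
    """Toy store exposing add/search/delete/count; signatures here are assumptions."""

    def __init__(self, dim: int) -> None:
        self.dim = dim
        self._rows: dict[str, tuple[list[float], dict]] = {}

    def add(self, id: str, vector: list[float], metadata: Optional[dict] = None) -> None:
        if len(vector) != self.dim:
            raise ValueError(f"expected {self.dim} dimensions, got {len(vector)}")
        self._rows[id] = (vector, metadata or {})

    def search(self, query: list[float], k: int = 10) -> list[tuple[str, float]]:
        # Score every stored vector and return the k most similar ids.
        scored = [(id_, _cosine(query, vec)) for id_, (vec, _) in self._rows.items()]
        return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]

    def delete(self, id: str) -> None:
        self._rows.pop(id, None)

    def count(self) -> int:
        return len(self._rows)


toy = InMemoryStore(dim=2)
toy.add("a", [1.0, 0.0])
toy.add("b", [0.0, 1.0])
toy.add("c", [1.0, 1.0])
top = toy.search([1.0, 0.0], k=2)
print([id_ for id_, _ in top], toy.count())
```

Coding against the four-method surface rather than a concrete class is what lets a development setup be swapped for a production one without touching the indexing code.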
Indexing chunks¶
After ingesting documents, embed and store their chunks:
from trails.ingest import Chunk
chunks = Chunk.where(document=doc_iri).fetch(ctx)
for chunk in chunks:
    store.add(
        id=chunk.iri,
        vector=embedder.embed(chunk.text),
        metadata={"iri": chunk.iri, "snippet": chunk.text[:200]},
    )
Retrieval modes¶
retrieve() provides three modes in a single entry point:
from trails.vector import retrieve
# Pure vector similarity
hits = retrieve("drug interactions", mode="vector", k=10,
vector_store=store, embedder=embedder)
# Pure SPARQL (graph patterns only)
hits = retrieve("drug interactions", mode="graph", k=10, ctx=ctx,
sparql_filter="SELECT ?iri WHERE { ?iri a <trails://app/Chunk> }")
# Hybrid: SPARQL narrows candidates, vector reranks
hits = retrieve(
"drug interactions",
mode="hybrid",
k=10,
ctx=ctx,
vector_store=store,
embedder=embedder,
sparql_filter="""
SELECT ?iri WHERE {
?iri a <trails://app/Chunk> ;
<trails://app/Chunk/document> ?doc .
?doc <trails://app/Document/source> "arxiv:2404.12345" .
}
""",
)
for hit in hits:
    print(f"{hit.iri} (score: {hit.score:.3f}): {hit.snippet[:80]}...")
Hybrid mode is the recommended default for production: SPARQL provides precision (only chunks from the right documents), while vector search provides recall (semantic similarity even when exact terms differ).
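The filter-then-rerank idea behind hybrid mode can be shown in a few lines. In this sketch a plain set of IRIs stands in for the SPARQL result and a dict stands in for the vector store; the function name and data are hypothetical, and the internals of retrieve() may differ.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


def hybrid_retrieve(
    query_vec: list[float],
    candidates: set[str],
    vectors: dict[str, list[float]],
    k: int,
) -> list[tuple[str, float]]:
    """Keep only ids admitted by the graph filter, then rank them by similarity."""
    scored = [
        (iri, cosine(query_vec, vectors[iri]))
        for iri in candidates
        if iri in vectors
    ]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Toy data: four indexed chunks, of which the graph filter admits two.
vectors = {
    "chunk/1": [1.0, 0.0],
    "chunk/2": [0.9, 0.1],
    "chunk/3": [0.0, 1.0],
    "chunk/4": [1.0, 0.05],
}
allowed = {"chunk/2", "chunk/3"}  # stands in for the SPARQL result set
ranked = hybrid_retrieve([1.0, 0.0], allowed, vectors, k=2)
print([iri for iri, _ in ranked])
```

Note that chunk/1 is the closest vector overall yet never appears in the result, because the graph filter excluded it before scoring.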
Example: loading a CSV dataset and querying it¶
This end-to-end example loads a CSV file using RML, queries the result with SPARQL, infers a schema from the loaded data, and generates @node_type code.
from trails.testing import fresh_context
from trails.rml import run_mapping
from trails.onto_infer import infer_schema, generate_code
ctx = fresh_context()
# Step 1: Load the CSV via RML
result = run_mapping(ctx, "mappings/employees.ttl")
print(f"Loaded {result.triples_added} triples")
# Step 2: Query the data with SPARQL
engineers = ctx.kg.query("""
PREFIX ex: <https://myapp.example/>
SELECT ?name ?salary WHERE {
?e a ex:Employee ;
ex:department "Engineering" ;
ex:name ?name ;
ex:salary ?salary .
}
ORDER BY DESC(?salary)
""")
for row in engineers:
    print(f"{row['name']}: ${row['salary']}")
# Step 3: Infer the schema from the loaded data
schema = infer_schema(ctx.kg._store, trace_id="csv-demo")
print(f"\nInferred {len(schema.candidates)} types:")
for candidate in schema.candidates:
    print(f" {candidate.name} ({candidate.instance_count} instances)")
    for prop in candidate.properties:
        print(f" {prop.name}: {prop.python_type}")
# Step 4: Generate @node_type code
code = generate_code(schema)
print("\nGenerated code:")
print(code)
After running this, you have typed @node_type declarations generated
from your CSV data. Review, edit, and import them into your project.
From there, use the ORM (Chapter 3) to query and manipulate the data
with Python objects instead of raw SPARQL.
Deep dives¶
- Document Ingestion guide -- full extractor API, chunker protocol, PROV-O shape, pipeline options.
- RML Data Mapping guide -- source registry, RDBMS and HTTP sources, IngestPipeline integration, anti-patterns.
- Auto-Ontology guide -- schema inference, LLM-assisted generation, usage-driven refinement.
- Vector Retrieval guide -- embedder protocol, store interface, hybrid retrieval internals.
What's next: Chapter 7 -- Federation and Scaling covers how to connect multiple Trails instances, expose SPARQL endpoints, and deploy for production.