Chapter 6 — Data Integration

Learning objectives

After this chapter you will be able to:

  • Ingest documents (PDF, HTML, Markdown, DOCX, plain text) into the knowledge graph.
  • Map structured data (CSV, JSON, XML) to graph triples using RML.
  • Auto-generate RML mappings from source file schemas.
  • Index chunks as vectors and run hybrid graph+vector retrieval.
  • Choose the right embedder and vector store for your workload.

The ingestion pipeline

trails.ingest is the entry point for loading unstructured documents into the KG. In one call it extracts text, splits it into chunks, stores the document and its chunks as typed graph nodes, and records full PROV-O provenance.

from trails.testing import fresh_context
from trails.ingest import ingest_file, ingest_directory

ctx = fresh_context()

# Single file — returns the minted Document IRI
doc_iri = ingest_file("paper.pdf", ctx, source="arxiv:2404.12345")

# Directory walk (recursive by default)
report = ingest_directory("./corpus", ctx, glob="*.pdf")
print(f"Ingested {len(report)} documents, {len(report.failures)} failures")

# Re-ingesting the same file is idempotent — content-hash dedup
same_iri = ingest_file("paper.pdf", ctx)
assert same_iri == doc_iri

Each ingest_file call runs three stages: extract text from the source format, chunk it into manageable pieces, and load both the Document and Chunk nodes into the KG. A prov:Activity is emitted for every run.
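One way to picture the content-hash dedup mentioned above is to derive the Document IRI from a hash of the file's bytes, so the same bytes always yield the same IRI. The sketch below illustrates that idea only; it is not the trails implementation, and the `urn:example:doc:` scheme, `mint_document_iri`, and `TinyDocumentIndex` are hypothetical names.

```python
import hashlib


def mint_document_iri(content: bytes) -> str:
    """Derive a stable IRI from the content bytes (hypothetical IRI scheme)."""
    digest = hashlib.sha256(content).hexdigest()
    return f"urn:example:doc:{digest[:16]}"


class TinyDocumentIndex:
    """In-memory stand-in for a graph: maps document IRI -> content."""

    def __init__(self) -> None:
        self.docs: dict[str, bytes] = {}

    def ingest(self, content: bytes) -> str:
        iri = mint_document_iri(content)
        # setdefault leaves an existing entry untouched, so re-ingesting
        # identical bytes changes nothing and returns the same IRI.
        self.docs.setdefault(iri, content)
        return iri


index = TinyDocumentIndex()
first = index.ingest(b"same bytes")
second = index.ingest(b"same bytes")      # re-ingest: same IRI, no new entry
other = index.ingest(b"different bytes")  # different content: new IRI
```

Because the IRI depends only on the bytes, renaming or moving a file would not create a duplicate under this scheme.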

Extractors

Install the optional text backends:

pip install 'trails[ingest]'

Each backend is lazy-imported. When a dependency is missing, the extractor raises MissingExtractorDep with the exact pip install line -- no silent fallback.
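The lazy-import pattern described above can be sketched with the standard library alone. This is an illustration of the general technique, not the trails source: `MissingDependency` is a hypothetical stand-in for MissingExtractorDep, and `lazy_backend` and the `example[...]` extras are made-up names.

```python
import importlib


class MissingDependency(ImportError):
    """Hypothetical stand-in for an error such as MissingExtractorDep."""


def lazy_backend(module_name: str, pip_extra: str):
    """Import a backend only when it is needed; fail loudly with an install hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        # No silent fallback: tell the user exactly what to install.
        raise MissingDependency(
            f"{module_name} is required; run: pip install '{pip_extra}'"
        ) from exc


# A module that exists (stdlib json) imports normally.
json_backend = lazy_backend("json", "example[json]")

# A module that does not exist produces the actionable error message.
try:
    lazy_backend("no_such_backend_module_xyz", "example[ingest]")
except MissingDependency as err:
    message = str(err)
```

Deferring the import to the call site means users who never touch a given format never pay for, or need, its dependency.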

Format | Extensions | Backend | Notes
PDF | .pdf | pypdf | Concatenates per-page text; image-only pages treated as empty
HTML | .html, .htm | trafilatura | Strips navigation, ads, chrome automatically
Markdown | .md | markdown-it-py | Strips YAML front-matter, validates syntax
DOCX | .docx | python-docx | Body paragraphs + tables; headers/footers dropped
RTF | .rtf | stdlib | Basic RTF stripping; for rich RTF use a dedicated parser
Plain text | .txt | stdlib | UTF-8 with BOM stripping and line-ending normalization

The chunker

After extraction, text is split into chunks using a deterministic, NLP-free paragraph chunker:

from trails.ingest import paragraph_chunker

chunks = paragraph_chunker(text, min_chars=100, max_chars=1500)

The algorithm splits on paragraph breaks (two+ newlines), merges short paragraphs into the next one, and splits oversized paragraphs at sentence boundaries. Each chunk carries start_char / end_char offsets for provenance and highlight-on-hover.
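The three steps just described (split, merge short, split oversized) can be sketched in plain Python. This is a simplified illustration under stated assumptions, not the paragraph_chunker source: `Span` is a hypothetical stand-in for Chunk, sentence boundaries are approximated with a regex, and pieces left over after a split are not re-merged.

```python
import re
from dataclasses import dataclass


@dataclass
class Span:
    """Hypothetical stand-in for a chunk record (not trails' Chunk)."""
    text: str
    start_char: int
    end_char: int


def sketch_paragraph_chunker(text: str, min_chars: int = 100,
                             max_chars: int = 1500) -> list[Span]:
    # 1. Paragraph offsets: split on runs of two or more newlines.
    paragraphs, pos = [], 0
    for brk in re.finditer(r"\n{2,}", text):
        if brk.start() > pos:
            paragraphs.append((pos, brk.start()))
        pos = brk.end()
    if pos < len(text):
        paragraphs.append((pos, len(text)))

    # 2. Merge each short paragraph forward into the one that follows it.
    merged, carry = [], None
    for start, end in paragraphs:
        if carry is not None:
            start, carry = carry, None
        if end - start < min_chars:
            carry = start
        else:
            merged.append((start, end))
    if carry is not None:  # trailing short paragraph has nothing to merge into
        merged.append((carry, paragraphs[-1][1]))

    # 3. Split oversized paragraphs greedily at sentence boundaries.
    spans = []
    for start, end in merged:
        cuts = sorted({start + m.end()
                       for m in re.finditer(r"[.!?]\s+", text[start:end])} | {end})
        piece_start = prev_cut = start
        for cut in cuts:
            if cut - piece_start > max_chars and prev_cut > piece_start:
                spans.append((piece_start, prev_cut))
                piece_start = prev_cut
            prev_cut = cut
        spans.append((piece_start, end))

    # Offsets index into the original text, so text == source[start:end].
    return [Span(text[s:e], s, e) for s, e in spans]


sample = ("Short intro.\n\n" + "A" * 120 + "\n\n"
          + " ".join(f"Sentence number {i} is here." for i in range(20)))
chunks = sketch_paragraph_chunker(sample, min_chars=50, max_chars=200)
```

Working with offsets rather than copied strings keeps every chunk traceable to an exact character range in the source, which is what provenance and highlighting need.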

You can plug in a different chunker:

doc_iri = ingest_file("paper.pdf", ctx, chunker=my_custom_chunker)

The protocol is a bare Callable[[str], list[Chunk]].

Querying ingested documents

Document and Chunk are ordinary @node_type classes, so you query them with the same ORM surface as any other node type:

from trails.ingest import Document, Chunk

doc = Document.find(ctx, doc_iri)
chunks = Chunk.where(document=doc).order_by("index").fetch(ctx)

for chunk in chunks:
    print(f"Chunk {chunk.index}: {chunk.text[:80]}...")

RML data mapping: structured data without code

For structured data (CSV, JSON, XML), writing a Python extractor for every source schema is tedious and brittle. RML (RDF Mapping Language) lets you declare how rows and fields map to triples in a Turtle file. The mapping is data, not code -- versionable, diffable, reviewable.

When to use RML vs code extractors

Scenario | Recommended approach
Unstructured text (PDF, HTML, Markdown) | Code extractors via trails.ingest
Structured tabular data (CSV, TSV) | RML mapping
Semi-structured data (JSON, XML) | RML mapping
One-off import with custom logic | Code extractor
Many source schemas that change often | RML mappings

Installation

pip install 'trails[rml]'

This installs Morph-KGC, a W3C-test-suite-compliant RML processor. If Morph-KGC is not installed, trails.rml raises TrailsError with the install command.

Writing your first RML mapping

Suppose you have a CSV file employees.csv:

id,name,department,salary
1,Alice,Engineering,95000
2,Bob,Marketing,82000
3,Carol,Engineering,98000

Write a Turtle mapping file mappings/employees.ttl:

@prefix rr:  <http://www.w3.org/ns/r2rml#> .
@prefix rml: <http://semweb.mmlab.be/ns/rml#> .
@prefix ql:  <http://semweb.mmlab.be/ns/ql#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix ex:  <https://myapp.example/> .

<#EmployeeMapping> a rr:TriplesMap ;
    rml:logicalSource [
        rml:source "employees.csv" ;
        rml:referenceFormulation ql:CSV
    ] ;
    rr:subjectMap [
        rr:template "https://myapp.example/Employee/{id}" ;
        rr:class ex:Employee
    ] ;
    rr:predicateObjectMap [
        rr:predicate ex:name ;
        rr:objectMap [ rml:reference "name" ]
    ] ;
    rr:predicateObjectMap [
        rr:predicate ex:department ;
        rr:objectMap [ rml:reference "department" ]
    ] ;
    rr:predicateObjectMap [
        rr:predicate ex:salary ;
        rr:objectMap [
            rml:reference "salary" ;
            rr:datatype xsd:integer
        ]
    ] .

Execute it:

from trails.rml import run_mapping

result = run_mapping(ctx, "mappings/employees.ttl")
print(f"Loaded {result.triples_added} triples in {result.duration_ms:.0f}ms")
print(f"Activity: {result.activity_iri}")  # PROV-O trace

This loads 3 employees x 4 triples each (type + 3 properties) = 12 triples. Each employee gets an IRI like https://myapp.example/Employee/1 with typed properties.
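To make the triple count concrete, the sketch below reproduces by hand what the mapping declares, using only the csv module and plain tuples. It mirrors the mapping's semantics for illustration; it is not how Morph-KGC or trails.rml executes a mapping, and the tuple representation is an assumption made for readability.

```python
import csv
import io

CSV_TEXT = """id,name,department,salary
1,Alice,Engineering,95000
2,Bob,Marketing,82000
3,Carol,Engineering,98000
"""

RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type"
EX = "https://myapp.example/"

triples = []
for row in csv.DictReader(io.StringIO(CSV_TEXT)):
    # rr:template "https://myapp.example/Employee/{id}"
    subject = f"{EX}Employee/{row['id']}"
    # rr:class ex:Employee becomes an rdf:type triple
    triples.append((subject, RDF_TYPE, f"{EX}Employee"))
    # One triple per rr:predicateObjectMap
    triples.append((subject, f"{EX}name", row["name"]))
    triples.append((subject, f"{EX}department", row["department"]))
    # rr:datatype xsd:integer: the value is typed, not a plain string
    triples.append((subject, f"{EX}salary", int(row["salary"])))

print(len(triples))  # 3 rows x 4 triples each
```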

Validating mappings

Check a mapping for structural errors before executing it:

# CLI
trails rml validate mappings/employees.ttl

# Python
from trails.rml import validate_mapping

issues = validate_mapping("mappings/employees.ttl")
if issues:
    for issue in issues:
        print(f"[{issue.severity.upper()}] {issue.message}")
else:
    print("Mapping is valid")

The validator checks Turtle syntax, subject maps, predicate-object maps, and source file reachability.

Overriding source paths

Decouple mapping logic from file paths using --source-override on the CLI or keyword arguments in Python:

# CLI
trails rml run mappings/sales.ttl --source-override sales=/tmp/staged/sales.csv

# Python
result = run_mapping(ctx, "mappings/sales.ttl", sales="/data/exports/sales.csv")

For reusable source management, register sources with @rml_source:

from trails.rml import rml_source

@rml_source("sales_csv", kind="csv")
def sales_data():
    return "/data/exports/sales_2026.csv"

@rml_source("product_feed", kind="json")
def product_api():
    return "https://api.example.com/products.json"

The same mapping runs in dev (local CSV) and prod (S3 export) by rebinding the source.

Auto-generating RML mappings

Instead of hand-writing Turtle for every source, use trails rml generate to infer a mapping from the source file's schema:

# Auto-detect format from extension, print Turtle to stdout
trails rml generate data/employees.csv

# Custom base IRI and node type name, write to file
trails rml generate data/orders.csv \
    --base-iri https://myapp.example/ \
    --node-type-name Order \
    -o mappings/orders.ttl

# Python
from trails.rml.generate import generate_mapping

result = generate_mapping(
    "data/employees.csv",
    source_type="csv",
    base_iri="https://myapp.example/",
    node_type_name="Employee",
)

print(f"Detected columns: {result.columns}")
print(f"Inferred types: {result.inferred_types}")
print(result.turtle)

The generator samples the first 100 rows and infers XSD types automatically: integers, floats, dates, datetimes, booleans, URLs, and strings. The output is a starting point -- review and adjust before running.
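Sample-based type inference of this kind can be sketched as "try the narrowest type first, fall back to string". The code below is an illustrative approximation, not the generator's actual logic: `infer_xsd_type`, the order of checks, and the choice of `xsd:double` for floats are assumptions.

```python
import csv
import io
from datetime import date, datetime

CSV_TEXT = """id,name,department,salary
1,Alice,Engineering,95000
2,Bob,Marketing,82000
3,Carol,Engineering,98000
"""


def _as_bool(value: str) -> None:
    if value.lower() not in {"true", "false"}:
        raise ValueError(value)


def _as_url(value: str) -> None:
    if not value.startswith(("http://", "https://")):
        raise ValueError(value)


# Checked in order; the first type that accepts every sample wins.
_CANDIDATES = [
    ("xsd:boolean", _as_bool),
    ("xsd:integer", int),
    ("xsd:double", float),
    ("xsd:date", date.fromisoformat),
    ("xsd:dateTime", datetime.fromisoformat),
    ("xsd:anyURI", _as_url),
]


def infer_xsd_type(values: list[str]) -> str:
    """Return the first candidate type that parses all non-empty samples."""
    samples = [v.strip() for v in values if v.strip()]
    if not samples:
        return "xsd:string"
    for xsd_type, check in _CANDIDATES:
        try:
            for sample in samples:
                check(sample)
        except ValueError:
            continue  # at least one sample failed; try the next type
        return xsd_type
    return "xsd:string"


rows = list(csv.DictReader(io.StringIO(CSV_TEXT)))[:100]  # sample at most 100 rows
inferred = {col: infer_xsd_type([r[col] for r in rows]) for col in rows[0]}
```

Because inference only sees a sample, a column that looks like integers in the first 100 rows can still contain a stray string later, which is one reason to review the generated mapping before running it.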

RML through the ingest pipeline

RML integrates with ingest_file() via the rml_mapping= parameter:

from trails.ingest import ingest_file

# Standard code extractor (default)
doc_iri = ingest_file("paper.pdf", ctx)

# RML-driven ingestion for structured data
doc_iri = ingest_file(
    "transactions.csv",
    ctx,
    rml_mapping="mappings/transactions.ttl",
)

When using RML through the pipeline, the chunker step is skipped (RML produces typed triples directly), and PROV-O provenance is emitted by the RML runner.

Vector search and hybrid retrieval

After loading documents into the KG, you often want to search them by meaning, not just by exact graph patterns. trails.vector provides embeddings, vector storage, and hybrid graph+vector retrieval.

Embedders

Trails ships three embedders:

SentenceTransformerEmbedder -- local, no API cost:

from trails.vector import SentenceTransformerEmbedder

embedder = SentenceTransformerEmbedder(model="all-MiniLM-L6-v2")
# 22 MB model, 384 dimensions, runs on CPU

Install the extra:

pip install 'trails[vector]'

OpenAIEmbedder -- API-based:

from trails.vector import OpenAIEmbedder

embedder = OpenAIEmbedder(model="text-embedding-3-small")

Install the extra:

pip install 'trails[vector-openai]'

MockEmbedder -- deterministic, for tests:

from trails.vector import MockEmbedder

embedder = MockEmbedder(dim=16, seed=1)
# Same input always produces the same vector
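A deterministic test embedder can be built by seeding a random generator from a hash of the input text. The sketch below shows one way to get "same input, same vector"; it is an assumption about how such an embedder could work, not a description of MockEmbedder's internals, and `HashEmbedder` is a hypothetical name.

```python
import hashlib
import random


class HashEmbedder:
    """Deterministic pseudo-embeddings for tests (hypothetical sketch)."""

    def __init__(self, dim: int = 16, seed: int = 1) -> None:
        self.dim = dim
        self.seed = seed

    def embed(self, text: str) -> list[float]:
        # Hash seed + text so the vector depends on both and on nothing else.
        digest = hashlib.sha256(f"{self.seed}:{text}".encode("utf-8")).digest()
        rng = random.Random(int.from_bytes(digest, "big"))
        return [rng.uniform(-1.0, 1.0) for _ in range(self.dim)]


embedder_a = HashEmbedder(dim=16, seed=1)
embedder_b = HashEmbedder(dim=16, seed=2)

vec1 = embedder_a.embed("drug interactions")
vec2 = embedder_a.embed("drug interactions")   # identical to vec1
vec3 = embedder_a.embed("something else")      # different text, different vector
vec4 = embedder_b.embed("drug interactions")   # different seed, different vector
```

Vectors produced this way carry no semantic meaning, so they are suitable for testing plumbing (dimensions, storage, ranking code paths) but not retrieval quality.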

Vector stores

SqliteVecStore -- zero-ops, great for development:

from trails.vector import SqliteVecStore

store = SqliteVecStore(path="vectors.db", dim=384)
# or path=":memory:" for tests

QdrantStore -- for production scale:

from trails.vector import QdrantStore

store = QdrantStore(
    collection="my-chunks",
    dim=384,
    url="http://localhost:6333",
)

Install the extra:

pip install 'trails[vector-qdrant]'

Both stores implement the same interface: add(), search(), delete(), count().

Indexing chunks

After ingesting documents, embed and store their chunks:

from trails.ingest import Chunk

chunks = Chunk.where(document=doc_iri).fetch(ctx)

for chunk in chunks:
    store.add(
        id=chunk.iri,
        vector=embedder.embed(chunk.text),
        metadata={"iri": chunk.iri, "snippet": chunk.text[:200]},
    )

Retrieval modes

retrieve() provides three modes in a single entry point:

from trails.vector import retrieve

# Pure vector similarity
hits = retrieve("drug interactions", mode="vector", k=10,
                vector_store=store, embedder=embedder)

# Pure SPARQL (graph patterns only)
hits = retrieve("drug interactions", mode="graph", k=10, ctx=ctx,
                sparql_filter="SELECT ?iri WHERE { ?iri a <trails://app/Chunk> }")

# Hybrid: SPARQL narrows candidates, vector reranks
hits = retrieve(
    "drug interactions",
    mode="hybrid",
    k=10,
    ctx=ctx,
    vector_store=store,
    embedder=embedder,
    sparql_filter="""
        SELECT ?iri WHERE {
            ?iri a <trails://app/Chunk> ;
                 <trails://app/Chunk/document> ?doc .
            ?doc <trails://app/Document/source> "arxiv:2404.12345" .
        }
    """,
)

for hit in hits:
    print(f"{hit.iri} (score: {hit.score:.3f}): {hit.snippet[:80]}...")

Hybrid mode is the recommended default for production: SPARQL provides precision (only chunks from the right documents), vector provides recall (semantic similarity even when exact terms differ).
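The "filter narrows, vector reranks" idea can be demonstrated without a graph or a vector database. In the sketch below a plain set stands in for the IRIs a SPARQL filter would return; `hybrid_rank` and the example IRIs are hypothetical, and this is not how retrieve() is implemented.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def hybrid_rank(query_vec: list[float], vectors: dict[str, list[float]],
                allowed_iris: set[str], k: int) -> list[str]:
    """Keep only candidates the graph filter allows, then rank by similarity."""
    candidates = {iri: vec for iri, vec in vectors.items() if iri in allowed_iris}
    ranked = sorted(candidates,
                    key=lambda iri: cosine(query_vec, candidates[iri]),
                    reverse=True)
    return ranked[:k]


vectors = {
    "urn:example:chunk/1": [1.0, 0.0],   # most similar, but from another document
    "urn:example:chunk/2": [0.8, 0.2],
    "urn:example:chunk/3": [0.1, 0.9],
}
# Stand-in for the result of a SPARQL filter restricting to one document.
allowed = {"urn:example:chunk/2", "urn:example:chunk/3"}
query = [1.0, 0.0]

vector_only = hybrid_rank(query, vectors, set(vectors), k=1)
hybrid = hybrid_rank(query, vectors, allowed, k=2)
```

Here pure vector search would return chunk/1, while the hybrid ranking excludes it because the filter never admitted it as a candidate.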

Example: loading a CSV dataset and querying it

This end-to-end example loads a CSV file using RML, queries it with SPARQL, infers a schema from the loaded data, and generates @node_type code.

from trails.testing import fresh_context
from trails.rml import run_mapping
from trails.onto_infer import infer_schema, generate_code

ctx = fresh_context()

# Step 1: Load the CSV via RML
result = run_mapping(ctx, "mappings/employees.ttl")
print(f"Loaded {result.triples_added} triples")

# Step 2: Query the data with SPARQL
engineers = ctx.kg.query("""
    PREFIX ex: <https://myapp.example/>
    SELECT ?name ?salary WHERE {
        ?e a ex:Employee ;
           ex:department "Engineering" ;
           ex:name ?name ;
           ex:salary ?salary .
    }
    ORDER BY DESC(?salary)
""")
for row in engineers:
    print(f"{row['name']}: ${row['salary']}")

# Step 3: Infer the schema from the loaded data
schema = infer_schema(ctx.kg._store, trace_id="csv-demo")
print(f"\nInferred {len(schema.candidates)} types:")
for candidate in schema.candidates:
    print(f"  {candidate.name} ({candidate.instance_count} instances)")
    for prop in candidate.properties:
        print(f"    {prop.name}: {prop.python_type}")

# Step 4: Generate @node_type code
code = generate_code(schema)
print("\nGenerated code:")
print(code)

After running this, you have typed @node_type declarations generated from your CSV data. Review, edit, and import them into your project. From there, use the ORM (Chapter 3) to query and manipulate the data with Python objects instead of raw SPARQL.

What's next: Chapter 7 -- Federation and Scaling covers how to connect multiple Trails instances, expose SPARQL endpoints, and deploy for production.