Provenance in AI: Auto-Capturing Provenance with MLflow and W3C PROV-O in PyTorch Pipelines – Part 4

AI engineers spend a lot of time building, training, and iterating on models. But as pipelines grow more complex, it becomes difficult to answer simple but crucial questions:

  • Which dataset version trained this model?
  • Which parameters were used?
  • Who triggered this training job?
  • Can I reproduce this run six months later?

Without structured provenance tracking, reproducibility and compliance become almost impossible. In regulated domains, this is not optional — it’s mandatory.

In this article, we’ll show how to integrate W3C PROV-O (a standard for provenance modeling) with MLflow (a popular experiment tracking framework) in a PyTorch pipeline. The result: every training run not only logs metrics and artifacts but also generates a machine-readable provenance graph for accountability, auditability, and governance.

🔎 Background: Why PROV-O + MLflow?

  • MLflow is widely used for experiment tracking. It records metrics, parameters, and artifacts like models and logs. However, MLflow’s logs are application-specific and not standardized for knowledge sharing across systems.
  • W3C PROV-O is a semantic ontology (built on RDF/OWL2) that provides a standardized vocabulary for describing provenance: Entities, Activities, and Agents, and their relationships (prov:used, prov:wasGeneratedBy, prov:wasAttributedTo).

By combining the two:

  • MLflow provides the data source of truth for training runs.
  • PROV-O provides the interoperable representation of lineage, useful for audits, governance, and integration into knowledge graphs.

🏗️ Architecture Overview

Our integration maps MLflow concepts to PROV-O concepts:

MLflow ConceptPROV-O EquivalentExample
MLflow Runprov:ActivityTraining job run ID f4a22
MLflow Artifact (model)prov:Entitymodel_v1.pth
Dataset (input)prov:Entitydataset.csv
Metrics (loss, accuracy)prov:Entitymetrics.json
MLflow User/Systemprov:AgentEngineer triggering the run

⚙️ Step 1: Setup

We need a combination of MLflow (for tracking) and rdflib (for provenance graph generation).

pip install mlflow torch rdflib prov

  • mlflow → tracks experiments, models, metrics, and artifacts.
  • torch → used for building the PyTorch model.
  • rdflib → builds and serializes RDF/PROV-O graphs.
  • prov → utilities for working with W3C PROV specifications.

🧑‍💻 Step 2: PyTorch Training with MLflow Logging

We start with a simple PyTorch script that trains a small neural network while logging to MLflow.

import torch
import torch.nn as nn
import torch.optim as optim
import mlflow
import mlflow.pytorch

# Fake dataset
X = torch.randn(100, 10)
y = torch.randint(0, 2, (100,))

# Simple NN
model = nn.Sequential(nn.Linear(10, 32), nn.ReLU(), nn.Linear(32, 2))
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

with mlflow.start_run() as run:
    for epoch in range(5):
        optimizer.zero_grad()
        preds = model(X)
        loss = loss_fn(preds, y)
        loss.backward()
        optimizer.step()
        mlflow.log_metric("loss", loss.item(), step=epoch)

    mlflow.log_param("lr", 0.001)
    mlflow.pytorch.log_model(model, "model")

At this point, MLflow is recording metrics (loss), params (lr), and the trained model artifact. But it doesn’t capture semantic provenance — for example, which dataset was used, who ran this job, and how results are connected.

🔗 Step 3: Provenance Tracker for MLflow

Here’s where PROV-O comes in. We build a Provenance Tracker that:

  1. Defines entities (datasets, models, metrics).
  2. Defines activities (the MLflow run).
  3. Defines agents (engineer, system).
  4. Links them using PROV-O relations.
  5. Serializes into Turtle (.ttl) or JSON-LD.
from rdflib import Graph, Namespace, URIRef, Literal
from rdflib.namespace import RDF, FOAF
import mlflow

PROV = Namespace("http://www.w3.org/ns/prov#")
EX = Namespace("http://example.org/")

def log_provenance(run):
    g = Graph()
    g.bind("prov", PROV)
    g.bind("ex", EX)

    # Agent (engineer/system)
    user = EX["engineer"]
    g.add((user, RDF.type, PROV.Agent))
    g.add((user, FOAF.name, Literal("AI Engineer")))

    # Activity (the MLflow run)
    activity = EX[f"run_{run.info.run_id}"]
    g.add((activity, RDF.type, PROV.Activity))

    # Input dataset
    dataset = EX["dataset.csv"]
    g.add((dataset, RDF.type, PROV.Entity))
    g.add((activity, PROV.used, dataset))

    # Model entity
    model = EX[f"model_{run.info.run_id}.pth"]
    g.add((model, RDF.type, PROV.Entity))
    g.add((model, PROV.wasGeneratedBy, activity))
    g.add((model, PROV.wasAttributedTo, user))

    # Metrics entity
    metrics = EX[f"metrics_{run.info.run_id}.json"]
    g.add((metrics, RDF.type, PROV.Entity))
    g.add((metrics, PROV.wasGeneratedBy, activity))
    g.add((metrics, PROV.wasAttributedTo, user))

    # Serialize + store
    prov_file = f"prov_{run.info.run_id}.ttl"
    g.serialize(prov_file, format="turtle")
    mlflow.log_artifact(prov_file, artifact_path="provenance")
    print(f"✅ Provenance logged in {prov_file}")

📦 Step 4: Integrate Tracker

Modify the training script to call log_provenance(run) after training completes.

with mlflow.start_run() as run:
    # Training loop (as above) ...
    mlflow.pytorch.log_model(model, "model")

    # Capture provenance
    log_provenance(run)

Now every MLflow run will automatically create a provenance graph and store it alongside model artifacts.

Final script train-small-nn-pytorch.py:

import torch
import torch.nn as nn
import torch.optim as optim
import mlflow
import mlflow.pytorch
from rdflib import Graph, Namespace, URIRef, Literal
from rdflib.namespace import RDF, FOAF
import mlflow

# Fake dataset
X = torch.randn(100, 10)
y = torch.randint(0, 2, (100,))

# Simple NN
model = nn.Sequential(nn.Linear(10, 32), nn.ReLU(), nn.Linear(32, 2))
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Provenance Tracker for MLflow
PROV = Namespace("http://www.w3.org/ns/prov#")
EX = Namespace("http://example.org/")

def log_provenance(run):
    g = Graph()
    g.bind("prov", PROV)
    g.bind("ex", EX)

    # Agent (engineer/system)
    user = EX["engineer"]
    g.add((user, RDF.type, PROV.Agent))
    g.add((user, FOAF.name, Literal("AI Engineer")))

    # Activity (the MLflow run)
    activity = EX[f"run_{run.info.run_id}"]
    g.add((activity, RDF.type, PROV.Activity))

    # Input dataset
    dataset = EX["dataset.csv"]
    g.add((dataset, RDF.type, PROV.Entity))
    g.add((activity, PROV.used, dataset))

    # Model entity
    model = EX[f"model_{run.info.run_id}.pth"]
    g.add((model, RDF.type, PROV.Entity))
    g.add((model, PROV.wasGeneratedBy, activity))
    g.add((model, PROV.wasAttributedTo, user))

    # Metrics entity
    metrics = EX[f"metrics_{run.info.run_id}.json"]
    g.add((metrics, RDF.type, PROV.Entity))
    g.add((metrics, PROV.wasGeneratedBy, activity))
    g.add((metrics, PROV.wasAttributedTo, user))

    # Serialize + store
    prov_file = f"prov_{run.info.run_id}.ttl"
    g.serialize(prov_file, format="turtle")
    mlflow.log_artifact(prov_file, artifact_path="provenance")
    print(f"✅ Provenance logged in {prov_file}")

# MLflow
with mlflow.start_run() as run:
    for epoch in range(5):
        # Training loop
        optimizer.zero_grad()
        preds = model(X)
        loss = loss_fn(preds, y)
        loss.backward()
        optimizer.step()
        mlflow.log_metric("loss", loss.item(), step=epoch)
        
        mlflow.pytorch.log_model(model, "model")
        # Capture provenance
        log_provenance(run)

    mlflow.log_param("lr", 0.001)
    mlflow.pytorch.log_model(model, "model")

📂 Step 5: Example Output

Provenance graph (Turtle format) prov_70d8b46c6451416d92a0ae7cac4c8602.ttl:

@prefix ex: <http://example.org/> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix prov: <http://www.w3.org/ns/prov#> .

ex:metrics_70d8b46c6451416d92a0ae7cac4c8602.json a prov:Entity ;
    prov:wasAttributedTo ex:engineer ;
    prov:wasGeneratedBy ex:run_70d8b46c6451416d92a0ae7cac4c8602 .

ex:model_70d8b46c6451416d92a0ae7cac4c8602.pth a prov:Entity ;
    prov:wasAttributedTo ex:engineer ;
    prov:wasGeneratedBy ex:run_70d8b46c6451416d92a0ae7cac4c8602 .

ex:dataset.csv a prov:Entity .

ex:engineer a prov:Agent ;
    foaf:name "AI Engineer" .

ex:run_70d8b46c6451416d92a0ae7cac4c8602 a prov:Activity ;
    prov:used ex:dataset.csv .

This graph is machine-readable and interoperable with semantic web tools, knowledge graphs, and governance platforms.

🔍 Step 6: Query Provenance

Since PROV-O is RDF-based, we can load graphs into a triple store and query with SPARQL. The following are a few example queries:

1️⃣Which dataset was used to generate a given model?

SELECT ?dataset WHERE {
  ex:model_70d8b46c6451416d92a0ae7cac4c8602.pth prov:wasGeneratedBy ?activity .
  ?activity prov:used ?dataset .
}

This query returns dataset.csv as the dataset that trained model_f4a22.pth.

The SPARQL queries can be run using the following Python script:

import rdflib

# Create a Graph object
g = rdflib.Graph()

# Parse the TTL file into the graph
g.parse("prov_70d8b46c6451416d92a0ae7cac4c8602.ttl", format='turtle')

# Define your SPARQL query
sparql_query = """
SELECT ?dataset WHERE {
  ex:model_70d8b46c6451416d92a0ae7cac4c8602.pth prov:wasGeneratedBy ?activity .
  ?activity prov:used ?dataset .
}
"""

# Execute the query
results = g.query(sparql_query)

# Process the results
for row in results:
	print(row)

2️⃣All models generated by a given engineer

SELECT ?model
WHERE {
  ?model a prov:Entity ;
         prov:wasAttributedTo ex:engineer .
}

👉 Returns all model URIs that were attributed to the engineer ex:engineer.

3️⃣All datasets used in the last month

If your provenance tracker adds prov:generatedAtTime or similar timestamps on entities/activities, you can filter by date. Example:

SELECT ?dataset ?time
WHERE {
  ?activity a prov:Activity ;
            prov:used ?dataset ;
            prov:endedAtTime ?time .
  ?dataset a prov:Entity .
  FILTER (?time >= "2025-07-28T00:00:00Z"^^xsd:dateTime && 
          ?time <= "2025-08-28T23:59:59Z"^^xsd:dateTime)
}

👉 This finds all prov:Entity datasets used by any activity that ended in the last month.

4️⃣Provenance chains across multiple runs (for auditing)

Here we want to trace lineage from dataset → activity → model → metrics.

SELECT ?dataset ?activity ?model ?metrics
WHERE {
  ?dataset a prov:Entity .
  ?activity a prov:Activity ;
            prov:used ?dataset ;
            prov:generated ?model, ?metrics .
  ?model a prov:Entity .
  ?metrics a prov:Entity .
}

👉 This gives a table of full provenance chains, so you can audit multiple runs together.

5️⃣Find all runs that reused the same dataset

Useful for detecting data reuse:

SELECT ?dataset (GROUP_CONCAT(?model; separator=", ") AS ?models)
WHERE {
  ?activity prov:used ?dataset ;
            prov:generated ?model .
}
GROUP BY ?dataset
HAVING (COUNT(?model) > 1)

👉 Returns datasets that were reused in multiple model generations.

⚡ These queries assume you have prov:used, prov:generated, prov:wasAttributedTo, and timestamps (prov:endedAtTime or prov:generatedAtTime) in your TTL logs.

✅ Why This Matters

By extending MLflow with PROV-O, AI engineers gain:

  • Reproducibility → Every model is linked to the exact data and parameters that generated it.
  • Auditability → Regulators and compliance teams can trace how outputs were produced.
  • Transparency → Business stakeholders can understand lineage without relying on tribal knowledge.
  • Interoperability → Since PROV-O is a W3C standard, provenance metadata can be integrated into external governance, data catalog, and knowledge graph systems.

🚀 What We Learnt

We’ve seen how to:

  1. Train a PyTorch model with MLflow.
  2. Capture provenance automatically using PROV-O.
  3. Serialize provenance graphs as RDF/Turtle.
  4. Query lineage with SPARQL.

Leave a Comment

Your email address will not be published. Required fields are marked *