AI/ML Tracing Guide
This guide covers how to use pydftracer’s specialized AI/ML tracing features to profile deep learning and machine learning workflows.
Motivation
Since DFTracer’s release, we’ve successfully traced numerous AI/DL pipelines. However, analysis revealed that the resulting traces differ widely across workloads.
This inconsistency is largely due to varied naming schemes used by different users. Even when the intent is similar, the lack of a standard makes it hard to build analysis tools that work reliably across use cases.
This API introduces consistent annotation conventions to help users instrument their code more uniformly. With these standards in place, tools like DFAnalyzer can interpret traces from different workloads in the same way, reducing fatigue for researchers and developers analyzing AI/DL workloads.
Overview
The ai module provides decorators and context managers for tracing common AI/ML operations:
Data operations: Loading, preprocessing, and augmentation
Dataloader: Batch fetching and iteration
Device operations: Data transfer to/from GPU
Compute operations: Forward pass, backward pass, optimization steps
Communication: Distributed training operations (all_reduce, etc.)
Checkpointing: Model save/load operations
Pipeline: Training/validation/test loops
Basic Setup
First, enable DFTracer and initialize the logger:
export DFTRACER_ENABLE=1
from dftracer.python import dftracer, ai
import numpy as np
# Initialize logger
df_logger = dftracer.initialize_log("ai_trace.pfw", "/tmp/data", -1)
# Your AI/ML code here
# Finalize when done
df_logger.finalize()
Data Operations
Tracing Data Loading
from dftracer.python import ai
import numpy as np
class IOHandler:
    @ai.data.item
    def read(self, filename: str):
        return np.load(filename)
    def write(self, filename: str, data):
        with open(filename, "wb") as f:
            np.save(f, data)
io = IOHandler()
data = io.read("data.npy")  # This read will be traced
Dataloader Integration
from dftracer.python import ai
@ai.dataloader.fetch
def read_batch(data_dir: str, num_files: int):
    for i in range(num_files):
        yield io.read(f"{data_dir}/{i}.npy")
# Iterate over batches with tracing
for step, data in ai.dataloader.fetch.iter(enumerate(read_batch("/data", 100))):
    # Process data
    pass
Data Preprocessing
from dftracer.python import ai
@ai.data.preprocess.derive(name="collate")
def collate(data):
    # Collate batch data
    return data
@ai.data.preprocess.derive(name="augment")
def augment(data):
    # Apply data augmentation
    return data
# Use in your pipeline
processed_data = collate(raw_data)
augmented_data = augment(processed_data)
Device Operations
Tracing GPU Transfers
from dftracer.python import ai
@ai.device.transfer
def transfer_to_gpu(data):
    # Transfer data to GPU
    # In real code: return data.cuda()
    return data
# Traced transfer
gpu_data = transfer_to_gpu(cpu_data)
Compute Operations
Forward and Backward Passes
from dftracer.python import ai
@ai.compute.forward
def forward(model, data):
    return model(data)
@ai.compute.backward
def backward(loss):
    loss.backward()
# Use in training loop
output = forward(model, batch)
loss = criterion(output, labels)
backward(loss)
Optimization Steps
from dftracer.python import ai
class Hook:
    def before_step(self):
        ai.compute.step.start()
    def after_step(self):
        ai.compute.step.stop()
hook = Hook()
# In training loop
hook.before_step()
# ... forward, backward, optimizer.step()
hook.after_step()
Communication Tracing
Distributed Training
from dftracer.python import ai
@ai.compute.backward
def backward_with_sync(loss):
    # `loss` is passed in explicitly so the function is self-contained
    loss.backward()
    # Trace distributed communication
    with ai.comm.all_reduce():
        # All-reduce gradients
        pass
# Can also disable tracing for specific operations
with ai.comm.all_reduce(enable=False):
    # This won't be traced
    pass
Checkpointing
Model Checkpoints
from dftracer.python import ai
from time import sleep
class Checkpoint:
    @ai.checkpoint.init
    def __init__(self):
        # Initialize checkpoint system
        pass
    @ai.checkpoint.capture
    def save(self, state):
        # Save model checkpoint
        return state
    @ai.checkpoint.restart
    def load(self, checkpoint_path):
        # Load model checkpoint
        return {}
checkpoint = Checkpoint()
checkpoint.load("checkpoint.pt")
# ... training ...
checkpoint.save({"model": model.state_dict()})
Training Pipeline
Complete Training Loop
from dftracer.python import dftracer, ai
import numpy as np
# Initialize logger
df_logger = dftracer.initialize_log("training.pfw", "/tmp/data", -1)
# load_batch, transfer, forward, backward, model, criterion, and labels are
# user-defined, as in the earlier sections of this guide
@ai.pipeline.train
def train(num_epochs, num_batches):
    # Training loop with epoch tracing
    for epoch in ai.pipeline.epoch.iter(range(num_epochs)):
        for step in ai.dataloader.fetch.iter(range(num_batches)):
            # Update current step and epoch
            ai.update(step=step, epoch=epoch)
            # Data loading
            batch = load_batch(step)
            # Transfer to device
            batch = transfer(batch)
            # Forward pass
            output = forward(model, batch)
            # Compute the loss before the backward pass
            loss = criterion(output, labels)
            # Backward pass
            backward(loss)
train(num_epochs=5, num_batches=100)
df_logger.finalize()
Advanced Features
Custom Categories
You can create custom AI tracers with specific categories:
from dftracer.python import DFTracerAI
# Create custom AI tracer
custom_tracer = DFTracerAI(
    cat="custom_category",
    name="my_operation",
    epoch=1,
    step=100,
    enable=True
)
Disabling Specific Categories
You can selectively disable tracing for specific AI categories programmatically:
from dftracer.python import ai
# Disable all AI tracing
ai.disable()
# Or disable specific categories
ai.dataloader.disable()
ai.device.disable()
ai.compute.disable()
ai.comm.disable()
ai.checkpoint.disable()
AI/DL Logging Conventions
We define seven main categories of logging: Compute, Data, DataLoader, Comm, Device, Checkpoint, and Pipeline. Each category, along with its subcategories (children),
is implemented as a wrapper around dft_fn. This means you can use these categories in your
codebase the same way you would use dft_fn directly.
| Category | Name | Access Path | Description |
|---|---|---|---|
| Compute | Forward | ai.compute.forward | Forward pass of the network |
| Compute | Backward | ai.compute.backward | Backward pass / gradient computation |
| Compute | Step | ai.compute.step | Optimizer step (parameter update) |
| Data | Preprocess | ai.data.preprocess | Dataset-level preprocessing |
| Data | Item | ai.data.item | Per-item transformation or loading |
| DataLoader | Fetch | ai.dataloader.fetch | Fetch a batch from DataLoader |
| Comm | Send | | Point-to-point send |
| Comm | Receive | | Point-to-point receive |
| Comm | Barrier | | Synchronization barrier |
| Comm | Broadcast | | Broadcast (one-to-many) |
| Comm | Reduce | | Reduce (many-to-one) |
| Comm | All-Reduce | ai.comm.all_reduce | All-reduce (many-to-many) |
| Comm | Gather | | Gather (many-to-one) |
| Comm | All-Gather | | All-gather (many-to-many) |
| Comm | Scatter | | Scatter (one-to-many) |
| Comm | Reduce-Scatter | | Reduce-scatter (many-to-many) |
| Comm | All-to-All | | All-to-all (many-to-many) |
| Device | Transfer | ai.device.transfer | Host-to-device or device-to-host memory transfer |
| Checkpoint | Capture | ai.checkpoint.capture | Capture a model checkpoint |
| Checkpoint | Restart | ai.checkpoint.restart | Restart from a model checkpoint |
| Pipeline | Epoch | ai.pipeline.epoch | An entire training or evaluation epoch |
| Pipeline | Train | ai.pipeline.train | Training phase |
| Pipeline | Evaluate | | Evaluation or validation phase |
| Pipeline | Test | | Testing or inference phase |
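Because every event carries a conventional category and name, a generic analysis can group events without workload-specific knowledge. The following is a minimal sketch of that idea, assuming each trace event is a dict with the cat, name, and dur fields that appear in the sample output later in this guide. The event values and the function name are hypothetical, and this is not how DFAnalyzer is implemented.

```python
from collections import defaultdict

# Hypothetical events; only the field names (cat, name, dur) follow the guide.
events = [
    {"cat": "compute", "name": "forward", "dur": 120},
    {"cat": "compute", "name": "backward", "dur": 250},
    {"cat": "compute", "name": "forward", "dur": 130},
    {"cat": "dataloader", "name": "fetch", "dur": 400},
]


def total_duration_by_event(events):
    """Sum durations per (category, name) pair."""
    totals = defaultdict(int)
    for event in events:
        totals[(event["cat"], event["name"])] += event["dur"]
    return dict(totals)


totals = total_duration_by_event(events)
for (cat, name), dur in sorted(totals.items()):
    print(f"{cat}.{name}: {dur}")
```

The same grouping works for any workload that follows the conventions, which is the point of standardizing the category and name fields.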
Flexible API Styles
DFTracer AI Logging provides flexible APIs to match different coding styles. You can use decorators, context managers, or iterable wrappers.
Decorator Style
Without arguments, use it directly to wrap a function:
@ai.compute.forward
def forward(model, x):
    loss = model(x)
    return loss
With arguments, pass metadata to the event:
@ai.compute.forward(args={"arg1": "value1", "arg2": "value2"})
def forward(model, x):
    loss = model(x)
    return loss
Context Manager Style
Use it to wrap blocks of code inside a with statement:
Without arguments:
with ai.compute.forward:
    loss = model(x)
With arguments:
with ai.compute.forward(args={"arg1": "value1", "arg2": "value2"}):
    loss = model(x)
Iterable Style
You can also wrap iterators like data loaders:
for batch in ai.dataloader.fetch.iter(dataloader):
    # Process batch
    pass
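To make the three styles above concrete, here is a toy object that can be used as a decorator, a context manager, and an iterable wrapper. It is a sketch only: the class TriStyleProfiler and its events list are hypothetical, it records wall-clock durations in memory instead of writing a trace, and it does not reproduce DFTracer's implementation or its argument-passing form.

```python
import functools
import time


class TriStyleProfiler:
    """Toy stand-in (not DFTracer's code) usable in three styles."""

    def __init__(self, name):
        self.name = name
        self.events = []   # (name, duration_in_seconds) tuples
        self._starts = []  # stack of start times, so blocks can nest

    # Context manager style: `with profiler: ...`
    def __enter__(self):
        self._starts.append(time.perf_counter())
        return self

    def __exit__(self, exc_type, exc, tb):
        start = self._starts.pop()
        self.events.append((self.name, time.perf_counter() - start))
        return False  # never swallow exceptions

    # Decorator style: `@profiler`
    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with self:
                return func(*args, **kwargs)
        return wrapper

    # Iterable style: record how long each item takes to be produced
    def iter(self, iterable):
        iterator = iter(iterable)
        while True:
            start = time.perf_counter()
            try:
                item = next(iterator)
            except StopIteration:
                return
            self.events.append((self.name, time.perf_counter() - start))
            yield item


forward = TriStyleProfiler("forward")


@forward
def double(x):
    return 2 * x


result = double(21)                      # one event from the decorator
with forward:                            # one event from the with block
    pass
batches = list(forward.iter([1, 2, 3]))  # one event per item fetched
print(result, batches, len(forward.events))
```

Sharing one object across all three styles means every event for a given operation ends up under the same name, whichever style the calling code happens to use.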
Constructor Hooking
You can annotate constructors directly using category-specific hooks:
class MyDataset:
    @ai.data.item.init  # special `init` event for this category
    def __init__(self, *args, **kwargs):
        # Initialization logic
        pass
Updating Arguments
Every profiler (like ai.compute.forward) provides an update method to
dynamically change metadata. These updates apply to the entire subtree of that event.
@ai.compute.forward
def forward(model, x):
    loss = model(x)
    return loss
for epoch in ai.pipeline.epoch.iter(range(num_epochs)):
    for step, batch in ai.dataloader.fetch.iter(enumerate(dataloader)):
        # Update metadata for the current context
        ai.compute.forward.update(epoch=epoch, step=step)
        forward(model, batch)
Force Enable or Disable Specific Events
You can override the global or category-level logging state for individual events
by setting the enable flag explicitly.
ai.compute.disable()  # Disable all compute events
@ai.compute.forward(enable=True)  # Force-enable this specific event
def forward(model, x):
    loss = model(x)
    return loss
with ai.compute.backward(enable=True):  # Force-enable this block
    loss.backward()
ai.compute.enable()  # Enable all compute events
@ai.compute.forward(enable=False)  # Force-disable this one
def forward(model, x):
    loss = model(x)
    return loss
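One way to read the precedence described above is that an explicit enable flag on an event wins, and the category state is only a fallback. The function below is a hypothetical illustration of that rule, not part of the library.

```python
from typing import Optional


def is_event_enabled(category_enabled: bool, event_enable: Optional[bool] = None) -> bool:
    """Explicit per-event flag wins; otherwise fall back to the category state."""
    if event_enable is not None:
        return event_enable
    return category_enabled


# Category disabled, event forced on: the event is logged.
print(is_event_enabled(category_enabled=False, event_enable=True))
# Category enabled, event forced off: the event is skipped.
print(is_event_enabled(category_enabled=True, event_enable=False))
# No explicit flag: the category decides.
print(is_event_enabled(category_enabled=False))
```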
Hook/Checkpoint Style
For scenarios where you can’t use decorators or context managers directly (e.g., a TensorFlow SessionRunHook), you can manually call profiler methods:
# Uses the TensorFlow 1.x SessionRunHook API
import tensorflow as tf
from tensorflow.python.training import training_util
from dftracer.python import ai
class DFTracerProfilingHook(tf.train.SessionRunHook):
    def begin(self):
        self._global_step_tensor = training_util._get_or_create_global_step_read()
        if self._global_step_tensor is None:
            raise RuntimeError("Global step should be created.")
        ai.pipeline.epoch.start()
    def end(self, session):
        ai.pipeline.epoch.stop()
    def before_run(self, run_context):
        global_step = run_context.session.run(self._global_step_tensor)
        ai.update(step=global_step)
        ai.compute.start()
    def after_run(self, run_context, run_values):
        ai.compute.stop()
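The hook pattern relies on every start() being matched by a stop() from a different callback. The sketch below shows that pairing with a toy timer that needs neither TensorFlow nor DFTracer. SketchTimer and StepHook are hypothetical names, and raising on an unmatched stop() is a choice made for this sketch rather than documented library behavior.

```python
import time


class SketchTimer:
    """Toy start/stop profiler (hypothetical; not DFTracer's implementation)."""

    def __init__(self, name):
        self.name = name
        self.durations = []
        self._start = None

    def start(self):
        self._start = time.perf_counter()

    def stop(self):
        if self._start is None:
            raise RuntimeError(f"{self.name}: stop() called before start()")
        self.durations.append(time.perf_counter() - self._start)
        self._start = None


class StepHook:
    """Framework-style callbacks that bracket each step."""

    def __init__(self, timer):
        self.timer = timer

    def before_run(self):
        self.timer.start()

    def after_run(self):
        self.timer.stop()


timer = SketchTimer("compute")
hook = StepHook(timer)
for _ in range(3):
    hook.before_run()
    sum(range(1000))  # stand-in for the work the framework performs
    hook.after_run()
print(len(timer.durations))
```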
Derivation
You can derive new profilers from existing ones for more dynamic logging. The derived profiler becomes a child of the original profiler, inheriting its context.
class Dataset:
    def __getitem__(self, idx: int):
        data = ...
        with ai.data.preprocess:
            # Process data
            pass
        return data
# This becomes name="preprocess.collate" with cat="data"
@ai.data.preprocess.derive(name="collate")
def collate(batch):
    return batch
# Or (context-manager style)
profiler_collate = ai.data.preprocess.derive(name="collate")
def collate_fn(batch):
    with profiler_collate:
        return collate(batch)
# Update derived profiler
profiler_collate.update(epoch=epoch)
# This also updates all children of ai.data.preprocess
ai.data.preprocess.update(epoch=epoch)
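The naming and update rules above can be modeled with a small tree. This is a sketch under stated assumptions: TreeProfiler is a hypothetical class, the child copying its parent's metadata at derive time is a guess at what "inheriting its context" means, and none of this is DFTracer's actual code.

```python
class TreeProfiler:
    """Toy model of derive/update (illustrative, not DFTracer's code)."""

    def __init__(self, cat, name):
        self.cat = cat
        self.name = name
        self.args = {}
        self.children = []

    def derive(self, name):
        # The child keeps the parent's category and prefixes the parent's name.
        child = TreeProfiler(self.cat, f"{self.name}.{name}")
        child.args = dict(self.args)  # assumption: context is copied on derive
        self.children.append(child)
        return child

    def update(self, **kwargs):
        # Apply to this profiler, then to its whole subtree.
        self.args.update(kwargs)
        for child in self.children:
            child.update(**kwargs)


preprocess = TreeProfiler(cat="data", name="preprocess")
collate = preprocess.derive(name="collate")
preprocess.update(epoch=3)  # reaches collate as well
collate.update(step=7)      # stays local to collate
print(collate.cat, collate.name, collate.args)
print(preprocess.args)
```

Updates flow downward only, so updating a derived profiler leaves its parent untouched.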
Metadata / Streaming Style
By default, DFTracer logs events with a start and end time (duration-based).
For real-time monitoring, use metadata=True to log events immediately:
# Regular mode
for epoch in ai.pipeline.epoch.iter(range(num_epochs)):
    for step in range(num_steps):
        # Do work
        pass
# Metadata mode
for epoch in range(num_epochs):
    ai.pipeline.epoch.start(metadata=True)
    for step in range(num_steps):
        # Do work
        pass
    ai.pipeline.epoch.stop(metadata=True)
Regular mode output:
{"id":27,"name":"epoch.block","cat":"pipeline","pid":2877353,"tid":2877353,
"ts":1753123213646764,"dur":828765,"ph":"X",
"args":{"hhash":"2a702c695247d487","p_idx":6,"count":"1","level":2}}
Metadata mode output:
{"id":6,"name":"CM","cat":"dftracer","pid":2876815,"tid":2876815,"ph":"M",
"args":{"hhash":"2a702c695247d487","name":"epoch.end","value":"1753123070219202"}}
{"id":6,"name":"CM","cat":"dftracer","pid":2876815,"tid":2876815,"ph":"M",
"args":{"hhash":"2a702c695247d487","name":"epoch.start","value":"1753123070219648"}}
Init Events
Log initialization phases using the init method:
class Checkpoint:
    @ai.checkpoint.init
    def __init__(self):
        # Initialize something
        pass
# Or
with ai.checkpoint.init:
    # Initialize something
    pass
Output:
{"id":7,"name":"checkpoint.init","cat":"checkpoint","pid":444541,"tid":444541,
"ts":1753136835509693,"dur":100583,"ph":"X",
"args":{"hhash":"2a702c695247d487","p_idx":6,"level":2}}
Caveats
Call Ordering Matters
The order of calls affects whether events get logged.
This works:
class Checkpoint:
    @ai.checkpoint.init  # Instance tracked internally
    def __init__(self):
        pass
if __name__ == "__main__":
    ai.checkpoint.disable()  # Disables all checkpoint events
This doesn’t work as expected:
class Checkpoint:
    @ai.checkpoint.init()  # Parentheses create instance immediately
    def __init__(self):
        pass
if __name__ == "__main__":
    ai.checkpoint.disable()  # Can't affect already-created instance
Solutions:
Use the decorator without parentheses, or call disable() before defining your class
Only use parentheses () when you need to force enable/disable a specific event
To add metadata, use the update() method instead
To create variations of an event, use the derive() method instead
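The ordering caveat can be pictured as the difference between consulting the category when the event fires and fixing the state when the event object is created. The sketch below models only that observable behavior. SketchCategory, live_event, and snapshot_event are hypothetical names, and the library's internal mechanism may differ.

```python
class SketchCategory:
    """Toy model of the caveat (illustrative only, not DFTracer's code)."""

    def __init__(self):
        self.enabled = True

    def disable(self):
        self.enabled = False

    def live_event(self):
        # Like the bare decorator: checks the category each time it fires.
        return lambda: self.enabled

    def snapshot_event(self, enable=None):
        # Like calling with parentheses: the state is fixed at creation time.
        frozen = self.enabled if enable is None else enable
        return lambda: frozen


checkpoint = SketchCategory()
bare = checkpoint.live_event()
with_parens = checkpoint.snapshot_event()
checkpoint.disable()  # called after both events were created

print(bare())         # follows the later disable()
print(with_parens())  # keeps the state it had when it was created
```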
Summary
The AI/ML tracing features in pydftracer provide:
Structured tracing for common ML operations
Hierarchical tracking of training loops
Minimal overhead with automatic profiling
Flexible decorator-based API
Multiple usage patterns (decorators, context managers, iterables)
Dynamic configuration (enable/disable, metadata updates)
Integration with existing ML code
For complete API reference, see AI/ML API.