Quick Start Guide
This guide helps you get started quickly with the dftracer utilities.
Python Quick Start
Reading Trace Files
The most common use case is reading trace files:
from dftracer.utils import TraceReader

# Open a compressed trace file (auto-detects index sidecar)
reader = TraceReader("trace.pfw.gz")

# ...or pass a directory; TraceReader scans for .pfw / .pfw.gz files
# and streams them transparently as a single logical input.
reader = TraceReader("./traces")

# Read all lines
lines = reader.read_lines()
for line in lines:
    print(line)

# Read lines as JSON objects
json_objects = reader.read_lines_json()
for obj in json_objects:
    # Not every event necessarily carries "dur" (e.g. metadata records),
    # so use .get() instead of indexing to avoid a KeyError.
    print(obj["name"], obj.get("dur"))

# Stream for memory efficiency
for obj in reader.iter_lines_json():
    process(obj)  # process() is a placeholder for your own handler
Streaming with TraceReader
TraceReader is the recommended way to read trace files. It automatically selects
sequential or indexed reading and supports streaming iterators:
from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")

# Stream lines (memory-efficient, uses iterator)
for line in reader.iter_lines():
    process(line)  # process() is a placeholder for your own handler

# Stream raw byte chunks
for chunk in reader.iter_raw(multi_line=False):
    process(chunk)  # one line per chunk as bytes

# Materialize all lines (convenience wrapper)
lines = reader.read_lines()

# With explicit Runtime for thread pool control
from dftracer.utils import Runtime

with Runtime(threads=8) as rt:
    reader = TraceReader("trace.pfw.gz", runtime=rt)
    for line in reader.iter_lines():
        process(line)
    # Check progress (inside the `with` block, while rt is still active)
    print(rt.get_progress())
Async Task Submission
Runtime.submit() runs a task asynchronously and returns a TaskHandle:
from dftracer.utils import Runtime

with Runtime(threads=8, python_threads=4) as rt:
    # Submit Python callables -- runs on Python thread pool.
    # process_file is a placeholder for your own function.
    h1 = rt.submit(process_file, "trace1.pfw.gz", name="proc-1")
    h2 = rt.submit(process_file, "trace2.pfw.gz", name="proc-2")

    # Wait for all tasks
    rt.wait_all()

    # Or get individual results
    result = h1.get()  # blocks until h1 completes
When no name is given, the task name is derived from the callable:
rt.submit(my_function)   # name = "my_function"
rt.submit(obj.method)    # name = "MyClass.method" (obj is a MyClass instance)
rt.submit(lambda: None)  # name = "<lambda>"
Composing tasks into dependency chains (a task can submit other tasks and wait on their results):
# Assumes `rt` is an active Runtime (e.g. inside `with Runtime(...) as rt:`);
# index_file and query_index are placeholders for your own callables.
def compose(filename):
    """Index *filename*, then query the index; return the query result."""
    h1 = rt.submit(index_file, filename)
    result = h1.get()  # wait for index
    h2 = rt.submit(query_index, result)  # use result
    return h2.get()


h = rt.submit(compose, "trace.pfw.gz", name="compose")
print(h.get())
Error handling:
import logging

log = logging.getLogger(__name__)

# Per-task: .get() re-raises the original exception
try:
    h.get()
except ValueError as e:
    print(f"Task failed: {e}")

# Batch: wait_all(raise_on_error=True) raises after all complete
rt.wait_all(raise_on_error=True)

# Callback: async notification on failure (lazy %-style logging args)
rt.set_error_callback(lambda handle, exc: log.error("%s: %s", handle.name, exc))

# Inspect failures
for failed in rt.get_failed():
    print(f"{failed.name}: {failed.exception}")
Arrow Data Interchange
TraceReader and several utilities support Arrow output for efficient
columnar data access. Arrow batches implement the Arrow PyCapsule Interface for
zero-copy interchange with pyarrow, polars, and DuckDB.
import pyarrow

from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")

# Stream Arrow batches; pyarrow.record_batch() consumes them via the
# Arrow PyCapsule Interface without copying.
for batch in reader.iter_arrow(batch_size=10000):
    df = pyarrow.record_batch(batch).to_pandas()

# Materialize as ArrowTable
table = reader.read_arrow()
df = table.to_pandas()
Query Filtering
TraceReader supports a query DSL for filtering events. When an index
exists, chunks that cannot match the query are skipped automatically (chunk pruning).
from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")

# Filter by category
for line in reader.iter_lines(query='cat == "POSIX"'):
    process(line)  # process() is a placeholder for your own handler

# Combine filters
lines = reader.read_lines(query='cat == "POSIX" and dur > 1000')

# Arrow output with query
table = reader.read_arrow(query='name in ["read", "write"]')
df = table.to_pandas()

# Programmatic query building
from dftracer.utils.query import Field

cat = Field("cat")
dur = Field("dur")
q = (cat == "POSIX") & (dur > 1000)
lines = reader.read_lines(query=str(q))
Utility Bindings
The dftracer.utils.utilities module provides Python bindings for
DFTracer's C++ utility classes. Tabular utilities return Arrow tables;
scalar utilities return dicts.
from dftracer.utils.utilities import (
    AggregatorUtility,
    MetadataCollectorUtility,
    StatisticsQueryUtility,
)

# Aggregate events over 1000 ms time intervals; the result is Arrow.
aggregator = AggregatorUtility()
table = aggregator.process("./traces", time_interval_ms=1000.0)
df = table.to_pandas()

# Optionally aggregate additional numeric fields from `args`
# and compute percentiles as well.
table = aggregator.process(
    "./traces",
    custom_metric_fields=["bytes"],
    compute_percentiles=True,
)

# Summary statistics come back as a plain dict.
stats_query = StatisticsQueryUtility()
stats = stats_query.process("trace.pfw.gz", query_type="summary")
print(f"Events: {stats['total_events']}")

# Per-file metadata also comes back as a dict.
collector = MetadataCollectorUtility()
meta = collector.process("trace.pfw.gz")
print(f"Size: {meta['size_mb']:.2f} MB")
Using with Dask
For distributed processing with dask.distributed, register the dftracer worker plugin on the client:
import glob

from dask.distributed import Client
from dftracer.utils.dask import DFTracerUtilsDaskWorkerPlugin

client = Client("scheduler:8786")
# Register the dftracer-utils plugin on every worker (threads=48).
client.register_plugin(DFTracerUtilsDaskWorkerPlugin(threads=48))


def count_lines(path):
    """Return the number of lines in the trace at *path*."""
    # Imported inside the task so the import happens on the worker.
    from dftracer.utils import TraceReader

    return sum(1 for _ in TraceReader(path).iter_lines())


# Example input: every compressed trace in ./traces (adjust to your layout).
file_paths = sorted(glob.glob("./traces/*.pfw.gz"))
futures = client.map(count_lines, file_paths)
results = client.gather(futures)
Working with Indexer
Create and use indexes for faster access:
from dftracer.utils import Indexer

# Create an indexer
indexer = Indexer("trace.pfw.gz")

# Build the index if needed
if indexer.need_rebuild():
    indexer.build()

# Get index information
print(f"Max bytes: {indexer.get_max_bytes()}")
print(f"Num lines: {indexer.get_num_lines()}")

# Get checkpoints
checkpoints = indexer.get_checkpoints()
for cp in checkpoints:
    print(f"Checkpoint {cp.checkpoint_idx}: {cp.num_lines} lines")
C++ Quick Start
Building Parallel Pipelines
Create and execute parallel data-processing tasks using coroutines:
#include <dftracer/utils/core/pipeline/pipeline.h>
#include <dftracer/utils/core/pipeline/pipeline_config.h>
#include <dftracer/utils/core/tasks/task.h>
#include <dftracer/utils/core/coro/channel.h>
#include <iostream>
#include <string>
#include <utility>

using namespace dftracer::utils;
using namespace dftracer::utils::coro;

int main() {
    // Configure executor
    auto config = PipelineConfig()
                      .with_name("MyPipeline")
                      .with_compute_threads(4);

    // Create channel for data streaming
    auto channel = make_channel<std::string>(100);

    // Producer task - generates items and sends them through the channel
    auto producer = make_task(
        [ch = channel->producer()](CoroScope& scope) mutable
            -> CoroTask<void> {
            // Presumably closes the producer side when the guard goes out of
            // scope, which ends the consumer's receive loop - confirm in
            // channel.h.
            auto guard = ch.guard();
            for (int i = 0; i < 100; i++) {
                std::string data = "item-" + std::to_string(i);
                co_await ch.send(std::move(data));
            }
            co_return;
        },
        "Producer");

    // Consumer task - processes items until receive() yields no item
    auto consumer = make_task(
        [channel](CoroScope& scope) -> CoroTask<void> {
            while (auto item = co_await channel->receive()) {
                std::cout << "Processing: " << *item << '\n';
            }
            co_return;
        },
        "Consumer");

    // Execute pipeline
    Pipeline pipeline(config);
    pipeline.set_source({producer});
    pipeline.set_destination(consumer);
    pipeline.execute();
    return 0;
}
Spawning Parallel Work
Spawn multiple tasks to run in parallel:
#include <dftracer/utils/core/tasks/task.h>
#include <dftracer/utils/core/tasks/coro_scope.h>
#include <iostream>
#include <utility>
#include <vector>

using namespace dftracer::utils;
using namespace dftracer::utils::coro;

auto task = make_task([](CoroScope& scope) -> CoroTask<void> {
    std::vector<SpawnFuture<int>> futures;
    futures.reserve(10);

    // Spawn 10 parallel workers
    for (int i = 0; i < 10; ++i) {
        auto future = scope.spawn([i](CoroScope& s) -> CoroTask<int> {
            // Each worker processes item i; compute() is a placeholder
            int result = compute(i);
            co_return result;
        });
        futures.push_back(std::move(future));
    }

    // Collect results
    int total = 0;
    for (auto& fut : futures) {
        total += co_await fut;
    }
    std::cout << "Total: " << total << std::endl;
    co_return;
});
Creating a Reader (Legacy)
Use the factory pattern to create a reader:
#include <dftracer/utils/reader/reader_factory.h>
#include <dftracer/utils/indexer/indexer_factory.h>
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>

int main() {
    // Create indexer first
    auto indexer = dftracer::utils::IndexerFactory::create(
        "trace.pfw.gz",
        "trace.pfw.gz.idx");

    // Create reader with indexer (transfers ownership)
    auto reader = dftracer::utils::ReaderFactory::create(indexer.release());

    // Simple: Read lines by line range (returns string with all lines)
    std::string lines = reader->read_lines(1, 100);  // Lines 1-100
    std::cout << lines;

    // Advanced: Buffer-based reading for large files
    const std::size_t read_buffer_size = 1024 * 1024;  // 1MB buffer
    auto buffer = std::make_unique<char[]>(read_buffer_size);
    std::size_t start_bytes = 0;
    std::size_t end_bytes = reader->get_max_bytes();
    std::size_t bytes_written;

    // Read in chunks; bytes_written is only read after the call assigns it
    while (start_bytes < end_bytes &&
           (bytes_written = reader->read_line_bytes(
                start_bytes, end_bytes,
                buffer.get(), read_buffer_size)) > 0) {
        // Process the chunk
        std::cout.write(buffer.get(), bytes_written);
        start_bytes += bytes_written;  // Advance for next read
    }
    return 0;
}
Using Async Generators for Streaming Data
Process data lazily without materializing everything in memory:
#include <dftracer/utils/core/tasks/task.h>
#include <dftracer/utils/core/tasks/coro_scope.h>
#include <dftracer/utils/core/coro/async_generator.h>
#include <string>

using namespace dftracer::utils;
using namespace dftracer::utils::coro;

// Streaming data source (lazy evaluation).
// file_path is taken BY VALUE on purpose: the body runs lazily, after the
// caller's expression has finished, so a `const std::string&` bound to a
// temporary (e.g. a string literal argument) would dangle.
AsyncGenerator<std::string> read_lines_async(std::string file_path) {
    // Opens file_path and yields lines on demand, e.g. `co_yield line;`
    // ... streaming implementation ...

    // The body must contain co_yield/co_await/co_return to be a coroutine;
    // otherwise it is a plain function that falls off the end without
    // returning a value (undefined behavior).
    co_return;
}

auto task = make_task([](CoroScope& scope) -> CoroTask<void> {
    auto gen = read_lines_async("trace.pfw.gz");
    // Iterate asynchronously - each call to next() yields one item
    while (auto line = co_await gen.next()) {
        // Process line lazily as it's read; parse_and_process() is a placeholder
        parse_and_process(*line);
    }
    co_return;
});
Reading with Line Processor (Legacy)
Use a custom line processor for efficient line-by-line processing:
#include <dftracer/utils/reader/reader_factory.h>
#include <dftracer/utils/reader/line_processor.h>
#include <dftracer/utils/indexer/indexer_factory.h>  // for IndexerFactory
#include <cstddef>
#include <iostream>

// Custom line processor: writes every line it receives to stdout.
class MyLineProcessor : public dftracer::utils::LineProcessor {
   public:
    void process_line(const char* line, std::size_t length) override {
        // Process each line
        std::cout.write(line, length);
    }
};

int main() {
    auto indexer = dftracer::utils::IndexerFactory::create(
        "trace.pfw.gz", "trace.pfw.gz.idx");
    auto reader = dftracer::utils::ReaderFactory::create(indexer.release());

    MyLineProcessor processor;
    // Process lines 1-1000 with custom processor
    reader->read_lines_with_processor(1, 1000, processor);
    return 0;
}
Working with Indexer (Legacy)
Use the factory pattern to create an indexer:
#include <dftracer/utils/indexer/indexer_factory.h>
#include <iostream>  // for std::cout / std::endl

int main() {
    // Create an indexer using the factory
    auto indexer = dftracer::utils::IndexerFactory::create(
        "trace.pfw.gz",      // Archive path
        "trace.pfw.gz.idx",  // Index path
        true                 // Force rebuild
    );

    // Build the index
    indexer->build();

    // Get index information
    std::cout << "Max bytes: " << indexer->get_max_bytes() << std::endl;
    std::cout << "Num lines: " << indexer->get_num_lines() << std::endl;
    return 0;
}
C Quick Start
Reading Trace Files
Use the C API to read trace files:
#include <dftracer/utils/reader/reader.h>
#include <stdio.h>
#include <stdlib.h>

#define BUFFER_SIZE (1024 * 1024) /* 1MB buffer */

int main(void) {
    // Create reader
    dft_reader_handle_t reader = dft_reader_create(
        "trace.pfw.gz",
        "trace.pfw.gz.idx",
        1048576  // checkpoint_size
    );
    if (!reader) {  // assumes a NULL handle signals failure - confirm in reader.h
        fprintf(stderr, "Failed to create reader\n");
        return 1;
    }

    // Allocate buffer
    char *buffer = malloc(BUFFER_SIZE);
    if (!buffer) {
        fprintf(stderr, "Out of memory\n");
        dft_reader_destroy(reader);
        return 1;
    }

    // Read lines 1-100
    int result = dft_reader_read_lines(
        reader,
        1, 100,      // start_line, end_line
        buffer,
        BUFFER_SIZE  // buffer_size
    );
    if (result == 0) {
        // NOTE(review): %s assumes the reader NUL-terminates buffer - confirm
        printf("%s", buffer);
    }

    // Cleanup
    free(buffer);
    dft_reader_destroy(reader);
    return 0;
}
Working with Indexer
Create and use an indexer:
#include <dftracer/utils/indexer/indexer.h>
#include <stdio.h>

int main(void) {
    // Create indexer
    dft_indexer_handle_t indexer = dft_indexer_create(
        "trace.pfw.gz",
        "trace.pfw.gz.idx",
        1048576,  // checkpoint_size
        0         // force_rebuild
    );
    if (!indexer) {  // assumes a NULL handle signals failure - confirm in indexer.h
        fprintf(stderr, "Failed to create indexer\n");
        return 1;
    }

    // Build index if needed
    if (dft_indexer_need_rebuild(indexer)) {
        printf("Building index...\n");
        dft_indexer_build(indexer);
    }

    // Get index information. Initialized so nothing indeterminate is printed
    // if a getter fails without writing its output.
    size_t max_bytes = 0, num_lines = 0;
    dft_indexer_get_max_bytes(indexer, &max_bytes);
    dft_indexer_get_num_lines(indexer, &num_lines);
    printf("Max bytes: %zu\n", max_bytes);
    printf("Num lines: %zu\n", num_lines);

    // Cleanup
    dft_indexer_destroy(indexer);
    return 0;
}
Next Steps
Read the Pipeline Guide for a comprehensive guide to coroutine pipelines
Check the Python API Reference for detailed Python API documentation
See the C++ API Reference for the C++ API
Visit Utilities for built-in composable components
Read the Developer's Guide for development guidelines
Key Resources:
Pipeline patterns: the Pipeline Guide covers CoroScope, channels, fan-out/fan-in, and async generators
CLI tools: Command-Line Tools lists all available command-line utilities
Python bindings: use `from dftracer.utils import TraceReader, Indexer` in Python scripts
C++ integration: link the `dftracer-utils` library and include headers from `include/dftracer/utils/`