Quick Start Guide

This guide will help you get started with dftracer utilities quickly.

Python Quick Start

Reading Trace Files

The most common use case is reading trace files:

from dftracer.utils import TraceReader

# Open a compressed trace file (auto-detects index sidecar)
reader = TraceReader("trace.pfw.gz")

# ...or pass a directory; TraceReader scans for .pfw / .pfw.gz files
# and streams them transparently as a single logical input.
reader = TraceReader("./traces")

# Read all lines
lines = reader.read_lines()
for line in lines:
    print(line)

# Read lines as JSON objects
json_objects = reader.read_lines_json()
for obj in json_objects:
    # Use .get() for "dur": a record without a duration (e.g. a metadata
    # event) would otherwise abort the loop with a KeyError.
    print(obj["name"], obj.get("dur"))

# Stream for memory efficiency
# NOTE: process() is not defined in this guide; substitute your own handler.
for obj in reader.iter_lines_json():
    process(obj)

Streaming with TraceReader

TraceReader is the recommended way to read trace files. It auto-selects sequential or indexed reading and supports streaming iterators:

from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")

# Stream lines (memory-efficient, uses iterator)
# NOTE: process() is not defined in this guide; substitute your own handler.
for line in reader.iter_lines():
    process(line)

# Stream raw byte chunks
for chunk in reader.iter_raw(multi_line=False):
    process(chunk)  # one line per chunk as bytes

# Materialize all lines (convenience wrapper)
lines = reader.read_lines()

# With explicit Runtime for thread pool control
from dftracer.utils import Runtime

# Runtime is used as a context manager; `reader` is rebound to a new
# TraceReader that receives the runtime through the runtime= keyword.
with Runtime(threads=8) as rt:
    reader = TraceReader("trace.pfw.gz", runtime=rt)
    for line in reader.iter_lines():
        process(line)

    # Check progress
    print(rt.get_progress())

Async Task Submission

Runtime.submit() runs tasks asynchronously and returns a TaskHandle:

from dftracer.utils import Runtime

# NOTE: process_file is a placeholder for your own callable; it is not
# defined in this guide.
with Runtime(threads=8, python_threads=4) as rt:
    # Submit Python callables -- runs on Python thread pool
    # Each call passes the callable, one file path, and a name= keyword;
    # the returned handles are kept so results can be fetched later.
    h1 = rt.submit(process_file, "trace1.pfw.gz", name="proc-1")
    h2 = rt.submit(process_file, "trace2.pfw.gz", name="proc-2")

    # Wait for all tasks
    rt.wait_all()

    # Or get individual results
    result = h1.get()  # blocks until h1 completes

Task names are auto-derived from the callable when not provided:

# `rt` is a Runtime instance; my_function and obj are placeholders for your
# own callable and object. No name= keyword is passed in any of these calls.
rt.submit(my_function)           # name = "my_function"
rt.submit(obj.method)            # name = "MyClass.method"
rt.submit(lambda: None)          # name = "<lambda>"

Composing tasks with dependency chains:

# compose() is itself submitted as a task (see the submit call below) and
# uses `rt` from the enclosing scope to submit two further tasks.
# index_file and query_index are placeholders for your own callables.
# NOTE(review): compose() blocks in .get() while running as a task --
# confirm the pool has enough threads for the nested submissions to progress.
def compose(filename):
    """Run index_file on ``filename``, then query_index on its result.

    Returns whatever the second task's handle yields from ``.get()``.
    """
    h1 = rt.submit(index_file, filename)
    result = h1.get()                       # wait for index
    h2 = rt.submit(query_index, result)     # use result
    return h2.get()

h = rt.submit(compose, "trace.pfw.gz", name="compose")
print(h.get())

Error handling:

# `h` is a task handle returned by rt.submit() and `rt` is the Runtime it was
# submitted to; `log` is a placeholder for your own logger object.

# Per-task: .get() re-raises the original exception
try:
    h.get()
except ValueError as e:
    print(f"Task failed: {e}")

# Batch: wait_all(raise_on_error=True) raises after all complete
rt.wait_all(raise_on_error=True)

# Callback: async notification on failure
# The callback is called with two arguments: the handle and the exception.
rt.set_error_callback(lambda h, e: log.error(f"{h.name}: {e}"))

# Inspect failures
for h in rt.get_failed():
    print(f"{h.name}: {h.exception}")

Arrow Data Interchange

TraceReader and several utilities support Arrow output for efficient columnar data access. Arrow batches implement the PyCapsule protocol for zero-copy interchange with pyarrow, polars, and DuckDB.

# pyarrow must be imported explicitly: the streaming loop below calls
# pyarrow.record_batch() and would otherwise fail with a NameError.
import pyarrow

from dftracer.utils import TraceReader

reader = TraceReader("trace.pfw.gz")

# Stream Arrow batches
# Each batch is handed to pyarrow.record_batch() and then converted to a
# pandas DataFrame; `df` is rebound on every iteration.
for batch in reader.iter_arrow(batch_size=10000):
    df = pyarrow.record_batch(batch).to_pandas()

# Materialize as ArrowTable
table = reader.read_arrow()
df = table.to_pandas()

Query Filtering

TraceReader supports a query DSL for filtering events. When an index exists, chunk pruning skips non-matching chunks automatically.

from dftracer.utils import TraceReader
from dftracer.utils.query import Field

reader = TraceReader("trace.pfw.gz")

# Single-condition filter, streamed line by line
# NOTE: process() is not defined in this guide; substitute your own handler.
posix_only = 'cat == "POSIX"'
for line in reader.iter_lines(query=posix_only):
    process(line)

# Several conditions joined with "and", materialized in one call
lines = reader.read_lines(query='cat == "POSIX" and dur > 1000')

# The same query strings are accepted by the Arrow reader
table = reader.read_arrow(query='name in ["read", "write"]')
df = table.to_pandas()

# Build the expression from Field objects instead of a hand-written string;
# str() turns it into the query text handed to the reader.
q = (Field("cat") == "POSIX") & (Field("dur") > 1000)
lines = reader.read_lines(query=str(q))

Utility Bindings

The dftracer.utils.utilities module provides Python bindings for DFTracer’s C++ utility classes. Tabular utilities return Arrow; scalar utilities return dicts.

from dftracer.utils.utilities import (
    AggregatorUtility,
    StatisticsQueryUtility,
    MetadataCollectorUtility,
)

# Aggregation pipeline (returns Arrow)
# The result is converted to pandas with .to_pandas().
agg = AggregatorUtility()
table = agg.process("./traces", time_interval_ms=1000.0)
df = table.to_pandas()

# Optional: aggregate extra numeric args fields
# The same AggregatorUtility instance is reused; `table` is rebound.
table = agg.process(
    "./traces",
    custom_metric_fields=["bytes"],
    compute_percentiles=True,
)

# Statistics query (returns dict)
# This example reads only the 'total_events' key of the result.
sq = StatisticsQueryUtility()
stats = sq.process("trace.pfw.gz", query_type="summary")
print(f"Events: {stats['total_events']}")

# File metadata (returns dict)
# This example reads only the 'size_mb' key, formatted to two decimals.
mc = MetadataCollectorUtility()
meta = mc.process("trace.pfw.gz")
print(f"Size: {meta['size_mb']:.2f} MB")

Using with Dask

For distributed processing with dask.distributed:

from dask.distributed import Client
from dftracer.utils.dask import DFTracerUtilsDaskWorkerPlugin

# Connect to the scheduler, then register the dftracer plugin with it
client = Client("scheduler:8786")
worker_plugin = DFTracerUtilsDaskWorkerPlugin(threads=48)
client.register_plugin(worker_plugin)


def count_lines(path):
    """Return the number of lines TraceReader yields for ``path``."""
    # Imported inside the function so the import runs when the task is called.
    from dftracer.utils import TraceReader

    total = 0
    for _ in TraceReader(path).iter_lines():
        total += 1
    return total


# file_paths is a placeholder for your own list of trace paths.
futures = client.map(count_lines, file_paths)
results = client.gather(futures)

Working with Indexer

Create and use indexes for faster access:

from dftracer.utils import Indexer

# Create an indexer
indexer = Indexer("trace.pfw.gz")

# Build the index if needed
# build() is only called when need_rebuild() reports that it is required.
if indexer.need_rebuild():
    indexer.build()

# Get index information
print(f"Max bytes: {indexer.get_max_bytes()}")
print(f"Num lines: {indexer.get_num_lines()}")

# Get checkpoints
# Each checkpoint exposes checkpoint_idx and num_lines attributes.
checkpoints = indexer.get_checkpoints()
for cp in checkpoints:
    print(f"Checkpoint {cp.checkpoint_idx}: {cp.num_lines} lines")

C++ Quick Start

Building Parallel Pipelines

Create and execute parallel data processing tasks using coroutines:

#include <dftracer/utils/core/pipeline/pipeline.h>
#include <dftracer/utils/core/pipeline/pipeline_config.h>
#include <dftracer/utils/core/tasks/task.h>
#include <dftracer/utils/core/coro/channel.h>
#include <iostream>

using namespace dftracer::utils;
using namespace dftracer::utils::coro;

// Producer/consumer example: one task sends 100 strings through a channel,
// a second task receives and prints them, and a Pipeline runs both.
int main() {
    // Configure executor
    auto config = PipelineConfig()
        .with_name("MyPipeline")
        .with_compute_threads(4);

    // Create channel for data streaming
    // NOTE(review): 100 is presumably the channel capacity -- confirm against
    // make_channel's documentation.
    auto channel = make_channel<std::string>(100);

    // Producer task - reads data and sends through channel
    // The lambda captures a producer handle obtained from the channel; the
    // `scope` parameter is not used in this example.
    auto producer = make_task(
        [ch = channel->producer()](CoroScope& scope) mutable
            -> CoroTask<void> {
            // NOTE(review): the guard presumably closes the producer side when
            // it goes out of scope, which lets the consumer loop end -- confirm.
            auto guard = ch.guard();

            for (int i = 0; i < 100; i++) {
                std::string data = "item-" + std::to_string(i);
                co_await ch.send(std::move(data));
            }
            co_return;
        }, "Producer");

    // Consumer task - reads from channel and processes
    // The loop runs until receive() yields a value that tests false.
    auto consumer = make_task([channel](CoroScope& scope) -> CoroTask<void> {
        while (auto item = co_await channel->receive()) {
            std::cout << "Processing: " << *item << std::endl;
        }
        co_return;
    }, "Consumer");

    // Execute pipeline
    // The producer is registered as the source (in a braced list) and the
    // consumer as the destination before execute() is called.
    Pipeline pipeline(config);
    pipeline.set_source({producer});
    pipeline.set_destination(consumer);
    pipeline.execute();

    return 0;
}

Spawning Parallel Work

Spawn multiple tasks to run in parallel:

#include <dftracer/utils/core/tasks/task.h>
#include <dftracer/utils/core/tasks/coro_scope.h>

// Fragment: spawns ten child coroutines from one task and sums their results.
// It uses std::vector and std::cout, names from the dftracer namespaces
// without qualification, and compute(), which is a placeholder for your own
// function -- the surrounding includes and using-directives are not shown.
auto task = make_task([](CoroScope& scope) -> CoroTask<void> {
    std::vector<SpawnFuture<int>> futures;

    // Spawn 10 parallel workers
    for (int i = 0; i < 10; ++i) {
        // `i` is captured by value, so each worker keeps its own index.
        auto future = scope.spawn([i](CoroScope& s) -> CoroTask<int> {
            // Each worker processes item i
            int result = compute(i);
            co_return result;
        });
        futures.push_back(std::move(future));
    }

    // Collect results
    // Futures are awaited one after another, in the order they were spawned.
    int total = 0;
    for (auto& fut : futures) {
        total += co_await fut;
    }

    std::cout << "Total: " << total << std::endl;
    co_return;
});

Creating a Reader (Legacy)

Use the factory pattern to create a reader:

#include <dftracer/utils/reader/reader_factory.h>
#include <dftracer/utils/indexer/indexer_factory.h>
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>

// Legacy reader example: builds an indexer, hands it to a reader, then reads
// the trace twice -- once by line range and once in fixed-size byte chunks.
int main() {
    // Create indexer first
    auto indexer = dftracer::utils::IndexerFactory::create(
        "trace.pfw.gz",
        "trace.pfw.gz.idx"
    );

    // Create reader with indexer (transfers ownership)
    auto reader = dftracer::utils::ReaderFactory::create(indexer.release());

    // Simple: Read lines by line range (returns string with all lines)
    std::string lines = reader->read_lines(1, 100);  // Lines 1-100
    std::cout << lines;

    // Advanced: Buffer-based reading for large files
    const size_t read_buffer_size = 1024 * 1024;  // 1MB buffer
    auto buffer = std::make_unique<char[]>(read_buffer_size);

    size_t start_bytes = 0;
    size_t end_bytes = reader->get_max_bytes();
    size_t bytes_written = 0;  // assigned by each read in the loop condition

    // Read in chunks
    // The loop stops when the range is exhausted or a read returns 0 bytes.
    while (start_bytes < end_bytes &&
           (bytes_written = reader->read_line_bytes(
                start_bytes, end_bytes,
                buffer.get(), read_buffer_size)) > 0) {
        // Process the chunk
        std::cout.write(buffer.get(), bytes_written);
        start_bytes += bytes_written;  // Advance for next read
    }

    return 0;
}

Using Async Generators for Streaming Data

Process data lazily without materializing everything in memory:

#include <dftracer/utils/core/tasks/task.h>
#include <dftracer/utils/core/tasks/coro_scope.h>
#include <dftracer/utils/core/coro/async_generator.h>
#include <string>

// Streaming data source (lazy evaluation)
// The path is taken BY VALUE so the generator owns its own copy. A reference
// parameter would dangle: the caller below passes a string literal, which
// creates a temporary std::string that is destroyed before the generator
// is first resumed.
AsyncGenerator<std::string> read_lines_async(
    std::string file_path) {
    // Opens file, yields lines on-demand
    // ... streaming implementation ...
}

// Consumes the generator one item at a time from inside a task.
// parse_and_process() is a placeholder for your own per-line handler.
auto task = make_task([](CoroScope& scope) -> CoroTask<void> {
    auto gen = read_lines_async("trace.pfw.gz");

    // Iterate asynchronously - each call to next() yields one item
    // The loop ends when next() produces a value that tests false.
    while (auto line = co_await gen.next()) {
        // Process line lazily as it's read
        parse_and_process(*line);
    }

    co_return;
});

Reading with Line Processor (Legacy)

Use a custom line processor for efficient line-by-line processing:

#include <dftracer/utils/reader/reader_factory.h>
#include <dftracer/utils/reader/line_processor.h>
// IndexerFactory is used in main() below, so include its header directly
// instead of relying on another header to pull it in.
#include <dftracer/utils/indexer/indexer_factory.h>
#include <cstddef>
#include <iostream>

// Custom line processor
// Writes every line it receives to stdout unchanged.
class MyLineProcessor : public dftracer::utils::LineProcessor {
public:
    // Receives one line as a pointer plus length; the data is written with
    // an explicit length, so no NUL terminator is required.
    void process_line(const char* line, size_t length) override {
        // Process each line
        std::cout.write(line, length);
    }
};

// Builds an indexer and a reader, then feeds lines 1-1000 to the processor.
int main() {
    auto indexer = dftracer::utils::IndexerFactory::create(
        "trace.pfw.gz", "trace.pfw.gz.idx"
    );
    // release() hands the raw indexer pointer to the reader factory.
    auto reader = dftracer::utils::ReaderFactory::create(indexer.release());

    MyLineProcessor processor;

    // Process lines 1-1000 with custom processor
    reader->read_lines_with_processor(1, 1000, processor);

    return 0;
}

Working with Indexer (Legacy)

Use the factory pattern to create an indexer:

#include <dftracer/utils/indexer/indexer_factory.h>
// std::cout and std::endl are used below, so <iostream> is required.
#include <iostream>

// Legacy indexer example: creates an indexer with a forced rebuild, builds
// the index, and prints the two summary values it exposes.
int main() {
    // Create an indexer using the factory
    auto indexer = dftracer::utils::IndexerFactory::create(
        "trace.pfw.gz",           // Archive path
        "trace.pfw.gz.idx",       // Index path
        true                       // Force rebuild
    );

    // Build the index
    indexer->build();

    // Get index information
    std::cout << "Max bytes: " << indexer->get_max_bytes() << std::endl;
    std::cout << "Num lines: " << indexer->get_num_lines() << std::endl;

    return 0;
}

C Quick Start

Reading Trace Files

Using the C API for reading trace files:

#include <dftracer/utils/reader/reader.h>
#include <stdio.h>
#include <stdlib.h>

/*
 * Read lines 1-100 of a compressed trace through the C reader API and
 * print them to stdout. Returns 0 on normal completion and 1 if the read
 * buffer cannot be allocated.
 */
int main(void) {
    /* Single source of truth for the buffer size used by malloc and the read */
    const size_t buffer_size = 1024 * 1024;  // 1MB buffer

    // Create reader
    dft_reader_handle_t reader = dft_reader_create(
        "trace.pfw.gz",
        "trace.pfw.gz.idx",
        1048576  // checkpoint_size
    );

    // Allocate buffer
    // malloc can fail; release the reader and bail out rather than passing
    // a NULL buffer to the read call.
    char *buffer = malloc(buffer_size);
    if (buffer == NULL) {
        fprintf(stderr, "Failed to allocate read buffer\n");
        dft_reader_destroy(reader);
        return 1;
    }

    // Read lines 1-100
    int result = dft_reader_read_lines(
        reader,
        1, 100,              // start_line, end_line
        buffer,
        buffer_size          // buffer_size
    );

    if (result == 0) {
        // NOTE(review): "%s" needs a NUL-terminated buffer -- confirm that
        // dft_reader_read_lines terminates its output.
        printf("%s", buffer);
    }

    // Cleanup
    free(buffer);
    dft_reader_destroy(reader);

    return 0;
}

Working with Indexer

Creating and using an indexer:

#include <dftracer/utils/indexer/indexer.h>
#include <stdio.h>

/*
 * Create an indexer for a compressed trace, build the index when the
 * library reports it is needed, and print the index's summary values.
 */
int main(void) {
    // Create indexer
    dft_indexer_handle_t indexer = dft_indexer_create(
        "trace.pfw.gz",
        "trace.pfw.gz.idx",
        1048576,  // checkpoint_size
        0         // force_rebuild
    );

    // Build index if needed
    if (dft_indexer_need_rebuild(indexer)) {
        printf("Building index...\n");
        dft_indexer_build(indexer);
    }

    // Get index information
    // Start from zero so the printf calls below never read indeterminate
    // values if a getter leaves its output argument untouched.
    size_t max_bytes = 0, num_lines = 0;
    dft_indexer_get_max_bytes(indexer, &max_bytes);
    dft_indexer_get_num_lines(indexer, &num_lines);

    printf("Max bytes: %zu\n", max_bytes);
    printf("Num lines: %zu\n", num_lines);

    // Cleanup
    dft_indexer_destroy(indexer);

    return 0;
}

Next Steps

Key Resources:

  • Pipeline patterns: the Pipeline Guide covers CoroScope, channels, fan-out/fan-in, and async generators

  • CLI tools: the Command-Line Tools page lists all available command-line utilities

  • Python bindings: use "from dftracer.utils import TraceReader, Indexer" in Python scripts

  • C++ integration: link against the dftracer-utils library and include headers from include/dftracer/utils/