DFTracer Aggregation Pipeline

See also

For complete class and member documentation, see the API Reference.

Getting Started

Minimal example using the high-level AggregatorUtility:

#include <dftracer/utils/utilities/composites/dft/aggregators/aggregator_utility.h>

using namespace dftracer::utils::utilities::composites::dft::aggregators;

// The loop below uses co_await, so this code must run inside a coroutine body.
AggregatorUtility util;
AggregatorInput input;
input.directory = "./traces";
input.config.time_interval_us = 5000000;  // 5-second buckets
input.config.compute_percentiles = true;

auto gen = util.process(input);
while (auto batch = co_await gen.next()) {
    auto arrow = batch->to_arrow();  // 18-column Arrow batch
    // process arrow data...
}

The event aggregation pipeline computes statistics over DFTracer trace files. All classes are in the dftracer::utils::utilities::composites::dft::aggregators namespace.

The aggregation pipeline processes trace files in parallel chunks, computes per-key metrics (duration, size, custom fields), and merges results into a unified output. It supports time bucketing, process hierarchy tracking, boundary event association, and Perfetto trace output.

graph LR
    subgraph Input
        Files["Trace Files<br/>(.pfw.gz)"]
    end

    subgraph Mapping["Chunk Mapping"]
        CM["ChunkMapperUtility"]
    end

    subgraph Parallel["Parallel Aggregation"]
        CA1["ChunkAggregatorUtility"]
        CA2["ChunkAggregatorUtility"]
        CAN["ChunkAggregatorUtility"]
    end

    subgraph Merge["Merge & Resolve"]
        EA["EventAggregator"]
        AR["AssociationResolverUtility"]
    end

    subgraph Output
        Summary["AggregatorSummaryUtility"]
        Perfetto["PerfettoTraceWriterUtility"]
    end

    Files --> CM
    CM --> CA1
    CM --> CA2
    CM --> CAN
    CA1 --> EA
    CA2 --> EA
    CAN --> EA
    EA --> AR
    AR --> Summary
    AR --> Perfetto
    

Configuration

AggregationConfig

Main configuration for the aggregation pipeline.

Controls time bucketing, event filtering, statistical computation, boundary event tracking, and output format.

AggregationConfig config;
config.time_interval_us = 1000000;  // 1-second buckets
config.use_relative_time = true;
config.compute_statistics = true;
config.compute_percentiles = true;
config.percentiles = {0.25, 0.5, 0.75, 0.90, 0.99};

// Filter events
config.include_categories = {"POSIX", "STDIO"};
config.exclude_names = {"metadata"};

// Track boundary events (e.g., epoch boundaries)
config.boundary_events.push_back({
    .event_name = "epoch_start",
    .value_field = "epoch_id",
    .output_name = "epoch"
});
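To illustrate what time bucketing amounts to, the sketch below maps an event timestamp to a bucket index. The helper `time_bucket` and its parameters are hypothetical and not part of the DFTracer API. It assumes that use_relative_time means buckets are counted from the start of the trace rather than from timestamp zero.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper (not a DFTracer function): map a timestamp in
// microseconds to a time-bucket index of width interval_us.
inline std::uint64_t time_bucket(std::uint64_t ts_us,
                                 std::uint64_t trace_start_us,
                                 std::uint64_t interval_us,
                                 bool use_relative_time) {
    assert(interval_us > 0);
    // Assumption: relative time counts buckets from the trace start.
    const std::uint64_t base = use_relative_time ? trace_start_us : 0;
    assert(ts_us >= base);
    return (ts_us - base) / interval_us;
}
```

With 1-second buckets and a trace starting at 1,000,000 us, an event at 2,500,000 us lands in bucket 1 in relative mode and bucket 2 in absolute mode.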

Grouping Keys

AggregationKey

Composite key for grouping events during aggregation.

Events are grouped by category, name, process/thread IDs, host/function hashes, time bucket, and any extra grouping dimensions specified in the config.

String fields (cat, name, hhash, fhash) are stored as interned uint32_t IDs via a global StringIntern table (see Core Infrastructure), reducing memory usage and enabling faster hashing. Accessor methods (.cat(), .name(), etc.) resolve IDs back to string_view.

Extra key-value pairs use a lazily-allocated unique_ptr<vector<pair<uint32_t, uint32_t>>> to avoid heap allocation for the common case of no extra keys.
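The interning idea can be sketched as follows. This is a minimal, single-threaded illustration under stated assumptions: the class name `StringInternSketch` and its methods are hypothetical, and the real StringIntern table may differ in layout and thread safety.

```cpp
#include <cstdint>
#include <deque>
#include <string>
#include <string_view>
#include <unordered_map>

// Hypothetical sketch of a string-intern table: each distinct string gets a
// stable uint32_t ID, and IDs resolve back to a string_view.
class StringInternSketch {
public:
    std::uint32_t intern(std::string_view s) {
        auto it = ids_.find(s);
        if (it != ids_.end()) return it->second;  // already interned
        strings_.emplace_back(s);
        const auto id = static_cast<std::uint32_t>(strings_.size() - 1);
        // The map key views the stored string, whose address stays stable.
        ids_.emplace(std::string_view(strings_.back()), id);
        return id;
    }

    std::string_view resolve(std::uint32_t id) const { return strings_.at(id); }

private:
    // std::deque does not move existing elements on push_back, so the
    // string_view keys in ids_ remain valid as the table grows.
    std::deque<std::string> strings_;
    std::unordered_map<std::string_view, std::uint32_t> ids_;
};
```

Comparing or hashing two keys then only touches 32-bit integers instead of string contents, which is where the memory and hashing savings described above come from.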

AggregationMap

Type alias for the map from aggregation keys to metrics:

using AggregationMap =
    std::unordered_map<AggregationKey, AggregationMetrics,
                       AggregationKeyHash, AggregationKeyEqual>;
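For readers unfamiliar with custom hash and equality functors, here is a reduced sketch of the same pattern. `KeySketch`, `KeySketchHash`, `KeySketchEqual`, and `CountMapSketch` are hypothetical stand-ins with fewer fields than the real AggregationKey, and the hash-combine step is one common choice rather than the library's actual hash.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <unordered_map>

// Hypothetical, simplified key: the real AggregationKey carries more fields.
struct KeySketch {
    std::uint32_t cat_id;
    std::uint32_t name_id;
    std::uint64_t pid;
    std::uint64_t tid;
    std::uint64_t time_bucket;
};

struct KeySketchHash {
    std::size_t operator()(const KeySketch& k) const noexcept {
        std::size_t h = 0;
        // Boost-style hash_combine applied field by field.
        auto mix = [&h](std::uint64_t v) {
            h ^= std::hash<std::uint64_t>{}(v) + 0x9e3779b9u + (h << 6) + (h >> 2);
        };
        mix(k.cat_id);
        mix(k.name_id);
        mix(k.pid);
        mix(k.tid);
        mix(k.time_bucket);
        return h;
    }
};

struct KeySketchEqual {
    bool operator()(const KeySketch& a, const KeySketch& b) const noexcept {
        return a.cat_id == b.cat_id && a.name_id == b.name_id &&
               a.pid == b.pid && a.tid == b.tid &&
               a.time_bucket == b.time_bucket;
    }
};

// Same shape as AggregationMap, with a plain counter as the mapped value.
using CountMapSketch =
    std::unordered_map<KeySketch, std::uint64_t, KeySketchHash, KeySketchEqual>;
```

Events that agree on every key field fall into the same map entry; changing any one field, such as the time bucket, creates a separate entry.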

Metrics

AggregationMetrics

Per-key aggregated metrics using Welford’s online algorithm for numerically stable variance computation and DDSketch for percentile estimation.

Supports incremental updates and merging across chunks.

MetricStats

Single-metric statistics using Welford’s online algorithm.

Tracks count, min, max, mean, variance (M2), skewness (M3), kurtosis (M4), and a DDSketch for percentile estimation. All operations are O(1) per update.
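The following sketch shows Welford's update for mean and variance together with the standard pairwise merge (Chan et al.) that makes per-chunk results combinable. `WelfordSketch` is a hypothetical name; it omits the M3/M4 moments and the percentile sketch that MetricStats also tracks.

```cpp
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <limits>

// Hypothetical sketch: running count, min, max, mean, and M2 (sum of squared
// deviations from the mean), with O(1) update and O(1) merge.
struct WelfordSketch {
    std::uint64_t count = 0;
    double mean = 0.0;
    double m2 = 0.0;
    double min = std::numeric_limits<double>::infinity();
    double max = -std::numeric_limits<double>::infinity();

    void update(double x) {
        ++count;
        const double delta = x - mean;
        mean += delta / static_cast<double>(count);
        m2 += delta * (x - mean);  // uses the already-updated mean
        min = std::min(min, x);
        max = std::max(max, x);
    }

    // Combine two partial results, e.g. from two chunks.
    void merge(const WelfordSketch& o) {
        if (o.count == 0) return;
        if (count == 0) { *this = o; return; }
        const double na = static_cast<double>(count);
        const double nb = static_cast<double>(o.count);
        const double delta = o.mean - mean;
        m2 += o.m2 + delta * delta * na * nb / (na + nb);
        mean += delta * nb / (na + nb);
        count += o.count;
        min = std::min(min, o.min);
        max = std::max(max, o.max);
    }

    // Sample variance; zero when fewer than two values were seen.
    double variance() const {
        return count > 1 ? m2 / static_cast<double>(count - 1) : 0.0;
    }
    double stddev() const { return std::sqrt(variance()); }
};
```

Feeding {1, 2} into one instance and {3, 4} into another and then merging yields the same mean and variance as feeding all four values into a single instance.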

The DDSketch uses a collapsing dense store with 128 fixed bins of uint16_t counters, about 256 bytes of counters per sketch (128 × 2 bytes). When the bin range exceeds MAX_BINS, the oldest bins are collapsed into bin[0].
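A collapsing store of this kind can be sketched as below. Everything here is an illustrative assumption rather than the library's code: the class name `DDSketchSketch`, the default relative accuracy of 5%, saturating counters, and the reading of "oldest bins" as the lowest-indexed bins, which are folded into bin 0 when a larger value pushes the window upward.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cmath>
#include <cstdint>
#include <limits>

// Hypothetical sketch of a DDSketch with a fixed 128-bin window anchored at
// the largest index seen so far. Values below the window land in bin 0.
class DDSketchSketch {
public:
    static constexpr int kMaxBins = 128;

    explicit DDSketchSketch(double alpha = 0.05)
        : gamma_((1.0 + alpha) / (1.0 - alpha)), log_gamma_(std::log(gamma_)) {}

    void add(double x) {
        assert(x > 0.0);  // this sketch handles positive values only
        const int idx = static_cast<int>(std::ceil(std::log(x) / log_gamma_));
        if (empty_) {
            offset_ = idx - (kMaxBins - 1);  // first value goes to the top bin
            empty_ = false;
        } else if (idx > offset_ + kMaxBins - 1) {
            shift_up(idx - (offset_ + kMaxBins - 1));
        }
        const int pos = std::max(idx - offset_, 0);
        if (bins_[pos] < std::numeric_limits<std::uint16_t>::max()) ++bins_[pos];
    }

    // Approximate q-quantile (0 <= q <= 1); NaN when the sketch is empty.
    double quantile(double q) const {
        std::uint64_t total = 0;
        for (auto c : bins_) total += c;
        if (total == 0) return std::numeric_limits<double>::quiet_NaN();
        const double rank = q * static_cast<double>(total - 1);
        std::uint64_t seen = 0;
        for (int i = 0; i < kMaxBins; ++i) {
            seen += bins_[i];
            if (static_cast<double>(seen) > rank)
                return 2.0 * std::pow(gamma_, offset_ + i) / (gamma_ + 1.0);
        }
        return 2.0 * std::pow(gamma_, offset_ + kMaxBins - 1) / (gamma_ + 1.0);
    }

private:
    // Move the window up by k bins, folding the dropped low bins into bin 0.
    void shift_up(int k) {
        std::uint32_t folded = 0;
        for (int i = 0; i <= std::min(k, kMaxBins - 1); ++i) folded += bins_[i];
        for (int i = 1; i < kMaxBins; ++i)
            bins_[i] = (i + k < kMaxBins) ? bins_[i + k] : 0;
        bins_[0] = static_cast<std::uint16_t>(std::min<std::uint32_t>(folded, 0xFFFFu));
        offset_ += k;
    }

    double gamma_;
    double log_gamma_;
    int offset_ = 0;      // logarithmic index represented by bins_[0]
    bool empty_ = true;
    std::array<std::uint16_t, kMaxBins> bins_{};
};
```

The trade-off is visible in the collapse step: high quantiles keep their relative-error guarantee, while values that fall below the window lose resolution once they are folded into bin 0.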

Pipeline Stages

ChunkMapperUtility

Maps trace files to parallel chunk work items.

Takes file metadata (from MetadataCollectorUtility) and splits each file into chunks based on checkpoint boundaries. Each chunk becomes a ChunkAggregatorInput for parallel processing.
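One way to picture checkpoint-based splitting is the greedy sketch below. The names `ChunkRangeSketch` and `split_at_checkpoints` are hypothetical, and the target chunk size is an assumed tuning parameter; the source only states that chunk edges fall on checkpoint boundaries.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical byte range [begin, end) within one trace file.
struct ChunkRangeSketch {
    std::uint64_t begin;
    std::uint64_t end;
};

// Greedy split: close a chunk at the first checkpoint that makes it at least
// target_bytes long. checkpoints must be sorted in ascending order.
inline std::vector<ChunkRangeSketch> split_at_checkpoints(
    const std::vector<std::uint64_t>& checkpoints,
    std::uint64_t file_size,
    std::uint64_t target_bytes) {
    assert(target_bytes > 0);
    std::vector<ChunkRangeSketch> chunks;
    std::uint64_t begin = 0;
    for (std::uint64_t c : checkpoints) {
        if (c <= begin || c >= file_size) continue;  // ignore unusable offsets
        if (c - begin >= target_bytes) {
            chunks.push_back({begin, c});
            begin = c;
        }
    }
    if (begin < file_size) chunks.push_back({begin, file_size});  // tail chunk
    return chunks;
}
```

Because every chunk starts and ends on a checkpoint (or a file edge), each range can be read independently by a separate worker.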

ChunkAggregatorUtility

Per-chunk event aggregation (parallelizable).

Reads events from a byte range within a trace file, applies filters, computes aggregation keys, and accumulates metrics. Uses bloom filter predicates for early chunk skipping when available.
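The early-skip idea can be illustrated with a tiny Bloom filter. This is a sketch under assumptions: `BloomSketch` and `can_skip_chunk` are hypothetical names, the filter size and hash scheme are arbitrary, and the actual predicate format used by ChunkAggregatorUtility is not shown in this document.

```cpp
#include <bitset>
#include <cstddef>
#include <functional>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical fixed-size Bloom filter over strings (e.g. event categories).
class BloomSketch {
public:
    void insert(std::string_view s) {
        for (std::size_t i = 0; i < kHashes; ++i) bits_.set(index(s, i));
    }
    // False means "definitely absent"; true means "possibly present".
    bool may_contain(std::string_view s) const {
        for (std::size_t i = 0; i < kHashes; ++i)
            if (!bits_.test(index(s, i))) return false;
        return true;
    }

private:
    static constexpr std::size_t kBits = 1024;
    static constexpr std::size_t kHashes = 3;

    // Double hashing: derive kHashes probe positions from two base hashes.
    static std::size_t index(std::string_view s, std::size_t i) {
        const std::size_t h1 = std::hash<std::string_view>{}(s);
        const std::size_t h2 = std::hash<std::string>{}(std::string(s) + "#") | 1;
        return (h1 + i * h2) % kBits;
    }

    std::bitset<kBits> bits_;
};

// A chunk can be skipped when an include-filter is set and none of the
// wanted values can possibly occur in the chunk.
inline bool can_skip_chunk(const BloomSketch& chunk_filter,
                           const std::vector<std::string>& wanted) {
    if (wanted.empty()) return false;  // no filter: every chunk is relevant
    for (const auto& w : wanted)
        if (chunk_filter.may_contain(w)) return false;
    return true;
}
```

A Bloom filter never reports a stored value as absent, so skipping on a negative answer cannot drop matching events; false positives only cost an unnecessary read.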

The utility is tagged Parallelizable, so multiple instances can run concurrently across chunks.

EventAggregator

Unified event aggregator that replaces the former EventAggregatorUtility and the internal RocksDbAggregator, which are now merged into one class. It holds a RocksDatabase handle and merges per-chunk aggregation results into a unified output, deduplicating file counts and collecting association trackers for downstream resolution.

AggregationVisitor

DftEventVisitor subclass that accumulates AggregationMetrics per AggregationKey directly from parsed events during a scan. When registered with DftEventDispatcher, it lets the aggregation pass share a single parse with the bloom and manifest visitors. Defined in dftracer/utils/utilities/composites/dft/aggregators/aggregation_visitor.h.

DftEventDispatcher

Fan-out adapter that implements the IndexVisitor interface, parses each line once, and dispatches the parsed DftEvent to a list of registered DftEventVisitor instances (BloomVisitor, ManifestVisitor, AggregationVisitor, …). This collapses multiple visitor passes into a single read of the input. Defined in dftracer/utils/utilities/composites/dft/dft_event_dispatcher.h.
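The parse-once, dispatch-to-many pattern looks roughly like this. All types below (`EventSketch`, `VisitorSketch`, `DispatcherSketch`, and the two example visitors) are hypothetical, and the "name duration" line format is a toy stand-in for the real trace format; the actual DftEventVisitor and IndexVisitor interfaces are not reproduced here.

```cpp
#include <cstdint>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical parsed event and visitor interface.
struct EventSketch {
    std::string name;
    std::uint64_t dur_us = 0;
};

class VisitorSketch {
public:
    virtual ~VisitorSketch() = default;
    virtual void on_event(const EventSketch& ev) = 0;
};

// Parses each line exactly once and hands the result to every visitor.
class DispatcherSketch {
public:
    void add(VisitorSketch* v) { visitors_.push_back(v); }  // non-owning

    void on_line(std::string_view line) {
        const EventSketch ev = parse(line);
        ++parse_count_;
        for (auto* v : visitors_) v->on_event(ev);
    }

    std::size_t parse_count() const { return parse_count_; }

private:
    // Toy parser for lines of the form "<name> <duration_us>".
    static EventSketch parse(std::string_view line) {
        EventSketch ev;
        const auto sp = line.find(' ');
        ev.name = std::string(line.substr(0, sp));
        if (sp != std::string_view::npos)
            ev.dur_us = std::stoull(std::string(line.substr(sp + 1)));
        return ev;
    }

    std::vector<VisitorSketch*> visitors_;
    std::size_t parse_count_ = 0;
};

// Two example visitors that consume the same parsed events.
struct CountVisitorSketch : VisitorSketch {
    std::size_t count = 0;
    void on_event(const EventSketch&) override { ++count; }
};

struct DurationSumVisitorSketch : VisitorSketch {
    std::uint64_t total_us = 0;
    void on_event(const EventSketch& ev) override { total_us += ev.dur_us; }
};
```

Adding a third visitor costs one more virtual call per event but no additional parsing, which is the saving the dispatcher is designed for.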

Association Tracking

AssociationTracker

Tracks process hierarchy (parent-child PIDs) and boundary event intervals during chunk processing. Each chunk gets its own tracker, and trackers are merged during the resolution phase.

Process hierarchy: Extracts parent PID from metadata events to build a process tree. Used to annotate aggregated events with their root process.

Boundary events: Tracks named intervals (e.g., training epochs) by matching start/end events. Aggregated events are associated with the boundary interval that contains their timestamp.
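Associating a timestamp with its enclosing boundary interval can be sketched with a binary search. `BoundaryIntervalSketch` and `find_boundary` are hypothetical names, and the sketch assumes intervals are sorted by start time, do not overlap, and are half-open: [start, end).

```cpp
#include <algorithm>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical closed-off interval produced by matching a start/end pair.
struct BoundaryIntervalSketch {
    std::uint64_t start_us;
    std::uint64_t end_us;
    std::int64_t value;  // e.g. the epoch ID read from the start event
};

// Return the value of the interval containing ts_us, if any.
// Precondition: sorted by start_us, non-overlapping.
inline std::optional<std::int64_t> find_boundary(
    const std::vector<BoundaryIntervalSketch>& sorted, std::uint64_t ts_us) {
    // First interval that starts strictly after ts_us.
    auto it = std::upper_bound(
        sorted.begin(), sorted.end(), ts_us,
        [](std::uint64_t t, const BoundaryIntervalSketch& b) {
            return t < b.start_us;
        });
    if (it == sorted.begin()) return std::nullopt;  // before every interval
    --it;  // last interval starting at or before ts_us
    if (ts_us < it->end_us) return it->value;
    return std::nullopt;  // falls in a gap between intervals
}
```

Each lookup is O(log n) in the number of intervals, which matters when every aggregated event is annotated.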

AssociationResolverUtility

Resolves process hierarchy and boundary associations across all chunks.

Merges all per-chunk AssociationTracker instances, resolves parent PIDs to root processes, computes trace-wide metadata (duration, boundary ranges), and annotates aggregated events with their associations.
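Resolving a PID to its root process amounts to following parent links until none remains. The sketch below uses a hypothetical `resolve_root` function over a plain child-to-parent map and adds a hop limit as a guard against cyclic data; how the real resolver stores the hierarchy is not specified here.

```cpp
#include <cstddef>
#include <cstdint>
#include <unordered_map>

// Hypothetical helper: follow child -> parent links until reaching a PID
// with no recorded parent. A PID absent from the map is its own root.
inline std::uint64_t resolve_root(
    const std::unordered_map<std::uint64_t, std::uint64_t>& parent_of,
    std::uint64_t pid) {
    std::size_t hops = 0;
    auto it = parent_of.find(pid);
    // The hop limit bounds the walk if the recorded links form a cycle.
    while (it != parent_of.end() && it->second != pid &&
           hops++ < parent_of.size()) {
        pid = it->second;
        it = parent_of.find(pid);
    }
    return pid;
}
```

For a map containing 3 -> 2 and 2 -> 1, both PID 3 and PID 2 resolve to root 1, while an unrelated PID resolves to itself.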

High-Level Aggregator

AggregatorUtility

High-level StreamingUtility that orchestrates the full aggregation pipeline: directory scan, index building, metadata collection, chunk mapping, parallel aggregation, merge, and association resolution.

Yields AggregationBatch objects that can be converted to Arrow via to_arrow().

AggregatorUtility util;
AggregatorInput input;
input.directory = "./traces";
input.config.time_interval_us = 1000000;

auto gen = util.process(input);
while (auto batch = co_await gen.next()) {
    auto arrow = batch->to_arrow();  // 18-column Arrow batch
    // write to IPC file, send to Python, etc.
}

Output Utilities

AggregatorSummaryUtility

Outputs a human-readable summary of aggregation results to stdout.

PerfettoTraceWriterUtility

Writes aggregated results in Perfetto trace format for visualization in the Perfetto UI (https://ui.perfetto.dev).

Supports three event formats:

  • COUNTER — Counter track events (default, best for time-series metrics)

  • ASYNC — Async slice events (shows duration spans)

  • REGULAR — Regular slice events