Composites

High-level utilities that combine multiple operations into complete workflows.

#include <dftracer/utils/utilities/composites/composites.h>

using namespace dftracer::utils::utilities::composites;

BatchProcessorUtility

Processes multiple items in parallel with optional result sorting.

template <typename Input, typename Output>
class BatchProcessorUtility;

Example:

// Create workflow utility
auto workflow = std::make_shared<MyItemProcessor>();

// Wrap in batch processor
auto batch = std::make_shared<BatchProcessorUtility<ItemInput, ItemOutput>>(workflow);

// Optional: sort results by index
batch->with_comparator([](const ItemOutput& a, const ItemOutput& b) {
    return a.index < b.index;
});

// Process items
std::vector<ItemInput> inputs = /* ... */;
std::vector<ItemOutput> outputs = batch->process(inputs);

DirectoryFileProcessorUtility

Scans a directory and processes each matching file in parallel.

template <typename FileOutput>
class DirectoryFileProcessorUtility;

Input:

struct DirectoryProcessInput {
    fs::path directory;
    std::vector<std::string> extensions;  // e.g., {".pfw", ".gz"}
    bool recursive = false;

    static DirectoryProcessInput from_directory(const fs::path& dir);
    DirectoryProcessInput& with_extensions(std::vector<std::string> exts);
    DirectoryProcessInput& with_recursive(bool value);
};

Output:

template <typename T>
struct BatchFileProcessOutput {
    std::vector<T> results;
    std::size_t total_files;
    std::size_t successful_files;
};

LineBatchProcessorUtility

Streaming line processing with optional filtering.

template <typename LineOutput>
class LineBatchProcessorUtility;

Processes lines lazily and returns a vector of results. The line processor returns std::optional<LineOutput>; returning std::nullopt filters the line out of the results.

Example:

auto processor = std::make_shared<LineBatchProcessorUtility<ParsedEvent>>(
    [](const Line& line) -> std::optional<ParsedEvent> {
        if (line.content.empty()) return std::nullopt;
        return parse_event(line);
    }
);

LineReadInput input{
    "/path/to/file.pfw.gz",
    "/path/to/file.pfw.gz.idx",
    0,     // start_line
    1000   // end_line
};

std::vector<ParsedEvent> events = processor->process(input);

FileCompressorUtility

Compresses files in a streaming fashion.

Input:

struct FileCompressionUtilityInput {
    fs::path input_path;
    fs::path output_path;
    int compression_level = 6;

    static FileCompressionUtilityInput from_file(const fs::path& path);
    FileCompressionUtilityInput& with_output(const fs::path& path);
    FileCompressionUtilityInput& with_level(int level);
};

Output:

struct FileCompressionUtilityOutput {
    bool success;
    std::size_t original_size;
    std::size_t compressed_size;
    double compression_ratio;
};

FileDecompressorUtility

Decompresses files in a streaming fashion.

Input:

struct FileDecompressionUtilityInput {
    fs::path input_path;
    fs::path output_path;

    static FileDecompressionUtilityInput from_file(const fs::path& path);
    FileDecompressionUtilityInput& with_output(const fs::path& path);
};

IndexedFileReaderUtility

Creates readers for indexed compressed files, handling index creation.

struct IndexedReadInput {
    fs::path file_path;
    fs::path index_path;
    std::size_t checkpoint_size = DEFAULT_CHECKPOINT_SIZE;
    bool force_rebuild = false;

    static IndexedReadInput from_file(const fs::path& path);
    IndexedReadInput& with_index(const fs::path& path);
    IndexedReadInput& with_checkpoint_size(std::size_t size);
    IndexedReadInput& with_force_rebuild(bool value);
};

Example:

IndexedFileReaderUtility utility;

auto input = IndexedReadInput::from_file("/data/trace.pfw.gz")
    .with_index("/data/trace.pfw.gz.idx")
    .with_force_rebuild(false);

std::shared_ptr<Reader> reader = utility.process(input);

DFTracer-Specific Composites

These composites are specialized for analyzing DFTracer HPC trace files. They provide distributed statistics collection, bloom filter-based chunk skipping, view queries with predicate filtering, and aggregation across multiple files.

#include <dftracer/utils/utilities/composites/dft/statistics/detailed_statistics.h>
#include <dftracer/utils/utilities/composites/dft/indexing/bloom_filter_cache.h>
#include <dftracer/utils/utilities/composites/dft/views/predicate_filter.h>

Statistics

DDSketch and Log2Histogram

Two probabilistic data structures for efficient, order-independent statistics:

DDSketch — Percentile estimation with bounded relative error:

#include <dftracer/utils/utilities/common/statistics/ddsketch.h>

using dftracer::utils::utilities::common::statistics::DDSketch;

// Create sketch with 1% relative error bound
DDSketch sketch(0.01);

// Add event durations
for (auto duration_us : event_durations) {
    sketch.add(duration_us);
}

// Query percentiles
double p50 = sketch.quantile(0.5);      // median
double p99 = sketch.quantile(0.99);     // 99th percentile
double p999 = sketch.quantile(0.999);   // 99.9th percentile

// Merge sketches from parallel workers (order-independent)
DDSketch merged(0.01);
merged.merge(sketch_from_worker_1);
merged.merge(sketch_from_worker_2);
// merged now contains combined percentiles

// Access raw statistics
std::uint64_t count = sketch.count();
double min_val = sketch.min();
double max_val = sketch.max();
std::size_t memory_bytes = sketch.memory_usage();

Log2Histogram — Fixed 65-bin logarithmic histogram for size distributions:

#include <dftracer/utils/utilities/common/statistics/log2_histogram.h>

using dftracer::utils::utilities::common::statistics::Log2Histogram;

// Create histogram with bins for powers of 2
Log2Histogram histogram;

// Add event sizes
for (auto size_bytes : event_sizes) {
    histogram.add(size_bytes);
}

// Bin layout:
// Bin 0: value == 0
// Bin k (1 <= k <= 64): values in [2^(k-1), 2^k)
// Covers full uint64_t range (0 to 2^64 - 1); bin 64 holds [2^63, 2^64)

// Query approximate percentile
double p95 = histogram.approx_percentile(0.95);

// Merge histograms (commutative)
Log2Histogram merged;
merged.merge(histogram);

// Human-readable output
std::string ascii_chart = histogram.render_ascii(80, "microseconds");
std::string block_chart = histogram.render_blocks(80, "bytes");
std::cout << ascii_chart << std::endl;

// JSON serialization for storage
std::string json_str = histogram.to_json();
Log2Histogram restored = Log2Histogram::from_json(json_str);

Chunk Statistics

Per-chunk aggregation of event counts, timestamps, and duration distributions:

#include <dftracer/utils/utilities/composites/dft/indexing/chunk_statistics.h>

using dftracer::utils::utilities::composites::dft::indexing::ChunkStatistics;

// Initialize per-chunk tracker
ChunkStatistics chunk_stats;

// Update as events are scanned
for (auto& event : events_in_chunk) {
    chunk_stats.update_from_event(
        event.name,               // e.g., "read", "write"
        event.category,           // e.g., "POSIX", "DLIO"
        event.pid, event.tid,
        event.timestamp_us,
        event.duration_us
    );
}

// Access aggregated data
std::uint64_t total = chunk_stats.total_events;
auto& cat_counts = chunk_stats.category_counts;    // category -> count
auto& op_counts = chunk_stats.name_counts;         // operation -> count
auto& tid_counts = chunk_stats.pid_tid_counts;     // "pid:tid" -> count

// Duration statistics (Welford's online algorithm + sketches)
double mean_duration = chunk_stats.duration_mean();
double variance = chunk_stats.duration_variance();
double p50 = chunk_stats.duration_sketch.quantile(0.5);

// Per-operation duration distributions
auto& name_sketches = chunk_stats.name_duration_sketches;
auto& name_histograms = chunk_stats.name_duration_histograms;

// Merge statistics from multiple workers
ChunkStatistics merged;
merged.merge_from(chunk_stats);

// JSON serialization for SQLite storage
std::string cat_json = chunk_stats.category_counts_json();
std::string hist_json = chunk_stats.name_duration_histograms_json();

Bloom Filter Indexing

BloomFilterCache

Thread-safe bounded cache for deserialized bloom filters used during chunk skipping:

#include <dftracer/utils/utilities/composites/dft/indexing/bloom_filter_cache.h>

using dftracer::utils::utilities::composites::dft::indexing::BloomFilterCache;

// Create cache with max 10,000 entries (auto-clears when full)
BloomFilterCache cache(BloomFilterCache::DEFAULT_MAX_ENTRIES);

// Typical usage during index query: check if event might be in chunk
const std::string idx_path = "/data/trace.pfw.gz.idx";
const std::string dimension = "name";  // which filter (by operation name)
std::uint64_t checkpoint_idx = 5;     // chunk number

// Look up cached bloom filter
auto cached = cache.get(idx_path, dimension, checkpoint_idx);
if (cached) {
    // Filter was in cache
    auto& filter = cached.value();
    if (filter.possibly_contains("read")) {
        // Event might be in this chunk - need to scan
    } else {
        // Event definitely NOT in this chunk - safe to skip
    }
} else {
    // Cache miss - load filter from .idx file, add to cache
    auto filter = load_bloom_from_idx(idx_path, dimension, checkpoint_idx);
    cache.put(idx_path, dimension, checkpoint_idx, filter);
}

// For file-level bloom filters
std::uint64_t FILE_LEVEL = BloomFilterCache::FILE_LEVEL_SENTINEL;  // UINT64_MAX
cache.put(idx_path, dimension, FILE_LEVEL, file_level_filter);

// Cache statistics
std::size_t cache_size = cache.size();

View Queries with Predicates

PredicateFilter

Build and evaluate event matching predicates for view queries:

#include <dftracer/utils/utilities/composites/dft/views/predicate_filter.h>
#include <dftracer/utils/utilities/composites/dft/views/view_definition.h>

using dftracer::utils::utilities::composites::dft::views::PredicateFilter;
using dftracer::utils::utilities::composites::dft::views::build_predicate_filter;
using dftracer::utils::utilities::composites::dft::views::matches_predicate;
using dftracer::utils::utilities::composites::dft::views::matches_any_predicate;

// Define view query predicates
ViewPredicate predicate;
predicate.dimensions["name"] = {"read", "write"};     // event name filter
predicate.dimensions["cat"] = {"POSIX", "DLIO"};      // category filter
predicate.time_range = {1000.0, 5000.0};              // microseconds
predicate.duration_bounds = {10.0, 100.0};            // microseconds

// Compile to efficient filter representation
PredicateFilter filter = build_predicate_filter(predicate);

// Check if a parsed event matches
auto& json_event = parsed_json_value;
if (matches_predicate(json_event, filter)) {
    // Include this event in view results
    results.push_back(json_event);
}

// Check multiple predicates (OR semantics)
std::vector<PredicateFilter> filters = {
    build_predicate_filter(read_predicate),
    build_predicate_filter(write_predicate)
};

if (matches_any_predicate(json_event, filters)) {
    // Event matches at least one predicate
    results.push_back(json_event);
}

Aggregation Patterns

DistributionStats and DetailedStatistics

Reusable building blocks for multi-worker aggregation:

#include <dftracer/utils/utilities/composites/dft/statistics/detailed_statistics.h>

using dftracer::utils::utilities::composites::dft::statistics::DistributionStats;
using dftracer::utils::utilities::composites::dft::statistics::DetailedStatistics;
using dftracer::utils::utilities::composites::dft::statistics::IOEventMetrics;

// Per-worker collection (one worker per file/chunk batch)
DistributionStats duration_stats;
for (auto& event : events) {
    duration_stats.update(event.duration_us);
}

// DistributionStats combines three representations
// - histogram: 65 fixed bins
// - sketch: percentile estimation (0.01 relative error)
// - sum: for mean/variance computation
double mean = duration_stats.mean();
double stddev = duration_stats.stddev();
std::uint64_t count = duration_stats.count();

// Per-group aggregation (group by operation name, category, etc.)
DetailedStatistics stats;
stats.duration.update(event.duration_us);  // global duration

// Group by operation name
std::string key = event.name;
stats.grouped_duration[key].update(event.duration_us);

// I/O event metrics (if event has I/O fields)
auto& io_metrics = stats.grouped_io[key];
io_metrics.duration.update(event.duration_us);
io_metrics.size.update(event.size_bytes);
io_metrics.bandwidth.update(bytes_per_second);
io_metrics.offset.update(file_offset);

// Merge results from all workers (order-independent)
DetailedStatistics merged;
for (auto& worker_stats : worker_results) {
    merged.merge(worker_stats);
}

// Export to JSON for further analysis
std::string json_output = merged.to_json();

Real-World Example: Statistics Pipeline

Complete example of gathering statistics from a DFTracer trace file:

#include <dftracer/utils/utilities/composites/dft/statistics/statistics_aggregator_utility.h>
#include <dftracer/utils/utilities/composites/dft/indexing/bloom_filter_cache.h>

using dftracer::utils::utilities::composites::dft::statistics::
    StatisticsAggregatorUtility;
using dftracer::utils::utilities::composites::dft::statistics::
    StatisticsAggregatorInput;
using dftracer::utils::utilities::composites::dft::indexing::BloomFilterCache;

// Create aggregator utility
auto aggregator = std::make_shared<StatisticsAggregatorUtility>();

// Prepare input (uses bloom index for chunk skipping)
StatisticsAggregatorInput input{
    .file_path = "/data/trace.pfw.gz",
    .idx_path = "/data/trace.pfw.gz.idx",
    .index_dir = "/data/.indexes"
};

// Run aggregation (coroutine-based, parallelizable)
auto stats = co_await aggregator->process(input);

// Results contain merged statistics across all chunks
// - Overall duration percentiles (p50, p99, p999)
// - Per-operation counts and distributions
// - Category and thread breakdowns
std::cout << "Total events: " << stats.merged.total_events << std::endl;
std::cout << "Duration p99: " << stats.merged.duration_sketch.quantile(0.99)
          << " us" << std::endl;

DFT Event Pipeline

DftEventDispatcher

Adapter that turns a list of DftEventVisitor instances into a single IndexVisitor consumable by IndexBuilderUtility. Owns a per-instance JsonParser and parses each decompressed line once before fanning out to the configured visitors (BloomVisitor, ManifestVisitor, AggregationVisitor, …). Supports a force_serial mode for deterministic-order replays.

#include <dftracer/utils/utilities/composites/dft/dft_event_dispatcher.h>

std::vector<std::unique_ptr<DftEventVisitor>> visitors;
visitors.push_back(std::make_unique<BloomVisitor>(...));
visitors.push_back(std::make_unique<ManifestVisitor>(...));
DftEventDispatcher dispatcher(std::move(visitors));

AggregationVisitor

Emits per-chunk aggregation + system-metric merge operands into the distributed aggregation column families. Pairs with AggregationMergeOperator / SystemMetricsMergeOperator for distributed reduction; lives in composites/dft/aggregators/aggregation_visitor.h.

Reorganization Pipeline

Parallel event routing for reorganizing traces by query-based groups. The organize flow is a streaming pipeline that fans events through visitor groups, batches output, and periodically flushes group writers (GroupWriterTask) to bound peak memory.

ChunkWriter

Streaming file writer that automatically splits output into chunked files. Supports optional gzip compression and JSON array wrapping.

#include <dftracer/utils/utilities/fileio/chunk_writer.h>

using dftracer::utils::utilities::fileio::ChunkWriter;
using dftracer::utils::utilities::fileio::ChunkWriterConfig;

ChunkWriterConfig config;
config.output_dir = "./output";
config.base_name = "io_events";
config.chunk_size_bytes = 256 * 1024 * 1024;  // 256 MB per chunk
config.compress = true;
config.json_array_wrapper = true;

ChunkWriter writer(config);
// Write events — automatically rolls to new chunk file when size exceeded
// Each chunk is a separate .pfw.gz file

EventRouter

Routes events from source trace files to output groups in parallel using AsyncMutex-protected ChunkWriter instances. Each group is defined by a query predicate (from ExtractionPlan), and events matching a group are written to that group’s chunked output.

#include <dftracer/utils/utilities/composites/dft/reorganize/event_router.h>

using namespace dftracer::utils::utilities::composites::dft::reorganize;

EventRouterConfig config;
config.plan = extraction_plan;   // from ReorganizationPlanner
config.output_dir = "./organized";
config.chunk_size_bytes = 256 * 1024 * 1024;
config.compress = true;
config.executor_threads = 8;

auto result = co_await route_events(scope, config);
// result.total_events_written, result.chunks_created, result.output_files

ProvenanceTracker

Tracks source-to-output mapping during reorganization. Records which source file and line produced each output event, enabling reconstruction of original traces from reorganized files via dftracer_reconstruct.

ReconstructorUtility

Streaming reconstruction pipeline that inverts the organize pipeline: plans a reconstruction over a .pidx provenance store, fans out per-source read tasks through coroutines and channels, and merges the results, in their original order, back into the requested output. Defined in composites/dft/reorganize/reconstructor_utility.h.

Comparison

Compare trace metrics between a baseline and variant run. Aggregates events into time-bucketed windows, computes per-window max across processes, then mean +/- stdev across windows, and classifies deltas using Cohen’s d effect size (NEGLIGIBLE / SMALL / MEDIUM / LARGE).

ComparisonConfig

Configuration can be built from CLI arguments or loaded from a JSON file with hierarchical node trees. Nodes inherit query filters, metrics, and percentiles from their parent unless overridden.

#include <dftracer/utils/utilities/composites/dft/comparator/comparison_config.h>

using namespace dftracer::utils::utilities::composites::dft::comparator;

// Quick mode from CLI arguments
auto config = ComparisonConfig::from_cli(
    "./traces_v1", "./traces_v2",
    R"(cat == "POSIX" OR cat == "STDIO")",  // query
    "");                                      // group_by
config.defaults.time_interval_ms = 1000.0;
config.resolve();  // propagate defaults down the tree

// Alternatively (instead of from_cli above), load from a JSON config file
std::string error;
auto config = ComparisonConfig::from_json_file("compare.json", error);
if (!config) {
    // handle error
}
config->resolve();

ComparisonUtility

Joins baseline and variant aggregation outputs, builds the hierarchical comparison tree (root -> categories -> operations), and computes deltas with significance classification.

#include <dftracer/utils/utilities/composites/dft/comparator/comparison_utility.h>

using namespace dftracer::utils::utilities::composites::dft::comparator;

// Prepare visitor pairs (one per flattened node)
ComparisonVisitorPair pair;
pair.baseline = baseline_aggregation_output;
pair.variant = variant_aggregation_output;
pair.node = resolved_config_node;

ComparisonUtilityInput input;
input.visitors = {pair};
input.root_node = config.nodes[0];
input.baseline_file_count = 3;
input.variant_file_count = 3;

ComparisonUtility cmp;
auto output = co_await cmp.process(input);
// output.result contains the hierarchical NodeResult tree

TreeTableFormatter

Renders ComparisonOutput as an ASCII tree table or JSON string. The table uses dynamic column alignment with UTF-8 display width awareness.

#include <dftracer/utils/utilities/composites/dft/comparator/tree_table_formatter.h>

using namespace dftracer::utils::utilities::composites::dft::comparator;

// Table output to stdout
FormatterOptions opts;
opts.use_color = true;
opts.use_unicode = true;
TreeTableFormatter formatter(opts);
formatter.render(stdout, comparison_output);

// JSON output
std::string json = formatter.render_json(comparison_output);

// No-color output (for piping or logging)
FormatterOptions plain{.use_color = false, .use_unicode = false};
TreeTableFormatter plain_fmt(plain);
plain_fmt.render(log_file, comparison_output);

Arrow Export

ComparisonOutput::to_arrow() flattens the hierarchical tree into a single Arrow record batch for programmatic analysis. Requires DFTRACER_UTILS_ENABLE_ARROW=ON at build time.

#include <dftracer/utils/utilities/composites/dft/comparator/comparison_result.h>

// After running the comparison pipeline:
auto arrow_result = comparison_output.to_arrow();
// Columns: node_path, metric_group, metric_name, baseline, variant,
//          baseline_stdev, variant_stdev, delta, pct_change,
//          cohens_d, significance, is_regression

See Utilities API for the full API reference of all comparator types.