Common

Shared utilities used across the library: JSON parsing, query language, statistics collection, and Arrow data interchange.

JSON

JSON parsing uses simdjson exclusively (DOM and On-Demand APIs). JsonValue is a lightweight wrapper around simdjson::dom::element; JsonParser exposes the On-Demand API for zero-copy lazy field access.

#include <dftracer/utils/utilities/common/json/json.h>
#include <dftracer/utils/utilities/common/json/json_value.h>
#include <dftracer/utils/utilities/common/json/parser.h>

JsonValue

Wrapper over simdjson::dom::element with fluent navigation and type-safe accessors. Non-owning: only valid while the backing simdjson::dom::document is alive.

Parse and navigate:

simdjson::dom::parser parser;
simdjson::dom::element doc = parser.parse(json_str);
JsonValue root(doc);

// Fluent navigation with defaults
std::string name = root["metadata"]["name"].get<std::string>("unknown");
uint64_t id = root["id"].get<uint64_t>(0);
double value = root["data"]["value"].get<double>(1.0);

Dot-path navigation:

auto nested = root.at("metadata.config.timeout");
if (nested.exists()) {
    int timeout = nested.get<int>(30);
}
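
For intuition, a dot-path lookup can be read as "split on '.', then descend one key at a time". The sketch below is standalone and does not use the library: split_path is a hypothetical helper, and how at() itself treats keys that contain dots or array indices is not covered here.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical helper: split "a.b.c" into {"a", "b", "c"}.
// Empty segments (e.g. from "a..b") are kept so a caller can reject them.
std::vector<std::string> split_path(std::string_view path) {
    std::vector<std::string> keys;
    std::size_t start = 0;
    while (true) {
        const std::size_t dot = path.find('.', start);
        if (dot == std::string_view::npos) {
            keys.emplace_back(path.substr(start));
            break;
        }
        keys.emplace_back(path.substr(start, dot - start));
        start = dot + 1;
    }
    return keys;
}
```

Under this reading, root.at("metadata.config.timeout") would behave like root["metadata"]["config"]["timeout"].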

Optional access:

if (auto val = root["optional_field"].get_optional<int64_t>()) {
    use(*val);
}

Type checking:

JsonValue val = root["field"];
if (val.is_string()) { /* ... */ }
if (val.is_number()) { /* ... */ }
if (val.is_object()) { /* ... */ }
if (val.is_array())  { /* ... */ }
if (val.exists())    { /* not null */ }

JsonDocGuard

RAII helper that owns a simdjson::dom::parser. parse(data, len) reuses the parser's buffer, and root() returns the parsed element. Use it at short-lived parse sites; prefer StringJsonParserUtility when the document must outlive a co_await boundary.

JsonDocGuard guard;
if (guard.parse(data, len)) {
    JsonValue root(guard.root());
    // ... use root ...
}

JsonParser (On-Demand)

On-Demand parser for zero-copy lazy field access. Reuses an internal padded buffer across rows; string_view results are valid until the next parse() call. Used by the indexing visitors and TraceReader::read_json.

JsonParser parser;

for (auto& line : input_lines) {
    if (!parser.parse(line)) continue;
    auto name = parser.get_string("name");
    auto ts   = parser.get_int64("ts");

    parser.for_each_field("args", [](std::string_view k,
                                     simdjson::ondemand::value v) {
        // process nested fields
    });
}

StringJsonParserUtility

Parses JSON strings with owned document lifetime. Safe for use across co_await boundaries.

StringJsonParserUtility parser;

// From string
auto input = StringJsonParserInput::from_string(R"({"x": 42})");
auto json = co_await parser.process(input);
int x = json["x"].get<int>(0);

// From file (async); distinct names avoid redeclaring input/json in this scope
auto file_input = co_await StringJsonParserInput::from_file_async("config.json");
auto file_json = co_await parser.process(file_input);

parser.reset();  // Cleanup

ArgsMap and ArgsValueProxy

Owned key/value map used for trace event args. Replaces JsonValue for event-args storage in the DFT composites: keys are interned with the global dftracer::utils::StringIntern and values are a typed std::variant (string, int64, uint64, double, bool). ArgsValueProxy mirrors the JsonValue accessor surface (get<T>, get_optional<T>, is_string() …) so event visitors can be written generically.

#include <dftracer/utils/utilities/composites/dft/args_map.h>

using dftracer::utils::utilities::composites::dft::ArgsMap;

ArgsMap args;
args.insert("hhash", std::uint64_t{0x1234});
args.set_valid(true);

uint64_t h = args["hhash"].get<uint64_t>(0);
args.for_each_member([](std::string_view k, auto v) { /* ... */ });
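
The get<T>(default) / get_optional<T> accessor pattern over a typed variant can be sketched with the standard library alone. Everything named here (ArgValue, ArgTable, get_or) is a hypothetical stand-in, not the ArgsMap API. The sketch also assumes an exact type match is required; whether ArgsValueProxy converts between numeric types is not stated above.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>
#include <variant>

// Hypothetical stand-ins for the typed value and the map described above.
using ArgValue =
    std::variant<std::string, std::int64_t, std::uint64_t, double, bool>;
using ArgTable = std::unordered_map<std::string, ArgValue>;

// Return the value only if the key exists and holds exactly type T.
template <typename T>
std::optional<T> get_optional(const ArgTable& args, const std::string& key) {
    const auto it = args.find(key);
    if (it == args.end()) return std::nullopt;
    if (const T* p = std::get_if<T>(&it->second)) return *p;
    return std::nullopt;
}

// Same lookup, but fall back to a caller-supplied default.
template <typename T>
T get_or(const ArgTable& args, const std::string& key, T fallback) {
    return get_optional<T>(args, key).value_or(std::move(fallback));
}
```

Because both accessors share one lookup path, a visitor written against get_or and get_optional does not need to know which concrete container backs the values.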

JsonDictValue (Python)

Python-facing wrapper that exposes a parsed JSON object as a lazy Mapping. Used by the TraceReader Python binding to surface parsed events without materialising a dict per row. Defined in src/dftracer/utils/python/json.h.

Note

dftracer::utils::StringIntern was reimplemented as a lock-free open-chained hash table with a fast-path id table (FAST_CAPACITY = 1<<20). Lookups are fully lock-free; only the rare first insert of a string takes the insertion mutex.

Query

Query DSL for filtering JSON events. Recursive descent parser, AST evaluator, and Query class that owns a parsed AST.

#include <dftracer/utils/utilities/common/query/query.h>

Query Class

Owns a parsed query AST. Provides evaluation against JsonValue (for JSON events) and ValueMap (for typed key-value maps).

using namespace dftracer::utils::utilities::common::query;

// Parse and evaluate against JSON
auto q = parse_or_throw(R"(cat == "POSIX" and dur > 1000)");
JsonValue event = ...;
if (q.evaluate(event)) { /* match */ }

// Evaluate against a typed map (no JSON needed)
ValueMap fields = {{"cat", std::string("POSIX")}, {"dur", uint64_t(2000)}};
if (q.evaluate(fields)) { /* match */ }

// Safe parsing with error handling
auto result = Query::from_string(R"(cat in ["POSIX", "STDIO"])");
if (!result) {
    std::cerr << result.error().format();  // formatted error with column indicator
}

Parser

Tokenizes and parses query DSL strings into an AST. Keywords (and, or, not, in, true, false) are case-insensitive. String values are case-sensitive.

// Tokenize
auto tokens = tokenize(R"(cat == "POSIX" AND dur > 1000)");

// Parse a query string into an AST
auto ast = parse(R"(name in ["read", "write"] or not cat == "MPI")");

Evaluator

Evaluates an AST node against a JsonValue or ValueMap. Missing fields and type mismatches evaluate to false.

// Direct AST evaluation (Query::evaluate wraps this)
bool match = evaluate(*ast, json_event);
bool match2 = evaluate(*ast, value_map);
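
The "missing fields and type mismatches evaluate to false" rule can be illustrated with a standalone comparison over a variant map. This is a sketch under stated assumptions, not the library's evaluator: Value, Fields, Op, and compare are hypothetical names, and the sketch treats any difference in stored type as a mismatch, whereas the real evaluator may coerce between numeric types.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <variant>

// Hypothetical stand-ins for a typed field map and a comparison operator.
using Value = std::variant<std::string, std::uint64_t, double>;
using Fields = std::unordered_map<std::string, Value>;

enum class Op { EQ, GT };

// Evaluate `field <op> literal`. A missing field, or a field whose stored
// type differs from the literal's type, yields false instead of throwing.
bool compare(const Fields& fields, const std::string& field, Op op,
             const Value& literal) {
    const auto it = fields.find(field);
    if (it == fields.end()) return false;                     // missing field
    if (it->second.index() != literal.index()) return false;  // type mismatch
    switch (op) {
        case Op::EQ: return it->second == literal;
        case Op::GT: return it->second > literal;
    }
    return false;
}
```

One consequence of this rule worth keeping in mind: for an event that has no dur field, a comparison such as dur > 1000 and its opposite dur <= 1000 are both false.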

AST Types

The query AST uses std::variant-based nodes:

  • CompareNode — field comparison (==, !=, >, <, >=, <=)

  • InNode / NotInNode — set membership

  • AndNode / OrNode — logical connectives

  • NotNode — logical negation

// Programmatic AST construction
auto node = make_node(CompareNode{
    FieldNode{"cat"}, CompareOp::EQ, LiteralNode{std::string("POSIX")}});

// AST to string (round-trip)
std::string s = to_string(*node);  // cat == "POSIX"

Statistics

Percentile estimation, histogram, accumulator, and distribution-fitting utilities for trace analysis.

#include <dftracer/utils/utilities/common/statistics/ddsketch.h>
#include <dftracer/utils/utilities/common/statistics/log2_histogram.h>
#include <dftracer/utils/utilities/common/statistics/timestamp_histogram.h>
#include <dftracer/utils/utilities/common/statistics/statistic.h>
#include <dftracer/utils/utilities/common/statistics/distributions.h>
#include <dftracer/utils/utilities/common/statistics/mixture.h>
// Or use the umbrella header:
#include <dftracer/utils/utilities/common/statistics/statistics.h>

DDSketch

Deterministic, merge-order-independent percentile estimation with bounded relative error.

Basic usage:

DDSketch sketch(0.01);  // 1% relative accuracy

for (double duration : durations) {
    sketch.add(duration);
}

double p50 = sketch.quantile(0.5);
double p95 = sketch.quantile(0.95);
double p99 = sketch.quantile(0.99);

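// Cast the count so the format specifier is portable across platforms.
printf("p50=%.1f p95=%.1f p99=%.1f (n=%llu)\n",
       p50, p95, p99, static_cast<unsigned long long>(sketch.count()));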

Merging sketches from distributed sources:

DDSketch sketch_a(0.01);
DDSketch sketch_b(0.01);

// ... populate independently ...

// Merge is commutative and associative
sketch_a.merge(sketch_b);
// sketch_a now contains combined data

Serialization:

auto bytes = sketch.serialize();
DDSketch restored = DDSketch::deserialize(bytes.data(), bytes.size());
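
To show where the bounded relative error and the merge-order independence come from, here is a minimal standalone sketch of the DDSketch idea for positive values. It is not the library's implementation: TinySketch is a hypothetical class, it ignores zero and negative inputs, and it uses an unbounded std::map where a production sketch would cap the bucket count.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <map>

// Hypothetical, simplified DDSketch for positive values only.
class TinySketch {
public:
    explicit TinySketch(double alpha)
        : gamma_((1.0 + alpha) / (1.0 - alpha)), log_gamma_(std::log(gamma_)) {
        assert(alpha > 0.0 && alpha < 1.0);
    }

    void add(double x) {
        assert(x > 0.0);
        // Bucket i covers (gamma^(i-1), gamma^i].
        const int i = static_cast<int>(std::ceil(std::log(x) / log_gamma_));
        ++buckets_[i];
        ++count_;
    }

    // Bucket-wise integer addition, so the result is independent of the
    // order in which sketches are merged.
    void merge(const TinySketch& other) {
        for (const auto& [i, n] : other.buckets_) buckets_[i] += n;
        count_ += other.count_;
    }

    double quantile(double q) const {
        assert(count_ > 0 && q >= 0.0 && q <= 1.0);
        const double rank = q * static_cast<double>(count_ - 1);
        std::uint64_t seen = 0;
        for (const auto& [i, n] : buckets_) {
            seen += n;
            if (static_cast<double>(seen) > rank) return representative(i);
        }
        return representative(buckets_.rbegin()->first);
    }

    std::uint64_t count() const { return count_; }

private:
    // Point inside bucket i that is within a factor (1 +/- alpha) of every
    // value the bucket can hold.
    double representative(int i) const {
        return 2.0 * std::pow(gamma_, i) / (gamma_ + 1.0);
    }

    double gamma_;
    double log_gamma_;
    std::map<int, std::uint64_t> buckets_;
    std::uint64_t count_ = 0;
};
```

With alpha = 0.01 the bucket boundaries grow by a factor of about 1.02, which is why the memory cost depends on the dynamic range of the data rather than on the number of samples.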

Log2Histogram

Fixed 65-bin logarithmic histogram covering the full uint64_t range. Bin 0 holds the value 0; bin k (1 to 64) holds values in [2^(k-1), 2^k).
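
The binning rule is equivalent to "the bin index is the number of significant bits in the value". A minimal sketch of that rule follows; log2_bin is a hypothetical helper name, and the library may well compute the same index with a count-leading-zeros intrinsic instead of a loop.

```cpp
#include <cassert>
#include <cstdint>

// Bin 0 holds the value 0; bin k (1..64) holds [2^(k-1), 2^k).
// Equivalently, k is the number of significant bits in v.
constexpr int log2_bin(std::uint64_t v) {
    int k = 0;
    while (v != 0) {
        v >>= 1;
        ++k;
    }
    return k;
}

// Compile-time spot checks of the rule.
static_assert(log2_bin(0) == 0, "zero has its own bin");
static_assert(log2_bin(1) == 1, "bin 1 is [1, 2)");
static_assert(log2_bin(UINT64_MAX) == 64, "largest value lands in bin 64");
```

For example, log2_bin(64) and log2_bin(127) both return 7, the bin covering [64, 128).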

Basic usage:

Log2Histogram hist;

for (uint64_t duration_us : durations) {
    hist.add(duration_us);
}

double p50 = hist.approx_percentile(0.5);
double p99 = hist.approx_percentile(0.99);

Visualization:

// ASCII bar chart
std::cout << hist.render_ascii(50, "us");
//     [0, 0) us                |# 1
//     [64, 128) us             |############ 100

// Unicode block elements
std::cout << hist.render_blocks(50, "us");

Merging and serialization:

Log2Histogram a, b;
// ... populate ...
a.merge(b);  // Commutative

std::string json = a.to_json();
Log2Histogram restored = Log2Histogram::from_json(json);

TimestampHistogram

Sparse fixed-width (100 ms) histogram over event timestamps. Used by the chunk pruner to compute time-range selectivity and to weight sub-bucket expansions for adaptive aggregation.

TimestampHistogram th;
for (auto ts_us : timestamps) th.add(ts_us);

std::uint64_t in_window = th.count_in_range(ts_lo, ts_hi);
double sel = th.selectivity(ts_lo, ts_hi);

auto bytes = th.serialize();
auto restored = TimestampHistogram::deserialize(bytes.data(), bytes.size());
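
A sparse fixed-width histogram of this kind can be sketched as an ordered map from bucket index to count. The class below is a hypothetical illustration, not the library type. It assumes microsecond timestamps and counts whole buckets that overlap the query window, so it can over-count at the window edges; how the real count_in_range treats partially covered buckets is not stated above.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical sparse histogram with 100 ms buckets over microsecond stamps.
class TinyTimestampHistogram {
public:
    static constexpr std::uint64_t kBucketUs = 100'000;  // 100 ms

    void add(std::uint64_t ts_us) {
        ++buckets_[ts_us / kBucketUs];  // only touched buckets are stored
        ++total_;
    }

    // Sum every bucket that overlaps [lo_us, hi_us] (bucket granularity).
    std::uint64_t count_in_range(std::uint64_t lo_us,
                                 std::uint64_t hi_us) const {
        if (lo_us > hi_us) return 0;
        std::uint64_t n = 0;
        auto it = buckets_.lower_bound(lo_us / kBucketUs);
        const auto end = buckets_.upper_bound(hi_us / kBucketUs);
        for (; it != end; ++it) n += it->second;
        return n;
    }

    // Fraction of all recorded events that fall in the window.
    double selectivity(std::uint64_t lo_us, std::uint64_t hi_us) const {
        if (total_ == 0) return 0.0;
        return static_cast<double>(count_in_range(lo_us, hi_us)) /
               static_cast<double>(total_);
    }

private:
    std::map<std::uint64_t, std::uint64_t> buckets_;
    std::uint64_t total_ = 0;
};
```

A selectivity near 0 for a chunk is what lets a pruner skip it without reading any events.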

Statistic

Lightweight min/max/mean/count accumulator with an optional DDSketch backing for quantile queries. When a sketch is attached, quantile() consults it; when no sketch is present, quantile() falls back to linear interpolation between the observed min and max.

Statistic stat;
for (double v : samples) stat.update(v);

double mean = stat.mean();
double approx_p50 = stat.quantile(0.5);  // uses linear-interp without a sketch

// Promote to DDSketch-backed quantiles by attaching a populated sketch.
auto sketch = std::make_shared<DDSketch>(0.01);
for (double v : samples) sketch->add(v);
stat.attach_sketch(std::move(sketch));
double real_p99 = stat.quantile(0.99);  // now consults the sketch
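
The sketch-less fallback is cheap but can be far off for skewed data. The standalone sketch below reproduces only the fallback arithmetic; TinyStat is a hypothetical type, and the formula min + q * (max - min) is an assumption about what "interpolation between observed min and max" means.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <limits>

// Hypothetical accumulator showing the min/max interpolation fallback.
struct TinyStat {
    double min = std::numeric_limits<double>::infinity();
    double max = -std::numeric_limits<double>::infinity();
    double sum = 0.0;
    std::uint64_t count = 0;

    void update(double v) {
        min = std::min(min, v);
        max = std::max(max, v);
        sum += v;
        ++count;
    }

    double mean() const {
        assert(count > 0);
        return sum / static_cast<double>(count);
    }

    // Assumed fallback: treat the data as uniform on [min, max].
    double quantile(double q) const {
        assert(count > 0 && q >= 0.0 && q <= 1.0);
        return min + q * (max - min);
    }
};
```

For the samples {1, 2, 3, 1000} this fallback reports a median of 500.5, while the true median is 2.5, which is the case for attaching a sketch whenever the distribution is long-tailed.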

Distributions

Maximum-likelihood fitting for five parametric families, plus a Kolmogorov-Smirnov goodness-of-fit score and BIC. Backed by Boost.Math standalone for CDF/PDF/quantile evaluation; samplers use <random>.

Supported families: Normal, Lognormal, Gamma, Exponential, Weibull.

std::vector<double> data = ...;

// Fit one family directly.
FittedDistribution fit = fit_single_distribution(
    DistributionKind::Lognormal, data);
if (fit.valid) {
    printf("lognormal mu=%.4f sigma=%.4f KS=%.4f BIC=%.2f\n",
           fit.params[0], fit.params[1], fit.ks_stat, fit.bic);
}

// Fit all five and pick the lowest-KS valid fit.
auto fits = fit_all_single_distributions(data);
if (auto best = best_fit_by_ks(fits)) {
    printf("best family: %s\n",
           std::string(distribution_name(best->kind)).c_str());

    // Build a sampler from the fit (optionally bounded). This stays inside
    // the if-block because `best` is only in scope here.
    auto sampler = make_sampler(*best, /*min_bound=*/0.0,
                                /*max_bound=*/0.5);
    std::mt19937_64 rng(42);
    double draw = sampler(rng);
}
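
To make the three reported quantities concrete, the sketch below fits the simplest of the five families, the exponential, by hand. It is a standalone illustration with hypothetical names (ExpFit, fit_exponential), using the textbook definitions: MLE rate = 1 / mean, KS = largest gap between the empirical and fitted CDF, and BIC = k ln(n) - 2 ln(L) with k = 1 parameter. Whether the library uses exactly these conventions is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Hypothetical result type for a hand-rolled exponential fit.
struct ExpFit {
    double rate;
    double ks_stat;
    double bic;
};

ExpFit fit_exponential(std::vector<double> x) {
    assert(!x.empty());
    const double n = static_cast<double>(x.size());
    double sum = 0.0;
    for (double v : x) {
        assert(v >= 0.0);
        sum += v;
    }
    assert(sum > 0.0);
    const double rate = n / sum;  // maximum-likelihood estimate: 1 / mean

    // KS statistic: check the gap on both sides of each empirical CDF step.
    std::sort(x.begin(), x.end());
    double ks = 0.0;
    for (std::size_t i = 0; i < x.size(); ++i) {
        const double cdf = 1.0 - std::exp(-rate * x[i]);
        const double above = static_cast<double>(i + 1) / n - cdf;
        const double below = cdf - static_cast<double>(i) / n;
        ks = std::max({ks, above, below});
    }

    // log L = n ln(rate) - rate * sum; one free parameter.
    const double log_lik = n * std::log(rate) - rate * sum;
    const double bic = 1.0 * std::log(n) - 2.0 * log_lik;
    return {rate, ks, bic};
}
```

Lower is better for both scores: KS measures how well the fitted CDF tracks the data, while BIC additionally penalises the number of parameters, which matters once families with different parameter counts are compared.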

Mixture

Univariate Gaussian Mixture Model fitting via EM (K=2 or K=3) with log-sum-exp responsibilities, quantile-spread initial means, and a variance floor to prevent component collapse. A BIC-based selector chooses across single distributions and mixtures.

// Fit a 2-component Gaussian mixture.
FittedMixture m = fit_gaussian_mixture(data, /*K=*/2);
if (m.valid && m.converged) {
    for (size_t k = 0; k < m.weights.size(); ++k) {
        printf("comp %zu weight=%.3f mean=%.6f stddev=%.6f\n",
               k, m.weights[k],
               m.components[k].mean, m.components[k].stddev);
    }
}

// Pick the lowest-BIC model across {singles, GMM-2, GMM-3}.
auto singles = fit_all_single_distributions(data);
std::vector<FittedMixture> mixes{
    fit_gaussian_mixture(data, 2),
    fit_gaussian_mixture(data, 3),
};
auto selection = select_best_model(singles, mixes);

if (selection) {
    // `BestModel` is a std::variant<FittedDistribution, FittedMixture>.
    // pdf / cdf / make_sampler are overloaded and dispatch through it.
    auto sampler = make_sampler(selection->model);
    std::mt19937_64 rng(42);
    double draw = sampler(rng);
}
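
The log-sum-exp step matters because a naive E-step divides one underflowed density by a sum of underflowed densities and produces NaN for points far from every component. Below is a minimal standalone sketch of the responsibility computation for a single sample; Component and responsibilities are hypothetical names and this is not the library's EM loop.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <vector>

// Hypothetical mixture component: weight, mean, standard deviation.
struct Component {
    double weight;
    double mean;
    double stddev;
};

// Log of the normal density N(x | mean, stddev^2).
double log_normal_pdf(double x, double mean, double stddev) {
    const double log_sqrt_2pi = 0.5 * std::log(2.0 * std::acos(-1.0));
    const double z = (x - mean) / stddev;
    return -0.5 * z * z - std::log(stddev) - log_sqrt_2pi;
}

// E-step for one sample: posterior probability of each component.
std::vector<double> responsibilities(double x,
                                     const std::vector<Component>& comps) {
    assert(!comps.empty());
    std::vector<double> r;
    r.reserve(comps.size());
    for (const Component& c : comps) {
        assert(c.weight > 0.0 && c.stddev > 0.0);
        r.push_back(std::log(c.weight) + log_normal_pdf(x, c.mean, c.stddev));
    }
    // log-sum-exp: subtract the max so the largest term becomes exp(0) = 1.
    const double m = *std::max_element(r.begin(), r.end());
    double sum = 0.0;
    for (double t : r) sum += std::exp(t - m);
    const double log_norm = m + std::log(sum);
    for (double& t : r) t = std::exp(t - log_norm);
    return r;
}
```

Because the largest term is always exactly 1 after the shift, the normaliser can never be zero, so the returned weights stay finite and sum to 1 even for extreme outliers.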

Arrow

Arrow data interchange via nanoarrow. Guarded by DFTRACER_UTILS_ENABLE_ARROW (ON by default).

#include <dftracer/utils/utilities/common/arrow/arrow.h>

RecordBatchBuilder

Type-safe columnar builder with two modes:

  • Static schema: declare_schema() upfront, direct index append. Best for utility to_arrow() methods with known column layouts.

  • Dynamic schema: add_or_get_column() discovers columns from data, end_row() backfills nulls. Best for arbitrary JSON (e.g., TraceReader.iter_arrow()).

Column types: INT64, UINT64, DOUBLE, STRING, BOOL.

String columns store string_view into source data — zero copies during build, bulk copy only at finish(). Caller must keep source data alive until finish() returns.

Static schema example:

RecordBatchBuilder builder;
builder.declare_schema({
    {"id", ColumnType::INT64},
    {"name", ColumnType::STRING},
    {"value", ColumnType::DOUBLE},
});
builder.reserve(1000);

builder.append_int64(0, 42);
builder.append_string(1, "hello");
builder.append_double(2, 3.14);
builder.end_row();

auto result = builder.finish();  // ArrowExportResult
// result.num_rows() == 1, result.num_columns() == 3

builder.reset(true);  // keep schema, clear data for next batch

Dynamic schema example:

RecordBatchBuilder builder;

// Columns discovered from data
auto col_x = builder.add_or_get_column("x", ColumnType::INT64);
builder.append_int64(col_x, 1);
builder.end_row();  // row 0: x=1

auto col_y = builder.add_or_get_column("y", ColumnType::STRING);
// col_y is new — backfills null for row 0
builder.append_int64(col_x, 2);
builder.append_string(col_y, "hello");
builder.end_row();  // row 1: x=2, y="hello"

auto result = builder.finish();
// result.num_rows() == 2, result.num_columns() == 2
// column "y" has null in row 0
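
The null-backfill bookkeeping behind the dynamic mode can be sketched without Arrow at all. The class below is a hypothetical single-type illustration (int64 columns only, std::optional standing in for Arrow validity bits), not the RecordBatchBuilder implementation: a new column is padded with one null per row already closed, and end_row() pads any column that received no value in the current row.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical dynamic-schema builder; std::nullopt plays the role of null.
class TinyBuilder {
public:
    using Column = std::vector<std::optional<std::int64_t>>;

    std::size_t add_or_get_column(const std::string& name) {
        const auto it = index_.find(name);
        if (it != index_.end()) return it->second;
        // New column: one null for every row that has already been closed.
        columns_.push_back(Column(rows_, std::nullopt));
        index_.emplace(name, columns_.size() - 1);
        return columns_.size() - 1;
    }

    void append(std::size_t col, std::int64_t value) {
        assert(col < columns_.size());
        assert(columns_[col].size() == rows_);  // at most one value per row
        columns_[col].push_back(value);
    }

    void end_row() {
        ++rows_;
        // Any column skipped in this row gets a null so lengths stay equal.
        for (Column& c : columns_) {
            if (c.size() < rows_) c.push_back(std::nullopt);
        }
    }

    std::size_t num_rows() const { return rows_; }
    const Column& column(std::size_t col) const { return columns_.at(col); }

private:
    std::unordered_map<std::string, std::size_t> index_;
    std::vector<Column> columns_;
    std::size_t rows_ = 0;
};
```

The invariant that makes finish() simple is that every column has exactly num_rows() entries after each end_row(), regardless of when the column was first seen.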

ArrowExportResult

Move-only RAII wrapper holding nanoarrow::UniqueSchema + nanoarrow::UniqueArray. Self-contained, safe to send across threads and channels.

auto result = builder.finish();
assert(result.valid());
assert(result.num_rows() == 100);
assert(result.num_columns() == 5);

// Access raw Arrow C Data Interface pointers
ArrowSchema* schema = result.get_schema();
ArrowArray* array = result.get_array();

// Move ownership out; distinct names avoid redeclaring schema/array above
auto owned_schema = result.release_schema();
auto owned_array = result.release_array();

IpcWriter

Streaming Arrow IPC file writer (.arrows format). Writes files readable by pyarrow, polars, DuckDB, and any Arrow-compatible tool.

Guarded by DFTRACER_UTILS_ENABLE_ARROW_IPC.

#include <dftracer/utils/utilities/common/arrow/ipc_writer.h>

IpcWriter writer;
writer.open("output.arrows");

// Stream batches — schema written on first write_batch()
for (auto& batch : batches) {
    auto arrow = batch.to_arrow();
    writer.write_batch(arrow);
}

writer.close();  // writes footer, closes file

Batch to_arrow() Pattern

Streaming utilities yield batch structs with a to_arrow() method. This keeps Arrow conversion in C++ (not the Python binding) and allows both C++ and Python consumers to use Arrow output.

// ViewReaderBatch, AggregationBatch both follow this pattern
struct MyBatch {
    std::vector<std::string> events;

    ArrowExportResult to_arrow() const {
        RecordBatchBuilder builder;
        // ... parse events, append columns ...
        return builder.finish();
    }
};

// Consuming a StreamingUtility with Arrow output
MyStreamingUtility util;
auto gen = util.process(input);
while (auto batch = co_await gen.next()) {
    auto arrow = batch->to_arrow();
    ipc_writer.write_batch(arrow);
}

See Also