Utilities Module

The dftracer.utils.utilities module provides Python bindings for DFTracer’s composable C++ utility classes. Each utility wraps a C++ pipeline stage and exposes it as a callable Python object.

Utilities fall into two categories:

  • Tabular utilities return Arrow data via process() (materialized ArrowTable) and iter_arrow() (streaming ArrowBatch).

  • Scalar utilities return Python dicts from process().

All utilities accept an optional runtime argument for thread pool control. Per-call inputs (file_path, predicates, etc.) are passed to process(), not the constructor.

All utilities are callable: util(...) is equivalent to util.process(...), except for ComparatorUtility, where the call delegates to compare().

from dftracer.utils.utilities import (
    AggregatorUtility,
    MetadataCollectorUtility,
    ReconstructionPlannerUtility,
    ReorganizationPlannerUtility,
    StatisticsAggregatorUtility,
    StatisticsQueryUtility,
)

Tabular Utilities (Arrow Output)

These utilities return columnar Arrow data. process() returns a materialized ArrowTable; iter_arrow() streams ArrowBatch objects one at a time.

AggregatorUtility

High-level aggregation pipeline. Scans a directory for .pfw / .pfw.gz files, builds indexes, aggregates events into time-bucketed counters, and returns the result as Arrow.

The Arrow output always includes the base aggregation columns: batch_type, cat, name, pid, tid, hhash, fhash, time_bucket, count, dur_total, dur_min, dur_max, dur_mean, dur_std, size_total, size_min, size_max, size_mean, size_std, ts, and te. When custom_metric_fields is provided, each field adds <field>_total, <field>_min, <field>_max, <field>_mean, and <field>_std columns. batch_type distinguishes regular event, profile-counter, and system-counter rows.
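The column-naming rule above can be sketched in plain Python. This is an illustrative helper, not part of dftracer: the function name expected_columns is hypothetical, the column order is an assumption, and any extra columns that compute_percentiles may add are not modeled because the text does not list them.

```python
# Base aggregation columns as listed in the documentation above.
BASE_COLUMNS = [
    "batch_type", "cat", "name", "pid", "tid", "hhash", "fhash",
    "time_bucket", "count",
    "dur_total", "dur_min", "dur_max", "dur_mean", "dur_std",
    "size_total", "size_min", "size_max", "size_mean", "size_std",
    "ts", "te",
]
STAT_SUFFIXES = ["total", "min", "max", "mean", "std"]


def expected_columns(custom_metric_fields=None):
    """Return the column names expected for a given custom_metric_fields list.

    Hypothetical helper: the ordering is an assumption, not library behavior.
    """
    columns = list(BASE_COLUMNS)
    for field in custom_metric_fields or []:
        # Each custom field contributes five statistic columns.
        columns.extend(f"{field}_{suffix}" for suffix in STAT_SUFFIXES)
    return columns


cols = expected_columns(["bytes", "ops"])
print(cols[-5:])
```

A helper like this can be handy for checking that a result table contains the columns a downstream query relies on.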

class dftracer.utils.dftracer_utils_ext.AggregatorUtility(runtime: Runtime | None = None)
iter_arrow(directory: str, time_interval_ms: float = 5000.0, group_keys: list[str] | None = None, categories: list[str] | None = None, names: list[str] | None = None, index_dir: str = '', checkpoint_size: int = 33554432, force_rebuild: bool = False, chunk_size_mb: int = 64, batch_size_mb: int = 4, event_batch_size: int = 10000, custom_metric_fields: list[str] | None = None, compute_percentiles: bool = False) → Iterator[object]
process(directory: str, time_interval_ms: float = 5000.0, group_keys: list[str] | None = None, categories: list[str] | None = None, names: list[str] | None = None, index_dir: str = '', checkpoint_size: int = 33554432, force_rebuild: bool = False, chunk_size_mb: int = 64, batch_size_mb: int = 4, event_batch_size: int = 10000, custom_metric_fields: list[str] | None = None, compute_percentiles: bool = False) → object

import pyarrow

agg = AggregatorUtility()

# Materialized: returns an ArrowTable
table = agg.process("./traces", time_interval_ms=1000.0)

# Include custom metrics from event args
table = agg.process(
    "./traces",
    time_interval_ms=1000.0,
    custom_metric_fields=["bytes", "ops"],
    compute_percentiles=True,
)

# Streaming: convert each ArrowBatch to a pyarrow.RecordBatch
for batch in agg.iter_arrow("./traces"):
    pa_batch = pyarrow.record_batch(batch)
    print(pa_batch.num_rows)  # replace with your own per-batch handling

# Callable shorthand
table = agg("./traces")
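To make "time-bucketed counters" concrete, here is a minimal pure-Python sketch of the idea. It is not the library's implementation: the function name bucket_events is hypothetical, timestamps and durations are assumed to be in microseconds, and the bucket index is assumed to be floor(ts / interval). Only count and dur_total are accumulated.

```python
from collections import defaultdict


def bucket_events(events, time_interval_ms=5000.0):
    """Group events by (cat, name, time_bucket) and accumulate counters.

    Assumes each event is a dict with "cat", "name", "ts", and "dur",
    where ts and dur are in microseconds (an assumption for this sketch).
    """
    interval_us = time_interval_ms * 1000.0
    buckets = defaultdict(lambda: {"count": 0, "dur_total": 0})
    for ev in events:
        bucket = int(ev["ts"] // interval_us)  # index of the time bucket
        key = (ev["cat"], ev["name"], bucket)
        buckets[key]["count"] += 1
        buckets[key]["dur_total"] += ev["dur"]
    return dict(buckets)


events = [
    {"cat": "POSIX", "name": "read", "ts": 0, "dur": 10},
    {"cat": "POSIX", "name": "read", "ts": 999_999, "dur": 20},
    {"cat": "POSIX", "name": "read", "ts": 1_000_000, "dur": 5},
]
for key, counters in bucket_events(events, time_interval_ms=1000.0).items():
    print(key, counters)
```

With a 1000 ms interval, the first two events fall into bucket 0 and the third into bucket 1.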

ComparatorUtility

Compare trace metrics between a baseline and variant run. Returns a hierarchical comparison with per-category and per-operation deltas, Cohen’s d significance, and regression detection.

Three output methods:

  • compare() returns a materialized ArrowTable with columns: node_path, metric_group, metric_name, baseline, variant, baseline_stdev, variant_stdev, delta, pct_change, cohens_d, significance, is_regression.

  • compare_json() returns a JSON string with the full hierarchical tree.

  • compare_table() returns a formatted ASCII table string.
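The relationship between the baseline, variant, delta, pct_change, and cohens_d columns can be illustrated with a short sketch. The formulas here are assumptions chosen for illustration: Cohen's d uses the equal-sample-size pooled standard deviation, and the library may compute these values differently. The function name compare_metric is hypothetical, and the significance and is_regression columns are not modeled.

```python
import math


def compare_metric(baseline, variant, baseline_stdev, variant_stdev):
    """Compute delta, percent change, and Cohen's d for one metric.

    Assumption: pooled standard deviation for equally sized samples,
    sqrt((s1^2 + s2^2) / 2). This is not confirmed library behavior.
    """
    delta = variant - baseline
    pct_change = (delta / baseline * 100.0) if baseline else float("nan")
    pooled = math.sqrt((baseline_stdev ** 2 + variant_stdev ** 2) / 2.0)
    cohens_d = (delta / pooled) if pooled else 0.0
    return {"delta": delta, "pct_change": pct_change, "cohens_d": cohens_d}


print(compare_metric(100.0, 120.0, 10.0, 10.0))
```

A large absolute cohens_d indicates that the difference is large relative to the run-to-run spread, which is why it is useful alongside the raw pct_change.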

class dftracer.utils.dftracer_utils_ext.ComparatorUtility(runtime: Runtime | None = None)
compare(baseline: str, variant: str, query: str = '', group_by: str = '', format: str = 'table', time_interval_ms: float = 5000.0, threshold: float = 0.0, executor_threads: int = 0, index_dir: str = '', force_rebuild: bool = False, config: str = '') → object
compare_json(baseline: str, variant: str, query: str = '', group_by: str = '', format: str = 'table', time_interval_ms: float = 5000.0, threshold: float = 0.0, executor_threads: int = 0, index_dir: str = '', force_rebuild: bool = False, config: str = '') → str
compare_table(baseline: str, variant: str, query: str = '', group_by: str = '', format: str = 'table', time_interval_ms: float = 5000.0, threshold: float = 0.0, executor_threads: int = 0, index_dir: str = '', force_rebuild: bool = False, config: str = '') → str

from dftracer.utils.utilities import ComparatorUtility

cmp = ComparatorUtility()

# Arrow table for programmatic analysis
table = cmp.compare("./traces_v1/run.pfw.gz", "./traces_v2/run.pfw.gz")

# JSON for serialization
json_str = cmp.compare_json("./traces_v1", "./traces_v2")

# Formatted table for display
print(cmp.compare_table("./baseline.pfw.gz", "./variant.pfw.gz"))

# With options
table = cmp.compare(
    "./baseline.pfw.gz",
    "./variant.pfw.gz",
    query='cat == "POSIX"',
    time_interval_ms=1000.0,
    threshold=1.0,
)

# Callable shorthand (delegates to compare)
table = cmp("./baseline.pfw.gz", "./variant.pfw.gz")

Scalar Utilities (Dict Output)

These utilities return Python dicts. Arrow output is not applicable since their results are scalar or structural (not tabular).

StatisticsQueryUtility

Query pre-computed statistics from an indexed trace file. When bloom/chunk statistics are not available, the utility falls back to streaming the file sequentially and computing statistics on-the-fly.

class dftracer.utils.dftracer_utils_ext.StatisticsQueryUtility(runtime: Runtime | None = None)
process(file_path: str, query_type: str = 'summary', top_n: int = 10, index_dir: str = '') → dict[str, object]

process() returns a dict. query_type selects the statistics view, such as "summary" or "top_n_names"; other pre-computed views are also accepted.

sq = StatisticsQueryUtility()
result = sq.process("trace.pfw.gz", query_type="summary")
print(result["total_events"])

result = sq.process("trace.pfw.gz", query_type="top_n_names", top_n=5)
for name, count in result["results"]:
    print(f"  {name}: {count}")

StatisticsAggregatorUtility

Aggregate statistics from a trace file. Uses pre-computed chunk statistics from the .dftindex store when available. When chunk statistics are absent, falls back to streaming the .pfw.gz line-by-line and computing statistics on-the-fly.

class dftracer.utils.dftracer_utils_ext.StatisticsAggregatorUtility(runtime: Runtime | None = None)
process(file_path: str, index_dir: str = '') → dict[str, object]

sa = StatisticsAggregatorUtility()
result = sa.process("trace.pfw.gz")
print(f"Events: {result['total_events']}")
print(f"Duration mean: {result['duration_mean_us']} us")
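The streaming fallback described above can be pictured as a single pass over the compressed file. The sketch below is not the library's code: it assumes the trace is gzip-compressed text with one JSON event object per line, that durations live in a "dur" field in microseconds, and that non-event lines (such as array brackets) can be skipped. The function name stream_statistics is hypothetical; the result keys mirror the ones used in the example above.

```python
import gzip
import json


def stream_statistics(path):
    """Count events and compute mean duration in one pass over a .pfw.gz file."""
    total_events = 0
    dur_sum = 0.0
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip().rstrip(",")
            if not line.startswith("{"):
                continue  # skip array brackets, blank lines, and other non-event lines
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue  # skip malformed or truncated lines
            total_events += 1
            dur_sum += float(event.get("dur", 0))
    mean = dur_sum / total_events if total_events else 0.0
    return {"total_events": total_events, "duration_mean_us": mean}
```

Because only running totals are kept, memory use stays constant regardless of file size, at the cost of decompressing and parsing the whole file.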

MetadataCollectorUtility

Collect metadata from a DFTracer trace file.

class dftracer.utils.dftracer_utils_ext.MetadataCollectorUtility(runtime: Runtime | None = None)
process(file_path: str, index_dir: str = '') → dict[str, object]

mc = MetadataCollectorUtility()
result = mc.process("trace.pfw.gz")
print(f"Size: {result['size_mb']:.2f} MB")
print(f"Format: {result['format']}")
print(f"Events: {result['valid_events']}")

ReorganizationPlannerUtility

Plan semantic reorganization of trace files. When manifest data is available in the .dftindex store, produces per-checkpoint extraction tasks. When manifest tables are absent, falls back to streaming the file line-by-line and emitting one whole-file extraction task per query group.

class dftracer.utils.dftracer_utils_ext.ReorganizationPlannerUtility(runtime: Runtime | None = None)
process(source_files: list[str], groups: list[dict[str, str]] | None = None, index_dir: str = '') → dict[str, object]

rp = ReorganizationPlannerUtility()
plan = rp.process(
    source_files=["trace1.pfw.gz", "trace2.pfw.gz"],
    groups=[{"name": "posix", "query": 'cat == "POSIX"'}],
)
print(f"Tasks: {len(plan['tasks'])}")

ReconstructionPlannerUtility

Plan reconstruction of original files from reorganized traces.

class dftracer.utils.dftracer_utils_ext.ReconstructionPlannerUtility(runtime: Runtime | None = None)
process(reorganized_files: list[str], index_dir: str = '') → dict[str, object]

rcp = ReconstructionPlannerUtility()
plan = rcp.process(reorganized_files=["reorg1.pfw.gz"])
print(f"Segments: {plan['total_segments']}")

Arrow Data Types

class dftracer.utils.arrow.ArrowBatch(capsule: Any)

Wrapper around an Arrow RecordBatch from the C extension.

Supports the Arrow PyCapsule protocol (__arrow_c_array__) for zero-copy interchange with pyarrow, polars, and DuckDB.

The underlying C data is exported via ownership transfer on first use. The pyarrow RecordBatch is cached so subsequent calls to to_pandas(), to_polars(), or __arrow_c_array__() are safe.

__arrow_c_array__(requested_schema: Any = None) → Tuple[Any, Any]

Export via Arrow C Data Interface.

property num_rows: int

Number of rows in this batch.

property num_columns: int

Number of columns in this batch.

to_pandas() → Any

Convert to pandas DataFrame.

Returns:

The converted DataFrame.

Return type:

pandas.DataFrame

Raises:

ImportError – If pyarrow is not installed.

to_polars() → Any

Convert to polars DataFrame.

Returns:

The converted DataFrame.

Return type:

polars.DataFrame

Raises:

ImportError – If polars is not installed.

class dftracer.utils.arrow.ArrowTable(batches: Any, schema_capsule: Any | None = None)

Wrapper around a collection of Arrow RecordBatches.

Returned by read_arrow() and utility process() methods. Supports the Arrow PyCapsule stream protocol (__arrow_c_stream__) for zero-copy interchange.

Accepts either a pre-built list of batches or a lazy iterator. When constructed from an iterator, batches are not materialized until data access (to_pandas, to_polars, batches, etc.).

num_rows is special: if the iterator has not been consumed yet, reading num_rows streams through the batches and counts rows without retaining them (O(1) memory). After such a streaming count the iterator is exhausted, so data access methods return empty results. If you need both the count and the data for a very large dataset, use iter_arrow directly.
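The one-shot behavior of a streaming row count follows from how Python iterators work, and a small stand-in class shows the effect. LazyRows is hypothetical and uses plain lists as "batches"; it only mimics the documented behavior and says nothing about how ArrowTable is implemented internally.

```python
class LazyRows:
    """Hypothetical stand-in for a table backed by a one-shot batch iterator."""

    def __init__(self, batches):
        self._source = iter(batches)
        self._cached = None  # filled on first data access
        self._count = None   # filled on first num_rows access

    @property
    def num_rows(self):
        if self._count is None:
            if self._cached is not None:
                self._count = sum(len(b) for b in self._cached)
            else:
                # Streaming count: batches are consumed and discarded.
                self._count = sum(len(b) for b in self._source)
        return self._count

    def batches(self):
        if self._cached is None:
            self._cached = list(self._source)  # materialize whatever is left
        return iter(self._cached)


counted_first = LazyRows(iter([[1, 2], [3]]))
print(counted_first.num_rows, list(counted_first.batches()))

data_first = LazyRows(iter([[1, 2], [3]]))
print(list(data_first.batches()), data_first.num_rows)
```

Counting first leaves nothing to materialize, while accessing the data first keeps the batches and still allows a row count afterwards.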

__arrow_c_stream__(requested_schema: Any = None) → Any

Arrow C Stream Interface – yields batches to consumers.

property num_batches: int

Number of batches.

property num_rows: int

Total number of rows across all batches.

property empty: bool

True if there are no batches.

batch(i: int) → Any

Get the i-th batch.

batches() → Iterator[Any]

Iterate over batches.

to_pandas() → Any

Convert all batches to a single pandas DataFrame.

Returns:

The converted DataFrame.

Return type:

pandas.DataFrame

Raises:

ImportError – If pyarrow is not installed.

to_polars() → Any

Convert all batches to a single polars DataFrame.

Returns:

The converted DataFrame.

Return type:

polars.DataFrame

Raises:

ImportError – If polars is not installed.