Utilities Module
The dftracer.utils.utilities module provides Python bindings for
DFTracer’s composable C++ utility classes. Each utility wraps a C++
pipeline stage and exposes it as a callable Python object.
Utilities fall into two categories:
- Tabular utilities return Arrow data via process() (materialized ArrowTable) and iter_arrow() (streaming ArrowBatch).
- Scalar utilities return Python dicts from process().
All utilities accept an optional runtime argument for thread pool
control. Per-call inputs (file_path, predicates, etc.) are
passed to process(), not the constructor.
All utilities are callable: util(...) is equivalent to
util.process(...).
from dftracer.utils.utilities import (
    AggregatorUtility,
    ComparatorUtility,
    MetadataCollectorUtility,
    ReconstructionPlannerUtility,
    ReorganizationPlannerUtility,
    StatisticsAggregatorUtility,
    StatisticsQueryUtility,
)
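The callable shorthand is ordinary Python delegation. The sketch below is a hypothetical pure-Python stand-in (SketchUtility is not a DFTracer class; the real utilities are C++ bindings). It shows the contract described above: the runtime is fixed at construction, per-call inputs go to process(), and __call__ forwards to process() unchanged.

```python
from typing import Any, Optional


class SketchUtility:
    """Hypothetical stand-in; the real utilities are C++ bindings."""

    def __init__(self, runtime: Optional[Any] = None) -> None:
        # Only the (optional) runtime is bound at construction time.
        self.runtime = runtime

    def process(self, file_path: str, top_n: int = 10) -> dict:
        # Per-call inputs arrive here, not in the constructor.
        return {"file_path": file_path, "top_n": top_n}

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        # util(...) forwards its arguments unchanged to util.process(...).
        return self.process(*args, **kwargs)


util = SketchUtility()
assert util("trace.pfw.gz", top_n=5) == util.process("trace.pfw.gz", top_n=5)
```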
Tabular Utilities (Arrow Output)
These utilities return columnar Arrow data. process() returns a
materialized ArrowTable;
iter_arrow() streams ArrowBatch
objects one at a time.
AggregatorUtility
High-level aggregation pipeline. Scans a directory for .pfw /
.pfw.gz files, builds indexes, aggregates events into time-bucketed
counters, and returns the result as Arrow.
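To make "time-bucketed counters" concrete, here is a minimal pure-Python sketch of the idea. It does not reproduce the C++ pipeline, and it rests on several assumptions:
- ts and dur are in microseconds.
- An event's bucket is ts // (time_interval_ms * 1000).
- Rows are grouped by (cat, name, time_bucket) only.
- dur_std is a population standard deviation.

The real utility groups by more keys (see group_keys) and may use different conventions.

```python
import math
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# (cat, name, ts, dur); microsecond units are an assumption of this sketch
Event = Tuple[str, str, int, int]


def bucket_events(events: Iterable[Event], time_interval_ms: float = 5000.0) -> Dict[tuple, dict]:
    interval_us = time_interval_ms * 1000.0
    durations: Dict[tuple, List[int]] = defaultdict(list)
    for cat, name, ts, dur in events:
        # Assumed rule: the bucket index comes from the event start time.
        time_bucket = int(ts // interval_us)
        durations[(cat, name, time_bucket)].append(dur)

    rows: Dict[tuple, dict] = {}
    for key, durs in durations.items():
        n = len(durs)
        mean = sum(durs) / n
        rows[key] = {
            "count": n,
            "dur_total": sum(durs),
            "dur_min": min(durs),
            "dur_max": max(durs),
            "dur_mean": mean,
            # Population standard deviation (the library may use another convention).
            "dur_std": math.sqrt(sum((d - mean) ** 2 for d in durs) / n),
        }
    return rows


events = [("POSIX", "read", 100, 10), ("POSIX", "read", 900_000, 30), ("POSIX", "read", 1_200_000, 5)]
rows = bucket_events(events, time_interval_ms=1000.0)
```

With a 1000 ms interval, the first two events share bucket 0 and the third lands in bucket 1.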
The Arrow output always includes the base aggregation columns:
batch_type, cat, name, pid, tid, hhash,
fhash, time_bucket, count, dur_total, dur_min,
dur_max, dur_mean, dur_std, size_total, size_min,
size_max, size_mean, size_std, ts, and te.
When custom_metric_fields is provided, each field adds
<field>_total, <field>_min, <field>_max, <field>_mean,
and <field>_std columns. batch_type distinguishes regular event,
profile-counter, and system-counter rows.
- class dftracer.utils.dftracer_utils_ext.AggregatorUtility(runtime: Runtime | None = None)
- iter_arrow(directory: str, time_interval_ms: float = 5000.0, group_keys: list[str] | None = None, categories: list[str] | None = None, names: list[str] | None = None, index_dir: str = '', checkpoint_size: int = 33554432, force_rebuild: bool = False, chunk_size_mb: int = 64, batch_size_mb: int = 4, event_batch_size: int = 10000, custom_metric_fields: list[str] | None = None, compute_percentiles: bool = False) → Iterator[object]
- process(directory: str, time_interval_ms: float = 5000.0, group_keys: list[str] | None = None, categories: list[str] | None = None, names: list[str] | None = None, index_dir: str = '', checkpoint_size: int = 33554432, force_rebuild: bool = False, chunk_size_mb: int = 64, batch_size_mb: int = 4, event_batch_size: int = 10000, custom_metric_fields: list[str] | None = None, compute_percentiles: bool = False) → object
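import pyarrow

agg = AggregatorUtility()
# Materialized: the whole result as one ArrowTable
table = agg.process("./traces", time_interval_ms=1000.0)
# Include custom metrics from event args
table = agg.process(
    "./traces",
    time_interval_ms=1000.0,
    custom_metric_fields=["bytes", "ops"],
    compute_percentiles=True,
)
# Streaming: one ArrowBatch at a time
for batch in agg.iter_arrow("./traces"):
    # ArrowBatch supports the Arrow PyCapsule protocol, so pyarrow can wrap it directly
    pa_batch = pyarrow.record_batch(batch)
    print(pa_batch.num_rows)  # replace with your own per-batch processing
# Callable shorthand
table = agg("./traces")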
ComparatorUtility
Compare trace metrics between a baseline and variant run. Returns a hierarchical comparison with per-category and per-operation deltas, Cohen’s d significance, and regression detection.
Three output methods:
- compare() returns a materialized ArrowTable with columns: node_path, metric_group, metric_name, baseline, variant, baseline_stdev, variant_stdev, delta, pct_change, cohens_d, significance, is_regression.
- compare_json() returns a JSON string with the full hierarchical tree.
- compare_table() returns a formatted ASCII table string.
- class dftracer.utils.dftracer_utils_ext.ComparatorUtility(runtime: Runtime | None = None)
- compare(baseline: str, variant: str, query: str = '', group_by: str = '', format: str = 'table', time_interval_ms: float = 5000.0, threshold: float = 0.0, executor_threads: int = 0, index_dir: str = '', force_rebuild: bool = False, config: str = '') → object
from dftracer.utils.utilities import ComparatorUtility
cmp = ComparatorUtility()
# Arrow table for programmatic analysis
table = cmp.compare("./traces_v1/run.pfw.gz", "./traces_v2/run.pfw.gz")
# JSON for serialization
json_str = cmp.compare_json("./traces_v1", "./traces_v2")
# Formatted table for display
print(cmp.compare_table("./baseline.pfw.gz", "./variant.pfw.gz"))
# With options
table = cmp.compare(
    "./baseline.pfw.gz",
    "./variant.pfw.gz",
    query='cat == "POSIX"',
    time_interval_ms=1000.0,
    threshold=1.0,
)
# Callable shorthand (delegates to compare)
table = cmp("./baseline.pfw.gz", "./variant.pfw.gz")
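The delta, pct_change, and cohens_d columns can be illustrated with standard formulas. The sketch below shows how such values are usually computed; it is an assumption, not a description of ComparatorUtility internals:
- pct_change is relative to the baseline.
- Cohen's d divides the delta by the pooled standard deviation of two equally sized samples.
- The significance labels use Cohen's conventional 0.2/0.5/0.8 cut-offs.

How the library derives is_regression and applies threshold is not documented here, so the sketch leaves both out. effect_label and compare_metric are hypothetical names.

```python
import math


def effect_label(d: float) -> str:
    # Cohen's conventional cut-offs for |d|: 0.2 small, 0.5 medium, 0.8 large.
    size = abs(d)
    if size >= 0.8:
        return "large"
    if size >= 0.5:
        return "medium"
    if size >= 0.2:
        return "small"
    return "negligible"


def compare_metric(baseline: float, variant: float,
                   baseline_stdev: float, variant_stdev: float) -> dict:
    """Hypothetical per-metric comparison using common textbook formulas."""
    delta = variant - baseline
    # Percent change relative to the baseline; undefined (NaN) for a zero baseline.
    pct_change = delta * 100.0 / baseline if baseline != 0 else float("nan")
    # Pooled standard deviation for two samples of equal size.
    pooled = math.sqrt((baseline_stdev ** 2 + variant_stdev ** 2) / 2.0)
    cohens_d = delta / pooled if pooled > 0 else 0.0
    return {
        "delta": delta,
        "pct_change": pct_change,
        "cohens_d": cohens_d,
        "significance": effect_label(cohens_d),
    }
```

For example, compare_metric(100.0, 120.0, 10.0, 10.0) gives a delta of 20.0, a pooled standard deviation of 10.0, and therefore cohens_d == 2.0, labeled "large".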
Scalar Utilities (Dict Output)
These utilities return Python dicts. Arrow output is not applicable since their results are scalar or structural (not tabular).
StatisticsQueryUtility
Query pre-computed statistics from an indexed trace file. When bloom/chunk statistics are not available, the utility falls back to streaming the file sequentially and computing statistics on-the-fly.
process(file_path, query_type="summary", top_n=10, index_dir="") returns
a dict; query_type accepts "summary", "top_n_names", and other
pre-computed statistics views.
sq = StatisticsQueryUtility()
result = sq.process("trace.pfw.gz", query_type="summary")
print(result["total_events"])
result = sq.process("trace.pfw.gz", query_type="top_n_names", top_n=5)
for name, count in result["results"]:
    print(f" {name}: {count}")
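The streaming fallback described above can be approximated in a few lines. This sketch assumes a trace layout of one JSON event object per line, optionally wrapped in bare [ and ] lines and with trailing commas. iter_events and streaming_top_names are hypothetical names, and the returned keys only mirror the total_events and results fields used in the example.

```python
import gzip
import json
from collections import Counter
from typing import Iterator


def iter_events(path: str) -> Iterator[dict]:
    """Yield one event dict per line (assumed JSON-lines layout)."""
    opener = gzip.open if path.endswith(".gz") else open
    with opener(path, "rt", encoding="utf-8") as f:
        for line in f:
            line = line.strip().rstrip(",")
            # Skip blank lines and bare array brackets around the events.
            if not line or line in ("[", "]"):
                continue
            yield json.loads(line)


def streaming_top_names(path: str, top_n: int = 10) -> dict:
    # One sequential pass; memory grows with the number of distinct names, not events.
    counts = Counter(event.get("name", "") for event in iter_events(path))
    return {
        "total_events": sum(counts.values()),
        "results": counts.most_common(top_n),
    }
```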
StatisticsAggregatorUtility
Aggregate statistics from a trace file. Uses pre-computed chunk
statistics from the .dftindex store when available. When chunk
statistics are absent, falls back to streaming the .pfw.gz
line-by-line and computing statistics on-the-fly.
- class dftracer.utils.dftracer_utils_ext.StatisticsAggregatorUtility(runtime: Runtime | None = None)
sa = StatisticsAggregatorUtility()
result = sa.process("trace.pfw.gz")
print(f"Events: {result['total_events']}")
print(f"Duration mean: {result['duration_mean_us']} us")
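Computing a mean and standard deviation on the fly without keeping every duration in memory is commonly done with Welford's online algorithm. The sketch below uses it as an illustration; it is not necessarily what StatisticsAggregatorUtility does. duration_std_us is a hypothetical key, since only total_events and duration_mean_us appear in the example above.

```python
import math
from typing import Iterable


class RunningStats:
    """Welford's online mean/variance: one pass, O(1) memory (illustrative only)."""

    def __init__(self) -> None:
        self.count = 0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def add(self, value: float) -> None:
        self.count += 1
        delta = value - self.mean
        self.mean += delta / self.count
        self._m2 += delta * (value - self.mean)

    @property
    def std(self) -> float:
        # Population standard deviation; 0.0 before any value is added.
        return math.sqrt(self._m2 / self.count) if self.count else 0.0


def duration_summary(durations_us: Iterable[float]) -> dict:
    stats = RunningStats()
    for dur in durations_us:
        stats.add(dur)
    # duration_std_us is a hypothetical key name
    return {"total_events": stats.count, "duration_mean_us": stats.mean, "duration_std_us": stats.std}
```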
MetadataCollectorUtility
Collect metadata from a DFTracer trace file.
mc = MetadataCollectorUtility()
result = mc.process("trace.pfw.gz")
print(f"Size: {result['size_mb']:.2f} MB")
print(f"Format: {result['format']}")
print(f"Events: {result['valid_events']}")
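Size and format can be probed cheaply. The sketch reads only the first two bytes to recognize the gzip magic number (0x1f 0x8b, from RFC 1952) and takes the size from the filesystem. A few caveats:
- sniff_metadata is hypothetical.
- The "gzip"/"plain" format strings are placeholders, not the values the utility reports.
- Treating MB as 1024 * 1024 bytes is an assumption.
- It does not attempt valid_events, which requires parsing the events.

```python
import os


def sniff_metadata(path: str) -> dict:
    """Hypothetical metadata probe; key names mirror the example, the logic is assumed."""
    with open(path, "rb") as f:
        magic = f.read(2)
    # gzip members start with the two bytes 0x1f 0x8b (RFC 1952).
    is_gzip = magic == b"\x1f\x8b"
    return {
        "size_mb": os.path.getsize(path) / (1024 * 1024),  # MiB-based MB is an assumption
        "format": "gzip" if is_gzip else "plain",
    }
```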
ReorganizationPlannerUtility
Plan semantic reorganization of trace files. When manifest data is
available in the .dftindex store, produces per-checkpoint extraction
tasks. When manifest tables are absent, falls back to streaming the file
line-by-line and emitting one whole-file extraction task per query group.
- class dftracer.utils.dftracer_utils_ext.ReorganizationPlannerUtility(runtime: Runtime | None = None)
rp = ReorganizationPlannerUtility()
plan = rp.process(
    source_files=["trace1.pfw.gz", "trace2.pfw.gz"],
    groups=[{"name": "posix", "query": 'cat == "POSIX"'}],
)
print(f"Tasks: {len(plan['tasks'])}")
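In the fallback case (no manifest tables) the plan is simple to picture. The sketch assumes one task per (source file, query group) pair with no checkpoint ranges, since there is no manifest to split on. fallback_plan and every task key are hypothetical; only the top-level tasks list mirrors the example above.

```python
from typing import Dict, List


def fallback_plan(source_files: List[str], groups: List[Dict[str, str]]) -> Dict[str, list]:
    """Hypothetical whole-file fallback: one extraction task per (file, group)."""
    tasks = []
    for path in source_files:
        for group in groups:
            tasks.append({
                "source_file": path,
                "group": group["name"],
                "query": group["query"],
                # No manifest, so no checkpoint ranges: the whole file is streamed.
                "byte_range": None,
            })
    return {"tasks": tasks}
```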
ReconstructionPlannerUtility
Plan reconstruction of original files from reorganized traces.
- class dftracer.utils.dftracer_utils_ext.ReconstructionPlannerUtility(runtime: Runtime | None = None)
rcp = ReconstructionPlannerUtility()
plan = rcp.process(reorganized_files=["reorg1.pfw.gz"])
print(f"Segments: {plan['total_segments']}")
Arrow Data Types
- class dftracer.utils.arrow.ArrowBatch(capsule: Any)
Wrapper around an Arrow RecordBatch from the C extension.
Supports the Arrow PyCapsule protocol (__arrow_c_array__) for zero-copy interchange with pyarrow, polars, and DuckDB.
The underlying C data is exported via ownership transfer on first use, so it can be exported only once. The resulting pyarrow RecordBatch is cached, so subsequent calls to to_pandas(), to_polars(), or __arrow_c_array__() are safe.
- __arrow_c_array__(requested_schema: Any = None) → Tuple[Any, Any]
Export via Arrow C Data Interface.
- to_pandas() → Any
Convert to pandas DataFrame.
- Returns:
The converted DataFrame.
- Return type:
pandas.DataFrame
- Raises:
ImportError – If pyarrow is not installed.
- to_polars() → Any
Convert to polars DataFrame.
- Returns:
The converted DataFrame.
- Return type:
polars.DataFrame
- Raises:
ImportError – If polars is not installed.
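The export-once, cache-after pattern described for ArrowBatch can be modeled without any Arrow dependency. OneShotExport stands in for a capsule whose ownership can be taken only once, and CachedBatch converts it on first access. Both classes are hypothetical and only illustrate why repeated calls stay safe.

```python
from typing import Any, Callable, List

_UNSET = object()  # sentinel meaning "not converted yet"


class OneShotExport:
    """Stand-in for a capsule whose contents can be taken exactly once."""

    def __init__(self, payload: Any) -> None:
        self._payload = payload
        self._taken = False

    def take(self) -> Any:
        if self._taken:
            raise RuntimeError("capsule already exported")
        self._taken = True
        payload, self._payload = self._payload, None
        return payload


class CachedBatch:
    """Converts the capsule on first use, then serves every call from the cache."""

    def __init__(self, capsule: OneShotExport, convert: Callable[[Any], List[Any]]) -> None:
        self._capsule = capsule
        self._convert = convert
        self._cached: Any = _UNSET

    def _batch(self) -> List[Any]:
        if self._cached is _UNSET:
            # Only the first access consumes the capsule.
            self._cached = self._convert(self._capsule.take())
        return self._cached

    def rows(self) -> List[Any]:
        return self._batch()

    def num_rows(self) -> int:
        return len(self._batch())
```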
- class dftracer.utils.arrow.ArrowTable(batches: Any, schema_capsule: Any | None = None)
Wrapper around a collection of Arrow RecordBatches.
Returned by read_arrow() and utility process() methods. Supports the Arrow PyCapsule stream protocol (__arrow_c_stream__) for zero-copy interchange.
Accepts either a pre-built list of batches or a lazy iterator. When constructed from an iterator, batches are not materialized until data is first accessed (to_pandas, to_polars, batches, etc.).
num_rows is special: if the iterator has not been consumed yet, it streams through the batches and counts rows without retaining them (O(1) memory). After a streaming num_rows, data access methods return empty results; use iter_arrow directly if you need both the row count and the data for very large datasets.
- __arrow_c_stream__(requested_schema: Any = None) → Any
Arrow C Stream Interface – yields batches to consumers.
- to_pandas() → Any
Convert all batches to a single pandas DataFrame.
- Returns:
The converted DataFrame.
- Return type:
pandas.DataFrame
- Raises:
ImportError – If pyarrow is not installed.
- to_polars() → Any
Convert all batches to a single polars DataFrame.
- Returns:
The converted DataFrame.
- Return type:
polars.DataFrame
- Raises:
ImportError – If polars is not installed.
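The num_rows caveat for ArrowTable is easiest to see in a toy model. LazyTable is hypothetical and uses plain lists as batches. It reproduces the documented behavior: a streaming num_rows consumes the iterator without keeping batches, so later data access comes back empty.

```python
from typing import Iterable, List, Optional


class LazyTable:
    """Hypothetical model of an iterator-backed table; batches are plain lists."""

    def __init__(self, batches: Iterable[List[int]]) -> None:
        self._iter = iter(batches)
        self._batches: Optional[List[List[int]]] = None  # filled on first data access
        self._streamed_rows: Optional[int] = None        # set if num_rows streamed first

    @property
    def num_rows(self) -> int:
        if self._streamed_rows is not None:
            return self._streamed_rows
        if self._batches is not None:
            return sum(len(b) for b in self._batches)
        # Streaming count: consumes the iterator without retaining any batch.
        self._streamed_rows = sum(len(b) for b in self._iter)
        return self._streamed_rows

    @property
    def batches(self) -> List[List[int]]:
        if self._batches is None:
            # After a streaming num_rows the iterator is exhausted, so this is empty.
            self._batches = list(self._iter)
        return self._batches
```

With the real API, the usual way to get both is to count rows inside an iter_arrow loop while processing each batch.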