Indexer Module

The indexer module provides functionality for indexing DFTracer trace files (.pfw / .pfw.gz) backed by a .dftindex RocksDB store. The top-level Indexer follows a resolve / build pattern over a directory or file list and exposes the higher index tiers (checkpoints, bloom filters, manifests, aggregation). CheckpointIndexer is the lower-level single-file interface used for checkpoint-level operations.

Indexer Class

class dftracer.utils.Indexer(directory: str = '', files: list[str] | None = None, index_dir: str = '', require_checkpoint: bool = True, require_bloom: bool = True, require_manifest: bool = True, require_aggregation: bool = False, time_interval_ms: float = 5000.0, group_keys: list[str] | None = None, custom_metric_fields: list[str] | None = None, compute_percentiles: bool = False, checkpoint_size: int = 33554432, parallelism: int = 0, force_rebuild: bool = False, runtime: Runtime | None = None)

Bases: object

High-level indexer for building and managing trace indexes.

Supports tiered indexing:
  • Tier 1: Checkpoints (for random access)

  • Tier 2: Bloom filters and manifests (for fast filtering)

  • Tier 3: Aggregation data (config-dependent)

At least one of ‘directory’ or ‘files’ must be provided.

Parameters:
  • directory (str) – Directory containing trace files (.pfw/.pfw.gz).

  • files (List[str] | None) – List of specific file paths to index.

  • index_dir (str) – Directory for .dftindex stores (default: next to files).

  • require_checkpoint (bool) – Build checkpoint tier (default True).

  • require_bloom (bool) – Build bloom filter tier (default True).

  • require_manifest (bool) – Build manifest tier (default True).

  • require_aggregation (bool | AggregationConfig | None) – Aggregation config, or True for defaults (default False).

  • parallelism (int) – Number of parallel workers (0 = all cores).

  • force_rebuild (bool) – Force rebuild even if index exists.

  • runtime (Runtime | None) – Runtime for executor parallelism (default: global runtime).

Example

>>> indexer = Indexer("/path/to/traces")
>>> indexer.ensure_indexed()  # builds checkpoint, bloom, manifest
>>> # With explicit file list
>>> indexer = Indexer(files=["/path/to/trace1.pfw.gz", "/path/to/trace2.pfw.gz"])
>>> indexer.ensure_indexed()
>>> # With aggregation
>>> indexer = Indexer(
...     "/path/to/traces",
...     require_aggregation=AggregationConfig(time_interval_ms=1000),
... )
>>> indexer.ensure_indexed()  # fused pass with aggregation

close()

Release resources.

property aggregation_config: AggregationConfig | None

Aggregation configuration, if enabled.

resolve() -> IndexStatus

Check which files are already indexed and which still need indexing.

Returns:

IndexStatus with total_files, ready, and needs_work lists.

Return type:

IndexStatus

build() -> None

Build all missing index tiers based on require_* flags.

This method builds indexes in parallel using the Runtime executor. When aggregation is enabled, it performs a fused pass for efficiency.

ensure_indexed() -> IndexStatus

Resolve and build if needed.

Convenience method that calls resolve() then build() if needed.

Returns:

IndexStatus after building.

Return type:

IndexStatus
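
The resolve / build pattern can be pictured with a small in-memory stand-in. This is only a sketch: StatusSketch and IndexerSketch are hypothetical names that mimic the documented total_files, ready, and needs_work fields, and the real Indexer persists its state in the .dftindex store rather than in a Python set.

```python
from dataclasses import dataclass, field


@dataclass
class StatusSketch:
    """Hypothetical stand-in for IndexStatus (field names taken from resolve())."""
    total_files: int
    ready: list = field(default_factory=list)
    needs_work: list = field(default_factory=list)


class IndexerSketch:
    """Toy indexer that remembers indexed files in memory only."""

    def __init__(self, files):
        self.files = list(files)
        self.indexed = set()

    def resolve(self):
        # Split the file list into already-indexed and still-pending files.
        ready = [f for f in self.files if f in self.indexed]
        needs_work = [f for f in self.files if f not in self.indexed]
        return StatusSketch(len(self.files), ready, needs_work)

    def build(self):
        # Pretend every pending file was indexed successfully.
        self.indexed.update(self.files)

    def ensure_indexed(self):
        # Resolve first; build only when something is missing, then re-resolve.
        status = self.resolve()
        if status.needs_work:
            self.build()
            status = self.resolve()
        return status
```

Calling ensure_indexed() a second time on the same object would skip build() because resolve() reports nothing in needs_work.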

get_checkpoint_indexer(file_path: str) -> CheckpointIndexer

Get a checkpoint indexer for a specific file.

Returns an indexer for checkpoint-level operations on a single file, such as finding checkpoints for random access.

Parameters:

file_path (str) – Path to the trace file (.pfw/.pfw.gz).

Returns:

CheckpointIndexer instance for checkpoint operations (get_checkpoints, find_checkpoint, etc.).

Return type:

CheckpointIndexer

get_hash_table(hash_type: str) -> dict

Query hash table mappings.

Returns a dictionary mapping hash values to resolved names for the given hash type. This is useful for resolving fhash/hhash values in aggregated data.

Parameters:

hash_type (str) – One of ‘file’, ‘host’, ‘string’, or ‘proc’.

Returns:

dict mapping hash values (str) to resolved names (str).

Return type:

dict

Example

>>> indexer = Indexer("/path/to/traces")
>>> indexer.ensure_indexed()
>>> file_names = indexer.get_hash_table("file")
>>> # file_names = {"abc123": "/path/to/data.h5", ...}

query_file_pids(file_id: int) -> set

Query PIDs observed in a specific file.

Parameters:

file_id (int) – Integer file ID from index.

Returns:

set of PIDs (int) observed in the file.

Return type:

set

query_all_file_pids() -> dict

Query PIDs for all indexed files.

Returns a dictionary mapping file_id to the set of PIDs observed in that file. This is useful for distributed aggregation to assign files to workers by PID affinity.

Returns:

dict mapping file_id (int) to set of PIDs (int).

Return type:

dict
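
One way to use such a mapping for PID-affinity placement is a greedy pass that sends each file to the worker already holding the most of its PIDs. The sketch below is an illustration only; assign_by_pid_affinity is a hypothetical helper, and the strategy the library actually uses may differ.

```python
def assign_by_pid_affinity(file_pids, num_workers):
    """Greedily place each file on the worker sharing the most PIDs with it.

    file_pids has the shape returned by query_all_file_pids():
    {file_id: {pid, ...}}. Ties go to the worker with fewer files,
    then to the lower worker index.
    """
    worker_files = [[] for _ in range(num_workers)]
    worker_pids = [set() for _ in range(num_workers)]
    for file_id in sorted(file_pids):
        pids = file_pids[file_id]
        best = max(
            range(num_workers),
            key=lambda w: (len(worker_pids[w] & pids), -len(worker_files[w]), -w),
        )
        worker_files[best].append(file_id)
        worker_pids[best] |= pids
    return worker_files
```

Files that share a PID tend to land on the same worker, which reduces the number of keys that must be re-aggregated across workers later.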

query_file_info() -> Tuple[Dict[int, str], Dict[int, Set[int]]]

Query file distribution info in a single DB open.

Returns:

Tuple of (file_id_to_path, file_pids), where:
  • file_id_to_path: dict[int, str] mapping DB file ID to path

  • file_pids: dict[int, set[int]] mapping file ID to PIDs

Return type:

Tuple[Dict[int, str], Dict[int, Set[int]]]

iter_aggregation(type: str = 'events', batch_size: int = 10000)

Iterate over aggregation data as Arrow batches.

Requires that the index was built with require_aggregation=True. Returns Arrow batches that can be converted to pandas or pyarrow.

Parameters:
  • type (str) – Type of aggregation data - ‘events’, ‘profiles’, or ‘system’

  • batch_size (int) – Number of entries per Arrow batch (default 10000)

Yields:

Arrow batch capsules implementing __arrow_c_array__

Example

>>> import pyarrow as pa
>>> indexer = Indexer("/traces", require_aggregation=True)
>>> indexer.ensure_indexed()
>>> batches = [pa.record_batch(b) for b in indexer.iter_aggregation("events")]
>>> table = pa.concat_tables([pa.Table.from_batches([b]) for b in batches])

iter_arrow_dfanalyzer(type: str = 'events', batch_size: int = 10000, time_granularity: float = 1.0, time_resolution: float = 1000000.0, query: str | None = None)

Iterate over aggregation data as dfanalyzer-compatible Arrow batches.

Returns Arrow batches with columns matching dfanalyzer schema:

  • Events/Profiles: cat, func_name, pid, tid, file_hash, host_hash, file_name, host_name, proc_name, io_cat, acc_pat, count, time, size, time_min, time_max, size_min, size_max, time_range, time_start, time_end

  • System: host_hash, time_range, sys_cpu_*, sys_mem_*

Hash resolution, time normalization, and computed columns (proc_name, io_cat) are done in C++ for performance.

Parameters:
  • type (str) – Type of aggregation data - ‘events’, ‘profiles’, or ‘system’

  • batch_size (int) – Number of entries per Arrow batch (default 10000)

  • time_granularity (float) – Bucket width in seconds (default 1.0)

  • time_resolution (float) – Microseconds per output time unit (default 1e6)

  • query (str | None) – Optional query filter string (e.g., “pid == 1234 or pid == 5678”)

Yields:

Arrow batch capsules implementing __arrow_c_array__

Example

>>> import pyarrow as pa
>>> indexer = Indexer("/traces", require_aggregation=True)
>>> indexer.ensure_indexed()
>>> batches = list(indexer.iter_arrow_dfanalyzer("events"))
>>> table = pa.concat_tables([pa.Table.from_batches([pa.record_batch(b)]) for b in batches])

iter_arrow_dfanalyzer_all(batch_size: int = 10000, time_granularity: float = 1.0, time_resolution: float = 1000000.0, query: str | None = None, group_by: List[str] | None = None)

Iterate over all aggregation types in a single scan.

This is ~3x faster than calling iter_arrow_dfanalyzer separately for events, profiles, and system because it scans the index only once.

When group_by is provided, aggregation collapses dimensions during the scan and emits a reduced schema containing only the requested group columns plus aggregated metrics (count, time, size, time_sq, size_sq, time_min, time_max, size_min, size_max, time_call_min, time_call_max, size_call_min, size_call_max, time_start, time_end).

Parameters:
  • batch_size (int) – Number of entries per Arrow batch (default 10000)

  • time_granularity (float) – Bucket width in seconds (default 1.0)

  • time_resolution (float) – Microseconds per output time unit (default 1e6)

  • query (str | None) – Optional query filter string (e.g., “pid == 1234 or pid == 5678”)

  • group_by (List[str] | None) – Optional list of columns to group by for coarse in-scan aggregation. Supported: cat, func_name, pid, tid, file_hash, host_hash, file_name, host_name, proc_name, io_cat, acc_pat, time_range.

Returns:

Dict with ‘events’, ‘profiles’, ‘system’ keys, each containing a list of Arrow batch capsules.

Example

>>> import pyarrow as pa
>>> indexer = Indexer("/traces", require_aggregation=True)
>>> indexer.ensure_indexed()
>>> all_batches = indexer.iter_arrow_dfanalyzer_all()
>>> events = [pa.record_batch(b) for b in all_batches["events"]]
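
The reduced group_by schema carries count, time, and time_sq, which is enough to recover a mean and a standard deviation per group without the raw events. The sketch below assumes that time is the sum of per-call times and time_sq the sum of their squares; that reading of the columns is an assumption, and mean_and_std is a hypothetical helper.

```python
import math


def mean_and_std(count, total, total_sq):
    """Population mean and standard deviation from running sums.

    Uses Var[X] = E[X^2] - E[X]^2; the variance is clamped at zero to
    absorb small negative values caused by floating-point rounding.
    """
    mean = total / count
    variance = max(total_sq / count - mean * mean, 0.0)
    return mean, math.sqrt(variance)
```

Because all three inputs are plain sums, partial results from several batches or workers can be added together before this function is applied.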

CheckpointIndexer Class

class dftracer.utils.CheckpointIndexer(gz_path: str, index_path: str | None = None, checkpoint_size: int = 1048576, force_rebuild: bool = False, build_bloom: bool = False, build_manifest: bool = False, runtime: Runtime | None = None)

Bases: _BaseNative

Checkpoint indexer for single-file checkpoint-level operations.

__enter__() -> CheckpointIndexer

Enter the runtime context for the with statement.

__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None

Release this Python wrapper on context exit.

This does not force-close the shared RocksDB instance for the same .dftindex path.

build() -> None

Build the index.

property checkpoint_size: int

Get checkpoint size.

close() -> None

Release this Python wrapper’s native indexer handle.

This does not force-close the shared RocksDB instance for the same .dftindex path.

exists() -> bool

Check if the .dftindex store exists.

find_checkpoint(target_offset: int) -> IndexerCheckpoint | None

Find checkpoint for target offset.
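
A lookup of this kind is commonly a binary search over checkpoint start offsets. The sketch below assumes that target_offset is an uncompressed byte offset, that checkpoints are sorted by uc_offset, and that the wanted checkpoint is the last one starting at or before the target; none of this is confirmed by the documentation, and CheckpointSketch and find_checkpoint_sketch are hypothetical names.

```python
import bisect
from dataclasses import dataclass


@dataclass
class CheckpointSketch:
    """Hypothetical subset of the IndexerCheckpoint fields."""
    checkpoint_idx: int
    uc_offset: int
    uc_size: int


def find_checkpoint_sketch(checkpoints, target_offset):
    """Return the last checkpoint with uc_offset <= target_offset, or None."""
    offsets = [c.uc_offset for c in checkpoints]
    # bisect_right gives the insertion point after any equal offsets, so the
    # entry just before it is the last checkpoint starting at or before target.
    i = bisect.bisect_right(offsets, target_offset) - 1
    if i < 0:
        return None
    return checkpoints[i]
```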

get_checkpoints() -> list[IndexerCheckpoint]

Get all checkpoints.

get_max_bytes() -> int

Get maximum byte position.

get_num_lines() -> int

Get number of lines.

property gz_path: str

Get gzip path.

property has_bloom: bool

Whether bloom filter data exists in the .dftindex store.

property has_manifest: bool

Whether manifest data exists in the .dftindex store.

property index_path: str

Get the .dftindex path.

need_rebuild() -> bool

Check if index needs rebuilding.

IndexerCheckpoint Class

class dftracer.utils.IndexerCheckpoint

Bases: _BaseNative

Information about a checkpoint in the index.

bits = 0
c_offset = 0
c_size = 0
checkpoint_idx = 0
num_lines = 0
uc_offset = 0
uc_size = 0

Distributed Index (SST-based)

The distributed-index path lets the coordinator pre-register files, hand out file_id ranges to workers, and bulk-ingest worker-produced SST artifacts back into the unified .dftindex store.

IndexDatabase

class dftracer.utils.dftracer_utils_ext.IndexDatabase(index_path: str)

Handle to a .dftindex RocksDB store.

Used by the distributed indexer coordinator to pre-register files, reserve file_id ranges, bulk-ingest worker-produced SSTs, and rebuild root summaries.

bulk_ingest(registry: SstArtifactRegistry, skip_cfs: object = None) -> None

Ingest all SSTs collected in the registry.

skip_cfs is an optional iterable of CF names whose SSTs are left outside the unified DB. Distributed builds pass {“aggregation”, “system_metrics”} to keep per-worker AGG/SYS SSTs addressable via agg_manifest.json for parallel reads at analyze time. See dftracer.utils.dask.consolidate_index to fold them back into the unified DB later.

init_schema() -> None

rebuild_root_summaries() -> None

Recompute ROOT_* summary column families from per-file CFs.

register_files(paths: list[str], build_manifest: bool = False) -> list[int]

Register each path in the DEFAULT-CF file registry and return the assigned file_ids (parallel to paths). Idempotent for files with matching hash.

reserve_file_id_range(count: int) -> int

Atomically reserve count contiguous file_ids; return first.
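
The contract (each caller receives a contiguous block that no other caller can see) can be illustrated with an in-memory allocator. This is only an analogy: FileIdAllocator is a hypothetical class, and the real method persists its counter in the .dftindex store rather than in process memory.

```python
import threading


class FileIdAllocator:
    """In-memory analogy of reserving contiguous file_id ranges."""

    def __init__(self, next_id=0):
        self._next_id = next_id
        self._lock = threading.Lock()

    def reserve(self, count):
        """Reserve `count` contiguous ids and return the first one."""
        if count <= 0:
            raise ValueError("count must be positive")
        # The lock makes read-then-advance a single atomic step, so two
        # concurrent callers can never receive overlapping ranges.
        with self._lock:
            first = self._next_id
            self._next_id += count
        return first
```

A caller that reserves count ids owns the half-open range [first, first + count).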

write_agg_file_markers(file_ids: object) -> None

Write per-file aggregation completion markers into the AGGREGATION CF.

Each marker is \xFF\xFF + file_id_be32. The index resolver uses their presence to decide whether each file has aggregated data; if a marker is missing, ensure_indexed() concludes the aggregation tier is incomplete and re-runs the entire build. The distributed_index driver must call this after bulk_ingest so that subsequent read_trace calls do not redundantly re-aggregate.
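
The marker layout can be reproduced with the standard struct module. The sketch assumes that file_id_be32 means the file ID encoded as an unsigned 32-bit big-endian integer; agg_file_marker_key and parse_agg_file_marker_key are hypothetical helpers, not part of the library.

```python
import struct

AGG_MARKER_PREFIX = b"\xff\xff"


def agg_file_marker_key(file_id):
    """Build a marker key: two 0xFF bytes followed by the big-endian file ID."""
    return AGG_MARKER_PREFIX + struct.pack(">I", file_id)


def parse_agg_file_marker_key(key):
    """Recover the file ID from a marker key, or None if the key is not a marker."""
    if len(key) != 6 or not key.startswith(AGG_MARKER_PREFIX):
        return None
    return struct.unpack(">I", key[2:])[0]
```

Big-endian encoding keeps the keys in numeric file_id order under byte-wise comparison.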

write_agg_global_config(time_interval_us: int, config_hash: int = 0) -> None

Write the aggregation global-config marker into the AGGREGATION CF.

Required for Indexer.iter_arrow_dfanalyzer_all on distributed builds (which never materialise the key via worker SSTs) and post-consolidate indices.

write_aggregation_tracker(blobs: list[bytes]) -> None

Merge serialized AssociationTracker blobs and write the result to the AGGREGATION CF under the __tracker__ key.

SstArtifactRegistry

class dftracer.utils.dftracer_utils_ext.SstArtifactRegistry

Thread-safe collector for SST artifact paths produced by workers.

append(artifacts_dict: dict[str, str | None]) -> None

Add a per-batch Artifacts dict as returned by build_sst_batch.

Module-level Functions

dftracer.utils.dftracer_utils_ext.scan_files(directory: str, patterns: list[str] | None = None, recursive: bool = False, runtime: Runtime | object | None = None) -> list[tuple[str, int]]

Parallel directory scan returning (path, size) tuples for regular files matching the patterns.

dftracer.utils.dftracer_utils_ext.scan_aggregation_manifest(agg_ssts: list[str], sys_ssts: list[str], scratch_dir: str, meta_index_path: str, batch_size: int = 10000, time_granularity: float = 1.0, time_resolution: float = 1000000.0, query: str | None = None, group_by: list[str] | None = None, shard_begin: int = 0, shard_end: int = 4096, runtime: Runtime | object | None = None, file_hashes: dict[str, str] | None = None, host_hashes: dict[str, str] | None = None) -> dict[str, list['_ArrowBatchCapsule']]

Scan a worker’s slice of the distributed aggregation manifest.

Ingests agg_ssts + sys_ssts into a scratch IndexDatabase at scratch_dir (caller owns the directory lifecycle) and runs the dfanalyzer aggregation scan over [shard_begin, shard_end). meta_index_path is the unified .dftindex used to resolve file / host hashes.

Returns the same dict shape as Indexer.iter_arrow_dfanalyzer_all: {“events”: […], “profiles”: […], “system”: […]}.

dftracer.utils.dftracer_utils_ext.build_sst_batch(files: list[str], file_ids: list[int], staging_dir: str, batch_id: str, index_dir: str = '', checkpoint_size: int = 33554432, build_manifest: bool = False, force_rebuild: bool = False, bloom_dimensions: list[str] | None = None, parallelism: int = 0, flush_every_files: int = 0, runtime: Runtime | object | None = None, aggregation_config: object = None, file_slices: object = None) -> tuple[list[dict[str, str | None]], bytes]

Run the indexer pipeline with an SST sink. Returns (artifact_dicts, tracker_blob). tracker_blob is the serialized merged AssociationTracker for the batch (empty bytes when aggregation_config is None). file_slices enables intra-file parallelism; entries are None (whole file) or (member_begin, member_end, checkpoint_idx_base, skip_file_scoped_writes, members).

dftracer.utils.dftracer_utils_ext.plan_lpt_partition(entries: list[tuple[str, int]], num_workers: int) -> list[list[tuple[str, int]]]

Greedy LPT (longest-processing-time-first) bin-packing of (path, size) tuples into num_workers buckets, aiming to minimise the maximum per-worker total size.
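
For reference, the textbook greedy LPT heuristic sorts items by decreasing size and always gives the next item to the currently lightest bucket. The pure-Python sketch below illustrates that heuristic; lpt_partition_sketch is a hypothetical name, and the native implementation may break ties differently.

```python
import heapq


def lpt_partition_sketch(entries, num_workers):
    """Greedy LPT: largest item first, always into the lightest bucket."""
    buckets = [[] for _ in range(num_workers)]
    # Min-heap of (current_load, worker_index); starts sorted, so it is a valid heap.
    heap = [(0, w) for w in range(num_workers)]
    # Sort by size descending, then path, so the result is deterministic.
    for path, size in sorted(entries, key=lambda e: (-e[1], e[0])):
        load, w = heapq.heappop(heap)
        buckets[w].append((path, size))
        heapq.heappush(heap, (load + size, w))
    return buckets
```

LPT is a heuristic: it keeps the largest bucket close to the optimum but does not guarantee the exact minimum for every input.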

dftracer.utils.dftracer_utils_ext.enumerate_gzip_members(files: list[str], runtime: Runtime | object | None = None) -> list[list[tuple[int, int]]]

Cooperative async scan of gzip member offsets. Returns lists of (c_offset, c_size) parallel to files; empty for non-gzip files.
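
To make the (c_offset, c_size) pairs concrete, the sketch below finds gzip member boundaries in an in-memory buffer using only the standard library. It decompresses every member to locate its end, which is presumably far slower than the native scan; gzip_member_spans is a hypothetical helper, not the library's implementation.

```python
import zlib


def gzip_member_spans(data):
    """Return (c_offset, c_size) for each gzip member in `data`.

    Returns an empty list when the buffer does not start with the gzip magic.
    """
    if not data.startswith(b"\x1f\x8b"):
        return []
    spans = []
    offset = 0
    while offset < len(data):
        decomp = zlib.decompressobj(wbits=31)  # 31 selects the gzip container
        decomp.decompress(data[offset:])
        if not decomp.eof:
            raise ValueError("truncated gzip member")
        # Bytes left in unused_data belong to the following member(s).
        c_size = len(data) - offset - len(decomp.unused_data)
        spans.append((offset, c_size))
        offset += c_size
    return spans
```

Each span can be decompressed independently of the others, which is what makes intra-file parallelism over gzip members possible.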

dftracer.utils.dftracer_utils_ext.plan_work_units(member_map: list[list[tuple[int, int]]], num_workers: int, target_c_size: int = 0) -> list[list[tuple[int, int, int, int]]]

Deterministic LPT assignment of intra-file gzip-member slices across workers. Returns per-worker lists of (file_idx, member_begin, member_end, c_size).

dftracer.utils.dftracer_utils_ext.move_artifacts(artifacts: dict[str, str | None], dest_dir: str) -> dict[str, str | None]

Move every populated SST in artifacts into dest_dir via the C++ rename/copy helper, returning a fresh dict with the new paths.

dftracer.utils.dftracer_utils_ext.enable_aggregation_deterministic_ids() -> None

Flip the global aggregation StringIntern into deterministic-id mode so the same string maps to the same 32-bit id in every worker process.
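
The reason deterministic ids matter across processes can be shown by contrasting two interning schemes. The sketch is purely illustrative: the scheme the library uses is not documented here, CRC32 is an assumed stand-in that ignores collisions, and CounterIntern and deterministic_id are hypothetical names.

```python
import zlib


class CounterIntern:
    """Order-dependent interning: an id is the position at which a string was first seen."""

    def __init__(self):
        self._ids = {}

    def intern(self, text):
        return self._ids.setdefault(text, len(self._ids))


def deterministic_id(text):
    """Content-derived 32-bit id: the same string gives the same id in any process."""
    return zlib.crc32(text.encode("utf-8")) & 0xFFFFFFFF
```

With counter-based interning, two workers that see the same strings in a different order assign different ids, so their outputs cannot be merged by id. A content-derived id avoids that at the cost of possible collisions.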

Dask Helpers

The dftracer.utils.dask module provides Dask-distributed drivers built on the SST-based primitives above:

dftracer.utils.dask.distributed_index(directory: str = '', files: List[str] | None = None, index_path: str = '', local_staging: str = '', shared_staging: str = '', client: dask.distributed.Client | None = None, checkpoint_size: int = 33554432, bloom_dimensions: List[str] | None = None, build_manifest: bool = True, force_rebuild: bool = False, partition: str = 'lpt', rebuild_root_summaries: bool = True, parallelism_per_worker: int = 0, flush_every_files: int = 0, aggregation_config: Any | None = None) -> Dict[str, Any]

Index a set of trace files using Dask workers writing SSTs in parallel.

Steps (all O(1) on the coordinator except the fan-out):
  1. Enumerate files + sizes via parallel scan.

  2. LPT bin-pack files into one bucket per Dask worker.

  3. Register all files on the coordinator’s IndexDatabase (pre-assigns file_ids and writes DEFAULT-CF entries once).

  4. Submit one Dask task per non-empty worker that runs the existing indexer pipeline with an SST sink, writing SSTs to local_staging and (if different) moving them to shared_staging.

  5. Collect artifact dicts into an SstArtifactRegistry; coordinator calls bulk_ingest + rebuild_root_summaries.

Parameters:
  • directory (str) – Directory containing trace files.

  • files (List[str] | None) – Explicit file list (alternative to directory).

  • index_path (str) – Target .dftindex path (coordinator-writable).

  • local_staging (str) – Per-worker SST build dir. If equal to shared_staging, no post-build move.

  • shared_staging (str) – Shared FS dir the coordinator reads SSTs from during ingest. Must be on the same filesystem as index_path for the cheapest ingest.

  • client (dask.distributed.Client | None) – Dask distributed Client. None -> run tasks inline.

  • partition (str) – “lpt” (greedy longest-processing-time bin-pack) or “round_robin”.

  • rebuild_root_summaries (bool) – If True, recompute ROOT_* CFs after ingest.

  • parallelism_per_worker (int) – 0 -> let the plugin/default Runtime choose (one coroutine thread per core).

  • flush_every_files (int) – 0 -> build SSTs once per worker; >0 -> flush mid-batch to bound peak memory.

Returns:

dict with total_files, per_worker sizes, index_path, artifact_count.

Return type:

Dict[str, Any]

dftracer.utils.dask.distributed_aggregate(directory: str = '', files: List[str] | None = None, client: dask.distributed.Client | None = None, time_interval_ms: float = 5000.0, time_granularity: float = 1.0, time_resolution: float = 1000000.0, index_dir: str = '', data_type: str = 'events') -> pyarrow.Table

Aggregate trace data using Dask distributed workers.

This function:
  1. Indexes all files on the coordinator to get PID manifests.

  2. Assigns files to workers by PID affinity (minimize cross-worker overlap).

  3. Each worker aggregates its files using iter_arrow_dfanalyzer.

  4. Gathers partial Arrow tables from workers.

  5. Re-aggregates overlapping keys (same PID/time_range across files).

Parameters:
  • directory (str) – Directory containing trace files (.pfw/.pfw.gz).

  • files (List[str] | None) – Explicit list of files (alternative to directory).

  • client (dask.distributed.Client | None) – Dask distributed Client. If None, uses dask.delayed locally.

  • time_interval_ms (float) – Aggregation time bucket in milliseconds.

  • time_granularity (float) – Output time bucket width in seconds.

  • time_resolution (float) – Microseconds per output time unit.

  • index_dir (str) – Directory for index storage.

  • data_type (str) – Type of data to aggregate - ‘events’, ‘profiles’, or ‘system’.

Returns:

PyArrow Table with aggregated data.

Return type:

pyarrow.Table

Example

>>> from dask.distributed import Client
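>>> # Assumption: the worker plugin is importable from dftracer.utils.dask
>>> from dftracer.utils.dask import DFTracerUtilsDaskWorkerPlugin, distributed_aggregate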
>>>
>>> client = Client("scheduler:8786")
>>> client.register_plugin(DFTracerUtilsDaskWorkerPlugin(threads=48))
>>>
>>> table = distributed_aggregate(
...     directory="/traces",
...     client=client,
...     time_interval_ms=5000,
... )
>>> df = table.to_pandas()
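
The final re-aggregation step can be pictured as a keyed merge of partial rows. The sketch below works on plain dicts rather than Arrow tables and keys only on (pid, time_range), as in the description above; the real key presumably includes more columns, and merge_partials is a hypothetical helper.

```python
def merge_partials(partials):
    """Merge per-worker partial aggregates that share a (pid, time_range) key.

    Sums are added, while minima and maxima are combined with min and max.
    """
    merged = {}
    for rows in partials:
        for row in rows:
            key = (row["pid"], row["time_range"])
            acc = merged.get(key)
            if acc is None:
                # First time this key is seen: copy so inputs stay untouched.
                merged[key] = dict(row)
                continue
            acc["count"] += row["count"]
            acc["time"] += row["time"]
            acc["time_min"] = min(acc["time_min"], row["time_min"])
            acc["time_max"] = max(acc["time_max"], row["time_max"])
    return list(merged.values())
```

This only works for metrics that combine associatively (sums, minima, maxima); a quantity such as a percentile cannot be merged from partial results this way.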

Dask is an optional dependency – this module is only importable when dask.distributed is installed.