Indexer Module
The indexer module indexes DFTracer trace files
(.pfw / .pfw.gz) into a .dftindex RocksDB store. The
top-level Indexer follows a resolve / build
pattern over a directory or file list and exposes the higher index tiers
(checkpoints, bloom filters, manifests, aggregation).
CheckpointIndexer is the lower-level single-file
interface used for checkpoint-level operations.
Indexer Class
- class dftracer.utils.Indexer(directory: str = '', files: list[str] | None = None, index_dir: str = '', require_checkpoint: bool = True, require_bloom: bool = True, require_manifest: bool = True, require_aggregation: bool = False, time_interval_ms: float = 5000.0, group_keys: list[str] | None = None, custom_metric_fields: list[str] | None = None, compute_percentiles: bool = False, checkpoint_size: int = 33554432, parallelism: int = 0, force_rebuild: bool = False, runtime: Runtime | None = None)
Bases: object
High-level indexer for building and managing trace indexes.
Supports tiered indexing:
- Tier 1: Checkpoints (for random access)
- Tier 2: Bloom filters and manifests (for fast filtering)
- Tier 3: Aggregation data (config-dependent)
At least one of ‘directory’ or ‘files’ must be provided.
- Parameters:
directory (str) – Directory containing trace files (.pfw/.pfw.gz).
files (List[str] | None) – List of specific file paths to index.
index_dir (str) – Directory for .dftindex stores (default: next to files).
require_checkpoint (bool) – Build checkpoint tier (default True).
require_bloom (bool) – Build bloom filter tier (default True).
require_manifest (bool) – Build manifest tier (default True).
require_aggregation (bool | AggregationConfig | None) – Aggregation config or True for defaults (default False).
parallelism (int) – Number of parallel workers (0 = all cores).
force_rebuild (bool) – Force rebuild even if index exists.
runtime (Runtime | None) – Runtime for executor parallelism (default: global runtime).
Example
>>> indexer = Indexer("/path/to/traces")
>>> indexer.ensure_indexed()  # builds checkpoint, bloom, manifest
>>> # With explicit file list
>>> indexer = Indexer(files=["/path/to/trace1.pfw.gz", "/path/to/trace2.pfw.gz"])
>>> indexer.ensure_indexed()
>>> # With aggregation
>>> indexer = Indexer(
...     "/path/to/traces",
...     require_aggregation=AggregationConfig(time_interval_ms=1000),
... )
>>> indexer.ensure_indexed()  # fused pass with aggregation
- resolve() → IndexStatus
Check which files are already indexed and which still need indexing.
- Returns:
IndexStatus with total_files, ready, and needs_work lists.
- Return type:
IndexStatus
- build() → None
Build all missing index tiers based on require_* flags.
This method builds indexes in parallel using the Runtime executor. When aggregation is enabled, it performs a fused pass for efficiency.
- ensure_indexed() → IndexStatus
Resolve and build if needed.
Convenience method that calls resolve() then build() if needed.
- Returns:
IndexStatus after building.
- Return type:
IndexStatus
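The resolve / build contract behind ensure_indexed() can be modelled in a few lines of plain Python. This is a hypothetical sketch that does not call dftracer: ToyIndexer and ToyIndexStatus are invented stand-ins, only the total_files / ready / needs_work field names come from this page, and "already indexed" is reduced to set membership instead of real per-tier checks.

```python
from dataclasses import dataclass, field


@dataclass
class ToyIndexStatus:
    # Hypothetical stand-in for IndexStatus; field names mirror the documented
    # total_files / ready / needs_work attributes, the types are assumed.
    total_files: int
    ready: list[str] = field(default_factory=list)
    needs_work: list[str] = field(default_factory=list)


class ToyIndexer:
    """Hypothetical model of the resolve/build pattern, not the real Indexer."""

    def __init__(self, files: list[str], indexed: set[str]):
        self.files = files
        self.indexed = indexed  # files whose required tiers already exist
        self.build_calls = 0

    def resolve(self) -> ToyIndexStatus:
        # Split the file list into ready vs. still-to-build.
        ready = [f for f in self.files if f in self.indexed]
        needs_work = [f for f in self.files if f not in self.indexed]
        return ToyIndexStatus(len(self.files), ready, needs_work)

    def build(self) -> None:
        # Stand-in for building the missing tiers of every pending file.
        self.build_calls += 1
        self.indexed.update(self.resolve().needs_work)

    def ensure_indexed(self) -> ToyIndexStatus:
        if self.resolve().needs_work:  # only build when something is missing
            self.build()
        return self.resolve()
```

In this model, a second ensure_indexed() call finds an empty needs_work list and skips build() entirely, which is what makes such a convenience method cheap to call before every read.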
- get_checkpoint_indexer(file_path: str) → CheckpointIndexer
Get a checkpoint indexer for a specific file.
Returns an indexer for checkpoint-level operations on a single file, such as finding checkpoints for random access.
- Parameters:
file_path (str) – Path to the trace file (.pfw/.pfw.gz).
- Returns:
CheckpointIndexer instance for checkpoint operations (checkpoints, find_checkpoint, etc.).
- Return type:
CheckpointIndexer
- get_hash_table(hash_type: str) → dict
Query hash table mappings.
Returns a dictionary mapping hash values to resolved names for the given hash type. This is useful for resolving fhash/hhash values in aggregated data.
- Parameters:
hash_type (str) – One of ‘file’, ‘host’, ‘string’, or ‘proc’.
- Returns:
dict mapping hash values (str) to resolved names (str).
- Return type:
dict
Example
>>> indexer = Indexer("/path/to/traces")
>>> indexer.ensure_indexed()
>>> file_names = indexer.get_hash_table("file")
>>> # file_names = {"abc123": "/path/to/data.h5", ...}
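If you already hold rows carrying file_hash / host_hash values in Python, the dictionaries returned by get_hash_table can be applied directly. A minimal sketch, assuming rows are plain dicts (an illustrative shape, not a dftracer type) and that unknown hashes should fall back to the raw hash string; resolve_hashes is a hypothetical helper. iter_arrow_dfanalyzer already performs this resolution in C++, so this is only relevant for data obtained another way.

```python
def resolve_hashes(rows, file_names, host_names):
    """Attach readable names to rows carrying file_hash / host_hash values.

    rows: iterable of dicts (hypothetical row shape).
    file_names / host_names: dicts shaped like get_hash_table("file") and
    get_hash_table("host"). Unknown hashes fall back to the raw hash string.
    """
    resolved = []
    for row in rows:
        out = dict(row)  # copy so the caller's rows are left untouched
        out["file_name"] = file_names.get(row.get("file_hash"), row.get("file_hash"))
        out["host_name"] = host_names.get(row.get("host_hash"), row.get("host_hash"))
        resolved.append(out)
    return resolved
```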
- query_all_file_pids() → dict
Query PIDs for all indexed files.
Returns a dictionary mapping file_id to the set of PIDs observed in that file. This is useful for distributed aggregation to assign files to workers by PID affinity.
- Returns:
dict mapping file_id (int) to set of PIDs (int).
- Return type:
dict
- query_file_info() → Tuple[Dict[int, str], Dict[int, Set[int]]]
Query file distribution info in a single DB open.
- Returns:
Tuple of (file_id_to_path, file_pids) where
file_id_to_path: dict[int, str] mapping DB file ID to path
file_pids: dict[int, set[int]] mapping file ID to PIDs
- Return type:
Tuple[Dict[int, str], Dict[int, Set[int]]]
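- iter_aggregation(type: str = 'events', batch_size: int = 10000)
Iterate over aggregation data as Arrow batches.
Requires that the index was built with require_aggregation=True. Yields Arrow batches that can be converted to pyarrow record batches or tables, and from there to pandas.
- Parameters:
type (str) – Type of aggregation data to iterate (default ‘events’)
batch_size (int) – Number of entries per Arrow batch (default 10000)
- Yields: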
Arrow batch capsules implementing __arrow_c_array__
Example
>>> import pyarrow as pa
>>> indexer = Indexer("/traces", require_aggregation=True)
>>> indexer.ensure_indexed()
>>> batches = [pa.record_batch(b) for b in indexer.iter_aggregation("events")]
>>> table = pa.Table.from_batches(batches)
- iter_arrow_dfanalyzer(type: str = 'events', batch_size: int = 10000, time_granularity: float = 1.0, time_resolution: float = 1000000.0, query: str | None = None)
Iterate over aggregation data as dfanalyzer-compatible Arrow batches.
Returns Arrow batches with columns matching dfanalyzer schema:
Events/Profiles: cat, func_name, pid, tid, file_hash, host_hash, file_name, host_name, proc_name, io_cat, acc_pat, count, time, size, time_min, time_max, size_min, size_max, time_range, time_start, time_end
System: host_hash, time_range, sys_cpu_*, sys_mem_*
Hash resolution, time normalization, and computed columns (proc_name, io_cat) are done in C++ for performance.
- Parameters:
type (str) – Type of aggregation data - ‘events’, ‘profiles’, or ‘system’
batch_size (int) – Number of entries per Arrow batch (default 10000)
time_granularity (float) – Bucket width in seconds (default 1.0)
time_resolution (float) – Microseconds per output time unit (default 1e6)
query (str | None) – Optional query filter string (e.g., “pid == 1234 or pid == 5678”)
- Yields:
Arrow batch capsules implementing __arrow_c_array__
Example
>>> import pyarrow as pa
>>> indexer = Indexer("/traces", require_aggregation=True)
>>> indexer.ensure_indexed()
>>> batches = [pa.record_batch(b) for b in indexer.iter_arrow_dfanalyzer("events")]
>>> table = pa.Table.from_batches(batches)
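The time_granularity and time_resolution parameters can be read as two independent conversions. The sketch below is one plausible interpretation, not the C++ implementation: it assumes event timestamps and durations arrive in microseconds (the Chrome trace-event convention), that time_range is the floor of elapsed time divided by the bucket width, and that output durations are divided by time_resolution. bucket_event and origin_us are hypothetical names.

```python
import math


def bucket_event(ts_us: float, dur_us: float, *, time_granularity: float = 1.0,
                 time_resolution: float = 1_000_000.0, origin_us: float = 0.0):
    """Return (time_range, time_out) for one event under the assumptions above.

    time_granularity: bucket width in seconds.
    time_resolution: microseconds per output time unit.
    origin_us: hypothetical trace start used to make buckets relative.
    """
    bucket_width_us = time_granularity * 1_000_000.0
    time_range = math.floor((ts_us - origin_us) / bucket_width_us)
    time_out = dur_us / time_resolution  # e.g. seconds with the 1e6 default
    return time_range, time_out
```

With the defaults, an event starting at 2.5 s and lasting 500 ms lands in bucket 2 and reports a time of 0.5 under this interpretation.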
- iter_arrow_dfanalyzer_all(batch_size: int = 10000, time_granularity: float = 1.0, time_resolution: float = 1000000.0, query: str | None = None, group_by: List[str] | None = None)
Iterate over all aggregation types in a single scan.
This is ~3x faster than calling iter_arrow_dfanalyzer separately for events, profiles, and system because it scans the index only once.
When group_by is provided, aggregation collapses dimensions during the scan and emits a reduced schema containing only the requested group columns plus aggregated metrics (count, time, size, time_sq, size_sq, time_min, time_max, size_min, size_max, time_call_min, time_call_max, size_call_min, size_call_max, time_start, time_end).
- Parameters:
batch_size (int) – Number of entries per Arrow batch (default 10000)
time_granularity (float) – Bucket width in seconds (default 1.0)
time_resolution (float) – Microseconds per output time unit (default 1e6)
query (str | None) – Optional query filter string (e.g., “pid == 1234 or pid == 5678”)
group_by (List[str] | None) – Optional list of columns to group by for coarse in-scan aggregation. Supported: cat, func_name, pid, tid, file_hash, host_hash, file_name, host_name, proc_name, io_cat, acc_pat, time_range.
- Returns:
Dict with ‘events’, ‘profiles’, ‘system’ keys, each containing a list of Arrow batch capsules.
Example
>>> import pyarrow as pa
>>> indexer = Indexer("/traces", require_aggregation=True)
>>> indexer.ensure_indexed()
>>> all_batches = indexer.iter_arrow_dfanalyzer_all()
>>> events = [pa.record_batch(b) for b in all_batches["events"]]
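Collapsing dimensions with group_by amounts to re-keying rows on the requested columns and merging their metrics. A sketch under assumed merge rules (sums add, *_min and time_start take the minimum, *_max and time_end take the maximum) that covers only a subset of the metric columns; collapse and mean_and_variance are hypothetical helpers, and the reading of time_sq as a sum of squared per-event times is an assumption this page does not confirm. Under that assumption, carrying time_sq next to count and time is what lets a variance survive the merge.

```python
METRIC_SUMS = ("count", "time", "size", "time_sq", "size_sq")
METRIC_MINS = ("time_min", "size_min", "time_start")
METRIC_MAXS = ("time_max", "size_max", "time_end")


def collapse(rows, group_by):
    """Collapse fine-grained rows onto the group_by columns (assumed merge rules).

    Columns not listed in group_by or in the metric tuples are dropped.
    """
    out = {}
    for row in rows:
        key = tuple(row[c] for c in group_by)
        acc = out.get(key)
        if acc is None:
            # First row for this key: copy group columns and metrics.
            out[key] = {**{c: row[c] for c in group_by},
                        **{m: row[m] for m in METRIC_SUMS + METRIC_MINS + METRIC_MAXS}}
            continue
        for m in METRIC_SUMS:
            acc[m] += row[m]
        for m in METRIC_MINS:
            acc[m] = min(acc[m], row[m])
        for m in METRIC_MAXS:
            acc[m] = max(acc[m], row[m])
    return list(out.values())


def mean_and_variance(acc, metric="time"):
    """Recover mean and population variance from count, sum and sum of squares."""
    n = acc["count"]
    mean = acc[metric] / n
    return mean, acc[f"{metric}_sq"] / n - mean * mean
```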
CheckpointIndexer Class
- class dftracer.utils.CheckpointIndexer(gz_path: str, index_path: str | None = None, checkpoint_size: int = 1048576, force_rebuild: bool = False, build_bloom: bool = False, build_manifest: bool = False, runtime: Runtime | None = None)
Bases: _BaseNative
Checkpoint indexer for single-file checkpoint-level operations.
- __enter__() → CheckpointIndexer
Enter the runtime context for the with statement.
- __exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) → None
Release this Python wrapper on context exit.
This does not force-close the shared RocksDB instance for the same
.dftindex path.
- close() → None
Release this Python wrapper’s native indexer handle.
This does not force-close the shared RocksDB instance for the same
.dftindex path.
- find_checkpoint(target_offset: int) → IndexerCheckpoint | None
Find the checkpoint for target_offset, or return None if there is none.
- get_checkpoints() → list[IndexerCheckpoint]
Get all checkpoints.
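Random access through checkpoints typically reduces to a binary search over checkpoint start offsets. The sketch below assumes checkpoints are sorted by uncompressed offset and that the wanted result is the last checkpoint at or before target_offset; ToyCheckpoint and its fields are hypothetical, since the attributes of IndexerCheckpoint are not documented on this page.

```python
import bisect
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ToyCheckpoint:
    # Hypothetical fields, not the real IndexerCheckpoint attributes.
    uc_offset: int  # uncompressed byte offset where the checkpoint starts
    c_offset: int   # compressed byte offset to resume decompression from


def find_checkpoint(checkpoints: list[ToyCheckpoint],
                    target_offset: int) -> Optional[ToyCheckpoint]:
    """Return the last checkpoint starting at or before target_offset, or None.

    Assumes checkpoints are sorted by uc_offset.
    """
    starts = [cp.uc_offset for cp in checkpoints]
    i = bisect.bisect_right(starts, target_offset)  # first start > target
    return checkpoints[i - 1] if i > 0 else None
```

A reader would then resume decompression from the returned checkpoint and skip forward to target_offset; a None result corresponds to an offset before the first checkpoint.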
IndexerCheckpoint Class
Distributed Index (SST-based)
The distributed-index path lets the coordinator pre-register files, hand out
file_id ranges to workers, and bulk-ingest worker-produced SST artifacts
back into the unified .dftindex store.
IndexDatabase
- class dftracer.utils.dftracer_utils_ext.IndexDatabase(index_path: str)
Handle to a .dftindex RocksDB store.
Used by the distributed indexer coordinator to pre-register files, reserve file_id ranges, bulk-ingest worker-produced SSTs, and rebuild root summaries.
- bulk_ingest(registry: SstArtifactRegistry, skip_cfs: object = None) → None
Ingest all SSTs collected in the registry.
skip_cfs is an optional iterable of CF names whose SSTs are left outside the unified DB. Distributed builds pass {“aggregation”, “system_metrics”} to keep per-worker AGG/SYS SSTs addressable via agg_manifest.json for parallel reads at analyze time. See dftracer.utils.dask.consolidate_index to fold them back into the unified DB later.
- register_files(paths: list[str], build_manifest: bool = False) → list[int]
Register each path in the DEFAULT-CF file registry and return the assigned file_ids (parallel to paths). Idempotent for files with matching hash.
- write_agg_file_markers(file_ids: object) → None
Write per-file aggregation completion markers into the AGGREGATION CF.
Each marker is \xFF\xFF + file_id_be32, i.e. two 0xFF bytes followed by the file_id as a big-endian 32-bit integer. The index resolver uses the presence of these markers to decide whether each file has aggregated data; if a marker is missing, ensure_indexed() concludes that the aggregation tier is incomplete and re-runs the entire build. distributed_index must call this after bulk_ingest so that subsequent read_trace calls do not redundantly re-aggregate.
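The marker layout \xFF\xFF + file_id_be32 can be reproduced with the struct module. A sketch, assuming the marker is stored as a RocksDB key (the value contents are not described here); agg_marker_key and parse_agg_marker_key are hypothetical helpers.

```python
import struct

AGG_MARKER_PREFIX = b"\xff\xff"


def agg_marker_key(file_id: int) -> bytes:
    """Build a marker key: two 0xFF bytes, then file_id as big-endian uint32."""
    return AGG_MARKER_PREFIX + struct.pack(">I", file_id)


def parse_agg_marker_key(key: bytes):
    """Return the file_id encoded in a marker key, or None if key is not a marker."""
    if len(key) != 6 or not key.startswith(AGG_MARKER_PREFIX):
        return None
    return struct.unpack(">I", key[2:])[0]
```

Encoding file_id big-endian makes bytewise key order match numeric order, so under RocksDB's default bytewise comparator the markers sort together after the \xFF\xFF prefix in ascending file_id order.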
SstArtifactRegistry¶
Module-level Functions¶
- dftracer.utils.dftracer_utils_ext.scan_files(directory: str, patterns: list[str] | None = None, recursive: bool = False, runtime: Runtime | object | None = None) → list[tuple[str, int]]
Parallel directory scan returning (path, size) tuples for regular files matching the patterns.
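A sequential, standard-library approximation of scan_files. It is not the native parallel scan: scan_files_sketch is a hypothetical helper, patterns are assumed to be shell-style globs matched against the file name, and falling back to *.pfw / *.pfw.gz when patterns is None is an assumption rather than documented behaviour.

```python
import fnmatch
import os


def scan_files_sketch(directory, patterns=None, recursive=False):
    """Return sorted (path, size) tuples for regular files matching patterns."""
    patterns = patterns or ["*.pfw", "*.pfw.gz"]  # assumed default
    results = []
    for root, _dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            if os.path.isfile(path) and any(fnmatch.fnmatch(name, p) for p in patterns):
                results.append((path, os.path.getsize(path)))
        if not recursive:
            break  # os.walk yields the top-level directory first
    return sorted(results)
```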
- dftracer.utils.dftracer_utils_ext.scan_aggregation_manifest(agg_ssts: list[str], sys_ssts: list[str], scratch_dir: str, meta_index_path: str, batch_size: int = 10000, time_granularity: float = 1.0, time_resolution: float = 1000000.0, query: str | None = None, group_by: list[str] | None = None, shard_begin: int = 0, shard_end: int = 4096, runtime: Runtime | object | None = None, file_hashes: dict[str, str] | None = None, host_hashes: dict[str, str] | None = None) → dict[str, list['_ArrowBatchCapsule']]
Scan a worker’s slice of the distributed aggregation manifest.
Ingests agg_ssts + sys_ssts into a scratch IndexDatabase at scratch_dir (caller owns the directory lifecycle) and runs the dfanalyzer aggregation scan over [shard_begin, shard_end). meta_index_path is the unified .dftindex used to resolve file / host hashes.
Returns the same dict shape as Indexer.iter_arrow_dfanalyzer_all: {“events”: […], “profiles”: […], “system”: […]}.
- dftracer.utils.dftracer_utils_ext.build_sst_batch(files: list[str], file_ids: list[int], staging_dir: str, batch_id: str, index_dir: str = '', checkpoint_size: int = 33554432, build_manifest: bool = False, force_rebuild: bool = False, bloom_dimensions: list[str] | None = None, parallelism: int = 0, flush_every_files: int = 0, runtime: Runtime | object | None = None, aggregation_config: object = None, file_slices: object = None) → tuple[list[dict[str, str | None]], bytes]
Run the indexer pipeline with an SST sink. Returns (artifact_dicts, tracker_blob). tracker_blob is the serialized merged AssociationTracker for the batch (empty bytes when aggregation_config is None). file_slices enables intra-file parallelism; entries are None (whole file) or (member_begin, member_end, checkpoint_idx_base, skip_file_scoped_writes, members).
- dftracer.utils.dftracer_utils_ext.plan_lpt_partition(entries: list[tuple[str, int]], num_workers: int) → list[list[tuple[str, int]]]
Greedy LPT bin-packing of (path, size) tuples into num_workers buckets, minimising the maximum per-worker total size.
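Greedy LPT (longest processing time first) sorts items by size, largest first, and places each one on the currently least-loaded bucket. A sketch of that heuristic with heapq; plan_lpt_partition_sketch is a hypothetical name, and the path tie-break used to keep the plan deterministic is an assumption, not documented behaviour of plan_lpt_partition.

```python
import heapq


def plan_lpt_partition_sketch(entries, num_workers):
    """Greedy LPT bin-packing of (path, size) tuples into num_workers buckets."""
    if num_workers <= 0:
        raise ValueError("num_workers must be positive")
    buckets = [[] for _ in range(num_workers)]
    heap = [(0, i) for i in range(num_workers)]  # (current load, bucket index)
    # Largest first; tie-break on path so the plan is deterministic.
    for path, size in sorted(entries, key=lambda e: (-e[1], e[0])):
        load, i = heapq.heappop(heap)  # least-loaded bucket
        buckets[i].append((path, size))
        heapq.heappush(heap, (load + size, i))
    return buckets
```

LPT is a heuristic: for sizes 7, 5, 4, 3, 3 on two workers this greedy order yields loads 10 and 12, although an 11 / 11 split exists.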
- dftracer.utils.dftracer_utils_ext.enumerate_gzip_members(files: list[str], runtime: Runtime | object | None = None) → list[list[tuple[int, int]]]
Cooperative async scan of gzip member offsets. Returns lists of (c_offset, c_size) parallel to files; empty for non-gzip files.
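A gzip file may consist of several concatenated members, and each boundary can be found by decompressing one member at a time. This in-memory sketch uses zlib rather than the native cooperative scan: enumerate_gzip_members_sketch is hypothetical, takes the raw bytes of a single file, discards but still inflates each member's output, and treats data without the gzip magic bytes as non-gzip.

```python
import zlib


def enumerate_gzip_members_sketch(data: bytes):
    """Return (c_offset, c_size) for each gzip member in data, [] if not gzip."""
    if not data.startswith(b"\x1f\x8b"):
        return []
    view = memoryview(data)  # slice without copying the remaining input
    members = []
    offset = 0
    while offset < len(data):
        d = zlib.decompressobj(31)  # wbits=31: expect a gzip header and trailer
        d.decompress(view[offset:])  # stops at the end of the first member
        if not d.eof:
            raise ValueError(f"truncated gzip member at offset {offset}")
        size = len(data) - offset - len(d.unused_data)
        members.append((offset, size))
        offset += size
    return members
```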
- dftracer.utils.dftracer_utils_ext.plan_work_units(member_map: list[list[tuple[int, int]]], num_workers: int, target_c_size: int = 0) → list[list[tuple[int, int, int, int]]]
Deterministic LPT assignment of intra-file gzip-member slices across workers. Returns per-worker lists of (file_idx, member_begin, member_end, c_size).
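Intra-file planning can be viewed as two steps: cut each file's member list into contiguous slices of roughly target_c_size compressed bytes, then spread the slices over workers with LPT. A sketch under stated assumptions: plan_work_units_sketch is hypothetical, member_end is treated as exclusive, target_c_size=0 is read as one slice per file, and files with no members (non-gzip) produce no work unit here.

```python
import heapq


def plan_work_units_sketch(member_map, num_workers, target_c_size=0):
    """Return per-worker lists of (file_idx, member_begin, member_end, c_size)."""
    units = []
    for file_idx, members in enumerate(member_map):
        begin, acc = 0, 0
        for i, (_c_offset, c_size) in enumerate(members):
            acc += c_size
            # Close the slice once it reaches the target compressed size.
            if target_c_size and acc >= target_c_size:
                units.append((file_idx, begin, i + 1, acc))
                begin, acc = i + 1, 0
        if begin < len(members):  # trailing partial slice (or whole file)
            units.append((file_idx, begin, len(members), acc))
    plan = [[] for _ in range(num_workers)]
    heap = [(0, w) for w in range(num_workers)]  # (load, worker)
    # Largest slice first; ties broken by position for a deterministic plan.
    for unit in sorted(units, key=lambda u: (-u[3], u[0], u[1])):
        load, w = heapq.heappop(heap)
        plan[w].append(unit)
        heapq.heappush(heap, (load + unit[3], w))
    return plan
```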
Dask Helpers¶
The dftracer.utils.dask module provides Dask-distributed drivers built on
the SST-based primitives above:
- dftracer.utils.dask.distributed_index(directory: str = '', files: List[str] | None = None, index_path: str = '', local_staging: str = '', shared_staging: str = '', client: dask.distributed.Client | None = None, checkpoint_size: int = 33554432, bloom_dimensions: List[str] | None = None, build_manifest: bool = True, force_rebuild: bool = False, partition: str = 'lpt', rebuild_root_summaries: bool = True, parallelism_per_worker: int = 0, flush_every_files: int = 0, aggregation_config: Any | None = None) → Dict[str, Any]
Index a set of trace files using Dask workers writing SSTs in parallel.
- Steps (all O(1) on the coordinator except the fan-out):
1. Enumerate files + sizes via parallel scan.
2. LPT bin-pack files into one bucket per Dask worker.
3. Register all files on the coordinator’s IndexDatabase (pre-assigns file_ids and writes DEFAULT-CF entries once).
4. Submit one Dask task per non-empty worker that runs the existing indexer pipeline with an SST sink, writing SSTs to local_staging and (if different) moving them to shared_staging.
5. Collect artifact dicts into an SstArtifactRegistry; the coordinator calls bulk_ingest + rebuild_root_summaries.
- Parameters:
directory (str) – Directory containing trace files.
files (List[str] | None) – Explicit file list (alternative to directory).
index_path (str) – Target .dftindex path (coordinator-writable).
local_staging (str) – Per-worker SST build dir. If equal to shared_staging, no post-build move.
shared_staging (str) – Shared FS dir the coordinator reads SSTs from during ingest. Must be on the same filesystem as index_path for the cheapest ingest.
client (dask.distributed.Client | None) – Dask distributed Client. None -> run tasks inline.
partition (str) – “lpt” (greedy longest-processing-time bin-pack) or “round_robin”.
rebuild_root_summaries (bool) – If True, recompute ROOT_* CFs after ingest.
parallelism_per_worker (int) – 0 -> let the plugin/default Runtime choose (one coroutine thread per core).
flush_every_files (int) – 0 -> build SSTs once per worker; >0 -> flush mid-batch to bound peak memory.
- Returns:
dict with total_files, per_worker sizes, index_path, artifact_count.
- Return type:
Dict[str, Any]
- dftracer.utils.dask.distributed_aggregate(directory: str = '', files: List[str] | None = None, client: dask.distributed.Client | None = None, time_interval_ms: float = 5000.0, time_granularity: float = 1.0, time_resolution: float = 1000000.0, index_dir: str = '', data_type: str = 'events') → pyarrow.Table
Aggregate trace data using Dask distributed workers.
This function:
1. Indexes all files on the coordinator to get PID manifests.
2. Assigns files to workers by PID affinity (minimizing cross-worker overlap).
3. Has each worker aggregate its files using iter_arrow_dfanalyzer.
4. Gathers partial Arrow tables from workers.
5. Re-aggregates overlapping keys (same PID/time_range across files).
- Parameters:
directory (str) – Directory containing trace files (.pfw/.pfw.gz).
files (List[str] | None) – Explicit list of files (alternative to directory).
client (dask.distributed.Client | None) – Dask distributed Client. If None, uses dask.delayed locally.
time_interval_ms (float) – Aggregation time bucket in milliseconds.
time_granularity (float) – Output time bucket width in seconds.
time_resolution (float) – Microseconds per output time unit.
index_dir (str) – Directory for index storage.
data_type (str) – Type of data to aggregate - ‘events’, ‘profiles’, or ‘system’.
- Returns:
PyArrow Table with aggregated data.
- Return type:
pyarrow.Table
Example
>>> from dask.distributed import Client
>>> from dftracer.utils.dask import distributed_aggregate
>>>
>>> client = Client("scheduler:8786")
>>> client.register_plugin(DFTracerUtilsDaskWorkerPlugin(threads=48))
>>>
>>> table = distributed_aggregate(
...     directory="/traces",
...     client=client,
...     time_interval_ms=5000,
... )
>>> df = table.to_pandas()
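Assigning files to workers by PID affinity can be approximated by grouping files that share any PID and keeping each group on one worker. This sketch is one possible strategy, not the one dftracer uses: assign_by_pid_affinity is hypothetical, it consumes a dict shaped like the output of query_all_file_pids, and it balances groups by file count. Because whole groups stay together it removes cross-worker overlap entirely, at the cost of poor balance when one group dominates; distributed_aggregate only aims to minimize overlap and re-aggregates what remains.

```python
import heapq


def assign_by_pid_affinity(file_pids, num_workers):
    """Group files sharing any PID (union-find), then spread groups over workers.

    file_pids: dict file_id -> set of PIDs. Returns per-worker lists of file_ids.
    """
    parent = {f: f for f in file_pids}

    def find(f):
        while parent[f] != f:
            parent[f] = parent[parent[f]]  # path halving
            f = parent[f]
        return f

    owner = {}  # pid -> first file seen with that pid
    for f, pids in file_pids.items():
        for pid in pids:
            if pid in owner:
                parent[find(f)] = find(owner[pid])  # union the two groups
            else:
                owner[pid] = f

    groups = {}
    for f in file_pids:
        groups.setdefault(find(f), []).append(f)

    assignment = [[] for _ in range(num_workers)]
    heap = [(0, w) for w in range(num_workers)]  # (files assigned, worker)
    # Largest group first, ties broken by file ids for determinism.
    for group in sorted(groups.values(), key=lambda g: (-len(g), sorted(g))):
        load, w = heapq.heappop(heap)
        assignment[w].extend(sorted(group))
        heapq.heappush(heap, (load + len(group), w))
    return assignment
```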
Dask is an optional dependency – this module is only importable when
dask.distributed is installed.