Indexer

Namespace: dftracer::utils::utilities::indexer

For a usage guide and examples, see Indexer Components.

struct FileMetadataResult

Public Members

std::uint64_t checkpoint_size = 0
std::uint64_t num_lines = 0
std::uint64_t max_bytes = 0
struct FileRegistryEntry

Public Members

int file_id = -1
IndexFileEntryCapability capabilities = IndexFileEntryCapability::NONE
class IndexBatchBuilderUtility

Public Static Functions

static coro::CoroTask<IndexBuildBatchResult> process(CoroScope *scope, std::shared_ptr<IndexBuildBatchConfig> config)
class IndexBatchSink

Abstract sink that accepts index records for a batch of files.

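Two backends implement this: IndexDatabaseWriterContext (the RocksDB-backed writer obtained from IndexDatabase::begin_write(), finalised with commit()) and IndexDatabaseSstWriterContext (buffers records per column family in memory and emits SST files on commit() for later bulk ingestion).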

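Only the step-1 subset of methods is abstracted here (file metadata, checkpoints, manifest event ranges and metadata lines). Bloom/hash/stats writes remain on the concrete type until their CFs are ported to SST. NOTE(review): this remark appears out of date. The pure-virtual list below also declares bloom-filter, statistics, dimension, name-posting, hash-table, aggregation and system-metrics writes. IndexDatabaseSstWriterContext::Artifacts also carries SSTs for those column families. Confirm and update.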

Subclassed by dftracer::utils::utilities::indexer::IndexDatabaseSstWriterContext, dftracer::utils::utilities::indexer::IndexDatabaseWriterContext

Public Types

using IndexerCheckpoint = internal::IndexerCheckpoint
using ChunkStatistics = composites::dft::indexing::ChunkStatistics
using ChunkDimensionStats = composites::dft::indexing::ChunkDimensionStats

Public Functions

virtual ~IndexBatchSink() = default
virtual void insert_file_metadata(int file_id, std::uint64_t checkpoint_size, std::uint64_t total_lines, std::uint64_t total_uc_size) = 0
virtual void insert_checkpoint(int file_id, const IndexerCheckpoint &checkpoint) = 0
virtual void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, std::span<const std::uint32_t> line_numbers) = 0
virtual void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, std::span<const std::uint32_t> line_numbers) = 0
virtual void insert_file_pids(int file_id, const std::unordered_set<std::uint64_t> &pids) = 0
virtual void insert_chunk_bloom_filter(int file_id, std::uint64_t checkpoint_idx, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) = 0
virtual void insert_file_bloom_filter(int file_id, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) = 0
virtual void insert_chunk_statistics(int file_id, std::uint64_t checkpoint_idx, const ChunkStatistics &stats) = 0
virtual void insert_file_scalar_stats(int file_id, const ChunkStatistics &stats, std::uint64_t num_chunks) = 0
virtual void insert_file_category_counts(int file_id, const StringViewMap<std::uint64_t> &counts) = 0
virtual void insert_file_pid_tid_counts(int file_id, const StringViewMap<std::uint64_t> &counts) = 0
virtual void insert_file_name_counts(int file_id, const StringViewMap<std::uint64_t> &counts) = 0
virtual void insert_index_dimension(int file_id, std::string_view dimension) = 0
virtual void insert_chunk_dimension_stats(int file_id, std::uint64_t checkpoint_idx, const ChunkDimensionStats &stats, std::size_t value_counts_cap = 4096) = 0
virtual void insert_name_dictionary_entry(std::uint64_t name_id, std::string_view name) = 0
virtual void insert_name_file_posting(std::uint64_t name_id, int file_id) = 0
virtual void insert_name_chunk_posting(std::uint64_t name_id, int file_id, std::uint64_t checkpoint_idx) = 0
virtual void insert_hash_table_entry(std::uint8_t type, std::string_view hash, std::string_view name) = 0
virtual void insert_aggregation_merge(std::string_view key, std::string_view operand) = 0
virtual void insert_aggregation_put(std::string_view key, std::string_view value) = 0
virtual void insert_system_metrics_merge(std::string_view key, std::string_view operand) = 0
inline void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, const std::vector<std::uint32_t> &line_numbers)
inline void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, const std::vector<std::uint32_t> &line_numbers)
struct IndexBuildBatchConfig

Public Types

using DftVisitorFactory = std::function<std::vector<std::unique_ptr<composites::dft::DftEventVisitor>>(const std::string &file_path)>

Factory for creating per-file DftEventVisitors during the parse phase. Called once per file with the file path. Caller owns the returned visitors and can extract results after the batch completes.

using ExtraVisitorsDrainFn = std::function<void(std::vector<std::vector<std::unique_ptr<composites::dft::DftEventVisitor>>>)>

Optional drain callback invoked once per sub-batch with the extra visitors for that sub-batch’s files. Lets the caller consume and release visitor state immediately, keeping memory bounded by flush_every_files instead of accumulating across the whole pipeline.

using SinkFactory = std::function<std::unique_ptr<IndexBatchSink>()>

Optional batch-sink factory. If set, the write phase constructs a fresh sink per batch via this factory instead of opening the RocksDB-backed writer on index_dir. Used by the distributed (SST) pipeline to route writes to per-worker SstWriterContext instances. sink_commit must also be set and is responsible for finalising each sink (RocksDB path: call .commit(); SST path: flush + route Artifacts to a registry).

using SinkCommitFn = std::function<void(IndexBatchSink&)>

Public Members

std::vector<std::string> file_paths
std::string index_dir
std::size_t checkpoint_size = 32 * 1024 * 1024
std::size_t parallelism = 1
bool force_rebuild = false
bool build_manifest = false
composites::dft::indexing::ChunkIndexerConfig bloom_config
std::vector<std::string> bloom_dimensions
bool use_batch_write = true
bool rebuild_root_summaries = true
std::size_t flush_every_files = 0

If > 0, process files in sub-batches of this size, flushing parsed artifacts to the write phase between sub-batches. This bounds peak memory to roughly flush_every_files files' worth of ParsedBloomJob state. 0 = no flush (all files are parsed before any write).

DftVisitorFactory dft_visitor_factory
ExtraVisitorsDrainFn extra_visitors_drain
std::vector<int> preassigned_file_ids

If non-empty, parallel to file_paths: use these file_ids instead of allocating via get_or_create_file_info. Used by the distributed indexer where the coordinator pre-registers all files. When set, the write phase skips the DEFAULT-CF registry open/write step.

std::vector<FileSlice> file_slices
SinkFactory sink_factory
SinkCommitFn sink_commit
struct FileSlice

Optional per-file member slice (cross-rank file splitting). When non-empty, must be parallel to file_paths. A null/empty entry means “process the whole file”; a populated entry restricts the build to [member_begin, member_end). The members vector must outlive the batch (typically stored in a shared member map).

Public Members

const std::vector<internal::GzipMember> *members = nullptr
std::size_t member_begin = 0
std::size_t member_end = 0
std::uint64_t checkpoint_idx_base = 0
bool skip_file_scoped_writes = false

When true, this file’s file-scoped data (checkpoints, bloom/manifest/hashtable, file_metadata) is NOT persisted by the write phase. Aggregation/system-metrics SSTs produced by extra visitors are still collected. Set by the MPI driver for sliced ranks where member_begin > 0 to avoid cross-rank key collisions on file-scoped CFs.

struct IndexBuildBatchMetrics

Public Members

std::uint64_t parse_ns = 0
std::uint64_t write_ns = 0
std::size_t files_enqueued = 0
std::size_t files_parsed = 0
std::size_t files_written = 0
struct IndexBuildBatchResult

Public Members

std::vector<IndexBuildResult> results
std::size_t indexed = 0
std::size_t skipped = 0
std::size_t failed = 0
std::uint64_t total_events = 0
IndexBuildBatchMetrics metrics
std::vector<std::vector<std::unique_ptr<composites::dft::DftEventVisitor>>> extra_visitors

Per-file extra visitors created by dft_visitor_factory during parsing. Index corresponds to the file index in the original file_paths vector. Empty vectors for files that failed or had no factory.

struct IndexBuildConfig

Public Functions

IndexBuildConfig &with_index_dir(const std::string &dir)
IndexBuildConfig &with_checkpoint_size(std::size_t size)
IndexBuildConfig &with_force_rebuild(bool force)
IndexBuildConfig &with_manifest(bool enable = true)
IndexBuildConfig &with_bloom_config(const composites::dft::indexing::ChunkIndexerConfig &config)
IndexBuildConfig &with_bloom_dimensions(std::vector<std::string> dims)

Public Members

std::string file_path
std::string index_dir
std::size_t checkpoint_size = 32 * 1024 * 1024
bool force_rebuild = false
bool build_manifest = false
composites::dft::indexing::ChunkIndexerConfig bloom_config
std::vector<std::string> bloom_dimensions
std::vector<std::reference_wrapper<composites::dft::DftEventVisitor>> extra_dft_visitors

Public Static Functions

static IndexBuildConfig for_file(const std::string &path)
struct IndexBuildResult

Public Members

std::string file_path
std::string index_path
bool success = false
bool was_skipped = false
bool index_created = false
std::size_t events_processed = 0
std::size_t chunks_processed = 0
std::size_t total_lines = 0
std::string error_message
class IndexBuilderUtility : public dftracer::utils::utilities::Utility<IndexBuildConfig, IndexBuildResult, tags::NeedsContext>

Public Functions

coro::CoroTask<IndexBuildResult> process(const IndexBuildConfig &config) override
class IndexDatabase

Public Types

enum class HashType : std::uint8_t

Values:

enumerator FILE
enumerator HOST
enumerator STRING
enumerator PROC

Public Functions

explicit IndexDatabase(const std::string &index_path, dftracer::utils::rocksdb::RocksDatabase::OpenMode open_mode = dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadWrite)
IndexDatabase(const IndexDatabase&) = delete
IndexDatabase &operator=(const IndexDatabase&) = delete
IndexDatabase(IndexDatabase&&) noexcept = default
IndexDatabase &operator=(IndexDatabase&&) noexcept = default
~IndexDatabase() = default
std::unique_ptr<IndexDatabaseWriterContext> begin_write()
void bulk_ingest(const SstArtifactRegistry &registry, const std::unordered_set<std::string> &skip_cfs = {})

Ingest SST files produced by IndexDatabaseSstWriterContext instances. File-id ranges across input SSTs must be disjoint for the METADATA, CHECKPOINTS, and MANIFEST column families (step-1 scope). No-op on an empty registry. Does NOT refresh root summaries; call rebuild_root_summaries() afterward once all ingest phases are done.

skip_cfs optionally holds CF names (e.g. cf::AGGREGATION) whose SSTs must be left outside the unified DB. Distributed builds use this to keep per-worker AGGREGATION / SYSTEM_METRICS SSTs addressable by manifest for parallel reads at analyze time.

void rebuild_root_summaries()

Recompute ROOT_SCALAR_STATS, ROOT_{CAT,NAME,PID_TID}_COUNTS from the current per-file CFs. Call after bulk_ingest completes, or whenever root-level summaries need to be regenerated from scratch.

void write_agg_global_config(std::uint64_t time_interval_us, std::uint32_t config_hash = 0)

Write the aggregation global-config key (0xFFFE) into the AGGREGATION CF. Required for iter_arrow_dfanalyzer_all to recognise the index as aggregator-populated. Distributed builds call this after bulk_ingest(skip_cfs={aggregation, system_metrics}) so the unified DB has a config marker even though the AGG SSTs live in the manifest. consolidate_index invokes it too before the deferred AGG ingest.

void write_agg_file_markers(const std::vector<int> &file_ids)

Write per-file aggregation completion markers (0xFFFF followed by the big-endian file_id) into the AGGREGATION CF. The index resolver treats these as “this file has aggregated data”; without them, ensure_indexed() concludes the aggregation tier is incomplete and re-runs the build. Distributed builds must call this after bulk_ingest, since per-worker SSTs carry data but not markers (markers are written via direct db->put, not via the SST sink).

void write_aggregation_tracker(const std::vector<std::string> &blobs)

Merge per-worker AssociationTracker blobs and write the result to the AGGREGATION CF under the __tracker__ key.

int reserve_file_id_range(std::size_t count)

Atomically reserve count contiguous file_ids, returning the first id in the range [first, first + count). Intended for the distributed indexer: coordinator hands each worker its own disjoint range up front so workers need no cross-worker coordination.

std::vector<int> register_files(const std::vector<std::string> &file_paths, bool build_manifest)

Register a list of trace files in the DEFAULT-CF file registry and return the assigned file_ids (parallel to file_paths). Idempotent: files already registered with a matching hash keep their existing id. Used by the distributed indexer’s coordinator to pre-register every file before dispatching work to SST-backed workers, so workers never need to touch the DEFAULT column family themselves.

inline std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db() const
void init_schema()
IndexFileEntryCapability get_file_capabilities(int file_id) const
bool has_bloom_data(int file_id) const
bool has_manifest_data(int file_id) const
int get_file_info_id(std::string_view path) const
std::optional<std::uint64_t> get_file_hash(std::string_view path) const
std::unordered_map<std::string, int> query_all_file_info_ids() const
std::unordered_map<std::string, FileRegistryEntry> query_all_file_registry() const
std::unordered_set<int> query_files_with_file_scalar_stats() const
std::unordered_set<int> query_files_with_bloom_data() const
int find_file(std::string_view file_path) const
std::uint64_t get_checkpoint_size(int file_id) const
std::uint64_t get_num_lines(int file_id) const
std::uint64_t get_max_bytes(int file_id) const
std::uint64_t get_total_events(int file_id) const
std::vector<ChunkBloomResult> query_chunk_bloom_filters(int file_id, std::string_view dimension) const
std::unordered_map<std::string, std::vector<ChunkBloomResult>> query_chunk_bloom_filters_batch(int file_id, const std::vector<std::string> &dimensions) const
std::optional<FileBloomResult> query_file_bloom_filter(int file_id, std::string_view dimension) const
std::unordered_map<std::string, FileBloomResult> query_file_bloom_filters_batch(int file_id, const std::vector<std::string> &dimensions) const
std::vector<std::string> query_index_dimensions(int file_id) const
bool has_index_dimension(int file_id, std::string_view dimension) const
std::vector<ChunkStatisticsResult> query_chunk_statistics(int file_id) const
std::unordered_map<int, std::vector<ChunkStatisticsResult>> query_chunk_statistics_batch(const std::vector<int> &file_ids) const
std::unordered_map<int, MergedStatisticsResult> query_merged_statistics_batch(const std::vector<int> &file_ids) const
std::unordered_map<int, MergedStatisticsResult> query_file_scalar_stats_batch(const std::vector<int> &file_ids) const
std::unordered_map<int, FileMetadataResult> query_file_metadata_batch(const std::vector<int> &file_ids) const
std::unordered_map<int, StringViewMap<std::uint64_t>> query_file_category_counts_batch(const std::vector<int> &file_ids) const
std::unordered_map<int, StringViewMap<std::uint64_t>> query_file_pid_tid_counts_batch(const std::vector<int> &file_ids) const
std::unordered_map<int, NameSummaryResult> query_file_name_summaries_batch(const std::vector<int> &file_ids) const
std::optional<RootStatisticsResult> query_root_scalar_stats() const
StringViewMap<std::uint64_t> query_root_category_counts() const
StringViewMap<std::uint64_t> query_root_pid_tid_counts() const
StringViewMap<std::uint64_t> query_root_name_counts() const
void merge_file_category_counts_batch_into(const std::vector<int> &file_ids, std::unordered_map<int, ChunkStatistics*> &targets) const
void merge_file_pid_tid_counts_batch_into(const std::vector<int> &file_ids, std::unordered_map<int, ChunkStatistics*> &targets) const
void merge_file_name_counts_batch_into(const std::vector<int> &file_ids, std::unordered_map<int, ChunkStatistics*> &targets) const
void merge_root_category_counts_into(ChunkStatistics &target) const
void merge_root_pid_tid_counts_into(ChunkStatistics &target) const
void merge_root_name_counts_into(ChunkStatistics &target) const
std::vector<int> query_name_file_postings(std::string_view name) const
std::vector<std::uint64_t> query_name_chunk_postings(std::string_view name, int file_id) const
bool has_file_scalar_stats(int file_id) const
bool find_checkpoint(int file_id, std::size_t target_offset, IndexerCheckpoint &checkpoint) const
std::vector<IndexerCheckpoint> query_checkpoints(int file_id) const
std::vector<IndexerCheckpoint> query_checkpoints_for_line_range(int file_id, std::uint64_t start_line, std::uint64_t end_line) const
std::optional<TarArchiveMetadata> query_tar_archive_metadata(int file_id) const
std::vector<TarFileRecord> query_tar_files(int file_id) const
bool find_tar_file(int file_id, std::string_view file_name, TarFileRecord &record) const
std::vector<TarFileRecord> query_tar_files_in_range(int file_id, std::uint64_t start_offset, std::uint64_t end_offset) const
TimeBounds query_time_bounds(int file_id) const
std::vector<ChunkDimensionStatsResult> query_chunk_dimension_stats(int file_id) const
std::unordered_map<int, std::vector<ChunkDimensionStatsResult>> query_chunk_dimension_stats_batch(const std::vector<int> &file_ids) const
std::vector<ChunkDimensionStatsResult> query_chunk_dimension_stats_for_dimension(int file_id, std::string_view dimension) const
std::optional<std::uint64_t> query_name_id(std::string_view name) const
std::optional<std::string> query_name_by_id(std::uint64_t name_id) const
std::vector<EventRangeResult> query_event_ranges(int file_id) const
std::vector<EventRangeResult> query_event_ranges_for_checkpoint(int file_id, std::uint64_t checkpoint_idx) const
std::vector<MetadataLinesResult> query_metadata_lines(int file_id) const
std::vector<MetadataLinesResult> query_metadata_lines_for_checkpoint(int file_id, std::uint64_t checkpoint_idx) const
std::unordered_set<std::uint64_t> query_file_pids(int file_id) const

Query the set of PIDs observed in a specific file.

std::unordered_map<int, std::unordered_set<std::uint64_t>> query_all_file_pids() const

Query the PIDs for all files at once. Returns {file_id -> set of PIDs}.

std::unordered_map<std::string, std::string> query_hash_table(HashType type) const

Query all entries of a given hash type. Returns map of {hash_value -> resolved_name}.

std::optional<std::string> resolve_hash(HashType type, std::string_view hash) const

Resolve a single hash to its name. Returns nullopt if hash is not found.

std::unordered_map<HashType, std::unordered_map<std::string, std::string>> query_all_hash_tables() const

Query all hash tables at once. Returns {type -> {hash -> name}}.

std::optional<std::string> resolve_name_to_hash(HashType type, std::string_view name) const

Resolve a name to its hash (reverse lookup for query DSL). Returns nullopt if name is not found.

class IndexDatabaseSstWriterContext : public dftracer::utils::utilities::indexer::IndexBatchSink

Per-batch SST emitter that implements IndexBatchSink by buffering (key, value) pairs in memory per column family and flushing them to sorted SST files on commit().

Usage is identical to IndexDatabaseWriterContext: construct one per batch, call the insert_* methods, then commit(). The returned Artifacts hold the paths of the SST files produced, which a coordinator later ingests via IndexDatabase::bulk_ingest().

Process-safe: holds no RocksDB handle. Many contexts can run concurrently across threads or processes, provided each one is given a disjoint file_id range so SST key prefixes do not overlap.

Public Functions

IndexDatabaseSstWriterContext(std::string staging_dir, std::string batch_id)

Build SSTs into a unique subdirectory under staging_dir. batch_id must be unique across concurrent writers pointing at the same staging root so paths do not collide.

IndexDatabaseSstWriterContext(const IndexDatabaseSstWriterContext&) = delete
IndexDatabaseSstWriterContext &operator=(const IndexDatabaseSstWriterContext&) = delete
IndexDatabaseSstWriterContext(IndexDatabaseSstWriterContext&&) noexcept
IndexDatabaseSstWriterContext &operator=(IndexDatabaseSstWriterContext&&) noexcept
~IndexDatabaseSstWriterContext() override
virtual void insert_file_metadata(int file_id, std::uint64_t checkpoint_size, std::uint64_t total_lines, std::uint64_t total_uc_size) override
virtual void insert_checkpoint(int file_id, const IndexerCheckpoint &checkpoint) override
virtual void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, std::span<const std::uint32_t> line_numbers) override
virtual void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, std::span<const std::uint32_t> line_numbers) override
virtual void insert_file_pids(int file_id, const std::unordered_set<std::uint64_t> &pids) override
virtual void insert_chunk_bloom_filter(int file_id, std::uint64_t checkpoint_idx, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) override
virtual void insert_file_bloom_filter(int file_id, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) override
virtual void insert_chunk_statistics(int file_id, std::uint64_t checkpoint_idx, const ChunkStatistics &stats) override
virtual void insert_file_scalar_stats(int file_id, const ChunkStatistics &stats, std::uint64_t num_chunks) override
virtual void insert_file_category_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override
virtual void insert_file_pid_tid_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override
virtual void insert_file_name_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override
virtual void insert_index_dimension(int file_id, std::string_view dimension) override
virtual void insert_chunk_dimension_stats(int file_id, std::uint64_t checkpoint_idx, const ChunkDimensionStats &stats, std::size_t value_counts_cap = 4096) override
virtual void insert_name_dictionary_entry(std::uint64_t name_id, std::string_view name) override
virtual void insert_name_file_posting(std::uint64_t name_id, int file_id) override
virtual void insert_name_chunk_posting(std::uint64_t name_id, int file_id, std::uint64_t checkpoint_idx) override
virtual void insert_hash_table_entry(std::uint8_t type, std::string_view hash, std::string_view name) override
virtual void insert_aggregation_merge(std::string_view key, std::string_view operand) override
virtual void insert_aggregation_put(std::string_view key, std::string_view value) override
virtual void insert_system_metrics_merge(std::string_view key, std::string_view operand) override
Artifacts commit()

Sort buffers, emit one SST per non-empty column family, return the resulting paths. Calling twice or after a move is a no-op.

void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, std::span<const std::uint32_t> line_numbers) = 0
inline void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, const std::vector<std::uint32_t> &line_numbers)
void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, std::span<const std::uint32_t> line_numbers) = 0
inline void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, const std::vector<std::uint32_t> &line_numbers)
struct Artifacts

Public Functions

inline bool empty() const noexcept
Artifacts move_to(std::string_view dest_dir) &&

Move every populated SST file to dest_dir (created if missing) and return a new Artifacts whose paths point at the new location. Uses fs::rename when src and dst resolve to the same filesystem (O(1), atomic) and falls back to copy + unlink across filesystems. Intended for the node-local -> shared FS handoff in the distributed indexer. Rvalue-qualified: the original Artifacts is left empty.

Public Members

std::optional<std::string> metadata_sst
std::optional<std::string> checkpoints_sst
std::optional<std::string> manifest_sst
std::optional<std::string> chunk_bloom_sst
std::optional<std::string> file_bloom_sst
std::optional<std::string> chunk_stats_sst
std::optional<std::string> chunk_dim_stats_sst
std::optional<std::string> dimensions_sst
std::optional<std::string> file_scalar_stats_sst
std::optional<std::string> file_cat_counts_sst
std::optional<std::string> file_pid_tid_counts_sst
std::optional<std::string> file_name_counts_sst
std::optional<std::string> name_dictionary_sst
std::optional<std::string> name_file_postings_sst
std::optional<std::string> name_chunk_postings_sst
std::optional<std::string> hash_tables_sst
std::optional<std::string> aggregation_sst
std::optional<std::string> system_metrics_sst
struct MergeableKeyValue

Aggregation / system_metrics buffers hold mixed Put+Merge entries in one CF. is_merge distinguishes them at emit time so the SST records the right operation kind (rocksdb supports mixed-op SSTs).

Public Members

std::string key
std::string value
bool is_merge = true
class IndexDatabaseWriterContext : public dftracer::utils::utilities::indexer::IndexBatchSink

Public Types

using ChunkDimensionStats = composites::dft::indexing::ChunkDimensionStats
using ChunkStatistics = composites::dft::indexing::ChunkStatistics
using IndexerCheckpoint = internal::IndexerCheckpoint

Public Functions

IndexDatabaseWriterContext(IndexDatabaseWriterContext&&) noexcept
IndexDatabaseWriterContext &operator=(IndexDatabaseWriterContext&&) noexcept
IndexDatabaseWriterContext(const IndexDatabaseWriterContext&) = delete
IndexDatabaseWriterContext &operator=(const IndexDatabaseWriterContext&) = delete
~IndexDatabaseWriterContext() override
void commit()
bool has_file_scalar_stats(int file_id) const
void init_schema()
int get_or_create_file_info(std::string_view path, std::uint64_t file_hash, IndexFileEntryCapability caps = IndexFileEntryCapability::NONE)
void set_file_capabilities(int file_id, IndexFileEntryCapability caps)
void set_file_capabilities_by_path(std::string_view logical_path, IndexFileEntryCapability caps)
void add_file_capability(int file_id, IndexFileEntryCapability cap)
virtual void insert_file_metadata(int file_id, std::uint64_t checkpoint_size, std::uint64_t total_lines, std::uint64_t total_uc_size) override
virtual void insert_chunk_bloom_filter(int file_id, std::uint64_t checkpoint_idx, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) override
void insert_chunk_bloom_filter(int file_id, std::uint64_t checkpoint_idx, std::string_view dimension, const void *blob_data, int blob_size, std::uint64_t num_entries)
virtual void insert_file_bloom_filter(int file_id, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) override
void insert_file_bloom_filter(int file_id, std::string_view dimension, const void *blob_data, int blob_size, std::uint64_t num_entries)
virtual void insert_chunk_statistics(int file_id, std::uint64_t checkpoint_idx, const ChunkStatistics &stats) override
virtual void insert_file_scalar_stats(int file_id, const ChunkStatistics &stats, std::uint64_t num_chunks) override
virtual void insert_file_category_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override
virtual void insert_file_pid_tid_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override
virtual void insert_file_name_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override
std::uint64_t get_or_create_name_id(std::string_view name)
virtual void insert_name_dictionary_entry(std::uint64_t name_id, std::string_view name) override
virtual void insert_name_file_posting(std::uint64_t name_id, int file_id) override
virtual void insert_name_chunk_posting(std::uint64_t name_id, int file_id, std::uint64_t checkpoint_idx) override
void refresh_root_summaries_after_file_write(int file_id, const ChunkStatistics &stats, std::uint64_t num_chunks, bool had_existing_file_summary, std::uint64_t file_lines = 0, std::uint64_t file_uncompressed_bytes = 0)
void rebuild_root_summaries()
virtual void insert_checkpoint(int file_id, const IndexerCheckpoint &checkpoint) override
virtual void insert_index_dimension(int file_id, std::string_view dimension) override
virtual void insert_hash_table_entry(std::uint8_t type, std::string_view hash, std::string_view name) override

Insert a hash table entry with bidirectional storage. Forward: [type][hash] -> name (for output resolution). Reverse: [type+4][name] -> hash (for the query DSL). Type: 0=FILE, 1=HOST, 2=STRING, 3=PROC.

virtual void insert_aggregation_merge(std::string_view key, std::string_view operand) override
virtual void insert_aggregation_put(std::string_view key, std::string_view value) override
virtual void insert_system_metrics_merge(std::string_view key, std::string_view operand) override
virtual void insert_chunk_dimension_stats(int file_id, std::uint64_t checkpoint_idx, const ChunkDimensionStats &stats, std::size_t value_counts_cap = 4096) override
void insert_tar_archive_metadata(int file_id, std::string_view archive_name, std::uint64_t checkpoint_size, std::uint64_t total_lines, std::uint64_t total_uc_size, std::uint64_t total_files)
void insert_tar_file(int file_id, const TarFileRecord &record)
void delete_chunk_bloom_filters(int file_id, std::string_view dimension)
void delete_file_bloom_filter(int file_id, std::string_view dimension)
void delete_chunk_statistics(int file_id)
void delete_chunk_dimension_stats(int file_id)
void delete_file_contents(int file_id)
void delete_event_ranges(int file_id)
void delete_metadata_lines(int file_id)
virtual void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, std::span<const std::uint32_t> line_numbers) override
virtual void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, std::span<const std::uint32_t> line_numbers) override
virtual void insert_file_pids(int file_id, const std::unordered_set<std::uint64_t> &pids) override

Insert the set of PIDs observed in a file (for distributed aggregation).

void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, std::span<const std::uint32_t> line_numbers) = 0
inline void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, const std::vector<std::uint32_t> &line_numbers)
void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, std::span<const std::uint32_t> line_numbers) = 0
inline void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, const std::vector<std::uint32_t> &line_numbers)
struct TarFileRecord

Public Members

std::string file_name
std::uint64_t file_size = 0
std::uint64_t file_mtime = 0
char typeflag = '\0'
std::uint64_t data_offset = 0
std::uint64_t uncompressed_offset = 0
class IndexVisitor

Subclassed by dftracer::utils::utilities::composites::dft::DftEventDispatcher

Public Functions

virtual ~IndexVisitor() = default
virtual void begin(std::size_t num_checkpoints) = 0
virtual coro::CoroTask<void> on_checkpoint(std::size_t checkpoint_idx) = 0
inline virtual coro::CoroTask<void> on_chunk(const char *data, std::size_t len, std::size_t checkpoint_idx)
virtual void on_line(std::string_view line, SharedLineBuffer buffer, std::size_t checkpoint_idx) = 0

Called for each line. The line string_view points into buffer. Implementations that need the data to outlive this call should store the buffer shared_ptr (zero-copy) rather than copying line.

inline virtual coro::CoroTask<void> flush()
inline virtual bool wants_drain() const noexcept

Cheap hint that drain_pending() should be called to apply backpressure. Default false. Polled after each on_line call.

inline virtual coro::CoroTask<void> drain_pending()

Drain accumulated work via async ops (e.g. channel send). Suspends the calling coroutine when downstream is full — this provides real backpressure without blocking an executor thread.

virtual void finalize(IndexDatabaseWriterContext &writer, int file_id) = 0
struct MergedStatisticsResult

Public Members

ChunkStatistics stats
std::uint64_t num_chunks = 0
struct NameSummaryResult

Public Members

StringViewMap<std::uint64_t> counts
std::uint64_t other_count = 0
std::uint64_t unique_count = 0
class ProvenanceDatabase

Manages provenance data in the shared .dftindex RocksDB store.

Stores shared index data recording the full reorganization provenance of an output file: which source files contributed, which of their checkpoints were involved, and which line ranges map to which output lines.

Schema:

  • file_info: output file identity (path + hash)

  • provenance_info: key/value metadata (tool version, timestamp, etc.)

  • provenance_sources: source files that contributed to this output

  • provenance_group: named predicate groups used during reorganization

  • provenance_segments: per-checkpoint line range mappings

Public Types

using ProvenanceSource = composites::dft::indexing::queries::ProvenanceSource
using ProvenanceSegment = composites::dft::indexing::queries::ProvenanceSegment

Public Functions

explicit ProvenanceDatabase(const std::string &provenance_path, dftracer::utils::rocksdb::RocksDatabase::OpenMode open_mode = dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadWrite)
ProvenanceDatabase(const ProvenanceDatabase&) = delete
ProvenanceDatabase &operator=(const ProvenanceDatabase&) = delete
ProvenanceDatabase(ProvenanceDatabase&&) noexcept = default
ProvenanceDatabase &operator=(ProvenanceDatabase&&) noexcept = default
void init_schema()
int get_or_create_file_info(const std::string &path, std::uint64_t file_hash)
int get_file_info_id(const std::string &path) const
void begin_transaction()
void commit_transaction()
void rollback_transaction() noexcept
void insert_info(int file_info_id, std::string_view key, std::string_view value)
void insert_source(int file_info_id, int source_idx, std::string_view path, int num_checkpoints, std::string_view event_hash = "")
void insert_group(int file_info_id, std::string_view name, std::string_view predicate)
void insert_segment(int file_info_id, int source_idx, int source_checkpoint, int segment_seq, int output_line_start, int output_line_end, int event_count)
std::vector<ProvenanceSource> query_sources(int file_info_id) const
std::vector<ProvenanceSegment> query_segments(int file_info_id, int source_idx) const
std::vector<ProvenanceSegment> query_all_segments(int file_info_id) const
std::string query_info(int file_info_id, std::string_view key) const
std::string query_group_name(int file_info_id) const
std::string query_group_predicate(int file_info_id) const
struct RootStatisticsResult

Public Members

ChunkStatistics stats
std::uint64_t num_chunks = 0
std::uint64_t num_files = 0
std::uint64_t total_lines = 0
std::uint64_t total_uncompressed_bytes = 0
class SstArtifactRegistry

Thread-safe collector for SST artifacts produced by many concurrent IndexDatabaseSstWriterContext instances. The coordinator hands the populated registry to IndexDatabase::bulk_ingest().

Public Functions

inline void append(IndexDatabaseSstWriterContext::Artifacts artifacts)
inline const std::vector<std::string> &metadata() const
inline const std::vector<std::string> &checkpoints() const
inline const std::vector<std::string> &manifest() const
inline const std::vector<std::string> &chunk_bloom() const
inline const std::vector<std::string> &file_bloom() const
inline const std::vector<std::string> &chunk_stats() const
inline const std::vector<std::string> &chunk_dim_stats() const
inline const std::vector<std::string> &dimensions() const
inline const std::vector<std::string> &file_scalar_stats() const
inline const std::vector<std::string> &file_cat_counts() const
inline const std::vector<std::string> &file_pid_tid_counts() const
inline const std::vector<std::string> &file_name_counts() const
inline const std::vector<std::string> &name_dictionary() const
inline const std::vector<std::string> &name_file_postings() const
inline const std::vector<std::string> &name_chunk_postings() const
inline const std::vector<std::string> &hash_tables() const
inline const std::vector<std::string> &aggregation() const
inline const std::vector<std::string> &system_metrics() const
struct TarArchiveMetadata

Public Members

std::string archive_name
std::uint64_t checkpoint_size = 0
std::uint64_t total_lines = 0
std::uint64_t total_uc_size = 0
std::uint64_t total_files = 0
struct TarFileRecord

Public Members

std::string file_name
std::uint64_t file_size = 0
std::uint64_t file_mtime = 0
char typeflag = '\0'
std::uint64_t data_offset = 0
std::uint64_t uncompressed_offset = 0