Indexer¶
Namespace: dftracer::utils::utilities::indexer
For a usage guide and examples, see Indexer Components.
-
struct FileMetadataResult¶
-
struct FileRegistryEntry¶
-
class IndexBatchBuilderUtility¶
Public Static Functions
-
class IndexBatchSink¶
Abstract sink that accepts index records for a batch of files.
Two backends implement this:
IndexDatabaseWriterContext: writes directly to a live RocksDB.
IndexDatabaseSstWriterContext: writes to SST files for later bulk ingest (process-safe fan-out for distributed indexing).
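The step-1 subset of methods (file metadata, checkpoints, manifest event ranges and metadata lines) was the first to be abstracted here. The interface now also declares pure-virtual writes for file PIDs, chunk/file bloom filters, chunk/file statistics and counts, index dimensions, chunk dimension stats, the name dictionary and postings, hash-table entries, and aggregation/system-metrics entries, and both backends override them. Operations such as tar records, deletions, and file-registry/capability updates remain available only on the concrete IndexDatabaseWriterContext.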
Subclassed by dftracer::utils::utilities::indexer::IndexDatabaseSstWriterContext, dftracer::utils::utilities::indexer::IndexDatabaseWriterContext
Public Types
-
using IndexerCheckpoint = internal::IndexerCheckpoint¶
-
using ChunkStatistics = composites::dft::indexing::ChunkStatistics¶
-
using ChunkDimensionStats = composites::dft::indexing::ChunkDimensionStats¶
Public Functions
-
virtual ~IndexBatchSink() = default¶
-
virtual void insert_file_metadata(int file_id, std::uint64_t checkpoint_size, std::uint64_t total_lines, std::uint64_t total_uc_size) = 0¶
-
virtual void insert_checkpoint(int file_id, const IndexerCheckpoint &checkpoint) = 0¶
-
virtual void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, std::span<const std::uint32_t> line_numbers) = 0¶
-
virtual void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, std::span<const std::uint32_t> line_numbers) = 0¶
-
virtual void insert_file_pids(int file_id, const std::unordered_set<std::uint64_t> &pids) = 0¶
-
virtual void insert_chunk_bloom_filter(int file_id, std::uint64_t checkpoint_idx, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) = 0¶
-
virtual void insert_file_bloom_filter(int file_id, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) = 0¶
-
virtual void insert_chunk_statistics(int file_id, std::uint64_t checkpoint_idx, const ChunkStatistics &stats) = 0¶
-
virtual void insert_file_scalar_stats(int file_id, const ChunkStatistics &stats, std::uint64_t num_chunks) = 0¶
-
virtual void insert_file_category_counts(int file_id, const StringViewMap<std::uint64_t> &counts) = 0¶
-
virtual void insert_file_pid_tid_counts(int file_id, const StringViewMap<std::uint64_t> &counts) = 0¶
-
virtual void insert_file_name_counts(int file_id, const StringViewMap<std::uint64_t> &counts) = 0¶
-
virtual void insert_index_dimension(int file_id, std::string_view dimension) = 0¶
-
virtual void insert_chunk_dimension_stats(int file_id, std::uint64_t checkpoint_idx, const ChunkDimensionStats &stats, std::size_t value_counts_cap = 4096) = 0¶
-
virtual void insert_name_dictionary_entry(std::uint64_t name_id, std::string_view name) = 0¶
-
virtual void insert_name_file_posting(std::uint64_t name_id, int file_id) = 0¶
-
virtual void insert_name_chunk_posting(std::uint64_t name_id, int file_id, std::uint64_t checkpoint_idx) = 0¶
-
virtual void insert_hash_table_entry(std::uint8_t type, std::string_view hash, std::string_view name) = 0¶
-
virtual void insert_aggregation_merge(std::string_view key, std::string_view operand) = 0¶
-
virtual void insert_aggregation_put(std::string_view key, std::string_view value) = 0¶
-
virtual void insert_system_metrics_merge(std::string_view key, std::string_view operand) = 0¶
-
inline void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, const std::vector<std::uint32_t> &line_numbers)¶
-
inline void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, const std::vector<std::uint32_t> &line_numbers)¶
-
struct IndexBuildBatchConfig¶
Public Types
-
using DftVisitorFactory = std::function<std::vector<std::unique_ptr<composites::dft::DftEventVisitor>>(const std::string &file_path)>¶
Factory for creating per-file DftEventVisitors during the parse phase. Called once per file with the file path. Caller owns the returned visitors and can extract results after the batch completes.
-
using ExtraVisitorsDrainFn = std::function<void(std::vector<std::vector<std::unique_ptr<composites::dft::DftEventVisitor>>>)>¶
Optional drain callback invoked once per sub-batch with the extra visitors for that sub-batch’s files. Lets the caller consume and release visitor state immediately, keeping memory bounded by flush_every_files instead of accumulating across the whole pipeline.
-
using SinkFactory = std::function<std::unique_ptr<IndexBatchSink>()>¶
Optional batch-sink factory. If set, the write phase constructs a fresh sink per batch via this factory instead of opening the RocksDB-backed writer on index_dir.
Used by the distributed (SST) pipeline to route writes to per-worker SstWriterContext instances. sink_commit must also be set and is responsible for finalising each sink (RocksDB path: call .commit(); SST path: flush + route Artifacts to a registry).
-
using SinkCommitFn = std::function<void(IndexBatchSink&)>¶
Public Members
-
std::vector<std::string> file_paths¶
-
std::string index_dir¶
-
std::size_t checkpoint_size = 32 * 1024 * 1024¶
-
std::size_t parallelism = 1¶
-
bool force_rebuild = false¶
-
bool build_manifest = false¶
-
composites::dft::indexing::ChunkIndexerConfig bloom_config¶
-
std::vector<std::string> bloom_dimensions¶
-
bool use_batch_write = true¶
-
bool rebuild_root_summaries = true¶
-
std::size_t flush_every_files = 0¶
If > 0, process files in sub-batches of this size, flushing parsed artifacts to the write phase between sub-batches. Bounds peak memory to ~flush_every_files worth of ParsedBloomJob state. 0 = no flush (all files parsed before any write).
-
DftVisitorFactory dft_visitor_factory¶
-
ExtraVisitorsDrainFn extra_visitors_drain¶
-
std::vector<int> preassigned_file_ids¶
If non-empty, parallel to file_paths: use these file_ids instead of allocating via get_or_create_file_info.
Used by the distributed indexer, where the coordinator pre-registers all files. When set, the write phase skips the DEFAULT-CF registry open/write step.
-
SinkFactory sink_factory¶
-
SinkCommitFn sink_commit¶
-
struct FileSlice¶
Optional per-file member slice (cross-rank file splitting). When non-empty, must be parallel to file_paths.
A null/empty entry means “process the whole file”; a populated entry restricts the build to [member_begin, member_end). The members vector must outlive the batch (typically stored in a shared member map).
Public Members
-
const std::vector<internal::GzipMember> *members = nullptr¶
-
std::size_t member_begin = 0¶
-
std::size_t member_end = 0¶
-
std::uint64_t checkpoint_idx_base = 0¶
-
bool skip_file_scoped_writes = false¶
When true, this file’s file-scoped data (checkpoints, bloom/manifest/hashtable, file_metadata) is NOT persisted by the write phase. Aggregation/system-metrics SSTs produced by extra visitors are still collected.
Set by the MPI driver for sliced ranks where member_begin > 0, to avoid cross-rank key collisions on file-scoped CFs.
-
const std::vector<internal::GzipMember> *members = nullptr¶
-
using DftVisitorFactory = std::function<std::vector<std::unique_ptr<composites::dft::DftEventVisitor>>(const std::string &file_path)>¶
-
struct IndexBuildBatchMetrics¶
-
struct IndexBuildBatchResult¶
Public Members
-
std::vector<IndexBuildResult> results¶
-
std::size_t indexed = 0¶
-
std::size_t skipped = 0¶
-
std::size_t failed = 0¶
-
std::uint64_t total_events = 0¶
-
IndexBuildBatchMetrics metrics¶
-
std::vector<std::vector<std::unique_ptr<composites::dft::DftEventVisitor>>> extra_visitors¶
Per-file extra visitors created by dft_visitor_factory during parsing. The index corresponds to the file index in the original file_paths vector. The inner vector is empty for files that failed, or when no factory was configured.
-
std::vector<IndexBuildResult> results¶
-
struct IndexBuildConfig¶
Public Functions
-
IndexBuildConfig &with_index_dir(const std::string &dir)¶
-
IndexBuildConfig &with_checkpoint_size(std::size_t size)¶
-
IndexBuildConfig &with_force_rebuild(bool force)¶
-
IndexBuildConfig &with_manifest(bool enable = true)¶
-
IndexBuildConfig &with_bloom_config(const composites::dft::indexing::ChunkIndexerConfig &config)¶
-
IndexBuildConfig &with_bloom_dimensions(std::vector<std::string> dims)¶
Public Members
-
std::string file_path¶
-
std::string index_dir¶
-
std::size_t checkpoint_size = 32 * 1024 * 1024¶
-
bool force_rebuild = false¶
-
bool build_manifest = false¶
-
composites::dft::indexing::ChunkIndexerConfig bloom_config¶
-
std::vector<std::string> bloom_dimensions¶
-
std::vector<std::reference_wrapper<composites::dft::DftEventVisitor>> extra_dft_visitors¶
Public Static Functions
-
static IndexBuildConfig for_file(const std::string &path)¶
-
IndexBuildConfig &with_index_dir(const std::string &dir)¶
-
struct IndexBuildResult¶
-
class IndexBuilderUtility : public dftracer::utils::utilities::Utility<IndexBuildConfig, IndexBuildResult, tags::NeedsContext>¶
Public Functions
-
coro::CoroTask<IndexBuildResult> process(const IndexBuildConfig &config) override¶
-
coro::CoroTask<IndexBuildResult> process(const IndexBuildConfig &config) override¶
-
class IndexDatabase¶
Public Types
Public Functions
-
explicit IndexDatabase(const std::string &index_path, dftracer::utils::rocksdb::RocksDatabase::OpenMode open_mode = dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadWrite)¶
-
IndexDatabase(const IndexDatabase&) = delete¶
-
IndexDatabase &operator=(const IndexDatabase&) = delete¶
-
IndexDatabase(IndexDatabase&&) noexcept = default¶
-
IndexDatabase &operator=(IndexDatabase&&) noexcept = default¶
-
~IndexDatabase() = default¶
-
std::unique_ptr<IndexDatabaseWriterContext> begin_write()¶
-
void bulk_ingest(const SstArtifactRegistry &registry, const std::unordered_set<std::string> &skip_cfs = {})¶
Ingest SST files produced by IndexDatabaseSstWriterContext instances.
File-id ranges across input SSTs must be disjoint for the METADATA, CHECKPOINTS, and MANIFEST column families (step-1 scope). No-op on an empty registry. Does NOT refresh root summaries; call rebuild_root_summaries() afterward, once all ingest phases are done. skip_cfs optionally holds CF names (e.g. cf::AGGREGATION) whose SSTs must be left outside the unified DB. Distributed builds use this to keep per-worker AGGREGATION / SYSTEM_METRICS SSTs addressable by manifest for parallel reads at analyze time.
-
void rebuild_root_summaries()¶
Recompute ROOT_SCALAR_STATS and ROOT_{CAT,NAME,PID_TID}_COUNTS from the current per-file CFs.
Call after bulk_ingest completes, or whenever root-level summaries need to be regenerated from scratch.
-
void write_agg_global_config(std::uint64_t time_interval_us, std::uint32_t config_hash = 0)¶
Write the aggregation global-config key (0xFFFE) into the AGGREGATION CF.
Required for iter_arrow_dfanalyzer_all to recognise the index as aggregator-populated. Distributed builds call this after bulk_ingest(skip_cfs={aggregation, system_metrics}), so the unified DB has a config marker even though the AGG SSTs live in the manifest. consolidate_index also invokes it before the deferred AGG ingest.
-
void write_agg_file_markers(const std::vector<int> &file_ids)¶
Write per-file aggregation completion markers (0xFFFF + file_id BE) into the AGGREGATION CF.
The index resolver treats these as “this file has aggregated data”; without them, ensure_indexed() concludes that the aggregation tier is incomplete and re-runs the build.
Distributed builds must call this after bulk_ingest, since per-worker SSTs carry data but not markers (markers are written via a direct db->put, not via the SST sink).
-
void write_aggregation_tracker(const std::vector<std::string> &blobs)¶
Merge per-worker AssociationTracker blobs and write the result to the AGGREGATION CF under the __tracker__ key.
-
int reserve_file_id_range(std::size_t count)¶
Atomically reserve count contiguous file_ids, returning the first id of the range [first, first + count).
Intended for the distributed indexer: the coordinator hands each worker its own disjoint range up front, so workers need no cross-worker coordination.
-
std::vector<int> register_files(const std::vector<std::string> &file_paths, bool build_manifest)¶
Register a list of trace files in the DEFAULT-CF file registry and return the assigned file_ids (parallel to
file_paths). Idempotent: files already registered with a matching hash keep their existing id. Used by the distributed indexer’s coordinator to pre-register every file before dispatching work to SST-backed workers, so workers never need to touch the DEFAULT column family themselves.
-
inline std::shared_ptr<dftracer::utils::rocksdb::RocksDatabase> db() const¶
-
void init_schema()¶
-
IndexFileEntryCapability get_file_capabilities(int file_id) const¶
-
bool has_bloom_data(int file_id) const¶
-
bool has_manifest_data(int file_id) const¶
-
int get_file_info_id(std::string_view path) const¶
-
std::optional<std::uint64_t> get_file_hash(std::string_view path) const¶
-
std::unordered_map<std::string, int> query_all_file_info_ids() const¶
-
std::unordered_map<std::string, FileRegistryEntry> query_all_file_registry() const¶
-
std::unordered_set<int> query_files_with_file_scalar_stats() const¶
-
std::unordered_set<int> query_files_with_bloom_data() const¶
-
int find_file(std::string_view file_path) const¶
-
std::uint64_t get_checkpoint_size(int file_id) const¶
-
std::uint64_t get_num_lines(int file_id) const¶
-
std::uint64_t get_max_bytes(int file_id) const¶
-
std::uint64_t get_total_events(int file_id) const¶
-
std::vector<ChunkBloomResult> query_chunk_bloom_filters(int file_id, std::string_view dimension) const¶
-
std::unordered_map<std::string, std::vector<ChunkBloomResult>> query_chunk_bloom_filters_batch(int file_id, const std::vector<std::string> &dimensions) const¶
-
std::optional<FileBloomResult> query_file_bloom_filter(int file_id, std::string_view dimension) const¶
-
std::unordered_map<std::string, FileBloomResult> query_file_bloom_filters_batch(int file_id, const std::vector<std::string> &dimensions) const¶
-
std::vector<std::string> query_index_dimensions(int file_id) const¶
-
bool has_index_dimension(int file_id, std::string_view dimension) const¶
-
std::vector<ChunkStatisticsResult> query_chunk_statistics(int file_id) const¶
-
std::unordered_map<int, std::vector<ChunkStatisticsResult>> query_chunk_statistics_batch(const std::vector<int> &file_ids) const¶
-
std::unordered_map<int, MergedStatisticsResult> query_merged_statistics_batch(const std::vector<int> &file_ids) const¶
-
std::unordered_map<int, MergedStatisticsResult> query_file_scalar_stats_batch(const std::vector<int> &file_ids) const¶
-
std::unordered_map<int, FileMetadataResult> query_file_metadata_batch(const std::vector<int> &file_ids) const¶
-
std::unordered_map<int, StringViewMap<std::uint64_t>> query_file_category_counts_batch(const std::vector<int> &file_ids) const¶
-
std::unordered_map<int, StringViewMap<std::uint64_t>> query_file_pid_tid_counts_batch(const std::vector<int> &file_ids) const¶
-
std::unordered_map<int, NameSummaryResult> query_file_name_summaries_batch(const std::vector<int> &file_ids) const¶
-
std::optional<RootStatisticsResult> query_root_scalar_stats() const¶
-
StringViewMap<std::uint64_t> query_root_category_counts() const¶
-
StringViewMap<std::uint64_t> query_root_pid_tid_counts() const¶
-
StringViewMap<std::uint64_t> query_root_name_counts() const¶
-
void merge_file_category_counts_batch_into(const std::vector<int> &file_ids, std::unordered_map<int, ChunkStatistics*> &targets) const¶
-
void merge_file_pid_tid_counts_batch_into(const std::vector<int> &file_ids, std::unordered_map<int, ChunkStatistics*> &targets) const¶
-
void merge_file_name_counts_batch_into(const std::vector<int> &file_ids, std::unordered_map<int, ChunkStatistics*> &targets) const¶
-
void merge_root_category_counts_into(ChunkStatistics &target) const¶
-
void merge_root_pid_tid_counts_into(ChunkStatistics &target) const¶
-
void merge_root_name_counts_into(ChunkStatistics &target) const¶
-
std::vector<int> query_name_file_postings(std::string_view name) const¶
-
std::vector<std::uint64_t> query_name_chunk_postings(std::string_view name, int file_id) const¶
-
bool has_file_scalar_stats(int file_id) const¶
-
bool find_checkpoint(int file_id, std::size_t target_offset, IndexerCheckpoint &checkpoint) const¶
-
std::vector<IndexerCheckpoint> query_checkpoints(int file_id) const¶
-
std::vector<IndexerCheckpoint> query_checkpoints_for_line_range(int file_id, std::uint64_t start_line, std::uint64_t end_line) const¶
-
std::optional<TarArchiveMetadata> query_tar_archive_metadata(int file_id) const¶
-
std::vector<TarFileRecord> query_tar_files(int file_id) const¶
-
bool find_tar_file(int file_id, std::string_view file_name, TarFileRecord &record) const¶
-
std::vector<TarFileRecord> query_tar_files_in_range(int file_id, std::uint64_t start_offset, std::uint64_t end_offset) const¶
-
TimeBounds query_time_bounds(int file_id) const¶
-
std::vector<ChunkDimensionStatsResult> query_chunk_dimension_stats(int file_id) const¶
-
std::unordered_map<int, std::vector<ChunkDimensionStatsResult>> query_chunk_dimension_stats_batch(const std::vector<int> &file_ids) const¶
-
std::vector<ChunkDimensionStatsResult> query_chunk_dimension_stats_for_dimension(int file_id, std::string_view dimension) const¶
-
std::optional<std::uint64_t> query_name_id(std::string_view name) const¶
-
std::optional<std::string> query_name_by_id(std::uint64_t name_id) const¶
-
std::vector<EventRangeResult> query_event_ranges(int file_id) const¶
-
std::vector<EventRangeResult> query_event_ranges_for_checkpoint(int file_id, std::uint64_t checkpoint_idx) const¶
-
std::vector<MetadataLinesResult> query_metadata_lines(int file_id) const¶
-
std::vector<MetadataLinesResult> query_metadata_lines_for_checkpoint(int file_id, std::uint64_t checkpoint_idx) const¶
-
std::unordered_set<std::uint64_t> query_file_pids(int file_id) const¶
Query the set of PIDs observed in a specific file.
-
std::unordered_map<int, std::unordered_set<std::uint64_t>> query_all_file_pids() const¶
Query the PIDs for all files at once. Returns {file_id -> set of PIDs}.
-
std::unordered_map<std::string, std::string> query_hash_table(HashType type) const¶
Query all entries of a given hash type. Returns map of {hash_value -> resolved_name}.
-
std::optional<std::string> resolve_hash(HashType type, std::string_view hash) const¶
Resolve a single hash to its name. Returns nullopt if hash is not found.
-
explicit IndexDatabase(const std::string &index_path, dftracer::utils::rocksdb::RocksDatabase::OpenMode open_mode = dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadWrite)¶
-
class IndexDatabaseSstWriterContext : public dftracer::utils::utilities::indexer::IndexBatchSink¶
Per-batch SST emitter that implements IndexBatchSink by buffering (key, value) pairs in memory per column family and flushing them to sorted SST files on commit().
Usage is identical to IndexDatabaseWriterContext: construct one per batch, call the insert_* methods, then commit(). The returned Artifacts hold the paths of the SST files produced, which a coordinator later ingests via IndexDatabase::bulk_ingest().
Process-safe: holds no RocksDB handle. Many contexts can run concurrently across threads or processes, provided each is given a disjoint file_id range so SST key prefixes do not overlap.
Public Functions
-
IndexDatabaseSstWriterContext(std::string staging_dir, std::string batch_id)¶
Build SSTs into a unique subdirectory under staging_dir.
batch_id must be unique across concurrent writers pointing at the same staging root, so paths do not collide.
-
IndexDatabaseSstWriterContext(const IndexDatabaseSstWriterContext&) = delete¶
-
IndexDatabaseSstWriterContext &operator=(const IndexDatabaseSstWriterContext&) = delete¶
-
IndexDatabaseSstWriterContext(IndexDatabaseSstWriterContext&&) noexcept¶
-
IndexDatabaseSstWriterContext &operator=(IndexDatabaseSstWriterContext&&) noexcept¶
-
~IndexDatabaseSstWriterContext() override¶
-
virtual void insert_file_metadata(int file_id, std::uint64_t checkpoint_size, std::uint64_t total_lines, std::uint64_t total_uc_size) override¶
-
virtual void insert_checkpoint(int file_id, const IndexerCheckpoint &checkpoint) override¶
-
virtual void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, std::span<const std::uint32_t> line_numbers) override¶
-
virtual void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, std::span<const std::uint32_t> line_numbers) override¶
-
virtual void insert_file_pids(int file_id, const std::unordered_set<std::uint64_t> &pids) override¶
-
virtual void insert_chunk_bloom_filter(int file_id, std::uint64_t checkpoint_idx, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) override¶
-
virtual void insert_file_bloom_filter(int file_id, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) override¶
-
virtual void insert_chunk_statistics(int file_id, std::uint64_t checkpoint_idx, const ChunkStatistics &stats) override¶
-
virtual void insert_file_scalar_stats(int file_id, const ChunkStatistics &stats, std::uint64_t num_chunks) override¶
-
virtual void insert_file_category_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override¶
-
virtual void insert_file_pid_tid_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override¶
-
virtual void insert_file_name_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override¶
-
virtual void insert_index_dimension(int file_id, std::string_view dimension) override¶
-
virtual void insert_chunk_dimension_stats(int file_id, std::uint64_t checkpoint_idx, const ChunkDimensionStats &stats, std::size_t value_counts_cap = 4096) override¶
-
virtual void insert_name_dictionary_entry(std::uint64_t name_id, std::string_view name) override¶
-
virtual void insert_name_file_posting(std::uint64_t name_id, int file_id) override¶
-
virtual void insert_name_chunk_posting(std::uint64_t name_id, int file_id, std::uint64_t checkpoint_idx) override¶
-
virtual void insert_hash_table_entry(std::uint8_t type, std::string_view hash, std::string_view name) override¶
-
virtual void insert_aggregation_merge(std::string_view key, std::string_view operand) override¶
-
virtual void insert_aggregation_put(std::string_view key, std::string_view value) override¶
-
virtual void insert_system_metrics_merge(std::string_view key, std::string_view operand) override¶
-
Artifacts commit()¶
Sort buffers, emit one SST per non-empty column family, return the resulting paths. Calling twice or after a move is a no-op.
-
void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, std::span<const std::uint32_t> line_numbers) = 0
-
inline void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, const std::vector<std::uint32_t> &line_numbers)¶
-
void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, std::span<const std::uint32_t> line_numbers) = 0
-
inline void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, const std::vector<std::uint32_t> &line_numbers)¶
-
struct Artifacts¶
Public Functions
-
inline bool empty() const noexcept¶
-
Artifacts move_to(std::string_view dest_dir) &&¶
Move every populated SST file to dest_dir (created if missing) and return a new Artifacts whose paths point at the new location.
Uses fs::rename when src and dst resolve to the same filesystem (O(1), atomic) and falls back to copy + unlink across filesystems. Intended for the node-local -> shared FS handoff in the distributed indexer. Rvalue-qualified: the original Artifacts is left empty.
Public Members
-
std::optional<std::string> metadata_sst¶
-
std::optional<std::string> checkpoints_sst¶
-
std::optional<std::string> manifest_sst¶
-
std::optional<std::string> chunk_bloom_sst¶
-
std::optional<std::string> file_bloom_sst¶
-
std::optional<std::string> chunk_stats_sst¶
-
std::optional<std::string> chunk_dim_stats_sst¶
-
std::optional<std::string> dimensions_sst¶
-
std::optional<std::string> file_scalar_stats_sst¶
-
std::optional<std::string> file_cat_counts_sst¶
-
std::optional<std::string> file_pid_tid_counts_sst¶
-
std::optional<std::string> file_name_counts_sst¶
-
std::optional<std::string> name_dictionary_sst¶
-
std::optional<std::string> name_file_postings_sst¶
-
std::optional<std::string> name_chunk_postings_sst¶
-
std::optional<std::string> hash_tables_sst¶
-
std::optional<std::string> aggregation_sst¶
-
std::optional<std::string> system_metrics_sst¶
-
inline bool empty() const noexcept¶
-
struct MergeableKeyValue¶
Aggregation / system_metrics buffers hold mixed Put+Merge entries in one CF.
is_merge distinguishes them at emit time, so the SST records the right operation kind (RocksDB supports mixed-op SSTs).
-
IndexDatabaseSstWriterContext(std::string staging_dir, std::string batch_id)¶
-
class IndexDatabaseWriterContext : public dftracer::utils::utilities::indexer::IndexBatchSink¶
Public Types
-
using ChunkDimensionStats = composites::dft::indexing::ChunkDimensionStats¶
-
using ChunkStatistics = composites::dft::indexing::ChunkStatistics¶
-
using IndexerCheckpoint = internal::IndexerCheckpoint¶
Public Functions
-
IndexDatabaseWriterContext(IndexDatabaseWriterContext&&) noexcept¶
-
IndexDatabaseWriterContext &operator=(IndexDatabaseWriterContext&&) noexcept¶
-
IndexDatabaseWriterContext(const IndexDatabaseWriterContext&) = delete¶
-
IndexDatabaseWriterContext &operator=(const IndexDatabaseWriterContext&) = delete¶
-
~IndexDatabaseWriterContext() override¶
-
void commit()¶
-
bool has_file_scalar_stats(int file_id) const¶
-
void init_schema()¶
-
int get_or_create_file_info(std::string_view path, std::uint64_t file_hash, IndexFileEntryCapability caps = IndexFileEntryCapability::NONE)¶
-
void set_file_capabilities(int file_id, IndexFileEntryCapability caps)¶
-
void set_file_capabilities_by_path(std::string_view logical_path, IndexFileEntryCapability caps)¶
-
void add_file_capability(int file_id, IndexFileEntryCapability cap)¶
-
virtual void insert_file_metadata(int file_id, std::uint64_t checkpoint_size, std::uint64_t total_lines, std::uint64_t total_uc_size) override¶
-
virtual void insert_chunk_bloom_filter(int file_id, std::uint64_t checkpoint_idx, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) override¶
-
void insert_chunk_bloom_filter(int file_id, std::uint64_t checkpoint_idx, std::string_view dimension, const void *blob_data, int blob_size, std::uint64_t num_entries)¶
-
virtual void insert_file_bloom_filter(int file_id, std::string_view dimension, std::span<const unsigned char> blob_data, std::uint64_t num_entries) override¶
-
void insert_file_bloom_filter(int file_id, std::string_view dimension, const void *blob_data, int blob_size, std::uint64_t num_entries)¶
-
virtual void insert_chunk_statistics(int file_id, std::uint64_t checkpoint_idx, const ChunkStatistics &stats) override¶
-
virtual void insert_file_scalar_stats(int file_id, const ChunkStatistics &stats, std::uint64_t num_chunks) override¶
-
virtual void insert_file_category_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override¶
-
virtual void insert_file_pid_tid_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override¶
-
virtual void insert_file_name_counts(int file_id, const StringViewMap<std::uint64_t> &counts) override¶
-
std::uint64_t get_or_create_name_id(std::string_view name)¶
-
virtual void insert_name_dictionary_entry(std::uint64_t name_id, std::string_view name) override¶
-
virtual void insert_name_file_posting(std::uint64_t name_id, int file_id) override¶
-
virtual void insert_name_chunk_posting(std::uint64_t name_id, int file_id, std::uint64_t checkpoint_idx) override¶
-
void refresh_root_summaries_after_file_write(int file_id, const ChunkStatistics &stats, std::uint64_t num_chunks, bool had_existing_file_summary, std::uint64_t file_lines = 0, std::uint64_t file_uncompressed_bytes = 0)¶
-
void rebuild_root_summaries()¶
-
virtual void insert_checkpoint(int file_id, const IndexerCheckpoint &checkpoint) override¶
-
virtual void insert_index_dimension(int file_id, std::string_view dimension) override¶
-
virtual void insert_hash_table_entry(std::uint8_t type, std::string_view hash, std::string_view name) override¶
Insert a hash table entry with bidirectional storage. Forward: [type][hash] -> name (for output resolution). Reverse: [type+4][name] -> hash (for the query DSL). Type: 0=FILE, 1=HOST, 2=STRING, 3=PROC.
-
virtual void insert_aggregation_merge(std::string_view key, std::string_view operand) override¶
-
virtual void insert_aggregation_put(std::string_view key, std::string_view value) override¶
-
virtual void insert_system_metrics_merge(std::string_view key, std::string_view operand) override¶
-
virtual void insert_chunk_dimension_stats(int file_id, std::uint64_t checkpoint_idx, const ChunkDimensionStats &stats, std::size_t value_counts_cap = 4096) override¶
-
void insert_tar_archive_metadata(int file_id, std::string_view archive_name, std::uint64_t checkpoint_size, std::uint64_t total_lines, std::uint64_t total_uc_size, std::uint64_t total_files)¶
-
void insert_tar_file(int file_id, const TarFileRecord &record)¶
-
void delete_chunk_bloom_filters(int file_id, std::string_view dimension)¶
-
void delete_file_bloom_filter(int file_id, std::string_view dimension)¶
-
void delete_chunk_statistics(int file_id)¶
-
void delete_chunk_dimension_stats(int file_id)¶
-
void delete_file_contents(int file_id)¶
-
void delete_event_ranges(int file_id)¶
-
void delete_metadata_lines(int file_id)¶
-
virtual void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, std::span<const std::uint32_t> line_numbers) override¶
-
virtual void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, std::span<const std::uint32_t> line_numbers) override¶
-
virtual void insert_file_pids(int file_id, const std::unordered_set<std::uint64_t> &pids) override¶
Insert the set of PIDs observed in a file (for distributed aggregation).
-
void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, std::span<const std::uint32_t> line_numbers) = 0
-
inline void insert_event_range(int file_id, std::uint64_t checkpoint_idx, std::string_view cat, std::string_view name, const std::vector<std::uint32_t> &line_numbers)¶
-
void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, std::span<const std::uint32_t> line_numbers) = 0
-
inline void insert_metadata_lines(int file_id, std::uint64_t checkpoint_idx, std::string_view meta_type, const std::vector<std::uint32_t> &line_numbers)¶
-
struct TarFileRecord¶
-
using ChunkDimensionStats = composites::dft::indexing::ChunkDimensionStats¶
-
class IndexVisitor¶
Subclassed by dftracer::utils::utilities::composites::dft::DftEventDispatcher
Public Functions
-
virtual ~IndexVisitor() = default¶
-
virtual void begin(std::size_t num_checkpoints) = 0¶
-
inline virtual coro::CoroTask<void> on_chunk(const char *data, std::size_t len, std::size_t checkpoint_idx)¶
Called for each chunk of input: data points to len bytes belonging to checkpoint checkpoint_idx. Only a raw pointer is passed, so implementations that need the data to outlive this call should copy it unless the caller guarantees a longer lifetime.
-
inline virtual bool wants_drain() const noexcept¶
Cheap hint that drain_pending() should be called to apply backpressure. Default false. Polled after each on_chunk call.
-
inline virtual coro::CoroTask<void> drain_pending()¶
Drain accumulated work via async ops (e.g. channel send). Suspends the calling coroutine when downstream is full — real backpressure without blocking an executor thread.
-
virtual void finalize(IndexDatabaseWriterContext &writer, int file_id) = 0¶
-
virtual ~IndexVisitor() = default¶
-
struct MergedStatisticsResult¶
-
struct NameSummaryResult¶
-
class ProvenanceDatabase¶
Manages provenance data in the shared `.dftindex` RocksDB store. This is shared index data recording the full reorganization provenance of an output file: which source files contributed, which checkpoints were used, and which line ranges map to which output lines.
Schema:
file_info: output file identity (path + hash)
provenance_info: key/value metadata (tool version, timestamp, etc.)
provenance_sources: source files that contributed to this output
provenance_group: named predicate groups used during reorganization
provenance_segments: per-checkpoint line range mappings
Public Types
-
using ProvenanceSource = composites::dft::indexing::queries::ProvenanceSource¶
-
using ProvenanceSegment = composites::dft::indexing::queries::ProvenanceSegment¶
Public Functions
-
explicit ProvenanceDatabase(const std::string &provenance_path, dftracer::utils::rocksdb::RocksDatabase::OpenMode open_mode = dftracer::utils::rocksdb::RocksDatabase::OpenMode::ReadWrite)¶
-
ProvenanceDatabase(const ProvenanceDatabase&) = delete¶
-
ProvenanceDatabase &operator=(const ProvenanceDatabase&) = delete¶
-
ProvenanceDatabase(ProvenanceDatabase&&) noexcept = default¶
-
ProvenanceDatabase &operator=(ProvenanceDatabase&&) noexcept = default¶
-
void init_schema()¶
-
int get_or_create_file_info(const std::string &path, std::uint64_t file_hash)¶
-
int get_file_info_id(const std::string &path) const¶
-
void begin_transaction()¶
-
void commit_transaction()¶
-
void rollback_transaction() noexcept¶
-
void insert_info(int file_info_id, std::string_view key, std::string_view value)¶
-
void insert_source(int file_info_id, int source_idx, std::string_view path, int num_checkpoints, std::string_view event_hash = "")¶
-
void insert_group(int file_info_id, std::string_view name, std::string_view predicate)¶
-
void insert_segment(int file_info_id, int source_idx, int source_checkpoint, int segment_seq, int output_line_start, int output_line_end, int event_count)¶
-
std::vector<ProvenanceSource> query_sources(int file_info_id) const¶
-
std::vector<ProvenanceSegment> query_segments(int file_info_id, int source_idx) const¶
-
std::vector<ProvenanceSegment> query_all_segments(int file_info_id) const¶
-
std::string query_info(int file_info_id, std::string_view key) const¶
-
std::string query_group_name(int file_info_id) const¶
-
std::string query_group_predicate(int file_info_id) const¶
-
struct RootStatisticsResult¶
-
class SstArtifactRegistry¶
Thread-safe collector for SST artifacts produced by many concurrent `IndexDatabaseSstWriterContext` instances. The coordinator hands the populated registry to `IndexDatabase::bulk_ingest()`.
Public Functions
-
inline void append(IndexDatabaseSstWriterContext::Artifacts artifacts)¶
-
inline const std::vector<std::string> &metadata() const¶
-
inline const std::vector<std::string> &checkpoints() const¶
-
inline const std::vector<std::string> &manifest() const¶
-
inline const std::vector<std::string> &chunk_bloom() const¶
-
inline const std::vector<std::string> &file_bloom() const¶
-
inline const std::vector<std::string> &chunk_stats() const¶
-
inline const std::vector<std::string> &chunk_dim_stats() const¶
-
inline const std::vector<std::string> &dimensions() const¶
-
inline const std::vector<std::string> &file_scalar_stats() const¶
-
inline const std::vector<std::string> &file_cat_counts() const¶
-
inline const std::vector<std::string> &file_pid_tid_counts() const¶
-
inline const std::vector<std::string> &file_name_counts() const¶
-
inline const std::vector<std::string> &name_dictionary() const¶
-
inline const std::vector<std::string> &name_file_postings() const¶
-
inline const std::vector<std::string> &name_chunk_postings() const¶
-
inline const std::vector<std::string> &hash_tables() const¶
-
inline const std::vector<std::string> &aggregation() const¶
-
inline const std::vector<std::string> &system_metrics() const¶
-
inline void append(IndexDatabaseSstWriterContext::Artifacts artifacts)¶
-
struct TarArchiveMetadata¶
-
struct TarFileRecord¶