DFTracer Composites (General)

Namespace: dftracer::utils::utilities::composites::dft

class ArgsMap

Public Functions

ArgsMap() = default
inline bool exists() const
inline explicit operator bool() const
inline void set_valid(bool v)
inline void insert(std::string_view key, ArgsValue value)
inline void clear()
inline ArgsValueProxy operator[](std::string_view key) const
inline ArgsValueProxy operator[](const char *key) const
inline ArgsValueProxy operator[](const std::string &key) const
inline ArgsValueProxy at(const char *key) const
inline ArgsValueProxy at(const std::string &key) const
inline ArgsValueProxy at(std::string_view key) const
template<typename Fn>
inline void for_each_member(Fn &&fn) const
inline const Map &raw() const
class ArgsValueProxy

Public Functions

inline explicit ArgsValueProxy(const ArgsValue *v = nullptr)
inline bool exists() const
inline explicit operator bool() const
inline bool is_null() const
inline bool is_string() const
inline bool is_uint() const
inline bool is_int() const
inline bool is_number() const
inline bool is_bool() const
inline bool is_object() const
inline bool is_array() const
template<typename T>
inline T get(const T &default_val = T{}) const
template<typename T>
inline std::optional<T> get_optional() const
class ChunkExtractorUtility : public dftracer::utils::utilities::Utility<ChunkExtractorUtilityInput, ChunkExtractorUtilityOutput, utilities::tags::Parallelizable>

Extracts and merges chunks from DFTracer files.

Usage:

ChunkExtractorUtility extractor;

auto input = ChunkExtractorUtilityInput::from_manifest(1, manifest)
                 .with_output_dir("/output")
                 .with_app_name("myapp")
                 .with_compression(true);

auto result = extractor.process(input);
if (result.success) {
    std::cout << "Extracted " << result.events << " events\n";
}

Public Functions

coro::CoroTask<ChunkExtractorUtilityOutput> process(const ChunkExtractorUtilityInput &input) override
struct ChunkExtractorUtilityInput

Input for DFTracer chunk extraction.

Accepts a DFTracerChunkManifest with line tracking, but converts it to a byte-based fileio::ChunkManifest for extraction.

Public Functions

inline ChunkExtractorUtilityInput()
inline ChunkExtractorUtilityInput &with_output_dir(std::string dir)
inline ChunkExtractorUtilityInput &with_app_name(std::string name)
inline ChunkExtractorUtilityInput &with_compression(bool enabled)
inline ChunkExtractorUtilityInput &with_compute_hash(bool enabled)
inline fileio::ChunkManifest to_io_manifest() const
inline bool operator==(const ChunkExtractorUtilityInput &other) const

Public Members

int chunk_index
internal::DFTracerChunkManifest manifest
std::string output_dir
std::string app_name
bool compress = false
bool compute_hash = true

Public Static Functions

static inline ChunkExtractorUtilityInput from_manifest(int index, internal::DFTracerChunkManifest m)
struct ChunkExtractorUtilityOutput

Result of DFTracer chunk extraction.

Public Functions

inline ChunkExtractorUtilityOutput()
inline ChunkExtractorUtilityOutput(int index, std::string path, double mb, std::size_t event_count, bool succ)
inline bool operator==(const ChunkExtractorUtilityOutput &other) const
inline bool operator!=(const ChunkExtractorUtilityOutput &other) const

Public Members

int chunk_index
std::string output_path
double size_mb
std::size_t events
bool success
std::size_t event_hash = 0
class ChunkManifestMapperUtility : public dftracer::utils::utilities::Utility<ChunkManifestMapperUtilityInput, ChunkManifestMapperUtilityOutput>

Workflow for mapping DFTracer file metadata to chunk manifests with line tracking.

This workflow takes DFTracer-specific file metadata (line counts, sizes, event counts) and distributes files across chunks to achieve a target chunk size. It creates DFTracerChunkManifest objects with both byte offsets and line numbers.

Algorithm:

  • Greedily fills chunks up to target size

  • Splits large files across multiple chunks if needed

  • Tracks both byte offsets and line ranges for each chunk

  • Approximates byte offsets from line-based metadata

  • Assumes uniform byte distribution across lines

  • Line boundary alignment happens during extraction (LineBytesRange)

Usage:

ChunkManifestMapperUtility mapper;

auto input = ChunkManifestMapperUtilityInput::from_metadata(file_metadata)
                 .with_target_size(100.0);  // 100 MB chunks

auto manifests = mapper.process(input);
// manifests[i] contains DFTracerChunkSpec objects with line tracking

Public Functions

coro::CoroTask<ChunkManifestMapperUtilityOutput> process(const ChunkManifestMapperUtilityInput &input) override
struct ChunkManifestMapperUtilityInput

Input for DFTracer chunk manifest mapping.

Public Functions

inline ChunkManifestMapperUtilityInput()
inline ChunkManifestMapperUtilityInput &with_target_size(double size_mb)

Public Members

std::vector<MetadataCollectorUtilityOutput> file_metadata
double target_chunk_size_mb

Public Static Functions

static inline ChunkManifestMapperUtilityInput from_metadata(std::vector<MetadataCollectorUtilityOutput> metadata)
struct DFTracerEvent

Public Functions

inline bool is_metadata() const
inline bool is_counter() const
inline bool is_profile() const
inline bool is_system() const
inline bool is_event() const
inline bool is_complete() const
inline bool has_id() const

Public Members

std::uint64_t id = 0
std::string_view name
std::string_view cat
std::string_view ph
std::uint64_t pid = 0
std::uint64_t tid = 0
std::uint64_t ts = 0
std::uint64_t dur = 0
ArgsMap args

Public Static Functions

static inline bool parse(const JsonValue &json, DFTracerEvent &out)
static inline bool parse_scalars(simdjson::dom::element root, DFTracerEvent &out, simdjson::dom::element &out_args, bool &out_has_args)
static inline bool parse_ondemand(common::json::JsonParser &parser, DFTracerEvent &out)
class DftEventDispatcher : public dftracer::utils::utilities::indexer::IndexVisitor

Public Types

using VisitorList = std::vector<std::reference_wrapper<DftEventVisitor>>

Public Functions

inline explicit DftEventDispatcher(VisitorList visitors, bool force_serial = false)
inline virtual void begin(std::size_t num_checkpoints) override
inline virtual coro::CoroTask<void> on_checkpoint(std::size_t checkpoint_idx) override
inline virtual coro::CoroTask<void> on_chunk(const char *data, std::size_t len, std::size_t checkpoint_idx) override
inline virtual coro::CoroTask<void> flush() override
inline virtual bool wants_drain() const noexcept override

Cheap hint that drain_pending() should be called to apply backpressure. Defaults to false. Polled after each on_line() call.

inline virtual coro::CoroTask<void> drain_pending() override

Drain accumulated work via async ops (e.g. channel send). Suspends the calling coroutine when downstream is full — real backpressure without blocking an executor thread.

inline virtual void on_line(std::string_view line, indexer::SharedLineBuffer buffer, std::size_t checkpoint_idx) override

Called for each line. The line string_view points into buffer. Implementations that need the data to outlive this call should store the buffer shared_ptr (zero-copy) rather than copying line.

inline virtual void finalize(indexer::IndexDatabaseWriterContext &writer, int file_id) override

Public Static Attributes

static constexpr std::size_t FLUSH_THRESHOLD = 4 * 1024 * 1024
class DftEventVisitor

Subclassed by dftracer::utils::utilities::composites::dft::aggregators::AggregationVisitor, dftracer::utils::utilities::composites::dft::reorganize::OrganizeVisitor, dftracer::utils::utilities::composites::dft::visitors::BloomVisitor, dftracer::utils::utilities::composites::dft::visitors::HashTableVisitor, dftracer::utils::utilities::composites::dft::visitors::ManifestVisitor

Public Functions

virtual ~DftEventVisitor() = default
virtual void begin(std::size_t num_checkpoints) = 0
virtual void on_checkpoint(std::size_t checkpoint_idx) = 0
virtual void on_event(const EventRecord &record) = 0
inline virtual bool wants_drain() const noexcept
inline virtual coro::CoroTask<void> drain_pending()
inline virtual coro::CoroTask<void> on_file_complete()
inline virtual std::unique_ptr<DftEventVisitor> create_parallel_slice() const
inline virtual void merge_parallel_slice(DftEventVisitor&)
inline virtual void set_line_offset(std::size_t)

In parallel-flush mode, slices receive events with slice-local line numbers (0..N-1). The dispatcher calls this on the slice before merge_parallel_slice with the cumulative successful-event count of prior slices, so the slice can renumber its stored line indices.

inline virtual std::size_t parallel_event_count() const

Successful events processed by this slice. Used by the dispatcher to propagate line offsets across slices in byte order.

inline virtual bool needs_args_map() const
struct EventCollectorFromMetadataCollectorUtilityInput

Input for event collection from DFTracer metadata.

Public Functions

inline EventCollectorFromMetadataCollectorUtilityInput &with_trim_commas(bool trim)

Public Members

std::vector<MetadataCollectorUtilityOutput> metadata
bool trim_commas{false}

Public Static Functions

static inline EventCollectorFromMetadataCollectorUtilityInput from_metadata(std::vector<MetadataCollectorUtilityOutput> meta)
class EventCollectorFromMetadataUtility : public dftracer::utils::utilities::Utility<EventCollectorFromMetadataCollectorUtilityInput, EventCollectorUtilityOutput>

Workflow for collecting event IDs from DFTracer metadata files.

Reads files specified in metadata and extracts EventId (id, pid, tid) from each valid JSON event.

Public Functions

coro::CoroTask<EventCollectorUtilityOutput> process(const EventCollectorFromMetadataCollectorUtilityInput &input) override
struct EventHashInput

Input for event hashing.

Public Members

std::vector<EventId> events

Public Static Functions

static inline EventHashInput from_events(std::vector<EventId> event_list)
class EventHasher : public dftracer::utils::utilities::Utility<EventHashInput, EventHashOutput>

Workflow for computing a hash from a collection of EventIds.

Uses HasherUtility to hash the id, pid, and tid fields of each event. Events should be sorted before hashing for consistent results.

Public Functions

coro::CoroTask<EventHashOutput> process(const EventHashInput &input) override
struct EventId

Simple event identifier (id, pid, tid).

Public Functions

inline EventId()
inline EventId(std::int64_t i, std::int64_t p, std::int64_t t)
inline bool operator<(const EventId &other) const
inline bool operator==(const EventId &other) const
inline bool is_valid() const

Public Members

std::int64_t id
std::int64_t pid
std::int64_t tid
struct EventIdExtractionInput

Input for event ID extraction.

Public Members

std::string_view json_data

Public Static Functions

static inline EventIdExtractionInput from_json(std::string_view json)
class EventIdExtractor : public dftracer::utils::utilities::Utility<EventIdExtractionInput, EventIdExtractionOutput>

Utility for extracting EventId from JSON.

This is a composable utility that follows the Utility<Input, Output> pattern. It has no I/O dependencies and can be easily tested and composed.

Intent: “Parse JSON and extract (id, pid, tid)”

Public Functions

coro::CoroTask<EventIdExtractionOutput> process(const EventIdExtractionInput &input) override
struct EventRecord

Public Members

const DFTracerEvent &ev
const common::json::JsonValue &json
std::string_view line
indexer::SharedLineBuffer line_buffer
std::size_t checkpoint_idx
std::size_t line_number
simdjson::dom::element args_dom = {}
bool has_args = {false}
struct IncrementalEventHashInput

Input for incremental event hashing.

Public Members

std::vector<EventId> events

Public Static Functions

static inline IncrementalEventHashInput from_events(std::vector<EventId> evts)
class IncrementalEventHasher : public dftracer::utils::utilities::Utility<IncrementalEventHashInput, std::size_t>

Incremental event hasher with order-independent (additive) hashing.

Public Functions

IncrementalEventHasher() = default
inline void update(const EventId &event)
inline void update(const std::vector<EventId> &events)
inline coro::CoroTask<std::size_t> process(const IncrementalEventHashInput &input) override
inline std::size_t get_hash() const
inline void reset()
inline void merge(const IncrementalEventHasher &other)
class MetadataCollectorUtility : public dftracer::utils::utilities::Utility<MetadataCollectorUtilityInput, MetadataCollectorUtilityOutput, utilities::tags::Parallelizable>

Collects metadata (size, line count, events) from DFTracer trace files.

Supports both plain (.pfw) and compressed (.pfw.gz) files. For compressed files, builds/uses the root-local .dftindex store for efficient access.

Tagged with Parallelizable - safe for parallel batch processing.

Public Functions

MetadataCollectorUtility() = default
coro::CoroTask<MetadataCollectorUtilityOutput> process(const MetadataCollectorUtilityInput &input) override
struct MetadataCollectorUtilityInput

Input for DFTracer file metadata collection.

Public Functions

MetadataCollectorUtilityInput() = default
inline MetadataCollectorUtilityInput(std::string fpath, std::string ipath = "", std::size_t ckpt = dftracer::utils::utilities::indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE, bool force = false, bool hash = false)
inline MetadataCollectorUtilityInput &with_index(std::string idx)
inline MetadataCollectorUtilityInput &with_checkpoint_size(std::size_t size)
inline MetadataCollectorUtilityInput &with_force_rebuild(bool force)
inline MetadataCollectorUtilityInput &with_compute_hash(bool hash)
inline bool operator==(const MetadataCollectorUtilityInput &other) const

Public Members

std::string file_path
std::string index_path
std::size_t checkpoint_size = dftracer::utils::utilities::indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE
bool force_rebuild = false
bool compute_hash = false

Public Static Functions

static inline MetadataCollectorUtilityInput from_file(std::string path)
struct MetadataCollectorUtilityOutput

Output from DFTracer file metadata collection.

Public Functions

MetadataCollectorUtilityOutput() = default
inline bool operator==(const MetadataCollectorUtilityOutput &other) const

Public Members

std::string file_path
std::string index_path
double size_mb = 0
std::size_t start_line = 0
std::size_t end_line = 0
std::size_t valid_events = 0
double size_per_line = 0
bool success = false
bool has_index = false
bool index_valid = false
std::uint64_t compressed_size = 0
std::uint64_t uncompressed_size = 0
std::uint64_t num_lines = 0
std::uint64_t checkpoint_size = 0
std::size_t num_checkpoints = 0
ArchiveFormat format = ArchiveFormat::UNKNOWN
std::string error_message
std::size_t event_hash = 0