DFTracer Reorganization

Namespace: dftracer::utils::utilities::composites::dft::reorganize

struct ChunkMemberLayout

Public Members

std::string path
std::vector<fileio::parallel::ParallelWriter::MemberSpan> members
struct EventRouterConfig

Public Members

ExtractionPlan plan
std::string output_dir
std::string index_dir
std::size_t chunk_size_bytes = 256 * 1024 * 1024
std::size_t checkpoint_size = 0
std::size_t executor_threads = 4
bool compress = true
bool verify = false
struct EventRouterResult

Public Members

std::size_t total_events_written = 0
std::size_t total_bytes_written = 0
std::size_t chunks_created = 0
std::size_t source_files_processed = 0
std::vector<std::string> output_files
bool success = false
struct ExtractionPlan

Public Members

std::vector<PredicateGroup> groups
std::vector<SourceFileInfo> source_files
std::vector<ExtractionTask> tasks
std::size_t total_events = 0
struct ExtractionTask

Public Members

std::size_t source_file_idx
std::uint64_t checkpoint_idx
std::string target_group
std::vector<std::uint32_t> line_numbers
std::uint64_t start_byte
std::uint64_t end_byte
struct GroupWriterConfig

Public Members

std::string group_name
std::string group_query
std::string output_dir
std::size_t chunk_size_bytes = 256 * 1024 * 1024
bool compress = true
int compression_level = -1
std::shared_ptr<coro::Channel<std::shared_ptr<LineBatch>>> input_channel
const std::vector<SourceFileInfo> *source_files = nullptr
bool build_output_index = true
std::string index_dir
bool with_aggregation = false
double agg_time_interval_us = 5'000'000.0
std::vector<std::string> bloom_dimensions
indexing::ChunkIndexerConfig bloom_config
std::string staging_root
std::shared_ptr<moodycamel::ConcurrentQueue<indexer::IndexDatabaseSstWriterContext::Artifacts>> artifacts_queue
std::shared_ptr<std::atomic<std::size_t>> batch_counter
struct GroupWriterResult

Public Members

std::string group_name
std::size_t events_written = 0
std::size_t bytes_written = 0
std::size_t chunks_created = 0
std::vector<std::string> output_files
std::vector<ChunkMemberLayout> chunk_layouts

Per-chunk-file gzip-member layout, captured directly from the writer. Recording it here lets downstream indexing skip re-scanning each output file's gzip headers after the write completes.

bool indexed_inline = false
bool success = false
std::string error_message
struct LineBatch

Public Functions

inline void reserve(std::size_t n)
inline std::size_t size() const
inline bool empty() const
inline void clear()
inline std::string_view line_view(std::size_t i) const
inline void append_line(std::string_view line, std::size_t source_file_idx, std::size_t checkpoint_idx, std::size_t source_line_number)

Public Members

std::string bytes
std::vector<LineRecord> lines
struct LineRecord

Public Members

std::uint32_t offset
std::uint32_t length
std::size_t source_file_idx
std::size_t checkpoint_idx
std::size_t source_line_number
struct ManifestExtractorConfig

Public Members

std::string file_path
std::string index_path
std::size_t source_file_idx = 0
std::vector<PredicateGroup> groups
std::vector<std::shared_ptr<coro::Channel<std::shared_ptr<LineBatch>>>> group_channels
std::size_t batch_size = 1024
struct ManifestExtractorResult

Public Members

std::size_t events_extracted = 0
std::size_t events_unmatched = 0
bool success = false
std::string error_message
class OrganizeVisitor : public dftracer::utils::utilities::composites::dft::DftEventVisitor

Public Functions

explicit OrganizeVisitor(OrganizeVisitorConfig config)
virtual void begin(std::size_t num_checkpoints) override
virtual void on_checkpoint(std::size_t checkpoint_idx) override
virtual void on_event(const EventRecord &record) override
virtual bool wants_drain() const noexcept override
virtual coro::CoroTask<void> drain_pending() override
virtual coro::CoroTask<void> on_file_complete() override
virtual std::unique_ptr<DftEventVisitor> create_parallel_slice() const override
virtual void merge_parallel_slice(DftEventVisitor &slice) override
inline std::size_t events_routed() const
inline std::size_t events_unmatched() const
struct OrganizeVisitorConfig

Public Members

std::vector<PredicateGroup> groups
std::vector<std::shared_ptr<coro::Channel<std::shared_ptr<LineBatch>>>> group_channels
std::size_t source_file_idx = 0
std::size_t batch_size = 1024
struct OriginalFileReconstruction

Public Members

std::string original_path
int num_checkpoints
std::string event_hash
std::map<int, std::vector<ReconstructionSegment>> checkpoint_segments
struct PredicateGroup

Public Members

std::string name
std::string query
struct ProvenanceRecord

Public Members

int source_file_idx
int checkpoint_idx
int output_chunk_idx
int output_line_start
int output_line_end
int event_count
class ProvenanceTracker

Public Functions

ProvenanceTracker() = default
void record(int source_file_idx, int checkpoint_idx, int output_chunk_idx, int output_line_start, int output_line_end, int event_count)
coro::CoroTask<void> flush_to_db(const ExtractionPlan &plan, const std::string &group_name, const std::string &group_query, const std::vector<fileio::ChunkInfo> &chunks, const std::string &output_dir)
inline std::size_t record_count() const
inline const std::vector<ProvenanceRecord> &records() const
struct ReconstructedFileInfo

Public Members

std::string original_path
std::string output_path
std::size_t events_written = 0
std::size_t bytes_written = 0
struct ReconstructionPlan

Public Members

std::map<std::string, OriginalFileReconstruction> files
std::size_t total_segments = 0
std::size_t total_events = 0
struct ReconstructionPlannerInput

Public Members

std::vector<std::string> reorganized_files
std::string index_dir
class ReconstructionPlannerUtility : public dftracer::utils::utilities::Utility<ReconstructionPlannerInput, ReconstructionPlan>

Public Functions

ReconstructionPlannerUtility() = default
coro::CoroTask<ReconstructionPlan> process(const ReconstructionPlannerInput &input) override
struct ReconstructionSegment

Public Members

std::string reorg_file
int output_line_start
int output_line_end
int source_checkpoint
int event_count
struct ReconstructorInput

Public Functions

ReconstructorInput &with_input_dir(std::string dir)
ReconstructorInput &with_output_dir(std::string dir)
ReconstructorInput &with_checkpoint_size(std::size_t sz)
ReconstructorInput &with_parallelism(std::size_t n)
ReconstructorInput &with_compress(bool c)

Public Members

std::string input_dir
std::string output_dir
std::size_t checkpoint_size = constants::indexer::DEFAULT_CHECKPOINT_SIZE
std::size_t parallelism = 0
bool compress = true
struct ReconstructorResult

Public Members

std::vector<ReconstructedFileInfo> files
std::size_t total_events = 0
std::size_t total_bytes = 0
std::size_t total_segments = 0
bool success = false
std::string error_message
class ReconstructorUtility : public dftracer::utils::utilities::Utility<ReconstructorInput, ReconstructorResult, utilities::tags::NeedsContext>

Public Functions

coro::CoroTask<ReconstructorResult> process(const ReconstructorInput &input) override
struct ReorganizationPlannerInput

Public Members

std::vector<std::string> source_files
std::vector<PredicateGroup> groups
std::string index_dir
std::size_t checkpoint_size = 0
class ReorganizationPlannerUtility : public dftracer::utils::utilities::Utility<ReorganizationPlannerInput, ExtractionPlan, utilities::tags::NeedsContext>

Public Functions

ReorganizationPlannerUtility() = default
coro::CoroTask<ExtractionPlan> process(const ReorganizationPlannerInput &input) override
struct SourceFileInfo

Public Members

std::string file_path
std::string index_path
std::size_t num_checkpoints = 0
std::uint64_t uncompressed_size = 0
std::uint64_t checkpoint_size = 0