Generic Composites

Namespace: dftracer::utils::utilities::composites

template<typename T>
struct BatchFileProcessOutput

Aggregated output from processing multiple files.

Public Functions

BatchFileProcessOutput() = default
inline void add(T result)
inline void finalize()

Public Members

std::vector<T> results
std::size_t successful_count = 0
std::size_t failed_count = 0
std::size_t total_items_processed = 0
template<typename ItemInput, typename ItemOutput>
class BatchProcessorUtility : public dftracer::utils::utilities::Utility<std::vector<ItemInput>, std::vector<ItemOutput>, utilities::tags::NeedsContext>

Generic batch processor that processes a list of items in parallel.

Template Parameters:
  • ItemInput – Type of input items

  • ItemOutput – Type of output from processing each item

Public Types

using ItemProcessorFn = std::function<ItemOutput(CoroScope&, const ItemInput&)>
using ComparatorFn = std::function<bool(const ItemOutput&, const ItemOutput&)>

Public Functions

inline explicit BatchProcessorUtility(ItemProcessorFn processor)

Construct batch processor with an item processing function.

Parameters:

processor – Function that processes a single item

~BatchProcessorUtility() override = default
template<typename UtilityType, typename = std::enable_if_t<utilities::detail::has_process_v<UtilityType, ItemInput, ItemOutput>>>
inline explicit BatchProcessorUtility(std::shared_ptr<UtilityType> utility)

Construct batch processor with a parallelizable utility.

This constructor enforces at compile-time that the utility has the tags::Parallelizable tag, ensuring thread-safety.

Template Parameters:

UtilityType – Type of the utility (must have tags::Parallelizable)

Parameters:

utility – Shared pointer to the utility

inline BatchProcessorUtility<ItemInput, ItemOutput> &with_comparator(ComparatorFn comparator)

Set a comparator for sorting results.

Parameters:

comparator – Function to compare two outputs for sorting

Returns:

Reference to this processor for chaining

inline virtual coro::CoroTask<std::vector<ItemOutput>> process(const std::vector<ItemInput> &items) override

Process all items in parallel, optionally sorting results.

Parameters:

items – List of items to process

Returns:

Vector of results (sorted if comparator was set)

template<typename ChunkType, typename MetadataType>
struct ChunkVerificationUtilityInput

Input for chunk verification.

Public Functions

ChunkVerificationUtilityInput() = default
inline ChunkVerificationUtilityInput(std::vector<ChunkType> c, std::vector<MetadataType> m)
inline ChunkVerificationUtilityInput<ChunkType, MetadataType> &with_metadata(std::vector<MetadataType> m)
inline bool operator==(const ChunkVerificationUtilityInput<ChunkType, MetadataType> &other) const

Public Members

std::vector<ChunkType> chunks
std::vector<MetadataType> metadata

Public Static Functions

static inline ChunkVerificationUtilityInput<ChunkType, MetadataType> from_chunks(std::vector<ChunkType> c)
struct ChunkVerificationUtilityOutput

Output from chunk verification.

Public Functions

ChunkVerificationUtilityOutput() = default
inline bool operator==(const ChunkVerificationUtilityOutput &other) const

Public Members

bool passed = false
std::uint64_t input_hash = 0
std::uint64_t output_hash = 0
std::string error_message

Public Static Functions

static inline ChunkVerificationUtilityOutput success(std::uint64_t input_h, std::uint64_t output_h)
static inline ChunkVerificationUtilityOutput failure(std::uint64_t input_h, std::uint64_t output_h, const std::string &error)
template<typename ChunkType, typename MetadataType, typename EventType>
class ChunkVerifierUtility : public dftracer::utils::utilities::Utility<ChunkVerificationUtilityInput<ChunkType, MetadataType>, ChunkVerificationUtilityOutput, utilities::tags::NeedsContext>

Generic chunk verifier that compares input and output events.

Template Parameters:
  • ChunkType – Type of chunks (e.g., ChunkResult)

  • MetadataType – Type of metadata (e.g., FileMetadata)

  • EventType – Type of events (e.g., EventId)

Public Types

using InputHashFn = std::function<std::uint64_t(const std::vector<MetadataType>&)>
using EventCollectorFn = std::function<std::vector<EventType>(CoroScope&, const ChunkType&)>
using EventHashFn = std::function<std::uint64_t(const std::vector<EventType>&)>

Public Functions

inline ChunkVerifierUtility(InputHashFn input_hasher, EventCollectorFn event_collector, EventHashFn event_hasher)

Construct verifier with hash and collection functions.

Parameters:
  • input_hasher – Function to compute hash from metadata

  • event_collector – Function to collect events from chunks

  • event_hasher – Function to compute hash from events

inline coro::CoroTask<ChunkVerificationUtilityOutput> process(const ChunkVerificationUtilityInput<ChunkType, MetadataType> &input) override

Verify that output chunks contain the same events as input.

Parameters:

input – Verification input with chunks and metadata

Returns:

Verification result with pass/fail and hashes

template<typename FileOutput>
class DirectoryFileProcessorUtility : public dftracer::utils::utilities::Utility<DirectoryProcessInput, BatchFileProcessOutput<FileOutput>, utilities::tags::NeedsContext>

Generic workflow for scanning a directory and processing files in parallel.

This workflow utility:

  1. Scans a directory for files matching specified extensions

  2. Processes each file in parallel using CoroScope::emit()

  3. Aggregates results and waits for completion

Template Parameters:

  • FileOutput – Type returned by the file processor function

Usage:

auto processor = [](CoroScope& ctx, const std::string& path) {
    // Process file and return result
    return MyFileOutput{...};
};

DirectoryFileProcessorUtility<MyFileOutput> workflow(processor);
// process() returns a coro::CoroTask<BatchFileProcessOutput<MyFileOutput>>.
auto output = workflow.process(DirectoryProcessInput{"/path/to/dir"});

Public Types

using FileProcessorFn = std::function<FileOutput(CoroScope&, const std::string&)>

Public Functions

inline explicit DirectoryFileProcessorUtility(FileProcessorFn processor)

Construct processor with a file processing function.

Parameters:

processor – Function that processes a single file

inline coro::CoroTask<BatchFileProcessOutput<FileOutput>> process(const DirectoryProcessInput &input) override

Process all files in a directory in parallel.

Parameters:

input – Directory configuration

Returns:

Aggregated results from all file processing

struct DirectoryProcessInput

Input for processing files in a directory.

Public Functions

DirectoryProcessInput() = default
inline DirectoryProcessInput(std::string path, std::vector<std::string> exts = {".pfw", ".pfw.gz"}, bool recurse = false)
inline DirectoryProcessInput &with_extensions(std::vector<std::string> exts)
inline DirectoryProcessInput &with_recursive(bool rec)
inline bool operator==(const DirectoryProcessInput &other) const
inline bool operator!=(const DirectoryProcessInput &other) const

Public Members

std::string directory_path
bool recursive = false
std::vector<std::string> extensions

Public Static Functions

static inline DirectoryProcessInput from_directory(std::string path)
struct FileCompressionUtilityInput

Input for file compression workflow.

Public Functions

inline FileCompressionUtilityInput &with_output(const std::string &path)

Set output path.

inline FileCompressionUtilityInput &with_compression_level(int level)

Set compression level.

inline FileCompressionUtilityInput &with_chunk_size(std::size_t size)

Set chunk size.

inline FileCompressionUtilityInput &with_format(compression::zlib::CompressionFormat fmt)

Set compression format.

Public Members

std::string input_path
std::string output_path
int compression_level
std::size_t chunk_size
compression::zlib::CompressionFormat format = compression::zlib::CompressionFormat::AUTO

Public Static Functions

static inline FileCompressionUtilityInput from_file(const std::string &input_path, int compression_level = Z_DEFAULT_COMPRESSION, std::size_t chunk_size = 64 * 1024)

Create input with auto-generated output path.

struct FileCompressionUtilityOutput

Output from file compression workflow.

Public Functions

inline double compression_ratio() const

Get compression ratio (compressed / original).

inline double compression_percentage() const

Get compression percentage (how much space saved).

Public Members

std::string input_path
std::string output_path
bool success
std::size_t original_size
std::size_t compressed_size
std::string error_message
class FileCompressorUtility : public dftracer::utils::utilities::Utility<FileCompressionUtilityInput, FileCompressionUtilityOutput, utilities::tags::Parallelizable>

Workflow for compressing files using streaming gzip compression.

This workflow:

  1. Reads input file in chunks using StreamingFileReader

  2. Compresses each chunk using StreamingCompressor

  3. Writes compressed data to .gz file using StreamingFileWriter

Tagged with Parallelizable - safe for parallel batch processing.

Usage:

// Single file compression
auto compressor = std::make_shared<FileCompressorUtility>();
auto input = FileCompressionUtilityInput::from_file("large_file.txt")
                 .with_compression_level(9);
auto result = compressor->process(input);

// Parallel batch compression
auto batch_compressor = std::make_shared<
    BatchProcessorUtility<FileCompressionUtilityInput,
                          FileCompressionUtilityOutput>>(
    [compressor](CoroScope& ctx, const FileCompressionUtilityInput& input) {
        return compressor->process(input);
    });

std::vector<FileCompressionUtilityInput> files = { ... };
auto results = batch_compressor->process(files);

Public Functions

FileCompressorUtility() = default
~FileCompressorUtility() override = default
inline coro::CoroTask<FileCompressionUtilityOutput> process(const FileCompressionUtilityInput &input) override

Compress a file using streaming gzip compression.

Parameters:

input – Compression configuration

Returns:

Compression result with statistics

struct FileDecompressionUtilityInput

Input for file decompression workflow.

Public Functions

inline FileDecompressionUtilityInput &with_output(const std::string &path)

Fluent builder: Set output path.

inline FileDecompressionUtilityInput &with_chunk_size(std::size_t size)

Fluent builder: Set chunk size.

inline FileDecompressionUtilityInput &with_format(compression::zlib::DecompressionFormat fmt)

Fluent builder: Set decompression format.

Public Members

std::string input_path
std::string output_path
std::size_t chunk_size
compression::zlib::DecompressionFormat format = compression::zlib::DecompressionFormat::AUTO

Public Static Functions

static inline FileDecompressionUtilityInput from_file(const std::string &input_path, std::size_t chunk_size = 64 * 1024)

Create input with auto-generated output path.

Strips .gz extension from input path to generate output path.

struct FileDecompressionUtilityOutput

Output from file decompression workflow.

Public Functions

inline FileDecompressionUtilityOutput()
inline FileDecompressionUtilityOutput &with_error(const std::string &error)
inline FileDecompressionUtilityOutput &with_success(std::size_t comp_size, std::size_t decomp_size)
inline FileDecompressionUtilityOutput &with_paths(const std::string &in_path, const std::string &out_path)
inline FileDecompressionUtilityOutput &with_sizes(std::size_t comp_size, std::size_t decomp_size)
inline FileDecompressionUtilityOutput &with_success(bool succ)
inline FileDecompressionUtilityOutput &with_error_message(const std::string &error)
inline FileDecompressionUtilityOutput &with_compressed_size(std::size_t size)
inline FileDecompressionUtilityOutput &with_decompressed_size(std::size_t size)
inline FileDecompressionUtilityOutput &with_input_path(const std::string &path)
inline FileDecompressionUtilityOutput &with_output_path(const std::string &path)
inline double original_compression_ratio() const

Get compression ratio of the original file.

Public Members

std::string input_path
std::string output_path
bool success
std::size_t compressed_size
std::size_t decompressed_size
std::string error_message
class FileDecompressorUtility : public dftracer::utils::utilities::Utility<FileDecompressionUtilityInput, FileDecompressionUtilityOutput, utilities::tags::Parallelizable>

Workflow for decompressing gzip files using streaming decompression.

This workflow:

  1. Reads compressed .gz file in chunks using StreamingFileReader

  2. Decompresses each chunk using StreamingDecompressor

  3. Writes decompressed data to output file using StreamingFileWriter

Tagged with Parallelizable - safe for parallel batch processing.

Usage:

// Single file decompression
auto decompressor = std::make_shared<FileDecompressorUtility>();
auto input = FileDecompressionUtilityInput::from_file("archive.gz");
auto result = decompressor->process(input);

// Parallel batch decompression
auto batch_decompressor = std::make_shared<
    BatchProcessorUtility<FileDecompressionUtilityInput,
                          FileDecompressionUtilityOutput>>(
    [decompressor](CoroScope& ctx, const FileDecompressionUtilityInput& input) {
        return decompressor->process(input);
    });

std::vector<FileDecompressionUtilityInput> files = { ... };
auto results = batch_decompressor->process(files);

Public Functions

FileDecompressorUtility() = default
~FileDecompressorUtility() override = default
inline coro::CoroTask<FileDecompressionUtilityOutput> process(const FileDecompressionUtilityInput &input) override

Decompress a gzip file using streaming decompression.

Parameters:

input – Decompression configuration

Returns:

Decompression result with statistics

class FileMergeValidatorUtility : public dftracer::utils::utilities::Utility<FileMergeValidatorUtilityInput, FileMergeValidatorUtilityOutput>

Utility for processing and validating lines from a file for merging.

This utility:

  1. Handles both compressed (.pfw.gz) and plain (.pfw) files

  2. Builds or loads indexes for compressed files using IndexBuilderUtility

  3. Uses LineBatchProcessorUtility to process lines

  4. Validates JSON events and writes to output using StreamingFileWriter

Public Functions

coro::CoroTask<FileMergeValidatorUtilityOutput> process(const FileMergeValidatorUtilityInput &input) override

Public Static Functions

static inline int get_next_counter()
struct FileMergeValidatorUtilityInput

Input for file merge processing.

Public Functions

inline FileMergeValidatorUtilityInput &with_index(const std::string &index_path_value)
inline FileMergeValidatorUtilityInput &with_output(const std::string &out_path)
inline FileMergeValidatorUtilityInput &with_checkpoint_size(std::size_t size)
inline FileMergeValidatorUtilityInput &with_force_rebuild(bool force)

Public Members

std::string file_path
std::string index_path
std::string output_path
std::size_t checkpoint_size = {constants::indexer::DEFAULT_CHECKPOINT_SIZE}
bool force_rebuild = {false}

Public Static Functions

static inline FileMergeValidatorUtilityInput from_file(const std::string &path)
struct FileMergeValidatorUtilityOutput

Result of processing a single file for merging.

Public Members

std::string file_path
std::string output_path
bool success = {false}
std::size_t lines_processed = {0}
std::size_t valid_events = {0}
std::size_t total_lines = {0}
class FileMergerUtility : public dftracer::utils::utilities::Utility<FileMergerUtilityInput, FileMergerUtilityOutput>

Utility to combine temp files into final output.

This utility:

  1. Combines multiple temporary files into a single JSON array

  2. Handles proper JSON formatting

  3. Optionally compresses the output

  4. Cleans up temporary files

Public Functions

coro::CoroTask<FileMergerUtilityOutput> process(const FileMergerUtilityInput &input) override
struct FileMergerUtilityInput

Input for file merger utility.

Public Functions

inline FileMergerUtilityInput &with_output(const std::string &path)
inline FileMergerUtilityInput &with_compression(bool enable)

Public Members

std::vector<FileMergeValidatorUtilityOutput> file_results
std::string output_file
bool compress = {false}

Public Static Functions

static inline FileMergerUtilityInput from_results(const std::vector<FileMergeValidatorUtilityOutput> &results)
struct FileMergerUtilityOutput

Output from file merger utility.

Public Members

bool success = {false}
std::string output_path
std::size_t total_events = {0}
std::size_t files_combined = {0}
std::vector<utilities::composites::dft::EventId> collected_events
struct FileProcessOutput

Output from processing a single file.

Public Functions

FileProcessOutput() = default
inline FileProcessOutput(std::string path, bool succ, std::size_t items = 0, std::string error = "")

Public Members

std::string file_path
bool success = false
std::size_t items_processed = 0
std::string error_message

Public Static Functions

static inline FileProcessOutput success_output(const std::string &path, std::size_t items)
static inline FileProcessOutput error_output(const std::string &path, const std::string &error)
class IndexedFileReaderUtility : public dftracer::utils::utilities::Utility<IndexedReadInput, std::shared_ptr<reader::internal::Reader>>

Workflow utility for managing indexed file reading.

This workflow handles:

  1. Index existence checking

  2. Index building/rebuilding if needed

  3. Reader creation from indexed file

This encapsulates a common pattern in client code: ensuring that an index exists before creating a Reader.

Usage:

IndexedFileReaderUtility reader_workflow;
auto reader = reader_workflow.process(
    IndexedReadInput{"file.gz", ".dftindex", checkpoint_size, false}
);
// Now use reader to read lines

Public Functions

inline coro::CoroTask<std::shared_ptr<reader::internal::Reader>> process(const IndexedReadInput &input) override

Process index management and create Reader.

Parameters:

input – Index configuration

Returns:

Shared pointer to Reader ready for use

struct IndexedReadInput

Input for reading an indexed file.

Public Functions

IndexedReadInput() = default
inline IndexedReadInput(std::string fpath, std::string ipath, std::size_t ckpt_size = dftracer::utils::utilities::indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE, bool force = false)
inline IndexedReadInput &with_index(std::string idx)
inline IndexedReadInput &with_checkpoint_size(std::size_t size)
inline IndexedReadInput &with_force_rebuild(bool force)

Public Members

std::string file_path
std::string index_path
std::size_t checkpoint_size = dftracer::utils::utilities::indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE
bool force_rebuild = false

Public Static Functions

static inline IndexedReadInput from_file(std::string path)
struct LineBatchInput

Input for batch processing lines from a file.

Public Functions

LineBatchInput() = default
inline LineBatchInput(std::string fpath, std::string ipath = "", std::size_t start = 0, std::size_t end = 0)
inline LineBatchInput &with_index(std::string idx)
inline LineBatchInput &with_line_range(std::size_t start, std::size_t end)
inline LineBatchInput &with_checkpoint_size(std::size_t size)

Public Members

std::string file_path
std::string index_path
std::size_t start_line = 0
std::size_t end_line = 0
std::size_t checkpoint_size = dftracer::utils::utilities::indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE

Public Static Functions

static inline LineBatchInput from_file(std::string path)
template<typename LineOutput>
class LineBatchProcessorUtility : public dftracer::utils::utilities::Utility<LineBatchProcessUtilityInput, LineBatchProcessUtilityOutput<LineOutput>>

Workflow for processing lines from a file with streaming iteration.

This workflow:

  1. Uses StreamingLineReader for lazy line iteration

  2. Applies a processing function to each line

  3. Collects results, skipping lines for which the processor returns std::nullopt

Template Parameters:

  • LineOutput – Type returned by the line processor function

Usage:

auto processor = [](const Line& line) -> std::optional<MyData> {
    // Process line, return std::nullopt to skip
    if (should_process(line)) {
        return MyData{...};
    }
    return std::nullopt;
};

LineBatchProcessorUtility<MyData> workflow(processor);
auto results = workflow.process(
    LineBatchInput{"/path/to/file.gz", "/path/to/.dftindex"});

Public Types

using LineProcessorFn = std::function<std::optional<LineOutput>(const fileio::lines::Line&)>

Public Functions

inline explicit LineBatchProcessorUtility(LineProcessorFn processor)

Construct processor with a line processing function.

Parameters:

processor – Function that processes a single line

inline coro::CoroTask<LineBatchProcessUtilityOutput<LineOutput>> process(const LineBatchProcessUtilityInput &input) override

Process lines from a file using streaming iteration.

Parameters:

input – Line batch configuration

Returns:

Vector of processed line results

template<typename LineOutput>
class SimpleLineBatchProcessorUtility : public dftracer::utils::utilities::Utility<SimpleLineBatchProcessUtilityInput, SimpleLineBatchProcessUtilityOutput<LineOutput>>

Simplified line batch processor that always processes all lines.

Use this when you want to process every line without filtering.

Public Types

using SimpleLineProcessorFn = std::function<LineOutput(const fileio::lines::Line&)>

Public Functions

inline explicit SimpleLineBatchProcessorUtility(SimpleLineProcessorFn processor)
inline coro::CoroTask<SimpleLineBatchProcessUtilityOutput<LineOutput>> process(const SimpleLineBatchProcessUtilityInput &input) override
struct StreamingFileConsumerInput

Input for streaming file consumer.

Public Functions

inline StreamingFileConsumerInput &with_compression(bool enable)

Public Members

std::string output_file
bool compress = {false}

Public Static Functions

static inline StreamingFileConsumerInput with_output(const std::string &path)
struct StreamingFileConsumerOutput

Output from streaming file consumer.

Public Members

bool success = {false}
std::string output_path
std::size_t total_events = {0}
std::size_t output_hash = {0}
class StreamingFileConsumerUtility

Consumer utility that reads raw byte batches from channel and writes to file, wrapping in JSON array delimiters.

Public Functions

inline StreamingFileConsumerUtility(std::shared_ptr<coro::Channel<StreamingMergeBatch>> channel, std::shared_ptr<BufferPool<std::string>> buf_pool)
coro::CoroTask<StreamingFileConsumerOutput> process_async(CoroScope &ctx, const StreamingFileConsumerInput &input)
struct StreamingFileProducerInput

Input for streaming file producer.

Public Functions

inline StreamingFileProducerInput &with_batch_byte_budget(std::size_t bytes)
inline StreamingFileProducerInput &with_verify(bool v)

Public Members

std::string file_path
std::size_t batch_byte_budget = {256 * 1024}
bool verify = {false}

Public Static Functions

static inline StreamingFileProducerInput from_file(const std::string &path)
struct StreamingFileProducerOutput

Output from streaming file producer.

Public Members

std::string file_path
bool success = {false}
std::size_t events_sent = {0}
std::size_t input_hash = {0}
class StreamingFileProducerUtility

Producer utility that decompresses a file and sends raw chunks to channel with array delimiters stripped.

In verify mode, computes per-line byte hash for integrity checking.

Public Functions

inline StreamingFileProducerUtility(std::shared_ptr<coro::Channel<StreamingMergeBatch>> channel, std::shared_ptr<BufferPool<std::string>> buf_pool)
coro::CoroTask<StreamingFileProducerOutput> process_async(CoroScope &ctx, const StreamingFileProducerInput &input)
struct StreamingMergeBatch

Raw byte buffer sent through channel during streaming merge.

Contains pre-stripped chunk data (no array delimiters) ready to write. Each buf holds valid NDJSON lines separated by newlines.

Public Members

std::string buf
std::size_t event_count = {0}
std::size_t batch_hash = {0}
struct ValidatedEvent

Validated JSON event for merging.

Public Members

std::string content
std::size_t line_number