Generic Composites¶
Namespace: dftracer::utils::utilities::composites
-
template<typename T>
struct BatchFileProcessOutput¶ Aggregated output from processing multiple files.
-
template<typename ItemInput, typename ItemOutput>
class BatchProcessorUtility : public dftracer::utils::utilities::Utility<std::vector<ItemInput>, std::vector<ItemOutput>, utilities::tags::NeedsContext>¶ Generic batch processor that processes a list of items in parallel.
- Template Parameters:
ItemInput – Type of input items
ItemOutput – Type of output from processing each item
Public Types
-
using ItemProcessorFn = std::function<ItemOutput(CoroScope&, const ItemInput&)>¶
-
using ComparatorFn = std::function<bool(const ItemOutput&, const ItemOutput&)>¶
Public Functions
-
inline explicit BatchProcessorUtility(ItemProcessorFn processor)¶
Construct batch processor with an item processing function.
- Parameters:
processor – Function that processes a single item
-
~BatchProcessorUtility() override = default¶
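(Note: the entry above is the defaulted destructor. The documentation below belongs to a separate templated constructor whose signature is not rendered on this page.) Construct batch processor with a parallelizable utility.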
This constructor enforces at compile-time that the utility has the tags::Parallelizable tag, ensuring thread-safety.
- Template Parameters:
UtilityType – Type of the utility (must have tags::Parallelizable)
- Parameters:
utility – Shared pointer to the utility
-
inline BatchProcessorUtility<ItemInput, ItemOutput> &with_comparator(ComparatorFn comparator)¶
Set a comparator for sorting results.
- Parameters:
comparator – Function to compare two outputs for sorting
- Returns:
Reference to this processor for chaining
-
inline virtual coro::CoroTask<std::vector<ItemOutput>> process(const std::vector<ItemInput> &items) override¶
Process all items in parallel, optionally sorting results.
- Parameters:
items – List of items to process
- Returns:
Vector of results (sorted if comparator was set)
-
template<typename ChunkType, typename MetadataType>
struct ChunkVerificationUtilityInput¶ Input for chunk verification.
Public Functions
-
ChunkVerificationUtilityInput() = default¶
-
inline ChunkVerificationUtilityInput(std::vector<ChunkType> c, std::vector<MetadataType> m)¶
-
inline ChunkVerificationUtilityInput<ChunkType, MetadataType> &with_metadata(std::vector<MetadataType> m)¶
-
inline bool operator==(const ChunkVerificationUtilityInput<ChunkType, MetadataType> &other) const¶
Public Static Functions
-
static inline ChunkVerificationUtilityInput<ChunkType, MetadataType> from_chunks(std::vector<ChunkType> c)¶
-
ChunkVerificationUtilityInput() = default¶
-
struct ChunkVerificationUtilityOutput¶
Output from chunk verification.
Public Functions
-
ChunkVerificationUtilityOutput() = default¶
-
inline bool operator==(const ChunkVerificationUtilityOutput &other) const¶
Public Members
-
bool passed = false¶
-
std::uint64_t input_hash = 0¶
-
std::uint64_t output_hash = 0¶
-
std::string error_message¶
Public Static Functions
-
static inline ChunkVerificationUtilityOutput success(std::uint64_t input_h, std::uint64_t output_h)¶
-
static inline ChunkVerificationUtilityOutput failure(std::uint64_t input_h, std::uint64_t output_h, const std::string &error)¶
-
ChunkVerificationUtilityOutput() = default¶
-
template<typename ChunkType, typename MetadataType, typename EventType>
class ChunkVerifierUtility : public dftracer::utils::utilities::Utility<ChunkVerificationUtilityInput<ChunkType, MetadataType>, ChunkVerificationUtilityOutput, utilities::tags::NeedsContext>¶ Generic chunk verifier that compares input and output events.
- Template Parameters:
ChunkType – Type of chunks (e.g., ChunkResult)
MetadataType – Type of metadata (e.g., FileMetadata)
EventType – Type of events (e.g., EventId)
Public Types
-
using InputHashFn = std::function<std::uint64_t(const std::vector<MetadataType>&)>¶
Public Functions
-
inline ChunkVerifierUtility(InputHashFn input_hasher, EventCollectorFn event_collector, EventHashFn event_hasher)¶
Construct verifier with hash and collection functions.
- Parameters:
input_hasher – Function to compute hash from metadata
event_collector – Function to collect events from chunks
event_hasher – Function to compute hash from events
-
inline coro::CoroTask<ChunkVerificationUtilityOutput> process(const ChunkVerificationUtilityInput<ChunkType, MetadataType> &input) override¶
Verify that output chunks contain the same events as input.
- Parameters:
input – Verification input with chunks and metadata
- Returns:
Verification result with pass/fail and hashes
-
template<typename FileOutput>
class DirectoryFileProcessorUtility : public dftracer::utils::utilities::Utility<DirectoryProcessInput, BatchFileProcessOutput<FileOutput>, utilities::tags::NeedsContext>¶ Generic workflow for scanning a directory and processing files in parallel.
This workflow utility:
Scans a directory for files matching specified extensions
Processes each file in parallel using CoroScope::emit()
Aggregates results and waits for completion
Template Parameters:
FileOutput: Type returned by the file processor function
Usage:
auto processor = [](CoroScope& ctx, const std::string& path) {
    // Process file and return result
    return MyFileOutput{...};
};
DirectoryFileProcessorUtility<MyFileOutput> workflow(processor);
auto output = workflow.process(DirectoryProcessInput{"/path/to/dir"});
Public Types
-
using FileProcessorFn = std::function<FileOutput(CoroScope&, const std::string&)>¶
Public Functions
-
inline explicit DirectoryFileProcessorUtility(FileProcessorFn processor)¶
Construct processor with a file processing function.
- Parameters:
processor – Function that processes a single file
-
inline coro::CoroTask<BatchFileProcessOutput<FileOutput>> process(const DirectoryProcessInput &input) override¶
Process all files in a directory in parallel.
- Parameters:
input – Directory configuration
- Returns:
Aggregated results from all file processing
-
struct DirectoryProcessInput¶
Input for processing files in a directory.
Public Functions
-
DirectoryProcessInput() = default¶
-
inline DirectoryProcessInput(std::string path, std::vector<std::string> exts = {".pfw", ".pfw.gz"}, bool recurse = false)¶
-
inline DirectoryProcessInput &with_extensions(std::vector<std::string> exts)¶
-
inline DirectoryProcessInput &with_recursive(bool rec)¶
-
inline bool operator==(const DirectoryProcessInput &other) const¶
-
inline bool operator!=(const DirectoryProcessInput &other) const¶
Public Members
-
std::string directory_path¶
-
bool recursive = false¶
-
std::vector<std::string> extensions¶
Public Static Functions
-
static inline DirectoryProcessInput from_directory(std::string path)¶
-
DirectoryProcessInput() = default¶
-
struct FileCompressionUtilityInput¶
Input for file compression workflow.
Public Functions
-
inline FileCompressionUtilityInput &with_output(const std::string &path)¶
Set output path.
-
inline FileCompressionUtilityInput &with_compression_level(int level)¶
Set compression level.
-
inline FileCompressionUtilityInput &with_chunk_size(std::size_t size)¶
Set chunk size.
-
inline FileCompressionUtilityInput &with_format(compression::zlib::CompressionFormat fmt)¶
Set compression format.
Public Members
-
std::string input_path¶
-
std::string output_path¶
-
int compression_level¶
-
std::size_t chunk_size¶
-
compression::zlib::CompressionFormat format = compression::zlib::CompressionFormat::AUTO¶
Public Static Functions
-
static inline FileCompressionUtilityInput from_file(const std::string &input_path, int compression_level = Z_DEFAULT_COMPRESSION, std::size_t chunk_size = 64 * 1024)¶
Create input with auto-generated output path.
-
inline FileCompressionUtilityInput &with_output(const std::string &path)¶
-
struct FileCompressionUtilityOutput¶
Output from file compression workflow.
-
class FileCompressorUtility : public dftracer::utils::utilities::Utility<FileCompressionUtilityInput, FileCompressionUtilityOutput, utilities::tags::Parallelizable>¶
Workflow for compressing files using streaming gzip compression.
This workflow:
Reads input file in chunks using StreamingFileReader
Compresses each chunk using StreamingCompressor
Writes compressed data to .gz file using StreamingFileWriter
Tagged with Parallelizable - safe for parallel batch processing.
Usage:
// Single file compression
auto compressor = std::make_shared<FileCompressorUtility>();
auto input = FileCompressionUtilityInput::from_file("large_file.txt")
                 .with_compression_level(9);
auto result = compressor->process(input);

// Parallel batch compression
// NOTE: process() returns coro::CoroTask<FileCompressionUtilityOutput>, whereas
// ItemProcessorFn must return FileCompressionUtilityOutput directly; adapt the
// lambda body accordingly.
auto batch_compressor = std::make_shared<
    BatchProcessorUtility<FileCompressionUtilityInput, FileCompressionUtilityOutput>>(
    [compressor](CoroScope& ctx, const FileCompressionUtilityInput& input) {
        return compressor->process(input);
    });
std::vector<FileCompressionUtilityInput> files = { ... };
auto results = batch_compressor->process(files);
Public Functions
-
FileCompressorUtility() = default¶
-
~FileCompressorUtility() override = default¶
-
inline coro::CoroTask<FileCompressionUtilityOutput> process(const FileCompressionUtilityInput &input) override¶
Compress a file using streaming gzip compression.
- Parameters:
input – Compression configuration
- Returns:
Compression result with statistics
-
struct FileDecompressionUtilityInput¶
Input for file decompression workflow.
Public Functions
-
inline FileDecompressionUtilityInput &with_output(const std::string &path)¶
Fluent builder: Set output path.
-
inline FileDecompressionUtilityInput &with_chunk_size(std::size_t size)¶
Fluent builder: Set chunk size.
-
inline FileDecompressionUtilityInput &with_format(compression::zlib::DecompressionFormat fmt)¶
Fluent builder: Set decompression format.
Public Members
-
std::string input_path¶
-
std::string output_path¶
-
std::size_t chunk_size¶
-
compression::zlib::DecompressionFormat format = compression::zlib::DecompressionFormat::AUTO¶
Public Static Functions
-
static inline FileDecompressionUtilityInput from_file(const std::string &input_path, std::size_t chunk_size = 64 * 1024)¶
Create input with auto-generated output path.
Strips .gz extension from input path to generate output path.
-
inline FileDecompressionUtilityInput &with_output(const std::string &path)¶
-
struct FileDecompressionUtilityOutput¶
Output from file decompression workflow.
Public Functions
-
inline FileDecompressionUtilityOutput()¶
-
inline FileDecompressionUtilityOutput &with_error(const std::string &error)¶
-
inline FileDecompressionUtilityOutput &with_success(std::size_t comp_size, std::size_t decomp_size)¶
-
inline FileDecompressionUtilityOutput &with_paths(const std::string &in_path, const std::string &out_path)¶
-
inline FileDecompressionUtilityOutput &with_sizes(std::size_t comp_size, std::size_t decomp_size)¶
-
inline FileDecompressionUtilityOutput &with_success(bool succ)¶
-
inline FileDecompressionUtilityOutput &with_error_message(const std::string &error)¶
-
inline FileDecompressionUtilityOutput &with_compressed_size(std::size_t size)¶
-
inline FileDecompressionUtilityOutput &with_decompressed_size(std::size_t size)¶
-
inline FileDecompressionUtilityOutput &with_input_path(const std::string &path)¶
-
inline FileDecompressionUtilityOutput &with_output_path(const std::string &path)¶
-
inline double original_compression_ratio() const¶
Get compression ratio of the original file.
-
inline FileDecompressionUtilityOutput()¶
-
class FileDecompressorUtility : public dftracer::utils::utilities::Utility<FileDecompressionUtilityInput, FileDecompressionUtilityOutput, utilities::tags::Parallelizable>¶
Workflow for decompressing gzip files using streaming decompression.
This workflow:
Reads compressed .gz file in chunks using StreamingFileReader
Decompresses each chunk using StreamingDecompressor
Writes decompressed data to output file using StreamingFileWriter
Tagged with Parallelizable - safe for parallel batch processing.
Usage:
// Single file decompression
auto decompressor = std::make_shared<FileDecompressorUtility>();
auto input = FileDecompressionUtilityInput::from_file("archive.gz");
auto result = decompressor->process(input);

// Parallel batch decompression
// NOTE: process() returns coro::CoroTask<FileDecompressionUtilityOutput>, whereas
// ItemProcessorFn must return FileDecompressionUtilityOutput directly; adapt the
// lambda body accordingly.
auto batch_decompressor = std::make_shared<
    BatchProcessorUtility<FileDecompressionUtilityInput, FileDecompressionUtilityOutput>>(
    [decompressor](CoroScope& ctx, const FileDecompressionUtilityInput& input) {
        return decompressor->process(input);
    });
std::vector<FileDecompressionUtilityInput> files = { ... };
auto results = batch_decompressor->process(files);
Public Functions
-
FileDecompressorUtility() = default¶
-
~FileDecompressorUtility() override = default¶
-
inline coro::CoroTask<FileDecompressionUtilityOutput> process(const FileDecompressionUtilityInput &input) override¶
Decompress a gzip file using streaming decompression.
- Parameters:
input – Decompression configuration
- Returns:
Decompression result with statistics
-
class FileMergeValidatorUtility : public dftracer::utils::utilities::Utility<FileMergeValidatorUtilityInput, FileMergeValidatorUtilityOutput>¶
Utility for processing and validating lines from a file for merging.
This utility:
Handles both compressed (.pfw.gz) and plain (.pfw) files
Builds or loads indexes for compressed files using IndexBuilderUtility
Uses LineBatchProcessorUtility to process lines
Validates JSON events and writes to output using StreamingFileWriter
Public Functions
-
coro::CoroTask<FileMergeValidatorUtilityOutput> process(const FileMergeValidatorUtilityInput &input) override¶
Public Static Functions
-
static inline int get_next_counter()¶
-
struct FileMergeValidatorUtilityInput¶
Input for file merge processing.
Public Functions
-
inline FileMergeValidatorUtilityInput &with_index(const std::string &index_path_value)¶
-
inline FileMergeValidatorUtilityInput &with_output(const std::string &out_path)¶
-
inline FileMergeValidatorUtilityInput &with_checkpoint_size(std::size_t size)¶
-
inline FileMergeValidatorUtilityInput &with_force_rebuild(bool force)¶
Public Members
-
std::string file_path¶
-
std::string index_path¶
-
std::string output_path¶
-
std::size_t checkpoint_size = {constants::indexer::DEFAULT_CHECKPOINT_SIZE}¶
-
bool force_rebuild = {false}¶
Public Static Functions
-
static inline FileMergeValidatorUtilityInput from_file(const std::string &path)¶
-
inline FileMergeValidatorUtilityInput &with_index(const std::string &index_path_value)¶
-
struct FileMergeValidatorUtilityOutput¶
Result of processing a single file for merging.
-
class FileMergerUtility : public dftracer::utils::utilities::Utility<FileMergerUtilityInput, FileMergerUtilityOutput>¶
Utility to combine temp files into final output.
This utility:
Combines multiple temporary files into a single JSON array
Handles proper JSON formatting
Optionally compresses the output
Cleans up temporary files
Public Functions
-
coro::CoroTask<FileMergerUtilityOutput> process(const FileMergerUtilityInput &input) override¶
-
struct FileMergerUtilityInput¶
Input for file merger utility.
Public Functions
-
inline FileMergerUtilityInput &with_output(const std::string &path)¶
-
inline FileMergerUtilityInput &with_compression(bool enable)¶
Public Members
-
std::vector<FileMergeValidatorUtilityOutput> file_results¶
-
std::string output_file¶
-
bool compress = {false}¶
Public Static Functions
-
static inline FileMergerUtilityInput from_results(const std::vector<FileMergeValidatorUtilityOutput> &results)¶
-
inline FileMergerUtilityInput &with_output(const std::string &path)¶
-
struct FileMergerUtilityOutput¶
Output from file merger utility.
-
struct FileProcessOutput¶
Output from processing a single file.
Public Functions
-
FileProcessOutput() = default¶
-
inline FileProcessOutput(std::string path, bool succ, std::size_t items = 0, std::string error = "")¶
Public Members
-
std::string file_path¶
-
bool success = false¶
-
std::size_t items_processed = 0¶
-
std::string error_message¶
Public Static Functions
-
static inline FileProcessOutput success_output(const std::string &path, std::size_t items)¶
-
static inline FileProcessOutput error_output(const std::string &path, const std::string &error)¶
-
FileProcessOutput() = default¶
-
class IndexedFileReaderUtility : public dftracer::utils::utilities::Utility<IndexedReadInput, std::shared_ptr<reader::internal::Reader>>¶
Workflow utility for managing indexed file reading.
This workflow handles:
Index existence checking
Index building/rebuilding if needed
Reader creation from indexed file
This encapsulates the common pattern, used throughout the project's command-line tools, of ensuring an index exists before creating a Reader.
Usage:
IndexedFileReaderUtility reader_workflow;
auto reader = reader_workflow.process(
    IndexedReadInput{"file.gz", ".dftindex", checkpoint_size, false});
// Now use reader to read lines
Public Functions
-
inline coro::CoroTask<std::shared_ptr<reader::internal::Reader>> process(const IndexedReadInput &input) override¶
Process index management and create Reader.
- Parameters:
input – Index configuration
- Returns:
Shared pointer to Reader ready for use
-
struct IndexedReadInput¶
Input for reading an indexed file.
Public Functions
-
IndexedReadInput() = default¶
-
inline IndexedReadInput(std::string fpath, std::string ipath, std::size_t ckpt_size = dftracer::utils::utilities::indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE, bool force = false)¶
-
inline IndexedReadInput &with_index(std::string idx)¶
-
inline IndexedReadInput &with_checkpoint_size(std::size_t size)¶
-
inline IndexedReadInput &with_force_rebuild(bool force)¶
Public Members
-
std::string file_path¶
-
std::string index_path¶
-
std::size_t checkpoint_size = dftracer::utils::utilities::indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE¶
-
bool force_rebuild = false¶
Public Static Functions
-
static inline IndexedReadInput from_file(std::string path)¶
-
IndexedReadInput() = default¶
-
struct LineBatchInput¶
Input for batch processing lines from a file.
Public Functions
-
LineBatchInput() = default¶
-
inline LineBatchInput(std::string fpath, std::string ipath = "", std::size_t start = 0, std::size_t end = 0)¶
-
inline LineBatchInput &with_index(std::string idx)¶
-
inline LineBatchInput &with_line_range(std::size_t start, std::size_t end)¶
-
inline LineBatchInput &with_checkpoint_size(std::size_t size)¶
Public Members
-
std::string file_path¶
-
std::string index_path¶
-
std::size_t start_line = 0¶
-
std::size_t end_line = 0¶
-
std::size_t checkpoint_size = dftracer::utils::utilities::indexer::internal::Indexer::DEFAULT_CHECKPOINT_SIZE¶
Public Static Functions
-
static inline LineBatchInput from_file(std::string path)¶
-
LineBatchInput() = default¶
-
template<typename LineOutput>
class LineBatchProcessorUtility : public dftracer::utils::utilities::Utility<LineBatchProcessUtilityInput, LineBatchProcessUtilityOutput<LineOutput>>¶ Workflow for processing lines from a file with streaming iteration.
This workflow:
Uses StreamingLineReader for lazy line iteration
Applies a processing function to each line
Collects results (optional - can filter out nullopt)
Template Parameters:
LineOutput: Type returned by the line processor function
Usage:
auto processor = [](const Line& line) -> std::optional<MyData> {
    // Process line, return std::nullopt to skip
    if (should_process(line)) {
        return MyData{...};
    }
    return std::nullopt;
};
LineBatchProcessorUtility<MyData> workflow(processor);
auto results = workflow.process(LineBatchInput{"/path/to/file.gz", "/path/to/.dftindex"});
Public Types
-
using LineProcessorFn = std::function<std::optional<LineOutput>(const fileio::lines::Line&)>¶
Public Functions
-
inline explicit LineBatchProcessorUtility(LineProcessorFn processor)¶
Construct processor with a line processing function.
- Parameters:
processor – Function that processes a single line
-
inline coro::CoroTask<LineBatchProcessUtilityOutput<LineOutput>> process(const LineBatchProcessUtilityInput &input) override¶
Process lines from a file using streaming iteration.
- Parameters:
input – Line batch configuration
- Returns:
Vector of processed line results
-
template<typename LineOutput>
class SimpleLineBatchProcessorUtility : public dftracer::utils::utilities::Utility<SimpleLineBatchProcessUtilityInput, SimpleLineBatchProcessUtilityOutput<LineOutput>>¶ Simplified line batch processor that always processes all lines.
Use this when you want to process every line without filtering.
Public Types
-
using SimpleLineProcessorFn = std::function<LineOutput(const fileio::lines::Line&)>¶
Public Functions
-
inline explicit SimpleLineBatchProcessorUtility(SimpleLineProcessorFn processor)¶
-
inline coro::CoroTask<SimpleLineBatchProcessUtilityOutput<LineOutput>> process(const SimpleLineBatchProcessUtilityInput &input) override¶
-
using SimpleLineProcessorFn = std::function<LineOutput(const fileio::lines::Line&)>¶
-
struct StreamingFileConsumerInput¶
Input for streaming file consumer.
Public Functions
-
inline StreamingFileConsumerInput &with_compression(bool enable)¶
Public Static Functions
-
static inline StreamingFileConsumerInput with_output(const std::string &path)¶
-
inline StreamingFileConsumerInput &with_compression(bool enable)¶
-
struct StreamingFileConsumerOutput¶
Output from streaming file consumer.
-
class StreamingFileConsumerUtility¶
Consumer utility that reads raw byte batches from a channel and writes them to a file, wrapping the output in JSON array delimiters.
Public Functions
-
coro::CoroTask<StreamingFileConsumerOutput> process_async(CoroScope &ctx, const StreamingFileConsumerInput &input)¶
-
coro::CoroTask<StreamingFileConsumerOutput> process_async(CoroScope &ctx, const StreamingFileConsumerInput &input)¶
-
struct StreamingFileProducerInput¶
Input for streaming file producer.
Public Functions
-
inline StreamingFileProducerInput &with_batch_byte_budget(std::size_t bytes)¶
-
inline StreamingFileProducerInput &with_verify(bool v)¶
Public Members
-
std::string file_path¶
-
std::size_t batch_byte_budget = {256 * 1024}¶
-
bool verify = {false}¶
Public Static Functions
-
static inline StreamingFileProducerInput from_file(const std::string &path)¶
-
inline StreamingFileProducerInput &with_batch_byte_budget(std::size_t bytes)¶
-
struct StreamingFileProducerOutput¶
Output from streaming file producer.
-
class StreamingFileProducerUtility¶
Producer utility that decompresses a file and sends raw chunks to a channel with the JSON array delimiters stripped.
In verify mode, it computes a per-line byte hash for integrity checking.
Public Functions
-
coro::CoroTask<StreamingFileProducerOutput> process_async(CoroScope &ctx, const StreamingFileProducerInput &input)¶
-
coro::CoroTask<StreamingFileProducerOutput> process_async(CoroScope &ctx, const StreamingFileProducerInput &input)¶
-
struct StreamingMergeBatch¶
Raw byte buffer sent through the channel during a streaming merge.
Contains pre-stripped chunk data (no array delimiters) ready to write. Each buf holds valid NDJSON lines separated by newlines.
-
struct ValidatedEvent¶
Validated JSON event for merging.